拦截器详解2(重点):连接、请求服务-转载
『转载自📚胡飞洋-拦截器详解2:连接、请求服务(重点)-https://cloud.tencent.com/developer/article/1667344 (opens new window)』
在本系列的上一篇文章你想要的系列:网络请求框架OkHttp3全解系列 - (三)拦截器详解1:重试重定向、桥、缓存(重点) (opens new window)中,我们分析了OkHttp拦截器链中的前三个拦截器:RetryAndFollowUpInterceptor、BridgeInterceptor、CacheInterceptor,它们在请求建立连接之前做了一些预处理。
如果请求经过这三个拦截器后,要继续往下传递,说明是需要进行网络请求的(缓存不能直接满足),也就是今天要分析的内容——剩下的两个拦截器:ConnectInterceptor、CallServerInterceptor,分别负责 连接建立、请求服务读写。
# 背景 - HTTP协议发展
在讲解拦截器之前,我们有必要了解http协议相关背景知识,因为okhttp的网络连接正是基于此实现的。HTTP协议经历了以下三个版本阶段。
# HTTP1.0
在HTTP1.0中,一次请求 会建立一个TCP连接,请求完成后主动断开连接。这种方法的好处是简单,各个请求互不干扰。 但每次请求都会经历 3次握手、2次或4次挥手的连接建立和断开过程——极大影响网络效率和系统开销。
# HTTP1.1
在HTTP1.1中,解决了HTTP1.0中连接不能复用的问题,支持持久连接——使用keep-alive机制:一次HTTP请求结束后不会立即断开TCP连接,如果此时有新的HTTP请求,且其请求的Host同上次请求相同,那么会直接复用TCP连接。这样就减少了建立和关闭连接的消耗和延迟。keep-alive机制在HTTP1.1中是默认打开的——即在请求头添加:connection:keep-alive。(keep-alive不会永久保持连接,它有一个保持时间,可在不同的服务器软件(如Apache)中设定这个时间)
# HTTP2.0
HTTP1.1中,连接的复用是串行的:一个请求建立了TCP连接,请求完成后,下一个相同host的请求继续使用这个连接。 但客户端想 同时 发起多个并行请求,那么必须建立多个TCP连接。将会产生网络延迟、增大网路开销。
并且HTTP1.1不会压缩请求和响应报头,导致了不必要的网络流量;HTTP1.1不支持资源优先级导致底层TCP连接利用率低下。在HTTP2.0中,这些问题都会得到解决,HTTP2.0主要有以下特性:
- 新的二进制格式(Binary Format):http/1.x使用的是明文协议,其协议格式由三部分组成:request line,header,body,其协议解析是基于文本,但是这种方式存在天然缺陷,文本的表现形式有多样性,要做到健壮性考虑的场景必然很多,二进制则不同,只认0和1的组合;基于这种考虑,http/2.0的协议解析决定采用二进制格式,实现方便且健壮
- 多路复用(MultiPlexing):即连接共享,使用streamId用来区分请求,一个request对应一个stream并分配一个id,这样一个TCP连接上可以有多个stream,每个stream的frame可以随机的混杂在一起,接收方可以根据stream id将frame再归属到各自不同的request里面
- 优先级和依赖(Priority、Dependency):每个stream都可以设置优先级和依赖,优先级高的stream会被server优先处理和返回给客户端,stream还可以依赖其它的sub streams;优先级和依赖都是可以动态调整的,比如在APP上浏览商品列表,用户快速滑动到底部,但是前面的请求已经发出,如果不把后面的优先级设高,那么当前浏览的图片将会在最后展示出来,显然不利于用户体验
- header压缩:http2.0使用encoder来减少需要传输的header大小,通讯双方各自cache一份header fields表,既避免了重复header的传输,又减小了需要传输的大小
- 重置连接:很多APP里都有停止下载图片的需求,对于http1.x来说,是直接断开连接,导致下次再发请求必须重新建立连接;http2.0引入RST_STREAM类型的frame,可以在不断开连接的前提下取消某个request的stream
其中涉及了两个新的概念:
- 数据流-stream:基于TCP连接之上的逻辑双向字节流,用于承载双向消息,对应一个请求及其响应。客户端每发起一个请求就建立一个数据流,后续该请求及其响应的所有数据都通过该数据流传输。每个数据流都有一个唯一的标识符和可选的优先级信息。
- 帧-frame:HTTP/2的最小数据切片单位,承载着特定类型的数据,例如 HTTP 标头、消息负载,等等。 来自不同数据流的帧可以交错发送,然后再根据每个帧头的数据流标识符重新组装,从而在宏观上实现了多个请求或响应并行传输的效果。
这里的 多路复用机制 就实现了 在同一个TCP连接上 多个请求 并行执行。
无论是HTTP1.1的Keep-Alive机制还是HTTP2.0的多路复用机制,在实现上都需要引入连接池来维护网络连接。下面就开始分析 OkHttp中的连接池实现——连接拦截器ConnectInterceptor。
# ConnectInterceptor
连接拦截器ConnectInterceptor代码如下:
//打开到目标服务的连接、处理下一个拦截器
public final class ConnectInterceptor implements Interceptor {
public final OkHttpClient client;
public ConnectInterceptor(OkHttpClient client) {
this.client = client;
}
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
Transmitter transmitter = realChain.transmitter();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
return realChain.proceed(request, transmitter, exchange);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
代码很少,主要是使用transmitter.newExchange获取Exchange实例,并作为参数调用拦截器链的proceed的方法。注意到前面分析过的拦截器调用的proceed方法是一个参数的,而这里是三个参数的。这是因为下一个拦截器(如果没有配置网络拦截器的话,就是CallServerInterceptor,也是最后一个)需要进行真正的网络IO操作,而 Exchange(意为交换)主要作用就是真正的IO操作:写入请求、读取响应(会在下一个拦截器做介绍)。
实际上获取Exchange实例的逻辑处理都封装在Transmitter中了。前面的文章提到过Transmitter,它是“发射器”,是把 请求 从应用端 发射到 网络层,它持有请求的 连接、请求、响应 和 流,一个请求对应一个Transmitter实例,一个数据流。下面就看下它的newExchange方法:
Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
synchronized (connectionPool) {
if (noMoreExchanges) {
throw new IllegalStateException("released");
}
if (exchange != null) {
throw new IllegalStateException("cannot make a new request because the previous response "
+ "is still open: please call response.close()");
}
}
ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);
synchronized (connectionPool) {
this.exchange = result;
this.exchangeRequestDone = false;
this.exchangeResponseDone = false;
return result;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
若是第一次请求,前面两个if是没走进去的。接着看到使用exchangeFinder的find方法获取到了ExchangeCodec实例,然后作为参数构建了Exchange实例,并返回。嗯,看起来也很简单的样子。 注意到这个方法里涉及了连接池RealConnectionPool、交换寻找器ExchangeFinder、交换编解码ExchangeCodec、交换管理Exchange这几个类(翻译成这样尽力了?,意会吧)。
- RealConnectionPool,连接池,负责管理请求的连接,包括新建、复用、关闭,理解上类似线程池。
- ExchangeCodec,接口类,负责真正的IO操作—写请求、读响应,实现类有Http1ExchangeCodec、Http2ExchangeCodec,分别对应HTTP1.1协议、HTTP2.0协议。
- Exchange,管理IO操作,可以理解为 数据流,是ExchangeCodec的包装,增加了事件回调;一个请求对应一个Exchange实例。传给下个拦截器CallServerInterceptor使用。
- ExchangeFinder,(从连接池中)寻找可用TCP连接,然后通过连接得到ExchangeCodec。
# ExchangeFinder
ExchangeFinder的作用从名字就可以看出——Exchange寻找者,本质是为请求寻找一个TCP连接。如果已有可用连接就直接使用,没有则打开一个新的连接。 一个网络请求的执行,需要先有一个指向目标服务的TCP连接,然后再进行写请求、读响应的IO操作。ExchangeFinder是怎么寻找的呢?继续往下看~
我们先看exchangeFinder初始化的地方:
public void prepareToConnect(Request request) {
...
this.exchangeFinder = new ExchangeFinder(this, connectionPool, createAddress(request.url()),
call, eventListener);
}
2
3
4
5
看到这里应该会想起上一篇文章中分析RetryAndFollowUpInterceptor时提到过,prepareToConnect这个方法作用是连接准备,就是创建了ExchangeFinder实例。主要到传入的参数有connectionPool、createAddress方法返回的Address、call、eventListener。connectionPool是连接池,稍后分析,先看下createAddress方法:
private Address createAddress(HttpUrl url) {
SSLSocketFactory sslSocketFactory = null;
HostnameVerifier hostnameVerifier = null;
CertificatePinner certificatePinner = null;
if (url.isHttps()) {
sslSocketFactory = client.sslSocketFactory();
hostnameVerifier = client.hostnameVerifier();
certificatePinner = client.certificatePinner();
}
return new Address(url.host(), url.port(), client.dns(), client.socketFactory(),
sslSocketFactory, hostnameVerifier, certificatePinner, client.proxyAuthenticator(),
client.proxy(), client.protocols(), client.connectionSpecs(), client.proxySelector());
}
2
3
4
5
6
7
8
9
10
11
12
13
14
使用url和client配置 创建一个Address实例。Address意思是指向服务的连接的地址,可以理解为请求地址及其配置。Address有一个重要作用:相同Address的HTTP请求 共享 相同的连接。这可以作为前面提到的 HTTP1.1和HTTP2.0 复用连接 的请求的判断。
回头看exchangeFinder的find方法
public ExchangeCodec find(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
//找到一个健康的连接
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
//利用连接实例化ExchangeCodec对象,如果是HTTP/2返回Http2ExchangeCodec,否则返回Http1ExchangeCodec
return resultConnection.newCodec(client, chain);
} catch (RouteException e) {
trackFailure();
throw e;
} catch (IOException e) {
trackFailure();
throw new RouteException(e);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
主要就是通过findHealthyConnection方法获取连接RealConnection实例,然后用RealConnection的newCodec方法获取了ExchangeCodec实例,如果是HTTP/2返回Http2ExchangeCodec,否则返回Http1ExchangeCodec,然后返回。
findHealthyConnection方法名透露着 就是去寻找可用TCP连接的,而我们猜测这个方法内部肯定和连接池ConnectionPool有紧密的关系。接着跟进findHealthyConnection方法:
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {
//找连接
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
// 是新连接 且不是HTTP2.0 就不用体检
synchronized (connectionPool) {
if (candidate.successCount == 0 && !candidate.isMultiplexed()) {
return candidate;
}
}
// 体检不健康,继续找
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
//标记不可用
candidate.noNewExchanges();
continue;
}
return candidate;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
循环寻找连接:如果是不健康的连接,标记不可用(标记后会移除,后面讲连接池会讲到),然后继续找。健康是指连接可以承载新的数据流,socket是连接状态。我们跟进findConnection方法,看看到底是怎么 找连接 的:
//为承载新的数据流 寻找 连接。寻找顺序是 已分配的连接、连接池、新建连接
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
RealConnection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
//请求已被取消(Call的cancel方法->transmitter的cancel方法),抛异常
if (transmitter.isCanceled()) throw new IOException("Canceled");
hasStreamFailure = false;
// 尝试使用 已给数据流分配的连接.(例如重定向请求时,可以复用上次请求的连接)
releasedConnection = transmitter.connection;
//有已分配的连接,但已经被限制承载新的数据流,就尝试释放掉(如果连接上已没有数据流),并返回待关闭的socket。
toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
? transmitter.releaseConnectionNoEvents()
: null;
if (transmitter.connection != null) {
// 不为空,说明上面没有释放掉,那么此连接可用
result = transmitter.connection;
releasedConnection = null;
}
if (result == null) {
// 没有已分配的可用连接,就尝试从连接池获取。(连接池稍后详细讲解)
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
foundPooledConnection = true;
result = transmitter.connection;
} else if (nextRouteToTry != null) {
selectedRoute = nextRouteToTry;//有可尝试的路由
nextRouteToTry = null;
} else if (retryCurrentRoute()) {
selectedRoute = transmitter.connection.route();
}
}
}
closeQuietly(toClose);//(如果有)关闭待关闭的socket
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);//(如果有)回调连接释放事件
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);//(如果有)回调(从连接池)获取连接事件
}
if (result != null) {
// 如果有已分配可用连接 或 从连接池获取到连接,结束! 没有 就走下面的新建连接过程。
return result;
}
// 如果需要路由信息,就获取。是阻塞操作
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
List<Route> routes = null;
synchronized (connectionPool) {
if (transmitter.isCanceled()) throw new IOException("Canceled");
if (newRouteSelection) {
//现在有了IP地址,再次尝试从连接池获取。可能会因为连接合并而匹配。(这里传入了routes,上面的传的null)
routes = routeSelection.getAll();
if (connectionPool.transmitterAcquirePooledConnection(
address, transmitter, routes, false)) {
foundPooledConnection = true;
result = transmitter.connection;
}
}
//第二次连接池也没找到,就新建连接
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
result = new RealConnection(connectionPool, selectedRoute);
connectingConnection = result;
}
}
// 如果第二次从连接池的尝试成功了,结束,因为连接池中的连接是已经和服务器建立连接的
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
// 第二次没成功,就把新建的连接,进行TCP + TLS 握手,与服务端建立连接. 是阻塞操作
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
connectionPool.routeDatabase.connected(result.route());//从失败名单中移除
Socket socket = null;
synchronized (connectionPool) {
connectingConnection = null;
// 最后一次尝试从连接池获取,注意最后一个参数为true,即要求 多路复用(http2.0)
//意思是,如果本次是http2.0,那么为了保证 多路复用性,(因为上面的握手操作不是线程安全)会再次确认连接池中此时是否已有同样连接
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
// 如果获取到,就关闭我们创建里的连接,返回获取的连接
result.noNewExchanges = true;
socket = result.socket();
result = transmitter.connection;
// 那么这个刚刚连接成功的路由 就可以 用作下次 尝试的路由
nextRouteToTry = selectedRoute;
} else {
//最后一次尝试也没有的话,就把刚刚新建的连接存入连接池
connectionPool.put(result);
transmitter.acquireConnectionNoEvents(result);//把连接赋给transmitter
}
}
closeQuietly(socket);//如果刚刚建立的连接没用到,就关闭
eventListener.connectionAcquired(call, result);
return result;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
代码看着很长,已经加了注释,方法目的就是 为 承载新的数据流 寻找 连接。寻找顺序是 已分配的连接、连接池、新建连接。梳理如下:
- 首先会尝试使用 已给数据流分配的连接。(已分配连接的情况例如重定向时的再次请求,说明上次已经有了连接)
- 若没有 已分配的可用连接,就尝试从连接池中 匹配获取。因为此时没有路由信息,所以匹配条件:address一致——host、port、代理等一致,且 匹配的连接可以接受新的数据流。
- 若从连接池没有获取到,则取下一个代理的路由信息(多个Route,即多个IP地址),再次尝试从连接池获取,此时可能因为连接合并而匹配到。
- 若第二次也没有获取到,就创建RealConnection实例,进行TCP + TLS 握手,与服务端建立连接。
- 此时为了确保Http2.0连接的多路复用性,会第三次从连接池匹配。因为新建立的连接的握手过程是非线程安全的,所以此时可能连接池新存入了相同的连接。
- 第三次若匹配到,就使用已有连接,释放刚刚新建的连接;若未匹配到,则把新连接存入连接池并返回。
流程图如下:
看到这里,小盆友,你是否有很多问号?
- 开始若有了已分配的连接,但已经被限制承载新的数据流,是如何释放的呢?
- 代理路由信息是如何获取的呢?
- 如何从连接池获取连接的?三次有什么不同?
没关系,慢慢来,我们先看第二个问号,代理路由信息的获取。
# RouteSelector
先来看下Route类:
public final class Route {
final Address address;
final Proxy proxy;//代理
final InetSocketAddress inetSocketAddress;//连接目标地址
public Route(Address address, Proxy proxy, InetSocketAddress inetSocketAddress) {
...
this.address = address;
this.proxy = proxy;
this.inetSocketAddress = inetSocketAddress;
}
2
3
4
5
6
7
8
9
10
11
Route,通过代理服务器信息 proxy、连接目标地址 InetSocketAddress 来描述一条 连接服务器的具体路由。
- proxy代理:可以为客户端显式配置代理服务器。否则,将使用ProxySelector代理选择器。可能会返回多个代理。
- IP地址:无论是直连还是代理,打开socket连接都需要IP地址。 DNS服务可能返回多个IP地址尝试。
上面分析的findConnection方法中是使用routeSelection.getAll()获取Route集合routes,而routeSelection是通过routeSelector.next()获取,routeSelector是在ExchangeFinder的构造方法内创建的,也就是说routeSelector在RetryAndFollowUpInterceptor中就创建了,那么我们看下RouteSelector:
RouteSelector(Address address, RouteDatabase routeDatabase, Call call,
EventListener eventListener) {
this.address = address;
this.routeDatabase = routeDatabase;//连接池中的路由黑名单(连接失败的路由)
this.call = call;
this.eventListener = eventListener;
resetNextProxy(address.url(), address.proxy());
}
//收集代理服务器
private void resetNextProxy(HttpUrl url, Proxy proxy) {
if (proxy != null) {
// 若指定了代理,那么就这一个。(就是初始化OkhttpClient时配置的)
proxies = Collections.singletonList(proxy);
} else {
//没配置就使用ProxySelector获取代理(若初始化OkhttpClient时没有配置ProxySelector,会使用系统默认的)
List<Proxy> proxiesOrNull = address.proxySelector().select(url.uri());
proxies = proxiesOrNull != null && !proxiesOrNull.isEmpty()
? Util.immutableList(proxiesOrNull)
: Util.immutableList(Proxy.NO_PROXY);
}
nextProxyIndex = 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
注意到RouteSelector的构造方法中传入了routeDatabase,是连接失败的路由黑名单(后面连接池也会讲到),并使用resetNextProxy方法获取代理服务器列表:若没有指定proxy就是用ProxySelector获取proxy列表(若没有配置ProxySelector会使用系统默认)。接着看next方法:
//收集代理的路由信息
public Selection next() throws IOException {
if (!hasNext()) {//还有下一个代理
throw new NoSuchElementException();
}
List<Route> routes = new ArrayList<>();
while (hasNextProxy()) {
Proxy proxy = nextProxy();
//遍历proxy经DNS后的所有IP地址,组装成Route
for (int i = 0, size = inetSocketAddresses.size(); i < size; i++) {
Route route = new Route(address, proxy, inetSocketAddresses.get(i));
if (routeDatabase.shouldPostpone(route)) {//此路由在黑名单中,存起来最后尝试
postponedRoutes.add(route);
} else {
routes.add(route);
}
}
if (!routes.isEmpty()) {
break;
}
}
if (routes.isEmpty()) {
// 若没有拿到路由,就尝试上面存的黑名单的路由
routes.addAll(postponedRoutes);
postponedRoutes.clear();
}
//routes包装成Selection返回
return new Selection(routes);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
next方法主要就是获取下一个代理Proxy的代理信息,即多个路由。具体是在resetNextInetSocketAddress方法中实现,主要是对代理服务地址进行DNS解析获取多个IP地址,这里就不展开了。
好了,到这里 就解决了第二个问号。其他两个问号涉及 连接池RealConnectionPool、连接RealConnection,下面就来瞅瞅。
# ConnectionPool
ConnectionPool,即连接池,用于管理http1.1/http2.0连接重用,以减少网络延迟。相同Address的http请求可以共享一个连接,ConnectionPool就是实现了连接的复用。
public final class ConnectionPool {
final RealConnectionPool delegate;
//最大空闲连接数5,最大空闲时间5分钟
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}
public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.delegate = new RealConnectionPool(maxIdleConnections, keepAliveDuration, timeUnit);
}
//返回空闲连接数
public int idleConnectionCount() {
return delegate.idleConnectionCount();
}
//返回池子中的连接数
public int connectionCount() {
return delegate.connectionCount();
}
//关闭并移除所以空闲连接
public void evictAll() {
delegate.evictAll();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ConnectionPool看起来比较好理解,默认配置是最大空闲连接数5,最大空闲时间5分钟(即一个连接空闲时间超过5分钟就移除),我们也可以在初始化okhttpClient时进行不同的配置。需要注意的是ConnectionPool是用于应用层,实际管理者是RealConnectionPool。RealConnectionPool是okhttp内部真实管理连接的地方。
连接池对连接的管理无非是 存、取、删,上面的两个问号分别对应 删、取,跟进RealConnectionPool我们一个个看:
# 存
private final Deque<RealConnection> connections = new ArrayDeque<>();
private final Runnable cleanupRunnable = () -> {
//循环清理
while (true) {
//清理
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (RealConnectionPool.this) {
try {
//下一次清理之前的等待
RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
};
//存
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
connections是用于存连接的队列Deque。看到在add之前 使用线程池executor执行了cleanupRunnable,意思是清理连接,为啥要清理呢?上面提到过 连接池有 最大空闲连接数、最大空闲时间的限制,所以不满足时是要进行清理的。并且注意到清理是一个循环,并且下一次清理前要等待waitNanos时间,啥意思呢?我们看下cleanup方法:
long cleanup(long now) {
int inUseConnectionCount = 0;//正在使用的连接数
int idleConnectionCount = 0;//空闲连接数
RealConnection longestIdleConnection = null;//空闲时间最长的连接
long longestIdleDurationNs = Long.MIN_VALUE;//最长的空闲时间
//遍历连接:找到待清理的连接, 找到下一次要清理的时间(还未到最大空闲时间)
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
//若连接正在使用,continue,正在使用连接数+1
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
//空闲连接数+1
idleConnectionCount++;
// 赋值最长的空闲时间和对应连接
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
//若最长的空闲时间大于5分钟 或 空闲数 大于5,就移除并关闭这个连接
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// else,就返回 还剩多久到达5分钟,然后wait这个时间再来清理
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
//连接没有空闲的,就5分钟后再尝试清理.
return keepAliveDurationNs;
} else {
// 没有连接,不清理
cleanupRunning = false;
return -1;
}
}
//关闭移除的连接
closeQuietly(longestIdleConnection.socket());
//关闭移除后 立刻 进行下一次的 尝试清理
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
思路还是很清晰的:
- 有空闲连接的话,如果最长的空闲时间大于5分钟 或 空闲数 大于5,就移除关闭这个最长空闲连接;如果 空闲数 不大于5 且 最长的空闲时间不大于5分钟,就返回到5分钟的剩余时间,然后等待这个时间再来清理。
- 没有空闲连接就等5分钟后再尝试清理。
- 没有连接不清理。
其中判断连接正在使用的方法pruneAndGetAllocationCount我们来看下:
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
//连接上的数据流,弱引用列表
List<Reference<Transmitter>> references = connection.transmitters;
for (int i = 0; i < references.size(); ) {
Reference<Transmitter> reference = references.get(i);
if (reference.get() != null) {
i++;
continue;
}
// 到这里,transmitter是泄漏的,要移除,且此连接不能再承载新的数据流(泄漏的原因就是下面的message)
TransmitterReference transmitterRef = (TransmitterReference) reference;
String message = "A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?";
Platform.get().logCloseableLeak(message, transmitterRef.callStackTrace);
references.remove(i);
connection.noNewExchanges = true;
//连接因为泄漏没有数据流了,那么可以立即移除了。所以设置 开始空闲时间 是5分钟前(厉害厉害!)
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
//返回连接上的数据流数量,大于0说明正在使用。
return references.size();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
逻辑注释已经标明了,很好理解。其中connection.transmitters,表示在此连接上的数据流,transmitters size大于1即表示多个请求复用此连接。
另外,在findConnection中,使用connectionPool.put(result)存连接后,又调用transmitter.acquireConnectionNoEvents方法,瞅下:
void acquireConnectionNoEvents(RealConnection connection) {
assert (Thread.holdsLock(connectionPool));
if (this.connection != null) throw new IllegalStateException();
this.connection = connection;
connection.transmitters.add(new TransmitterReference(this, callStackTrace));
}
2
3
4
5
6
先把连接赋给transmitter,表示数据流transmitter依附在这个connection上;然后connection.transmitters add 这个transmitter的弱引用,connection.transmitters表示这个连接承载的所有数据流,即承载的所有请求。
好了,存 讲完了,主要就是把连接存入队列,同时开始循环尝试清理过期连接。
# 取
//为transmitter 从连接池 获取 对应address的连接。若果routes不为空,可能会因为 连接合并(复用) 而获取到HTTP/2连接。
boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
@Nullable List<Route> routes, boolean requireMultiplexed) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (requireMultiplexed && !connection.isMultiplexed()) continue;
if (!connection.isEligible(address, routes)) continue;
transmitter.acquireConnectionNoEvents(connection);
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
11
12
存的方法名是put,但你发现 取 的方法名却不是get,transmitterAcquirePooledConnection意思是 为transmitter 从连接池 获取连接,实际上transmitter就代表一个数据流,也就是一个http请求。注意到,在遍历中 经过判断后也是transmitter的acquireConnectionNoEvents方法,即把匹配到的connection赋给transmitter。所以方法名还是很生动的。
继续看是如何匹配的:如果requireMultiplexed为false,即不是多路复用(不是http/2),那么就要看Connection的isEligible方法了,isEligible方法返回true,就代表匹配成功:
//用于判断 连接 是否 可以承载指向address的数据流
boolean isEligible(Address address, @Nullable List<Route> routes) {
//连接不再接受新的数据流,false
if (transmitters.size() >= allocationLimit || noNewExchanges) return false;
//匹配address中非host的部分
if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
//匹配address的host,到这里也匹配的话,就return true
if (address.url().host().equals(this.route().address().url().host())) {
return true; // This connection is a perfect match.
}
//到这里hostname是没匹配的,但是还是有机会返回true:连接合并
// 1. 连接须是 HTTP/2.
if (http2Connection == null) return false;
// 2. IP 地址匹配
if (routes == null || !routeMatchesAny(routes)) return false;
// 3. 证书匹配
if (address.hostnameVerifier() != OkHostnameVerifier.INSTANCE) return false;
if (!supportsUrl(address.url())) return false;
// 4. 证书 pinning 匹配.
try {
address.certificatePinner().check(address.url().host(), handshake().peerCertificates());
} catch (SSLPeerUnverifiedException e) {
return false;
}
return true; // The caller's address can be carried by this connection.
}
private boolean routeMatchesAny(List<Route> candidates) {
for (int i = 0, size = candidates.size(); i < size; i++) {
Route candidate = candidates.get(i);
if (candidate.proxy().type() == Proxy.Type.DIRECT
&& route.proxy().type() == Proxy.Type.DIRECT
&& route.socketAddress().equals(candidate.socketAddress())) {
return true;
}
}
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
取的过程就是 遍历连接池,进行地址等一系列匹配,到这里第三个问号也解决了。
# 删
//移除关闭空闲连接
public void evictAll() {
List<RealConnection> evictedConnections = new ArrayList<>();
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
if (connection.transmitters.isEmpty()) {
connection.noNewExchanges = true;
evictedConnections.add(connection);
i.remove();
}
}
}
for (RealConnection connection : evictedConnections) {
closeQuietly(connection.socket());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
遍历连接池,如果连接上的数据流是空,那么就从连接池移除并且关闭。
我们回过头看下Transmitter的releaseConnectionNoEvents方法,也就第一个问号,如果连接不再接受新的数据流,就会调用这个方法:
//从连接上移除transmitter.
@Nullable Socket releaseConnectionNoEvents() {
assert (Thread.holdsLock(connectionPool));
int index = -1;
//遍历 此数据流依附的连接 上的所有数据流,找到index
for (int i = 0, size = this.connection.transmitters.size(); i < size; i++) {
Reference<Transmitter> reference = this.connection.transmitters.get(i);
if (reference.get() == this) {
index = i;
break;
}
}
if (index == -1) throw new IllegalStateException();
//transmitters移除此数据流
RealConnection released = this.connection;
released.transmitters.remove(index);
this.connection = null;
//如果连接上没有有数据流了,就置为空闲(等待清理),并返回待关闭的socket
if (released.transmitters.isEmpty()) {
released.idleAtNanos = System.nanoTime();
if (connectionPool.connectionBecameIdle(released)) {
return released.socket();
}
}
return null;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
主要就是尝试释放连接,连接上没有数据流就关闭socket等待被清理。
好了,到这里连接池的管理就分析完了。
从连接的查找 到 连接池的管理,就是ConnectInterceptor的内容了。
# CallServerInterceptor
哎呀,终于到最后一个拦截器了!
请求服务拦截器,也就是真正地去进行网络IO读写了——写入http请求的header和body数据、读取响应的header和body。
上面ConnectInterceptor主要介绍了如何 寻找连接 以及 连接池如何管理连接。在获取到连接后,调用了RealConnection的newCodec方法ExchangeCodec实例,然后使用ExchangeCodec实例创建了Exchange实例传入CallServerInterceptor了。上面提到过ExchangeCodec负责请求和响应的IO读写,我们先来看看ExchangeCodec创建过程——RealConnection的newCodec方法:
ExchangeCodec newCodec(OkHttpClient client, Interceptor.Chain chain) throws SocketException {
if (http2Connection != null) {
return new Http2ExchangeCodec(client, this, chain, http2Connection);
} else {
socket.setSoTimeout(chain.readTimeoutMillis());
source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
return new Http1ExchangeCodec(client, this, source, sink);
}
}
2
3
4
5
6
7
8
9
10
http2Connection不为空就创建Http2ExchangeCodec,否则是Http1ExchangeCodec。而http2Connection的创建是连接进行TCP、TLS握手的时候,即在RealConnection的connect方法中,具体就是connect方法中调用的establishProtocol方法:
private void establishProtocol(ConnectionSpecSelector connectionSpecSelector,
int pingIntervalMillis, Call call, EventListener eventListener) throws IOException {
//针对http请求,如果配置的协议包含Protocol.H2_PRIOR_KNOWLEDGE,则开启Http2连接
if (route.address().sslSocketFactory() == null) {
if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
socket = rawSocket;
protocol = Protocol.H2_PRIOR_KNOWLEDGE;
startHttp2(pingIntervalMillis);
return;
}
socket = rawSocket;
protocol = Protocol.HTTP_1_1;
return;
}
//针对https请求,会在TLS握手后,根据平台获取协议(),如果协议是Protocol.HTTP_2,则开启Http2连接
eventListener.secureConnectStart(call);
connectTls(connectionSpecSelector);
eventListener.secureConnectEnd(call, handshake);
if (protocol == Protocol.HTTP_2) {
startHttp2(pingIntervalMillis);
}
}
private void startHttp2(int pingIntervalMillis) throws IOException {
socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
http2Connection = new Http2Connection.Builder(true)
.socket(socket, route.address().url().host(), source, sink)
.listener(this)
.pingIntervalMillis(pingIntervalMillis)
.build();
http2Connection.start();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
好了,到这里不再深入了,继续了解可以参考HTTP 2.0与OkHttp。那么到这里,ExchangeCodec已经创建了,然后又包装成Exchange,最后传入了CallServerInterceptor。
下面就来看看这最后一个拦截器:
public final class CallServerInterceptor implements Interceptor {
private final boolean forWebSocket;
public CallServerInterceptor(boolean forWebSocket) {
this.forWebSocket = forWebSocket;
}
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Exchange exchange = realChain.exchange();//上个拦截器传入的exchange
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
//写请求头
exchange.writeRequestHeaders(request);
boolean responseHeadersStarted = false;
Response.Builder responseBuilder = null;
//含body的请求
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// 若请求头包含 "Expect: 100-continue" , 就会等服务端返回含有 "HTTP/1.1 100 Continue"的响应,然后再发送请求body.
//如果没有收到这个响应(例如收到的响应是4xx),那就不发送body了。
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
exchange.flushRequest();
responseHeadersStarted = true;
exchange.responseHeadersStart();
responseBuilder = exchange.readResponseHeaders(true);
}
//responseBuilder为null说明服务端返回了100,也就是可以继续发送body了
if (responseBuilder == null) {
if (request.body().isDuplex()) {//默认是false不会进入
// Prepare a duplex body so that the application can send a request body later.
exchange.flushRequest();
BufferedSink bufferedRequestBody = Okio.buffer(
exchange.createRequestBody(request, true));
request.body().writeTo(bufferedRequestBody);
} else {
// 满足了 "Expect: 100-continue" ,写请求body
BufferedSink bufferedRequestBody = Okio.buffer(
exchange.createRequestBody(request, false));
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
}
} else {
//没有满足 "Expect: 100-continue" ,请求发送结束
exchange.noRequestBody();
if (!exchange.connection().isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
exchange.noNewExchangesOnConnection();
}
}
} else {
//没有body,请求发送结束
exchange.noRequestBody();
}
//请求发送结束
if (request.body() == null || !request.body().isDuplex()) {
exchange.finishRequest();
}
//回调 读响应头开始事件(如果上面没有)
if (!responseHeadersStarted) {
exchange.responseHeadersStart();
}
//读响应头(如果上面没有)
if (responseBuilder == null) {
responseBuilder = exchange.readResponseHeaders(false);
}
//构建response
Response response = responseBuilder
.request(request)
.handshake(exchange.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
if (code == 100) {
//这里服务端又返回了个100,就再尝试获取真正的响应()
response = exchange.readResponseHeaders(false)
.request(request)
.handshake(exchange.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
code = response.code();
}
//回调读响应头结束
exchange.responseHeadersEnd(response);
//这里就是获取响应body了
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(exchange.openResponseBody(response))
.build();
}
//请求头中Connection是close,表示请求完成后要关闭连接
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
exchange.noNewExchangesOnConnection();
}
//204(无内容)、205(充值内容),body应该是空
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
你会发现,整个内容就是前面说的一句话:写入http请求的header和body、读取响应的header和body。这里就不再解释了。
这里我们可以看到,无论写请求还是读响应,都是使用Exchange对应的方法。上面也提到过Exchange理解上是对ExchangeCodec的包装,这写方法内部除了事件回调和一些参数获取外,核心工作都由 ExchangeCodec 对象完成,而 ExchangeCodec实际上利用的是 Okio,而 Okio 实际上还是用的 Socket。
ExchangeCodec的实现类 有对应Http1.1的Http1ExchangeCodec 和 对应Http2.0的Http2ExchangeCodec。其中Http2ExchangeCodec是使用Http2.0中 数据帧 的概念完成请求响应的读写。关于Http1ExchangeCodec、Http2ExchangeCodec具体实现原理涉及okio这不再展开。
最后一点,CallServerInterceptor的intercept方法中没有调用连接器链Chain的proceed方法,因为这是最后一个拦截器啦!
好了,到这里最后一个拦截器也分析完啦!
# 总结
本篇分析了ConnectInterceptor、CallServerInterceptor两个拦截器的作用和原理。ConnectInterceptor负责连接的获取,其中涉及到连接池的概念;CallServerInterceptor是真正的网络IO读写。ConnectInterceptor涉及的内容较多,它是Okhttp的核心。 结合上一篇,我们已经分析完了Okhttp内部所有的拦截器,最后给出Okhttp的整体架构图:
到这里,Okhttp源码解析部分就真的结束了,可真是一个漫长的过程! 从使用方式到工作流程,再到具体拦截器,掌握了这四篇文章的内容,应该说得上对Okhttp是比较熟悉了。这里还计划会出第五篇终章,来介绍一些Okhttp的常见问题和高级使用方式,敬请期待!
本文分享自微信公众号 - 胡飞洋(hfydwxgzh),作者:胡飞洋
原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。
原始发表时间:2020-06-15