okHttp源码分析
整体设计
OkHttp设计较为复杂,但是层次还算清晰,我把主要的类关系画了个图,大概是这样。
OkHttp的类较多,这里只描述下重要的类关系,可以看出OkHttp的主要功能都集中在Interceptor中,通过Interceptor完成构建请求,建立Socket连接,建立SSLSocket连接,证书校验等步骤。
我们看下Interceptor的调用过程:
在这个拦截器设计中,我们可以对Request进行操作,同样也可以对Response进行操作。当然,我们添加的Interceprot是在所有Interceptor之前,意味着Request最先操作,Response最后操作。
RealCall类的getResponseWithInterceptorChain是创建Interceptor的方法,可以看出okHttp共添加了5个Interceptor,它们在网络请求的不同阶段起着不同的作用,下面将详细分析。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
Response getResponseWithInterceptorChain() throws IOException { // Build a full stack of interceptors. List<Interceptor> interceptors = new ArrayList<>(); interceptors.addAll(client.interceptors()); interceptors.add(retryAndFollowUpInterceptor); // 添加Http参数,处理Http返回 interceptors.add(new BridgeInterceptor(client.cookieJar())); interceptors.add(new CacheInterceptor(client.internalCache())); interceptors.add(new ConnectInterceptor(client)); // 添加NetworkInterceptor if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } interceptors.add(new CallServerInterceptor(forWebSocket)); Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0, originalRequest, this, eventListener, client.connectTimeoutMillis(), client.readTimeoutMillis(), client.writeTimeoutMillis()); return chain.proceed(originalRequest); } |
OkHttpClient模块
OkHttpClient使用Builder进行构建,同时提供了一个newBuilder()的方法用来创建一个Builder,可以对原有OkHttpClient修改部分参数后创建一个新实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
OkHttpClient(Builder builder) { this.dispatcher = builder.dispatcher; this.proxy = builder.proxy; this.protocols = builder.protocols; this.connectionSpecs = builder.connectionSpecs; this.interceptors = Util.immutableList(builder.interceptors); this.networkInterceptors = Util.immutableList(builder.networkInterceptors); this.eventListenerFactory = builder.eventListenerFactory; this.proxySelector = builder.proxySelector; this.cookieJar = builder.cookieJar; this.cache = builder.cache; this.internalCache = builder.internalCache; this.socketFactory = builder.socketFactory; boolean isTLS = false; for (ConnectionSpec spec : connectionSpecs) { isTLS = isTLS || spec.isTls(); } if (builder.sslSocketFactory != null || !isTLS) { this.sslSocketFactory = builder.sslSocketFactory; this.certificateChainCleaner = builder.certificateChainCleaner; } else { X509TrustManager trustManager = Util.platformTrustManager(); this.sslSocketFactory = newSslSocketFactory(trustManager); this.certificateChainCleaner = CertificateChainCleaner.get(trustManager); } if (sslSocketFactory != null) { Platform.get().configureSslSocketFactory(sslSocketFactory); } this.hostnameVerifier = builder.hostnameVerifier; this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner( certificateChainCleaner); this.proxyAuthenticator = builder.proxyAuthenticator; this.authenticator = builder.authenticator; this.connectionPool = builder.connectionPool; this.dns = builder.dns; this.followSslRedirects = builder.followSslRedirects; this.followRedirects = builder.followRedirects; this.retryOnConnectionFailure = builder.retryOnConnectionFailure; this.connectTimeout = builder.connectTimeout; this.readTimeout = builder.readTimeout; this.writeTimeout = builder.writeTimeout; this.pingInterval = builder.pingInterval; if (interceptors.contains(null)) { throw new IllegalStateException("Null interceptor: " + interceptors); } if (networkInterceptors.contains(null)) { throw new IllegalStateException("Null network interceptor: " + networkInterceptors); } } |
OkHttpClient同时实现了CallFactory接口,这个是发起网络请求的关键类。 CallFactory
1 2 3 4 |
interface Factory { Call newCall(Request request); } |
OkHttpClient中
1 2 3 4 5 6 7 8 |
/** * Prepares the {@code request} to be executed at some point in the future. */ @Override public Call newCall(Request request) { return RealCall.newRealCall(this, request, false /* for web socket */); } |
RealCall中
1 2 3 4 5 6 7 |
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) { // Safely publish the Call instance to the EventListener. RealCall call = new RealCall(client, originalRequest, forWebSocket); call.eventListener = client.eventListenerFactory().create(call); return call; } |
Router路由选择模块
我们知道,发起网络请求首先涉及到DNS解析的过程,okHttp对这个DNS解析过程了优化,引入了Router的概念,对失效的IP地址进行缓存,一定程度上提高了DNS解析速度。
我们先看下OkHttp的DNS查询逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
public interface Dns { /** * A DNS that uses {@link InetAddress#getAllByName} to ask the underlying operating system to * lookup IP addresses. Most custom {@link Dns} implementations should delegate to this instance. */ Dns SYSTEM = new Dns() { @Override public List<InetAddress> lookup(String hostname) throws UnknownHostException { if (hostname == null) throw new UnknownHostException("hostname == null"); try { return Arrays.asList(InetAddress.getAllByName(hostname)); } catch (NullPointerException e) { UnknownHostException unknownHostException = new UnknownHostException("Broken system behaviour for dns lookup of " + hostname); unknownHostException.initCause(e); throw unknownHostException; } } }; /** * Returns the IP addresses of {@code hostname}, in the order they will be attempted by OkHttp. If * a connection to an address fails, OkHttp will retry the connection with the next address until * either a connection is made, the set of IP addresses is exhausted, or a limit is exceeded. */ List<InetAddress> lookup(String hostname) throws UnknownHostException; } |
核心是调用InetAddress.getAllByName(hostname)
进行查询。
建立连接这个过程中是在ConnectInterceptor
中完成的,我们看下ConnectInterceptor的代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
/** Opens a connection to the target server and proceeds to the next interceptor. */ public final class ConnectInterceptor implements Interceptor { public final OkHttpClient client; public ConnectInterceptor(OkHttpClient client) { this.client = client; } @Override public Response intercept(Chain chain) throws IOException { System.out.println("ConnectIntercept #intercept"); RealInterceptorChain realChain = (RealInterceptorChain) chain; Request request = realChain.request(); StreamAllocation streamAllocation = realChain.streamAllocation(); // We need the network to satisfy this request. Possibly for validating a conditional GET. boolean doExtensiveHealthChecks = !request.method().equals("GET"); // 发起网络请求,将数据放置在HttpCodec中 HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks); RealConnection connection = streamAllocation.connection(); return realChain.proceed(request, streamAllocation, httpCodec, connection); } } |
调用到StreamAllocation的newStream方法,StreamAllocation中就涉及到DNS解析->Router选择->建立Socket/SSLSocket连接的过程了。
下面看下Router模块的类关系。
StreamAllocation中通过RouteSelector获取所有IP地址,然后从ConnectionPool中查找可复用的连接,如果没有,则使用第一个Route创建RealConnection对象建立连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException { boolean foundPooledConnection = false; RealConnection result = null; Route selectedRoute = null; Connection releasedConnection; Socket toClose; synchronized (connectionPool) { if (released) throw new IllegalStateException("released"); if (codec != null) throw new IllegalStateException("codec != null"); if (canceled) throw new IOException("Canceled"); // Attempt to use an already-allocated connection. We need to be careful here because our // already-allocated connection may have been restricted from creating new streams. releasedConnection = this.connection; toClose = releaseIfNoNewStreams(); if (this.connection != null) { // We had an already-allocated connection and it's good. result = this.connection; releasedConnection = null; } if (!reportedAcquired) { // If the connection was never reported acquired, don't report it as released! releasedConnection = null; } if (result == null) { // Attempt to get a connection from the pool. // 从 ConnectionPools里获取Connection Internal.instance.get(connectionPool, address, this, null); if (connection != null) { foundPooledConnection = true; result = connection; } else { selectedRoute = route; } } } closeQuietly(toClose); if (releasedConnection != null) { eventListener.connectionReleased(call, releasedConnection); } if (foundPooledConnection) { eventListener.connectionAcquired(call, result); } if (result != null) { // If we found an already-allocated or pooled connection, we're done. return result; } // If we need a route selection, make one. This is a blocking operation. boolean newRouteSelection = false; if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) { newRouteSelection = true; routeSelection = routeSelector.next(); } synchronized (connectionPool) { if (canceled) throw new IOException("Canceled"); if (newRouteSelection) { // Now that we have a set of IP addresses, make another attempt at getting a connection from // the pool. This could match due to connection coalescing. List<Route> routes = routeSelection.getAll(); for (int i = 0, size = routes.size(); i < size; i++) { Route route = routes.get(i); Internal.instance.get(connectionPool, address, this, route); if (connection != null) { foundPooledConnection = true; result = connection; this.route = route; break; } } } if (!foundPooledConnection) { if (selectedRoute == null) { selectedRoute = routeSelection.next(); } // Create a connection and assign it to this allocation immediately. This makes it possible // for an asynchronous cancel() to interrupt the handshake we're about to do. route = selectedRoute; refusedStreamCount = 0; // 创建新的Connection result = new RealConnection(connectionPool, selectedRoute); acquire(result, false); } } // If we found a pooled connection on the 2nd time around, we're done. // 找寻到之前建立的Http连接, 直接返回,不需要重新三次握手 if (foundPooledConnection) { eventListener.connectionAcquired(call, result); return result; } // Do TCP + TLS handshakes. This is a blocking operation. // 开始TCP和TLS握手 result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener); routeDatabase().connected(result.route()); Socket socket = null; synchronized (connectionPool) { reportedAcquired = true; // Pool the connection.,将Connection 缓存到ConnectionPools中 Internal.instance.put(connectionPool, result); // If another multiplexed connection to the same address was created concurrently, then // release this connection and acquire that one. if (result.isMultiplexed()) { socket = Internal.instance.deduplicate(connectionPool, address, this); result = connection; } } closeQuietly(socket); eventListener.connectionAcquired(call, result); return result; } |
RealConnection Http(s)网络请求模块
前面的StreamAllocation的分析中可以看出,如果在ConnectionPools中没有取出可以复用的Connection那么会重新创建一个Connection,即Socket连接,下面我们看下创建过程。
1 2 3 4 |
// StreamAllocation 的findConnection中, result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener); |
建立连接需要两个步骤,建立Socket连接;如果是Https的话,还需要建立一个SSLSocket连接,我们先来看下建立Socket连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
private void connectSocket(int connectTimeout, int readTimeout, Call call, EventListener eventListener) throws IOException { Proxy proxy = route.proxy(); Address address = route.address(); rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP ? address.socketFactory().createSocket() : new Socket(proxy); eventListener.connectStart(call, route.socketAddress(), proxy); rawSocket.setSoTimeout(readTimeout); try { Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout); } catch (ConnectException e) { ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress()); ce.initCause(e); throw ce; } // The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0 // More details: // https://github.com/square/okhttp/issues/3245 // https://android-review.googlesource.com/#/c/271775/ try { // source会包装成Response返回给上层调用 source = Okio.buffer(Okio.source(rawSocket)); sink = Okio.buffer(Okio.sink(rawSocket)); } catch (NullPointerException npe) { if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) { throw new IOException(npe); } } } |
下面看下建立SSLSocket连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
// TCP/Socket连接建立完毕后,进行一次TLS握手 private void connectTls(ConnectionSpecSelector connectionSpecSelector) throws IOException { Address address = route.address(); SSLSocketFactory sslSocketFactory = address.sslSocketFactory(); boolean success = false; SSLSocket sslSocket = null; try { // Create the wrapper over the connected socket. sslSocket = (SSLSocket) sslSocketFactory.createSocket( rawSocket, address.url().host(), address.url().port(), true /* autoClose */); // Configure the socket's ciphers, TLS versions, and extensions. ConnectionSpec connectionSpec = connectionSpecSelector.configureSecureSocket(sslSocket); if (connectionSpec.supportsTlsExtensions()) { Platform.get().configureTlsExtensions( sslSocket, address.url().host(), address.protocols()); } // 开始进行TLS握手 // Force handshake. This can throw! sslSocket.startHandshake(); // block for session establishment SSLSession sslSocketSession = sslSocket.getSession(); Handshake unverifiedHandshake = Handshake.get(sslSocketSession); // TLS握手后,获取服务器端证书,然后进行HostnameVerifier校验 // Verify that the socket's certificates are acceptable for the target host. if (!address.hostnameVerifier().verify(address.url().host(), sslSocketSession)) { X509Certificate cert = (X509Certificate) unverifiedHandshake.peerCertificates().get(0); throw new SSLPeerUnverifiedException("Hostname " + address.url().host() + " not verified:" + "\n certificate: " + CertificatePinner.pin(cert) + "\n DN: " + cert.getSubjectDN().getName() + "\n subjectAltNames: " + OkHostnameVerifier.allSubjectAltNames(cert)); } // Check that the certificate pinner is satisfied by the certificates presented. address.certificatePinner().check(address.url().host(), unverifiedHandshake.peerCertificates()); // Success! Save the handshake and the ALPN protocol. String maybeProtocol = connectionSpec.supportsTlsExtensions() ? Platform.get().getSelectedProtocol(sslSocket) : null; socket = sslSocket; source = Okio.buffer(Okio.source(socket)); sink = Okio.buffer(Okio.sink(socket)); handshake = unverifiedHandshake; protocol = maybeProtocol != null ? Protocol.get(maybeProtocol) : Protocol.HTTP_1_1; success = true; } catch (AssertionError e) { if (Util.isAndroidGetsocknameError(e)) throw new IOException(e); throw e; } finally { if (sslSocket != null) { Platform.get().afterHandshake(sslSocket); } if (!success) { closeQuietly(sslSocket); } } } |
socket会被包装成sslSocket,source,sink都会被重新生成。
HttpCodec/Response模块
前面介绍了建立Socket连接,路由选择等过程,但是实际上解析网络流数据是通过HttpCodec进行的,先看下HttpCodec的定义。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
public interface HttpCodec { /** * The timeout to use while discarding a stream of input data. Since this is used for connection * reuse, this timeout should be significantly less than the time it takes to establish a new * connection. */ int DISCARD_STREAM_TIMEOUT_MILLIS = 100; /** Returns an output stream where the request body can be streamed. */ Sink createRequestBody(Request request, long contentLength); /** This should update the HTTP engine's sentRequestMillis field. */ void writeRequestHeaders(Request request) throws IOException; /** Flush the request to the underlying socket. */ void flushRequest() throws IOException; /** Flush the request to the underlying socket and signal no more bytes will be transmitted. */ void finishRequest() throws IOException; /** * Parses bytes of a response header from an HTTP transport. * * @param expectContinue true to return null if this is an intermediate response with a "100" * response code. Otherwise this method never returns null. */ Response.Builder readResponseHeaders(boolean expectContinue) throws IOException; /** Returns a stream that reads the response body. */ ResponseBody openResponseBody(Response response) throws IOException; /** * Cancel this stream. Resources held by this stream will be cleaned up, though not synchronously. * That may happen later by the connection pool thread. */ void cancel(); } |
HttpCodec有两个实现分别为Http1Codec和Http2Codec。
HttpCodec实例是通过RealConnection中进行获取的,最上层的调用是在ConnectInterceptor中。
1 2 3 4 5 6 7 8 9 10 11 12 |
public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain, StreamAllocation streamAllocation) throws SocketException { if (http2Connection != null) { return new Http2Codec(client, chain, streamAllocation, http2Connection); } else { socket.setSoTimeout(chain.readTimeoutMillis()); source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS); sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS); return new Http1Codec(client, streamAllocation, source, sink); } } |
我们先分析下Http1.1版本的HttpCodec实现,可以看出HttpCodec是将OkHttpClient,StreamAllocation,Source , Sink都封装到一起了。
然后ConnectIntercptor将HttpCodec实例传给Chain,让下一个拦截器记性处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
@Override public Response intercept(Chain chain) throws IOException { System.out.println("ConnectIntercept #intercept"); RealInterceptorChain realChain = (RealInterceptorChain) chain; Request request = realChain.request(); StreamAllocation streamAllocation = realChain.streamAllocation(); // We need the network to satisfy this request. Possibly for validating a conditional GET. boolean doExtensiveHealthChecks = !request.method().equals("GET"); // 发起网络请求,将数据放置在HttpCodec中 HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks); RealConnection connection = streamAllocation.connection(); return realChain.proceed(request, streamAllocation, httpCodec, connection); } |
HttpCodec真正的调用是在CallServerInterceptor中,通过HttpCodec,将RequestBody写入输入流,同时解码网络传输的数据。
CallServerInterceptor的核心逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
@Override public Response intercept(Chain chain) throws IOException { System.out.println("CallServerIntercept #intercept"); RealInterceptorChain realChain = (RealInterceptorChain) chain; HttpCodec httpCodec = realChain.httpStream(); StreamAllocation streamAllocation = realChain.streamAllocation(); RealConnection connection = (RealConnection) realChain.connection(); Request request = realChain.request(); long sentRequestMillis = System.currentTimeMillis(); realChain.eventListener().requestHeadersStart(realChain.call()); httpCodec.writeRequestHeaders(request); realChain.eventListener().requestHeadersEnd(realChain.call(), request); Response.Builder responseBuilder = null; // 是否允许携带Request body if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) { // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100 // Continue" response before transmitting the request body. If we don't get that, return // what we did get (such as a 4xx response) without ever transmitting the request body. if ("100-continue".equalsIgnoreCase(request.header("Expect"))) { httpCodec.flushRequest(); realChain.eventListener().responseHeadersStart(realChain.call()); responseBuilder = httpCodec.readResponseHeaders(true); } if (responseBuilder == null) { // Write the request body if the "Expect: 100-continue" expectation was met. realChain.eventListener().requestBodyStart(realChain.call()); long contentLength = request.body().contentLength(); CountingSink requestBodyOut = new CountingSink(httpCodec.createRequestBody(request, contentLength)); BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut); request.body().writeTo(bufferedRequestBody); bufferedRequestBody.close(); realChain.eventListener() .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount); } else if (!connection.isMultiplexed()) { // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection // from being reused. Otherwise we're still obligated to transmit the request body to // leave the connection in a consistent state. streamAllocation.noNewStreams(); } } httpCodec.finishRequest(); if (responseBuilder == null) { realChain.eventListener().responseHeadersStart(realChain.call()); responseBuilder = httpCodec.readResponseHeaders(false); } Response response = responseBuilder .request(request) .handshake(streamAllocation.connection().handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); int code = response.code(); if (code == 100) { // server sent a 100-continue even though we did not request one. // try again to read the actual response responseBuilder = httpCodec.readResponseHeaders(false); response = responseBuilder .request(request) .handshake(streamAllocation.connection().handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); code = response.code(); } realChain.eventListener() .responseHeadersEnd(realChain.call(), response); if (forWebSocket && code == 101) { // Connection is upgrading, but we need to ensure interceptors see a non-null response body. response = response.newBuilder() .body(Util.EMPTY_RESPONSE) .build(); } else { // 创建ResponseBody response = response.newBuilder() .body(httpCodec.openResponseBody(response)) .build(); } if ("close".equalsIgnoreCase(response.request().header("Connection")) || "close".equalsIgnoreCase(response.header("Connection"))) { streamAllocation.noNewStreams(); } if ((code == 204 || code == 205) && response.body().contentLength() > 0) { throw new ProtocolException( "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength()); } return response; } |
上面逻辑中,通过HttpCodec的openResponseBody获取ResponseBody实例,这个ResponseBody会返回给上层调用者。
看下Http1Codec的openResponseBody实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
@Override public ResponseBody openResponseBody(Response response) throws IOException { streamAllocation.eventListener.responseBodyStart(streamAllocation.call); String contentType = response.header("Content-Type"); // 返回FixedLengthSource if (!HttpHeaders.hasBody(response)) { Source source = newFixedLengthSource(0); return new RealResponseBody(contentType, 0, Okio.buffer(source)); } // 返回ChunkedSource if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) { Source source = newChunkedSource(response.request().url()); return new RealResponseBody(contentType, -1L, Okio.buffer(source)); } long contentLength = HttpHeaders.contentLength(response); if (contentLength != -1) { Source source = newFixedLengthSource(contentLength); return new RealResponseBody(contentType, contentLength, Okio.buffer(source)); } return new RealResponseBody(contentType, -1L, Okio.buffer(newUnknownLengthSource())); } |
这个openReponseBody的作用是将HttpCodec的 Source进行包装,然后返回给上层调用。
这个时候ResponseBody的Source类型实际是对Socket的多重包装,所以,当我们不再需要这个Response的数据时,需要将source关闭,否则这个socket可能就泄漏了。
Socket也算一个FD类型,如果FD泄漏达到上限,会触发Crash。
ConnectionPool 连接池模块
ConnectionPool的作用是将Socket连接进行缓存,如果两次请求的同一个Host的话,可以复用之前的连接,减少TCP握手和TLS握手。
在StreamAllocation的findConnection中,会尝试通过ConnecitonPool获取已经缓存的Connection,获取之前会有个比较逻辑,具体是Connection的isEligible方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
/** * Returns true if this connection can carry a stream allocation to {@code address}. If non-null * {@code route} is the resolved route for a connection. */ public boolean isEligible(Address address, @Nullable Route route) { // If this connection is not accepting new streams, we're done. // 该连接是否正在使用 if (allocations.size() >= allocationLimit || noNewStreams) return false; // If the non-host fields of the address don't overlap, we're done. if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false; // HOST相同,直接使用之前的连接 // If the host exactly matches, we're done: this connection can carry the address. if (address.url().host().equals(this.route().address().url().host())) { return true; // This connection is a perfect match. } // At this point we don't have a hostname match. But we still be able to carry the request if // our connection coalescing requirements are met. See also: // https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding // https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/ // 1. This connection must be HTTP/2. if (http2Connection == null) return false; // 2. The routes must share an IP address. This requires us to have a DNS address for both // hosts, which only happens after route planning. We can't coalesce connections that use a // proxy, since proxies don't tell us the origin server's IP address. if (route == null) return false; if (route.proxy().type() != Proxy.Type.DIRECT) return false; if (this.route.proxy().type() != Proxy.Type.DIRECT) return false; if (!this.route.socketAddress().equals(route.socketAddress())) return false; // 3. This connection's server certificate's must cover the new host. if (route.address().hostnameVerifier() != OkHostnameVerifier.INSTANCE) return false; if (!supportsUrl(address.url())) return false; // 4. Certificate pinning must match the host. try { address.certificatePinner().check(address.url().host(), handshake().peerCertificates()); } catch (SSLPeerUnverifiedException e) { return false; } return true; // The caller's address can be carried by this connection. } |
可以看出,Http2的规则和Http1的有所不同。 针对Http1.1来说,满足两个条件即可复用Connection。
- 没有正在使用的Http连接
- Host相同
这样的话,其实是忽略了Keep-Alive字段了,如果Http(s)连接满足上面的条件,都可以拿来复用。
ConnectionPools不会无限缓存连接,会有时间限制和个数限制,并且超过时间会自动清理连接池中的连接。