An Analysis of the OkHttp3 HTTP Request Execution Flow
Basic usage of OkHttp3
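Sending an HTTP request with OkHttp3 and obtaining the response roughly involves these steps:
1. Create an OkHttpClient object.
2. Create a Request object through Request.Builder.
3. Create a Call from the Request with OkHttpClient.newCall().
4. Execute the Call, here asynchronously through enqueue(), and handle the result in the callback.
The following sample code shows the concrete steps: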
    private void startRequestWithOkHttp3(String url) {
        // Create the OkHttpClient object
        okhttp3.OkHttpClient mOkHttpClient = new okhttp3.OkHttpClient();
        // Create a Request
        final okhttp3.Request request = new okhttp3.Request.Builder()
                .url(url)
                .build();
        // Create a new Call
        okhttp3.Call call = mOkHttpClient.newCall(request);
        // Hand the request to the dispatcher
        call.enqueue(new okhttp3.Callback() {
            @Override
            public void onFailure(okhttp3.Call call, final IOException e) {
                mTextScreen.post(new Runnable() {
                    @Override
                    public void run() {
                        mTextScreen.setText(e.toString());
                    }
                });
            }

            @Override
            public void onResponse(okhttp3.Call call, okhttp3.Response response) throws IOException {
                final String str = response.body().string();
                mTextScreen.post(new Runnable() {
                    @Override
                    public void run() {
                        mTextScreen.setText(str);
                    }
                });
            }
        });
    }

Execution of a Call
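(All of the code analysis below is based on the official release 3.4.0, 'com.squareup.okhttp3:okhttp:3.4.0'. OkHttp is still under fairly active development, so the internals of other versions may differ considerably from the version analyzed here.)
Call is an interface, defined as follows: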
    package okhttp3;

    import java.io.IOException;

    /**
     * A call is a request that has been prepared for execution. A call can be canceled. As this object
     * represents a single request/response pair (stream), it cannot be executed twice.
     */
    public interface Call {
      /** Returns the original request that initiated this call. */
      Request request();

      /**
       * Invokes the request immediately, and blocks until the response can be processed or is in
       * error.
       *
       * <p>The caller may read the response body with the response's {@link Response#body} method. To
       * avoid leaking resources callers must {@linkplain ResponseBody close the response body}.
       *
       * <p>Note that transport-layer success (receiving a HTTP response code, headers and body) does
       * not necessarily indicate application-layer success: {@code response} may still indicate an
       * unhappy HTTP response code like 404 or 500.
       *
       * @throws IOException if the request could not be executed due to cancellation, a connectivity
       * problem or timeout. Because networks can fail during an exchange, it is possible that the
       * remote server accepted the request before the failure.
       * @throws IllegalStateException when the call has already been executed.
       */
      Response execute() throws IOException;

      /**
       * Schedules the request to be executed at some point in the future.
       *
       * <p>The {@link OkHttpClient#dispatcher dispatcher} defines when the request will run: usually
       * immediately unless there are several other requests currently being executed.
       *
       * <p>This client will later call back {@code responseCallback} with either an HTTP response or a
       * failure exception.
       *
       * @throws IllegalStateException when the call has already been executed.
       */
      void enqueue(Callback responseCallback);

      /** Cancels the request, if possible. Requests that are already complete cannot be canceled. */
      void cancel();

      /**
       * Returns true if this call has been either {@linkplain #execute() executed} or {@linkplain
       * #enqueue(Callback) enqueued}. It is an error to execute a call more than once.
       */
      boolean isExecuted();

      boolean isCanceled();

      interface Factory {
        Call newCall(Request request);
      }
    }

Call also defines a Factory interface.
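So what actually happens when we invoke the methods of a Call in OkHttp? To find out, we need to dig up the Call implementation that OkHttp really uses. OkHttpClient implements the Call.Factory interface, and its interface method OkHttpClient.newCall() shows which class is the concrete Call implementation: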
    /**
     * Prepares the {@code request} to be executed at some point in the future.
     */
    @Override public Call newCall(Request request) {
      return new RealCall(this, request);
    }

OkHttp uses RealCall to execute the whole HTTP request.
    package okhttp3;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import okhttp3.internal.NamedRunnable;
    import okhttp3.internal.cache.CacheInterceptor;
    import okhttp3.internal.connection.ConnectInterceptor;
    import okhttp3.internal.connection.StreamAllocation;
    import okhttp3.internal.http.BridgeInterceptor;
    import okhttp3.internal.http.CallServerInterceptor;
    import okhttp3.internal.http.RealInterceptorChain;
    import okhttp3.internal.http.RetryAndFollowUpInterceptor;
    import okhttp3.internal.platform.Platform;

    import static okhttp3.internal.platform.Platform.INFO;

    final class RealCall implements Call {
      private final OkHttpClient client;
      private final RetryAndFollowUpInterceptor retryAndFollowUpInterceptor;

      // Guarded by this.
      private boolean executed;

      /** The application's original request unadulterated by redirects or auth headers. */
      Request originalRequest;

      protected RealCall(OkHttpClient client, Request originalRequest) {
        this.client = client;
        this.originalRequest = originalRequest;
        this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client);
      }

      @Override public Request request() {
        return originalRequest;
      }

      @Override public Response execute() throws IOException {
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        try {
          client.dispatcher().executed(this);
          Response result = getResponseWithInterceptorChain();
          if (result == null) throw new IOException("Canceled");
          return result;
        } finally {
          client.dispatcher().finished(this);
        }
      }

      synchronized void setForWebSocket() {
        if (executed) throw new IllegalStateException("Already Executed");
        this.retryAndFollowUpInterceptor.setForWebSocket(true);
      }

      @Override public void enqueue(Callback responseCallback) {
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        client.dispatcher().enqueue(new AsyncCall(responseCallback));
      }

      @Override public void cancel() {
        retryAndFollowUpInterceptor.cancel();
      }

      @Override public synchronized boolean isExecuted() {
        return executed;
      }

      @Override public boolean isCanceled() {
        return retryAndFollowUpInterceptor.isCanceled();
      }

      StreamAllocation streamAllocation() {
        return retryAndFollowUpInterceptor.streamAllocation();
      }

      final class AsyncCall extends NamedRunnable {
        private final Callback responseCallback;

        private AsyncCall(Callback responseCallback) {
          super("OkHttp %s", redactedUrl().toString());
          this.responseCallback = responseCallback;
        }

        String host() {
          return originalRequest.url().host();
        }

        Request request() {
          return originalRequest;
        }

        RealCall get() {
          return RealCall.this;
        }

        @Override protected void execute() {
          boolean signalledCallback = false;
          try {
            Response response = getResponseWithInterceptorChain();
            if (retryAndFollowUpInterceptor.isCanceled()) {
              signalledCallback = true;
              responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
            } else {
              signalledCallback = true;
              responseCallback.onResponse(RealCall.this, response);
            }
          } catch (IOException e) {
            if (signalledCallback) {
              // Do not signal the callback twice!
              Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
            } else {
              responseCallback.onFailure(RealCall.this, e);
            }
          } finally {
            client.dispatcher().finished(this);
          }
        }
      }

      /**
       * Returns a string that describes this call. Doesn't include a full URL as that might contain
       * sensitive information.
       */
      private String toLoggableString() {
        String string = retryAndFollowUpInterceptor.isCanceled() ? "canceled call" : "call";
        return string + " to " + redactedUrl();
      }

      HttpUrl redactedUrl() {
        return originalRequest.url().resolve("/...");
      }

      private Response getResponseWithInterceptorChain() throws IOException {
        // Build a full stack of interceptors.
        List<Interceptor> interceptors = new ArrayList<>();
        interceptors.addAll(client.interceptors());
        interceptors.add(retryAndFollowUpInterceptor);
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
        interceptors.add(new CacheInterceptor(client.internalCache()));
        interceptors.add(new ConnectInterceptor(client));
        if (!retryAndFollowUpInterceptor.isForWebSocket()) {
          interceptors.addAll(client.networkInterceptors());
        }
        interceptors.add(new CallServerInterceptor(
            retryAndFollowUpInterceptor.isForWebSocket()));

        Interceptor.Chain chain = new RealInterceptorChain(
            interceptors, null, null, null, 0, originalRequest);
        return chain.proceed(originalRequest);
      }
    }

Executing an HTTP request synchronously by calling RealCall.execute() roughly goes as follows:
1. Check the executed flag: throw IllegalStateException if the Call has already been executed, otherwise mark it as executed.
2. Register the Call with the client's dispatcher through client.dispatcher().executed(this).
3. Call getResponseWithInterceptorChain() to run the request and obtain the response; a null result is reported as IOException("Canceled").
4. In the finally block, notify the dispatcher through client.dispatcher().finished(this) that the Call has ended.
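Executing an HTTP request asynchronously by calling RealCall.enqueue() creates an AsyncCall and hands it to the client's dispatcher. The way RealCall.AsyncCall.execute() then runs the HTTP request is similar to RealCall.execute():
1. Call getResponseWithInterceptorChain() to run the request and obtain the response.
2. If the Call was canceled, invoke responseCallback.onFailure() with IOException("Canceled"); otherwise invoke responseCallback.onResponse().
3. If an IOException is thrown before the callback has been signalled, invoke responseCallback.onFailure(); if it is thrown afterwards, only log it so that the callback is not signalled twice.
4. In the finally block, notify the dispatcher through client.dispatcher().finished(this).
Next, let's look at the definition of Dispatcher to see how OkHttp manages request execution and how asynchronous execution is carried out: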
    package okhttp3;

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Deque;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import okhttp3.RealCall.AsyncCall;
    import okhttp3.internal.Util;

    /**
     * Policy on when async requests are executed.
     *
     * <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you supply your
     * own executor, it should be able to run {@linkplain #getMaxRequests the configured maximum} number
     * of calls concurrently.
     */
    public final class Dispatcher {
      private int maxRequests = 64;
      private int maxRequestsPerHost = 5;
      private Runnable idleCallback;

      /** Executes calls. Created lazily. */
      private ExecutorService executorService;

      /** Ready async calls in the order they'll be run. */
      private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

      /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
      private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

      /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
      private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

      public Dispatcher(ExecutorService executorService) {
        this.executorService = executorService;
      }

      public Dispatcher() {
      }

      public synchronized ExecutorService executorService() {
        if (executorService == null) {
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
      }

      /**
       * Set the maximum number of requests to execute concurrently. Above this requests queue in
       * memory, waiting for the running calls to complete.
       *
       * <p>If more than {@code maxRequests} requests are in flight when this is invoked, those requests
       * will remain in flight.
       */
      public synchronized void setMaxRequests(int maxRequests) {
        if (maxRequests < 1) {
          throw new IllegalArgumentException("max < 1: " + maxRequests);
        }
        this.maxRequests = maxRequests;
        promoteCalls();
      }

      public synchronized int getMaxRequests() {
        return maxRequests;
      }

      /**
       * Set the maximum number of requests for each host to execute concurrently. This limits requests
       * by the URL's host name. Note that concurrent requests to a single IP address may still exceed
       * this limit: multiple hostnames may share an IP address or be routed through the same HTTP
       * proxy.
       *
       * <p>If more than {@code maxRequestsPerHost} requests are in flight when this is invoked, those
       * requests will remain in flight.
       */
      public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
        if (maxRequestsPerHost < 1) {
          throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
        }
        this.maxRequestsPerHost = maxRequestsPerHost;
        promoteCalls();
      }

      public synchronized int getMaxRequestsPerHost() {
        return maxRequestsPerHost;
      }

      /**
       * Set a callback to be invoked each time the dispatcher becomes idle (when the number of running
       * calls returns to zero).
       *
       * <p>Note: The time at which a {@linkplain Call call} is considered idle is different depending
       * on whether it was run {@linkplain Call#enqueue(Callback) asynchronously} or
       * {@linkplain Call#execute() synchronously}. Asynchronous calls become idle after the
       * {@link Callback#onResponse onResponse} or {@link Callback#onFailure onFailure} callback has
       * returned. Synchronous calls become idle once {@link Call#execute() execute()} returns. This
       * means that if you are doing synchronous calls the network layer will not truly be idle until
       * every returned {@link Response} has been closed.
       */
      public synchronized void setIdleCallback(Runnable idleCallback) {
        this.idleCallback = idleCallback;
      }

      synchronized void enqueue(AsyncCall call) {
        if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
          runningAsyncCalls.add(call);
          executorService().execute(call);
        } else {
          readyAsyncCalls.add(call);
        }
      }

      /**
       * Cancel all calls currently enqueued or executing. Includes calls executed both {@linkplain
       * Call#execute() synchronously} and {@linkplain Call#enqueue asynchronously}.
       */
      public synchronized void cancelAll() {
        for (AsyncCall call : readyAsyncCalls) {
          call.get().cancel();
        }

        for (AsyncCall call : runningAsyncCalls) {
          call.get().cancel();
        }

        for (RealCall call : runningSyncCalls) {
          call.cancel();
        }
      }

      private void promoteCalls() {
        if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
        if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
          AsyncCall call = i.next();

          if (runningCallsForHost(call) < maxRequestsPerHost) {
            i.remove();
            runningAsyncCalls.add(call);
            executorService().execute(call);
          }

          if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
        }
      }

      /** Returns the number of running calls that share a host with {@code call}. */
      private int runningCallsForHost(AsyncCall call) {
        int result = 0;
        for (AsyncCall c : runningAsyncCalls) {
          if (c.host().equals(call.host())) result++;
        }
        return result;
      }

      /** Used by {@code Call#execute} to signal it is in-flight. */
      synchronized void executed(RealCall call) {
        runningSyncCalls.add(call);
      }

      /** Used by {@code AsyncCall#run} to signal completion. */
      void finished(AsyncCall call) {
        finished(runningAsyncCalls, call, true);
      }

      /** Used by {@code Call#execute} to signal completion. */
      void finished(RealCall call) {
        finished(runningSyncCalls, call, false);
      }

      private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
        int runningCallsCount;
        Runnable idleCallback;
        synchronized (this) {
          if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
          if (promoteCalls) promoteCalls();
          runningCallsCount = runningCallsCount();
          idleCallback = this.idleCallback;
        }

        if (runningCallsCount == 0 && idleCallback != null) {
          idleCallback.run();
        }
      }

      /** Returns a snapshot of the calls currently awaiting execution. */
      public synchronized List<Call> queuedCalls() {
        List<Call> result = new ArrayList<>();
        for (AsyncCall asyncCall : readyAsyncCalls) {
          result.add(asyncCall.get());
        }
        return Collections.unmodifiableList(result);
      }

      /** Returns a snapshot of the calls currently being executed. */
      public synchronized List<Call> runningCalls() {
        List<Call> result = new ArrayList<>();
        result.addAll(runningSyncCalls);
        for (AsyncCall asyncCall : runningAsyncCalls) {
          result.add(asyncCall.get());
        }
        return Collections.unmodifiableList(result);
      }

      public synchronized int queuedCallsCount() {
        return readyAsyncCalls.size();
      }

      public synchronized int runningCallsCount() {
        return runningAsyncCalls.size() + runningSyncCalls.size();
      }
    }

When a Call is executed synchronously, client.dispatcher().executed(this) registers the current Call with the client's dispatcher. The Dispatcher merely puts the Call into runningSyncCalls and does nothing else. The main purpose of registering a synchronous Call with the Dispatcher appears to be making it easy to cancel all Calls globally.
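Asynchronous AsyncCalls are run by the Dispatcher on an ExecutorService. By default this is a thread pool with no upper bound on the number of threads. The Dispatcher does, however, limit the number of requests running concurrently per host (5 by default) as well as the total number of requests running concurrently (64 by default). runningAsyncCalls holds every AsyncCall currently being run by the ExecutorService, while readyAsyncCalls holds the AsyncCalls that cannot run yet because of the per-host limit or the total limit.
In finished(), besides removing the finished AsyncCall from runningAsyncCalls, the Dispatcher checks whether any AsyncCall is waiting because of the per-host limit or the total limit. If so, the waiting requests that now satisfy both limits are started.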
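The queueing policy described above can be modelled in a few lines. The following is only a simplified sketch, not OkHttp code: MiniDispatcher and its methods are hypothetical names, calls are represented by their host string alone, and nothing is actually executed. It is meant to show how the two limits interact on enqueue and how waiting calls are promoted when a running call finishes.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

/** Hypothetical, simplified model of the dispatcher's queueing policy (not OkHttp code). */
class MiniDispatcher {
  private final int maxRequests;
  private final int maxRequestsPerHost;
  private final Deque<String> ready = new ArrayDeque<>();   // hosts of waiting calls
  private final Deque<String> running = new ArrayDeque<>(); // hosts of running calls

  MiniDispatcher(int maxRequests, int maxRequestsPerHost) {
    this.maxRequests = maxRequests;
    this.maxRequestsPerHost = maxRequestsPerHost;
  }

  /** Starts the call at once if both limits allow it, otherwise queues it. */
  synchronized void enqueue(String host) {
    if (running.size() < maxRequests && runningForHost(host) < maxRequestsPerHost) {
      running.add(host);
    } else {
      ready.add(host);
    }
  }

  /** Marks one running call for the host as finished, then promotes waiting calls. */
  synchronized void finished(String host) {
    if (!running.remove(host)) throw new AssertionError("Call wasn't in-flight!");
    promote();
  }

  /** Moves waiting calls to running, in queue order, while both limits permit. */
  private void promote() {
    if (running.size() >= maxRequests) return;
    for (Iterator<String> i = ready.iterator(); i.hasNext(); ) {
      String host = i.next();
      if (runningForHost(host) < maxRequestsPerHost) {
        i.remove();
        running.add(host);
      }
      if (running.size() >= maxRequests) return;
    }
  }

  private int runningForHost(String host) {
    int count = 0;
    for (String h : running) {
      if (h.equals(host)) count++;
    }
    return count;
  }

  synchronized int runningCount() { return running.size(); }
  synchronized int readyCount() { return ready.size(); }
}
```

With limits of 3 in total and 2 per host, enqueueing three calls to host "a" leaves two running and one waiting; the waiting call is promoted once one of the running "a" calls finishes and the total limit still has room.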
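Every Call, whether an asynchronous AsyncCall or a synchronous Call, checks after it finishes whether any HTTP request is still in progress. If none is and an idle callback has been set, that callback is invoked.
Users can supply their own ExecutorService through the Dispatcher constructor. This has to be done indirectly, through OkHttpClient.Builder while the OkHttpClient is being built.
Back to RealCall, let's keep tracing how a Call performs the network request and handles the response. Here is RealCall.getResponseWithInterceptorChain():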
    private Response getResponseWithInterceptorChain() throws IOException {
      // Build a full stack of interceptors.
      List<Interceptor> interceptors = new ArrayList<>();
      interceptors.addAll(client.interceptors());
      interceptors.add(retryAndFollowUpInterceptor);
      interceptors.add(new BridgeInterceptor(client.cookieJar()));
      interceptors.add(new CacheInterceptor(client.internalCache()));
      interceptors.add(new ConnectInterceptor(client));
      if (!retryAndFollowUpInterceptor.isForWebSocket()) {
        interceptors.addAll(client.networkInterceptors());
      }
      interceptors.add(new CallServerInterceptor(
          retryAndFollowUpInterceptor.isForWebSocket()));

      Interceptor.Chain chain = new RealInterceptorChain(
          interceptors, null, null, null, 0, originalRequest);
      return chain.proceed(originalRequest);
    }

This method mainly builds a list of Interceptors and then creates an Interceptor.Chain object to process the request and obtain the response. Let's continue by tracing RealInterceptorChain:
    package okhttp3.internal.http;

    import java.io.IOException;
    import java.util.List;
    import okhttp3.Connection;
    import okhttp3.HttpUrl;
    import okhttp3.Interceptor;
    import okhttp3.Request;
    import okhttp3.Response;
    import okhttp3.internal.connection.StreamAllocation;

    /**
     * A concrete interceptor chain that carries the entire interceptor chain: all application
     * interceptors, the OkHttp core, all network interceptors, and finally the network caller.
     */
    public final class RealInterceptorChain implements Interceptor.Chain {
      private final List<Interceptor> interceptors;
      private final StreamAllocation streamAllocation;
      private final HttpStream httpStream;
      private final Connection connection;
      private final int index;
      private final Request request;
      private int calls;

      public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
          HttpStream httpStream, Connection connection, int index, Request request) {
        this.interceptors = interceptors;
        this.connection = connection;
        this.streamAllocation = streamAllocation;
        this.httpStream = httpStream;
        this.index = index;
        this.request = request;
      }

      @Override public Connection connection() {
        return connection;
      }

      public StreamAllocation streamAllocation() {
        return streamAllocation;
      }

      public HttpStream httpStream() {
        return httpStream;
      }

      @Override public Request request() {
        return request;
      }

      @Override public Response proceed(Request request) throws IOException {
        return proceed(request, streamAllocation, httpStream, connection);
      }

      public Response proceed(Request request, StreamAllocation streamAllocation, HttpStream httpStream,
          Connection connection) throws IOException {
        if (index >= interceptors.size()) throw new AssertionError();

        calls++;

        // If we already have a stream, confirm that the incoming request will use it.
        if (this.httpStream != null && !sameConnection(request.url())) {
          throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
              + " must retain the same host and port");
        }

        // If we already have a stream, confirm that this is the only call to chain.proceed().
        if (this.httpStream != null && calls > 1) {
          throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
              + " must call proceed() exactly once");
        }

        // Call the next interceptor in the chain.
        RealInterceptorChain next = new RealInterceptorChain(
            interceptors, streamAllocation, httpStream, connection, index + 1, request);
        Interceptor interceptor = interceptors.get(index);
        Response response = interceptor.intercept(next);

        // Confirm that the next interceptor made its required call to chain.proceed().
        if (httpStream != null && index + 1 < interceptors.size() && next.calls != 1) {
          throw new IllegalStateException("network interceptor " + interceptor
              + " must call proceed() exactly once");
        }

        // Confirm that the intercepted response isn't null.
        if (response == null) {
          throw new NullPointerException("interceptor " + interceptor + " returned null");
        }

        return response;
      }

      private boolean sameConnection(HttpUrl url) {
        return url.host().equals(connection.route().address().url().host())
            && url.port() == connection.route().address().url().port();
      }
    }

In RealInterceptorChain.proceed(), apart from checking state and validating the response, the main work is to construct a new RealInterceptorChain object, fetch the corresponding Interceptor, and call that Interceptor's intercept(next). Here index acts as an iterator or cursor that indicates which Interceptor is currently being processed.
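Together, RealInterceptorChain and Interceptor implement the decorator pattern, so that requests and responses are processed in a chained, stream-like fashion. The difference from a textbook decorator is that the inner decorator is not a member variable of the outer one, but a temporary object created inside the interface method.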
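To make the index-driven hand-off concrete, here is a stripped-down sketch of the same structure. It is an illustration under simplifying assumptions, not OkHttp code: MiniInterceptor, MiniChain and MiniChainDemo are hypothetical names, and requests and responses are plain strings instead of Request and Response objects.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical miniature of Interceptor: turns a request into a response via the chain. */
interface MiniInterceptor {
  String intercept(MiniChain chain);
}

/** Hypothetical miniature of RealInterceptorChain. */
final class MiniChain {
  private final List<MiniInterceptor> interceptors;
  private final int index;      // position of the interceptor this chain will invoke
  private final String request;

  MiniChain(List<MiniInterceptor> interceptors, int index, String request) {
    this.interceptors = interceptors;
    this.index = index;
    this.request = request;
  }

  String request() {
    return request;
  }

  String proceed(String request) {
    if (index >= interceptors.size()) throw new AssertionError();
    // The chain handed to the interceptor is a fresh temporary object pointing at index + 1.
    MiniChain next = new MiniChain(interceptors, index + 1, request);
    String response = interceptors.get(index).intercept(next);
    if (response == null) throw new NullPointerException("interceptor returned null");
    return response;
  }
}

final class MiniChainDemo {
  static String run(String request) {
    List<MiniInterceptor> interceptors = new ArrayList<>();
    // Outer interceptor: rewrites the request, then wraps the response it gets back.
    interceptors.add(chain -> "[" + chain.proceed(chain.request() + "+header") + "]");
    // Innermost interceptor: stands in for the network call and never calls proceed().
    interceptors.add(chain -> "response(" + chain.request() + ")");
    return new MiniChain(interceptors, 0, request).proceed(request);
  }
}
```

Calling MiniChainDemo.run("GET /") returns "[response(GET /+header)]": the request travels inward through each interceptor and the response travels back outward, which is the same shape as the real chain.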
But which Interceptors are actually in the chain? Let's set a breakpoint in RealCall.getResponseWithInterceptorChain() and take a look:
    this = {RealCall@830033700784}
    interceptors = {ArrayList@830033704824} size = 5
      0 = {RetryAndFollowUpInterceptor@830033700816}
      1 = {BridgeInterceptor@830033705520}
      2 = {CacheInterceptor@830033705536}
      3 = {ConnectInterceptor@830033705696}
      4 = {CallServerInterceptor@830033706024}
    originalRequest = {Request@830033700704} "Request{method=GET, url=http://ip.taobao.com//service/getIpInfo.php?ip=123.58.191.68, tag=null}"
    retryAndFollowUpInterceptor = {RetryAndFollowUpInterceptor@830033700816}

This shows that in OkHttp the actual processing flow of an HTTP request is roughly as in the figure below:
[Figure: okhttp3.jpg]
RetryAndFollowUpInterceptor
What exactly does each of these Interceptors do? We will analyze them in detail one by one.
Let's start with RetryAndFollowUpInterceptor:
    /**
     * This interceptor recovers from failures and follows redirects as necessary. It may throw an
     * {@link IOException} if the call was canceled.
     */
    public final class RetryAndFollowUpInterceptor implements Interceptor {
      /**
       * How many redirects and auth challenges should we attempt? Chrome follows 21 redirects; Firefox,
       * curl, and wget follow 20; Safari follows 16; and HTTP/1.0 recommends 5.
       */
      private static final int MAX_FOLLOW_UPS = 20;

      private final OkHttpClient client;
      private StreamAllocation streamAllocation;
      private boolean forWebSocket;
      private volatile boolean canceled;

      public RetryAndFollowUpInterceptor(OkHttpClient client) {
        this.client = client;
      }

      ......

      @Override public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();

        streamAllocation = new StreamAllocation(
            client.connectionPool(), createAddress(request.url()));

        int followUpCount = 0;
        Response priorResponse = null;
        while (true) {
          if (canceled) {
            streamAllocation.release();
            throw new IOException("Canceled");
          }

          Response response = null;
          boolean releaseConnection = true;
          try {
            response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
            releaseConnection = false;
          } catch (RouteException e) {
            // The attempt to connect via a route failed. The request will not have been sent.
            if (!recover(e.getLastConnectException(), true, request)) throw e.getLastConnectException();
            releaseConnection = false;
            continue;
          } catch (IOException e) {
            // An attempt to communicate with a server failed. The request may have been sent.
            if (!recover(e, false, request)) throw e;
            releaseConnection = false;
            continue;
          } finally {
            // We're throwing an unchecked exception. Release any resources.
            if (releaseConnection) {
              streamAllocation.streamFailed(null);
              streamAllocation.release();
            }
          }

          // Attach the prior response if it exists. Such responses never have a body.
          if (priorResponse != null) {
            response = response.newBuilder()
                .priorResponse(priorResponse.newBuilder()
                    .body(null)
                    .build())
                .build();
          }

          Request followUp = followUpRequest(response);

          if (followUp == null) {
            if (!forWebSocket) {
              streamAllocation.release();
            }
            return response;
          }

          closeQuietly(response.body());

          if (++followUpCount > MAX_FOLLOW_UPS) {
            streamAllocation.release();
            throw new ProtocolException("Too many follow-up requests: " + followUpCount);
          }

          if (followUp.body() instanceof UnrepeatableRequestBody) {
            throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
          }

          if (!sameConnection(response, followUp.url())) {
            streamAllocation.release();
            streamAllocation = new StreamAllocation(
                client.connectionPool(), createAddress(followUp.url()));
          } else if (streamAllocation.stream() != null) {
            throw new IllegalStateException("Closing the body of " + response
                + " didn't close its backing stream. Bad interceptor?");
          }

          request = followUp;
          priorResponse = response;
        }
      }

      private Address createAddress(HttpUrl url) {
        SSLSocketFactory sslSocketFactory = null;
        HostnameVerifier hostnameVerifier = null;
        CertificatePinner certificatePinner = null;
        if (url.isHttps()) {
          sslSocketFactory = client.sslSocketFactory();
          hostnameVerifier = client.hostnameVerifier();
          certificatePinner = client.certificatePinner();
        }

        return new Address(url.host(), url.port(), client.dns(), client.socketFactory(),
            sslSocketFactory, hostnameVerifier, certificatePinner, client.proxyAuthenticator(),
            client.proxy(), client.protocols(), client.connectionSpecs(), client.proxySelector());
      }

In intercept(), RetryAndFollowUpInterceptor first obtains the connection pool from the client, creates an Address object from the requested URL, and uses both to create a StreamAllocation object.
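Address describes one particular server address. The StreamAllocation object is used to allocate a stream, an HttpStream, to that server address. The HttpStream may be backed by a connection taken from the connection pool that has not been released yet, or by a newly created one. Here RetryAndFollowUpInterceptor is essentially preparing the StreamAllocation that the later steps depend on.
Next, RetryAndFollowUpInterceptor.intercept() uses the Interceptors further down the chain to obtain the network response, and checks whether it is a redirect response (or another response that calls for a follow-up request, such as an authentication challenge). If it is not, the response is returned; if it is, it is processed further.
For a redirect response, RetryAndFollowUpInterceptor.intercept() builds a new request from the information in the response. It then checks whether the new request targets the same server address as the old one; if not, it creates a new Address object and a new StreamAllocation object for the new address.
RetryAndFollowUpInterceptor does not follow redirects indefinitely either. It handles at most 20 follow-ups; beyond that it throws a ProtocolException.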
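The follow-up cap can be illustrated with a small stand-alone loop. This is a sketch under assumptions, not the interceptor's real code: FollowUpDemo and follow() are hypothetical names, and "executing a request" is replaced by a lookup in a map from URL to redirect target, where a missing entry stands for a final, non-redirect response.

```java
import java.net.ProtocolException;
import java.util.Map;

/** Hypothetical model of the follow-up loop; a map lookup stands in for the network. */
final class FollowUpDemo {
  static final int MAX_FOLLOW_UPS = 20;

  /** Follows redirects starting at url and returns the URL of the final response. */
  static String follow(String url, Map<String, String> redirects) throws ProtocolException {
    int followUpCount = 0;
    while (true) {
      String location = redirects.get(url);   // stands in for executing the request
      if (location == null) return url;       // no follow-up needed: return this response
      if (++followUpCount > MAX_FOLLOW_UPS) {
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }
      url = location;                         // the follow-up becomes the next request
    }
  }
}
```

A redirect chain a -> b -> c resolves to c, while a URL that redirects to itself is abandoned with a ProtocolException on the 21st follow-up rather than looping forever.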
RetryAndFollowUpInterceptor uses followUpRequest() to extract the redirect information from the response and to build the new network request:
      /**
       * Figures out the HTTP request to make in response to receiving {@code userResponse}. This will
       * either add authentication headers, follow redirects or handle a client request timeout. If a
       * follow-up is either unnecessary or not applicable, this returns null.
       */
      private Request followUpRequest(Response userResponse) throws IOException {
        if (userResponse == null) throw new IllegalStateException();
        Connection connection = streamAllocation.connection();
        Route route = connection != null
            ? connection.route()
            : null;
        int responseCode = userResponse.code();

        final String method = userResponse.request().method();
        switch (responseCode) {
          case HTTP_PROXY_AUTH:
            Proxy selectedProxy = route != null
                ? route.proxy()
                : client.proxy();
            if (selectedProxy.type() != Proxy.Type.HTTP) {
              throw new ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy");
            }
            return client.proxyAuthenticator().authenticate(route, userResponse);

          case HTTP_UNAUTHORIZED:
            return client.authenticator().authenticate(route, userResponse);

          case HTTP_PERM_REDIRECT:
          case HTTP_TEMP_REDIRECT:
            // "If the 307 or 308 status code is received in response to a request other than GET
            // or HEAD, the user agent MUST NOT automatically redirect the request"
            if (!method.equals("GET") && !method.equals("HEAD")) {
              return null;
            }
            // fall-through
          case HTTP_MULT_CHOICE:
          case HTTP_MOVED_PERM:
          case HTTP_MOVED_TEMP:
          case HTTP_SEE_OTHER:
            // Does the client allow redirects?
            if (!client.followRedirects()) return null;

            String location = userResponse.header("Location");
            if (location == null) return null;
            HttpUrl url = userResponse.request().url().resolve(location);

            // Don't follow redirects to unsupported protocols.
            if (url == null) return null;

            // If configured, don't follow redirects between SSL and non-SSL.
            boolean sameScheme = url.scheme().equals(userResponse.request().url().scheme());
            if (!sameScheme && !client.followSslRedirects()) return null;

            // Redirects don't include a request body.
            Request.Builder requestBuilder = userResponse.request().newBuilder();
            if (HttpMethod.permitsRequestBody(method)) {
              if (HttpMethod.redirectsToGet(method)) {
                requestBuilder.method("GET", null);
              } else {
                requestBuilder.method(method, null);
              }
              requestBuilder.removeHeader("Transfer-Encoding");
              requestBuilder.removeHeader("Content-Length");
              requestBuilder.removeHeader("Content-Type");
            }

            // When redirecting across hosts, drop all authentication headers. This
            // is potentially annoying to the application layer since they have no
            // way to retain them.
            if (!sameConnection(userResponse, url)) {
              requestBuilder.removeHeader("Authorization");
            }

            return requestBuilder.url(url).build();

          case HTTP_CLIENT_TIMEOUT:
            // 408's are rare in practice, but some servers like HAProxy use this response code. The
            // spec says that we may repeat the request without modifications. Modern browsers also
            // repeat the request (even non-idempotent ones.)
            if (userResponse.request().body() instanceof UnrepeatableRequestBody) {
              return null;
            }

            return userResponse.request();

          default:
            return null;
        }
      }

As we know, OkHttp offers very handy fault tolerance: it can recover from certain kinds of network errors, that is, it retries on failure. This retry mechanism is mainly implemented by recover():
      /**
       * Report and attempt to recover from a failure to communicate with a server. Returns true if
       * {@code e} is recoverable, or false if the failure is permanent. Requests with a body can only
       * be recovered if the body is buffered.
       */
      private boolean recover(IOException e, boolean routeException, Request userRequest) {
        streamAllocation.streamFailed(e);

        // The application layer has forbidden retries.
        if (!client.retryOnConnectionFailure()) return false;

        // We can't send the request body again.
        if (!routeException && userRequest.body() instanceof UnrepeatableRequestBody) return false;

        // This exception is fatal.
        if (!isRecoverable(e, routeException)) return false;

        // No more routes to attempt.
        if (!streamAllocation.hasMoreRoutes()) return false;

        // For failure recovery, use the same route selector with a new connection.
        return true;
      }

      private boolean isRecoverable(IOException e, boolean routeException) {
        // If there was a protocol problem, don't recover.
        if (e instanceof ProtocolException) {
          return false;
        }

        // If there was an interruption don't recover, but if there was a timeout connecting to a route
        // we should try the next route (if there is one).
        if (e instanceof InterruptedIOException) {
          return e instanceof SocketTimeoutException && routeException;
        }

        // Look for known client-side or negotiation errors that are unlikely to be fixed by trying
        // again with a different route.
        if (e instanceof SSLHandshakeException) {
          // If the problem was a CertificateException from the X509TrustManager,
          // do not retry.
          if (e.getCause() instanceof CertificateException) {
            return false;
          }
        }
        if (e instanceof SSLPeerUnverifiedException) {
          // e.g. a certificate pinning error.
          return false;
        }

        // An example of one we might want to retry with a different route is a problem connecting to a
        // proxy and would manifest as a standard IOException. Unless it is one we know we should not
        // retry, we return true and try a new route.
        return true;
      }

This mainly recovers from certain types of IOException. How many times recovery is attempted is governed by StreamAllocation, through whether it still has routes left to try.
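To see how the classification in isRecoverable() plays out for concrete exceptions, the checks can be copied into a stand-alone class that depends only on the JDK. RecoverabilityDemo is a hypothetical name used for illustration; the method body mirrors the checks quoted above, while the other conditions in recover() (the client setting, the request body, the remaining routes) are deliberately left out.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ProtocolException;
import java.net.SocketTimeoutException;
import java.security.cert.CertificateException;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLPeerUnverifiedException;

/** Hypothetical stand-alone copy of the exception classification, for experimentation. */
final class RecoverabilityDemo {
  static boolean isRecoverable(IOException e, boolean routeException) {
    // Protocol problems are never retried.
    if (e instanceof ProtocolException) return false;
    // Interruptions are not retried, except a timeout while connecting to a route.
    if (e instanceof InterruptedIOException) {
      return e instanceof SocketTimeoutException && routeException;
    }
    // A handshake failure caused by an untrusted certificate is not retried.
    if (e instanceof SSLHandshakeException && e.getCause() instanceof CertificateException) {
      return false;
    }
    // For example a certificate pinning failure.
    if (e instanceof SSLPeerUnverifiedException) return false;
    // Anything else may succeed on another route.
    return true;
  }

  /** Builds a handshake failure whose cause is a certificate problem. */
  static IOException untrustedCertificate() {
    SSLHandshakeException e = new SSLHandshakeException("handshake failed");
    e.initCause(new CertificateException("untrusted"));
    return e;
  }
}
```

Under these rules a java.net.ConnectException is considered recoverable, a SocketTimeoutException only when it occurred while connecting to a route (routeException is true), and a ProtocolException, an SSLPeerUnverifiedException or the result of untrustedCertificate() never.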
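To summarize what RetryAndFollowUpInterceptor does:
1. It creates the StreamAllocation that the later Interceptors rely on.
2. It obtains the response through the Interceptors further down the chain, and retries when a failure is judged recoverable.
3. It follows redirects and other follow-up requests, at most 20 times.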
BridgeInterceptor
As we saw in RealCall.getResponseWithInterceptorChain(), the Interceptor that comes right after RetryAndFollowUpInterceptor is BridgeInterceptor:
    package okhttp3.internal.http;

    import java.io.IOException;
    import java.util.List;
    import okhttp3.Cookie;
    import okhttp3.CookieJar;
    import okhttp3.Headers;
    import okhttp3.Interceptor;
    import okhttp3.MediaType;
    import okhttp3.Request;
    import okhttp3.RequestBody;
    import okhttp3.Response;
    import okhttp3.internal.Version;
    import okio.GzipSource;
    import okio.Okio;

    import static okhttp3.internal.Util.hostHeader;

    /**
     * Bridges from application code to network code. First it builds a network request from a user
     * request. Then it proceeds to call the network. Finally it builds a user response from the network
     * response.
     */
    public final class BridgeInterceptor implements Interceptor {
      private final CookieJar cookieJar;

      public BridgeInterceptor(CookieJar cookieJar) {
        this.cookieJar = cookieJar;
      }

      @Override public Response intercept(Chain chain) throws IOException {
        Request userRequest = chain.request();
        Request.Builder requestBuilder = userRequest.newBuilder();

        RequestBody body = userRequest.body();
        if (body != null) {
          MediaType contentType = body.contentType();
          if (contentType != null) {
            requestBuilder.header("Content-Type", contentType.toString());
          }

          long contentLength = body.contentLength();
          if (contentLength != -1) {
            requestBuilder.header("Content-Length", Long.toString(contentLength));
            requestBuilder.removeHeader("Transfer-Encoding");
          } else {
            requestBuilder.header("Transfer-Encoding", "chunked");
            requestBuilder.removeHeader("Content-Length");
          }
        }

        if (userRequest.header("Host") == null) {
          requestBuilder.header("Host", hostHeader(userRequest.url(), false));
        }

        if (userRequest.header("Connection") == null) {
          requestBuilder.header("Connection", "Keep-Alive");
        }

        // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
        // the transfer stream.
        boolean transparentGzip = false;
        if (userRequest.header("Accept-Encoding") == null) {
          transparentGzip = true;
          requestBuilder.header("Accept-Encoding", "gzip");
        }

        List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
        if (!cookies.isEmpty()) {
          requestBuilder.header("Cookie", cookieHeader(cookies));
        }

        if (userRequest.header("User-Agent") == null) {
          requestBuilder.header("User-Agent", Version.userAgent());
        }

        Response networkResponse = chain.proceed(requestBuilder.build());

        HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

        Response.Builder responseBuilder = networkResponse.newBuilder()
            .request(userRequest);

        if (transparentGzip
            && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
            && HttpHeaders.hasBody(networkResponse)) {
          GzipSource responseBody = new GzipSource(networkResponse.body().source());
          Headers strippedHeaders = networkResponse.headers().newBuilder()
              .removeAll("Content-Encoding")
              .removeAll("Content-Length")
              .build();
          responseBuilder.headers(strippedHeaders);
          responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
        }

        return responseBuilder.build();
      }

      /** Returns a 'Cookie' HTTP request header with all cookies, like {@code a=b; c=d}. */
      private String cookieHeader(List<Cookie> cookies) {
        StringBuilder cookieHeader = new StringBuilder();
        for (int i = 0, size = cookies.size(); i < size; i++) {
          if (i > 0) {
            cookieHeader.append("; ");
          }
          Cookie cookie = cookies.get(i);
          cookieHeader.append(cookie.name()).append('=').append(cookie.value());
        }
        return cookieHeader.toString();
      }
    }

What this Interceptor does is fairly simple. It can be viewed in two phases: sending the request and receiving the response. In the sending phase, BridgeInterceptor fills in some HTTP headers, mainly Content-Type, Content-Length, Transfer-Encoding, Host, Connection, Accept-Encoding and User-Agent, and also loads the Cookies. It then creates a new Request and hands it to the subsequent Interceptors to obtain the response.
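After the subsequent interceptors return a response, BridgeInterceptor first saves the cookies it carries. If BridgeInterceptor itself added Accept-Encoding: gzip and the server's response body is gzip-compressed, it wraps the body in a decompressing source, removes the Content-Encoding and Content-Length headers, builds a new response and returns it. Otherwise the response is returned with its body untouched.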
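The response-phase behavior can be illustrated with the JDK's own gzip classes. This is a minimal sketch under stated assumptions: `TransparentGzipSketch` and its methods are hypothetical names, the body is decoded eagerly into a string (OkHttp wraps it in a lazily read `GzipSource` instead), and the `HttpHeaders.hasBody` check from the real code is omitted.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Simplified model of BridgeInterceptor's response phase. Hypothetical code, not OkHttp's. */
final class TransparentGzipSketch {
    /** Helper that plays the role of a server compressing its response body. */
    static byte[] gzip(String text) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static byte[] gunzip(byte[] compressed) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Decodes the body. Only when the bridge added "Accept-Encoding: gzip" itself
     * (transparentGzip) is the body decompressed and the two headers stripped.
     * The headers map is modified in place.
     */
    static String readBody(Map<String, String> headers, byte[] body, boolean transparentGzip) {
        if (transparentGzip && "gzip".equalsIgnoreCase(headers.get("Content-Encoding"))) {
            headers.remove("Content-Encoding");
            headers.remove("Content-Length"); // the compressed length no longer applies
            return new String(gunzip(body), StandardCharsets.UTF_8);
        }
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        headers.put("Content-Encoding", "gzip");
        System.out.println(readBody(headers, gzip("hello"), true));
    }
}
```

Stripping Content-Length matters because the header describes the compressed bytes; leaving it in place would mislead anyone reading the decompressed body.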
The CookieJar comes from OkHttpClient. It is OkHttp's cookie manager and is responsible for storing and loading cookies:

package okhttp3;

import java.util.Collections;
import java.util.List;

/**
 * Provides <strong>policy</strong> and <strong>persistence</strong> for HTTP cookies.
 *
 * <p>As policy, implementations of this interface are responsible for selecting which cookies to
 * accept and which to reject. A reasonable policy is to reject all cookies, though that may be
 * interfere with session-based authentication schemes that require cookies.
 *
 * <p>As persistence, implementations of this interface must also provide storage of cookies. Simple
 * implementations may store cookies in memory; sophisticated ones may use the file system or
 * database to hold accepted cookies. The <a
 * href="https://tools.ietf.org/html/rfc6265#section-5.3">cookie storage model</a> specifies
 * policies for updating and expiring cookies.
 */
public interface CookieJar {
  /** A cookie jar that never accepts any cookies. */
  CookieJar NO_COOKIES = new CookieJar() {
    @Override public void saveFromResponse(HttpUrl url, List<Cookie> cookies) {
    }

    @Override public List<Cookie> loadForRequest(HttpUrl url) {
      return Collections.emptyList();
    }
  };

  /**
   * Saves {@code cookies} from an HTTP response to this store according to this jar's policy.
   *
   * <p>Note that this method may be called a second time for a single HTTP response if the response
   * includes a trailer. For this obscure HTTP feature, {@code cookies} contains only the trailer's
   * cookies.
   */
  void saveFromResponse(HttpUrl url, List<Cookie> cookies);

  /**
   * Load cookies from the jar for an HTTP request to {@code url}. This method returns a possibly
   * empty list of cookies for the network request.
   *
   * <p>Simple implementations will return the accepted cookies that have not yet expired and that
   * {@linkplain Cookie#matches match} {@code url}.
   */
  List<Cookie> loadForRequest(HttpUrl url);
}

The default construction of OkHttpClient shows that OkHttp provides no cookie management out of the box: the default jar is CookieJar.NO_COOKIES, which stores nothing. The interface above also tells us roughly what is needed to support cookies: implement saveFromResponse() and loadForRequest(), and pass the implementation to the client.
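To make the two responsibilities concrete, here is a sketch of the simplest possible jar, an in-memory one keyed by host. It is a plain-Java model, not an implementation of `okhttp3.CookieJar`: the class `InMemoryCookieJarSketch` is hypothetical, it uses host strings and name/value maps in place of `HttpUrl` and `Cookie`, and it ignores expiry, path and domain matching, and the secure flag. A real jar would implement the interface shown above with the same save/load shape.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** In-memory cookie store modeled on CookieJar's two methods. Hypothetical code. */
final class InMemoryCookieJarSketch {
    // host -> (cookie name -> cookie value); a later cookie with the same name wins.
    private final Map<String, Map<String, String>> store = new LinkedHashMap<>();

    /** Policy: accept everything. Persistence: keep it in memory. */
    synchronized void saveFromResponse(String host, Map<String, String> cookies) {
        store.computeIfAbsent(host, key -> new LinkedHashMap<>()).putAll(cookies);
    }

    /** Returns a copy of the cookies saved for this host, possibly empty. */
    synchronized Map<String, String> loadForRequest(String host) {
        Map<String, String> cookies = store.get(host);
        if (cookies == null) {
            return Collections.emptyMap();
        }
        return new LinkedHashMap<>(cookies);
    }

    /** Builds the value of a Cookie request header, like "a=b; c=d". */
    static String cookieHeader(Map<String, String> cookies) {
        StringBuilder header = new StringBuilder();
        for (Map.Entry<String, String> cookie : cookies.entrySet()) {
            if (header.length() > 0) {
                header.append("; ");
            }
            header.append(cookie.getKey()).append('=').append(cookie.getValue());
        }
        return header.toString();
    }

    public static void main(String[] args) {
        InMemoryCookieJarSketch jar = new InMemoryCookieJarSketch();
        Map<String, String> received = new LinkedHashMap<>();
        received.put("session", "abc123");
        jar.saveFromResponse("example.com", received);
        System.out.println(cookieHeader(jar.loadForRequest("example.com")));
    }
}
```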
CacheInterceptor
CacheInterceptor comes right after BridgeInterceptor. Its main job is to handle caching:

public final class CacheInterceptor implements Interceptor {
  private static final ResponseBody EMPTY_BODY = new ResponseBody() {
    @Override public MediaType contentType() {
      return null;
    }

    @Override public long contentLength() {
      return 0;
    }

    @Override public BufferedSource source() {
      return new Buffer();
    }
  };

  final InternalCache cache;

  public CacheInterceptor(InternalCache cache) {
    this.cache = cache;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();

    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;

    if (cache != null) {
      cache.trackResponse(strategy);
    }

    if (cacheCandidate != null && cacheResponse == null) {
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_BODY)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

    // If we don't need the network, we're done.
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }

    Response networkResponse = null;
    try {
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
      if (validate(cacheResponse, networkResponse)) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }

    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();

    if (HttpHeaders.hasBody(response)) {
      CacheRequest cacheRequest = maybeCache(response, networkResponse.request(), cache);
      response = cacheWritingResponse(cacheRequest, response);
    }

    return response;
  }

CacheInterceptor.intercept(Chain chain) can likewise be analyzed in two phases: the phase before the request is sent and the phase after the response is obtained. The call to chain.proceed(networkRequest) separates the two.
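Before the request is sent, the interceptor tries to obtain a response from the cache. If a cached response exists and CacheStrategy judges it usable (not expired), it is returned directly and the network is never touched. If neither the network nor the cache may be used, a synthetic 504 response is returned. Otherwise the response is fetched from the network through the subsequent interceptors. When a cached response is also available, the network call acts as a conditional GET, and a successful validation returns the cached response with merged headers. Finally, a network response that has a body is written to the cache when it is cacheable.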
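The branching described above is easier to see as a small decision function. The sketch below is a model only: `CacheDecisionSketch`, its `Outcome` enum and its parameters are hypothetical names, the two booleans stand in for `strategy.networkRequest != null` and `strategy.cacheResponse != null`, and validation is assumed to succeed exactly when the network answers 304 Not Modified (the real `validate()` is not shown in the excerpt and may apply additional rules).

```java
/** Decision table modeled on CacheInterceptor.intercept(). Hypothetical code, not OkHttp's. */
final class CacheDecisionSketch {
    enum Outcome {
        FAIL_504,               // network forbidden and no usable cache entry
        SERVE_FROM_CACHE,       // cache entry is fresh, network not needed
        SERVE_VALIDATED_CACHE,  // conditional GET confirmed the cache entry
        SERVE_FROM_NETWORK      // use (and possibly store) the network response
    }

    /**
     * @param needsNetwork   models strategy.networkRequest != null
     * @param hasCacheEntry  models strategy.cacheResponse != null
     * @param networkCode    status code of the network response; ignored when
     *                       the network is not used
     */
    static Outcome decide(boolean needsNetwork, boolean hasCacheEntry, int networkCode) {
        if (!needsNetwork && !hasCacheEntry) {
            return Outcome.FAIL_504;
        }
        if (!needsNetwork) {
            return Outcome.SERVE_FROM_CACHE;
        }
        // At this point the real interceptor calls chain.proceed(networkRequest).
        if (hasCacheEntry && networkCode == 304) {
            return Outcome.SERVE_VALIDATED_CACHE;
        }
        return Outcome.SERVE_FROM_NETWORK;
    }

    public static void main(String[] args) {
        System.out.println(decide(false, true, 0));
        System.out.println(decide(true, true, 304));
        System.out.println(decide(true, false, 200));
    }
}
```

Only the first two outcomes skip `chain.proceed()`, which is why a cache hit prevents ConnectInterceptor and CallServerInterceptor from running at all.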
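The details of the HTTP caching policy itself are not covered here.
As RealCall.getResponseWithInterceptorChain() shows, the cache used by CacheInterceptor also comes from OkHttpClient. OkHttp already ships a complete caching implementation in the Cache class, but it is not used by default. To enable it, you have to create a Cache yourself and pass it to OkHttpClient.Builder when building the OkHttpClient.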
ConnectInterceptor
After CacheInterceptor comes ConnectInterceptor:

public final class ConnectInterceptor implements Interceptor {
  public final OkHttpClient client;

  public ConnectInterceptor(OkHttpClient client) {
    this.client = client;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpStream httpStream = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpStream, connection);
  }
}

The definition of this class is quite concise. ConnectInterceptor's main responsibility is to establish the connection to the server, but it delegates most of that work to StreamAllocation. As we saw earlier, the StreamAllocation object is allocated in RetryAndFollowUpInterceptor.
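ConnectInterceptor uses the StreamAllocation to create the HttpStream object and obtain the RealConnection object, then calls realChain.proceed() so that the next interceptor can write the HTTP request to the connection and read the response back from the server.
The details of how the connection is established are not covered here.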
CallServerInterceptor
After ConnectInterceptor comes CallServerInterceptor, which is also the last interceptor in the chain. Its main responsibility is to perform the I/O:

package okhttp3.internal.http;

import java.io.IOException;
import java.net.ProtocolException;
import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.internal.connection.StreamAllocation;
import okio.BufferedSink;
import okio.Okio;
import okio.Sink;

/** This is the last interceptor in the chain. It makes a network call to the server. */
public final class CallServerInterceptor implements Interceptor {
  private final boolean forWebSocket;

  public CallServerInterceptor(boolean forWebSocket) {
    this.forWebSocket = forWebSocket;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    HttpStream httpStream = ((RealInterceptorChain) chain).httpStream();
    StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
    Request request = chain.request();

    long sentRequestMillis = System.currentTimeMillis();
    httpStream.writeRequestHeaders(request);

    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      Sink requestBodyOut = httpStream.createRequestBody(request, request.body().contentLength());
      BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
      request.body().writeTo(bufferedRequestBody);
      bufferedRequestBody.close();
    }

    httpStream.finishRequest();

    Response response = httpStream.readResponseHeaders()
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    if (!forWebSocket || response.code() != 101) {
      response = response.newBuilder()
          .body(httpStream.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    int code = response.code();
    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }
}

CallServerInterceptor first sends the HTTP request headers to the server. If the request has a body, it then sends the body, and it finishes sending the request by calling httpStream.finishRequest().
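Next it reads the HTTP response returned by the server from the connection and builds the Response.
If the Connection header of either the request or the server's response is close, CallServerInterceptor also calls streamAllocation.noNewStreams(), so that no further streams are created on this connection and it is not reused. A 204 or 205 response that declares a non-zero Content-Length is rejected with a ProtocolException.
Finally it returns the Response.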
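The ordering of the write steps and the two checks that follow the read can be sketched in plain Java. This is an illustration under stated assumptions: `CallServerSketch` and its methods are hypothetical, the request is serialized as HTTP/1.1 text into a string where OkHttp writes through an `HttpStream`, and an unchecked `IllegalStateException` stands in for the `ProtocolException` thrown by the real code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified model of CallServerInterceptor's steps. Hypothetical code, not OkHttp's. */
final class CallServerSketch {
    /** Writes the request line and headers first, then the body if there is one. */
    static String serialize(String method, String path, Map<String, String> headers, String body) {
        StringBuilder out = new StringBuilder();
        out.append(method).append(' ').append(path).append(" HTTP/1.1\r\n");
        for (Map.Entry<String, String> header : headers.entrySet()) {
            out.append(header.getKey()).append(": ").append(header.getValue()).append("\r\n");
        }
        out.append("\r\n"); // blank line ends the header section
        if (body != null) {
            out.append(body);
        }
        return out.toString();
    }

    /** Either side may ask for the connection not to be reused. */
    static boolean mustNotReuse(String requestConnection, String responseConnection) {
        return "close".equalsIgnoreCase(requestConnection)
            || "close".equalsIgnoreCase(responseConnection);
    }

    /** 204 and 205 responses must not carry a body. */
    static void checkNoContent(int code, long contentLength) {
        if ((code == 204 || code == 205) && contentLength > 0) {
            throw new IllegalStateException(
                "HTTP " + code + " had non-zero Content-Length: " + contentLength);
        }
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("Host", "example.com");
        headers.put("Content-Length", "2");
        System.out.print(serialize("POST", "/x", headers, "hi"));
    }
}
```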
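To summarize the responsibilities of these interceptors:
RetryAndFollowUpInterceptor ---> Creates the StreamAllocation object, handles HTTP redirects, and retries on failure. Effect on subsequent interceptors: modifies the request and the StreamAllocation.
BridgeInterceptor ---> Fills in missing HTTP headers. Effect on subsequent interceptors: modifies the request.
CacheInterceptor ---> Handles HTTP caching. Effect on subsequent interceptors: if the cache holds a usable response for the request, the subsequent interceptors are not executed.
ConnectInterceptor ---> Uses the previously allocated StreamAllocation object to establish the connection to the server and to settle whether the exchange uses HTTP 1.1 or HTTP 2. Effect on subsequent interceptors: creates the httpStream and the connection.
CallServerInterceptor ---> Performs the I/O and exchanges data with the server. Effect on subsequent interceptors: none, because it is the last interceptor in the chain.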
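The way these interceptors cooperate can be shown with a miniature chain. The sketch below is a model of the chain-of-responsibility pattern only: `MiniChainSketch`, its nested types and the string-based request and response are invented for illustration, and just three of the five roles are represented. Each interceptor receives a chain positioned at the next interceptor, and calling `proceed()` hands control onward.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Miniature interceptor chain. Hypothetical code, not OkHttp's RealInterceptorChain. */
final class MiniChainSketch {
    interface Interceptor {
        String intercept(Chain chain);
    }

    static final class Chain {
        private final List<Interceptor> interceptors;
        private final int index;
        private final String request;

        Chain(List<Interceptor> interceptors, int index, String request) {
            this.interceptors = interceptors;
            this.index = index;
            this.request = request;
        }

        String request() {
            return request;
        }

        /** Runs the interceptor at this position, giving it a chain for the next one. */
        String proceed(String request) {
            if (index >= interceptors.size()) {
                throw new IllegalStateException("no more interceptors");
            }
            Chain next = new Chain(interceptors, index + 1, request);
            return interceptors.get(index).intercept(next);
        }
    }

    /** Runs a three-stage chain and records which stages executed in trace. */
    static String run(String request, boolean cacheHit, List<String> trace) {
        Interceptor bridge = chain -> {
            trace.add("bridge");
            return chain.proceed(chain.request() + " +headers"); // modifies the request
        };
        Interceptor cache = chain -> {
            trace.add("cache");
            if (cacheHit) {
                return "cached response"; // short-circuits: later stages never run
            }
            return chain.proceed(chain.request());
        };
        Interceptor callServer = chain -> {
            trace.add("callServer");
            return "network response for [" + chain.request() + "]"; // last stage, no proceed()
        };
        return new Chain(Arrays.asList(bridge, cache, callServer), 0, request).proceed(request);
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        System.out.println(run("GET /", false, trace) + " via " + trace);
    }
}
```

Because each stage decides whether to call `proceed()`, short-circuiting (the cache hit) and request rewriting (the bridge) need no special support from the chain itself.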
End.