OkHttp源码解析

2146次阅读  |  发布于5年以前

概述

OkHttp是一个适用于AndroidJava应用程序的HTTP + HTTP/2框架。

使用示例

                //创建OkHttpClient.Builder
        OkHttpClient.Builder builder = new OkHttpClient.Builder();

                //创建OkHttpClient
        OkHttpClient okHttpClient = builder.build();

                //创建Request
        Request request = new Request.Builder().build();

                //创建Call
        Call call = okHttpClient.newCall(request);

                //发起同步请求
        try {
            Response response = call.execute();
        } catch (IOException e) {
            e.printStackTrace();
        }

                //发起异步请求
        call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {

            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {

            }
        });

一步一步来看,首先第一步是获取一个OkHttpClient.Builder对象,然后第二步通过这个builder对象build()出来了一个OkHttp对象,不用说,这是简单的建造者(Builder)设计模式,看一眼OkHttpClient.Builder的构造方法:

 public Builder() {
      dispatcher = new Dispatcher();
      protocols = DEFAULT_PROTOCOLS;
      connectionSpecs = DEFAULT_CONNECTION_SPECS;
      eventListenerFactory = EventListener.factory(EventListener.NONE);
      proxySelector = ProxySelector.getDefault();
      if (proxySelector == null) {
        proxySelector = new NullProxySelector();
      }
      cookieJar = CookieJar.NO_COOKIES;
      socketFactory = SocketFactory.getDefault();
      hostnameVerifier = OkHostnameVerifier.INSTANCE;
      certificatePinner = CertificatePinner.DEFAULT;
      proxyAuthenticator = Authenticator.NONE;
      authenticator = Authenticator.NONE;
      connectionPool = new ConnectionPool();
      dns = Dns.SYSTEM;
      followSslRedirects = true;
      followRedirects = true;
      retryOnConnectionFailure = true;
      callTimeout = 0;
      connectTimeout = 10_000;
      readTimeout = 10_000;
      writeTimeout = 10_000;
      pingInterval = 0;
    }

这里初始化了一堆东西,我们重点注意一下第2行的dispatcher对象,对应的Dispathcher类时OkHttp中的核心类,我们后面会重点详细解析,大家先有个印象即可。

第三步是通过Request request = new Request.Builder().build()初始化了一个Request对象,也是建造者模式,这个Request对象主要是描述要发起请求的详细信息。

第四步通过Call call = okHttpClient.newCall(request)创建了一个Call的对象,来看看这个newCall()方法:

 @Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }

再继续深入到RealCall.newRealCall()中:

  static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.transmitter = new Transmitter(client, call);
    return call;
  }

到这里我们就明白了,我们获取到的Call对象实际上是一个RealCall对象,看看CallRealCall的声明:

Call

public interface Call extends Cloneable

RealCall

final class RealCall implements Call 

明白了,Call是一个接口,而RealCall实现了这个接口,所以返回new RealCall()call对象当然是没问题的。

然后,如果我们想发起同步网络请求,则执行:

 Response response = call.execute();

如果想发起异步网络请求,则执行:

 call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {

            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {

            }
        });

我们先来分析同步的情况。

RealCall.excute()

我们上面提到,这个call实际上是一个RealCall对象,那么我们看看这个RealCall.excute()方法的源码:

   @Override public Response execute() throws IOException {
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        transmitter.timeoutEnter();
        transmitter.callStart();
        try {
          client.dispatcher().executed(this);
          return getResponseWithInterceptorChain();
        } finally {
          client.dispatcher().finished(this);
        }
  }

可以看到,这里首先使用了一个synchronized锁判断了executed标志位的值,如果executedtrue,则抛出异常,异常信息为"Already Executed",否则将executed置为true,继续执行下面的逻辑。所以,这个executed就是用来标记当前RealCallexcute()方法是否已经被执行过,后面到异步请求enqueue()的代码中我们会发现同样使用了这个executed标志位做了相同逻辑的判断,所以我们可以得出一个Call对象只能被执行一次(不管是同步请求还是异步请求)的结论。

==那么可能有同学会有疑问了,为什么OkHttp中的一个Call对象只能发起一次请求?这个和OkHttp中的连接池有关系,我们会在后面讲ConnectInterceptor拦截器的时候详细分析。==

如果excuted判断没有问题之后,就会执行:

transmitter.timeoutEnter();
transmitter.callStart();
try {
    client.dispatcher().executed(this);
    return getResponseWithInterceptorChain();
} finally {
    client.dispatcher().finished(this);
}

我们抓住重点,直接从第4行开始看,这里执行了 client.dispatcher().executed(this),注意这个client是我们刚才传进来的OkHttpClient对象,dispather对象是我们刚才在上面提到的OkHttpClient.Builder中初始化的,我们来看看这个Dispatcher.excuted()方法:

 /** Used by {@code Call#execute} to signal it is in-flight. */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }

可以看到这里主要是将当前这次请求的call对象加入到了runningSyncCalls中,我们看看这份runningSyncCalls的声明:

 /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

这个runningSyncCalls是一个队列,从其源代码的注释我们可以得知这个runningSyncCalls的作用是存储当前OkHttpClient正在执行的同步请求。

好,下一步我们来分析:

@Override public Response execute() throws IOException {
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        transmitter.timeoutEnter();
        transmitter.callStart();
        try {
          client.dispatcher().executed(this);
          return getResponseWithInterceptorChain();
        } finally {
          client.dispatcher().finished(this);
        }
  }

中第9行之后的代码,可以看到,第10行直接返回了一个getResponseWithInterceptorChain(),而public Response execute()方法返回的是一个Response对象,所以说这个getResponseWithInterceptorChain()方法返回的也是一个Response对象,即这个getResponseWithInterceptorChain()方法中执行了真正的同步请求的逻辑并返回了Response对象,其具体实现细节我们后面详细分析。

注意,Response execute()方法的第11行到第13行,这是try...finally语句块中的finally体,也就是说无论try中语句的执行结构如何,都会执行这个finally块中代码,其中只有一行代码:

client.dispatcher().finished(this);

我们来看看Dispatcher.finished()方法的实现:

 /** Used by {@code Call#execute} to signal completion. */
  void finished(RealCall call) {
    finished(runningSyncCalls, call);
  }

  private <T> void finished(Deque<T> calls, T call) {
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      idleCallback = this.idleCallback;
    }

    boolean isRunning = promoteAndExecute();

    if (!isRunning && idleCallback != null) {
      idleCallback.run();
    }
  }

client.dispatcher().finished(this)调用了dispatcher().finished(this)方法并将自身(call)传递了进去,在finished(RealCall call)方法中又调用了finished(Deque<T> calls, T call)方法,传入了runningSyncCalls和我们当前的call对象,还记得这个runningSyncCalls在哪里出现过吗?对的,它在dispater.excuted()方法中出现过,当时的操作是将当前call对象加入到这个runningSyncCalls队列中,那么现在请求结束了,finished()方法中应该做什么?当然是将当前call对象从runningSyncCalls队列中移除,在代码中就是:

synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      idleCallback = this.idleCallback;
    }

可以看到为了考虑线程安全,这里使用了synchronized锁保证了线程同步,然后将当前callrunningSyncCalls队列中移除。

到这里我们就分析完了同步请求的大致流程,现在我们来看一下OkHttp中发起请求的核心方法getResponseWithInterceptorChain(),可以看到在同步请求中仅仅调用了这一个方法就得到了返回的Response,我们来看一下它的源码:

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(new RetryAndFollowUpInterceptor(client));
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,originalRequest, this, client.connectTimeoutMillis(),client.readTimeoutMillis(), client.writeTimeoutMillis());

    boolean calledNoMoreExchanges = false;
    try {
      Response response = chain.proceed(originalRequest);
      if (transmitter.isCanceled()) {
        closeQuietly(response);
        throw new IOException("Canceled");
      }
      return response;
    } catch (IOException e) {
      calledNoMoreExchanges = true;
      throw transmitter.noMoreExchanges(e);
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null);
      }
    }
  }

第3行到第12行是new了一个List并依次将用户自定义的应用拦截器集合(第4行)、OkHttp内置的拦截器集合(第5-8行)、用户自定义的网络拦截器集合(第9-11行)添加了进去,构成了一个大的拦截器集合。

然后执行:

 Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,originalRequest, this, client.connectTimeoutMillis(),client.readTimeoutMillis(), client.writeTimeoutMillis());

再看看RealInterceptorChain类的构造方法:

public RealInterceptorChain(List<Interceptor> interceptors, Transmitter transmitter,
      @Nullable Exchange exchange, int index, Request request, Call call,
      int connectTimeout, int readTimeout, int writeTimeout) {
    this.interceptors = interceptors;
    this.transmitter = transmitter;
    this.exchange = exchange;
    this.index = index;
    this.request = request;
    this.call = call;
    this.connectTimeout = connectTimeout;
    this.readTimeout = readTimeout;
    this.writeTimeout = writeTimeout;
  }

主要是初始化了一个RealInterceptorChain类的对象,注意一下传入的第1个参数(interceptors)和第4个参数(index),分别传入的是我们刚才构成的拦截器集合以及0,一会我们会用到。

初始化好RealInterceptorChain的对象后继续往下执行,关注一下第18行,可以看到,真正返回的response就是从这里的 chain.proceed(originalRequest)方法返回的,当前这个chainRealInterceptorChain类的对象,所以我们来看看RealInterceptorChain.proceed()方法中做了什么:

 @Override public Response proceed(Request request) throws IOException {
    return proceed(request, transmitter, exchange);
  }

可以看到,虽然我们调用的是chain.proceed(originalRequest),但是实际上它内部执行的是 proceed(request, transmitter, exchange),我们来看看这个方法的源码:

public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.exchange != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }

    return response;
  }

第2行首先检查了一下index是否超出了interceptorssize,还记得indexinterceptors是什么吗?对的,就是我们在getResponseWithInterceptorChain()源码的第14行传入的0和我们初始化的拦截器集合,为什么要检测indexinterceptorssize之间的关系呢?猜想是想通过index访问·中的元素,我们继续往下看,注意第19行到第21行,我们把这几行代码拿下来:

 RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

这里又初始化了一个RealInterceptorChain对象,那这里初始化的这个RealInterceptorChain对象和当前RealInterceptorChain有什么区别呢?我们再看看当前RealInterceptorChain对象初始化的代码:

Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,originalRequest, this, client.connectTimeoutMillis(),client.readTimeoutMillis(), client.writeTimeoutMillis());

可以发现,只有一个参数不一样,就是第4个参数,对应RealInterceptorChain构造方法中的index参数,现在这个RealInterceptorChain next对象的构造方法中的index值传入的是当前RealInterceptorChain对象的index+1。然后下一行果然通过index拿到了interceptors中的元素interceptor,这也是proceed()方法的开头为什么先检测indexinterceptors.size()大小关系的原因,就是为了防止这发生越界异常。拿到interceptor对象之后,下一行执行了interceptor.intercept(next)并返回了response,而最后也是将这个response最终作为当前proceed()方法的返回值,这时候我们就有必要深入一下interceptor.intercept(next)的源码了,我们尝试跟踪源码,会发现这个Interceptor其实是一个接口:
在这里插入图片描述

我们看看Interceptor都有哪些实现类:
image-20190503114437541
我们看到了5个拦截器类,由于当前interceptor是通过interceptors.get(index)拿到的,而index当前传入的值为0,所以第一次执行的应该是第一个加入拦截器集合的那个拦截器类的intercept()方法,这里我们先不考虑用户自添加的拦截器,那么第一个拦截器就应该是RetryAndFollowUpInterceptor拦截器,我们来看看它的intercept()方法:

 @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Transmitter transmitter = realChain.transmitter();

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      transmitter.prepareToConnect(request);

      if (transmitter.isCanceled()) {
        throw new IOException("Canceled");
      }

      Response response;
      boolean success = false;
      try {
        response = realChain.proceed(request, transmitter, null);
        success = true;
      } catch (RouteException e) {
        // The attempt to connect via a route failed. The request will not have been sent.
        if (!recover(e.getLastConnectException(), transmitter, false, request)) {
          throw e.getFirstConnectException();
        }
        continue;
      } catch (IOException e) {
        // An attempt to communicate with a server failed. The request may have been sent.
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, transmitter, requestSendStarted, request)) throw e;
        continue;
      } finally {
        // The network call threw an exception. Release any resources.
        if (!success) {
          transmitter.exchangeDoneDueToException();
        }
      }

      // Attach the prior response if it exists. Such responses never have a body.
      if (priorResponse != null) {
        response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                    .body(null)
                    .build())
            .build();
      }

      Exchange exchange = Internal.instance.exchange(response);
      Route route = exchange != null ? exchange.connection().route() : null;
      Request followUp = followUpRequest(response, route);

      if (followUp == null) {
        if (exchange != null && exchange.isDuplex()) {
          transmitter.timeoutEarlyExit();
        }
        return response;
      }

      RequestBody followUpBody = followUp.body();
      if (followUpBody != null && followUpBody.isOneShot()) {
        return response;
      }

      closeQuietly(response.body());
      if (transmitter.hasExchange()) {
        exchange.detachWithViolence();
      }

      if (++followUpCount > MAX_FOLLOW_UPS) {
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }

      request = followUp;
      priorResponse = response;
    }
  }

代码很长,请大家暂时忽略其他代码,只关注第3行和第18行,第3行将当前传入的Chain chain对象类型转化为了RealInterceptorChain realChain对象,第18行执行了realChain.proceed(request, transmitter, null)并返回了response,注意,这个realChain是我们调用当前intercept()方法时传入的chain参数,而这个chain参数传入的是:

 RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

即由interceptorsindex+1构建的新的RealInterceptorChain,所以整个逻辑就是:image-20190505084646823

可以发现,整个拦截器集合构成了一个链式结构,当前拦截器执行完对应的拦截方法后激活下一个拦截器开始工作,直到最后一个拦截器,这也告诉我们如果要添加自定义的拦截器,则必须在重写intercept(Chain chain)方法时返回chain.proceed(),否则责任链就会断链,自然不会成功地发起网络请求。

注意由于CallServerInterceptor这个拦截器是最后一个拦截器,所以它的intercept方法中没有像之前的拦截器那样执行next.proceed(),而是直接使用传入的Chain chain参数直接发起了网络请求并返回Response

至此,我们已经分析完了OkHttp同步请求的完整流程,总结一下:

Realcall.enqueue()

先看一下发起OkHttp异步网络请求的典型代码:

 call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {

            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {

            }
        });

可以看到,传入了一个CallBack的回调对象,该CallBack类中有onFailure()onResponse()两个方法,分别代表网络请求发起成功和失败的情况,这里抛出一个问题供大家思考:onFailure()onResponse()这两个方法是处于主线程的还是子线程的?这个问题我会在后面的分析中解答。

那么我们看一下RealCall.enqueue(Callback callback)的源码:

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    transmitter.callStart();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

可以看到,和同步的excute()一样,开头先会检测excuted标志位判断当前call对象是否已经被执行过,如果已经被执行过,抛出异常。

如果当前call对象没有被执行过,则执行第7行,调用Dispatcherenqueue()方法,传入了一个AsyncCall参数,我们先看看Dispatcher.enqueue()方法的源码:

void enqueue(AsyncCall call) {
    synchronized (this) {
        readyAsyncCalls.add(call);

        // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
        // the same host.
        if (!call.get().forWebSocket) {
          AsyncCall existingCall = findExistingCallWithHost(call.host());
          if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
        }
    }
    promoteAndExecute();
  }

首先加锁将传入的AsyncCall call加入readyAsyncCalls这个队列,然后执行了第7到第10行,首先判断call.get().forWebSocket的值,其声明及初始化如下:

 final boolean forWebSocket;
 private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
         ......
     this.forWebSocket = forWebSocket;
 }

RealCall的构造方法的调用代码如下:

  @Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }

forWebSocket的值默认为false,所以,会执行Dispatcher.enqueue()方法中 的第8-9行:

AsyncCall existingCall = findExistingCallWithHost(call.host());
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);

看看findExistingCallWithHost()方法 的源码:

@Nullable private AsyncCall findExistingCallWithHost(String host) {
    for (AsyncCall existingCall : runningAsyncCalls) {
      if (existingCall.host().equals(host)) return existingCall;
    }
    for (AsyncCall existingCall : readyAsyncCalls) {
      if (existingCall.host().equals(host)) return existingCall;
    }
    return null;
  }

可以看到这个方法就是遍历了一下当前正在执行的和准备执行的异步网络请求的call队列,看看有没有某一个callhost和当前callhost相同,如果有就返回。

然后:

if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);

判断了一下findExistingCallWithHost()返回的是否为null,如果不为null,调用call.reuseCallsPerHostFrom(existingCall)

 void reuseCallsPerHostFrom(AsyncCall other) {
      this.callsPerHost = other.callsPerHost;
    }

就是对call对象的callsPerHost进行了更新,注意这里是直接使用了=this.callsPerHost进行了赋值,而且在java中参数默认传递的是引用,所以当前callsPerHostfindExistingCallWithHost()中返回的那个AsyncCall对象的callsPerHost是同一个引用,那么再延伸一下,所有host相同的AsyncCall对象中的callsPerHost都是同一个引用,即如果改变其中一个AsyncCall对象的callsPerHost值,其他的所有AsyncCall对象的 callsPerHost的值也会随之改变,下面我们会看到作者巧妙地利用了这一点更新了所有host相同的 AsyncCall对象的callsPerHost值,实在是非常优秀。 这个callPerHost的声明如下:

 private volatile AtomicInteger callsPerHost = new AtomicInteger(0);

即初始为0。

然后执行 promoteAndExecute()方法,我们看看其源码:

private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();

        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.

        i.remove();
        asyncCall.callsPerHost().incrementAndGet();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }

    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }

这个方法中遍历了readyAsyncCalls这个队列,对于readyAsyncCalls中的每一个被遍历的当前AsyncCall对象,会首先判断runningAsyncCalls这个队列的长度是否大于了maxRequests,这个maxRequests默认是64,意思就是说要求当前正在执行的网络请求不能超过64个(为了节约资源考虑),如果runningAsyncCalls的元素数量不超过maxRequests,则判断asyncCall.callsPerHost().get()是否大于maxRequestsPerHostmaxRequestsPerHost的值默认为5,上面我们说到callsPerHost的初始为0,那么asyncCall.callsPerHost().get()初始应该是小于maxRequestsPerHost的,这里这个判断的意思就是当前正在执行的所有的请求中,与asyncCall对应的主机(host)相同请求的数量不能超过maxRequestsPerHost也就是5个,如果满足条件即同一个host的请求不超过5个,则往下执行13-16行,首先将当前AsyncCall对象从readyAsyncCalls中移除,然后执行asyncCall.callsPerHost().incrementAndGet(),就是将callsPerHost的值增1,上面我提到了,所有host相同的AsyncCall对象中的callsPerHost都是同一个引用,所以这里对当前这个callsPerHost的值增1实际上是更新了readyAsyncCalls中的所有AsyncCall对象中的callsPerHost的值,这样callsPerHost这个属性值就能够表示请求host与当前host相同的请求数量。

然后下面15行是将当前asyncCall对象加入到executableCalls中,下面会执行所有executableCalls中的请求,16行就是将当前这个asyncCall对象加入到runningAsyncCalls中表示其现在已经是正在执行了,注意这时executableCallsrunningAsyncCalls两个集合的不同。

然后下面第21行到24行,主要是对executableCalls进行了遍历,对于executableCalls中的每一个AsyncCall对象,执行asyncCall.executeOn()方法,传入了一个executorService(),我们首先看executorService()的源码:

private @Nullable ExecutorService executorService;
public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

可以看到这里是利用单例模式返回了一个线程池的对象,线程池的核心线程数为0,最大线程数为Integer.MAX_VALUE即不加限制,这里将线程池的最大线程数置位Integer.MAX_VALUE的原因是我们在Dispatcher中默认使用了maxRequests控制了同时并发的最大请求数量,所以这里就不用在线程池中加以控制了,然后设置了最大存活时间为60s,也就是说如果当前线程的任务执行完成了,60s内本线程不会被销毁,如果此时有其他网络请求的任务,就不用新建线程,可以直接复用之前的线程,如果60s后还没有被复用,则本线程会被销毁。

然后我们看asyncCall.executeOn()的源码:

void executeOn(ExecutorService executorService) {
      assert (!Thread.holdsLock(client.dispatcher()));
      boolean success = false;
      try {
        executorService.execute(this);
        success = true;
      } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        transmitter.noMoreExchanges(ioException);
        responseCallback.onFailure(RealCall.this, ioException);
      } finally {
        if (!success) {
          client.dispatcher().finished(this); // This call is no longer running!
        }
      }
    }

重点是第5行,主要是调用线程池的execute()方法执行当前asyncCall,所以AsyncCall类应该实现了Runnable接口,我们看看AsyncCall类的声明:

final class AsyncCall extends NamedRunnable

再看看NamedRunnable接口的源码:

public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

可以看到,NamedRunnable实现了Runnable接口,其run()方法中主要是调用了抽象方法execute(),而execute()方法会被AsyncCall类实现,所以,AsyncCall类的run()方法实际上执行的是其execute()中的内容:

@Override protected void execute() {
      boolean signalledCallback = false;
      transmitter.timeoutEnter();
      try {
        Response response = getResponseWithInterceptorChain();
        signalledCallback = true;
        responseCallback.onResponse(RealCall.this, response);
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }

可以看到,execute()方法中发起了真正的网络请求,核心方法是getResponseWithInterceptorChain(),这个方法我们在解析RealCall.excute()方法时已经解释过,其作用是发起网络请求,返回Response,注意这里使用了try...catch语句块对异常进行了捕捉,如果发生异常,则调用responseCallback.onFailure(RealCall.this, e),而这个responseCallback就是我们发起异步请求时传入的那个CallBack对象,所以就是在这里回调的onFailure()方法。如果请求成功,则调用responseCallback.onResponse(RealCall.this, response)即我们的onResponse()回调方法。那么由于当前execute()方法是在Runnable接口的run()方法中被调用的,而asyncCall又被传入了executorService.execute()中,所以当前execute()方法会在线程池中被执行,即onFailure()onResponse()这两个回调方法会在子线程被调用,这也说明了我们不能再RealCall.enqueue()方法的回调中直接更新UI,因为其回调方法都是在子线程被调用的。

最后关注一下第16行的finally语句块中的内容,主要是执行了client.dispatcher().finished(this)this指的是当前asyncCall对象,看看这个Dispatcher.finished()的源码:

/** Used by {@code AsyncCall#run} to signal completion. */
  void finished(AsyncCall call) {
    call.callsPerHost().decrementAndGet();
    finished(runningAsyncCalls, call);
  }

可以看到,首先执行 call.callsPerHost().decrementAndGet()asyncCall对象的callsPerHost的值减1,因为当前asyncCall请求结束了,那么就应该将与本asyncCall具有相同host的请求数量减1,然后调用了:

private <T> void finished(Deque<T> calls, T call) {
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      idleCallback = this.idleCallback;
    }

    boolean isRunning = promoteAndExecute();

    if (!isRunning && idleCallback != null) {
      idleCallback.run();
    }
  }

主要做的工作就是将当前call对象从runningAsyncCalls中移除。

至此,我们分析完了OkHttp异步网络请求的整体流程,可以发现,在异步网络请求中Dispatcher类扮演了相当重要的角色。

总结一下OkHttp异步请求的步骤:

到这里就分析完了OkHttp中同步请求和异步请求的执行流程,之后会推出OkHttp中内置的5大拦截器的源码分析,深入分析每一个拦截器的实现原理与作用,欢迎大家关注。

Copyright© 2013-2019

京ICP备2023019179号-2