本篇文章主要用来讨论Java中多线程并发原理与实践经验,并不是一篇使用例子教程,这方面内容可以参考网上其他文章。
线程是比进程更加轻量级的调度单位,线程的引入可以把进程的资源分配和执行调度分开,各个线程既可以共享进程资源,又可以独立调度。
通常大家都会这么去解释进程与线程的区别,在文章01Android进程框架:进程的启动创建、启动与调度流程中
我们剖析了进程的本质,我们这里再简单回忆一下。
关于进程本质的描述:
我们知道,代码是静态的,有代码和资源组成的系统要想运行起来就需要一种动态的存在,进程就是程序的动态执行过程。何为进程?
进程就是处理执行状态的代码以及相关资源的集合,包括代码端段、文件、信号、CPU状态、内存地址空间等。
进程使用task_struct结构体来描述,如下所示:
我们接着来看看Java线程的创建序列图,如下所示:
可以看到,最终调用pthread库的pthread_create()方法创建了新的线程,该线程也使用task_struct结构体来描述,但是它没有自己独立的地址空间,而是与其所在的进程共享地址空间和资源。
所以你可以发现,对于虚拟机而言,除了是否具有独立的地址空间外,进程与线程并没有本质上的区别。
我们接着来看看线程是如何调度的。
线程状态流程图图
NEW、WAITING、TIMED_WAITING都比较好理解,我们重点说一说RUNNABLE运行态和BLOCKED阻塞态。
线程进入RUNNABLE运行态一般分为五种情况:
线程进入BLOCKED阻塞态一般也分为五种情况:
我们再来看看和线程状态相关的一些方法。
线程安全,通常所说的线程安全指的是相对的线程安全,它指的是对这个对象单独的操作是线程安全的,我们在调用的时候无需做额外的保障措施。
什么叫相对安全?🤔
🤞举个栗子
我们知道Java里的Vector是个线程安全的类,在多线程环境下对其插入、删除和读取都是安全的,但这仅限于每次只有一个线程对其操作,如果多个线程同时操作
Vector,那它就不再是线程安全的了。
final Vector<String> vector = new Vector<>();
while (true) {
for (int i = 0; i < 10; i++) {
vector.add("项:" + i);
}
Thread removeThread = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
});
Thread printThread = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < vector.size(); i++) {
Log.d(TAG, vector.get(i));
}
}
});
removeThread.start();
printThread.start();
if (Thread.activeCount() >= 20) {
return;
}
}
但是程序却crash了
正确的做法应该是vector对象加上同步锁,如下:
final Vector<String> vector = new Vector<>();
while (true) {
for (int i = 0; i < 10; i++) {
vector.add("项:" + i);
}
Thread removeThread = new Thread(new Runnable() {
@Override
public void run() {
synchronized (vector){
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
}
});
Thread printThread = new Thread(new Runnable() {
@Override
public void run() {
synchronized (vector){
for (int i = 0; i < vector.size(); i++) {
Log.d(TAG, vector.get(i));
}
}
}
});
removeThread.start();
printThread.start();
if (Thread.activeCount() >= 20) {
return;
}
}
volatile也是互斥同步的一种实现,不过它非常的轻量级。
volatile有两条关键的语义:
要理解volatile关键字,我们得先从Java的线程模型开始说起。如图所示:
Java内存模型规定了所有字段(这些字段包括实例字段、静态字段等,不包括局部变量、方法参数等,因为这些是线程私有的,并不存在竞争)都存在主内存中,每个线程会
有自己的工作内存,工作内存里保存了线程所使用到的变量在主内存里的副本拷贝,线程对变量的操作只能在工作内存里进行,而不能直接读写主内存,当然不同内存之间也
无法直接访问对方的工作内存,也就是说主内存时线程传值的媒介。
我们来理解第一句话:
保证被volatile修饰的变量对所有线程都是可见的
如何保证可见性?🤔
被volatile修饰的变量在工作内存修改后会被强制写回主内存,其他线程在使用时也会强制从主内存刷新,这样就保证了一致性。
关于“保证被volatile修饰的变量对所有线程都是可见的”,有种常见的错误理解:
错误理解:由于volatile修饰的变量在各个线程里都是一致的,所以基于volatile变量的运算在多线程并发的情况下是安全的。
这句话的前半部分是对的,后半部分却错了,因此它忘记考虑变量的操作是否具有原子性这一问题。
:point_up:举个栗子
private volatile int start = 0;
private void volatileKeyword() {
Runnable runnable = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
start++;
}
}
};
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(runnable);
thread.start();
}
Log.d(TAG, "start = " + start);
}
这段代码启动了10个线程,每次10次自增,按道理最终结果应该是100,但是结果并非如此。
为什么会这样?:thinking:
仔细看一下start++,它其实并非一个原子操作,简单来看,它有两步:
所以volatile只能保证可见性,在不符合以下场景下我们依然需要通过加锁来保证原子性:
比方说我们会在线程里加个boolean变量,来判断线程是否停止,这种情况就非常适合使用volatile。
我们再来理解第二句话。
什么是指令重排序?🤔
指令重排序是值指令乱序执行,即在条件允许的情况下,直接运行当前有能力立即执行的后续指令,避开为获取下一条指令所需数据而造成的等待,通过乱序执行的技术,提供执行效率。
指令重排序绘制被volatile修饰的变量的赋值操作前,添加一个内存屏障,指令重排序时不能把后面的指令重排序的内存屏障之前的位置。
关于指令重排序不是本篇文章重点讨论的内容,更多细节可以参考指令重排序。
synchronized是互斥同步的一种实现。
synchronized:当某个线程访问被synchronized标记的方法或代码块时,这个线程便获得了该对象的锁,其他线程暂时无法访问这个方法,只有等待这个方法执行完毕或者代码块执行完毕,这个
线程才会释放该对象的锁,其他线程才能执行这个方法或代码块。
前面我们已经说了volatile关键字,这里我们举个例子来综合分析volatile与synchronized关键字的使用。
:point_up:举个栗子
public class Singleton {
//volatile保证了:1 instance在多线程并发的可见性 2 禁止instance在操作是的指令重排序
private volatile static Singleton instance;
public static Singleton getInstance() {
//第一次判空,保证不必要的同步
if (instance == null) {
//synchronized对Singleton加全局所,保证每次只要一个线程创建实例
synchronized (Singleton.class) {
//第二次判空时为了在null的情况下创建实例
if (instance == null) {
instance = new Singleton();
}
}
}
return instance;
}
}
这是一个经典的DSL单例。
它的字节码如下:
可以看到被synchronized同步的代码块,会在前后分别加上monitorenter和monitorexit,这两个字节码都需要指定加锁和解锁的对象。
关于加锁和解锁的对象:
synchronized(this)添加的是对象锁,synchronized(ClassName.class)添加的是类锁,它们的区别如下:
对象锁:Java的所有对象都含有1个互斥锁,这个锁由JVM自动获取和释放。线程进入synchronized方法的时候获取该对象的锁,当然如果已经有线程获取了这个对象的锁,那么当前线
程会等待;synchronized方法正常返回或者抛异常而终止,JVM会自动释放对象锁。这里也体现了用synchronized来加锁的好处,方法抛异常的时候,锁仍然可以由JVM来自动释放。类锁:对象锁是用来控制实例方法之间的同步,类锁是用来控制静态方法(或静态变量互斥体)之间的同步。其实类锁只是一个概念上的东西,并不是真实存在的,它只是用来帮助我们理
解锁定实例方法和静态方法的区别的。我们都知道,java类可能会有很多个对象,但是只有1个Class对象,也就是说类的不同实例之间共享该类的Class对象。Class对象其实也仅仅是1个
java对象,只不过有点特殊而已。由于每个java对象都有1个互斥锁,而类的静态方法是需要Class对象。所以所谓的类锁,不过是Class对象的锁而已。获取类的Class对象有好几种,最简
单的就是MyClass.class的方式。 类锁和对象锁不是同一个东西,一个是类的Class对象的锁,一个是类的实例的锁。也就是说:一个线程访问静态synchronized的时候,允许另一个线程访
问对象的实例synchronized方法。反过来也是成立的,因为他们需要的锁是不同的。
关不同步锁还有ReentrantLock,eentrantLockR相对于synchronized具有等待可中断、公平锁等更多功能,这里限于篇幅,不再展开。
我们知道线程的创建、切换与销毁都会花费比较大代价,所以很自然的我们使用线程池来复用和管理线程。Java里的线程池我们通常通过ThreadPoolExecutor来实现。
接下来我们就来分析ThreadPoolExecutor的相关原理,以及ThreadPoolExecutor在Android上的应用AsyncTask。
线程池有五种运行状态,如下所示:
线程池状态图
另外,ThreadPoolExecutor是用一个AtomicInteger来记录线程池状态和线程池里的线程数量的,如下所示:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;// 111
private static final int SHUTDOWN = 0 << COUNT_BITS;// 000
private static final int STOP = 1 << COUNT_BITS;// 001
private static final int TIDYING = 2 << COUNT_BITS;// 010
private static final int TERMINATED = 3 << COUNT_BITS;// 110
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }//线程池状态
private static int workerCountOf(int c) { return c & CAPACITY; }//线程池当前线程数
private static int ctlOf(int rs, int wc) { return rs | wc; }
在正式介绍线程池调度原理之前,我们先来回忆一下Java实现任务的两个接口:
另外,还有个Future接口,它可以对Runnable、Callable执行的任务进行判断任务是否完成,中断任务以及获取任务结果的操作。我们通常会使用它的实现类FutureTask,FutureTask是一个Future、Runnable
以及Callable的包装类。利用它可以很方便的完成Future接口定义的操作。FutureTask内部的线程阻塞是基于LockSupport来实现的。
我们接下来看看线程池是和执行任务的。
ThreadPoolExecutor调度流程图
execute(Runnable command)
public class ThreadPoolExecutor extends AbstractExecutorService {
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//1. 若线程池状态是RUNNING,线程池大小小于配置的核心线程数,则可以在线程池中创建新线程执行新任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//2. 若线程池状态是RUNNING,线程池大小大于配置的核心线程数,则尝试将任务插入阻塞队列进行等待
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//若插入成功,则将次检查线程池的状态是否为RUNNING,如果不是则移除当前任务并进入拒绝策略。
if (! isRunning(recheck) && remove(command))
reject(command);
//如果线程池中的线程数为0,即线程池中的线程都执行完毕处于SHUTDOWN状态,此时添加了一个null任务
//(因为SHUTDOWN状态不再接受新任务)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3. 若无法插入阻塞队列,则尝试创建新线程,创建失败则进入拒绝策略。
else if (!addWorker(command, false))
reject(command);
}
}
这个其实很好理解,打个比方。我们公司的一个小组来完成任务,
addWorker(Runnable firstTask, boolean core)
addWorker(Runnable firstTask, boolean core) 表示添加个Worker,Worker实现了Runnable接口,是对Thread的封装,该方法添加完Worker后,则调用runWorker()来启动线程。
public class ThreadPoolExecutor extends AbstractExecutorService {
private boolean addWorker(Runnable firstTask, boolean core) {
//重试标签
retry:
for (;;) {
int c = ctl.get();
//获取当前线程池状态
int rs = runStateOf(c);
//以下情况表示不再接受新任务:1 线程池没有处于RUNNING状态 2 要执行的任务为空 3 阻塞队列已满
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//获取线程池当前的线程数
int wc = workerCountOf(c);
//如果超出容量,则不再接受新任务,core表示是否使用corePoolSize作为比较标准
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//增加线程数
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//如果线程池状态发生变化,重新开始循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//线程数增加成功,开始添加新线程,Worker是Thread的封装类
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将新启动的线程添加到线程池中
workers.add(w);
//更新线程池中线程的数量,注意这个数量不能超过largestPoolSize
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//调用runWorker()方法,开始执行线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
}
runWorker(Worker w)
runWorker()方法是整个阻塞队列的核心循环,在这个循环中,线程池会不断的从阻塞队列workerQueue中取出的新的task并执行。
public class ThreadPoolExecutor extends AbstractExecutorService {
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//从阻塞队列中不断取出任务,如果取出的任务为空,则循环终止
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//该方法为空,可以重新次方法,在任务执行开始前做一些处理
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//该方法为空,可以重新次方法,在任务执行结束后做一些处理
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
//从阻塞队列workerQueue中取出Task
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
//循环
for (;;) {
int c = ctl.get();
//获取线程池状态
int rs = runStateOf(c);
//以下情况停止循环:1 线程池状态不是RUNNING(>= SHUTDOWN)2 线程池状态>= STOP 或者阻塞队列为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//递减workCount
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 判断线程的IDLE超时机制是否生效,有两种情况:1 allowCoreThreadTimeOut = true,这是可以手动
//设置的 2 当前线程数大于核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//根据timed来决定是以poll超时等待的方式还是以take()阻塞等待的方式从阻塞队列中获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
}
所以你可以理解了,runWorker()方法是在新创建线程的run()方法里的,而runWorker()又不断的调用getTask()方法去获取阻塞队列里的任务,这样就实现了线程的复用。
我们先来看看ThreadPoolExecutor的构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
那么这些参数我们应该怎么配置呢?要合理配置线程池就需要先了解我们的任务特性,一般说来:
我们根据这些属性来一一分析这些参数的配置。
首先就是核心线程数corePoolSize与最大线程数maximumPoolSize。这个的配置我们通常要考虑CPU同时执行线程的阈值。一旦超过这个阈值,CPU就需要花费很多
时间来完成线程的切换与调度,这样会导致性能大幅下滑。
/**
* CPU核心数,注意该方法并不可靠,它返回的有可能不是真实的CPU核心数,因为CPU在某些情况下会对某些核
* 心进行睡眠处理,这种情况返回的知识已激活的CPU核心数。
*/
private static final int NUMBER_OF_CPU = Runtime.getRuntime().availableProcessors();
/**
* 核心线程数
*/
private static final int corePoolSize = Math.max(2, Math.min(NUMBER_OF_CPU - 1, 4));
/**
* 最大线程数
*/
private static final int maximumPoolSize = NUMBER_OF_CPU * 2 + 1;
至于keepAliveTime,该参数描述了线程不活动时存活的时间,如果是CPU密集型任务,则将时间设置的小一些,如果是IO密集型或者数据库连接任务,则将时间设置的长一些。
我们再来看看BlockingQueue参数的配置。BlockingQueue用来描述阻塞队列。它的方法以四种形式存在,以此来满足不同需求。
抛出异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|
add(e) | offer(e) | put(e) | offer(e, time, unit) |
remove() | poll() | take() | poll(time, unit) |
element() | peek() | 不可用 | 不可用 |
它有以下特点:
它的实现类有:
工作窃取:例如有两个队列A、B,各自干自己的活,但是A效率比较高,很快把自己的活干完了,于是勤快的A就会去窃取B的任务来干,这是A、B会访问同一个队列,为了减少A、B的竞争,规定窃取者A
只从双端队列的尾部拿任务,被窃取者B只从双端队列的头部拿任务。
我们最后来看看RejectedExecutionHandler参数的配置。
RejectedExecutionHandler用来描述线程数大于或等于线程池最大线程数时的拒绝策略,它的实现类有:
另外,Executors提供了一系列工厂方法用来创建线程池。这些线程是适用于不同的场景。
ThreadPoolExecutor里提供了一些空方法,我们可以通过继承ThreadPoolExecutor,复写这些方法来实现对线程池的监控。
public class ThreadPoolExecutor extends AbstractExecutorService {
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
}
常见的监控指标有:
AsyncTask基于ThreadPoolExecutor实现,内部封装了Thread+Handler,多用来执行耗时较短的任务。
一个简单的AsyncTask例子
public class AsyncTaskDemo extends AsyncTask<String, Integer, String> {
/**
* 在后台任务开始执行之前调用,用于执行一些界面初始化操作,例如显示一个对话框,UI线程。
*/
@Override
protected void onPreExecute() {
super.onPreExecute();
}
/**
* 执行后台线程,执行完成可以通过return语句返回,worker线程
*
* @param strings params
* @return result
*/
@Override
protected String doInBackground(String... strings) {
return null;
}
/**
* 更新进度,UI线程
*
* @param values progress
*/
@Override
protected void onProgressUpdate(Integer... values) {
super.onProgressUpdate(values);
}
/**
* 后台任务执行完成并通过return语句返回后会调用该方法,UI线程。
*
* @param result result
*/
@Override
protected void onPostExecute(String result) {
super.onPostExecute(result);
}
/**
* 后台任务呗取消后回调
*
* @param reason reason
*/
@Override
protected void onCancelled(String reason) {
super.onCancelled(reason);
}
/**
* 后台任务呗取消后回调
*/
@Override
protected void onCancelled() {
super.onCancelled();
}
}
AsyncTask的使用非常的简单,接下来我们去分析AsyncTask的源码实现。
AsyncTask流程图
AsyncTask源码的一开始就是个创建线程池的流程。
public abstract class AsyncTask<Params, Progress, Result> {
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
//核心线程数,最少2个,最多4个
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
//线程不活动时的存活时间是30s
private static final int KEEP_ALIVE_SECONDS = 30;
//线程构建工厂,指定线程的名字
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
//一个由链表结构组成的无界阻塞队列
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
public static final Executor THREAD_POOL_EXECUTOR;
//构建线程池
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
}
另外,我们可以通过AsyncTask.executeOnExecutor(Executor exec, Params... params) 来自定义线程池。
我们再来看看构造方法。
public abstract class AsyncTask<Params, Progress, Result> {
//构造方法需要在UI线程里调用
public AsyncTask() {
//创建一个Callable对象,WorkerRunnable实现了Callable接口
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
Result result = doInBackground(mParams);
Binder.flushPendingCommands();
return postResult(result);
}
};
//创建一个FutureTask对象,该对象用来接收mWorker的结果
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
//将执行的结果通过发送给Handler处理,注意FutureTask的get()方法会阻塞直至结果返回
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
private void postResultIfNotInvoked(Result result) {
final boolean wasTaskInvoked = mTaskInvoked.get();
if (!wasTaskInvoked) {
postResult(result);
}
}
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
//内部的Handler
private static class InternalHandler extends Handler {
public InternalHandler() {
//UI线程的Looper
super(Looper.getMainLooper());
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
//返回结果
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
//返回进度
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
}
可以看到当我们调用AsyncTask的构造方法时,就创建了一个FutureTask对象,它内部包装了Callable对象(就是我们要执行的任务),并在FutureTask对象的done()方法里
将结果发送给Handler。
接着看看执行方法execute()。
public abstract class AsyncTask<Params, Progress, Result> {
//需要在UI线程里调用
@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
//任务执行前的处理,我们可以复写次方法
onPreExecute();
mWorker.mParams = params;
//执行任务,exec为sDefaultExecutor
exec.execute(mFuture);
return this;
}
}
接着看看这个sDefaultExecutor。
可以看到sDefaultExecutor是个SerialExecutor对象,SerialExecutor实现了Executor接口。
public abstract class AsyncTask<Params, Progress, Result> {
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
private static class SerialExecutor implements Executor {
//任务队列
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
//当前执行的任务
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
//开始执行任务
scheduleNext();
}
}
protected synchronized void scheduleNext() {
//取出队列头的任务开始执行
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
}
所以我们没调用一次AsyncTask.execute()方法就将FutureTask对象添加到队列尾部,然后会从队列头部取出任务放入线程池中执行,所以你可以看着这是一个串行执行器。
在Okhttp的任务调度器Dispatcher里有关于线程池的配置
public final class Dispatcher {
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
}
你可以看到它的配置:
这其实是Excutors.newCachedThreadPool()缓存池的实现。总结来说就是新任务过来进入SynchronousQueue,它是一个单工模式的队列,只传递任务,不存储任务,然后就创建
新线程执行任务,线程不活动的存活时间为60s。
Okhttp请求流程图
在发起网络请求时,每个请求执行完成后都会调用client.dispatcher().finished(this)。
final class RealCall implements Call {
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//异步请求
client.dispatcher().finished(this);
}
}
}
}
我们来看看client.dispatcher().finished(this)这个方法。
public final class Dispatcher {
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
//将已经结束的请求call移除正在运行的队列calls
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
//异步请求promoteCalls为true
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
private void promoteCalls() {
//当前异步请求数大于最大请求数,不继续执行
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
//异步等待队列为空,不继续执行
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
//遍历异步等待队列
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
//如果没有超过相同host的最大请求数,则复用当前请求的线程
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
//运行队列达到上限,也不再执行
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
}
所以你可以看到Okhttp不是用线程池来控制线程个数,线程池里的线程执行的都是正在运行请请求,控制线程的是Dispatcher,Dispatcher.promoteCalls()方法通过
最大请求数maxRequests和相同host最大请求数maxRequestsPerHost来控制异步请求不超过两个最大值,在值范围内不断的将等待队列readyAsyncCalls中的请求添加
到运行队列runningAsyncCalls中去。
Copyright© 2013-2019