我们知道Android实现异步有很多方案,比如AsyncTask,Executor,RxJava等,它们在复杂业务场景下使用起来比较复杂,代码可读性差,容易出现问题。而Kotlin协程使用更简单,性能更好,比上述方案更有优势,其中很大部分原因是因为其实现了协程挂起。理解协程的挂起流程,对我们正确使用协程,灵活使用协程都非常有帮助。
协程源自Simula和Modula-2语言,它是一种编程思想,并不局限于特定的语言,在1958年的时候,Melvin Edward Conway提出这个术语并用于构建汇编程序。
协程是一个轻量级的线程,能提高大型项目中的异步任务的开发效率。Kotlin协程在不同的平台实现的原理并不相同,要在Android中用好协程,就需要理解其在Android上的设计思路。
在Android中,通过Kotlin 1.3中引入协程库,开始支持协程。
Android官方文档的协程定义如下:
❝协程是一种并发设计模式,您能够在 Android 平台上应用它来简化异步执行的代码
举个例子,异步顺序执行的3个网络请求,返回结果后,在主线程更新UI 的代码对比:
fun test1() { //Kotlin普通版异步实现
Thread() { //切换到子线程
request1(param) { value1 -> //第一个异步请求返回结果
request2(value1) { value2 -> //第二个异步请求返回结果
request3(value2) { value3 -> //第三个异步请求返回结果
Handler mainHandler = new Handler(Looper.getMainLooper());
mainHandler.post(new Runnable() {//切换回主线程
@Override
public void run() { //更新UI
updateUI(value3)
}
});
}
}
}
}
}
fun test2() { //Kotlin协程版异步实现
CoroutineScope(Dispatchers.IO).launch { //切换到子线程
val value1 = async{request1(param)}.await() //第一个异步请求返回结果
val value2 = async{request2(value1)}.await() //第二个异步请求返回结果
val value3 = async{request3(value2)}.await() //第三个异步请求返回结果
launch(Dispatchers.Main) { //切换回主线程
updateUI(value3) //更新UI
}
}
}
test1 是普通版本的异步请求更新,test2是协程版本,实现的功能完全一样,但协程版本明显更简洁,可读性和可维护性大大提高了。可见协程是一种异步设计,协程的设计目标,就是简化异步执行的代码,消除传统的Callback回调,以类同步代码的形式,进行异步编程。
通过上一节介绍,我们了解了协程是什么。这一节我们需要进一步来了解协程中的几个核心概念。它们分别是作用域,创建协程/子协程,派发器,和挂起函数。
协程的作用域指定了协程机制运转的代码区块范围。在协程的源代码中,通过CoroutineScope来指定协程的作用域。根据使用场景的不同,对CoroutineScope扩展出其它几个作用域接口。
作用域 | 意义 | 解释 |
---|---|---|
CoroutineScope | 创建协程的局部作用域 | 局部协程作用域,可以指定派发器Dispatcher,并且通过调用返回的Job对象的cancel()方法,可以取消该scope下所有正在进行的任务 |
GlobalScope | 创建协程的全局作用域 | 该API启动的协程为顶层协程,是个单例协程作用域,没有父任务,且该scope没有Job对象,所以无法对整个scope执行cancel()操作,所以需要手动管理内部的每个协程 |
MainScope | 创建协程的局部作用域,且指定Dispatcher.Main | CoroutineScope和Dispatcher.Main组合的协程作用域,会指定派发到Dispatcher.Main中执行,可以通过调用返回的Job对象管理协程 |
runBlocking | 创建协程的局部作用域,且阻塞协程所在线程等待结果 | 局部协程作用域,可以指定派发器Dispatcher,会阻塞调用者所在的线程,直到协程内部返回结果。且该scope没有返回Job对象用于协程管理 |
在一个协程内部可以创建新协程。创建的协程与外部的协程成为父子关系,他们之间可以联动进行cancel、异常处理等协同管理。创建协程的API是CoroutineScope的扩展函数,所以协程和子协程必须在协程作用域中被创建。协程创建后会马上启动,所以我们会经常把创建和启动协程当做一个意思。
创建协程 | 意义 | 解释 |
---|---|---|
launch | 创建一个Job协程,或子协程 | 创建一个新协程,可以通过返回的Job管理协程 |
async | 创建一个Deferred协程,或子协程 | 和launch的区别是,Deferred可以用来等待该协程执行完毕的返回值 |
在协程作用域内做到异步执行任务,需要有派发器将异步任务派发到不同的线程执行。Kotlin库默认提供了4个默认的派发器,都放在Dispatchers类下:
派发器 | 意义 | 解释 |
---|---|---|
Dispatchers.Main | 任务派发到Android主线程中执行 | 通过MainLooper的handler来向主线程派发任务到主线程执行 |
Dispatchers.Default | 任务派发到默认的工作线程池中执行 | 协程内部实现的Excutor线程池,核心线程数和最大线程数依赖CPU数量,适用于计算类耗时任务调度 |
Dispatchers.IO | 任务派发到工作线程池中执行 | IO和Default共享线程池,但运行并发数不同,支持最大并行任务数,适用IO任务调度 |
Dispatchers.Unconfined | 任务会根据上下文环境,派发到指定线程or线程池执行 | 无指定派发线程,会根据运行时的上下文环境决定。 |
挂起函数,是指把协程代码挂起不继续执行的函数,也叫协程被函数挂起了。协程中调用挂起函数时,协程所在的线程不会挂起也不会阻塞,但是协程被挂起了。也就是说,协程内挂起函数之后的代码停止执行了,直到挂起函数完成后恢复协程,协程才继续执行后续的代码。所有挂起函数都会通过suspend修饰符修饰。
挂起函数 | 意义 | 解释 |
---|---|---|
join | 挂起当前协程,直到等待的子协程执行完毕 | 通过当前协程返回的Job接口的join方法,可以单纯的挂起当前协程,等待子协程完成后再恢复继续执行 |
await | 挂起当前协程,直到等待的子协程返回结果 | 和join的区别是,它属于Job接口的子接口Deferred的方法,可以等待子协程完成后,带着返回值恢复当前协程 |
delay | 挂起当前协程,直到指定时间后恢复当前协程 | 单纯挂起当前协程,指定时长后恢复协程执行 |
withContext() | 挂起外部协程,直到自己内部协程全部返回后,才会恢复外部的协程。 | 没有创建新的协程,在指定协程上运行挂起代码块,并挂起该协程直至代码块运行完成并返回结果。类似async.await的效果 |
从上面的介绍和示例,我们了解了协程的核心概念和API,也看到协程实现了用同步代码的形式实现了异步编程。协程实现异步的核心原理就是通过挂起函数实现协程体的挂起,还不阻塞协程体所在的线程。
那我们进一步来看看,协程实现异步的具体流程。我们通过一个示例来认识协程的工作过程,和协程挂起的效果:
fun testInMain() {
Log.d("["+Thread.currentThread().name+"]testInMain start")
var job = CoroutineScope(Dispatchers.Main).launch { //启动协程job
Log.d("[" + Thread.currentThread().name+"]job start")
var job1 = async(Dispatchers.IO) { //启动协程job1
Log.d("["+Thread.currentThread().name+"]job1 start")
delay(3000) //挂起job1协程 3秒
Log.d("["+Thread.currentThread().name+"]job1 end ")
"job1-Return"
} //job1协程 续体执行完毕
var job2 = async(Dispatchers.Default) {
Log.d("["+Thread.currentThread().name+"]job2 start" )
delay(1000) //挂起job2协程 1秒
Log.d("["+Thread.currentThread().name+"]job2 end")
"job2-Return"
} //job2协程 续体执行完毕
Log.d("["+Thread.currentThread().name+"]before job1 return")
Log.d("["+Thread.currentThread().name+"]job1 result = " + job1.await()) //挂起job协程,等待job1返回结果;如果已有结果,不挂起,直接返回
Log.d("["+Thread.currentThread().name+"]before job2 return")
Log.d("["+Thread.currentThread().name+"]job2 result = " + job2.await()) //挂起job协程,等待job2返回结果;如果已有结果,不挂起,直接返回
Log.d("["+Thread.currentThread().name+"]job end ")
} //job协程 续体执行完毕
Log.d("["+Thread.currentThread().name+"]testInMain end")
} //testInMain
示例代码的log输出如下,我们需要重点关注Log输出的次序,和时间间隔:
10:15:04.046 26079-26079/com.example.myapplication D/TC: [main]testInMain start
10:15:04.067 26079-26079/com.example.myapplication D/TC: [main]testInMain end
10:15:04.080 26079-26079/com.example.myapplication D/TC:
[main]job start
10:15:04.083 26079-26079/com.example.myapplication D/TC:
[main]before job1 return
10:15:04.086 26079-26683/com.example.myapplication D/TC: [DefaultDispatcher-worker-1]job1 start
10:15:04.087 26079-26684/com.example.myapplication D/TC: [DefaultDispatcher-worker-2]job2 start
10:15:05.090 26079-26683/com.example.myapplication D/TC: [DefaultDispatcher-worker-1]job2 end
10:15:05.095 26079-26079/com.example.myapplication D/TC:
[main]button-2 onclick now
10:15:07.090 26079-26685/com.example.myapplication D/TC: [DefaultDispatcher-worker-3]job1 end
10:15:07.091 26079-26079/com.example.myapplication D/TC:
[main]job1 result = job1-Return
10:15:07.091 26079-26079/com.example.myapplication D/TC:
[main]before job2 return
10:15:07.091 26079-26079/com.example.myapplication D/TC:
[main]job2 result = job2-Return
10:15:07.091 26079-26079/com.example.myapplication D/TC:
[main]job end
通过输出,我们可以了解到示例代码的具体运行情况:
至此,testInMain中所有的相关协程运行完毕,比testInMain函数晚约3.5秒(04.067 - 07.091)后结束。
上述代码的运行过程,我们根据执行片段梳理,可以进一步简化为图,方便大家理解:
图片
从图中,我们可以清晰的得到几点结论:
所以可以看出,协程的挂起,并不会阻塞协程所在的线程,而只是中断了协程后面的代码执行。然后等待挂起函数完成后,恢复协程的后续代码执行。这就是协程挂起最最基本的关键点。
从示例我们知道协程的挂起,是通过await挂起函数实现的,其函数的定义如下:
// Deferred.kt
public interface Deferred<out T> : Job {
public suspend fun await(): T
}
// Delay.kt
public suspend fun delay(timeMillis: Long) {...}
挂起函数比正常的函数多了一个suspend关键字修饰。这个关键字的意思就是“挂起”,所以suspend关键字修饰的函数也叫“挂起函数”。挂起函数有个调用约定:挂起函数必须在协程或者其他挂起函数中被调用。这保证了所有挂起函数都是在协程中被调用,是协程机制运转的基本要求。
挂起函数会把所在协程挂起来,中断协程后续的代码执行。协程的代码被中断执行,但它所在的线程并没有被阻塞,如上图实例中的job被await阻塞后,所在的main线程,仍然可以执行其它代码。换个方式说,就是协程被挂起时,协程与它运行所在的线程解绑了,线程会被线程池继续调度执行别的代码。
当挂起函数等待的结果返回后,会通知协程的恢复后续代码的执行。协程的恢复由挂起函数内部的续体机制来实现,后面会具体说明。
了解了Kotlin协程挂起的整体运行流程,我们进一步来看看协程挂起的实现原理。要想了解Kotlin在Android中的实现原理,必须分析其在Android中的源码实现。不过,因为Kotlin协程的源码和运行时的代码有些差别,实际运行时的代码,很多是通过编译器自动处理生成的。所以,我们需要借助Kotlin Bytedcode Decomile工具反编译出java文件,来进行代码分析。
上节中的示例代码,经过反编译后的核心代码如下:
//TestCoroutin.decompiled.java
public final void testInMain() {
Log.d("cjf---", var10001.append("testInMain start").toString());
Job job = BuildersKt.launch$default( CoroutineScopeKt.CoroutineScope((CoroutineContext)Dispatchers.getMain()), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
//单独拆分到下面,需要详细讲解
}
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {/......./}
public final Object invoke(Object var1, Object var2){/......./}
}), 3, (Object)null);
Log.d("cjf---", var10001.append("testInMain end ").toString());
}
//job协程的SuspendLambda续体,其invokeSuspend方法代码
public final Object invokeSuspend(@NotNull Object $result) {
... ...
label17: {
Object var8 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
Log.d(var10001.append("job start ").toString());
Deferred job1 = BuildersKt.async$default(/......./);
job2 = BuildersKt.async$defaultdefault(/......./);
Log.d(var10001.append("before job1 return").toString());
var6 = var10001.append("job1 result =");
this.L$0 = job2;
this.L$1 = var5;
this.L$2 = var6;
this.label = 1;
var10000 = job1.await(this);
if (var10000 == var8) {
return var8;
}
break;
case 1:
var6 = (StringBuilder)this.L$2;
var5 = (String)this.L$1;
job2 = (Deferred)this.L$0;
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
case 2:
var6 = (StringBuilder)this.L$1;
var5 = (String)this.L$0;
ResultKt.throwOnFailure($result);
var10000 = $result;
break label17;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
var7 = var10000;
Log.d(var5, var6.append((String)var7).toString());
Log.d(var10001.append("before wait job2 return").toString());
var6 = var10001.append("job2 result = ");
this.L$0 = var5;
this.L$1 = var6;
this.L$2 = null;
this.label = 2;
var10000 = job2.await(this);
if (var10000 == var8) {
return var8;
}
} //end of label17
Log.d(var5, var6.append((String)var7).toString());
Log.d("cjf---", var10001.append("job end ").toString());
return Unit.INSTANCE;
} //end of invokeSuspend
我们可以看到,反编译后的TestCoroutine类的testInMain和源代码中的区别,除了job协程块不同,其它变化不大,先打印“[main]testInMain start”,然后创建job协后,最后打印“[main]testInMain end”,完成整个方法的执行。
反编译后的主要区别在job协程,其Lambda代码块转换成了Function2 实现。
我们借助APK反编译工具,可以看到执行代码中,Function2 实际上被SuspendLambda 类继承实现。
图片
SuspendLambda实现类的关键逻辑在invokeSuspend方法中,而invokeSuspend方法中采用了CPS(Continuation-Passing-Style) 续体传递风格。
续体传递风格会将job协程的Lambda代码块,通过label标签和switch分割成多个代码块。代码块分割的点,就是协程中调用suspend挂起函数的地方。
分支代码调用到await挂起函数时,如果返回了COROUTINE_SUSPENDED,就退出invokeSuspend,进入挂起状态。
我们用流程图来描述上面示例代码,转换后的续体传递风格代码,如下:
图片
我们可以看到,整个示例代码,被分割成了5个代码块。其中case1 代码块主要负责为label17 代码块进行参数转换;case2 代码块主要负责为最外层代码块进行参数转换;所以相当于2个await挂起函数,将lambda代码块分割成了3个实际执行的代码块。
而且job1.await和job2.await会根据挂起函数的返回值进行不同处理,如果返回挂起,则进行协程挂起,当前协程退出执行;如果返回其它值,则协程继续后续代码块的执行。
编译器在编译期间,除了会对所有suspend修饰的函数调用处进行续体传递风格变换,还会同时改变suspend函数的签名。比如await函数是个suspend函数,签名声明如下:
suspend fun <T> CompletableFuture<T>.await(): T
编译器进行变换后:
fun <T> CompletableFuture<T>.await(continuation: Continuation<T>): Any?
我们可以看到,改变后的函数多了一个类型为Continuation的参数。Continuation可以称之为协程续体,它提供了协程恢复的基本方法:resumeWith。Continuation续体声明很简单:
interface Continuation<in T> {
val context: CoroutineContext
fun resumeWith(result: Result<T>)
}
其具体实现在SuspendLambda的父类BaseContinuationImpl中:
//class BaseContinuationImpl 中 fun resumeWith 内部核心代码
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!!
val outcome: Result<Any?> = //协程返回了结果,说明协程执行完毕
try {
val outcome = invokeSuspend(param)//执行协程的续体代码块
if (outcome === COROUTINE_SUSPENDED) return //挂起函数返回挂起标志,退出后续代码执行
Result.success(outcome) //没有返回挂起标志,将返回值outcome封装为Result返给外层outcome
} catch (exception: Throwable) {
Result.failure(exception)//将异常Result返给外层outcome
}
releaseIntercepted() // 释放当前协程的拦截器
if (completion is BaseContinuationImpl) {//如果上一层续体是一个单纯的续体,则将结果作为上一层续体的恢复参数,进行上一层续体的恢复
current = completion
param = outcome
} else {//上一层续体是一个协程,则调用协程的恢复函数,进行上一层的协程恢复
completion.resumeWith(outcome)
return
}
}
}
如果invokeSuspend函数返回中断标志时,会直接从函数中返回,等待后续继续被恢复执行。
如果invokeSuspend函数返回的是结果,且上一层续体不是单纯的续体而是协程体,它会调用参数completion的resumeWith函数,恢复上一层协程的invokeSuspend代码的执行。
协程被resumeWith恢复后,会继续调用invokeSuspend函数,根据label值执行下一个case分支代码块。按照这个恢复流程,直到所有invokeSuspend代码执行完,返回非COROUTINE_SUSPENDED的结果,协程就执行结束。
我们继续看job续体在invokeSuspend中调用到job1.await函数时,await是怎么实现返回挂起标志,和后续恢复job协程的。核心代码可以在awaitSuspend中查看:
// JobSupport.kt中 awaitSuspend方法
private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
val cont = AwaitContinuation(uCont.intercepted(), this)
cont.disposeOnCancellation(invokeOnCompletion(
ResumeAwaitOnCompletion(this, cont).asHandler))
cont.getResult()
}
// JobSupport.kt中 invokeOnCompletion方法
public final override fun invokeOnCompletion(...):DisposableHandle {
var nodeCache: JobNode<*>? = null
loopOnState { state ->
when (state) {
is Empty -> { // 没有completion handlers,直接创建Node放入state
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
if (_state.compareAndSet(state, node)) return nod
}
is Incomplete -> {// 有completion handlers,加入到node list列表
val list = state.list
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
if (!addLastAtomic(state, list, node)) return@loopOnState /
}
else -> { // 已经完成,不需要加入结果监听Node
if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause) return NonDisposableHandle
}
}
}
}
// AbstractCoroutine.kt 中 resumeWith方法
// 通知state node,进行恢复
public final override fun resumeWith(result: Result<T>) {
// makeCompletingOnce 大致实现是修改协程状态,如果需要的话还会将结果返回给调用者协程,并恢复调用者协程
makeCompletingOnce(result.toState(), defaultResumeMode)
}
可以看出,job1.await()首先会通过getResult()去获取job1的结果,如果有结果则直接返回结果,否则立即返回中断标志,这样就实现了await挂起点挂起job协程了。await()挂起函数恢复job协程的流程是,将job 协程封装为 ResumeAwaitOnCompletion,并将其再次封装成handler 节点,添加job1协程的 state.list。
等job1协程完成后,会通知 handler 节点调用job协程的 resumeWith(result) 方法,从而恢复 job协程await 挂起点之后的代码块的执行。
我们再次结合示例代码, 来梳理这个挂起和恢复流程:
图片
note:绿色底色,表示在主线程执行;红色字体,表示调用挂起函数;
可以看到整个过程:
通过续体传递风格的invokeSuspend代码,和续体之间形成的resumewith恢复链,协程得以实现挂起和恢复的核心流程。
本文通过示例Log和反编译后的源码分析,讲解了Kotlin协程的挂起和恢复流程。
协程的挂起和恢复,是通过invokeSuspend和resumeWith这2个核心方法串起来的,其最重要的就是Continuation续体链的建立和恢复。
协程中的Coroutine和SuspendLambda,以及线程派发器DispatchedContinuation,都是继承自Continuation续体,都能通过resumeWith恢复上一层续体或协程。
协程的挂起流程就是通过Coroutine,SuspendLambda,和Dispatcher这三层进行Cotinuation封装,形成Cotinuation恢复链,配合CPS续传体变形实现了协程内有序代码的异步挂起和恢复。
通过了解协程的挂起原理,我们可灵活运用协程的特点,进行业务中的异步任务调度,输出清晰明了,安全可维护的代码。
如果我们新的业务在用协程时,想要复用原有的异步任务,就可以对异步任务进行suspend封装;对于原异步模块无复用需求的模块,我们可以进一步引入Jetpack对协程的支持,如生命周期感知型ViewModelScope,LifecycleScope,来控制业务中异步任务的生命周期。
❝许多 Jetpack 库都包含提供全面协程支持的扩展。某些库还提供自己的协程作用域,可供您用于结构化并发。ViewModel 包含一组可直接与协程配合使用的 KTX 扩展。这些扩展是lifecycle-viewmodel-ktx 库。
Copyright© 2013-2019