概述 2019 年 Google I/O 大会上宣布今后将越来越优先采用 Kotlin 进行 Android 开发,Kotlin 是一种富有表现力且简洁的编程语言,不仅可以减少常见代码错误,还可以轻松集成到现有应用中。关于 Kotlin 的基础用法可参考: Kotlin 官方文档 和 Kotlin笔记系列 。
通常来说,协程(Coroutines)是轻量级的线程,它不需要从用户态切换到内核态,Coroutine是编译器级的,Process 和 Thread 是操作系统级的,协程没有直接和操作系统关联,但它也是跑在线程中的,可以是单线程,也可以是多线程。协程设计的初衷是为了解决并发问题,让协作式多任务实现起来更加方便,它可以有效地消除回调地狱。
在Kotlin中,协程是一套线程 API, 就像 Java 的 Executors 和 Android 的 Handler 等,是一套比较方便的线程框架,它能够在同一个代码块里进行多次的线程切换,可以用看起来同步的方式写出异步的代码,即非阻塞式挂起 。
协程的优势:避免回调地狱(也可以通过 RxJava 或者 CompletableFuture 实现):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 api.getAvatar(id) { avatar -> api.getName(id) { name -> show(getLabel(avatar, name)) } } coroutineScope.launch(Dispatchers.Main) { val avatar = async { api.getAvatar(id) } val name = async { api.getName(id) } val label = getLabelSuspend(avatar.await(), name.await()) show(label) }
1 2 3 4 5 6 def kotlin_coroutines = `1.3 .9 `implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlin_coroutines" implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:$kotlin_coroutines"
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 runBlocking { getName(id) } GlobalScope.launch { getName(id) } val coroutineScope = CoroutineScope(context)coroutineScope.launch { getName(id) } val id = coroutineScope.async { getName(id) }id.await()
基本概念 suspend suspend 是挂起的意思,它挂起的对象是协程。
当线程执行到协程的 suspend 函数的时候,暂时不继续执行协程代码了,它会跳出协程的代码块,然后这个线程该干什么就去干什么。
当协程执行到 suspend 函数的时候,这个协程会被「suspend」,也就是从当前线程被挂起。换句话说,就是这个协程从正在执行它的线程上脱离。线程的代码在到达 suspend 函数的时候被掐断,接下来协程会从这个 suspend 函数开始继续往下执行,不过是在这个 suspend 函数指定的线程里执行。
紧接着在 suspend 函数执行完成之后,协程会自动帮我们把线程再切回来,然后接着执行协程后面的代码(resume),resume 功能是协程特有的,所以 suspend 函数必须在 协程 或者 另一个suspend函数 里被调用。
suspend 的非阻塞式指的是它能用看起来阻塞的代码写出非阻塞的操作,单协程可以是非阻塞式的,因为它可以用 suspend 函数来切换线程,本质上还是多线程,只不过写法上连续两行代码看起来是阻塞式的。
CoroutineScope 1 2 3 public interface CoroutineScope { public val coroutineContext: CoroutineContext }
GlobeScope 启动的协程是一个单独的作用域,不会继承上层协程的作用域,其内部的子协程遵守默认的作用域规则。
coroutineScope 启动的协程 cancel 时会 cancel 所有子协程,也会 cancel 父协程,子协程未捕获的异常也会向上传递给父协程。
supervisorScope 启动的协程 cancel 和传递异常时,只会由父协程向子协程单向传播。MainScope 是 supervisorScope 作用域。
CoroutineContext Job, CoroutineDispatcher, ContinuationInterceptor 等都是 CoroutineContext 的子类,即它们都是协程上下文。CoroutineContext 中有一个重载了(+)操作符的plus方法,可以将 Job 和 CoroutineDispatcher 等元素集合起来,代表一个协程的场景。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public interface CoroutineContext { public operator fun <E : Element> get (key: Key <E >) : E? public fun <R> fold (initial: R , operation: (R , Element ) -> R ) : R public operator fun plus (context: CoroutineContext ) : CoroutineContext = public fun minusKey (key: Key <*>) : CoroutineContext public interface Key <E : Element > public interface Element : CoroutineContext { } } public interface ContinuationInterceptor : CoroutineContext.Element { } public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { }
CoroutineStart 几种启动模式如下:
1 2 3 4 5 6 public enum class CoroutineStart { DEFAULT, LAZY, ATOMIC, UNDISPATCHED; }
. This is similar to [DEFAULT], but the coroutine cannot be cancelled before it starts executing.
. This is similar to [ATOMIC] in the sense that coroutine starts executing even if it was already cancelled, but the difference is that it starts executing in the same thread.
Dispatchers 协程调度器是用来指定协程体在哪个线程中执行,Kotlin提供了几个调度器:
1 2 3 4 5 6 7 8 9 10 11 12 GlobalScope.launch(Dispatchers.Default) { println("1: ${Thread.currentThread().name} " ) launch(Dispatchers.Default) { println("2: ${Thread.currentThread().name} " ) } println("3: ${Thread.currentThread().name} " ) } -->output 1 : DefaultDispatcher-worker-1 3 : DefaultDispatcher-worker-1 2 : DefaultDispatcher-worker-2
1 2 3 4 5 6 7 8 9 10 11 12 GlobalScope.launch(Dispatchers.Default) { println("1: ${Thread.currentThread().name} " ) launch(Dispatchers.Unconfined) { println("2: ${Thread.currentThread().name} " ) } println("3: ${Thread.currentThread().name} " ) } -->output 1 : DefaultDispatcher-worker-1 2 : DefaultDispatcher-worker-1 3 : DefaultDispatcher-worker-1
基于 Default 调度器背后的线程池(designed for offloading blocking IO tasks),因此从 Default 切换到 IO 不会触发线程切换:
1 2 3 4 5 6 7 8 9 10 11 12 GlobalScope.launch(Dispatchers.Default) { println("1: ${Thread.currentThread().name} " ) launch(Dispatchers.IO) { println("2: ${Thread.currentThread().name} " ) } println("3: ${Thread.currentThread().name} " ) } -->output 1 : DefaultDispatcher-worker-1 3 : DefaultDispatcher-worker-1 2 : DefaultDispatcher-worker-1
Job&Deferred Job 可以取消并且有简单生命周期,它有三种状态:
New (optional initial state)
Active (default initial state)
Completing (optional transient state)
Cancelling (optional transient state)
Cancelled (final state)
Completed (final state)
Job 完成时是没有返回值的,如果需要返回值的话,应该使用 Deferred,它是 Job 的子类: public interface Deferred<out T> : Job
launch 方法返回一个Job类型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public interface Job : CoroutineContext.Element { public companion object Key : CoroutineContext.Key<Job> { } public val isActive: Boolean public val isCompleted: Boolean public val isCancelled: Boolean public fun start () : Boolean public fun cancel () : Unit = cancel(null ) public suspend fun join () }
async 方法返回一个 Deferred 类型:
1 2 3 4 5 public interface Deferred <out T > : Job { public suspend fun await () : T }
1 2 3 4 5 6 7 fun main () = runBlocking { val job = async { delay(500 ) "Hello" } println("${job.await()} , World" ) }
Android-Kotlin协程使用 MainScope Android 中一般不建议使用 GlobalScope, 因为它会创建一个顶层协程,需要保持所有对 GlobalScope 启动的协程的引用,然后在 Activity destory 等场景的时候 cancel 掉这些的协程,否则就会造成内存泄露等问题。可以使用 MainScope:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class CoroutineActivity : AppCompatActivity () { private val mainScope = MainScope() fun request1 () { mainScope.launch { } } override fun onDestroy () { super .onDestroy() mainScope.cancel() } }
MainScope 的定义:
1 public fun MainScope () : CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
Lifecycle协程 关于 Lifecycle 可以参考 Android-Jetpack组件之Lifecycle 。
1 implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0"
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope get () = lifecycle.coroutineScope val Lifecycle.coroutineScope: LifecycleCoroutineScope get () { while (true ) { val existing = mInternalScopeRef.get () as LifecycleCoroutineScopeImpl? if (existing != null ) { return existing } val newScope = LifecycleCoroutineScopeImpl( this , SupervisorJob() + Dispatchers.Main.immediate ) if (mInternalScopeRef.compareAndSet(null , newScope)) { newScope.register() return newScope } } } abstract class LifecycleCoroutineScope internal constructor () : CoroutineScope { internal abstract val lifecycle: Lifecycle fun launchWhenCreated (block: suspend CoroutineScope .() -> Unit ) : Job = launch { lifecycle.whenCreated(block) } fun launchWhenStarted (block: suspend CoroutineScope .() -> Unit ) : Job = launch { lifecycle.whenStarted(block) } fun launchWhenResumed (block: suspend CoroutineScope .() -> Unit ) : Job = launch { lifecycle.whenResumed(block) } }
1 2 3 4 5 6 7 8 9 class MainActivity : AppCompatActivity () { fun test () { lifecycleScope.launchWhenCreated { } } }
LiveData协程 关于 LiveData 可以参考 Android-Jetpack组件之LiveData-ViewModel 。
1 implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0"
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 fun <T> liveData ( context: CoroutineContext = EmptyCoroutineContext, timeoutInMs: Long = DEFAULT_TIMEOUT, @BuilderInference block: suspend LiveDataScope <T >.() -> Unit ) : LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)internal class CoroutineLiveData <T > ( context: CoroutineContext = EmptyCoroutineContext, timeoutInMs: Long = DEFAULT_TIMEOUT, block: Block<T> ) : MediatorLiveData<T>() { private var blockRunner: BlockRunner<T>? private var emittedSource: EmittedSource? = null init { val supervisorJob = SupervisorJob(context[Job]) val scope = CoroutineScope(Dispatchers.Main.immediate + context + supervisorJob) blockRunner = BlockRunner( liveData = this , block = block, timeoutInMs = timeoutInMs, scope = scope ) { blockRunner = null } } internal suspend fun emitSource (source: LiveData <T >) : DisposableHandle { clearSource() val newSource = addDisposableSource(source) emittedSource = newSource return newSource } internal suspend fun clearSource () { emittedSource?.disposeNow() emittedSource = null } override fun onActive () { super .onActive() blockRunner?.maybeRun() } override fun onInactive () { super .onInactive() blockRunner?.cancel() } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class MainActivity : AppCompatActivity () { fun test () { liveData { try { emit("success" ) } catch (e: Exception) { emit("error" ) } }.observe(this , Observer { Log.d("LLL" , it) }) } }
ViewModel协程 关于 ViewModel 可以参考 Android-Jetpack组件之LiveData-ViewModel 。
1 implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.2.0"
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 val ViewModel.viewModelScope: CoroutineScope get () { val scope: CoroutineScope? = this .getTag(JOB_KEY) if (scope != null ) { return scope } return setTagIfAbsent(JOB_KEY, CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main.immediate)) } internal class CloseableCoroutineScope (context: CoroutineContext) : Closeable, CoroutineScope { override val coroutineContext: CoroutineContext = context override fun close () { coroutineContext.cancel() } }
协程的并发 启动一百个协程,它们都做一千次相同的操作,同时会测量它们的完成时间以便进一步的比较:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 suspend fun massiveRun (action: suspend () -> Unit ) { val n = 100 val k = 1000 val time = measureTimeMillis { coroutineScope { repeat(n) { launch { repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms" ) } var counter = 0 fun main () = runBlocking { withContext(Dispatchers.Default) { massiveRun { counter++ } } println("Counter = $counter " ) } Completed 100000 actions in 55 ms Counter = 92805
因为一百个协程在多个线程中同时递增计数器但没有做并发处理,所以不太可能输出 100000 。使用 volatile 并不能解决这个问题,因为 volatile 不能保证原子性,只有内存可见性。可以用以下等方式解决:
使用原子类 AtomicInteger 进行递增
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 val counterContext = newSingleThreadContext("CounterContext" )var counter = 0 fun main () = runBlocking { withContext(Dispatchers.Default) { massiveRun { withContext(counterContext) { counter++ } } } println("Counter = $counter " ) } Completed 100000 actions in 1652 ms Counter = 100000
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 val counterContext = newSingleThreadContext("CounterContext" )var counter = 0 fun main () = runBlocking { withContext(counterContext) { massiveRun { counter++ } } println("Counter = $counter " ) } Completed 100000 actions in 40 ms Counter = 100000
互斥:除了Java已有的一些互斥方式如 Lock 等之外,Kotlin 中提供了 Mutex 类,它具有 lock 和 unlock 方法,lock 是一个挂起函数,它不会阻塞线程。可以使用 withLock 扩展函数替代常用的 mutex.lock(); try { …… } finally { mutex.unlock() }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 val mutex = Mutex()var counter = 0 fun main () = runBlocking { withContext(Dispatchers.Default) { massiveRun { mutex.withLock { counter++ } } } println("Counter = $counter " ) } Completed 100000 actions in 640 ms Counter = 100000
协程Flow 更多用法参考: Flow 。
flow创建 flow 构造器中的代码会直到 flow 被 collect 的时候才会运行,且 collect 方法是 suspend 函数,会挂起当前协程。创建 flow 有几种方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 flowOf(1 , 2 , 3 , 4 , 5 ).onEach { delay(100 ) }.collect{ println(it) } listOf(1 , 2 , 3 , 4 , 5 ).asFlow().onEach { delay(100 ) }.collect { println(it) } fun simple () : Flow<Int > = flow { for (i in 1 ..3 ) { delay(500 ) emit(i) } } channelFlow { for (i in 1 ..5 ) { delay(100 ) send(i) } }.collect { println(it) }
flow 是 Cold Stream, 在没有切换线程的情况下,生产者和消费者是同步非阻塞的。channel 是 Hot Stream, channelFlow 实现了生产者和消费者异步非阻塞模型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 runBlocking { val time = measureTimeMillis { flow { for (i in 1 ..5 ) { delay(100 ) emit(i) } }.collect { delay(100 ) println(it) } } println("cost $time " ) } runBlocking { val time = measureTimeMillis { channelFlow { for (i in 1 ..5 ) { delay(100 ) send(i) } }.collect { delay(100 ) println(it) } } println("cost $time " ) }
当然如果使用 flowOn 切换线程或者使用 buffer() 的话,得到的时间也差不多是 700 ms。
切换线程 流的 collect 总是在调用协程的上下文中发生,使用 flowOn 可以切换流 emit 的上下文(包括切换线程,不要使用 withContext() 来切换 flow 的线程),上游都会受到影响:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 fun simple () : Flow<Int > = flow { for (i in 1 ..3 ) { Thread.sleep(100 ) log("Emitting $i " ) emit(i) } }.flowOn(Dispatchers.Default) fun main () = runBlocking<Unit > { simple().collect { value -> log("Collected $value " ) } } [DefaultDispatcher-worker-1 @coroutine #2 ] Emitting 1 [main @coroutine #1 ] Collected 1 [DefaultDispatcher-worker-1 @coroutine #2 ] Emitting 2 [main @coroutine #1 ] Collected 2 [DefaultDispatcher-worker-1 @coroutine #2 ] Emitting 3 [main @coroutine #1 ] Collected 3
可以在流上使用 buffer 操作符来并发运行流中发射和收集的代码(注意,当更改 CoroutineDispatcher 时,flowOn 操作符使用了相同的缓冲机制,但是在这里显式地请求了缓冲而不改变执行上下文):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 fun simple () : Flow<Int > = flow { for (i in 1 ..3 ) { delay(100 ) emit(i) } } fun main () = runBlocking<Unit > { val time = measureTimeMillis { simple().collect { value -> delay(300 ) println(value) } } println("Collected in $time ms" ) } 1 2 3 Collected in 1217 ms val time = measureTimeMillis { simple() .buffer() .collect { value -> delay(300 ) println(value) } } println("Collected in $time ms" ) 1 2 3 Collected in 1043 ms
flow取消 如果 flow 是在一个挂起函数内被挂起了,那么 flow 是可以被取消的,否则不能取消。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 fun simple () : Flow<Int > = flow { for (i in 1 ..3 ) { delay(100 ) println("Emitting $i " ) emit(i) } } fun main () = runBlocking<Unit > { withTimeoutOrNull(250 ) { simple().collect { value -> println(value) } } println("Done" ) }
操作符 操作符: map, filter, transform, take 等:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 suspend fun performRequest (request: Int ) : String { delay(1000 ) return "response $request " } fun main () = runBlocking<Unit > { (1 ..3 ).asFlow() .map { request -> performRequest(request) } .collect { response -> println(response) } } response 1 response 2 response 3
由于 collect 是 suspend 函数,后续操作会等待它执行完毕后才会接着执行,因此可以使用 launchIn 配合 onEach 完成这个任务, launchIn 的参数是 CoroutineScope, 指定了用哪一个协程来启动流的收集:
1 2 3 4 5 6 7 8 9 fun events () : Flow<Int > = (1 ..3 ).asFlow().onEach { delay(100 ) }fun main () = runBlocking<Unit > { events() .onEach { event -> println("Event: $event " ) } .launchIn(this ) println("Done" ) }
异常处理 flow 可以使用传统的 try-catch 处理异常,另外还可以使用 catch 操作符捕获来自上游的异常,而 onCompletion 不能捕获异常,只能用于判断是否有异常:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 fun main () = runBlocking { flow { emit(1 ) throw RuntimeException() } .onCompletion { cause -> if (cause != null ) println("Flow completed exceptionally" ) else println("Done" ) } .catch { println("catch exception" ) } .collect { println(it) } } 1 Flow completed exceptionally catch exception
上面的代码如果把 onCompletion 和 catch 交换一下位置,则 catch 操作符捕获到异常后,不会影响到下游,因此 onCompletion 操作符不会走异常分支。
但 catch 只是中间操作符,它不能捕获下游的异常,对于下游如 collect 中的异常,除了使用 try-catch 去处理外,还可以借助 onEach 操作符,把业务逻辑放到 onEach 操作符内:
1 2 3 4 5 6 7 8 9 10 fun main () = runBlocking<Unit > { flow { ...... } .onEach { ...... } .catch { ... } .collect() }
flow 可以使用 retry 和 retryWhen 操作符重试指定次数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 runBlocking { (1 ..5 ).asFlow() .onEach { if (it == 3 ) throw RuntimeException("Error on $it " ) } .retry(2 ) { if (it is RuntimeException) { return @retry true } false } .onEach { println("Emitting $it " ) } .catch { it.printStackTrace() } .collect() } Emitting 1 Emitting 2 Emitting 1 Emitting 2 Emitting 1 Emitting 2 java.lang.RuntimeException: Error on 3
flow 可以使用操作符来监听其生命周期:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 runBlocking { (1 ..5 ).asFlow() .onEach { if (it == 3 ) throw RuntimeException("Error on $it " ) } .onEach { println("OnEach $it " ) } .onStart { println("Flow starting" ) } .onCompletion { println("Flow completed" ) } .catch { println("Exception : ${it.message} " ) } .collect() } Flow starting OnEach 1 OnEach 2 Flow completed Exception : Error on 3
Flow 以挂起的方式执行,它是非阻塞的,只有遇到末端操作符,才会触发所有操作的执行;
map, filter, take, zip 等是中间操作符,collect, collectLatest, single, reduce, toList 等是末端操作符;
Channel 是非阻塞的,它用于发送方 (SendChannel) 和接收方 (ReceiveChannel) 之间通信,Channel 实现了 SendChannel 和 ReceiveChannel 接口,所以既可以发送数据又可以接受数据;
Channel 和 Java 中的 BlockingQueue 类似,不同之处在于 BlockingQueue 是阻塞的,而 Channel 是挂起的;
发送方 (SendChannel) 和 接收方 (ReceiveChannel) 之间有一个通道,也就是缓冲区,发送方和接收方之间通过缓冲区进行同步,发送方将数据发送到缓冲区,通过接收方从缓冲区获取数据;
缓冲区的作用帮我们同步发送方和接收方发送和接受的数据,也就意味着多个协程可以向同一个 channel 发送数据, 一个 channel 的数据也可以被多个协程接收。
send() 和 offer() 的区别:
send: suspend 函数,缓冲区没有满则立即添加元素,缓冲区已满则调用者被挂起;
offer: 缓冲区没有满则立即添加元素,添加成功会返回 true, 失败会返回 false;
receive() 和 poll() 的区别:
receive: suspend 函数,异步获取元素,如果缓冲区为空则调用者被挂起,直到一个新值被发送到缓冲区;
poll: 用于同步获取一个元素,如果缓冲区是空的,则返回 null;
Flow 与 Channel 的区别:
Flow: 中间操作符 (map , filter 等等) 会构建了一个待执行的调用链,只有遇到末端操作符 (collect , toList 等等) 才会触发所有操作的执行,所以 Flow 也被称为冷数据流;
Channel: 发送方 (SendChannel) 发送数据,并不依赖于接受方(ReceiveChannel),所以 Channel 也被称为热数据流;
RendezvousChannel: 默认类型,大小为 0 的缓冲区,只有当 send() 方法和 receive() 方法都调用的时候,元素才会从发送方传输到接收方,否则将会被挂起
LinkedListChannel: 通过 Channel.Factory.CONFLATED 会创建一个容量无限的缓冲区 (受限于内存的大小) ,send() 方法远不会挂起,offer() 方法始终返回 true
ConflatedChannel: 最多缓冲一个元素,新元素会覆盖掉旧元素,只会接收最后发送的元素,之前的元素都会丢失,send() 方法永远不会挂起,offer() 方法始终返回 true
ArrayChannel: 通过 Channel.Factory.BUFFERED 或者 指定大小 会创建一个固定容量的数组缓冲区,send() 方法仅在缓冲区满时挂起,receive() 方法仅在缓冲区为空时挂起
Channel 在概念上有点类似 BlockingQueue, 元素从一端被加入, 从另一端被消费。区别在于, 读写的方法不是 blocking 的, 而是 suspending 的,在为空或为满时. channel 可以 suspend 它的 send 和 receive 操作。
Kotlin 库中定义了多种 Channel 类型,主要区别在于内部可以存储的元素数量及send是否可以被挂起 ,至于 receive 方法都是同样的动作:如果channel不为空, 接收一个元素, 否则挂起。
1 2 3 4 val rendezvousChannel = Channel<String>() val bufferedChannel = Channel<String>(10 ) val conflatedChannel = Channel<String>(CONFLATED) val unlimitedChannel = Channel<String>(UNLIMITED)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 val channel = Channel<Int >(6 )suspend fun receiveData () { coroutineScope { while (!channel.isClosedForReceive) { println("${Thread.currentThread().name} receive: ${channel.receive()} " ) } } } suspend fun sendData () { coroutineScope { if (!channel.isClosedForSend) { (1 ..10 ).forEach { println("${Thread.currentThread().name} send: $it " ) channel.send(it) } } } } fun main () = runBlocking { val sendJob = GlobalScope.launch { sendData() } val receiveJob = GlobalScope.launch { receiveData() } joinAll(sendJob, receiveJob) }