概述 2019 年 Google I/O 大会上宣布今后将越来越优先采用 Kotlin 进行 Android 开发,Kotlin 是一种富有表现力且简洁的编程语言,不仅可以减少常见代码错误,还可以轻松集成到现有应用中。关于 Kotlin 的基础用法可参考: Kotlin 官方文档 和 Kotlin笔记系列 。
通常来说,协程(Coroutines)是轻量级的线程,它不需要从用户态切换到内核态,Coroutine是编译器级的,Process 和 Thread 是操作系统级的,协程没有直接和操作系统关联,但它也是跑在线程中的,可以是单线程,也可以是多线程。协程设计的初衷是为了解决并发问题,让协作式多任务实现起来更加方便,它可以有效地消除回调地狱。
在Kotlin中,协程是一套线程 API, 就像 Java 的 Executors 和 Android 的 Handler 等,是一套比较方便的线程框架,它能够在同一个代码块里进行多次的线程切换,可以用看起来同步的方式写出异步的代码,即非阻塞式挂起 。
协程的优势:避免回调地狱(也可以通过 RxJava 或者 CompletableFuture 实现):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 api.getAvatar(id) { avatar -> api.getName(id) { name -> show(getLabel(avatar, name)) } } coroutineScope.launch(Dispatchers.Main) { val avatar = async { api.getAvatar(id) } val name = async { api.getName(id) } val label = getLabelSuspend(avatar.await(), name.await()) show(label) }
使用时需要先添加依赖:
1 2 3 4 5 6 def kotlin_coroutines = `1.3 .9 `implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlin_coroutines" implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:$kotlin_coroutines"
启动协程可以使用下面几种方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 runBlocking { getName(id) } GlobalScope.launch { getName(id) } val coroutineScope = CoroutineScope(context)coroutineScope.launch { getName(id) } val id = coroutineScope.async { getName(id) }id.await()
启动协程需要三样东西,分别是上下文(CoroutineContext)、启动模式(CoroutineStart)、协程体。
基本概念 suspend suspend 是挂起的意思,它挂起的对象是协程。
当线程执行到协程的 suspend 函数的时候,暂时不继续执行协程代码了,它会跳出协程的代码块,然后这个线程该干什么就去干什么。
当协程执行到 suspend 函数的时候,这个协程会被「suspend」,也就是从当前线程被挂起。换句话说,就是这个协程从正在执行它的线程上脱离。线程的代码在到达 suspend 函数的时候被掐断,接下来协程会从这个 suspend 函数开始继续往下执行,不过是在这个 suspend 函数指定的线程里执行。
紧接着在 suspend 函数执行完成之后,协程会自动帮我们把线程再切回来,然后接着执行协程后面的代码(resume),resume 功能是协程特有的,所以 suspend 函数必须在 协程 或者 另一个suspend函数 里被调用。
suspend 的非阻塞式指的是它能用看起来阻塞的代码写出非阻塞的操作,单协程可以是非阻塞式的,因为它可以用 suspend 函数来切换线程,本质上还是多线程,只不过写法上连续两行代码看起来是阻塞式的。
CoroutineScope 1 2 3 public interface CoroutineScope { public val coroutineContext: CoroutineContext }
GlobeScope
GlobeScope 启动的协程是一个单独的作用域,不会继承上层协程的作用域,其内部的子协程遵守默认的作用域规则。
coroutineScope
coroutineScope 启动的协程 cancel 时会 cancel 所有子协程,也会 cancel 父协程,子协程未捕获的异常也会向上传递给父协程。
supervisorScope
supervisorScope 启动的协程 cancel 和传递异常时,只会由父协程向子协程单向传播。MainScope 是 supervisorScope 作用域。
CoroutineContext Job, CoroutineDispatcher, ContinuationInterceptor 等都是 CoroutineContext 的子类,即它们都是协程上下文。CoroutineContext 中有一个重载了(+)操作符的plus方法,可以将 Job 和 CoroutineDispatcher 等元素集合起来,代表一个协程的场景。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public interface CoroutineContext { public operator fun <E : Element> get (key: Key <E >) : E? public fun <R> fold (initial: R , operation: (R , Element ) -> R ) : R public operator fun plus (context: CoroutineContext ) : CoroutineContext = public fun minusKey (key: Key <*>) : CoroutineContext public interface Key <E : Element > public interface Element : CoroutineContext { } } public interface ContinuationInterceptor : CoroutineContext.Element { } public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { }
CoroutineStart 几种启动模式如下:
1 2 3 4 5 6 public enum class CoroutineStart { DEFAULT, LAZY, ATOMIC, UNDISPATCHED; }
DEFAULT
DEFAULT是饿汉式启动,launch调用后,会立即进入待调度状态,一旦调度器OK就可以开始执行。
LAZY
LAZY是懒汉式启动,launch后并不会有任何调度行为,协程体也不会进入执行状态,直到需要它执行(调用job.start/join等)的时候才会执行。
ATOMIC
@ExperimentalCoroutinesApi
. This is similar to [DEFAULT], but the coroutine cannot be cancelled before it starts executing.
UNDISPATCHED
@ExperimentalCoroutinesApi
. This is similar to [ATOMIC] in the sense that coroutine starts executing even if it was already cancelled, but the difference is that it starts executing in the same thread.
Dispatchers 协程调度器是用来指定协程体在哪个线程中执行,Kotlin提供了几个调度器:
Default
默认选项,指定协程体在线程池中执行:
1 2 3 4 5 6 7 8 9 10 11 12 GlobalScope.launch(Dispatchers.Default) { println("1: ${Thread.currentThread().name} " ) launch(Dispatchers.Default) { println("2: ${Thread.currentThread().name} " ) } println("3: ${Thread.currentThread().name} " ) } -->output 1 : DefaultDispatcher-worker-1 3 : DefaultDispatcher-worker-1 2 : DefaultDispatcher-worker-2
Main
指定协程体在主线程中执行。
Unconfined
协程体运行在父协程所在的线程:
1 2 3 4 5 6 7 8 9 10 11 12 GlobalScope.launch(Dispatchers.Default) { println("1: ${Thread.currentThread().name} " ) launch(Dispatchers.Unconfined) { println("2: ${Thread.currentThread().name} " ) } println("3: ${Thread.currentThread().name} " ) } -->output 1 : DefaultDispatcher-worker-1 2 : DefaultDispatcher-worker-1 3 : DefaultDispatcher-worker-1
IO
基于 Default 调度器背后的线程池(designed for offloading blocking IO tasks),因此从 Default 切换到 IO 不会触发线程切换:
1 2 3 4 5 6 7 8 9 10 11 12 GlobalScope.launch(Dispatchers.Default) { println("1: ${Thread.currentThread().name} " ) launch(Dispatchers.IO) { println("2: ${Thread.currentThread().name} " ) } println("3: ${Thread.currentThread().name} " ) } -->output 1 : DefaultDispatcher-worker-1 3 : DefaultDispatcher-worker-1 2 : DefaultDispatcher-worker-1
Job&Deferred Job 可以取消并且有简单生命周期,它有三种状态:
State
isActive
isCompleted
isCancelled
New (optional initial state)
false
false
false
Active (default initial state)
true
false
false
Completing (optional transient state)
true
false
false
Cancelling (optional transient state)
false
false
true
Cancelled (final state)
false
true
true
Completed (final state)
false
true
false
Job 完成时是没有返回值的,如果需要返回值的话,应该使用 Deferred,它是 Job 的子类: public interface Deferred<out T> : Job
。
launch 方法返回一个Job类型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public interface Job : CoroutineContext.Element { public companion object Key : CoroutineContext.Key<Job> { } public val isActive: Boolean public val isCompleted: Boolean public val isCancelled: Boolean public fun start () : Boolean public fun cancel () : Unit = cancel(null ) public suspend fun join () }
async 方法返回一个 Deferred 类型:
1 2 3 4 5 public interface Deferred <out T > : Job { public suspend fun await () : T }
示例:
1 2 3 4 5 6 7 fun main () = runBlocking { val job = async { delay(500 ) "Hello" } println("${job.await()} , World" ) }
Android-Kotlin协程使用 MainScope Android 中一般不建议使用 GlobalScope, 因为它会创建一个顶层协程,需要保持所有对 GlobalScope 启动的协程的引用,然后在 Activity destory 等场景的时候 cancel 掉这些的协程,否则就会造成内存泄露等问题。可以使用 MainScope:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class CoroutineActivity : AppCompatActivity () { private val mainScope = MainScope() fun request1 () { mainScope.launch { } } override fun onDestroy () { super .onDestroy() mainScope.cancel() } }
MainScope 的定义:
1 public fun MainScope () : CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
Lifecycle协程 关于 Lifecycle 可以参考 Android-Jetpack组件之Lifecycle 。
添加依赖:
1 implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0"
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope get () = lifecycle.coroutineScope val Lifecycle.coroutineScope: LifecycleCoroutineScope get () { while (true ) { val existing = mInternalScopeRef.get () as LifecycleCoroutineScopeImpl? if (existing != null ) { return existing } val newScope = LifecycleCoroutineScopeImpl( this , SupervisorJob() + Dispatchers.Main.immediate ) if (mInternalScopeRef.compareAndSet(null , newScope)) { newScope.register() return newScope } } } abstract class LifecycleCoroutineScope internal constructor () : CoroutineScope { internal abstract val lifecycle: Lifecycle fun launchWhenCreated (block: suspend CoroutineScope .() -> Unit ) : Job = launch { lifecycle.whenCreated(block) } fun launchWhenStarted (block: suspend CoroutineScope .() -> Unit ) : Job = launch { lifecycle.whenStarted(block) } fun launchWhenResumed (block: suspend CoroutineScope .() -> Unit ) : Job = launch { lifecycle.whenResumed(block) } }
使用:
1 2 3 4 5 6 7 8 9 class MainActivity : AppCompatActivity () { fun test () { lifecycleScope.launchWhenCreated { } } }
LiveData协程 关于 LiveData 可以参考 Android-Jetpack组件之LiveData-ViewModel 。
添加依赖:
1 implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0"
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 fun <T> liveData ( context: CoroutineContext = EmptyCoroutineContext, timeoutInMs: Long = DEFAULT_TIMEOUT, @BuilderInference block: suspend LiveDataScope <T >.() -> Unit ) : LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)internal class CoroutineLiveData <T > ( context: CoroutineContext = EmptyCoroutineContext, timeoutInMs: Long = DEFAULT_TIMEOUT, block: Block<T> ) : MediatorLiveData<T>() { private var blockRunner: BlockRunner<T>? private var emittedSource: EmittedSource? = null init { val supervisorJob = SupervisorJob(context[Job]) val scope = CoroutineScope(Dispatchers.Main.immediate + context + supervisorJob) blockRunner = BlockRunner( liveData = this , block = block, timeoutInMs = timeoutInMs, scope = scope ) { blockRunner = null } } internal suspend fun emitSource (source: LiveData <T >) : DisposableHandle { clearSource() val newSource = addDisposableSource(source) emittedSource = newSource return newSource } internal suspend fun clearSource () { emittedSource?.disposeNow() emittedSource = null } override fun onActive () { super .onActive() blockRunner?.maybeRun() } override fun onInactive () { super .onInactive() blockRunner?.cancel() } }
使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class MainActivity : AppCompatActivity () { fun test () { liveData { try { emit("success" ) } catch (e: Exception) { emit("error" ) } }.observe(this , Observer { Log.d("LLL" , it) }) } }
ViewModel协程 关于 ViewModel 可以参考 Android-Jetpack组件之LiveData-ViewModel 。
添加依赖:
1 implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.2.0"
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 val ViewModel.viewModelScope: CoroutineScope get () { val scope: CoroutineScope? = this .getTag(JOB_KEY) if (scope != null ) { return scope } return setTagIfAbsent(JOB_KEY, CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main.immediate)) } internal class CloseableCoroutineScope (context: CoroutineContext) : Closeable, CoroutineScope { override val coroutineContext: CoroutineContext = context override fun close () { coroutineContext.cancel() } }
协程的并发 启动一百个协程,它们都做一千次相同的操作,同时会测量它们的完成时间以便进一步的比较:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 suspend fun massiveRun (action: suspend () -> Unit ) { val n = 100 val k = 1000 val time = measureTimeMillis { coroutineScope { repeat(n) { launch { repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms" ) } var counter = 0 fun main () = runBlocking { withContext(Dispatchers.Default) { massiveRun { counter++ } } println("Counter = $counter " ) } Completed 100000 actions in 55 ms Counter = 92805
因为一百个协程在多个线程中同时递增计数器但没有做并发处理,所以不太可能输出 100000 。使用 volatile 并不能解决这个问题,因为 volatile 不能保证原子性,只有内存可见性。可以用以下等方式解决:
使用原子类 AtomicInteger 进行递增
以细粒度限制线程:对特定共享状态的所有访问权都限制在单个线程中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 val counterContext = newSingleThreadContext("CounterContext" )var counter = 0 fun main () = runBlocking { withContext(Dispatchers.Default) { massiveRun { withContext(counterContext) { counter++ } } } println("Counter = $counter " ) } Completed 100000 actions in 1652 ms Counter = 100000
以粗粒度限制线程:线程限制在大段代码中执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 val counterContext = newSingleThreadContext("CounterContext" )var counter = 0 fun main () = runBlocking { withContext(counterContext) { massiveRun { counter++ } } println("Counter = $counter " ) } Completed 100000 actions in 40 ms Counter = 100000
互斥:除了Java已有的一些互斥方式如 Lock 等之外,Kotlin 中提供了 Mutex 类,它具有 lock 和 unlock 方法,lock 是一个挂起函数,它不会阻塞线程。可以使用 withLock 扩展函数替代常用的 mutex.lock(); try { …… } finally { mutex.unlock() }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 val mutex = Mutex()var counter = 0 fun main () = runBlocking { withContext(Dispatchers.Default) { massiveRun { mutex.withLock { counter++ } } } println("Counter = $counter " ) } Completed 100000 actions in 640 ms Counter = 100000
协程Flow 更多用法参考: Flow 。
flow创建 flow 构造器中的代码会直到 flow 被 collect 的时候才会运行,且 collect 方法是 suspend 函数,会挂起当前协程。创建 flow 有几种方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 flowOf(1 , 2 , 3 , 4 , 5 ).onEach { delay(100 ) }.collect{ println(it) } listOf(1 , 2 , 3 , 4 , 5 ).asFlow().onEach { delay(100 ) }.collect { println(it) } fun simple () : Flow<Int > = flow { for (i in 1 ..3 ) { delay(500 ) emit(i) } } channelFlow { for (i in 1 ..5 ) { delay(100 ) send(i) } }.collect { println(it) }
flow 是 Cold Stream, 在没有切换线程的情况下,生产者和消费者是同步非阻塞的。channel 是 Hot Stream, channelFlow 实现了生产者和消费者异步非阻塞模型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 runBlocking { val time = measureTimeMillis { flow { for (i in 1 ..5 ) { delay(100 ) emit(i) } }.collect { delay(100 ) println(it) } } println("cost $time " ) } runBlocking { val time = measureTimeMillis { channelFlow { for (i in 1 ..5 ) { delay(100 ) send(i) } }.collect { delay(100 ) println(it) } } println("cost $time " ) }
这两者运行逻辑如下:
当然如果使用 flowOn 切换线程或者使用 buffer() 的话,得到的时间也差不多是 700 ms。
切换线程 流的 collect 总是在调用协程的上下文中发生,使用 flowOn 可以切换流 emit 的上下文(包括切换线程,不要使用 withContext() 来切换 flow 的线程),上游都会受到影响:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 fun simple () : Flow<Int > = flow { for (i in 1 ..3 ) { Thread.sleep(100 ) log("Emitting $i " ) emit(i) } }.flowOn(Dispatchers.Default) fun main () = runBlocking<Unit > { simple().collect { value -> log("Collected $value " ) } } [DefaultDispatcher-worker-1 @coroutine #2 ] Emitting 1 [main @coroutine #1 ] Collected 1 [DefaultDispatcher-worker-1 @coroutine #2 ] Emitting 2 [main @coroutine #1 ] Collected 2 [DefaultDispatcher-worker-1 @coroutine #2 ] Emitting 3 [main @coroutine #1 ] Collected 3
可以在流上使用 buffer 操作符来并发运行流中发射和收集的代码(注意,当更改 CoroutineDispatcher 时,flowOn 操作符使用了相同的缓冲机制,但是在这里显式地请求了缓冲而不改变执行上下文):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 fun simple () : Flow<Int > = flow { for (i in 1 ..3 ) { delay(100 ) emit(i) } } fun main () = runBlocking<Unit > { val time = measureTimeMillis { simple().collect { value -> delay(300 ) println(value) } } println("Collected in $time ms" ) } 1 2 3 Collected in 1217 ms val time = measureTimeMillis { simple() .buffer() .collect { value -> delay(300 ) println(value) } } println("Collected in $time ms" ) 1 2 3 Collected in 1043 ms
flow取消 如果 flow 是在一个挂起函数内被挂起了,那么 flow 是可以被取消的,否则不能取消。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 fun simple () : Flow<Int > = flow { for (i in 1 ..3 ) { delay(100 ) println("Emitting $i " ) emit(i) } } fun main () = runBlocking<Unit > { withTimeoutOrNull(250 ) { simple().collect { value -> println(value) } } println("Done" ) }
操作符 操作符: map, filter, transform, take 等:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 suspend fun performRequest (request: Int ) : String { delay(1000 ) return "response $request " } fun main () = runBlocking<Unit > { (1 ..3 ).asFlow() .map { request -> performRequest(request) } .collect { response -> println(response) } } response 1 response 2 response 3
由于 collect 是 suspend 函数,后续操作会等待它执行完毕后才会接着执行,因此可以使用 launchIn 配合 onEach 完成这个任务, launchIn 的参数是 CoroutineScope, 指定了用哪一个协程来启动流的收集:
1 2 3 4 5 6 7 8 9 fun events () : Flow<Int > = (1 ..3 ).asFlow().onEach { delay(100 ) }fun main () = runBlocking<Unit > { events() .onEach { event -> println("Event: $event " ) } .launchIn(this ) println("Done" ) }
异常处理 flow 可以使用传统的 try-catch 处理异常,另外还可以使用 catch 操作符捕获来自上游的异常,而 onCompletion 不能捕获异常,只能用于判断是否有异常:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 fun main () = runBlocking { flow { emit(1 ) throw RuntimeException() } .onCompletion { cause -> if (cause != null ) println("Flow completed exceptionally" ) else println("Done" ) } .catch { println("catch exception" ) } .collect { println(it) } } 1 Flow completed exceptionally catch exception
上面的代码如果把 onCompletion 和 catch 交换一下位置,则 catch 操作符捕获到异常后,不会影响到下游,因此 onCompletion 操作符不会走异常分支。
但 catch 只是中间操作符,它不能捕获下游的异常,对于下游如 collect 中的异常,除了使用 try-catch 去处理外,还可以借助 onEach 操作符,把业务逻辑放到 onEach 操作符内:
1 2 3 4 5 6 7 8 9 10 fun main () = runBlocking<Unit > { flow { ...... } .onEach { ...... } .catch { ... } .collect() }
flow 可以使用 retry 和 retryWhen 操作符重试指定次数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 runBlocking { (1 ..5 ).asFlow() .onEach { if (it == 3 ) throw RuntimeException("Error on $it " ) } .retry(2 ) { if (it is RuntimeException) { return @retry true } false } .onEach { println("Emitting $it " ) } .catch { it.printStackTrace() } .collect() } Emitting 1 Emitting 2 Emitting 1 Emitting 2 Emitting 1 Emitting 2 java.lang.RuntimeException: Error on 3
flow 可以使用操作符来监听其生命周期:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 runBlocking { (1 ..5 ).asFlow() .onEach { if (it == 3 ) throw RuntimeException("Error on $it " ) } .onEach { println("OnEach $it " ) } .onStart { println("Flow starting" ) } .onCompletion { println("Flow completed" ) } .catch { println("Exception : ${it.message} " ) } .collect() } Flow starting OnEach 1 OnEach 2 Flow completed Exception : Error on 3
总结
Flow 以挂起的方式执行,它是非阻塞的,只有遇到末端操作符,才会触发所有操作的执行;
map, filter, take, zip 等是中间操作符,collect, collectLatest, single, reduce, toList 等是末端操作符;
遇到中间操作符,并不会执行任何操作,这些操作符构建了一个待执行的调用链,末端操作符是可挂起函数,遇到末端操作符会触发所有操作的执行。
协程Channel
Channel 是非阻塞的,它用于发送方 (SendChannel) 和接收方 (ReceiveChannel) 之间通信,Channel 实现了 SendChannel 和 ReceiveChannel 接口,所以既可以发送数据又可以接受数据;
Channel 和 Java 中的 BlockingQueue 类似,不同之处在于 BlockingQueue 是阻塞的,而 Channel 是挂起的;
发送方 (SendChannel) 和 接收方 (ReceiveChannel) 之间有一个通道,也就是缓冲区,发送方和接收方之间通过缓冲区进行同步,发送方将数据发送到缓冲区,通过接收方从缓冲区获取数据;
缓冲区的作用帮我们同步发送方和接收方发送和接受的数据,也就意味着多个协程可以向同一个 channel 发送数据, 一个 channel 的数据也可以被多个协程接收。
send() 和 offer() 的区别:
send: suspend 函数,缓冲区没有满则立即添加元素,缓冲区已满则调用者被挂起;
offer: 缓冲区没有满则立即添加元素,添加成功会返回 true, 失败会返回 false;
receive() 和 poll() 的区别:
receive: suspend 函数,异步获取元素,如果缓冲区为空则调用者被挂起,直到一个新值被发送到缓冲区;
poll: 用于同步获取一个元素,如果缓冲区是空的,则返回 null;
Flow 与 Channel 的区别:
Flow: 中间操作符 (map , filter 等等) 会构建了一个待执行的调用链,只有遇到末端操作符 (collect , toList 等等) 才会触发所有操作的执行,所以 Flow 也被称为冷数据流;
Channel: 发送方 (SendChannel) 发送数据,并不依赖于接受方(ReceiveChannel),所以 Channel 也被称为热数据流;
Channel类型
RendezvousChannel: 默认类型,大小为 0 的缓冲区,只有当 send() 方法和 receive() 方法都调用的时候,元素才会从发送方传输到接收方,否则将会被挂起
LinkedListChannel: 通过 Channel.Factory.CONFLATED 会创建一个容量无限的缓冲区 (受限于内存的大小) ,send() 方法远不会挂起,offer() 方法始终返回 true
ConflatedChannel: 最多缓冲一个元素,新元素会覆盖掉旧元素,只会接收最后发送的元素,之前的元素都会丢失,send() 方法永远不会挂起,offer() 方法始终返回 true
ArrayChannel: 通过 Channel.Factory.BUFFERED 或者 指定大小 会创建一个固定容量的数组缓冲区,send() 方法仅在缓冲区满时挂起,receive() 方法仅在缓冲区为空时挂起
Channel 在概念上有点类似 BlockingQueue, 元素从一端被加入, 从另一端被消费。区别在于, 读写的方法不是 blocking 的, 而是 suspending 的,在为空或为满时. channel 可以 suspend 它的 send 和 receive 操作。
Kotlin 库中定义了多种 Channel 类型,主要区别在于内部可以存储的元素数量及send是否可以被挂起 ,至于 receive 方法都是同样的动作:如果channel不为空, 接收一个元素, 否则挂起。
1 2 3 4 val rendezvousChannel = Channel<String>() val bufferedChannel = Channel<String>(10 ) val conflatedChannel = Channel<String>(CONFLATED) val unlimitedChannel = Channel<String>(UNLIMITED)
看一个示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 val channel = Channel<Int >(6 )suspend fun receiveData () { coroutineScope { while (!channel.isClosedForReceive) { println("${Thread.currentThread().name} receive: ${channel.receive()} " ) } } } suspend fun sendData () { coroutineScope { if (!channel.isClosedForSend) { (1 ..10 ).forEach { println("${Thread.currentThread().name} send: $it " ) channel.send(it) } } } } fun main () = runBlocking { val sendJob = GlobalScope.launch { sendData() } val receiveJob = GlobalScope.launch { receiveData() } joinAll(sendJob, receiveJob) }