0%

Kotlin笔记之协程基础

概述

2019 年 Google I/O 大会上宣布今后将越来越优先采用 Kotlin 进行 Android 开发,Kotlin 是一种富有表现力且简洁的编程语言,不仅可以减少常见代码错误,还可以轻松集成到现有应用中。关于 Kotlin 的基础用法可参考: Kotlin 官方文档Kotlin笔记系列

通常来说,协程(Coroutines)是轻量级的线程,它不需要从用户态切换到内核态,Coroutine是编译器级的,Process 和 Thread 是操作系统级的,协程没有直接和操作系统关联,但它也是跑在线程中的,可以是单线程,也可以是多线程。协程设计的初衷是为了解决并发问题,让协作式多任务实现起来更加方便,它可以有效地消除回调地狱。

在Kotlin中,协程是一套线程 API, 就像 Java 的 Executors 和 Android 的 Handler 等,是一套比较方便的线程框架,它能够在同一个代码块里进行多次的线程切换,可以用看起来同步的方式写出异步的代码,即非阻塞式挂起

协程的优势:避免回调地狱(也可以通过 RxJava 或者 CompletableFuture 实现):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 回调
api.getAvatar(id) { avatar ->
api.getName(id) { name ->
show(getLabel(avatar, name))
}
}

// 协程
coroutineScope.launch(Dispatchers.Main) {
val avatar = async { api.getAvatar(id) }
val name = async { api.getName(id) }
val label = getLabelSuspend(avatar.await(), name.await())
show(label)
}

使用时需要先添加依赖:

1
2
3
4
5
6
def kotlin_coroutines = `1.3.9`

// 依赖协程核心库
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlin_coroutines"
// 依赖当前平台所对应的平台库
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:$kotlin_coroutines"

启动协程可以使用下面几种方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 线程阻塞,适用于单元测试的场景
runBlocking { getName(id) }

// 不会阻塞线程,但在 Android 中不推荐,因为它的生命周期会和 app 一致
GlobalScope.launch { getName(id) }

// 推荐使用,通过 CoroutineContext 参数去管理和控制协程的生命周期
// 例如:context = Dispatchers.Default + EmptyCoroutineContext
val coroutineScope = CoroutineScope(context)
coroutineScope.launch { getName(id) }

// async启动的Job是Deferred类型,它可以有返回结果,通过await方法获取
// public suspend fun await(): T
val id = coroutineScope.async { getName(id) }
id.await()

启动协程需要三样东西,分别是上下文(CoroutineContext)、启动模式(CoroutineStart)、协程体。

基本概念

suspend

suspend 是挂起的意思,它挂起的对象是协程。

  • 当线程执行到协程的 suspend 函数的时候,暂时不继续执行协程代码了,它会跳出协程的代码块,然后这个线程该干什么就去干什么。
  • 当协程执行到 suspend 函数的时候,这个协程会被「suspend」,也就是从当前线程被挂起。换句话说,就是这个协程从正在执行它的线程上脱离。线程的代码在到达 suspend 函数的时候被掐断,接下来协程会从这个 suspend 函数开始继续往下执行,不过是在这个 suspend 函数指定的线程里执行。

紧接着在 suspend 函数执行完成之后,协程会自动帮我们把线程再切回来,然后接着执行协程后面的代码(resume),resume 功能是协程特有的,所以 suspend 函数必须在 协程 或者 另一个suspend函数 里被调用。

suspend 的非阻塞式指的是它能用看起来阻塞的代码写出非阻塞的操作,单协程可以是非阻塞式的,因为它可以用 suspend 函数来切换线程,本质上还是多线程,只不过写法上连续两行代码看起来是阻塞式的。

CoroutineScope

1
2
3
public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}

GlobeScope

GlobeScope 启动的协程是一个单独的作用域,不会继承上层协程的作用域,其内部的子协程遵守默认的作用域规则。

coroutineScope

coroutineScope 启动的协程 cancel 时会 cancel 所有子协程,也会 cancel 父协程,子协程未捕获的异常也会向上传递给父协程。

supervisorScope

supervisorScope 启动的协程 cancel 和传递异常时,只会由父协程向子协程单向传播。MainScope 是 supervisorScope 作用域。

CoroutineContext

Job, CoroutineDispatcher, ContinuationInterceptor 等都是 CoroutineContext 的子类,即它们都是协程上下文。CoroutineContext 中有一个重载了(+)操作符的plus方法,可以将 Job 和 CoroutineDispatcher 等元素集合起来,代表一个协程的场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public interface CoroutineContext {
// 重载 [] 操作符
public operator fun <E : Element> get(key: Key<E>): E?

public fun <R> fold(initial: R, operation: (R, Element) -> R): R

// 重载 + 操作符
public operator fun plus(context: CoroutineContext): CoroutineContext = /* ... */

public fun minusKey(key: Key<*>): CoroutineContext

public interface Key<E : Element>

public interface Element : CoroutineContext {
// ...
}
}

public interface ContinuationInterceptor : CoroutineContext.Element {
// ...
}

public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
// ...
}

CoroutineStart

几种启动模式如下:

1
2
3
4
5
6
public enum class CoroutineStart {
DEFAULT,
LAZY,
ATOMIC,
UNDISPATCHED; // 立即在当前线程执行协程体,直到第一个suspend调用
}

DEFAULT

DEFAULT是饿汉式启动,launch调用后,会立即进入待调度状态,一旦调度器OK就可以开始执行。

LAZY

LAZY是懒汉式启动,launch后并不会有任何调度行为,协程体也不会进入执行状态,直到需要它执行(调用job.start/join等)的时候才会执行。

ATOMIC

@ExperimentalCoroutinesApi. This is similar to [DEFAULT], but the coroutine cannot be cancelled before it starts executing.

UNDISPATCHED

@ExperimentalCoroutinesApi. This is similar to [ATOMIC] in the sense that coroutine starts executing even if it was already cancelled, but the difference is that it starts executing in the same thread.

Dispatchers

协程调度器是用来指定协程体在哪个线程中执行,Kotlin提供了几个调度器:

Default

默认选项,指定协程体在线程池中执行:

1
2
3
4
5
6
7
8
9
10
11
12
GlobalScope.launch(Dispatchers.Default) {
println("1: ${Thread.currentThread().name}")
launch(Dispatchers.Default) {
println("2: ${Thread.currentThread().name}")
}
println("3: ${Thread.currentThread().name}")
}

-->output
1: DefaultDispatcher-worker-1
3: DefaultDispatcher-worker-1
2: DefaultDispatcher-worker-2

Main

指定协程体在主线程中执行。

Unconfined

协程体运行在父协程所在的线程:

1
2
3
4
5
6
7
8
9
10
11
12
GlobalScope.launch(Dispatchers.Default) {
println("1: ${Thread.currentThread().name}")
launch(Dispatchers.Unconfined) {
println("2: ${Thread.currentThread().name}")
}
println("3: ${Thread.currentThread().name}")
}

-->output
1: DefaultDispatcher-worker-1
2: DefaultDispatcher-worker-1
3: DefaultDispatcher-worker-1

IO

基于 Default 调度器背后的线程池(designed for offloading blocking IO tasks),因此从 Default 切换到 IO 不会触发线程切换:

1
2
3
4
5
6
7
8
9
10
11
12
GlobalScope.launch(Dispatchers.Default) {
println("1: ${Thread.currentThread().name}")
launch(Dispatchers.IO) {
println("2: ${Thread.currentThread().name}")
}
println("3: ${Thread.currentThread().name}")
}

-->output
1: DefaultDispatcher-worker-1
3: DefaultDispatcher-worker-1
2: DefaultDispatcher-worker-1

Job&Deferred

Job 可以取消并且有简单生命周期,它有三种状态:

State isActive isCompleted isCancelled
New (optional initial state) false false false
Active (default initial state) true false false
Completing (optional transient state) true false false
Cancelling (optional transient state) false false true
Cancelled (final state) false true true
Completed (final state) false true false

Job 完成时是没有返回值的,如果需要返回值的话,应该使用 Deferred,它是 Job 的子类: public interface Deferred<out T> : Job

launch 方法返回一个Job类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface Job : CoroutineContext.Element {
public companion object Key : CoroutineContext.Key<Job> {
// ...
}

public val isActive: Boolean
public val isCompleted: Boolean
public val isCancelled: Boolean
public fun start(): Boolean
public fun cancel(): Unit = cancel(null)
public suspend fun join()

// ...
}

async 方法返回一个 Deferred 类型:

1
2
3
4
5
public interface Deferred<out T> : Job {
// 该方法可以得到返回值
public suspend fun await(): T
// ...
}

示例:

1
2
3
4
5
6
7
fun main() = runBlocking {
val job = async {
delay(500)
"Hello"
}
println("${job.await()}, World")
}

Android-Kotlin协程使用

MainScope

Android 中一般不建议使用 GlobalScope, 因为它会创建一个顶层协程,需要保持所有对 GlobalScope 启动的协程的引用,然后在 Activity destory 等场景的时候 cancel 掉这些的协程,否则就会造成内存泄露等问题。可以使用 MainScope:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class CoroutineActivity : AppCompatActivity() {
private val mainScope = MainScope()

fun request1() {
mainScope.launch {
// ...
}
}

// request2, 3, ...

override fun onDestroy() {
super.onDestroy()
mainScope.cancel()
}
}

MainScope 的定义:

1
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)

Lifecycle协程

关于 Lifecycle 可以参考 Android-Jetpack组件之Lifecycle

添加依赖:

1
implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0"

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
get() = lifecycle.coroutineScope

val Lifecycle.coroutineScope: LifecycleCoroutineScope
get() {
while (true) {
val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
if (existing != null) {
return existing
}
val newScope = LifecycleCoroutineScopeImpl(
this,
SupervisorJob() + Dispatchers.Main.immediate
)
if (mInternalScopeRef.compareAndSet(null, newScope)) {
newScope.register()
return newScope
}
}
}

abstract class LifecycleCoroutineScope internal constructor() : CoroutineScope {
internal abstract val lifecycle: Lifecycle

// 当 activity created 的时候执行协程体
fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job = launch {
lifecycle.whenCreated(block)
}

// // 当 activity started 的时候执行协程体
fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job = launch {
lifecycle.whenStarted(block)
}

// // 当 activity resumed 的时候执行协程体
fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job = launch {
lifecycle.whenResumed(block)
}
}

使用:

1
2
3
4
5
6
7
8
9
// AppCompatActivity 实现了 LifecycleOwner 接口
class MainActivity : AppCompatActivity() {

fun test() {
lifecycleScope.launchWhenCreated {
// ...
}
}
}

LiveData协程

关于 LiveData 可以参考 Android-Jetpack组件之LiveData-ViewModel

添加依赖:

1
implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0"

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
fun <T> liveData(
context: CoroutineContext = EmptyCoroutineContext,
timeoutInMs: Long = DEFAULT_TIMEOUT,
@BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)

internal class CoroutineLiveData<T>(
context: CoroutineContext = EmptyCoroutineContext,
timeoutInMs: Long = DEFAULT_TIMEOUT,
block: Block<T>
) : MediatorLiveData<T>() {
private var blockRunner: BlockRunner<T>?
private var emittedSource: EmittedSource? = null

init {
val supervisorJob = SupervisorJob(context[Job])
val scope = CoroutineScope(Dispatchers.Main.immediate + context + supervisorJob)
blockRunner = BlockRunner(
liveData = this,
block = block,
timeoutInMs = timeoutInMs,
scope = scope
) {
blockRunner = null
}
}

internal suspend fun emitSource(source: LiveData<T>): DisposableHandle {
clearSource()
val newSource = addDisposableSource(source)
emittedSource = newSource
return newSource
}

internal suspend fun clearSource() {
emittedSource?.disposeNow()
emittedSource = null
}

// 启动协程
override fun onActive() {
super.onActive()
blockRunner?.maybeRun()
}

// 取消协程
override fun onInactive() {
super.onInactive()
blockRunner?.cancel()
}
}

使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// AppCompatActivity 实现了 LifecycleOwner 接口
class MainActivity : AppCompatActivity() {

fun test() {
liveData {
try {
// ...
emit("success")
} catch(e: Exception) {
emit("error")
}
}.observe(this, Observer {
Log.d("LLL", it)
})
}
}

ViewModel协程

关于 ViewModel 可以参考 Android-Jetpack组件之LiveData-ViewModel

添加依赖:

1
implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.2.0"

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
val ViewModel.viewModelScope: CoroutineScope
get() {
val scope: CoroutineScope? = this.getTag(JOB_KEY)
if (scope != null) {
return scope
}
return setTagIfAbsent(JOB_KEY,
CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main.immediate))
}

internal class CloseableCoroutineScope(context: CoroutineContext) : Closeable, CoroutineScope {
override val coroutineContext: CoroutineContext = context

override fun close() {
coroutineContext.cancel()
}
}

协程的并发

启动一百个协程,它们都做一千次相同的操作,同时会测量它们的完成时间以便进一步的比较:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}

var counter = 0

fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}

// output
Completed 100000 actions in 55 ms
Counter = 92805

因为一百个协程在多个线程中同时递增计数器但没有做并发处理,所以不太可能输出 100000 。使用 volatile 并不能解决这个问题,因为 volatile 不能保证原子性,只有内存可见性。可以用以下等方式解决:

使用原子类 AtomicInteger 进行递增

以细粒度限制线程:对特定共享状态的所有访问权都限制在单个线程中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

// 运行比较慢
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// 将每次自增限制在单线程上下文中
withContext(counterContext) {
counter++
}
}
}
println("Counter = $counter")
}

// output
Completed 100000 actions in 1652 ms
Counter = 100000

以粗粒度限制线程:线程限制在大段代码中执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
// 将一切都限制在单线程上下文中
withContext(counterContext) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}

// output
Completed 100000 actions in 40 ms
Counter = 100000

互斥:除了Java已有的一些互斥方式如 Lock 等之外,Kotlin 中提供了 Mutex 类,它具有 lock 和 unlock 方法,lock 是一个挂起函数,它不会阻塞线程。可以使用 withLock 扩展函数替代常用的 mutex.lock(); try { …… } finally { mutex.unlock() }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
val mutex = Mutex()
var counter = 0

// 此示例中锁是细粒度的,因此会付出一些代价。
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// 用锁保护每次自增
mutex.withLock {
counter++
}
}
}
println("Counter = $counter")
}

// output
Completed 100000 actions in 640 ms
Counter = 100000

协程Flow

更多用法参考: Flow

flow创建

flow 构造器中的代码会直到 flow 被 collect 的时候才会运行,且 collect 方法是 suspend 函数,会挂起当前协程。创建 flow 有几种方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// flowOf()
flowOf(1, 2, 3, 4, 5).onEach { delay(100) }.collect{ println(it) }

// asFlow()
listOf(1, 2, 3, 4, 5).asFlow().onEach { delay(100) }.collect { println(it) }

// 流构建器
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(500) // 假装我们在这里做了一些有用的事情
emit(i) // 发送下一个值
}
}

// channelFlow {}
channelFlow {
for (i in 1..5) {
delay(100)
send(i)
}
}.collect { println(it) }

flow 是 Cold Stream, 在没有切换线程的情况下,生产者和消费者是同步非阻塞的。channel 是 Hot Stream, channelFlow 实现了生产者和消费者异步非阻塞模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
runBlocking {
val time = measureTimeMillis {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.collect {
delay(100)
println(it)
}
}
println("cost $time") // 1078
}

runBlocking {
val time = measureTimeMillis {
channelFlow {
for (i in 1..5) {
delay(100)
send(i)
}
}.collect {
delay(100)
println(it)
}
}
println("cost $time") // 686
}

这两者运行逻辑如下:

当然如果使用 flowOn 切换线程或者使用 buffer() 的话,得到的时间也差不多是 700 ms。

切换线程

流的 collect 总是在调用协程的上下文中发生,使用 flowOn 可以切换流 emit 的上下文(包括切换线程,不要使用 withContext() 来切换 flow 的线程),上游都会受到影响:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // 假装我们以消耗 CPU 的方式进行计算
log("Emitting $i")
emit(i) // 发射下一个值
}
}.flowOn(Dispatchers.Default) // 在流构建器中改变消耗 CPU 代码上下文的正确方式

fun main() = runBlocking<Unit> {
simple().collect { value ->
log("Collected $value")
}
}

// output
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3

可以在流上使用 buffer 操作符来并发运行流中发射和收集的代码(注意,当更改 CoroutineDispatcher 时,flowOn 操作符使用了相同的缓冲机制,但是在这里显式地请求了缓冲而不改变执行上下文):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 假装我们异步等待了 100 毫秒
emit(i) // 发射下一个值
}
}

fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().collect { value ->
delay(300) // 假装我们花费 300 毫秒来处理它
println(value)
}
}
println("Collected in $time ms")
}

// output
1
2
3
Collected in 1217 ms

val time = measureTimeMillis {
simple()
.buffer() // 缓冲发射项,无需等待
.collect { value ->
delay(300) // 假装我们花费 300 毫秒来处理它
println(value)
}
}
println("Collected in $time ms")

// output
1
2
3
Collected in 1043 ms

flow取消

如果 flow 是在一个挂起函数内被挂起了,那么 flow 是可以被取消的,否则不能取消。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 只会输出前两个
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}

fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // 在 250 毫秒后超时
simple().collect { value -> println(value) }
}
println("Done")
}

操作符

操作符: map, filter, transform, take 等:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
suspend fun performRequest(request: Int): String {
delay(1000) // 模仿长时间运行的异步工作
return "response $request"
}

fun main() = runBlocking<Unit> {
(1..3).asFlow() // 一个请求流
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}

// output 每秒打印一行
response 1
response 2
response 3

由于 collect 是 suspend 函数,后续操作会等待它执行完毕后才会接着执行,因此可以使用 launchIn 配合 onEach 完成这个任务, launchIn 的参数是 CoroutineScope, 指定了用哪一个协程来启动流的收集:

1
2
3
4
5
6
7
8
9
// 模仿事件流
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- 在单独的协程中执行流
println("Done")
}

异常处理

flow 可以使用传统的 try-catch 处理异常,另外还可以使用 catch 操作符捕获来自上游的异常,而 onCompletion 不能捕获异常,只能用于判断是否有异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
fun main() = runBlocking {
flow {
emit(1)
throw RuntimeException()
}
.onCompletion { cause ->
if (cause != null)
println("Flow completed exceptionally")
else
println("Done")
}
.catch{ println("catch exception") }
.collect { println(it) }
}

// output
1
Flow completed exceptionally
catch exception

上面的代码如果把 onCompletion 和 catch 交换一下位置,则 catch 操作符捕获到异常后,不会影响到下游,因此 onCompletion 操作符不会走异常分支。

但 catch 只是中间操作符,它不能捕获下游的异常,对于下游如 collect 中的异常,除了使用 try-catch 去处理外,还可以借助 onEach 操作符,把业务逻辑放到 onEach 操作符内:

1
2
3
4
5
6
7
8
9
10
fun main() = runBlocking<Unit> {
flow {
......
}
.onEach {
......
}
.catch { ... }
.collect() // 或 launchIn()
}

flow 可以使用 retry 和 retryWhen 操作符重试指定次数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
runBlocking {
(1..5).asFlow()
.onEach {
if (it == 3) throw RuntimeException("Error on $it")
}
.retry(2) {
if (it is RuntimeException) {
return@retry true
}
false
}
.onEach { println("Emitting $it") }
.catch { it.printStackTrace() }
.collect()
}

// output
Emitting 1
Emitting 2
Emitting 1
Emitting 2
Emitting 1
Emitting 2
java.lang.RuntimeException: Error on 3

flow 可以使用操作符来监听其生命周期:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
runBlocking {
(1..5).asFlow()
.onEach { if (it == 3) throw RuntimeException("Error on $it") }
.onEach { println("OnEach $it") }
.onStart { println("Flow starting") }
.onCompletion { println("Flow completed") }
.catch { println("Exception : ${it.message}") }
.collect()
}

// output
Flow starting
OnEach 1
OnEach 2
Flow completed
Exception : Error on 3

总结

  • Flow 以挂起的方式执行,它是非阻塞的,只有遇到末端操作符,才会触发所有操作的执行;
  • map, filter, take, zip 等是中间操作符,collect, collectLatest, single, reduce, toList 等是末端操作符;
  • 遇到中间操作符,并不会执行任何操作,这些操作符构建了一个待执行的调用链,末端操作符是可挂起函数,遇到末端操作符会触发所有操作的执行。

协程Channel

  • Channel 是非阻塞的,它用于发送方 (SendChannel) 和接收方 (ReceiveChannel) 之间通信,Channel 实现了 SendChannel 和 ReceiveChannel 接口,所以既可以发送数据又可以接受数据;
  • Channel 和 Java 中的 BlockingQueue 类似,不同之处在于 BlockingQueue 是阻塞的,而 Channel 是挂起的;
  • 发送方 (SendChannel) 和 接收方 (ReceiveChannel) 之间有一个通道,也就是缓冲区,发送方和接收方之间通过缓冲区进行同步,发送方将数据发送到缓冲区,通过接收方从缓冲区获取数据;
  • 缓冲区的作用帮我们同步发送方和接收方发送和接受的数据,也就意味着多个协程可以向同一个 channel 发送数据, 一个 channel 的数据也可以被多个协程接收。

send() 和 offer() 的区别:

  • send: suspend 函数,缓冲区没有满则立即添加元素,缓冲区已满则调用者被挂起;
  • offer: 缓冲区没有满则立即添加元素,添加成功会返回 true, 失败会返回 false;

receive() 和 poll() 的区别:

  • receive: suspend 函数,异步获取元素,如果缓冲区为空则调用者被挂起,直到一个新值被发送到缓冲区;
  • poll: 用于同步获取一个元素,如果缓冲区是空的,则返回 null;

Flow 与 Channel 的区别:

  • Flow: 中间操作符 (map , filter 等等) 会构建了一个待执行的调用链,只有遇到末端操作符 (collect , toList 等等) 才会触发所有操作的执行,所以 Flow 也被称为冷数据流;
  • Channel: 发送方 (SendChannel) 发送数据,并不依赖于接受方(ReceiveChannel),所以 Channel 也被称为热数据流;

Channel类型

  • RendezvousChannel: 默认类型,大小为 0 的缓冲区,只有当 send() 方法和 receive() 方法都调用的时候,元素才会从发送方传输到接收方,否则将会被挂起
  • LinkedListChannel: 通过 Channel.Factory.CONFLATED 会创建一个容量无限的缓冲区 (受限于内存的大小) ,send() 方法远不会挂起,offer() 方法始终返回 true
  • ConflatedChannel: 最多缓冲一个元素,新元素会覆盖掉旧元素,只会接收最后发送的元素,之前的元素都会丢失,send() 方法永远不会挂起,offer() 方法始终返回 true
  • ArrayChannel: 通过 Channel.Factory.BUFFERED 或者 指定大小 会创建一个固定容量的数组缓冲区,send() 方法仅在缓冲区满时挂起,receive() 方法仅在缓冲区为空时挂起

Channel 在概念上有点类似 BlockingQueue, 元素从一端被加入, 从另一端被消费。区别在于, 读写的方法不是 blocking 的, 而是 suspending 的,在为空或为满时. channel 可以 suspend 它的 send 和 receive 操作。

Kotlin 库中定义了多种 Channel 类型,主要区别在于内部可以存储的元素数量及send是否可以被挂起,至于 receive 方法都是同样的动作:如果channel不为空, 接收一个元素, 否则挂起。

1
2
3
4
val rendezvousChannel = Channel<String>() // 0尺寸buffer, send和receive要meet on time, 否则挂起. (默认类型).
val bufferedChannel = Channel<String>(10) // 指定大小, 满了之后send挂起.
val conflatedChannel = Channel<String>(CONFLATED) // 新元素会覆盖旧元素, receiver只会得到最新元素, send永不挂起
val unlimitedChannel = Channel<String>(UNLIMITED) // 无限元素, send不被挂起.

看一个示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
val channel = Channel<Int>(6)

suspend fun receiveData() {
coroutineScope {
while (!channel.isClosedForReceive) {
println("${Thread.currentThread().name} receive: ${channel.receive()}")
}
}
}

suspend fun sendData() {
coroutineScope {
if (!channel.isClosedForSend) {
(1..10).forEach {
println("${Thread.currentThread().name} send: $it")
channel.send(it)
}
}
}
}

fun main() = runBlocking {
val sendJob = GlobalScope.launch {
sendData()
}
val receiveJob = GlobalScope.launch {
receiveData()
}
joinAll(sendJob, receiveJob)
}