概述
协程系列文章:
Channel 类似于 Java 的 BlockingQueue 阻塞队列,不同之处在于 Channel 提供了挂起的 send() 和 receive() 方法。另外,通道 Channel 可以被关闭表明不再有数据会进入 Channel, 而接收端可以通过 for 循环取出数据。
Channel 也是生产-消费者模式,这个设计模式在协程中很常见。
基本使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| val channel = Channel<Int>()
launch { repeat(10) { channel.send(it) delay(200) } channel.close() }
launch { for (i in channel) { println("receive: $i") } println("closed") }
|
produce 和 actor
produce 和 actor 是 Kotlin 提供的构造生产者与消费者的便捷方法。
其中 produce 方法用来启动一个生产者协程,并返回一个 ReceiveChannel 在其他协程中接收数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| val receiveChannel = CoroutineScope(Dispatchers.IO).produce { repeat(10) { send(it) delay(200) } }
launch { for (i in receiveChannel) { println("receive-1: $i") } }
launch { for (i in receiveChannel) { println("receive-2: $i") } }
|
输出:
1 2 3 4 5 6 7 8 9 10
| 2022-11-29 10:48:03.045 I/System.out: receive-1: 0 2022-11-29 10:48:03.250 I/System.out: receive-1: 1 2022-11-29 10:48:03.451 I/System.out: receive-2: 2 2022-11-29 10:48:03.654 I/System.out: receive-1: 3 2022-11-29 10:48:03.856 I/System.out: receive-2: 4 2022-11-29 10:48:04.059 I/System.out: receive-1: 5 2022-11-29 10:48:04.262 I/System.out: receive-2: 6 2022-11-29 10:48:04.466 I/System.out: receive-1: 7 2022-11-29 10:48:04.669 I/System.out: receive-2: 8 2022-11-29 10:48:04.871 I/System.out: receive-1: 9
|
反之也可以用 actor 来启动一个消费协程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| val sendChannel = CoroutineScope(Dispatchers.IO).actor<Int> { while (true) { println("receive: ${receive()}") } }
launch { repeat(10) { sendChannel.send(it) delay(200) } }
launch { repeat(10) { sendChannel.send(it * it) delay(200) } }
|
可以看出 produce 创建的是一个单生产者——多消费者
的模型,而 actor 创建的是一个单消费者--多生产者
的模型。
不过这些相关的 API 要不就是 ExperimentalCoroutinesApi 实验性标记的,要不就是 ObsoleteCoroutinesApi 废弃标记的,个人感觉暂时没必要使用它们。
Channel 是公平的
发送和接收操作是公平的
,它们遵守先进先出原则。官方也给了一个例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| data class Ball(var hits: Int)
fun main() = runBlocking { val table = Channel<Ball>() launch { player("ping", table) } launch { player("pong", table) } table.send(Ball(0)) delay(1000) coroutineContext.cancelChildren() }
suspend fun player(name: String, table: Channel<Ball>) { for (ball in table) { ball.hits++ println("$name $ball") delay(300) table.send(ball) } }
|
由于 ping
协程首先被启动,所以它首先接收到了球,接着即使 ping
协程在将球发送后会立即开始接收,但是球还是被 pong
协程接收了,因为它一直在等待着接收球:
1 2 3 4
| ping Ball(hits=1) pong Ball(hits=2) ping Ball(hits=3) pong Ball(hits=4)
|
带缓冲的 Channel
前面已经说过 Channel 实际上是一个队列,那它当然也存在一个缓存区以及缓存满后的策略(处理背压之类的问题),在创建 Channel 时可以指定两个相关的参数:
1 2 3 4 5
| public fun <E> Channel( capacity: Int = RENDEZVOUS, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, onUndeliveredElement: ((E) -> Unit)? = null ): Channel<E>
|
这里的 Channel() 其实并不是构造函数,而是一个顶层函数,它内部会根据不同的入参来创建不同类型的 Channel 实例。其参数含义如下:
- capacity: Channel 缓存区的容量,默认为
RENDEZVOUS = 0
- onBufferOverflow: 缓冲区满后发送端的处理策略,默认挂起。当消费者处理数据比生产者生产数据慢时,新生产的数据会存入缓存区,当缓存区满后,生产者再调用 send() 方法会挂起,等待消费者处理数据。
看个小栗子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| val channel = Channel<Int>(4)
launch { repeat(10) { channel.send(it) println("send: $it") delay(200) } }
launch { val channel = viewModel.channel for (i in channel) { println("receive: $i") delay(1000) } }
|
输出结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| 2022-11-28 17:16:47.905 I/System.out: send: 0 2022-11-28 17:16:47.907 I/System.out: receive: 0 2022-11-28 17:16:48.107 I/System.out: send: 1 2022-11-28 17:16:48.310 I/System.out: send: 2 2022-11-28 17:16:48.512 I/System.out: send: 3 2022-11-28 17:16:48.715 I/System.out: send: 4 2022-11-28 17:16:48.910 I/System.out: receive: 1 2022-11-28 17:16:48.916 I/System.out: send: 5 2022-11-28 17:16:49.913 I/System.out: receive: 2 2022-11-28 17:16:49.914 I/System.out: send: 6 2022-11-28 17:16:50.917 I/System.out: receive: 3 2022-11-28 17:16:50.917 I/System.out: send: 7 2022-11-28 17:16:51.920 I/System.out: receive: 4 2022-11-28 17:16:51.920 I/System.out: send: 8 2022-11-28 17:16:52.923 I/System.out: receive: 5 2022-11-28 17:16:52.923 I/System.out: send: 9 2022-11-28 17:16:53.925 I/System.out: receive: 6 2022-11-28 17:16:54.928 I/System.out: receive: 7 2022-11-28 17:16:55.932 I/System.out: receive: 8 2022-11-28 17:16:56.935 I/System.out: receive: 9
|
Channel 构造类型
这一节来简单看看 Channel 构造的几种类型,为防止内容过于枯燥,就不深入剖析一些源码细节了。
Channel 构造
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public fun <E> Channel( capacity: Int = RENDEZVOUS, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, onUndeliveredElement: ((E) -> Unit)? = null ): Channel<E> = when (capacity) { RENDEZVOUS -> { if (onBufferOverflow == BufferOverflow.SUSPEND) RendezvousChannel(onUndeliveredElement) else ArrayChannel(1, onBufferOverflow, onUndeliveredElement) } CONFLATED -> { require(onBufferOverflow == BufferOverflow.SUSPEND) { "CONFLATED capacity cannot be used with non-default onBufferOverflow" } ConflatedChannel(onUndeliveredElement) } UNLIMITED -> LinkedListChannel(onUndeliveredElement) BUFFERED -> ArrayChannel( if (onBufferOverflow == BufferOverflow.SUSPEND) CHANNEL_DEFAULT_CAPACITY else 1, onBufferOverflow, onUndeliveredElement ) else -> { if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST) ConflatedChannel(onUndeliveredElement) else ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement) } }
|
前面我们说了 Channel()
并不是构造函数,而是一个顶层函数,它内部会根据不同的入参来创建不同类型的 Channel 实例。我们看看入参可取的值:
1 2 3 4 5 6 7 8
| public const val UNLIMITED: Int = Int.MAX_VALUE public const val RENDEZVOUS: Int = 0 public const val CONFLATED: Int = -1 public const val BUFFERED: Int = -2
public enum class BufferOverflow { SUSPEND, DROP_OLDEST, DROP_LATEST }
|
其实光看这个构造的过程,以及两个入参的取值,我们基本上就能知道生成的这个 Channel 实例的表现了。
比如说 UNLIMITED 表示缓存区无限大的管道,它所创建的 Channel 叫 LinkedListChannel; 而 BUFFERED 或指定 capacity 大小的入参,创建的则是 ArrayChannel 实例,这也正是命名为 LinkedList(链表) 和 Array(数组) 的数据结构一个区别,前者可以视为无限大,后者有固定的容量大小。
比如说 SUSPEND
表示缓存区满后挂起, DROP_OLDEST
表示缓存区满后会删除缓存区里最旧的那个元素且把当前 send 的数据存入缓存区, DROP_LATEST
表示缓存区满后会删除缓存区里最新的那个元素且把当前 send 的数据存入缓存区。
Channel 类型
上面创建的这四种 Channel 都有一个共同的基类——AbstractChannel
,简单看看他们的继承关系:
在 AbstractSendChannel 中有个重要的成员变量:
1
| protected val queue = LockFreeLinkedListHead()
|
它是一个循环双向链表,形成了一个队列 queue 结构,send()
数据时存入链表尾部,receive()
数据时就从链表头第一个节点取。至于具体的挂起,恢复等流程,感兴趣的可以自己看看源码。
值得一提的是, queue 中的节点类型可以大体分为三种:
- Send
- Receive
- Closed: 当调用
Channel.close()
方法时,会往 queue 队列中加入 Closed 节点,这样当 send or receive
时就知道 Channel 已经关闭了。
另外,对于 ArrayChannel 管道,它有一个成员变量:
1
| private var buffer: Array<Any?> = arrayOfNulls<Any?>(min(capacity, 8)).apply { fill(EMPTY) }
|
这是一个数组类型,用来实现指定 capacity 的缓存区。但是它的初始大小不是 capacity, 主要是用来防止一些不必要的内存分配。
总结
Channel 类似于 BlockingQueue 阻塞队列,其不同之处是默认把阻塞行为换成了挂起,这也是协程的一大特性。它的思想是生产-消费模式(观察者模式)。
简单比较一下四种 Channel 类型:
- RendezvousChannel: 翻译成约会类型,缓存区大小为0,且指定为 SUSPEND 挂起策略。发送者和接收者一对一出现,接收者没出现,则发送者 send 会被挂起;发送者没出现,则接收者 receive 会被挂起。
- ConflatedChannel: 混合类型。发送者不会挂起,它只有一个 value 值,会被新的值覆盖掉;如果没有数据,则接收者会被挂起。
- LinkedListChannel: 不限缓存区大小的类型。发送者不会挂起,能一直往队列里存数据;队列无数据时接收者会被挂起。
- ArrayChannel: 指定缓存区大小的类型。当缓存区满时,发送者根据 BufferOverflow 策略来处理(是否挂起);当缓存区空时,接收者会被挂起。