概述 2019 年 Google I/O 大会上宣布今后将越来越优先采用 Kotlin 进行 Android 开发,Kotlin 是一种富有表现力且简洁的编程语言,不仅可以减少常见代码错误,还可以轻松集成到现有应用中。关于 Kotlin 的基础用法可参考: Kotlin 官方文档  和 Kotlin笔记系列 。
通常来说,协程(Coroutines)是轻量级的线程,它不需要从用户态切换到内核态,Coroutine是编译器级的,Process 和 Thread 是操作系统级的,协程没有直接和操作系统关联,但它也是跑在线程中的,可以是单线程,也可以是多线程。协程设计的初衷是为了解决并发问题,让协作式多任务实现起来更加方便,它可以有效地消除回调地狱。
在Kotlin中,协程是一套线程 API, 就像 Java 的 Executors 和 Android 的 Handler 等,是一套比较方便的线程框架,它能够在同一个代码块里进行多次的线程切换,可以用看起来同步的方式写出异步的代码,即非阻塞式挂起 。
协程的优势:避免回调地狱(也可以通过 RxJava 或者 CompletableFuture 实现):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 api.getAvatar(id) { avatar ->     api.getName(id) { name ->         show(getLabel(avatar, name))     } } coroutineScope.launch(Dispatchers.Main) {     val  avatar = async { api.getAvatar(id) }     val  name = async { api.getName(id) }     val  label = getLabelSuspend(avatar.await(), name.await())     show(label) } 
使用时需要先添加依赖:
1 2 3 4 5 6 def  kotlin_coroutines = `1.3 .9 `implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlin_coroutines"  implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:$kotlin_coroutines"  
启动协程可以使用下面几种方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 runBlocking { getName(id) } GlobalScope.launch { getName(id) } val  coroutineScope = CoroutineScope(context)coroutineScope.launch { getName(id) } val  id = coroutineScope.async { getName(id) }id.await() 
启动协程需要三样东西,分别是上下文(CoroutineContext)、启动模式(CoroutineStart)、协程体。
基本概念 suspend suspend 是挂起的意思,它挂起的对象是协程。
当线程执行到协程的 suspend 函数的时候,暂时不继续执行协程代码了,它会跳出协程的代码块,然后这个线程该干什么就去干什么。 
当协程执行到 suspend 函数的时候,这个协程会被「suspend」,也就是从当前线程被挂起。换句话说,就是这个协程从正在执行它的线程上脱离。线程的代码在到达 suspend 函数的时候被掐断,接下来协程会从这个 suspend 函数开始继续往下执行,不过是在这个 suspend 函数指定的线程里执行。 
 
紧接着在 suspend 函数执行完成之后,协程会自动帮我们把线程再切回来,然后接着执行协程后面的代码(resume),resume 功能是协程特有的,所以 suspend 函数必须在 协程  或者 另一个suspend函数  里被调用。
suspend 的非阻塞式指的是它能用看起来阻塞的代码写出非阻塞的操作,单协程可以是非阻塞式的,因为它可以用 suspend 函数来切换线程,本质上还是多线程,只不过写法上连续两行代码看起来是阻塞式的。
CoroutineScope 1 2 3 public  interface  CoroutineScope      public  val  coroutineContext: CoroutineContext } 
GlobeScope 
GlobeScope 启动的协程是一个单独的作用域,不会继承上层协程的作用域,其内部的子协程遵守默认的作用域规则。
coroutineScope 
coroutineScope 启动的协程 cancel 时会 cancel 所有子协程,也会 cancel 父协程,子协程未捕获的异常也会向上传递给父协程。
supervisorScope 
supervisorScope 启动的协程 cancel 和传递异常时,只会由父协程向子协程单向传播。MainScope 是 supervisorScope 作用域。
CoroutineContext Job, CoroutineDispatcher, ContinuationInterceptor 等都是 CoroutineContext 的子类,即它们都是协程上下文。CoroutineContext 中有一个重载了(+)操作符的plus方法,可以将 Job 和 CoroutineDispatcher 等元素集合起来,代表一个协程的场景。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public  interface  CoroutineContext           public  operator  fun  <E : Element>  get (key: Key <E >)      public  fun  <R>  fold (initial: R , operation: (R , Element ) -> R )           public  operator  fun  plus (context: CoroutineContext )      public  fun  minusKey (key: Key <*>)      public  interface  Key <E : Element >     public  interface  Element  : CoroutineContext {               } } public  interface  ContinuationInterceptor  : CoroutineContext.Element {      } public  abstract  class  CoroutineDispatcher  :    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {      } 
CoroutineStart 几种启动模式如下:
1 2 3 4 5 6 public  enum  class  CoroutineStart      DEFAULT,     LAZY,     ATOMIC,     UNDISPATCHED;  } 
DEFAULT 
DEFAULT是饿汉式启动,launch调用后,会立即进入待调度状态,一旦调度器OK就可以开始执行。
LAZY 
LAZY是懒汉式启动,launch后并不会有任何调度行为,协程体也不会进入执行状态,直到需要它执行(调用job.start/join等)的时候才会执行。
ATOMIC 
@ExperimentalCoroutinesApi. This is similar to [DEFAULT], but the coroutine cannot be cancelled before it starts executing.
UNDISPATCHED 
@ExperimentalCoroutinesApi. This is similar to [ATOMIC] in the sense that coroutine starts executing even if it was already cancelled, but the difference is that it starts executing in the same thread.
Dispatchers 协程调度器是用来指定协程体在哪个线程中执行,Kotlin提供了几个调度器:
Default 
默认选项,指定协程体在线程池中执行:
1 2 3 4 5 6 7 8 9 10 11 12 GlobalScope.launch(Dispatchers.Default) {     println("1: ${Thread.currentThread().name} " )     launch(Dispatchers.Default) {         println("2: ${Thread.currentThread().name} " )     }     println("3: ${Thread.currentThread().name} " ) } -->output 1 : DefaultDispatcher-worker-1 3 : DefaultDispatcher-worker-1 2 : DefaultDispatcher-worker-2 
Main 
指定协程体在主线程中执行。
Unconfined 
协程体运行在父协程所在的线程:
1 2 3 4 5 6 7 8 9 10 11 12 GlobalScope.launch(Dispatchers.Default) {     println("1: ${Thread.currentThread().name} " )     launch(Dispatchers.Unconfined) {         println("2: ${Thread.currentThread().name} " )     }     println("3: ${Thread.currentThread().name} " ) } -->output 1 : DefaultDispatcher-worker-1 2 : DefaultDispatcher-worker-1 3 : DefaultDispatcher-worker-1 
IO 
基于 Default 调度器背后的线程池(designed for offloading blocking IO tasks),因此从 Default 切换到 IO 不会触发线程切换:
1 2 3 4 5 6 7 8 9 10 11 12 GlobalScope.launch(Dispatchers.Default) {     println("1: ${Thread.currentThread().name} " )     launch(Dispatchers.IO) {         println("2: ${Thread.currentThread().name} " )     }     println("3: ${Thread.currentThread().name} " ) } -->output 1 : DefaultDispatcher-worker-1 3 : DefaultDispatcher-worker-1 2 : DefaultDispatcher-worker-1 
Job&Deferred Job 可以取消并且有简单生命周期,它有三种状态:
State 
isActive 
isCompleted 
isCancelled 
 
 
New (optional initial state) 
false 
false 
false 
 
Active (default initial state) 
true 
false 
false 
 
Completing (optional transient state) 
true 
false 
false 
 
Cancelling (optional transient state) 
false 
false 
true 
 
Cancelled (final state) 
false 
true 
true 
 
Completed (final state) 
false 
true 
false 
 
Job 完成时是没有返回值的,如果需要返回值的话,应该使用 Deferred,它是 Job 的子类: public interface Deferred<out T> : Job。
launch 方法返回一个Job类型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public  interface  Job  : CoroutineContext.Element {     public  companion  object  Key : CoroutineContext.Key<Job> {              }     public  val  isActive: Boolean      public  val  isCompleted: Boolean      public  val  isCancelled: Boolean      public  fun  start () Boolean      public  fun  cancel () Unit  = cancel(null )     public  suspend  fun  join ()       } 
async 方法返回一个 Deferred 类型:
1 2 3 4 5 public  interface  Deferred <out T > : Job {          public  suspend  fun  await ()       } 
示例:
1 2 3 4 5 6 7 fun  main ()     val  job = async {         delay(500 )         "Hello"      }     println("${job.await()} , World" ) } 
Android-Kotlin协程使用 MainScope Android 中一般不建议使用 GlobalScope, 因为它会创建一个顶层协程,需要保持所有对 GlobalScope 启动的协程的引用,然后在 Activity destory 等场景的时候 cancel 掉这些的协程,否则就会造成内存泄露等问题。可以使用 MainScope:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class  CoroutineActivity  : AppCompatActivity     private  val  mainScope = MainScope()     fun  request1 ()          mainScope.launch {                      }     }          override  fun  onDestroy ()          super .onDestroy()         mainScope.cancel()     } } 
MainScope 的定义:
1 public  fun  MainScope () 
Lifecycle协程 关于 Lifecycle 可以参考 Android-Jetpack组件之Lifecycle 。
添加依赖:
1 implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0"  
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 val  LifecycleOwner.lifecycleScope: LifecycleCoroutineScope    get () = lifecycle.coroutineScope val  Lifecycle.coroutineScope: LifecycleCoroutineScope    get () {         while  (true ) {             val  existing = mInternalScopeRef.get () as  LifecycleCoroutineScopeImpl?             if  (existing != null ) {                 return  existing             }             val  newScope = LifecycleCoroutineScopeImpl(                 this ,                 SupervisorJob() + Dispatchers.Main.immediate             )             if  (mInternalScopeRef.compareAndSet(null , newScope)) {                 newScope.register()                 return  newScope             }         }     } abstract  class  LifecycleCoroutineScope  internal  constructor     internal  abstract  val  lifecycle: Lifecycle          fun  launchWhenCreated (block: suspend  CoroutineScope .() -> Unit )          lifecycle.whenCreated(block)     }          fun  launchWhenStarted (block: suspend  CoroutineScope .() -> Unit )          lifecycle.whenStarted(block)     }          fun  launchWhenResumed (block: suspend  CoroutineScope .() -> Unit )          lifecycle.whenResumed(block)     } } 
使用:
1 2 3 4 5 6 7 8 9 class  MainActivity  : AppCompatActivity     fun  test ()          lifecycleScope.launchWhenCreated {                      }     } } 
LiveData协程 关于 LiveData 可以参考 Android-Jetpack组件之LiveData-ViewModel 。
添加依赖:
1 implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0"  
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 fun  <T>  liveData (     context: CoroutineContext  = EmptyCoroutineContext,     timeoutInMs: Long  = DEFAULT_TIMEOUT,     @BuilderInference  block: suspend  LiveDataScope <T >.() -> Unit  ) internal  class  CoroutineLiveData <T >    context: CoroutineContext = EmptyCoroutineContext,     timeoutInMs: Long  = DEFAULT_TIMEOUT,     block: Block<T> ) : MediatorLiveData<T>() {     private  var  blockRunner: BlockRunner<T>?     private  var  emittedSource: EmittedSource? = null      init  {         val  supervisorJob = SupervisorJob(context[Job])         val  scope = CoroutineScope(Dispatchers.Main.immediate + context + supervisorJob)         blockRunner = BlockRunner(             liveData = this ,             block = block,             timeoutInMs = timeoutInMs,             scope = scope         ) {             blockRunner = null          }     }     internal  suspend  fun  emitSource (source: LiveData <T >)          clearSource()         val  newSource = addDisposableSource(source)         emittedSource = newSource         return  newSource     }     internal  suspend  fun  clearSource ()          emittedSource?.disposeNow()         emittedSource = null      }          override  fun  onActive ()          super .onActive()         blockRunner?.maybeRun()     }          override  fun  onInactive ()          super .onInactive()         blockRunner?.cancel()     } } 
使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class  MainActivity  : AppCompatActivity     fun  test ()          liveData {             try  {                                  emit("success" )             } catch (e: Exception) {                 emit("error" )             }         }.observe(this , Observer {             Log.d("LLL" , it)         })     } } 
ViewModel协程 关于 ViewModel 可以参考 Android-Jetpack组件之LiveData-ViewModel 。
添加依赖:
1 implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.2.0"  
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 val  ViewModel.viewModelScope: CoroutineScope        get () {             val  scope: CoroutineScope? = this .getTag(JOB_KEY)             if  (scope != null ) {                 return  scope             }             return  setTagIfAbsent(JOB_KEY,                 CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main.immediate))         } internal  class  CloseableCoroutineScope     override  val  coroutineContext: CoroutineContext = context     override  fun  close ()          coroutineContext.cancel()     } } 
协程的并发 启动一百个协程,它们都做一千次相同的操作,同时会测量它们的完成时间以便进一步的比较:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 suspend  fun  massiveRun (action: suspend  () -> Unit )     val  n = 100        val  k = 1000       val  time = measureTimeMillis {         coroutineScope {              repeat(n) {                 launch {                     repeat(k) { action() }                 }             }         }     }     println("Completed ${n * k}  actions in $time  ms" )     } var  counter = 0 fun  main ()     withContext(Dispatchers.Default) {         massiveRun {             counter++         }     }     println("Counter = $counter " ) } Completed 100000  actions in  55  ms Counter = 92805  
因为一百个协程在多个线程中同时递增计数器但没有做并发处理,所以不太可能输出 100000 。使用 volatile 并不能解决这个问题,因为 volatile 不能保证原子性,只有内存可见性。可以用以下等方式解决:
使用原子类 AtomicInteger 进行递增 
以细粒度限制线程:对特定共享状态的所有访问权都限制在单个线程中 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 val  counterContext = newSingleThreadContext("CounterContext" )var  counter = 0 fun  main ()     withContext(Dispatchers.Default) {         massiveRun {                          withContext(counterContext) {                 counter++             }         }     }     println("Counter = $counter " ) } Completed 100000  actions in  1652  ms Counter = 100000  
以粗粒度限制线程:线程限制在大段代码中执行 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 val  counterContext = newSingleThreadContext("CounterContext" )var  counter = 0 fun  main ()          withContext(counterContext) {         massiveRun {             counter++         }     }     println("Counter = $counter " ) } Completed 100000  actions in  40  ms Counter = 100000  
互斥:除了Java已有的一些互斥方式如 Lock 等之外,Kotlin 中提供了 Mutex 类,它具有 lock 和 unlock 方法,lock 是一个挂起函数,它不会阻塞线程。可以使用 withLock 扩展函数替代常用的 mutex.lock(); try { …… } finally { mutex.unlock() } 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 val  mutex = Mutex()var  counter = 0 fun  main ()     withContext(Dispatchers.Default) {         massiveRun {                          mutex.withLock {                 counter++             }         }     }     println("Counter = $counter " ) } Completed 100000  actions in  640  ms Counter = 100000  
协程Flow 更多用法参考: Flow 。
flow创建 flow 构造器中的代码会直到 flow 被 collect 的时候才会运行,且 collect 方法是 suspend 函数,会挂起当前协程。创建 flow 有几种方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 flowOf(1 , 2 , 3 , 4 , 5 ).onEach { delay(100 ) }.collect{ println(it) } listOf(1 , 2 , 3 , 4 , 5 ).asFlow().onEach { delay(100 ) }.collect { println(it) } fun  simple () Int > = flow {    for  (i in  1 ..3 ) {         delay(500 )          emit(i)      } } channelFlow {     for  (i in  1 ..5 ) {         delay(100 )         send(i)     } }.collect { println(it) } 
flow 是 Cold Stream, 在没有切换线程的情况下,生产者和消费者是同步非阻塞的。channel 是 Hot Stream, channelFlow 实现了生产者和消费者异步非阻塞模型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 runBlocking {     val  time = measureTimeMillis {         flow {             for  (i in  1 ..5 ) {                 delay(100 )                 emit(i)             }         }.collect {             delay(100 )             println(it)         }     }     println("cost $time " )  } runBlocking {     val  time = measureTimeMillis {         channelFlow {             for  (i in  1 ..5 ) {                 delay(100 )                 send(i)             }         }.collect {             delay(100 )             println(it)         }     }     println("cost $time " )  } 
这两者运行逻辑如下:
当然如果使用 flowOn 切换线程或者使用 buffer() 的话,得到的时间也差不多是 700 ms。
切换线程 流的 collect 总是在调用协程的上下文中发生,使用 flowOn 可以切换流 emit 的上下文(包括切换线程,不要使用 withContext() 来切换 flow 的线程),上游都会受到影响:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 fun  simple () Int > = flow {    for  (i in  1 ..3 ) {         Thread.sleep(100 )          log("Emitting $i " )         emit(i)      } }.flowOn(Dispatchers.Default)  fun  main () Unit > {    simple().collect { value ->         log("Collected $value " )      }  } [DefaultDispatcher-worker-1  @coroutine #2 ] Emitting 1  [main @coroutine #1 ] Collected 1  [DefaultDispatcher-worker-1  @coroutine #2 ] Emitting 2  [main @coroutine #1 ] Collected 2  [DefaultDispatcher-worker-1  @coroutine #2 ] Emitting 3  [main @coroutine #1 ] Collected 3  
可以在流上使用 buffer 操作符来并发运行流中发射和收集的代码(注意,当更改 CoroutineDispatcher 时,flowOn 操作符使用了相同的缓冲机制,但是在这里显式地请求了缓冲而不改变执行上下文):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 fun  simple () Int > = flow {    for  (i in  1 ..3 ) {         delay(100 )          emit(i)      } } fun  main () Unit > {     val  time = measureTimeMillis {         simple().collect { value ->              delay(300 )              println(value)          }      }        println("Collected in $time  ms" ) } 1 2 3 Collected in  1217  ms val  time = measureTimeMillis {    simple()         .buffer()          .collect { value ->              delay(300 )              println(value)          }  } println("Collected in $time  ms" ) 1 2 3 Collected in  1043  ms 
flow取消 如果 flow 是在一个挂起函数内被挂起了,那么 flow 是可以被取消的,否则不能取消。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 fun  simple () Int > = flow {     for  (i in  1 ..3 ) {         delay(100 )                   println("Emitting $i " )         emit(i)     } } fun  main () Unit > {    withTimeoutOrNull(250 ) {          simple().collect { value -> println(value) }      }     println("Done" ) } 
操作符 操作符: map, filter, transform, take 等:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 suspend  fun  performRequest (request: Int )     delay(1000 )      return  "response $request "  } fun  main () Unit > {    (1 ..3 ).asFlow()          .map { request -> performRequest(request) }         .collect { response -> println(response) } } response 1  response 2  response 3  
由于 collect 是 suspend 函数,后续操作会等待它执行完毕后才会接着执行,因此可以使用 launchIn 配合 onEach 完成这个任务, launchIn 的参数是 CoroutineScope, 指定了用哪一个协程来启动流的收集:
1 2 3 4 5 6 7 8 9 fun  events () Int > = (1 ..3 ).asFlow().onEach { delay(100 ) }fun  main () Unit > {    events()         .onEach { event -> println("Event: $event " ) }         .launchIn(this )      println("Done" ) } 
异常处理 flow 可以使用传统的 try-catch 处理异常,另外还可以使用 catch 操作符捕获来自上游的异常,而 onCompletion 不能捕获异常,只能用于判断是否有异常:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 fun  main ()     flow {         emit(1 )         throw  RuntimeException()     }     .onCompletion { cause ->         if  (cause != null )             println("Flow completed exceptionally" )         else              println("Done" )     }     .catch { println("catch exception" ) }     .collect { println(it) } } 1 Flow completed exceptionally catch  exception
上面的代码如果把 onCompletion 和 catch 交换一下位置,则 catch 操作符捕获到异常后,不会影响到下游,因此 onCompletion 操作符不会走异常分支。
但 catch 只是中间操作符,它不能捕获下游的异常,对于下游如 collect 中的异常,除了使用 try-catch 去处理外,还可以借助 onEach 操作符,把业务逻辑放到 onEach 操作符内:
1 2 3 4 5 6 7 8 9 10 fun  main () Unit > {    flow {          ......     }     .onEach {           ......     }    .catch  { ... }    .collect()  } 
flow 可以使用 retry 和 retryWhen 操作符重试指定次数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 runBlocking {     (1 ..5 ).asFlow()         .onEach {             if  (it == 3 ) throw  RuntimeException("Error on $it " )         }         .retry(2 ) {             if  (it is  RuntimeException) {                 return @retry  true              }             false          }         .onEach { println("Emitting $it " ) }         .catch  { it.printStackTrace() }         .collect() } Emitting 1  Emitting 2  Emitting 1  Emitting 2  Emitting 1  Emitting 2  java.lang.RuntimeException: Error on 3  
flow 可以使用操作符来监听其生命周期:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 runBlocking {     (1 ..5 ).asFlow()         .onEach { if  (it == 3 ) throw  RuntimeException("Error on $it " ) }         .onEach { println("OnEach $it " ) }         .onStart { println("Flow starting" ) }         .onCompletion { println("Flow completed" ) }         .catch  { println("Exception : ${it.message} " ) }         .collect() } Flow starting OnEach 1  OnEach 2  Flow completed Exception : Error on 3  
总结 
Flow 以挂起的方式执行,它是非阻塞的,只有遇到末端操作符,才会触发所有操作的执行; 
map, filter, take, zip 等是中间操作符,collect, collectLatest, single, reduce, toList 等是末端操作符; 
遇到中间操作符,并不会执行任何操作,这些操作符构建了一个待执行的调用链,末端操作符是可挂起函数,遇到末端操作符会触发所有操作的执行。 
 
协程Channel 
Channel 是非阻塞的,它用于发送方 (SendChannel) 和接收方 (ReceiveChannel) 之间通信,Channel 实现了 SendChannel 和 ReceiveChannel 接口,所以既可以发送数据又可以接受数据; 
Channel 和 Java 中的 BlockingQueue 类似,不同之处在于 BlockingQueue 是阻塞的,而 Channel 是挂起的; 
发送方 (SendChannel) 和 接收方 (ReceiveChannel) 之间有一个通道,也就是缓冲区,发送方和接收方之间通过缓冲区进行同步,发送方将数据发送到缓冲区,通过接收方从缓冲区获取数据; 
缓冲区的作用帮我们同步发送方和接收方发送和接受的数据,也就意味着多个协程可以向同一个 channel 发送数据, 一个 channel 的数据也可以被多个协程接收。 
 
send() 和 offer() 的区别:
send: suspend 函数,缓冲区没有满则立即添加元素,缓冲区已满则调用者被挂起; 
offer: 缓冲区没有满则立即添加元素,添加成功会返回 true, 失败会返回 false; 
 
receive() 和 poll() 的区别:
receive: suspend 函数,异步获取元素,如果缓冲区为空则调用者被挂起,直到一个新值被发送到缓冲区; 
poll: 用于同步获取一个元素,如果缓冲区是空的,则返回 null; 
 
Flow 与 Channel 的区别:
Flow: 中间操作符 (map , filter 等等) 会构建了一个待执行的调用链,只有遇到末端操作符 (collect , toList 等等) 才会触发所有操作的执行,所以 Flow 也被称为冷数据流; 
Channel: 发送方 (SendChannel) 发送数据,并不依赖于接受方(ReceiveChannel),所以 Channel 也被称为热数据流; 
 
Channel类型 
RendezvousChannel: 默认类型,大小为 0 的缓冲区,只有当 send() 方法和 receive() 方法都调用的时候,元素才会从发送方传输到接收方,否则将会被挂起 
LinkedListChannel: 通过 Channel.Factory.CONFLATED 会创建一个容量无限的缓冲区 (受限于内存的大小) ,send() 方法远不会挂起,offer() 方法始终返回 true 
ConflatedChannel: 最多缓冲一个元素,新元素会覆盖掉旧元素,只会接收最后发送的元素,之前的元素都会丢失,send() 方法永远不会挂起,offer() 方法始终返回 true 
ArrayChannel: 通过 Channel.Factory.BUFFERED 或者 指定大小 会创建一个固定容量的数组缓冲区,send() 方法仅在缓冲区满时挂起,receive() 方法仅在缓冲区为空时挂起 
 
Channel 在概念上有点类似 BlockingQueue, 元素从一端被加入, 从另一端被消费。区别在于, 读写的方法不是 blocking 的, 而是 suspending 的,在为空或为满时. channel 可以 suspend 它的 send 和 receive 操作。
Kotlin 库中定义了多种 Channel 类型,主要区别在于内部可以存储的元素数量及send是否可以被挂起 ,至于 receive 方法都是同样的动作:如果channel不为空, 接收一个元素, 否则挂起。
1 2 3 4 val  rendezvousChannel = Channel<String>() val  bufferedChannel = Channel<String>(10 ) val  conflatedChannel = Channel<String>(CONFLATED) val  unlimitedChannel = Channel<String>(UNLIMITED) 
看一个示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 val  channel = Channel<Int >(6 )suspend  fun  receiveData ()     coroutineScope {         while  (!channel.isClosedForReceive) {             println("${Thread.currentThread().name}  receive: ${channel.receive()} " )         }     } } suspend  fun  sendData ()     coroutineScope {         if  (!channel.isClosedForSend) {             (1 ..10 ).forEach {                 println("${Thread.currentThread().name}  send: $it " )                 channel.send(it)             }         }     } } fun  main ()     val  sendJob = GlobalScope.launch {         sendData()     }     val  receiveJob = GlobalScope.launch {         receiveData()     }     joinAll(sendJob, receiveJob) }