Coroutines internals

Based on my research, might be incorrect

part 1: suspend functions to continuation passing style,

also add how to convert suspend to other libs and other libs to coroutines starting coroutines in part 2 as coroutine context covered there part 2: threading, coroutine context and context elements plain start of coroutines cancelling part 3: coroutine scopes and structured concurrency part 4: channel -> flow -> shared flow -> state flow

Suspending functions actually mimic continuation passing style but without the lambdas.

  1. How we write blocking code:
fun postItem(item: Item){
  val someVariable = someNonSuspendingFunc(item)
  val token = requestToken()
  val post = createPost(token, item)
  processPost(post, someVariable)
}
  1. How we write non-blocking code with callbacks
fun postItem(item: Item){
  val someVariable = someNonSuspendingFunc(item)
  requestToken { token ->
      createPost(token, item) { post ->
          processPost(post, someVariable)
      }
  }
}

Kotlin’s coroutines way of doing things

suspend fun postItem(item: Item){
  val someVariable = someNonSuspendingFunc(item)
  val token = requestToken() //requestToken is suspending
  val post = createPost(token, item)  //createPost is suspending
  processPost(post, someVariable) //processPost suspending or not does not matter
}

How this code is transformed into callback like implementation using state machines:

  1. identifying suspension points: suspension points are where other suspending functions are called and write them into a when statement. We keep the label in state object
suspend fun postItem(item: Item){
    val sm = object: CoroutineImpl { ... }
    when(sm.label){
      0 -> {
        val someVariable = someNonSuspendingFunc(item)
        val token = requestToken()
      }
      1 -> {
        val post = createPost(token, item)
      }
      2 -> {
        processPost(post, someVariable)
      }
    }
}
  1. Right now if we execute this function with say sm.label initialized with 0, then only the first block of when will run. What about the other ones? So actually every suspending function have an additional parameter of type continuation. And the suspending function like requestToken, createPost etc. call the callback in the continuation when they’re done So continuation is an interface like this
interface Continuation<in T> {
    val context: CoroutineContext //this will come later
    fun resume(value: T)
    fun resumeWithException(exception: Throwable)
}

And the code becomes like this

suspend fun postItem(item: Item, cont: Continuation) {
  //if cont is of type CoroutineImplFoPostItem, means it is the same which was constructed from previous postItem call. So we use it to restore state(restoration in next point)
  val sm = cont as? CoroutineImplFoPostItem ?:  object : CoroutineImplFoPostItem { ...
    //resume will be something like this
    override fun resume(..){
      postItem(null, this)
    }
  }
  when (sm.label) {
    0 -> {
      val someVariable = someNonSuspendingFunc(item)
      val token = requestToken(sm) //sm passed as continuation, see above its resume function which will be called in requestToken after its done
    }
    1 -> {
      val post = createPost(token, item, sm)
    }
    2 -> {
      processPost(post, someVariable)
    }
  }
}
  1. Now restoration of state
suspend fun postItem(item: Item, cont: Continuation) {
  val sm = cont as? CoroutineImplFoPostItem ?:  object : CoroutineImplFoPostItem { ...
    override fun resume(..){
      postItem(null, this)
    }
  }
  when (sm.label) {
    0 -> {
      sm.item = item
      sm.label = 1
      val someVariable = someNonSuspendingFunc(item)
      val token = requestToken(sm)
    }
    1 -> {
      val item = sm.item
      val token = sm.result as Token
      sm.label = 2
      val post = createPost(token, item, sm)
    }
    2 -> {
      processPost(post, someVariable)
    }
  }
}

Converting callbacks from other libraries to coroutines at the lowest level

We have suspendCoroutines which provides us the continuation of enclosing suspending function and we can resume the enclosing suspending function with this continuation

//retrofit call
suspend fun <T> Call<T>.await(): T = suspendCoroutines { continuation: Continuation<T> ->
  enqueue(object: Callback<T>) {
    override fun onResponse(call: Call<T>, response: Response<T>){
        if(response.isSuccessful){
          cont.resume(response.body())
        } else {
          cont.resumeWithException(ErrorResponse(response))
        }
    }
    override fun onResponse(call: Call<T>, throwable: Throwable){
      cont.resumeWithException(throwable)
    } 
  }
}

Coroutine context and changing threads

  • the continuation object also have coroutine context which is a map of values Why it is needed: one such requirement is changing threads So there’s one type of element of that coroutine context map is(renamed to ContinuationInterceptor)
interface CoroutineInterceptor {
    companions object Key: CoroutineContext.Key<CoroutineInterceptor>

    fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
}

it allows us to install custom wrappers over continuations

  • So to change threads there’s one implementation of CoroutineInterceptor
class DispatchedContinuation<in T> {
  val dispatcher: CoroutineDispacher,//what is this? will come later
  val continuation: Continuation<T> 
}{
  override fun resume(value: T){
    //move the continuation to another thread
    dispatcher.dispatch(context, DispatchedTask(..))
  }
}

Starting corotuines

  • one simplest way is this
suspend fun doSomething(){
    var a = 10
    var b = 20
    delay(200)
    println("b:$b")
}
fun main() {
    val block: suspend () -> Unit = ::doSomething
    block.startCoroutine(object: Continuation<Unit> {
        override val context: CoroutineContext = Dispatchers.Default
        override fun resumeWith(result: Result<Unit>) {
            println("result:$result")
        }
    })
    Thread.sleep(2000)
}

This code prints this

b:20
result:Success(kotlin.Unit)
  • others will come later

Cancelling coroutines with jobs

fun main() {
    val block: suspend () -> Unit = ::doSomething
    val job = Job()
    block.startCoroutine(object: Continuation<Unit> {
        override val context: CoroutineContext = Dispatchers.Default + job

        override fun resumeWith(result: Result<Unit>) {
            println("result:$result")
        }
    })
    job.cancel()
    Thread.sleep(2000)
}

This code prints

result:Failure(kotlinx.coroutines.JobCancellationException: Job was cancelled; job=JobImpl{Cancelled}@1610bb6a)
  • But if we modify doSomething function like this
suspend fun doSomething(){
    var a = 10
    var b = 20
    Thread.sleep(200)
    println("b:$b")
}

It will print

b:20
result:Success(kotlin.Unit)
  • So coroutines are cooperative, they have to manually yield control and also manually handle cancel situation.
  • So to check if outside code cancelled the coroutine we can use the isActive construct inside coroutines
  • In suspend blocks we can also do things like job.join()
  • The job is also an element of coroutine context so we can do things like
suspend fun one(){
  val job = coroutineContext[Job]!!
  //similarily
  val interceptor = coroutineContext[CoroutineInterceptor]!!
}
//or
suspend fun two(){
  val job = coroutineContext.job
}
  • To cancel other library code inside coroutines we can do something like this
suspend fun <T> Call<T>.await(): T = suspendCancellableCoroutines { cont: CancellableContinuation<T> ->
  enqueue(..)
  cont.invokeOnCompletion {
    this@ await.cancel()
  }
}

Coroutines scope

= launch with without parent contxt

channels and flow