
An introduction to Kotlin coroutines: from zero to many


In the last post, I introduced how to create and use Kotlin coroutines and how they collaborate. This post covers more usage scenarios and takes you further into the world of coroutines.

Handling asynchronous data streams with coroutines

Most programming languages have built-in ways to represent a collection of objects of the same type, often called container classes, and different container classes suit different scenarios. In the same spirit, Kotlin introduced Flow to represent a stream of values that are computed asynchronously.

Flow

An asynchronous data flow is essentially a way of obtaining data asynchronously, and Kotlin offers several ways to create one. The most common are the asFlow extension from the kotlinx.coroutines library and the flow builder. The former simply wraps an ordinary collection as a Flow, nothing more, so let's focus on the latter.
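To make the distinction concrete, here is a minimal sketch, assuming kotlinx.coroutines 1.6 or later is on the classpath; the function names fromList, fromValues, and fromBuilder are hypothetical. asFlow (and the related flowOf) just wrap values that already exist, whereas the body of the flow builder can suspend between emissions.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// asFlow(): wraps an existing collection; no asynchronous work involved
fun fromList(): Flow<Int> = listOf(1, 2, 3).asFlow()

// flowOf(): the same idea for a fixed set of values
fun fromValues(): Flow<Int> = flowOf(1, 2, 3)

// flow { }: the body may suspend between emissions
fun fromBuilder(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(10) // stand-in for real asynchronous work
        emit(i)
    }
}

fun main() = runBlocking {
    println(fromList().toList())    // [1, 2, 3]
    println(fromValues().toList())  // [1, 2, 3]
    println(fromBuilder().toList()) // [1, 2, 3]
}
```

All three flows produce the same values; only the flow builder gives you a place to do suspending work before each emit.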

The flow builder's job is to create an asynchronous data flow. It is a generic function whose single parameter is a suspending lambda with a FlowCollector as its receiver. FlowCollector has only one method, emit, which hands asynchronously computed values to the Flow being built. Because the lambda is suspending, you can call other suspend functions inside it to compute values asynchronously, send each result out with emit, and so keep providing a steady stream of data to downstream operations.

That's not all. The steps above only specify how the data is produced; they don't actually run anything. In other words, the road is built, but there are no cars on it yet. So how do we get cars on the road? Looking at the Flow interface, you'll see that it provides a collect method for processing the data. collect takes a suspending function as its processing logic, and collect is itself a suspend function, so it can only be called from a coroutine or from another suspend function. With this knowledge, we can write the simplest asynchronous data flow.

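import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch

// Simulates an asynchronous computation
suspend fun compute(): Int {
    delay(123)
    return 1024
}

// Inside a ViewModel:
viewModelScope.launch {
    // Describes how values are produced; nothing runs yet
    val flow = flow<Int> {
        emit(9527)
        emit(compute()) // suspend functions can be called here
        delay(256)
        emit(256)
    }
    // Collecting runs the builder and prints 9527, 1024, 256
    flow.collect {
        println(it)
    }
}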
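The "road without cars" idea can be observed directly: a flow created with the flow builder is cold, so its body runs only when a terminal operator such as collect is called, and it runs again from the beginning for each new collection. The following is a minimal sketch, assuming kotlinx.coroutines 1.6 or later; collectTwice and the builderRuns counter are hypothetical names added only to make the executions visible.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Returns (builder runs before collecting, builder runs after two collections)
fun collectTwice(): Pair<Int, Int> = runBlocking {
    var builderRuns = 0
    val numbers = flow<Int> {
        builderRuns++ // executed once per collection, not at creation
        emit(1)
        emit(2)
    }
    val before = builderRuns        // flow created, body not run yet
    numbers.collect { println(it) } // first collection runs the body
    numbers.collect { println(it) } // second collection runs it again
    before to builderRuns
}

fun main() {
    println(collectTwice())
}
```

collectTwice returns (0, 2): nothing ran when the flow was created, and each collect started the body afresh, printing 1 and 2 both times.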

Feel free to perform all sorts of operations inside the flow builder and emit results whenever they are ready, but note that emit must be called from the coroutine that runs the builder; calling it from another coroutine or context is not allowed. At first glance, this looks no different from computing the values and processing them in one place, but Flow can do much more.
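This rule is checked at runtime. In the minimal sketch below (assuming kotlinx.coroutines 1.6 or later; emitFromOtherContext is a hypothetical name), the builder tries to call emit from inside withContext, and collection fails with an IllegalStateException whose message starts with "Flow invariant is violated".

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Tries to emit from a different coroutine context than the collector's
fun emitFromOtherContext(): String = runBlocking {
    try {
        flow<Int> {
            withContext(Dispatchers.Default) {
                emit(1) // violates Flow's context-preservation rule
            }
        }.collect { println(it) }
        "no error"
    } catch (e: IllegalStateException) {
        e.message.orEmpty() // the runtime check reports the violation here
    }
}

fun main() {
    println(emitFromOtherContext())
}
```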

It's time to change Flow's working environment

Go back to the simple example from the previous section: if we replaced the data-fetching code in the builder with a blocking network request, the app would freeze, because both the builder and collect run on the main thread. Readers of the previous article will probably reach for withContext to switch threads inside the builder. The reasoning seems sound: by default, the flow builder runs in the same coroutine context as the code that calls collect, so if the main thread is off limits, why not just switch the builder's context? In practice, it doesn't work. Emitting values from inside withContext breaks Flow's context-preservation rule, and collection fails at runtime with an IllegalStateException. Instead, Flow provides a dedicated flowOn operator for changing the context the builder runs in. Calling flowOn once on the created Flow is enough: it applies to the builder and to every operator upstream of it, while collect keeps running in the collector's own context.

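import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch

// decode() and adapter are defined elsewhere in the app
val images = listOf("1.jpg", "2.jpg").asFlow()
    .map { decode(it) }       // runs on Dispatchers.IO
    .flowOn(Dispatchers.IO)   // applies to the operators above it
viewModelScope.launch {
    images.collect {          // runs on the main thread
        adapter.add(it)
    }
}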
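The example above depends on an Android ViewModel and on the app's own decode() and adapter. As a standalone sketch that runs on a plain JVM, assuming kotlinx.coroutines 1.6 or later, the code below swaps viewModelScope for runBlocking and records which thread each stage runs on; threadsUsed is a hypothetical name, and removeSuffix merely stands in for decode(). With flowOn(Dispatchers.IO), the builder and the map above it run on a worker thread from the shared Default/IO pool, while collect stays on the thread that called runBlocking.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Returns (thread used upstream of flowOn, thread used by the collector)
fun threadsUsed(): Pair<String, String> = runBlocking {
    var upstreamThread = ""
    var collectorThread = ""
    flow<String> { emit("1.jpg") }
        .map { name ->
            upstreamThread = Thread.currentThread().name // upstream of flowOn
            name.removeSuffix(".jpg")                    // stand-in for decode()
        }
        .flowOn(Dispatchers.IO) // applies to the builder and map above
        .collect {
            collectorThread = Thread.currentThread().name // caller's thread
        }
    upstreamThread to collectorThread
}

fun main() {
    val (upstream, collector) = threadsUsed()
    println("upstream ran on $upstream, collect ran on $collector")
}
```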

Intermediate processing logic

Readers familiar with RxJava may point out that RxJava can already do all of this and offers a wealth of operators for intermediate processing, so can asynchronous data flows keep up? They certainly can. The map, filter, and other operators you know from ordinary collections work just as well on Flow, and their lambdas are suspending, so they can perform asynchronous work. There is also the more flexible transform operator, which lets you build your own operators for more flexible data manipulation.
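Here is a minimal sketch of such a pipeline, assuming kotlinx.coroutines 1.6 or later; fetchLabel is a hypothetical suspend function standing in for real asynchronous work. filter and map behave as they do on collections (map's lambda may suspend), and transform calls emit as many times as it likes for each upstream element, here twice.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical suspending lookup standing in for real asynchronous work
suspend fun fetchLabel(id: Int): String {
    delay(10)
    return "item-$id"
}

fun pipeline(): List<String> = runBlocking {
    (1..5).asFlow()
        .filter { it % 2 == 1 }   // keeps 1, 3, 5
        .map { fetchLabel(it) }   // suspending call inside map
        .transform { label ->
            emit(label)           // transform may emit any number of values
            emit("$label!")
        }
        .toList()
}

fun main() {
    println(pipeline())
}
```

pipeline() returns [item-1, item-1!, item-3, item-3!, item-5, item-5!]. Because transform can emit zero, one, or many values per element, operators such as map and filter can be expressed with it, which is what makes it a convenient basis for custom operators.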

All of the operators above work on a single asynchronous stream, but Flow is just as well suited to working with several streams. zip combines two flows by pairing their elements one by one, so the combined flow is as long as the shorter of the two. combine, by contrast, emits whenever either flow emits, pairing that value with the most recent value from the other flow. If one flow is slower, its latest element may be reused several times, so the combined flow can be longer than the shorter input.

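import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
    val flow = flowOf(1, 2).onEach { delay(10) }
    val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
    flow.combine(flow2) { i, s -> i.toString() + s }.collect {
        println(it) // Will print "1a 2a 2b 2c", one per line
    }
}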
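For contrast, here is zip applied to similar inputs, as a minimal sketch assuming kotlinx.coroutines 1.6 or later (zipped is a hypothetical name). zip pairs elements by position, so its output does not depend on the delays, and the unmatched 3 is dropped because the result is only as long as the shorter flow.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun zipped(): List<String> = runBlocking {
    val numbers = flowOf(1, 2, 3).onEach { delay(10) }
    val letters = flowOf("a", "b").onEach { delay(15) }
    // zip pairs elements by position and stops when the shorter flow ends
    numbers.zip(letters) { i, s -> "$i$s" }.toList()
}

fun main() {
    println(zipped()) // [1a, 2b]
}
```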

Tracking Completion

As the previous sections show, the data source and the processing logic live in different places, so it is hard to know how many elements the final stream will contain, and therefore when processing will finish. Intermediate operators may also change the number of elements, which makes it even harder to tell when processing is done. But sometimes we need to do something once all the data has been processed. What then? This is where the onCompletion operator comes in. Its lambda receives a nullable Throwable parameter that covers both outcomes: it is null if the flow completed successfully and holds the exception if it failed.
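The following minimal sketch, assuming kotlinx.coroutines 1.6 or later, shows both outcomes; successLog and failureLog are hypothetical names. Note that onCompletion only observes a failure: the exception is passed to its lambda and then rethrown, so it still has to be handled downstream, here with an ordinary try/catch around collect.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Normal completion: cause is null
fun successLog(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flowOf(1, 2, 3)
        .filter { it != 2 } // intermediate operators change the size
        .onCompletion { cause -> log += "completed, cause=$cause" }
        .collect { log += "item $it" }
    log
}

// Failed completion: cause is the exception, which is then rethrown
fun failureLog(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    try {
        flow<Int> {
            emit(1)
            throw IllegalStateException("boom")
        }
            .onCompletion { cause -> log += "completed, cause=${cause?.message}" }
            .collect { log += "item $it" }
    } catch (e: IllegalStateException) {
        log += "caught ${e.message}"
    }
    log
}

fun main() {
    println(successLog())
    println(failureLog())
}
```

successLog() returns [item 1, item 3, completed, cause=null], and failureLog() returns [item 1, completed, cause=boom, caught boom].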

Multiple Coroutines Working Together

In many cases, you can't avoid having multiple coroutines work together. For a coroutine that returns a single value, as mentioned in the previous article, you can use the Deferred returned by the async builder, but a Deferred can only carry one value. For passing multiple values, Kotlin provides Channel. A Channel is conceptually similar to a blocking queue: one coroutine sends data with the send method, and another receives it with the receive method. The key difference is that send and receive suspend instead of blocking a thread, which makes cooperation between coroutines efficient. With a Channel, the producer-consumer pattern is easy to implement.

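import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

val channel = Channel<Int>()
// Producer: sends 1..5 from a background thread
viewModelScope.launch(Dispatchers.IO) {
    for (i in 1..5) {
        delay(1000)
        channel.send(i) // suspends until the consumer receives it
    }
    channel.close()     // signals that no more values will be sent
}
// Consumer: the loop ends once the channel is closed and drained
viewModelScope.launch {
    for (i in channel) {
        println("Handle $i")
    }
}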
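The example above relies on viewModelScope. To try the producer-consumer pattern on a plain JVM, the sketch below swaps it for runBlocking and calls receive explicitly instead of iterating; it assumes kotlinx.coroutines 1.6 or later, and the name produceAndConsume and the buffer size of 2 are arbitrary, hypothetical choices. With a buffer, send suspends only when the buffer is full, whereas the default rendezvous channel suspends each send until a receiver takes the value.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun produceAndConsume(): List<Int> = runBlocking {
    val channel = Channel<Int>(capacity = 2) // buffered; default is rendezvous
    val received = mutableListOf<Int>()

    // Producer coroutine
    launch {
        for (i in 1..5) {
            delay(10)
            channel.send(i) // suspends only when the buffer is full
        }
        channel.close() // no more values; a for loop over the channel would end here
    }

    // Consumer: receive() suspends until a value is available
    repeat(5) {
        received += channel.receive()
    }
    received
}

fun main() {
    println(produceAndConsume()) // [1, 2, 3, 4, 5]
}
```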

Of course, this is only the simplest usage. You can add more producers, close or cancel the channel when the data is no longer needed, and there is even a dedicated produce builder that directly starts a coroutine which returns multiple values.
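A minimal sketch of the produce builder, assuming kotlinx.coroutines 1.6 or later (squares and firstSquares are hypothetical names). produce starts a coroutine and returns a ReceiveChannel; the channel is closed automatically when that coroutine finishes, so the consumer's for loop ends on its own. produce is marked @ExperimentalCoroutinesApi, hence the opt-in annotation.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// produce starts a coroutine and returns the channel it sends to;
// the channel is closed automatically when the coroutine finishes
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.squares(count: Int): ReceiveChannel<Int> = produce {
    for (i in 1..count) send(i * i)
}

fun firstSquares(count: Int): List<Int> = runBlocking {
    val result = mutableListOf<Int>()
    for (value in squares(count)) { // loop ends when the producer completes
        result += value
    }
    result
}

fun main() {
    println(firstSquares(4)) // [1, 4, 9, 16]
}
```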

Summary

Kotlin coroutines offer many useful APIs that cover most asynchronous scenarios. So when using coroutines, first pin down the scenario, then choose the set of APIs that fits it; this keeps you from being overwhelmed by the sheer number of APIs. To that end, I've put together the following tables based on these two articles for reference in everyday development.

Kotlin coroutine builders

API     | Usage
launch  | Run a time-consuming operation that returns no value
async   | Run a time-consuming operation and get a single return value
produce | Run a time-consuming operation and get multiple return values

Kotlin coroutine collaboration tools

API     | Usage
Flow    | Processing asynchronous data streams
Channel | Communication between coroutines