Refactoring Channel to Flow
This is an article on how to use kotlinx.coroutines.flow.Flow.
Dependencies
In the present article I use Kotlin 1.3.50-eap-54, and the following coroutine dependency:
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.0-RC'
The project
The project used in the article is Compass, also present on Google Play. Please note that the version on Play hasn’t been updated to reflect the changes being discussed in the present article.
The commit
There is a particular commit that accompanies the article. The reader is encouraged to review it on GitHub, as the article only discusses the new version of the code, not how I went from one version to the next.
Learning curve
I am only beginning to learn about Flow, and do not use it in production, because I lack the time needed to introduce it into my other projects.
I haven’t yet tested the changes outdoors, although I do have an automated test that exercises a Flow fed with mock locations.
Obtaining the location
The following code doesn’t differ much from the one discussed in another article in this blog. The only difference is that where I used to use Channel I use Flow now.
For reference, I kept the old code, using Channels, that is still used to calculate rotation of the device. It will probably be changed in the repository very soon, but if the reader checks out the particular commit, they will be able to simultaneously see the code using Flow (for calculating location, distance and bearing), and Channel (for calculating device rotation).
This is the code obtaining location:
@ExperimentalCoroutinesApi
open fun getLocationFlow() = callbackFlow {
    // Launched in the flow's own scope, so it is canceled together with the flow.
    val lastLocationJob = launch {
        lastLocation = getLastLocation()?.also { offer(it) }
    }
    val callback = object : LocationCallback() {
        override fun onLocationResult(result: LocationResult) {
            // A fresh fix supersedes the cached last location.
            lastLocationJob.cancel()
            lastLocation = result.lastLocation.toLatLng().also { offer(it) }
        }
    }
    client.requestLocationUpdates(request, callback, Looper.myLooper())
    // Suspends until the flow is closed or canceled, then unregisters the callback.
    awaitClose { client.removeLocationUpdates(callback) }
}

private suspend fun getLastLocation(): LatLng? = lastLocation ?:
    suspendCancellableCoroutine { continuation ->
        client.lastLocation.addOnSuccessListener {
            continuation.resume(it?.toLatLng())
        }
    }

private fun Location.toLatLng() = LatLng(latitude, longitude)
The location then needs to be converted to distance and bearing.
I moved the code to a separate file, but it works the same way as it used to:
suspend fun LatLng.toDirection(destination: LatLng) = withContext(Dispatchers.Default) {
    // Both values are computed concurrently and awaited together.
    val distance = distance(this@toDirection, destination)
    val bearing = bearing(this@toDirection, destination)
    DirectionModel(distance.await(), bearing.await())
}

// Great-circle distance in kilometers (haversine formula).
private fun CoroutineScope.distance(location: LatLng, destination: LatLng) = async {
    val dLat = Math.toRadians(destination.latitude - location.latitude)
    val dLon = Math.toRadians(destination.longitude - location.longitude)
    val radLatLoc = Math.toRadians(location.latitude)
    val radLatDest = Math.toRadians(destination.latitude)
    val a = sin(dLat / 2).pow(2) +
        sin(dLon / 2).pow(2) * cos(radLatLoc) * cos(radLatDest)
    val c = 2 * asin(sqrt(a))
    RADIUS_KM * c
}

// Initial bearing in degrees, normalized to the range [0, 360).
private fun CoroutineScope.bearing(location: LatLng, destination: LatLng) = async {
    // The trigonometric functions expect radians, so the degrees are converted first.
    val radLatLoc = Math.toRadians(location.latitude)
    val radLatDest = Math.toRadians(destination.latitude)
    val longDiff = Math.toRadians(destination.longitude - location.longitude)
    val y = sin(longDiff) * cos(radLatDest)
    val x = cos(radLatLoc) * sin(radLatDest) -
        sin(radLatLoc) * cos(radLatDest) * cos(longDiff)
    ((Math.toDegrees(atan2(y, x)) + 360) % 360).toFloat()
}

private const val RADIUS_KM = 6371.0
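The two formulas can also be checked in isolation, without coroutines. The following is only a minimal sketch using the Kotlin standard library: Point is a hypothetical stand-in for LatLng, and haversineKm and initialBearingDegrees are names made up for this illustration, not functions from the project. Note that both formulas expect their angles in radians.

```kotlin
import kotlin.math.asin
import kotlin.math.atan2
import kotlin.math.cos
import kotlin.math.pow
import kotlin.math.sin
import kotlin.math.sqrt

// Hypothetical stand-in for the LatLng type used in the article (degrees).
data class Point(val latitude: Double, val longitude: Double)

const val EARTH_RADIUS_KM = 6371.0

// Great-circle distance between two points, in kilometers (haversine formula).
fun haversineKm(from: Point, to: Point): Double {
    val dLat = Math.toRadians(to.latitude - from.latitude)
    val dLon = Math.toRadians(to.longitude - from.longitude)
    val lat1 = Math.toRadians(from.latitude)
    val lat2 = Math.toRadians(to.latitude)
    val a = sin(dLat / 2).pow(2) + sin(dLon / 2).pow(2) * cos(lat1) * cos(lat2)
    return 2 * EARTH_RADIUS_KM * asin(sqrt(a))
}

// Initial bearing from one point to another, in degrees within [0, 360).
fun initialBearingDegrees(from: Point, to: Point): Double {
    val lat1 = Math.toRadians(from.latitude)
    val lat2 = Math.toRadians(to.latitude)
    val dLon = Math.toRadians(to.longitude - from.longitude)
    val y = sin(dLon) * cos(lat2)
    val x = cos(lat1) * sin(lat2) - sin(lat1) * cos(lat2) * cos(dLon)
    return (Math.toDegrees(atan2(y, x)) + 360) % 360
}
```

A quick sanity check is to move one degree of longitude along the equator: the distance should be roughly 111 km and the bearing should point due east.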
The ViewModel
Below is the ViewModel that exposes the direction (distance and bearing) and the device rotation. It shows clearly what the differences are between using Channel and Flow:
class CompassViewModel(
        rotationChannelFactory: RotationChannelFactory,
        locationFlowFactory: LocationFlowFactory,
        liveDataContext: CoroutineContext = EmptyCoroutineContext) : ViewModel() {

    private val mutableDestination by lazy { MutableLiveData<DestinationModel?>() }

    @ExperimentalCoroutinesApi
    val rotation by channelLiveData(liveDataContext) {
        rotationChannelFactory.getRotationChannel()
    }

    @ExperimentalCoroutinesApi
    val direction by flowLiveData(liveDataContext) {
        locationFlowFactory.getLocationFlow()
            .map { it.toDirection(destinationLocation) }
    }

    val destination get() = mutableDestination as LiveData<DestinationModel?>

    private lateinit var destinationLocation: LatLng

    fun setDestination(destination: DestinationModel?) {
        if (destination != null) {
            destinationLocation = destination.location
        }
        mutableDestination.value = destination
    }
}
The code that creates LiveData from either Channel or Flow:
fun <T> channelLiveData(
        context: CoroutineContext = EmptyCoroutineContext,
        block: () -> ReceiveChannel<T>): Lazy<LiveData<T>> = lazy {
    liveData(context) {
        val channel = block()
        try {
            while (true) {
                emit(channel.receive())
            }
        }
        finally {
            // The channel has to be canceled by hand when the coroutine ends.
            channel.cancel()
        }
    }
}

fun <T> flowLiveData(
        context: CoroutineContext = EmptyCoroutineContext,
        block: () -> Flow<T>): Lazy<LiveData<T>> = lazy {
    liveData(context) {
        val flow = block()
        flow.collect { emit(it) }
    }
}
The above code shows the advantage of using Flow as opposed to Channel. The code creating LiveData from Flow is much more concise.
There is no need to use try … finally blocks in the Flow version. collect() works in such a way that when the collecting coroutine is canceled, the Flow is closed.
When the Flow containing locations was created, I used the following line to ensure that location updates are removed when the Flow is closed:
awaitClose { client.removeLocationUpdates(callback) }
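The same register-then-unregister pattern can be tried without Android. The sketch below is an illustration only: FakeTicker is a hypothetical callback-based API standing in for the location client, asFlow and collectTwoTicks are made-up names, and the code assumes a recent kotlinx.coroutines release, in which trySend takes the place of the offer call used in the article.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical callback-based API, standing in for the location client.
class FakeTicker {
    private val listeners = mutableListOf<(Int) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun register(listener: (Int) -> Unit) { listeners += listener }
    fun unregister(listener: (Int) -> Unit) { listeners -= listener }
    fun tick(value: Int) = listeners.toList().forEach { it(value) }
}

// Wraps the callback API in a Flow; the listener lives only while the Flow is collected.
fun FakeTicker.asFlow(): Flow<Int> = callbackFlow {
    val listener: (Int) -> Unit = { trySend(it) }
    register(listener)
    // Suspends until the Flow is closed or canceled, then unregisters the listener.
    awaitClose { unregister(listener) }
}

// Collects two values and returns them; the collector stops after the second one.
fun collectTwoTicks(ticker: FakeTicker): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    val job = launch { ticker.asFlow().take(2).toList(received) }
    while (ticker.listenerCount == 0) yield() // wait until the listener is registered
    ticker.tick(1)
    ticker.tick(2)
    job.join()
    received
}
```

Once take(2) has seen its second value the collection ends, the block passed to awaitClose runs, and the ticker is left with no registered listeners.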
Testing
Below is the code used for testing the Flow. Its previous version was already discussed in another article in this blog, so now I will focus only on the refactoring I had to perform in order to accommodate Flow:
val mockLocationFlowFactory: LocationFlowFactory = mock(LocationFlowFactory::class.java)
val mockRotationChannelFactory: RotationChannelFactory = mock(RotationChannelFactory::class.java)
var locationClosed = false
val locationFlow = channelFlow {
    offer(HOME)
    var lat = HOME.latitude
    try {
        while (true) {
            delay(INTERVAL)
            lat += LATITUDE_STEP
            offer(LatLng(lat, HOME.longitude))
        }
    }
    finally {
        // Reached when the collecting coroutine is canceled.
        locationClosed = true
    }
}.flowOn(Dispatchers.IO)
In the above code I use the channelFlow builder to create a Flow that emits a new location at every interval until its coroutine is canceled. It then sets a flag to true.
I tell it to flow on Dispatchers.IO. Because of this, threads that should be used for computation or for UI will not be occupied.
The above use of flowOn() seems alright for hardcoded values, such as the ones I use in the test, but I am not sure how I should be using flowOn() on real location data. What is the CoroutineDispatcher that the code in the commit runs on? When exactly do I want to use flowOn()? Right after I create the Flow, or later, when I know whether I prefer to have this data on Dispatchers.IO or Dispatchers.Main? This is the very first day I’ve been using Flow, but I will probably develop my personal programming style around this in the days and weeks that follow.
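One documented property that may help with these questions is that flowOn() changes the context only of the operators that precede it, while everything after it, including the collector, stays in the collector's own context. The sketch below is a small experiment under that assumption, not code from the project: StageThreads and observeStageThreads are hypothetical names, and Dispatchers.Default is used simply because it runs on threads other than the caller's.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Hypothetical holder recording which thread ran each stage of the pipeline.
data class StageThreads(val caller: Thread, val upstream: Thread, val downstream: Thread)

fun observeStageThreads(): StageThreads = runBlocking {
    val caller = Thread.currentThread()
    flow { emit(Thread.currentThread()) }   // upstream: runs in the flowOn context
        .flowOn(Dispatchers.Default)
        .map { upstream ->                  // downstream: runs in the collector's context
            StageThreads(caller, upstream, Thread.currentThread())
        }
        .first()
}
```

Seen this way, the factory that creates the Flow can decide where the values are produced, while the place that collects them (for example a LiveData builder) decides where they are consumed.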
The following uses Mockito to return the Flow when requested:
val rotationChannel = Channel<Float>(Channel.CONFLATED)
`when`(mockLocationFlowFactory.getLocationFlow()).thenReturn(locationFlow)
`when`(mockRotationChannelFactory.getRotationChannel()).thenReturn(rotationChannel)
Right now in the project I still use Channel for calculating device rotation, so I use both Channel and Flow in this test.
This is the code that creates the ViewModel and runs the assertions. The function progress() used below introduces a delay and asserts that the distance has increased in that time. The last few lines assert that the Flow is closed after canceling the coroutine:
val liveDataJob = Job()
val vm = CompassViewModel(
    mockRotationChannelFactory,
    mockLocationFlowFactory,
    coroutineContext + liveDataJob)
vm.setDestination(DestinationModel(HOME, ""))
val destination = vm.direction
val distanceObserver = Observer<DirectionModel> {
    distance = it.distance
}
destination.observeForever(distanceObserver)
delay(STABILIZE_DELAY)
progress()
progress()
assertFalse(locationClosed)
liveDataJob.cancel()
delay(STABILIZE_DELAY)
assertTrue(locationClosed)
Conclusion
In the present article I have demonstrated how code can be refactored to use Flow instead of Channel.
For comparison, in the discussed commit I left some code that still uses Channel, but later I will probably refactor the rest of the code in the project.
The program may be installed directly from Play, but I will probably not update it for a very long time. The discussed refactoring hasn’t introduced any visible change in the behavior of the app, so there is no need to upload a new version to Google Play.
It will be a while before I see a need to introduce similar changes in one of my larger projects, so the article served mostly as a demonstration of my learning curve.
It is important to note that in other articles I discussed changes that were possible only after I watched a piece of educational material on YouTube. Some of the articles have also been inspired by recruitment tasks I received when I applied for a job. (The project discussed in the present article was inspired by one such task, although I’ve performed the present refactoring only as a hobby).
This time, however, I learned the presented material only by reading, in Android Studio, the source code and comments of the package kotlinx.coroutines.flow.
Donations
If the reader has enjoyed the present article, they might want to donate some bitcoin at the address presented below.
BTC: bc1qncxh5xs6erq6w4qz3a7xl7f50agrgn3w58dsfp
Readers may also look at my donations page.