This is an article on how to use kotlinx.coroutines.Flow.

Dependencies

In the present article I use Kotlin 1.3.50-eap-54, and the following coroutine dependency:

implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.0-RC'

The project

The project used in the article is Compass, also present on Google Play. Please note that the version on Play hasn’t been updated to reflect the changes being discussed in the present article.

The commit

There is a particular commit that accompanies the article. The reader is encouraged to view it on GitHub, as the article only discusses the new version, as opposed to how I went from one version of the code to the next. The changes can be manually reviewed on GitHub.

Learning curve

I am only beginning to learn about Flow, and do not use it in production, because of lack of time required to introduce it to my other projects.

I haven’t yet tested the changes outdoor, although I do have an automated test testing a Flow fed with mock locations.

Obtaining the location

The following code doesn’t differ much from the one discussed in another article in this blog. The only difference is that where I used to use Channel I use Flow now.

For reference, I kept the old code, using Channels, that is still used to calculate rotation of the device. It will probably be changed in the repository very soon, but if the reader checks out the particular commit, they shall be able to simultaneously see the code using Flow (for calculating location, distance and bearing), and Channel (for calculating device rotation).

This is the code obtaining location:

@ExperimentalCoroutinesApi
open fun getLocationFlow() = callbackFlow {
        val lastLocationJob = GlobalScope.launch {
            lastLocation = getLastLocation()?.also { offer(it) }
        }
        val callback = object : LocationCallback() {
            override fun onLocationResult(result: LocationResult) {
                lastLocationJob.cancel()
                lastLocation = result.lastLocation.toLatLng().also { offer(it) }
            }
        }

        client.requestLocationUpdates(request, callback, Looper.myLooper())
        awaitClose { client.removeLocationUpdates(callback) }
    }

private suspend fun getLastLocation(): LatLng? = lastLocation ?:
        suspendCancellableCoroutine { continuation ->
            client.lastLocation.addOnSuccessListener {
                continuation.resume(it?.toLatLng())
            }
        }

private fun Location.toLatLng() = LatLng(latitude, longitude)

The location needs then to be converted to distance and bearing.

I moved the code to a separate file, but it works the same way as it used to:

suspend fun LatLng.toDirection(destination: LatLng) = withContext(Dispatchers.Default) {
    val distance = distance(this@toDirection, destination)
    val bearing = bearing(this@toDirection, destination)
    DirectionModel(distance.await(), bearing.await())
}

private fun CoroutineScope.distance(location: LatLng, destination: LatLng) = async {
    val dLat = Math.toRadians(destination.latitude - location.latitude)
    val dLon = Math.toRadians(destination.longitude - location.longitude)
    val radLatLoc = Math.toRadians(location.latitude)
    val radLatDest = Math.toRadians(destination.latitude)
    val a = sin(dLat / 2).pow(2) +
            sin(dLon / 2) * sin(dLon / 2) * cos(radLatLoc) * cos(radLatDest)
    val c = 2 * asin(sqrt(a))
    RADIUS_KM * c
}

private fun CoroutineScope.bearing(location: LatLng, destination: LatLng) = async {
    val longDiff = destination.longitude - location.longitude
    val y = sin(longDiff) * cos(destination.latitude)
    val x = cos(location.latitude) *
            sin(destination.latitude) - sin(location.latitude) *
            cos(destination.latitude) * cos(longDiff)
    ((Math.toDegrees(atan2(y, x)) + 360 ) % 360).toFloat()
}

private const val RADIUS_KM = 6371.0

The ViewModel

Below is the ViewModel using the direction, bearing and device location data.

It really shows here what the difference are between using Channel and Flow:

class CompassViewModel(
        rotationChannelFactory: RotationChannelFactory,
        locationFlowFactory: LocationFlowFactory,
        liveDataContext: CoroutineContext = EmptyCoroutineContext) : ViewModel() {

    private val mutableDestination by lazy { MutableLiveData<DestinationModel?>() }

    @ExperimentalCoroutinesApi
    val rotation by channelLiveData(liveDataContext) {
        rotationChannelFactory.getRotationChannel()
    }

    @ExperimentalCoroutinesApi
    val direction by flowLiveData(liveDataContext) {
        locationFlowFactory.getLocationFlow()
            .map { it.toDirection(destinationLocation) }
    }

    val destination get() = mutableDestination as LiveData<DestinationModel?>

    private lateinit var destinationLocation: LatLng

    fun setDestination(destination: DestinationModel?) {
        if (destination != null) {
            destinationLocation = destination.location
        }
        mutableDestination.value = destination
    }
}

The code that creates LiveData from either Channel or Flow:

fun <T> channelLiveData(
        context: CoroutineContext = EmptyCoroutineContext,
        block: () -> ReceiveChannel<T>): Lazy<LiveData<T>> = lazy {
    liveData(context) {
        val channel = block()
        try {
            while (true) {
                emit(channel.receive())
            }
        }
        finally {
            channel.cancel()
        }
    }
}

fun <T> flowLiveData(
        context: CoroutineContext = EmptyCoroutineContext,
        block: () -> Flow<T>): Lazy<LiveData<T>> = lazy {
    liveData(context) {
        val flow = block()
        flow.collect { emit(it) }
    }
}

The above code shows the advantage of using Flow as opposed to Channel. The code creating LiveData from Flow is much more concise.

There is no need to use tryfinally blocks in the Flow version. collect() works in such a way that when the coroutine is canceled, the Flow is closed.

When the Flow containing locations was created, I used the following line to assure location updates would be removed when the Flow is closed:

 awaitClose { client.removeLocationUpdates(callback) }

Testing

Below is the code used for testing the Flow. Its previous version was already discussed in another article in this blog, so now I will focus only on the refactoring I had to perform in order to accommodate for Flow:

val mockLocationFlowFactory: LocationFlowFactory = mock(LocationFlowFactory::class.java)
val mockRotationChannelFactory: RotationChannelFactory = mock(RotationChannelFactory::class.java)

var locationClosed = false
val locationFlow = channelFlow {
    offer(HOME)
    var lat = HOME.latitude
    try {
        while (true) {
            delay(INTERVAL)
            lat += LATITUDE_STEP
            offer(LatLng(lat, HOME.longitude))
        }
    }
    finally {
        locationClosed = true
    }
}.flowOn(Dispatchers.IO)

In the above code I create a ChannelFlowBuilder that emits different locations until its coroutine is canceled. It then sets a flag to true.

I tell it to flow on Dispatchers.IO. Because of this, threads that should be used for computation or for UI will not be occuppied.

The above use of flowOn() seems alright for hardcoded values, such as the ones I use in test, but I am not sure how I should be using flowOn() on real location data. What is the CoroutineDispatcher that the code I contained in the commit works on? When exactly I want to use flowOn()? Right after I create the Flow, or later, when I know whether I prefer to have this data on Dispatchers.IO or Dispatchers.Main? This is the very first day I’ve been using Flow, but I will probably develop my personal programming style around this in the days and weeks that follow.

The following uses Mockito to use the Flow when requested:

val rotationChannel = Channel<Float>(Channel.CONFLATED)

`when`(mockLocationFlowFactory.getLocationFlow()).thenReturn(locationFlow)
`when`(mockRotationChannelFactory.getRotationChannel()).thenReturn(rotationChannel)

Right now in the project I still use Channel for calculating device location, so I use both Channel and Flow in this test.

This is the code that creates the ViewModel and runs the assertion. The fuction progress() used below intruduces a delay asserts that distance increases in a given time. The last few lines assert that the Flow is closed after canceling the coroutine:

val liveDataJob = Job()

val vm = CompassViewModel(
        mockRotationChannelFactory,
        mockLocationFlowFactory,
        coroutineContext + liveDataJob)
vm.setDestination(DestinationModel(HOME, ""))

val destination = vm.direction

val distanceObserver = Observer<DirectionModel> {
    distance = it.distance
}
destination.observeForever(distanceObserver)
delay(STABILIZE_DELAY)
progress()
progress()
assertFalse(locationClosed)
liveDataJob.cancel()
delay(STABILIZE_DELAY)
assertTrue(locationClosed)

Conclusion

In the present article I have demonstrated how code can be refactored to use Flow instead of Channel.

For comparison, in the discussed commit I left some code that still uses Channel, but later I will probably refactor the rest of the code in the project.

The program may be installed directly from Play, but I will probably not update it for a very long time. The discussed refactoring hasn’t introduced any visible change in the behavior of the app, so there is no need to upload a new version to Google Play.

It will be a while before I see a need to introduce similar changes in one of my more major projects, so the article served mostly as a demonstration of my learning curve.

It is important to note that in other articles I discussed changes that were possible only after I watched a piece of educational material on YouTube. Some of the articles have been also inspired by recruitment tasks I received when I applied for a job. (The project discussed in the present article was inspired by one of such tasks, although I’ve performed the present refactoring only as a hobby).

This time, however, I learned the presented material only from analysing in Android Studio the code of the actual source of the package kotlinx.coroutines.flow and reading the comments.

Donations

If the reader has enjoyed the present article, they might want to donate some bitcoin at the address presented below.

BTC: bc1qncxh5xs6erq6w4qz3a7xl7f50agrgn3w58dsfp

Readers may also look at my donations page.