This article shows how you can periodically (every so many hours) refresh your data by downloading it from Firebase Cloud Firestore.

Alternatively, you could listen to the updates in real time, but this technique is not going to be discussed in this article.

Previous article

Repository pattern was already discussed in another article. Then I was discussing how to observe an instance of LiveData to display the fresh data in the GUI. I have since refactored the project to also watch changes in the repository over the BroadcastChannel. This refactoring will be discussed further in the present article.

The project

The project I am using in this article as an example is Victor-Events, but because it is still work in progress, and it is already rather large now, you will probably want to just focus on the code presented in the article instead of cloning the whole project.

The problem

Write a coroutine that updates the data stored in the memory cache (repository).

Before the coroutine is run, the list of the data that may be updated is already present in the memory.

The coroutine is given a transform to perform concurrently on every item. The result of the transform may be a dummy (empty) item, which is then removed from the repository.

When the coroutine is finished updating the repository, it returs a list containing the results of running the transform on every item on the original list. The returned list does contain dummy items, if such have been generated by the transform, althought the dummy items are no longer present in the repository.

The coroutine does not add new items to the repository. It therefore ignores any items that may be otherwise present in Cloud Firestore, if they have not already been stored in the repository in the memory of the device.

Observable repository

This is the updated LiveRepository:

abstract class LiveRepository {

    private val liveData = MutableLiveData<Unit>()

    private val channel = BroadcastChannel<Unit>(Channel.CONFLATED)

    protected fun notifyDataSetChanged() {
        if (Looper.getMainLooper().thread === Thread.currentThread()) liveData.value = Unit
        else liveData.postValue(Unit)
        channel.sendBlocking(Unit)
    }

    operator fun plus(owner: LifecycleOwner): HotData<Unit> = DefaultHotData(liveData, owner)

    suspend operator fun invoke(block: () -> Unit) {
        with (channel.openSubscription()) {
            if (!isEmpty) receive()
            while(true) {
                receive()
                block()
            }
        }
    }
}

Notice the operator invoke(). It first checks whether the channel is empty, and if it is not - clears it.

This is an approach different from the one presented in the previous article. Then I wanted the GUI to be notified and updated even if data was already in the repository before the Fragment or other view was created.

Because this particular operator is not meant to deal with GUI directly, I chose not to invoke the registered action to notify the observer that data is already present in the repository. It will only be notified when there are changes.

Because BroaccastChannel is experimental, I added the following at the beginning of the file:

@file:Suppress("EXPERIMENTAL_API_USAGE")

package pl.org.seva.events.main.model

The ViewModel

This is the ViewModel watching the repository discussed in the above section:

class CommViewModel : ViewModel() {

    init { viewModelScope.launch { comms { comm = comms[comm.name] } } }

    var comm = Comm.DUMMY
    set(value) {
        field = value
        reset()
    }
    val name by lazy { MutableLiveData<String>() }
    val desc by lazy { MutableLiveData<String>() }

    fun reset() {
        name.value = comm.name
        desc.value = comm.desc
    }
}

The ViewModel initiates watching changes in the repository using the viewModelScope.

By my convention I use the invoke() operator to start watching things. In this case whenever comms repository is updated, I retrieve from it by name the item I am interested in (the name of the community never changes). I keep the reference to it in my comm property. It has a setter that sets the new community description to the liveData. Because viewModelScope works on Dispatchers.Main, I can just write desc.value, knowing that it will be run on main thread.

In case the community is no longer present in the repository, the repository will just return a dummy item, which has both its name and description set to an empty String. This is the operator that does it:

operator fun get(name: String) = commCache.firstOrNull { it.name == name } ?: Comm.DUMMY

The local database

I use Room version 2.1.0-alpha07, which allows me to treat all database operations as suspend:

suspend infix fun CommDao.delete(comm: Comm) = delete(Comm.Entity(comm))

suspend infix fun CommDao.add(comm: Comm) = insert(Comm.Entity(comm))

suspend infix fun CommDao.update(comm: Comm) = update(Comm.Entity(comm))

suspend fun CommDao.getAllValues() = getAll().map { it.value() }

@Dao
interface CommDao {

    @Query("select * from ${EventsDb.COMMUNITIES_TABLE_NAME}")
    suspend fun getAll(): List<Comm.Entity>

    @Insert
    suspend fun insert(comm: Comm.Entity)

    @Delete
    suspend fun delete(comm: Comm.Entity)

    @Update
    suspend fun update(comm: Comm.Entity)

    @Query("delete from ${EventsDb.COMMUNITIES_TABLE_NAME}")
    suspend fun clear()
}

By my convention I do not directly store in the database the objects that I otherwise use in my projects, but create a dedicated @Entity class for the purpose. It allows me to have a greater control over the way I want to persist things (for example in String form) versus the way I otherwise use them.

Room already allows you to convert your objects on the fly to a form that is easy to store in the database, so you might choose not to follow my pattern. This is, anyway, an example of one of such conversions:

data class Comm(
        val name: String,
        val desc: String = "",
        val color: Int = Color.GRAY,
        val isAdmin: Boolean = false) {

    ...

    @androidx.room.Entity(tableName = EventsDb.COMMUNITIES_TABLE_NAME)
    class Entity() {
        @PrimaryKey
        lateinit var name: String
        lateinit var description: String
        var color: Int = Color.GRAY
        var isAdmin: Boolean = false

        constructor(comm: Comm) : this() {
            name = comm.name
            description = comm.desc
            color = comm.color
            isAdmin = comm.isAdmin
        }

        fun value() = Comm(name = name, desc = description, color = color, isAdmin = isAdmin)
    }
}

Cloud Firestore

This is an example of a suspend function that reads data from Cloud Firestore, wrapping a standard function that returns Task<QuerySnapshot>, and by itself does not suspend:

private suspend fun CollectionReference.read(): List<DocumentSnapshot> = suspendCancellableCoroutine { continuation ->
    get().addOnCompleteListener { result ->
        if (result.isSuccessful) {
            continuation.resume(result.result!!.documents)
        } else {
            continuation.resumeWithException(result.exception!!)
        }
    }
}

A similar suspend function reading from Firestore was previously discussed in more details in a separate article in this blog.

Refreshing from the GUI

The following is a function that is meant to be called from GUI.

When you log in or out, this function checks whether you are an admin of every community:

suspend fun refreshAdminStatuses() = coroutineScope {
    refresh { it.copy(isAdmin = fsReader.isAdmin(it.lcName)) }
    Unit
}

The last line Unit hides the results of operation, so that refreshAdminStanuses() returns no value. This is the way it is called from the Fragment:

override fun onActivityResult(requestCode: Int, resultCode: Int, data: Intent?) {
    super.onActivityResult(requestCode, resultCode, data)
    if (requestCode == LOGIN_REQUEST && resultCode == Activity.RESULT_OK) {
	...
        io {
            comms.refreshAdminStatuses()
            back()
        }
    }
}

The function io() just launches the block using Dispatchers.IO:

fun io(block: suspend CoroutineScope.() -> Unit) = GlobalScope.launch(Dispatchers.IO) { block() }

Because the above coroutine scope uses Dispatchers.IO, the back() function, which is called after refreshing admin statuses is complete, does not run on the main thread. It needs therefore to use post() internally, so that the desired popBackStack() is called on the main thread anyway:

fun Fragment.back() = view!!.post { findNavController().popBackStack() }

Refreshing from the WorkManager

This is the function that is going to be called from the WorkManager, for example, every two hours:

suspend fun refresh() = coroutineScope {
    refresh { fsReader.findCommunity(it.name) }
}

The function tries to find every community in the way already described in a separate article in this blog. Each community that hasn’t been found is removed from the repository, but otherwise the new version of the community is stored. The coroutine then returns a list of the newly found communities, or dummy items for communities that are no longer preset.

This is how the coroutine works internally:

private suspend fun refresh(transform: suspend (Comm) -> Comm): List<Comm> = coroutineScope {
    val commCopy = commCache.toList()
    val transformed = mutableListOf<Comm>()

    commCopy.concurrent { transformed.add(transform(it)) }
    commCache.clear()
    commCache.addAll(transformed.filter { !it.isDummy })
    commDao.clear()
    commCache.concurrent { commDao add it }
    notifyDataSetChanged()
    transformed
}

The above code creates an immutable copy of the communities presently held by the repository. It then concurrently performs the transform operation on each of these items. It then clears the original cache, adds to it all transformed communities (apart from the ones that have been deleted), persists all of the comminities in the database, and calls notifyDataSetChanged{) so you can update the GUI if you want.

It then returns the results of the transform operation, even the dummy items that are by this time no longer present in the persistent database or the memory cache.

This is the function that concurrently runs a block of code on every item on the list and then waits for all of the blocks to complete:

suspend fun <T> Iterable<T>.concurrent(block: suspend (T) -> Unit) = coroutineScope {
    map { launch { block(it) } }.joinAll()
}

The above code maps all of the items on the list to a Job, and then suspends until all of them are completed.

This is the ListenableWorker that periodically refreshes the data:

class CommSyncWorker(private val context: Context, params: WorkerParameters) : CoroutineWorker(context, params) {

    override val coroutineContext = Dispatchers.IO

    override suspend fun doWork() = coroutineScope {
        comms.refresh()
                .filter { it.isDummy }
                .map { Message(
                        LocalDateTime.now(),
                        context.getString(R.string.system_message_comm_deleted)
                                .replace(NAME_PLACEHOLDER, it.originalName)) }
                .apply {
                    messages add this
                    messageDao add this
                }
        Result.success()
    }

    companion object {
        val TAG: String = this::class.java.name
        val FREQUENCY: Duration = Duration.ofHours(2)
        const val NAME_PLACEHOLDER = "[name]"
    }
}

The above ListenableWorker uses Dispatchers.IO to runs the work. It refreshes the list of communities in the way already described above. From the generated list it takes the dummy items (deleted communities), and for each one of them creates a system message informing the user that the community they had been watching was deleted. The system messsages are stored in a separate repository, beyond the scope of the present article.

Scheduling the work

This is the code that schedules the work:

class Bootstrap {

    fun boot() {
        login.setCurrentUser(FirebaseAuth.getInstance().currentUser)
        io {
            listOf(
                launch { comms cache db.commDao.getAllValues() },
                launch { messages add db.messageDao.getAllValues() })
                    .joinAll()
            WorkManager.getInstance().enqueueUniquePeriodicWork(
                    CommSyncWorker.TAG,
                    ExistingPeriodicWorkPolicy.REPLACE,
                    PeriodicWorkRequestBuilder<CommSyncWorker>(CommSyncWorker.FREQUENCY)
                            .setConstraints(Constraints.Builder().setRequiresBatteryNotLow(true).build())
                            .build())
        }
    }
}

By my convention I use the class Bootstrap to run some code each time when the application starts. The convention I use has been described in a dedicated article in this blog.

The above code first launches two concurrent jobs that initiate two repositories with the data read from the local database, and waits for both jobs to complete.

Then the code schedules work that updates both repositories by either transforming commuties held in one of them, or generating new system messages when some of these communities have been deleted.

Its important to call joinAll() before scheduling the work, because the work runs immediately. Part of the work is clearing the repository of communities and replenishing it with new versions of the communities that were held by it previously. If this work is performed too early, it will permanently delete all of the communities.

It is important to use ExistingPeriodicWorkPolicy.REPLACE. I want the data to be refreshed immediately when the application runs. Because previously scheduled work survives application restart, using another ExistingPeriodicWorkPolicy could lead to an uncontrolled accumulation of scheduled work, even to refreshing the data every couple of minutes.

To tag the scheduled work I just used the string this::class.java.name, because I want the tag to be unique per each potential ListenableWorker I might use in the project. I could as well use a frivulous tag like "foobar123", but I would then have to manually keep track of all of them. Please note that if I decide to rename the class after the project is already in production, which will result in changing the tag, I will have to provide a migration that manually cancels the work scheduled with the previous tag.

Conclusion

I recommend the reader experiments with the CoroutineWorker, or several extension functions taking in a suspend block of code, or a block of code that requires a CoroutineScope.

Before I wrote the present article I did have some idea that WorkManager might be used to periodically refresh data, but I didn’t fully understand how. I should currently be able to discuss the repository pattern - or refreshing data held in the data structure - in much finer details.

I recomment the reader writes articles about several of their favorite design patterns, or the refactorings they have carried out in their projects. The reader might thus improve their communication skills and be better prepared to talk to other people about the way they see good coding practices, or what they generally want to achieve when they write programs.