Migrating From RxKotlin to Kotlin Coroutines

Peter Graham
FloSports Engineering
Dec 15, 2019

Photo by Rolands Varsbergs on Unsplash

The Rx library is a powerful set of tools for asynchronous reactive programming. It comes with many benefits and, when done right, can produce predictable and stable outcomes in asynchronous flows. With that said, it can also be a pain to work with: it is difficult to read and write, and it produces head-scratching error messages.

Enter Kotlin Coroutines. They offer a different way of thinking about asynchronous programming: instead of handling your data as Observables and streams, you work with it as plain data.

At FloSports, when building version 1.0 of the Android app, we relied heavily on the battle-tested Rx toolchain. Our team already knew a good amount of Rx from other projects and it was easy to get started. We were still interested in Coroutines and, in simpler situations, we wrote asynchronous logic using Coroutines so we could try out both and see which one worked better for us.

After 1.0 we evaluated the two tools and found that Coroutines fit our needs better and produced more readable code. We decided to replace every instance of Rx logic with Coroutines. This allowed us to rip out the Rx toolkit entirely and maintain more consistent asynchronous patterns. Replacing Rx gave us the diff for free, so I present to you a side-by-side comparison of Rx and Kotlin Coroutines.

Let’s start with Room, an Android library that facilitates access to the onboard SQLite database. Generally, client databases are light and queries should execute quickly. However, it’s possible that queries could take a while. You’d never want to tie up the main thread running a query, so you move it to an IO thread and introduce asynchronous logic. Room supports both Rx and Coroutines out of the box, so the transition was pretty smooth for most things.

The simplest transition was for one-off queries. At the DAO layer, you define your query function and what type the query should return. For one-off queries in Rx, we used Single types.

@Query("SELECT * FROM VideoOnDemandPlaybackProgress WHERE videoOnDemandId = :videoOnDemandId")
fun getByVodId(videoOnDemandId: Long): Single<VideoOnDemandPlaybackProgress>

Using Coroutines, we expect plain data to come out of the function, but to acknowledge its asynchronous nature, we mark the function as suspend. This enforces that the function can only be called from a coroutine or from another suspend function. Notice that in the Coroutine example we defined the return type as nullable, but we didn’t in the Rx example. An Rx Single must emit a non-null value of the declared type. This can be tricky when dealing with something that may be absent from the database. In this example, the user may not have watched this video and therefore may not have a row for this query. More on this later. Here’s the Coroutine implementation.

@Query("SELECT * FROM VideoOnDemandPlaybackProgress WHERE videoOnDemandId = :videoOnDemandId")
suspend fun getByVodId(videoOnDemandId: Long): VideoOnDemandPlaybackProgress?

Now that we’ve defined the query, let’s see about its usage. In Rx, we’d subscribe to the Single the DAO returns.

First, the compositeDisposable is something we created on every Activity/Fragment to gather all of our subscriptions so we can dispose of them when the Activity/Fragment is destroyed, which prevents memory leaks.

  • We need to observeOn the mainThread so we can do UI actions.
  • We need to subscribeOn the ioThread so the query runs off the main thread.
  • We can now set the title based on the vodProgress output. The onSuccess case is only for instances where the query returned a result. Rx considers a null result to be an error. Because of this, we handle the null case in onError.
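
As a rough sketch of the kind of subscription these points describe (not the original snippet), assuming RxJava 2 is on the classpath: FakeProgressDao, VideoScreen, the positionSeconds field and the title strings are hypothetical stand-ins, and ioThread()/mainThread() are assumed helpers. Schedulers.trampoline() stands in for Android's main-thread scheduler so the sketch can run on a plain JVM.

```kotlin
import io.reactivex.Scheduler
import io.reactivex.Single
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.schedulers.Schedulers

// Hypothetical row type; only the class name comes from the article.
data class VideoOnDemandPlaybackProgress(val videoOnDemandId: Long, val positionSeconds: Long)

// Hypothetical in-memory DAO. Like a Single-returning query, it signals
// "no row" as an error because a Single cannot emit null.
class FakeProgressDao(private val rows: Map<Long, VideoOnDemandPlaybackProgress>) {
    fun getByVodId(videoOnDemandId: Long): Single<VideoOnDemandPlaybackProgress> {
        val row = rows[videoOnDemandId]
        return if (row != null) Single.just(row)
        else Single.error(NoSuchElementException("no row for $videoOnDemandId"))
    }
}

// Assumed helpers. On Android, mainThread() would be AndroidSchedulers.mainThread().
fun ioThread(): Scheduler = Schedulers.io()
fun mainThread(): Scheduler = Schedulers.trampoline()

// Hypothetical stand-in for an Activity/Fragment.
class VideoScreen(private val dao: FakeProgressDao) {
    private val compositeDisposable = CompositeDisposable()

    @Volatile
    var title: String = ""

    fun loadTitle(vodId: Long) {
        compositeDisposable.add(
            dao.getByVodId(vodId)
                .subscribeOn(ioThread())  // run the query off the main thread
                .observeOn(mainThread())  // deliver the result where UI work is allowed
                .subscribe(
                    { vodProgress -> title = "Resume at ${vodProgress.positionSeconds}s" },
                    { _ -> title = "Start watching" } // the "no row" case lands in onError
                )
        )
    }

    // Call when the screen goes away so pending subscriptions do not leak.
    fun onDestroy() = compositeDisposable.clear()
}
```

Note that the error callback cannot tell "no row" apart from a genuine failure unless it inspects the throwable, which is part of what makes the nullable case awkward in Rx.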

Now for Coroutines…

You might notice some things missing. We don’t need the compositeDisposable because Coroutines launched in the scope of the Activity/Fragment are cancelled automatically when that scope ends. We don’t need to subscribeOn the io thread because Room automatically runs suspend queries on a background thread for us. We don’t need to observeOn the main thread because the scope we’re in already runs on the main thread (although we could change this if we wanted). In the event that getByVodId returns null, we use the safe-call operator ?. so the expression simply evaluates to null at that point.
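
A minimal sketch of that usage, assuming kotlinx.coroutines is available: FakeProgressDao, VideoScreen, positionSeconds and the title strings are hypothetical, and the injected CoroutineScope stands in for whatever lifecycle-bound scope the Activity/Fragment provides.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical row type; only the class name comes from the article.
data class VideoOnDemandPlaybackProgress(val videoOnDemandId: Long, val positionSeconds: Long)

// Hypothetical in-memory DAO. A missing row is simply null, no error path needed.
class FakeProgressDao(private val rows: Map<Long, VideoOnDemandPlaybackProgress>) {
    suspend fun getByVodId(videoOnDemandId: Long): VideoOnDemandPlaybackProgress? =
        rows[videoOnDemandId]
}

// Hypothetical stand-in for an Activity/Fragment. Cancelling `scope`
// cancels every coroutine launched here, so there is nothing to dispose.
class VideoScreen(private val dao: FakeProgressDao, private val scope: CoroutineScope) {
    var title: String = ""

    fun loadTitle(vodId: Long): Job = scope.launch {
        // Safe call: evaluates to null when the user has no progress row.
        val seconds = dao.getByVodId(vodId)?.positionSeconds
        title = if (seconds != null) "Resume at ${seconds}s" else "Start watching"
    }
}

// Usage on a plain JVM: runBlocking supplies the scope a lifecycle would supply on Android.
fun demoTitles(): List<String> = runBlocking {
    val dao = FakeProgressDao(mapOf(1L to VideoOnDemandPlaybackProgress(1L, 90L)))
    val screen = VideoScreen(dao, scope = this)
    listOf(1L, 2L).map { id ->
        screen.loadTitle(id).join()
        screen.title
    }
}
```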

The Coroutine implementation is definitely cleaner here because we don’t have to handle a valid null result in an error callback, as we do with Rx’s onError. However, the Rx code is still pretty readable, and it’s not a bad tradeoff for keeping Rx’s power. Let’s get into something a little more complicated…

Here’s what we were trying to accomplish:

  • When we ask for sites from the site repository, it should check the in-memory cache for sites
  • If there are sites in-memory, return them and stop
  • Else, ask the database for sites and return them
  • If there are no sites in the database, ask the API for sites
  • Set the API result to the in-memory cache and add the sites to the onboard database

Not too bad, right? Here’s our Rx implementation:

Let’s walk through this. getSites needs to return data once, so it returns a Single of a list of sites. Anything we get from the database or the API comes wrapped in a Single, so even though cachedSites already holds the data we need, we wrap it in a Single with Single.just. If cachedSites doesn’t have what we need, we ask the database with siteDao.getAll() and, if it returns sites, set the cache. Finally, we use flatMap to chain this Single into the API call.
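
One way the chain in this walkthrough could be written is sketched below, assuming RxJava 2. It is an illustration of the described steps, not the code from our repository: only getSites, cachedSites, siteDao.getAll(), Single.just and flatMap come from the text, while the Site fields, SiteApi, fetchSites, insertAll and the two fakes are hypothetical.

```kotlin
import io.reactivex.Single

data class Site(val id: Int, val name: String)

// Hypothetical collaborators; the article only names siteDao.getAll().
interface SiteDao {
    fun getAll(): Single<List<Site>>
    fun insertAll(sites: List<Site>)
}

interface SiteApi {
    fun fetchSites(): Single<List<Site>>
}

class SiteRepository(private val siteDao: SiteDao, private val api: SiteApi) {
    @Volatile
    private var cachedSites: List<Site> = emptyList()

    fun getSites(): Single<List<Site>> {
        val cached = cachedSites
        // The cache holds plain data, but callers expect a Single, so wrap it.
        if (cached.isNotEmpty()) return Single.just(cached)

        return siteDao.getAll().flatMap { dbSites ->
            if (dbSites.isNotEmpty()) {
                cachedSites = dbSites
                Single.just(dbSites)
            } else {
                // Nothing stored locally: chain into the API call and persist its result.
                api.fetchSites().doOnSuccess { apiSites ->
                    cachedSites = apiSites
                    siteDao.insertAll(apiSites)
                }
            }
        }
    }
}

// Hypothetical fakes so the sketch can be exercised without Room or a network.
class InMemorySiteDao : SiteDao {
    val rows = mutableListOf<Site>()
    override fun getAll(): Single<List<Site>> = Single.fromCallable { rows.toList() }
    override fun insertAll(sites: List<Site>) { rows.addAll(sites) }
}

class CountingApi(private val sites: List<Site>) : SiteApi {
    var calls = 0
    override fun fetchSites(): Single<List<Site>> = Single.fromCallable { calls++; sites }
}
```

Every branch has to produce a Single, including the ones that already hold plain data, which is where most of the nesting comes from.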

Here’s the Coroutine implementation:

Note the suspend modifier on the function definition, which lets us write asynchronous operations in a sequential, synchronous-looking style. Also note that this function does not return an Observable or Single. It simply returns the List<Site> object we need.

Let’s walk through it. First, we check the cachedSites object to see if there are sites available for us. If so, we return them right there. If there’s nothing in the cache, we check the database and set cachedSites to its output. No database sites? Get the sites from the API and return them. Be sure to put the API call on the io thread. If we can’t get sites from anywhere, return an empty list.
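
The steps in this walkthrough can be sketched roughly as follows, assuming kotlinx.coroutines. This is an illustration rather than our production code: the Site fields, SiteApi, fetchSites, insertAll and the two fakes are hypothetical, and the API is assumed to be a blocking call, which is why it is wrapped in withContext(Dispatchers.IO).

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

data class Site(val id: Int, val name: String)

// Hypothetical collaborators; the article only names siteDao.getAll().
interface SiteDao {
    suspend fun getAll(): List<Site>
    suspend fun insertAll(sites: List<Site>)
}

interface SiteApi {
    fun fetchSites(): List<Site> // assumed to block on the network
}

class SiteRepository(private val siteDao: SiteDao, private val api: SiteApi) {
    private var cachedSites: List<Site> = emptyList()

    suspend fun getSites(): List<Site> {
        // 1. In-memory cache.
        if (cachedSites.isNotEmpty()) return cachedSites

        // 2. Database; remember whatever it returns.
        cachedSites = siteDao.getAll()
        if (cachedSites.isNotEmpty()) return cachedSites

        // 3. API, moved to the IO dispatcher because the call blocks.
        val apiSites = withContext(Dispatchers.IO) { api.fetchSites() }
        if (apiSites.isNotEmpty()) {
            cachedSites = apiSites
            siteDao.insertAll(apiSites)
        }
        return apiSites // an empty list when no source had any sites
    }
}

// Hypothetical fakes so the sketch can be exercised without Room or a network.
class InMemorySiteDao : SiteDao {
    val rows = mutableListOf<Site>()
    override suspend fun getAll(): List<Site> = rows.toList()
    override suspend fun insertAll(sites: List<Site>) { rows += sites }
}

class CountingApi(private val sites: List<Site>) : SiteApi {
    var calls = 0
    override fun fetchSites(): List<Site> { calls++; return sites }
}

// Usage: the second call should be served from the cache without touching the API.
fun demoGetSitesTwice(repo: SiteRepository): Pair<List<Site>, List<Site>> = runBlocking {
    repo.getSites() to repo.getSites()
}
```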

The Coroutine implementation is far more readable because it allows the data to remain flat. We don’t need to wrap anything in a Single. Just use and return the data.

Up until now, we’ve been doing asynchronous operations that only return data once. This is what Coroutines were made for. However, what about data that needs to be observed and updated?

In the following example we need to get sites and determine if they should show up in the “My Sports” section. Simple enough using Rx:

compositeDisposable.add(
    siteRepository.getAllSitesWithFavoriteInfo()
        .subscribeOn(ioThread())
        .observeOn(mainThread())
        .subscribe({ siteList ->
            determineMySportsVisibility(siteList)
        }, { throwable -> Crashlytics.logException(throwable) })
)

siteRepository.getAllSitesWithFavoriteInfo returns an Observable with the site information. Room automatically pushes updates to it whenever the underlying data changes, and our subscriber receives them.

If we used a suspend function again, it would return the result once and stop listening. To subscribe to ongoing updates we must use Flow, which is Reactive Streams compliant and operates in much the same way as an Observable.

try {
    siteRepository.getSites().mainCollect(this) { siteList ->
        determineMySportsVisibility(siteList)
    }
} catch (e: Exception) {
    Crashlytics.logException(e)
}

Done! The data will continue to be observed until the outer Fragment is closed (hence the this passed into mainCollect), and any exception bubbles up to the outer scope, where we can handle it with a try/catch.
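
mainCollect is a small helper of ours whose implementation isn’t shown here. As an illustration of how a helper in that spirit could be built from standard Flow operators, here is a hypothetical collectIn that takes a CoroutineScope instead of a Fragment. Note the swap: it routes upstream failures through Flow’s catch operator rather than a surrounding try/catch. The Site fields, the flowOf stand-in for Room’s updates and the "any favorite" rule are assumptions made for the example.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

data class Site(val id: Int, val name: String, val isFavorite: Boolean)

// Hypothetical helper: collect every emission in `scope` until the flow
// completes or the scope is cancelled, reporting upstream failures to onError.
fun <T> Flow<T>.collectIn(
    scope: CoroutineScope,
    onError: (Throwable) -> Unit,
    action: suspend (T) -> Unit
): Job = onEach(action)
    .catch { cause -> onError(cause) }
    .launchIn(scope)

// Usage: two emissions stand in for Room pushing an updated site list.
fun demoVisibility(): List<Boolean> = runBlocking {
    val seen = mutableListOf<Boolean>()
    val updates = flowOf(
        emptyList<Site>(),
        listOf(Site(1, "Site A", isFavorite = true))
    )
    updates.collectIn(this, onError = { }) { siteList ->
        // Assumed rule: show "My Sports" once at least one site is a favorite.
        seen += siteList.any { it.isFavorite }
    }.join()
    seen
}
```

Passing the scope explicitly keeps the lifetime of the collection visible at the call site: cancel the scope and the collection stops.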

This usage of observables was pretty simple, and I’m sure other apps have much more complicated use cases that need more tools to manage many complicated reactive streams. For our app, the asynchronous operations we needed were fairly simple, and there were only a couple of times we needed to keep streaming data instead of just listening once.

We decided to fully embrace Coroutines and rip out all of RxKotlin. Surprisingly, it only saved us 100 KB of final bundle size. We may end up adding Rx back in some spots if the asynchronous nature of our app gets more complex. However, for the current state of our app, replacing Rx with Coroutines and Flow made our logic much simpler and easier to maintain.
