Leveraging the power of RxJava in Android Apps

Travel Triangle is committed to providing a delightful experience to travellers, starting from choosing the right destination, customising their holiday experience and providing on-trip assistance. To this end, we launched our Android app in early 2015, given the increase in number of users accessing our website through mobile devices. Since launch, our Android app has been downloaded by more than half million users, with over 50,000 monthly active users.

Since launch we have revisited the core design of our  app several times, to ensure our users always have a slick and smooth experience, irrespective of the device characteristics. Through this article, we will share how migrating to RxJava enabled our mobile engineering team to ship features quickly, while maintaining a smooth experience across the app.

Why RxJava ?

In any Android app, some common use cases exist, such as:

  1. Switching across worker threads and main thread
  2. Request Chaining
  3. Operation Synchronization after completion of parallel jobs
  4. Cross screen (activity) messaging.

To address the above use cases in the first version of our app, we were using Result receiver, Custom listener, Broadcast receiver, Content observer, Loader. These constructs are provided natively by Android. However, as the number of features and our codebase grew, using above constructs became unmanageable.

After taking a closer look at our architecture and use cases we realised that we needed a framework  that supported both concurrency and event based programming. We explored many libraries such as EventBus, Otto etc, but they were providing either event propagation or only asynchronous behaviors. This is when we came across RxJava and figured it can be leveraged to solve all our use cases efficiently.

RxJava

RxJava is part of ReactiveX, a group of open source libraries that combine the ideas of Observer pattern, Iterator Pattern and functional programming, while exposing an easy to use API. Reactive libraries are available in several languages such as JavaScript, Groovy, Ruby, Java, C# and the list goes on.

RxJava, at it’s core, is built around two main constructs: Observables and Observers(Subscriber). However, there’s also subscription, producer, hot/cold observables, backpressure, scheduler, subject, and more. Observables are expected to emit items, that are consumed by Subscriber objects binded against the Observable. For each subscriber, Observable calls onNext() at any number of times, followed by onCompleted() or onError(). Refer the sample code snippet below for more details.

Observable<String> observable = Observable.create(
        new Observable.OnSubscribe<String>() {

            @Override
            public void call(Subscriber<? super String> sub) {

                sub.onNext("Hello, RxJava!");

                sub.onCompleted();
            }
        }
);

Subscriber<String> subscriber = new Subscriber<String>() {

    @Override
    public void onNext(String s) {
        System.out.println(s);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
    }
};

observable.subscribe(subscriber);

In the above snippet, upon subscription, observable calls subscriber’s onNext() and prints “Hello, RxJava!” and then emits the onComplete() event.

 

Here’s how RxJava can be used to solve the use cases mentioned above.

1) Switching across worker threads and main thread

Generally Handler is used for switching between threads, This simple approach works when frequency of switching the threads is not high. Suppose we have selected some images from a gallery, and we want to upload it to the server, before displaying the images on the UI. But the image servers have certain restrictions on image sizes and formats. Hence we have to compress the images, on the worker thread, and  then update the compressed image on UI, on main thread,  before uploading it to the server, which again happens via the worker thread. Post uploading the image to the server, the images have to be displayed on the UI thread.  As you can imagine, managing all this dependencies across threads, makes the code too complex.

In RxJava, everything is synchronous by default, and observeOn() and subscribeOn() is used for switching threads.  observeOn() is used to tell on which thread Subscriber will run and subscribeOn() tells which thread will run Observable code.

An RxJava approach to performing these operation looks like this:

ArrayList<Uri> selectedImageUris;
Observable.from(selectedImageUris)
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())  //switch to io thread
        .map(new CompressImgFunc1<Uri, Uri>()) // for compressing image
        .observeOn(AndroidSchedulers.mainThread()) //switch to main thread
        .map(new UpdateOnUiFunc1<Uri, Uri>())  //update on UI
        .observeOn(Schedulers.io()) //switch to io thread
        .map(new UploadImageFunc1<Uri, Uri>()) //upload image
        .observeOn(AndroidSchedulers.mainThread())   //switch to main thread
        .subscribe(new MySubscriber<Uri>()); // update on UI

For performing image compression we switched thread by calling observeOn(Schedulers.io()) and for updating the UI, we switched thread to main thread by calling observeOn(AndroidSchedulers.mainThread()) and so on. It raised the level of abstraction around threading and we can switch back and forth between threads very easily. Interesting thing here is that, now we can pass thread as a parameter which makes testing easier in asynchronous environment.

 

2) Request Chaining

Request chaining is a common use case in most applications where an API call is made using the result of a previous call. In our first implementation we were using nested AsyncTask to solve this. An API call was made from doInBackground, the result of which was processed in onPostExecute where a new AsyncTask was used for executing the new API call. The above procedure was repeated for further chaining. This implementation led to memory and context leaks during config change and made the code unreadable and hard to debug.

Let’s assume a user wants to fetch weather information from an external service. Before invoking the weather service, the user location has to be determined, which requires request chaining. RxJava provides Transforming Operators such as flatmap, map to transform items emitted by Observable, and provide an easy API for request chaining.

apiService.getUserLocation()
        .flatMap(new Func1<Location, Observable<WeatherInfo>>() {

            @Override
            public Observable<WeatherInfo> call(Location loc) {

                return apiService.getWeatherInfo(loc);
            }
        }).subscribe(info -> showWeatherInfo(info));

 

Here apiService.getUserLocation() emits location of user, and then it is transformed to new Observable of type WeatherInfo by flatMap operator. Inside flatmap we called getWeatherInfo() with user location.

 

3) Synchronization of operations after completion of parallel jobs

In our app we show the user picture on the profile screen based on user’s preferences. This requires two operations: fetching raw photo of the user and fetching the user’s preference metadata, and then display the profile pic on the screen. In our earlier implementation we executed the two tasks serially. However, RxJava has a nifty Zip operator which can handle this case and will execute both task in parallel and will emit combined result.

 

Observable.zip(service.getUserPhoto(userId), service.getPhotoMetadata(userId),

        new Func2<String, String, PhotoWithData>() {

            @Override
            public PhotoWithData call(String UserPhotoResponse, String PhotoMetadataResponse) {

                    // Some operation

                return photoWithData;
            }
        })
        .subscribe(photoWithData -> showPhoto(photoWithData));

Here, two Retrofit services getUserPhoto(userId) and getPhotoMetadata(userId) are executing in parallel, return photoWithData after applying some operation on the result of both services.

 

4) Cross screen messaging

For cross screen messaging, LocalBroadcastManager, which is based on Android’s native Intent Filter system, is commonly used. For using LocalBroadcastManager, one has to go through the hassle of setting up the Intent, preparing up Intent’s extras, implementing broadcast receivers, and extracting Intent extras again which increases the boilerplate code in application and also impact to performance as it uses serialization. RxJava provides a much convenient API for solving the same problem.

RxJava’s Subject can be used to process event streams. A Subject acts both as an observer and as an Observable. As an observer, it can subscribe to one or more Observables, and because it is an Observable itself, it can pass through the items it observes by re-emitting them. At the same time, it can also emit new items.

 

private PublishSubject<Object> subject = PublishSubject.create();

subject.onNext(event1); // send event1

subject.onNext(event2); //  send event2

subject.subscribe(new Action1<Object>() {

    @Override
    public void call(Object o) { // upon firing event1 call() will be invoked

        if (o instanceof Event1) {
            // do something
        }
    }
});

Now we can use standard java classes as events, and send complex data objects via Subject without using serialization. We have used this pattern in multiple places for instant data updation between activities.   

Besides above use cases of RxJava, we also use it for querying the data from database, binding data/API to UI Widgets and for keeping user’s preferences in SharedPreferences etc.

 

Conclusion

RxJava has made coding more expressive, and life a lot easier for our mobile engineering team. It provides a simplified and unified event model, increases the reusability of asynchronous tasks, and allows simplified testing as concurrency is parameterized. It has become the defacto solution for any complex multi-threaded tasks within our Android app and we hope its adoption will increase at a rapid pace in the mobile app ecosystem in future.

 

If you are interested in solving similar problems, do reach out to careers@traveltriangle.com.

 

Further Reading

http://reactivex.io/

https://github.com/ReactiveX/RxJava

http://blog.danlew.net/2014/09/15/grokking-rxjava-part-1/

 

Comments