RxJava Tutorial

In this tutorial, we’ll introduce Reactive Programming and the RxJava library. We’ll cover the basics of RxJava: how it is useful, why it has become so important for Android development, and its anatomy.

Reactive Programming

  • Reactive comes from the word react: the code reacts to changes in state instead of performing the state change itself.
  • Reactive Programming is a programming paradigm that’s concerned with data streams and the propagation of change.
  • The reactive model listens for events and runs the relevant code in response.

Think of a data stream as a river that flows continuously. Any observer/subscriber attached to the stream receives the data as it flows past. The received data can be transformed further using functions, and this is where Functional Programming joins the already powerful Reactive Programming.

Together they are often loosely called Functional Reactive Programming.

In reactive programming the flow can be asynchronous, which helps keep long-running work from blocking the main thread.

How is it different from the Observer Pattern?
In the Observer pattern, a Subject notifies every Observer that depends on it whenever a change happens. Reactive programming builds on the same idea but models the data as a stream: values can arrive over time, possibly asynchronously, and the stream can signal completion or an error as well as new values.

Why Reactive Programming for Android?

The Reactive Programming paradigm and its library RxJava have gained huge popularity in the Android world.
Until now, the usual way to run asynchronous tasks on Android has been AsyncTask, which has a lot of disadvantages and limitations.

Following are the things that Reactive Programming tries to make better:

  • Easier to chain multiple requests: AsyncTask becomes a problem when you need to make several requests in sequence, because the code quickly gets bloated. This is where reactive code scores over traditional async tasks.
  • Callback processing and error tracking: With multiple requests and background animations running in async tasks, it gets difficult to determine where an error came from. Each asynchronous method call carries its own callback functions, so nesting a few async methods quickly leads to callback hell. Again, reactive code tends to make this easier.

RxJava

  • Rx stands for Reactive Extensions.
  • RxJava is a JVM implementation of Reactive Extensions.
  • Reactive Extensions is a library for writing asynchronous, event-based reactive code by using observables. We’ll see what observables are shortly.
  • RxJava is useful and very powerful in the sense that it takes care of multithreading very well.
  • If you’re a Java developer, you’ll be well aware that multithreading can get tricky. RxJava lets you declare which thread does the work and which thread receives the results (such as the main thread), and it handles the hand-off between them.
  • In Android, the main thread is the UI thread. RxJava handles multithreading behind a level of abstraction: we write less code and the underlying methods do the rest for us.
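
As a rough illustration of that abstraction, the sketch below moves work onto a background thread with a single operator. It assumes RxJava 1.x on the classpath and Java 8 lambdas. The class name ThreadingSketch and the helper run() are made up for this example, and just(), map() and subscribe() are explained later in this tutorial.

```java
import rx.Observable;
import rx.schedulers.Schedulers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of RxJava.
public class ThreadingSketch {

    // Returns { name of the calling thread, name of the thread that ran map() }.
    public static String[] run() {
        String caller = Thread.currentThread().getName();
        String[] worker = new String[1];
        CountDownLatch done = new CountDownLatch(1);

        Observable.just("payload")
                .subscribeOn(Schedulers.io())                // run the chain on an I/O thread
                .map(s -> Thread.currentThread().getName())  // executes on that thread
                .subscribe(name -> {
                    worker[0] = name;
                    done.countDown();
                });

        try {
            // Keep the caller waiting until the background result has arrived.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new String[] { caller, worker[0] };
    }

    public static void main(String[] args) {
        String[] names = run();
        System.out.println("caller: " + names[0] + ", worker: " + names[1]);
    }
}
```

Without the subscribeOn() call, the same chain would run synchronously on the calling thread.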

RxJava Basics

The basic building blocks of RxJava are:

  • Observables: They emit the data stream.
  • Observers and Subscribers: They consume the data stream. In RxJava 1.x, Subscriber is an abstract class that implements Observer and also Subscription, so in addition to the Observer callbacks it offers unsubscribe() and isUnsubscribed() for cancelling the subscription.
  • Operators: They transform the data stream.

[Figure: RxJava basics flow]

Before we get into the details of each of the above, let’s look at them through an interesting analogy.
Let’s say the Observable is me, the tutor, emitting the data.
You are the Observer/Subscriber who receives the data.
Additionally, Operators can transform the data that you receive from me. Example: An operator can change the language of this tutorial data from English to any other language.

RxJava Tutorial – Project Setup in IntelliJ

Before we get down to the nitty-gritty details of RxJava and its implementation, let’s create a new project in IntelliJ and add the RxJava dependency to it.

[Figure: RxJava Tutorial IntelliJ project]

Set the group and artifact id as shown below.
[Figure: IntelliJ group and artifact ID settings]

We need to add the following dependency to the dependencies block of this project’s build.gradle (recent Gradle versions use implementation in place of compile).


compile 'io.reactivex:rxjava:1.1.0'

Now create a new Java class in the package. Your project structure should look like this:
[Figure: RxJava Tutorial project structure]

RxJava Observables, Observers, Subscribers

The role of an observable is to emit data. Let’s create a basic observable in our Java class.

An Observable can emit zero or more items, followed by either a completion signal or an error. It is meant to push items to its consumers, possibly asynchronously.

This is in contrast to the Iterator interface, where the consumer pulls items synchronously.
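
The pull/push contrast can be sketched with plain JDK types before bringing RxJava in. This is only an illustration: the class PullVsPushSketch and its methods are hypothetical names, and the push side imitates the shape of onNext()/onCompleted() with a Consumer and a Runnable rather than using real RxJava classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical demo class, uses only the JDK.
public class PullVsPushSketch {

    // Pull: the consumer drives the loop and asks for each item.
    public static List<String> pull(List<String> source) {
        List<String> received = new ArrayList<>();
        Iterator<String> iterator = source.iterator();
        while (iterator.hasNext()) {        // "is there more?"
            received.add(iterator.next());  // "give me the next one"
        }
        return received;
    }

    // Push: the producer drives the loop and calls the consumer back.
    public static void push(List<String> source,
                            Consumer<String> onNext,
                            Runnable onCompleted) {
        for (String item : source) {
            onNext.accept(item);  // producer hands over each item
        }
        onCompleted.run();        // producer announces the end
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("a", "b", "c");
        System.out.println("pulled: " + pull(source));
        push(source,
                item -> System.out.println("pushed: " + item),
                () -> System.out.println("completed"));
    }
}
```

In the pull version the caller decides when to fetch the next item. In the push version the source decides, which is what allows a real Observable to deliver items later and from another thread.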

Creating An Observable


import rx.Observable;
import rx.Subscriber;

public class MyRxClass {

    public static void main(String[] args) {

        Observable<String> myObservable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                //Code to pass the data to the subscribers and observers goes here.
            }
        });

    }
}

The create() method creates a new Observable that can emit items. Inside call(), items are pushed to the subscriber by invoking its methods.

Creating Observers and Subscribers

Observers and Subscribers can be created in the following ways:


       Subscriber<String> mySubscriber = new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                System.out.println(s);
            }

            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
            }
        };

        Observer<String> myObserver = new Observer<String>() {
            @Override
            public void onNext(String s) {
                System.out.println("MyObserver onNext(): "+ s);
            }

            @Override
            public void onCompleted() {
                System.out.println("Observer complete");
            }

            @Override
            public void onError(Throwable e) {
            }
        };

The onNext() method receives the current value.

The onCompleted() method is called when the observable has no more data left to send.

The onError() method is called when an exception occurs.

Note: Iterator has equivalents for onNext() (next()) and onCompleted() (hasNext() returning false), but nothing equivalent to onError(); an exception simply propagates to the caller. This is another advantage of reactive code.

Linking Observer, Subscribers to the Observable

For the Observer and Subscriber to listen to the data stream emitted by the Observable they need to be subscribed using the subscribe() method as shown below.


myObservable.subscribe(mySubscriber);
myObservable.subscribe(myObserver);

The following code prints a Hello World String emitted by the observable.


import rx.Observable;
import rx.Observer;
import rx.Subscriber;

public class MyRxClass {

    public static void main(String[] args) {

        Observable<String> createObserver = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {

                subscriber.onNext("Hello World");
                subscriber.onCompleted();
            }
        });

        Subscriber<String> mySubscriber = new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                System.out.println("MySubscriber onNext(): "+ s);
            }

            @Override
            public void onCompleted() {
                System.out.println("Subscriber completed");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("OnError");
            }
        };



        Observer<String> myObserver = new Observer<String>() {
            @Override
            public void onNext(String s) {
                System.out.println("MyObserver onNext(): "+ s);
            }

            @Override
            public void onCompleted() {
                System.out.println("Observer completed");
            }

            @Override
            public void onError(Throwable e) {
            }
        };

        createObserver.subscribe(mySubscriber);
        createObserver.subscribe(myObserver);


    }
}

The following gets printed in the log console.


MySubscriber onNext(): Hello World
Subscriber completed
MyObserver onNext(): Hello World
Observer completed

We can add more onNext() calls in the call() method. Any onNext() / onError() calls made after onCompleted() won’t matter and will be skipped.
Try adding the following in the call() method, before the onCompleted() call.


int i = 0;
while (i < 5) {
    subscriber.onNext("Hello World " + i);
    i++;
}

Each item is emitted through onNext(), one at a time.
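
To see the skipping behaviour in action, here is a small sketch that records every callback in a list. It assumes RxJava 1.x, where subscribe() wraps the Subscriber so that events after a terminal event are dropped. The class name TerminalEventSketch and the "too late" item are made up for this example.

```java
import rx.Observable;
import rx.Subscriber;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
public class TerminalEventSketch {

    // Returns the callbacks the subscriber actually received, in order.
    public static List<String> run() {
        List<String> events = new ArrayList<>();

        Observable<String> source = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                for (int i = 0; i < 3; i++) {
                    subscriber.onNext("Hello World " + i);
                }
                subscriber.onCompleted();
                subscriber.onNext("too late"); // emitted after completion
            }
        });

        source.subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                events.add("onNext: " + s);
            }

            @Override
            public void onCompleted() {
                events.add("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                events.add("onError");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        // The "too late" item does not appear in the printed list.
        run().forEach(System.out::println);
    }
}
```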

Observable.from()

The create() method is a bit verbose. Let’s look at what the from() method does.


import rx.Observable;
import rx.Observer;
import rx.Subscriber;

import java.util.ArrayList;
import java.util.List;

public class MyRxClass {

    public static void main(String[] args) {

        List<Integer> numbers = new ArrayList<>();
        numbers.add(1);
        numbers.add(2);
        numbers.add(3);
        numbers.add(4);
        Observable<Integer> fromObservable = Observable.from(numbers);

        Subscriber<Integer> intSubscriber = new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("onNext: "+ integer);
            }
        };

        fromObservable.subscribe(intSubscriber);

    }
}


//Following is printed in the log console.
//onNext: 1
//onNext: 2
//onNext: 3
//onNext: 4
//onCompleted

The from() method unpacks the list/array and emits its values one at a time.

Observable.just()

Observable.just() emits its arguments exactly as they are passed. It is overloaded to take a single item or several items. If you pass a List/Array to just(), it emits the List/Array itself as a single item rather than its elements.


import rx.Observable;
import rx.Observer;
import rx.Subscriber;

import java.util.ArrayList;
import java.util.List;

public class MyRxClass {

    public static void main(String[] args) {


        Subscriber<Integer> intSubscriber = new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("onNext: "+ integer);
            }
        };


       Observable<Integer> justObservable = Observable.just(4,4,6,null);
       justObservable.subscribe(intSubscriber);


    }
}
//Prints :
//onNext: 4
//onNext: 4
//onNext: 6
//onNext: null
//onCompleted
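
The difference between passing a list to just() and to from() can be checked by counting emissions. The following is a minimal sketch assuming RxJava 1.x and Java 8 lambdas; the class JustVsFromSketch and its method names are hypothetical.

```java
import rx.Observable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
public class JustVsFromSketch {

    // just(list) treats the whole list as one item.
    public static int countJust(List<Integer> numbers) {
        List<Object> seen = new ArrayList<>();
        Observable.just(numbers).subscribe(item -> seen.add(item));
        return seen.size();
    }

    // from(list) emits each element of the list separately.
    public static int countFrom(List<Integer> numbers) {
        List<Object> seen = new ArrayList<>();
        Observable.from(numbers).subscribe(item -> seen.add(item));
        return seen.size();
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4);
        System.out.println("just emitted " + countJust(numbers) + " item(s)"); // 1
        System.out.println("from emitted " + countFrom(numbers) + " item(s)"); // 4
    }
}
```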

Observable.range()

Observable.range(start, n) emits n consecutive integers, starting from and including start.


Observable<Integer> rangeObservable = Observable.range(2,5);
rangeObservable.subscribe(intSubscriber); //emits 2,3,4,5,6

A few other factory methods are worth knowing:

  • Observable.empty() creates an observable that emits no items and completes immediately.
  • Observable.error() creates an observable that emits no items and terminates with the given error, so the onError() of each subscriber is called.
  • Observable.never() creates an observable that emits no items and never terminates: neither onCompleted() nor onError() is called.
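
The three factories above can be compared side by side by recording which callbacks fire. This sketch assumes RxJava 1.x and Java 8 lambdas; the class TerminalFactoriesSketch, the helper record() and the "boom" message are made up for illustration.

```java
import rx.Observable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
public class TerminalFactoriesSketch {

    // Subscribes to the source and returns the callbacks that fired, in order.
    public static List<String> record(Observable<Integer> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(
                item -> events.add("onNext: " + item),
                error -> events.add("onError: " + error.getMessage()),
                () -> events.add("onCompleted"));
        return events;
    }

    public static void main(String[] args) {
        // Prints [onCompleted]
        System.out.println(record(Observable.<Integer>empty()));
        // Prints [onError: boom]
        System.out.println(record(Observable.<Integer>error(new RuntimeException("boom"))));
        // Prints [] because no callback is ever invoked
        System.out.println(record(Observable.<Integer>never()));
    }
}
```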

Observable.interval(), Observable.timer(), Observable.defer()

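Observable.interval() emits an ascending sequence of Long values starting at 0, evenly spaced by the specified interval.


import java.util.concurrent.TimeUnit;

Observable<Long> intervalObservable = Observable.interval(1, TimeUnit.SECONDS);
intervalObservable.subscribe(System.out::println);
Thread.sleep(5000); // throws InterruptedException, so main must declare or handle it

This emits one value per second starting from 0, so roughly 0 to 4 are printed before the program exits.
interval() emits on a background thread, so we’ve put the main thread to sleep to prevent the main function from returning immediately.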

Unlike interval(), Observable.timer() waits for the given delay, emits a single value (0), and then completes.


Observable<Long> timerObservable = Observable.timer(2, TimeUnit.SECONDS);
timerObservable.subscribe(System.out::println);
Thread.sleep(5000);

//Prints
//0

Observable.defer() does not build the Observable up front. It takes a factory function and calls it only when an Observer subscribes.
The factory is invoked again for every subscription, so each Observer receives an Observable built from the latest data.
It also means that no API call is made until subscription: the data is fetched only when an observer requires it.


Observable<Integer> deferObservable = Observable.defer(() -> Observable.just(1, 2, 3));
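
The "latest data" point is easier to see when the underlying value changes between creating the Observable and subscribing to it. Here is a minimal sketch, assuming RxJava 1.x and Java 8 lambdas; the class DeferSketch and the field current are hypothetical names used only for this example.

```java
import rx.Observable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
public class DeferSketch {

    // Stands in for some state that changes over time.
    static String current = "old";

    public static List<String> run() {
        List<String> seen = new ArrayList<>();
        current = "old";

        // just() reads the value right now, when the Observable is created.
        Observable<String> eager = Observable.just(current);
        // defer() reads the value later, each time someone subscribes.
        Observable<String> lazy = Observable.defer(() -> Observable.just(current));

        current = "new"; // state changes before anyone subscribes

        eager.subscribe(value -> seen.add("eager: " + value));
        lazy.subscribe(value -> seen.add("lazy: " + value));
        return seen;
    }

    public static void main(String[] args) {
        // Prints:
        // eager: old
        // lazy: new
        run().forEach(System.out::println);
    }
}
```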

Operators

An operator is used to transform the data emitted by the observable before it reaches the subscriber.
Both the observable and subscriber are independent of the transformations done by the Operator.


import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;

import java.util.ArrayList;
import java.util.List;

public class MyRxClass {

    public static void main(String[] args) {


        Observable.just(1,2,3)
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer i) {
                        return i*i;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.println(integer);
                    }
                });

    }
}
//Prints
1
4
9

In the above code, we’ve used the map() operator, which passes each emitted item to the Func1 callback where we do our transformation.

The Action callbacks reduce the verbosity of the subscriber.

  • Action1<T> is equivalent to onNext(), where T is the type emitted.
  • Action0 is equivalent to onCompleted().
  • Action1<Throwable> is equivalent to onError(Throwable).

The above code took care of onNext(). We can take care of the others in the following way:


Observable.just(1,2,3)
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer i) {
                        return i*i;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.println(integer);
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {

                    }
                }, new Action0() {
                    @Override
                    public void call() {
                        System.out.println("Complete");
                    }
                });


//or

       Action0 actionComplete = new Action0() {
            @Override
            public void call() {
                System.out.println("Complete");
            }
        };
        
        Action1<Throwable> actionError = new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {

            }
        };
        
        Action1<Integer> actionNext = new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.println(integer);
            }
        };

        Observable.just(1,2,3)
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer i) {
                        return i*i;
                    }
                })
                .subscribe(actionNext,actionError , actionComplete);

Let’s unleash lambda expressions to reduce the verbosity even further.


Observable.just(1,2,3)
                .map(i -> i*i)
                .subscribe(System.out::println);

Wow. That’s so concise.
The Observable emits values, the map() operator squares them, and the transformed values are printed. Operators can also be chained: applying map() twice raises each value to the fourth power.


 Observable.just(1,2,3)
                .map(i -> i*i)
                .map(i -> i*i)
                .subscribe(System.out::println);

//prints
1
16
81

Let’s look at another operator, filter(), which passes on only the values that satisfy a given condition.


Observable.just(1,2,3)
                .map(i -> i*i)
                .map(i -> i*i)
                .filter(i -> i>10)
                .subscribe(System.out::println);

//Prints
16
81

This brings an end to the RxJava tutorial. You can download the sample code from the link below.
