RxJava Observables and Observers

Filed Under: RxJava

In this tutorials, we’ll be discussing RxJava Observables and Observers in length. We’ll discuss their various types and what each of them has to offer.

Observables and Observers

In RxJava, Observables are the source which emits items to the Observers. For Observers to listen to the Observables, they need to subscribe first. The instance created after subscribing in RxJava2 is called Disposable.

In order to stop listening to Observables, we can call unsubscribe by calling the method dispose() on the Disposable instance.

Subscription class has been deprecated in RxJava2 since Disposable has been introduced. For more info on what’s new in RxJava2 refer here.

Creating Observables

We can create Observables in many ways. One of the ways are:


Observable<Integer> observable = new ObservableCreate<Integer>(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(10);
                emitter.onNext(20);
                emitter.onComplete();
            }
        });

Observable.OnSubscribe is an interface which defines the action to be taken when a subscriber subscribes to the Observable. The subscribe method would only run when an Observer is subscribed to the Observable.

onNext is used to emit the next item.
onError is triggered when an error occurs.
onComplete is called after the last item is emitted.

Now in order to catch these values, we must subscriber an observer. For that we have to create an observer first:


Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Integer o) {
                System.out.println("onNext " + o);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };

Let’s subscribe to it:


observable.subscribe(observer);

This creates a Subscription between the observer and observable. The Observable would now emit values which would be caught by the onNext of the Observer.

The output from the console is:


Output:
onSubscribe
onNext 10
onNext 20
onComplete

If you subscribe() multiple times, each time the items would be emitted.

Methods to Create Observables

We can create Observables in the following ways:

  • Observable.from()
  • Observable.just() – Pass one or more values inside this.
  • Observable.range – The first argument expects the starting value. The second expects the size. Eg: Observable.range(1,2) would emit 1 and 2.
  • Observable.interval() – Emits the values in the interval defined. The values emitted would be of the type Long. More on this later.

For more info check out the RxJava Tutorial.

Cold Observables and Hot Observables

Cold Observables are Observables that emit one or values such that each Subscriber would receive all the values from the beginning.

Hot Observables are Observables in which the Observer won’t be able to receive items emitted before it subscribed. Only items emitted after the Observer is emitted could be received.

The example we’d defined above was a Cold Observable.

To create a Hot Observable we do:


Observable<Long> observableInterval = Observable.interval(2, TimeUnit.SECONDS);

        PublishSubject<Long> publishSubject = PublishSubject.create();
        observableInterval.subscribe(publishSubject);

        publishSubject.subscribe(l -> System.out.println("Subscriber #1 onNext: " + l));

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        publishSubject.subscribe(l -> System.out.println("Subscriber #2 onNext: " + l));

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

To create a Hot Observable we need to use Subject. A Subject can act as an Observable or Observer at any given time.

Values from 0 would be emitted every 2 seconds. We’ve set the thread to sleep for 2 seconds after the first observer is subscribed. Hence the second observer won’t get the initial emitted items as shown in the output below:

rxjava hot observable

Types of Observables

Following are the major types of Observables with each have a slightly different functionality and use case:

  • Observable – Emits one or more values. We have already discussed this above.
  • Single – Emits a single value or throws an error.
  • Maybe – This may or may not emit a value. Should be used when you need some data optionally.
  • Flowable – Used when a huge amount of data has to be emitted. It is used for backpressure. More on this later.
  • Completable – This just emits success or failure. No data is emitted.

Types of Observers

For every Observable type above we have an Observer type as well in RxJava.

  • Observer.
  • SingleObservable
  • MaybeObservable
  • CompletableObserver
Flowable Observable uses the default Observer. Since RxJava2 for Flowables, Subscribers are used instead of Observers

Let’s now look at the basic implementation of each of the Observables with the Observers.

Single

This emits just one value. This can be used with Retrofit network calls.

Following is an example of Single:


Observable<Integer> integerObservable = Observable.just(1,2,3);

        Single<Integer> integerSingle = integerObservable.single(1);

        integerSingle.subscribe(l -> System.out.println("Subscriber #1 onNext: " + l), (Throwable e) -> System.out.println("onError"));


        integerSingle = integerObservable.singleOrError();

        integerSingle.subscribe(l -> System.out.println("Subscriber #1 onNext: " + l), (Throwable e) -> System.out.println("onError"));

This would give an onError in both the cases since neither of has a single value.

single(Integer defaultValue) and singleOrError() are just two of the methods.
We can add plenty of other operators as well just as all, any,contains, count etc. Generally, a predicate is set in these methods which would return a single value.

Example:


Observable<Integer> integerObservable = Observable.just(1, 2, 3);

        Single<Boolean> booleanSingle = integerObservable.any(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer % 2 == 0;
            }
        });

        booleanSingle.subscribe(l -> System.out.println("Subscriber #1 onNext: " + l), (Throwable e) -> System.out.println("onError"));


        Single<Long> integerSingle = integerObservable.count();


        integerSingle.subscribe(l -> System.out.println("Subscriber #1 onNext: " + l), (Throwable e) -> System.out.println("onError"));

This prints true and 3 respectively.

Maybe

Maybe emits 0 or 1 items. The MaybeObserver has the method onSuccess in place of onNext().

Following is an example using Maybe in which we print the maximum number from an Observable of Integers.


Observable<Integer> integerObservable = Observable.just(1, 2, 3, 4, 5);

        Maybe<Integer> integerMaybe = integerObservable.reduce(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                if (integer > integer2)
                    return integer;
                else
                    return integer2;
            }
        });

        MaybeObserver<Integer> maybeObserver = new MaybeObserver<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onSuccess(Integer o) {
                System.out.println("onSuccess : " + o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };

        integerMaybe.subscribe(maybeObserver);

This prints 5.

Besides the reduce function, there are plenty of other functions such as firstElement(), lastElement() etc.

Maybe is similar to Single except that Maybe allows for zero emissions too.
To create a zero emission observable, do:

 Maybe<Integer> emptySource = Maybe.empty();

Completable

Completable is used in cases where you need to know whether an operation is completable successfully or not. Example: Uploading an image to the server. Unlike Maybe and Single, the CompletableObserver doesn’t return any value at all. Neither does the Completable Observable has a type.

Example:


Observable<Integer> integerObservable = Observable.empty();

        Completable completable = integerObservable.ignoreElements();


        CompletableObserver completableObserver = new CompletableObserver() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }
        };

        completable.subscribe(completableObserver);

Flowable

Flowable is used when you need to handle lots of data. It supports backpressure. We’ll discuss it at length in another tutorial. For now, a Flowable Observable needs a Subscriber class as the Observer since RxJava2.

Following is a sample of Flowable:


 Flowable<Integer> integerFlowable = Flowable.range(1,500000);

        integerFlowable.reduce(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        Subscriber<Integer> integerSubscriber = new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                System.out.println("onSubscribe");
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("onNext: " + integer);
            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };


        integerFlowable.subscribe(integerSubscriber);

For a Subscriber to start receiving emissions we must manually invoke request() on the Subscription instance as done above.

Converting Between Observables

We have various helper methods to convert an Observable type into another.

For example:

To convert any type to a Completable, either of the methods are available:

  • toCompletable()
  • ignoreElements()

Similarly, to convert to Observable, toObservable() method is suffice.
FlowabletoFlowable()
MaybetoMaybe()
Singlereduce()/firstElement() etc.

This brings an end to this tutorial on RxJava Observables.

Leave a Reply

Your email address will not be published. Required fields are marked *

close
Generic selectors
Exact matches only
Search in title
Search in content
Search in posts
Search in pages