RxJava Subject

Filed Under: RxJava

In this tutorial, we’ll be discussing Subjects in RxJava. We won’t be covering the basics of RxJava. If you are new to RxJava, do read this tutorial before proceeding ahead.

What is a Subject?

We know Observables and Observers. Observables emit data. Observers listen to those emissions by subscribing with the help of Subscribers.

Subjects can emit and subscribe to the data. They have the implementations of Observables as well as Observers.

A Subject has the same operators that an Observable has.
They can multicast too. This means all the Observers subscribed to it will receive the same emissions from the point of subscription.
This prevents doing duplicate operations for multiple subscribers.

Where is a Subject used?

Subjects are commonly used when you need to observe some data and then later can emit that data to other observers.

Let’s look at an example with and without Subject. We’ll see how Subject saves us from redundant operations.


Observable<Integer> observable =
                Observable.just(1, 2).subscribeOn(Schedulers.computation())
                        .map(val -> {
                            System.out.println("map operation. Squaring : " + val * val);
                            return val;
                        });

        observable.subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("first subscriber : " + integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

        observable.subscribe(l ->
                System.out.println("second subscriber :" + l));

We’ve subscribed two Observers. Let’s see what the output prints.


rxjava observable observer output

The map operation is computed each time for each observer.

By using Subject we can remove this redundancy as shown below:


Observable<Integer> observable =
                Observable.just(1, 2).subscribeOn(Schedulers.computation())
                        .map(val -> {
                            System.out.println("map operation. Squaring : " + val * val);
                            return val;
                        });


        PublishSubject<Integer> pSubject = PublishSubject.create();
        observable.subscribe(pSubject);


        pSubject.subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("First Subscriber");
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("onNext");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });


        pSubject.subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("Second Subscriber");
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("onNext");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

PublishSubject is a subclass of Subject.

The output is:

rxjava subject output

So the map operation isn’t repeated for the second subscriber.
Thus the Subject multicasts to all of its subscribers. That means that the data would be emitted and computed just once.

Cold Observables vs Hot Observables

Let’s explain this with a metaphor.

Case 1
You are watching a movie on Youtube. You call your friend and tell him to subscribe to that channel and watch it too. He opens his Youtube and starts the movie.

Case 2
You are watching a live cricket/football match. Your friend joins you after some time.

In Case 1, you and your friend would be able to watch the full movie from the beginning.
In Case 2, the friend won’t be able to watch the match from the beginning.

  • Case 1 = Cold Observables. All observers would get the emissions from the beginning.
  • Case 2 = Hot Observables. Every Observer would receive emissions only from the point when they subscribed.

Subjects are Hot Observables

Now that we’re clear about Hot Observables, let’s validate whether Subjects are Hot Observables or not with an example:


PublishSubject<Integer> pSubject = PublishSubject.create();


        pSubject.onNext(0);

        pSubject.subscribe(it -> {
            System.out.println("Observer 1 onNext: " + it);
        }, (Throwable onError) -> {
        }, () -> System.out.println("Observer 1 onCompleted"), on1 -> System.out.println("Observer 1 onSubscribe"));

        pSubject.onNext(1);
        pSubject.onNext(2);


        pSubject.subscribe(it -> {
            System.out.println("Observer 2 onNext: " + it);
        }, (Throwable onError) -> {
        }, () -> System.out.println("Observer 2 onCompleted"), on1 -> System.out.println("Observer 2 onSubscribe"));

        pSubject.onNext(3);

        pSubject.subscribe(it -> {
            System.out.println("Observer 3 onNext: " + it);
        }, (Throwable onError) -> {
        }, () -> System.out.println("Observer 3 onCompleted"), on1 -> System.out.println("Observer 3 onSubscribe"));

        pSubject.onNext(4);
        pSubject.onComplete();

The output printed in the console is:

rxjava subjects hot observable

As you can see the Observer 2 and Observer 3 don’t get the values emitted by the Subject before the point of subscription.

Another metaphor for Hot Observables – They are like Whatsapp Group messages. You can only read messages after joining the group

Types of Subject

Following are the different types of Subjects. Each works differently.

  • PublishSubject
  • BehaviorSubject
  • AsyncSubject
  • ReplaySubject
  • UnicastSubject
  • SingleSubject

PublishSubject

This emits all the items at the point of subscription. This is the most basic form of Subject and we’ve implemented it above.

BehaviorSubject

An observer, when subscribed to the BehaviorSubject, would get the last emitted item before it subscribed and all subsequent items.

Metaphor: Your friend gets to watch the last replay when he joins for the cricket match besides viewing the rest of the live match.

Example:


BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create();


        behaviorSubject.onNext(0);

        behaviorSubject.subscribe(it -> {
            System.out.println("Observer 1 onNext: " + it);
        }, (Throwable onError) -> {
        }, () -> System.out.println("Observer 1 onCompleted"), on1 -> System.out.println("Observer 1 onSubscribe"));

        behaviorSubject.onNext(1);
        behaviorSubject.onNext(2);


        behaviorSubject.subscribe(it -> {
            System.out.println("Observer 2 onNext: " + it);
        }, (Throwable onError) -> {
        }, () -> System.out.println("Observer 2 onCompleted"), on1 -> System.out.println("Observer 2 onSubscribe"));

        behaviorSubject.onNext(3);

        behaviorSubject.subscribe(it -> {
            System.out.println("Observer 3 onNext: " + it);
        }, (Throwable onError) -> {
        }, () -> System.out.println("Observer 3 onCompleted"), on1 -> System.out.println("Observer 3 onSubscribe"));

        behaviorSubject.onNext(4);
        behaviorSubject.onComplete();

Output:

rxjava behaviorsubject


AsyncSubject

Of all the emissions, AsyncSubject would emit only the last item. That too only when onComplete() gets called.

Metaphor: You turned on your television to watch the live cricket match and its the last ball of the game. If your friend joins you late, he/she can also just see the last ball of the game.

Example:


 AsyncSubject<Integer> asyncSubject = AsyncSubject.create();


        asyncSubject.onNext(0);
        
        asyncSubject.subscribe(it -> {
            System.out.println("Observer 1 onNext: " + it);
        }, (Throwable onError) -> {
        }, () -> System.out.println("Observer 1 onCompleted"), on1 -> System.out.println("Observer 1 onSubscribe"));

        asyncSubject.onNext(1);
        asyncSubject.onNext(2);
        asyncSubject.onComplete();


        asyncSubject.subscribe(it -> {
            System.out.println("Observer 2 onNext: " + it);
        }, (Throwable onError) -> {
        }, () -> System.out.println("Observer 2 onCompleted"), on1 -> System.out.println("Observer 2 onSubscribe"));

        asyncSubject.onNext(3);

Output:

rxjava asyncsubjects

As you can see if the onComplete was called before the second observer subscribed, still that observer would get the last emitted value, even if it was before it subscribed.


ReplaySubject

As the name says, when an observer subscribes to a ReplaySubject, it would get all the items from the beginning.

Metaphor: Your friend who had missed the live cricket match can now replay the whole. He probably recorded it!

Example:


ReplaySubject<Integer> replaySubject = ReplaySubject.create();


        replaySubject.onNext(0);


        replaySubject.subscribe(it -> {
            System.out.println("Observer 1 onNext: " + it);
        }, (Throwable onError) -> {
        }, () -> System.out.println("Observer 1 onCompleted"), on1 -> System.out.println("Observer 1 onSubscribe"));

        replaySubject.onNext(1);
        replaySubject.onNext(2);


        replaySubject.subscribe(it -> {
            System.out.println("Observer 2 onNext: " + it);
        }, (Throwable onError) -> {
        }, () -> System.out.println("Observer 2 onCompleted"), on1 -> System.out.println("Observer 2 onSubscribe"));

        replaySubject.onNext(3);

        replaySubject.subscribe(it -> {
            System.out.println("Observer 3 onNext: " + it);
        }, (Throwable onError) -> {
        }, () -> System.out.println("Observer 3 onCompleted"), on1 -> System.out.println("Observer 3 onSubscribe"));

        replaySubject.onNext(4);
        replaySubject.onComplete();

Output:

rxjava replaysubject


UnicastSubject

A UnicastSubject can have a single observer only. Otherwise it will throw an error. Also it emits all the values from the beginning.

Metaphor: You and only you are watching the live cricket match. You can rewind it. If your friend comes your television screen won’t be visible to him. It will still be to you.

Example:


 UnicastSubject<Integer> unicastSubject = UnicastSubject.create();


        unicastSubject.onNext(0);


        unicastSubject.subscribe(it -> {
            System.out.println("Observer 1 onNext: " + it);
        }, (Throwable onError) -> System.out.println("Observer 1 onError"), () -> System.out.println("Observer 1 onCompleted"), on1 -> System.out.println("Observer 1 onSubscribe"));

        unicastSubject.onNext(1);
        unicastSubject.onNext(2);


        unicastSubject.subscribe(it -> {
            System.out.println("Observer 2 onNext: " + it);
        }, (Throwable onError) ->
                System.out.println("Observer 2 onError"), () -> System.out.println("Observer 2 onCompleted"), on1 -> System.out.println("Observer 2 onSubscribe"));

        unicastSubject.onNext(3);

Output:

rxjava unicastsubject


SingleSubject

Last but not the least, a SingleSubject can emit only one item to all its observers. Multiple observers can subscribe to it. It can subscribe to multiple SingleObservables.

Metaphor: You can only watch the last ball of the match. Your friend can also just watch the single last ball of the match.

Example:


  Single<Integer> singleObservable = Single.just(1)
                .subscribeOn(Schedulers.computation());
        Single<Integer> singleObservableTwo = Single.just(2)
                .subscribeOn(Schedulers.computation());

        SingleSubject<Integer> singleSubject = SingleSubject.create();
        singleObservable.subscribe(singleSubject);
        singleObservableTwo.subscribe(singleSubject);


        singleSubject.subscribe(integer -> System.out.println("Observer 1 onNext: " + integer), error -> System.out.println("onError"));
        singleSubject.subscribe(integer -> System.out.println("Observer 2 onNext: " + integer), error -> System.out.println("onError"));


Output:

rxjava singlesubject


PS: We can set transformation operators on the Subject in the same way as we do on Observables:


PublishSubject<Integer> publishSubject = PublishSubject.create();

        Observable<Integer> observable = publishSubject.filter(it-> it%2==0);

        publishSubject.onNext(0);


        observable.subscribe(it -> {
            System.out.println("Observer 1 onNext: " + it);
        }, (Throwable onError) -> {
        }, () -> System.out.println("Observer 1 onCompleted"), on1 -> System.out.println("Observer 1 onSubscribe"));

        publishSubject.onNext(1);
        publishSubject.onNext(2);

Only 2 would be printed.

And that sums up RxJava Subject for us.

Leave a Reply

Your email address will not be published. Required fields are marked *

close
Generic selectors
Exact matches only
Search in title
Search in content
Search in posts
Search in pages