RxJava combineLatest, withLatestFrom

Filed Under: RxJava

In this tutorial, we’ll be discussing the two important operators of RxJava and how they differ from one another. To know the basics about RxJava and RxJava Operators refer this and this respectively.

combineLatest

CombineLatest operator is used when you’re combining multiple observables. It allows for 2 to 9 observables.
CombineLatest operator states that the item would be emitted only when any of the Observable sources emits a value. When that happens, CombineLatest combines the most recently emitted items from each of the source Observables.

combineLatest expects two arguments:

  • List Of Observables
  • Function

Following is the diagram of it from the docs:

rxjava combinelatestfrom diagram

CombineLatestFrom is often used in cases such as Android Login Form Validation.

How is it different from Zip operator?

Zip emits items only when all of the zipped source Observables have emitted a previously unzipped item.
CombineLatest emits when any of the observables have emitted an item.

Example:

In the following code we’ve added two observables:


Observable<Integer> observable1 = Observable.just(1, 2, 3);
        Observable<Integer> observable2 = Observable.just(4);


        List<Observable<Integer>> observableList = Arrays.asList(observable1, observable2);


        Observable observable = Observable.combineLatest(observableList, new Function<Object[], Integer>() {

            @Override
            public Integer apply(Object... objects) throws Exception {
                int concat = 0;
                for (Object value : objects) {
                    if (value instanceof Integer) {
                        concat += (Integer) value;
                    }
                }
                return concat;
            }
        });

In the above code, we pass the List of Observables inside the combineLatest operator. Along with that, we pass a function. This function would be called each time any of the observables emits a value.
It’ll pass on the most recently emitted values from each of the observables.

Let’s subscribe to the above observable.


observable.subscribe(value -> System.out.println("Value is " + value));

The value is 7. 3 and 4 are the last emitted items.

Let’s change the Observables to:


Observable<Integer> observable1 = Observable.just(1, 2, 3);
Observable<Integer> observable2 = Observable.just(4,5,6);

The output will be :

rxjava combinelatest operator

So, combineLatest would emit items every time an item from observable2 is emitted.
Observable.just() emits items one by one. So each item would be the last emitted item and would be combined with the last emitted item of observable1

Let’s add a Subject which is like a Hot Observable to the mix.


PublishSubject<Integer> publishSubject = PublishSubject.create();

        Observable<Integer> observable1 = Observable.just(1, 2, 3);
        Observable<Integer> observable2 = Observable.just(4);
        List<Observable<Integer>> observableList = Arrays.asList(observable1, observable2, publishSubject);


        Observable observable = Observable.combineLatest(observableList, new Function<Object[], Integer>() {

            @Override
            public Integer apply(Object... objects) throws Exception {
                int concat = 0;
                for (Object value : objects) {
                    if (value instanceof Integer) {
                        concat += (Integer) value;
                    }
                }
                return concat;
            }
        });

        publishSubject.onNext(-10);
        observable.subscribe(value -> System.out.println("Value is " + value));
        publishSubject.onNext(10);

The output is 3 + 4 + 10 = 17.

Values emitted by the PublishSubject would be only received after Subscription.


withLatestFrom

Unlike combineLatest, withLatestFrom can be used with only two observables.
withLatestFrom operator would emit items only when the second observable starts emitting items.
Till then it will keep dropping items emitted by the first Observable.
Once the second Observable has an emitted an item, the combined Observable would emit the last emitted items by each of the Observables.

Following is the diagram for the above operator:

rxjava withlatestfrom diagram

Let’s look at a sample example using the withLatestFrom operator.


Subject<String> subject = PublishSubject.create();
        Subject<String> publishSubject = PublishSubject.create();

        subject.onNext("a");

        Observable o = publishSubject.withLatestFrom(subject, new BiFunction<String, String, String>() {
            @Override
            public String apply(String s, String s2) throws Exception {
                return s + s2;
            }
        });

Inside the BiFunction, the values, when emitted from each of the observables is received.
In the above code, s contains the last string emitted from the observable subject and s2 comes from publishSubject.

We concatenate both of the strings. Now let’s subscribe to the combined observable and emit items.


        subject.onNext("b");
        o.subscribe(value -> System.out.println("Value is " + value), error -> System.out.println("onError"), () -> System.out.println("onComplete"), onSubscribe -> System.out.println("onSubscribe"));
        publishSubject.onNext("c");
        subject.onNext("d");
        publishSubject.onNext("e");
        publishSubject.onNext("f");
        publishSubject.onNext("g");
        subject.onNext("h");

The subscribe function consists of four lambda functions. onSubscribe, onNext, onComplete, onError.
Now we’ve started emitting values from each of the Subject.
Values from PublishSubject are only received by the observer after the point of subscription.
To use previous values you can always use a BehaviorSubject.

Now in the above code, until subject emits a value, values from the publishSubject object would be of no use.

So the output of the above code is:

rxjava withlatestfrom output

As you can see, in the last statement, subject emits the string h, but since the publishSubject doesn’t emit anything after that, it isn’t printed.

So in withLatestFrom one observable acts a checkpoint for the other.

That brings an end to this tutorial. We’ve discussed the two vital operators combineLatest and withLatestFrom and seen how they are completely different from one another.

Leave a Reply

Your email address will not be published. Required fields are marked *

close
Generic selectors
Exact matches only
Search in title
Search in content
Search in posts
Search in pages