RxJava Operators

Filed Under: RxJava
Rxjava Operators

In this tutorial, we’ll discuss and implement the various operators that RxJava has. How each operator transforms the Observable sequences and what the subscriber sees? Let’s start!

RxJava Operators

As we had seen in the first RxJava tutorial, Operators are something that acts on an Observable and passes the transformed data to the Subscribers. With multiple operators, each operator finishes its own task and then passes the transformed data to the next operator.

Create a new IntelliJ Java Project and add the RxJava dependency to get started. We’ll use functional programming and lambdas to make the code simpler and less verbose.

1. map and filter

map()


import rx.Observable;

public class RxOperators {

    public static void main(String[] args)
    {
        
        //map operator
        Observable<String> mapObservable = Observable.just("hello world","the observable emits lower case sentences","subscriber sees it as upper case","map operator");
        mapObservable.map(String::toUpperCase).subscribe(System.out::println);
    }
}
//Prints 
//HELLO WORLD
//THE OBSERVABLE EMITS LOWER CASE SENTENCES
//SUBSCRIBER SEES IT AS UPPER CASE
//MAP OPERATOR

filter()


//filter operator
        Observable<String> filterObservable = Observable.from(new String[]{"Hello","How are you?", "doing"});
        filterObservable.filter(string->string.contains(" ")).subscribe(System.out::println);


//Prints
//How are you?

Using Map and Filter : To return the square of all the even numbers.


//map and filter
Observable<Integer> mapFilter = Observable.range(0,10);
mapFilter.filter(i-> i%2==0).map(i-> i*i).subscribe(System.out::println);

2. take, count and skip

take operator has three main variants. Let’s look at each of them.

  • take() passes the first n items emitted by the Observable.
  • takeFirst() emits the first item that meets the condition specified in the operator.
  • takeLast() prints the last n items. Just the opposite of take()
  • takeUntil() emits items until a second observable doesn’t start emitting values. Alternatively, it can be used for conditions as well.
  • takeWhile() emits items as long as a condition is true. It ignores the remaining.

Observable<Integer> takeObservable = Observable.range(0,100); 
takeObservable.take(5).subscribe(System.out::println); //prints 0 to 4 
takeObservable.takeFirst(i -> i*i>1000).subscribe(System.out::println); //prints 32
takeObservable.takeLast(1).subscribe(System.out::println); //prints 99

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Observable<Integer> takeUntilObservable = Observable.from(numbers);
takeUntilObservable.takeUntil(number -> number>2)
                   .subscribe(System.out::println); //prints 1 to 3

takeObservable.takeWhile(number -> number<4)
                .subscribe(System.out::println); //prints 1 to 3
takeObservable.takeWhile(number -> number>2)
                .subscribe(System.out::println); //prints nothing

The takeFirst() function prints the first value whose square is greater than 1000.
The takeUntil() operator is like a do-while loop. It first prints the statement before checking for the condition for the next iteration.

As the name says, count() returns the number of values that reach the subscriber.


Observable<String> countObservable = Observable.from(new String[]{"First","Second", "Third", "Seventh"});
countObservable.filter(string-> string.length()>5).count().subscribe(System.out::println); //prints 2

Passing null as one of the values would cause a runtime crash in the above case. Hence we need to specify the action for onError()


Observable<String> countObservable = Observable.from(new String[]{"First","Second", "Third", null});
countObservable.filter(string-> string.length()>5).count().subscribe(System.out::println, throwable -> System.out.println("One of the values is not valid"));

//prints 
//One of the values is not valid

skip operator has a few variants that are described and implemented below

  • skip() ignores the first n values.
  • skipLast() ignored the last n values
  • skipWhile() ignores all the values until a specific condition is met. It emits all the remainder of values.
  • skipUntil() ignores all the values until another observable starts emitting. We’ll look at this later

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        Observable<Integer> skipObservable = Observable.from(numbers);
        skipObservable.skip(3).subscribe(System.out::println); //prints 4 and 5
        skipObservable.skipLast(3).subscribe(System.out::println); //prints 1 and 2
        skipObservable.skipWhile(i-> i<3).subscribe(System.out::println); prints 3 4 and 5

skip is somewhat the opposite of take.

skip is the ideal operator to use when avoiding null values.

3. startWith,reduce, repeat, scan

startWith() appends the given element at the start of the emission.


Observable<String> startWithObservable = Observable.just(" Rx", "Java", " Operators", " Tutorial");
        startWithObservable.startWith("Welcome to the").subscribe(System.out::print);

//Prints
//Welcome to the RxJava Operators Tutorial

reduce() operator acts as an accumulator. It adds a the next value to previously added values.
Finally prints the accumulated value for the subscriber.


Observable<Integer> reduceObservable = Observable.range(1, 5);
reduceObservable.reduce((integer, integer2) -> integer + integer2).subscribe(System.out::println);

In the above code, for the first emission, integer is 1 integer2 is 2, for the second emission, 3 and 3 respectively and so on.

reduce operator is useful for calculating the sum, appending strings etc.
The following code finds the length of the concatenated strings.


Observable<String> reduceStringObservable = Observable.just("Rx", "Java");
reduceStringObservable.reduce((x, y) -> x + y).map(String::length).subscribe(System.out::println);

repeat operator repeats the emission twice.


Observable.just("Android", "iOS", "Windows")
                .repeat(2)
                .subscribe(System.out::println);

//Prints
//Android
//iOS
//Android
//iOS

scan operator unlike reduce, prints the accumulator value incrementally


Observable.range(1, 5).scan((integer, integer2) -> integer + integer2).subscribe(System.out::println);

//prints
1
3
6
10
15

4. all, contains, elementAt

The all() operator checks whether each value meets the condition. It returns a true/false.


Observable.range(1, 5).all(i-> i%2==0).subscribe(System.out::println);//prints false
Observable.range(1, 5).map(i-> i*2).all(i-> i%2==0).subscribe(System.out::println); //prints true

contains(): It checks if the value mentioned exists in the Observable lot.


Observable.range(1, 5).contains(6).subscribe(System.out::println); //prints false
Observable.range(1, 5).contains(4).subscribe(System.out::println); //prints true

elementAt() : It prints the value present at the given index among the list of emitted values.


Observable.range(1, 5).elementAt(4).subscribe(System.out::println); //prints 5

5. distinct, toList, toSortedList

distinct() operator eliminates duplicate values.


Observable.just(1,2,3,1,5,1,2,3).count().subscribe(System.out::println); //prints 8
Observable.just(1,2,3,1,5,1,2,3).distinct().count().subscribe(System.out::println); //prints 4

toList and toSortedList are used to convert the emission into another observable that is of the type list.
toSortedList sorts the emission in ascending order.
We can also set our own comparator to sort.


Observable.just(1, 2, 3, 1, 5, 1, 2, 3).toList().subscribe(System.out::println);
Observable.just(1, 2, 3, 1, 5, 1, 2, 3).toSortedList().subscribe(System.out::println);
Observable.just(1, 10, 20, 2).toSortedList((integer, integer2) -> integer2 < integer ? -1 : integer == integer2 ? 1 : 0).subscribe(System.out::println);

//prints
//[1, 2, 3, 1, 5, 1, 2, 3]
//[1, 1, 1, 2, 2, 3, 3, 5]
//[20, 10, 2, 1]

In the above code, the third observable sorts the values in descending order.

6. concat, merge, zip

concat is used to concatenate observables without interleaving them.
merge is used to concatenate observables by interleaving them
That means that in concat the new observable would contain the first followed by the second.
In merge they can be mixed, depending on when they arrive.


Observable.concat(
                Observable.interval(1, TimeUnit.SECONDS).map(id -> "CA" + id),
                Observable.interval(1, TimeUnit.SECONDS).map(id -> "CB" + id))
                .subscribe(System.out::println);

        Thread.sleep(5000);

//Prints:
//CA0
//CA1
//CA2
//CA3
//CA4

Note: We need to add a sleep to prevent early exit of the main function.
In the concat() method, until the first observable isn’t done it won’t move to the second.
So in the above code, the second doesn’t get printed since the first observable prints its quota in the 5 seconds.
Let’s set a limit on the first using take.


Observable.concat(
                Observable.interval(1, TimeUnit.SECONDS).take(2).map(id -> "CA" + id),
                Observable.interval(1, TimeUnit.SECONDS).map(id -> "CB" + id))
                .subscribe(System.out::println);

        Thread.sleep(5000);

//prints 
CA0
CA1
CB0
CB1
CB2

merge two observables


        Observable.merge(
                Observable.interval(1, TimeUnit.SECONDS).map(id -> "MA" + id),
                Observable.interval(1, TimeUnit.SECONDS).map(id -> "MB" + id))
                .subscribe(System.out::println);

        Thread.sleep(5000);

//prints 
MB0
MA0
MB1
MA1
MA2
MB2
MB3
MA3
MA4
MB4

As you can see merge interleaves them. But it doesn’t guarantee the sequence of emission.

merge and concat above work on Observables only.
To make them work as operators we need to use mergeWith and concatWith


Observable sourceA = Observable.interval(1, TimeUnit.SECONDS).map(id -> "MA" + id);
Observable sourceB = Observable.interval(1, TimeUnit.SECONDS).map(id -> "MB" + id);

sourceB.mergeWith(sourceA).subscribe(System.out::println);
Thread.sleep(5000);

Use concatWith in the above code to get the relevant operator for concat.

zip is used to pair each emission from each of the observables. Each observable would wait for the others to emit the current value and then each of the values is available for you in a function.
zip is useful to concatenate different types.


Observable.zip(
                Observable.interval(1, TimeUnit.SECONDS).map(id -> String.valueOf( (char)(id + 65))),
                Observable.interval(2, TimeUnit.SECONDS), (s1, s2) -> s1 + s2)
                .subscribe(System.out::println);

        Thread.sleep(9000);

//prints
//A0
//B1
//C2
//D3

In the above code, the first observable emits an integer every second. The map operator converts the integer to the relevant character using the ASCII code and returns it as a string.
The second observable emits an integer every two seconds. So the first observable needs to wait for this.

Try implementing the zipWith operator yourself. It’s analogous to concatWith and mergeWith.

7. debounce, delay

A debounce operator only emits an item from an Observable if a particular timespan has passed without it emitting another item. This operator is really useful in places like EditText in a SearchView in Android.
Typically entering anything, the search view would call a backend API with the current text. Using the debounce operator saves the immediate call. It gives the user a time to assess the text they’ve entered and whether they’re looking to modify it. Only after the certain timespan is passed the backend API would be called!


Observable.just(1,2,3,4,5,6)
                .debounce(1, TimeUnit.SECONDS)
                .subscribe(System.out::println, System.out::println, () -> System.out.print("OnComplete"));
//Prints 6

The last element is printed in the cases like above since there is nothing left and the debounce operator emits the value after the delay.

The delay operator delays the start of emission of values from the Observable by a certain time.


Observable.just(1,2,3,4,5,6)
                .delay(5, TimeUnit.SECONDS)
                .subscribe(System.out::println, System.out::println, () -> System.out.print("OnComplete"));

        Thread.sleep(4000);

In the above code, nothing happens since our main function would return after 4 seconds thereby preventing the emission that was to occur after 5 seconds.

This brings an end to this tutorial. We’ve covered some major operators. You can download the RxJavaOperators Project source code from the link below.

Leave a Reply

Your email address will not be published. Required fields are marked *

close
Generic selectors
Exact matches only
Search in title
Search in content
Search in posts
Search in pages