Example : Converting examples from RxJava
Dan Lew has an excellent blog post on Grokking RxJava. I thought it might be instructive to convert some of the examples into SimpleReact and highlight the differences, as SimpleReact and RxJava take fundamentally different approaches to concurrency.
http://blog.danlew.net/2014/09/22/grokking-rxjava-part-2/
Given a query method that asynchronously returns a List of URLs :-
RxJava
Observable<List<String>> query(String text);
SimpleReact
CompletableFuture<List<String>> query(String string);
The APIs of our starting points, if you follow the links, are radically different.
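To make the SimpleReact starting point concrete, here is a minimal sketch of a query method with that signature, using only the JDK. The class name QuerySketch and the example.com URLs are invented for illustration; the real query method is not shown in the original post.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class QuerySketch {
    // Hypothetical stand-in for query: the URLs returned here are made up.
    static CompletableFuture<List<String>> query(String text) {
        // supplyAsync runs the supplier on the common ForkJoinPool by default.
        return CompletableFuture.supplyAsync(() ->
                Arrays.asList("http://example.com/1", "http://example.com/2"));
    }

    public static void main(String[] args) {
        // join() waits for the result, like get() but without checked exceptions.
        List<String> urls = query("Hello, world!").join();
        System.out.println(urls.size() + " urls");
    }
}
```

The caller receives the future immediately and decides when, or whether, to wait for the list.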
Get the title of each URL, filter out any null titles, take the first 5 results, save them to disk and print each title to the console.
RxJava
query("Hello, world!")
.flatMap(urls -> Observable.from(urls))
.flatMap(url -> getTitle(url))
.filter(title -> title != null)
.take(5)
.doOnNext(title -> saveTitle(title))
.subscribe(title -> System.out.println(title));
My understanding of the RxJava code is that it is single-threaded but free-threaded: the Observable section of the code and the subscriber section of the code are each free to target a single (and possibly different) thread to run on.
This choice will result in very different implementations under the hood from SimpleReact. For example, see this comment on the RxJava implementation of take (https://github.com/ReactiveX/RxJava/issues/1334). There is no need to synchronize access to the count in the OperatorTake class, because it is not operating in a multi-threaded environment (https://github.com/benjchristensen/RxBackpressure/blob/master/rx-bp-prototype/src/main/java/rx/operators/OperatorTake.java).
SimpleReact
// count must be a field of the enclosing object: a lambda cannot modify a local variable
int count = 0;
List<String> titles = new SimpleReact().reactToCollection(query("Hello, world!").get())
.<String>then(url -> getTitle(url))
.filter(Objects::nonNull)
.filter(title -> { synchronized(this){ return count++<5;} })
.peek(title -> saveTitle(title) )
.peek(System.out::println)
.block();
The functionally equivalent code in SimpleReact behaves very differently in terms of concurrency. First, all of the URLs are processed concurrently once they are extracted from the CompletableFuture (which itself can run concurrently). E.g. with a List of 10 URLs, 10 concurrent calls to getTitle can be made, and as each result comes in it is kept only if it is non-null.
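The fan-out described above can be approximated with plain JDK classes. This is a sketch of the concurrency pattern only, not of how SimpleReact is implemented; FanOutSketch and its getTitle (which returns null for any URL ending in "missing") are hypothetical.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class FanOutSketch {
    // Hypothetical getTitle: returns null for URLs ending in "missing".
    static String getTitle(String url) {
        return url.endsWith("missing") ? null : "Title of " + url;
    }

    static List<String> titles(List<String> urls) {
        // Start one asynchronous task per URL before waiting on any of them.
        List<CompletableFuture<String>> pending = urls.stream()
                .map(url -> CompletableFuture.supplyAsync(() -> getTitle(url)))
                .collect(Collectors.toList());
        // Wait for each result in turn and drop the null titles.
        return pending.stream()
                .map(CompletableFuture::join)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }
}
```

Collecting the futures into a list first matters: mapping and joining in a single stream pipeline would start and wait for each task one at a time.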
The simpler SimpleReact API doesn't provide a built-in 'take' implementation. Take is a special case of filter, and so can be implemented as a java.util.function.Predicate. Making this a reusable class is trivial, and code you can reuse is included below (it will also be in the next release!). In the example above we've used a lambda expression with a synchronized block to ensure that incrementing and checking the current number of extracted titles is atomic.
Something to note in terms of concurrent behaviour is that the call to CompletableFuture.get() in SimpleReact will block the current thread. If that is a problem, it is straightforward to move the reactive flow off the current thread. E.g.
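As an alternative to the synchronized block, the same atomic increment-and-check could be done with an AtomicInteger. This is a sketch rather than anything SimpleReact ships; the class name AtomicTakeSketch and the take factory method are invented here.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

class AtomicTakeSketch {
    // Returns a predicate that accepts the first `limit` elements it is asked about.
    static <T> Predicate<T> take(int limit) {
        AtomicInteger count = new AtomicInteger();
        // getAndIncrement reads and bumps the counter in one atomic step.
        return item -> count.getAndIncrement() < limit;
    }
}
```

Because the counter lives inside the returned predicate, each call to take(...) starts a fresh count.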
Supplier flow = () -> new SimpleReact().reactToCollection(query("Hello, world!").get())
.<String>then(url -> getTitle(url))
.filter(Objects::nonNull)
.filter(new Take<>(5))
.peek(title -> saveTitle(title) )
.peek(System.out::println);
new SimpleReact().react(flow);
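Another way to avoid blocking on get(), using only the JDK, is to attach a continuation to the future. The sketch below is not SimpleReact code: NonBlockingSketch, its query and its getTitle are hypothetical stand-ins, and the titles are processed sequentially inside the continuation rather than concurrently.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class NonBlockingSketch {
    // Hypothetical stand-ins for the query and getTitle methods.
    static CompletableFuture<List<String>> query(String text) {
        return CompletableFuture.supplyAsync(() -> Arrays.asList(
                "http://example.com/1", "http://example.com/2", "http://example.com/3"));
    }

    static String getTitle(String url) {
        return "Title of " + url;
    }

    // Registers a continuation instead of calling get(), so the caller is not blocked here.
    static CompletableFuture<List<String>> firstTitles(String text, int limit) {
        return query(text).thenApplyAsync(urls -> urls.stream()
                .map(NonBlockingSketch::getTitle)
                .filter(Objects::nonNull)
                .limit(limit)
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        // join() is used only at the very end so the demo prints before the JVM exits.
        firstTitles("Hello, world!", 2).thenAccept(System.out::println).join();
    }
}
```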
What would be interesting now would be to convert the SimpleReact implementation back into an equivalent concurrent RxJava implementation. (E.g. RxJava concurrency example)
If we define a Take class (which should be included in the next SimpleReact minor release) :
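import java.util.function.Predicate;

// Accepts the first `limit` elements tested and rejects the rest.
class Take<T> implements Predicate<T>{
private final int limit;
private int count=0; // only accessed inside the synchronized test method
public Take(int limit) {
this.limit = limit;
}
// synchronized so that the increment and the comparison happen as one atomic step
@Override
public synchronized boolean test(T o) {
return count++<limit;
}
}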
We can reuse that with SimpleReact's filter method to implement a concurrent take equivalent.
List<String> titles = new SimpleReact().reactToCollection(query("Hello, world!").get())
.<String>then(url -> getTitle(url))
.filter(Objects::nonNull)
.filter(new Take<>(5))
.peek(title -> saveTitle(title) )
.peek(System.out::println)
.block();
Our concurrent take predicate can now be used with any JDK 8 library that uses Predicates.
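As one illustration, the predicate can be passed to a parallel java.util.stream pipeline. This is a sketch: the wrapper class TakeWithStreams and the countTaken helper are invented here, and the Take class is repeated only so the example compiles on its own. The JDK documentation discourages stateful predicates in streams, and which elements get through in a parallel run is not deterministic; only the number of them is.

```java
import java.util.function.Predicate;
import java.util.stream.IntStream;

class TakeWithStreams {
    // Same shape as the Take class above, repeated so the sketch is self-contained.
    static class Take<T> implements Predicate<T> {
        private final int limit;
        private int count = 0;
        Take(int limit) { this.limit = limit; }
        @Override
        public synchronized boolean test(T o) { return count++ < limit; }
    }

    // Counts how many of `total` elements pass a shared Take(limit) in a parallel stream.
    static long countTaken(int total, int limit) {
        return IntStream.range(0, total)
                .boxed()
                .parallel()
                .filter(new Take<>(limit))
                .count();
    }

    public static void main(String[] args) {
        System.out.println(countTaken(100, 5));
    }
}
```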