Skip to content

Example : Converting examples from RxJava

johnmcclean-aol edited this page Mar 30, 2015 · 13 revisions

Very early conversion take. This was written against a very early version of simple-react. simple-react and RxJava take opposite approaches to Streaming data. RxJava pushes data through a Stream of Observables to Subscribers, where as simple-react uses a mixed pull/ push model. For simple-react Streams a pull always starts the chain and the work. Data can be pushed into Streams via async datastructures (see PushableStreamBuilder).

Dan Lew has an excellent blog post on Grokking RxJava. I thought it might be instructive to convert some of the examples into SimpleReact, and highlight what the differences are - as SimpleReact and RxJava take a fundamentally different approach to concurrency.

Example 1 : Reacting to a collection of URLs

http://blog.danlew.net/2014/09/22/grokking-rxjava-part-2/

Given a query method that Asynchronously returns a List of URLs :-

RxJava

  Observable<List<String>> query(String text);

Observable

SimpleReact

  CompletableFuture<List<String>> query(String string);

CompletableFuture

The Api's of our starting points, if you follow the links, are very feature rich, but radically different.

The ask

Get the title of each url, filter any null null titles, take the first 5 results, save them to disk and print the title to the console.

RxJava

  query("Hello, world!")
.flatMap(urls -> Observable.from(urls))
.flatMap(url -> getTitle(url))
.filter(title -> title != null)
.take(5)
.doOnNext(title -> saveTitle(title))
.subscribe(title -> System.out.println(title));

My understanding of the RxJava code is that it is single threaded but free threaded. The Observable section of the code and the subscriber section of the code are free to target a single (and different) thread to run on.

This choice will result in very different implementations under the hood from SimpleReact. For example see this comment on the RxJava implementation of take :- https://github.com/ReactiveX/RxJava/issues/1334). There is no need to synchronize access to the count in the OperatorTake class, because it is not operating in a multi-threaded environment - https://github.com/benjchristensen/RxBackpressure/blob/master/rx-bp-prototype/src/main/java/rx/operators/OperatorTake.java.

SimpleReact

 int count =0;
 List<String> titles = new SimpleReact().fromStream(Stream.of(query("Hello, world!")))
							.flatMap(Collection::stream)
							.<String>then(url -> getTitle(url))
							.filter(Objects::nonNull)
							.filter(Predicates.take(5))
							.peek(title -> saveTitle(title) )
							.peek(System.out::println)

The functionally equivalent code in SimpleReact behaves very differently in terms of concurrency. First all of the URLs are processed concurrently once they are extracted asynchronously from the CompletableFuture.

The core SimpleReact api doesn't provide a built-in 'take' implementation (although it does via Stream and Seq). Take is a subset of filter, and so can be implemented as a java.util.Predicate. Examples, of filtering Predicates - including this one are available in the Predicates class in SimpleReact. In the example above we've used a lambda expression with a synchronized block to ensure an atomic increment and check operation of the current number of extracted titles.

As an aside you can target a different taskExecutor for each stage in the flow in SimpleReact.

Converting filter to take

If we define a Take class (which should be included in the next SimpleReact minor release) :

 class Take<T> implements Predicate<T>{

	private final int limit;
	private int count=0;
	
	public Take(int limit) {
	
		this.limit = limit;
	}
	
	@Override
	public synchronized boolean test(T o) {
		return count++<limit;
	}
	
}

We can reuse that with SimpleReact's filter to method to implement a concurrent take equivalent.

 List<String> titles = new SimpleReact().reactToCollection(query("Hello, world!").get())
			.<String>then(url -> getTitle(url))
			.filter(Objects::nonNull)
			.filter(new Take(5))
			.peek(title -> saveTitle(title) )
			.peek(System.out::println)

Our concurrent take predicate can now be used with any JDK 8 library that uses Predicates.

Clone this wiki locally