Getting started with a simple example

johnmcclean-aol edited this page Jan 24, 2015 · 10 revisions

Let's process a collection of data

We start with a collection of words and count their letters.

List<String> list = Arrays.asList("hello","world","$da^","along","$orrupted");

Let's set up a reactive dataflow:

new SimpleReact().reactToCollection(list)

Let's modify it to filter out corrupt words (let's assume they all start with $)

new SimpleReact().reactToCollection(list)
    .filter(it -> !it.startsWith("$"))

Now let's convert each word left in the flow into its length.

new SimpleReact().reactToCollection(list)
    .filter(it -> !it.startsWith("$"))
    .then(it -> it.length())

At this point we would get back a collection of [5,5,5] if we called block. We can call block and then use JDK 8 streams to sum the lengths.

int count  = new SimpleReact()
	.reactToCollection(list)
	.filter(it -> !it.startsWith("$"))
	.then(it -> it.length())
	.block().stream().reduce(0, (acc,next) -> acc+next );

assertThat(count, is(15));
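The expected total can be checked without SimpleReact on the classpath. The following is a minimal sketch that uses only JDK 8 streams as a sequential stand-in for the flow above. The class name WordCountSketch and the method countLetters are hypothetical names chosen for this illustration and are not part of SimpleReact.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in: plain JDK 8 streams, no SimpleReact involved.
class WordCountSketch {

    // Drop words starting with "$", map each remaining word to its length, then sum.
    static int countLetters(List<String> words) {
        return words.stream()
                .filter(it -> !it.startsWith("$"))
                .map(it -> it.length())
                .reduce(0, (acc, next) -> acc + next);
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("hello", "world", "$da^", "along", "$orrupted");
        // hello, world and along survive the filter, each with 5 letters.
        System.out.println(countLetters(list)); // prints 15
    }
}
```

The reduce call here is the same one applied to the result of block() above. On a List of Integer values, mapToInt(Integer::intValue).sum() is an equivalent JDK alternative.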

Use peek to debug the flow

int count = new SimpleReact()
	.reactToCollection(list)
	.filter(it -> !it.startsWith("$"))
	.peek(it -> System.out.println(it))  // hello,world,along
	.then(it -> it.length())
	.peek(it -> System.out.println(it))  // 5,5,5 [e.g. hello.length()==5]
	.block().stream().reduce(0, (acc,next) -> acc+next );
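JDK 8 streams offer a peek operation as well, which makes it possible to sketch the same debugging idea without SimpleReact. The example below records the observed values in lists instead of printing them, so they can be inspected afterwards. PeekSketch, seenWords and seenLengths are hypothetical names, and the sketch assumes a sequential stream; in an asynchronous flow the order in which values are observed may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in using java.util.stream.Stream.peek, not SimpleReact's peek.
class PeekSketch {
    final List<String> seenWords = new ArrayList<>();
    final List<Integer> seenLengths = new ArrayList<>();

    int countLetters(List<String> words) {
        return words.stream()
                .filter(it -> !it.startsWith("$"))
                .peek(seenWords::add)      // observes the words that passed the filter
                .map(String::length)
                .peek(seenLengths::add)    // observes the length of each word
                .reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        PeekSketch sketch = new PeekSketch();
        int count = sketch.countLetters(
                Arrays.asList("hello", "world", "$da^", "along", "$orrupted"));
        System.out.println(sketch.seenWords);
        System.out.println(sketch.seenLengths);
        System.out.println(count);
    }
}
```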

Use capture to log errors

	List<String> list = Arrays.asList("hello","world","$da^","along",null);
	int count  = new SimpleReact()
			.reactToCollection(list)
			.capture(e -> e.printStackTrace())
			.filter(it -> !it.startsWith("$"))
			.then(it -> it.length())
			.block().stream().reduce(0, (acc,next) -> acc+next );

	assertThat(count, is(15));

Note that a stack trace is output when this code is run: the null entry causes a NullPointerException in the filter step, and capture logs it.
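To see why the null entry produces an error while the total stays at 15, here is a rough sketch built directly on the JDK's CompletableFuture. It assumes one asynchronous stage per element and treats capture as a callback invoked for each failed stage; SimpleReact's actual internals may differ. CaptureSketch, countLetters and errorHandler are hypothetical names, and for brevity corrupt words contribute 0 instead of being removed from the flow.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Hypothetical sketch of "log the error, keep the other results".
class CaptureSketch {

    static int countLetters(List<String> words, Consumer<Throwable> errorHandler) {
        // One stage per word; a null word throws NullPointerException in startsWith.
        List<CompletableFuture<Integer>> stages = words.stream()
                .map(word -> CompletableFuture.supplyAsync(() -> word)
                        .thenApply(it -> it.startsWith("$") ? 0 : it.length()))
                .collect(Collectors.toList());

        int total = 0;
        for (CompletableFuture<Integer> stage : stages) {
            try {
                total += stage.join();               // blocks until the stage completes
            } catch (CompletionException e) {
                errorHandler.accept(e.getCause());   // failed stage: report it, skip its value
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<Throwable> errors = new ArrayList<>();
        List<String> list = Arrays.asList("hello", "world", "$da^", "along", null);
        System.out.println(countLetters(list, errors::add)); // prints 15
        System.out.println(errors.size());                   // prints 1
    }
}
```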

Use onFail to recover from errors

	List<String> list = Arrays.asList("hello","world","$da^","along","$orrupted",null);
	int count  = new SimpleReact()
			.reactToCollection(list)
			.capture(e -> e.printStackTrace())
			.filter(it -> !it.startsWith("$"))
			.onFail(e -> { if (e.getCause() instanceof NullPointerException) { return "null"; } return ""; })
			.then(it -> it.length())
			.block().stream().reduce(0, (acc,next) -> acc+next );

	assertThat(count, is(19));

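Note that no stack trace is output when this code is run, as the exception has been handled: onFail replaces the failed null entry with the string "null", which adds 4 to the previous total of 15.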
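The recovery step can be sketched with the JDK's CompletableFuture.exceptionally, which supplies a replacement value when an earlier stage has failed. This is an illustration under assumptions, not SimpleReact's implementation: OnFailSketch, keepIfClean and recover are hypothetical names, and an empty Optional stands in for a word removed by the filter.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical sketch of "recover from a failed stage with a default value".
class OnFailSketch {

    // Stands in for the filter step; throws NullPointerException for a null word.
    static Optional<String> keepIfClean(String it) {
        return it.startsWith("$") ? Optional.empty() : Optional.of(it);
    }

    // Stands in for onFail. A failure from an upstream stage arrives wrapped
    // in a CompletionException, which is why the cause is inspected.
    static Optional<String> recover(Throwable e) {
        Throwable cause = e.getCause() != null ? e.getCause() : e;
        return Optional.of(cause instanceof NullPointerException ? "null" : "");
    }

    static int countLetters(List<String> words) {
        List<CompletableFuture<Integer>> stages = words.stream()
                .map(word -> CompletableFuture.supplyAsync(() -> word)
                        .thenApply(OnFailSketch::keepIfClean)
                        .exceptionally(OnFailSketch::recover)
                        .thenApply(kept -> kept.map(String::length).orElse(0)))
                .collect(Collectors.toList());
        // No stage can fail here, because every failure was replaced by a value.
        return stages.stream().mapToInt(CompletableFuture::join).sum();
    }

    public static void main(String[] args) {
        List<String> list =
                Arrays.asList("hello", "world", "$da^", "along", "$orrupted", null);
        System.out.println(countLetters(list)); // prints 19 (15 + "null".length())
    }
}
```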
