This repository was archived by the owner on May 25, 2023. It is now read-only.

Commit 4de36fe

Merge pull request #23 from lightbend/remove-query
Remove kafka scala query to separate repository
2 parents be3fdaf + 9a5865f commit 4de36fe

File tree

103 files changed: +71 -4300 lines


README.md

Lines changed: 71 additions & 9 deletions
Removed:

# Kafka Streams Goodies for Scala developers

This repository contains the following:

1. [Scala APIs for Kafka Streams](https://github.com/lightbend/kafka-streams-scala/blob/develop/kafka-stream-s/README.md): a thin wrapper on top of the Java APIs that provides less boilerplate and better type inference.
2. [An HTTP layer for Kafka Streams Interactive Queries](https://github.com/lightbend/kafka-streams-scala/blob/develop/kafka-stream-q/README.md): a utility for developing global queries across the local state stores of a Kafka Streams application, especially useful when the application is deployed across multiple nodes.
3. [An example application](https://github.com/lightbend/kafka-streams-scala/blob/develop/kafka-stream-q-example-dsl/README.md) based on the Kafka Streams DSL that uses the library in (2).

These tools support Kafka 1.0.0. By default they build for Scala 2.12 (with Scala 2.12.4), but you can build targets for both 2.12 and 2.11 (using Scala 2.11.11) in sbt by adding a plus, `+`, before each command. For example:

```
$ sbt
> +clean
> +publishLocal
```

Added:

# A Thin Scala Wrapper Around the Kafka Streams Java API

The library wraps the Java APIs in Scala, thereby providing:

1. much better type inference in Scala
2. less boilerplate in application code
3. the usual builder-style composition that developers get with the original Java API

The design of the library was inspired by the work started by Alexis Seigneurin in [this repository](https://github.com/aseigneurin/kafka-streams-scala).

## Quick Start

`kafka-streams-scala` is published and cross-built for Scala `2.11` and `2.12`, so you can just add the following to your build:

```scala
val kafka_streams_scala_version = "0.0.1"

libraryDependencies ++= Seq(
  "com.lightbend" %% "kafka-streams-scala" % kafka_streams_scala_version
)
```

> Note: `kafka-streams-scala` supports Kafka Streams `1.0.0`.
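
As a quick smoke test of the dependency, here is a minimal pass-through topology. This is a sketch only: the `StreamsBuilderS` class and its package are assumed names, extrapolated from the `KTableS` wrapper type shown later in this README, so check the library sources for the exact API.

```scala
// Hypothetical example: StreamsBuilderS and its package are assumptions
// inferred from the KTableS naming convention used by this library.
import java.util.Properties

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

import com.lightbend.kafka.scala.streams.StreamsBuilderS

object PassThrough extends App {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pass-through")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)

  val builder = new StreamsBuilderS()

  // Copy every record from one topic to another, relying on the
  // default serdes configured above.
  builder.stream[String, String]("input-topic").to("output-topic")

  val streams = new KafkaStreams(builder.build(), props)
  streams.start()
  sys.addShutdownHook(streams.close())
}
```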

## Running the Tests

The library comes with an embedded Kafka server. To run the tests, simply run `sbt testOnly`; all tests will run against the local embedded server.

> The embedded server is started and stopped for every test and consumes quite a bit of resources. Hence it's recommended that you allocate more heap space to `sbt` when running the tests, e.g. `sbt -mem 1500`.
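
Concretely, the two commands look like this in an interactive session (the `-mem` flag sets the sbt heap size in megabytes):

```
$ sbt -mem 1500
> testOnly
```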

## Type Inference and Composition

Here's a sample code fragment using the Scala wrapper library. Compare it with the Scala code from the same [example](https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala) in Confluent's repository.

```scala
// Compute the total per region by summing the individual click counts per region.
val clicksPerRegion: KTableS[String, Long] = userClicksStream

  // Join the stream against the table.
  .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))

  // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>.
  .map((_, regionWithClicks) => regionWithClicks)

  // Compute the total per region by summing the individual click counts per region.
  .groupByKey(Serialized.`with`(stringSerde, longSerde))
  .reduce(_ + _)
```

> **Note:** The backticks around `with` are there because `with` is a Scala keyword. Backticks are the mechanism you use to "escape" a Scala keyword when it appears as an ordinary identifier in a Java library.
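
The same escape works for any Scala keyword. A tiny self-contained illustration with hypothetical names, unrelated to the Kafka API:

```scala
// `type` is a Scala keyword, so an identifier named "type" (common in
// Java APIs and JSON models) must be written in backticks everywhere.
case class Event(`type`: String, payload: String)

val e = Event(`type` = "click", payload = "button-42")
println(e.`type`) // prints "click"
```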

## Better Abstraction

The wrapped Scala APIs also incur less boilerplate by taking advantage of Scala function literals, which are converted to Java objects in the implementation of the API. Hence the surface syntax of the client API looks simpler and less noisy.

Here's an example of a snippet built using the Java API from Scala:

```scala
val approximateWordCounts: KStream[String, Long] = textLines
  .flatMapValues(value => value.toLowerCase.split("\\W+").toIterable.asJava)
  .transform(
    new TransformerSupplier[Array[Byte], String, KeyValue[String, Long]] {
      override def get() = new ProbabilisticCounter
    },
    cmsStoreName)
approximateWordCounts.to(outputTopic, Produced.`with`(Serdes.String(), longSerde))
```

And here's the corresponding snippet using the Scala library. Note how the noise of `TransformerSupplier` has been abstracted away by Scala's function literal syntax.

```scala
textLines
  .flatMapValues(value => value.toLowerCase.split("\\W+").toIterable)
  .transform(() => new ProbabilisticCounter, cmsStoreName)
  .to(outputTopic, Produced.`with`(Serdes.String(), longSerde))
```

Also, the explicit `asJava` conversion from a Scala `Iterable` to a Java `Iterable` is done for you by the Scala library.
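
The general technique behind this is straightforward. Here is a minimal sketch of one such conversion, an implicit that turns a Scala function literal into the Java `ValueMapper` interface the underlying API expects; the library's actual implicits may be named and organized differently.

```scala
import scala.language.implicitConversions

import org.apache.kafka.streams.kstream.ValueMapper

object FunctionConversions {
  // With this implicit in scope, client code can pass a plain Scala
  // lambda wherever the Java Kafka Streams API expects a ValueMapper.
  implicit def functionToValueMapper[V, VR](f: V => VR): ValueMapper[V, VR] =
    new ValueMapper[V, VR] {
      override def apply(value: V): VR = f(value)
    }
}
```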
File renamed without changes.

kafka-stream-q-example-dsl/README.md

Lines changed: 0 additions & 191 deletions
This file was deleted.
