This repository was archived by the owner on May 25, 2023. It is now read-only.

Commit be3fdaf

Merge pull request #22 from lightbend/local-kafka-proc
The proc example application now runs with local Kafka Server
2 parents: e449708 + 29609f9

14 files changed: +506 additions, −64 deletions


kafka-stream-q-example-dsl/README.md

Lines changed: 1 addition & 1 deletion
@@ -83,7 +83,7 @@ $ curl http://localhost:7070/weblog/bytes/world.std.com
 
 The http query layer is designed to work even when your application runs in the distributed mode. Running your Kafka Streams application in the distributed mode means that all the instances must have the same application id.
 
-> In order to run the application in distributed mode, you need to run an external Kafka and Zookeeper server. Set `kafka.localserver` to `false` to enbale this setting.
+> In order to run the application in distributed mode, you need to run an external Kafka and Zookeeper server. Set `kafka.localserver` to `false` to enable this setting.
 
 Here are the steps that you need to follow to run the application in distributed mode. We assume here you are running both the instances in the same node with different port numbers. It's fairly easy to scale this on different nodes.
 

kafka-stream-q-example-proc/README.md

Lines changed: 10 additions & 0 deletions
@@ -12,6 +12,8 @@ The implementation is based on the [ClarkNet dataset](http://ita.ee.lbl.gov/html
 
 ## Build and Run Locally
 
+By default the application runs with an embedded local Kafka server. If you want to run separate Kafka and Zookeeper servers instead, change `kafka.localserver` to `false` in `application.conf`.
+
 To run the application, do the following steps.
 
 ### Build the Libraries
@@ -20,6 +22,8 @@ You'll need to build the Scala API library, `kafka-scala-s`, and the interactive
 
 ### Start ZooKeeper and Kafka
 
+> This is only required if `kafka.localserver` is set to `false` in `application.conf`. If it is set to `true`, the application runs with an embedded local Kafka server. However, note that if you want to run the application in distributed mode (see below for details), you need to run separate Kafka and Zookeeper servers.
+
 Start ZooKeeper and Kafka, if not already running. You can download Kafka 1.0.0 for Scala 2.12 [here](https://kafka.apache.org/documentation/#quickstart), then follow the [Quick Start](https://kafka.apache.org/documentation/#quickstart) instructions for running ZooKeeper and Kafka, steps 1 and 2.
 
 ### Download the ClarkNet dataset
@@ -32,8 +36,12 @@ Copy `src/main/resources/application-proc.conf.template` to `src/main/resources
 
 Edit `src/main/resources/application-proc.conf` and set the entry for `directorytowatch` to match the folder name where you installed the ClarkNet dataset.
 
+Note that you can run the application with a bundled local Kafka server by setting `kafka.localserver` to `true` in the `application.conf` file.
+
 ### Create the Kafka Topics
 
+> This is only required if `kafka.localserver` is set to `false` in `application.conf`. If it is set to `true`, the application runs with an embedded local Kafka server and creates all necessary topics on its own. However, note that if you want to run the application in distributed mode (see below for details), you need to run separate Kafka and Zookeeper servers.
+
 Create the topics using the `kafka-topics.sh` command that comes with the Kafka distribution. We'll refer to the directory where you installed Kafka as `$KAFKA_HOME`. Run the following commands:
 
 ```bash
@@ -71,6 +79,8 @@ false
 
 The http query layer is designed to work even when your application runs in the distributed mode. Running your Kafka Streams application in the distributed mode means that all the instances must have the same application id.
 
+> In order to run the application in distributed mode, you need to run an external Kafka and Zookeeper server. Set `kafka.localserver` to `false` to enable this setting.
+
 Here are the steps that you need to follow to run the application in distributed mode. We assume here you are running both the instances in the same node with different port numbers. It's fairly easy to scale this on different nodes.
 
 ### Step 1: Build and configure for distribution
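The notes above all hinge on the single `kafka.localserver` flag, and the `procRun` task in `build.sbt` (below) already points `-Dconfig.file` at `application-proc.conf`, so the flag is picked up by the normal Typesafe Config load. As a rough illustration only (the object and its printed messages are hypothetical; the config keys `kafka.localserver` and `kafka.brokers` are the ones introduced by this commit), an application could branch on the flag like this:

```scala
// Hypothetical sketch: branching on the kafka.localserver flag documented above.
// Only the keys kafka.localserver and kafka.brokers come from this commit.
import com.typesafe.config.ConfigFactory

object LocalServerFlagCheck extends App {
  // procRun passes -Dconfig.file=.../application-proc.conf, so load() picks it up
  val config = ConfigFactory.load()

  if (config.getBoolean("kafka.localserver")) {
    println("kafka.localserver = true: an embedded Kafka server will be started")
  } else {
    val brokers = config.getString("kafka.brokers")
    println(s"kafka.localserver = false: using external brokers at $brokers")
  }
}
```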

kafka-stream-q-example-proc/build.sbt

Lines changed: 7 additions & 3 deletions
@@ -34,7 +34,9 @@ lazy val app = appProject("app")(".")
     circeGeneric,
     circeParser,
     logback,
-    scalaLogging
+    scalaLogging,
+    curator,
+    kafka
   ),
   scalacOptions ++= Seq(
     "-deprecation", // Emit warning and location for usages of deprecated APIs.
@@ -103,7 +105,8 @@ lazy val procRun = project
     resourceDirectory in Compile := (resourceDirectory in (app, Compile)).value,
     javaOptions in run ++= Seq(
       "-Dconfig.file=" + (resourceDirectory in Compile).value / "application-proc.conf",
-      "-Dlogback.configurationFile=" + (resourceDirectory in Compile).value / "logback-proc.xml"),
+      "-Dlogback.configurationFile=" + (resourceDirectory in Compile).value / "logback-proc.xml",
+      "-Dlog4j.configurationFile=" + (resourceDirectory in Compile).value / "log4j.properties"),
     addCommandAlias("proc", "procRun/run")
   )
   .dependsOn(app)
@@ -114,7 +117,8 @@ lazy val procPackage = appProject("procPackage")("build/proc")
     resourceDirectory in Compile := (resourceDirectory in (app, Compile)).value,
     mappings in Universal ++= {
       Seq(((resourceDirectory in Compile).value / "application-proc.conf") -> "conf/application.conf") ++
-      Seq(((resourceDirectory in Compile).value / "logback-proc.xml") -> "conf/logback.xml")
+      Seq(((resourceDirectory in Compile).value / "logback-proc.xml") -> "conf/logback.xml") ++
+      Seq(((resourceDirectory in Compile).value / "log4j.properties") -> "conf/log4j.properties")
     },
     scriptClasspath := Seq("../conf/") ++ scriptClasspath.value,
     mainClass in Compile := Some("com.lightbend.kafka.scala.iq.example.WeblogDriver")

kafka-stream-q-example-proc/project/Dependencies.scala

Lines changed: 2 additions & 0 deletions
@@ -20,4 +20,6 @@ object Dependencies {
   val circeParser = "io.circe" %% "circe-parser" % circeVersion
   val logback = "ch.qos.logback" % "logback-classic" % logbackVersion
   val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingVersion
+  val curator = "org.apache.curator" % "curator-test" % curatorVersion
+  val kafka = "org.apache.kafka" %% "kafka" % kafkaVersion excludeAll(ExclusionRule("org.slf4j", "slf4j-log4j12"), ExclusionRule("org.apache.zookeeper", "zookeeper"))
 }
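Together with the version pins added in `Versions.scala` below and the `app` module's dependency list in `build.sbt` above, these two lines bring in what the embedded broker needs. A consolidated sketch, assembled from the three diffs (it is not a file in the repo, and the reasoning in the comments is inferred rather than stated in the commit):

```scala
// Sketch assembled from the Dependencies.scala, Versions.scala and build.sbt hunks.
import sbt._

object EmbeddedKafkaDeps {
  val curatorVersion = "4.0.0"
  val kafkaVersion   = "1.0.0"

  // curator-test is the usual source of an in-process ZooKeeper for tests,
  // presumably what KafkaLocalServer starts under the hood.
  val curator = "org.apache.curator" % "curator-test" % curatorVersion

  // The broker itself. slf4j-log4j12 is excluded so logback-classic stays the only
  // SLF4J binding; zookeeper is excluded, presumably because curator-test already
  // pulls in a compatible one.
  val kafka = "org.apache.kafka" %% "kafka" % kafkaVersion excludeAll(
    ExclusionRule("org.slf4j", "slf4j-log4j12"),
    ExclusionRule("org.apache.zookeeper", "zookeeper")
  )

  // build.sbt then appends both to the app module:
  //   libraryDependencies ++= Seq(..., logback, scalaLogging, curator, kafka)
}
```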

kafka-stream-q-example-proc/project/Versions.scala

Lines changed: 2 additions & 0 deletions
@@ -15,5 +15,7 @@ object Versions {
   val circeVersion = "0.8.0"
   val scalaLoggingVersion = "3.5.0"
   val logbackVersion = "1.2.3"
+  val curatorVersion = "4.0.0"
+  val kafkaVersion = "1.0.0"
 }
 

kafka-stream-q-example-proc/src/main/resources/application-proc.conf.template

Lines changed: 45 additions & 46 deletions
@@ -6,54 +6,53 @@ akka {
   event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
 }
 
-dcos {
-
-  kafka {
-    ## bootstrap servers for Kafka
-    brokers = "localhost:9092"
-    brokers = ${?KAFKA_BROKERS}
-
-    ## consumer group
-    group = "group-proc"
-    group = ${?KAFKA_GROUP_PROC}
-
-    ## the source topic - processing starts with
-    ## data in this topic (to be loaded by ingestion)
-    fromtopic = "server-log-proc"
-    fromtopic = ${?KAFKA_FROM_TOPIC_PROC}
-
-    ## error topic for the initial processing
-    errortopic = "logerr-proc"
-    errortopic = ${?KAFKA_ERROR_TOPIC_PROC}
-
-    zookeeper = "localhost:2181"
-    zookeeper = ${?ZOOKEEPER_URL}
-
-    ## folder where state stores are created by Kafka Streams
-    statestoredir = "/tmp/kafka-streams"
-    statestoredir = ${?STATESTOREDIR}
-
-    ## settings for data ingestion
-    loader {
-      sourcetopic = ${dcos.kafka.fromtopic}
-      sourcetopic = ${?KAFKA_FROM_TOPIC_PROC}
-
-      directorytowatch = "/Users/myhome/ClarkNet-HTTP"
-      directorytowatch = ${?DIRECTORY_TO_WATCH}
-
-      pollinterval = 1 second
-    }
+kafka {
+  # true if use local kafka server
+  # false otherwise
+  # if true, then setting of brokers below is ignored and set to that of KafkaLocalServer
+  localserver = true
+
+  ## bootstrap servers for Kafka
+  brokers = "localhost:9092"
+  brokers = ${?KAFKA_BROKERS}
+
+  ## consumer group
+  group = "group-proc"
+  group = ${?KAFKA_GROUP_PROC}
+
+  ## the source topic - processing starts with
+  ## data in this topic (to be loaded by ingestion)
+  fromtopic = "server-log-proc"
+  fromtopic = ${?KAFKA_FROM_TOPIC_PROC}
+
+  ## error topic for the initial processing
+  errortopic = "logerr-proc"
+  errortopic = ${?KAFKA_ERROR_TOPIC_PROC}
+
+  ## folder where state stores are created by Kafka Streams
+  statestoredir = "/tmp/kafka-streams"
+  statestoredir = ${?STATESTOREDIR}
+
+  ## settings for data ingestion
+  loader {
+    sourcetopic = ${kafka.fromtopic}
+    sourcetopic = ${?KAFKA_FROM_TOPIC_PROC}
+
+    directorytowatch = "/Users/myhome/ClarkNet-HTTP"
+    directorytowatch = ${?DIRECTORY_TO_WATCH}
+
+    pollinterval = 1 second
   }
+}
 
-  # http endpoints of the weblog microservice
-  http {
-    # The port the dashboard listens on
-    port = 7071
-    port = ${?PORT0}
+# http endpoints of the weblog microservice
+http {
+  # The port the dashboard listens on
+  port = 7071
+  port = ${?PORT0}
 
-    # The interface the dashboard listens on
-    interface = "localhost"
-    interface = ${?INTERFACE_PROC}
-  }
+  # The interface the dashboard listens on
+  interface = "localhost"
+  interface = ${?INTERFACE_PROC}
 }
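Two things about the new template are easy to miss: the old `dcos.` wrapper is gone, so all lookups now live under top-level `kafka.*` and `http.*` paths, and every `${?SOME_ENV}` line is an optional override that only takes effect when that environment variable (or system property) is set. A small self-contained sketch of that resolution behaviour; the parsed HOCON string below is a trimmed stand-in for the template, not the template itself:

```scala
// Minimal sketch of how the renamed keys resolve with Typesafe Config.
// The keys mirror the template above; the HOCON string is a trimmed stand-in.
import com.typesafe.config.ConfigFactory

object ConfResolutionSketch extends App {
  val conf = ConfigFactory.parseString(
    """
      |kafka {
      |  localserver = true
      |  brokers = "localhost:9092"
      |  brokers = ${?KAFKA_BROKERS}   # only wins if KAFKA_BROKERS is set
      |  loader {
      |    pollinterval = 1 second
      |  }
      |}
    """.stripMargin).resolve()

  println(conf.getBoolean("kafka.localserver"))          // true
  println(conf.getString("kafka.brokers"))               // localhost:9092 unless overridden
  println(conf.getDuration("kafka.loader.pollinterval")) // PT1S
}
```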

kafka-stream-q-example-proc/src/main/resources/log4j.properties

Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=ERROR, R
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+log4j.appender.R=org.apache.log4j.RollingFileAppender
+log4j.appender.R.File=logs/kafka-server.log
+
+log4j.appender.R.MaxFileSize=100KB
+# Keep one backup file
+log4j.appender.R.MaxBackupIndex=1
+
+# A1 uses PatternLayout.
+log4j.appender.R.layout=org.apache.log4j.PatternLayout
+log4j.appender.R.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
kafka-stream-q-example-proc/src/main/scala/com/lightbend/kafka/scala/iq/example/WeblogWorkflow.scala

Lines changed: 18 additions & 0 deletions
@@ -21,6 +21,7 @@ import serializers._
 import com.lightbend.kafka.scala.iq.http.InteractiveQueryHttpService
 
 import ingestion.DataIngestion
+import com.lightbend.kafka.scala.server._
 
 trait WeblogWorkflow extends LazyLogging with AppSerializers {
 
@@ -33,6 +34,7 @@
     }
 
     logger.info(s"config = $config")
+    val maybeServer = startLocalServerIfSetInConfig(config)
 
     // setup REST endpoints
     val restEndpointPort = config.httpPort
@@ -41,6 +43,7 @@
 
     logger.info("Connecting to Kafka cluster via bootstrap servers " + config.brokers)
     logger.warn("REST endpoint at http://" + restEndpointHostName + ":" + restEndpointPort)
+    println("Connecting to Kafka cluster via bootstrap servers " + config.brokers)
     println("REST endpoint at http://" + restEndpointHostName + ":" + restEndpointPort)
 
     implicit val system = ActorSystem()
@@ -87,6 +90,8 @@
       case x: Exception => x.printStackTrace
     } finally {
       logger.error("Exiting application ..")
+      logger.error(s"Stopping kafka server ..")
+      maybeServer.foreach(_.stop())
       System.exit(-1)
     }
   })
@@ -101,11 +106,24 @@
       restService.stop()
       val closed = streams.close(1, TimeUnit.MINUTES)
       logger.error(s"Exiting application after streams close ($closed)")
+      maybeServer.foreach(_.stop())
     } catch {
       case _: Exception => // ignored
     }))
   }
 
+  private def createTopics(config: ConfigData, server: KafkaLocalServer) = {
+    import config._
+    List(fromTopic, errorTopic).foreach(server.createTopic(_))
+  }
+
+  private def startLocalServerIfSetInConfig(config: ConfigData): Option[KafkaLocalServer] = if (config.localServer) {
+    val s = KafkaLocalServer(true, Some(config.stateStoreDir))
+    s.start()
+    createTopics(config, s)
+    Some(s)
+  } else None
+
   def createStreams(config: ConfigData): KafkaStreams
   def startRestProxy(streams: KafkaStreams, hostInfo: HostInfo,
     actorSystem: ActorSystem, materializer: ActorMaterializer): InteractiveQueryHttpService
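The two new private methods are the heart of the change: when `kafka.localserver` is `true`, `startLocalServerIfSetInConfig` builds a `KafkaLocalServer`, starts it, pre-creates the two topics, and hands back an `Option` that every shutdown path folds over to stop the broker. A standalone sketch of that lifecycle, reusing only the `KafkaLocalServer` calls visible in this diff (construction with a clean-on-start flag and a state directory, `start()`, `createTopic(...)`, `stop()`); the driver object itself is illustrative:

```scala
// Illustrative driver for the embedded-broker lifecycle used by WeblogWorkflow.
// Only the KafkaLocalServer calls shown in the diff are assumed; the topic names and
// state directory match the defaults in application-proc.conf.template.
import com.lightbend.kafka.scala.server._

object EmbeddedKafkaLifecycleSketch extends App {
  // same shape as startLocalServerIfSetInConfig: clean on start, state under /tmp/kafka-streams
  val server = KafkaLocalServer(true, Some("/tmp/kafka-streams"))
  server.start()

  // WeblogWorkflow.createTopics does exactly this with config.fromTopic and config.errorTopic
  List("server-log-proc", "logerr-proc").foreach(server.createTopic(_))

  // ... run the Kafka Streams topology against the embedded broker here ...

  // mirrors the shutdown-hook / finally cleanup in the diff
  sys.addShutdownHook(server.stop())
}
```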

kafka-stream-q-example-proc/src/main/scala/com/lightbend/kafka/scala/iq/example/config/KStreamConfig.scala

Lines changed: 26 additions & 14 deletions
@@ -8,6 +8,7 @@ import cats.instances.all._
 import scala.util.Try
 import com.typesafe.config.Config
 import scala.concurrent.duration._
+import com.lightbend.kafka.scala.server._
 
 
 /**
@@ -22,8 +23,8 @@ object KStreamConfig {
   )
 
   private[KStreamConfig] case class ServerSettings(
+    localServer: Boolean,
     brokers: String,
-    zk: String,
     stateStoreDir: String
   )
 
@@ -44,8 +45,8 @@
   )
 
   case class ConfigData(ks: KafkaSettings, hs: HttpSettings, dls: DataLoaderSettings) {
+    def localServer = ks.serverSettings.localServer
     def brokers = ks.serverSettings.brokers
-    def zk = ks.serverSettings.zk
     def fromTopic = ks.topicSettings.fromTopic
     def errorTopic = ks.topicSettings.errorTopic
     def stateStoreDir = ks.serverSettings.stateStoreDir
@@ -67,15 +68,26 @@
 
   private def fromKafkaConfig: ConfigReader[KafkaSettings] = Kleisli { (config: Config) =>
     Try {
+      val local = config.getBoolean("kafka.localserver")
+      val serverSettings =
+        if (local) {
+          ServerSettings(
+            local,
+            s"localhost:${KafkaLocalServer.DefaultPort}",
+            config.getString("kafka.statestoredir")
+          )
+        } else {
+          ServerSettings(
+            local,
+            config.getString("kafka.brokers"),
+            config.getString("kafka.statestoredir")
+          )
+        }
       KafkaSettings(
-        ServerSettings(
-          config.getString("dcos.kafka.brokers"),
-          config.getString("dcos.kafka.zookeeper"),
-          config.getString("dcos.kafka.statestoredir")
-        ),
+        serverSettings,
         TopicSettings(
-          config.getString("dcos.kafka.fromtopic"),
-          config.getString("dcos.kafka.errortopic")
+          config.getString("kafka.fromtopic"),
+          config.getString("kafka.errortopic")
         )
       )
     }
@@ -84,18 +96,18 @@
   private def fromHttpConfig: ConfigReader[HttpSettings] = Kleisli { (config: Config) =>
     Try {
       HttpSettings(
-        config.getString("dcos.http.interface"),
-        config.getInt("dcos.http.port")
+        config.getString("http.interface"),
+        config.getInt("http.port")
       )
     }
   }
 
   private def fromDataLoaderConfig: ConfigReader[DataLoaderSettings] = Kleisli { (config: Config) =>
     Try {
       DataLoaderSettings(
-        config.getString("dcos.kafka.loader.sourcetopic"),
-        getStringMaybe(config, "dcos.kafka.loader.directorytowatch"),
-        config.getDuration("dcos.kafka.loader.pollinterval")
+        config.getString("kafka.loader.sourcetopic"),
+        getStringMaybe(config, "kafka.loader.directorytowatch"),
+        config.getDuration("kafka.loader.pollinterval")
       )
     }
   }
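All three readers share the same shape, which is what keeps the `dcos.*` → `kafka.*` key rename local to this file: each reader is a `Config => Try[Settings]` function wrapped in `Kleisli`, so nothing outside `KStreamConfig` ever sees the key paths. A self-contained sketch of that shape and of how such readers compose; the `ConfigReader` alias and the composition below are assumptions for illustration, and only the reader style matches the diff:

```scala
// Illustrative sketch of the Kleisli-based reader style used in KStreamConfig.
// ConfigReader[A] is assumed to be Kleisli[Try, Config, A]; HttpSettings here is a
// stand-in for the real case class.
import cats.data.Kleisli
import cats.instances.all._
import com.typesafe.config.{Config, ConfigFactory}
import scala.util.Try

object ConfigReaderSketch extends App {
  type ConfigReader[A] = Kleisli[Try, Config, A]

  final case class HttpSettings(interface: String, port: Int)

  // same style as fromHttpConfig above: read the new top-level http.* keys
  val fromHttpConfig: ConfigReader[HttpSettings] = Kleisli { (config: Config) =>
    Try {
      HttpSettings(
        config.getString("http.interface"),
        config.getInt("http.port")
      )
    }
  }

  // readers compose with map/flatMap because cats supplies the instances for Try
  val hostAndPort: ConfigReader[String] =
    fromHttpConfig.map(hs => s"${hs.interface}:${hs.port}")

  val config = ConfigFactory.parseString("""http { interface = "localhost", port = 7071 }""")
  println(hostAndPort.run(config)) // Success(localhost:7071)
}
```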
