Skip to content

Commit

Permalink
Merge commit '20b1cc9d376304f89d53a9fc477d705348363121'
Browse files Browse the repository at this point in the history
  • Loading branch information
SanthoshVasabhaktula committed Feb 20, 2023
2 parents e4a47e9 + 20b1cc9 commit 8f14927
Show file tree
Hide file tree
Showing 14 changed files with 58 additions and 22 deletions.
2 changes: 1 addition & 1 deletion dataset-registry/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<scala.maj.version>2.12</scala.maj.version>
<scala.version>2.12.11</scala.version>
<flink.version>1.13.5</flink.version>
<flink.version>1.15.2</flink.version>
<kafka.version>2.4.0</kafka.version>
<java.target.runtime>11</java.target.runtime>
<jackson-jaxrs.version>1.9.13</jackson-jaxrs.version>
Expand Down
8 changes: 0 additions & 8 deletions dataset-registry/src/main/resources/base-config.conf

This file was deleted.

8 changes: 8 additions & 0 deletions dataset-registry/src/main/resources/dataset-registry.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
postgres {
host = 192.168.106.2
port = 5432
maxConnections = 2
user = "obsrv"
password = "obsrv123"
database = "obsrv-registry"
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import java.sql.ResultSet

object DatasetRegistryService {

private val config = ConfigFactory.load("base-config.conf")
private val config = ConfigFactory.load("dataset-registry.conf")
private val postgresConfig = PostgresConnectionConfig(config.getString("postgres.user"), config.getString("postgres.password"),
config.getString("postgres.database"), config.getString("postgres.host"), config.getInt("postgres.port"),
config.getInt("postgres.maxConnections"))
Expand Down
12 changes: 8 additions & 4 deletions framework/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<scala.maj.version>2.12</scala.maj.version>
<scala.version>2.12.11</scala.version>
<flink.version>1.13.5</flink.version>
<flink.version>1.15.2</flink.version>
<kafka.version>2.4.0</kafka.version>
<java.target.runtime>11</java.target.runtime>
<jackson-jaxrs.version>1.9.13</jackson-jaxrs.version>
Expand All @@ -29,7 +29,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.maj.version}</artifactId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
Expand Down Expand Up @@ -72,7 +72,11 @@
<artifactId>cassandra-driver-core</artifactId>
<version>3.7.0</version>
</dependency>

<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
Expand Down Expand Up @@ -105,7 +109,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.12</artifactId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class BaseJobConfig(val config: Config, val jobName: String) extends Serializabl
val systemEventsProducer = "system-events-sink"

// Checkpointing config
val enableCompressedCheckpointing: Boolean = config.getBoolean("task.checkpointing.compressed")
val enableCompressedCheckpointing: Boolean = config.getBoolean("job.enable.distributed.checkpointing")
val checkpointingInterval: Int = config.getInt("task.checkpointing.interval")
val checkpointingPauseSeconds: Int = config.getInt("task.checkpointing.pause.between.seconds")
val enableDistributedCheckpointing: Option[Boolean] = if (config.hasPath("job")) Option(config.getBoolean("job.enable.distributed.checkpointing")) else None
Expand Down
16 changes: 15 additions & 1 deletion framework/src/test/resources/base-test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,22 @@ kafka {
output.system.event.topic = "flink.system.events"
}

job {
env = "local"
enable.distributed.checkpointing = false
statebackend {
blob {
storage {
account = "blob.storage.account"
container = "telemetry-container"
checkpointing.dir = "flink-jobs"
}
}
base.url = "wasbs://"${job.statebackend.blob.storage.container}"@"${job.statebackend.blob.storage.account}"/"${job.statebackend.blob.storage.checkpointing.dir}
}
}

task {
checkpointing.compressed = true
checkpointing.interval = 60000
checkpointing.pause.between.seconds = 30000
restart-strategy.attempts = 1
Expand Down
16 changes: 15 additions & 1 deletion framework/src/test/resources/test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,23 @@ kafka {
}
}

job {
env = "local"
enable.distributed.checkpointing = false
statebackend {
blob {
storage {
account = "blob.storage.account"
container = "telemetry-container"
checkpointing.dir = "flink-jobs"
}
}
base.url = "wasbs://"${job.statebackend.blob.storage.container}"@"${job.statebackend.blob.storage.account}"/"${job.statebackend.blob.storage.checkpointing.dir}
}
}

kafka.output.metrics.topic = "pipeline_metrics"
task {
checkpointing.compressed = true
parallelism = 1
consumer.parallelism = 1
checkpointing.interval = 60000
Expand Down
4 changes: 4 additions & 0 deletions pipeline/denormalizer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.sunbird.obsrv.denormalizer.task.DenormalizerStreamTask</mainClass>
</transformer>
<!-- append default configs -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
Expand Down
2 changes: 1 addition & 1 deletion pipeline/druid-router/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.sunbird.dp.validator.task.DruidValidatorStreamTask</mainClass>
<mainClass>org.sunbird.obsrv.router.task.DruidRouterStreamTask</mainClass>
</transformer>
<!-- append default configs -->
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
Expand Down
2 changes: 1 addition & 1 deletion pipeline/extractor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.sunbird.dp.extractor.task.TelemetryExtractorStreamTask</mainClass>
<mainClass>org.sunbird.obsrv.extractor.task.ExtractorStreamTask</mainClass>
</transformer>
<!-- append default configs -->
<transformer
Expand Down
2 changes: 1 addition & 1 deletion pipeline/preprocessor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.sunbird.dp.preprocessor.task.PipelinePreprocessorStreamTask</mainClass>
<mainClass>org.sunbird.obsrv.preprocessor.task.PipelinePreprocessorStreamTask</mainClass>
</transformer>
<!-- append default configs -->
<transformer
Expand Down
2 changes: 1 addition & 1 deletion pipeline/transformer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.sunbird.dp.extractor.task.TelemetryExtractorStreamTask</mainClass>
<mainClass>org.sunbird.obsrv.transformer.task.TransformerStreamTask</mainClass>
</transformer>
<!-- append default configs -->
<transformer
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<scala.maj.version>2.12</scala.maj.version>
<scala.version>2.12.11</scala.version>
<flink.version>1.13.5</flink.version>
<flink.version>1.15.2</flink.version>
<kafka.version>2.4.0</kafka.version>
<java.target.runtime>11</java.target.runtime>
<jackson-jaxrs.version>1.9.13</jackson-jaxrs.version>
Expand Down

0 comments on commit 8f14927

Please sign in to comment.