From 20b1cc9d376304f89d53a9fc477d705348363121 Mon Sep 17 00:00:00 2001 From: Anand Parthasarathy Date: Mon, 20 Feb 2023 15:44:01 +0530 Subject: [PATCH] Update manifest jar entrypoint classnames --- dataset-registry/pom.xml | 2 +- .../src/main/resources/base-config.conf | 8 -------- .../src/main/resources/dataset-registry.conf | 8 ++++++++ .../obsrv/service/DatasetRegistryService.scala | 2 +- framework/pom.xml | 12 ++++++++---- .../obsrv/core/streaming/BaseJobConfig.scala | 2 +- framework/src/test/resources/base-test.conf | 16 +++++++++++++++- framework/src/test/resources/test.conf | 16 +++++++++++++++- pipeline/denormalizer/pom.xml | 4 ++++ pipeline/druid-router/pom.xml | 2 +- pipeline/extractor/pom.xml | 2 +- pipeline/preprocessor/pom.xml | 2 +- pipeline/transformer/pom.xml | 2 +- pom.xml | 2 +- 14 files changed, 58 insertions(+), 22 deletions(-) delete mode 100644 dataset-registry/src/main/resources/base-config.conf create mode 100644 dataset-registry/src/main/resources/dataset-registry.conf diff --git a/dataset-registry/pom.xml b/dataset-registry/pom.xml index fcebbd3a..999053d7 100644 --- a/dataset-registry/pom.xml +++ b/dataset-registry/pom.xml @@ -13,7 +13,7 @@ UTF-8 2.12 2.12.11 - 1.13.5 + 1.15.2 2.4.0 11 1.9.13 diff --git a/dataset-registry/src/main/resources/base-config.conf b/dataset-registry/src/main/resources/base-config.conf deleted file mode 100644 index 0145366c..00000000 --- a/dataset-registry/src/main/resources/base-config.conf +++ /dev/null @@ -1,8 +0,0 @@ -postgres { - host = localhost - port = 5432 - maxConnections = 2 - user = "postgres" - password = "postgres" - database = "postgres" -} \ No newline at end of file diff --git a/dataset-registry/src/main/resources/dataset-registry.conf b/dataset-registry/src/main/resources/dataset-registry.conf new file mode 100644 index 00000000..806eec18 --- /dev/null +++ b/dataset-registry/src/main/resources/dataset-registry.conf @@ -0,0 +1,8 @@ +postgres { + host = 192.168.106.2 + port = 5432 + maxConnections = 2 + user = "obsrv" + password = "obsrv123" + database = "obsrv-registry" +} \ No newline at end of file diff --git a/dataset-registry/src/main/scala/org/sunbird/obsrv/service/DatasetRegistryService.scala b/dataset-registry/src/main/scala/org/sunbird/obsrv/service/DatasetRegistryService.scala index 0c6af771..d51f40c1 100644 --- a/dataset-registry/src/main/scala/org/sunbird/obsrv/service/DatasetRegistryService.scala +++ b/dataset-registry/src/main/scala/org/sunbird/obsrv/service/DatasetRegistryService.scala @@ -8,7 +8,7 @@ import java.sql.ResultSet object DatasetRegistryService { - private val config = ConfigFactory.load("base-config.conf") + private val config = ConfigFactory.load("dataset-registry.conf") private val postgresConfig = PostgresConnectionConfig(config.getString("postgres.user"), config.getString("postgres.password"), config.getString("postgres.database"), config.getString("postgres.host"), config.getInt("postgres.port"), config.getInt("postgres.maxConnections")) diff --git a/framework/pom.xml b/framework/pom.xml index c7344cb3..04a3cd73 100644 --- a/framework/pom.xml +++ b/framework/pom.xml @@ -13,7 +13,7 @@ UTF-8 2.12 2.12.11 - 1.13.5 + 1.15.2 2.4.0 11 1.9.13 @@ -29,7 +29,7 @@ org.apache.flink - flink-connector-kafka_${scala.maj.version} + flink-connector-kafka ${flink.version} @@ -72,7 +72,11 @@ cassandra-driver-core 3.7.0 - + + com.typesafe + config + 1.4.2 + org.postgresql postgresql @@ -105,7 +109,7 @@ org.apache.flink - flink-test-utils_2.12 + flink-test-utils ${flink.version} test diff --git a/framework/src/main/scala/org/sunbird/obsrv/core/streaming/BaseJobConfig.scala b/framework/src/main/scala/org/sunbird/obsrv/core/streaming/BaseJobConfig.scala index 2ff00b74..1c0ffb0f 100644 --- a/framework/src/main/scala/org/sunbird/obsrv/core/streaming/BaseJobConfig.scala +++ b/framework/src/main/scala/org/sunbird/obsrv/core/streaming/BaseJobConfig.scala @@ -45,7 +45,7 @@ class BaseJobConfig(val config: Config, val jobName: String) extends Serializabl val systemEventsProducer = "system-events-sink" // Checkpointing config - val enableCompressedCheckpointing: Boolean = config.getBoolean("task.checkpointing.compressed") + val enableCompressedCheckpointing: Boolean = config.getBoolean("job.enable.distributed.checkpointing") val checkpointingInterval: Int = config.getInt("task.checkpointing.interval") val checkpointingPauseSeconds: Int = config.getInt("task.checkpointing.pause.between.seconds") val enableDistributedCheckpointing: Option[Boolean] = if (config.hasPath("job")) Option(config.getBoolean("job.enable.distributed.checkpointing")) else None diff --git a/framework/src/test/resources/base-test.conf b/framework/src/test/resources/base-test.conf index b1d31253..8757255f 100644 --- a/framework/src/test/resources/base-test.conf +++ b/framework/src/test/resources/base-test.conf @@ -19,8 +19,22 @@ kafka { output.system.event.topic = "flink.system.events" } +job { + env = "local" + enable.distributed.checkpointing = false + statebackend { + blob { + storage { + account = "blob.storage.account" + container = "telemetry-container" + checkpointing.dir = "flink-jobs" + } + } + base.url = "wasbs://"${job.statebackend.blob.storage.container}"@"${job.statebackend.blob.storage.account}"/"${job.statebackend.blob.storage.checkpointing.dir} + } +} + task { - checkpointing.compressed = true checkpointing.interval = 60000 checkpointing.pause.between.seconds = 30000 restart-strategy.attempts = 1 diff --git a/framework/src/test/resources/test.conf b/framework/src/test/resources/test.conf index 76c68378..944f9b84 100644 --- a/framework/src/test/resources/test.conf +++ b/framework/src/test/resources/test.conf @@ -17,9 +17,23 @@ kafka { } } +job { + env = "local" + enable.distributed.checkpointing = false + statebackend { + blob { + storage { + account = "blob.storage.account" + container = "telemetry-container" + checkpointing.dir = "flink-jobs" + } + } + base.url = "wasbs://"${job.statebackend.blob.storage.container}"@"${job.statebackend.blob.storage.account}"/"${job.statebackend.blob.storage.checkpointing.dir} + } +} + kafka.output.metrics.topic = "pipeline_metrics" task { - checkpointing.compressed = true parallelism = 1 consumer.parallelism = 1 checkpointing.interval = 60000 diff --git a/pipeline/denormalizer/pom.xml b/pipeline/denormalizer/pom.xml index a1bffd46..282d1da9 100644 --- a/pipeline/denormalizer/pom.xml +++ b/pipeline/denormalizer/pom.xml @@ -141,6 +141,10 @@ + + org.sunbird.obsrv.denormalizer.task.DenormalizerStreamTask + diff --git a/pipeline/druid-router/pom.xml b/pipeline/druid-router/pom.xml index 12fc2fa6..282c343c 100644 --- a/pipeline/druid-router/pom.xml +++ b/pipeline/druid-router/pom.xml @@ -154,7 +154,7 @@ - org.sunbird.dp.validator.task.DruidValidatorStreamTask + org.sunbird.obsrv.router.task.DruidRouterStreamTask diff --git a/pipeline/extractor/pom.xml b/pipeline/extractor/pom.xml index dfc519c3..ed25af4f 100644 --- a/pipeline/extractor/pom.xml +++ b/pipeline/extractor/pom.xml @@ -138,7 +138,7 @@ - org.sunbird.dp.extractor.task.TelemetryExtractorStreamTask + org.sunbird.obsrv.extractor.task.ExtractorStreamTask - org.sunbird.dp.preprocessor.task.PipelinePreprocessorStreamTask + org.sunbird.obsrv.preprocessor.task.PipelinePreprocessorStreamTask - org.sunbird.dp.extractor.task.TelemetryExtractorStreamTask + org.sunbird.obsrv.transformer.task.TransformerStreamTask UTF-8 2.12 2.12.11 - 1.13.5 + 1.15.2 2.4.0 11 1.9.13