diff --git a/framework/src/main/scala/org/sunbird/obsrv/core/streaming/BaseProcessFunction.scala b/framework/src/main/scala/org/sunbird/obsrv/core/streaming/BaseProcessFunction.scala
index 254b8620..45be8c7c 100644
--- a/framework/src/main/scala/org/sunbird/obsrv/core/streaming/BaseProcessFunction.scala
+++ b/framework/src/main/scala/org/sunbird/obsrv/core/streaming/BaseProcessFunction.scala
@@ -124,7 +124,7 @@ abstract class BaseProcessFunction[T, R](config: BaseJobConfig[R]) extends Proce
private val metrics: Metrics = registerMetrics(metricsList.datasets, metricsList.metrics)
override def open(parameters: Configuration): Unit = {
- metricsList.datasets.map { dataset =>
+ (metricsList.datasets ++ List(SystemConfig.defaultDatasetId)).map { dataset =>
metricsList.metrics.map(metric => {
getRuntimeContext.getMetricGroup.addGroup(config.jobName).addGroup(dataset)
.gauge[Long, ScalaGauge[Long]](metric, ScalaGauge[Long](() => metrics.getAndReset(dataset, metric)))
@@ -137,7 +137,11 @@ abstract class BaseProcessFunction[T, R](config: BaseJobConfig[R]) extends Proce
def getMetricsList(): MetricsList
override def processElement(event: T, context: ProcessFunction[T, R]#Context, out: Collector[R]): Unit = {
- processElement(event, context, metrics)
+ try {
+ processElement(event, context, metrics)
+ } catch {
+ case exception: Exception => exception.printStackTrace()
+ }
}
}
@@ -148,7 +152,7 @@ abstract class WindowBaseProcessFunction[I, O, K](config: BaseJobConfig[O]) exte
private val metrics: Metrics = registerMetrics(metricsList.datasets, metricsList.metrics)
override def open(parameters: Configuration): Unit = {
- metricsList.datasets.map { dataset =>
+ (metricsList.datasets ++ List(SystemConfig.defaultDatasetId)).map { dataset =>
metricsList.metrics.map(metric => {
getRuntimeContext.getMetricGroup.addGroup(config.jobName).addGroup(dataset)
.gauge[Long, ScalaGauge[Long]](metric, ScalaGauge[Long](() => metrics.getAndReset(dataset, metric)))
diff --git a/pipeline/denormalizer/src/test/scala/org/sunbird/obsrv/denormalizer/TestDenormalizerStreamTask.scala b/pipeline/denormalizer/src/test/scala/org/sunbird/obsrv/denormalizer/TestDenormalizerStreamTask.scala
deleted file mode 100644
index 71af4ac8..00000000
--- a/pipeline/denormalizer/src/test/scala/org/sunbird/obsrv/denormalizer/TestDenormalizerStreamTask.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.sunbird.obsrv.denormalizer
-
-import com.typesafe.config.ConfigFactory
-import org.apache.flink.api.java.utils.ParameterTool
-import org.sunbird.obsrv.core.streaming.FlinkKafkaConnector
-import org.sunbird.obsrv.denormalizer.task.{DenormalizerConfig, DenormalizerStreamTask}
-
-import java.io.File
-
-object TestDenormalizerStreamTask {
-
- def main(args: Array[String]): Unit = {
- val configFilePath = Option(ParameterTool.fromArgs(args).get("config.file.path"))
- val config = configFilePath.map {
- path => ConfigFactory.parseFile(new File(path)).resolve()
- }.getOrElse(ConfigFactory.load("test.conf").withFallback(ConfigFactory.systemEnvironment()))
- val denormalizerConfig = new DenormalizerConfig(config)
- val kafkaUtil = new FlinkKafkaConnector(denormalizerConfig)
- val task = new DenormalizerStreamTask(denormalizerConfig, kafkaUtil)
- task.process()
- }
-
-}
diff --git a/pipeline/druid-router/src/test/scala/org/sunbird/obsrv/router/TestDruidRouterStreamTask.scala b/pipeline/druid-router/src/test/scala/org/sunbird/obsrv/router/TestDruidRouterStreamTask.scala
deleted file mode 100644
index e85a34cc..00000000
--- a/pipeline/druid-router/src/test/scala/org/sunbird/obsrv/router/TestDruidRouterStreamTask.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.sunbird.obsrv.router
-
-import com.typesafe.config.ConfigFactory
-import org.apache.flink.api.java.utils.ParameterTool
-import org.sunbird.obsrv.core.streaming.FlinkKafkaConnector
-import org.sunbird.obsrv.router.task.{DruidRouterConfig, DruidRouterStreamTask}
-
-import java.io.File
-
-object TestDruidRouterStreamTask {
-
- def main(args: Array[String]): Unit = {
- val configFilePath = Option(ParameterTool.fromArgs(args).get("config.file.path"))
- val config = configFilePath.map {
- path => ConfigFactory.parseFile(new File(path)).resolve()
- }.getOrElse(ConfigFactory.load("test.conf").withFallback(ConfigFactory.systemEnvironment()))
- val routerConfig = new DruidRouterConfig(config)
- val kafkaUtil = new FlinkKafkaConnector(routerConfig)
- val task = new DruidRouterStreamTask(routerConfig, kafkaUtil)
- task.process()
- }
-
-}
diff --git a/pipeline/extractor/src/main/scala/org/sunbird/obsrv/extractor/functions/ExtractionFunction.scala b/pipeline/extractor/src/main/scala/org/sunbird/obsrv/extractor/functions/ExtractionFunction.scala
index 04ee945f..4feccf73 100644
--- a/pipeline/extractor/src/main/scala/org/sunbird/obsrv/extractor/functions/ExtractionFunction.scala
+++ b/pipeline/extractor/src/main/scala/org/sunbird/obsrv/extractor/functions/ExtractionFunction.scala
@@ -82,7 +82,7 @@ class ExtractionFunction(config: ExtractorConfig, @transient var dedupEngine: De
context.output(config.failedEventsOutputTag, markEventFailed(dataset.id, eventData, ErrorConstants.EVENT_SIZE_EXCEEDED, obsrvMeta))
} else {
metrics.incCounter(dataset.id, config.skippedExtractionCount)
- context.output(config.rawEventsOutputTag, markEventSkipped(dataset.id, eventData))
+ context.output(config.rawEventsOutputTag, markEventSkipped(dataset.id, eventData, obsrvMeta))
}
}
@@ -159,8 +159,9 @@ class ExtractionFunction(config: ExtractorConfig, @transient var dedupEngine: De
wrapperEvent
}
- private def markEventSkipped(dataset: String, event: mutable.Map[String, AnyRef]): mutable.Map[String, AnyRef] = {
+ private def markEventSkipped(dataset: String, event: mutable.Map[String, AnyRef], obsrvMeta: Map[String, AnyRef]): mutable.Map[String, AnyRef] = {
val wrapperEvent = createWrapperEvent(dataset, event)
+ updateEvent(wrapperEvent, obsrvMeta)
super.markSkipped(wrapperEvent, config.jobName)
wrapperEvent
}
diff --git a/pipeline/extractor/src/test/scala/org/sunbird/obsrv/extractor/TestExtractorStreamTask.scala b/pipeline/extractor/src/test/scala/org/sunbird/obsrv/extractor/TestExtractorStreamTask.scala
deleted file mode 100644
index fef6226e..00000000
--- a/pipeline/extractor/src/test/scala/org/sunbird/obsrv/extractor/TestExtractorStreamTask.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.sunbird.obsrv.extractor
-
-import com.typesafe.config.ConfigFactory
-import org.apache.flink.api.java.utils.ParameterTool
-import org.sunbird.obsrv.core.streaming.FlinkKafkaConnector
-import org.sunbird.obsrv.extractor.task.{ExtractorConfig, ExtractorStreamTask}
-
-import java.io.File
-
-object TestExtractorStreamTask {
-
- def main(args: Array[String]): Unit = {
- val configFilePath = Option(ParameterTool.fromArgs(args).get("config.file.path"))
- val config = configFilePath.map {
- path => ConfigFactory.parseFile(new File(path)).resolve()
- }.getOrElse(ConfigFactory.load("test.conf").withFallback(ConfigFactory.systemEnvironment()))
- val extractorConfig = new ExtractorConfig(config)
- val kafkaUtil = new FlinkKafkaConnector(extractorConfig)
- val task = new ExtractorStreamTask(extractorConfig, kafkaUtil)
- task.process()
- }
-
-}
diff --git a/pipeline/pipeline-merged/src/test/scala/org/sunbird/obsrv/pipeline/MergedPipelineStreamTaskTestSpec.scala b/pipeline/pipeline-merged/src/test/scala/org/sunbird/obsrv/pipeline/MergedPipelineStreamTaskTestSpec.scala
index f616367e..40b31493 100644
--- a/pipeline/pipeline-merged/src/test/scala/org/sunbird/obsrv/pipeline/MergedPipelineStreamTaskTestSpec.scala
+++ b/pipeline/pipeline-merged/src/test/scala/org/sunbird/obsrv/pipeline/MergedPipelineStreamTaskTestSpec.scala
@@ -1,12 +1,12 @@
package org.sunbird.obsrv.pipeline
-import com.typesafe.config.{Config, ConfigFactory}
import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.apache.kafka.common.serialization.StringDeserializer
+import org.scalatest.Matchers._
import org.sunbird.obsrv.BaseMetricsReporter
import org.sunbird.obsrv.core.streaming.FlinkKafkaConnector
import org.sunbird.obsrv.core.util.FlinkUtil
@@ -14,6 +14,7 @@ import org.sunbird.obsrv.fixture.EventFixture
import org.sunbird.obsrv.pipeline.task.{MergedPipelineConfig, MergedPipelineStreamTask}
import org.sunbird.obsrv.spec.BaseSpecWithDatasetRegistry
+import scala.collection.mutable
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration._
@@ -83,12 +84,19 @@ class MergedPipelineStreamTaskTestSpec extends BaseSpecWithDatasetRegistry {
task.process(env)
Future {
env.execute(mergedPipelineConfig.jobName)
- Thread.sleep(20000)
+ Thread.sleep(10000)
}
- //val extractorFailed = EmbeddedKafka.consumeNumberMessagesFrom[String](config.getString("kafka.input.topic"), 2, timeout = 60.seconds)
+
val stats = EmbeddedKafka.consumeNumberMessagesFrom[String](mergedPipelineConfig.kafkaStatsTopic, 1, timeout = 20.seconds)
- stats.foreach(Console.println("Stats:", _))
+ stats.foreach(Console.println("Event:", _))
+
+ val mutableMetricsMap = mutable.Map[String, Long]();
+ BaseMetricsReporter.gaugeMetrics.toMap.mapValues(f => f.getValue()).map(f => mutableMetricsMap.put(f._1, f._2))
+ mutableMetricsMap.foreach(println(_))
+ //TODO: Add assertions
+ mergedPipelineConfig.successTag().getId should be ("processing_stats")
+
}
diff --git a/pipeline/pipeline-merged/src/test/scala/org/sunbird/obsrv/pipeline/TestMergedPipelineStreamTask.scala b/pipeline/pipeline-merged/src/test/scala/org/sunbird/obsrv/pipeline/TestMergedPipelineStreamTask.scala
deleted file mode 100644
index 7ec95a23..00000000
--- a/pipeline/pipeline-merged/src/test/scala/org/sunbird/obsrv/pipeline/TestMergedPipelineStreamTask.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.sunbird.obsrv.pipeline
-
-import com.typesafe.config.ConfigFactory
-import org.apache.flink.api.java.utils.ParameterTool
-import org.sunbird.obsrv.core.streaming.FlinkKafkaConnector
-import org.sunbird.obsrv.pipeline.task.{MergedPipelineConfig, MergedPipelineStreamTask}
-
-import java.io.File
-
-object TestMergedPipelineStreamTask {
-
- def main(args: Array[String]): Unit = {
- val configFilePath = Option(ParameterTool.fromArgs(args).get("config.file.path"))
- val config = configFilePath.map {
- path => ConfigFactory.parseFile(new File(path)).resolve()
- }.getOrElse(ConfigFactory.load("test.conf").withFallback(ConfigFactory.systemEnvironment()))
- val mergedPipelineConfig = new MergedPipelineConfig(config)
- val kafkaUtil = new FlinkKafkaConnector(mergedPipelineConfig)
- val task = new MergedPipelineStreamTask(config, mergedPipelineConfig, kafkaUtil)
- task.process()
- }
-
-}
diff --git a/pipeline/preprocessor/pom.xml b/pipeline/preprocessor/pom.xml
index 6c09ebfc..63e3334b 100644
--- a/pipeline/preprocessor/pom.xml
+++ b/pipeline/preprocessor/pom.xml
@@ -39,19 +39,6 @@
org.sunbird.obsrv
dataset-registry
1.0.0
-
-
- org.apache.kafka
- kafka-clients
-
-
-
-
- org.sunbird.obsrv
- framework
- 1.0.0
- test-jar
- test
com.github.java-json-tools
@@ -104,12 +91,43 @@
test
tests
+
+ io.github.embeddedkafka
+ embedded-kafka_2.12
+ 3.4.0
+ test
+
+
+ io.zonky.test
+ embedded-postgres
+ 2.0.3
+ test
+
+
+ org.sunbird.obsrv
+ dataset-registry
+ 1.0.0
+ test-jar
+ test
+
it.ozimov
embedded-redis
0.7.1
test
+
+ org.apache.kafka
+ kafka-clients
+ ${kafka.version}
+ test
+
+
+ org.apache.kafka
+ kafka_${scala.maj.version}
+ ${kafka.version}
+ test
+
org.apache.flink
flink-streaming-java
diff --git a/pipeline/preprocessor/src/main/scala/org/sunbird/obsrv/preprocessor/functions/DeduplicationFunction.scala b/pipeline/preprocessor/src/main/scala/org/sunbird/obsrv/preprocessor/functions/DeduplicationFunction.scala
index 0b70baab..6d0c8c6a 100644
--- a/pipeline/preprocessor/src/main/scala/org/sunbird/obsrv/preprocessor/functions/DeduplicationFunction.scala
+++ b/pipeline/preprocessor/src/main/scala/org/sunbird/obsrv/preprocessor/functions/DeduplicationFunction.scala
@@ -13,10 +13,11 @@ import org.sunbird.obsrv.registry.DatasetRegistry
import scala.collection.mutable
-class DeduplicationFunction(config: PipelinePreprocessorConfig, @transient var dedupEngine: DedupEngine = null)
+class DeduplicationFunction(config: PipelinePreprocessorConfig)
(implicit val eventTypeInfo: TypeInformation[mutable.Map[String, AnyRef]])
extends BaseProcessFunction[mutable.Map[String, AnyRef], mutable.Map[String, AnyRef]](config) {
+ @transient private var dedupEngine: DedupEngine = null
private[this] val logger = LoggerFactory.getLogger(classOf[DeduplicationFunction])
override def getMetricsList(): MetricsList = {
@@ -29,10 +30,8 @@ class DeduplicationFunction(config: PipelinePreprocessorConfig, @transient var d
override def open(parameters: Configuration): Unit = {
super.open(parameters)
- if (dedupEngine == null) {
- val redisConnect = new RedisConnect(config.redisHost, config.redisPort, config.redisConnectionTimeout)
- dedupEngine = new DedupEngine(redisConnect, config.dedupStore, config.cacheExpirySeconds)
- }
+ val redisConnect = new RedisConnect(config.redisHost, config.redisPort, config.redisConnectionTimeout)
+ dedupEngine = new DedupEngine(redisConnect, config.dedupStore, config.cacheExpirySeconds)
}
override def close(): Unit = {
@@ -46,17 +45,7 @@ class DeduplicationFunction(config: PipelinePreprocessorConfig, @transient var d
metrics.incCounter(config.defaultDatasetID, config.duplicationTotalMetricsCount)
val datasetId = msg.get(config.CONST_DATASET)
- if (datasetId.isEmpty) {
- context.output(config.failedEventsOutputTag, markFailed(msg, ErrorConstants.MISSING_DATASET_ID, config.jobName))
- metrics.incCounter(config.defaultDatasetID, config.eventFailedMetricsCount)
- return
- }
val datasetOpt = DatasetRegistry.getDataset(datasetId.get.asInstanceOf[String])
- if (datasetOpt.isEmpty) {
- context.output(config.failedEventsOutputTag, markFailed(msg, ErrorConstants.MISSING_DATASET_CONFIGURATION, "Deduplication"))
- metrics.incCounter(config.defaultDatasetID, config.eventFailedMetricsCount)
- return
- }
val dataset = datasetOpt.get
val dedupConfig = dataset.dedupConfig
if (dedupConfig.isDefined && dedupConfig.get.dropDuplicates.get) {
diff --git a/pipeline/preprocessor/src/main/scala/org/sunbird/obsrv/preprocessor/task/PipelinePreprocessorStreamTask.scala b/pipeline/preprocessor/src/main/scala/org/sunbird/obsrv/preprocessor/task/PipelinePreprocessorStreamTask.scala
index abbe6a1d..04b66c8c 100644
--- a/pipeline/preprocessor/src/main/scala/org/sunbird/obsrv/preprocessor/task/PipelinePreprocessorStreamTask.scala
+++ b/pipeline/preprocessor/src/main/scala/org/sunbird/obsrv/preprocessor/task/PipelinePreprocessorStreamTask.scala
@@ -23,12 +23,16 @@ class PipelinePreprocessorStreamTask(config: PipelinePreprocessorConfig, kafkaCo
def process(): Unit = {
implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(config)
- val dataStream = getMapDataStream(env, config, kafkaConnector)
- processStream(dataStream)
+ process(env)
env.execute(config.jobName)
}
// $COVERAGE-ON$
+ def process(env: StreamExecutionEnvironment): Unit = {
+ val dataStream = getMapDataStream(env, config, kafkaConnector)
+ processStream(dataStream)
+ }
+
override def processStream(dataStream: DataStream[mutable.Map[String, AnyRef]]): DataStream[mutable.Map[String, AnyRef]] = {
val validStream = dataStream.process(new EventValidationFunction(config)).setParallelism(config.downstreamOperatorsParallelism)
diff --git a/pipeline/preprocessor/src/test/resources/base-config.conf b/pipeline/preprocessor/src/test/resources/base-config.conf
new file mode 100644
index 00000000..3ade36f7
--- /dev/null
+++ b/pipeline/preprocessor/src/test/resources/base-config.conf
@@ -0,0 +1,8 @@
+postgres {
+ host = localhost
+ port = 5432
+ maxConnections = 2
+ user = "postgres"
+ password = "postgres"
+ database="postgres"
+}
\ No newline at end of file
diff --git a/pipeline/preprocessor/src/test/resources/test.conf b/pipeline/preprocessor/src/test/resources/test.conf
index 75c7458e..dc5734f9 100644
--- a/pipeline/preprocessor/src/test/resources/test.conf
+++ b/pipeline/preprocessor/src/test/resources/test.conf
@@ -16,7 +16,7 @@ task {
redis {
host = 127.0.0.1
- port = 6379
+ port = 6340
database {
preprocessor.duplication.store.id = 2
key.expiry.seconds = 3600
diff --git a/pipeline/preprocessor/src/test/scala/org/sunbird/obsrv/preprocessor/PipelinePreprocessorStreamTestSpec.scala b/pipeline/preprocessor/src/test/scala/org/sunbird/obsrv/preprocessor/PipelinePreprocessorStreamTestSpec.scala
new file mode 100644
index 00000000..03c54d4d
--- /dev/null
+++ b/pipeline/preprocessor/src/test/scala/org/sunbird/obsrv/preprocessor/PipelinePreprocessorStreamTestSpec.scala
@@ -0,0 +1,109 @@
+package org.sunbird.obsrv.preprocessor
+
+import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.test.util.MiniClusterWithClientResource
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.scalatest.Matchers._
+import org.sunbird.obsrv.BaseMetricsReporter
+import org.sunbird.obsrv.core.streaming.FlinkKafkaConnector
+import org.sunbird.obsrv.core.util.FlinkUtil
+import org.sunbird.obsrv.preprocessor.fixture.EventFixtures
+import org.sunbird.obsrv.preprocessor.task.{PipelinePreprocessorConfig, PipelinePreprocessorStreamTask}
+import org.sunbird.obsrv.spec.BaseSpecWithDatasetRegistry
+
+import scala.collection.mutable
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+class PipelinePreprocessorStreamTestSpec extends BaseSpecWithDatasetRegistry {
+
+ val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(testConfiguration())
+ .setNumberSlotsPerTaskManager(1)
+ .setNumberTaskManagers(1)
+ .build)
+
+ val pConfig = new PipelinePreprocessorConfig(config)
+ val kafkaConnector = new FlinkKafkaConnector(pConfig)
+ val customKafkaConsumerProperties: Map[String, String] = Map[String, String]("auto.offset.reset" -> "earliest", "group.id" -> "test-event-schema-group")
+ implicit val embeddedKafkaConfig: EmbeddedKafkaConfig =
+ EmbeddedKafkaConfig(
+ kafkaPort = 9093,
+ zooKeeperPort = 2183,
+ customConsumerProperties = customKafkaConsumerProperties
+ )
+ implicit val deserializer: StringDeserializer = new StringDeserializer()
+
+ def testConfiguration(): Configuration = {
+ val config = new Configuration()
+ config.setString("metrics.reporter", "job_metrics_reporter")
+ config.setString("metrics.reporter.job_metrics_reporter.class", classOf[BaseMetricsReporter].getName)
+ config
+ }
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ BaseMetricsReporter.gaugeMetrics.clear()
+ EmbeddedKafka.start()(embeddedKafkaConfig)
+ createTestTopics()
+ EmbeddedKafka.publishStringMessageToKafka(pConfig.kafkaInputTopic, EventFixtures.VALID_EVENT)
+ EmbeddedKafka.publishStringMessageToKafka(pConfig.kafkaInputTopic, EventFixtures.INVALID_EVENT)
+ EmbeddedKafka.publishStringMessageToKafka(pConfig.kafkaInputTopic, EventFixtures.DUPLICATE_EVENT)
+ EmbeddedKafka.publishStringMessageToKafka(pConfig.kafkaInputTopic, EventFixtures.MISSING_DATASET_EVENT)
+ EmbeddedKafka.publishStringMessageToKafka(pConfig.kafkaInputTopic, EventFixtures.INVALID_DATASET_EVENT)
+ EmbeddedKafka.publishStringMessageToKafka(pConfig.kafkaInputTopic, EventFixtures.INVALID_EVENT_KEY)
+ EmbeddedKafka.publishStringMessageToKafka(pConfig.kafkaInputTopic, EventFixtures.VALID_EVENT_DEDUP_CONFIG_NONE)
+
+ flinkCluster.before()
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ flinkCluster.after()
+ EmbeddedKafka.stop()
+ }
+
+ def createTestTopics(): Unit = {
+ List(
+ pConfig.kafkaInputTopic, pConfig.kafkaInvalidTopic, pConfig.kafkaSystemTopic,
+ pConfig.kafkaDuplicateTopic, pConfig.kafkaUniqueTopic
+ ).foreach(EmbeddedKafka.createCustomTopic(_))
+ }
+
+ "PipelinePreprocessorStreamTestSpec" should "validate the preprocessor job" in {
+
+ implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(pConfig)
+ val task = new PipelinePreprocessorStreamTask(pConfig, kafkaConnector)
+ task.process(env)
+ Future {
+ env.execute(pConfig.jobName)
+ Thread.sleep(5000)
+ }
+ //val extractorFailed = EmbeddedKafka.consumeNumberMessagesFrom[String](config.getString("kafka.input.topic"), 2, timeout = 60.seconds)
+ val uniqueEvents = EmbeddedKafka.consumeNumberMessagesFrom[String](pConfig.kafkaUniqueTopic, 1, timeout = 20.seconds)
+ uniqueEvents.foreach(Console.println("Event:", _))
+
+ val mutableMetricsMap = mutable.Map[String, Long]();
+ val metricsMap = BaseMetricsReporter.gaugeMetrics.toMap.mapValues(f => f.getValue()).map(f => mutableMetricsMap.put(f._1, f._2))
+
+ mutableMetricsMap(s"${pConfig.jobName}.ALL.${pConfig.validationTotalMetricsCount}") should be (7)
+ mutableMetricsMap(s"${pConfig.jobName}.ALL.${pConfig.eventFailedMetricsCount}") should be (2)
+ mutableMetricsMap(s"${pConfig.jobName}.ALL.${pConfig.duplicationTotalMetricsCount}") should be (3)
+
+ mutableMetricsMap(s"${pConfig.jobName}.d1.${pConfig.validationFailureMetricsCount}") should be (1)
+ mutableMetricsMap(s"${pConfig.jobName}.d1.${pConfig.duplicationProcessedEventMetricsCount}") should be (1)
+ mutableMetricsMap(s"${pConfig.jobName}.d1.${pConfig.duplicationEventMetricsCount}") should be (1)
+ mutableMetricsMap(s"${pConfig.jobName}.d1.${pConfig.validationSuccessMetricsCount}") should be (2)
+
+ mutableMetricsMap(s"${pConfig.jobName}.d2.${pConfig.duplicationSkippedEventMetricsCount}") should be (1)
+ mutableMetricsMap(s"${pConfig.jobName}.d2.${pConfig.validationSkipMetricsCount}") should be (1)
+ mutableMetricsMap(s"${pConfig.jobName}.d2.${pConfig.eventFailedMetricsCount}") should be (1)
+
+ }
+
+
+}
diff --git a/pipeline/preprocessor/src/test/scala/org/sunbird/obsrv/preprocessor/TestPipelinePreprocessorStreamTask.scala b/pipeline/preprocessor/src/test/scala/org/sunbird/obsrv/preprocessor/TestPipelinePreprocessorStreamTask.scala
deleted file mode 100644
index 188cfded..00000000
--- a/pipeline/preprocessor/src/test/scala/org/sunbird/obsrv/preprocessor/TestPipelinePreprocessorStreamTask.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.sunbird.obsrv.preprocessor
-
-import com.typesafe.config.ConfigFactory
-import org.apache.flink.api.java.utils.ParameterTool
-import org.sunbird.obsrv.core.streaming.FlinkKafkaConnector
-import org.sunbird.obsrv.preprocessor.task.{PipelinePreprocessorConfig, PipelinePreprocessorStreamTask}
-
-import java.io.File
-
-object TestPipelinePreprocessorStreamTask {
-
- def main(args: Array[String]): Unit = {
- val configFilePath = Option(ParameterTool.fromArgs(args).get("config.file.path"))
- val config = configFilePath.map {
- path => ConfigFactory.parseFile(new File(path)).resolve()
- }.getOrElse(ConfigFactory.load("test.conf").withFallback(ConfigFactory.systemEnvironment()))
- val extractorConfig = new PipelinePreprocessorConfig(config)
- val kafkaUtil = new FlinkKafkaConnector(extractorConfig)
- val task = new PipelinePreprocessorStreamTask(extractorConfig, kafkaUtil)
- task.process()
- }
-
-}
diff --git a/pipeline/preprocessor/src/test/scala/org/sunbird/obsrv/preprocessor/TestSchemaValidator.scala b/pipeline/preprocessor/src/test/scala/org/sunbird/obsrv/preprocessor/TestSchemaValidator.scala
index be46a8d9..ad9f90cf 100644
--- a/pipeline/preprocessor/src/test/scala/org/sunbird/obsrv/preprocessor/TestSchemaValidator.scala
+++ b/pipeline/preprocessor/src/test/scala/org/sunbird/obsrv/preprocessor/TestSchemaValidator.scala
@@ -18,34 +18,34 @@ class TestSchemaValidator extends FlatSpec with Matchers {
"SchemaValidator" should "return a success report for a valid event" in {
- val dataset = Dataset("obs2.0", None, None, None, Option(EventFixtures.schema), None, RouterConfig(""), "Active")
+ val dataset = Dataset("d1", None, None, None, Option(EventFixtures.VALID_SCHEMA), None, RouterConfig(""), "Active")
schemaValidator.loadDataSchemas(List(dataset))
val gson = new Gson()
- val event = JSONUtil.deserialize[Map[String, AnyRef]](EventFixtures.validEvent)
- val report = schemaValidator.validate("obs2.0", event)
+ val event = JSONUtil.deserialize[Map[String, AnyRef]](EventFixtures.VALID_SCHEMA_EVENT)
+ val report = schemaValidator.validate("d1", event)
assert(report.isSuccess)
}
it should "return a failed validation report for a invalid event" in {
- val dataset = Dataset("obs2.0", None, None, None, Option(EventFixtures.schema), None, RouterConfig(""), "Active")
+ val dataset = Dataset("d1", None, None, None, Option(EventFixtures.VALID_SCHEMA), None, RouterConfig(""), "Active")
schemaValidator.loadDataSchemas(List(dataset))
- val event = JSONUtil.deserialize[Map[String, AnyRef]](EventFixtures.invalidEvent)
- val report = schemaValidator.validate("obs2.0", event)
+ val event = JSONUtil.deserialize[Map[String, AnyRef]](EventFixtures.INVALID_SCHEMA_EVENT)
+ val report = schemaValidator.validate("d1", event)
assert(!report.isSuccess)
- assert(report.toString.contains("error: object has missing required properties ([\"obsCode\"])"))
+ assert(report.toString.contains("error: object has missing required properties ([\"vehicleCode\"])"))
val invalidFieldName = schemaValidator.getInvalidFieldName(report.toString)
invalidFieldName should be ("Unable to obtain field name for failed validation")
}
it should "validate the negative scenarios" in {
- val dataset = Dataset("obs2.0", None, None, None, Option(EventFixtures.INVALID_SCHEMA), None, RouterConfig(""), "Active")
+ val dataset = Dataset("d1", None, None, None, Option(EventFixtures.INVALID_SCHEMA), None, RouterConfig(""), "Active")
schemaValidator.loadDataSchemas(List(dataset))
- val dataset2 = Dataset("obs2.0", None, None, None, None, None, RouterConfig(""), "Active")
+ val dataset2 = Dataset("d1", None, None, None, None, None, RouterConfig(""), "Active")
an[ObsrvException] should be thrownBy(schemaValidator.schemaFileExists(dataset2))
schemaValidator.schemaFileExists(dataset) should be (false)
}
diff --git a/pipeline/preprocessor/src/test/scala/org/sunbird/obsrv/preprocessor/fixture/EventFixture.scala b/pipeline/preprocessor/src/test/scala/org/sunbird/obsrv/preprocessor/fixture/EventFixture.scala
deleted file mode 100644
index 8b59dab8..00000000
--- a/pipeline/preprocessor/src/test/scala/org/sunbird/obsrv/preprocessor/fixture/EventFixture.scala
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.sunbird.obsrv.preprocessor.fixture
-
-object EventFixtures {
-
- val schema = """{"$schema":"https://json-schema.org/draft/2020-12/schema","id":"https://syngenta.com/obervation.schema.json","title":"Canonical Observations","description":"A canonical observation ","type":"object","properties":{"obsCode":{"type":"string"},"codeComponents":{"type":"array","items":{"type":"object","properties":{"componentCode":{"type":"string"},"componentType":{"type":"string","enum":["AGG_TIME_WINDOW","AGG_METHOD","PARAMETER","FEATURE_OF_INTEREST","OBS_PROPERTY","SAMPLING_STRATEGY","OBS_METHOD","METADATA","METADATA_DEVICE","DATA_QUALITY","EVENT"]},"selector":{"type":"string"},"value":{"type":"string"},"valueUoM":{"type":"string"}}}},"valueUoM":{"type":"string"},"value":{"type":["number","string","boolean"]},"id":{"type":"string"},"parentCollectionRef":{"type":"string"},"integrationAccountRef":{"type":"string"},"assetRef":{"type":"string"},"xMin":{"type":"number"},"xMax":{"type":"number"},"yMin":{"type":"number"},"yMax":{"type":"number"},"phenTime":{"type":"string","format":"date-time"},"phenEndTime":{"type":"string","format":"date-time"},"spatialExtent":{"type":"string"}},"required":["id","parentCollectionRef","integrationAccountRef","obsCode","phenTime","value"]}"""
- val INVALID_SCHEMA = """{"$schema":"https://json-schema.org/draft/2020-12/schema","id":"https://syngenta.com/obervation.schema.json","title":"Canonical Observations","description":"A canonical observation ","type":"object","properties":{"obsCode":{"type":"string"},"codeComponents":{"type":"array","items":{"type":"object","properties":{"componentCode":{"type":"string"},"componentType":{"type":"string","enum":["AGG_TIME_WINDOW","AGG_METHOD","PARAMETER","FEATURE_OF_INTEREST","OBS_PROPERTY","SAMPLING_STRATEGY","OBS_METHOD","METADATA","METADATA_DEVICE","DATA_QUALITY","EVENT"]},"selector":{"type":"string"},"value":{"type":"string"},"valueUoM":{"type":"string"}}}},"valueUoM":{"type":"string"},"value":{"type":["number","string","boolean"]},"id":{"type":"string"},"parentCollectionRef":{"type":"string"},"integrationAccountRef":{"type":"string"},"assetRef":{"type":"string"},"xMin":{"type":"number"},"xMax":{"type":"number"},"yMin":{"type":"number"},"yMax":{"type":"number"},"phenTime":{"type":"string","format":"date-time"},"phenEndTime":{"type":"string","format":"date-time"},"spatialExtent":{"type":"string"}},"required":["id","parentCollectionRef","integrationAccountRef","obsCode","phenTime","value"}"""
-
-
- val validEvent = """{"obsCode":"M_BATTERY_CHARGE","codeComponents":[{"componentCode":"CC_METADATA_DEVICE_FIRMWARE_VER","componentType":"METADATA_DEVICE","selector":"FIRMWARE_VERSION","value":"2.3"},{"componentCode":"CC_AGG_TIME_DURATION","componentType":"AGG_TIME_WINDOW","selector":"DURATION","value":"0","valueUoM":"sec"},{"componentCode":"CC_METADATA_DEVICE_SENSOR_LENGTH","componentType":"METADATA_DEVICE","selector":"SENSOR_LENGTH","value":"150.0","valueUoM":"mm"},{"componentCode":"CC_METADATA_HAS_IRRIGATION","componentType":"METADATA","selector":"HAS_IRRIGATION","value":"False"},{"componentCode":"CC_PARAM_SOIL_DEPTH_RANGE","componentType":"PARAMETER","selector":"SOIL_DEPTH_RANGE","value":"[140.0,290.0)","valueUoM":"mm"},{"componentCode":"CC_METADATA_PLOT_NUMBER","componentType":"METADATA","selector":"PLOT_NUMBER","value":"3"},{"componentCode":"CC_METADATA_SOIL_PREPARATION","componentType":"METADATA","selector":"SOIL_PREPARATION","value":"UNKNOWN"},{"componentCode":"CC_METADATA_TRIAL_EXPERIMENTAL_DESIGN","componentType":"METADATA","selector":"TRIAL_EXPERIMENTAL_DESIGN","value":"UNKNOWN"},{"componentCode":"CC_METADATA_USER_UOM_SYSTEM","componentType":"METADATA","selector":"USER_UOM_SYSTEM","value":"IMPERIAL"},{"componentCode":"CC_METADATA_TRIAL_NAME","componentType":"METADATA","selector":"TRIAL_NAME","value":"BERDZT8682022"},{"componentCode":"CC_PARAMETER_PLANTING_DATE","componentType":"PARAMETER","selector":"PLANTING_DATE","value":"2022-05-03"},{"componentCode":"CC_PARAMETER_TRIAL_END_DATE","componentType":"PARAMETER","selector":"TRIAL_END_DATE","value":"2022-06-08"},{"componentCode":"CC_METADATA_TRIAL_TRIALIST_EMAIL_ADDRESS","componentType":"METADATA","selector":"TRIAL_TRIALIST_EMAIL","value":"w.falesse@redebel.com"},{"componentCode":"CC_METADATA_TRIAL_TRIALIST_NAME","componentType":"METADATA","selector":"TRIAL_TRIALIST_NAME","value":"REDEBEL SA"}],"phenTime":"2022-06-17T07:12:02Z","valueUoM":"prcnt","value":"100","id":"df4c7aa4-65df-4463-b92a-7a29835f9c4d","parentCollectionRef":"41e9b7a4-5b6f-11ed-8fd5-a6a5696c2aaa","created":"2022-11-03T12:01:32Z","modified":1667476892000,"integrationAccountRef":"zzz11120-f0c8-4064-8d00-a73e58939ce0_mtgc203d-2478-4679-a0ef-d736a7a406fd","assetRef":"9422f7ac-c6e9-5c72-b605-5a7655863866","contextItems":[{"code":"SYN_SYSTEM","value":"VALENCO"}],"status":"ACTIVE","xMin":3.356701,"xMax":3.356701,"yMin":51.01653,"yMax":51.01653,"spatialExtent":"{\"type\": \"Point\", \"coordinates\": [3.356701, 51.016530]}","phenEndTime":"2022-06-17T07:12:02Z","value_double_type":100.0}"""
-
- val invalidEvent = """{"obsCod":"E_SOIL_TEMPERATURE","codeComponents":[{"componentCode":"CC_METADATA_DEVICE_FIRMWARE_VER","componentType":"METADATA_DEVICE","selector":"FIRMWARE_VERSION","value":"2.3"},{"componentCode":"CC_AGG_TIME_DURATION","componentType":"AGG_TIME_WINDOW","selector":"DURATION","value":"0","valueUoM":"sec"},{"componentCode":"CC_METADATA_DEVICE_SENSOR_LENGTH","componentType":"METADATA_DEVICE","selector":"SENSOR_LENGTH","value":"150.0","valueUoM":"mm"},{"componentCode":"CC_METADATA_HAS_IRRIGATION","componentType":"METADATA","selector":"HAS_IRRIGATION","value":"True"},{"componentCode":"CC_PARAM_SOIL_DEPTH_RANGE","componentType":"PARAMETER","selector":"SOIL_DEPTH_RANGE","value":"[95.0,245.0)","valueUoM":"mm"},{"componentCode":"CC_METADATA_PLOT_NUMBER","componentType":"METADATA","selector":"PLOT_NUMBER","value":"111"},{"componentCode":"CC_METADATA_SOIL_PREPARATION","componentType":"METADATA","selector":"SOIL_PREPARATION","value":"TILLAGE_CONVENTIONAL"},{"componentCode":"CC_METADATA_TRIAL_EXPERIMENTAL_DESIGN","componentType":"METADATA","selector":"TRIAL_EXPERIMENTAL_DESIGN","value":"UNKNOWN"},{"componentCode":"CC_METADATA_USER_UOM_SYSTEM","componentType":"METADATA","selector":"USER_UOM_SYSTEM","value":"IMPERIAL"},{"componentCode":"CC_METADATA_TRIAL_NAME","componentType":"METADATA","selector":"TRIAL_NAME","value":"FREUZT2032022"},{"componentCode":"CC_PARAMETER_PLANTING_DATE","componentType":"PARAMETER","selector":"PLANTING_DATE","value":"2022-04-21"},{"componentCode":"CC_METADATA_TRIAL_TRIALIST_EMAIL_ADDRESS","componentType":"METADATA","selector":"TRIAL_TRIALIST_EMAIL","value":"olivieriberlin@eurofins.com"},{"componentCode":"CC_METADATA_TRIAL_TRIALIST_NAME","componentType":"METADATA","selector":"TRIAL_TRIALIST_NAME","value":"Olivier Oberlin"}],"phenTime":"2022-09-25T04:51:48Z","valueUoM":"C","value":"0","id":"64fea245-473b-4abd-92a5-5c0a6af965a6","parentCollectionRef":"4ac2b182-5b6f-11ed-867c-36527989a6e2","created":"2022-11-03T12:01:47Z","modified":1667476907000,"integrationAccountRef":"zzz11120-f0c8-4064-8d00-a73e58939ce0_mtgc203d-2478-4679-a0ef-d736a7a406fd","assetRef":"18acf966-c135-583f-9106-7df4a493f0a6","contextItems":[{"code":"SYN_SYSTEM","value":"VALENCO"}],"status":"ACTIVE","xMin":7.526196,"xMax":7.526196,"yMin":48.349297,"yMax":48.349297,"spatialExtent":"{\"type\": \"Point\", \"coordinates\": [7.526196, 48.349297]}","phenEndTime":"2022-09-25T04:51:48Z","value_double_type":0.0}"""
-
-}
diff --git a/pipeline/preprocessor/src/test/scala/org/sunbird/obsrv/preprocessor/fixture/EventFixtures.scala b/pipeline/preprocessor/src/test/scala/org/sunbird/obsrv/preprocessor/fixture/EventFixtures.scala
new file mode 100644
index 00000000..ef26b06b
--- /dev/null
+++ b/pipeline/preprocessor/src/test/scala/org/sunbird/obsrv/preprocessor/fixture/EventFixtures.scala
@@ -0,0 +1,22 @@
+package org.sunbird.obsrv.preprocessor.fixture
+
+object EventFixtures {
+
+ val VALID_SCHEMA = """{"$schema":"https://json-schema.org/draft/2020-12/schema","id":"https://sunbird.obsrv.com/test.json","title":"Test Schema","description":"Test Schema","type":"object","properties":{"id":{"type":"string"},"vehicleCode":{"type":"string"},"date":{"type":"string"},"dealer":{"type":"object","properties":{"dealerCode":{"type":"string"},"locationId":{"type":"string"},"email":{"type":"string"},"phone":{"type":"string"}},"required":["dealerCode","locationId"]},"metrics":{"type":"object","properties":{"bookingsTaken":{"type":"number"},"deliveriesPromised":{"type":"number"},"deliveriesDone":{"type":"number"}}}},"required":["id","vehicleCode","date","dealer","metrics"]}"""
+ val INVALID_SCHEMA = """{"$schema":"https://json-schema.org/draft/2020-12/schema","id":"https://sunbird.obsrv.com/test.json","title":"Test Schema","description":"Test Schema","type":"object","properties":{"id":{"type":"string"},"vehicleCode":{"type":"string"},"date":{"type":"string"},"dealer":{"type":"object","properties":{"dealerCode":{"type":"string"},"locationId":{"type":"string"},"email":{"type":"string"},"phone":{"type":"string"}},"required":["dealerCode","locationId"]},"metrics":{"type":"object","properties":{"bookingsTaken":{"type":"number"},"deliveriesPromised":{"type":"number"},"deliveriesDone":{"type":"number"}}}},"required":["id","vehicleCode","date","dealer","metrics"}"""
+
+
+ val VALID_SCHEMA_EVENT = """{"id":"1234","vehicleCode":"HYUN-CRE-D6","date":"2023-03-01","dealer":{"dealerCode":"KUNUnited","locationId":"KUN1","email":"dealer1@gmail.com","phone":"9849012345"},"metrics":{"bookingsTaken":50,"deliveriesPromised":20,"deliveriesDone":19}}"""
+ val INVALID_SCHEMA_EVENT = """{"id":"1234","date":"2023-03-01","dealer":{"dealerCode":"KUNUnited","locationId":"KUN1","email":"dealer1@gmail.com","phone":"9849012345"},"metrics":{"bookingsTaken":50,"deliveriesPromised":20,"deliveriesDone":19}}"""
+
+ val VALID_EVENT = """{"dataset":"d1","event":{"id":"1234","vehicleCode":"HYUN-CRE-D6","date":"2023-03-01","dealer":{"dealerCode":"KUNUnited","locationId":"KUN1","email":"dealer1@gmail.com","phone":"9849012345"},"metrics":{"bookingsTaken":50,"deliveriesPromised":20,"deliveriesDone":19}}}"""
+ val INVALID_EVENT = """{"dataset":"d1","event":{"id":"1234","date":"2023-03-01","dealer":{"dealerCode":"KUNUnited","locationId":"KUN1","email":"dealer1@gmail.com","phone":"9849012345"},"metrics":{"bookingsTaken":50,"deliveriesPromised":20,"deliveriesDone":19}}}"""
+ val DUPLICATE_EVENT = """{"dataset":"d1","event":{"id":"1234","vehicleCode":"HYUN-CRE-D6","date":"2023-03-01","dealer":{"dealerCode":"KUNUnited","locationId":"KUN1","email":"dealer1@gmail.com","phone":"9849012345"},"metrics":{"bookingsTaken":50,"deliveriesPromised":20,"deliveriesDone":19}}}"""
+ val MISSING_DATASET_EVENT = """{"event":{"id":"1234","vehicleCode":"HYUN-CRE-D6","date":"2023-03-01","dealer":{"dealerCode":"KUNUnited","locationId":"KUN1","email":"dealer1@gmail.com","phone":"9849012345"},"metrics":{"bookingsTaken":50,"deliveriesPromised":20,"deliveriesDone":19}}}"""
+ val INVALID_DATASET_EVENT = """{"dataset":"dX","event":{"id":"1234","vehicleCode":"HYUN-CRE-D6","date":"2023-03-01","dealer":{"dealerCode":"KUNUnited","locationId":"KUN1","email":"dealer1@gmail.com","phone":"9849012345"},"metrics":{"bookingsTaken":50,"deliveriesPromised":20,"deliveriesDone":19}}}"""
+ val INVALID_EVENT_KEY = """{"dataset":"d2","event1":{"id":"1234","vehicleCode":"HYUN-CRE-D6","date":"2023-03-01","dealer":{"dealerCode":"KUNUnited","locationId":"KUN1","email":"dealer1@gmail.com","phone":"9849012345"},"metrics":{"bookingsTaken":50,"deliveriesPromised":20,"deliveriesDone":19}}}"""
+ val VALID_EVENT_DEDUP_CONFIG_NONE = """{"dataset":"d2","event":{"id":"1235","vehicleCode":"HYUN-CRE-D6","date":"2023-03-01","dealer":{"dealerCode":"KUNUnited","locationId":"KUN1","email":"dealer1@gmail.com","phone":"9849012345"},"metrics":{"bookingsTaken":50,"deliveriesPromised":20,"deliveriesDone":19}}}"""
+
+
+
+}
diff --git a/pipeline/transformer/src/test/scala/org/sunbird/obsrv/transformer/TestTransformerStreamTask.scala b/pipeline/transformer/src/test/scala/org/sunbird/obsrv/transformer/TestTransformerStreamTask.scala
deleted file mode 100644
index 3cd0fa64..00000000
--- a/pipeline/transformer/src/test/scala/org/sunbird/obsrv/transformer/TestTransformerStreamTask.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.sunbird.obsrv.transformer
-
-import com.typesafe.config.ConfigFactory
-import org.apache.flink.api.java.utils.ParameterTool
-import org.sunbird.obsrv.core.streaming.FlinkKafkaConnector
-import org.sunbird.obsrv.transformer.task.{TransformerConfig, TransformerStreamTask}
-
-import java.io.File
-
-object TestTransformerStreamTask {
-
- def main(args: Array[String]): Unit = {
- val configFilePath = Option(ParameterTool.fromArgs(args).get("config.file.path"))
- val config = configFilePath.map {
- path => ConfigFactory.parseFile(new File(path)).resolve()
- }.getOrElse(ConfigFactory.load("test.conf").withFallback(ConfigFactory.systemEnvironment()))
- val transformerConfig = new TransformerConfig(config)
- val kafkaUtil = new FlinkKafkaConnector(transformerConfig)
- val task = new TransformerStreamTask(transformerConfig, kafkaUtil)
- task.process()
- }
-}