#4 [SV] - Added more test cases and updated code coverage > 80%
SanthoshVasabhaktula committed Mar 29, 2023
1 parent acde777 commit 58d9582
Showing 18 changed files with 212 additions and 199 deletions.
@@ -124,7 +124,7 @@ abstract class BaseProcessFunction[T, R](config: BaseJobConfig[R]) extends Proce
   private val metrics: Metrics = registerMetrics(metricsList.datasets, metricsList.metrics)
 
   override def open(parameters: Configuration): Unit = {
-    metricsList.datasets.map { dataset =>
+    (metricsList.datasets ++ List(SystemConfig.defaultDatasetId)).map { dataset =>
       metricsList.metrics.map(metric => {
         getRuntimeContext.getMetricGroup.addGroup(config.jobName).addGroup(dataset)
           .gauge[Long, ScalaGauge[Long]](metric, ScalaGauge[Long](() => metrics.getAndReset(dataset, metric)))
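This change (repeated below for WindowBaseProcessFunction) registers metric gauges for SystemConfig.defaultDatasetId alongside the real datasets, so counters recorded before a dataset is resolved are actually reported. A minimal sketch of the loop's effect, assuming defaultDatasetId is a plain String constant (the value "dx" is hypothetical):

    // Sketch only: which (dataset, metric) gauge pairs get registered.
    val datasets = List("d1", "d2")                          // metricsList.datasets
    val metricNames = List("success-count", "failed-count")  // metricsList.metrics
    val defaultDatasetId = "dx"                              // hypothetical stand-in
    for (dataset <- datasets ++ List(defaultDatasetId); metric <- metricNames)
      println(s"register gauge: jobName/$dataset/$metric")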
@@ -137,7 +137,11 @@ abstract class BaseProcessFunction[T, R](config: BaseJobConfig[R]) extends Proce
   def getMetricsList(): MetricsList
 
   override def processElement(event: T, context: ProcessFunction[T, R]#Context, out: Collector[R]): Unit = {
-    processElement(event, context, metrics)
+    try {
+      processElement(event, context, metrics)
+    } catch {
+      case exception: Exception => exception.printStackTrace()
+    }
   }
 
 }
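Wrapping the delegated processElement in a try/catch makes per-event failures non-fatal: an exception thrown by a concrete implementation is printed and the event is dropped, instead of failing the whole Flink task. A sketch of the behaviour (names hypothetical):

    // Sketch only: a throwing handler no longer propagates to the caller.
    def safeProcess[T](event: T)(handler: T => Unit): Unit =
      try handler(event)
      catch { case e: Exception => e.printStackTrace() } // logged, stream keeps running

    safeProcess("bad-event")(_ => throw new RuntimeException("boom")) // printed, not rethrown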
@@ -148,7 +152,7 @@ abstract class WindowBaseProcessFunction[I, O, K](config: BaseJobConfig[O]) exte
   private val metrics: Metrics = registerMetrics(metricsList.datasets, metricsList.metrics)
 
   override def open(parameters: Configuration): Unit = {
-    metricsList.datasets.map { dataset =>
+    (metricsList.datasets ++ List(SystemConfig.defaultDatasetId)).map { dataset =>
       metricsList.metrics.map(metric => {
         getRuntimeContext.getMetricGroup.addGroup(config.jobName).addGroup(dataset)
           .gauge[Long, ScalaGauge[Long]](metric, ScalaGauge[Long](() => metrics.getAndReset(dataset, metric)))

This file was deleted.

This file was deleted.

@@ -82,7 +82,7 @@ class ExtractionFunction(config: ExtractorConfig, @transient var dedupEngine: De
       context.output(config.failedEventsOutputTag, markEventFailed(dataset.id, eventData, ErrorConstants.EVENT_SIZE_EXCEEDED, obsrvMeta))
     } else {
       metrics.incCounter(dataset.id, config.skippedExtractionCount)
-      context.output(config.rawEventsOutputTag, markEventSkipped(dataset.id, eventData))
+      context.output(config.rawEventsOutputTag, markEventSkipped(dataset.id, eventData, obsrvMeta))
     }
   }

@@ -159,8 +159,9 @@ class ExtractionFunction(config: ExtractorConfig, @transient var dedupEngine: De
     wrapperEvent
   }
 
-  private def markEventSkipped(dataset: String, event: mutable.Map[String, AnyRef]): mutable.Map[String, AnyRef] = {
+  private def markEventSkipped(dataset: String, event: mutable.Map[String, AnyRef], obsrvMeta: Map[String, AnyRef]): mutable.Map[String, AnyRef] = {
     val wrapperEvent = createWrapperEvent(dataset, event)
+    updateEvent(wrapperEvent, obsrvMeta)
     super.markSkipped(wrapperEvent, config.jobName)
     wrapperEvent
   }
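The second hunk is the actual fix: skipped events now carry the same obsrv metadata that failed events already did. updateEvent's body isn't shown in this diff; a hedged stand-in consistent with its call site would be:

    import scala.collection.mutable

    // Hypothetical stand-in for updateEvent: attach the pipeline metadata to the
    // wrapper event under an assumed "obsrv_meta" key.
    def updateEvent(wrapper: mutable.Map[String, AnyRef], obsrvMeta: Map[String, AnyRef]): Unit =
      wrapper.put("obsrv_meta", obsrvMeta)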

This file was deleted.

@@ -1,19 +1,20 @@
package org.sunbird.obsrv.pipeline

import com.typesafe.config.{Config, ConfigFactory}
import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.apache.kafka.common.serialization.StringDeserializer
import org.scalatest.Matchers._
import org.sunbird.obsrv.BaseMetricsReporter
import org.sunbird.obsrv.core.streaming.FlinkKafkaConnector
import org.sunbird.obsrv.core.util.FlinkUtil
import org.sunbird.obsrv.fixture.EventFixture
import org.sunbird.obsrv.pipeline.task.{MergedPipelineConfig, MergedPipelineStreamTask}
import org.sunbird.obsrv.spec.BaseSpecWithDatasetRegistry

import scala.collection.mutable
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration._
@@ -83,12 +84,19 @@ class MergedPipelineStreamTaskTestSpec extends BaseSpecWithDatasetRegistry {
     task.process(env)
     Future {
       env.execute(mergedPipelineConfig.jobName)
-      Thread.sleep(20000)
+      Thread.sleep(10000)
     }
+    //val extractorFailed = EmbeddedKafka.consumeNumberMessagesFrom[String](config.getString("kafka.input.topic"), 2, timeout = 60.seconds)
 
     val stats = EmbeddedKafka.consumeNumberMessagesFrom[String](mergedPipelineConfig.kafkaStatsTopic, 1, timeout = 20.seconds)
-    stats.foreach(Console.println("Stats:", _))
+    stats.foreach(Console.println("Event:", _))
+
+    val mutableMetricsMap = mutable.Map[String, Long]();
+    BaseMetricsReporter.gaugeMetrics.toMap.mapValues(f => f.getValue()).map(f => mutableMetricsMap.put(f._1, f._2))
+
+    mutableMetricsMap.foreach(println(_))
+    //TODO: Add assertions
     mergedPipelineConfig.successTag().getId should be ("processing_stats")
 
   }


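A hedged sketch of the assertions the TODO points at, once the gauge map is populated (the key format is assumed from the jobName/dataset/metric grouping in BaseProcessFunction; the exact keys depend on how BaseMetricsReporter composes group names):

    // Hypothetical assertions continuing the test above.
    mutableMetricsMap should not be empty
    mutableMetricsMap("MergedPipelineJob.d1.success-count") should be >= 1L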

This file was deleted.

44 changes: 31 additions & 13 deletions pipeline/preprocessor/pom.xml
@@ -39,19 +39,6 @@
       <groupId>org.sunbird.obsrv</groupId>
       <artifactId>dataset-registry</artifactId>
       <version>1.0.0</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.kafka</groupId>
-          <artifactId>kafka-clients</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.sunbird.obsrv</groupId>
-      <artifactId>framework</artifactId>
-      <version>1.0.0</version>
-      <type>test-jar</type>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.github.java-json-tools</groupId>
@@ -104,12 +91,43 @@
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>io.github.embeddedkafka</groupId>
<artifactId>embedded-kafka_2.12</artifactId>
<version>3.4.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.zonky.test</groupId>
<artifactId>embedded-postgres</artifactId>
<version>2.0.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.sunbird.obsrv</groupId>
<artifactId>dataset-registry</artifactId>
<version>1.0.0</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>it.ozimov</groupId>
<artifactId>embedded-redis</artifactId>
<version>0.7.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.maj.version}</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
@@ -13,10 +13,11 @@ import org.sunbird.obsrv.registry.DatasetRegistry
 
 import scala.collection.mutable
 
-class DeduplicationFunction(config: PipelinePreprocessorConfig, @transient var dedupEngine: DedupEngine = null)
+class DeduplicationFunction(config: PipelinePreprocessorConfig)
                            (implicit val eventTypeInfo: TypeInformation[mutable.Map[String, AnyRef]])
   extends BaseProcessFunction[mutable.Map[String, AnyRef], mutable.Map[String, AnyRef]](config) {
 
+  @transient private var dedupEngine: DedupEngine = null
   private[this] val logger = LoggerFactory.getLogger(classOf[DeduplicationFunction])
 
   override def getMetricsList(): MetricsList = {
@@ -29,10 +30,8 @@ class DeduplicationFunction(config: PipelinePreprocessorConfig, @transient var d
 
   override def open(parameters: Configuration): Unit = {
     super.open(parameters)
-    if (dedupEngine == null) {
-      val redisConnect = new RedisConnect(config.redisHost, config.redisPort, config.redisConnectionTimeout)
-      dedupEngine = new DedupEngine(redisConnect, config.dedupStore, config.cacheExpirySeconds)
-    }
+    val redisConnect = new RedisConnect(config.redisHost, config.redisPort, config.redisConnectionTimeout)
+    dedupEngine = new DedupEngine(redisConnect, config.dedupStore, config.cacheExpirySeconds)
   }

override def close(): Unit = {
@@ -46,17 +45,7 @@ class DeduplicationFunction(config: PipelinePreprocessorConfig, @transient var d
 
     metrics.incCounter(config.defaultDatasetID, config.duplicationTotalMetricsCount)
     val datasetId = msg.get(config.CONST_DATASET)
-    if (datasetId.isEmpty) {
-      context.output(config.failedEventsOutputTag, markFailed(msg, ErrorConstants.MISSING_DATASET_ID, config.jobName))
-      metrics.incCounter(config.defaultDatasetID, config.eventFailedMetricsCount)
-      return
-    }
     val datasetOpt = DatasetRegistry.getDataset(datasetId.get.asInstanceOf[String])
-    if (datasetOpt.isEmpty) {
-      context.output(config.failedEventsOutputTag, markFailed(msg, ErrorConstants.MISSING_DATASET_CONFIGURATION, "Deduplication"))
-      metrics.incCounter(config.defaultDatasetID, config.eventFailedMetricsCount)
-      return
-    }
     val dataset = datasetOpt.get
     val dedupConfig = dataset.dedupConfig
     if (dedupConfig.isDefined && dedupConfig.get.dropDuplicates.get) {
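Two things happen in this file. First, the DedupEngine moves from a constructor parameter to a @transient field built in open() — the standard Flink pattern for non-serializable resources, since the operator instance is serialized and shipped to task managers, so connections must be created per task after deserialization rather than captured at job-graph construction time. Second, with the missing-dataset guards deleted, a message without a resolvable dataset now presumably surfaces as an exception from datasetId.get / datasetOpt.get, which the new try/catch in BaseProcessFunction.processElement catches and prints instead of routing to failedEventsOutputTag. A generic, self-contained sketch of the open() pattern (EnrichFn and the HttpClient resource are hypothetical illustrations, not code from this repo):

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.util.Collector

    // Generic sketch: the client field is @transient (excluded from operator
    // serialization) and rebuilt in open() on each task manager.
    class EnrichFn extends ProcessFunction[String, String] {
      @transient private var client: java.net.http.HttpClient = _

      override def open(parameters: Configuration): Unit = {
        client = java.net.http.HttpClient.newHttpClient() // created per task, post-deserialization
      }

      override def processElement(in: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit =
        out.collect(in)
    }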
@@ -23,12 +23,16 @@ class PipelinePreprocessorStreamTask(config: PipelinePreprocessorConfig, kafkaCo
   def process(): Unit = {
 
     implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(config)
-    val dataStream = getMapDataStream(env, config, kafkaConnector)
-    processStream(dataStream)
+    process(env)
     env.execute(config.jobName)
   }
   // $COVERAGE-ON$
 
+  def process(env: StreamExecutionEnvironment): Unit = {
+    val dataStream = getMapDataStream(env, config, kafkaConnector)
+    processStream(dataStream)
+  }

override def processStream(dataStream: DataStream[mutable.Map[String, AnyRef]]): DataStream[mutable.Map[String, AnyRef]] = {

val validStream = dataStream.process(new EventValidationFunction(config)).setParallelism(config.downstreamOperatorsParallelism)
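Extracting process(env) mirrors the structure that MergedPipelineStreamTaskTestSpec exercises above: tests build the topology on an environment they control and decide when (and whether) to block on execution. A sketch of the intended test usage (the wiring is assumed, not shown in this commit):

    // Hypothetical test wiring; config: PipelinePreprocessorConfig and
    // kafkaConnector: FlinkKafkaConnector come from the test harness.
    implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(config)
    val task = new PipelinePreprocessorStreamTask(config, kafkaConnector)
    task.process(env)                      // build the topology only
    Future { env.execute(config.jobName) } // the test controls and bounds execution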
8 changes: 8 additions & 0 deletions pipeline/preprocessor/src/test/resources/base-config.conf
@@ -0,0 +1,8 @@
+postgres {
+  host = localhost
+  port = 5432
+  maxConnections = 2
+  user = "postgres"
+  password = "postgres"
+  database = "postgres"
+}
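This new file presumably points the test suite at the embedded Postgres added to the preprocessor pom (io.zonky.test:embedded-postgres). A hedged sketch of how a test harness might pin that server to the configured port:

    import io.zonky.test.db.postgres.embedded.EmbeddedPostgres

    // Hedged sketch: bind the embedded server to the port in base-config.conf.
    val pg = EmbeddedPostgres.builder().setPort(5432).start()
    // ... run tests against jdbc:postgresql://localhost:5432/postgres ...
    pg.close()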
2 changes: 1 addition & 1 deletion pipeline/preprocessor/src/test/resources/test.conf
@@ -16,7 +16,7 @@ task {
 
 redis {
   host = 127.0.0.1
-  port = 6379
+  port = 6340
   database {
     preprocessor.duplication.store.id = 2
     key.expiry.seconds = 3600
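The non-default port presumably matches the embedded Redis the tests start (it.ozimov:embedded-redis, added to the pom above), avoiding collisions with a locally running Redis on 6379. A hedged sketch of that test setup:

    import redis.embedded.RedisServer

    // Hedged sketch: start embedded Redis on the same non-default port as test.conf.
    val redisServer = new RedisServer(6340)
    redisServer.start()
    // ... run dedup tests ...
    redisServer.stop()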
