
data-contracts tutorial #80


Open: wants to merge 13 commits into master

12 changes: 6 additions & 6 deletions common/build.gradle

@@ -25,8 +25,8 @@ artifacts {
 }

 java {
-    sourceCompatibility = JavaVersion.VERSION_11
-    targetCompatibility = JavaVersion.VERSION_11
+    sourceCompatibility = JavaVersion.VERSION_17
+    targetCompatibility = JavaVersion.VERSION_17
 }

 repositories {
@@ -39,15 +39,15 @@ dependencies {
     implementation 'org.slf4j:slf4j-simple:2.0.7'
     implementation('org.apache.kafka:kafka-clients') {
         version {
-            strictly '3.7.0'
+            strictly '3.8.0'
         }
     }
-    implementation 'io.confluent:kafka-streams-avro-serde:7.5.1'
+    implementation 'io.confluent:kafka-streams-avro-serde:7.7.0'
     testImplementation 'junit:junit:4.13.2'
     testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.2'
     testImplementation 'org.hamcrest:hamcrest:2.2'
-    testImplementation 'org.testcontainers:testcontainers:1.19.3'
-    testImplementation 'org.testcontainers:kafka:1.19.3'
+    testImplementation 'org.testcontainers:testcontainers:1.20.1'
+    testImplementation 'org.testcontainers:kafka:1.20.1'
     testImplementation 'commons-codec:commons-codec:1.17.0'
     testImplementation 'org.apache.flink:flink-sql-connector-kafka:3.2.0-1.19'
     testImplementation 'org.apache.flink:flink-connector-base:1.19.1'
4 changes: 2 additions & 2 deletions confluent-parallel-consumer-application/kafka/build.gradle

@@ -31,7 +31,7 @@ repositories {
 dependencies {
     implementation project(':common')
     implementation "org.slf4j:slf4j-simple:2.0.7"
-    implementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.2.4"
+    implementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.3.2"
     implementation "org.apache.commons:commons-lang3:3.12.0"
     implementation "me.tongfei:progressbar:0.9.3"
     implementation 'org.awaitility:awaitility:4.2.0'
@@ -41,7 +41,7 @@ dependencies {
     testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.2'
     testImplementation 'org.hamcrest:hamcrest:2.2'
     testImplementation 'org.awaitility:awaitility:4.2.0'
-    testImplementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.2.4:tests" // for LongPollingMockConsumer
+    testImplementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.3.2:tests" // for LongPollingMockConsumer
     testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
     testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.9.2'
 }
23 changes: 23 additions & 0 deletions data-contracts/.gitignore

@@ -0,0 +1,23 @@
### Gradle template
.gradle
**/build/
!producer-app-schema-v1/src/**/build/

# Ignore Gradle GUI config
gradle-app.setting

# Avoid ignoring Gradle wrapper jar file (.jar files are usually ignored)
!gradle-wrapper.jar

# Avoid ignoring Gradle wrapper properties
!gradle-wrapper.properties

# Cache of project
.gradletasknamecache

# Eclipse Gradle plugin generated files
# Eclipse Core
.project
# JDT-specific (Eclipse Java Development Tools)
.classpath

376 changes: 376 additions & 0 deletions data-contracts/README.md

Large diffs are not rendered by default.

93 changes: 93 additions & 0 deletions data-contracts/app-schema-v1/build.gradle.kts
@@ -0,0 +1,93 @@
import java.io.FileInputStream
import java.util.Properties

buildscript {
    repositories {
        mavenCentral()
        maven("https://packages.confluent.io/maven/")
        maven("https://jitpack.io")
        gradlePluginPortal()
    }
}

plugins {
    kotlin("jvm") version "2.0.21"
    id("com.google.protobuf") version "0.9.4"
    id("com.github.imflog.kafka-schema-registry-gradle-plugin") version "2.1.0"
    id("com.bakdata.avro") version "1.2.1"
}

group = "io.confluent.devrel"

repositories {
    mavenCentral()
    maven("https://packages.confluent.io/maven/")
    maven("https://jitpack.io")
}

sourceSets {
    main {
        kotlin.srcDirs("src/main/kotlin", "build/generated-main-avro-java")
    }
}

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1")
    implementation(project(":data-contracts:shared"))

    implementation("org.apache.kafka:kafka-clients:${project.property("kafkaVersion")}")
    implementation("io.confluent:kafka-avro-serializer:${project.property("confluentVersion")}")

    implementation("io.github.serpro69:kotlin-faker:${project.property("fakerVersion")}")
    implementation("io.github.serpro69:kotlin-faker-books:${project.property("fakerVersion")}")
    implementation("io.github.serpro69:kotlin-faker-tech:${project.property("fakerVersion")}")

    implementation("org.jetbrains.kotlinx:kotlinx-cli:0.3.6")
    implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.6.1")
}

kotlin {
    jvmToolchain(17)
}

val schemaRegOutputDir = "${project.projectDir.absolutePath}/build/schema-registry-plugin"

tasks.downloadSchemasTask {
    doFirst {
        mkdir(schemaRegOutputDir)
    }
}

schemaRegistry {
    val srProperties = Properties()
    // For now, this properties file is only known locally. In an actual CI/CD workflow,
    // it would be externalized, perhaps provided by a base build image or another
    // parameter (see the sketch following this file).
    srProperties.load(FileInputStream(File("${project.projectDir.absolutePath}/../shared/src/main/resources/confluent.properties")))

    url = srProperties.getProperty("schema.registry.url")

    val srCredTokens = srProperties.get("basic.auth.user.info").toString().split(":")
    credentials {
        username = srCredTokens[0]
        password = srCredTokens[1]
    }
    outputDirectory = "${System.getProperty("user.home")}/tmp/schema-registry-plugin"
    pretty = true

    download {
        // Download the membership Avro schema, version 1, into src/main/avro.
        subject("membership-avro-value", "${projectDir}/src/main/avro", 1)
    }
}

tasks.clean {
    doFirst {
        delete(fileTree("${projectDir}/src/main/avro/").include("**/*.avsc"))
    }
}

tasks.register("generateCode") {
    group = "source generation"
    description = "wrapper task for all source generation"
    dependsOn("downloadSchemasTask", "generateAvroJava", "generateProto")
}
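
As the comment in the schemaRegistry block notes, the registry coordinates would be externalized in a real CI/CD pipeline. A minimal sketch of one way to do that, assuming hypothetical SR_URL, SR_API_KEY, and SR_API_SECRET environment variables (none of these names appear in this PR):

schemaRegistry {
    // Hypothetical: read Schema Registry coordinates from the environment
    // instead of a locally known properties file.
    url = System.getenv("SR_URL") ?: error("SR_URL is not set")
    credentials {
        username = System.getenv("SR_API_KEY") ?: error("SR_API_KEY is not set")
        password = System.getenv("SR_API_SECRET") ?: error("SR_API_SECRET is not set")
    }
}
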
6 changes: 6 additions & 0 deletions data-contracts/app-schema-v1/gradle.properties
@@ -0,0 +1,6 @@
confluentVersion=7.8.0
fakerVersion=2.0.0-rc.3
grpcVersion=1.15.1
kafkaVersion=3.8.0
protobufVersion=3.6.1
slf4jVersion=2.0.11
1 change: 1 addition & 0 deletions data-contracts/app-schema-v1/settings.gradle.kts
@@ -0,0 +1 @@
rootProject.name = "app-schema-v1"
1 change: 1 addition & 0 deletions data-contracts/app-schema-v1/src/main/avro/.gitignore
@@ -0,0 +1 @@
*.avsc
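
The .avsc files are ignored because downloadSchemasTask pulls the schema down from Schema Registry at build time (and tasks.clean deletes it again). The schema itself is not part of this diff; a hypothetical sketch of what membership-avro-value version 1 could look like, inferred from the Membership builder calls in ApplicationMain below (field names and the date logical types are assumptions):

{
  "type": "record",
  "name": "Membership",
  "namespace": "io.confluent.devrel",
  "fields": [
    { "name": "user_id", "type": "string" },
    { "name": "start_date", "type": { "type": "int", "logicalType": "date" } },
    { "name": "end_date", "type": { "type": "int", "logicalType": "date" } }
  ]
}
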
72 changes: 72 additions & 0 deletions data-contracts/app-schema-v1/src/main/kotlin/io/confluent/devrel/dc/v1/ApplicationMain.kt

@@ -0,0 +1,72 @@
package io.confluent.devrel.dc.v1

import io.confluent.devrel.Membership
import io.confluent.devrel.dc.v1.kafka.MembershipConsumer
import io.confluent.devrel.dc.v1.kafka.MembershipProducer
import kotlinx.cli.ArgParser
import kotlinx.cli.ArgType
import kotlinx.cli.default
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.datetime.Clock
import java.time.LocalDate
import java.util.*
import kotlin.concurrent.thread
import kotlin.random.Random
import kotlin.time.DurationUnit
import kotlin.time.toDuration

class ApplicationMain {

    companion object {

        @JvmStatic
        fun main(args: Array<String>) {
            runBlocking {
                println("Starting application main...")
                println(args.joinToString(" "))
                val parser = ArgParser("schema-v1")

                val interval by parser.option(ArgType.Int,
                    shortName = "i", fullName = "interval",
                    description = "message send interval, seconds")
                    .default(1)
                val duration by parser.option(ArgType.Int,
                    shortName = "d", fullName = "duration",
                    description = "how long to run, seconds")
                    .default(100)
                parser.parse(args)

                val messageInterval = interval.toDuration(DurationUnit.SECONDS)
                val sendDuration = duration.toDuration(DurationUnit.SECONDS)

                val producer = MembershipProducer()
                val consumer = MembershipConsumer()

                // Run the consumer on its own thread so it doesn't block the send loop.
                thread {
                    consumer.start(listOf("membership-avro"))
                }

                coroutineScope {
                    launch {
                        val until = Clock.System.now().plus(sendDuration)
                        while (Clock.System.now() < until) {
                            val userId = UUID.randomUUID().toString()
                            val membership = Membership.newBuilder()
                                .setUserId(userId)
                                .setStartDate(LocalDate.now().minusDays(Random.nextLong(100, 1000)))
                                .setEndDate(LocalDate.now().plusWeeks(Random.nextLong(1, 52)))
                                .build()
                            producer.send("membership-avro", userId, membership)
                            // delay takes a Duration; passing inWholeSeconds (a Long) would
                            // be interpreted as milliseconds and send far too quickly.
                            delay(messageInterval)
                        }
                    }
                }
                producer.close()
            }
        }
    }
}
24 changes: 24 additions & 0 deletions data-contracts/app-schema-v1/src/main/kotlin/io/confluent/devrel/dc/v1/kafka/MembershipConsumer.kt

@@ -0,0 +1,24 @@
package io.confluent.devrel.dc.v1.kafka

import io.confluent.devrel.Membership
import io.confluent.devrel.datacontracts.shared.BaseConsumer
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
import org.apache.kafka.clients.consumer.ConsumerConfig

class MembershipConsumer : BaseConsumer<String, Membership>(mapOf(
    ConsumerConfig.GROUP_ID_CONFIG to "app-schema-v1",
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to "io.confluent.kafka.serializers.KafkaAvroDeserializer",
    AbstractKafkaSchemaSerDeConfig.LATEST_COMPATIBILITY_STRICT to true,
    // Don't take the latest registered version blindly; resolve the version whose
    // metadata carries major_version=1.
    AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION to false,
    AbstractKafkaSchemaSerDeConfig.USE_LATEST_WITH_METADATA to "major_version=1"
)) {

    override fun consumeRecord(key: String, value: Membership) {
        logger.info("Received Membership ${key}, ${value}")
    }
}
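
USE_LATEST_WITH_METADATA only resolves a schema if some registered version of membership-avro-value actually carries a major_version metadata property; this PR does not show where that metadata gets attached. One way to attach it, assuming a Schema Registry with data-contract support (CP 7.4+ or Confluent Cloud) and illustrative host and payload values, is to supply the metadata field when registering the schema over REST:

curl -s -X POST http://localhost:8081/subjects/membership-avro-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
        "schemaType": "AVRO",
        "schema": "...membership schema JSON, escaped...",
        "metadata": { "properties": { "major_version": "1" } }
      }'
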
15 changes: 15 additions & 0 deletions data-contracts/app-schema-v1/src/main/kotlin/io/confluent/devrel/dc/v1/kafka/MembershipProducer.kt

@@ -0,0 +1,15 @@
package io.confluent.devrel.dc.v1.kafka

import io.confluent.devrel.Membership
import io.confluent.devrel.datacontracts.shared.BaseProducer
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
import org.apache.kafka.clients.producer.ProducerConfig

class MembershipProducer : BaseProducer<String, Membership>(mapOf(
    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer",
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to "io.confluent.kafka.serializers.KafkaAvroSerializer",
    ProducerConfig.CLIENT_ID_CONFIG to "membership-producer-app-v1",
    AbstractKafkaSchemaSerDeConfig.LATEST_COMPATIBILITY_STRICT to true,
    // Serialize with the schema version tagged major_version=1, not whatever
    // the latest registered version happens to be.
    AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION to false,
    AbstractKafkaSchemaSerDeConfig.USE_LATEST_WITH_METADATA to "major_version=1"
))
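
The producer pins its writer schema the same way the consumer pins its reader schema, so the intent is that records keep flowing under major_version=1 even after a version 2 schema is registered. Its surface, as exercised by ApplicationMain, is just send and close; a minimal standalone sketch with illustrative values:

val producer = MembershipProducer()
val membership = Membership.newBuilder()
    .setUserId("user-123")                        // illustrative value
    .setStartDate(LocalDate.now().minusDays(30))
    .setEndDate(LocalDate.now().plusWeeks(4))
    .build()
producer.send("membership-avro", "user-123", membership)
producer.close()
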
25 changes: 25 additions & 0 deletions data-contracts/app-schema-v1/src/main/resources/logback.xml

@@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>

    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <layout class="ch.qos.logback.classic.PatternLayout">
            <Pattern>
                %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
            </Pattern>
        </layout>
    </appender>

    <logger name="io.confluent.devrel.dc.v1" level="debug" additivity="false">
        <appender-ref ref="CONSOLE"/>
    </logger>

    <logger name="io.confluent.devrel.datacontracts.shared" level="warn" additivity="false">
        <appender-ref ref="CONSOLE"/>
    </logger>

    <root level="error">
        <appender-ref ref="CONSOLE"/>
    </root>

</configuration>