Skip to content

Commit d7f4862

Browse files
committed
GH-5: Use DebeziumMessageProducer from Spring Integration
Fixes: #5 * Expose `enableEmptyPayload` and `headerNamesToMap` configuration properties instead of plain `copyHeaders` * All the logic now is hidden in the `DebeziumMessageProducer` * Clean up tests according to a new code base * Fix warning for the deprecated Debezium properties. Now only one `delete.tombstone.handling.mode` is needed * Optimize the code flow for the `StepVerifier` via conversion pulled up to the `Flux` * Apparently there is some race condition in the `EmbeddedEngine` around stop functionality, so, add `debezium.embedded.shutdown.pause.before.interrupt.ms=500` instead of `5 minutes` by default
1 parent 8a44c67 commit d7f4862

File tree

6 files changed

+74
-168
lines changed

6 files changed

+74
-168
lines changed

supplier/spring-debezium-supplier/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ repositories {
44

55
dependencies {
66
api project(':spring-debezium-autoconfigure')
7+
api 'org.springframework.integration:spring-integration-debezium'
78
api 'io.debezium:debezium-connector-mysql'
89
api 'io.debezium:debezium-connector-mongodb'
910
api 'io.debezium:debezium-connector-postgres'

supplier/spring-debezium-supplier/src/main/java/org/springframework/cloud/fn/supplier/debezium/DebeziumReactiveConsumerConfiguration.java

Lines changed: 22 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -16,35 +16,25 @@
1616

1717
package org.springframework.cloud.fn.supplier.debezium;
1818

19-
import java.lang.reflect.Field;
20-
import java.util.List;
21-
import java.util.concurrent.ExecutorService;
22-
import java.util.concurrent.Executors;
23-
import java.util.function.Consumer;
2419
import java.util.function.Supplier;
2520

2621
import io.debezium.engine.ChangeEvent;
2722
import io.debezium.engine.DebeziumEngine;
2823
import io.debezium.engine.DebeziumEngine.Builder;
29-
import io.debezium.engine.Header;
30-
import org.apache.commons.logging.Log;
31-
import org.apache.commons.logging.LogFactory;
24+
import org.reactivestreams.Publisher;
3225
import reactor.core.publisher.Flux;
33-
import reactor.core.publisher.Sinks;
3426

35-
import org.springframework.beans.factory.BeanClassLoaderAware;
3627
import org.springframework.boot.autoconfigure.AutoConfiguration;
3728
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
38-
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
3929
import org.springframework.boot.context.properties.EnableConfigurationProperties;
30+
import org.springframework.cloud.fn.common.config.ComponentCustomizer;
4031
import org.springframework.cloud.fn.common.debezium.DebeziumEngineBuilderAutoConfiguration;
41-
import org.springframework.cloud.fn.common.debezium.DebeziumProperties;
4232
import org.springframework.context.annotation.Bean;
33+
import org.springframework.integration.debezium.dsl.Debezium;
34+
import org.springframework.integration.debezium.dsl.DebeziumMessageProducerSpec;
35+
import org.springframework.integration.dsl.IntegrationFlow;
36+
import org.springframework.lang.Nullable;
4337
import org.springframework.messaging.Message;
44-
import org.springframework.messaging.MessageHeaders;
45-
import org.springframework.messaging.support.MessageBuilder;
46-
import org.springframework.util.ClassUtils;
47-
import org.springframework.util.MimeTypeUtils;
4838

4939
/**
5040
* The Debezium supplier auto-configuration.
@@ -55,131 +45,29 @@
5545
@AutoConfiguration(after = DebeziumEngineBuilderAutoConfiguration.class)
5646
@EnableConfigurationProperties(DebeziumSupplierProperties.class)
5747
@ConditionalOnBean(DebeziumEngine.Builder.class)
58-
public class DebeziumReactiveConsumerConfiguration implements BeanClassLoaderAware {
59-
60-
private static final Log LOGGER = LogFactory.getLog(DebeziumReactiveConsumerConfiguration.class);
61-
62-
/**
63-
* ORG_SPRINGFRAMEWORK_KAFKA_SUPPORT_KAFKA_NULL.
64-
*/
65-
public static final String ORG_SPRINGFRAMEWORK_KAFKA_SUPPORT_KAFKA_NULL = "org.springframework.kafka.support.KafkaNull";
66-
67-
private Object kafkaNull = null;
68-
69-
@Override
70-
public void setBeanClassLoader(ClassLoader classLoader) {
71-
try {
72-
Class<?> clazz = ClassUtils.forName(ORG_SPRINGFRAMEWORK_KAFKA_SUPPORT_KAFKA_NULL, classLoader);
73-
Field field = clazz.getDeclaredField("INSTANCE");
74-
this.kafkaNull = field.get(null);
75-
}
76-
catch (ClassNotFoundException | NoSuchFieldException | IllegalAccessException ex) {
77-
}
78-
}
79-
80-
/**
81-
* Reactive Streams, single subscriber, sink used to push down the change event
82-
* signals received from the Debezium Engine.
83-
*/
84-
private final Sinks.Many<Message<?>> eventSink = Sinks.many().unicast().onBackpressureError();
85-
86-
/**
87-
* Debezium Engine is designed to be submitted to an {@link ExecutorService} for
88-
* execution by a single thread, and a running connector can be stopped either by
89-
* calling {@code stop()} from another thread or by interrupting the running thread
90-
* (e.g., as is the case with {@link ExecutorService#shutdownNow()}).
91-
*/
92-
private final ExecutorService debeziumExecutor = Executors.newSingleThreadExecutor();
48+
public class DebeziumReactiveConsumerConfiguration {
9349

9450
@Bean
95-
public DebeziumEngine<ChangeEvent<byte[], byte[]>> debeziumEngine(
96-
Consumer<ChangeEvent<byte[], byte[]>> changeEventConsumer,
97-
Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder) {
98-
99-
return debeziumEngineBuilder.notifying(changeEventConsumer).build();
51+
public Supplier<Flux<Message<?>>> debeziumSupplier(Publisher<Message<?>> debeziumPublisher) {
52+
return () -> Flux.from(debeziumPublisher);
10053
}
10154

10255
@Bean
103-
public Supplier<Flux<Message<?>>> debeziumSupplier(DebeziumEngine<ChangeEvent<byte[], byte[]>> debeziumEngine) {
104-
105-
return () -> this.eventSink.asFlux()
106-
.doOnRequest((r) -> this.debeziumExecutor.execute(debeziumEngine))
107-
.doOnTerminate(this.debeziumExecutor::shutdownNow);
108-
}
109-
110-
@Bean
111-
@ConditionalOnMissingBean
112-
public Consumer<ChangeEvent<byte[], byte[]>> changeEventConsumer(DebeziumProperties engineProperties,
113-
DebeziumSupplierProperties supplierProperties) {
114-
115-
return new ChangeEventConsumer<>(engineProperties.getPayloadFormat().contentType(),
116-
supplierProperties.isCopyHeaders(), this.eventSink);
117-
}
118-
119-
/**
120-
* Format-agnostic change event consumer.
121-
*/
122-
private final class ChangeEventConsumer<T> implements Consumer<ChangeEvent<T, T>> {
123-
124-
private final String contentType;
125-
126-
private final boolean copyHeaders;
127-
128-
private final Sinks.Many<Message<?>> eventSink;
129-
130-
private ChangeEventConsumer(String contentType, boolean copyHeaders, Sinks.Many<Message<?>> eventSink) {
131-
this.contentType = contentType;
132-
this.copyHeaders = copyHeaders;
133-
this.eventSink = eventSink;
134-
}
135-
136-
@Override
137-
public void accept(ChangeEvent<T, T> changeEvent) {
138-
if (LOGGER.isDebugEnabled()) {
139-
LOGGER.debug("[Debezium Event]: " + changeEvent.key());
140-
}
141-
142-
Object key = changeEvent.key();
143-
Object payload = changeEvent.value();
144-
String destination = changeEvent.destination();
145-
146-
// When the tombstone event is enabled, Debezium serializes the payload to
147-
// null (e.g. empty payload)
148-
// while the metadata information is carried through the headers
149-
// (debezium_key).
150-
// Note: Event for none flattened responses, when the
151-
// debezium.properties.tombstones.on.delete=true
152-
// (default), tombstones are generate by Debezium and handled by the code
153-
// below.
154-
if (payload == null) {
155-
payload = DebeziumReactiveConsumerConfiguration.this.kafkaNull;
156-
}
157-
158-
// If payload is still null ignore the message.
159-
if (payload == null) {
160-
LOGGER.info("Dropped null payload message");
161-
return;
162-
}
163-
164-
MessageBuilder<?> messageBuilder = MessageBuilder.withPayload(payload)
165-
.setHeader("debezium_key", key)
166-
.setHeader("debezium_destination", destination)
167-
.setHeader(MessageHeaders.CONTENT_TYPE,
168-
(payload.equals(DebeziumReactiveConsumerConfiguration.this.kafkaNull))
169-
? MimeTypeUtils.TEXT_PLAIN_VALUE : this.contentType);
170-
171-
if (this.copyHeaders) {
172-
List<Header<T>> headers = changeEvent.headers();
173-
if (headers != null && !headers.isEmpty()) {
174-
for (Header<T> header : headers) {
175-
messageBuilder.setHeader(header.getKey(), header.getValue());
176-
}
177-
}
178-
}
179-
180-
this.eventSink.tryEmitNext(messageBuilder.build());
56+
public Publisher<Message<byte[]>> debeziumPublisher(Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
57+
DebeziumSupplierProperties supplierProperties,
58+
@Nullable ComponentCustomizer<DebeziumMessageProducerSpec> debeziumMessageProducerSpecComponentCustomizer) {
59+
60+
DebeziumMessageProducerSpec debeziumMessageProducerSpec = Debezium.inboundChannelAdapter(debeziumEngineBuilder)
61+
.enableEmptyPayload(supplierProperties.isEnableEmptyPayload())
62+
.headerNames(supplierProperties.getHeaderNamesToMap())
63+
// TODO until Spring Integration 6.3.0-M2
64+
.autoStartup(false);
65+
66+
if (debeziumMessageProducerSpecComponentCustomizer != null) {
67+
debeziumMessageProducerSpecComponentCustomizer.customize(debeziumMessageProducerSpec);
18168
}
18269

70+
return IntegrationFlow.from(debeziumMessageProducerSpec).toReactivePublisher(true);
18371
}
18472

18573
}

supplier/spring-debezium-supplier/src/main/java/org/springframework/cloud/fn/supplier/debezium/DebeziumSupplierProperties.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,35 @@
2222
* Debezium supplier configuration properties.
2323
*
2424
* @author Christian Tzolov
25+
* @author Artem Bilan
2526
*/
2627
@ConfigurationProperties("debezium.supplier")
2728
public class DebeziumSupplierProperties {
2829

2930
/**
30-
* Copy Change Event headers into Message headers.
31+
* Enable support for tombstone (aka delete) messages.
3132
*/
32-
private boolean copyHeaders = true;
33+
private boolean enableEmptyPayload = true;
3334

34-
public boolean isCopyHeaders() {
35-
return this.copyHeaders;
35+
/**
36+
* Patterns for {@code ChangeEvent.headers()} to map.
37+
*/
38+
private String[] headerNamesToMap = { "*" };
39+
40+
public boolean isEnableEmptyPayload() {
41+
return this.enableEmptyPayload;
42+
}
43+
44+
public void setEnableEmptyPayload(boolean enableEmptyPayload) {
45+
this.enableEmptyPayload = enableEmptyPayload;
46+
}
47+
48+
public String[] getHeaderNamesToMap() {
49+
return this.headerNamesToMap;
3650
}
3751

38-
public void setCopyHeaders(boolean copyHeaders) {
39-
this.copyHeaders = copyHeaders;
52+
public void setHeaderNamesToMap(String[] headerNamesToMap) {
53+
this.headerNamesToMap = headerNamesToMap;
4054
}
4155

4256
}

supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumReactiveConsumerConfigurationTests.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616

1717
package org.springframework.cloud.fn.supplier.debezium.it.supplier;
1818

19-
import io.debezium.engine.DebeziumEngine;
19+
import java.util.function.Supplier;
20+
2021
import org.junit.jupiter.api.Test;
2122

2223
import org.springframework.boot.autoconfigure.AutoConfigurations;
@@ -30,6 +31,7 @@
3031
* Tests for {@link DebeziumReactiveConsumerConfiguration}.
3132
*
3233
* @author Christian Tzolov
34+
* @author Artem Bilan
3335
*/
3436
public class DebeziumReactiveConsumerConfigurationTests {
3537

@@ -41,13 +43,13 @@ public class DebeziumReactiveConsumerConfigurationTests {
4143

4244
@Test
4345
void noConnectorNoProperty() {
44-
this.contextRunner.run((context) -> assertThat(context).doesNotHaveBean(DebeziumEngine.class));
46+
this.contextRunner.run((context) -> assertThat(context).doesNotHaveBean(Supplier.class));
4547
}
4648

4749
@Test
4850
void withConnectorWithProperty() {
4951
this.contextRunner.withPropertyValues("debezium.properties.connector.class=Dummy")
50-
.run((context) -> assertThat(context).hasSingleBean(DebeziumEngine.class));
52+
.run((context) -> assertThat(context).hasSingleBean(Supplier.class));
5153
}
5254

5355
}

supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumSupplierIntegrationTests.java

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.cloud.fn.supplier.debezium.it.supplier;
1818

19+
import java.time.Duration;
1920
import java.util.function.Supplier;
2021

2122
import org.junit.jupiter.api.Test;
@@ -41,16 +42,16 @@
4142
properties = { "spring.cloud.function.definition=debeziumSupplier",
4243

4344
// https://debezium.io/documentation/reference/transformations/event-flattening.html
45+
"debezium.properties.debezium.embedded.shutdown.pause.before.interrupt.ms=500",
4446
"debezium.properties.transforms=unwrap",
4547
"debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState",
46-
"debezium.properties.transforms.unwrap.drop.tombstones=true",
47-
"debezium.properties.transforms.unwrap.delete.handling.mode=rewrite",
48+
"debezium.properties.transforms.unwrap.delete.tombstone.handling.mode=rewrite",
4849
"debezium.properties.transforms.unwrap.add.fields=name,db,op,table",
4950

5051
"debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory",
5152
"debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore",
5253

53-
// Drop schema from the message payload.
54+
// Drop schema from the payload payload.
5455
"debezium.properties.key.converter.schemas.enable=false",
5556
"debezium.properties.value.converter.schemas.enable=false",
5657

@@ -101,52 +102,51 @@ void testDebeziumSupplier() {
101102
jdbcTemplate.update(
102103
"INSERT INTO `customers`(`first_name`,`last_name`,`email`) VALUES('Test666', 'Test666', '[email protected]')");
103104

104-
Flux<Message<?>> messageFlux = this.debeziumSupplier.get();
105+
Flux<String> payloadFlux = this.debeziumSupplier.get()
106+
.map(Message::getPayload)
107+
.cast(byte[].class)
108+
.map(String::new);
105109

106-
// Message size should correspond to the number of insert statements in:
110+
// payload size should correspond to the number of insert statements in:
107111
// https://github.com/debezium/container-images/blob/main/examples/mysql/2.3/inventory.sql
108112
// filtered by Customers and Addresses table.
109-
StepVerifier.create(messageFlux)
113+
StepVerifier.create(payloadFlux)
110114
.expectNextCount(16) // Skip the DDL transaction logs.
111115

112116
// Customers table
113-
.assertNext((message) -> assertThat(payloadString(message)).isEqualTo(
117+
.assertNext((payload) -> assertThat(payload).isEqualTo(
114118
"{\"id\":1001,\"first_name\":\"Sally\",\"last_name\":\"Thomas\",\"email\":\"[email protected]\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"customers\"}"))
115-
.assertNext((message) -> assertThat(payloadString(message)).isEqualTo(
119+
.assertNext((payload) -> assertThat(payload).isEqualTo(
116120
"{\"id\":1002,\"first_name\":\"George\",\"last_name\":\"Bailey\",\"email\":\"[email protected]\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"customers\"}"))
117-
.assertNext((message) -> assertThat(payloadString(message)).isEqualTo(
121+
.assertNext((payload) -> assertThat(payload).isEqualTo(
118122
"{\"id\":1003,\"first_name\":\"Edward\",\"last_name\":\"Walker\",\"email\":\"[email protected]\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"customers\"}"))
119-
.assertNext((message) -> assertThat(payloadString(message)).isEqualTo(
123+
.assertNext((payload) -> assertThat(payload).isEqualTo(
120124
"{\"id\":1004,\"first_name\":\"Anne\",\"last_name\":\"Kretchmar\",\"email\":\"[email protected]\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"customers\"}"))
121125

122126
// NEW Customer Insert
123-
.assertNext((message) -> assertThat(payloadString(message)).isEqualTo(
127+
.assertNext((payload) -> assertThat(payload).isEqualTo(
124128
"{\"id\":1005,\"first_name\":\"Test666\",\"last_name\":\"Test666\",\"email\":\"[email protected]\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"customers\"}"))
125129

126130
// Addresses table
127-
.assertNext((message) -> assertThat(payloadString(message)).isEqualTo(
131+
.assertNext((payload) -> assertThat(payload).isEqualTo(
128132
"{\"id\":10,\"customer_id\":1001,\"street\":\"3183 Moore Avenue\",\"city\":\"Euless\",\"state\":\"Texas\",\"zip\":\"76036\",\"type\":\"SHIPPING\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\"}"))
129-
.assertNext((message) -> assertThat(payloadString(message)).isEqualTo(
133+
.assertNext((payload) -> assertThat(payload).isEqualTo(
130134
"{\"id\":11,\"customer_id\":1001,\"street\":\"2389 Hidden Valley Road\",\"city\":\"Harrisburg\",\"state\":\"Pennsylvania\",\"zip\":\"17116\",\"type\":\"BILLING\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\"}"))
131-
.assertNext((message) -> assertThat(payloadString(message)).isEqualTo(
135+
.assertNext((payload) -> assertThat(payload).isEqualTo(
132136
"{\"id\":12,\"customer_id\":1002,\"street\":\"281 Riverside Drive\",\"city\":\"Augusta\",\"state\":\"Georgia\",\"zip\":\"30901\",\"type\":\"BILLING\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\"}"))
133-
.assertNext((message) -> assertThat(payloadString(message)).isEqualTo(
137+
.assertNext((payload) -> assertThat(payload).isEqualTo(
134138
"{\"id\":13,\"customer_id\":1003,\"street\":\"3787 Brownton Road\",\"city\":\"Columbus\",\"state\":\"Mississippi\",\"zip\":\"39701\",\"type\":\"SHIPPING\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\"}"))
135-
.assertNext((message) -> assertThat(payloadString(message)).isEqualTo(
139+
.assertNext((payload) -> assertThat(payload).isEqualTo(
136140
"{\"id\":14,\"customer_id\":1003,\"street\":\"2458 Lost Creek Road\",\"city\":\"Bethlehem\",\"state\":\"Pennsylvania\",\"zip\":\"18018\",\"type\":\"SHIPPING\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\"}"))
137-
.assertNext((message) -> assertThat(payloadString(message)).isEqualTo(
141+
.assertNext((payload) -> assertThat(payload).isEqualTo(
138142
"{\"id\":15,\"customer_id\":1003,\"street\":\"4800 Simpson Square\",\"city\":\"Hillsdale\",\"state\":\"Oklahoma\",\"zip\":\"73743\",\"type\":\"BILLING\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\"}"))
139-
.assertNext((message) -> assertThat(payloadString(message)).isEqualTo(
143+
.assertNext((payload) -> assertThat(payload).isEqualTo(
140144
"{\"id\":16,\"customer_id\":1004,\"street\":\"1289 University Hill Road\",\"city\":\"Canehill\",\"state\":\"Arkansas\",\"zip\":\"72717\",\"type\":\"LIVING\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\"}"))
141145
.thenCancel()
142-
.verify();
146+
.verify(Duration.ofSeconds(30));
143147

144148
}
145149

146-
private String payloadString(Message<?> message) {
147-
return new String((byte[]) message.getPayload());
148-
}
149-
150150
@SpringBootApplication(exclude = { MongoAutoConfiguration.class })
151151
@Import({ TestJdbcTemplateConfiguration.class })
152152
static class DebeziumSupplierTestApplication {

supplier/spring-debezium-supplier/src/test/resources/logback-test.xml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@
2121
</encoder>
2222
</appender>
2323

24-
<root level="info">
24+
<root level="warn">
2525
<appender-ref ref="STDOUT"/>
2626
</root>
2727

28-
<logger name="org.testcontainers" level="INFO"/>
28+
<logger name="org.testcontainers" level="WARN"/>
2929
<logger name="com.github.dockerjava" level="WARN"/>
30+
<logger name="io.debezium" level="INFO"/>
3031
</configuration>

0 commit comments

Comments
 (0)