Skip to content

Commit 37020fa

Browse files
feat(messaging): add transport abstractions and Kafka-only wiring (#17481)
Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent df9ddbe commit 37020fa

102 files changed

Lines changed: 5155 additions & 496 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/GeneralUpgradeConfiguration.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
import com.linkedin.gms.factory.auth.DataHubAuthorizerFactory;
55
import com.linkedin.gms.factory.event.ExternalEventsServiceFactory;
66
import com.linkedin.gms.factory.event.KafkaConsumerPoolFactory;
7+
import com.linkedin.gms.factory.event.KafkaExternalEventsPollHandlerConfiguration;
78
import com.linkedin.gms.factory.graphql.GraphQLEngineFactory;
89
import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory;
910
import com.linkedin.gms.factory.kafka.SimpleKafkaConsumerFactory;
1011
import com.linkedin.gms.factory.kafka.trace.KafkaTraceReaderFactory;
12+
import com.linkedin.gms.factory.messaging.KafkaConsumerLagPort;
1113
import com.linkedin.gms.factory.telemetry.ScheduledAnalyticsFactory;
1214
import com.linkedin.gms.factory.trace.TraceServiceFactory;
1315
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -18,6 +20,9 @@
1820
/**
1921
* Configuration for general upgrades that includes most components but excludes some that are not
2022
* typically needed for upgrade operations.
23+
*
24+
* <p>Consumer lag ports and their trace-reader dependencies are excluded because the system-update
25+
* context excludes {@link KafkaTraceReaderFactory} and {@link TraceServiceFactory}.
2126
*/
2227
@Configuration
2328
@EnableAutoConfiguration
@@ -41,7 +46,9 @@
4146
KafkaTraceReaderFactory.class,
4247
TraceServiceFactory.class,
4348
KafkaConsumerPoolFactory.class,
44-
ExternalEventsServiceFactory.class
49+
KafkaExternalEventsPollHandlerConfiguration.class,
50+
ExternalEventsServiceFactory.class,
51+
KafkaConsumerLagPort.class
4552
})
4653
})
4754
public class GeneralUpgradeConfiguration {}

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/KafkaSetupConfig.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.linkedin.datahub.upgrade.system.BlockingSystemUpgrade;
55
import com.linkedin.datahub.upgrade.system.kafka.KafkaSetup;
66
import com.linkedin.gms.factory.config.ConfigurationProvider;
7+
import com.linkedin.metadata.config.messaging.KafkaMessagingEnabledCondition;
78
import io.datahubproject.metadata.context.OperationContext;
89
import org.springframework.beans.factory.annotation.Autowired;
910
import org.springframework.boot.kafka.autoconfigure.KafkaProperties;
@@ -13,13 +14,14 @@
1314
import org.springframework.core.annotation.Order;
1415

1516
@Configuration
16-
@Conditional(SystemUpdateCondition.BlockingSystemUpdateCondition.class)
17+
@Conditional(KafkaMessagingEnabledCondition.class)
1718
public class KafkaSetupConfig {
1819

1920
@Autowired private OperationContext opContext;
2021

21-
@Order(1) // This ensures it runs before BuildIndices (@Order(2))
22+
@Order(1) // Before pgSearch entity schema (when enabled) and BuildIndices
2223
@Bean(name = "kafkaSetup")
24+
@Conditional(SystemUpdateCondition.BlockingSystemUpdateCondition.class)
2325
public BlockingSystemUpgrade kafkaSetup(
2426
final ConfigurationProvider configurationProvider, KafkaProperties properties) {
2527
return new KafkaSetup(opContext, configurationProvider.getKafka(), properties);

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/OpenTelemetryConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import com.linkedin.gms.factory.config.ConfigurationProvider;
44
import com.linkedin.gms.factory.system_telemetry.OpenTelemetryBaseFactory;
5-
import com.linkedin.metadata.event.GenericProducer;
5+
import com.linkedin.metadata.event.UsageEventPublisher;
66
import com.linkedin.metadata.utils.metrics.MetricUtils;
77
import io.datahubproject.metadata.context.SystemTelemetryContext;
88
import javax.annotation.Nullable;
@@ -25,7 +25,7 @@ protected SystemTelemetryContext traceContext(
2525
MetricUtils metricUtils,
2626
ConfigurationProvider configurationProvider,
2727
@Autowired(required = false) @Qualifier("dataHubUsageEventProducer") @Nullable
28-
GenericProducer<String> dueProducer) {
29-
return super.traceContext(metricUtils, configurationProvider, dueProducer);
28+
UsageEventPublisher usageEventPublisher) {
29+
return super.traceContext(metricUtils, configurationProvider, usageEventPublisher);
3030
}
3131
}

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java

Lines changed: 2 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,10 @@
1212
import com.linkedin.entity.client.EntityClientConfig;
1313
import com.linkedin.entity.client.SystemEntityClient;
1414
import com.linkedin.gms.factory.config.ConfigurationProvider;
15-
import com.linkedin.gms.factory.kafka.DataHubKafkaProducerFactory;
16-
import com.linkedin.gms.factory.kafka.common.TopicConventionFactory;
1715
import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory;
1816
import com.linkedin.metadata.client.SystemJavaEntityClient;
1917
import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig;
2018
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
21-
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
22-
import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
2319
import com.linkedin.metadata.dao.throttle.ThrottleSensor;
2420
import com.linkedin.metadata.entity.AspectDao;
2521
import com.linkedin.metadata.entity.DeleteEntityService;
@@ -35,7 +31,6 @@
3531
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
3632
import com.linkedin.metadata.utils.metrics.MetricUtils;
3733
import com.linkedin.metadata.version.GitVersion;
38-
import com.linkedin.mxe.TopicConvention;
3934
import java.util.Arrays;
4035
import java.util.HashSet;
4136
import java.util.List;
@@ -44,14 +39,9 @@
4439
import javax.annotation.Nonnull;
4540
import lombok.NonNull;
4641
import lombok.extern.slf4j.Slf4j;
47-
import org.apache.avro.generic.IndexedRecord;
48-
import org.apache.kafka.clients.producer.KafkaProducer;
49-
import org.apache.kafka.clients.producer.Producer;
50-
import org.springframework.beans.factory.annotation.Autowired;
5142
import org.springframework.beans.factory.annotation.Qualifier;
5243
import org.springframework.beans.factory.annotation.Value;
5344
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
54-
import org.springframework.boot.kafka.autoconfigure.KafkaProperties;
5545
import org.springframework.context.annotation.Bean;
5646
import org.springframework.context.annotation.Conditional;
5747
import org.springframework.context.annotation.Configuration;
@@ -142,52 +132,13 @@ public String getRevision() {
142132

143133
@Bean
144134
public DataHubStartupStep dataHubStartupStep(
145-
@Qualifier("duheKafkaEventProducer") final KafkaEventProducer kafkaEventProducer,
135+
@Qualifier("duheKafkaEventProducer") final EventProducer kafkaEventProducer,
146136
final GitVersion gitVersion,
147137
@Qualifier("revision") String revision) {
148138
return new DataHubStartupStep(
149139
kafkaEventProducer, String.format("%s-%s", gitVersion.getVersion(), revision));
150140
}
151141

152-
@Autowired
153-
@Qualifier(TopicConventionFactory.TOPIC_CONVENTION_BEAN)
154-
private TopicConvention topicConvention;
155-
156-
@Autowired private KafkaHealthChecker kafkaHealthChecker;
157-
158-
@Bean(name = "duheKafkaEventProducer")
159-
protected KafkaEventProducer duheKafkaEventProducer(
160-
@Qualifier("configurationProvider") ConfigurationProvider provider,
161-
KafkaProperties properties,
162-
@Qualifier("duheSchemaRegistryConfig")
163-
KafkaConfiguration.SerDeKeyValueConfig duheSchemaRegistryConfig,
164-
MetricUtils metricUtils) {
165-
KafkaConfiguration kafkaConfiguration = provider.getKafka();
166-
Producer<String, IndexedRecord> producer =
167-
new KafkaProducer<>(
168-
DataHubKafkaProducerFactory.buildProducerProperties(
169-
duheSchemaRegistryConfig, kafkaConfiguration, properties));
170-
return new KafkaEventProducer(producer, topicConvention, kafkaHealthChecker, metricUtils);
171-
}
172-
173-
/**
174-
* The ReindexDataJobViaNodesCLLConfig step requires publishing to MCL. Overriding the default
175-
* producer with this special producer which doesn't require an active registry.
176-
*
177-
* <p>Use when INTERNAL registry and is SYSTEM_UPDATE
178-
*
179-
* <p>This forces this producer into the EntityService
180-
*/
181-
@Primary
182-
@Bean(name = "kafkaEventProducer")
183-
@ConditionalOnProperty(
184-
name = "kafka.schemaRegistry.type",
185-
havingValue = InternalSchemaRegistryFactory.TYPE)
186-
protected KafkaEventProducer kafkaEventProducer(
187-
@Qualifier("duheKafkaEventProducer") KafkaEventProducer kafkaEventProducer) {
188-
return kafkaEventProducer;
189-
}
190-
191142
@Primary
192143
@Bean(name = "schemaRegistryConfig")
193144
@ConditionalOnProperty(
@@ -208,7 +159,7 @@ protected KafkaConfiguration.SerDeKeyValueConfig schemaRegistryConfig(
208159
@Conditional(SystemUpdateCondition.BlockingSystemUpdateCondition.class)
209160
@Nonnull
210161
protected EntityService<ChangeItemImpl> createEntityServiceWithSystemUpdateCDCMode(
211-
@Qualifier("kafkaEventProducer") final KafkaEventProducer eventProducer,
162+
@Qualifier("kafkaEventProducer") final EventProducer eventProducer,
212163
@Qualifier("entityAspectDao") final AspectDao aspectDao,
213164
@Qualifier("configurationProvider") ConfigurationProvider configurationProvider,
214165
@Value("${featureFlags.showBrowseV2}") final boolean enableBrowsePathV2,
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package com.linkedin.datahub.upgrade.config;
2+
3+
import com.linkedin.gms.factory.config.ConfigurationProvider;
4+
import com.linkedin.gms.factory.kafka.DataHubKafkaProducerFactory;
5+
import com.linkedin.gms.factory.kafka.common.TopicConventionFactory;
6+
import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory;
7+
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
8+
import com.linkedin.metadata.config.messaging.KafkaMessagingEnabledCondition;
9+
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
10+
import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
11+
import com.linkedin.metadata.utils.metrics.MetricUtils;
12+
import com.linkedin.mxe.TopicConvention;
13+
import lombok.extern.slf4j.Slf4j;
14+
import org.apache.avro.generic.IndexedRecord;
15+
import org.apache.kafka.clients.producer.KafkaProducer;
16+
import org.apache.kafka.clients.producer.Producer;
17+
import org.springframework.beans.factory.annotation.Autowired;
18+
import org.springframework.beans.factory.annotation.Qualifier;
19+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
20+
import org.springframework.boot.kafka.autoconfigure.KafkaProperties;
21+
import org.springframework.context.annotation.Bean;
22+
import org.springframework.context.annotation.Conditional;
23+
import org.springframework.context.annotation.Configuration;
24+
import org.springframework.context.annotation.Primary;
25+
26+
@Slf4j
27+
@Configuration
28+
@Conditional(KafkaMessagingEnabledCondition.class)
29+
public class SystemUpdateKafkaMessagingConfig {
30+
31+
@Autowired
32+
@Qualifier(TopicConventionFactory.TOPIC_CONVENTION_BEAN)
33+
private TopicConvention topicConvention;
34+
35+
@Autowired private KafkaHealthChecker kafkaHealthChecker;
36+
37+
@Bean(name = "duheKafkaEventProducer")
38+
protected KafkaEventProducer duheKafkaEventProducer(
39+
@Qualifier("configurationProvider") ConfigurationProvider provider,
40+
KafkaProperties properties,
41+
@Qualifier("duheSchemaRegistryConfig")
42+
KafkaConfiguration.SerDeKeyValueConfig duheSchemaRegistryConfig,
43+
MetricUtils metricUtils) {
44+
KafkaConfiguration kafkaConfiguration = provider.getKafka();
45+
Producer<String, IndexedRecord> producer =
46+
new KafkaProducer<>(
47+
DataHubKafkaProducerFactory.buildProducerProperties(
48+
duheSchemaRegistryConfig, kafkaConfiguration, properties));
49+
return new KafkaEventProducer(producer, topicConvention, kafkaHealthChecker, metricUtils);
50+
}
51+
52+
/**
53+
* The ReindexDataJobViaNodesCLLConfig step requires publishing to MCL. Overriding the default
54+
* producer with this special producer which doesn't require an active registry.
55+
*
56+
* <p>Use when INTERNAL registry and is SYSTEM_UPDATE
57+
*
58+
* <p>This forces this producer into the EntityService
59+
*/
60+
@Primary
61+
@Bean(name = "kafkaEventProducer")
62+
@ConditionalOnProperty(
63+
name = "kafka.schemaRegistry.type",
64+
havingValue = InternalSchemaRegistryFactory.TYPE)
65+
protected KafkaEventProducer kafkaEventProducer(
66+
@Qualifier("duheKafkaEventProducer") KafkaEventProducer kafkaEventProducer) {
67+
return kafkaEventProducer;
68+
}
69+
}

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/elasticsearch/steps/DataHubStartupStep.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import com.linkedin.datahub.upgrade.UpgradeStep;
55
import com.linkedin.datahub.upgrade.UpgradeStepResult;
66
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
7-
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
7+
import com.linkedin.metadata.event.EventProducer;
88
import com.linkedin.mxe.DataHubUpgradeHistoryEvent;
99
import com.linkedin.upgrade.DataHubUpgradeState;
1010
import java.util.function.Function;
@@ -14,7 +14,7 @@
1414
@RequiredArgsConstructor
1515
@Slf4j
1616
public class DataHubStartupStep implements UpgradeStep {
17-
private final KafkaEventProducer _kafkaEventProducer;
17+
private final EventProducer _kafkaEventProducer;
1818
private final String _version;
1919

2020
@Override

0 commit comments

Comments
 (0)