diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetFacet.java new file mode 100644 index 000000000..e1c682345 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetFacet.java @@ -0,0 +1,65 @@ +package org.apache.flink.connector.kafka.lineage; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil; + +import java.util.Objects; +import java.util.Properties; + +/** Default implementation of {@link KafkaDatasetFacet}. */ +@PublicEvolving +public class DefaultKafkaDatasetFacet implements KafkaDatasetFacet { + + public static final String KAFKA_FACET_NAME = "kafka"; + + private Properties properties; + + private final KafkaDatasetIdentifier topicIdentifier; + + public DefaultKafkaDatasetFacet(KafkaDatasetIdentifier topicIdentifier, Properties properties) { + this(topicIdentifier); + + this.properties = new Properties(); + KafkaPropertiesUtil.copyProperties(properties, this.properties); + } + + public DefaultKafkaDatasetFacet(KafkaDatasetIdentifier topicIdentifier) { + this.topicIdentifier = topicIdentifier; + } + + public void setProperties(Properties properties) { + this.properties = new Properties(); + KafkaPropertiesUtil.copyProperties(properties, this.properties); + } + + public Properties getProperties() { + return properties; + } + + public KafkaDatasetIdentifier getTopicIdentifier() { + return topicIdentifier; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DefaultKafkaDatasetFacet that = (DefaultKafkaDatasetFacet) o; + return Objects.equals(properties, that.properties) + && Objects.equals(topicIdentifier, that.topicIdentifier); + } + + @Override + public int hashCode() { + return Objects.hash(properties, topicIdentifier); + } + + @Override + public String name() { + return KAFKA_FACET_NAME; + } +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetIdentifier.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetIdentifier.java new file mode 100644 index 000000000..cd97b7ff4 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetIdentifier.java @@ -0,0 +1,59 @@ +package org.apache.flink.connector.kafka.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Objects; +import java.util.regex.Pattern; + +/** Default implementation of {@link KafkaDatasetIdentifier}. 
*/ +@PublicEvolving +public class DefaultKafkaDatasetIdentifier implements KafkaDatasetIdentifier { + + @Nullable private final List topics; + @Nullable private final Pattern topicPattern; + + private DefaultKafkaDatasetIdentifier( + @Nullable List fixedTopics, @Nullable Pattern topicPattern) { + this.topics = fixedTopics; + this.topicPattern = topicPattern; + } + + public static DefaultKafkaDatasetIdentifier ofPattern(Pattern pattern) { + return new DefaultKafkaDatasetIdentifier(null, pattern); + } + + public static DefaultKafkaDatasetIdentifier ofTopics(List fixedTopics) { + return new DefaultKafkaDatasetIdentifier(fixedTopics, null); + } + + @Nullable + public List getTopics() { + return topics; + } + + @Nullable + public Pattern getTopicPattern() { + return topicPattern; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DefaultKafkaDatasetIdentifier that = (DefaultKafkaDatasetIdentifier) o; + return Objects.equals(topics, that.topics) + && Objects.equals(topicPattern, that.topicPattern); + } + + @Override + public int hashCode() { + return Objects.hash(topics, topicPattern); + } +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultTypeDatasetFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultTypeDatasetFacet.java new file mode 100644 index 000000000..d9475d77a --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultTypeDatasetFacet.java @@ -0,0 +1,44 @@ +package org.apache.flink.connector.kafka.lineage; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.util.Objects; + +/** Default implementation of {@link KafkaDatasetFacet}. */ +@PublicEvolving +public class DefaultTypeDatasetFacet implements TypeDatasetFacet { + + public static final String TYPE_FACET_NAME = "type"; + + private final TypeInformation typeInformation; + + public DefaultTypeDatasetFacet(TypeInformation typeInformation) { + this.typeInformation = typeInformation; + } + + public TypeInformation getTypeInformation() { + return typeInformation; + } + + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DefaultTypeDatasetFacet that = (DefaultTypeDatasetFacet) o; + return Objects.equals(typeInformation, that.typeInformation); + } + + @Override + public int hashCode() { + return Objects.hash(typeInformation); + } + + @Override + public String name() { + return TYPE_FACET_NAME; + } +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacet.java new file mode 100644 index 000000000..c0d3d0b73 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacet.java @@ -0,0 +1,16 @@ +package org.apache.flink.connector.kafka.lineage; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; + +import java.util.Properties; + +/** Facet definition to contain all Kafka specific information on Kafka sources and sinks. 
*/ +@PublicEvolving +public interface KafkaDatasetFacet extends LineageDatasetFacet { + Properties getProperties(); + + KafkaDatasetIdentifier getTopicIdentifier(); + + void setProperties(Properties properties); +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacetProvider.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacetProvider.java new file mode 100644 index 000000000..76fe41b82 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacetProvider.java @@ -0,0 +1,16 @@ +package org.apache.flink.connector.kafka.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Optional; + +/** Contains method to extract {@link KafkaDatasetFacet}. */ +@PublicEvolving +public interface KafkaDatasetFacetProvider { + + /** + * Returns a Kafka dataset facet or empty in case an implementing class is not able to identify + * a dataset. + */ + Optional getKafkaDatasetFacet(); +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifier.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifier.java new file mode 100644 index 000000000..19f7082e2 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifier.java @@ -0,0 +1,30 @@ +package org.apache.flink.connector.kafka.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Objects; +import java.util.regex.Pattern; + +/** Kafka dataset identifier which can contain either a list of topics or a topic pattern. */ +@PublicEvolving +public interface KafkaDatasetIdentifier { + @Nullable + List getTopics(); + + @Nullable + Pattern getTopicPattern(); + + /** + * Assigns lineage dataset's name which is topic pattern if it is present or comma separated + * list of topics. + */ + default String toLineageName() { + if (getTopicPattern() != null) { + return getTopicPattern().toString(); + } + return String.join(",", Objects.requireNonNull(getTopics())); + } +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifierProvider.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifierProvider.java new file mode 100644 index 000000000..1389fea58 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifierProvider.java @@ -0,0 +1,16 @@ +package org.apache.flink.connector.kafka.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Optional; + +/** Contains method which allows extracting topic identifier. */ +@PublicEvolving +public interface KafkaDatasetIdentifierProvider { + + /** + * Gets Kafka dataset identifier or empty in case a class implementing is not able to extract + * dataset identifier. 
+ */ + Optional getDatasetIdentifier(); +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java new file mode 100644 index 000000000..086303e09 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.connector.kafka.lineage; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; + +/** Utility class with useful methods for managing lineage objects. 
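+ *
+ * <p>Illustration (assumed usage, not part of this change; the broker addresses are placeholders):
+ *
+ * <pre>{@code
+ * Properties props = new Properties();
+ * props.setProperty("bootstrap.servers", "host1:9092,host2:9092");
+ * String namespace = LineageUtil.namespaceOf(props); // "kafka://host1:9092" - only the first listed broker is kept
+ * }</pre>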
*/ +public class LineageUtil { + + private static final String KAFKA_DATASET_PREFIX = "kafka://"; + private static final String COMMA = ","; + private static final String SEMICOLON = ";"; + + public static LineageDataset datasetOf(String namespace, KafkaDatasetFacet kafkaDatasetFacet) { + return datasetOf(namespace, kafkaDatasetFacet, Collections.emptyList()); + } + + public static LineageDataset datasetOf( + String namespace, KafkaDatasetFacet kafkaDatasetFacet, TypeDatasetFacet typeFacet) { + return datasetOf(namespace, kafkaDatasetFacet, Collections.singletonList(typeFacet)); + } + + private static LineageDataset datasetOf( + String namespace, + KafkaDatasetFacet kafkaDatasetFacet, + List facets) { + return new LineageDataset() { + @Override + public String name() { + return kafkaDatasetFacet.getTopicIdentifier().toLineageName(); + } + + @Override + public String namespace() { + return namespace; + } + + @Override + public Map facets() { + Map facetMap = new HashMap<>(); + facetMap.put(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME, kafkaDatasetFacet); + facetMap.putAll( + facets.stream() + .collect( + Collectors.toMap(LineageDatasetFacet::name, item -> item))); + return facetMap; + } + }; + } + + public static String namespaceOf(Properties properties) { + String bootstrapServers = properties.getProperty("bootstrap.servers"); + + if (bootstrapServers == null) { + return KAFKA_DATASET_PREFIX; + } + + if (bootstrapServers.contains(COMMA)) { + bootstrapServers = bootstrapServers.split(COMMA)[0]; + } else if (bootstrapServers.contains(SEMICOLON)) { + bootstrapServers = bootstrapServers.split(SEMICOLON)[0]; + } + + return String.format(KAFKA_DATASET_PREFIX + bootstrapServers); + } + + public static SourceLineageVertex sourceLineageVertexOf(Collection datasets) { + return new SourceLineageVertex() { + @Override + public Boundedness boundedness() { + return Boundedness.CONTINUOUS_UNBOUNDED; + } + + @Override + public List datasets() { + return datasets.stream().collect(Collectors.toList()); + } + }; + } + + public static LineageVertex lineageVertexOf(Collection datasets) { + return new LineageVertex() { + @Override + public List datasets() { + return datasets.stream().collect(Collectors.toList()); + } + }; + } +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacet.java new file mode 100644 index 000000000..1e64f5819 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacet.java @@ -0,0 +1,11 @@ +package org.apache.flink.connector.kafka.lineage; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; + +/** Facet definition to contain type information of source and sink. 
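+ * For example (illustrative), {@code new DefaultTypeDatasetFacet(TypeInformation.of(String.class))}
+ * describes a dataset of {@code String} records; sources build this facet from {@code getProducedType()}.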
*/ +@PublicEvolving +public interface TypeDatasetFacet extends LineageDatasetFacet { + TypeInformation getTypeInformation(); +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacetProvider.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacetProvider.java new file mode 100644 index 000000000..016a1bb84 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacetProvider.java @@ -0,0 +1,16 @@ +package org.apache.flink.connector.kafka.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Optional; + +/** Contains method to extract {@link TypeDatasetFacet}. */ +@PublicEvolving +public interface TypeDatasetFacetProvider { + + /** + * Returns a type dataset facet or `Optional.empty` in case an implementing class is not able to + * resolve type. + */ + Optional getTypeDatasetFacet(); +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java index 9d081c755..f56a7da54 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java @@ -29,7 +29,10 @@ /** * A serialization schema which defines how to convert a value of type {@code T} to {@link - * ProducerRecord}. + * ProducerRecord}. {@link org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider} can + * be implemented to provide Kafka specific lineage metadata, while {@link + * org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider} can be implemented to provide + * lineage metadata with type information. 
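+ *
+ * <p>A minimal sketch of a schema that also exposes lineage facets (assumed usage; the event type,
+ * topic name and byte-conversion helper are placeholders):
+ *
+ * <pre>{@code
+ * class OrderSchema implements KafkaRecordSerializationSchema<Order>, KafkaDatasetFacetProvider {
+ *     public ProducerRecord<byte[], byte[]> serialize(Order order, KafkaSinkContext ctx, Long ts) {
+ *         return new ProducerRecord<>("orders", toBytes(order));
+ *     }
+ *
+ *     public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
+ *         return Optional.of(new DefaultKafkaDatasetFacet(
+ *                 DefaultKafkaDatasetIdentifier.ofTopics(Collections.singletonList("orders"))));
+ *     }
+ * }
+ * }</pre>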
* * @param the type of values being serialized */ diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java index e9fc413b2..0fba3a364 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java @@ -19,16 +19,32 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet; +import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier; +import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider; +import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet; +import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider; +import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import com.google.common.reflect.TypeToken; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.Serializable; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.OptionalInt; import java.util.function.Function; @@ -79,6 +95,7 @@ */ @PublicEvolving public class KafkaRecordSerializationSchemaBuilder { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class); @Nullable private Function topicSelector; @Nullable private SerializationSchema valueSerializationSchema; @@ -122,7 +139,8 @@ public KafkaRecordSerializationSchemaBuilder setPartitioner( public KafkaRecordSerializationSchemaBuilder setTopic(String topic) { checkState(this.topicSelector == null, "Topic selector already set."); checkNotNull(topic); - this.topicSelector = new CachingTopicSelector<>((e) -> topic); + + this.topicSelector = new ConstantTopicSelector<>(topic); return this; } @@ -283,7 +301,29 @@ private void checkKeySerializerNotSet() { checkState(keySerializationSchema == null, "Key serializer already set."); } - private static class CachingTopicSelector implements Function, Serializable { + private static class ConstantTopicSelector + implements Function, Serializable, KafkaDatasetIdentifierProvider { + + private String topic; + + ConstantTopicSelector(String topic) { + this.topic = topic; + } + + @Override + public String apply(IN in) { + return topic; + } + + @Override + public Optional getDatasetIdentifier() { + return Optional.of( + DefaultKafkaDatasetIdentifier.ofTopics(Collections.singletonList(topic))); + } + } + + private static class CachingTopicSelector + implements Function, KafkaDatasetIdentifierProvider, Serializable { private static final int CACHE_RESET_SIZE = 5; private final Map cache; @@ 
-303,10 +343,21 @@ public String apply(IN in) { } return topic; } + + @Override + public Optional getDatasetIdentifier() { + if (topicSelector instanceof KafkaDatasetIdentifierProvider) { + return ((KafkaDatasetIdentifierProvider) topicSelector).getDatasetIdentifier(); + } else { + return Optional.empty(); + } + } } private static class KafkaRecordSerializationSchemaWrapper - implements KafkaRecordSerializationSchema { + implements KafkaDatasetFacetProvider, + KafkaRecordSerializationSchema, + TypeDatasetFacetProvider { private final SerializationSchema valueSerializationSchema; private final Function topicSelector; private final KafkaPartitioner partitioner; @@ -369,5 +420,46 @@ public ProducerRecord serialize( value, headerProvider != null ? headerProvider.getHeaders(element) : null); } + + @Override + public Optional getKafkaDatasetFacet() { + if (!(topicSelector instanceof KafkaDatasetIdentifierProvider)) { + LOG.info("Cannot identify topics. Not an TopicsIdentifierProvider"); + return Optional.empty(); + } + + Optional topicsIdentifier = + ((KafkaDatasetIdentifierProvider) (topicSelector)).getDatasetIdentifier(); + + if (!topicsIdentifier.isPresent()) { + LOG.info("No topics' identifiers provided"); + return Optional.empty(); + } + + return Optional.of(new DefaultKafkaDatasetFacet(topicsIdentifier.get())); + } + + @Override + public Optional getTypeDatasetFacet() { + if (this.valueSerializationSchema instanceof ResultTypeQueryable) { + return Optional.of( + new DefaultTypeDatasetFacet( + ((ResultTypeQueryable) this.valueSerializationSchema) + .getProducedType())); + } else { + // gets type information from serialize method signature + TypeToken serializationSchemaType = + TypeToken.of(valueSerializationSchema.getClass()); + Class parameterType = + serializationSchemaType + .resolveType(SerializationSchema.class.getTypeParameters()[0]) + .getRawType(); + if (parameterType != Object.class) { + return Optional.of( + new DefaultTypeDatasetFacet(TypeInformation.of(parameterType))); + } + } + return Optional.empty(); + } } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java index d5b1c3700..d3d3c89df 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java @@ -22,11 +22,22 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider; +import org.apache.flink.connector.kafka.lineage.LineageUtil; +import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet; +import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.Optional; import java.util.Properties; /** @@ -54,8 +65,9 @@ */ @PublicEvolving public class KafkaSink - implements TwoPhaseCommittingStatefulSink { - + implements 
LineageVertexProvider, + TwoPhaseCommittingStatefulSink { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class); private final DeliveryGuarantee deliveryGuarantee; private final KafkaRecordSerializationSchema recordSerializer; @@ -132,4 +144,42 @@ public SimpleVersionedSerializer getWriterStateSerializer() { protected Properties getKafkaProducerConfig() { return kafkaProducerConfig; } + + @Override + public LineageVertex getLineageVertex() { + // enrich dataset facet with properties + Optional kafkaDatasetFacet; + if (recordSerializer instanceof KafkaDatasetFacetProvider) { + kafkaDatasetFacet = + ((KafkaDatasetFacetProvider) recordSerializer).getKafkaDatasetFacet(); + + if (!kafkaDatasetFacet.isPresent()) { + LOG.info("Provider did not return kafka dataset facet"); + return LineageUtil.sourceLineageVertexOf(Collections.emptyList()); + } + kafkaDatasetFacet.get().setProperties(this.kafkaProducerConfig); + } else { + LOG.info( + "recordSerializer does not implement KafkaDatasetFacetProvider: {}", + recordSerializer); + return LineageUtil.sourceLineageVertexOf(Collections.emptyList()); + } + + String namespace = LineageUtil.namespaceOf(kafkaProducerConfig); + + Optional typeDatasetFacet = Optional.empty(); + if (recordSerializer instanceof TypeDatasetFacetProvider) { + typeDatasetFacet = ((TypeDatasetFacetProvider) recordSerializer).getTypeDatasetFacet(); + } + + if (typeDatasetFacet.isPresent()) { + return LineageUtil.sourceLineageVertexOf( + Collections.singleton( + LineageUtil.datasetOf( + namespace, kafkaDatasetFacet.get(), typeDatasetFacet.get()))); + } + + return LineageUtil.sourceLineageVertexOf( + Collections.singleton(LineageUtil.datasetOf(namespace, kafkaDatasetFacet.get()))); + } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java index 54f5f856c..39302751c 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java @@ -33,6 +33,11 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet; +import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier; +import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider; +import org.apache.flink.connector.kafka.lineage.LineageUtil; import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState; import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer; import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator; @@ -48,15 +53,20 @@ import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; import org.apache.flink.util.UserCodeClassLoader; import org.apache.flink.util.function.SerializableSupplier; import 
org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.Optional; import java.util.Properties; import java.util.function.Consumer; @@ -87,8 +97,10 @@ */ @PublicEvolving public class KafkaSource - implements Source, + implements LineageVertexProvider, + Source, ResultTypeQueryable { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class); private static final long serialVersionUID = -8755372893283732098L; // Users can choose only one of the following ways to specify the topics to consume from. private final KafkaSubscriber subscriber; @@ -251,4 +263,31 @@ KafkaSubscriber getKafkaSubscriber() { OffsetsInitializer getStoppingOffsetsInitializer() { return stoppingOffsetsInitializer; } + + @Override + public SourceLineageVertex getLineageVertex() { + if (!(subscriber instanceof KafkaDatasetIdentifierProvider)) { + LOG.info("unable to determine topic identifier"); + return LineageUtil.sourceLineageVertexOf(Collections.emptyList()); + } + + Optional topicsIdentifier = + ((KafkaDatasetIdentifierProvider) subscriber).getDatasetIdentifier(); + + if (!topicsIdentifier.isPresent()) { + LOG.info("No topics' identifier returned from subscriber"); + return LineageUtil.sourceLineageVertexOf(Collections.emptyList()); + } + + DefaultKafkaDatasetFacet kafkaDatasetFacet = + new DefaultKafkaDatasetFacet(topicsIdentifier.get(), props); + + String namespace = LineageUtil.namespaceOf(props); + return LineageUtil.sourceLineageVertexOf( + Collections.singletonList( + LineageUtil.datasetOf( + namespace, + kafkaDatasetFacet, + new DefaultTypeDatasetFacet(getProducedType())))); + } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java index 1b819fb23..37de884af 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java @@ -39,6 +39,10 @@ * *

<p>The KafkaSubscriber provides a unified interface for the Kafka source to support all these * three types of subscribing mode. + * + * <p>

When implementing a subscriber, {@link + * org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider} can be implemented to + * provide lineage metadata with source topics. */ @PublicEvolving public interface KafkaSubscriber extends Serializable { diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java index 3423b0f90..9cd50fb20 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java @@ -18,6 +18,9 @@ package org.apache.flink.connector.kafka.source.enumerator.subscriber; +import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider; + import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.TopicPartition; @@ -26,13 +29,14 @@ import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata; /** A subscriber for a partition set. */ -class PartitionSetSubscriber implements KafkaSubscriber { +class PartitionSetSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider { private static final long serialVersionUID = 390970375272146036L; private static final Logger LOG = LoggerFactory.getLogger(PartitionSetSubscriber.class); private final Set subscribedPartitions; @@ -73,4 +77,14 @@ && partitionExistsInTopic( private boolean partitionExistsInTopic(TopicPartition partition, TopicDescription topic) { return topic.partitions().size() > partition.partition(); } + + @Override + public Optional getDatasetIdentifier() { + return Optional.of( + DefaultKafkaDatasetIdentifier.ofTopics( + subscribedPartitions.stream() + .map(TopicPartition::topic) + .distinct() + .collect(Collectors.toList()))); + } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java index b2ad844ab..e86ade0fa 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java @@ -18,6 +18,9 @@ package org.apache.flink.connector.kafka.source.enumerator.subscriber; +import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider; + import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.TopicPartition; @@ -28,6 +31,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata; @@ -36,7 +40,7 @@ 
* A subscriber to a fixed list of topics. The subscribed topics must have existed in the Kafka * cluster, otherwise an exception will be thrown. */ -class TopicListSubscriber implements KafkaSubscriber { +class TopicListSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider { private static final long serialVersionUID = -6917603843104947866L; private static final Logger LOG = LoggerFactory.getLogger(TopicListSubscriber.class); private final List topics; @@ -60,4 +64,9 @@ public Set getSubscribedTopicPartitions(AdminClient adminClient) return subscribedPartitions; } + + @Override + public Optional getDatasetIdentifier() { + return Optional.of(DefaultKafkaDatasetIdentifier.ofTopics(topics)); + } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java index 985ca7137..208959e27 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java @@ -18,6 +18,9 @@ package org.apache.flink.connector.kafka.source.enumerator.subscriber; +import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider; + import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.TopicPartition; @@ -27,13 +30,14 @@ import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.regex.Pattern; import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata; /** A subscriber to a topic pattern. */ -class TopicPatternSubscriber implements KafkaSubscriber { +class TopicPatternSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider { private static final long serialVersionUID = -7471048577725467797L; private static final Logger LOG = LoggerFactory.getLogger(TopicPatternSubscriber.class); private final Pattern topicPattern; @@ -60,4 +64,9 @@ public Set getSubscribedTopicPartitions(AdminClient adminClient) return subscribedTopicPartitions; } + + @Override + public Optional getDatasetIdentifier() { + return Optional.of(DefaultKafkaDatasetIdentifier.ofPattern(topicPattern)); + } } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/lineage/LineageUtilTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/lineage/LineageUtilTest.java new file mode 100644 index 000000000..869399896 --- /dev/null +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/lineage/LineageUtilTest.java @@ -0,0 +1,74 @@ +package org.apache.flink.connector.kafka.lineage; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Map; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link LineageUtil}. 
*/ +public class LineageUtilTest { + @Test + public void testSourceLineageVertexOf() { + LineageDataset dataset = new TestingLineageDataset(); + SourceLineageVertex sourceLineageVertex = + LineageUtil.sourceLineageVertexOf(Collections.singletonList(dataset)); + + assertThat(sourceLineageVertex.boundedness()).isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED); + assertThat(sourceLineageVertex.datasets()).containsExactly(dataset); + } + + @Test + public void testDatasetNamespaceOf() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "my-kafka-host"); + + assertThat(LineageUtil.namespaceOf(properties)).isEqualTo("kafka://my-kafka-host"); + } + + @Test + public void testDatasetNamespaceOfWithSemicolon() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "my-kafka-host1;my-kafka-host2"); + + assertThat(LineageUtil.namespaceOf(properties)).isEqualTo("kafka://my-kafka-host1"); + } + + @Test + public void testDatasetNamespaceOfWithComma() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "my-kafka-host1,my-kafka-host2"); + + assertThat(LineageUtil.namespaceOf(properties)).isEqualTo("kafka://my-kafka-host1"); + } + + @Test + public void testDatasetNamespaceWhenNoBootstrapServersProperty() { + Properties properties = new Properties(); + assertThat(LineageUtil.namespaceOf(properties)).isEqualTo("kafka://"); + } + + private static class TestingLineageDataset implements LineageDataset { + @Override + public String name() { + return null; + } + + @Override + public String namespace() { + return null; + } + + @Override + public Map facets() { + return null; + } + } +} diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java index 701f9c8aa..4d1437288 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java @@ -19,6 +19,15 @@ import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider; +import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet; +import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider; import org.apache.flink.connector.testutils.formats.DummyInitializationContext; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.util.TestLogger; @@ -31,6 +40,7 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.Before; import org.junit.Test; @@ -40,6 +50,7 @@ import 
java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -53,6 +64,13 @@ public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger { private static Map configurableConfiguration; private static Map configuration; + + private interface TestingTopicSelector + extends TopicSelector, KafkaDatasetIdentifierProvider {} + + private interface SerializationSchemaWithResultQueryable + extends SerializationSchema, ResultTypeQueryable {} + private static boolean isKeySerializer; @Before @@ -256,6 +274,134 @@ public void testSerializeRecordWithTimestamp() { assertThat(recordWithInvalidTimestamp.timestamp()).isNull(); } + @Test + public void testGetLineageDatasetFacetsWhenTopicSelectorNotKafkaTopicsIdentifierProvider() { + SerializationSchema serializationSchema = new SimpleStringSchema(); + KafkaRecordSerializationSchema schema = + KafkaRecordSerializationSchema.builder() + .setTopicSelector((TopicSelector) o -> DEFAULT_TOPIC) + .setValueSerializationSchema(serializationSchema) + .setKeySerializationSchema(serializationSchema) + .build(); + + assertThat(schema) + .asInstanceOf(InstanceOfAssertFactories.type(KafkaDatasetFacetProvider.class)) + .returns(Optional.empty(), KafkaDatasetFacetProvider::getKafkaDatasetFacet); + } + + @Test + public void testGetLineageDatasetFacetsWhenNoTopicsIdentifiersFound() { + SerializationSchema serializationSchema = new SimpleStringSchema(); + KafkaRecordSerializationSchema schema = + KafkaRecordSerializationSchema.builder() + .setTopicSelector( + new TestingTopicSelector() { + @Override + public Optional + getDatasetIdentifier() { + return Optional.empty(); + } + + @Override + public String apply(Object o) { + return DEFAULT_TOPIC; + } + }) + .setValueSerializationSchema(serializationSchema) + .setKeySerializationSchema(serializationSchema) + .build(); + assertThat(schema) + .asInstanceOf(InstanceOfAssertFactories.type(KafkaDatasetFacetProvider.class)) + .returns(Optional.empty(), KafkaDatasetFacetProvider::getKafkaDatasetFacet); + } + + @Test + public void testGetLineageDatasetFacetsValueSerializationSchemaIsResultTypeQueryable() { + TypeInformation stringTypeInformation = TypeInformation.of(String.class); + SerializationSchemaWithResultQueryable serializationSchema = + new SerializationSchemaWithResultQueryable() { + + @Override + public TypeInformation getProducedType() { + return stringTypeInformation; + } + + @Override + public byte[] serialize(String o) { + return new byte[0]; + } + }; + + KafkaRecordSerializationSchema schema = + KafkaRecordSerializationSchema.builder() + .setTopicSelector( + new TestingTopicSelector() { + @Override + public Optional + getDatasetIdentifier() { + return Optional.of( + DefaultKafkaDatasetIdentifier.ofTopics( + Arrays.asList("topic1", "topic2"))); + } + + @Override + public String apply(Object o) { + return DEFAULT_TOPIC; + } + }) + .setValueSerializationSchema(serializationSchema) + .setKeySerializationSchema(serializationSchema) + .build(); + + Optional kafkaDatasetFacet = + ((KafkaDatasetFacetProvider) schema).getKafkaDatasetFacet(); + + assertThat(kafkaDatasetFacet).isPresent(); + assertThat(kafkaDatasetFacet.get().getTopicIdentifier().getTopics()) + .containsExactly("topic1", "topic2"); + assertThat(((TypeDatasetFacetProvider) schema).getTypeDatasetFacet()) + .isPresent() + .get() + .extracting(TypeDatasetFacet::getTypeInformation) + .isEqualTo(stringTypeInformation); + } + + @Test + public 
void testGetLineageDatasetFacets() { + KafkaRecordSerializationSchema schema = + KafkaRecordSerializationSchema.builder() + .setTopicSelector( + new TestingTopicSelector() { + @Override + public Optional + getDatasetIdentifier() { + return Optional.of( + DefaultKafkaDatasetIdentifier.ofTopics( + Arrays.asList("topic1", "topic2"))); + } + + @Override + public String apply(Object o) { + return DEFAULT_TOPIC; + } + }) + .setValueSerializationSchema(new SimpleStringSchema()) + .setKeySerializationSchema(new SimpleStringSchema()) + .build(); + + Optional kafkaDatasetFacet = + ((KafkaDatasetFacetProvider) schema).getKafkaDatasetFacet(); + + assertThat(kafkaDatasetFacet).isPresent(); + assertThat(kafkaDatasetFacet.get().getTopicIdentifier().getTopics()) + .containsExactly("topic1", "topic2"); + assertThat(((TypeDatasetFacetProvider) schema).getTypeDatasetFacet()) + .isPresent() + .get() + .extracting(TypeDatasetFacet::getTypeInformation) + .isEqualTo(BasicTypeInfo.STRING_TYPE_INFO); + } + private static void assertOnlyOneSerializerAllowed( List< Function< diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java new file mode 100644 index 000000000..1efb6ec7d --- /dev/null +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java @@ -0,0 +1,144 @@ +package org.apache.flink.connector.kafka.sink; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet; +import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier; +import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider; +import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet; +import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider; +import org.apache.flink.streaming.api.lineage.LineageVertex; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Optional; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link KafkaSink}. 
*/ +public class KafkaSinkTest { + + Properties kafkaProperties; + + @BeforeEach + void setup() { + kafkaProperties = new Properties(); + kafkaProperties.put("bootstrap.servers", "host1;host2"); + } + + @Test + public void testGetLineageVertexWhenSerializerNotAnKafkaDatasetFacetProvider() { + KafkaRecordSerializationSchema recordSerializer = + new KafkaRecordSerializationSchemaWithoutKafkaDatasetProvider(); + KafkaSink sink = + new KafkaSink( + DeliveryGuarantee.EXACTLY_ONCE, new Properties(), "", recordSerializer); + + assertThat(sink.getLineageVertex().datasets()).isEmpty(); + } + + @Test + public void testGetLineageVertexWhenNoKafkaDatasetFacetReturnedFromSerializer() { + KafkaRecordSerializationSchema recordSerializer = + new KafkaRecordSerializationSchemaWithEmptyKafkaDatasetProvider(); + + KafkaSink sink = + new KafkaSink( + DeliveryGuarantee.EXACTLY_ONCE, new Properties(), "", recordSerializer); + + assertThat(sink.getLineageVertex().datasets()).isEmpty(); + } + + @Test + public void testGetLineageVertex() { + KafkaRecordSerializationSchema recordSerializer = + new TestingKafkaRecordSerializationSchema(); + + KafkaSink sink = + new KafkaSink( + DeliveryGuarantee.EXACTLY_ONCE, kafkaProperties, "", recordSerializer); + + LineageVertex lineageVertex = sink.getLineageVertex(); + + assertThat(lineageVertex.datasets().get(0).namespace()).isEqualTo("kafka://host1"); + assertThat(lineageVertex.datasets().get(0).name()).isEqualTo("topic1"); + + assertThat( + lineageVertex + .datasets() + .get(0) + .facets() + .get(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME)) + .hasFieldOrPropertyWithValue("properties", kafkaProperties) + .hasFieldOrPropertyWithValue( + "topicIdentifier", + DefaultKafkaDatasetIdentifier.ofTopics( + Collections.singletonList("topic1"))); + + assertThat( + lineageVertex + .datasets() + .get(0) + .facets() + .get(DefaultTypeDatasetFacet.TYPE_FACET_NAME)) + .hasFieldOrPropertyWithValue("typeInformation", TypeInformation.of(String.class)); + } + + private static class KafkaRecordSerializationSchemaWithoutKafkaDatasetProvider + implements KafkaRecordSerializationSchema { + @Nullable + @Override + public ProducerRecord serialize( + Object element, KafkaSinkContext context, Long timestamp) { + return null; + } + } + + private static class KafkaRecordSerializationSchemaWithEmptyKafkaDatasetProvider + implements KafkaRecordSerializationSchema, KafkaDatasetFacetProvider { + @Nullable + @Override + public ProducerRecord serialize( + Object element, KafkaSinkContext context, Long timestamp) { + return null; + } + + @Override + public Optional getKafkaDatasetFacet() { + return Optional.empty(); + } + } + + private static class TestingKafkaRecordSerializationSchema + implements KafkaRecordSerializationSchema, + KafkaDatasetFacetProvider, + TypeDatasetFacetProvider { + + @Override + public Optional getKafkaDatasetFacet() { + return Optional.of( + new DefaultKafkaDatasetFacet( + DefaultKafkaDatasetIdentifier.ofTopics( + Collections.singletonList("topic1")))); + } + + @Nullable + @Override + public ProducerRecord serialize( + Object element, KafkaSinkContext context, Long timestamp) { + return null; + } + + @Override + public Optional getTypeDatasetFacet() { + return Optional.of(new DefaultTypeDatasetFacet(TypeInformation.of(String.class))); + } + } +} diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java new file mode 100644 index 
000000000..259668c5d --- /dev/null +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.source; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet; +import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier; +import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber; +import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.util.Collector; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link KafkaSource}. 
*/ +public class KafkaSourceTest { + Properties kafkaProperties; + + @BeforeEach + void setup() { + kafkaProperties = new Properties(); + kafkaProperties.put("bootstrap.servers", "host1;host2"); + } + + @Test + public void testGetLineageVertexWhenSubscriberNotAnKafkaDatasetFacetProvider() { + KafkaSource source = + new KafkaSourceBuilder() + .setKafkaSubscriber( + new KafkaSubscriber() { + @Override + public Set getSubscribedTopicPartitions( + AdminClient adminClient) { + return null; + } + }) + .setProperties(kafkaProperties) + .setGroupId("") + .setDeserializer( + new KafkaRecordDeserializationSchema() { + @Override + public TypeInformation getProducedType() { + return null; + } + + @Override + public void deserialize( + ConsumerRecord record, + Collector out) + throws IOException {} + }) + .setUnbounded(OffsetsInitializer.committedOffsets()) + .build(); + + assertThat(source.getLineageVertex()) + .extracting(LineageVertex::datasets) + .asList() + .isEmpty(); + } + + @Test + public void testGetLineageVertexWhenNoKafkaTopicsIdentifier() { + KafkaSource source = + new KafkaSourceBuilder() + .setKafkaSubscriber( + new TestingKafkaSubscriber() { + @Override + public Optional + getDatasetIdentifier() { + return Optional.empty(); + } + }) + .setProperties(kafkaProperties) + .setGroupId("") + .setDeserializer( + new KafkaRecordDeserializationSchema() { + @Override + public void deserialize( + ConsumerRecord record, + Collector out) + throws IOException {} + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(String.class); + } + }) + .setUnbounded(OffsetsInitializer.committedOffsets()) + .build(); + assertThat(source.getLineageVertex()) + .extracting(LineageVertex::datasets) + .asList() + .isEmpty(); + } + + @Test + public void testGetLineageVertex() { + TypeInformation typeInformation = TypeInformation.of(String.class); + KafkaSource source = + new KafkaSourceBuilder() + .setKafkaSubscriber(new TestingKafkaSubscriber()) + .setProperties(kafkaProperties) + .setGroupId("") + .setDeserializer( + new KafkaRecordDeserializationSchema() { + @Override + public void deserialize( + ConsumerRecord record, + Collector out) + throws IOException {} + + @Override + public TypeInformation getProducedType() { + return typeInformation; + } + }) + .setUnbounded(OffsetsInitializer.committedOffsets()) + .build(); + + LineageVertex lineageVertex = source.getLineageVertex(); + assertThat(lineageVertex.datasets()).hasSize(1); + LineageDataset dataset = lineageVertex.datasets().get(0); + + assertThat(dataset.namespace()).isEqualTo("kafka://host1"); + assertThat(dataset.name()).isEqualTo("topic1"); + + assertThat(dataset.facets()).containsKey(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME); + DefaultKafkaDatasetFacet kafkaFacet = + (DefaultKafkaDatasetFacet) + dataset.facets().get(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME); + + assertThat(kafkaFacet.getProperties()).containsEntry("bootstrap.servers", "host1;host2"); + + assertThat(dataset.facets()).containsKey(DefaultTypeDatasetFacet.TYPE_FACET_NAME); + assertThat(dataset.facets().get(DefaultTypeDatasetFacet.TYPE_FACET_NAME)) + .hasFieldOrPropertyWithValue("typeInformation", TypeInformation.of(String.class)); + } + + private static class TestingKafkaSubscriber + implements KafkaSubscriber, KafkaDatasetIdentifierProvider { + @Override + public Optional getDatasetIdentifier() { + return Optional.of( + DefaultKafkaDatasetIdentifier.ofTopics(Collections.singletonList("topic1"))); + } + + @Override + public Set 
getSubscribedTopicPartitions(AdminClient adminClient) { + return null; + } + } +} diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java index 258c1c0ab..4c5a50243 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java @@ -18,6 +18,8 @@ package org.apache.flink.connector.kafka.source.enumerator.subscriber; +import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider; import org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv; import org.apache.kafka.clients.admin.AdminClient; @@ -71,6 +73,8 @@ public void testTopicListSubscriber() { new HashSet<>(KafkaSourceTestEnv.getPartitionsForTopics(topics)); assertThat(subscribedPartitions).isEqualTo(expectedSubscribedPartitions); + assertThat(((KafkaDatasetIdentifierProvider) subscriber).getDatasetIdentifier().get()) + .isEqualTo(DefaultKafkaDatasetIdentifier.ofTopics(topics)); } @Test @@ -86,8 +90,8 @@ public void testNonExistingTopic() { @Test public void testTopicPatternSubscriber() { - KafkaSubscriber subscriber = - KafkaSubscriber.getTopicPatternSubscriber(Pattern.compile("pattern.*")); + Pattern pattern = Pattern.compile("pattern.*"); + KafkaSubscriber subscriber = KafkaSubscriber.getTopicPatternSubscriber(pattern); final Set subscribedPartitions = subscriber.getSubscribedTopicPartitions(adminClient); @@ -96,6 +100,8 @@ public void testTopicPatternSubscriber() { KafkaSourceTestEnv.getPartitionsForTopics(Collections.singleton(TOPIC2))); assertThat(subscribedPartitions).isEqualTo(expectedSubscribedPartitions); + assertThat(((KafkaDatasetIdentifierProvider) subscriber).getDatasetIdentifier().get()) + .isEqualTo(DefaultKafkaDatasetIdentifier.ofPattern(pattern)); } @Test @@ -111,6 +117,8 @@ public void testPartitionSetSubscriber() { subscriber.getSubscribedTopicPartitions(adminClient); assertThat(subscribedPartitions).isEqualTo(partitions); + assertThat(((KafkaDatasetIdentifierProvider) subscriber).getDatasetIdentifier().get()) + .isEqualTo(DefaultKafkaDatasetIdentifier.ofTopics(topics)); } @Test
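A minimal source-side sketch (assumed usage, not part of this change; class and topic names are placeholders): a custom KafkaSubscriber opts into lineage reporting by also implementing KafkaDatasetIdentifierProvider, which KafkaSource#getLineageVertex checks for via instanceof.

import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Optional;
import java.util.Set;

/** Illustrative only: subscribes to a single fixed topic and exposes it for lineage. */
class SingleTopicSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider {

    private static final String TOPIC = "orders"; // placeholder topic name

    @Override
    public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
        // Partition discovery is orthogonal to lineage; a real subscriber would resolve
        // partitions through the AdminClient, as TopicListSubscriber does in this change.
        return Collections.emptySet();
    }

    @Override
    public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
        // Consumed by KafkaSource#getLineageVertex to derive the lineage dataset name.
        return Optional.of(
                DefaultKafkaDatasetIdentifier.ofTopics(Collections.singletonList(TOPIC)));
    }
}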