diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacetProvider.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacetProvider.java new file mode 100644 index 000000000..0ccc2aeb0 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacetProvider.java @@ -0,0 +1,16 @@ +package org.apache.flink.connector.kafka.lineage; + +import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet; + +import java.util.Optional; + +/** Contains method to extract {@link KafkaDatasetFacet}. */ +public interface KafkaDatasetFacetProvider { + + /** + * List of lineage dataset facets. + * + * @return + */ + Optional getKafkaDatasetFacet(); +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifierProvider.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifierProvider.java new file mode 100644 index 000000000..7afe037cd --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifierProvider.java @@ -0,0 +1,16 @@ +package org.apache.flink.connector.kafka.lineage; + +import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaDatasetIdentifier; + +import java.util.Optional; + +/** Contains method which allows extracting topic identifier. */ +public interface KafkaDatasetIdentifierProvider { + + /** + * List of lineage dataset facets. + * + * @return + */ + Optional getDatasetIdentifier(); +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageFacetProvider.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageFacetProvider.java deleted file mode 100644 index 24ea3a1d8..000000000 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageFacetProvider.java +++ /dev/null @@ -1,19 +0,0 @@ -package org.apache.flink.connector.kafka.lineage; - -import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; - -import java.util.Collection; - -/** - * Contains method which can be used for lineage schema facet extraction. Useful for classes like - * topic selectors or serialization schemas to extract dataset information from. - */ -public interface LineageFacetProvider { - - /** - * List of lineage dataset facets. 
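As an illustration of the two provider interfaces introduced above (assuming they return Optional<KafkaDatasetIdentifier> and Optional<KafkaDatasetFacet>, as the call sites later in this change suggest), a user-defined topic selector could opt into lineage reporting roughly as follows; this sketch is not part of the change, and the class name and topic are invented:

import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaDatasetIdentifier;
import org.apache.flink.connector.kafka.sink.TopicSelector;

import java.util.Collections;
import java.util.Optional;

/** Hypothetical selector that routes every record to one topic and reports it for lineage. */
class OrdersTopicSelector<IN> implements TopicSelector<IN>, KafkaDatasetIdentifierProvider {

    @Override
    public String apply(IN element) {
        // The target topic is statically known, so it can also be exposed as a dataset identifier.
        return "orders";
    }

    @Override
    public Optional<KafkaDatasetIdentifier> getDatasetIdentifier() {
        return Optional.of(KafkaDatasetIdentifier.of(Collections.singletonList("orders")));
    }
}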
- * - * @return - */ - Collection getDatasetFacets(); -} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java index c526e83cf..bcf4d4c40 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java @@ -20,19 +20,16 @@ package org.apache.flink.connector.kafka.lineage; import org.apache.flink.api.connector.source.Boundedness; -import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicListFacet; -import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicPatternFacet; +import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet; import org.apache.flink.streaming.api.lineage.LineageDataset; import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; import org.apache.flink.streaming.api.lineage.LineageVertex; import org.apache.flink.streaming.api.lineage.SourceLineageVertex; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Properties; import java.util.stream.Collectors; @@ -43,66 +40,15 @@ public class LineageUtil { private static final String COMMA = ","; private static final String SEMICOLON = ";"; - /** - * Loads facet from any object implementing @link{DatasetFacetProvider} interface. - * - * @param object - * @return - */ - public static Collection facetsFrom(Object object) { - return Optional.of(object) - .filter(LineageFacetProvider.class::isInstance) - .map(LineageFacetProvider.class::cast) - .map(LineageFacetProvider::getDatasetFacets) - .orElse(Collections.emptyList()); - } - - /** - * Creates dataset from a list of facets. Uses {@link KafkaTopicListFacet} to extract dataset - * name from. 
Dataset per each element of topic list is created - * - * @param facets - * @return - */ - public static Collection datasetsFrom( - String namespace, Collection facets) { - // Check if topic list facet is available -> if so explode the list of facets - Optional topicList = - facets.stream() - .filter(KafkaTopicListFacet.class::isInstance) - .map(KafkaTopicListFacet.class::cast) - .findAny(); - - List datasets = new ArrayList<>(); - - // Explode list of other facets - if (topicList.isPresent()) { - List facetsWithoutTopicList = - facets.stream().filter(f -> !f.equals(topicList)).collect(Collectors.toList()); - - datasets.addAll( - topicList.get().topics.stream() - .map(t -> datasetOf(namespace, t, facetsWithoutTopicList)) - .collect(Collectors.toList())); - } - - // Check if topic pattern is present - // If so topic pattern will be used as a dataset name - datasets.addAll( - facets.stream() - .filter(KafkaTopicPatternFacet.class::isInstance) - .map(KafkaTopicPatternFacet.class::cast) - .map(f -> datasetOf(namespace, f.pattern.toString(), facets)) - .collect(Collectors.toList())); - return datasets; - } - - private static LineageDataset datasetOf( - String namespace, String name, Collection facets) { + public static LineageDataset datasetOf(String namespace, KafkaDatasetFacet kafkaDatasetFacet) { return new LineageDataset() { @Override public String name() { - return name; + if (kafkaDatasetFacet.topicIdentifier.topicPattern != null) { + return kafkaDatasetFacet.topicIdentifier.toString(); + } + + return String.join(",", kafkaDatasetFacet.topicIdentifier.topics); } @Override @@ -112,16 +58,19 @@ public String namespace() { @Override public Map facets() { - return facets.stream() - .distinct() - .collect(Collectors.toMap(LineageDatasetFacet::name, item -> item)); + return Collections.singletonMap( + KafkaDatasetFacet.KAFKA_FACET_NAME, kafkaDatasetFacet); } }; } - public static String datasetNamespaceOf(Properties properties) { + public static String namespaceOf(Properties properties) { String bootstrapServers = properties.getProperty("bootstrap.servers"); + if (bootstrapServers == null) { + return KAFKA_DATASET_PREFIX; + } + if (bootstrapServers.contains(COMMA)) { bootstrapServers = bootstrapServers.split(COMMA)[0]; } else if (bootstrapServers.contains(SEMICOLON)) { diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaDatasetFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaDatasetFacet.java new file mode 100644 index 000000000..21a8a7f49 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaDatasetFacet.java @@ -0,0 +1,97 @@ +package org.apache.flink.connector.kafka.lineage.facets; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.regex.Pattern; + +/** Facet containing all information related to sources and sinks on Kafka. 
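A minimal sketch of how the reworked LineageUtil consumes this consolidated facet; the broker addresses and topic names are invented, and the expected values in the comments follow the datasetOf/namespaceOf code above:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.connector.kafka.lineage.LineageUtil;
import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet;
import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaDatasetIdentifier;
import org.apache.flink.streaming.api.lineage.LineageDataset;

import java.util.Arrays;
import java.util.Properties;

class KafkaLineageSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");

        KafkaDatasetFacet facet =
                new KafkaDatasetFacet(
                        KafkaDatasetIdentifier.of(Arrays.asList("orders", "payments")),
                        props,
                        BasicTypeInfo.STRING_TYPE_INFO);

        LineageDataset dataset = LineageUtil.datasetOf(LineageUtil.namespaceOf(props), facet);
        // dataset.name()      -> "orders,payments"      (fixed topic list joined with a comma)
        // dataset.namespace() -> "kafka://broker1:9092" (first bootstrap server)
        // dataset.facets()    -> {"kafka" -> facet}     (the single consolidated facet)
    }
}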
*/ +public class KafkaDatasetFacet implements LineageDatasetFacet { + + public static final String KAFKA_FACET_NAME = "kafka"; + + public final Properties properties; + public final TypeInformation typeInformation; + public final KafkaDatasetIdentifier topicIdentifier; + + public KafkaDatasetFacet( + KafkaDatasetIdentifier topicIdentifier, + Properties properties, + TypeInformation typeInformation) { + this.topicIdentifier = topicIdentifier; + this.properties = properties; + this.typeInformation = typeInformation; + } + + public void addProperties(Properties properties) { + this.properties.putAll(properties); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + KafkaDatasetFacet that = (KafkaDatasetFacet) o; + return Objects.equals(properties, that.properties) + && Objects.equals(typeInformation, that.typeInformation) + && Objects.equals(topicIdentifier, that.topicIdentifier); + } + + @Override + public int hashCode() { + return Objects.hash(properties, typeInformation, topicIdentifier); + } + + @Override + public String name() { + return KAFKA_FACET_NAME; + } + + /** + * Record class to contain topics' identifier information which can be either a list of topics + * or a topic pattern. + */ + public static class KafkaDatasetIdentifier { + public final List topics; + public final Pattern topicPattern; + + public KafkaDatasetIdentifier(List fixedTopics, Pattern topicPattern) { + this.topics = fixedTopics; + this.topicPattern = topicPattern; + } + + public static KafkaDatasetIdentifier of(Pattern pattern) { + return new KafkaDatasetIdentifier(Collections.emptyList(), pattern); + } + + public static KafkaDatasetIdentifier of(List fixedTopics) { + return new KafkaDatasetIdentifier(fixedTopics, null); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + KafkaDatasetIdentifier that = (KafkaDatasetIdentifier) o; + return Objects.equals(topics, that.topics) + && Objects.equals(topicPattern, that.topicPattern); + } + + @Override + public int hashCode() { + return Objects.hash(topics, topicPattern); + } + } +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaPropertiesFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaPropertiesFacet.java deleted file mode 100644 index 29b9a0687..000000000 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaPropertiesFacet.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.connector.kafka.lineage.facets; - -import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; - -import com.google.common.collect.ImmutableMap; - -import java.util.Objects; -import java.util.Properties; - -/** Facet containing Kafka properties. */ -public class KafkaPropertiesFacet implements LineageDatasetFacet { - - public static final String KAFKA_PROPERTIES_FACET_NAME = "kafkaProperties"; - public Properties properties; - - public KafkaPropertiesFacet(Properties properties) { - this.properties = new Properties(); - this.properties.putAll(ImmutableMap.copyOf(properties)); - } - - @Override - public String name() { - return KAFKA_PROPERTIES_FACET_NAME; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - KafkaPropertiesFacet that = (KafkaPropertiesFacet) o; - return Objects.equals(properties, that.properties); - } - - @Override - public int hashCode() { - return Objects.hash(properties); - } -} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaTopicListFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaTopicListFacet.java deleted file mode 100644 index 1121673e1..000000000 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaTopicListFacet.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.kafka.lineage.facets; - -import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; - -import java.util.List; -import java.util.Objects; - -/** - * Facet containing TypeInformation object. Can be used as an intermediate step for evaluating topic - * involved in data processing. 
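The dedicated KafkaPropertiesFacet deleted above has no one-to-one replacement; producer and consumer properties now live on the consolidated KafkaDatasetFacet and are merged in via addProperties, as KafkaSink.getLineageVertex() does later in this change. A short, hypothetical fragment (imports as in the sketch further up):

Properties producerConfig = new Properties();
producerConfig.put("bootstrap.servers", "broker1:9092");

KafkaDatasetFacet facet =
        new KafkaDatasetFacet(
                KafkaDatasetIdentifier.of(Collections.singletonList("orders")),
                new Properties(),
                BasicTypeInfo.STRING_TYPE_INFO);

// The sink enriches whatever facet the serializer reported with its own producer config.
facet.addProperties(producerConfig);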
- */ -public class KafkaTopicListFacet implements LineageDatasetFacet { - - public static final String TOPIC_LIST_FACET_NAME = "topicList"; - public List topics; - - public KafkaTopicListFacet(List topics) { - this.topics = topics; - } - - @Override - public String name() { - return TOPIC_LIST_FACET_NAME; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - KafkaTopicListFacet that = (KafkaTopicListFacet) o; - return Objects.equals(topics, that.topics); - } - - @Override - public int hashCode() { - return Objects.hash(topics); - } -} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaTopicPatternFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaTopicPatternFacet.java deleted file mode 100644 index c43212500..000000000 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaTopicPatternFacet.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.kafka.lineage.facets; - -import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; - -import java.util.Objects; -import java.util.regex.Pattern; - -/** - * Facet containing topic pattern. Can be used as an intermediate step for evaluating topics - * involved in data processing. - */ -public class KafkaTopicPatternFacet implements LineageDatasetFacet { - - public static final String TOPIC_PATTERN_FACET_NAME = "topicPattern"; - public Pattern pattern; - - public KafkaTopicPatternFacet(Pattern pattern) { - this.pattern = pattern; - } - - @Override - public String name() { - return TOPIC_PATTERN_FACET_NAME; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - KafkaTopicPatternFacet that = (KafkaTopicPatternFacet) o; - return Objects.equals(pattern.pattern(), that.pattern.pattern()); - } - - @Override - public int hashCode() { - return Objects.hash(pattern.pattern()); - } -} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/TypeInformationFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/TypeInformationFacet.java deleted file mode 100644 index 2bb1dd7dc..000000000 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/TypeInformationFacet.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.kafka.lineage.facets; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; - -import java.util.Objects; - -/** - * Facet containing TypeInformation object. Can be used as an intermediate step for evaluating - * schema dataset facet. - */ -public class TypeInformationFacet implements LineageDatasetFacet { - - public static final String TYPE_INFORMATION_FACET_NAME = "typeInformation"; - - private TypeInformation typeInformation; - - public TypeInformationFacet(TypeInformation typeInformation) { - this.typeInformation = typeInformation; - } - - @Override - public String name() { - return TYPE_INFORMATION_FACET_NAME; - } - - public TypeInformation getTypeInformation() { - return typeInformation; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TypeInformationFacet that = (TypeInformationFacet) o; - return Objects.equals(typeInformation, that.typeInformation); - } - - @Override - public int hashCode() { - return Objects.hash(typeInformation); - } -} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java index 29ceb5159..73f122865 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java @@ -21,28 +21,30 @@ import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.connector.kafka.lineage.LineageFacetProvider; -import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicListFacet; -import org.apache.flink.connector.kafka.lineage.facets.TypeInformationFacet; -import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider; +import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet; +import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaDatasetIdentifier; +import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import com.google.common.reflect.Invokable; import com.google.common.reflect.TypeToken; import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.kafka.common.serialization.Serializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.Serializable; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.OptionalInt; +import java.util.Properties; import java.util.function.Function; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -92,6 +94,7 @@ */ @PublicEvolving public class KafkaRecordSerializationSchemaBuilder { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class); @Nullable private Function topicSelector; @Nullable private SerializationSchema valueSerializationSchema; @@ -298,7 +301,7 @@ private void checkKeySerializerNotSet() { } private static class ConstantTopicSelector - implements Function, Serializable, LineageFacetProvider { + implements Function, Serializable, KafkaDatasetIdentifierProvider { private String topic; @@ -312,13 +315,13 @@ public String apply(IN in) { } @Override - public Collection getDatasetFacets() { - return Collections.singletonList( - new KafkaTopicListFacet(Collections.singletonList(topic))); + public Optional getDatasetIdentifier() { + return Optional.of(KafkaDatasetIdentifier.of(Collections.singletonList(topic))); } } - private static class CachingTopicSelector implements Function, Serializable { + private static class CachingTopicSelector + implements Function, KafkaDatasetIdentifierProvider, Serializable { private static final int CACHE_RESET_SIZE = 5; private final Map cache; @@ -338,10 +341,19 @@ public String apply(IN in) { } return topic; } + + @Override + public Optional getDatasetIdentifier() { + if (topicSelector instanceof KafkaDatasetIdentifierProvider) { + return ((KafkaDatasetIdentifierProvider) topicSelector).getDatasetIdentifier(); + } else { + return Optional.empty(); + } + } } private static class KafkaRecordSerializationSchemaWrapper - implements LineageFacetProvider, KafkaRecordSerializationSchema { + implements KafkaDatasetFacetProvider, KafkaRecordSerializationSchema { private final SerializationSchema valueSerializationSchema; private final Function topicSelector; private final KafkaPartitioner partitioner; @@ -406,32 +418,41 @@ public ProducerRecord serialize( } @Override - public List getDatasetFacets() { - List facets = new ArrayList<>(); - if (topicSelector instanceof LineageFacetProvider) { - facets.addAll(((LineageFacetProvider) topicSelector).getDatasetFacets()); + public Optional getKafkaDatasetFacet() { + if (!(topicSelector instanceof KafkaDatasetIdentifierProvider)) { + LOG.warn("Cannot identify topics. 
Not an TopicsIdentifierProvider"); + return Optional.empty(); + } + + Optional topicsIdentifier = + ((KafkaDatasetIdentifierProvider) (topicSelector)).getDatasetIdentifier(); + + if (!topicsIdentifier.isPresent()) { + LOG.warn("No topics' identifiers provided"); + return Optional.empty(); } + TypeInformation typeInformation; if (this.valueSerializationSchema instanceof ResultTypeQueryable) { - facets.add( - new TypeInformationFacet( - ((ResultTypeQueryable) this.valueSerializationSchema) - .getProducedType())); + typeInformation = + ((ResultTypeQueryable) this.valueSerializationSchema).getProducedType(); } else { // gets type information from serialize method signature - Arrays.stream(this.valueSerializationSchema.getClass().getMethods()) - .map(m -> Invokable.from(m)) - .filter(m -> "serialize".equalsIgnoreCase(m.getName())) - .map(m -> m.getParameters().get(0)) - .filter(p -> !p.getType().equals(TypeToken.of(Object.class))) - .findFirst() - .map(p -> p.getType()) - .map(t -> TypeInformation.of(t.getRawType())) - .map(g -> new TypeInformationFacet(g)) - .ifPresent(f -> facets.add(f)); + typeInformation = + Arrays.stream(this.valueSerializationSchema.getClass().getMethods()) + .map(m -> Invokable.from(m)) + .filter(m -> "serialize".equalsIgnoreCase(m.getName())) + .map(m -> m.getParameters().get(0)) + .filter(p -> !p.getType().equals(TypeToken.of(Object.class))) + .findFirst() + .map(p -> p.getType()) + .map(t -> TypeInformation.of(t.getRawType())) + .orElse(null); } - return facets; + return Optional.of( + new KafkaDatasetFacet( + topicsIdentifier.get(), new Properties(), typeInformation)); } } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java index faee558cb..32f440e94 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java @@ -22,18 +22,20 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider; import org.apache.flink.connector.kafka.lineage.LineageUtil; -import org.apache.flink.connector.kafka.lineage.facets.KafkaPropertiesFacet; +import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet; import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; import org.apache.flink.streaming.api.lineage.LineageVertex; import org.apache.flink.streaming.api.lineage.LineageVertexProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.List; +import java.util.Optional; import java.util.Properties; /** @@ -63,7 +65,7 @@ public class KafkaSink implements LineageVertexProvider, TwoPhaseCommittingStatefulSink { - + private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class); private final DeliveryGuarantee deliveryGuarantee; private final KafkaRecordSerializationSchema recordSerializer; @@ -143,13 +145,27 @@ protected Properties getKafkaProducerConfig() { @Override public LineageVertex getLineageVertex() { - List facets = new ArrayList<>(); - - // add all the facets from deserialization schema and 
subscriber - facets.addAll(LineageUtil.facetsFrom(recordSerializer)); - facets.add(new KafkaPropertiesFacet(this.kafkaProducerConfig)); - - String namespace = LineageUtil.datasetNamespaceOf(this.kafkaProducerConfig); - return LineageUtil.lineageVertexOf(LineageUtil.datasetsFrom(namespace, facets)); + // enrich dataset facet with properties + if (recordSerializer instanceof KafkaDatasetFacetProvider) { + Optional kafkaDatasetFacet = + ((KafkaDatasetFacetProvider) recordSerializer).getKafkaDatasetFacet(); + + if (!kafkaDatasetFacet.isPresent()) { + LOG.warn("Provided did not return kafka dataset facet"); + return null; + } + + kafkaDatasetFacet.get().addProperties(this.kafkaProducerConfig); + + String namespace = LineageUtil.namespaceOf(kafkaProducerConfig); + return LineageUtil.sourceLineageVertexOf( + Collections.singleton( + LineageUtil.datasetOf(namespace, kafkaDatasetFacet.get()))); + } else { + LOG.warn( + "recordSerializer does not implement KafkaDatasetFacetProvider: {}", + recordSerializer); + return LineageUtil.sourceLineageVertexOf(Collections.emptyList()); + } } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java index 0a3f84a46..d8717c788 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java @@ -33,9 +33,10 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider; import org.apache.flink.connector.kafka.lineage.LineageUtil; -import org.apache.flink.connector.kafka.lineage.facets.KafkaPropertiesFacet; -import org.apache.flink.connector.kafka.lineage.facets.TypeInformationFacet; +import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet; +import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaDatasetIdentifier; import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState; import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer; import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator; @@ -51,27 +52,25 @@ import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.streaming.api.lineage.SourceLineageVertex; import org.apache.flink.util.UserCodeClassLoader; import org.apache.flink.util.function.SerializableSupplier; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; +import java.util.Collections; import java.util.Optional; import java.util.Properties; import java.util.function.Consumer; import java.util.function.Supplier; -import static org.apache.flink.connector.kafka.lineage.LineageUtil.sourceLineageVertexOf; - /** * The Source 
implementation of Kafka. Please use a {@link KafkaSourceBuilder} to construct a {@link * KafkaSource}. The following example shows how to create a KafkaSource emitting records of @@ -100,6 +99,7 @@ public class KafkaSource implements LineageVertexProvider, Source, ResultTypeQueryable { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class); private static final long serialVersionUID = -8755372893283732098L; // Users can choose only one of the following ways to specify the topics to consume from. private final KafkaSubscriber subscriber; @@ -265,16 +265,24 @@ OffsetsInitializer getStoppingOffsetsInitializer() { @Override public SourceLineageVertex getLineageVertex() { - List facets = new ArrayList<>(); + if (!(subscriber instanceof KafkaDatasetIdentifierProvider)) { + LOG.warn("unable to determine topic identifier"); + return null; + } + + Optional topicsIdentifier = + ((KafkaDatasetIdentifierProvider) subscriber).getDatasetIdentifier(); - // add all the facets from deserialization schema and subscriber - facets.addAll(LineageUtil.facetsFrom(deserializationSchema)); - facets.addAll(LineageUtil.facetsFrom(subscriber)); + if (!topicsIdentifier.isPresent()) { + LOG.warn("No topics' identifier returned from subscribed"); + return null; + } - facets.add(new TypeInformationFacet(getProducedType())); - facets.add(new KafkaPropertiesFacet(props)); + KafkaDatasetFacet kafkaDatasetFacet = + new KafkaDatasetFacet(topicsIdentifier.get(), props, getProducedType()); - String namespace = LineageUtil.datasetNamespaceOf(props); - return sourceLineageVertexOf(LineageUtil.datasetsFrom(namespace, facets)); + String namespace = LineageUtil.namespaceOf(props); + return LineageUtil.sourceLineageVertexOf( + Collections.singletonList(LineageUtil.datasetOf(namespace, kafkaDatasetFacet))); } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java index 6e03a4b68..c3e2f1dca 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java @@ -18,9 +18,8 @@ package org.apache.flink.connector.kafka.source.enumerator.subscriber; -import org.apache.flink.connector.kafka.lineage.LineageFacetProvider; -import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicListFacet; -import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider; +import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaDatasetIdentifier; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.TopicDescription; @@ -28,17 +27,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata; /** A subscriber for a partition set. 
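The rewritten KafkaSource#getLineageVertex() above can be exercised roughly as follows; this is a sketch with an invented broker, topic, and group id, assuming the standard KafkaSourceBuilder options:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.lineage.SourceLineageVertex;

class KafkaSourceLineageSketch {
    public static void main(String[] args) {
        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("broker1:9092")
                        .setTopics("orders")
                        .setGroupId("lineage-demo")
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        SourceLineageVertex vertex = source.getLineageVertex();
        // The topic-list subscriber created by setTopics() implements KafkaDatasetIdentifierProvider,
        // so the vertex carries one dataset named "orders" in namespace "kafka://broker1:9092",
        // whose "kafka" facet holds the consumer properties and the produced TypeInformation.
    }
}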
*/ -class PartitionSetSubscriber implements LineageFacetProvider, KafkaSubscriber { +class PartitionSetSubscriber implements KafkaDatasetIdentifierProvider, KafkaSubscriber { private static final long serialVersionUID = 390970375272146036L; private static final Logger LOG = LoggerFactory.getLogger(PartitionSetSubscriber.class); private final Set subscribedPartitions; @@ -81,9 +79,9 @@ private boolean partitionExistsInTopic(TopicPartition partition, TopicDescriptio } @Override - public List getDatasetFacets() { - return Collections.singletonList( - new KafkaTopicListFacet( + public Optional getDatasetIdentifier() { + return Optional.of( + KafkaDatasetIdentifier.of( subscribedPartitions.stream() .map(TopicPartition::topic) .distinct() diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java index a4f25531f..656279a74 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java @@ -18,9 +18,8 @@ package org.apache.flink.connector.kafka.source.enumerator.subscriber; -import org.apache.flink.connector.kafka.lineage.LineageFacetProvider; -import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicListFacet; -import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider; +import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaDatasetIdentifier; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.TopicDescription; @@ -29,10 +28,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata; @@ -41,7 +40,7 @@ * A subscriber to a fixed list of topics. The subscribed topics must have existed in the Kafka * cluster, otherwise an exception will be thrown. 
*/ -class TopicListSubscriber implements KafkaSubscriber, LineageFacetProvider { +class TopicListSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider { private static final long serialVersionUID = -6917603843104947866L; private static final Logger LOG = LoggerFactory.getLogger(TopicListSubscriber.class); private final List topics; @@ -67,7 +66,7 @@ public Set getSubscribedTopicPartitions(AdminClient adminClient) } @Override - public List getDatasetFacets() { - return Collections.singletonList(new KafkaTopicListFacet(topics)); + public Optional getDatasetIdentifier() { + return Optional.of(KafkaDatasetIdentifier.of(topics)); } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java index ada3067bd..97af2212a 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java @@ -18,9 +18,8 @@ package org.apache.flink.connector.kafka.source.enumerator.subscriber; -import org.apache.flink.connector.kafka.lineage.LineageFacetProvider; -import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicPatternFacet; -import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider; +import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaDatasetIdentifier; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.TopicDescription; @@ -29,17 +28,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.regex.Pattern; import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata; /** A subscriber to a topic pattern. 
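For reference, the two factory methods on KafkaDatasetIdentifier correspond to the two ways of subscribing; a hypothetical fragment (topic names and pattern invented):

// Fixed topic list, as reported by TopicListSubscriber or PartitionSetSubscriber:
KafkaDatasetIdentifier byList = KafkaDatasetIdentifier.of(Arrays.asList("orders", "payments"));
// byList.topics -> ["orders", "payments"], byList.topicPattern -> null

// Topic pattern, as reported by the pattern subscriber below:
KafkaDatasetIdentifier byPattern = KafkaDatasetIdentifier.of(Pattern.compile("orders-.*"));
// byPattern.topics -> [] (empty list), byPattern.topicPattern -> the compiled pattern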
*/ -class TopicPatternSubscriber implements KafkaSubscriber, LineageFacetProvider { +class TopicPatternSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider { private static final long serialVersionUID = -7471048577725467797L; private static final Logger LOG = LoggerFactory.getLogger(TopicPatternSubscriber.class); private final Pattern topicPattern; @@ -68,7 +66,7 @@ public Set getSubscribedTopicPartitions(AdminClient adminClient) } @Override - public List getDatasetFacets() { - return Collections.singletonList(new KafkaTopicPatternFacet(topicPattern)); + public Optional getDatasetIdentifier() { + return Optional.of(KafkaDatasetIdentifier.of(topicPattern)); } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaDeserializationSchemaWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaDeserializationSchemaWrapper.java index 71a567166..1cc7dde79 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaDeserializationSchemaWrapper.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaDeserializationSchemaWrapper.java @@ -20,17 +20,12 @@ import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.connector.kafka.lineage.LineageFacetProvider; -import org.apache.flink.connector.kafka.lineage.facets.TypeInformationFacet; -import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; import org.apache.flink.util.Collector; import org.apache.kafka.clients.consumer.ConsumerRecord; import java.io.IOException; -import java.util.Collections; -import java.util.List; /** * A wrapper class that wraps a {@link @@ -41,8 +36,7 @@ * @deprecated Remove with @{@link KafkaDeserializationSchema} */ @Deprecated -class KafkaDeserializationSchemaWrapper - implements KafkaRecordDeserializationSchema, LineageFacetProvider { +class KafkaDeserializationSchemaWrapper implements KafkaRecordDeserializationSchema { private static final long serialVersionUID = 1L; private final KafkaDeserializationSchema kafkaDeserializationSchema; @@ -70,9 +64,4 @@ public void deserialize(ConsumerRecord message, Collector out public TypeInformation getProducedType() { return kafkaDeserializationSchema.getProducedType(); } - - @Override - public List getDatasetFacets() { - return Collections.singletonList(new TypeInformationFacet(this.getProducedType())); - } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializationSchemaWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializationSchemaWrapper.java index 30b760da1..209f5e15c 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializationSchemaWrapper.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializationSchemaWrapper.java @@ -20,16 +20,11 @@ import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.connector.kafka.lineage.LineageFacetProvider; 
-import org.apache.flink.connector.kafka.lineage.facets.TypeInformationFacet; -import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; import org.apache.flink.util.Collector; import org.apache.kafka.clients.consumer.ConsumerRecord; import java.io.IOException; -import java.util.Collections; -import java.util.List; /** * A class that wraps a {@link DeserializationSchema} as the value deserializer for a {@link @@ -37,8 +32,7 @@ * * @param the return type of the deserialization. */ -class KafkaValueOnlyDeserializationSchemaWrapper - implements KafkaRecordDeserializationSchema, LineageFacetProvider { +class KafkaValueOnlyDeserializationSchemaWrapper implements KafkaRecordDeserializationSchema { private static final long serialVersionUID = 1L; private final DeserializationSchema deserializationSchema; @@ -61,9 +55,4 @@ public void deserialize(ConsumerRecord message, Collector out public TypeInformation getProducedType() { return deserializationSchema.getProducedType(); } - - @Override - public List getDatasetFacets() { - return Collections.singletonList(new TypeInformationFacet(this.getProducedType())); - } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java index 3ab93f3df..8c8095b6b 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java @@ -21,9 +21,6 @@ import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.connector.kafka.lineage.LineageFacetProvider; -import org.apache.flink.connector.kafka.lineage.facets.TypeInformationFacet; -import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; import org.apache.flink.util.Collector; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.TemporaryClassLoaderContext; @@ -35,13 +32,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Collections; -import java.util.List; import java.util.Map; /** A package private class to wrap {@link Deserializer}. 
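These deserialization wrappers no longer carry a TypeInformationFacet; the produced type reaches the lineage facet through the source's getProducedType() instead. A small, hypothetical fragment:

KafkaRecordDeserializationSchema<String> schema =
        KafkaRecordDeserializationSchema.valueOnly(new SimpleStringSchema());

TypeInformation<String> produced = schema.getProducedType(); // BasicTypeInfo.STRING_TYPE_INFO
// KafkaSource#getLineageVertex() feeds this value into the KafkaDatasetFacet it emits.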
*/ -class KafkaValueOnlyDeserializerWrapper - implements KafkaRecordDeserializationSchema, LineageFacetProvider { +class KafkaValueOnlyDeserializerWrapper implements KafkaRecordDeserializationSchema { private static final long serialVersionUID = 5409547407386004054L; @@ -109,9 +103,4 @@ public void deserialize(ConsumerRecord record, Collector coll public TypeInformation getProducedType() { return TypeExtractor.createTypeInfo(Deserializer.class, deserializerClass, 0, null, null); } - - @Override - public List getDatasetFacets() { - return Collections.singletonList(new TypeInformationFacet(this.getProducedType())); - } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java index f0005a097..b754b4d09 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java @@ -20,17 +20,11 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.connector.kafka.lineage.LineageFacetProvider; -import org.apache.flink.connector.kafka.lineage.facets.TypeInformationFacet; -import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; import org.apache.flink.util.Collector; import org.apache.kafka.clients.consumer.ConsumerRecord; -import java.util.Collections; -import java.util.List; - /** * A simple wrapper for using the DeserializationSchema with the KafkaDeserializationSchema * interface. 
@@ -39,8 +33,7 @@ */ @Internal @Deprecated -public class KafkaDeserializationSchemaWrapper - implements KafkaDeserializationSchema, LineageFacetProvider { +public class KafkaDeserializationSchemaWrapper implements KafkaDeserializationSchema { private static final long serialVersionUID = 2651665280744549932L; @@ -75,9 +68,4 @@ public boolean isEndOfStream(T nextElement) { public TypeInformation getProducedType() { return deserializationSchema.getProducedType(); } - - @Override - public List getDatasetFacets() { - return Collections.singletonList(new TypeInformationFacet(this.getProducedType())); - } } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/lineage/LineageUtilTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/lineage/LineageUtilTest.java index 43a3083e3..64868a163 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/lineage/LineageUtilTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/lineage/LineageUtilTest.java @@ -1,129 +1,27 @@ package org.apache.flink.connector.kafka.lineage; import org.apache.flink.api.connector.source.Boundedness; -import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicListFacet; -import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicPatternFacet; import org.apache.flink.streaming.api.lineage.LineageDataset; -import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; import org.apache.flink.streaming.api.lineage.SourceLineageVertex; import org.junit.jupiter.api.Test; import org.mockito.Mockito; -import java.util.Arrays; -import java.util.Collection; import java.util.Collections; -import java.util.List; -import java.util.Optional; import java.util.Properties; -import java.util.regex.Pattern; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; /** Tests for {@link LineageUtil}. 
*/ public class LineageUtilTest { - - @Test - public void testFromFacetsForNonDatasetFacetProvider() { - assertThat(LineageUtil.facetsFrom(new Object())).isEmpty(); - } - - @Test - public void testFromFacetsWhenNoFacetsReturned() { - LineageFacetProvider facetProvider = mock(LineageFacetProvider.class); - when(facetProvider.getDatasetFacets()).thenReturn(Collections.emptyList()); - - assertThat(LineageUtil.facetsFrom(facetProvider)).isEmpty(); - } - - @Test - public void testFromFacets() { - LineageDatasetFacet facet1 = mock(LineageDatasetFacet.class); - LineageDatasetFacet facet2 = mock(LineageDatasetFacet.class); - - LineageFacetProvider facetProvider = mock(LineageFacetProvider.class); - when(facetProvider.getDatasetFacets()).thenReturn(Arrays.asList(facet1, facet2)); - - assertThat(LineageUtil.facetsFrom(facetProvider)).containsExactly(facet1, facet2); - } - - @Test - public void testDatasetsFromWithTopicList() { - LineageDatasetFacet facet1 = new KafkaTopicListFacet(Arrays.asList("topic1", "topic2")); - LineageDatasetFacet facet2 = mock(LineageDatasetFacet.class); - LineageDatasetFacet facet3 = mock(LineageDatasetFacet.class); - - when(facet2.name()).thenReturn("facetName2"); - when(facet3.name()).thenReturn("facetName3"); - String namespace = "kafka://host"; - - List facets = Arrays.asList(facet1, facet2, facet3); - - Collection datasets = LineageUtil.datasetsFrom(namespace, facets); - - assertThat(datasets).hasSize(2); - - Optional topic1 = - datasets.stream().filter(e -> "topic1".equals(e.name())).findAny(); - assertThat(topic1).isPresent(); - assertThat(topic1.get().namespace()).isEqualTo(namespace); - assertThat(topic1.get().facets().get("facetName2")).isEqualTo(facet2); - assertThat(topic1.get().facets().get("facetName3")).isEqualTo(facet3); - - Optional topic2 = - datasets.stream().filter(e -> "topic2".equals(e.name())).findAny(); - assertThat(topic2).isPresent(); - assertThat(topic2.get().name()).isEqualTo("topic2"); - assertThat(topic2.get().namespace()).isEqualTo(namespace); - assertThat(topic2.get().facets().get("facetName2")).isEqualTo(facet2); - assertThat(topic2.get().facets().get("facetName3")).isEqualTo(facet3); - } - - @Test - public void testDatasetsFromWithTopicPattern() { - Pattern pattern = Pattern.compile("some-pattern"); - - LineageDatasetFacet facet1 = new KafkaTopicPatternFacet(pattern); - LineageDatasetFacet facet2 = mock(LineageDatasetFacet.class); - LineageDatasetFacet facet3 = mock(LineageDatasetFacet.class); - - when(facet2.name()).thenReturn("facetName2"); - when(facet3.name()).thenReturn("facetName3"); - String namespace = "kafka://host"; - - List facets = Arrays.asList(facet1, facet2, facet3); - - Collection datasets = LineageUtil.datasetsFrom(namespace, facets); - assertThat(datasets).hasSize(1); - - LineageDataset dataset = datasets.iterator().next(); - - assertThat(dataset.name()).isEqualTo("some-pattern"); - assertThat(dataset.namespace()).isEqualTo(namespace); - assertThat(dataset.facets().get("facetName2")).isEqualTo(facet2); - assertThat(dataset.facets().get("facetName3")).isEqualTo(facet3); - } - - @Test - public void testDatasetsWithNoTopicListNorPattern() { - LineageDatasetFacet facet2 = mock(LineageDatasetFacet.class); - LineageDatasetFacet facet3 = mock(LineageDatasetFacet.class); - - List facets = Arrays.asList(facet2, facet3); - - assertThat(LineageUtil.datasetsFrom("some-namespace", facets)).isEmpty(); - } - @Test public void testSourceLineageVertexOf() { - List datasets = Arrays.asList(Mockito.mock(LineageDataset.class)); - - 
SourceLineageVertex sourceLineageVertex = LineageUtil.sourceLineageVertexOf(datasets); + LineageDataset dataset = Mockito.mock(LineageDataset.class); + SourceLineageVertex sourceLineageVertex = + LineageUtil.sourceLineageVertexOf(Collections.singletonList(dataset)); assertThat(sourceLineageVertex.boundedness()).isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED); - assertThat(sourceLineageVertex.datasets()).isEqualTo(datasets); + assertThat(sourceLineageVertex.datasets()).containsExactly(dataset); } @Test @@ -131,7 +29,7 @@ public void testDatasetNamespaceOf() { Properties properties = new Properties(); properties.put("bootstrap.servers", "my-kafka-host"); - assertThat(LineageUtil.datasetNamespaceOf(properties)).isEqualTo("kafka://my-kafka-host"); + assertThat(LineageUtil.namespaceOf(properties)).isEqualTo("kafka://my-kafka-host"); } @Test @@ -139,7 +37,7 @@ public void testDatasetNamespaceOfWithSemicolon() { Properties properties = new Properties(); properties.put("bootstrap.servers", "my-kafka-host1;my-kafka-host2"); - assertThat(LineageUtil.datasetNamespaceOf(properties)).isEqualTo("kafka://my-kafka-host1"); + assertThat(LineageUtil.namespaceOf(properties)).isEqualTo("kafka://my-kafka-host1"); } @Test @@ -147,6 +45,12 @@ public void testDatasetNamespaceOfWithComma() { Properties properties = new Properties(); properties.put("bootstrap.servers", "my-kafka-host1,my-kafka-host2"); - assertThat(LineageUtil.datasetNamespaceOf(properties)).isEqualTo("kafka://my-kafka-host1"); + assertThat(LineageUtil.namespaceOf(properties)).isEqualTo("kafka://my-kafka-host1"); + } + + @Test + public void testDatasetNamespaceWhenNoBootstrapServersProperty() { + Properties properties = new Properties(); + assertThat(LineageUtil.namespaceOf(properties)).isEqualTo("kafka://"); } } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java index 1326009e7..523c4222d 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java @@ -20,11 +20,13 @@ import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.connector.kafka.lineage.LineageFacetProvider; -import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicListFacet; -import org.apache.flink.connector.kafka.lineage.facets.TypeInformationFacet; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider; +import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider; +import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet; +import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaDatasetIdentifier; import org.apache.flink.connector.testutils.formats.DummyInitializationContext; -import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.util.TestLogger; @@ -41,7 +43,6 @@ import 
java.nio.charset.StandardCharsets; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -52,6 +53,9 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; /** Tests for {@link KafkaRecordSerializationSchemaBuilder}. */ public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger { @@ -263,36 +267,99 @@ public void testSerializeRecordWithTimestamp() { assertThat(recordWithInvalidTimestamp.timestamp()).isNull(); } + @Test + public void testGetLineageDatasetFacetsWhenTopicSelectorNotKafkaTopicsIdentifierProvider() { + SerializationSchema serializationSchema = new SimpleStringSchema(); + KafkaRecordSerializationSchema schema = + KafkaRecordSerializationSchema.builder() + .setTopicSelector(mock(TopicSelector.class)) + .setValueSerializationSchema(serializationSchema) + .setKeySerializationSchema(serializationSchema) + .build(); + + assertThat(((KafkaDatasetFacetProvider) schema).getKafkaDatasetFacet()).isEmpty(); + } + + @Test + public void testGetLineageDatasetFacetsWhenNoTopicsIdentifiersFound() { + TopicSelector topicSelector = + mock( + TopicSelector.class, + withSettings().extraInterfaces(KafkaDatasetIdentifierProvider.class)); + when(((KafkaDatasetIdentifierProvider) topicSelector).getDatasetIdentifier()) + .thenReturn(Optional.empty()); + SerializationSchema serializationSchema = new SimpleStringSchema(); + KafkaRecordSerializationSchema schema = + KafkaRecordSerializationSchema.builder() + .setTopicSelector(topicSelector) + .setValueSerializationSchema(serializationSchema) + .setKeySerializationSchema(serializationSchema) + .build(); + + assertThat(((KafkaDatasetFacetProvider) schema).getKafkaDatasetFacet()).isEmpty(); + } + + @Test + public void testGetLineageDatasetFacetsValueSerializationSchemaIsResultTypeQueryable() { + TopicSelector topicSelector = + mock( + TopicSelector.class, + withSettings().extraInterfaces(KafkaDatasetIdentifierProvider.class)); + when(((KafkaDatasetIdentifierProvider) topicSelector).getDatasetIdentifier()) + .thenReturn( + Optional.of(KafkaDatasetIdentifier.of(Arrays.asList("topic1", "topic2")))); + + SerializationSchema serializationSchema = + mock( + SerializationSchema.class, + withSettings().extraInterfaces(ResultTypeQueryable.class)); + + TypeInformation typeInformation = mock(TypeInformation.class); + when(((ResultTypeQueryable) serializationSchema).getProducedType()) + .thenReturn(typeInformation); + + KafkaRecordSerializationSchema schema = + KafkaRecordSerializationSchema.builder() + .setTopicSelector(topicSelector) + .setValueSerializationSchema(serializationSchema) + .setKeySerializationSchema(serializationSchema) + .build(); + + Optional kafkaDatasetFacet = + ((KafkaDatasetFacetProvider) schema).getKafkaDatasetFacet(); + + assertThat(kafkaDatasetFacet).isPresent(); + assertThat(kafkaDatasetFacet.get().topicIdentifier.topics) + .containsExactly("topic1", "topic2"); + assertThat(kafkaDatasetFacet.get().typeInformation).isEqualTo(typeInformation); + } + @Test public void testGetLineageDatasetFacets() { + TopicSelector topicSelector = + mock( + TopicSelector.class, + withSettings().extraInterfaces(KafkaDatasetIdentifierProvider.class)); + when(((KafkaDatasetIdentifierProvider) topicSelector).getDatasetIdentifier()) + .thenReturn( + 
+                        Optional.of(KafkaDatasetIdentifier.of(Arrays.asList("topic1", "topic2"))));
+
         final SerializationSchema serializationSchema = new SimpleStringSchema();
-        final KafkaRecordSerializationSchema schema =
+
+        KafkaRecordSerializationSchema schema =
                 KafkaRecordSerializationSchema.builder()
-                        .setTopic(DEFAULT_TOPIC)
+                        .setTopicSelector(topicSelector)
                         .setValueSerializationSchema(serializationSchema)
                         .setKeySerializationSchema(serializationSchema)
                         .build();
 
-        Collection facets = ((LineageFacetProvider) schema).getDatasetFacets();
-
-        assertThat(facets).hasSize(2);
-
-        Optional kafkaTopicListFacet =
-                facets.stream()
-                        .filter(f -> f instanceof KafkaTopicListFacet)
-                        .map(f -> (KafkaTopicListFacet) f)
-                        .findAny();
-        assertThat(kafkaTopicListFacet).isPresent();
-        assertThat(kafkaTopicListFacet.get())
-                .hasFieldOrPropertyWithValue("topics", Arrays.asList(DEFAULT_TOPIC));
-
-        Optional typeInformationFacet =
-                facets.stream()
-                        .filter(f -> f instanceof TypeInformationFacet)
-                        .map(f -> (TypeInformationFacet) f)
-                        .findAny();
-        assertThat(typeInformationFacet).isPresent();
-        assertThat(typeInformationFacet.get().getTypeInformation())
+        Optional kafkaDatasetFacet =
+                ((KafkaDatasetFacetProvider) schema).getKafkaDatasetFacet();
+
+        assertThat(kafkaDatasetFacet).isPresent();
+        assertThat(kafkaDatasetFacet.get().topicIdentifier.topics)
+                .containsExactly("topic1", "topic2");
+        assertThat(kafkaDatasetFacet.get().typeInformation)
                 .isEqualTo(BasicTypeInfo.STRING_TYPE_INFO);
     }
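The builder tests above use Mockito's extraInterfaces to simulate a TopicSelector that also implements KafkaDatasetIdentifierProvider. A hand-written equivalent might look like the sketch below; FixedTopicSelector is a hypothetical name, and the single-argument apply signature of TopicSelector is an assumption, not something shown in this patch.

    import java.util.Collections;
    import java.util.Optional;

    import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
    import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaDatasetIdentifier;
    import org.apache.flink.connector.kafka.sink.TopicSelector;

    // Sketch: a selector that always routes records to one topic and exposes it for lineage.
    class FixedTopicSelector<IN> implements TopicSelector<IN>, KafkaDatasetIdentifierProvider {

        private final String topic;

        FixedTopicSelector(String topic) {
            this.topic = topic;
        }

        @Override
        public String apply(IN element) {
            return topic; // every record goes to the same topic
        }

        @Override
        public Optional<KafkaDatasetIdentifier> getDatasetIdentifier() {
            return Optional.of(KafkaDatasetIdentifier.of(Collections.singletonList(topic)));
        }
    }

With such a selector passed to setTopicSelector, the schema produced by the builder can surface a KafkaDatasetFacet, which is what the assertions above verify through mocks.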
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java
index 998046e49..30eda7e22 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java
@@ -1,14 +1,17 @@
 package org.apache.flink.connector.kafka.sink;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.connector.base.DeliveryGuarantee;
-import org.apache.flink.connector.kafka.lineage.LineageFacetProvider;
-import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicListFacet;
-import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
+import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaDatasetIdentifier;
 import org.apache.flink.streaming.api.lineage.LineageVertex;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
 import java.util.Properties;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -19,41 +22,77 @@
 /** Tests for {@link KafkaSink}. */
 public class KafkaSinkTest {
 
+    Properties kafkaProperties;
+
+    @BeforeEach
+    void setup() {
+        kafkaProperties = new Properties();
+        kafkaProperties.put("bootstrap.servers", "host1;host2");
+    }
+
+    @Test
+    public void testGetLineageVertexWhenSerializerNotAnKafkaDatasetFacetProvider() {
+        KafkaRecordSerializationSchema recordSerializer =
+                mock(KafkaRecordSerializationSchema.class);
+        KafkaSink sink =
+                new KafkaSink(
+                        DeliveryGuarantee.EXACTLY_ONCE,
+                        mock(Properties.class),
+                        "",
+                        recordSerializer);
+
+        assertThat(sink.getLineageVertex().datasets()).isEmpty();
+    }
+
+    @Test
+    public void testGetLineageVertexWhenNoKafkaDatasetFacetReturnedFromSerializer() {
+        KafkaRecordSerializationSchema recordSerializer =
+                mock(
+                        KafkaRecordSerializationSchema.class,
+                        withSettings().extraInterfaces(KafkaDatasetFacetProvider.class));
+        when(((KafkaDatasetFacetProvider) recordSerializer).getKafkaDatasetFacet())
+                .thenReturn(Optional.empty());
+
+        KafkaSink sink =
+                new KafkaSink(
+                        DeliveryGuarantee.EXACTLY_ONCE,
+                        mock(Properties.class),
+                        "",
+                        recordSerializer);
+
+        assertThat(sink.getLineageVertex()).isNull();
+    }
+
     @Test
     public void testGetLineageVertex() {
-        LineageDatasetFacet facet1 = mock(LineageDatasetFacet.class);
-        LineageDatasetFacet facet2 = mock(LineageDatasetFacet.class);
-        when(facet1.name()).thenReturn("facet1");
-        when(facet2.name()).thenReturn("facet2");
-        LineageDatasetFacet topicSelector =
-                new KafkaTopicListFacet(Arrays.asList("topic1", "topic2"));
-
-        KafkaRecordSerializationSchema schema =
+        KafkaRecordSerializationSchema recordSerializer =
                 mock(
                         KafkaRecordSerializationSchema.class,
-                        withSettings().extraInterfaces(LineageFacetProvider.class));
+                        withSettings().extraInterfaces(KafkaDatasetFacetProvider.class));
 
-        when(((LineageFacetProvider) schema).getDatasetFacets())
-                .thenReturn(Arrays.asList(facet1, facet2, topicSelector));
-        Properties kafkaProperties = new Properties();
-        kafkaProperties.put("bootstrap.servers", "host1;host2");
-        KafkaSink sink = new KafkaSink(DeliveryGuarantee.EXACTLY_ONCE, kafkaProperties, "", schema);
+        when(((KafkaDatasetFacetProvider) recordSerializer).getKafkaDatasetFacet())
+                .thenReturn(
+                        Optional.of(
+                                new KafkaDatasetFacet(
+                                        KafkaDatasetIdentifier.of(
+                                                Collections.singletonList("topic1")),
+                                        new Properties(),
+                                        TypeInformation.of(String.class))));
+
+        KafkaSink sink =
+                new KafkaSink(
+                        DeliveryGuarantee.EXACTLY_ONCE, kafkaProperties, "", recordSerializer);
 
         LineageVertex lineageVertex = sink.getLineageVertex();
 
-        assertThat(lineageVertex.datasets()).hasSize(2);
 
         assertThat(lineageVertex.datasets().get(0).namespace()).isEqualTo("kafka://host1");
         assertThat(lineageVertex.datasets().get(0).name()).isEqualTo("topic1");
 
-        assertThat(lineageVertex.datasets().get(1).namespace()).isEqualTo("kafka://host1");
-        assertThat(lineageVertex.datasets().get(1).name()).isEqualTo("topic2");
-
-        // facets shall be the same for both datasets
-        assertThat(lineageVertex.datasets().get(0).facets())
-                .isEqualTo(lineageVertex.datasets().get(1).facets());
-
-        assertThat(lineageVertex.datasets().get(0).facets())
-                .containsEntry("facet1", facet1)
-                .containsEntry("facet2", facet2);
+        assertThat(lineageVertex.datasets().get(0).facets().get(KafkaDatasetFacet.KAFKA_FACET_NAME))
+                .hasFieldOrPropertyWithValue("properties", kafkaProperties)
+                .hasFieldOrPropertyWithValue("typeInformation", TypeInformation.of(String.class))
+                .hasFieldOrPropertyWithValue(
+                        "topicIdentifier",
+                        KafkaDatasetIdentifier.of(Collections.singletonList("topic1")));
     }
 }
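KafkaSinkTest only obtains a populated lineage vertex when the record serializer itself implements KafkaDatasetFacetProvider. Outside of mocks, a delegating wrapper along the following lines could play that role; LineageAwareSerializer is an illustrative name, and the serialize signature is assumed to match KafkaRecordSerializationSchema rather than being taken from this patch.

    import java.util.Optional;

    import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
    import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;

    import org.apache.kafka.clients.producer.ProducerRecord;

    // Sketch: wraps an existing serializer and attaches a pre-built KafkaDatasetFacet for lineage.
    class LineageAwareSerializer<T>
            implements KafkaRecordSerializationSchema<T>, KafkaDatasetFacetProvider {

        private final KafkaRecordSerializationSchema<T> delegate;
        private final KafkaDatasetFacet facet;

        LineageAwareSerializer(KafkaRecordSerializationSchema<T> delegate, KafkaDatasetFacet facet) {
            this.delegate = delegate;
            this.facet = facet;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(
                T element, KafkaSinkContext context, Long timestamp) {
            // Serialization itself is unchanged; only lineage reporting is added.
            return delegate.serialize(element, context, timestamp);
        }

        @Override
        public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
            return Optional.of(facet);
        }
    }

The facet itself can be built the same way the test does: new KafkaDatasetFacet(KafkaDatasetIdentifier.of(...), properties, typeInformation).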
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java
index c138d2fb6..bcae1418a 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java
@@ -17,17 +17,24 @@
 package org.apache.flink.connector.kafka.source;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.connector.kafka.lineage.LineageFacetProvider;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
+import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
-import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
 import org.apache.flink.streaming.api.lineage.LineageVertex;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
 import java.util.Properties;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -37,27 +44,72 @@
 /** Tests for {@link KafkaSource}. */
 public class KafkaSourceTest {
 
+    Properties kafkaProperties;
+
+    @BeforeEach
+    void setup() {
+        kafkaProperties = new Properties();
+        kafkaProperties.put("bootstrap.servers", "host1;host2");
+    }
 
     @Test
-    public void testGetLineageVertex() {
-        LineageDatasetFacet facet1 = mock(LineageDatasetFacet.class);
-        LineageDatasetFacet facet2 = mock(LineageDatasetFacet.class);
-        when(facet1.name()).thenReturn("facet1");
-        when(facet2.name()).thenReturn("facet2");
+    public void testGetLineageVertexWhenSubscriberNotAnKafkaDatasetFacetProvider() {
+        KafkaSource source =
+                new KafkaSource(
+                        mock(KafkaSubscriber.class),
+                        mock(OffsetsInitializer.class),
+                        null,
+                        Boundedness.CONTINUOUS_UNBOUNDED,
+                        mock(KafkaRecordDeserializationSchema.class),
+                        kafkaProperties,
+                        null);
+        assertThat(source.getLineageVertex()).isNull();
+    }
+
+    @Test
+    public void testGetLineageVertexWhenNoKafkaTopicsIdentifier() {
+        KafkaSubscriber subscriber =
+                mock(
+                        KafkaSubscriber.class,
+                        withSettings().extraInterfaces(KafkaDatasetIdentifierProvider.class));
+        when(((KafkaDatasetIdentifierProvider) subscriber).getDatasetIdentifier())
+                .thenReturn(Optional.empty());
 
-        KafkaRecordDeserializationSchema schema =
+        KafkaSource source =
+                new KafkaSource(
+                        subscriber,
+                        mock(OffsetsInitializer.class),
+                        null,
+                        Boundedness.CONTINUOUS_UNBOUNDED,
+                        mock(KafkaRecordDeserializationSchema.class),
+                        kafkaProperties,
+                        null);
+        assertThat(source.getLineageVertex()).isNull();
+    }
+
+    @Test
+    public void testGetLineageVertex() {
+        KafkaRecordSerializationSchema recordSerializer =
                 mock(
-                        KafkaRecordDeserializationSchema.class,
-                        withSettings().extraInterfaces(LineageFacetProvider.class));
+                        KafkaRecordSerializationSchema.class,
+                        withSettings().extraInterfaces(KafkaDatasetFacetProvider.class));
 
-        when(((LineageFacetProvider) schema).getDatasetFacets())
-                .thenReturn(Arrays.asList(facet1, facet2));
-        Properties kafkaProperties = new Properties();
+        when(((KafkaDatasetFacetProvider) recordSerializer).getKafkaDatasetFacet())
+                .thenReturn(
+                        Optional.of(
+                                new KafkaDatasetFacet(
+                                        KafkaDatasetIdentifier.of(
+                                                Collections.singletonList("topic1")),
+                                        new Properties(),
+                                        TypeInformation.of(String.class))));
+
+        TypeInformation typeInformation = mock(TypeInformation.class);
+        KafkaRecordDeserializationSchema schema = mock(KafkaRecordDeserializationSchema.class);
+        when(schema.getProducedType()).thenReturn(typeInformation);
 
-        kafkaProperties.put("bootstrap.servers", "host1;host2");
         KafkaSource source =
                 new KafkaSource(
-                        KafkaSubscriber.getTopicListSubscriber(Arrays.asList("topic1", "topic2")),
+                        KafkaSubscriber.getTopicListSubscriber(Arrays.asList("topic1")),
                         mock(OffsetsInitializer.class),
                         null,
                         Boundedness.CONTINUOUS_UNBOUNDED,
@@ -66,20 +118,16 @@ public void testGetLineageVertex() {
                         null);
 
         LineageVertex lineageVertex = source.getLineageVertex();
-        assertThat(lineageVertex.datasets()).hasSize(2);
+        assertThat(lineageVertex.datasets()).hasSize(1);
 
         assertThat(lineageVertex.datasets().get(0).namespace()).isEqualTo("kafka://host1");
         assertThat(lineageVertex.datasets().get(0).name()).isEqualTo("topic1");
 
-        assertThat(lineageVertex.datasets().get(1).namespace()).isEqualTo("kafka://host1");
-        assertThat(lineageVertex.datasets().get(1).name()).isEqualTo("topic2");
-
-        // facets shall be the same for both datasets
-        assertThat(lineageVertex.datasets().get(0).facets())
-                .isEqualTo(lineageVertex.datasets().get(1).facets());
-
-        assertThat(lineageVertex.datasets().get(0).facets())
-                .containsEntry("facet1", facet1)
-                .containsEntry("facet2", facet2);
+        assertThat(lineageVertex.datasets().get(0).facets().get(KafkaDatasetFacet.KAFKA_FACET_NAME))
+                .hasFieldOrPropertyWithValue("properties", kafkaProperties)
+                .hasFieldOrPropertyWithValue("typeInformation", typeInformation)
+                .hasFieldOrPropertyWithValue(
+                        "topicIdentifier",
+                        KafkaDatasetIdentifier.of(Collections.singletonList("topic1")));
     }
 }
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
index 435570de8..7b8293226 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java
@@ -18,9 +18,8 @@
 package org.apache.flink.connector.kafka.source.enumerator.subscriber;
 
-import org.apache.flink.connector.kafka.lineage.LineageFacetProvider;
-import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicListFacet;
-import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicPatternFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
+import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaDatasetIdentifier;
 import org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv;
 
 import org.apache.kafka.clients.admin.AdminClient;
@@ -74,8 +73,8 @@ public void testTopicListSubscriber() {
                 new HashSet<>(KafkaSourceTestEnv.getPartitionsForTopics(topics));
 
         assertThat(subscribedPartitions).isEqualTo(expectedSubscribedPartitions);
-        assertThat(((LineageFacetProvider) subscriber).getDatasetFacets())
-                .containsExactly(new KafkaTopicListFacet(topics));
+        assertThat(((KafkaDatasetIdentifierProvider) subscriber).getDatasetIdentifier().get())
+                .isEqualTo(KafkaDatasetIdentifier.of(topics));
     }
 
     @Test
@@ -91,8 +90,8 @@ public void testNonExistingTopic() {
 
     @Test
     public void testTopicPatternSubscriber() {
-        KafkaSubscriber subscriber =
-                KafkaSubscriber.getTopicPatternSubscriber(Pattern.compile("pattern.*"));
+        Pattern pattern = Pattern.compile("pattern.*");
+        KafkaSubscriber subscriber = KafkaSubscriber.getTopicPatternSubscriber(pattern);
 
         final Set subscribedPartitions =
                 subscriber.getSubscribedTopicPartitions(adminClient);
@@ -101,8 +100,8 @@ public void testTopicPatternSubscriber() {
                         KafkaSourceTestEnv.getPartitionsForTopics(Collections.singleton(TOPIC2)));
 
         assertThat(subscribedPartitions).isEqualTo(expectedSubscribedPartitions);
-        assertThat(((LineageFacetProvider) subscriber).getDatasetFacets())
-                .containsExactly(new KafkaTopicPatternFacet(Pattern.compile("pattern.*")));
+        assertThat(((KafkaDatasetIdentifierProvider) subscriber).getDatasetIdentifier().get())
+                .isEqualTo(KafkaDatasetIdentifier.of(pattern));
     }
 
     @Test
@@ -118,8 +117,8 @@ public void testPartitionSetSubscriber() {
                 subscriber.getSubscribedTopicPartitions(adminClient);
 
         assertThat(subscribedPartitions).isEqualTo(partitions);
-        assertThat(((LineageFacetProvider) subscriber).getDatasetFacets())
-                .containsExactly(new KafkaTopicListFacet(topics));
+        assertThat(((KafkaDatasetIdentifierProvider) subscriber).getDatasetIdentifier().get())
+                .isEqualTo(KafkaDatasetIdentifier.of(topics));
     }
 
     @Test
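As KafkaSubscriberTest shows, the built-in subscribers now double as KafkaDatasetIdentifierProvider, exposing either a fixed topic list or a topic pattern. Reading the identifier back, for example from lineage-reporting code, could look like the small usage sketch below; SubscriberLineageSketch is an illustrative name and not part of this change.

    import java.util.Arrays;
    import java.util.Optional;
    import java.util.regex.Pattern;

    import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
    import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaDatasetIdentifier;
    import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;

    class SubscriberLineageSketch {
        static void printIdentifiers() {
            KafkaSubscriber byList =
                    KafkaSubscriber.getTopicListSubscriber(Arrays.asList("topic1", "topic2"));
            KafkaSubscriber byPattern =
                    KafkaSubscriber.getTopicPatternSubscriber(Pattern.compile("pattern.*"));

            // Both factory-created subscribers implement KafkaDatasetIdentifierProvider.
            Optional<KafkaDatasetIdentifier> fromList =
                    ((KafkaDatasetIdentifierProvider) byList).getDatasetIdentifier();
            Optional<KafkaDatasetIdentifier> fromPattern =
                    ((KafkaDatasetIdentifierProvider) byPattern).getDatasetIdentifier();

            System.out.println("list identifier present: " + fromList.isPresent());
            System.out.println("pattern identifier present: " + fromPattern.isPresent());
        }
    }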
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
index 9bb7eb0c2..b0ca63161 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
@@ -18,10 +18,6 @@
 package org.apache.flink.connector.kafka.source.reader.deserializer;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.connector.kafka.lineage.LineageFacetProvider;
-import org.apache.flink.connector.kafka.lineage.facets.TypeInformationFacet;
 import org.apache.flink.connector.kafka.util.JacksonMapperFactory;
 import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
 import org.apache.flink.connector.testutils.source.deserialization.TestingDeserializationContext;
@@ -80,8 +76,6 @@ public void testKafkaDeserializationSchemaWrapper() throws Exception {
         assertThat(deserializedValue.get("metadata").get("topic").asText()).isEqualTo("topic#1");
         assertThat(deserializedValue.get("metadata").get("offset").asInt()).isEqualTo(4);
         assertThat(deserializedValue.get("metadata").get("partition").asInt()).isEqualTo(3);
-        assertThat(((LineageFacetProvider) schema).getDatasetFacets())
-                .containsExactly(new TypeInformationFacet(TypeInformation.of(ObjectNode.class)));
     }
 
     @Test
@@ -108,12 +102,6 @@ public void testKafkaValueDeserializationSchemaWrapper() throws Exception {
         assertThat(deserializedValue.get("word").asText()).isEqualTo("world");
         assertThat(deserializedValue.get("key")).isNull();
         assertThat(deserializedValue.get("metadata")).isNull();
-        assertThat(((LineageFacetProvider) schema).getDatasetFacets())
-                .containsExactly(
-                        new TypeInformationFacet(
-                                TypeInformation.of(
-                                        org.apache.flink.shaded.jackson2.com.fasterxml.jackson
-                                                .databind.node.ObjectNode.class)));
     }
 
     @Test
@@ -131,8 +119,6 @@ public void testKafkaValueDeserializerWrapper() throws Exception {
         assertThat(collector.list).hasSize(1);
         assertThat(collector.list.get(0)).isEqualTo("world");
-        assertThat(((LineageFacetProvider) schema).getDatasetFacets())
-                .containsExactly(new TypeInformationFacet(BasicTypeInfo.STRING_TYPE_INFO));
     }
 
     @Test