Skip to content

Commit 4bbff17

Browse files
[FLINK-34466] create KafkaDatasetFacet
Signed-off-by: Pawel Leszczynski <[email protected]>
1 parent ca14634 commit 4bbff17

31 files changed

+837
-684
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package org.apache.flink.connector.kafka.lineage;
2+
3+
import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;
4+
5+
import java.util.Objects;
6+
import java.util.Properties;
7+
8+
/** Default implementation of {@link KafkaDatasetFacet}. */
9+
public class DefaultKafkaDatasetFacet implements KafkaDatasetFacet {
10+
11+
public static final String KAFKA_FACET_NAME = "kafka";
12+
13+
private Properties properties;
14+
15+
private final KafkaDatasetIdentifier topicIdentifier;
16+
17+
public DefaultKafkaDatasetFacet(KafkaDatasetIdentifier topicIdentifier, Properties properties) {
18+
this(topicIdentifier);
19+
20+
this.properties = new Properties();
21+
KafkaPropertiesUtil.copyProperties(properties, this.properties);
22+
}
23+
24+
public DefaultKafkaDatasetFacet(KafkaDatasetIdentifier topicIdentifier) {
25+
this.topicIdentifier = topicIdentifier;
26+
}
27+
28+
public void setProperties(Properties properties) {
29+
this.properties = new Properties();
30+
KafkaPropertiesUtil.copyProperties(properties, this.properties);
31+
}
32+
33+
public Properties getProperties() {
34+
return properties;
35+
}
36+
37+
public KafkaDatasetIdentifier getTopicIdentifier() {
38+
return topicIdentifier;
39+
}
40+
41+
@Override
42+
public boolean equals(Object o) {
43+
if (this == o) {
44+
return true;
45+
}
46+
if (o == null || getClass() != o.getClass()) {
47+
return false;
48+
}
49+
DefaultKafkaDatasetFacet that = (DefaultKafkaDatasetFacet) o;
50+
return Objects.equals(properties, that.properties)
51+
&& Objects.equals(topicIdentifier, that.topicIdentifier);
52+
}
53+
54+
@Override
55+
public int hashCode() {
56+
return Objects.hash(properties, topicIdentifier);
57+
}
58+
59+
@Override
60+
public String name() {
61+
return KAFKA_FACET_NAME;
62+
}
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package org.apache.flink.connector.kafka.lineage;
2+
3+
import javax.annotation.Nullable;
4+
5+
import java.util.Collections;
6+
import java.util.List;
7+
import java.util.Objects;
8+
import java.util.regex.Pattern;
9+
10+
/** Default implementation of {@link KafkaDatasetIdentifier}. */
11+
public class DefaultKafkaDatasetIdentifier implements KafkaDatasetIdentifier {
12+
13+
@Nullable private final List<String> topics;
14+
@Nullable private final Pattern topicPattern;
15+
16+
public DefaultKafkaDatasetIdentifier(List<String> fixedTopics, Pattern topicPattern) {
17+
this.topics = fixedTopics;
18+
this.topicPattern = topicPattern;
19+
}
20+
21+
public static DefaultKafkaDatasetIdentifier ofPattern(Pattern pattern) {
22+
return new DefaultKafkaDatasetIdentifier(Collections.emptyList(), pattern);
23+
}
24+
25+
public static DefaultKafkaDatasetIdentifier ofTopics(List<String> fixedTopics) {
26+
return new DefaultKafkaDatasetIdentifier(fixedTopics, null);
27+
}
28+
29+
@Nullable
30+
public List<String> getTopics() {
31+
return topics;
32+
}
33+
34+
@Nullable
35+
public Pattern getTopicPattern() {
36+
return topicPattern;
37+
}
38+
39+
@Override
40+
public boolean equals(Object o) {
41+
if (this == o) {
42+
return true;
43+
}
44+
if (o == null || getClass() != o.getClass()) {
45+
return false;
46+
}
47+
DefaultKafkaDatasetIdentifier that = (DefaultKafkaDatasetIdentifier) o;
48+
return Objects.equals(topics, that.topics)
49+
&& Objects.equals(topicPattern, that.topicPattern);
50+
}
51+
52+
@Override
53+
public int hashCode() {
54+
return Objects.hash(topics, topicPattern);
55+
}
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package org.apache.flink.connector.kafka.lineage;
2+
3+
import org.apache.flink.api.common.typeinfo.TypeInformation;
4+
5+
import java.util.Objects;
6+
7+
/** Default implementation of {@link KafkaDatasetFacet}. */
8+
public class DefaultTypeDatasetFacet implements TypeDatasetFacet {
9+
10+
public static final String TYPE_FACET_NAME = "type";
11+
12+
private final TypeInformation typeInformation;
13+
14+
public DefaultTypeDatasetFacet(TypeInformation typeInformation) {
15+
this.typeInformation = typeInformation;
16+
}
17+
18+
public TypeInformation getTypeInformation() {
19+
return typeInformation;
20+
}
21+
22+
public boolean equals(Object o) {
23+
if (this == o) {
24+
return true;
25+
}
26+
if (o == null || getClass() != o.getClass()) {
27+
return false;
28+
}
29+
DefaultTypeDatasetFacet that = (DefaultTypeDatasetFacet) o;
30+
return Objects.equals(typeInformation, that.typeInformation);
31+
}
32+
33+
@Override
34+
public int hashCode() {
35+
return Objects.hash(typeInformation);
36+
}
37+
38+
@Override
39+
public String name() {
40+
return TYPE_FACET_NAME;
41+
}
42+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package org.apache.flink.connector.kafka.lineage;
2+
3+
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
4+
5+
import java.util.Properties;
6+
7+
/** Facet definition to contain all Kafka specific information on Kafka sources and sinks. */
public interface KafkaDatasetFacet extends LineageDatasetFacet {

    /** Returns the Kafka properties associated with the dataset. */
    Properties getProperties();

    /** Returns the identifier (topic list or topic pattern) of the dataset. */
    KafkaDatasetIdentifier getTopicIdentifier();

    /** Replaces the Kafka properties associated with the dataset. */
    void setProperties(Properties properties);
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package org.apache.flink.connector.kafka.lineage;
2+
3+
import java.util.Optional;
4+
5+
/** Contains method to extract {@link KafkaDatasetFacet}. */
public interface KafkaDatasetFacetProvider {

    /**
     * Returns a Kafka dataset facet or `Optional.empty` in case an implementing class is not able
     * to identify a dataset.
     *
     * @return the Kafka dataset facet, or {@code Optional.empty()} when no dataset can be
     *     identified
     */
    Optional<KafkaDatasetFacet> getKafkaDatasetFacet();
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package org.apache.flink.connector.kafka.lineage;
2+
3+
import javax.annotation.Nullable;
4+
5+
import java.util.List;
6+
import java.util.regex.Pattern;
7+
8+
/** Kafka dataset identifier which can contain either a list of topics or a topic pattern. */
public interface KafkaDatasetIdentifier {

    @Nullable
    List<String> getTopics();

    @Nullable
    Pattern getTopicPattern();

    /**
     * Assigns lineage dataset's name which is topic pattern if it is present or comma separated
     * list of topics.
     *
     * @return the topic pattern string when present, otherwise the comma-separated topic list
     */
    default String toLineageName() {
        if (getTopicPattern() != null) {
            return getTopicPattern().toString();
        }
        // NOTE(review): getTopics() is @Nullable, so String.join would NPE when both the pattern
        // and the topic list are null — implementers must provide at least one of them.
        return String.join(",", getTopics());
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package org.apache.flink.connector.kafka.lineage;
2+
3+
import java.util.Optional;
4+
5+
/** Contains method which allows extracting topic identifier. */
public interface KafkaDatasetIdentifierProvider {

    /**
     * Gets Kafka dataset identifier or empty in case a class implementing is not able to extract
     * dataset identifier.
     *
     * @return the dataset identifier, or {@code Optional.empty()} when it cannot be extracted
     */
    Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier();
}

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageFacetProvider.java

Lines changed: 0 additions & 19 deletions
This file was deleted.

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java

Lines changed: 23 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -20,89 +20,43 @@
2020
package org.apache.flink.connector.kafka.lineage;
2121

2222
import org.apache.flink.api.connector.source.Boundedness;
23-
import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicListFacet;
24-
import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicPatternFacet;
2523
import org.apache.flink.streaming.api.lineage.LineageDataset;
2624
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
2725
import org.apache.flink.streaming.api.lineage.LineageVertex;
2826
import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
2927

30-
import java.util.ArrayList;
3128
import java.util.Collection;
3229
import java.util.Collections;
30+
import java.util.HashMap;
3331
import java.util.List;
3432
import java.util.Map;
35-
import java.util.Optional;
3633
import java.util.Properties;
3734
import java.util.stream.Collectors;
3835

39-
/** Utility class with useful methods for managing dataset facets. */
36+
/** Utility class with useful methods for managing lineage objects. */
4037
public class LineageUtil {
4138

4239
private static final String KAFKA_DATASET_PREFIX = "kafka://";
4340
private static final String COMMA = ",";
4441
private static final String SEMICOLON = ";";
4542

46-
/**
47-
* Loads facet from any object implementing @link{DatasetFacetProvider} interface.
48-
*
49-
* @param object
50-
* @return
51-
*/
52-
public static Collection<LineageDatasetFacet> facetsFrom(Object object) {
53-
return Optional.of(object)
54-
.filter(LineageFacetProvider.class::isInstance)
55-
.map(LineageFacetProvider.class::cast)
56-
.map(LineageFacetProvider::getDatasetFacets)
57-
.orElse(Collections.emptyList());
43+
public static LineageDataset datasetOf(String namespace, KafkaDatasetFacet kafkaDatasetFacet) {
44+
return datasetOf(namespace, kafkaDatasetFacet, Collections.emptyList());
5845
}
5946

60-
/**
61-
* Creates dataset from a list of facets. Uses {@link KafkaTopicListFacet} to extract dataset
62-
* name from. Dataset per each element of topic list is created
63-
*
64-
* @param facets
65-
* @return
66-
*/
67-
public static Collection<LineageDataset> datasetsFrom(
68-
String namespace, Collection<LineageDatasetFacet> facets) {
69-
// Check if topic list facet is available -> if so explode the list of facets
70-
Optional<KafkaTopicListFacet> topicList =
71-
facets.stream()
72-
.filter(KafkaTopicListFacet.class::isInstance)
73-
.map(KafkaTopicListFacet.class::cast)
74-
.findAny();
75-
76-
List<LineageDataset> datasets = new ArrayList<>();
77-
78-
// Explode list of other facets
79-
if (topicList.isPresent()) {
80-
List<LineageDatasetFacet> facetsWithoutTopicList =
81-
facets.stream().filter(f -> !f.equals(topicList)).collect(Collectors.toList());
82-
83-
datasets.addAll(
84-
topicList.get().topics.stream()
85-
.map(t -> datasetOf(namespace, t, facetsWithoutTopicList))
86-
.collect(Collectors.toList()));
87-
}
88-
89-
// Check if topic pattern is present
90-
// If so topic pattern will be used as a dataset name
91-
datasets.addAll(
92-
facets.stream()
93-
.filter(KafkaTopicPatternFacet.class::isInstance)
94-
.map(KafkaTopicPatternFacet.class::cast)
95-
.map(f -> datasetOf(namespace, f.pattern.toString(), facets))
96-
.collect(Collectors.toList()));
97-
return datasets;
47+
public static LineageDataset datasetOf(
48+
String namespace, KafkaDatasetFacet kafkaDatasetFacet, TypeDatasetFacet typeFacet) {
49+
return datasetOf(namespace, kafkaDatasetFacet, Collections.singletonList(typeFacet));
9850
}
9951

10052
private static LineageDataset datasetOf(
101-
String namespace, String name, Collection<LineageDatasetFacet> facets) {
53+
String namespace,
54+
KafkaDatasetFacet kafkaDatasetFacet,
55+
List<LineageDatasetFacet> facets) {
10256
return new LineageDataset() {
10357
@Override
10458
public String name() {
105-
return name;
59+
return kafkaDatasetFacet.getTopicIdentifier().toLineageName();
10660
}
10761

10862
@Override
@@ -112,16 +66,24 @@ public String namespace() {
11266

11367
@Override
11468
public Map<String, LineageDatasetFacet> facets() {
115-
return facets.stream()
116-
.distinct()
117-
.collect(Collectors.toMap(LineageDatasetFacet::name, item -> item));
69+
Map facetMap = new HashMap<String, LineageDatasetFacet>();
70+
facetMap.put(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME, kafkaDatasetFacet);
71+
facetMap.putAll(
72+
facets.stream()
73+
.collect(
74+
Collectors.toMap(LineageDatasetFacet::name, item -> item)));
75+
return facetMap;
11876
}
11977
};
12078
}
12179

122-
public static String datasetNamespaceOf(Properties properties) {
80+
public static String namespaceOf(Properties properties) {
12381
String bootstrapServers = properties.getProperty("bootstrap.servers");
12482

83+
if (bootstrapServers == null) {
84+
return KAFKA_DATASET_PREFIX;
85+
}
86+
12587
if (bootstrapServers.contains(COMMA)) {
12688
bootstrapServers = bootstrapServers.split(COMMA)[0];
12789
} else if (bootstrapServers.contains(SEMICOLON)) {
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package org.apache.flink.connector.kafka.lineage;
2+
3+
import org.apache.flink.api.common.typeinfo.TypeInformation;
4+
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
5+
6+
/** Facet definition to contain type information of source and sink. */
public interface TypeDatasetFacet extends LineageDatasetFacet {
    // NOTE(review): raw TypeInformation — parameterizing (e.g. TypeInformation<?>) would be
    // cleaner but changes the signature implementers override; confirm before tightening.
    TypeInformation getTypeInformation();
}

0 commit comments

Comments
 (0)