Commit 0484abd
[FLINK-34466] create KafkaDatasetFacet
Signed-off-by: Pawel Leszczynski <[email protected]>
1 parent ca14634 commit 0484abd

25 files changed (+530, -674 lines)
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacetProvider.java

Lines changed: 16 additions & 0 deletions

package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet;

import java.util.Optional;

/** Contains a method to extract a {@link KafkaDatasetFacet}. */
public interface KafkaDatasetFacetProvider {

    /**
     * Returns a {@link KafkaDatasetFacet} describing the Kafka source or sink, if available.
     *
     * @return optional Kafka dataset facet
     */
    Optional<KafkaDatasetFacet> getKafkaDatasetFacet();
}
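
As a hedged illustration (not part of this commit), a connector component that knows its target topic could implement the provider roughly as follows, using the KafkaDatasetFacet and KafkaTopicsIdentifier types introduced further down in this diff; the class name and fields are hypothetical:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet;
import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaTopicsIdentifier;

import java.util.Collections;
import java.util.Optional;
import java.util.Properties;

/** Hypothetical sink-side component that exposes its lineage metadata. */
class FixedTopicWriterExample implements KafkaDatasetFacetProvider {

    private final String topic = "orders";
    private final Properties kafkaProperties = new Properties();

    @Override
    public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
        // Wrap the single target topic in a fixed-list identifier.
        return Optional.of(
                new KafkaDatasetFacet(
                        KafkaTopicsIdentifier.of(Collections.singletonList(topic)),
                        kafkaProperties,
                        BasicTypeInfo.STRING_TYPE_INFO));
    }
}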
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaTopicsIdentifierProvider.java

Lines changed: 16 additions & 0 deletions

package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaTopicsIdentifier;

import java.util.Optional;

/** Contains a method that allows extracting the topics identifier. */
public interface KafkaTopicsIdentifierProvider {

    /**
     * Returns the {@link KafkaTopicsIdentifier} (a fixed topic list or a topic pattern), if
     * available.
     *
     * @return optional topics identifier
     */
    Optional<KafkaTopicsIdentifier> getTopicsIdentifier();
}
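
Similarly, a hypothetical pattern-based subscriber might expose only the topics identifier; again a sketch, not code from this commit:

import org.apache.flink.connector.kafka.lineage.KafkaTopicsIdentifierProvider;
import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaTopicsIdentifier;

import java.util.Optional;
import java.util.regex.Pattern;

/** Hypothetical subscriber that reports the pattern it subscribes to. */
class PatternSubscriberExample implements KafkaTopicsIdentifierProvider {

    private final Pattern topicPattern = Pattern.compile("orders-.*");

    @Override
    public Optional<KafkaTopicsIdentifier> getTopicsIdentifier() {
        // Pattern-based identifier: the fixed topic list stays empty.
        return Optional.of(KafkaTopicsIdentifier.of(topicPattern));
    }
}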

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageFacetProvider.java

Lines changed: 0 additions & 19 deletions
This file was deleted.

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java

Lines changed: 14 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,16 @@
2020
package org.apache.flink.connector.kafka.lineage;
2121

2222
import org.apache.flink.api.connector.source.Boundedness;
23-
import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicListFacet;
24-
import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicPatternFacet;
23+
import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet;
2524
import org.apache.flink.streaming.api.lineage.LineageDataset;
2625
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
2726
import org.apache.flink.streaming.api.lineage.LineageVertex;
2827
import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
2928

30-
import java.util.ArrayList;
3129
import java.util.Collection;
3230
import java.util.Collections;
3331
import java.util.List;
3432
import java.util.Map;
35-
import java.util.Optional;
3633
import java.util.Properties;
3734
import java.util.stream.Collectors;
3835

@@ -43,66 +40,15 @@ public class LineageUtil {
4340
private static final String COMMA = ",";
4441
private static final String SEMICOLON = ";";
4542

46-
/**
47-
* Loads facet from any object implementing @link{DatasetFacetProvider} interface.
48-
*
49-
* @param object
50-
* @return
51-
*/
52-
public static Collection<LineageDatasetFacet> facetsFrom(Object object) {
53-
return Optional.of(object)
54-
.filter(LineageFacetProvider.class::isInstance)
55-
.map(LineageFacetProvider.class::cast)
56-
.map(LineageFacetProvider::getDatasetFacets)
57-
.orElse(Collections.emptyList());
58-
}
59-
60-
/**
61-
* Creates dataset from a list of facets. Uses {@link KafkaTopicListFacet} to extract dataset
62-
* name from. Dataset per each element of topic list is created
63-
*
64-
* @param facets
65-
* @return
66-
*/
67-
public static Collection<LineageDataset> datasetsFrom(
68-
String namespace, Collection<LineageDatasetFacet> facets) {
69-
// Check if topic list facet is available -> if so explode the list of facets
70-
Optional<KafkaTopicListFacet> topicList =
71-
facets.stream()
72-
.filter(KafkaTopicListFacet.class::isInstance)
73-
.map(KafkaTopicListFacet.class::cast)
74-
.findAny();
75-
76-
List<LineageDataset> datasets = new ArrayList<>();
77-
78-
// Explode list of other facets
79-
if (topicList.isPresent()) {
80-
List<LineageDatasetFacet> facetsWithoutTopicList =
81-
facets.stream().filter(f -> !f.equals(topicList)).collect(Collectors.toList());
82-
83-
datasets.addAll(
84-
topicList.get().topics.stream()
85-
.map(t -> datasetOf(namespace, t, facetsWithoutTopicList))
86-
.collect(Collectors.toList()));
87-
}
88-
89-
// Check if topic pattern is present
90-
// If so topic pattern will be used as a dataset name
91-
datasets.addAll(
92-
facets.stream()
93-
.filter(KafkaTopicPatternFacet.class::isInstance)
94-
.map(KafkaTopicPatternFacet.class::cast)
95-
.map(f -> datasetOf(namespace, f.pattern.toString(), facets))
96-
.collect(Collectors.toList()));
97-
return datasets;
98-
}
99-
100-
private static LineageDataset datasetOf(
101-
String namespace, String name, Collection<LineageDatasetFacet> facets) {
43+
public static LineageDataset datasetOf(String namespace, KafkaDatasetFacet kafkaDatasetFacet) {
10244
return new LineageDataset() {
10345
@Override
10446
public String name() {
105-
return name;
47+
if (kafkaDatasetFacet.topicIdentifier.topicPattern != null) {
48+
return kafkaDatasetFacet.topicIdentifier.toString();
49+
}
50+
51+
return String.join(",", kafkaDatasetFacet.topicIdentifier.topics);
10652
}
10753

10854
@Override
@@ -112,16 +58,19 @@ public String namespace() {
11258

11359
@Override
11460
public Map<String, LineageDatasetFacet> facets() {
115-
return facets.stream()
116-
.distinct()
117-
.collect(Collectors.toMap(LineageDatasetFacet::name, item -> item));
61+
return Collections.singletonMap(
62+
KafkaDatasetFacet.KAFKA_FACET_NAME, kafkaDatasetFacet);
11863
}
11964
};
12065
}
12166

122-
public static String datasetNamespaceOf(Properties properties) {
67+
public static String namespaceOf(Properties properties) {
12368
String bootstrapServers = properties.getProperty("bootstrap.servers");
12469

70+
if (bootstrapServers == null) {
71+
return KAFKA_DATASET_PREFIX;
72+
}
73+
12574
if (bootstrapServers.contains(COMMA)) {
12675
bootstrapServers = bootstrapServers.split(COMMA)[0];
12776
} else if (bootstrapServers.contains(SEMICOLON)) {
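
A sketch of how the reworked LineageUtil could be used (illustrative values only; the exact namespace string depends on KAFKA_DATASET_PREFIX and the remainder of namespaceOf, which falls outside this hunk):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.connector.kafka.lineage.LineageUtil;
import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet;
import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaTopicsIdentifier;
import org.apache.flink.streaming.api.lineage.LineageDataset;

import java.util.Arrays;
import java.util.Properties;

class LineageUtilUsageExample {

    static LineageDataset buildDataset() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");

        // namespaceOf() keeps only the first bootstrap server ("broker1:9092");
        // without bootstrap.servers it falls back to KAFKA_DATASET_PREFIX.
        String namespace = LineageUtil.namespaceOf(properties);

        KafkaDatasetFacet facet =
                new KafkaDatasetFacet(
                        KafkaTopicsIdentifier.of(Arrays.asList("orders", "payments")),
                        properties,
                        BasicTypeInfo.STRING_TYPE_INFO);

        // With a fixed topic list, the dataset name is the comma-joined list
        // ("orders,payments"); the facet is exposed under the "kafka" facet key.
        return LineageUtil.datasetOf(namespace, facet);
    }
}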
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaDatasetFacet.java

Lines changed: 97 additions & 0 deletions

package org.apache.flink.connector.kafka.lineage.facets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.regex.Pattern;

/** Facet containing all information related to sources and sinks on Kafka. */
public class KafkaDatasetFacet implements LineageDatasetFacet {

    public static final String KAFKA_FACET_NAME = "kafka";

    public final Properties properties;
    public final TypeInformation typeInformation;
    public final KafkaTopicsIdentifier topicIdentifier;

    public KafkaDatasetFacet(
            KafkaTopicsIdentifier topicIdentifier,
            Properties properties,
            TypeInformation typeInformation) {
        this.topicIdentifier = topicIdentifier;
        this.properties = properties;
        this.typeInformation = typeInformation;
    }

    public void addProperties(Properties properties) {
        this.properties.putAll(properties);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        KafkaDatasetFacet that = (KafkaDatasetFacet) o;
        return Objects.equals(properties, that.properties)
                && Objects.equals(typeInformation, that.typeInformation)
                && Objects.equals(topicIdentifier, that.topicIdentifier);
    }

    @Override
    public int hashCode() {
        return Objects.hash(properties, typeInformation, topicIdentifier);
    }

    @Override
    public String name() {
        return KAFKA_FACET_NAME;
    }

    /**
     * Nested class containing the topics' identifier information, which can be either a fixed
     * list of topics or a topic pattern.
     */
    public static class KafkaTopicsIdentifier {
        public final List<String> topics;
        public final Pattern topicPattern;

        public KafkaTopicsIdentifier(List<String> fixedTopics, Pattern topicPattern) {
            this.topics = fixedTopics;
            this.topicPattern = topicPattern;
        }

        public static KafkaTopicsIdentifier of(Pattern pattern) {
            return new KafkaTopicsIdentifier(Collections.emptyList(), pattern);
        }

        public static KafkaTopicsIdentifier of(List<String> fixedTopics) {
            return new KafkaTopicsIdentifier(fixedTopics, null);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            KafkaTopicsIdentifier that = (KafkaTopicsIdentifier) o;
            return Objects.equals(topics, that.topics)
                    && Objects.equals(topicPattern, that.topicPattern);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topics, topicPattern);
        }
    }
}
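
For illustration only, constructing the facet directly might look like this (topic names and properties are made up):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet;
import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaTopicsIdentifier;

import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;

class KafkaDatasetFacetExample {

    static KafkaDatasetFacet listBasedFacet() {
        // Identifier backed by a fixed topic list; topicPattern stays null.
        KafkaTopicsIdentifier topics =
                KafkaTopicsIdentifier.of(Arrays.asList("orders", "payments"));

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        KafkaDatasetFacet facet =
                new KafkaDatasetFacet(topics, props, BasicTypeInfo.STRING_TYPE_INFO);

        // Additional connector properties can be merged in later.
        Properties extra = new Properties();
        extra.setProperty("group.id", "lineage-demo");
        facet.addProperties(extra);

        return facet; // facet.name() is "kafka" (KAFKA_FACET_NAME)
    }

    static KafkaDatasetFacet patternBasedFacet() {
        // Identifier backed by a topic pattern; the topics list is empty.
        KafkaTopicsIdentifier pattern = KafkaTopicsIdentifier.of(Pattern.compile("orders-.*"));
        return new KafkaDatasetFacet(pattern, new Properties(), BasicTypeInfo.STRING_TYPE_INFO);
    }
}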

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaPropertiesFacet.java

Lines changed: 0 additions & 59 deletions
This file was deleted.

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/facets/KafkaTopicListFacet.java

Lines changed: 0 additions & 59 deletions
This file was deleted.
