Skip to content

Commit 6bf144b

Browse files
[FLINK-34466] lineage interfaces for kafka connector
1 parent 2dfdae6 commit 6bf144b

22 files changed

+916
-10
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.apache.flink.connector.kafka.lineage;
2+
3+
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
4+
5+
import java.util.List;
6+
7+
/**
8+
* Contains method which can be used for lineage schema facet extraction. Useful for classes like
9+
* topic selectors or serialization schemas to extract dataset information from.
10+
*/
11+
public interface LineageFacetProvider {
12+
13+
/**
14+
* List of lineage dataset facets.
15+
*
16+
* @return
17+
*/
18+
List<LineageDatasetFacet> getDatasetFacets();
19+
}
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*
18+
*/
19+
20+
package org.apache.flink.connector.kafka.lineage;
21+
22+
import org.apache.flink.api.connector.source.Boundedness;
23+
import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicListFacet;
24+
import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicPatternFacet;
25+
import org.apache.flink.streaming.api.lineage.LineageDataset;
26+
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
27+
import org.apache.flink.streaming.api.lineage.LineageVertex;
28+
import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
29+
30+
import java.util.ArrayList;
31+
import java.util.Collections;
32+
import java.util.List;
33+
import java.util.Map;
34+
import java.util.Optional;
35+
import java.util.Properties;
36+
import java.util.stream.Collectors;
37+
38+
/** Utility class with useful methods for managing dataset facets. */
39+
public class LineageUtil {
40+
41+
private static final String KAFKA_DATASET_PREFIX = "kafka://";
42+
private static final String COMMA = ",";
43+
private static final String SEMICOLON = ";";
44+
45+
/**
46+
* Loads facet from any object implementing @link{DatasetFacetProvider} interface.
47+
*
48+
* @param object
49+
* @return
50+
*/
51+
public static List<LineageDatasetFacet> facetsFrom(Object object) {
52+
return Optional.of(object)
53+
.filter(LineageFacetProvider.class::isInstance)
54+
.map(LineageFacetProvider.class::cast)
55+
.map(LineageFacetProvider::getDatasetFacets)
56+
.orElse(Collections.emptyList());
57+
}
58+
59+
/**
60+
* Creates dataset from a list of facets. Uses {@link KafkaTopicListFacet} to extract dataset
61+
* name from. Dataset per each element of topic list is created
62+
*
63+
* @param facets
64+
* @return
65+
*/
66+
public static List<LineageDataset> datasetsFrom(
67+
String namespace, List<LineageDatasetFacet> facets) {
68+
// Check if topic list facet is available -> if so explode the list of facets
69+
Optional<KafkaTopicListFacet> topicList =
70+
facets.stream()
71+
.filter(KafkaTopicListFacet.class::isInstance)
72+
.map(KafkaTopicListFacet.class::cast)
73+
.findAny();
74+
75+
List<LineageDataset> datasets = new ArrayList<>();
76+
77+
// Explode list of other facets
78+
if (topicList.isPresent()) {
79+
List<LineageDatasetFacet> facetsWithoutTopicList =
80+
facets.stream().filter(f -> !f.equals(topicList)).collect(Collectors.toList());
81+
82+
topicList.get().topics.stream()
83+
.forEach(t -> datasets.add(datasetOf(namespace, t, facetsWithoutTopicList)));
84+
}
85+
86+
// Check if topic pattern is present
87+
// If so topic pattern will be used as a dataset name
88+
facets.stream()
89+
.filter(KafkaTopicPatternFacet.class::isInstance)
90+
.map(KafkaTopicPatternFacet.class::cast)
91+
.findAny()
92+
.ifPresent(
93+
pattern ->
94+
datasets.add(
95+
datasetOf(namespace, pattern.pattern.toString(), facets)));
96+
97+
return datasets;
98+
}
99+
100+
private static LineageDataset datasetOf(
101+
String namespace, String name, List<LineageDatasetFacet> facets) {
102+
return new LineageDataset() {
103+
@Override
104+
public String name() {
105+
return name;
106+
}
107+
108+
@Override
109+
public String namespace() {
110+
return namespace;
111+
}
112+
113+
@Override
114+
public Map<String, LineageDatasetFacet> facets() {
115+
return facets.stream()
116+
.distinct()
117+
.collect(Collectors.toMap(LineageDatasetFacet::name, item -> item));
118+
}
119+
};
120+
}
121+
122+
public static String datasetNamespaceOf(Properties properties) {
123+
String bootstrapServers = properties.getProperty("bootstrap.servers");
124+
125+
if (bootstrapServers.contains(COMMA)) {
126+
bootstrapServers = bootstrapServers.split(COMMA)[0];
127+
} else if (bootstrapServers.contains(SEMICOLON)) {
128+
bootstrapServers = bootstrapServers.split(SEMICOLON)[0];
129+
}
130+
131+
return String.format(KAFKA_DATASET_PREFIX + bootstrapServers);
132+
}
133+
134+
public static SourceLineageVertex sourceLineageVertexOf(List<LineageDataset> datasets) {
135+
return new SourceLineageVertex() {
136+
@Override
137+
public Boundedness boundedness() {
138+
return Boundedness.CONTINUOUS_UNBOUNDED;
139+
}
140+
141+
@Override
142+
public List<LineageDataset> datasets() {
143+
return datasets;
144+
}
145+
};
146+
}
147+
148+
public static LineageVertex lineageVertexOf(List<LineageDataset> datasets) {
149+
return new LineageVertex() {
150+
@Override
151+
public List<LineageDataset> datasets() {
152+
return datasets;
153+
}
154+
};
155+
}
156+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.connector.kafka.lineage.facets;
19+
20+
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
21+
22+
import java.util.Objects;
23+
import java.util.Properties;
24+
25+
/** Facet containing Kafka properties. */
26+
public class KafkaPropertiesFacet implements LineageDatasetFacet {
27+
28+
public static final String KAFKA_PROPERTIES_FACET_NAME = "kafkaProperties";
29+
public Properties properties;
30+
31+
public KafkaPropertiesFacet(Properties properties) {
32+
this.properties = properties;
33+
}
34+
35+
@Override
36+
public String name() {
37+
return KAFKA_PROPERTIES_FACET_NAME;
38+
}
39+
40+
@Override
41+
public boolean equals(Object o) {
42+
if (this == o) {
43+
return true;
44+
}
45+
if (o == null || getClass() != o.getClass()) {
46+
return false;
47+
}
48+
KafkaPropertiesFacet that = (KafkaPropertiesFacet) o;
49+
return Objects.equals(properties, that.properties);
50+
}
51+
52+
@Override
53+
public int hashCode() {
54+
return Objects.hash(properties);
55+
}
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.connector.kafka.lineage.facets;
19+
20+
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
21+
22+
import java.util.List;
23+
import java.util.Objects;
24+
25+
/**
26+
* Facet containing TypeInformation object. Can be used as an intermediate step for evaluating topic
27+
* involved in data processing.
28+
*/
29+
public class KafkaTopicListFacet implements LineageDatasetFacet {
30+
31+
public static final String TOPIC_LIST_FACET_NAME = "topicList";
32+
public List<String> topics;
33+
34+
public KafkaTopicListFacet(List<String> topics) {
35+
this.topics = topics;
36+
}
37+
38+
@Override
39+
public String name() {
40+
return TOPIC_LIST_FACET_NAME;
41+
}
42+
43+
@Override
44+
public boolean equals(Object o) {
45+
if (this == o) {
46+
return true;
47+
}
48+
if (o == null || getClass() != o.getClass()) {
49+
return false;
50+
}
51+
KafkaTopicListFacet that = (KafkaTopicListFacet) o;
52+
return Objects.equals(topics, that.topics);
53+
}
54+
55+
@Override
56+
public int hashCode() {
57+
return Objects.hash(topics);
58+
}
59+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.connector.kafka.lineage.facets;
19+
20+
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
21+
22+
import java.util.Objects;
23+
import java.util.regex.Pattern;
24+
25+
/**
26+
* Facet containing topic pattern. Can be used as an intermediate step for evaluating topics
27+
* involved in data processing.
28+
*/
29+
public class KafkaTopicPatternFacet implements LineageDatasetFacet {
30+
31+
public static final String TOPIC_PATTERN_FACET_NAME = "topicPattern";
32+
public Pattern pattern;
33+
34+
public KafkaTopicPatternFacet(Pattern pattern) {
35+
this.pattern = pattern;
36+
}
37+
38+
@Override
39+
public String name() {
40+
return TOPIC_PATTERN_FACET_NAME;
41+
}
42+
43+
@Override
44+
public boolean equals(Object o) {
45+
if (this == o) {
46+
return true;
47+
}
48+
if (o == null || getClass() != o.getClass()) {
49+
return false;
50+
}
51+
KafkaTopicPatternFacet that = (KafkaTopicPatternFacet) o;
52+
return Objects.equals(pattern.pattern(), that.pattern.pattern());
53+
}
54+
55+
@Override
56+
public int hashCode() {
57+
return Objects.hash(pattern.pattern());
58+
}
59+
}

0 commit comments

Comments
 (0)