[FLINK-34466] create KafkaDatasetFacet
Signed-off-by: Pawel Leszczynski <[email protected]>
pawel-big-lebowski committed Oct 18, 2024
1 parent ca14634 commit ea71a84
Showing 25 changed files with 530 additions and 674 deletions.
org/apache/flink/connector/kafka/lineage/KafkaDatasetFacetProvider.java
@@ -0,0 +1,16 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet;

import java.util.Optional;

/** Contains a method to extract a {@link KafkaDatasetFacet}. */
public interface KafkaDatasetFacetProvider {

/**
* Returns a Kafka dataset facet for this object, if one can be extracted.
*
* @return the {@link KafkaDatasetFacet}, or {@code Optional.empty()} if none is available
*/
Optional<KafkaDatasetFacet> getKafkaDatasetFacet();
}
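
A minimal sketch of how a connector class could implement this interface; the class name, topic, and empty properties below are assumptions for illustration, not part of this commit:

import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet;
import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaDatasetIdentifier;

import java.util.Collections;
import java.util.Optional;
import java.util.Properties;

/** Hypothetical example class, not part of this commit. */
public class ExampleKafkaSinkWriter implements KafkaDatasetFacetProvider {

    @Override
    public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
        // Expose a fixed single-topic identifier together with the connector
        // properties; type information is omitted in this sketch.
        return Optional.of(
                new KafkaDatasetFacet(
                        KafkaDatasetIdentifier.of(Collections.singletonList("orders")),
                        new Properties(),
                        null));
    }
}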
org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifierProvider.java
@@ -0,0 +1,16 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaDatasetIdentifier;

import java.util.Optional;

/** Contains a method for extracting a Kafka topic identifier. */
public interface KafkaDatasetIdentifierProvider {

/**
* Returns the identifier of the Kafka topics behind this object, if it can be extracted.
*
* @return the {@link KafkaDatasetIdentifier}, or {@code Optional.empty()} if none is available
*/
Optional<KafkaDatasetIdentifier> getDatasetIdentifier();
}
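
A matching sketch for the identifier side, here a hypothetical pattern-based subscriber; the class name and pattern are illustrative assumptions:

import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaDatasetIdentifier;

import java.util.Optional;
import java.util.regex.Pattern;

/** Hypothetical example class, not part of this commit. */
public class ExamplePatternSubscriber implements KafkaDatasetIdentifierProvider {

    @Override
    public Optional<KafkaDatasetIdentifier> getDatasetIdentifier() {
        // A pattern-based identifier keeps the fixed topic list empty.
        return Optional.of(KafkaDatasetIdentifier.of(Pattern.compile("orders-.*")));
    }
}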

This file was deleted.

org/apache/flink/connector/kafka/lineage/LineageUtil.java
@@ -20,19 +20,16 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicListFacet;
-import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicPatternFacet;
+import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.SourceLineageVertex;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;

@@ -43,66 +40,15 @@ public class LineageUtil {
private static final String COMMA = ",";
private static final String SEMICOLON = ";";

/**
* Loads facets from any object implementing the {@link LineageFacetProvider} interface.
*
* @param object the object to extract facets from
* @return the extracted facets, or an empty collection if the object is not a provider
*/
public static Collection<LineageDatasetFacet> facetsFrom(Object object) {
return Optional.of(object)
.filter(LineageFacetProvider.class::isInstance)
.map(LineageFacetProvider.class::cast)
.map(LineageFacetProvider::getDatasetFacets)
.orElse(Collections.emptyList());
}

/**
* Creates datasets from a list of facets. Uses {@link KafkaTopicListFacet} to extract dataset
* names; one dataset is created per element of the topic list.
*
* @param namespace the dataset namespace
* @param facets the facets to build the datasets from
* @return the created datasets
*/
public static Collection<LineageDataset> datasetsFrom(
String namespace, Collection<LineageDatasetFacet> facets) {
// Check if topic list facet is available -> if so explode the list of facets
Optional<KafkaTopicListFacet> topicList =
facets.stream()
.filter(KafkaTopicListFacet.class::isInstance)
.map(KafkaTopicListFacet.class::cast)
.findAny();

List<LineageDataset> datasets = new ArrayList<>();

// Explode list of other facets
if (topicList.isPresent()) {
List<LineageDatasetFacet> facetsWithoutTopicList =
facets.stream().filter(f -> !f.equals(topicList.get())).collect(Collectors.toList());

datasets.addAll(
topicList.get().topics.stream()
.map(t -> datasetOf(namespace, t, facetsWithoutTopicList))
.collect(Collectors.toList()));
}

// Check if topic pattern is present
// If so topic pattern will be used as a dataset name
datasets.addAll(
facets.stream()
.filter(KafkaTopicPatternFacet.class::isInstance)
.map(KafkaTopicPatternFacet.class::cast)
.map(f -> datasetOf(namespace, f.pattern.toString(), facets))
.collect(Collectors.toList()));
return datasets;
}

-private static LineageDataset datasetOf(
-        String namespace, String name, Collection<LineageDatasetFacet> facets) {
+public static LineageDataset datasetOf(String namespace, KafkaDatasetFacet kafkaDatasetFacet) {
 return new LineageDataset() {
 @Override
 public String name() {
-return name;
+if (kafkaDatasetFacet.topicIdentifier.topicPattern != null) {
+return kafkaDatasetFacet.topicIdentifier.topicPattern.toString();
+}
+
+return String.join(",", kafkaDatasetFacet.topicIdentifier.topics);
 }

@Override
@@ -112,16 +58,19 @@ public String namespace() {

@Override
public Map<String, LineageDatasetFacet> facets() {
-return facets.stream()
-        .distinct()
-        .collect(Collectors.toMap(LineageDatasetFacet::name, item -> item));
+return Collections.singletonMap(
+        KafkaDatasetFacet.KAFKA_FACET_NAME, kafkaDatasetFacet);
}
};
}

-public static String datasetNamespaceOf(Properties properties) {
+public static String namespaceOf(Properties properties) {
String bootstrapServers = properties.getProperty("bootstrap.servers");

if (bootstrapServers == null) {
return KAFKA_DATASET_PREFIX;
}

if (bootstrapServers.contains(COMMA)) {
bootstrapServers = bootstrapServers.split(COMMA)[0];
} else if (bootstrapServers.contains(SEMICOLON)) {
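A usage sketch of the reworked LineageUtil API; the broker addresses and topics are illustrative, and since the tail of namespaceOf is collapsed above, the exact namespace string it returns is not shown here:

import org.apache.flink.connector.kafka.lineage.LineageUtil;
import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet;
import org.apache.flink.connector.kafka.lineage.facets.KafkaDatasetFacet.KafkaDatasetIdentifier;
import org.apache.flink.streaming.api.lineage.LineageDataset;

import java.util.Arrays;
import java.util.Properties;

/** Hypothetical example class, not part of this commit. */
public class LineageUtilExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");

        KafkaDatasetFacet facet =
                new KafkaDatasetFacet(
                        KafkaDatasetIdentifier.of(Arrays.asList("orders", "payments")),
                        props,
                        null); // type information omitted in this sketch

        LineageDataset dataset = LineageUtil.datasetOf(LineageUtil.namespaceOf(props), facet);

        // With a fixed topic list, name() joins the topics: "orders,payments".
        System.out.println(dataset.name());
    }
}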
org/apache/flink/connector/kafka/lineage/facets/KafkaDatasetFacet.java
@@ -0,0 +1,97 @@
package org.apache.flink.connector.kafka.lineage.facets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.regex.Pattern;

/** Facet containing all information related to sources and sinks on Kafka. */
public class KafkaDatasetFacet implements LineageDatasetFacet {

public static final String KAFKA_FACET_NAME = "kafka";

public final Properties properties;
public final TypeInformation typeInformation;
public final KafkaDatasetIdentifier topicIdentifier;

public KafkaDatasetFacet(
KafkaDatasetIdentifier topicIdentifier,
Properties properties,
TypeInformation typeInformation) {
this.topicIdentifier = topicIdentifier;
this.properties = properties;
this.typeInformation = typeInformation;
}

public void addProperties(Properties properties) {
this.properties.putAll(properties);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
KafkaDatasetFacet that = (KafkaDatasetFacet) o;
return Objects.equals(properties, that.properties)
&& Objects.equals(typeInformation, that.typeInformation)
&& Objects.equals(topicIdentifier, that.topicIdentifier);
}

@Override
public int hashCode() {
return Objects.hash(properties, typeInformation, topicIdentifier);
}

@Override
public String name() {
return KAFKA_FACET_NAME;
}

/**
* Identifies a Kafka dataset by either a fixed list of topics or a topic pattern.
*/
public static class KafkaDatasetIdentifier {
public final List<String> topics;
public final Pattern topicPattern;

public KafkaDatasetIdentifier(List<String> fixedTopics, Pattern topicPattern) {
this.topics = fixedTopics;
this.topicPattern = topicPattern;
}

public static KafkaDatasetIdentifier of(Pattern pattern) {
return new KafkaDatasetIdentifier(Collections.emptyList(), pattern);
}

public static KafkaDatasetIdentifier of(List<String> fixedTopics) {
return new KafkaDatasetIdentifier(fixedTopics, null);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
KafkaDatasetIdentifier that = (KafkaDatasetIdentifier) o;
return Objects.equals(topics, that.topics)
&& Objects.equals(topicPattern, that.topicPattern);
}

@Override
public int hashCode() {
return Objects.hash(topics, topicPattern);
}
}
}
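
A short sketch of the two KafkaDatasetIdentifier factory methods and the addProperties helper; all values are illustrative:

import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;

/** Hypothetical example class, not part of this commit. */
class KafkaDatasetFacetExample {
    static KafkaDatasetFacet build() {
        // Identifier from a fixed topic list; topicPattern stays null.
        KafkaDatasetFacet.KafkaDatasetIdentifier byTopics =
                KafkaDatasetFacet.KafkaDatasetIdentifier.of(Arrays.asList("orders", "payments"));

        // Alternatively, identify the dataset by a topic pattern; the list stays empty.
        KafkaDatasetFacet.KafkaDatasetIdentifier byPattern =
                KafkaDatasetFacet.KafkaDatasetIdentifier.of(Pattern.compile("orders-.*"));

        KafkaDatasetFacet facet = new KafkaDatasetFacet(byTopics, new Properties(), null);

        // Properties discovered later (e.g. bootstrap servers resolved at
        // runtime) can be merged into the facet afterwards.
        Properties extra = new Properties();
        extra.setProperty("bootstrap.servers", "broker1:9092");
        facet.addProperties(extra);
        return facet;
    }
}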

This file was deleted.

This file was deleted.

