Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-34466] Lineage interfaces for kafka connector #130

Merged
merged 3 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;

import java.util.Objects;
import java.util.Properties;

/** Default implementation of {@link KafkaDatasetFacet}. */
public class DefaultKafkaDatasetFacet implements KafkaDatasetFacet {

    public static final String KAFKA_FACET_NAME = "kafka";

    /** Defensive copy of the Kafka client properties; {@code null} until set. */
    private Properties properties;

    private final KafkaDatasetIdentifier topicIdentifier;

    /**
     * Creates a facet with both the dataset identifier and a defensive copy of the given
     * Kafka client properties.
     */
    public DefaultKafkaDatasetFacet(KafkaDatasetIdentifier topicIdentifier, Properties properties) {
        this(topicIdentifier);
        copyProperties(properties);
    }

    /** Creates a facet with only the dataset identifier; properties remain unset. */
    public DefaultKafkaDatasetFacet(KafkaDatasetIdentifier topicIdentifier) {
        this.topicIdentifier = topicIdentifier;
    }

    @Override
    public void setProperties(Properties properties) {
        copyProperties(properties);
    }

    /**
     * Stores a defensive copy of {@code properties} in this facet. Shared by the constructor and
     * {@link #setProperties(Properties)} so the copy semantics cannot drift apart.
     */
    private void copyProperties(Properties properties) {
        this.properties = new Properties();
        KafkaPropertiesUtil.copyProperties(properties, this.properties);
    }

    @Override
    public Properties getProperties() {
        return properties;
    }

    @Override
    public KafkaDatasetIdentifier getTopicIdentifier() {
        return topicIdentifier;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        DefaultKafkaDatasetFacet that = (DefaultKafkaDatasetFacet) o;
        return Objects.equals(properties, that.properties)
                && Objects.equals(topicIdentifier, that.topicIdentifier);
    }

    @Override
    public int hashCode() {
        return Objects.hash(properties, topicIdentifier);
    }

    @Override
    public String name() {
        return KAFKA_FACET_NAME;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.apache.flink.connector.kafka.lineage;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;

/** Default implementation of {@link KafkaDatasetIdentifier}. */
public class DefaultKafkaDatasetIdentifier implements KafkaDatasetIdentifier {

@Nullable private final List<String> topics;
@Nullable private final Pattern topicPattern;

public DefaultKafkaDatasetIdentifier(List<String> fixedTopics, Pattern topicPattern) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public DefaultKafkaDatasetIdentifier(List<String> fixedTopics, Pattern topicPattern) {
public DefaultKafkaDatasetIdentifier(@Nullable List<String> fixedTopics, @Nullable Pattern topicPattern) {

Just try to be as consistent as possible.

this.topics = fixedTopics;
this.topicPattern = topicPattern;
}

public static DefaultKafkaDatasetIdentifier ofPattern(Pattern pattern) {
return new DefaultKafkaDatasetIdentifier(Collections.emptyList(), pattern);
}

public static DefaultKafkaDatasetIdentifier ofTopics(List<String> fixedTopics) {
return new DefaultKafkaDatasetIdentifier(fixedTopics, null);
}

@Nullable
public List<String> getTopics() {
return topics;
}

@Nullable
public Pattern getTopicPattern() {
return topicPattern;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DefaultKafkaDatasetIdentifier that = (DefaultKafkaDatasetIdentifier) o;
return Objects.equals(topics, that.topics)
&& Objects.equals(topicPattern, that.topicPattern);
}

@Override
public int hashCode() {
return Objects.hash(topics, topicPattern);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.Objects;

/** Default implementation of {@link TypeDatasetFacet}. */
public class DefaultTypeDatasetFacet implements TypeDatasetFacet {

    public static final String TYPE_FACET_NAME = "type";

    private final TypeInformation typeInformation;

    /** Creates a facet carrying the record type of a source or sink. */
    public DefaultTypeDatasetFacet(TypeInformation typeInformation) {
        this.typeInformation = typeInformation;
    }

    public TypeInformation getTypeInformation() {
        return typeInformation;
    }

    // @Override was missing in the original; equals/hashCode are overridden as a pair.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        DefaultTypeDatasetFacet that = (DefaultTypeDatasetFacet) o;
        return Objects.equals(typeInformation, that.typeInformation);
    }

    @Override
    public int hashCode() {
        return Objects.hash(typeInformation);
    }

    @Override
    public String name() {
        return TYPE_FACET_NAME;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;

import java.util.Properties;

/** Facet definition to contain all Kafka specific information on Kafka sources and sinks. */
public interface KafkaDatasetFacet extends LineageDatasetFacet {
/** Returns the Kafka client properties associated with this dataset. */
Properties getProperties();

/** Returns the identifier (topic list or topic pattern) of this Kafka dataset. */
KafkaDatasetIdentifier getTopicIdentifier();

/** Replaces the Kafka client properties stored in this facet. */
void setProperties(Properties properties);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.apache.flink.connector.kafka.lineage;

import java.util.Optional;

/** Contains method to extract {@link KafkaDatasetFacet}. */
public interface KafkaDatasetFacetProvider {

    /**
     * Returns a Kafka dataset facet.
     *
     * @return the facet, or {@code Optional.empty()} in case an implementing class is not able to
     *     identify a dataset
     */
    Optional<KafkaDatasetFacet> getKafkaDatasetFacet();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.apache.flink.connector.kafka.lineage;

import javax.annotation.Nullable;

import java.util.List;
import java.util.regex.Pattern;

/** Kafka dataset identifier which can contain either a list of topics or a topic pattern. */
public interface KafkaDatasetIdentifier {
    @Nullable
    List<String> getTopics();

    @Nullable
    Pattern getTopicPattern();

    /**
     * Assigns lineage dataset's name which is topic pattern if it is present or comma separated
     * list of topics.
     *
     * @return the topic pattern string if present, otherwise the comma-separated topic list
     */
    default String toLineageName() {
        if (getTopicPattern() != null) {
            return getTopicPattern().toString();
        }
        // NOTE(review): assumes getTopics() is non-null whenever getTopicPattern() is null;
        // otherwise String.join throws NullPointerException — confirm with implementations.
        return String.join(",", getTopics());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.apache.flink.connector.kafka.lineage;

import java.util.Optional;

/** Contains method which allows extracting topic identifier. */
public interface KafkaDatasetIdentifierProvider {
AHeise marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure to tag all public API with @PublicEvolving. It needs to be clearly visible if a user is supposed to touch the class or not (the easiest way is to not use public unless needed).


/**
* Gets Kafka dataset identifier or empty in case a class implementing is not able to extract
* dataset identifier.
*
* @return
*/
Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.SourceLineageVertex;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

/** Utility class with useful methods for managing lineage objects. */
public class LineageUtil {
AHeise marked this conversation as resolved.
Show resolved Hide resolved

private static final String KAFKA_DATASET_PREFIX = "kafka://";
private static final String COMMA = ",";
private static final String SEMICOLON = ";";

public static LineageDataset datasetOf(String namespace, KafkaDatasetFacet kafkaDatasetFacet) {
return datasetOf(namespace, kafkaDatasetFacet, Collections.emptyList());
}

public static LineageDataset datasetOf(
String namespace, KafkaDatasetFacet kafkaDatasetFacet, TypeDatasetFacet typeFacet) {
return datasetOf(namespace, kafkaDatasetFacet, Collections.singletonList(typeFacet));
}

private static LineageDataset datasetOf(
String namespace,
KafkaDatasetFacet kafkaDatasetFacet,
List<LineageDatasetFacet> facets) {
return new LineageDataset() {
@Override
public String name() {
return kafkaDatasetFacet.getTopicIdentifier().toLineageName();
}

@Override
public String namespace() {
return namespace;
}

@Override
public Map<String, LineageDatasetFacet> facets() {
Map facetMap = new HashMap<String, LineageDatasetFacet>();
facetMap.put(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME, kafkaDatasetFacet);
facetMap.putAll(
facets.stream()
.collect(
Collectors.toMap(LineageDatasetFacet::name, item -> item)));
return facetMap;
}
};
}

public static String namespaceOf(Properties properties) {
String bootstrapServers = properties.getProperty("bootstrap.servers");

if (bootstrapServers == null) {
return KAFKA_DATASET_PREFIX;
}

if (bootstrapServers.contains(COMMA)) {
bootstrapServers = bootstrapServers.split(COMMA)[0];
} else if (bootstrapServers.contains(SEMICOLON)) {
bootstrapServers = bootstrapServers.split(SEMICOLON)[0];
}
Comment on lines +87 to +91
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you check if there is already some util in kafka that does that? If not, leave as is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like piece of code that has to be available somewhere, but I wasn't able to find it.


return String.format(KAFKA_DATASET_PREFIX + bootstrapServers);
}

public static SourceLineageVertex sourceLineageVertexOf(Collection<LineageDataset> datasets) {
return new SourceLineageVertex() {
@Override
public Boundedness boundedness() {
return Boundedness.CONTINUOUS_UNBOUNDED;
}

@Override
public List<LineageDataset> datasets() {
return datasets.stream().collect(Collectors.toList());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return datasets.stream().collect(Collectors.toList());
return List.copyOf(datasets);

}
};
}

public static LineageVertex lineageVertexOf(Collection<LineageDataset> datasets) {
return new LineageVertex() {
@Override
public List<LineageDataset> datasets() {
return datasets.stream().collect(Collectors.toList());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not use the List.copyOf(datasets); as per Arvid's suggested change here also.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merged version is not using List.copyOf. We agreed on that in offline discussion.

}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;

/** Facet definition to contain type information of source and sink. */
public interface TypeDatasetFacet extends LineageDatasetFacet {
/** Returns the {@link TypeInformation} of records produced or consumed by the dataset. */
TypeInformation getTypeInformation();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.apache.flink.connector.kafka.lineage;

import java.util.Optional;

/** Contains method to extract {@link TypeDatasetFacet}. */
public interface TypeDatasetFacetProvider {

    /**
     * Returns a type dataset facet.
     *
     * @return the facet, or {@code Optional.empty()} in case an implementing class is not able to
     *     resolve the type
     */
    Optional<TypeDatasetFacet> getTypeDatasetFacet();
}
Loading
Loading