Skip to content

Commit

Permalink
fix CI.
Browse files Browse the repository at this point in the history
  • Loading branch information
lvyanquan committed Jan 8, 2025
1 parent 1f586c9 commit 923df3d
Showing 1 changed file with 1 addition and 51 deletions.
52 changes: 1 addition & 51 deletions flink-python/pyflink/datastream/connectors/tests/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from typing import Dict

import pyflink.datastream.data_stream as data_stream
from pyflink.common import typeinfo

from pyflink.common.configuration import Configuration
from pyflink.common.serialization import SimpleStringSchema, DeserializationSchema
Expand All @@ -28,69 +27,20 @@
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream.connectors.base import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaTopicPartition, \
KafkaOffsetsInitializer, KafkaOffsetResetStrategy, KafkaRecordSerializationSchema, KafkaSink, \
FlinkKafkaProducer, FlinkKafkaConsumer
KafkaOffsetsInitializer, KafkaOffsetResetStrategy, KafkaRecordSerializationSchema, KafkaSink
from pyflink.datastream.formats.avro import AvroRowDeserializationSchema, AvroRowSerializationSchema
from pyflink.datastream.formats.csv import CsvRowDeserializationSchema, CsvRowSerializationSchema
from pyflink.datastream.formats.json import JsonRowDeserializationSchema, JsonRowSerializationSchema
from pyflink.java_gateway import get_gateway
from pyflink.testing.test_case_utils import (
PyFlinkStreamingTestCase,
PyFlinkTestCase,
invoke_java_object_method,
to_java_data_structure,
)
from pyflink.util.java_utils import to_jarray, is_instance_of, get_field_value


class KafkaSourceTests(PyFlinkStreamingTestCase):

def test_legacy_kafka_connector(self):
source_topic = 'test_source_topic'
sink_topic = 'test_sink_topic'
props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
type_info = Types.ROW([Types.INT(), Types.STRING()])

# Test for kafka consumer
deserialization_schema = JsonRowDeserializationSchema.builder() \
.type_info(type_info=type_info).build()

flink_kafka_consumer = FlinkKafkaConsumer(source_topic, deserialization_schema, props)
flink_kafka_consumer.set_start_from_earliest()
flink_kafka_consumer.set_commit_offsets_on_checkpoints(True)

j_properties = get_field_value(flink_kafka_consumer.get_java_function(), 'properties')
self.assertEqual('localhost:9092', j_properties.getProperty('bootstrap.servers'))
self.assertEqual('test_group', j_properties.getProperty('group.id'))
self.assertTrue(get_field_value(flink_kafka_consumer.get_java_function(),
'enableCommitOnCheckpoints'))
j_start_up_mode = get_field_value(flink_kafka_consumer.get_java_function(), 'startupMode')

j_deserializer = get_field_value(flink_kafka_consumer.get_java_function(), 'deserializer')
j_deserialize_type_info = invoke_java_object_method(j_deserializer, "getProducedType")
deserialize_type_info = typeinfo._from_java_type(j_deserialize_type_info)
self.assertTrue(deserialize_type_info == type_info)
self.assertTrue(j_start_up_mode.equals(get_gateway().jvm
.org.apache.flink.streaming.connectors
.kafka.config.StartupMode.EARLIEST))
j_topic_desc = get_field_value(flink_kafka_consumer.get_java_function(),
'topicsDescriptor')
j_topics = invoke_java_object_method(j_topic_desc, 'getFixedTopics')
self.assertEqual(['test_source_topic'], list(j_topics))

# Test for kafka producer
serialization_schema = JsonRowSerializationSchema.builder().with_type_info(type_info) \
.build()
flink_kafka_producer = FlinkKafkaProducer(sink_topic, serialization_schema, props)
flink_kafka_producer.set_write_timestamp_to_kafka(False)

j_producer_config = get_field_value(flink_kafka_producer.get_java_function(),
'producerConfig')
self.assertEqual('localhost:9092', j_producer_config.getProperty('bootstrap.servers'))
self.assertEqual('test_group', j_producer_config.getProperty('group.id'))
self.assertFalse(get_field_value(flink_kafka_producer.get_java_function(),
'writeTimestampToKafka'))

def test_compiling(self):
source = KafkaSource.builder() \
.set_bootstrap_servers('localhost:9092') \
Expand Down

0 comments on commit 923df3d

Please sign in to comment.