From 923df3d9a0c04a298df90431b3166d03c6896ba1 Mon Sep 17 00:00:00 2001 From: lvyanquan Date: Wed, 8 Jan 2025 10:14:28 +0800 Subject: [PATCH] fix CI. --- .../datastream/connectors/tests/test_kafka.py | 52 +------------------ 1 file changed, 1 insertion(+), 51 deletions(-) diff --git a/flink-python/pyflink/datastream/connectors/tests/test_kafka.py b/flink-python/pyflink/datastream/connectors/tests/test_kafka.py index dea06b3e0..a5f7370b5 100644 --- a/flink-python/pyflink/datastream/connectors/tests/test_kafka.py +++ b/flink-python/pyflink/datastream/connectors/tests/test_kafka.py @@ -19,7 +19,6 @@ from typing import Dict import pyflink.datastream.data_stream as data_stream -from pyflink.common import typeinfo from pyflink.common.configuration import Configuration from pyflink.common.serialization import SimpleStringSchema, DeserializationSchema @@ -28,8 +27,7 @@ from pyflink.common.watermark_strategy import WatermarkStrategy from pyflink.datastream.connectors.base import DeliveryGuarantee from pyflink.datastream.connectors.kafka import KafkaSource, KafkaTopicPartition, \ - KafkaOffsetsInitializer, KafkaOffsetResetStrategy, KafkaRecordSerializationSchema, KafkaSink, \ - FlinkKafkaProducer, FlinkKafkaConsumer + KafkaOffsetsInitializer, KafkaOffsetResetStrategy, KafkaRecordSerializationSchema, KafkaSink from pyflink.datastream.formats.avro import AvroRowDeserializationSchema, AvroRowSerializationSchema from pyflink.datastream.formats.csv import CsvRowDeserializationSchema, CsvRowSerializationSchema from pyflink.datastream.formats.json import JsonRowDeserializationSchema, JsonRowSerializationSchema @@ -37,60 +35,12 @@ from pyflink.testing.test_case_utils import ( PyFlinkStreamingTestCase, PyFlinkTestCase, - invoke_java_object_method, to_java_data_structure, ) from pyflink.util.java_utils import to_jarray, is_instance_of, get_field_value class KafkaSourceTests(PyFlinkStreamingTestCase): - - def test_legacy_kafka_connector(self): - source_topic = 'test_source_topic' - sink_topic = 'test_sink_topic' - props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'} - type_info = Types.ROW([Types.INT(), Types.STRING()]) - - # Test for kafka consumer - deserialization_schema = JsonRowDeserializationSchema.builder() \ - .type_info(type_info=type_info).build() - - flink_kafka_consumer = FlinkKafkaConsumer(source_topic, deserialization_schema, props) - flink_kafka_consumer.set_start_from_earliest() - flink_kafka_consumer.set_commit_offsets_on_checkpoints(True) - - j_properties = get_field_value(flink_kafka_consumer.get_java_function(), 'properties') - self.assertEqual('localhost:9092', j_properties.getProperty('bootstrap.servers')) - self.assertEqual('test_group', j_properties.getProperty('group.id')) - self.assertTrue(get_field_value(flink_kafka_consumer.get_java_function(), - 'enableCommitOnCheckpoints')) - j_start_up_mode = get_field_value(flink_kafka_consumer.get_java_function(), 'startupMode') - - j_deserializer = get_field_value(flink_kafka_consumer.get_java_function(), 'deserializer') - j_deserialize_type_info = invoke_java_object_method(j_deserializer, "getProducedType") - deserialize_type_info = typeinfo._from_java_type(j_deserialize_type_info) - self.assertTrue(deserialize_type_info == type_info) - self.assertTrue(j_start_up_mode.equals(get_gateway().jvm - .org.apache.flink.streaming.connectors - .kafka.config.StartupMode.EARLIEST)) - j_topic_desc = get_field_value(flink_kafka_consumer.get_java_function(), - 'topicsDescriptor') - j_topics = invoke_java_object_method(j_topic_desc, 'getFixedTopics') - self.assertEqual(['test_source_topic'], list(j_topics)) - - # Test for kafka producer - serialization_schema = JsonRowSerializationSchema.builder().with_type_info(type_info) \ - .build() - flink_kafka_producer = FlinkKafkaProducer(sink_topic, serialization_schema, props) - flink_kafka_producer.set_write_timestamp_to_kafka(False) - - j_producer_config = get_field_value(flink_kafka_producer.get_java_function(), - 'producerConfig') - self.assertEqual('localhost:9092', j_producer_config.getProperty('bootstrap.servers')) - self.assertEqual('test_group', j_producer_config.getProperty('group.id')) - self.assertFalse(get_field_value(flink_kafka_producer.get_java_function(), - 'writeTimestampToKafka')) - def test_compiling(self): source = KafkaSource.builder() \ .set_bootstrap_servers('localhost:9092') \