Skip to content

Commit d2871ea

Browse files
authored
Merge pull request #341 from splunk/protobuf-support
Added support for protobuf
2 parents 052b742 + 3be19a2 commit d2871ea

File tree

8 files changed

+67
-4
lines changed

8 files changed

+67
-4
lines changed

.github/workflows/ci_build_test.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ jobs:
103103
CI_KAFKA_HEADER_INDEX: kafka
104104
CI_DATAGEN_IMAGE: rock1017/log-generator:latest
105105
CI_OLD_CONNECTOR_VERSION: v2.0.1
106+
SCHEMA_REGISTRY_URL: ${{ secrets.SCHEMA_REGISTRY_URL }}
106107

107108
steps:
108109
- name: Checkout

README.md

+16
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,12 @@ Use the below schema to configure Splunk Connect for Kafka
106106
"splunk.hec.raw": "<true|false>",
107107
"splunk.hec.raw.line.breaker": "<line breaker separator>",
108108
"splunk.hec.json.event.enrichment": "<key value pairs separated by comma, only applicable to /event HEC>",
109+
"value.converter": "<converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka>",
110+
"value.converter.schema.registry.url": "<Schema-Registry-URL>",
111+
"value.converter.schemas.enable": "<true|false>",
112+
"key.converter": "<converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka>",
113+
"key.converter.schema.registry.url": "<Schema-Registry-URL>",
114+
"key.converter.schemas.enable": "<true|false>",
109115
"splunk.hec.ack.enabled": "<true|false>",
110116
"splunk.hec.ack.poll.interval": "<event ack poll interval>",
111117
"splunk.hec.ack.poll.threads": "<number of threads used to poll event acks>",
@@ -206,6 +212,16 @@ Use the below schema to configure Splunk Connect for Kafka
206212
| `kerberos.user.principal` | The Kerberos user principal the connector may use to authenticate with Kerberos. | `""` |
207213
| `kerberos.keytab.path` | The path to the keytab file to use for authentication with Kerberos. | `""` |
208214

215+
### Protobuf Parameters
216+
| Name | Description | Default Value |
217+
|-------- |----------------------------|-----------------------|
218+
| `value.converter` | Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka. For using protobuf format, set the value of this field to `io.confluent.connect.protobuf.ProtobufConverter` | `org.apache.kafka.connect.storage.StringConverter` |
219+
| `value.converter.schema.registry.url` | Schema Registry URL. | `""` |
220+
| `value.converter.schemas.enable` | For using protobuf format, set the value of this field to `true` | `false` |
221+
| `key.converter` | Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka. For using protobuf format, set the value of this field to `io.confluent.connect.protobuf.ProtobufConverter` | `org.apache.kafka.connect.storage.StringConverter` |
222+
| `key.converter.schema.registry.url` | Schema Registry URL. | `""` |
223+
| `key.converter.schemas.enable` | For using protobuf format, set the value of this field to `true` | `false` |
224+
209225
## Load balancing
210226

211227
See [Splunk Docs](https://docs.splunk.com/Documentation/KafkaConnect/latest/User/LoadBalancing) for considerations when using load balancing in your deployment.

pom.xml

+13
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,21 @@
152152
<version>3.7</version>
153153
<scope>compile</scope>
154154
</dependency>
155+
<!-- https://mvnrepository.com/artifact/io.confluent/kafka-connect-protobuf-converter -->
156+
<dependency>
157+
<groupId>io.confluent</groupId>
158+
<artifactId>kafka-connect-protobuf-converter</artifactId>
159+
<version>7.0.1</version>
160+
</dependency>
155161
</dependencies>
156162

163+
<repositories>
164+
<repository>
165+
<id>confluent</id>
166+
<url>https://packages.confluent.io/maven/</url>
167+
</repository>
168+
</repositories>
169+
157170
<reporting>
158171
<plugins>
159172
<plugin>

test/conftest.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,13 @@ def setup(request):
3737

3838
def pytest_configure():
3939
# Generate message data
40-
topics = [config["kafka_topic"], config["kafka_topic_2"], config["kafka_header_topic"],
40+
topics = [config["kafka_topic"], config["kafka_topic_2"], config["kafka_header_topic"], "prototopic",
4141
"test_splunk_hec_malformed_events"]
4242

4343
create_kafka_topics(config, topics)
4444
producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"],
4545
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
46+
protobuf_producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"])
4647

4748
for _ in range(3):
4849
msg = {"timestamp": config['timestamp']}
@@ -67,7 +68,9 @@ def pytest_configure():
6768

6869
producer.send("test_splunk_hec_malformed_events", {})
6970
producer.send("test_splunk_hec_malformed_events", {"&&": "null", "message": ["$$$$****////", 123, None]})
71+
protobuf_producer.send("prototopic", value=b'\x00\x00\x00\x00\x01\x00\n\x011\x12\r10-01-04-3:45\x18\x15%\x00\x00*C*\x02No:\x12\n\x011\x12\x04this\x1a\x07New oneB\x0c\n\x011\x12\x07shampooJ\x04Many')
7072
producer.flush()
73+
protobuf_producer.flush()
7174

7275
# Launch all connectors for tests
7376
for param in connect_params:

test/lib/connect_params.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -192,5 +192,13 @@
192192
{"name": "test_splunk_hec_malformed_events",
193193
"topics": "test_splunk_hec_malformed_events",
194194
"splunk_hec_raw": False,
195-
"splunk_hec_json_event_enrichment": "chars=hec_malformed_events_{}".format(config['timestamp'])}
195+
"splunk_hec_json_event_enrichment": "chars=hec_malformed_events_{}".format(config['timestamp'])},
196+
{"name": "test_protobuf_events",
197+
"splunk_sourcetypes": "protobuf",
198+
"splunk_hec_raw": False,
199+
"topics": "prototopic",
200+
"value_converter":"io.confluent.connect.protobuf.ProtobufConverter",
201+
"value_converter_schema_registry_url": os.environ["SCHEMA_REGISTRY_URL"],
202+
"value_converter_schemas_enable":"true"
203+
}
196204
]

test/lib/connector.template

+4-1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@
4444
{% if splunk_hec_json_event_formatted %}
4545
"splunk.hec.json.event.formatted": "{{splunk_hec_json_event_formatted}}",
4646
{% endif %}
47-
"splunk.sourcetypes": "{{splunk_sourcetypes}}"
47+
"splunk.sourcetypes": "{{splunk_sourcetypes}}",
48+
"value.converter": "{{value_converter}}",
49+
"value.converter.schema.registry.url": "{{value_converter_schema_registry_url}}",
50+
"value.converter.schemas.enable": "{{value_converter_schemas_enable}}"
4851
}
4952
}

test/lib/data_gen.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ def generate_connector_content(input_disc=None):
3232
"splunk_header_sourcetype": None,
3333
"splunk_header_host": None,
3434
"splunk_hec_json_event_formatted": None,
35-
"splunk_sourcetypes": "kafka"
35+
"splunk_sourcetypes": "kafka",
36+
"value_converter": "org.apache.kafka.connect.storage.StringConverter",
37+
"value_converter_schema_registry_url": "",
38+
"value_converter_schemas_enable":"false"
3639
}
3740

3841
if input_disc:

test/testcases/test_data_onboarding.py

+16
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,19 @@ def test_data_onboarding(self, setup, test_scenario, test_input, expected):
2828
password=setup["splunk_password"])
2929
logger.info("Splunk received %s events in the last hour", len(events))
3030
assert len(events) == expected
31+
32+
@pytest.mark.parametrize("test_scenario, test_input, expected", [
33+
("protobuf", "sourcetype::protobuf", 1),
34+
])
35+
def test_proto_data_onboarding(self, setup, test_scenario, test_input, expected):
36+
logger.info("testing {0} input={1} expected={2} event(s)".format(test_scenario, test_input, expected))
37+
search_query = "index={0} | search {1}".format(setup['splunk_index'],
38+
test_input)
39+
logger.info(search_query)
40+
events = check_events_from_splunk(start_time="-15m@m",
41+
url=setup["splunkd_url"],
42+
user=setup["splunk_user"],
43+
query=["search {}".format(search_query)],
44+
password=setup["splunk_password"])
45+
logger.info("Splunk received %s events in the last hour", len(events))
46+
assert len(events) == expected

0 commit comments

Comments
 (0)