Skip to content

Commit 1b5e056

Browse files
authored
Merge branch 'develop' into release/2.0.9
2 parents ceccd67 + 8cb020b commit 1b5e056

22 files changed

+432
-212
lines changed

.github/workflows/ci_build_test.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ jobs:
7272
needs:
7373
- build-unit-test
7474
strategy:
75+
fail-fast: false
7576
matrix:
7677
include:
7778
- kafka_version: "1.1.1"
@@ -205,4 +206,3 @@ jobs:
205206
export PYTHONWARNINGS="ignore:Unverified HTTPS request"
206207
echo "Running functional tests....."
207208
python -m pytest --log-level=INFO
208-

README.md

+10
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,9 @@ Use the below schema to configure Splunk Connect for Kafka
135135
"splunk.hec.ssl.trust.store.password": "<Java KeyStore password>"
136136
"kerberos.user.principal": "<The Kerberos user principal the connector may use to authenticate with Kerberos>",
137137
"kerberos.keytab.path": "<The path to the keytab file to use for authentication with Kerberos>"
138+
"enable.timestamp.extraction": "<true|false>",
139+
"timestamp.regex": "<regex for timestamp extraction>",
140+
"timestamp.format": "<time-format for timestamp extraction>"
138141
}
139142
}
140143
```
@@ -222,6 +225,13 @@ Use the below schema to configure Splunk Connect for Kafka
222225
| `key.converter.schema.registry.url` | Schema Registry URL. | `""` |
223226
| `key.converter.schemas.enable` | To use the protobuf format, set the value of this field to `true` | `false` |
224227

228+
### Timestamp extraction Parameters
229+
| Name | Description | Default Value |
230+
|-------- |----------------------------|-----------------------|
231+
| `enable.timestamp.extraction` | To enable timestamp extraction, set the value of this field to `true`. <br/> **NOTE:** <br/> Applicable only if `splunk.hec.raw` is `false` | `false` |
232+
| `timestamp.regex` | Regex for timestamp extraction. <br/> **NOTE:** <br/> The regex must have a named capture group `"time"`, e.g.: `\\\"time\\\":\\s*\\\"(?<time>.*?)\"` | `""` |
233+
| `timestamp.format` | Time format for timestamp extraction.<br/>For example: <br/>If the timestamp is `1555209605000`, set `timestamp.format` to `"epoch"`.<br/> If the timestamp is `Jun 13 2010 23:11:52.454 UTC`, set `timestamp.format` to `"MMM dd yyyy HH:mm:ss.SSS zzz"` | `""` |
234+
225235
## Load balancing
226236

227237
See [Splunk Docs](https://docs.splunk.com/Documentation/KafkaConnect/latest/User/LoadBalancing) for considerations when using load balancing in your deployment.

ci/export_data.py

+12-20
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,14 @@ def _check_request_status(self, req_obj):
4848
returns True/False
4949
'''
5050
if not req_obj.ok:
51-
raise Exception('status code: {0} \n details: {1}'.format(
52-
str(req_obj.status_code), req_obj.text))
51+
raise Exception(f'status code: {str(req_obj.status_code)} \n details: {req_obj.text}')
5352

5453
def _check_source_connection(self):
5554
'''
5655
Check if a source server connection is accessible
5756
returns True/False
5857
'''
59-
service_url = '{0}/services'.format(self.src_splunk_uri)
58+
service_url = f'{self.src_splunk_uri}/services'
6059
logger.info('requesting: %s', service_url)
6160

6261
res = self._requests_retry_session().get(
@@ -70,11 +69,10 @@ def _check_dest_connection(self):
7069
Check if a destination server connection is accessible by
7170
sending a test event returns True/False
7271
'''
73-
dest_splunk_hec_url = '{0}/services/collector/event'.format(
74-
self.dest_splunk_hec_uri)
72+
dest_splunk_hec_url = f'{self.dest_splunk_hec_uri}/services/collector/event'
7573
logger.info('requesting: %s', dest_splunk_hec_url)
7674
headers = {
77-
'Authorization': 'Splunk {token}'.format(token=self.dest_splunk_hec_token),
75+
'Authorization': f'Splunk {self.dest_splunk_hec_token}',
7876
'Content-Type': 'application/json',
7977
}
8078
data = {
@@ -92,11 +90,10 @@ def _compose_search_query(self):
9290
returns job_str
9391
'''
9492
for idx, item in enumerate(self.src_source_types):
95-
self.src_source_types[idx] = 'sourcetype="{0}"'.format(item)
93+
self.src_source_types[idx] = f'sourcetype="{item}"'
9694

9795
source_type_str = ' OR '.join(self.src_source_types)
98-
job_str = 'search index="{index}" {source_type_search}'.format(
99-
index=self.src_index, source_type_search=source_type_str)
96+
job_str = f'search index="{self.src_index}" {source_type_str}'
10097

10198
logger.info('job_str: %s', job_str)
10299

@@ -111,8 +108,7 @@ def _collect_data(self, query, start_time, end_time):
111108
returns events
112109
'''
113110

114-
url = '{0}/services/search/jobs?output_mode=json'.format(
115-
self.src_splunk_uri)
111+
url = f'{self.src_splunk_uri}/services/search/jobs?output_mode=json'
116112
logger.info('requesting: %s', url)
117113
data = {
118114
'search': query,
@@ -139,8 +135,7 @@ def _wait_for_job_and__get_events(self, job_id):
139135
returns events
140136
'''
141137
events = []
142-
job_url = '{0}/services/search/jobs/{1}?output_mode=json'.format(
143-
self.src_splunk_uri, str(job_id))
138+
job_url = f'{self.src_splunk_uri}/services/search/jobs/{str(job_id)}?output_mode=json'
144139
logger.info('requesting: %s', job_url)
145140

146141
for _ in range(self.timeout):
@@ -157,7 +152,7 @@ def _wait_for_job_and__get_events(self, job_id):
157152
events = self._get_events(job_id)
158153
break
159154
if dispatch_state == 'FAILED':
160-
raise Exception('Search job: {0} failed'.format(job_url))
155+
raise Exception(f'Search job: {job_url} failed')
161156
time.sleep(1)
162157

163158
return events
@@ -168,8 +163,7 @@ def _get_events(self, job_id):
168163
@param: job_id
169164
returns events
170165
'''
171-
event_url = '{0}/services/search/jobs/{1}/events?output_mode=json'.format(
172-
self.src_splunk_uri, str(job_id))
166+
event_url = f'{self.src_splunk_uri}/services/search/jobs/{str(job_id)}/events?output_mode=json'
173167
logger.info('requesting: %s', event_url)
174168

175169
event_job = self._requests_retry_session().get(
@@ -214,13 +208,11 @@ def _send_to_dest_thru_hec(self, events):
214208
hec_events = self._transform_results_to_hec_events(events)
215209
data = '\n'.join(json.dumps(event) for event in hec_events)
216210
headers = {
217-
'Authorization': 'Splunk {token}'.format(
218-
token=self.dest_splunk_hec_token),
211+
'Authorization': f'Splunk {self.dest_splunk_hec_token}',
219212
'Content-Type': 'application/json',
220213
}
221214

222-
dest_splunk_hec_url = '{0}/services/collector/event'.format(
223-
self.dest_splunk_hec_uri)
215+
dest_splunk_hec_url = f'{self.dest_splunk_hec_uri}/services/collector/event'
224216
logger.info('sending %d events to : %s',
225217
len(events), dest_splunk_hec_url)
226218

ci/kafka_cluster_gen.py

+25-28
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ def __init__(self, image, version='2', volumes=None):
1818
# 'ZOOKEEPER_myid=1',
1919
'ZOOKEEPER_initLimit=5',
2020
'ZOOKEEPER_syncLimit=2',
21-
'ZOOKEEPER_dataDir={}/zookeeper'.format(self.DATA_DIR_ROOT),
21+
f'ZOOKEEPER_dataDir={self.DATA_DIR_ROOT}/zookeeper',
2222
# 'ZOOKEEPER_servers=server.1=zookeeper1:2888:3888,server.2=zookeeper2:2888:3888,server.3=zookeeper3:2888:3888',
2323
]
2424

@@ -28,7 +28,7 @@ def __init__(self, image, version='2', volumes=None):
2828
self.broker_opts = [
2929
'KAFKA_listeners=PLAINTEXT://:9092',
3030
# 'KAFKA_advertised_listeners=PLAINTEXT://kafka1:9092',
31-
'KAFKA_log_dirs={}/kafkadata'.format(self.DATA_DIR_ROOT),
31+
f'KAFKA_log_dirs={self.DATA_DIR_ROOT}/kafkadata',
3232
# 'KAFKA_num_partitions=3',
3333
'KAFKA_delete_topic_enable=true',
3434
'KAFKA_auto_create_topics_enable=true',
@@ -40,7 +40,7 @@ def __init__(self, image, version='2', volumes=None):
4040

4141
def bootstrap_servers(self):
4242
return ','.join(
43-
'{prefix}{kid}:9092'.format(prefix=self.broker_prefix, kid=i + 1)
43+
f'{self.broker_prefix}{i + 1}:9092'
4444
for i in xrange(self.num_of_broker))
4545

4646
def gen(self):
@@ -54,7 +54,7 @@ def gen(self):
5454
if lin != '\n':
5555
yaml_lines[i] = ' ' + lin
5656

57-
yaml_lines.insert(0, 'version: \'{}\'\n'.format(self.version))
57+
yaml_lines.insert(0, f'version: \'{self.version}\'\n')
5858
yaml_lines.insert(0, 'services:\n')
5959
return '\n'.join(yaml_lines)
6060

@@ -69,31 +69,30 @@ def _do_gen_zk(self):
6969
self.zk_opts.insert(1, self._get_jvm_memory())
7070

7171
def add_myid(service, service_idx):
72-
myid = ' - ZOOKEEPER_myid={}'.format(service_idx)
72+
myid = f' - ZOOKEEPER_myid={service_idx}'
7373
service.append(myid)
7474
zk_servers = self._get_zk_servers(service_idx)
75-
service.append(' - ZOOKEEPER_servers={}'.format(zk_servers))
75+
service.append(f' - ZOOKEEPER_servers={zk_servers}')
7676

7777
return gen_services(
7878
self.num_of_zk, self.zk_prefix, self.image, [2181, 2888, 3888],
7979
self.zk_opts, [], [2181, 2888, 3888], self.volumes, add_myid)
8080

8181
def _do_gen_broker(self):
8282
def add_advertise_name_and_id(service, service_idx):
83-
adname = ' - KAFKA_advertised_listeners=PLAINTEXT://{}{}:9092'.format(
84-
self.broker_prefix, service_idx)
83+
adname = f' - KAFKA_advertised_listeners=PLAINTEXT://{self.broker_prefix}{service_idx}:9092'
8584
service.append(adname)
86-
bid = ' - KAFKA_broker_id={}'.format(service_idx - 1)
85+
bid = f' - KAFKA_broker_id={service_idx - 1}'
8786
service.append(bid)
8887

8988
self.broker_opts.insert(0, 'RUN=kafka')
9089
self.broker_opts.insert(1, self._get_jvm_memory())
9190
self.broker_opts.append(
92-
'KAFKA_num_partitions={}'.format(self.num_of_partition))
91+
f'KAFKA_num_partitions={self.num_of_partition}')
9392
zk_connect = self._get_zk_connect_setting()
9493
self.broker_opts.append(
95-
'KAFKA_zookeeper_connect={}'.format(zk_connect))
96-
depends = ['{}{}'.format(self.zk_prefix, i)
94+
f'KAFKA_zookeeper_connect={zk_connect}')
95+
depends = [f'{self.zk_prefix}{i}'
9796
for i in xrange(1, self.num_of_zk + 1)]
9897

9998
return gen_services(
@@ -102,71 +101,69 @@ def add_advertise_name_and_id(service, service_idx):
102101
add_advertise_name_and_id)
103102

104103
def _get_jvm_memory(self):
105-
return 'KAFKA_HEAP_OPTS=-Xmx{} -Xms{}'.format(
106-
self.max_jvm_memory, self.min_jvm_memory)
104+
return f'KAFKA_HEAP_OPTS=-Xmx{self.max_jvm_memory} -Xms{self.min_jvm_memory}'
107105

108106
def _get_zk_servers(self, cur_idx):
109107
zk_servers = []
110108
for i in xrange(1, self.num_of_zk + 1):
111109
if i != cur_idx:
112-
hname = '{prefix}{kid}'.format(prefix=self.zk_prefix, kid=i)
110+
hname = f'{self.zk_prefix}{i}'
113111
else:
114112
hname = '0.0.0.0'
115113

116-
zk_server = 'server.{kid}={hname}:2888:3888'.format(
117-
kid=i, hname=hname)
114+
zk_server = f'server.{i}={hname}:2888:3888'
118115
zk_servers.append(zk_server)
119116
return ','.join(zk_servers)
120117

121118
def _get_zk_connect_setting(self):
122119
zk_connect_settings = []
123120
for i in xrange(self.num_of_zk):
124121
zk_connect_settings.append(
125-
'{prefix}{kid}:2181'.format(prefix=self.zk_prefix, kid=i + 1))
122+
f'{self.zk_prefix}{i + 1}:2181')
126123
return ','.join(zk_connect_settings)
127124

128125

129126
def gen_services(num, prefix, image, ports, envs,
130127
depends, exposed_ports, volumes, callback):
131128
services = []
132129
for i in xrange(1, num + 1):
133-
name = '{}{}'.format(prefix, i)
130+
name = f'{prefix}{i}'
134131
service = [
135-
'{}:'.format(name),
136-
' image: {}'.format(image),
137-
' hostname: {}'.format(name),
138-
' container_name: {}'.format(name),
132+
f'{name}:',
133+
f' image: {image}',
134+
f' hostname: {name}',
135+
f' container_name: {name}',
139136
]
140137

141138
# exposed ports
142139
if exposed_ports:
143140
service.append(' expose:')
144141
for port in exposed_ports:
145-
service.append(' - "{}"'.format(port))
142+
service.append(f' - "{port}"')
146143

147144
# ports
148145
if ports:
149146
service.append(' ports:')
150147
for port in ports:
151-
service.append(' - "{}"'.format(port))
148+
service.append(f' - "{port}"')
152149

153150
# depends
154151
if depends:
155152
service.append(' depends_on:')
156153
for dep in depends:
157-
service.append(' - {}'.format(dep))
154+
service.append(f' - {dep}')
158155

159156
# volumes
160157
if volumes:
161158
service.append(' volumes:')
162159
for vol in volumes:
163-
service.append(' - {}'.format(vol))
160+
service.append(f' - {vol}')
164161

165162
# envs
166163
if envs:
167164
service.append(' environment:')
168165
for env in envs:
169-
service.append(' - {}'.format(env))
166+
service.append(f' - {env}')
170167

171168
if callback is not None:
172169
callback(service, i)

ci/kafka_orca_gen.py

+25-26
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,14 @@ def gen(self):
3838
num_of_host = self.num_of_gen / self.DATA_GEN_PER_HOST
3939

4040
envs = [
41-
'KAFKA_BOOTSTRAP_SERVERS={}'.format(self.bootstrap_servers),
42-
'KAFKA_TOPIC={}'.format(self.topic),
43-
'MESSAGE_COUNT={}'.format(self.total_messages),
44-
'EPS={}'.format(self.eps),
45-
'MESSAGE_SIZE={}'.format(self.message_size),
46-
'JVM_MAX_HEAP=2G',
47-
'JVM_MIN_HEAP=512M',
48-
'KAFKA_DATA_GEN_SIZE={}'.format(data_gen_size),
41+
f'KAFKA_BOOTSTRAP_SERVERS={self.bootstrap_servers}',
42+
f'KAFKA_TOPIC={self.topic}',
43+
f'MESSAGE_COUNT={self.total_messages}',
44+
f'EPS={self.eps}',
45+
f'MESSAGE_SIZE={self.message_size}',
46+
f'JVM_MAX_HEAP=2G',
47+
f'JVM_MIN_HEAP=512M',
48+
f'KAFKA_DATA_GEN_SIZE={data_gen_size}',
4949
]
5050
depends = gen_depends_from(self.bootstrap_servers)
5151
services = kcg.gen_services(
@@ -67,14 +67,13 @@ def __init__(self, image, bootstrap_servers):
6767
self.min_jvm_memory = '512M'
6868

6969
def gen(self):
70-
jvm_mem = 'KAFKA_HEAP_OPTS=-Xmx{} -Xms{}'.format(
71-
self.max_jvm_memory, self.min_jvm_memory)
70+
jvm_mem = f'KAFKA_HEAP_OPTS=-Xmx{self.max_jvm_memory} -Xms{self.min_jvm_memory}'
7271

7372
envs = [
74-
'KAFKA_BOOTSTRAP_SERVERS={}'.format(self.bootstrap_servers),
73+
f'KAFKA_BOOTSTRAP_SERVERS={self.bootstrap_servers}',
7574
jvm_mem,
76-
'KAFKA_CONNECT_LOGGING={}'.format(self.logging_level),
77-
'KAFKA_CONNECT_BRANCH={}'.format(self.branch),
75+
f'KAFKA_CONNECT_LOGGING={self.logging_level}',
76+
f'KAFKA_CONNECT_BRANCH={self.branch}',
7877
# for proc monitor
7978
'SPLUNK_HOST=https://heclb1:8088',
8079
'SPLUNK_TOKEN=00000000-0000-0000-0000-000000000000',
@@ -105,18 +104,18 @@ def __init__(self, image, num_of_indexer, num_of_connect):
105104

106105
def gen(self):
107106
envs = [
108-
'INDEX_CLUSTER_SIZE={}'.format(self.num_of_indexer),
109-
'KAFKA_CONNECT_HEC_MODE={}'.format(self.hec_mode.lower()),
110-
'KAFKA_CONNECT_ACK_MODE={}'.format(self.ack_mode.lower()),
111-
'KAFKA_CONNECT_TOPICS={}'.format(self.topic),
112-
'KAFKA_CONNECT_LINE_BREAKER={}'.format(self.line_breaker),
113-
'JVM_HEAP_SIZE={}'.format(self.jvm_size),
114-
'KAFKA_CONNECT_BRANCH={}'.format(self.branch),
115-
'CONNECT_PERF_METRIC_DEST_HEC={}'.format(self.metric_dest_hec_uri),
116-
'CONNECT_PERF_METRIC_TOKEN={}'.format(self.metric_dest_hec_token),
107+
f'INDEX_CLUSTER_SIZE={self.num_of_indexer}',
108+
f'KAFKA_CONNECT_HEC_MODE={self.hec_mode.lower()}',
109+
f'KAFKA_CONNECT_ACK_MODE={self.ack_mode.lower()}',
110+
f'KAFKA_CONNECT_TOPICS={self.topic}',
111+
f'KAFKA_CONNECT_LINE_BREAKER={self.line_breaker}',
112+
f'JVM_HEAP_SIZE={self.jvm_size}',
113+
f'KAFKA_CONNECT_BRANCH={self.branch}',
114+
f'CONNECT_PERF_METRIC_DEST_HEC={self.metric_dest_hec_uri}',
115+
f'CONNECT_PERF_METRIC_TOKEN={self.metric_dest_hec_token}',
117116
]
118117

119-
depends = ['{}{}'.format(KafkaConnectYamlGen.prefix, i)
118+
depends = [f'{KafkaConnectYamlGen.prefix}{i}'
120119
for i in xrange(1, self.num_of_connect + 1)]
121120
services = kcg.gen_services(
122121
1, 'kafkabastion', self.image, [], envs, depends,
@@ -215,11 +214,11 @@ def _gen_orca_file(args, service_file):
215214
lines.append('[kafka-connect]')
216215
lines.append('hec_load_balancers = 1')
217216
lines.append('search_heads = 1')
218-
lines.append('indexers = {}'.format(args.indexer_size))
217+
lines.append(f'indexers = {args.indexer_size}')
219218
lines.append('log_token = 00000000-0000-0000-0000-000000000000')
220219
if args.perf == 1:
221220
lines.append('perf = true')
222-
lines.append('services = {}'.format(service_file))
221+
lines.append(f'services = {service_file}')
223222
f.write('\n'.join(lines))
224223

225224
print 'finish generating orca.conf'
@@ -284,7 +283,7 @@ def main():
284283
help='Splunk HEC destintion token')
285284

286285

287-
volumes = '["{}"]'.format(kcg.KafkaClusterYamlGen.DATA_DIR_ROOT)
286+
volumes = f'["{kcg.KafkaClusterYamlGen.DATA_DIR_ROOT}"]'
288287
parser.add_argument('--volumes', default=volumes, help='Volumes to mount')
289288

290289
args = parser.parse_args()

0 commit comments

Comments
 (0)