Skip to content

Commit 61b04ba

Browse files
authored
Add normalize.schemas config to SR client (#1406)
1 parent cdc5f3b commit 61b04ba

File tree

5 files changed

+70
-14
lines changed

5 files changed

+70
-14
lines changed

src/confluent_kafka/schema_registry/avro.py

+15-3
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ class AvroSerializer(Serializer):
8383
| ``auto.register.schemas`` | bool | previously associated with a particular subject. |
8484
| | | Defaults to True. |
8585
+---------------------------+----------+--------------------------------------------------+
86+
| | | Whether to normalize schemas, which will |
87+
| ``normalize.schemas`` | bool | transform schemas to have a consistent format, |
88+
| | | including ordering properties and references. |
89+
+---------------------------+----------+--------------------------------------------------+
8690
| | | Whether to use the latest subject version for |
8791
| ``use.latest.version`` | bool | serialization. |
8892
| | | WARNING: There is no check that the latest |
@@ -143,12 +147,14 @@ class AvroSerializer(Serializer):
143147
conf (dict): AvroSerializer configuration.
144148
145149
""" # noqa: E501
146-
__slots__ = ['_hash', '_auto_register', '_use_latest_version', '_known_subjects', '_parsed_schema',
150+
__slots__ = ['_hash', '_auto_register', '_normalize_schemas', '_use_latest_version',
151+
'_known_subjects', '_parsed_schema',
147152
'_registry', '_schema', '_schema_id', '_schema_name',
148153
'_subject_name_func', '_to_dict']
149154

150155
# default configuration
151156
_default_conf = {'auto.register.schemas': True,
157+
'normalize.schemas': False,
152158
'use.latest.version': False,
153159
'subject.name.strategy': topic_subject_name_strategy}
154160

@@ -174,6 +180,10 @@ def __init__(self, schema_registry_client, schema_str,
174180
if not isinstance(self._auto_register, bool):
175181
raise ValueError("auto.register.schemas must be a boolean value")
176182

183+
self._normalize_schemas = conf_copy.pop('normalize.schemas')
184+
if not isinstance(self._normalize_schemas, bool):
185+
raise ValueError("normalize.schemas must be a boolean value")
186+
177187
self._use_latest_version = conf_copy.pop('use.latest.version')
178188
if not isinstance(self._use_latest_version, bool):
179189
raise ValueError("use.latest.version must be a boolean value")
@@ -248,10 +258,12 @@ def __call__(self, obj, ctx):
248258
# a schema without a subject so we set the schema_id here to handle
249259
# the initial registration.
250260
self._schema_id = self._registry.register_schema(subject,
251-
self._schema)
261+
self._schema,
262+
self._normalize_schemas)
252263
else:
253264
registered_schema = self._registry.lookup_schema(subject,
254-
self._schema)
265+
self._schema,
266+
self._normalize_schemas)
255267
self._schema_id = registered_schema.schema_id
256268
self._known_subjects.add(subject)
257269

src/confluent_kafka/schema_registry/json_schema.py

+15-3
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ class JSONSerializer(Serializer):
5959
| ``auto.register.schemas`` | bool | previously associated with a particular subject. |
6060
| | | Defaults to True. |
6161
+---------------------------+----------+--------------------------------------------------+
62+
| | | Whether to normalize schemas, which will |
63+
| ``normalize.schemas`` | bool | transform schemas to have a consistent format, |
64+
| | | including ordering properties and references. |
65+
+---------------------------+----------+--------------------------------------------------+
6266
| | | Whether to use the latest subject version for |
6367
| ``use.latest.version`` | bool | serialization. |
6468
| | | WARNING: There is no check that the latest |
@@ -116,12 +120,14 @@ class JSONSerializer(Serializer):
116120
conf (dict): JsonSerializer configuration.
117121
118122
""" # noqa: E501
119-
__slots__ = ['_hash', '_auto_register', '_use_latest_version', '_known_subjects', '_parsed_schema',
123+
__slots__ = ['_hash', '_auto_register', '_normalize_schemas', '_use_latest_version',
124+
'_known_subjects', '_parsed_schema',
120125
'_registry', '_schema', '_schema_id', '_schema_name',
121126
'_subject_name_func', '_to_dict']
122127

123128
# default configuration
124129
_default_conf = {'auto.register.schemas': True,
130+
'normalize.schemas': False,
125131
'use.latest.version': False,
126132
'subject.name.strategy': topic_subject_name_strategy}
127133

@@ -147,6 +153,10 @@ def __init__(self, schema_str, schema_registry_client, to_dict=None,
147153
if not isinstance(self._auto_register, bool):
148154
raise ValueError("auto.register.schemas must be a boolean value")
149155

156+
self._normalize_schemas = conf_copy.pop('normalize.schemas')
157+
if not isinstance(self._normalize_schemas, bool):
158+
raise ValueError("normalize.schemas must be a boolean value")
159+
150160
self._use_latest_version = conf_copy.pop('use.latest.version')
151161
if not isinstance(self._use_latest_version, bool):
152162
raise ValueError("use.latest.version must be a boolean value")
@@ -208,10 +218,12 @@ def __call__(self, obj, ctx):
208218
# a schema without a subject so we set the schema_id here to handle
209219
# the initial registration.
210220
self._schema_id = self._registry.register_schema(subject,
211-
self._schema)
221+
self._schema,
222+
self._normalize_schemas)
212223
else:
213224
registered_schema = self._registry.lookup_schema(subject,
214-
self._schema)
225+
self._schema,
226+
self._normalize_schemas)
215227
self._schema_id = registered_schema.schema_id
216228
self._known_subjects.add(subject)
217229

src/confluent_kafka/schema_registry/protobuf.py

+13-3
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@ class ProtobufSerializer(object):
142142
| ``auto.register.schemas`` | bool | previously associated with a particular subject. |
143143
| | | Defaults to True. |
144144
+-------------------------------------+----------+------------------------------------------------------+
145+
| | | Whether to normalize schemas, which will |
146+
| ``normalize.schemas`` | bool | transform schemas to have a consistent format, |
147+
| | | including ordering properties and references. |
148+
+---------------------------+----------+----------------------------------------------------------------+
145149
| | | Whether to use the latest subject version for |
146150
| ``use.latest.version`` | bool | serialization. |
147151
| | | WARNING: There is no check that the latest |
@@ -212,14 +216,15 @@ class ProtobufSerializer(object):
212216
`Protobuf API reference <https://googleapis.dev/python/protobuf/latest/google/protobuf.html>`_
213217
214218
""" # noqa: E501
215-
__slots__ = ['_auto_register', '_use_latest_version', '_skip_known_types',
219+
__slots__ = ['_auto_register', '_normalize_schemas', '_use_latest_version', '_skip_known_types',
216220
'_registry', '_known_subjects',
217221
'_msg_class', '_msg_index', '_schema', '_schema_id',
218222
'_ref_reference_subject_func', '_subject_name_func',
219223
'_use_deprecated_format']
220224
# default configuration
221225
_default_conf = {
222226
'auto.register.schemas': True,
227+
'normalize.schemas': False,
223228
'use.latest.version': False,
224229
'skip.known.types': False,
225230
'subject.name.strategy': topic_subject_name_strategy,
@@ -245,6 +250,10 @@ def __init__(self, msg_type, schema_registry_client, conf=None):
245250
if not isinstance(self._auto_register, bool):
246251
raise ValueError("auto.register.schemas must be a boolean value")
247252

253+
self._normalize_schemas = conf_copy.pop('normalize.schemas')
254+
if not isinstance(self._normalize_schemas, bool):
255+
raise ValueError("normalize.schemas must be a boolean value")
256+
248257
self._use_latest_version = conf_copy.pop('use.latest.version')
249258
if not isinstance(self._use_latest_version, bool):
250259
raise ValueError("use.latest.version must be a boolean value")
@@ -405,10 +414,11 @@ def __call__(self, message_type, ctx):
405414

406415
if self._auto_register:
407416
self._schema_id = self._registry.register_schema(subject,
408-
self._schema)
417+
self._schema,
418+
self._normalize_schemas)
409419
else:
410420
self._schema_id = self._registry.lookup_schema(
411-
subject, self._schema).schema_id
421+
subject, self._schema, self._normalize_schemas).schema_id
412422

413423
self._known_subjects.add(subject)
414424

src/confluent_kafka/schema_registry/schema_registry_client.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ def __exit__(self, *args):
299299
if self._rest_client is not None:
300300
self._rest_client._close()
301301

302-
def register_schema(self, subject_name, schema):
302+
def register_schema(self, subject_name, schema, normalize_schemas=False):
303303
"""
304304
Registers a schema under ``subject_name``.
305305
@@ -334,7 +334,7 @@ def register_schema(self, subject_name, schema):
334334
for ref in schema.references]
335335

336336
response = self._rest_client.post(
337-
'subjects/{}/versions'.format(_urlencode(subject_name)),
337+
'subjects/{}/versions?normalize={}'.format(_urlencode(subject_name), normalize_schemas),
338338
body=request)
339339

340340
schema_id = response['id']
@@ -380,7 +380,7 @@ def get_schema(self, schema_id):
380380

381381
return schema
382382

383-
def lookup_schema(self, subject_name, schema):
383+
def lookup_schema(self, subject_name, schema, normalize_schemas=False):
384384
"""
385385
Returns ``schema`` registration information for ``subject``.
386386
@@ -409,8 +409,8 @@ def lookup_schema(self, subject_name, schema):
409409
'version': ref.version}
410410
for ref in schema.references]
411411

412-
response = self._rest_client.post('subjects/{}'
413-
.format(_urlencode(subject_name)),
412+
response = self._rest_client.post('subjects/{}?normalize={}'
413+
.format(_urlencode(subject_name), normalize_schemas),
414414
body=request)
415415

416416
schema_type = response.get('schemaType', 'AVRO')

tests/integration/schema_registry/test_api_client.py

+22
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,28 @@ def test_api_register_schema(kafka_cluster, load_file):
6565
assert schema.schema_str, registered_schema.schema.schema_str
6666

6767

68+
def test_api_register_normalized_schema(kafka_cluster, load_file):
69+
"""
70+
Registers a schema, verifies the registration
71+
72+
Args:
73+
kafka_cluster (KafkaClusterFixture): Kafka Cluster fixture
74+
load_file (callable(str)): Schema fixture constructor
75+
76+
"""
77+
sr = kafka_cluster.schema_registry()
78+
avsc = 'basic_schema.avsc'
79+
subject = _subject_name(avsc)
80+
schema = Schema(load_file(avsc), schema_type='AVRO')
81+
82+
schema_id = sr.register_schema(subject, schema, True)
83+
registered_schema = sr.lookup_schema(subject, schema, True)
84+
85+
assert registered_schema.schema_id == schema_id
86+
assert registered_schema.subject == subject
87+
assert schema.schema_str, registered_schema.schema.schema_str
88+
89+
6890
def test_api_register_schema_incompatible(kafka_cluster, load_file):
6991
"""
7092
Attempts to register an incompatible Schema verifies the error.

0 commit comments

Comments
 (0)