Skip to content

Commit 1784b8e

Browse files
authored
Support for Data Contracts (#1852)
* First cut at data contracts * Checkpoint * Update dependencies * Add respx * Fix recursive imports * Pin httpx * Minor fix to MeessageToDict call * Fix flake8 warnings * Fix flake8 warnings for tests * Fix npe * Fix build * Minor fix * Add comment * Add comment * Minor fix * Fix tests * Minor cleanup * Clean up requirements * Clean up requirements * Clean up requirements * Clean up requirements * Minor fix * Add examples * Minor fix * Minor fix * Minor fix * Minor fix * Minor fix * Fix protobuf example * Handle recursive refs * Add cache configs * Add rule overrides * Minor fix * Minor fix * Add override test * Fix encryption tests * Minor cleanup * Add assume role for aws * Minor fixes * Incorporate review feedback; consolidate caches * Add register full response to mock sr * Fix recursive error during integration test
1 parent 31640d7 commit 1784b8e

File tree

93 files changed

+12599
-1090
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

93 files changed

+12599
-1090
lines changed

examples/avro/user_generic.avsc

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
"fields": [
55
{
66
"name": "name",
7-
"type": "string"
7+
"type": "string",
8+
"confluent:tags": ["PII"]
89
},
910
{
1011
"name": "favorite_number",

examples/avro/user_specific.avsc

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
"fields": [
66
{
77
"name": "name",
8-
"type": "string"
8+
"type": "string",
9+
"confluent:tags": ["PII"]
910
},
1011
{
1112
"name": "favorite_number",

examples/avro_consumer_encryption.py

+145
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright 2024 Confluent Inc.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
18+
19+
# A simple example demonstrating use of AvroDeserializer.
20+
21+
import argparse
22+
23+
from confluent_kafka.schema_registry.rules.encryption.encrypt_executor import \
24+
FieldEncryptionExecutor
25+
26+
from confluent_kafka.schema_registry.rules.encryption.localkms.local_driver import \
27+
LocalKmsDriver
28+
29+
from confluent_kafka.schema_registry.rules.encryption.hcvault.hcvault_driver import \
30+
HcVaultKmsDriver
31+
32+
from confluent_kafka.schema_registry.rules.encryption.gcpkms.gcp_driver import \
33+
GcpKmsDriver
34+
35+
from confluent_kafka.schema_registry.rules.encryption.azurekms.azure_driver import \
36+
AzureKmsDriver
37+
38+
from confluent_kafka.schema_registry.rules.encryption.awskms.aws_driver import \
39+
AwsKmsDriver
40+
41+
from confluent_kafka import Consumer
42+
from confluent_kafka.serialization import SerializationContext, MessageField
43+
from confluent_kafka.schema_registry import SchemaRegistryClient
44+
from confluent_kafka.schema_registry.avro import AvroDeserializer
45+
46+
47+
class User(object):
48+
"""
49+
User record
50+
51+
Args:
52+
name (str): User's name
53+
54+
favorite_number (int): User's favorite number
55+
56+
favorite_color (str): User's favorite color
57+
"""
58+
59+
def __init__(self, name=None, favorite_number=None, favorite_color=None):
60+
self.name = name
61+
self.favorite_number = favorite_number
62+
self.favorite_color = favorite_color
63+
64+
65+
def dict_to_user(obj, ctx):
66+
"""
67+
Converts object literal(dict) to a User instance.
68+
69+
Args:
70+
obj (dict): Object literal(dict)
71+
72+
ctx (SerializationContext): Metadata pertaining to the serialization
73+
operation.
74+
"""
75+
76+
if obj is None:
77+
return None
78+
79+
return User(name=obj['name'],
80+
favorite_number=obj['favorite_number'],
81+
favorite_color=obj['favorite_color'])
82+
83+
84+
def main(args):
85+
# Register the KMS drivers and the field-level encryption executor
86+
AwsKmsDriver.register()
87+
AzureKmsDriver.register()
88+
GcpKmsDriver.register()
89+
HcVaultKmsDriver.register()
90+
LocalKmsDriver.register()
91+
FieldEncryptionExecutor.register()
92+
93+
topic = args.topic
94+
95+
# When using Data Contract rules, a schema should not be passed to the
96+
# AvroDeserializer. The schema is fetched from the Schema Registry.
97+
schema_str = None
98+
99+
sr_conf = {'url': args.schema_registry}
100+
schema_registry_client = SchemaRegistryClient(sr_conf)
101+
102+
avro_deserializer = AvroDeserializer(schema_registry_client,
103+
schema_str,
104+
dict_to_user)
105+
106+
consumer_conf = {'bootstrap.servers': args.bootstrap_servers,
107+
'group.id': args.group,
108+
'auto.offset.reset': "earliest"}
109+
110+
consumer = Consumer(consumer_conf)
111+
consumer.subscribe([topic])
112+
113+
while True:
114+
try:
115+
# SIGINT can't be handled when polling, limit timeout to 1 second.
116+
msg = consumer.poll(1.0)
117+
if msg is None:
118+
continue
119+
120+
user = avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
121+
if user is not None:
122+
print("User record {}: name: {}\n"
123+
"\tfavorite_number: {}\n"
124+
"\tfavorite_color: {}\n"
125+
.format(msg.key(), user.name,
126+
user.favorite_number,
127+
user.favorite_color))
128+
except KeyboardInterrupt:
129+
break
130+
131+
consumer.close()
132+
133+
134+
if __name__ == '__main__':
135+
parser = argparse.ArgumentParser(description="AvroDeserializer example")
136+
parser.add_argument('-b', dest="bootstrap_servers", required=True,
137+
help="Bootstrap broker(s) (host[:port])")
138+
parser.add_argument('-s', dest="schema_registry", required=True,
139+
help="Schema Registry (http(s)://host[:port]")
140+
parser.add_argument('-t', dest="topic", default="example_serde_avro",
141+
help="Topic name")
142+
parser.add_argument('-g', dest="group", default="example_serde_avro",
143+
help="Consumer group")
144+
145+
main(parser.parse_args())

0 commit comments

Comments
 (0)