-
Notifications
You must be signed in to change notification settings - Fork 911
/
Copy pathoauth_schema_registry.py
68 lines (53 loc) · 2.81 KB
/
oauth_schema_registry.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2025 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Examples of setting up the Schema Registry client with OAuth using a static
# token, Client Credentials, and a custom function.
# The CUSTOM OAuth configuration takes a custom function and a config for that
# function; the function returns the fields shown below. All of these fields
# must be returned for OAuth authentication to work.
from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient
def main():
    """Run the three Schema Registry OAuth examples one after another.

    For each credentials source shown here (STATIC_TOKEN, OAUTHBEARER and
    CUSTOM) an example configuration is built, a SchemaRegistryClient is
    created from it, and the result of its get_subjects() call is printed.
    The URL, tokens, client credentials and identifiers are example values.
    """
    # Example values that every configuration below has in common.
    registry_url = 'https://psrc-123456.us-east-1.aws.confluent.cloud'
    logical_cluster = 'lsrc-12345'
    identity_pool_id = 'pool-abcd'

    # Example 1: the bearer token is given directly in the configuration.
    static_conf = {
        'url': registry_url,
        'bearer.auth.credentials.source': 'STATIC_TOKEN',
        'bearer.auth.token': 'static-token',
        'bearer.auth.logical.cluster': logical_cluster,
        'bearer.auth.identity.pool.id': identity_pool_id,
    }
    static_client = SchemaRegistryClient(static_conf)
    print(static_client.get_subjects())

    # Example 2: client id, client secret, scope and issuer endpoint are
    # given in the configuration instead of a ready-made token.
    credentials_conf = {
        'url': registry_url,
        'bearer.auth.credentials.source': 'OAUTHBEARER',
        'bearer.auth.client.id': 'client-id',
        'bearer.auth.client.secret': 'client-secret',
        'bearer.auth.scope': 'schema_registry',
        'bearer.auth.issuer.endpoint.url': 'https://yourauthprovider.com/v1/token',
        'bearer.auth.logical.cluster': logical_cluster,
        'bearer.auth.identity.pool.id': identity_pool_id,
    }
    credentials_client = SchemaRegistryClient(credentials_conf)
    print(credentials_client.get_subjects())

    # Example 3: a user-supplied function plus the config intended for it.
    def custom_oauth_function(config):
        """Return the given config unchanged."""
        return config

    # Config for the custom function; it carries the token, logical cluster
    # and identity pool id fields.
    provider_conf = {
        'bearer.auth.token': 'example-token',
        'bearer.auth.logical.cluster': logical_cluster,
        'bearer.auth.identity.pool.id': identity_pool_id,
    }
    custom_conf = {
        'url': registry_url,
        'bearer.auth.credentials.source': 'CUSTOM',
        'bearer.auth.custom.provider.function': custom_oauth_function,
        'bearer.auth.custom.provider.config': provider_conf,
    }
    custom_client = SchemaRegistryClient(custom_conf)
    print(custom_client.get_subjects())
# Run the examples only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()