-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkau.py
executable file
·88 lines (69 loc) · 2.43 KB
/
kau.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#!/bin/env python
import requests
import ast
HOST = 'http://10.149.52.2:8080'
ADMIN_USER = 'admin'
ADMIN_PASS = 'changeme'
REALM = 'master'
ATTRIBUTES = {
'testattr': 'testattrvalue'
}
class RestKey:
token = ''
base_url = ''
auth_header = {}
def __init__(self, host: str, user: str, password: str):
self.base_url = f'{host}/auth'
self.token = self._get_token(user, password)
self.auth_header = {
'content-type': 'application/json',
'Authorization' : 'Bearer '+ self.token
}
def _get_token(self, user, password):
url = f'{self.base_url}/realms/master/protocol/openid-connect/token'
params = {
'client_id': 'admin-cli',
'grant_type': 'password',
'username' : user,
'password': password
}
x = requests.post(url, params, verify=False).content.decode('utf-8')
return str(ast.literal_eval(x)['access_token'])
def create_user(self, realm: str, name: str, enabled: bool = False):
url = f'{self.base_url}/admin/realms/{realm}/users'
params = {
'username': name,
'enabled': enabled
}
request = requests.post(url, headers=self.auth_header, json=params)
print(f'Create User - Status Code: ({request.status_code})')
def get_users(self, realm: str, limit: int = -1):
url = f'{self.base_url}/admin/realms/{realm}/users?max={limit}'
request = requests.get(url, headers=self.auth_header)
return request.json()
def add_attr(self, realm: str, user: dict, new_attr: dict, verbose: bool = True):
user_id = user['id']
url = f'{self.base_url}/admin/realms/{realm}/users/{user_id}'
try:
old_attr = user['attributes']
except KeyError:
old_attr = None
if old_attr:
attributes = old_attr.copy()
attributes.update(new_attr)
else:
attributes = new_attr
params = {
'attributes': attributes
}
response = requests.put(url, headers=self.auth_header, json=params)
if verbose:
print(f'{user_id} status_code {response.status_code}')
if __name__ == '__main__':
r = RestKey(HOST, ADMIN_USER, ADMIN_PASS)
users = r.get_users(REALM)
i = 0
for u in users:
r.add_attr(REALM, u, ATTRIBUTES)
i+=1
print(f'Finished updating {i} users.')