-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathcleanup.py
197 lines (166 loc) · 6.28 KB
/
cleanup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
"""
Functions to clean up a tenant within a FIWARE-based platform.
"""
from functools import wraps
from pydantic import AnyHttpUrl, AnyUrl
from requests import RequestException
from typing import Callable, List, Union
from filip.models import FiwareHeader
from filip.clients.ngsi_v2 import \
ContextBrokerClient, \
IoTAClient, \
QuantumLeapClient
from requests import Session
def clear_context_broker(url: str,
                         fiware_header: FiwareHeader,
                         session: Session = None):
    """
    Remove every entity, subscription and registration that the context
    broker holds for the given fiware header.

    Note:
        Always clear the devices first because the IoT-Agent will otherwise
        throw errors if it cannot find its registration anymore.

    Args:
        url: Url of the context broker service
        fiware_header: header of the tenant
        session: session object with set ca.crt and ca.key for TLS support;
            it is passed on to the client unchanged

    Returns:
        None
    """
    broker = ContextBrokerClient(url=url,
                                 fiware_header=fiware_header,
                                 session=session)

    # entities: fetch the current list and delete it in a single call
    stale_entities = broker.get_entity_list()
    broker.delete_entities(entities=stale_entities)

    # subscriptions: delete one by one, then verify that none is left
    for subscription in broker.get_subscription_list():
        broker.delete_subscription(subscription_id=subscription.id)
    assert len(broker.get_subscription_list()) == 0

    # registrations: delete one by one, then verify that none is left
    for registration in broker.get_registration_list():
        broker.delete_registration(registration_id=registration.id)
    assert len(broker.get_registration_list()) == 0
def clear_iot_agent(url: Union[str, AnyHttpUrl],
                    fiware_header: FiwareHeader,
                    session: Session = None):
    """
    Remove every device and every device group that the IoT-Agent holds
    for the given fiware header.

    Args:
        url: Url of the IoT-Agent service
        fiware_header: header of the tenant
        session: session object with set ca.crt and ca.key for TLS support;
            it is passed on to the client unchanged

    Returns:
        None
    """
    agent = IoTAClient(url=url,
                       fiware_header=fiware_header,
                       session=session)

    # devices: delete one by one, then verify that none is left
    for dev in agent.get_device_list():
        agent.delete_device(device_id=dev.device_id)
    assert len(agent.get_device_list()) == 0

    # groups: a group is addressed by its resource together with its apikey
    for grp in agent.get_group_list():
        agent.delete_group(resource=grp.resource, apikey=grp.apikey)
    assert len(agent.get_group_list()) == 0
def clear_quantumleap(url: str,
                      fiware_header: FiwareHeader,
                      session: Session = None):
    """
    Delete all data stored in QuantumLeap for a given fiware header.

    Args:
        url: Url of the quantumleap service
        fiware_header: header of the tenant
        session: session object with set ca.crt and ca.key for TLS support;
            it is passed on to the client unchanged

    Returns:
        None

    Raises:
        RequestException: if listing the entities fails for any reason
            other than the "empty database" 404 described below. The
            original exception is re-raised unchanged.
    """
    def is_empty_db_error(err: RequestException) -> bool:
        """
        When the database is empty, quantumleap answers the request with a
        404 error whose JSON body carries the error message 'Not Found'.
        Only that exact case is evaluated as 'OK'.

        Args:
            err: exception raised while querying quantumleap

        Returns:
            True if the error only signals an empty database, else False
        """
        response = err.response
        # Connection errors, timeouts etc. carry no response at all. They
        # must surface as the original error instead of an AttributeError.
        if response is None or response.status_code != 404:
            return False
        try:
            payload = response.json()
        except ValueError:
            # A 404 without a JSON body (e.g. a proxy error page) is not
            # the empty-database answer.
            return False
        return isinstance(payload, dict) \
            and payload.get('error') == 'Not Found'

    # create client
    client = QuantumLeapClient(url=url,
                               fiware_header=fiware_header,
                               session=session)

    # collect the stored entities; an empty database counts as "nothing
    # to delete"
    try:
        entities = client.get_entities()
    except RequestException as err:
        if not is_empty_db_error(err):
            raise
        entities = []

    # will be executed for all found entities
    for entity in entities:
        client.delete_entity(entity_id=entity.entityId,
                             entity_type=entity.entityType)
def clear_all(*,
              fiware_header: FiwareHeader,
              cb_url: str = None,
              iota_url: Union[str, List[str]] = None,
              ql_url: str = None,
              session: Session = None):
    """
    Clear every service for which a url is provided. Services whose url is
    None are skipped.

    Args:
        fiware_header: header of the tenant
        cb_url: url of the context broker service
        iota_url: url of the IoT-Agent service, or a list of such urls
        ql_url: url of the QuantumLeap service
        session: session object with set ca.crt and ca.key for TLS support

    Returns:
        None
    """
    # arguments that every clean-up call receives
    shared = dict(fiware_header=fiware_header, session=session)

    # normalise the IoT-Agent argument to something iterable
    if iota_url is None:
        agent_urls = []
    elif isinstance(iota_url, (str, AnyUrl)):
        agent_urls = [iota_url]
    else:
        agent_urls = iota_url

    # The IoT-Agents are cleared before the context broker, because an
    # IoT-Agent throws errors if it cannot find its registration anymore.
    for agent_url in agent_urls:
        clear_iot_agent(url=agent_url, **shared)

    if cb_url is not None:
        clear_context_broker(url=cb_url, **shared)

    if ql_url is not None:
        clear_quantumleap(url=ql_url, **shared)
def clean_test(*,
               fiware_service: str,
               fiware_servicepath: str,
               cb_url: str = None,
               iota_url: Union[str, List[str]] = None,
               ql_url: str = None,
               session: Session = None) -> Callable:
    """
    Decorator to clean up the server before and after the test.

    The services are cleaned once when the decorator is created, i.e. when
    the decorated test is defined, and once more each time the decorated
    test returns without raising.

    Note:
        This does not substitute a proper TearDown method, because a failing
        test will not execute the clean up after the error. Since this would
        mean an unnecessary error handling. We actually want a test to fail
        with proper messages.

    Args:
        fiware_service: tenant
        fiware_servicepath: tenant path
        cb_url: url of context broker service
        iota_url: url of IoT-Agent service
        ql_url: url of quantumleap service
        session: session object with set ca.crt and ca.key for TLS support

    Returns:
        Decorator for clean tests
    """
    fiware_header = FiwareHeader(service=fiware_service,
                                 service_path=fiware_servicepath)

    def _clean() -> None:
        """Clear all services that were configured for this decorator."""
        clear_all(fiware_header=fiware_header,
                  cb_url=cb_url,
                  iota_url=iota_url,
                  ql_url=ql_url,
                  session=session)

    # Clean up before the test. This runs while the decorator is being
    # built, so the tenant is already clean before the test is called.
    _clean()

    # Inner decorator function
    def decorator(func):
        # Wrapper function for the decorated function
        @wraps(func)
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            # Clean up after the test. Deliberately not placed in a
            # `finally` block: a failing test propagates its error
            # untouched and skips this clean up (see Note above).
            _clean()
            return result
        return wrapper

    return decorator