-
Notifications
You must be signed in to change notification settings - Fork 338
/
Copy pathmodel.py
284 lines (221 loc) · 8.06 KB
/
model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
import copy
import re
import warnings
from collections import OrderedDict
from collections.abc import MutableMapping
from werkzeug.utils import cached_property
from .mask import Mask
from .errors import abort
from jsonschema import Draft4Validator
from jsonschema.exceptions import ValidationError
from .utils import not_none
from ._http import HTTPStatus
RE_REQUIRED = re.compile(r"u?\'(?P<name>.*)\' is a required property", re.I | re.U)
def instance(cls):
if isinstance(cls, type):
return cls()
return cls
class ModelBase(object):
"""
Handles validation and swagger style inheritance for both subclasses.
Subclass must define `schema` attribute.
:param str name: The model public name
"""
def __init__(self, name, *args, **kwargs):
super(ModelBase, self).__init__(*args, **kwargs)
self.__apidoc__ = {"name": name}
self.name = name
self.__parents__ = []
def instance_inherit(name, *parents):
return self.__class__.inherit(name, self, *parents)
self.inherit = instance_inherit
@property
def ancestors(self):
"""
Return the ancestors tree
"""
ancestors = [p.ancestors for p in self.__parents__]
return set.union(set([self.name]), *ancestors)
def get_parent(self, name):
if self.name == name:
return self
else:
for parent in self.__parents__:
found = parent.get_parent(name)
if found:
return found
raise ValueError("Parent " + name + " not found")
@property
def __schema__(self):
schema = self._schema
if self.__parents__:
refs = [
{"$ref": "#/definitions/{0}".format(parent.name)}
for parent in self.__parents__
]
return {"allOf": refs + [schema]}
else:
return schema
@classmethod
def inherit(cls, name, *parents):
"""
Inherit this model (use the Swagger composition pattern aka. allOf)
:param str name: The new model name
:param dict fields: The new model extra fields
"""
model = cls(name, parents[-1])
model.__parents__ = parents[:-1]
return model
def validate(self, data, format_checker=None, definitions=None):
schema = self.__schema__
schema["definitions"] = definitions or {}
validator = Draft4Validator(schema, format_checker=format_checker)
try:
validator.validate(data)
except ValidationError:
abort(
HTTPStatus.BAD_REQUEST,
message="Input payload validation failed",
errors=dict(self.format_error(e) for e in validator.iter_errors(data)),
)
def format_error(self, error):
path = list(error.path)
if error.validator == "required":
name = RE_REQUIRED.match(error.message).group("name")
path.append(name)
key = ".".join(str(p) for p in path)
return key, error.message
def __unicode__(self):
return "Model({name},{{{fields}}})".format(
name=self.name, fields=",".join(self.keys())
)
__str__ = __unicode__
class RawModel(ModelBase):
"""
A thin wrapper on ordered fields dict to store API doc metadata.
Can also be used for response marshalling.
:param str name: The model public name
:param str mask: an optional default model mask
:param bool strict: validation should raise error when there is param not provided in schema
"""
wrapper = dict
def __init__(self, name, *args, **kwargs):
self.__mask__ = kwargs.pop("mask", None)
self.__strict__ = kwargs.pop("strict", False)
if self.__mask__ and not isinstance(self.__mask__, Mask):
self.__mask__ = Mask(self.__mask__)
super(RawModel, self).__init__(name, *args, **kwargs)
def instance_clone(name, *parents):
return self.__class__.clone(name, self, *parents)
self.clone = instance_clone
@property
def _schema(self):
properties = self.wrapper()
required = set()
discriminator = None
for name, field in self.items():
field = instance(field)
properties[name] = field.__schema__
if field.required:
required.add(name)
if getattr(field, "discriminator", False):
discriminator = name
definition = {
"required": sorted(list(required)) or None,
"properties": properties,
"discriminator": discriminator,
"x-mask": str(self.__mask__) if self.__mask__ else None,
"type": "object",
}
if self.__strict__:
definition["additionalProperties"] = False
return not_none(definition)
@cached_property
def resolved(self):
"""
Resolve real fields before submitting them to marshal
"""
# Duplicate fields
resolved = copy.deepcopy(self)
# Recursively copy parent fields if necessary
for parent in self.__parents__:
resolved.update(parent.resolved)
# Handle discriminator
candidates = [f for f in resolved.values() if getattr(f, "discriminator", None)]
# Ensure the is only one discriminator
if len(candidates) > 1:
raise ValueError("There can only be one discriminator by schema")
# Ensure discriminator always output the model name
elif len(candidates) == 1:
candidates[0].default = self.name
return resolved
def extend(self, name, fields):
"""
Extend this model (Duplicate all fields)
:param str name: The new model name
:param dict fields: The new model extra fields
:deprecated: since 0.9. Use :meth:`clone` instead.
"""
warnings.warn(
"extend is is deprecated, use clone instead",
DeprecationWarning,
stacklevel=2,
)
if isinstance(fields, (list, tuple)):
return self.clone(name, *fields)
else:
return self.clone(name, fields)
@classmethod
def clone(cls, name, *parents):
"""
Clone these models (Duplicate all fields)
It can be used from the class
>>> model = Model.clone(fields_1, fields_2)
or from an Instanciated model
>>> new_model = model.clone(fields_1, fields_2)
:param str name: The new model name
:param dict parents: The new model extra fields
"""
fields = cls.wrapper()
for parent in parents:
fields.update(copy.deepcopy(parent))
return cls(name, fields)
def __deepcopy__(self, memo):
obj = self.__class__(
self.name,
[(key, copy.deepcopy(value, memo)) for key, value in self.items()],
mask=self.__mask__,
strict=self.__strict__,
)
obj.__parents__ = self.__parents__
return obj
class Model(RawModel, dict, MutableMapping):
"""
A thin wrapper on fields dict to store API doc metadata.
Can also be used for response marshalling.
:param str name: The model public name
:param str mask: an optional default model mask
"""
pass
class OrderedModel(RawModel, OrderedDict, MutableMapping):
"""
A thin wrapper on ordered fields dict to store API doc metadata.
Can also be used for response marshalling.
:param str name: The model public name
:param str mask: an optional default model mask
"""
wrapper = OrderedDict
class SchemaModel(ModelBase):
"""
Stores API doc metadata based on a json schema.
:param str name: The model public name
:param dict schema: The json schema we are documenting
"""
def __init__(self, name, schema=None):
super(SchemaModel, self).__init__(name)
self._schema = schema or {}
def __unicode__(self):
return "SchemaModel({name},{schema})".format(
name=self.name, schema=self._schema
)
__str__ = __unicode__