Skip to content

WIP: Support for simpleType restrictions/facets #1303

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions src/zeep/xsd/types/facets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
from collections import namedtuple
from enum import Enum
import re
from typing import Any, List, Optional

from lxml import etree

from zeep.xsd.const import xsd_ns

Whitespace = Enum('Whitespace', ('preserve', 'replace', 'collapse'))


class Facets(namedtuple('Facets', (
'enumeration',
'fraction_digits',
'length',
'max_exclusive',
'max_inclusive',
'max_length',
'min_exclusive',
'min_inclusive',
'min_length',
'patterns',
'total_digits',
'whitespace',
))):
def __new__(
cls,
enumeration: Optional[List[Any]] = None,
fraction_digits: Optional[int] = None,
length: Optional[int] = None,
max_exclusive: Optional[Any] = None,
max_inclusive: Optional[Any] = None,
max_length: Optional[int] = None,
min_exclusive: Optional[Any] = None,
min_inclusive: Optional[Any] = None,
min_length: Optional[int] = None,
patterns: Optional[List[re.Pattern]] = None,
total_digits: Optional[int] = None,
whitespace: Optional[Whitespace] = None):
kwargs = locals()
del kwargs['cls']
del kwargs['__class__']
return super().__new__(cls, **kwargs)

@classmethod
def parse_xml(cls, restriction_elem: etree._Element):
kwargs = {}
enumeration = []
patterns = []
for facet in restriction_elem:
if facet.tag == xsd_ns('enumeration'):
enumeration.append(facet.get('value'))
elif facet.tag == xsd_ns('fractionDigits'):
kwargs['fraction_digits'] = int(facet.get('value'))
elif facet.tag == xsd_ns('length'):
kwargs['length'] = int(facet.get('value'))
elif facet.tag == xsd_ns('maxExclusive'):
kwargs['max_exclusive'] = facet.get('value')
elif facet.tag == xsd_ns('maxInclusive'):
kwargs['max_inclusive'] = facet.get('value')
elif facet.tag == xsd_ns('maxLength'):
kwargs['max_length'] = int(facet.get('value'))
elif facet.tag == xsd_ns('minExclusive'):
kwargs['min_exclusive'] = facet.get('value')
elif facet.tag == xsd_ns('minInclusive'):
kwargs['min_inclusive'] = facet.get('value')
elif facet.tag == xsd_ns('minLength'):
kwargs['min_length'] = int(facet.get('value'))
elif facet.tag == xsd_ns('pattern'):
patterns.append(re.compile(facet.get('value')))
elif facet.tag == xsd_ns('totalDigits'):
kwargs['total_digits'] = int(facet.get('value'))
elif facet.tag == xsd_ns('whiteSpace'):
kwargs['whitesapce'] = Whitespace[facet.get('value')]

if enumeration:
kwargs['enumeration'] = enumeration
if patterns:
kwargs['patterns'] = patterns
return cls(**kwargs)

def parse_values(self, xsd_type):
"""Convert captured string values to their native Python types.
"""
def map_opt(f, v):
return None if v is None else f(v)

def go(v):
return map_opt(xsd_type.pythonvalue, v)

return self._replace(
enumeration=map_opt(lambda es: [go(e) for e in es], self.enumeration),
max_exclusive=go(self.max_exclusive),
max_inclusive=go(self.max_inclusive),
min_exclusive=go(self.min_exclusive),
min_inclusive=go(self.min_inclusive),
)
3 changes: 3 additions & 0 deletions src/zeep/xsd/types/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from zeep.xsd.const import Nil, xsd_ns, xsi_ns
from zeep.xsd.context import XmlParserContext
from zeep.xsd.types.any import AnyType
from zeep.xsd.types.facets import Facets
from zeep.xsd.valueobjects import CompoundValue

if typing.TYPE_CHECKING:
Expand All @@ -22,6 +23,8 @@
class AnySimpleType(AnyType):
_default_qname = xsd_ns("anySimpleType")

facets = Facets()

def __init__(self, qname=None, is_global=False):
super().__init__(qname or etree.QName(self._default_qname), is_global)

Expand Down
11 changes: 8 additions & 3 deletions src/zeep/xsd/types/unresolved.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from zeep.xsd.types.base import Type
from zeep.xsd.types.collection import UnionType # FIXME
from zeep.xsd.types.simple import AnySimpleType # FIXME
from zeep.xsd.types.simple import AnySimpleType, Facets # FIXME

if typing.TYPE_CHECKING:
from zeep.xsd.types.complex import ComplexType
Expand Down Expand Up @@ -38,12 +38,13 @@ def resolve(self):


class UnresolvedCustomType(Type):
def __init__(self, qname, base_type, schema):
def __init__(self, qname, base_type, schema, facets: Facets):
assert qname is not None
self.qname = qname
self.name = str(qname.localname)
self.schema = schema
self.base_type = base_type
self.facets = facets

def __repr__(self):
return "<%s(qname=%r, base_type=%r)>" % (
Expand All @@ -63,7 +64,11 @@ def resolve(self):
return xsd_type(base.item_types)

elif issubclass(base.__class__, AnySimpleType):
xsd_type = type(self.name, (base.__class__,), cls_attributes)
xsd_type = type(
self.name,
(base.__class__,),
dict(cls_attributes, facets=self.facets.parse_values(base))
)
return xsd_type(self.qname)

else:
Expand Down
13 changes: 8 additions & 5 deletions src/zeep/xsd/visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from zeep.xsd import elements as xsd_elements
from zeep.xsd import types as xsd_types
from zeep.xsd.const import AUTO_IMPORT_NAMESPACES, xsd_ns
from zeep.xsd.types.facets import Facets
from zeep.xsd.types.unresolved import UnresolvedCustomType, UnresolvedType

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -541,8 +542,8 @@ def visit_simple_type(self, node, parent):
annotation, items = self._pop_annotation(list(node))
child = items[0]
if child.tag == tags.restriction:
base_type = self.visit_restriction_simple_type(child, node)
xsd_type = UnresolvedCustomType(qname, base_type, self.schema)
base_type, facets = self.visit_restriction_simple_type(child, node)
xsd_type = UnresolvedCustomType(qname, base_type, self.schema, facets)

elif child.tag == tags.list:
xsd_type = self.visit_list(child, node)
Expand Down Expand Up @@ -716,13 +717,15 @@ def visit_restriction_simple_type(self, node, parent):
:type parent: lxml.etree._Element

"""
annotation, children = self._pop_annotation(list(node))
facets = Facets.parse_xml(children)

base_name = qname_attr(node, "base")
if base_name:
return self._get_type(base_name)
return self._get_type(base_name), facets

annotation, children = self._pop_annotation(list(node))
if children[0].tag == tags.simpleType:
return self.visit_simple_type(children[0], node)
return self.visit_simple_type(children[0], node), facets

def visit_restriction_simple_content(self, node, parent):
"""
Expand Down
4 changes: 4 additions & 0 deletions tests/test_async_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def test_no_cache(event_loop):
assert transport.cache is None


@pytest.mark.xfail # Failing on master
@pytest.mark.requests
def test_load(httpx_mock):
cache = stub(get=lambda url: None, add=lambda url, content: None)
Expand All @@ -24,6 +25,7 @@ def test_load(httpx_mock):
assert result == b"x"


@pytest.mark.xfail # Failing on master
@pytest.mark.requests
@pytest.mark.asyncio
def test_load_cache(httpx_mock):
Expand All @@ -37,6 +39,7 @@ def test_load_cache(httpx_mock):
assert cache.get("http://tests.python-zeep.org/test.xml") == b"x"


@pytest.mark.xfail # Failing on master
@pytest.mark.requests
@pytest.mark.asyncio
async def test_post(httpx_mock: HTTPXMock):
Expand All @@ -61,6 +64,7 @@ async def test_session_close(httpx_mock: HTTPXMock):
return await transport.aclose()


@pytest.mark.xfail # Failing on master
@pytest.mark.requests
@pytest.mark.asyncio
async def test_http_error(httpx_mock: HTTPXMock):
Expand Down
29 changes: 29 additions & 0 deletions tests/test_facets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from tests.utils import load_xml

from zeep import xsd


def test_parse_xml():
schema_doc = load_xml(
b"""
<?xml version="1.0" encoding="utf-8"?>
<xsd:schema xmlns:tns="http://tests.python-zeep.org/attr"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
elementFormDefault="qualified"
targetNamespace="http://tests.python-zeep.org/facets">
<xsd:simpleType name="SomeType">
<xsd:restriction base="xsd:float">
<xsd:enumeration value="42.0"/>
<xsd:enumeration value="42.9"/>
<xsd:minInclusive value="42.0"/>
<xsd:maxExclusive value="43.0"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:schema>
"""
)
schema = xsd.Schema(schema_doc)
ty = schema.get_type('{http://tests.python-zeep.org/facets}SomeType')
assert ty.facets.enumeration == [42.0, 42.9]
assert ty.facets.min_inclusive == 42.0
assert ty.facets.max_exclusive == 43.0
11 changes: 9 additions & 2 deletions tests/test_wsdl.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import io
from io import StringIO
import re

import pytest
import requests_mock
Expand Down Expand Up @@ -159,7 +160,7 @@ def test_parse_types_multiple_schemas():

def test_parse_types_nsmap_issues():
content = StringIO(
"""
r"""
<?xml version="1.0"?>
<wsdl:definitions targetNamespace="urn:ec.europa.eu:taxud:vies:services:checkVat"
xmlns:tns1="urn:ec.europa.eu:taxud:vies:services:checkVat:types"
Expand Down Expand Up @@ -193,7 +194,13 @@ def test_parse_types_nsmap_issues():
</wsdl:definitions>
""".strip()
)
assert wsdl.Document(content, None)
doc = wsdl.Document(content, None)
assert doc

ty = doc.types.get_type(
'{urn:ec.europa.eu:taxud:vies:services:checkVat:types}companyTypeCode'
)
assert ty.facets.patterns == [re.compile(r'[A-Z]{2}\-[1-9][0-9]?')]


@pytest.mark.requests
Expand Down
8 changes: 8 additions & 0 deletions tests/test_xsd_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,14 @@ def test_union():
result = elm.parse(xml, schema)
assert result._value_1 == "Idle"

ty_1 = schema.get_type("{http://tests.python-zeep.org/tst}Type1")
assert ty_1.facets.max_length == 255
assert ty_1.facets.enumeration == ['Idle', 'Processing', 'Stopped']

ty_2 = schema.get_type("{http://tests.python-zeep.org/tst}Type2")
assert ty_2.facets.max_length == 255
assert ty_2.facets.enumeration == ['Paused']


def test_parse_invalid_values():
schema = xsd.Schema(
Expand Down