-
-
Notifications
You must be signed in to change notification settings - Fork 599
/
Copy pathtest_async_transport.py
79 lines (59 loc) · 2.31 KB
/
test_async_transport.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import pytest
from lxml import etree
from pretend import stub
from pytest_httpx import HTTPXMock
from zeep import exceptions
from zeep.cache import InMemoryCache
from zeep.transports import AsyncTransport
@pytest.mark.requests
def test_no_cache(event_loop):
    """A transport created without a ``cache`` argument exposes ``cache`` as None."""
    assert AsyncTransport().cache is None
@pytest.mark.xfail  # Failing on master
@pytest.mark.requests
def test_load(httpx_mock):
    """Load a mocked URL through a transport whose cache stub stores nothing."""
    wsdl_url = "http://tests.python-zeep.org/test.xml"

    # The stub's ``get`` always returns None and its ``add`` discards its input.
    noop_cache = stub(get=lambda url: None, add=lambda url, content: None)
    transport = AsyncTransport(cache=noop_cache)
    httpx_mock.add_response(url=wsdl_url, data="x")

    assert transport.load(wsdl_url) == b"x"
@pytest.mark.xfail  # Failing on master
@pytest.mark.requests
def test_load_cache(httpx_mock):
    """Load a mocked URL and check that the body ends up in the cache.

    ``transport.load`` is called without ``await`` here, so this is a plain
    function and carries no ``asyncio`` marker.
    """
    wsdl_url = "http://tests.python-zeep.org/test.xml"

    cache = InMemoryCache()
    transport = AsyncTransport(cache=cache)
    httpx_mock.add_response(url=wsdl_url, data="x")

    result = transport.load(wsdl_url)

    assert result == b"x"
    # The same bytes must now be retrievable directly from the cache.
    assert cache.get(wsdl_url) == b"x"
@pytest.mark.xfail  # Failing on master
@pytest.mark.requests
@pytest.mark.asyncio
async def test_post(httpx_mock: HTTPXMock):
    """Post an XML envelope to a mocked URL and check the response content."""
    address = "http://tests.python-zeep.org/test.xml"

    # The stub's ``get`` always returns None and its ``add`` discards its input.
    cache = stub(get=lambda url: None, add=lambda url, content: None)
    transport = AsyncTransport(cache=cache)
    try:
        envelope = etree.Element("Envelope")
        httpx_mock.add_response(url=address, data="x")

        result = await transport.post_xml(address, envelope=envelope, headers={})

        assert result.content == b"x"
    finally:
        # Close the transport even when the request or the assertion fails.
        await transport.aclose()
@pytest.mark.requests
@pytest.mark.asyncio
async def test_session_close(httpx_mock: HTTPXMock):
    """Closing a freshly created transport completes without raising."""
    transport = AsyncTransport()
    # Awaited for its effect only; a test function should not return a value.
    await transport.aclose()
@pytest.mark.xfail  # Failing on master
@pytest.mark.requests
@pytest.mark.asyncio
async def test_http_error(httpx_mock: HTTPXMock):
    """A mocked 500 response makes ``load`` raise ``TransportError``."""
    wsdl_url = "http://tests.python-zeep.org/test.xml"

    transport = AsyncTransport()
    try:
        httpx_mock.add_response(url=wsdl_url, data="x", status_code=500)

        with pytest.raises(exceptions.TransportError) as exc:
            transport.load(wsdl_url)

        # Inspect the exception after the ``with`` block: statements placed
        # after the raising call inside it would never execute.
        assert exc.value.status_code == 500
        assert exc.value.message is None
    finally:
        # Close the transport even when an assertion above fails.
        await transport.aclose()