Skip to content

Commit 8e4e711

Browse files
author
Sebastian Molenda
committed
WIP
1 parent 97fdf38 commit 8e4e711

28 files changed

+1306
-457
lines changed

pubnub/endpoints/file_operations/send_file_asyncio.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,9 @@
1-
import aiohttp
2-
31
from pubnub.endpoints.file_operations.send_file import SendFileNative
42
from pubnub.endpoints.file_operations.publish_file_message import PublishFileMessage
53
from pubnub.endpoints.file_operations.fetch_upload_details import FetchFileUploadS3Data
64

75

86
class AsyncioSendFile(SendFileNative):
9-
def build_file_upload_request(self):
10-
file = self.encrypt_payload()
11-
form_data = aiohttp.FormData()
12-
for form_field in self._file_upload_envelope.result.data["form_fields"]:
13-
form_data.add_field(form_field["key"], form_field["value"], content_type="multipart/form-data")
14-
form_data.add_field("file", file, filename=self._file_name, content_type="application/octet-stream")
15-
16-
return form_data
17-
187
def options(self):
198
request_options = super(SendFileNative, self).options()
209
request_options.data = request_options.files

pubnub/pubnub_asyncio.py

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
22
import json
33
import asyncio
4-
import aiohttp
4+
import httpx
55
import math
66
import time
77
import urllib
@@ -45,7 +45,10 @@ def __init__(self, config, custom_event_loop=None, subscription_manager=None):
4545
self._connector = None
4646
self._session = None
4747

48-
self._connector = aiohttp.TCPConnector(verify_ssl=True, loop=self.event_loop)
48+
self._connector = httpx.AsyncHTTPTransport()
49+
50+
if not hasattr(self._connector, 'close'):
51+
self._connector.close = self._connector.aclose
4952

5053
if not subscription_manager:
5154
subscription_manager = EventEngineSubscriptionManager
@@ -62,19 +65,17 @@ async def close_pending_tasks(self, tasks):
6265
await asyncio.sleep(0.1)
6366

6467
async def create_session(self):
65-
if not self._session:
66-
self._session = aiohttp.ClientSession(
67-
loop=self.event_loop,
68-
timeout=aiohttp.ClientTimeout(connect=self.config.connect_timeout),
69-
connector=self._connector
70-
)
68+
self._session = httpx.AsyncClient(
69+
timeout=httpx.Timeout(self.config.connect_timeout),
70+
transport=self._connector
71+
)
7172

7273
async def close_session(self):
7374
if self._session is not None:
74-
await self._session.close()
75+
await self._session.aclose()
7576

7677
async def set_connector(self, cn):
77-
await self._session.close()
78+
await self._session.aclose()
7879

7980
self._connector = cn
8081
await self.create_session()
@@ -171,7 +172,7 @@ async def _request_helper(self, options_func, cancellation_event):
171172
else:
172173
url = utils.build_url(scheme="", origin="", path=options.path, params=options.query_string)
173174

174-
url = URL(url, encoded=True)
175+
url = str(URL(url, encoded=True))
175176
logger.debug("%s %s %s" % (options.method_string, url, options.data))
176177

177178
if options.request_headers:
@@ -189,7 +190,7 @@ async def _request_helper(self, options_func, cancellation_event):
189190
url,
190191
headers=request_headers,
191192
data=options.data if options.data else None,
192-
allow_redirects=options.allow_redirects
193+
follow_redirects=options.allow_redirects
193194
),
194195
options.request_timeout
195196
)
@@ -199,13 +200,14 @@ async def _request_helper(self, options_func, cancellation_event):
199200
logger.error("session.request exception: %s" % str(e))
200201
raise
201202

203+
response_body = response.read()
202204
if not options.non_json_response:
203-
body = await response.text()
205+
body = response_body
204206
else:
205207
if isinstance(response.content, bytes):
206208
body = response.content # TODO: simplify this logic within the v5 release
207209
else:
208-
body = await response.read()
210+
body = response_body
209211

210212
if cancellation_event is not None and cancellation_event.is_set():
211213
return
@@ -226,7 +228,7 @@ async def _request_helper(self, options_func, cancellation_event):
226228
auth_key = query['auth_key'][0]
227229

228230
response_info = ResponseInfo(
229-
status_code=response.status,
231+
status_code=response.status_code,
230232
tls_enabled='https' == request_url.scheme,
231233
origin=request_url.hostname,
232234
uuid=uuid,
@@ -265,17 +267,17 @@ async def _request_helper(self, options_func, cancellation_event):
265267

266268
logger.debug(data)
267269

268-
if response.status not in (200, 307, 204):
270+
if response.status_code not in (200, 307, 204):
269271

270-
if response.status >= 500:
272+
if response.status_code >= 500:
271273
err = PNERR_SERVER_ERROR
272274
else:
273275
err = PNERR_CLIENT_ERROR
274276

275-
if response.status == 403:
277+
if response.status_code == 403:
276278
status_category = PNStatusCategory.PNAccessDeniedCategory
277279

278-
if response.status == 400:
280+
if response.status_code == 400:
279281
status_category = PNStatusCategory.PNBadRequestCategory
280282

281283
raise create_exception(
@@ -285,7 +287,7 @@ async def _request_helper(self, options_func, cancellation_event):
285287
exception=PubNubException(
286288
errormsg=data,
287289
pn_error=err,
288-
status_code=response.status
290+
status_code=response.status_code
289291
)
290292
)
291293
else:

pubnub/request_handlers/requests_handler.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
import json # noqa # pylint: disable=W0611
66
import urllib
77

8-
from requests import Session
9-
from requests.adapters import HTTPAdapter
108

119
from pubnub import utils
1210
from pubnub.enums import PNStatusCategory

tests/integrational/asyncio/test_file_upload.py

Lines changed: 32 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from unittest.mock import patch
44
from pubnub.pubnub_asyncio import PubNubAsyncio
55
from tests.integrational.vcr_helper import pn_vcr
6-
from tests.helper import pnconf_file_copy, pnconf_enc_env_copy
6+
from tests.helper import pnconf_env_copy, pnconf_enc_env_copy
77
from pubnub.endpoints.file_operations.publish_file_message import PublishFileMessage
88
from pubnub.models.consumer.file import (
99
PNSendFileResult, PNGetFilesResult, PNDownloadFileResult,
@@ -33,12 +33,12 @@ async def send_file(pubnub, file_for_upload, cipher_key=None):
3333

3434

3535
@pn_vcr.use_cassette(
36-
"tests/integrational/fixtures/asyncio/file_upload/delete_file.yaml",
36+
"tests/integrational/fixtures/asyncio/file_upload/delete_file.json", serializer="pn_json",
3737
filter_query_parameters=['uuid', 'l_file', 'pnsdk']
3838
)
39-
@pytest.mark.asyncio
40-
async def test_delete_file(event_loop, file_for_upload):
41-
pubnub = PubNubAsyncio(pnconf_file_copy(), custom_event_loop=event_loop)
39+
@pytest.mark.asyncio(loop_scope="module")
40+
async def test_delete_file(file_for_upload):
41+
pubnub = PubNubAsyncio(pnconf_env_copy())
4242
pubnub.config.uuid = "files_asyncio_uuid"
4343

4444
envelope = await send_file(pubnub, file_for_upload)
@@ -53,28 +53,26 @@ async def test_delete_file(event_loop, file_for_upload):
5353

5454

5555
@pn_vcr.use_cassette(
56-
"tests/integrational/fixtures/asyncio/file_upload/list_files.yaml",
56+
"tests/integrational/fixtures/asyncio/file_upload/list_files.json", serializer="pn_json",
5757
filter_query_parameters=['uuid', 'l_file', 'pnsdk']
58-
59-
6058
)
61-
@pytest.mark.asyncio
62-
async def test_list_files(event_loop):
63-
pubnub = PubNubAsyncio(pnconf_file_copy(), custom_event_loop=event_loop)
59+
@pytest.mark.asyncio(loop_scope="module")
60+
async def test_list_files():
61+
pubnub = PubNubAsyncio(pnconf_env_copy())
6462
envelope = await pubnub.list_files().channel(CHANNEL).future()
6563

6664
assert isinstance(envelope.result, PNGetFilesResult)
67-
assert envelope.result.count == 23
65+
assert envelope.result.count == 7
6866
await pubnub.stop()
6967

7068

7169
@pn_vcr.use_cassette(
72-
"tests/integrational/fixtures/asyncio/file_upload/send_and_download_file.yaml",
70+
"tests/integrational/fixtures/asyncio/file_upload/send_and_download_file.json", serializer="pn_json",
7371
filter_query_parameters=['uuid', 'l_file', 'pnsdk']
7472
)
75-
@pytest.mark.asyncio
76-
async def test_send_and_download_file(event_loop, file_for_upload):
77-
pubnub = PubNubAsyncio(pnconf_file_copy(), custom_event_loop=event_loop)
73+
@pytest.mark.asyncio(loop_scope="module")
74+
async def test_send_and_download_file(file_for_upload):
75+
pubnub = PubNubAsyncio(pnconf_env_copy())
7876
envelope = await send_file(pubnub, file_for_upload)
7977
download_envelope = await pubnub.download_file().\
8078
channel(CHANNEL).\
@@ -89,9 +87,9 @@ async def test_send_and_download_file(event_loop, file_for_upload):
8987
"tests/integrational/fixtures/asyncio/file_upload/send_and_download_encrypted_file_cipher_key.json",
9088
filter_query_parameters=['uuid', 'l_file', 'pnsdk'], serializer='pn_json'
9189
)
92-
@pytest.mark.asyncio
93-
async def test_send_and_download_file_encrypted_cipher_key(event_loop, file_for_upload, file_upload_test_data):
94-
pubnub = PubNubAsyncio(pnconf_enc_env_copy(), custom_event_loop=event_loop)
90+
@pytest.mark.asyncio(loop_scope="module")
91+
async def test_send_and_download_file_encrypted_cipher_key(file_for_upload, file_upload_test_data):
92+
pubnub = PubNubAsyncio(pnconf_enc_env_copy())
9593

9694
with patch("pubnub.crypto.PubNubCryptodome.get_initialization_vector", return_value="knightsofni12345"):
9795
envelope = await send_file(pubnub, file_for_upload, cipher_key="test")
@@ -111,9 +109,9 @@ async def test_send_and_download_file_encrypted_cipher_key(event_loop, file_for_
111109
"tests/integrational/fixtures/asyncio/file_upload/send_and_download_encrypted_file_crypto_module.json",
112110
filter_query_parameters=['uuid', 'l_file', 'pnsdk'], serializer='pn_json'
113111
)
114-
@pytest.mark.asyncio
115-
async def test_send_and_download_encrypted_file_crypto_module(event_loop, file_for_upload, file_upload_test_data):
116-
pubnub = PubNubAsyncio(pnconf_enc_env_copy(), custom_event_loop=event_loop)
112+
@pytest.mark.asyncio(loop_scope="module")
113+
async def test_send_and_download_encrypted_file_crypto_module(file_for_upload, file_upload_test_data):
114+
pubnub = PubNubAsyncio(pnconf_enc_env_copy())
117115

118116
with patch("pubnub.crypto_core.PubNubLegacyCryptor.get_initialization_vector", return_value=b"knightsofni12345"):
119117
envelope = await send_file(pubnub, file_for_upload)
@@ -129,12 +127,12 @@ async def test_send_and_download_encrypted_file_crypto_module(event_loop, file_f
129127

130128

131129
@pn_vcr.use_cassette(
132-
"tests/integrational/fixtures/asyncio/file_upload/get_file_url.yaml",
130+
"tests/integrational/fixtures/asyncio/file_upload/get_file_url.json", serializer="pn_json",
133131
filter_query_parameters=['uuid', 'l_file', 'pnsdk']
134132
)
135-
@pytest.mark.asyncio
136-
async def test_get_file_url(event_loop, file_for_upload):
137-
pubnub = PubNubAsyncio(pnconf_file_copy(), custom_event_loop=event_loop)
133+
@pytest.mark.asyncio(loop_scope="module")
134+
async def test_get_file_url(file_for_upload):
135+
pubnub = PubNubAsyncio(pnconf_env_copy())
138136
envelope = await send_file(pubnub, file_for_upload)
139137
file_url_envelope = await pubnub.get_file_url().\
140138
channel(CHANNEL).\
@@ -146,12 +144,12 @@ async def test_get_file_url(event_loop, file_for_upload):
146144

147145

148146
@pn_vcr.use_cassette(
149-
"tests/integrational/fixtures/asyncio/file_upload/fetch_s3_upload_data.yaml",
147+
"tests/integrational/fixtures/asyncio/file_upload/fetch_s3_upload_data.json", serializer="pn_json",
150148
filter_query_parameters=['uuid', 'l_file', 'pnsdk']
151149
)
152-
@pytest.mark.asyncio
153-
async def test_fetch_file_upload_s3_data_with_result_invocation(event_loop, file_upload_test_data):
154-
pubnub = PubNubAsyncio(pnconf_file_copy(), custom_event_loop=event_loop)
150+
@pytest.mark.asyncio(loop_scope="module")
151+
async def test_fetch_file_upload_s3_data_with_result_invocation(file_upload_test_data):
152+
pubnub = PubNubAsyncio(pnconf_env_copy())
155153
result = await pubnub._fetch_file_upload_s3_data().\
156154
channel(CHANNEL).\
157155
file_name(file_upload_test_data["UPLOADED_FILENAME"]).result()
@@ -161,12 +159,12 @@ async def test_fetch_file_upload_s3_data_with_result_invocation(event_loop, file
161159

162160

163161
@pn_vcr.use_cassette(
164-
"tests/integrational/fixtures/asyncio/file_upload/publish_file_message_encrypted.yaml",
162+
"tests/integrational/fixtures/asyncio/file_upload/publish_file_message_encrypted.json", serializer="pn_json",
165163
filter_query_parameters=['uuid', 'seqn', 'pnsdk']
166164
)
167-
@pytest.mark.asyncio
168-
async def test_publish_file_message_with_encryption(event_loop, file_upload_test_data):
169-
pubnub = PubNubAsyncio(pnconf_file_copy(), custom_event_loop=event_loop)
165+
@pytest.mark.asyncio(loop_scope="module")
166+
async def test_publish_file_message_with_encryption(file_upload_test_data):
167+
pubnub = PubNubAsyncio(pnconf_env_copy())
170168
envelope = await PublishFileMessage(pubnub).\
171169
channel(CHANNEL).\
172170
meta({}).\

tests/integrational/asyncio/test_invocations.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ async def test_publish_future_raises_pubnub_error(event_loop):
5555
async def test_publish_future_raises_lower_level_error(event_loop):
5656
pubnub = PubNubAsyncio(corrupted_keys, custom_event_loop=event_loop)
5757

58-
pubnub._connector.close()
58+
pubnub._connector.aclose()
5959

6060
with pytest.raises(RuntimeError) as exinfo:
6161
await pubnub.publish().message('hey').channel('blah').result()
@@ -102,7 +102,7 @@ async def test_publish_envelope_raises(event_loop):
102102
async def test_publish_envelope_raises_lower_level_error(event_loop):
103103
pubnub = PubNubAsyncio(corrupted_keys, custom_event_loop=event_loop)
104104

105-
pubnub._connector.close()
105+
pubnub._connector.aclose()
106106

107107
e = await pubnub.publish().message('hey').channel('blah').future()
108108
assert isinstance(e, PubNubAsyncioException)

0 commit comments

Comments
 (0)