
added more python async options #5

Open
wants to merge 13 commits into master
2 changes: 1 addition & 1 deletion .gitignore
@@ -1,4 +1,4 @@
venv/
py312/
.idea/
__pycache__/

32 changes: 27 additions & 5 deletions README.md
@@ -1,13 +1,35 @@
# async-http-requests-tut
# Parallel Request Benchmark

I needed to understand the performance of the different Python async packages for file download.
I started with `requests` and the synchronous `httpx` client to define a baseline (a sketch of that baseline follows the list), and then I looked at:
- multiprocessing;
- multithreading, both sync and async with httpx, with single and multiple event loops;
- asyncio, with both aiohttp and httpx;
- anyio, with both aiohttp and httpx.
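
The synchronous baseline (`test_synchronous_requests.py`) is not shown in this diff. The following is only a hypothetical sketch of what it might look like, assuming the `fetch(session, url, task_id, logger)` signature that the multiprocessing test imports:

```python
import logging
import requests

# Hypothetical sketch of the requests baseline; the real test_synchronous_requests.py
# is not part of this diff.
def fetch(session, url, task_id, logger):
    try:
        response = session.get(url, timeout=15.0)
        response.raise_for_status()
        logger.info(response.json()['uuid'])
    except requests.RequestException as err:
        logger.error(f"Task {task_id}: {err}")
    return task_id

def main():
    logger = logging.getLogger('parallel_requests_benchmark')
    with requests.Session() as session:
        for task_id in range(20):  # config.NO_OF_TASKS in the real code
            fetch(session, 'https://httpbin.org/uuid', task_id, logger)
```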

The execution time for a single run, averaged over 20 executions, is:

| Test Name | Duration |
|---------------------------------------------------|------------|
| test_synchronous_requests | 0.208629 |
| test_synchronous_httpx | 0.231735 |
| test_multiprocessing_requests | 0.103449 |
| test_multithreading_requests | 0.065041 |
| test_multithreading_httpx_single_event_loop | 0.031216 |
| test_multithreading_httpx_multiple_event_loops | 0.056664 |
| test_async_aiohttp_asyncio | 0.000023 |
| test_async_aiohttp_anyio | 0.065789 |
| test_async_httpx_asyncio | 0.078591 |
| test_async_httpx_anyio | 0.088228 |


I started from the initial work in [async-http-requests-tut](https://github.com/nikhilkumarsingh/async-http-requests-tut).

Making multiple HTTP requests using Python (synchronous, multiprocessing, multithreading, asyncio)

Watch tutorial video [here](https://www.youtube.com/watch?v=R4Oz8JUuM4s).


More tutorials:
- [Multiprocessing in Python](https://www.youtube.com/watch?v=Ju4xkvFm07o&list=PLyb_C2HpOQSDUh4kIJnprJjh5n5Wqsww8)
- [Multithreading in Python](https://www.youtube.com/watch?v=ZPM8TCz5cd8&list=PLyb_C2HpOQSC-Ncui9S4ncUdaGI2YEhwK)

- [Concurrent Programming in Python (asyncio)](https://www.youtube.com/watch?v=y85G7GLYhYA&list=PLyb_C2HpOQSBsygWeCYkJ7wjxXShIql43)
14 changes: 14 additions & 0 deletions comparison.md
@@ -0,0 +1,14 @@
The execution time for a single run is:

| Test Name | Duration |
|---------------------------------------------------|------------|
| test_synchronous_requests | 0.208629 |
| test_synchronous_httpx | 0.231735 |
| test_multiprocessing_requests | 0.103449 |
| test_multithreading_requests | 0.065041 |
| test_multithreading_httpx_single_event_loop | 0.031216 |
| test_multithreading_httpx_multiple_event_loops | 0.056664 |
| test_async_aiohttp_asyncio | 0.000023 |
| test_async_aiohttp_anyio | 0.065789 |
| test_async_httpx_asyncio | 0.078591 |
| test_async_httpx_anyio | 0.088228 |
923 changes: 923 additions & 0 deletions parallel_requests_benchmark.log

Large diffs are not rendered by default.

44 changes: 44 additions & 0 deletions parallel_requests_benchmark/config.py
@@ -0,0 +1,44 @@
import logging


class Config:
    NO_OF_THREADS: int = 8
    NO_OF_TASKS: int = 20
    NO_OF_TASKS_IN_BATCH: int = 10
    URL: str = 'https://httpbin.org/uuid'
    TIMEOUT: float = 15.0
    INTERVAL_BETWEEN_BATCHES: float = 3.0
    MINIMUM_WAITING_TIME: float = 0.05
    RETRIES: int = 3
    BACKOFF_FACTOR: float = 0.3
    MAX_CONNECTIONS: int = 20
    MAX_KEEPALIVE_CONNECTIONS: int = 10
    POOL_CONNECTIONS: int = 100
    POOL_MAXSIZE: int = 1000

    '''
    The timeit.repeat function in Python is used to measure the execution time of code repeatedly.
    Here are the key options:

    stmt (default: 'pass'): the code statement to be executed.
        It can be a string or a callable.
    setup (default: 'pass'): the setup code that runs before executing stmt.
        It is executed once initially to set up any preconditions for the measured code,
        typically imports or initializing variables.
    timer (default: timeit.default_timer): a timer function that provides time measurements.
        It usually defaults to the most precise clock available on the system.
    number (default: 1000000): the number of times stmt is executed per repetition.
        Set it to control how many times the function is executed in each iteration.
    repeat (default: 5): the number of times the experiment is repeated.
        The return value is a list with the times from each repetition,
        which helps analyze the variability of execution times.
    '''
    NO_OF_NUMBERS: int = 1
    NO_OF_REPEATS: int = 3


logger = logging.getLogger('parallel_requests_benchmark')
logging.basicConfig(filename='parallel_requests_benchmark.log',
                    encoding='utf-8',
                    level=logging.INFO)
logger.info('\n\nThis is a new execution of the program.\n\n')

config = Config()
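
To make the docstring above concrete, here is a small, self-contained use of `timeit.repeat` with the same `number`/`repeat` settings the benchmark uses (illustrative only, not part of the PR):

```python
import timeit

# number=1: run the statement once per repetition; repeat=3: three repetitions.
# repeat() returns a list of per-repetition timings in seconds.
times = timeit.repeat(stmt='sum(range(10_000))', setup='pass', number=1, repeat=3)
print(times, min(times))
```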
20 changes: 20 additions & 0 deletions parallel_requests_benchmark/logger.py
@@ -0,0 +1,20 @@
import logging
from functools import wraps


def mylogger(logger_name):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            logger = logging.getLogger(logger_name)
            return func(logger, *args, **kwargs)
        return wrapper
    return decorator


# Example usage
@mylogger("my_custom_logger")
def example_function(logger, message):
    logger.info(message)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    example_function("Hello, this is a log message!")
45 changes: 45 additions & 0 deletions parallel_requests_benchmark/test_async_aiohttp_anyio.py
@@ -0,0 +1,45 @@
import aiohttp
import anyio
import logging
import os

from config import config, logger
from timer import timer
from test_async_aiohttp_asyncio import fetch


logger = logging.getLogger('parallel_requests_benchmark.test_async_aiohttp_anyio')


async def main_async():
    no_of_executed_tasks: int = 0
    try:
        timeout = aiohttp.ClientTimeout(total=config.TIMEOUT)
        connector = aiohttp.TCPConnector(limit=config.MAX_CONNECTIONS,
                                         limit_per_host=config.MAX_KEEPALIVE_CONNECTIONS)
        async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
            for batch_start in range(0, config.NO_OF_TASKS, config.NO_OF_TASKS_IN_BATCH):
                batch_end = min(batch_start + config.NO_OF_TASKS_IN_BATCH, config.NO_OF_TASKS)
                # The task group waits for every task in the batch to finish;
                # start_soon() returns None, so no per-task results are collected here.
                async with anyio.create_task_group() as tg:
                    for _ in range(batch_start, batch_end):
                        tg.start_soon(fetch, session, config.URL, logger)
                no_of_executed_tasks += batch_end - batch_start
                if (batch_start > 0) and (batch_end < config.NO_OF_TASKS):
                    await anyio.sleep(config.INTERVAL_BETWEEN_BATCHES)
    except Exception as e:
        logger.warning(f":::ERROR::: Not all the tasks have been completed! \nError: {e}")

    if no_of_executed_tasks != config.NO_OF_TASKS:
        logger.warning(f":::WARNING::: Only {no_of_executed_tasks} out of {config.NO_OF_TASKS} tasks have been completed.")


@timer(name=os.path.splitext(os.path.basename(__file__))[0], number=config.NO_OF_NUMBERS, repeat=config.NO_OF_REPEATS)
def main():
    anyio.run(main_async)


if __name__ == '__main__':
    main()
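
Note that `tg.start_soon()` returns `None`, so the anyio variants do not collect per-task results. If results were needed, one common pattern (illustrative, not part of the PR) is to have each task append to a shared list:

```python
import anyio

async def _collect(results, coro_fn, *args):
    # Run the coroutine and store its result in the shared list.
    results.append(await coro_fn(*args))

async def run_batch(coro_fn, arg_sets):
    results = []
    async with anyio.create_task_group() as tg:
        for args in arg_sets:
            tg.start_soon(_collect, results, coro_fn, *args)
    return results  # fully populated once the task group exits
```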
61 changes: 61 additions & 0 deletions parallel_requests_benchmark/test_async_aiohttp_asyncio.py
@@ -0,0 +1,61 @@
import aiohttp
import asyncio
import logging
import os

from config import config, logger
from timer import timer


logger = logging.getLogger('parallel_requests_benchmark.test_async_aiohttp_asyncio')


async def fetch(session, url, logger):
    try:
        response = await session.get(url)
        response.raise_for_status()
        json_response = await response.json()
        logger.info(json_response['uuid'])
    except aiohttp.ClientConnectionError:
        logger.error("There was a connection error.")
    except aiohttp.ClientResponseError as http_err:
        logger.error(f"HTTP error occurred: {http_err.status} - {http_err.message}")
    except aiohttp.ClientPayloadError:
        logger.error("There was an error with the response payload.")
    except asyncio.TimeoutError:
        logger.error("The request timed out.")
    except aiohttp.ClientError as err:
        logger.error(f"An error occurred: {err}")


async def main_async():
    no_of_executed_tasks: int = 0
    results = []
    try:
        timeout = aiohttp.ClientTimeout(total=config.TIMEOUT)
        connector = aiohttp.TCPConnector(limit=config.MAX_CONNECTIONS,
                                         limit_per_host=config.MAX_KEEPALIVE_CONNECTIONS)
        async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
            for batch_start in range(0, config.NO_OF_TASKS, config.NO_OF_TASKS_IN_BATCH):
                batch_end = min(batch_start + config.NO_OF_TASKS_IN_BATCH, config.NO_OF_TASKS)
                # fetch() takes (session, url, logger); it does not receive a task id.
                tasks = [fetch(session, config.URL, logger) for _ in range(batch_start, batch_end)]
                _results = await asyncio.gather(*tasks, return_exceptions=True)
                if (batch_start > 0) and (batch_end < config.NO_OF_TASKS):
                    await asyncio.sleep(config.INTERVAL_BETWEEN_BATCHES)
                no_of_executed_tasks += len(tasks)
                results.extend(_results)
    except Exception as e:
        logger.warning(f":::ERROR::: Not all the tasks have been completed! \nError: {e}")

    if no_of_executed_tasks != config.NO_OF_TASKS:
        logger.warning(f":::WARNING::: Only {no_of_executed_tasks} out of {config.NO_OF_TASKS} tasks have been completed.")
    return results


@timer(name=os.path.splitext(os.path.basename(__file__))[0], number=config.NO_OF_NUMBERS, repeat=config.NO_OF_REPEATS)
def main():
    asyncio.run(main_async())


if __name__ == '__main__':
    main()
43 changes: 43 additions & 0 deletions parallel_requests_benchmark/test_async_httpx_anyio.py
@@ -0,0 +1,43 @@
import anyio
import httpx
import logging
import os

from config import config, logger
from timer import timer
from test_async_httpx_asyncio import fetch

logger = logging.getLogger('parallel_requests_benchmark.test_async_httpx_anyio')


async def main_async():
    no_of_executed_tasks: int = 0
    try:
        limits = httpx.Limits(max_connections=config.MAX_CONNECTIONS,
                              max_keepalive_connections=config.MAX_KEEPALIVE_CONNECTIONS)
        async with httpx.AsyncClient(timeout=config.TIMEOUT, limits=limits) as client:
            for batch_start in range(0, config.NO_OF_TASKS, config.NO_OF_TASKS_IN_BATCH):
                batch_end = min(batch_start + config.NO_OF_TASKS_IN_BATCH, config.NO_OF_TASKS)
                # The task group waits for every task in the batch to finish;
                # start_soon() returns None, so no per-task results are collected here.
                async with anyio.create_task_group() as tg:
                    for task_id in range(batch_start, batch_end):
                        tg.start_soon(fetch, client, config.URL, task_id, logger)
                no_of_executed_tasks += batch_end - batch_start
                if (batch_start > 0) and (batch_end < config.NO_OF_TASKS):
                    await anyio.sleep(config.INTERVAL_BETWEEN_BATCHES)
    except Exception as e:
        logger.warning(f":::ERROR::: Not all the tasks have been completed! \nError: {e}")

    if no_of_executed_tasks != config.NO_OF_TASKS:
        logger.warning(f":::WARNING::: Only {no_of_executed_tasks} out of {config.NO_OF_TASKS} tasks have been completed.")


@timer(name=os.path.splitext(os.path.basename(__file__))[0], number=config.NO_OF_NUMBERS, repeat=config.NO_OF_REPEATS)
def main():
    anyio.run(main_async)


if __name__ == '__main__':
    main()
59 changes: 59 additions & 0 deletions parallel_requests_benchmark/test_async_httpx_asyncio.py
@@ -0,0 +1,59 @@
import asyncio
import httpx
import logging
import os

from config import config, logger
from timer import timer


logger = logging.getLogger('parallel_requests_benchmark.test_async_httpx_asyncio')


async def fetch(client, url, task_id: int, logger):
    try:
        response = await client.get(url)
        response.raise_for_status()
        json_response = response.json()
        logger.info(json_response['uuid'])
    except httpx.ConnectError:
        logger.error("There was a connection error.")
    except httpx.HTTPStatusError as http_err:
        logger.error(f"HTTP error occurred: {http_err}")
    except httpx.TimeoutException:
        logger.error("The request timed out.")
    except httpx.RequestError as req_err:
        logger.error(f"An error occurred: {req_err}")
    return task_id


async def main_async():
    no_of_executed_tasks: int = 0
    results = []
    try:
        limits = httpx.Limits(max_connections=config.MAX_CONNECTIONS,
                              max_keepalive_connections=config.MAX_KEEPALIVE_CONNECTIONS)
        async with httpx.AsyncClient(timeout=config.TIMEOUT, limits=limits) as client:
            for batch_start in range(0, config.NO_OF_TASKS, config.NO_OF_TASKS_IN_BATCH):
                batch_end = min(batch_start + config.NO_OF_TASKS_IN_BATCH, config.NO_OF_TASKS)
                tasks = [fetch(client, config.URL, task_id, logger) for task_id in range(batch_start, batch_end)]
                _results = await asyncio.gather(*tasks, return_exceptions=True)
                if (batch_start > 0) and (batch_end < config.NO_OF_TASKS):
                    await asyncio.sleep(config.INTERVAL_BETWEEN_BATCHES)
                no_of_executed_tasks += len(tasks)
                results.extend(_results)
    except Exception as e:
        logger.warning(f":::ERROR::: Not all the tasks have been completed! \nError: {e}")

    if no_of_executed_tasks != config.NO_OF_TASKS:
        logger.warning(f":::WARNING::: Only {no_of_executed_tasks} out of {config.NO_OF_TASKS} tasks have been completed.")
    return results


@timer(name=os.path.splitext(os.path.basename(__file__))[0], number=config.NO_OF_NUMBERS, repeat=config.NO_OF_REPEATS)
def main():
    asyncio.run(main_async())


if __name__ == '__main__':
    main()
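
`config.RETRIES` is defined but not used by the httpx clients shown above. If connection-level retries were wanted, one possible wiring (an assumption, not part of the PR) is to pass an explicit transport; note that httpx's built-in retries cover connection failures only and have no backoff factor:

```python
import httpx
from config import config

# Hypothetical wiring of config.RETRIES into the async client; retries here
# apply to connection attempts only, not to HTTP error responses.
limits = httpx.Limits(max_connections=config.MAX_CONNECTIONS,
                      max_keepalive_connections=config.MAX_KEEPALIVE_CONNECTIONS)
transport = httpx.AsyncHTTPTransport(retries=config.RETRIES, limits=limits)
client = httpx.AsyncClient(transport=transport, timeout=config.TIMEOUT)
```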
22 changes: 22 additions & 0 deletions parallel_requests_benchmark/test_multiprocessing_requests.py
@@ -0,0 +1,22 @@
from multiprocessing.pool import Pool
import requests
import logging
import os

from config import config, logger
from timer import timer
from test_synchronous_requests import fetch


logger = logging.getLogger('parallel_requests_benchmark.test_multiprocessing_requests')


@timer(name=os.path.splitext(os.path.basename(__file__))[0], number=config.NO_OF_NUMBERS, repeat=config.NO_OF_REPEATS)
def main():
    with Pool() as pool:
        with requests.Session() as session:
            pool.starmap(fetch, [(session, config.URL, task_id, logger) for task_id in range(config.NO_OF_TASKS)])


if __name__ == '__main__':
    main()
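
`config.RETRIES`, `BACKOFF_FACTOR`, `POOL_CONNECTIONS` and `POOL_MAXSIZE` are presumably meant for the requests-based tests, but they are not applied in the code shown here. A hedged sketch of how they could be mounted on a `requests.Session`:

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from config import config

# Illustrative only: build a retrying, pooled adapter from the config values;
# the PR's own test files may wire these differently (or not at all).
def make_session() -> requests.Session:
    retry = Retry(total=config.RETRIES,
                  backoff_factor=config.BACKOFF_FACTOR,
                  status_forcelist=(500, 502, 503, 504))
    adapter = HTTPAdapter(pool_connections=config.POOL_CONNECTIONS,
                          pool_maxsize=config.POOL_MAXSIZE,
                          max_retries=retry)
    session = requests.Session()
    session.mount('https://', adapter)
    session.mount('http://', adapter)
    return session
```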