Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid order race conditions #8

Merged
merged 10 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env.template
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
LOG_LEVEL=INFO
DEBUG=false

# Postgres
POSTGRES_SERVER=localhost
POSTGRES_PORT=5432
POSTGRES_USER=deep
POSTGRES_PASSWORD=icecream
POSTGRES_DB=deep_ice
Expand Down
8 changes: 4 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ WORKDIR /app
# Install uv deps with pip.
RUN pip install uv
COPY pyproject.toml .
RUN uv export --no-dev >requirements.txt && pip install -Ur requirements.txt
RUN uv pip install --system -Ur pyproject.toml

# Copy the rest of the application code.
# Copy the rest of the application code and install the project too.
COPY . .
RUN uv pip install --system -e .

# Run the FastAPI app using uvicorn on default port.
EXPOSE 80

# Command to run the FastAPI app using uvicorn.
CMD ["fastapi", "run", "deep_ice", "--port", "80"]
2 changes: 1 addition & 1 deletion deep_ice/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async def lifespan(fast_app: FastAPI):
class TaskQueue:
functions = [payment_service.make_payment_task]
redis_settings = redis_settings
max_tries = settings.TASK_MAX_RETRIES
max_tries = settings.TASK_MAX_TRIES
retry_delay = settings.TASK_RETRY_DELAY


Expand Down
6 changes: 3 additions & 3 deletions deep_ice/api/routes/cart.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ async def get_cart_items(current_user: CurrentUserDep, cart_service: CartService
return cart


@router.post("/items", response_model=RetrieveCartItem)
@router.post(
"/items", response_model=RetrieveCartItem, status_code=status.HTTP_201_CREATED
)
async def add_item_to_cart(
session: SessionDep,
current_user: CurrentUserDep,
cart_service: CartServiceDep,
item: Annotated[CreateCartItem, Body()],
response: Response,
):
cart = await cart_service.ensure_cart(cast(int, current_user.id))
cart_item = CartItem(cart_id=cart.id, **item.model_dump())
Expand All @@ -65,7 +66,6 @@ async def add_item_to_cart(
else:
cart_item.icecream = icecream

response.status_code = status.HTTP_201_CREATED
return cart_item


Expand Down
95 changes: 61 additions & 34 deletions deep_ice/api/routes/payments.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,30 @@
from typing import Annotated, cast

import sentry_sdk
from aioredlock import LockError
from fastapi import APIRouter, Body, HTTPException, Request, Response, status
from fastapi.responses import RedirectResponse
from sqlalchemy.exc import SQLAlchemyError
from sqlmodel.ext.asyncio.session import AsyncSession

from deep_ice.core import logger
from deep_ice.core.dependencies import CurrentUserDep, SessionDep
from deep_ice.models import PaymentMethod, PaymentStatus, RetrievePayment
from deep_ice.services.cart import CartService
from deep_ice.core.dependencies import (
CartServiceDep,
CurrentUserDep,
RedlockDep,
SessionDep,
)
from deep_ice.models import Cart, Payment, PaymentMethod, PaymentStatus, RetrievePayment
from deep_ice.services.order import OrderService
from deep_ice.services.payment import PaymentError, PaymentService, payment_stub
from deep_ice.services.stats import stats_service

router = APIRouter()


@router.post("", response_model=RetrievePayment)
async def make_payment(
session: SessionDep,
current_user: CurrentUserDep,
method: Annotated[PaymentMethod, Body(embed=True)],
request: Request,
response: Response,
):
# FIXME(cmin764): Check if we need an async Lock primitive here in order to allow
# only one user to submit an order at a time. (based on available stock check)
cart_service = CartService(session)
cart = await cart_service.get_cart(cast(int, current_user.id))
if not cart or not cart.items:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="There are no items in the cart",
)

cart_ok = await cart_service.check_items_against_stock(cart)
if not cart_ok:
# Redirect back to the cart so we get aware of the new state based on the
# available stock. And let the user decide if it continues with a payment.
return RedirectResponse(url=request.url_for("get_cart_items"))

async def _make_payment(
session: AsyncSession, *, cart: Cart, method: PaymentMethod, response: Response
) -> Payment:
# Items are available and ready to be sold, make the order and pay for it.
order_service = OrderService(session, stats_service=stats_service)
payment_service = PaymentService(
Expand All @@ -62,14 +47,56 @@ async def make_payment(
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Payment failed"
)
else:
await session.commit()
response.status_code = (
status.HTTP_202_ACCEPTED
if payment.status == PaymentStatus.PENDING
else status.HTTP_201_CREATED

await session.commit()
response.status_code = (
status.HTTP_202_ACCEPTED
if payment.status == PaymentStatus.PENDING
else status.HTTP_201_CREATED
)
return payment


@router.post("", response_model=RetrievePayment)
async def make_payment(
session: SessionDep,
current_user: CurrentUserDep,
cart_service: CartServiceDep,
redlock: RedlockDep,
method: Annotated[PaymentMethod, Body(embed=True)],
request: Request,
response: Response,
):
cart = await cart_service.get_cart(cast(int, current_user.id))
if not cart or not cart.items:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="There are no items in the cart",
)
return payment

lock_keys = [f"ice-lock:{item.icecream_id}" for item in cart.items]
locks = []
try:
for lock_key in lock_keys:
lock = await redlock.lock(lock_key)
locks.append(lock)

cart_ok = await cart_service.check_items_against_stock(cart)
if not cart_ok:
# Redirect back to the cart so we get aware of the new state based on
# the available stock. And let the user decide if it continues with a
# payment.
return RedirectResponse(url=request.url_for("get_cart_items"))

return await _make_payment(
session, cart=cart, method=method, response=response
)
except LockError as exc:
logger.exception("Payment lock error with key %r: %s", lock_key, exc)
sentry_sdk.capture_exception(exc)
finally:
for lock in locks:
await redlock.unlock(lock)


@router.get("", response_model=list[RetrievePayment])
Expand Down
9 changes: 6 additions & 3 deletions deep_ice/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ class Settings(BaseSettings):
model_config = SettingsConfigDict(
# Use the top level .env file (one level above ./deep_ice/).
env_file=".env",
env_ignore_empty=True,
env_ignore_empty=False,
extra="ignore",
)

LOG_LEVEL: str = "INFO"
DEBUG: bool = False
PROJECT_NAME: str = "Deep Ice"
API_V1_STR: str = "/v1"

Expand All @@ -30,8 +31,10 @@ class Settings(BaseSettings):
POSTGRES_DB: str

REDIS_HOST: str = "localhost"
REDIS_PORT: int = 6379
REDLOCK_TTL: int = 30 # seconds for the lock to persists in Redis

TASK_MAX_RETRIES: int = 3
TASK_MAX_TRIES: int = 3
TASK_RETRY_DELAY: int = 1 # seconds between retries
TASK_BACKOFF_FACTOR: int = 5 # seconds to wait based on the job try counter

Expand All @@ -52,4 +55,4 @@ def SQLALCHEMY_DATABASE_URI(self) -> PostgresDsn:


settings = Settings() # type: ignore
redis_settings = RedisSettings(host=settings.REDIS_HOST)
redis_settings = RedisSettings(host=settings.REDIS_HOST, port=settings.REDIS_PORT)
2 changes: 1 addition & 1 deletion deep_ice/core/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from deep_ice.core.config import settings

async_engine = create_async_engine(
str(settings.SQLALCHEMY_DATABASE_URI), echo=True, future=True
str(settings.SQLALCHEMY_DATABASE_URI), echo=settings.DEBUG, future=True
)


Expand Down
15 changes: 13 additions & 2 deletions deep_ice/core/dependencies.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
from typing import Annotated
from typing import Annotated, AsyncGenerator

import jwt
import sentry_sdk
from aioredlock import Aioredlock
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jwt.exceptions import InvalidTokenError
from pydantic import ValidationError
from sqlmodel.ext.asyncio.session import AsyncSession

from deep_ice.core import logger, security
from deep_ice.core.config import settings
from deep_ice.core.config import redis_settings, settings
from deep_ice.core.database import get_async_session
from deep_ice.models import TokenPayload, User
from deep_ice.services.cart import CartService
Expand Down Expand Up @@ -51,5 +52,15 @@ async def get_cart_service(session: SessionDep) -> CartService:
return CartService(session)


async def get_lock_manager() -> AsyncGenerator[Aioredlock, None]:
lock_manager = Aioredlock(
[{"host": redis_settings.host, "port": redis_settings.port}],
internal_lock_timeout=settings.REDLOCK_TTL,
)
yield lock_manager
await lock_manager.destroy()


CurrentUserDep = Annotated[User, Depends(get_current_user)]
CartServiceDep = Annotated[CartService, Depends(get_cart_service)]
RedlockDep = Annotated[Aioredlock, Depends(get_lock_manager)]
2 changes: 1 addition & 1 deletion deep_ice/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class BaseIceCream(SQLModel):
class IceCream(BaseIceCream, FetchMixin, table=True):
id: Annotated[int | None, Field(primary_key=True)] = None
stock: int
blocked_quantity: int = 0 # reserved for payments only
blocked_quantity: int = 0 # reserved during payments
is_active: bool = True

cart_items: list["CartItem"] = Relationship(
Expand Down
8 changes: 8 additions & 0 deletions deep_ice/services/cart.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,16 @@ async def ensure_cart(self, user_id: int) -> Cart:

return cart

async def _refresh_icecream_stock(self, cart: Cart) -> None:
await self._session.refresh(cart)
for cart_item in cart.items:
await self._session.refresh(cart_item)
cart_item.icecream = await cart_item.awaitable_attrs.icecream
await self._session.refresh(cart_item.icecream)

async def check_items_against_stock(self, cart: Cart) -> bool:
# Ensure once again that we still have on stock the items we intend to buy.
await self._refresh_icecream_stock(cart)
cart_ok = True
for item in cart.items:
if item.quantity > item.icecream.available_stock:
Expand Down
5 changes: 2 additions & 3 deletions deep_ice/services/order.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,11 @@ async def confirm_order(self, order_id: int):

icecream.stock -= item.quantity
icecream.blocked_quantity -= item.quantity
self._session.add(icecream)
await self._stats_service.acknowledge_icecream_demand(
cast(int, icecream.id), name=icecream.name, quantity=item.quantity
)

self._session.add_all(order.items)

async def cancel_order(self, order_id: int):
order = await self._get_order(order_id)
order.status = OrderStatus.CANCELLED
Expand All @@ -62,7 +61,7 @@ async def cancel_order(self, order_id: int):
continue

icecream.blocked_quantity -= item.quantity
self._session.add_all(order.items)
self._session.add(icecream)

async def make_order_from_cart(self, cart: Cart) -> Order:
# Creates and saves an order out of the current cart and returns it for later
Expand Down
12 changes: 8 additions & 4 deletions deep_ice/services/payment.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async def make_payment_task(
msg = f"{method.value} payment for order #{order_id} failed, retrying..."
logger.warning(msg)
sentry_sdk.capture_message(msg, level="warning")
raise Retry(defer=attempts * settings.TASK_BACKOFF_FACTOR)
raise Retry(defer=attempts * settings.TASK_BACKOFF_FACTOR)

async for session in get_async_session():
order_service = OrderService(session, stats_service=stats_service)
Expand Down Expand Up @@ -82,7 +82,9 @@ class PaymentStub(PaymentInterface):

min_delay: int
max_delay: int
allow_failures: bool = False # enable failures or not
# Enable failures (or not) and at what rate.
allow_failures: bool = False
failure_rate: float = 0.2

async def make_payment(
self,
Expand Down Expand Up @@ -124,7 +126,9 @@ async def make_payment(
if self.allow_failures:
# Simulate payment result: 80% chance of success, 20% chance of failure.
payment_result = random.choices(
[PaymentStatus.SUCCESS, PaymentStatus.FAILED], weights=[80, 20], k=1
[PaymentStatus.SUCCESS, PaymentStatus.FAILED],
weights=[1 - self.failure_rate, self.failure_rate],
k=1,
)[0]
else:
payment_result = PaymentStatus.SUCCESS
Expand Down Expand Up @@ -196,4 +200,4 @@ async def set_order_payment_status(self, order_id: int, status: PaymentStatus):
self._session.add(payment)


payment_stub = PaymentStub(1, 3, allow_failures=True)
payment_stub = PaymentStub(1, 3, allow_failures=True, failure_rate=0.2)
6 changes: 4 additions & 2 deletions deep_ice/services/stats.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from collections import OrderedDict

import redis.asyncio as redis
import redis.asyncio as aioredis

from deep_ice.core.config import redis_settings

Expand All @@ -22,7 +22,9 @@ class StatsService(StatsInterface):
POPULARITY_KEY = "POPULAR_ICECREAM"

def __init__(self):
self._client = redis.Redis(host=redis_settings.host)
self._client = aioredis.Redis(
host=redis_settings.host, port=redis_settings.port
)

@staticmethod
def _get_product_key(*args: int | str) -> str:
Expand Down
Loading
Loading