# Files
# saladeaula.digital/api.saladeaula.digital/app/routes/orders/checkout.py
# 2025-12-25 18:11:49 -03:00
#
# 97 lines
# 2.3 KiB
# Python

import re
from decimal import Decimal
from http import HTTPStatus
from typing import Annotated, Literal
from aws_lambda_powertools.event_handler.api_gateway import Router
from aws_lambda_powertools.event_handler.openapi.params import Body
from layercake.dynamodb import DynamoDBPersistenceLayer
from layercake.extra_types import CnpjStr, CpfStr, NameStr
from pydantic import (
UUID4,
BaseModel,
ConfigDict,
EmailStr,
field_validator,
model_validator,
)
from api_gateway import JSONResponse
from boto3clients import dynamodb_client
from config import ORDER_TABLE
# Router on which this module registers its route handlers.
router = Router()
# Persistence layer constructed from the orders table name and the shared
# DynamoDB client.
# NOTE(review): `dyn` is not referenced by any code visible in this file —
# presumably intended for persisting orders; confirm.
dyn = DynamoDBPersistenceLayer(ORDER_TABLE, dynamodb_client)
class User(BaseModel):
    """A user referenced by identifier and name."""

    # Either a UUID4 value or a plain string is accepted.
    id: UUID4 | str
    # Constrained by layercake's `NameStr` type (exact rules are defined there).
    name: NameStr
class Address(BaseModel):
    """Postal address supplied with a checkout.

    String values have leading and trailing whitespace stripped
    (``str_strip_whitespace=True``). ``postcode`` is additionally reduced to
    its digits, so formatted inputs such as ``"01310-100"`` are stored as
    ``"01310100"``.
    """

    model_config = ConfigDict(str_strip_whitespace=True)

    postcode: str
    address1: str
    address2: str | None = None  # optional second address line
    neighborhood: str
    city: str
    state: str

    @field_validator('postcode')
    @classmethod
    def ensure_numbers(cls, v: str) -> str:
        """Return ``v`` with every non-digit character removed."""
        digits_only = re.sub(r'\D', '', v)
        return digits_only
class Item(BaseModel):
    """A single line item of a checkout request.

    Attributes:
        id: UUID4 identifier of the item.
        name: Display name of the item.
        unit_price: Price of one unit; must not be negative.
        quantity: Number of units, at least 1 (defaults to 1).

    Raises:
        ValueError: (surfaced by pydantic as a validation error) when
            ``unit_price`` is negative or ``quantity`` is lower than 1.
    """

    id: UUID4
    name: str
    unit_price: Decimal
    quantity: int = 1

    @field_validator('unit_price')
    @classmethod
    def ensure_non_negative_price(cls, v: Decimal) -> Decimal:
        """Reject negative prices; zero is allowed for free items."""
        if v < 0:
            raise ValueError('unit_price must not be negative')
        return v

    @field_validator('quantity')
    @classmethod
    def ensure_positive_quantity(cls, v: int) -> int:
        """Reject zero or negative quantities, which make no sense in an order."""
        if v < 1:
            raise ValueError('quantity must be at least 1')
        return v
class Coupon(BaseModel):
    """A coupon attached to a checkout request."""

    # Coupon code as sent by the client.
    code: str
    # Only the literal values 'PERCENT' and 'FIXED' are accepted.
    type: Literal['PERCENT', 'FIXED']
    # NOTE(review): presumably a percentage for 'PERCENT' and a money value
    # for 'FIXED'; no range check is applied here — confirm where it is
    # enforced.
    amount: Decimal
class Checkout(BaseModel):
    """Request body for the checkout endpoint.

    String values have leading and trailing whitespace stripped
    (``str_strip_whitespace=True``).

    Cross-field rules enforced by ``verify_fields``:

    * at least one of ``cnpj`` / ``cpf`` must be provided;
    * when ``cnpj`` is given, both ``org_id`` and ``user`` are required;
    * when ``cpf`` is given, ``user_id`` is required;
    * ``items`` must contain at least one entry.
    """

    model_config = ConfigDict(str_strip_whitespace=True)

    name: str
    email: EmailStr
    address: Address
    payment_method: Literal['PIX', 'CREDIT_CARD', 'BANK_SLIP', 'MANUAL']
    items: tuple[Item, ...]
    coupon: Coupon | None = None
    user: User | None = None
    org_id: UUID4 | str | None = None
    user_id: UUID4 | str | None = None
    cnpj: CnpjStr | None = None
    cpf: CpfStr | None = None

    @model_validator(mode='after')
    def verify_fields(self):
        """Validate the relationships between fields.

        Returns:
            The validated model instance itself.

        Raises:
            ValueError: (surfaced by pydantic as a validation error) when one
                of the cross-field rules listed on the class is violated.
        """
        if not (self.cnpj or self.cpf):
            raise ValueError('cnpj or cpf is required')

        if self.cnpj is not None:
            if self.org_id is None:
                raise ValueError('org_id is missing')

            if self.user is None:
                raise ValueError('user is missing')

        if self.cpf is not None and self.user_id is None:
            raise ValueError('user_id is missing')

        # An order without any item is meaningless. Checked last so the
        # earlier error messages keep their precedence.
        if not self.items:
            raise ValueError('items must not be empty')

        return self
@router.post('/')
def checkout(body: Annotated[Checkout, Body()]):
    """Handle ``POST /`` on this module's router.

    Args:
        body: The request body, declared as a ``Checkout`` model.

    Returns:
        A ``JSONResponse`` with status ``201 Created``.

    NOTE(review): ``body`` is not used yet — nothing is persisted or
    processed by this handler in its current form.
    """
    created = HTTPStatus.CREATED
    return JSONResponse(status_code=created)