363 lines
9.6 KiB
Python
363 lines
9.6 KiB
Python
import re
|
|
from datetime import date, datetime, timedelta
|
|
from decimal import Decimal
|
|
from enum import Enum
|
|
from functools import reduce
|
|
from http import HTTPStatus
|
|
from typing import Any, Literal, NotRequired, TypedDict, cast
|
|
from uuid import uuid4
|
|
|
|
from aws_lambda_powertools import Logger
|
|
from aws_lambda_powertools.event_handler.api_gateway import Router
|
|
from aws_lambda_powertools.event_handler.exceptions import (
|
|
NotFoundError,
|
|
)
|
|
from layercake.dateutils import now, ttl
|
|
from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair, SortKey, TransactKey
|
|
from layercake.extra_types import CnpjStr, CpfStr, CreditCard, NameStr
|
|
from pydantic import (
|
|
UUID4,
|
|
BaseModel,
|
|
ConfigDict,
|
|
EmailStr,
|
|
Field,
|
|
field_validator,
|
|
model_validator,
|
|
)
|
|
|
|
from api_gateway import JSONResponse
|
|
from boto3clients import dynamodb_client
|
|
from config import DUE_DAYS, ORDER_TABLE, USER_TABLE
|
|
from routes.enrollments.enroll import Enrollment
|
|
|
|
router = Router()
|
|
logger = Logger(__name__)
|
|
dyn = DynamoDBPersistenceLayer(ORDER_TABLE, dynamodb_client)
|
|
|
|
|
|
class CouponNotFoundError(NotFoundError): ...
|
|
|
|
|
|
class PaymentMethod(str, Enum):
|
|
PIX = 'PIX'
|
|
CREDIT_CARD = 'CREDIT_CARD'
|
|
BANK_SLIP = 'BANK_SLIP'
|
|
# MANUAL = 'MANUAL'
|
|
|
|
|
|
class User(BaseModel):
|
|
id: UUID4 | str
|
|
name: NameStr
|
|
|
|
|
|
class Address(BaseModel):
|
|
model_config = ConfigDict(str_strip_whitespace=True)
|
|
|
|
postcode: str
|
|
address1: str
|
|
address2: str | None = None
|
|
neighborhood: str
|
|
city: str
|
|
state: str
|
|
|
|
@field_validator('postcode')
|
|
@classmethod
|
|
def ensure_numbers(cls, v: str) -> str:
|
|
return re.sub(r'\D', '', v)
|
|
|
|
|
|
class Item(BaseModel):
|
|
id: UUID4
|
|
name: str
|
|
unit_price: Decimal = Field(..., ge=1)
|
|
quantity: int = Field(1, ge=1)
|
|
|
|
|
|
class Coupon(BaseModel):
|
|
code: str
|
|
type: Literal['PERCENT', 'FIXED']
|
|
amount: Decimal = Field(..., ge=1)
|
|
|
|
|
|
class Checkout(BaseModel):
|
|
model_config = ConfigDict(
|
|
str_strip_whitespace=True,
|
|
use_enum_values=True,
|
|
)
|
|
|
|
id: UUID4 = Field(default_factory=uuid4)
|
|
name: str
|
|
email: EmailStr
|
|
address: Address
|
|
payment_method: PaymentMethod
|
|
items: tuple[Item, ...]
|
|
enrollments: tuple[Enrollment, ...] = tuple()
|
|
coupon: Coupon | None = None
|
|
org_id: UUID4 | str | None = None
|
|
user_id: UUID4 | str | None = None
|
|
cnpj: CnpjStr | None = None
|
|
cpf: CpfStr | None = None
|
|
created_by: User | None = None
|
|
credit_card: CreditCard | None = None
|
|
installments: int | None = Field(None, ge=1, le=12)
|
|
|
|
@model_validator(mode='after')
|
|
def verify_fields(self):
|
|
if not any([self.cnpj, self.cpf]):
|
|
raise ValueError('cnpj or cpf is required')
|
|
|
|
if self.cnpj is not None:
|
|
if self.org_id is None:
|
|
raise ValueError('org_id is missing')
|
|
|
|
if self.created_by is None:
|
|
raise ValueError('created_by is missing')
|
|
|
|
if self.cpf is not None and self.user_id is None:
|
|
raise ValueError('user_id is missing')
|
|
|
|
return self
|
|
|
|
def model_dump(self, **kwargs) -> dict[str, Any]:
|
|
return super().model_dump(
|
|
exclude_none=True,
|
|
exclude={
|
|
'items',
|
|
'address',
|
|
'created_by',
|
|
'coupon',
|
|
'credit_card',
|
|
'enrollments',
|
|
},
|
|
**kwargs,
|
|
)
|
|
|
|
|
|
@router.post('/')
|
|
def checkout(payload: Checkout):
|
|
now_ = now()
|
|
settings = _get_settings(str(payload.org_id or payload.user_id))
|
|
order_id = payload.id
|
|
address = payload.address
|
|
credit_card = payload.credit_card
|
|
created_by = payload.created_by
|
|
items = payload.items
|
|
enrollments = payload.enrollments
|
|
coupon = payload.coupon
|
|
payment_method = payload.payment_method
|
|
installments = payload.installments
|
|
subtotal = _sum_items(items)
|
|
due_date = (
|
|
_calc_due_date(now_, settings['due_days'])
|
|
if payment_method == 'BANK_SLIP'
|
|
else now_ + timedelta(hours=1)
|
|
)
|
|
discount = (
|
|
_apply_discount(subtotal, coupon.amount, coupon.type) * -1 if coupon else 0
|
|
)
|
|
total = subtotal + discount if subtotal > 0 else 0
|
|
interest_amount = (
|
|
_calc_interest(total, installments) - total
|
|
if payment_method == 'CREDIT_CARD' and installments
|
|
else 0
|
|
)
|
|
|
|
with dyn.transact_writer() as transact:
|
|
transact.put(
|
|
item={
|
|
'id': order_id,
|
|
'sk': '0',
|
|
'status': 'PENDING',
|
|
'subtotal': subtotal,
|
|
'total': total + interest_amount,
|
|
'discount': discount,
|
|
'due_date': due_date,
|
|
# Post-migration (orders): rename `create_date` to `created_at`
|
|
'create_date': now_,
|
|
}
|
|
| ({'coupon': coupon.code} if coupon else {})
|
|
| (
|
|
{
|
|
'installments': payload.installments,
|
|
'interest_amount': interest_amount,
|
|
}
|
|
if payload.installments
|
|
else {}
|
|
)
|
|
| payload.model_dump()
|
|
)
|
|
|
|
transact.put(
|
|
item={
|
|
'id': order_id,
|
|
'sk': 'ITEMS',
|
|
'items': [item.model_dump() for item in items],
|
|
'created_at': now_,
|
|
}
|
|
)
|
|
transact.put(
|
|
item={
|
|
'id': order_id,
|
|
'sk': 'ADDRESS',
|
|
'created_at': now_,
|
|
}
|
|
| address.model_dump()
|
|
)
|
|
|
|
if created_by:
|
|
transact.put(
|
|
item={
|
|
'id': order_id,
|
|
'sk': 'CREATED_BY',
|
|
'user_id': created_by.id,
|
|
'name': created_by.name,
|
|
'created_at': now_,
|
|
}
|
|
)
|
|
|
|
if credit_card:
|
|
transact.put(
|
|
item={
|
|
'id': order_id,
|
|
'sk': 'CREDIT_CARD',
|
|
'brand': credit_card.brand,
|
|
'last4': credit_card.last4,
|
|
'created_at': now_,
|
|
}
|
|
)
|
|
transact.put(
|
|
item={
|
|
'id': order_id,
|
|
'sk': 'CREDIT_CARD#PAYMENT_INTENT',
|
|
'ttl': ttl(start_dt=now_, minutes=5),
|
|
'created_at': now_,
|
|
}
|
|
| credit_card.model_dump(),
|
|
)
|
|
|
|
if coupon:
|
|
transact.put(
|
|
item={
|
|
'id': order_id,
|
|
'sk': 'METADATA#COUPON',
|
|
'created_at': now_,
|
|
}
|
|
| coupon.model_dump()
|
|
)
|
|
transact.condition(
|
|
key=KeyPair('COUPON', coupon.code),
|
|
cond_expr='attribute_exists(sk) \
|
|
AND discount_type = :type \
|
|
AND discount_amount = :amount',
|
|
expr_attr_values={
|
|
':type': coupon.type,
|
|
':amount': coupon.amount,
|
|
},
|
|
exc_cls=CouponNotFoundError,
|
|
)
|
|
|
|
for enrollment in enrollments:
|
|
transact.put(
|
|
item={
|
|
'id': order_id,
|
|
'sk': f'ENROLLMENT#{enrollment.id}',
|
|
'status': 'PENDING',
|
|
'created_at': now_,
|
|
}
|
|
| enrollment.model_dump(exclude={'id'})
|
|
)
|
|
|
|
if 'iugu_api_token' in settings:
|
|
transact.put(
|
|
item={
|
|
'id': order_id,
|
|
'sk': 'METADATA#TEST_MODE',
|
|
'iugu_api_token': settings['iugu_api_token'],
|
|
'created_at': now_,
|
|
}
|
|
)
|
|
|
|
return JSONResponse(
|
|
body={'id': order_id},
|
|
status_code=HTTPStatus.CREATED,
|
|
)
|
|
|
|
|
|
def _sum_items(items: tuple[Item, ...]):
|
|
def sum(total: Decimal, item: Item) -> Decimal:
|
|
return total + item.unit_price * item.quantity
|
|
|
|
return reduce(sum, items, Decimal(0))
|
|
|
|
|
|
def _calc_interest(total, installments: int) -> Decimal:
|
|
rate2to6 = 0.055
|
|
rate7to12 = 0.0608
|
|
rate = rate7to12 if installments >= 7 else rate2to6
|
|
return total * Decimal((1 - 0.0382) / (1 - rate))
|
|
|
|
|
|
def _apply_discount(
|
|
subtotal: Decimal,
|
|
discount_amount: Decimal,
|
|
discount_type: Literal['FIXED', 'PERCENT'],
|
|
) -> Decimal:
|
|
if subtotal <= Decimal('0'):
|
|
return Decimal('0')
|
|
|
|
amount = (
|
|
(subtotal * discount_amount) / Decimal('100')
|
|
if discount_type == 'PERCENT'
|
|
else discount_amount
|
|
)
|
|
|
|
return min(amount, subtotal)
|
|
|
|
|
|
def _calc_due_date(
|
|
start_date: datetime,
|
|
business_days: int,
|
|
holidays: set[date] | None = None,
|
|
) -> datetime:
|
|
holidays = holidays or set()
|
|
|
|
due_date = start_date + timedelta(days=business_days)
|
|
|
|
while due_date.weekday() >= 5 or due_date.date() in holidays:
|
|
due_date += timedelta(days=1)
|
|
|
|
return due_date
|
|
|
|
|
|
Settings = TypedDict(
|
|
'Settings',
|
|
{
|
|
'due_days': int,
|
|
'iugu_api_token': NotRequired[str],
|
|
},
|
|
)
|
|
|
|
|
|
def _get_settings(id: str) -> Settings:
|
|
r = dyn.collection.get_items(
|
|
TransactKey(id, table_name=USER_TABLE)
|
|
+ SortKey(
|
|
sk='METADATA#BILLING',
|
|
path_spec='due_days',
|
|
rename_key='due_days',
|
|
)
|
|
+ SortKey(
|
|
'METADATA#TEST_MODE',
|
|
path_spec='iugu_api_token',
|
|
rename_key='iugu_api_token',
|
|
),
|
|
flatten_top=False,
|
|
)
|
|
|
|
if 'due_days' not in r:
|
|
r['due_days'] = DUE_DAYS
|
|
else:
|
|
r['due_days'] = int(r['due_days'])
|
|
|
|
return cast(Settings, r)
|