from typing import Annotated from uuid import uuid4 from aws_lambda_powertools.event_handler.api_gateway import Router from aws_lambda_powertools.event_handler.openapi.params import Body from layercake.dateutils import now from layercake.dynamodb import DynamoDBPersistenceLayer from layercake.extra_types import CnpjStr, NameStr from pydantic import UUID4, BaseModel, EmailStr from boto3clients import dynamodb_client from config import INTERNAL_EMAIL_DOMAIN, USER_TABLE from exceptions import ConflictError from .admins import router as admins from .custom_pricing import router as custom_pricing from .enrollments.scheduled import router as scheduled from .users import router as users router = Router() dyn = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client) __all__ = ['admins', 'custom_pricing', 'scheduled', 'users'] class OrgConflictError(ConflictError): ... class User(BaseModel): id: str | UUID4 name: NameStr email: EmailStr @router.post('/s') def add_org( name: Annotated[str, Body(embed=True)], cnpj: Annotated[CnpjStr, Body(embed=True)], user: Annotated[User, Body(embed=True)], ): now_ = now() org_id = str(uuid4()) with dyn.transact_writer() as transact: transact.put( item={ # Post-migration (users): rename `cnpj` to `CNPJ` 'id': 'cnpj', 'sk': cnpj, 'org_id': org_id, 'created_at': now_, }, cond_expr='attribute_not_exists(sk)', exc_cls=OrgConflictError, ) transact.put( item={ 'id': org_id, 'sk': '0', 'name': name, 'email': f'org+{cnpj}@{INTERNAL_EMAIL_DOMAIN}', 'cnpj': cnpj, 'created_at': now_, } )