from http import HTTPStatus
from typing import Annotated
from uuid import uuid4

from aws_lambda_powertools.event_handler.api_gateway import Router
from aws_lambda_powertools.event_handler.exceptions import (
    BadRequestError,
    NotFoundError,
)
from aws_lambda_powertools.event_handler.openapi.params import Body
from layercake.dateutils import now
from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair, SortKey
from layercake.extra_types import CnpjStr, CpfStr, NameStr
from pydantic import UUID4, BaseModel, EmailStr, Field

from api_gateway import JSONResponse
from boto3clients import dynamodb_client
from config import USER_TABLE

# Routes declared in this module are registered on this router.
router = Router()
# Persistence layer bound to the user table; shared by every handler below.
dyn = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client)


class Org(BaseModel):
    """Organization payload received in the request body."""

    id: str
    name: str
    cnpj: CnpjStr


class User(BaseModel):
    """User payload received in the request body.

    `id` is optional in the payload: a fresh UUID4 is generated when it is
    omitted.
    """

    id: UUID4 = Field(default_factory=uuid4)
    name: NameStr
    cpf: CpfStr
    email: EmailStr


class CPFConflictError(Exception):
    """Raised when the `cpf` index item for the user's CPF already exists."""

    ...


class EmailConflictError(Exception):
    """Raised when the `email` index item for the user's email already exists."""

    ...


class UserConflictError(BadRequestError):
    """Raised when the user is already a member of the organization."""

    ...


class UserNotFoundError(NotFoundError):
    """Raised when no user id can be resolved from the email/CPF index."""

    ...


class OrgMissingError(NotFoundError):
    """Raised when the organization's root item (`sk == '0'`) does not exist."""

    ...
@router.post('/')
def add_user(
    user: Annotated[User, Body(embed=True)],
    org: Annotated[Org, Body(embed=True)],
):
    """Create a user inside an organization, or link an existing user to it.

    The handler first tries to create the user from scratch. When the CPF or
    the email is already registered, the existing user id is resolved through
    the email/CPF index items and that user is attached to the organization
    instead.

    Args:
        user: User payload (embedded under the `user` key of the body).
        org: Organization payload (embedded under the `org` key of the body).

    Returns:
        A `JSONResponse` with status 201 (Created) in both the "created" and
        the "linked" paths.

    Raises:
        UserNotFoundError: The CPF/email is taken but no user id could be
            resolved from the index items.
        UserConflictError: The user is already a member of the organization.
        OrgMissingError: The organization root item does not exist.
    """
    if _create_user(user, org):
        return JSONResponse(HTTPStatus.CREATED)

    # The CPF or email already exists: fall back to linking the existing user.
    now_ = now()
    user_id = _get_user_id(user)

    with dyn.transact_writer() as transact:
        # Add the org to the `org_id` set kept on the user's root item.
        transact.update(
            key=KeyPair(user_id, '0'),
            update_expr='ADD org_id :org_id',
            expr_attr_values={
                ':org_id': {org.id},
            },
        )
        # Org entry stored under the user's partition.
        transact.put(
            item={
                'id': user_id,
                # Post-migration: rename `orgs` to `ORG`
                'sk': f'orgs#{org.id}',
                'name': org.name,
                'cnpj': org.cnpj,
                'created_at': now_,
            }
        )
        # Membership entry stored under the org's partition; fails when the
        # user is already a member.
        transact.put(
            item={
                # Post-migration: rename `orgmembers` to `ORGMEMBER`
                'id': f'orgmembers#{org.id}',
                'sk': user_id,
                'created_at': now_,
            },
            cond_expr='attribute_not_exists(sk)',
            exc_cls=UserConflictError,
        )
        # The organization itself must already exist.
        transact.condition(
            key=KeyPair(org.id, '0'),
            cond_expr='attribute_exists(sk)',
            exc_cls=OrgMissingError,
        )

    return JSONResponse(HTTPStatus.CREATED)


def _create_user(user: User, org: Org) -> bool:
    """Try to create a brand-new user that belongs to `org`.

    All items are written in a single transaction: the user root item, the
    CPF and email index items, the org entry under the user and the
    membership entry under the org.

    Args:
        user: User to be created.
        org: Organization the user is created in.

    Returns:
        True when the user was created; False when the CPF or the email is
        already registered (nothing is written in that case).

    Raises:
        OrgMissingError: The organization root item does not exist.
    """
    now_ = now()

    try:
        with dyn.transact_writer() as transact:
            # User root item.
            transact.put(
                item={
                    **user.model_dump(),
                    'sk': '0',
                    'org_id': {org.id},
                    'created_at': now_,
                },
            )
            # CPF index item. `user_id` is stored because `_get_user_id`
            # resolves an existing user by reading this attribute.
            transact.put(
                item={
                    # Post-migration: rename `cpf` to `CPF`
                    'id': 'cpf',
                    'sk': user.cpf,
                    'user_id': user.id,
                },
                cond_expr='attribute_not_exists(sk)',
                exc_cls=CPFConflictError,
            )
            # Email index item; `user_id` is stored for the same reason.
            transact.put(
                item={
                    # Post-migration: rename `email` to `EMAIL`
                    'id': 'email',
                    'sk': user.email,
                    'user_id': user.id,
                },
                cond_expr='attribute_not_exists(sk)',
                exc_cls=EmailConflictError,
            )
            # Org entry stored under the user's partition.
            transact.put(
                item={
                    'id': user.id,
                    # Post-migration: rename `orgs` to `ORG`
                    'sk': f'orgs#{org.id}',
                    'name': org.name,
                    'cnpj': org.cnpj,
                    'created_at': now_,
                }
            )
            # Membership entry stored under the org's partition.
            transact.put(
                item={
                    # Post-migration: rename `orgmembers` to `ORGMEMBER`
                    'id': f'orgmembers#{org.id}',
                    'sk': user.id,
                    'created_at': now_,
                }
            )
            # The organization itself must already exist.
            transact.condition(
                key=KeyPair(org.id, '0'),
                cond_expr='attribute_exists(sk)',
                exc_cls=OrgMissingError,
            )
    except (CPFConflictError, EmailConflictError):
        # The user already exists; the caller links it to the org instead.
        return False
    else:
        return True


def _get_user_id(user: User) -> str:
    """Resolve the id of an existing user from the email/CPF index items.

    Both index items are requested at once and the `user_id` attribute of
    each is exposed under the `id` key of the result.

    Args:
        user: User payload carrying the email and CPF to look up.

    Returns:
        The id of the user that owns the email or the CPF.

    Raises:
        UserNotFoundError: Neither index item yielded a user id.
    """
    # NOTE(review): both key pairs are renamed to `id`; which one wins when
    # they point at different users depends on `get_items` — confirm.
    user_id = dyn.collection.get_items(
        KeyPair(
            pk='email',
            sk=SortKey(user.email, path_spec='user_id'),
            rename_key='id',
        )
        + KeyPair(
            pk='cpf',
            sk=SortKey(user.cpf, path_spec='user_id'),
            rename_key='id',
        ),
        flatten_top=False,
    ).get('id')

    if not user_id:
        raise UserNotFoundError()

    return user_id