from http import HTTPStatus from typing import Annotated from uuid import uuid4 from aws_lambda_powertools.event_handler.api_gateway import Router from aws_lambda_powertools.event_handler.openapi.params import Body from layercake.dateutils import now, ttl from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair, SortKey from layercake.extra_types import CnpjStr, CpfStr, NameStr from pydantic import BaseModel, EmailStr, Field from api_gateway import JSONResponse from boto3clients import dynamodb_client from config import INTERNAL_EMAIL_DOMAIN, USER_TABLE from exceptions import ( CPFConflictError, EmailConflictError, OrgNotFoundError, SubscriptionFrozenError, UserConflictError, UserNotFoundError, ) from middlewares.authentication_middleware import User as Authenticated router = Router() dyn = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client) class Org(BaseModel): id: str | None = Field(default=None, exclude=True) name: str cnpj: CnpjStr class User(BaseModel): name: NameStr cpf: CpfStr email: EmailStr # class OrgNotFoundError(NotFoundError): ... @router.post('//users') def add( org_id: str, user: Annotated[User, Body(embed=True)], org: Annotated[Org, Body(embed=True)], ): org.id = org_id created_by: Authenticated = router.context['user'] if _create_user(user, org, created_by): return JSONResponse(HTTPStatus.CREATED) user_id = _get_user_id(user) _add_member(user_id, org) return JSONResponse(HTTPStatus.NO_CONTENT) @router.delete('//users/') def unlink(org_id: str, user_id: str): with dyn.transact_writer() as transact: transact.delete( key=KeyPair( pk=f'orgmembers#{org_id}', # Post-migration: uncomment the following line # pk=f'MEMBER#ORG#{org_id}', sk=user_id, ) ) transact.delete( key=KeyPair(org_id, f'admins#{user_id}'), # Post-migration: uncomment the following line # key=KeyPair(org_id, f'ADMIN#{user_id}'), ) transact.delete( key=KeyPair(user_id, f'SCOPE#{org_id}'), ) transact.delete( key=KeyPair( pk=user_id, sk=f'orgs#{org_id}', # Post-migration: uncomment the following line # pk=f'ORG#{org_id}', ) ) transact.update( key=KeyPair(user_id, '0'), update_expr='DELETE tenant_id :org_id', # Post-migration: uncomment the following line # update_expr='DELETE org_id :org_id', expr_attr_values={':org_id': {org_id}}, ) return JSONResponse(HTTPStatus.NO_CONTENT) def _create_user( user: User, org: Org, created_by: Authenticated, ) -> bool: now_ = now() user_id = uuid4() email_verified = INTERNAL_EMAIL_DOMAIN in user.email try: with dyn.transact_writer() as transact: transact.condition( key=KeyPair( pk='SUBSCRIPTION#FROZEN', sk=f'ORG#{org.id}', ), cond_expr='attribute_not_exists(sk)', exc_cls=SubscriptionFrozenError, ) transact.put( item=user.model_dump() | { 'id': user_id, 'sk': '0', 'email_verified': email_verified, 'tenant_id': {org.id}, # Post-migration (users): uncomment the folloing line # 'org_id': {org.id}, # 'created_at': now_, 'createDate': now_, # Makes the email searchable 'emails': {user.email}, }, ) transact.put( item={ 'id': user_id, 'sk': 'NEVER_LOGGED', 'created_at': now_, } ) transact.put( item={ 'id': user_id, 'sk': 'CREATED_BY', 'created_by': { 'id': created_by.id, 'name': created_by.name, }, 'created_at': now_, } ) transact.put( item={ 'id': user_id, # Post-migration: rename `emails` to `EMAIL` 'sk': f'emails#{user.email}', 'email_verified': email_verified, 'email_primary': True, 'created_at': now_, **({'mx_record_exists': True} if email_verified else {}), } ) if not email_verified: transact.put( item={ 'id': user_id, 'sk': f'EMAIL_VERIFICATION#{uuid4()}', 'welcome': True, 'name': user.name, 'email': user.email, 'org_name': org.name, 'ttl': ttl(start_dt=now_, days=30), 'created_at': now_, } ) transact.put( item={ # Post-migration (users): rename `cpf` to `CPF` 'id': 'cpf', 'sk': user.cpf, 'user_id': user_id, 'created_at': now_, }, cond_expr='attribute_not_exists(sk)', exc_cls=CPFConflictError, ) transact.put( item={ # Post-migration (users): rename `email` to `EMAIL` 'id': 'email', 'sk': user.email, 'user_id': user_id, 'created_at': now_, }, cond_expr='attribute_not_exists(sk)', exc_cls=EmailConflictError, ) transact.put( item={ 'id': user_id, 'sk': f'orgs#{org.id}', # Post-migration (users): uncomment the following line # pk=f'ORG#{org.id}', 'name': org.name, 'cnpj': org.cnpj, 'created_at': now_, } ) transact.put( item={ 'id': f'orgmembers#{org.id}', # Post-migration (users): uncomment the following line # pk=f'MEMBER#ORG#{org_id}', 'sk': user_id, 'created_at': now_, } ) transact.condition( key=KeyPair(org.id, '0'), # type: ignore cond_expr='attribute_exists(sk)', exc_cls=OrgNotFoundError, ) except (CPFConflictError, EmailConflictError): return False else: return True def _add_member(user_id: str, org: Org) -> None: now_ = now() with dyn.transact_writer() as transact: transact.condition( key=KeyPair( pk='SUBSCRIPTION#FROZEN', sk=f'ORG#{org.id}', ), cond_expr='attribute_not_exists(sk)', exc_cls=SubscriptionFrozenError, ) transact.update( key=KeyPair(user_id, '0'), # Post-migration (users): uncomment the following line # update_expr='ADD org_id :org_id', update_expr='ADD tenant_id :org_id', expr_attr_values={ ':org_id': {org.id}, }, cond_expr='attribute_exists(sk)', exc_cls=UserNotFoundError, ) transact.put( item={ 'id': user_id, # Post-migration (users): rename `orgs` to `ORG` 'sk': f'orgs#{org.id}', 'name': org.name, 'cnpj': org.cnpj, 'created_at': now_, } ) transact.put( item={ # Post-migration (users): uncomment the following line # pk=f'MEMBER#ORG#{org_id}', 'id': f'orgmembers#{org.id}', 'sk': user_id, 'created_at': now_, }, cond_expr='attribute_not_exists(sk)', exc_cls=UserConflictError, ) transact.condition( key=KeyPair(org.id, '0'), # type: ignore cond_expr='attribute_exists(sk)', exc_cls=OrgNotFoundError, ) def _get_user_id(user: User) -> str: user_id = dyn.collection.get_items( KeyPair( pk='email', sk=SortKey(user.email, path_spec='user_id'), rename_key='id', ) + KeyPair( pk='cpf', sk=SortKey(user.cpf, path_spec='user_id'), rename_key='id', ), flatten_top=False, ).get('id') if not user_id: raise UserNotFoundError('User not found') return user_id