"""HTTP route that looks up identifiers (email, CPF, CNPJ) in the user table."""

from http import HTTPStatus
from typing import Annotated

from aws_lambda_powertools.event_handler.api_gateway import Router
from aws_lambda_powertools.event_handler.exceptions import (
    BadRequestError,
    ServiceError,
)
from aws_lambda_powertools.event_handler.openapi.params import Path
from layercake.dynamodb import (
    DynamoDBPersistenceLayer,
    KeyPair,
    SortKey,
    TransactKey,
)
from layercake.extra_types import CnpjStr, CpfStr
from layercake.funcs import pick
from pydantic import EmailStr

from boto3clients import dynamodb_client
from config import USER_TABLE

router = Router()
dyn = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client)


class UserAlreadyOnboardedError(ServiceError):
    """Service error reported with HTTP 409 (Conflict)."""

    def __init__(self, msg: str | dict):
        """Forward ``msg`` to ``ServiceError`` with the CONFLICT status code."""
        super().__init__(HTTPStatus.CONFLICT, msg)


# NOTE(review): the route template '/lookup' has no path placeholders, yet the
# parameters are annotated with `Path` (the class, not an instance) -- confirm
# whether `Query` was intended.
@router.get('/lookup')
def lookup(
    email: Annotated[EmailStr | None, Path] = None,
    cpf: Annotated[CpfStr | None, Path] = None,
    cnpj: Annotated[CnpjStr | None, Path] = None,
):
    """Resolve an email, CPF or CNPJ to the identifier stored for it.

    At least one of the three arguments must be truthy.

    Returns:
        * When ``cnpj`` is truthy: the result of picking ``org_id`` from the
          first lookup (any ``email``/``cpf`` given alongside is not used for
          the response).
        * An empty dict when the first lookup yields no ``user_id``.
        * Otherwise ``{'never_logged': True}`` merged with the ``id``,
          ``name`` and ``email`` fields picked from the user record.

    Raises:
        BadRequestError: none of ``email``, ``cpf`` or ``cnpj`` was supplied.
        UserAlreadyOnboardedError: the user record carries no
            ``never_logged`` entry.
    """
    if not (email or cpf or cnpj):
        raise BadRequestError('Malformed body request')

    # One key per identifier kind. The email and cpf entries are both exposed
    # as 'user_id'; the cnpj entry is exposed as 'org_id'.
    email_key = KeyPair(
        pk='email',
        sk=SortKey(email, path_spec='user_id'),  # type: ignore
        rename_key='user_id',
    )
    cpf_key = KeyPair(
        pk='cpf',
        sk=SortKey(cpf, path_spec='user_id'),  # type: ignore
        rename_key='user_id',
    )
    cnpj_key = KeyPair(
        pk='cnpj',
        sk=SortKey(cnpj, path_spec='user_id'),  # type: ignore
        rename_key='org_id',
    )
    found = dyn.collection.get_items(
        email_key + cpf_key + cnpj_key,
        flatten_top=False,
    )

    # A CNPJ lookup answers with the organisation id only.
    if cnpj:
        return pick(('org_id',), found)

    if 'user_id' not in found:
        return {}

    # Second read: the '0' item plus the 'NEVER_LOGGED' item (renamed to
    # 'never_logged') under the resolved user id.
    # presumably '0' is the main user record -- verify against the table layout
    never_logged_key = SortKey(
        sk='NEVER_LOGGED',
        rename_key='never_logged',
    )
    record = dyn.collection.get_items(
        TransactKey(found['user_id']) + SortKey('0') + never_logged_key
    )

    if 'never_logged' not in record:
        raise UserAlreadyOnboardedError('User is already onboarded')

    profile = pick(('id', 'name', 'email'), record)
    return {'never_logged': True} | profile