from http import HTTPStatus from uuid import uuid4 from aws_lambda_powertools.event_handler.api_gateway import Router from aws_lambda_powertools.event_handler.exceptions import ( NotFoundError, ServiceError, ) from aws_lambda_powertools.event_handler.openapi.params import Body, Path, Query from layercake.dateutils import now, ttl from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair, SortKey, TransactKey from pydantic import EmailStr from typing_extensions import Annotated from api_gateway import JSONResponse from boto3clients import dynamodb_client from config import USER_TABLE router = Router() dyn = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client) @router.get('//emails') def get_emails(user_id: str, start_key: Annotated[str | None, Query] = None): return dyn.collection.query( # Post-migration (users): rename `emails` to `EMAIL` key=KeyPair(user_id, 'emails'), start_key=start_key, ) class UserNotFoundError(NotFoundError): ... class EmailNotFoundError(NotFoundError): ... class EmailConflictError(ServiceError): def __init__(self, msg: str | dict): super().__init__(HTTPStatus.CONFLICT, msg) @router.post('//emails') def add( user_id: str, email: Annotated[EmailStr, Body(embed=True)], ): now_ = now() name = dyn.collection.get_item( KeyPair(user_id, SortKey('0', path_spec='name')), raise_on_error=False, ) with dyn.transact_writer() as transact: transact.condition( key=KeyPair(user_id, '0'), cond_expr='attribute_exists(sk)', exc_cls=UserNotFoundError, ) transact.put( item={ 'id': user_id, # Post-migration (users): rename `emails` to `EMAIL` 'sk': f'emails#{email}', 'email_verified': False, 'mx_record_exists': False, 'email_primary': False, 'created_at': now_, } ) transact.put( item={ # Post-migration (users): rename `email` to `EMAIL` 'id': 'email', 'sk': email, 'created_at': now_, }, cond_expr='attribute_not_exists(sk)', exc_cls=EmailConflictError, ) transact.put( item={ 'id': user_id, 'sk': f'EMAIL_VERIFICATION#{uuid4()}', 'name': name, 'email': email, 'ttl': ttl(start_dt=now_, days=30), 'created_at': now_, } ) return JSONResponse(status_code=HTTPStatus.CREATED) @router.post('//emails//request-verification') def request_verification( user_id: str, email: Annotated[EmailStr, Path], ): now_ = now() name = dyn.collection.get_item( KeyPair( pk=user_id, sk=SortKey('0', path_spec='name'), ), raise_on_error=False, ) dyn.put_item( item={ 'id': user_id, 'sk': f'EMAIL_VERIFICATION#{uuid4()}', 'name': name, 'email': email, 'ttl': ttl(start_dt=now_, days=30), 'created_at': now_, } ) return JSONResponse(status_code=HTTPStatus.NO_CONTENT) class EmailVerificationNotFoundError(NotFoundError): ... @router.post('//emails//verify') def verify(user_id: str, code: str): r = dyn.collection.get_items( TransactKey(user_id) + SortKey( sk='0', rename_key='email_primary', path_spec='email', ) + SortKey( sk=f'EMAIL_VERIFICATION#{code}', rename_key='email', path_spec='email', ), flatten_top=False, ) if 'email' not in r: raise EmailVerificationNotFoundError('Verification code not found') email, email_primary = r['email'], r['email_primary'] with dyn.transact_writer() as transact: transact.delete( key=KeyPair(user_id, f'EMAIL_VERIFICATION#{code}'), ) transact.update( # Post-migration (users): rename `emails` to `EMAIL` key=KeyPair(user_id, f'emails#{email}'), update_expr='SET email_verified = :true, updated_at = :now', expr_attr_values={ ':true': True, ':now': now(), }, cond_expr='attribute_exists(sk)', exc_cls=EmailNotFoundError, ) if email == email_primary: transact.update( key=KeyPair(user_id, '0'), update_expr='SET email_verified = :true, \ updated_at = :now', expr_attr_values={ ':true': True, ':now': now(), }, cond_expr='attribute_exists(sk)', exc_cls=UserNotFoundError, ) return JSONResponse(status_code=HTTPStatus.NO_CONTENT) @router.patch('//emails/primary') def primary( user_id: str, old_email: Annotated[EmailStr, Body(embed=True)], new_email: Annotated[EmailStr, Body(embed=True)], email_verified: Annotated[bool, Body(embed=True)], ): now_ = now() expr = 'SET email_primary = :email_primary, updated_at = :now' if new_email == old_email: return JSONResponse(status_code=HTTPStatus.NO_CONTENT) with dyn.transact_writer() as transact: # Set the old email as non-primary transact.update( # Post-migration (users): rename `emails` to `EMAIL` key=KeyPair(user_id, f'emails#{old_email}'), update_expr=expr, expr_attr_values={ ':email_primary': False, ':now': now_, }, cond_expr='attribute_exists(sk)', ) # Set the new email as primary transact.update( # Post-migration (users): rename `emails` to `EMAIL` key=KeyPair(user_id, f'emails#{new_email}'), update_expr=expr, expr_attr_values={ ':email_primary': True, ':now': now_, }, cond_expr='attribute_exists(sk)', ) transact.update( key=KeyPair(user_id, '0'), update_expr='DELETE emails :email_set \ SET email = :email, \ email_verified = :email_verified, \ updated_at = :now', expr_attr_values={ ':email': new_email, ':email_set': {new_email}, ':email_verified': email_verified, ':now': now_, }, cond_expr='attribute_exists(sk)', exc_cls=UserNotFoundError, ) return JSONResponse(status_code=HTTPStatus.NO_CONTENT) @router.delete('//emails/') def remove( user_id: str, email: Annotated[EmailStr, Path], ): with dyn.transact_writer() as transact: transact.delete( # Post-migration (users): rename `email` to `EMAIL` key=KeyPair('email', email), ) transact.delete( # Post-migration (users): rename `emails` to `EMAIL` key=KeyPair(user_id, f'emails#{email}'), # Delete any email except the primary email cond_expr='email_primary <> :email_primary', expr_attr_values={ ':email_primary': True, }, exc_cls=EmailConflictError, ) transact.update( key=KeyPair(user_id, '0'), update_expr='DELETE emails :email \ SET updated_at = :now', expr_attr_values={ ':email': {email}, ':now': now(), }, cond_expr='attribute_exists(sk)', exc_cls=UserNotFoundError, ) return JSONResponse(status_code=HTTPStatus.NO_CONTENT)