from http import HTTPStatus from uuid import uuid4 from aws_lambda_powertools.event_handler.api_gateway import Router from aws_lambda_powertools.event_handler.exceptions import ( NotFoundError, ServiceError, ) from aws_lambda_powertools.event_handler.openapi.params import Body, Path, Query from layercake.dateutils import now, ttl from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair, SortKey from pydantic import EmailStr from typing_extensions import Annotated from api_gateway import JSONResponse from boto3clients import dynamodb_client from config import USER_TABLE router = Router() dyn = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client) @router.get('//emails') def get_emails(user_id: str, start_key: Annotated[str | None, Query] = None): return dyn.collection.query( # Post-migration (users): rename `emails` to `EMAIL` key=KeyPair(user_id, 'emails'), start_key=start_key, ) class EmailConflictError(ServiceError): def __init__(self, msg: str | dict): super().__init__(HTTPStatus.CONFLICT, msg) @router.post('//emails') def add( user_id: str, email: Annotated[EmailStr, Body(embed=True)], ): now_ = now() name = dyn.collection.get_item( KeyPair(user_id, SortKey('0', path_spec='name')), raise_on_error=False, ) with dyn.transact_writer() as transact: transact.put( item={ 'id': user_id, # Post-migration (users): rename `emails` to `EMAIL` 'sk': f'emails#{email}', 'email_verified': False, 'email_primary': True, 'created_at': now_, } ) transact.put( item={ # Post-migration (users): rename `email` to `EMAIL` 'id': 'email', 'sk': email, 'created_at': now_, }, cond_expr='attribute_not_exists(sk)', exc_cls=EmailConflictError, ) transact.put( item={ 'id': user_id, 'sk': f'EMAIL_VERIFICATION#{uuid4()}', 'name': name, 'email': email, 'user_id': user_id, 'ttl': ttl(start_dt=now_, days=30), 'created_at': now_, } ) return JSONResponse(status_code=HTTPStatus.CREATED) @router.post('//emails//request-verification') def request_verification(user_id: str, email: Annotated[EmailStr, Path]): now_ = now() name = dyn.collection.get_item( KeyPair(user_id, SortKey('0', path_spec='name')), raise_on_error=False, ) dyn.put_item( item={ 'id': user_id, 'sk': f'EMAIL_VERIFICATION#{uuid4()}', 'name': name, 'email': email, 'user_id': user_id, 'ttl': ttl(start_dt=now_, days=30), 'created_at': now_, } ) return JSONResponse(status_code=HTTPStatus.NO_CONTENT) class EmailVerificationNotFoundError(NotFoundError): ... @router.post('//emails//verify') def verify(user_id: str, hash: str): email = dyn.collection.get_item( KeyPair( pk=user_id, sk=SortKey(f'EMAIL_VERIFICATION#{hash}', path_spec='email'), ), exc_cls=EmailVerificationNotFoundError, ) with dyn.transact_writer() as transact: transact.delete( key=KeyPair(user_id, f'EMAIL_VERIFICATION#{hash}'), ) transact.update( # Post-migration (users): rename `emails` to `EMAIL` key=KeyPair(user_id, f'emails#{email}'), update_expr='SET email_verified = :true, updated_at = :now', expr_attr_values={ ':true': True, ':now': now(), }, ) return JSONResponse(status_code=HTTPStatus.NO_CONTENT) @router.patch('//emails/primary') def primary( user_id: str, old_email: Annotated[EmailStr, Body(embed=True)], new_email: Annotated[EmailStr, Body(embed=True)], email_verified: Annotated[bool, Body(embed=True)], ): now_ = now() expr = 'SET email_primary = :email_primary, updated_at = :updated_at' with dyn.transact_writer() as transact: # Set the old email as non-primary transact.update( # Post-migration (users): rename `emails` to `EMAIL` key=KeyPair(user_id, f'emails#{old_email}'), update_expr=expr, expr_attr_values={ ':email_primary': False, ':updated_at': now_, }, cond_expr='attribute_exists(sk)', ) # Set the new email as primary transact.update( # Post-migration (users): rename `emails` to `EMAIL` key=KeyPair(user_id, f'emails#{new_email}'), update_expr=expr, expr_attr_values={ ':email_primary': True, ':updated_at': now_, }, cond_expr='attribute_exists(sk)', ) transact.update( key=KeyPair(user_id, '0'), update_expr='DELETE emails :email_set \ SET email = :email, \ email_verified = :email_verified, \ updated_at = :updated_at', expr_attr_values={ ':email': new_email, ':email_set': {new_email}, ':email_verified': email_verified, ':updated_at': now_, }, ) return JSONResponse(status_code=HTTPStatus.NO_CONTENT) @router.delete('//emails/') def remove( user_id: str, email: Annotated[EmailStr, Path], ): with dyn.transact_writer() as transact: transact.delete( # Post-migration (users): rename `email` to `EMAIL` key=KeyPair('email', email), ) transact.delete( # Post-migration (users): rename `emails` to `EMAIL` key=KeyPair(user_id, f'emails#{email}'), # Delete any email except the primary email cond_expr='email_primary <> :email_primary', expr_attr_values={ ':email_primary': True, }, exc_cls=EmailConflictError, ) transact.update( key=KeyPair(user_id, '0'), update_expr='DELETE emails :email', expr_attr_values={ ':email': {email}, }, ) return JSONResponse(status_code=HTTPStatus.NO_CONTENT)