Files
saladeaula.digital/http-api/app/routes/users/emails.py
2025-05-16 14:29:14 -03:00

106 lines
2.5 KiB
Python

from http import HTTPStatus
from aws_lambda_powertools.event_handler.api_gateway import Router
from aws_lambda_powertools.event_handler.exceptions import (
BadRequestError as PowertoolsBadRequestError,
)
from layercake.dynamodb import (
DynamoDBCollection,
DynamoDBPersistenceLayer,
KeyPair,
MissingError,
PrefixKey,
)
from pydantic import BaseModel, EmailStr
from api_gateway import JSONResponse
from boto3clients import dynamodb_client
from conf import USER_TABLE
from middlewares import AuditLogMiddleware
from rules.user import add_email, del_email, set_email_as_primary
class BadRequestError(MissingError, PowertoolsBadRequestError): ...
router = Router()
user_layer = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client)
user_collect = DynamoDBCollection(user_layer, exception_cls=BadRequestError)
@router.get(
'/<id>/emails',
compress=True,
tags=['User'],
summary='Get user emails',
)
def get_emails(id: str):
return user_collect.query(
KeyPair(id, PrefixKey('emails')),
start_key=router.current_event.get_query_string_value('start_key', None),
)
class Email(BaseModel):
email: EmailStr
@router.post(
'/<id>/emails',
compress=True,
tags=['User'],
summary='Add user email',
middlewares=[AuditLogMiddleware('EMAIL_ADD', user_collect, ('email',))],
)
def post_email(id: str, payload: Email):
add_email(id, payload.email, persistence_layer=user_layer)
return JSONResponse(
body=payload,
status_code=HTTPStatus.CREATED,
)
class EmailAsPrimary(BaseModel):
new_email: EmailStr
old_email: EmailStr
email_verified: bool = False
@router.patch(
'/<id>/emails',
compress=True,
tags=['User'],
summary='Add user email as primary',
middlewares=[
AuditLogMiddleware(
'EMAIL_CHANGE',
user_collect,
(
'new_email',
'old_email',
),
)
],
)
def patch_email(id: str, payload: EmailAsPrimary):
set_email_as_primary(
id,
payload.new_email,
payload.old_email,
email_verified=payload.email_verified,
persistence_layer=user_layer,
)
return JSONResponse(body=payload, status_code=HTTPStatus.OK)
@router.delete(
'/<id>/emails',
compress=True,
tags=['User'],
summary='Delete user email',
middlewares=[AuditLogMiddleware('EMAIL_DEL', user_collect, ('email',))],
)
def delete_email(id: str, payload: Email):
del_email(id, payload.email, persistence_layer=user_layer)
return payload