Files
saladeaula.digital/http-api/app/routes/users/emails.py
2025-08-15 00:06:05 -03:00

76 lines
1.8 KiB
Python

from http import HTTPStatus
from typing import Annotated
from aws_lambda_powertools.event_handler.api_gateway import Router
from aws_lambda_powertools.event_handler.openapi.params import Body
from layercake.dynamodb import (
DynamoDBPersistenceLayer,
KeyPair,
PrefixKey,
)
from pydantic import BaseModel, EmailStr
from api_gateway import JSONResponse
from boto3clients import dynamodb_client
from config import USER_TABLE
from rules.user import add_email, remove_email, set_email_as_primary
# Router that the route handlers in this module register themselves on.
router = Router()
# Persistence layer bound to USER_TABLE; shared by every handler below.
user_layer = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client)
@router.get('/<id>/emails', compress=True)
def get_emails(id: str):
    """Query the items stored under a user's ``emails`` key prefix.

    Args:
        id: User identifier taken from the URL path.

    Returns:
        Whatever ``user_layer.collection.query`` returns for the key pair
        ``(id, PrefixKey('emails'))``.
    """
    # Optional ``start_key`` query-string value, forwarded untouched to the
    # query (presumably a pagination cursor -- confirm against layercake).
    cursor = router.current_event.get_query_string_value('start_key', None)
    email_keys = KeyPair(id, PrefixKey('emails'))
    return user_layer.collection.query(email_keys, start_key=cursor)
class Email(BaseModel):
    """Request body holding a single email address."""

    # Validated as an email address by pydantic's EmailStr.
    email: EmailStr
@router.post('/<id>/emails', compress=True)
def add_email_(id: str, email: Annotated[EmailStr, Body(embed=True)]):
    """Attach a new email address to a user.

    The request body is expected as ``{"email": "<address>"}`` (embedded
    body parameter). The value is typed as ``EmailStr`` so that malformed
    addresses are rejected by request validation before reaching
    ``add_email``, matching the validation the PATCH and DELETE handlers
    already get through their pydantic models.

    Args:
        id: User identifier taken from the URL path.
        email: Email address to add; validated as ``EmailStr``.

    Returns:
        A ``JSONResponse`` with status 201 echoing the added email.
    """
    add_email(id, email, persistence_layer=user_layer)

    return JSONResponse(
        body={'email': email},
        status_code=HTTPStatus.CREATED,
    )
class EmailAsPrimary(BaseModel):
    """Request body for switching a user's primary email address."""

    # Address passed to set_email_as_primary as the new primary email.
    new_email: EmailStr
    # Address passed to set_email_as_primary as the previous email.
    old_email: EmailStr
    # Forwarded as ``email_verified``; defaults to False when omitted.
    email_verified: bool = False
@router.patch('/<id>/emails', compress=True)
def patch_email(id: str, payload: EmailAsPrimary):
    """Set one of a user's email addresses as the primary one.

    Args:
        id: User identifier taken from the URL path.
        payload: Validated body with ``new_email``, ``old_email`` and
            ``email_verified``.

    Returns:
        A ``JSONResponse`` with status 200 whose body is the received
        payload.
    """
    new_email = payload.new_email
    old_email = payload.old_email

    set_email_as_primary(
        id,
        new_email,
        old_email,
        email_verified=payload.email_verified,
        persistence_layer=user_layer,
    )

    # The payload model itself is handed to JSONResponse as the body.
    return JSONResponse(body=payload, status_code=HTTPStatus.OK)
@router.delete('/<id>/emails', compress=True)
def delete_email(id: str, payload: Email):
    """Remove an email address from a user.

    Args:
        id: User identifier taken from the URL path.
        payload: Validated body carrying the ``email`` to remove.

    Returns:
        The received payload, unchanged.
    """
    address = payload.email
    remove_email(id, address, persistence_layer=user_layer)
    return payload