76 lines
1.8 KiB
Python
76 lines
1.8 KiB
Python
from http import HTTPStatus
|
|
from typing import Annotated
|
|
|
|
from aws_lambda_powertools.event_handler.api_gateway import Router
|
|
from aws_lambda_powertools.event_handler.openapi.params import Body
|
|
from layercake.dynamodb import (
|
|
DynamoDBPersistenceLayer,
|
|
KeyPair,
|
|
PrefixKey,
|
|
)
|
|
from pydantic import BaseModel, EmailStr
|
|
|
|
from api_gateway import JSONResponse
|
|
from boto3clients import dynamodb_client
|
|
from config import USER_TABLE
|
|
from rules.user import add_email, remove_email, set_email_as_primary
|
|
|
|
# Route registry for this module; the handlers below attach themselves to it
# through the ``@router.<method>`` decorators.
router = Router()

# Single module-level persistence layer bound to the user table, shared by
# every handler so it is built once per module import rather than per request.
user_layer = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client)
|
|
|
|
|
|
@router.get('/<id>/emails', compress=True)
def get_emails(id: str):
    """Return the result of querying the ``emails`` items stored under ``id``.

    The optional ``start_key`` query-string parameter (``None`` when absent)
    is forwarded untouched to the collection query; presumably it is a
    pagination cursor -- confirm against the persistence layer.
    """
    resume_from = router.current_event.get_query_string_value('start_key', None)
    emails_key = KeyPair(id, PrefixKey('emails'))

    return user_layer.collection.query(emails_key, start_key=resume_from)
|
|
|
|
|
|
# Request payload carrying a single e-mail address; used as the body model of
# the DELETE handler in this module.
class Email(BaseModel):
    # Address validated by pydantic's ``EmailStr``.
    email: EmailStr
|
|
|
|
|
|
@router.post('/<id>/emails', compress=True)
def add_email_(id: str, email: Annotated[EmailStr, Body(embed=True)]):
    """Register a new e-mail address for the user ``id``.

    The address is read from an embedded body field, i.e. the request body is
    ``{"email": "<address>"}``. It is typed as ``EmailStr`` so that malformed
    addresses are rejected during request validation, matching the PATCH and
    DELETE handlers of this module, instead of being handed to ``add_email``
    as an arbitrary string.

    Returns a ``JSONResponse`` echoing the address with status 201 (Created).
    """
    add_email(id, email, persistence_layer=user_layer)

    return JSONResponse(
        body={'email': email},
        status_code=HTTPStatus.CREATED,
    )
|
|
|
|
|
|
# Request payload for the PATCH handler: the pair of addresses involved in a
# primary-address change plus a verification flag.
class EmailAsPrimary(BaseModel):
    # Address passed to ``set_email_as_primary`` as the new one.
    new_email: EmailStr
    # Address passed to ``set_email_as_primary`` as the old one.
    old_email: EmailStr
    # Forwarded as ``email_verified``; defaults to False when omitted.
    email_verified: bool = False
|
|
|
|
|
|
@router.patch('/<id>/emails', compress=True)
def patch_email(id: str, payload: EmailAsPrimary):
    """Call ``set_email_as_primary`` for user ``id`` and echo the payload.

    ``payload`` supplies the new and old addresses together with the
    ``email_verified`` flag. The validated payload is returned as the body of
    a ``JSONResponse`` with status 200 (OK).
    """
    new_address = payload.new_email
    old_address = payload.old_email
    verified = payload.email_verified

    set_email_as_primary(
        id,
        new_address,
        old_address,
        email_verified=verified,
        persistence_layer=user_layer,
    )

    return JSONResponse(body=payload, status_code=HTTPStatus.OK)
|
|
|
|
|
|
@router.delete('/<id>/emails', compress=True)
def delete_email(id: str, payload: Email):
    """Call ``remove_email`` for user ``id`` with the address in ``payload``.

    The validated payload is returned unchanged as the handler result.
    """
    address = payload.email
    remove_email(id, address, persistence_layer=user_layer)
    return payload
|