272 lines
8.0 KiB
Python
272 lines
8.0 KiB
Python
from http import HTTPStatus
|
|
from uuid import uuid4
|
|
|
|
from aws_lambda_powertools.event_handler.api_gateway import Router
|
|
from aws_lambda_powertools.event_handler.exceptions import (
|
|
NotFoundError,
|
|
ServiceError,
|
|
)
|
|
from aws_lambda_powertools.event_handler.openapi.params import Body, Path, Query
|
|
from layercake.dateutils import now, ttl
|
|
from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair, SortKey, TransactKey
|
|
from pydantic import EmailStr
|
|
from typing_extensions import Annotated
|
|
|
|
from api_gateway import JSONResponse
|
|
from boto3clients import dynamodb_client
|
|
from config import USER_TABLE
|
|
|
|
router = Router()
|
|
dyn = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client)
|
|
|
|
|
|
class ConflictError(ServiceError):
|
|
def __init__(self, msg: str | dict):
|
|
super().__init__(HTTPStatus.CONFLICT, msg)
|
|
|
|
|
|
class UserNotFoundError(NotFoundError): ...
|
|
|
|
|
|
class EmailNotFoundError(NotFoundError): ...
|
|
|
|
|
|
class EmailVerificationNotFoundError(NotFoundError): ...
|
|
|
|
|
|
class EmailConflictError(ConflictError): ...
|
|
|
|
|
|
@router.get('/<user_id>/emails')
|
|
def get_emails(user_id: str, start_key: Annotated[str | None, Query] = None):
|
|
return dyn.collection.query(
|
|
# Post-migration (users): rename `emails` to `EMAIL`
|
|
key=KeyPair(user_id, 'emails'),
|
|
start_key=start_key,
|
|
)
|
|
|
|
|
|
@router.post('/<user_id>/emails')
|
|
def add(
|
|
user_id: str,
|
|
email: Annotated[EmailStr, Body(embed=True)],
|
|
):
|
|
now_ = now()
|
|
name = dyn.collection.get_item(
|
|
KeyPair(user_id, SortKey('0', path_spec='name')),
|
|
raise_on_error=False,
|
|
)
|
|
|
|
with dyn.transact_writer() as transact:
|
|
transact.update(
|
|
key=KeyPair(user_id, '0'),
|
|
# Makes the email searchable
|
|
update_expr='ADD emails :email',
|
|
expr_attr_values={
|
|
':email': {email},
|
|
},
|
|
cond_expr='attribute_exists(sk)',
|
|
exc_cls=UserNotFoundError,
|
|
)
|
|
transact.put(
|
|
item={
|
|
'id': user_id,
|
|
# Post-migration (users): rename `emails` to `EMAIL`
|
|
'sk': f'emails#{email}',
|
|
'email_verified': False,
|
|
'email_primary': False,
|
|
'created_at': now_,
|
|
}
|
|
)
|
|
transact.put(
|
|
item={
|
|
'id': user_id,
|
|
'sk': f'EMAIL_VERIFICATION#{uuid4()}',
|
|
'name': name,
|
|
'email': email,
|
|
'ttl': ttl(start_dt=now_, days=30),
|
|
'created_at': now_,
|
|
}
|
|
)
|
|
transact.put(
|
|
item={
|
|
# Post-migration (users): rename `email` to `EMAIL`
|
|
'id': 'email',
|
|
'sk': email,
|
|
'created_at': now_,
|
|
},
|
|
# Prevent duplicate emails
|
|
cond_expr='attribute_not_exists(sk)',
|
|
exc_cls=EmailConflictError,
|
|
)
|
|
|
|
return JSONResponse(status_code=HTTPStatus.CREATED)
|
|
|
|
|
|
@router.post('/<user_id>/emails/<email>/request-verification')
|
|
def request_verification(
|
|
user_id: str,
|
|
email: Annotated[EmailStr, Path],
|
|
):
|
|
now_ = now()
|
|
name = dyn.collection.get_item(
|
|
KeyPair(
|
|
pk=user_id,
|
|
sk=SortKey('0', path_spec='name'),
|
|
),
|
|
raise_on_error=False,
|
|
)
|
|
dyn.put_item(
|
|
item={
|
|
'id': user_id,
|
|
'sk': f'EMAIL_VERIFICATION#{uuid4()}',
|
|
'name': name,
|
|
'email': email,
|
|
'ttl': ttl(start_dt=now_, days=30),
|
|
'created_at': now_,
|
|
}
|
|
)
|
|
return JSONResponse(status_code=HTTPStatus.NO_CONTENT)
|
|
|
|
|
|
@router.post('/<user_id>/emails/<code>/verify')
|
|
def verify(user_id: str, code: str):
|
|
r = dyn.collection.get_items(
|
|
TransactKey(user_id)
|
|
+ SortKey(
|
|
sk='0',
|
|
rename_key='email_primary',
|
|
path_spec='email',
|
|
)
|
|
+ SortKey(
|
|
sk=f'EMAIL_VERIFICATION#{code}',
|
|
rename_key='email',
|
|
path_spec='email',
|
|
),
|
|
flatten_top=False,
|
|
)
|
|
|
|
if 'email' not in r:
|
|
raise EmailVerificationNotFoundError('Verification code not found')
|
|
|
|
email, email_primary = r['email'], r['email_primary']
|
|
|
|
with dyn.transact_writer() as transact:
|
|
transact.delete(
|
|
key=KeyPair(user_id, f'EMAIL_VERIFICATION#{code}'),
|
|
)
|
|
transact.update(
|
|
# Post-migration (users): rename `emails` to `EMAIL`
|
|
key=KeyPair(user_id, f'emails#{email}'),
|
|
update_expr='SET email_verified = :true, updated_at = :now',
|
|
expr_attr_values={
|
|
':true': True,
|
|
':now': now(),
|
|
},
|
|
cond_expr='attribute_exists(sk)',
|
|
exc_cls=EmailNotFoundError,
|
|
)
|
|
|
|
if email == email_primary:
|
|
transact.update(
|
|
key=KeyPair(user_id, '0'),
|
|
update_expr='SET email_verified = :true, \
|
|
updated_at = :now',
|
|
expr_attr_values={
|
|
':true': True,
|
|
':now': now(),
|
|
},
|
|
cond_expr='attribute_exists(sk)',
|
|
exc_cls=UserNotFoundError,
|
|
)
|
|
|
|
return JSONResponse(status_code=HTTPStatus.OK, body={'email_verified': email})
|
|
|
|
|
|
@router.patch('/<user_id>/emails/primary')
|
|
def primary(
|
|
user_id: str,
|
|
old_email: Annotated[EmailStr, Body(embed=True)],
|
|
new_email: Annotated[EmailStr, Body(embed=True)],
|
|
email_verified: Annotated[bool, Body(embed=True)],
|
|
):
|
|
now_ = now()
|
|
expr = 'SET email_primary = :email_primary, updated_at = :now'
|
|
|
|
if new_email == old_email:
|
|
return JSONResponse(status_code=HTTPStatus.NO_CONTENT)
|
|
|
|
with dyn.transact_writer() as transact:
|
|
# Set the old email as non-primary
|
|
transact.update(
|
|
# Post-migration (users): rename `emails` to `EMAIL`
|
|
key=KeyPair(user_id, f'emails#{old_email}'),
|
|
update_expr=expr,
|
|
expr_attr_values={
|
|
':email_primary': False,
|
|
':now': now_,
|
|
},
|
|
cond_expr='attribute_exists(sk)',
|
|
)
|
|
# Set the new email as primary
|
|
transact.update(
|
|
# Post-migration (users): rename `emails` to `EMAIL`
|
|
key=KeyPair(user_id, f'emails#{new_email}'),
|
|
update_expr=expr,
|
|
expr_attr_values={
|
|
':email_primary': True,
|
|
':now': now_,
|
|
},
|
|
cond_expr='attribute_exists(sk)',
|
|
)
|
|
transact.update(
|
|
key=KeyPair(user_id, '0'),
|
|
update_expr='SET email = :email, \
|
|
email_verified = :email_verified, \
|
|
updated_at = :now',
|
|
expr_attr_values={
|
|
':email': new_email,
|
|
':email_verified': email_verified,
|
|
':now': now_,
|
|
},
|
|
cond_expr='attribute_exists(sk)',
|
|
exc_cls=UserNotFoundError,
|
|
)
|
|
|
|
return JSONResponse(status_code=HTTPStatus.NO_CONTENT)
|
|
|
|
|
|
@router.delete('/<user_id>/emails/<email>')
|
|
def remove(
|
|
user_id: str,
|
|
email: Annotated[EmailStr, Path],
|
|
):
|
|
with dyn.transact_writer() as transact:
|
|
transact.delete(
|
|
# Post-migration (users): rename `email` to `EMAIL`
|
|
key=KeyPair('email', email),
|
|
)
|
|
transact.delete(
|
|
# Post-migration (users): rename `emails` to `EMAIL`
|
|
key=KeyPair(user_id, f'emails#{email}'),
|
|
# Delete any email except the primary email
|
|
cond_expr='email_primary <> :email_primary',
|
|
expr_attr_values={
|
|
':email_primary': True,
|
|
},
|
|
exc_cls=EmailConflictError,
|
|
)
|
|
transact.update(
|
|
key=KeyPair(user_id, '0'),
|
|
update_expr='DELETE emails :email \
|
|
SET updated_at = :now',
|
|
expr_attr_values={
|
|
':email': {email},
|
|
':now': now(),
|
|
},
|
|
cond_expr='attribute_exists(sk)',
|
|
exc_cls=UserNotFoundError,
|
|
)
|
|
|
|
return JSONResponse(status_code=HTTPStatus.NO_CONTENT)
|