Files
saladeaula.digital/id.saladeaula.digital/app/routes/register.py

232 lines
6.8 KiB
Python

from dataclasses import asdict, dataclass
from http import HTTPStatus
from typing import Annotated
from uuid import uuid4
from aws_lambda_powertools.event_handler import content_types
from aws_lambda_powertools.event_handler.api_gateway import Response, Router
from aws_lambda_powertools.event_handler.exceptions import NotFoundError, ServiceError
from aws_lambda_powertools.event_handler.openapi.params import Body
from aws_lambda_powertools.shared.cookies import Cookie
from layercake.dateutils import now, ttl
from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair
from layercake.extra_types import CpfStr, NameStr
from layercake.funcs import pick
from passlib.hash import pbkdf2_sha256
from pydantic import UUID4, EmailStr
from boto3clients import dynamodb_client
from config import OAUTH2_TABLE, SESSION_EXPIRES_IN
from .authentication import new_session
router = Router()
dyn = DynamoDBPersistenceLayer(OAUTH2_TABLE, dynamodb_client)
class ConflictError(ServiceError):
def __init__(self, msg: str | dict):
super().__init__(HTTPStatus.CONFLICT, msg)
class UserNotFound(NotFoundError): ...
class CPFConflictError(ConflictError): ...
class EmailConflictError(ConflictError): ...
class NeverLoggedConflictError(ConflictError): ...
@dataclass(frozen=True)
class User:
id: str
name: str
email: str
cpf: str
@router.post('/register')
def register(
id: Annotated[UUID4, Body(embed=True, alias='id', default_factory=uuid4)],
name: Annotated[NameStr, Body(embed=True)],
email: Annotated[EmailStr, Body(embed=True)],
password: Annotated[str, Body(min_length=6, embed=True)],
cpf: Annotated[CpfStr, Body(embed=True)],
):
new_user = User(id=str(id), name=name, email=email, cpf=cpf)
existing = dyn.collection.get_item(
KeyPair(str(id), '0'),
default=False,
raise_on_error=False,
)
if existing:
_update_user(
old_user=User(**pick(('id', 'name', 'email', 'cpf'), existing)),
new_user=new_user,
password=password,
)
return Response(
content_type=content_types.APPLICATION_JSON,
status_code=HTTPStatus.OK,
compress=True,
body=asdict(new_user),
cookies=[
_cookie(existing['id']),
],
)
_create_user(user=new_user, password=password)
return Response(
content_type=content_types.APPLICATION_JSON,
status_code=HTTPStatus.CREATED,
compress=True,
body=asdict(new_user),
cookies=[
_cookie(new_user.id),
],
)
def _cookie(user_id: str) -> Cookie:
return Cookie(
name='SID',
value=new_session(user_id),
http_only=True,
secure=True,
same_site=None,
max_age=SESSION_EXPIRES_IN,
)
def _create_user(*, user: User, password: str):
now_ = now()
with dyn.transact_writer() as transact:
transact.put(
item={
'sk': '0',
'email_verified': False,
'createdDate': now_,
# Post-migration (users): uncomment the folloing line
# 'created_at': now_,
}
| asdict(user),
)
transact.put(
item={
'id': user.id,
# Post-migration (users): rename `emails` to `EMAIL`
'sk': f'emails#{user.email}',
'email_verified': False,
'email_primary': True,
'created_at': now_,
}
)
transact.put(
item={
'id': user.id,
'sk': 'PASSWORD',
'hash': pbkdf2_sha256.hash(password),
'created_at': now_,
}
)
transact.put(
item={
# Post-migration (users): rename `cpf` to `CPF`
'id': 'cpf',
'sk': user.cpf,
'user_id': user.id,
'created_at': now_,
},
cond_expr='attribute_not_exists(sk)',
exc_cls=CPFConflictError,
)
transact.put(
item={
# Post-migration (users): rename `email` to `EMAIL`
'id': 'email',
'sk': user.email,
'user_id': user.id,
'created_at': now_,
},
cond_expr='attribute_not_exists(sk)',
exc_cls=EmailConflictError,
)
def _update_user(*, old_user: User, new_user: User, password: str):
now_ = now()
with dyn.transact_writer() as transact:
transact.update(
key=KeyPair(new_user.id, '0'),
update_expr='SET #name = :name, \
email = :email, \
email_verified = :false, \
updated_at = :now \
ADD emails :emails', # Makes the email searchable
expr_attr_names={
'#name': 'name',
},
expr_attr_values={
':name': new_user.name,
':email': new_user.email,
':emails': {new_user.email},
':false': False,
':now': now_,
},
cond_expr='attribute_exists(sk)',
)
transact.put(
item={
'id': new_user.id,
'sk': 'PASSWORD',
'hash': pbkdf2_sha256.hash(password),
'created_at': now_,
}
)
transact.delete(
key=KeyPair(new_user.id, 'NEVER_LOGGED'),
cond_expr='attribute_exists(sk)',
exc_cls=NeverLoggedConflictError,
)
if new_user.email != old_user.email:
transact.put(
item={
'id': new_user.id,
# Post-migration (users): rename `emails` to `EMAIL`
'sk': f'emails#{new_user.email}',
'email_verified': False,
'email_primary': True,
'created_at': now_,
}
)
transact.put(
item={
'id': new_user.id,
'sk': f'EMAIL_VERIFICATION#{uuid4()}',
'name': new_user.name,
'email': new_user.email,
'ttl': ttl(start_dt=now_, days=30),
'created_at': now_,
}
)
transact.put(
item={
# Post-migration (users): rename `email` to `EMAIL`
'id': 'email',
'sk': new_user.email,
'created_at': now_,
},
cond_expr='attribute_not_exists(sk)',
exc_cls=EmailConflictError,
)