117 lines
3.2 KiB
Python
117 lines
3.2 KiB
Python
from dataclasses import dataclass
|
|
from http import HTTPStatus
|
|
from typing import Annotated
|
|
from uuid import uuid4
|
|
|
|
from aws_lambda_powertools.event_handler import content_types
|
|
from aws_lambda_powertools.event_handler.api_gateway import Response, Router
|
|
from aws_lambda_powertools.event_handler.exceptions import NotFoundError
|
|
from aws_lambda_powertools.event_handler.openapi.params import Body
|
|
from layercake.dateutils import now, ttl
|
|
from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair, SortKey
|
|
from layercake.extra_types import CpfStr
|
|
from layercake.funcs import pick
|
|
from pydantic import EmailStr
|
|
|
|
from boto3clients import dynamodb_client
|
|
from config import USER_TABLE
|
|
|
|
# Router that collects the routes declared in this module (see `@router.post` below).
router = Router()
# Persistence layer bound to USER_TABLE through the shared DynamoDB client;
# every read and write in this module goes through it.
dyn = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client)
|
|
|
|
|
|
# Raised by `_get_user` when neither the e-mail nor the CPF lookup yields a user id.
# It subclasses the Powertools event-handler `NotFoundError` and adds no behaviour.
class UserNotFoundError(NotFoundError): ...
|
|
|
|
|
|
@router.post('/forgot', compress=True)
def forgot(username: Annotated[EmailStr | CpfStr, Body(embed=True)]):
    """Start a password reset for the user identified by ``username``.

    The user is resolved through ``_get_user``, a fresh UUID4 reset code is
    generated, and two records are written under the ``PASSWORD_RESET``
    partition:

    * ``USER#<user id>`` -- updated with the user's name/e-mail, the TTL and
      the update timestamp, and the new code is ADDed to ``code_attempts``;
    * ``CODE#<code>`` -- a dedicated item for the code itself.

    Both records share the same TTL value, obtained from
    ``ttl(start_dt=<request time>, hours=3)``.

    Args:
        username: E-mail address or CPF, declared as an embedded body
            parameter.

    Returns:
        A ``201 Created`` JSON response whose body contains only the masked
        e-mail of the matched user.

    Raises:
        UserNotFoundError: Propagated from ``_get_user`` when nothing matches.
    """
    requested_at = now()
    user = _get_user(username)
    expires_at = ttl(start_dt=requested_at, hours=3)
    reset_code = str(uuid4())

    # `name` and `ttl` are referenced through placeholders in the expression.
    update_expr = (
        'SET #name = :name, '
        'email = :email, '
        '#ttl = :ttl, '
        'updated_at = :now '
        'ADD code_attempts :code'
    )
    attr_names = {
        '#name': 'name',
        '#ttl': 'ttl',
    }
    attr_values = {
        ':name': user.name,
        ':email': user.email,
        ':ttl': expires_at,
        # A one-element set, so ADD appends the code to `code_attempts`.
        ':code': {reset_code},
        ':now': requested_at,
    }
    code_item = {
        'id': 'PASSWORD_RESET',
        'sk': f'CODE#{reset_code}',
        'name': user.name,
        'email': user.email,
        'user_id': user.id,
        'ttl': expires_at,
        'created_at': requested_at,
    }

    with dyn.transact_writer() as transact:
        transact.update(
            key=KeyPair(
                pk='PASSWORD_RESET',
                sk=f'USER#{user.id}',
            ),
            update_expr=update_expr,
            expr_attr_names=attr_names,
            expr_attr_values=attr_values,
        )
        # NOTE(review): this write goes through `dyn` directly, not through
        # `transact`, so it is presumably not part of the transaction above --
        # confirm whether that is intentional.
        dyn.put_item(item=code_item)

    return Response(
        status_code=HTTPStatus.CREATED,
        content_type=content_types.APPLICATION_JSON,
        body={'email': mask_email(user.email)},
    )
|
|
|
|
|
|
@dataclass(frozen=True)
class User:
    """Immutable view of the user fields this module works with.

    Instances are built by ``_get_user`` from the ``id``, ``name`` and
    ``email`` keys of the stored user record.
    """

    id: str  # user identifier; used to build the `USER#<id>` sort key
    name: str  # copied onto both password-reset records
    email: str  # copied onto the reset records and returned masked by `forgot`
|
|
|
|
|
|
def _get_user(username: str) -> User:
    """Resolve ``username`` to a ``User``, trying it as an e-mail and as a CPF.

    A single ``dyn.collection.get_items`` call probes the ``email`` and
    ``cpf`` partitions with ``username`` as the sort key. The resulting
    ``user_id`` is then used to load the user record stored under sort key
    ``'0'``.

    Args:
        username: The e-mail address or CPF supplied by the caller.

    Returns:
        A ``User`` built from the ``id``, ``name`` and ``email`` keys of the
        loaded record.

    Raises:
        UserNotFoundError: If the combined lookup yields no ``user_id``.
    """
    # Both pairs rename their result to `user_id`, so presumably whichever
    # lookup matches ends up under that single key -- confirm against layercake.
    by_email = KeyPair(
        pk='email',
        sk=SortKey(username, path_spec='user_id'),  # type: ignore
        rename_key='user_id',
    )
    by_cpf = KeyPair(
        pk='cpf',
        sk=SortKey(username, path_spec='user_id'),  # type: ignore
        rename_key='user_id',
    )
    matches = dyn.collection.get_items(by_email + by_cpf, flatten_top=False)
    user_id = matches.get('user_id')

    if not user_id:
        raise UserNotFoundError('User not found')

    record = dyn.get_item(KeyPair(pk=user_id, sk='0'))
    wanted = pick(('id', 'name', 'email'), record)
    return User(**wanted)
|
|
|
|
|
|
def mask_email(email: str) -> str:
    """Return ``email`` with most of its local part replaced by ``*``.

    For local parts longer than three characters the first character and the
    last two stay visible (``johndoe@x.io`` -> ``j****oe@x.io``). Shorter
    local parts used to come back unmasked or with duplicated characters,
    because the star count went negative and the slices overlapped; they are
    now masked explicitly and the output keeps the input's length:

    * 2-3 characters: only the first character stays visible;
    * 0-1 characters: nothing stays visible.

    Args:
        email: Address to mask. It is split on the last ``@`` so that a
            quoted local part containing ``@`` does not break the split.

    Returns:
        The masked address, with the domain left untouched.

    Raises:
        ValueError: If ``email`` contains no ``@``.
    """
    username, separator, domain = email.rpartition('@')
    if not separator:
        raise ValueError(f'Invalid e-mail address: {email!r}')

    size = len(username)
    if size > 3:
        masked = username[0] + '*' * (size - 3) + username[-2:]
    elif size > 1:
        # Too short to reveal a two-character tail without exposing it all.
        masked = username[0] + '*' * (size - 1)
    else:
        masked = '*' * size

    return f'{masked}@{domain}'
|