"""Password-reset route: validates a reset token and stores a new password hash."""

import base64
import json
from http import HTTPStatus
from typing import Annotated

from aws_lambda_powertools.event_handler import Response
from aws_lambda_powertools.event_handler.api_gateway import Router
from aws_lambda_powertools.event_handler.exceptions import (
    BadRequestError,
    ServiceError,
)
from aws_lambda_powertools.event_handler.openapi.params import Body, Path
from layercake.dateutils import now
from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair
from passlib.hash import pbkdf2_sha256

from boto3clients import dynamodb_client
from config import USER_TABLE

from .authentication import UserNotFoundError, cookie

router = Router()
dyn = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client)


class GoneError(ServiceError):
    """Service error reported with the HTTP 410 (Gone) status code."""

    def __init__(self, msg: str | dict):
        super().__init__(HTTPStatus.GONE, msg)


class InvalidCodeError(GoneError):
    """Error class attached to the password-reset record deletions in `reset`."""


@router.post('/reset/')
def reset(
    new_password: Annotated[str, Body(min_length=6, embed=True)],
    token: Annotated[str, Path],
):
    """Replace a user's password using a password-reset token.

    The token is expected to be URL-safe base64 wrapping a JSON object with
    the keys ``user_id`` and ``code``.

    Within a single ``transact_writer`` block the handler:

    * checks that the user record ``(user_id, '0')`` exists,
    * deletes the ``PASSWORD_RESET`` records ``CODE#<code>`` and
      ``USER#<user_id>``, each of which must exist,
    * writes the ``PASSWORD`` item holding the PBKDF2-SHA256 hash of
      ``new_password`` and the current timestamp.

    NOTE(review): ``Path`` is passed as a class (not ``Path()``) and the
    route template has no ``<token>`` segment -- confirm how the framework
    resolves ``token`` before changing either.

    Args:
        new_password: New password taken from the embedded request body;
            declared with a minimum length of 6.
        token: Encoded reset token.

    Returns:
        A 200 response carrying the cookie built by ``cookie(user_id)``.

    Raises:
        BadRequestError: The token is not valid base64/JSON or lacks the
            ``user_id`` / ``code`` keys.
        UserNotFoundError: Presumably raised by the persistence layer when
            the user-record condition fails (passed as ``exc_cls``).
        InvalidCodeError: Presumably raised by the persistence layer when a
            reset record is missing (passed as ``exc_cls``).
    """
    try:
        decoded_data = json.loads(base64.urlsafe_b64decode(token))
        user_id, code = decoded_data['user_id'], decoded_data['code']
    except (ValueError, KeyError, TypeError, RecursionError) as exc:
        # ValueError covers binascii.Error, UnicodeDecodeError and
        # json.JSONDecodeError; KeyError/TypeError cover a payload that is
        # not an object with both keys; RecursionError covers pathologically
        # nested JSON. Anything else is a genuine fault and must not be
        # disguised as a client error.
        raise BadRequestError(str(exc)) from exc

    with dyn.transact_writer() as transact:
        # The user must exist.
        transact.condition(
            key=KeyPair(user_id, '0'),
            cond_expr='attribute_exists(sk)',
            exc_cls=UserNotFoundError,
        )
        # Both reset records are deleted, so the same token cannot pass
        # these existence checks a second time.
        transact.delete(
            key=KeyPair('PASSWORD_RESET', f'CODE#{code}'),
            cond_expr='attribute_exists(sk)',
            exc_cls=InvalidCodeError,
        )
        transact.delete(
            key=KeyPair('PASSWORD_RESET', f'USER#{user_id}'),
            cond_expr='attribute_exists(sk)',
            exc_cls=InvalidCodeError,
        )
        transact.put(
            item={
                'id': user_id,
                'sk': 'PASSWORD',
                'hash': pbkdf2_sha256.hash(new_password),
                'created_at': now(),
            }
        )

    return Response(
        status_code=HTTPStatus.OK,
        cookies=[
            cookie(user_id),
        ],
    )