"""Password login route that issues a JWT-backed ``session_id`` cookie."""

from http import HTTPStatus
from typing import Annotated
from uuid import uuid4

import jwt
from aws_lambda_powertools.event_handler import (
    Response,
)
from aws_lambda_powertools.event_handler.api_gateway import Router
from aws_lambda_powertools.event_handler.exceptions import ForbiddenError, NotFoundError
from aws_lambda_powertools.event_handler.openapi.params import Body
from aws_lambda_powertools.shared.cookies import Cookie
from layercake.dateutils import now, ttl
from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair, SortKey
from passlib.hash import pbkdf2_sha256

from boto3clients import dynamodb_client
from config import (
    ISSUER,
    JWT_ALGORITHM,
    JWT_SECRET,
    OAUTH2_REFRESH_TOKEN_EXPIRES_IN,
    OAUTH2_TABLE,
)

router = Router()
dyn = DynamoDBPersistenceLayer(OAUTH2_TABLE, dynamodb_client)


@router.post('/session')
def session(
    username: Annotated[str, Body()],
    password: Annotated[str, Body()],
):
    """Authenticate a user and answer with a ``session_id`` cookie.

    Args:
        username: Login identifier taken from the request body; it is looked
            up under both the ``email`` and ``cpf`` keys (see ``_get_user``).
        password: Plain-text password taken from the request body.

    Returns:
        A 200 ``Response`` carrying one ``session_id`` cookie whose value is
        the token produced by ``new_session``.

    Raises:
        UserNotFoundError: Propagated from ``_get_user`` when no user or no
            password record is found.
        ForbiddenError: When the password does not match the stored hash.
    """
    # NOTE(review): an unknown username surfaces as UserNotFoundError while a
    # wrong password surfaces as ForbiddenError, so callers can tell the two
    # cases apart. Confirm this is intentional.
    user_id, stored_hash = _get_user(username)

    password_matches = pbkdf2_sha256.verify(password, stored_hash)
    if not password_matches:
        raise ForbiddenError('Invalid credentials')

    session_cookie = Cookie(
        name='session_id',
        value=new_session(user_id),
        http_only=True,
        secure=True,
        same_site=None,
        # Cookie lifetime is driven by the same setting as the token expiry.
        max_age=OAUTH2_REFRESH_TOKEN_EXPIRES_IN,
    )
    return Response(
        status_code=HTTPStatus.OK,
        cookies=[session_cookie],
    )


def _get_user(username: str) -> tuple[str, str]:
    """Resolve ``username`` to a user id and its stored password hash.

    Args:
        username: Value used as the sort key under the ``email`` and ``cpf``
            partition keys.

    Returns:
        A ``(user_id, password_hash)`` pair.

    Raises:
        UserNotFoundError: When the lookup yields nothing, or when the
            ``PASSWORD`` record for the resolved user cannot be fetched.
    """
    lookup_sk = SortKey(username, path_spec='user_id')
    # The same sort key is tried under both identifier kinds; each result is
    # renamed to the sort key's path_spec (presumably exposing it as
    # ``user_id`` in the merged result - confirm against layercake).
    by_email = KeyPair(pk='email', sk=lookup_sk, rename_key=lookup_sk.path_spec)
    by_cpf = KeyPair(pk='cpf', sk=lookup_sk, rename_key=lookup_sk.path_spec)
    user = dyn.collection.get_items(by_email + by_cpf)

    if not user:
        raise UserNotFoundError()

    user_id = user['user_id']
    password_key = KeyPair(
        pk=user_id,
        sk=SortKey(
            sk='PASSWORD',
            path_spec='hash',
            rename_key='password',
        ),
    )
    # A missing password record is reported the same way as a missing user.
    password_hash = dyn.collection.get_item(
        password_key,
        exc_cls=UserNotFoundError,
    )
    return user_id, password_hash


def new_session(sub: str) -> str:
    """Create a session for ``sub``, persist it, and return its signed token.

    Args:
        sub: User id stored as the token's ``sub`` claim and on the session
            records.

    Returns:
        The JWT encoded with ``JWT_SECRET`` and ``JWT_ALGORITHM``.
    """
    session_id = str(uuid4())
    issued_at = now()
    expires_at = ttl(start_dt=issued_at, seconds=OAUTH2_REFRESH_TOKEN_EXPIRES_IN)

    claims = {
        'sid': session_id,
        'sub': sub,
        'iss': ISSUER,
        'iat': int(issued_at.timestamp()),
        'exp': expires_at,
    }
    # The token is encoded before any write, so an encoding failure leaves no
    # session records behind.
    token = jwt.encode(claims, JWT_SECRET, algorithm=JWT_ALGORITHM)

    # Record keyed by session id, pointing back at the user.
    session_record = {
        'id': 'SESSION',
        'sk': session_id,
        'user_id': sub,
        'ttl': expires_at,
        'created_at': issued_at,
    }
    # Record keyed by user id, listing this session under that user.
    user_session_record = {
        'id': sub,
        'sk': f'SESSION#{session_id}',
        'ttl': expires_at,
        'created_at': issued_at,
    }

    # Both puts go through one transact_writer context (presumably committed
    # together when the block exits - confirm against layercake).
    with dyn.transact_writer() as transact:
        transact.put(item=session_record)
        transact.put(item=user_session_record)

    return token


class UserNotFoundError(NotFoundError):
    """``NotFoundError`` with the fixed message ``'User not found'``."""

    def __init__(self, *_):
        # Positional arguments are accepted and ignored so the class can be
        # instantiated with or without arguments (e.g. when passed as
        # ``exc_cls``).
        super().__init__('User not found')