from http import HTTPStatus from pathlib import Path from typing import Annotated from aws_lambda_powertools.event_handler import ( Response, ) from aws_lambda_powertools.event_handler.api_gateway import Router from aws_lambda_powertools.event_handler.exceptions import ForbiddenError, NotFoundError from aws_lambda_powertools.event_handler.openapi.params import Form from aws_lambda_powertools.shared.cookies import Cookie from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair from passlib.hash import pbkdf2_sha256 from boto3clients import dynamodb_client from config import OAUTH2_TABLE from jose_ import generate_jwt router = Router() oauth2_layer = DynamoDBPersistenceLayer(OAUTH2_TABLE, dynamodb_client) @router.get('/login') def login_form(): html = Path(__file__).with_name('login.html').read_text(encoding='utf-8') return Response( body=html, status_code=HTTPStatus.OK.value, content_type='text/html', ) @router.post('/login') def login( username: Annotated[str, Form()], password: Annotated[str, Form()], ): user_id, password_hash = _get_user(username) if not pbkdf2_sha256.verify(password, password_hash): raise ForbiddenError('Invalid credentials') jwt_token = generate_jwt(user_id, username) return Response( status_code=HTTPStatus.OK, cookies=[ Cookie( name='id_token', value=jwt_token, http_only=True, same_site=None, ), ], ) def _get_user(username: str) -> tuple[str, str]: r = oauth2_layer.collection.get_item( # Post-migration: uncomment the following line # KeyPair('EMAIL', username), KeyPair('email', username), exc_cls=EmailNotFoundError, ) password = oauth2_layer.collection.get_item( KeyPair(r['user_id'], 'PASSWORD'), exc_cls=UserNotFoundError, ) return r['user_id'], password['hash'] class EmailNotFoundError(NotFoundError): def __init__(self, *_): super().__init__('Email not found') class UserNotFoundError(NotFoundError): def __init__(self, *_): super().__init__('User not found')