"""Login routes: render the login form and authenticate a username/password."""

from http import HTTPStatus
from typing import Annotated

from aws_lambda_powertools.event_handler import (
    Response,
)
from aws_lambda_powertools.event_handler.api_gateway import Router
from aws_lambda_powertools.event_handler.exceptions import ForbiddenError, NotFoundError
from aws_lambda_powertools.event_handler.openapi.params import Form, Param
from aws_lambda_powertools.shared.cookies import Cookie
from jinja2 import Environment, PackageLoader, select_autoescape
from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair
from passlib.hash import pbkdf2_sha256

from boto3clients import dynamodb_client
from config import OAUTH2_TABLE
from jose_ import generate_jwt

router = Router()
oauth2_layer = DynamoDBPersistenceLayer(OAUTH2_TABLE, dynamodb_client)
templates = Environment(
    loader=PackageLoader('app'),
    autoescape=select_autoescape(['html']),
)


class EmailNotFoundError(NotFoundError):
    """NotFoundError with the fixed message 'Email not found'."""

    def __init__(self, *_):
        # Any positional arguments supplied by the raiser are ignored.
        super().__init__('Email not found')


class UserNotFoundError(NotFoundError):
    """NotFoundError with the fixed message 'User not found'."""

    def __init__(self, *_):
        # Any positional arguments supplied by the raiser are ignored.
        super().__init__('User not found')


@router.get('/login', compress=True)
def login_form(continue_: Annotated[str, Param(alias='continue')]):
    """Render ``login.html`` and return it as a 200 ``text/html`` response.

    Args:
        continue_: Value of the ``continue`` request parameter; passed to the
            template under the name ``continue``.
    """
    # 'continue' is a Python keyword, so it has to be passed via a mapping.
    context = {'continue': continue_}
    page = templates.get_template('login.html').render(**context)

    return Response(
        status_code=HTTPStatus.OK,
        content_type='text/html',
        body=page,
    )


@router.post('/login')
def login(
    username: Annotated[str, Form()],
    password: Annotated[str, Form()],
    continue_: Annotated[str, Form(alias='continue')],
):
    """Check the submitted credentials and redirect with an ``id_token`` cookie.

    Args:
        username: Form field used to look up the user record.
        password: Form field verified against the stored PBKDF2-SHA256 hash.
        continue_: Form field ``continue``; used as the ``Location`` header.

    Returns:
        A 302 response that sets the ``id_token`` cookie to the generated JWT.

    Raises:
        ForbiddenError: If the password does not match the stored hash.
    """
    user_id, stored_hash = _get_user(username)

    password_matches = pbkdf2_sha256.verify(password, stored_hash)
    if not password_matches:
        raise ForbiddenError('Invalid credentials')

    id_token_cookie = Cookie(
        name='id_token',
        value=generate_jwt(user_id, username),
        http_only=True,
        same_site=None,
    )

    # NOTE(review): the redirect target is the submitted ``continue`` value,
    # used as-is with no validation in this function.
    return Response(
        status_code=HTTPStatus.FOUND,
        headers={'Location': continue_},
        cookies=[id_token_cookie],
    )


def _get_user(username: str) -> tuple[str, str]:
    """Return ``(user_id, password_hash)`` for ``username``.

    Two items are read from the OAuth2 table: first the one keyed by
    ``('email', username)``, whose ``user_id`` is then used to read the one
    keyed by ``(user_id, 'PASSWORD')``.

    NOTE(review): ``EmailNotFoundError`` / ``UserNotFoundError`` are handed to
    the persistence layer as ``exc_cls``; presumably it raises them when the
    respective item is missing -- confirm against the layercake API.
    """
    email_item = oauth2_layer.collection.get_item(
        # Post-migration: uncomment the following line
        # KeyPair('EMAIL', username),
        KeyPair('email', username),
        exc_cls=EmailNotFoundError,
    )
    user_id = email_item['user_id']

    password_item = oauth2_layer.collection.get_item(
        KeyPair(user_id, 'PASSWORD'),
        exc_cls=UserNotFoundError,
    )

    return user_id, password_item['hash']