from http import HTTPStatus from typing import Annotated from uuid import uuid4 import boto3 from aws_lambda_powertools.event_handler import ( Response, ) from aws_lambda_powertools.event_handler.api_gateway import Router from aws_lambda_powertools.event_handler.exceptions import ForbiddenError, NotFoundError from aws_lambda_powertools.event_handler.openapi.params import Body from aws_lambda_powertools.shared.cookies import Cookie from layercake.dateutils import now, ttl from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair, SortKey from passlib.hash import pbkdf2_sha256 from boto3clients import dynamodb_client from config import ( OAUTH2_TABLE, SESSION_EXPIRES_IN, ) router = Router() dyn = DynamoDBPersistenceLayer(OAUTH2_TABLE, dynamodb_client) idp = boto3.client('cognito-idp') class InvalidCredentialsError(ForbiddenError): ... class UserNotFoundError(NotFoundError): ... @router.post('/authentication') def authentication( username: Annotated[str, Body()], password: Annotated[str, Body()], ): user_id, password_hash = _get_user(username) if not password_hash: _get_idp_user(user_id, username, password) else: if not pbkdf2_sha256.verify(password, password_hash): raise InvalidCredentialsError('Invalid credentials') return Response( status_code=HTTPStatus.OK, cookies=[ Cookie( name='SID', value=new_session(user_id), http_only=True, secure=True, same_site=None, max_age=SESSION_EXPIRES_IN, ) ], ) def _get_user(username: str) -> tuple[str, str | None]: sk = SortKey(username, path_spec='user_id') user = dyn.collection.get_items( KeyPair(pk='email', sk=sk, rename_key=sk.path_spec) + KeyPair(pk='cpf', sk=sk, rename_key=sk.path_spec), ) if not user: raise UserNotFoundError('User not found') password = dyn.collection.get_item( KeyPair( pk=user['user_id'], sk=SortKey( sk='PASSWORD', path_spec='hash', rename_key='password', ), ), raise_on_error=False, default=None, # Uncomment the following line when removing support for Cognito # exc_cls=UserNotFoundError, ) return user['user_id'], password def _get_idp_user( user_id: str, username: str, password: str, ) -> bool: import base64 import hashlib import hmac # That should be removed when completing the migration # to our own OAuth2 implementation. client_id = '3ijacqc7r2jc9l4oli2b41f7te' client_secret = 'amktf9l40g1mlqdo9fjlcfvpn2cp3mvh4pt97hu55sfelccos58' dig = hmac.new( client_secret.encode('utf-8'), msg=(username + client_id).encode('utf-8'), digestmod=hashlib.sha256, ).digest() try: idp.initiate_auth( AuthFlow='USER_PASSWORD_AUTH', AuthParameters={ 'USERNAME': username, 'PASSWORD': password, 'SECRET_HASH': base64.b64encode(dig).decode(), }, ClientId=client_id, ) dyn.put_item( item={ 'id': user_id, 'sk': 'PASSWORD', 'hash': pbkdf2_sha256.hash(password), 'created_at': now(), } ) except Exception: raise InvalidCredentialsError('Invalid credentials') return True def new_session(user_id: str) -> str: session_id = str(uuid4()) now_ = now() exp = ttl(start_dt=now_, seconds=SESSION_EXPIRES_IN) with dyn.transact_writer() as transact: transact.put( item={ 'id': 'SESSION', 'sk': session_id, 'user_id': user_id, 'ttl': exp, 'created_at': now_, } ) transact.put( item={ 'id': user_id, 'sk': f'SESSION#{session_id}', 'ttl': exp, 'created_at': now_, } ) return f'{session_id}:{user_id}'