from http import HTTPStatus
from typing import Annotated
from uuid import uuid4

from aws_lambda_powertools.event_handler import (
    Response,
)
from aws_lambda_powertools.event_handler.api_gateway import Router
from aws_lambda_powertools.event_handler.exceptions import (
    NotFoundError,
    UnauthorizedError,
)
from aws_lambda_powertools.event_handler.openapi.params import Body
from aws_lambda_powertools.shared.cookies import Cookie
from layercake.dateutils import now, ttl
from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair, SortKey
from passlib.hash import pbkdf2_sha256

from boto3clients import dynamodb_client, idp_client
from config import SESSION_EXPIRES_IN, USER_TABLE

router = Router()
dyn = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client)


class InvalidCredentialsError(UnauthorizedError):
    """Raised when the supplied username/password pair is rejected."""


class UserNotFoundError(NotFoundError):
    """Raised when no user record matches the supplied username."""


@router.post('/authentication')
def authentication(
    username: Annotated[str, Body()],
    password: Annotated[str, Body()],
):
    """Authenticate a user and open a new session.

    The user is resolved through ``_get_user``. When no password hash is
    stored for the user, the credentials are checked against the identity
    provider via ``_get_idp_user``; otherwise the password is verified
    against the stored PBKDF2-SHA256 hash.

    Args:
        username: Login identifier taken from the request body.
        password: Plain-text password taken from the request body.

    Returns:
        A ``200 OK`` response carrying the ``SID`` session cookie.

    Raises:
        UserNotFoundError: If ``_get_user`` finds no matching user.
        InvalidCredentialsError: If the password does not match the stored
            hash, or the identity provider rejects the credentials.
    """
    user_id, password_hash = _get_user(username)

    if not password_hash:
        # No stored hash: fall back to the identity provider, which also
        # stores a hash so later logins can be verified locally.
        _get_idp_user(user_id, username, password)
    elif not pbkdf2_sha256.verify(password, password_hash):
        # Record the failed attempt before rejecting the request.
        dyn.update_item(
            key=KeyPair(user_id, 'FAILED_ATTEMPTS'),
            update_expr=(
                'SET #count = if_not_exists(#count, :zero) + :one, '
                'updated_at = :now'
            ),
            expr_attr_names={
                '#count': 'failed_attempts',
            },
            expr_attr_values={
                ':zero': 0,
                ':one': 1,
                ':now': now(),
            },
        )
        raise InvalidCredentialsError('Invalid credentials')

    return Response(
        status_code=HTTPStatus.OK,
        cookies=[
            Cookie(
                name='SID',
                value=new_session(user_id),
                http_only=True,
                secure=True,
                same_site=None,
                max_age=SESSION_EXPIRES_IN,
            )
        ],
    )


def _get_user(username: str) -> tuple[str, str | None]:
    """Resolve a username to its user id and stored password hash.

    The username is looked up under both the ``email`` and ``cpf``
    partition keys.

    Args:
        username: Login identifier to look up.

    Returns:
        A ``(user_id, password_hash)`` tuple; ``password_hash`` is ``None``
        when no ``PASSWORD`` item is found for the user.

    Raises:
        UserNotFoundError: If neither lookup yields a user.
    """
    sk = SortKey(username, path_spec='user_id')
    user = dyn.collection.get_items(
        KeyPair(pk='email', sk=sk, rename_key=sk.path_spec)
        + KeyPair(pk='cpf', sk=sk, rename_key=sk.path_spec),
    )

    if not user:
        raise UserNotFoundError('User not found')

    password = dyn.collection.get_item(
        KeyPair(
            pk=user['user_id'],
            sk=SortKey(
                sk='PASSWORD',
                path_spec='hash',
                rename_key='password',
            ),
        ),
        raise_on_error=False,
        default=None,
        # Uncomment the following line when removing support for Cognito
        # exc_cls=UserNotFoundError,
    )

    return user['user_id'], password


def _get_idp_user(
    user_id: str,
    username: str,
    password: str,
) -> bool:
    """Check credentials against the identity provider and store a hash.

    After the identity provider accepts the credentials, a PBKDF2-SHA256
    hash of the password is written as the user's ``PASSWORD`` item.

    Args:
        user_id: Id of the user the ``PASSWORD`` item is written for.
        username: Login identifier sent to the identity provider.
        password: Plain-text password sent to the identity provider.

    Returns:
        ``True`` when authentication succeeded and the hash was stored.

    Raises:
        InvalidCredentialsError: If the identity provider call fails for
            any reason; the original error is chained as the cause.
    """
    import base64
    import hashlib
    import hmac

    # That should be removed when completing the migration
    # to our own OAuth2 implementation.
    # SECURITY(review): the client secret is committed in source; consider
    # loading it from configuration or a secrets store and rotating it.
    client_id = '3ijacqc7r2jc9l4oli2b41f7te'
    client_secret = 'amktf9l40g1mlqdo9fjlcfvpn2cp3mvh4pt97hu55sfelccos58'

    # SECRET_HASH: base64 of HMAC-SHA256(client_secret, username + client_id).
    dig = hmac.new(
        client_secret.encode('utf-8'),
        msg=(username + client_id).encode('utf-8'),
        digestmod=hashlib.sha256,
    ).digest()

    # Only the identity provider call is guarded, so that a storage failure
    # below is not reported to the caller as bad credentials.
    try:
        idp_client.initiate_auth(
            AuthFlow='USER_PASSWORD_AUTH',
            AuthParameters={
                'USERNAME': username,
                'PASSWORD': password,
                'SECRET_HASH': base64.b64encode(dig).decode(),
            },
            ClientId=client_id,
        )
    except Exception as exc:
        raise InvalidCredentialsError('Invalid credentials') from exc

    dyn.put_item(
        item={
            'id': user_id,
            'sk': 'PASSWORD',
            'hash': pbkdf2_sha256.hash(password),
            'created_at': now(),
        }
    )

    return True


def new_session(user_id: str) -> str:
    """Create a session for ``user_id`` and return its token.

    Within a single ``transact_writer`` context this deletes the user's
    ``FAILED_ATTEMPTS`` item, sets ``lastLogin`` on the user's ``'0'`` item,
    and writes two session items: one under ``SESSION`` keyed by the session
    id and one under the user keyed by ``SESSION#<session_id>``. Both
    session items carry a ``ttl`` computed from the current time and
    ``SESSION_EXPIRES_IN``.

    Args:
        user_id: Id of the user the session belongs to.

    Returns:
        The session token formatted as ``'<session_id>:<user_id>'``.
    """
    session_id = str(uuid4())
    now_ = now()
    exp = ttl(start_dt=now_, seconds=SESSION_EXPIRES_IN)

    with dyn.transact_writer() as transact:
        # A successful login clears the failed-attempts counter.
        transact.delete(
            key=KeyPair(user_id, 'FAILED_ATTEMPTS'),
        )
        transact.update(
            key=KeyPair(user_id, '0'),
            # Post-migration (users): uncomment the following line
            # update_expr='SET last_login = :now',
            update_expr='SET lastLogin = :now',
            expr_attr_values={
                ':now': now_,
            },
        )
        transact.put(
            item={
                'id': 'SESSION',
                'sk': session_id,
                'user_id': user_id,
                'ttl': exp,
                'created_at': now_,
            }
        )
        transact.put(
            item={
                'id': user_id,
                'sk': f'SESSION#{session_id}',
                'ttl': exp,
                'created_at': now_,
            }
        )

    return f'{session_id}:{user_id}'