import json
from http import HTTPStatus
from typing import Annotated

from aws_lambda_powertools.event_handler.api_gateway import Router
from aws_lambda_powertools.event_handler.exceptions import (
    BadRequestError as PowertoolsBadRequestError,
)
from elasticsearch import Elasticsearch
from layercake.dynamodb import (
    ComposeKey,
    DynamoDBCollection,
    DynamoDBPersistenceLayer,
    KeyPair,
    MissingError,
    PartitionKey,
    PrefixKey,
)
from pydantic import UUID4, BaseModel, EmailStr, StringConstraints

import cognito
import elastic
from api_gateway import JSONResponse
from boto3clients import dynamodb_client, idp_client
from middlewares import AuditLogMiddleware
from models import User
from settings import ELASTIC_CONN, USER_POOOL_ID, USER_TABLE
from user import add_email, del_email, set_email_as_primary


class BadRequestError(MissingError, PowertoolsBadRequestError):
    """Exception class handed to ``user_collect`` as its ``exception_cls``.

    It derives from both the persistence layer's ``MissingError`` and the
    Powertools ``BadRequestError``.
    NOTE(review): presumably this makes a missing item surface as an
    HTTP 400 response -- confirm against ``DynamoDBCollection``.
    """


# Module-level singletons shared by every route handler below.
router = Router()
user_layer = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client)
user_collect = DynamoDBCollection(user_layer, exception_cls=BadRequestError)
elastic_client = Elasticsearch(**ELASTIC_CONN)


@router.get('/', compress=True, tags=['User'], summary='Get users')
def get_users():
    """Search users in the Elasticsearch index named after ``USER_TABLE``.

    Query-string parameters:
        query: JSON-encoded search query (defaults to ``'{}'``).
        page_size: Number of results per page (defaults to ``'25'``).

    Returns:
        Whatever ``elastic.search`` returns for the given query.

    Raises:
        PowertoolsBadRequestError: If ``query`` is not valid JSON or
            ``page_size`` is not an integer.
    """
    event = router.current_event
    raw_query = event.get_query_string_value('query', '{}')
    raw_page_size = event.get_query_string_value('page_size', '25')

    # Both values come straight from the client; report malformed input as a
    # 400 instead of letting the parsing error bubble up as an unhandled 500.
    # ``json.JSONDecodeError`` is a subclass of ``ValueError``.
    try:
        query = json.loads(raw_query)
        page_size = int(raw_page_size)
    except ValueError as err:
        raise PowertoolsBadRequestError(
            'Invalid `query` or `page_size` query string parameter'
        ) from err

    return elastic.search(
        index=USER_TABLE,
        page_size=page_size,
        query=query,
        elastic_client=elastic_client,
    )


@router.post(
    '/',
    compress=True,
    tags=['User'],
    summary='Create user',
    middlewares=[AuditLogMiddleware('USER_ADD', user_collect)],
)
def post_user(payload: User):
    """Acknowledge a user creation request with HTTP 201.

    NOTE(review): the handler itself does not use ``payload``; any
    persistence presumably happens elsewhere -- confirm.
    """
    return JSONResponse(status_code=HTTPStatus.CREATED)


# Request body for the password endpoint: the Cognito subject of the user and
# the new password (at least 6 characters).
class Password(BaseModel):
    cognito_sub: UUID4
    new_password: Annotated[str, StringConstraints(min_length=6)]


@router.post(
    '/<id>/password',
    compress=True,
    tags=['User'],
    include_in_schema=False,
    middlewares=[
        AuditLogMiddleware('PASSWORD_RESET', user_collect, ('id', 'cognito_sub'))
    ],
)
def password(id: str, payload: Password):
    """Set a new password for the Cognito user identified by the payload.

    Args:
        id: User id taken from the route.
        payload: Cognito subject and the new password.

    Returns:
        A 200 ``JSONResponse`` echoing ``id`` and ``cognito_sub``.
    """
    cognito.admin_set_user_password(
        username=str(payload.cognito_sub),
        password=payload.new_password,
        user_pool_id=USER_POOOL_ID,
        idp_client=idp_client,
    )

    return JSONResponse(
        body={
            'id': id,
            'cognito_sub': payload.cognito_sub,
        },
        status_code=HTTPStatus.OK,
    )


@router.get('/<id>', compress=True, tags=['User'], summary='Get user')
def get_user(id: str):
    """Return the item stored under partition key ``id`` and sort key ``'0'``."""
    return user_collect.get_item(KeyPair(id, '0'))


@router.get('/<sub>/idp', compress=True, include_in_schema=False)
def get_idp(sub: str):
    """Return the result of ``cognito.admin_get_user`` for the subject ``sub``."""
    return cognito.admin_get_user(
        sub=sub,
        user_pool_id=USER_POOOL_ID,
        idp_client=idp_client,
    )


@router.get(
    '/<id>/emails',
    compress=True,
    tags=['User'],
    summary='Get user emails',
)
def get_emails(id: str):
    """Query the user's items whose sort key starts with ``emails``.

    The optional ``start_key`` query-string parameter is forwarded to the
    collection query as-is.
    """
    return user_collect.query(
        KeyPair(id, PrefixKey('emails')),
        start_key=router.current_event.get_query_string_value('start_key', None),
    )


# Request body carrying a single e-mail address.
class Email(BaseModel):
    email: EmailStr


@router.post(
    '/<id>/emails',
    compress=True,
    tags=['User'],
    summary='Add user email',
    middlewares=[AuditLogMiddleware('EMAIL_ADD', user_collect, ('email',))],
)
def post_email(id: str, payload: Email):
    """Add ``payload.email`` to the user and answer with HTTP 201.

    Returns:
        A 201 ``JSONResponse`` echoing the request payload.
    """
    add_email(id, payload.email, persistence_layer=user_layer)

    return JSONResponse(
        body=payload,
        status_code=HTTPStatus.CREATED,
    )


# Request body for switching the primary e-mail address of a user.
class EmailAsPrimary(BaseModel):
    new_email: EmailStr
    old_email: EmailStr
    email_verified: bool = False


@router.patch(
    '/<id>/emails',
    compress=True,
    tags=['User'],
    summary='Add user email as primary',
    middlewares=[
        AuditLogMiddleware(
            'EMAIL_CHANGE',
            user_collect,
            (
                'new_email',
                'old_email',
            ),
        )
    ],
)
def patch_email(id: str, payload: EmailAsPrimary):
    """Make ``payload.new_email`` the primary address in place of ``old_email``.

    Returns:
        A 200 ``JSONResponse`` echoing the request payload.
    """
    set_email_as_primary(
        id,
        payload.new_email,
        payload.old_email,
        email_verified=payload.email_verified,
        persistence_layer=user_layer,
    )

    return JSONResponse(body=payload, status_code=HTTPStatus.OK)


@router.delete(
    '/<id>/emails',
    compress=True,
    tags=['User'],
    summary='Delete user email',
    middlewares=[AuditLogMiddleware('EMAIL_DEL', user_collect, ('email',))],
)
def delete_email(id: str, payload: Email):
    """Remove ``payload.email`` from the user.

    Returns:
        The request payload.

    Raises:
        AssertionError: If ``del_email`` returns a falsy result.
    """
    # The call must not live inside an ``assert`` statement: asserts are
    # stripped under ``python -O``, which would silently skip the deletion.
    deleted = del_email(id, payload.email, persistence_layer=user_layer)
    if not deleted:
        # Same exception type the original ``assert`` produced.
        raise AssertionError('del_email returned a falsy result')

    return payload


@router.get(
    '/<id>/logs',
    compress=True,
    tags=['User'],
    summary='Get user logs',
)
def get_logs(id: str):
    """Query the items under the composite partition key built from ``id`` and ``log``.

    The optional ``start_key`` query-string parameter is forwarded to the
    collection query as-is.
    """
    return user_collect.query(
        # Post-migration: uncomment to enable PartitionKey with a composite key (id with `logs` prefix).
        # PartitionKey(ComposeKey(id, 'logs')),
        PartitionKey(ComposeKey(id, 'log', delimiter=':')),
        start_key=router.current_event.get_query_string_value('start_key', None),
    )


@router.get(
    '/<id>/orgs',
    compress=True,
    tags=['User'],
    summary='Get user orgs',
)
def get_orgs(id: str):
    """Query the user's items whose sort key starts with ``orgs``.

    The optional ``start_key`` query-string parameter is forwarded to the
    collection query as-is.
    """
    return user_collect.query(
        KeyPair(id, PrefixKey('orgs')),
        start_key=router.current_event.get_query_string_value('start_key', None),
    )