"""HTTP routes for user management.

Registers the user endpoints (search, create, update, fetch, password reset
and identity-provider lookup) on a Powertools ``Router`` and re-exports the
``logs``, ``emails`` and ``orgs`` sub-routers.
"""

import json
from http import HTTPStatus
from typing import Annotated

from aws_lambda_powertools.event_handler.api_gateway import Router
from aws_lambda_powertools.event_handler.exceptions import (
    BadRequestError as PowertoolsBadRequestError,
)
from elasticsearch import Elasticsearch
from layercake.dynamodb import (
    DynamoDBCollection,
    DynamoDBPersistenceLayer,
    MissingError,
    SortKey,
    TransactKey,
)
from layercake.extra_types import CpfStr, NameStr
from pydantic import UUID4, BaseModel, StringConstraints

import cognito
import elastic
from api_gateway import JSONResponse
from boto3clients import dynamodb_client, idp_client
from conf import ELASTIC_CONN, USER_POOOL_ID, USER_TABLE
from middlewares import AuditLogMiddleware
from models import User
from rules.user import update_user

from .emails import router as emails
from .logs import router as logs
from .orgs import router as orgs

__all__ = ['logs', 'emails', 'orgs']


class BadRequestError(MissingError, PowertoolsBadRequestError):
    """Exception class handed to ``user_collect`` as its ``exc_cls``.

    It inherits from both ``MissingError`` and the Powertools
    ``BadRequestError``, so it can be caught as either type.
    """


router = Router()
user_layer = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client)
user_collect = DynamoDBCollection(user_layer, exc_cls=BadRequestError)
elastic_client = Elasticsearch(**ELASTIC_CONN)


@router.get('/', compress=True, tags=['User'], summary='Get users')
def get_users():
    """Search users in the Elasticsearch index named after ``USER_TABLE``.

    Query string parameters:
        query: JSON-encoded search query (defaults to ``{}``).
        page_size: Number of results per page (defaults to ``25``).

    Raises:
        PowertoolsBadRequestError: If ``page_size`` is not an integer or
            ``query`` is not valid JSON.
    """
    event = router.current_event
    raw_query = event.get_query_string_value('query', '{}')
    raw_page_size = event.get_query_string_value('page_size', '25')

    # Both values come straight from the client: report malformed input as a
    # bad request instead of letting the parsing error escape unhandled.
    try:
        page_size = int(raw_page_size)
    except ValueError as exc:
        raise PowertoolsBadRequestError('page_size must be an integer') from exc

    try:
        query = json.loads(raw_query)
    except json.JSONDecodeError as exc:
        raise PowertoolsBadRequestError('query must be valid JSON') from exc

    return elastic.search(
        index=USER_TABLE,
        page_size=page_size,
        query=query,
        elastic_client=elastic_client,
    )


@router.post(
    '/',
    compress=True,
    tags=['User'],
    summary='Create user',
    middlewares=[AuditLogMiddleware('USER_ADD', user_collect)],
)
def post_user(payload: User):
    """Accept a ``User`` payload and answer with an empty 201 response.

    The handler itself does not persist anything.
    NOTE(review): presumably creation is handled elsewhere (for example by
    the middleware) or is still to be implemented -- confirm.
    """
    return JSONResponse(status_code=HTTPStatus.CREATED)


class UserData(BaseModel):
    """Request body accepted by ``put_user``."""

    name: NameStr
    cpf: CpfStr


@router.put(
    '/<id>',
    compress=True,
    tags=['User'],
    summary='Update user',
    middlewares=[
        AuditLogMiddleware('USER_UPDATE', user_collect, ('id', 'name', 'new_cpf'))
    ],
)
def put_user(id: str, payload: UserData):
    """Update the name and CPF of the user identified by ``id``.

    Args:
        id: User identifier taken from the request path.
        payload: New ``name`` and ``cpf`` values.

    Returns:
        A 200 ``JSONResponse`` echoing ``id``, ``name`` and ``new_cpf``, the
        same field names passed to the audit middleware.
    """
    update_user(
        {
            'id': id,
            'name': payload.name,
            'cpf': payload.cpf,
        },
        persistence_layer=user_layer,
    )

    return JSONResponse(
        body={
            'id': id,
            'name': payload.name,
            'new_cpf': payload.cpf,
        },
        status_code=HTTPStatus.OK,
    )


@router.get('/<id>', compress=True, tags=['User'], summary='Get user')
def get_user(id: str):
    """Return the stored items for the user identified by ``id``.

    Reads the ``'0'`` and ``'last_profile_edit'`` sort keys under the user's
    key through ``user_collect``.
    NOTE(review): ``user_collect`` was built with ``exc_cls=BadRequestError``;
    presumably that is what gets raised for a missing item -- confirm.
    """
    return user_collect.get_items(
        TransactKey(id) + SortKey('0') + SortKey('last_profile_edit')
    )


class Password(BaseModel):
    """Request body accepted by the password reset endpoint."""

    cognito_sub: UUID4
    # Passwords shorter than 6 characters are rejected by validation.
    new_password: Annotated[str, StringConstraints(min_length=6)]


@router.post(
    '/<id>/password',
    compress=True,
    tags=['User'],
    include_in_schema=False,
    middlewares=[
        AuditLogMiddleware('PASSWORD_RESET', user_collect, ('id', 'cognito_sub'))
    ],
)
def password(id: str, payload: Password):
    """Set a new password for the user in the configured user pool.

    Args:
        id: User identifier taken from the request path; only echoed back in
            the response body.
        payload: The ``cognito_sub`` used as the username and the new password.

    Returns:
        A 200 ``JSONResponse`` echoing ``id`` and ``cognito_sub``.
    """
    cognito.admin_set_user_password(
        username=str(payload.cognito_sub),
        password=payload.new_password,
        # The constant is spelled ``USER_POOOL_ID`` in ``conf``.
        user_pool_id=USER_POOOL_ID,
        idp_client=idp_client,
    )

    return JSONResponse(
        body={
            'id': id,
            'cognito_sub': payload.cognito_sub,
        },
        status_code=HTTPStatus.OK,
    )


@router.get('/<sub>/idp', compress=True, include_in_schema=False)
def get_idp(sub: str):
    """Return what ``cognito.admin_get_user`` yields for ``sub``."""
    return cognito.admin_get_user(
        sub=sub,
        user_pool_id=USER_POOOL_ID,
        idp_client=idp_client,
    )