from datetime import datetime from http import HTTPStatus from aws_lambda_powertools.event_handler.api_gateway import Router from layercake.batch import BatchProcessor from layercake.dynamodb import ( DynamoDBCollection, DynamoDBPersistenceLayer, ) from pydantic import BaseModel from api_gateway import JSONResponse from boto3clients import dynamodb_client from config import ( ENROLLMENT_TABLE, USER_TABLE, ) from middlewares import Tenant, TenantMiddleware from models import Course, Enrollment, User from rules.enrollment import enroll router = Router() enrollment_layer = DynamoDBPersistenceLayer(ENROLLMENT_TABLE, dynamodb_client) user_layer = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client) user_collect = DynamoDBCollection(user_layer) enrollment_collect = DynamoDBCollection(enrollment_layer) processor = BatchProcessor() class Item(BaseModel): user: User course: Course deduplication_window: dict = {} schedule_date: datetime | None = None class Payload(BaseModel): items: tuple[Item, ...] @router.post( '/', compress=True, tags=['Enrollment'], middlewares=[ TenantMiddleware(user_collect), ], ) def enroll_(payload: Payload): context = {'tenant': router.context['tenant']} with processor(payload.items, handler, context): processor.process() print(processor.exceptions) return JSONResponse( HTTPStatus.OK, { 'successes': processor.successes, 'failures': processor.failures, 'exceptions': [str(exc) for exc in processor.exceptions], }, ) def handler(record: Item, context: dict): tenant: Tenant = context['tenant'] enrollment = Enrollment( user=record.user, course=record.course, ) enroll( enrollment=enrollment, tenant={ 'id': str(tenant.id), 'name': tenant.name, }, deduplication_window=record.deduplication_window, # type: ignore persistence_layer=enrollment_layer, ) return enrollment