from datetime import datetime

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import (
    EventBridgeEvent,
    event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from layercake.dynamodb import DynamoDBPersistenceLayer

from boto3clients import dynamodb_client
from config import ENROLLMENT_TABLE
from enrollment import Enrollment, enroll

logger = Logger(__name__)
dyn = DynamoDBPersistenceLayer(ENROLLMENT_TABLE, dynamodb_client)


@event_source(data_class=EventBridgeEvent)
@logger.inject_lambda_context
def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool:
    """Enroll a user from the ``old_image`` record carried in the event detail.

    The handler reads ``event.detail['old_image']``, rebuilds the enrollment,
    organization, optional subscription and optional deduplication window
    from it, and delegates to ``enroll`` using the module-level persistence
    layer.

    Args:
        event: EventBridge event whose ``detail`` contains an ``old_image``
            mapping with at least ``id``, ``course``, ``user``, ``org_name``
            and ``created_at``.
        context: Lambda context (consumed by the logger decorator; not read
            directly here).

    Returns:
        Whatever ``enroll`` returns (annotated as ``bool``).

    Raises:
        KeyError: If a required key is missing from ``old_image``.
        ValueError: If ``created_at`` is not a valid ISO-8601 string or the
            billing day cannot be converted to ``int``.
    """
    image = event.detail['old_image']

    # Key pattern `SCHEDULED#ORG#{org_id}`: the org id is the last segment.
    org_id = image['id'].split('#')[-1]

    window_offset = image.get('dedup_window_offset_days')
    billing_day = image.get('subscription_billing_day')

    enrollment = Enrollment(course=image['course'], user=image['user'])
    org = {'org_id': org_id, 'name': image['org_name']}

    # Only attach a subscription when a billing day was recorded.
    # NOTE: this is a truthiness check, so a value of 0 counts as absent.
    subscription = None
    if billing_day:
        subscription = {'org_id': org_id, 'billing_day': int(billing_day)}

    scheduled_at = datetime.fromisoformat(image['created_at'])

    # Transfer the deduplication window if it exists (same truthiness rule).
    dedup_window = None
    if window_offset:
        dedup_window = {'offset_days': window_offset}

    return enroll(
        enrollment,
        org=org,
        subscription=subscription,
        scheduled_at=scheduled_at,
        deduplication_window=dedup_window,
        persistence_layer=dyn,
    )