import json import sqlite3 from datetime import datetime, time, timedelta from aws_lambda_powertools import Logger from aws_lambda_powertools.utilities.data_classes import ( EventBridgeEvent, event_source, ) from aws_lambda_powertools.utilities.typing import LambdaContext from layercake.dateutils import fromisoformat, now, ttl from layercake.dynamodb import ( DynamoDBPersistenceLayer, KeyPair, SortKey, TransactKey, ) from layercake.funcs import pick from sqlite_utils import Database from boto3clients import dynamodb_client from config import ( COURSE_TABLE, ENROLLMENT_TABLE, ORDER_TABLE, SQLITE_DATABASE, SQLITE_TABLE, ) from utils import get_billing_period logger = Logger(__name__) order_layer = DynamoDBPersistenceLayer(ORDER_TABLE, dynamodb_client) enrollment_layer = DynamoDBPersistenceLayer(ENROLLMENT_TABLE, dynamodb_client) course_layer = DynamoDBPersistenceLayer(COURSE_TABLE, dynamodb_client) sqlite3.register_converter('json', json.loads) @event_source(data_class=EventBridgeEvent) @logger.inject_lambda_context def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool: new_image = event.detail['new_image'] now_ = now() org_id = new_image['org_id'] enrollment = enrollment_layer.collection.get_items( TransactKey(new_image['id']) + SortKey('0') + SortKey('author') # Post-migration: uncomment the following line # + SortKey('CREATED_BY') ) if not enrollment: logger.debug('Enrollment not found') return False logger.info('Enrollment found', data=enrollment) # Keep it until the migration has been completed old_course = _get_course(enrollment['course']['id']) if old_course: enrollment['course'] = old_course created_at: datetime = fromisoformat(enrollment['created_at']) # type: ignore start_date, end_date = get_billing_period( billing_day=new_image['billing_day'], date_=created_at, ) pk = 'BILLING#ORG#{org_id}'.format(org_id=org_id) sk = 'START#{start}#END#{end}'.format( start=start_date.isoformat(), end=end_date.isoformat(), ) logger.info('Enrollment found', data=enrollment) try: with order_layer.transact_writer() as transact: transact.put( item={ 'id': pk, 'sk': sk, 'status': 'PENDING', 'created_at': now_, }, cond_expr='attribute_not_exists(sk)', exc_cls=BillingConflictError, ) transact.put( item={ 'id': pk, 'sk': f'{sk}#SCHEDULE#AUTO_CLOSE', 'ttl': ttl( start_dt=datetime.combine(end_date, time()) + timedelta(days=1) ), 'created_at': now_, } ) except BillingConflictError: pass # Add enrollment entry to billing try: created_by = enrollment.get('author') course_id = enrollment['course']['id'] course = course_layer.collection.get_items( KeyPair( pk=course_id, sk=SortKey('0', path_spec='metadata__unit_price'), rename_key='unit_price', ) + KeyPair( pk=f'CUSTOM_PRICING#ORG#{org_id}', sk=SortKey(f'COURSE#{course_id}', path_spec='unit_price'), rename_key='unit_price', ), flatten_top=False, ) with order_layer.transact_writer() as transact: transact.put( item={ 'id': pk, 'sk': f'{sk}#ENROLLMENT#{enrollment["id"]}', 'user': pick(('id', 'name'), enrollment['user']), 'course': pick(('id', 'name'), enrollment['course']), 'unit_price': course['unit_price'], 'enrolled_at': enrollment['created_at'], 'created_at': now_, } # Add created_by if present | ( { 'author': { 'id': created_by['user_id'], 'name': created_by['name'], } } if created_by else {} ), cond_expr='attribute_not_exists(sk)', ) transact.update( key=KeyPair( pk=new_image['id'], sk=new_image['sk'], ), table_name=ENROLLMENT_TABLE, update_expr='SET billing_period = :billing_period, \ updated_at = :updated_at', expr_attr_values={ ':billing_period': sk, ':updated_at': now_, }, cond_expr='attribute_exists(sk)', ) except Exception as exc: logger.exception( exc, keypair={'pk': pk, 'sk': sk}, ) return False else: return True class BillingConflictError(Exception): ... class BillingNotFoundError(Exception): ... def _get_course(course_id: str) -> dict | None: with sqlite3.connect( database=SQLITE_DATABASE, detect_types=sqlite3.PARSE_DECLTYPES ) as conn: db = Database(conn) rows = db[SQLITE_TABLE].rows_where( "json->>'$.metadata__betaeducacao_id' = ?", [course_id] ) for row in rows: return row['json'] return None