"""Lambda handler that copies course unit prices from the API into DynamoDB."""

import requests
from aws_lambda_powertools.utilities.data_classes import (
    DynamoDBStreamEvent,
    event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from layercake.dateutils import now
from layercake.dynamodb import DynamoDBPersistenceLayer

from boto3clients import dynamodb_client
from config import API_URL, COURSE_TABLE

# (connect, read) timeout in seconds for the courses request. Requests never
# times out by default, so without this a stalled upstream would keep the
# handler blocked until the Lambda's own execution limit is hit.
_REQUEST_TIMEOUT = (5, 30)

course_layer = DynamoDBPersistenceLayer(COURSE_TABLE, dynamodb_client)


@event_source(data_class=DynamoDBStreamEvent)
def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext) -> bool:
    """Fetch the course list and store each course's unit price.

    Downloads ``{API_URL}/courses.json`` and, for every entry in the
    response, puts one item into the course table keyed by the course id
    with the sort key ``metadata#unit_price``.

    Args:
        event: The incoming DynamoDB stream event. Its records are not read;
            the invocation only acts as a trigger.
        context: The Lambda runtime context (unused).

    Returns:
        ``True`` once every item has been handed to the batch writer and
        the batch context has exited.

    Raises:
        requests.Timeout: If the API does not answer within
            ``_REQUEST_TIMEOUT``.
        requests.HTTPError: If the API answers with an error status.
        KeyError: If a course entry lacks ``id`` or ``metadata.unit_price``
            (assumes each entry is a mapping -- TODO confirm against the API).
    """
    response = requests.get(
        f'{API_URL}/courses.json',
        timeout=_REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    courses = response.json()

    # A single timestamp is shared by every item written in this invocation.
    create_date = now().isoformat()

    with course_layer.batch_writer() as batch:
        for course in courses:
            batch.put_item(
                {
                    'id': {'S': course['id']},
                    'sk': {'S': 'metadata#unit_price'},
                    # Values under 'N' are passed as strings, hence str().
                    'unit_price': {'N': str(course['metadata']['unit_price'])},
                    'create_date': {'S': create_date},
                }
            )

    return True