"""Lambda entry point that relays DynamoDB stream records to Meilisearch."""

from arnparse import arnparse
from aws_lambda_powertools.utilities.data_classes import (
    DynamoDBStreamEvent,
    event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from meilisearch import Client as Meilisearch

from config import MEILISEARCH_API_KEY, MEILISEARCH_HOST
from meili import Op

# Built once at import time, so every invocation in this module shares it.
meili_client = Meilisearch(MEILISEARCH_HOST, MEILISEARCH_API_KEY)


@event_source(data_class=DynamoDBStreamEvent)
def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
    """Append one operation to an ``Op`` batch for every stream record.

    For each record the target index is derived from the record's event
    source ARN, the operation is the record's event name, and the data is
    the record's new image, or its ``id`` key when there is no new image.

    Args:
        event: The DynamoDB stream event wrapped by ``event_source``.
        context: The Lambda context; not used by this handler.

    NOTE(review): what ``Op`` does with the appended operations (presumably
    sends them to Meilisearch when the ``with`` block exits) is defined in
    ``meili`` and is not visible here -- confirm there.
    """
    with Op(meili_client) as batch:
        for stream_record in event.records:
            change = stream_record.dynamodb
            primary_key = change.keys['id']  # type: ignore
            document = change.new_image  # type: ignore
            index_name = table_from_arn(
                stream_record.event_source_arn,  # type: ignore
            )

            # Without a new image, only the key is available to pass along.
            payload = document if document else primary_key

            batch.append(
                index_name,
                op=stream_record.event_name,  # type: ignore
                data=payload,
            )


def table_from_arn(arn: str) -> str:
    """Return the leading segment of the parsed ARN's resource.

    The ARN is parsed with ``arnparse`` and everything in its ``resource``
    before the first ``/`` is returned (the whole resource when it contains
    no ``/``).

    NOTE(review): for a DynamoDB stream ARN this is presumably the table
    name, assuming ``arnparse`` has already split off the ``table`` resource
    type -- verify against the arnparse version in use.
    """
    parsed = arnparse(arn)
    leading_segment, _, _ = parsed.resource.partition('/')
    return leading_segment