"""Lambda entry point that inspects DynamoDB stream records bound for Meilisearch.

At present the handler only prints what it sees for each record; the
indexing logic exists solely as a commented-out draft inside the handler.
"""
import meilisearch
from arnparse import arnparse
from aws_lambda_powertools.utilities.data_classes import (
    DynamoDBStreamEvent,
    event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

from settings import MEILISEARCH_API_KEY, MEILISEARCH_HOST

# Built once at import time. The active code below does not use it yet; it is
# only referenced by the commented-out draft at the end of the handler.
meili_client = meilisearch.Client(MEILISEARCH_HOST, MEILISEARCH_API_KEY)


@event_source(data_class=DynamoDBStreamEvent)
def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
    """Print a one-line summary of every record in a DynamoDB stream event.

    For each record the summary consists of the name derived from the
    record's event source ARN (see ``table_from_arn``), the value stored
    under the ``'id'`` key, the record's event name, and its new image.

    Args:
        event: The stream event, wrapped by the ``event_source`` decorator.
        context: The Lambda context; not used by the current body.

    Returns:
        None. Nothing is sent to Meilisearch by the active code.

    Raises:
        KeyError: If a record's keys do not contain ``'id'``.
    """
    for stream_record in event.records:
        # The original silences the type checker on these accesses, so the
        # ignore pragmas are kept as-is.
        change = stream_record.dynamodb
        primary_key = change.keys['id']  # type: ignore
        image = change.new_image  # type: ignore
        target_index = table_from_arn(stream_record.event_source_arn)  # type: ignore
        print(target_index, primary_key, stream_record.event_name, image)

        # Draft (disabled): sort each image into a bucket by event type.
        # NOTE(review): DynamoDBRecordEventName, update_docs, remove_docs and
        # add_docs are not defined anywhere in the visible source.
        # match stream_record.event_name:
        #     case DynamoDBRecordEventName.MODIFY:
        #         update_docs.append(image)
        #     case DynamoDBRecordEventName.REMOVE:
        #         remove_docs.append(image)
        #     case _:
        #         add_docs.append(image)

    # Draft (disabled): push the collected documents to the 'users' index.
    # NOTE(review): JSONEncoder is not imported in the visible source.
    # print(add_docs)
    # meili_client.index('users').add_documents(add_docs, serializer=JSONEncoder)


def table_from_arn(arn: str) -> str:
    """Return the first ``/``-separated segment of the parsed ARN's resource.

    Args:
        arn: ARN string handed to ``arnparse``.

    Returns:
        Everything in the parsed ``resource`` before its first ``/`` (the
        whole resource when it contains no ``/``). Presumably this is the
        DynamoDB table name -- confirm against ``arnparse``'s handling of
        stream ARNs.
    """
    resource = arnparse(arn).resource
    leading_segment, _, _ = resource.partition('/')
    return leading_segment