import meilisearch
from arnparse import arnparse
from aws_lambda_powertools.utilities.data_classes import (
    DynamoDBStreamEvent,
    event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

from meili import Op
from settings import MEILISEARCH_API_KEY, MEILISEARCH_HOST

# Single client created at import time and reused by every invocation
# handled by this module.
meili_client = meilisearch.Client(MEILISEARCH_HOST, MEILISEARCH_API_KEY)

# Source table name (as extracted from the stream ARN) -> index name that
# is passed to ``Op.append``.
INDEXES = {
    'saladeaula_courses': 'courses',
    'betaeducacao-prod-enrollments': 'enrollments',
    'betaeducacao-prod-orders': 'orders',
    'betaeducacao-prod-users_d2o3r5gmm4it7j': 'users',
}


@event_source(data_class=DynamoDBStreamEvent)
def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
    """Queue one index operation per DynamoDB stream record.

    For every record in ``event`` the handler reads the ``id`` key and the
    new image, resolves the target index from the table named in the
    record's event source ARN, and appends an operation to an ``Op`` batch.
    The payload is the new image when it is truthy, otherwise the ``id``
    key alone.

    NOTE(review): ``Op`` is defined in the project's ``meili`` module; it
    presumably sends the queued operations when the ``with`` block exits --
    confirm against that module.

    Raises:
        KeyError: if a record has no ``id`` key, or its table name is not
            present in ``INDEXES``.
    """
    with Op(meili_client) as batch:
        for rec in event.records:
            stream = rec.dynamodb
            doc_id = stream.keys['id']  # type: ignore
            image = stream.new_image  # type: ignore
            source_table = table_from_arn(rec.event_source_arn)  # type: ignore
            target_index = INDEXES[source_table]  # type: ignore

            # Fall back to the bare key when there is no new image to send.
            payload = image if image else doc_id

            batch.append(
                target_index,
                op=rec.event_name,  # type: ignore
                data=payload,
            )


def table_from_arn(arn: str) -> str:
    """Return the leading ``/``-delimited segment of the ARN's resource.

    The ARN is parsed with ``arnparse`` and everything in its ``resource``
    attribute before the first ``/`` is returned (the whole resource when
    it contains no ``/``).

    NOTE(review): this assumes ``arnparse`` has already stripped the
    resource-type prefix so that the first segment is the table name --
    confirm against the arnparse documentation.
    """
    parsed = arnparse(arn)
    table_name, _, _ = parsed.resource.partition('/')
    return table_name