Files
saladeaula.digital/streams-events/app/events/index_docs_into_meili.py
2025-07-11 21:31:02 -03:00

45 lines
1.4 KiB
Python

from arnparse import arnparse
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import (
DynamoDBStreamEvent,
event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from config import MEILISEARCH_API_KEY, MEILISEARCH_HOST
from meili import Op
from meilisearch import Client as Meilisearch
# Module-level logger, named after this module.
logger = Logger(__name__)
# Meilisearch client built once at import time from the host and API key
# defined in config; lambda_handler hands this same instance to Op.
meili_client = Meilisearch(MEILISEARCH_HOST, MEILISEARCH_API_KEY)
@event_source(data_class=DynamoDBStreamEvent)
@logger.inject_lambda_context
def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
    """Queue one Meilisearch operation per DynamoDB stream record.

    Every record in the event is appended to a single ``Op`` context
    opened on the module-level Meilisearch client. For each record the
    following is passed to ``op.append``:

    * the index name, derived from the record's event source ARN via
      ``table_from_arn``;
    * ``op``: the record's event name;
    * ``data``: the sanitized new image, or the value of the ``id`` key
      when that image is empty/falsy.

    NOTE(review): the new image is presumably empty only for removals,
    and ``Op`` presumably sends the queued operations when the ``with``
    block exits -- confirm against the ``meili`` module.

    Args:
        event: Stream event wrapped by the ``event_source`` decorator.
        context: Lambda context object; not read by this function body.
    """
    with Op(meili_client) as op:
        for rec in event.records:
            stream_data = rec.dynamodb
            doc_id = stream_data.keys['id']  # type: ignore
            document = sanitize(stream_data.new_image)  # type: ignore
            index_name = table_from_arn(rec.event_source_arn)  # type: ignore
            # An empty image leaves only the key to identify the document.
            payload = document if document else doc_id
            op.append(index_name, op=rec.event_name, data=payload)  # type: ignore
def table_from_arn(arn: str) -> str:
    """Return the first '/'-separated segment of the ARN's resource part.

    The ARN is parsed with ``arnparse`` and everything in its
    ``resource`` attribute up to (not including) the first ``/`` is
    returned; if there is no ``/`` the whole resource is returned.

    NOTE(review): for a DynamoDB stream ARN this is presumably the table
    name -- confirm against how arnparse splits resource type from
    resource.

    Args:
        arn: ARN string to parse.

    Returns:
        The leading segment of the parsed resource.
    """
    resource = arnparse(arn).resource
    head, _, _ = resource.partition('/')
    return head
# Post-migration: remove the following lines
def sanitize(obj):
    """Recursively replace ':' with '__' in every dict key of *obj*.

    Dicts are rebuilt with rewritten keys and sanitized values, lists
    are rebuilt with sanitized items, and any other value (scalars,
    tuples, sets, ``None``...) is returned unchanged. The input itself
    is never mutated.

    Args:
        obj: Arbitrary value, typically a nested dict/list structure.

    Returns:
        A sanitized copy for dicts and lists; otherwise *obj* itself.
    """
    if isinstance(obj, list):
        return [sanitize(element) for element in obj]

    if not isinstance(obj, dict):
        # Anything that is not a dict or list passes through untouched.
        return obj

    cleaned = {}
    for key, value in obj.items():
        cleaned[key.replace(':', '__')] = sanitize(value)
    return cleaned