from typing import Self from aws_lambda_powertools.shared.json_encoder import Encoder from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import ( DynamoDBRecordEventName, ) from meilisearch import Client class JSONEncoder(Encoder): def default(self, obj): if isinstance(obj, set): return list(obj) return super().default(obj) class Op: def __init__(self, client: Client) -> None: self.op = {} self.client = client def __enter__(self) -> Self: return self def __exit__(self, *exc_details) -> None: # When we exit, we need to keep flushing whatever's left # until there's nothing left in our items buffer. while self.op: self._flush() def _flush(self): op = self.op client = self.client for index_, ops in op.items(): index = client.index(index_) for op, doc in ops.items(): match op: case DynamoDBRecordEventName.INSERT: index.add_documents(doc, serializer=JSONEncoder) case DynamoDBRecordEventName.MODIFY: index.update_documents(doc, serializer=JSONEncoder) case DynamoDBRecordEventName.REMOVE: index.delete_documents(doc) self.op = {} def append( self, index: str, /, op: DynamoDBRecordEventName, data: dict | str, ) -> bool: if index not in self.op: self.op[index] = {} if op not in self.op[index]: self.op[index][op] = [] return self.op[index][op].append(data)