Files
saladeaula.digital/streams-events/app/meili.py
2025-07-14 12:24:22 -03:00

56 lines
1.5 KiB
Python

from typing import Self
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
DynamoDBRecordEventName,
)
from meilisearch import Client
from utils import JSONEncoder
class Op:
def __init__(self, client: Client) -> None:
self.op = {}
self.client = client
def __enter__(self) -> Self:
return self
def __exit__(self, *exc_details) -> None:
# When we exit, we need to keep flushing whatever's left
# until there's nothing left in our items buffer.
while self.op:
self._flush()
def _flush(self):
op = self.op
client = self.client
for index_, ops in op.items():
index = client.index(index_)
for op, doc in ops.items():
match op:
case DynamoDBRecordEventName.INSERT:
index.add_documents(doc, serializer=JSONEncoder)
case DynamoDBRecordEventName.MODIFY:
index.update_documents(doc, serializer=JSONEncoder)
case DynamoDBRecordEventName.REMOVE:
index.delete_documents(doc)
self.op = {}
def append(
self,
index: str,
/,
op: DynamoDBRecordEventName,
data: dict | str,
) -> bool:
if index not in self.op:
self.op[index] = {}
if op not in self.op[index]:
self.op[index][op] = []
return self.op[index][op].append(data)