Files
saladeaula.digital/streams-events/app/meili.py
2025-08-15 00:06:05 -03:00

64 lines
1.6 KiB
Python

from typing import Self
from aws_lambda_powertools.shared.json_encoder import Encoder
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
DynamoDBRecordEventName,
)
from meilisearch import Client
class JSONEncoder(Encoder):
    """JSON encoder that additionally knows how to serialize ``set`` values."""

    def default(self, obj):
        """Return a JSON-friendly stand-in for ``obj``.

        A ``set`` is converted to a ``list`` of its elements; any other
        object is delegated to the parent encoder's ``default``.
        """
        return list(obj) if isinstance(obj, set) else super().default(obj)
class Op:
def __init__(self, client: Client) -> None:
self.op = {}
self.client = client
def __enter__(self) -> Self:
return self
def __exit__(self, *exc_details) -> None:
# When we exit, we need to keep flushing whatever's left
# until there's nothing left in our items buffer.
while self.op:
self._flush()
def _flush(self):
op = self.op
client = self.client
for index_, ops in op.items():
index = client.index(index_)
for op, doc in ops.items():
match op:
case (
DynamoDBRecordEventName.INSERT | DynamoDBRecordEventName.MODIFY
):
index.add_documents(doc, serializer=JSONEncoder)
case DynamoDBRecordEventName.REMOVE:
index.delete_documents(doc)
self.op = {}
def append(
self,
index: str,
/,
op: DynamoDBRecordEventName,
data: dict | str,
) -> bool:
if index not in self.op:
self.op[index] = {}
if op not in self.op[index]:
self.op[index][op] = []
return self.op[index][op].append(data)