70 lines
1.8 KiB
Python
70 lines
1.8 KiB
Python
import json
|
|
from decimal import Decimal
|
|
from typing import Self
|
|
|
|
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
|
|
DynamoDBRecordEventName,
|
|
)
|
|
from meilisearch import Client
|
|
|
|
|
|
class Op:
|
|
def __init__(self, client: Client) -> None:
|
|
self.op = {}
|
|
self.client = client
|
|
|
|
def __enter__(self) -> Self:
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
op = self.op
|
|
client = self.client
|
|
|
|
for index_, ops in op.items():
|
|
index = client.index(index_)
|
|
|
|
for op, doc in ops.items():
|
|
match op:
|
|
case DynamoDBRecordEventName.INSERT:
|
|
index.add_documents(doc, serializer=JSONEncoder)
|
|
case DynamoDBRecordEventName.MODIFY:
|
|
index.update_documents(doc, serializer=JSONEncoder)
|
|
case DynamoDBRecordEventName.REMOVE:
|
|
index.delete_documents(doc)
|
|
|
|
self.op = {}
|
|
|
|
def append(
|
|
self,
|
|
index: str,
|
|
/,
|
|
op: DynamoDBRecordEventName,
|
|
data: dict | str,
|
|
) -> bool:
|
|
if index not in self.op:
|
|
self.op[index] = {}
|
|
|
|
if op not in self.op[index]:
|
|
self.op[index][op] = []
|
|
|
|
return self.op[index][op].append(data)
|
|
|
|
|
|
class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that turns ``Decimal`` values into native numbers."""

    def default(self, o):
        """Convert a ``Decimal`` to ``int`` or ``float``; defer anything else.

        Whole-valued decimals become ``int``. Decimals with a fractional part
        are quantized to two decimal places (using the active decimal
        context's rounding) and returned as ``float``.
        """
        if not isinstance(o, Decimal):
            # Not ours to handle: let the next encoder in the MRO decide.
            return super().default(o)

        has_fraction = o % 1 != 0
        if has_fraction:
            two_places = Decimal('0.00')
            return float(o.quantize(two_places))

        return int(o)
|
|
|
|
|
|
class SetEncoder(json.JSONEncoder):
    """JSON encoder that serializes ``set`` values as lists."""

    def default(self, o):
        """Return the members of a ``set`` as a list; defer anything else."""
        if not isinstance(o, set):
            # Not ours to handle: let the next encoder in the MRO decide.
            return super().default(o)

        return [*o]
|
|
|
|
|
|
class JSONEncoder(SetEncoder, DecimalEncoder):
    """Encoder combining ``SetEncoder`` and ``DecimalEncoder`` via the MRO."""
|