replication to meili

This commit is contained in:
2025-08-24 21:23:45 -03:00
parent f004be8a4a
commit 1326530991
9 changed files with 152 additions and 26 deletions

View File

@@ -21,7 +21,7 @@ def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
with Op(meili_client) as op:
for record in event.records:
pk = record.dynamodb.keys['id'] # type: ignore
new_image = sanitize(record.dynamodb.new_image) # type: ignore
new_image = record.dynamodb.new_image # type: ignore
index = table_from_arn(record.event_source_arn) # type: ignore
op.append(
@@ -29,13 +29,3 @@ def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
op=record.event_name, # type: ignore
data=new_image or pk,
)
# Post-migration: remove the following function
def sanitize(obj):
if isinstance(obj, dict):
return {k.replace(':', '__'): sanitize(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [sanitize(item) for item in obj]
else:
return obj

View File

@@ -0,0 +1,33 @@
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.data_classes import (
DynamoDBStreamEvent,
event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from config import MEILISEARCH_API_KEY, MEILISEARCH_HOST
from layercake.strutils import md5_hash
from meili import Op
from meilisearch import Client as Meilisearch
from utils import table_from_arn
logger = Logger(__name__)
tracer = Tracer()
meili_client = Meilisearch(MEILISEARCH_HOST, MEILISEARCH_API_KEY)
@event_source(data_class=DynamoDBStreamEvent)
@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
with Op(meili_client) as op:
for record in event.records:
new_image = record.dynamodb.new_image # type: ignore
index = table_from_arn(record.event_source_arn) # type: ignore
keys = record.dynamodb.keys # type: ignore
_id = md5_hash(str(keys))
op.append(
f'_{index}_replication',
op=record.event_name, # type: ignore
data=({'_id': _id} | new_image) or _id,
)

View File

@@ -4,7 +4,10 @@ version = "0.1.0"
description = "Streaming DynamoDB events to Meilisearch and EventBridge."
readme = ""
requires-python = ">=3.12"
dependencies = ["layercake"]
dependencies = [
"layercake",
"pycouchdb>=1.16.0",
]
[dependency-groups]
dev = [

View File

@@ -118,3 +118,42 @@ Resources:
StartingPosition: LATEST
MaximumRetryAttempts: 5
BatchSize: 25
EventReplicateIntoMeiliFunction:
Type: AWS::Serverless::Function
Properties:
Handler: events.replicate_into_meili.lambda_handler
LoggingConfig:
LogGroup: !Ref EventBusLog
Policies:
- EventBridgePutEventsPolicy:
EventBusName: default
Events:
Users:
Type: DynamoDB
Properties:
Stream: !Sub arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/betaeducacao-prod-users_d2o3r5gmm4it7j/stream/2022-06-12T21:33:25.634
StartingPosition: LATEST
MaximumRetryAttempts: 5
BatchSize: 25
Enrolllments:
Type: DynamoDB
Properties:
Stream: !Sub arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/betaeducacao-prod-enrollments/stream/2023-08-22T22:56:55.612
StartingPosition: LATEST
MaximumRetryAttempts: 5
BatchSize: 25
Orders:
Type: DynamoDB
Properties:
Stream: !Sub arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/betaeducacao-prod-orders/stream/2023-09-15T18:58:50.395
StartingPosition: LATEST
MaximumRetryAttempts: 5
BatchSize: 25
Courses:
Type: DynamoDB
Properties:
Stream: !Sub arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/saladeaula_courses/stream/2025-03-12T20:42:46.706
StartingPosition: LATEST
MaximumRetryAttempts: 5
BatchSize: 25

View File

@@ -10,15 +10,21 @@
"Keys": {
"id": {
"S": "102"
},
"sk": {
"S": "0"
}
},
"NewImage": {
"message": {
"S": "New item!!"
"S": "New item!!!"
},
"id": {
"S": "102"
},
"sk": {
"S": "0"
},
"cpf": {
"NULL": true
},
@@ -34,7 +40,7 @@
"eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/example_table_with_stream/stream/2015-06-27T00:48:05.899"
},
{
"eventID": "c4ca4238a0b923820dcc509a6f75849b",
"eventID": "c4ca4238a0b923820dcc509a6f75849c",
"eventName": "INSERT",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
@@ -42,7 +48,10 @@
"dynamodb": {
"Keys": {
"id": {
"S": "102"
"S": "101"
},
"sk": {
"S": "0"
}
},
"NewImage": {
@@ -52,6 +61,9 @@
"id": {
"S": "101"
},
"sk": {
"S": "0"
},
"cpf": {
"NULL": true
}
@@ -72,7 +84,10 @@
"dynamodb": {
"Keys": {
"id": {
"S": "101"
"S": "103"
},
"sk": {
"S": "0"
}
},
"NewImage": {
@@ -80,7 +95,10 @@
"S": "This item has changed"
},
"id": {
"S": "101"
"S": "103"
},
"sk": {
"S": "0"
},
"assignee": {
"M": {
@@ -100,6 +118,9 @@
"id": {
"S": "101"
},
"sk": {
"S": "0"
},
"assignee": {
"M": {
"name": {
@@ -125,6 +146,9 @@
"Keys": {
"id": {
"S": "101"
},
"sk": {
"S": "0"
}
},
"OldImage": {
@@ -134,6 +158,9 @@
"id": {
"S": "101"
},
"sk": {
"S": "0"
},
"ttl": {
"N": "1710532240"
}
@@ -155,6 +182,9 @@
"Keys": {
"id": {
"S": "102"
},
"sk": {
"S": "0"
}
},
"OldImage": {
@@ -184,20 +214,20 @@
"dynamodb": {
"ApproximateCreationDateTime": 1710529909,
"Keys": {
"sk": {
"S": "0"
},
"id": {
"S": "DwHRXCm5bE64rcu5VA6ai6"
},
"sk": {
"S": "0"
}
},
"OldImage": {
"sk": {
"S": "0"
},
"id": {
"S": "DwHRXCm5bE64rcu5VA6ai6"
},
"sk": {
"S": "0"
},
"createDate": {
"S": "2024-03-15T15:44:30.374640-03:00"
}

View File

@@ -1,5 +1,5 @@
import app.events.docs_into_eventbus as app
def test_record_handler(monkeypatch, dynamodb_stream_event, lambda_context):
def test_record_handler(dynamodb_stream_event, lambda_context):
app.lambda_handler(dynamodb_stream_event, lambda_context)

View File

@@ -1,5 +1,5 @@
import app.events.index_docs_into_meili as app
def test_record_handler(monkeypatch, dynamodb_stream_event, lambda_context):
def test_record_handler(dynamodb_stream_event, lambda_context):
app.lambda_handler(dynamodb_stream_event, lambda_context)

View File

@@ -0,0 +1,5 @@
import app.events.replicate_into_meili as app
def test_record_handler(dynamodb_stream_event, lambda_context):
app.lambda_handler(dynamodb_stream_event, lambda_context)

28
streams-events/uv.lock generated
View File

@@ -215,6 +215,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/7c/fc/6a8cb64e5f0324877d503c854da15d76c1e50eb722e320b15345c4d0c6de/cffi-1.17.1-cp313-cp313-win_amd64.whl", hash = "sha256:f6a16c31041f09ead72d69f583767292f750d24913dadacf5756b966aacb3f1a", size = 182009, upload-time = "2024-09-04T20:44:45.309Z" },
]
[[package]]
name = "chardet"
version = "5.2.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f3/0d/f7b6ab21ec75897ed80c17d79b15951a719226b9fababf1e40ea74d69079/chardet-5.2.0.tar.gz", hash = "sha256:1b3b6ff479a8c414bc3fa2c0852995695c4a026dcd6d0633b2dd092ca39c1cf7", size = 2069618, upload-time = "2023-08-01T19:23:02.662Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/38/6f/f5fbc992a329ee4e0f288c1fe0e2ad9485ed064cac731ed2fe47dcc38cbf/chardet-5.2.0-py3-none-any.whl", hash = "sha256:e1cf59446890a00105fe7b7912492ea04b6e6f06d4b742b2c788469e34c82970", size = 199385, upload-time = "2023-08-01T19:23:00.661Z" },
]
[[package]]
name = "charset-normalizer"
version = "3.4.1"
@@ -625,6 +634,19 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/a3/58/35da89ee790598a0700ea49b2a66594140f44dec458c07e8e3d4979137fc/ply-3.11-py2.py3-none-any.whl", hash = "sha256:096f9b8350b65ebd2fd1346b12452efe5b9607f7482813ffca50c22722a807ce", size = 49567, upload-time = "2018-02-15T19:01:27.172Z" },
]
[[package]]
name = "pycouchdb"
version = "1.16.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "chardet" },
{ name = "requests" },
]
sdist = { url = "https://files.pythonhosted.org/packages/c1/b4/4f699a686a2ce14ab31cb17902693f2cf201ba51c3a6fb7aba210725c154/pycouchdb-1.16.0.tar.gz", hash = "sha256:309d71c3ce3f98bbee5731db00f514753438b81e6e7adefbb8c134312200a4f9", size = 11351, upload-time = "2024-05-29T10:00:11.726Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/4c/63/b4397a7215c089c7951afb258069cc58a06788224f1bb6a0d4f976f2d476/pycouchdb-1.16.0-py3-none-any.whl", hash = "sha256:e26ce58f626fcabbe2f5b15b3ad2b89cdd3f6d666da673632037476d1191ab67", size = 12560, upload-time = "2024-05-29T10:00:09.31Z" },
]
[[package]]
name = "pycparser"
version = "2.22"
@@ -908,6 +930,7 @@ version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "layercake" },
{ name = "pycouchdb" },
]
[package.dev-dependencies]
@@ -919,7 +942,10 @@ dev = [
]
[package.metadata]
requires-dist = [{ name = "layercake", directory = "../layercake" }]
requires-dist = [
{ name = "layercake", directory = "../layercake" },
{ name = "pycouchdb", specifier = ">=1.16.0" },
]
[package.metadata.requires-dev]
dev = [