diff --git a/streams-events/app/events/index_docs_into_meili.py b/streams-events/app/events/index_docs_into_meili.py index 7f248b5..c136bbf 100644 --- a/streams-events/app/events/index_docs_into_meili.py +++ b/streams-events/app/events/index_docs_into_meili.py @@ -21,7 +21,7 @@ def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext): with Op(meili_client) as op: for record in event.records: pk = record.dynamodb.keys['id'] # type: ignore - new_image = sanitize(record.dynamodb.new_image) # type: ignore + new_image = record.dynamodb.new_image # type: ignore index = table_from_arn(record.event_source_arn) # type: ignore op.append( @@ -29,13 +29,3 @@ def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext): op=record.event_name, # type: ignore data=new_image or pk, ) - - -# Post-migration: remove the following function -def sanitize(obj): - if isinstance(obj, dict): - return {k.replace(':', '__'): sanitize(v) for k, v in obj.items()} - elif isinstance(obj, list): - return [sanitize(item) for item in obj] - else: - return obj diff --git a/streams-events/app/events/replicate_into_meili.py b/streams-events/app/events/replicate_into_meili.py new file mode 100644 index 0000000..239d888 --- /dev/null +++ b/streams-events/app/events/replicate_into_meili.py @@ -0,0 +1,33 @@ +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.data_classes import ( + DynamoDBStreamEvent, + event_source, +) +from aws_lambda_powertools.utilities.typing import LambdaContext +from config import MEILISEARCH_API_KEY, MEILISEARCH_HOST +from layercake.strutils import md5_hash +from meili import Op +from meilisearch import Client as Meilisearch +from utils import table_from_arn + +logger = Logger(__name__) +tracer = Tracer() +meili_client = Meilisearch(MEILISEARCH_HOST, MEILISEARCH_API_KEY) + + +@event_source(data_class=DynamoDBStreamEvent) +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext): + with Op(meili_client) as op: + for record in event.records: + new_image = record.dynamodb.new_image # type: ignore + index = table_from_arn(record.event_source_arn) # type: ignore + keys = record.dynamodb.keys # type: ignore + _id = md5_hash(str(keys)) + + op.append( + f'_{index}_replication', + op=record.event_name, # type: ignore + data=({'_id': _id} | new_image) or _id, + ) diff --git a/streams-events/pyproject.toml b/streams-events/pyproject.toml index 1fbf0e5..a3458df 100644 --- a/streams-events/pyproject.toml +++ b/streams-events/pyproject.toml @@ -4,7 +4,10 @@ version = "0.1.0" description = "Streaming DynamoDB events to Meilisearch and EventBridge." readme = "" requires-python = ">=3.12" -dependencies = ["layercake"] +dependencies = [ + "layercake", + "pycouchdb>=1.16.0", +] [dependency-groups] dev = [ diff --git a/streams-events/template.yaml b/streams-events/template.yaml index ae6246f..d14e769 100644 --- a/streams-events/template.yaml +++ b/streams-events/template.yaml @@ -118,3 +118,42 @@ Resources: StartingPosition: LATEST MaximumRetryAttempts: 5 BatchSize: 25 + + EventReplicateIntoMeiliFunction: + Type: AWS::Serverless::Function + Properties: + Handler: events.replicate_into_meili.lambda_handler + LoggingConfig: + LogGroup: !Ref EventBusLog + Policies: + - EventBridgePutEventsPolicy: + EventBusName: default + Events: + Users: + Type: DynamoDB + Properties: + Stream: !Sub arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/betaeducacao-prod-users_d2o3r5gmm4it7j/stream/2022-06-12T21:33:25.634 + StartingPosition: LATEST + MaximumRetryAttempts: 5 + BatchSize: 25 + Enrolllments: + Type: DynamoDB + Properties: + Stream: !Sub arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/betaeducacao-prod-enrollments/stream/2023-08-22T22:56:55.612 + StartingPosition: LATEST + MaximumRetryAttempts: 5 + BatchSize: 25 + Orders: + Type: DynamoDB + Properties: + Stream: !Sub arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/betaeducacao-prod-orders/stream/2023-09-15T18:58:50.395 + StartingPosition: LATEST + MaximumRetryAttempts: 5 + BatchSize: 25 + Courses: + Type: DynamoDB + Properties: + Stream: !Sub arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/saladeaula_courses/stream/2025-03-12T20:42:46.706 + StartingPosition: LATEST + MaximumRetryAttempts: 5 + BatchSize: 25 diff --git a/streams-events/tests/samples/dynamodb_stream_event.json b/streams-events/tests/samples/dynamodb_stream_event.json index b260493..a507dfb 100644 --- a/streams-events/tests/samples/dynamodb_stream_event.json +++ b/streams-events/tests/samples/dynamodb_stream_event.json @@ -10,15 +10,21 @@ "Keys": { "id": { "S": "102" + }, + "sk": { + "S": "0" } }, "NewImage": { "message": { - "S": "New item!!" + "S": "New item!!!" }, "id": { "S": "102" }, + "sk": { + "S": "0" + }, "cpf": { "NULL": true }, @@ -34,7 +40,7 @@ "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/example_table_with_stream/stream/2015-06-27T00:48:05.899" }, { - "eventID": "c4ca4238a0b923820dcc509a6f75849b", + "eventID": "c4ca4238a0b923820dcc509a6f75849c", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", @@ -42,7 +48,10 @@ "dynamodb": { "Keys": { "id": { - "S": "102" + "S": "101" + }, + "sk": { + "S": "0" } }, "NewImage": { @@ -52,6 +61,9 @@ "id": { "S": "101" }, + "sk": { + "S": "0" + }, "cpf": { "NULL": true } @@ -72,7 +84,10 @@ "dynamodb": { "Keys": { "id": { - "S": "101" + "S": "103" + }, + "sk": { + "S": "0" } }, "NewImage": { @@ -80,7 +95,10 @@ "S": "This item has changed" }, "id": { - "S": "101" + "S": "103" + }, + "sk": { + "S": "0" }, "assignee": { "M": { @@ -100,6 +118,9 @@ "id": { "S": "101" }, + "sk": { + "S": "0" + }, "assignee": { "M": { "name": { @@ -125,6 +146,9 @@ "Keys": { "id": { "S": "101" + }, + "sk": { + "S": "0" } }, "OldImage": { @@ -134,6 +158,9 @@ "id": { "S": "101" }, + "sk": { + "S": "0" + }, "ttl": { "N": "1710532240" } @@ -155,6 +182,9 @@ "Keys": { "id": { "S": "102" + }, + "sk": { + "S": "0" } }, "OldImage": { @@ -184,20 +214,20 @@ "dynamodb": { "ApproximateCreationDateTime": 1710529909, "Keys": { - "sk": { - "S": "0" - }, "id": { "S": "DwHRXCm5bE64rcu5VA6ai6" + }, + "sk": { + "S": "0" } }, "OldImage": { - "sk": { - "S": "0" - }, "id": { "S": "DwHRXCm5bE64rcu5VA6ai6" }, + "sk": { + "S": "0" + }, "createDate": { "S": "2024-03-15T15:44:30.374640-03:00" } diff --git a/streams-events/tests/test_docs_into_eventbus.py b/streams-events/tests/test_docs_into_eventbus.py index 5b3c4f0..457452c 100644 --- a/streams-events/tests/test_docs_into_eventbus.py +++ b/streams-events/tests/test_docs_into_eventbus.py @@ -1,5 +1,5 @@ import app.events.docs_into_eventbus as app -def test_record_handler(monkeypatch, dynamodb_stream_event, lambda_context): +def test_record_handler(dynamodb_stream_event, lambda_context): app.lambda_handler(dynamodb_stream_event, lambda_context) diff --git a/streams-events/tests/test_index_docs_into_meili.py b/streams-events/tests/test_index_docs_into_meili.py index 5ec7574..b6214d4 100644 --- a/streams-events/tests/test_index_docs_into_meili.py +++ b/streams-events/tests/test_index_docs_into_meili.py @@ -1,5 +1,5 @@ import app.events.index_docs_into_meili as app -def test_record_handler(monkeypatch, dynamodb_stream_event, lambda_context): +def test_record_handler(dynamodb_stream_event, lambda_context): app.lambda_handler(dynamodb_stream_event, lambda_context) diff --git a/streams-events/tests/test_replicate_into_meili.py b/streams-events/tests/test_replicate_into_meili.py new file mode 100644 index 0000000..9aede9f --- /dev/null +++ b/streams-events/tests/test_replicate_into_meili.py @@ -0,0 +1,5 @@ +import app.events.replicate_into_meili as app + + +def test_record_handler(dynamodb_stream_event, lambda_context): + app.lambda_handler(dynamodb_stream_event, lambda_context) diff --git a/streams-events/uv.lock b/streams-events/uv.lock index e04b650..2d1278c 100644 --- a/streams-events/uv.lock +++ b/streams-events/uv.lock @@ -215,6 +215,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/7c/fc/6a8cb64e5f0324877d503c854da15d76c1e50eb722e320b15345c4d0c6de/cffi-1.17.1-cp313-cp313-win_amd64.whl", hash = "sha256:f6a16c31041f09ead72d69f583767292f750d24913dadacf5756b966aacb3f1a", size = 182009, upload-time = "2024-09-04T20:44:45.309Z" }, ] +[[package]] +name = "chardet" +version = "5.2.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/f3/0d/f7b6ab21ec75897ed80c17d79b15951a719226b9fababf1e40ea74d69079/chardet-5.2.0.tar.gz", hash = "sha256:1b3b6ff479a8c414bc3fa2c0852995695c4a026dcd6d0633b2dd092ca39c1cf7", size = 2069618, upload-time = "2023-08-01T19:23:02.662Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/38/6f/f5fbc992a329ee4e0f288c1fe0e2ad9485ed064cac731ed2fe47dcc38cbf/chardet-5.2.0-py3-none-any.whl", hash = "sha256:e1cf59446890a00105fe7b7912492ea04b6e6f06d4b742b2c788469e34c82970", size = 199385, upload-time = "2023-08-01T19:23:00.661Z" }, +] + [[package]] name = "charset-normalizer" version = "3.4.1" @@ -625,6 +634,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a3/58/35da89ee790598a0700ea49b2a66594140f44dec458c07e8e3d4979137fc/ply-3.11-py2.py3-none-any.whl", hash = "sha256:096f9b8350b65ebd2fd1346b12452efe5b9607f7482813ffca50c22722a807ce", size = 49567, upload-time = "2018-02-15T19:01:27.172Z" }, ] +[[package]] +name = "pycouchdb" +version = "1.16.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "chardet" }, + { name = "requests" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c1/b4/4f699a686a2ce14ab31cb17902693f2cf201ba51c3a6fb7aba210725c154/pycouchdb-1.16.0.tar.gz", hash = "sha256:309d71c3ce3f98bbee5731db00f514753438b81e6e7adefbb8c134312200a4f9", size = 11351, upload-time = "2024-05-29T10:00:11.726Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/4c/63/b4397a7215c089c7951afb258069cc58a06788224f1bb6a0d4f976f2d476/pycouchdb-1.16.0-py3-none-any.whl", hash = "sha256:e26ce58f626fcabbe2f5b15b3ad2b89cdd3f6d666da673632037476d1191ab67", size = 12560, upload-time = "2024-05-29T10:00:09.31Z" }, +] + [[package]] name = "pycparser" version = "2.22" @@ -908,6 +930,7 @@ version = "0.1.0" source = { virtual = "." } dependencies = [ { name = "layercake" }, + { name = "pycouchdb" }, ] [package.dev-dependencies] @@ -919,7 +942,10 @@ dev = [ ] [package.metadata] -requires-dist = [{ name = "layercake", directory = "../layercake" }] +requires-dist = [ + { name = "layercake", directory = "../layercake" }, + { name = "pycouchdb", specifier = ">=1.16.0" }, +] [package.metadata.requires-dev] dev = [