add missing files

This commit is contained in:
2025-07-02 21:59:03 -03:00
parent 3bea74c420
commit b7499f58a3
16 changed files with 1755 additions and 1 deletions

11
streams-events/Makefile Normal file
View File

@@ -0,0 +1,11 @@
build:
sam build --use-container
deploy: build
sam deploy --debug
pytest:
uv run pytest
htmlcov: pytest
uv run python -m http.server 80 -d htmlcov

View File

@@ -0,0 +1,4 @@
import os
MEILISEARCH_HOST: str = os.getenv('MEILISEARCH_HOST') # type: ignore
MEILISEARCH_API_KEY: str = os.getenv('MEILISEARCH_API_KEY') # type: ignore

View File

View File

@@ -0,0 +1,34 @@
from arnparse import arnparse
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import (
DynamoDBStreamEvent,
event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from config import MEILISEARCH_API_KEY, MEILISEARCH_HOST
from meili import Op
from meilisearch import Client as Meilisearch
logger = Logger(__name__)
meili_client = Meilisearch(MEILISEARCH_HOST, MEILISEARCH_API_KEY)
@event_source(data_class=DynamoDBStreamEvent)
@logger.inject_lambda_context
def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
with Op(meili_client) as op:
for record in event.records:
pk = record.dynamodb.keys['id'] # type: ignore
new_image = record.dynamodb.new_image # type: ignore
index = table_from_arn(record.event_source_arn) # type: ignore
op.append(
index,
op=record.event_name, # type: ignore
data=new_image or pk,
)
def table_from_arn(arn: str) -> str:
arn_ = arnparse(arn)
return arn_.resource.split('/')[0]

View File

@@ -0,0 +1,62 @@
from typing import Self
from aws_lambda_powertools.shared.json_encoder import Encoder
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
DynamoDBRecordEventName,
)
from meilisearch import Client
class Op:
def __init__(self, client: Client) -> None:
self.op = {}
self.client = client
def __enter__(self) -> Self:
return self
def __exit__(self, *exc_details) -> None:
# When we exit, we need to keep flushing whatever's left
# until there's nothing left in our items buffer.
while self.op:
self._flush()
def _flush(self):
op = self.op
client = self.client
for index_, ops in op.items():
index = client.index(index_)
for op, doc in ops.items():
match op:
case DynamoDBRecordEventName.INSERT:
index.add_documents(doc, serializer=JSONEncoder)
case DynamoDBRecordEventName.MODIFY:
index.update_documents(doc, serializer=JSONEncoder)
case DynamoDBRecordEventName.REMOVE:
index.delete_documents(doc)
self.op = {}
def append(
self,
index: str,
/,
op: DynamoDBRecordEventName,
data: dict | str,
) -> bool:
if index not in self.op:
self.op[index] = {}
if op not in self.op[index]:
self.op[index][op] = []
return self.op[index][op].append(data)
class JSONEncoder(Encoder):
def default(self, obj):
if isinstance(obj, set):
return list(obj)
return super(__class__, self).default(obj)

View File

@@ -0,0 +1,24 @@
[project]
name = "streams"
version = "0.1.0"
description = "Streaming DynamoDB events to Meilisearch and EventBridge."
readme = ""
requires-python = ">=3.12"
dependencies = ["layercake"]
[dependency-groups]
dev = [
"pytest>=8.3.4",
"pytest-cov>=6.0.0",
"ruff>=0.9.1",
]
[tool.pytest.ini_options]
pythonpath = ["app/"]
addopts = "--cov --cov-report html -v"
[tool.ruff.format]
quote-style = "single"
[tool.uv.sources]
layercake = { path = "../layercake" }

View File

@@ -0,0 +1,3 @@
{
"extraPaths": ["app/"]
}

View File

@@ -0,0 +1,9 @@
version = 0.1
[default.deploy.parameters]
stack_name = "saladeaula-streams-events"
resolve_s3 = true
s3_prefix = "streams-events"
region = "sa-east-1"
confirm_changeset = false
capabilities = "CAPABILITY_IAM"
image_repositories = []

View File

@@ -0,0 +1,76 @@
AWSTemplateFormatVersion: 2010-09-09
Transform: AWS::Serverless-2016-10-31
Globals:
Function:
CodeUri: app/
Runtime: python3.13
Architectures:
- x86_64
Layers:
- !Sub arn:aws:lambda:sa-east-1:336641857101:layer:layercake:79
Environment:
Variables:
LOG_LEVEL: DEBUG
TZ: America/Sao_Paulo
POWERTOOLS_LOGGER_SAMPLE_RATE: 0.1
POWERTOOLS_LOGGER_LOG_EVENT: true
MEILISEARCH_HOST: https://meili.eduseg.com.br
MEILISEARCH_API_KEY: "{{resolve:ssm:/saladeaula/meili_api_key}}"
Resources:
MeilisearchLog:
Type: AWS::Logs::LogGroup
Properties:
RetentionInDays: 90
EventIndexDocsIntoMeiliFunction:
Type: AWS::Serverless::Function
Properties:
Handler: events.index_docs_into_meili.lambda_handler
LoggingConfig:
LogGroup: !Ref MeilisearchLog
Events:
Enrollments:
Type: DynamoDB
Properties:
Stream: !Sub arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/betaeducacao-prod-enrollments/stream/2023-08-22T22:56:55.612
StartingPosition: LATEST
MaximumRetryAttempts: 5
BatchSize: 25
FilterCriteria:
Filters:
- Pattern: '{ "dynamodb" : { "Keys" : { "sk" : { "S" : [ "0" ] } } } }'
Users:
Type: DynamoDB
Properties:
Stream: !Sub arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/betaeducacao-prod-users_d2o3r5gmm4it7j/stream/2022-06-12T21:33:25.634
StartingPosition: LATEST
MaximumRetryAttempts: 5
BatchSize: 25
FilterCriteria:
Filters:
- Pattern: '{ "dynamodb" : { "Keys" : { "sk" : { "S" : [ "0" ] } } } }'
Orders:
Type: DynamoDB
Properties:
Stream: !Sub arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/betaeducacao-prod-orders/stream/2023-09-15T18:58:50.395
StartingPosition: LATEST
MaximumRetryAttempts: 5
BatchSize: 25
FilterCriteria:
Filters:
- Pattern: '{ "dynamodb" : { "Keys" : { "sk" : { "S" : [ "0" ] } } } }'
Courses:
Type: DynamoDB
Properties:
Stream: !Sub arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/saladeaula_courses/stream/2025-03-12T20:42:46.706
StartingPosition: LATEST
MaximumRetryAttempts: 5
BatchSize: 25
FilterCriteria:
Filters:
- Pattern: '{ "dynamodb" : { "Keys" : { "sk" : { "S" : [ "0" ] } } } }'

View File

View File

@@ -0,0 +1,34 @@
import json
import os
from dataclasses import dataclass
import pytest
# https://docs.pytest.org/en/7.1.x/reference/reference.html#pytest.hookspec.pytest_configure
def pytest_configure():
os.environ['TZ'] = 'America/Sao_Paulo'
os.environ['MEILISEARCH_HOST'] = 'http://127.0.0.1:7700'
def load_jsonfile(path: str) -> dict:
with open(path) as fp:
return json.load(fp)
@dataclass
class LambdaContext:
function_name: str = 'test'
memory_limit_in_mb: int = 128
invoked_function_arn: str = 'arn:aws:lambda:eu-west-1:809313241:function:test'
aws_request_id: str = '52fdfc07-2182-154f-163f-5f0f9a621d72'
@pytest.fixture
def lambda_context() -> LambdaContext:
return LambdaContext()
@pytest.fixture
def dynamodb_stream_event():
return load_jsonfile('tests/samples/dynamodb_stream_event.json')

View File

@@ -0,0 +1,212 @@
{
"Records": [
{
"eventID": "c4ca4238a0b923820dcc509a6f75849b",
"eventName": "INSERT",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "us-east-1",
"dynamodb": {
"Keys": {
"id": {
"S": "102"
}
},
"NewImage": {
"message": {
"S": "New item!!"
},
"id": {
"S": "102"
},
"cpf": {
"NULL": true
},
"tenant:org_id": {
"SS": ["5OxmMjL-ujoR5IMGegQz"]
}
},
"ApproximateCreationDateTime": 1428537600,
"SequenceNumber": "4421584500000000017450439091",
"SizeBytes": 26,
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/example_table_with_stream/stream/2015-06-27T00:48:05.899"
},
{
"eventID": "c4ca4238a0b923820dcc509a6f75849b",
"eventName": "INSERT",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "us-east-1",
"dynamodb": {
"Keys": {
"id": {
"S": "102"
}
},
"NewImage": {
"message": {
"S": "New item!"
},
"id": {
"S": "101"
},
"cpf": {
"NULL": true
}
},
"ApproximateCreationDateTime": 1428537600,
"SequenceNumber": "4421584500000000017450439091",
"SizeBytes": 26,
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/example_table_with_stream/stream/2015-06-27T00:48:05.899"
},
{
"eventID": "c81e728d9d4c2f636f067f89cc14862c",
"eventName": "MODIFY",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "us-east-1",
"dynamodb": {
"Keys": {
"id": {
"S": "101"
}
},
"NewImage": {
"message": {
"S": "This item has changed"
},
"id": {
"S": "101"
},
"assignee": {
"M": {
"name": {
"S": "Sérgio R Siqueira"
}
}
},
"cpf": {
"S": "07879819908"
}
},
"OldImage": {
"message": {
"S": "New item!"
},
"id": {
"S": "101"
},
"assignee": {
"M": {
"name": {
"S": "Sérgio R Siqueira"
}
}
}
},
"ApproximateCreationDateTime": 1428537600,
"SequenceNumber": "4421584500000000017450439092",
"SizeBytes": 59,
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/example_table_with_stream/stream/2015-06-27T00:48:05.899"
},
{
"eventID": "eccbc87e4b5ce2fe28308fd9f2a7baf3",
"eventName": "REMOVE",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "us-east-1",
"dynamodb": {
"Keys": {
"id": {
"S": "101"
}
},
"OldImage": {
"message": {
"S": "This item has changed"
},
"id": {
"S": "101"
},
"ttl": {
"N": "1710532240"
}
},
"ApproximateCreationDateTime": 1428537600,
"SequenceNumber": "4421584500000000017450439093",
"SizeBytes": 38,
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/example_table_with_stream/stream/2015-06-27T00:48:05.899"
},
{
"eventID": "eccbc87e4b5ce2fe28308fd9f2a7baf3",
"eventName": "REMOVE",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "us-east-1",
"dynamodb": {
"Keys": {
"id": {
"S": "102"
}
},
"OldImage": {
"message": {
"S": "This item has changed"
},
"id": {
"S": "102"
},
"ttl": {
"N": "2530997445"
}
},
"ApproximateCreationDateTime": 1428537600,
"SequenceNumber": "4421584500000000017450439093",
"SizeBytes": 38,
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/example_table_with_stream/stream/2015-06-27T00:48:05.899"
},
{
"eventID": "bbb152116867ab05f3abfcadd4873bee",
"eventName": "REMOVE",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "sa-east-1",
"dynamodb": {
"ApproximateCreationDateTime": 1710529909,
"Keys": {
"sk": {
"S": "0"
},
"id": {
"S": "DwHRXCm5bE64rcu5VA6ai6"
}
},
"OldImage": {
"sk": {
"S": "0"
},
"id": {
"S": "DwHRXCm5bE64rcu5VA6ai6"
},
"createDate": {
"S": "2024-03-15T15:44:30.374640-03:00"
}
},
"SequenceNumber": "3173521300000000009361288070",
"SizeBytes": 156,
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"eventSourceARN": "arn:aws:dynamodb:sa-east-1:336641857101:table/betaeducacao-prod-users_d2o3r5gmm4it7j/stream/2022-06-12T21:33:25.634"
}
]
}

View File

@@ -0,0 +1,5 @@
import app.events.index_docs_into_meili as app
def test_record_handler(monkeypatch, dynamodb_stream_event, lambda_context):
app.lambda_handler(dynamodb_stream_event, lambda_context)

1274
streams-events/uv.lock generated Normal file

File diff suppressed because it is too large Load Diff