fix
This commit is contained in:
74
streams-events/app/events/docs_into_eventbus.py
Normal file
74
streams-events/app/events/docs_into_eventbus.py
Normal file
@@ -0,0 +1,74 @@
|
||||
import json
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import boto3
|
||||
from aws_lambda_powertools import Logger, Tracer
|
||||
from aws_lambda_powertools.shared.json_encoder import Encoder
|
||||
from aws_lambda_powertools.utilities.batch import (
|
||||
BatchProcessor,
|
||||
EventType,
|
||||
process_partial_response,
|
||||
)
|
||||
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
|
||||
DynamoDBRecord,
|
||||
DynamoDBRecordEventName,
|
||||
)
|
||||
from aws_lambda_powertools.utilities.typing import LambdaContext
|
||||
from layercake.dateutils import now, ttl
|
||||
from utils import diff, table_from_arn
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from mypy_boto3_events.client import EventBridgeClient
|
||||
else:
|
||||
EventBridgeClient = object
|
||||
|
||||
client: EventBridgeClient = boto3.client('events')
|
||||
processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
|
||||
tracer = Tracer()
|
||||
logger = Logger()
|
||||
|
||||
|
||||
@tracer.capture_method
|
||||
def record_handler(record: DynamoDBRecord):
|
||||
detail_type = record.raw_event['eventName']
|
||||
table_name: str = table_from_arn(record.event_source_arn) # type: ignore
|
||||
new_image: dict = record.dynamodb.new_image # type: ignore
|
||||
old_image: dict = record.dynamodb.old_image # type: ignore
|
||||
record_ttl: int = old_image.get('ttl') # type: ignore
|
||||
modified = diff(new_image, old_image)
|
||||
now_ = now()
|
||||
|
||||
# Should be EXPIRE if event is REMOVE and TTL has elapsed
|
||||
if record.event_name is DynamoDBRecordEventName.REMOVE and ttl() >= record_ttl:
|
||||
detail_type = 'EXPIRE'
|
||||
|
||||
detail = {
|
||||
'keys': record.dynamodb.keys, # type: ignore
|
||||
'new_image': new_image,
|
||||
'old_image': old_image,
|
||||
'modified': modified,
|
||||
}
|
||||
|
||||
result = client.put_events(
|
||||
Entries=[
|
||||
{
|
||||
'Source': record.event_source, # type: ignore
|
||||
'DetailType': detail_type,
|
||||
'Resources': [table_name],
|
||||
'Detail': json.dumps(detail, cls=Encoder),
|
||||
'Time': now_,
|
||||
}
|
||||
]
|
||||
)
|
||||
logger.info('Event result', result=result)
|
||||
|
||||
|
||||
@logger.inject_lambda_context
|
||||
@tracer.capture_lambda_handler
|
||||
def lambda_handler(event: dict, context: LambdaContext):
|
||||
return process_partial_response(
|
||||
event=event,
|
||||
record_handler=record_handler,
|
||||
processor=processor,
|
||||
context=context,
|
||||
)
|
||||
@@ -1,5 +1,4 @@
|
||||
from arnparse import arnparse
|
||||
from aws_lambda_powertools import Logger
|
||||
from aws_lambda_powertools import Logger, Tracer
|
||||
from aws_lambda_powertools.utilities.data_classes import (
|
||||
DynamoDBStreamEvent,
|
||||
event_source,
|
||||
@@ -8,13 +7,16 @@ from aws_lambda_powertools.utilities.typing import LambdaContext
|
||||
from config import MEILISEARCH_API_KEY, MEILISEARCH_HOST
|
||||
from meili import Op
|
||||
from meilisearch import Client as Meilisearch
|
||||
from utils import table_from_arn
|
||||
|
||||
logger = Logger(__name__)
|
||||
tracer = Tracer()
|
||||
meili_client = Meilisearch(MEILISEARCH_HOST, MEILISEARCH_API_KEY)
|
||||
|
||||
|
||||
@event_source(data_class=DynamoDBStreamEvent)
|
||||
@logger.inject_lambda_context
|
||||
@tracer.capture_lambda_handler
|
||||
def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
|
||||
with Op(meili_client) as op:
|
||||
for record in event.records:
|
||||
@@ -29,12 +31,7 @@ def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
|
||||
)
|
||||
|
||||
|
||||
def table_from_arn(arn: str) -> str:
|
||||
arn_ = arnparse(arn)
|
||||
return arn_.resource.split('/')[0]
|
||||
|
||||
|
||||
# Post-migration: remove the following lines
|
||||
# Post-migration: remove the following function
|
||||
def sanitize(obj):
|
||||
if isinstance(obj, dict):
|
||||
return {k.replace(':', '__'): sanitize(v) for k, v in obj.items()}
|
||||
|
||||
Reference in New Issue
Block a user