"""Lambda handler that submits an enrollment certificate file for signature.

The handler reads the ``new_image`` record from an EventBridge event,
downloads the file referenced by ``new_image['cert']['s3_uri']`` from S3,
base64-encodes it and hands it to ``create_submission_from_pdf`` together
with a personalised e-mail subject and body.
"""

import base64
from urllib.parse import urlparse

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import (
    EventBridgeEvent,
    event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from layercake.dynamodb import DynamoDBPersistenceLayer
from layercake.strutils import first_word

from boto3clients import dynamodb_client, s3_client
from config import ENROLLMENT_TABLE
from docseal import create_submission_from_pdf

logger = Logger(__name__)
# Not referenced by the code visible in this module; kept as a module-level
# name in case other code relies on it.
dyn = DynamoDBPersistenceLayer(ENROLLMENT_TABLE, dynamodb_client)

# Both templates are rendered with ``str.format(first_name=...)``.
SUBJECT = '{first_name}, assine seu certificado agora!'
# ``str.format`` collapses ``{{`` / ``}}`` into single braces, so the
# placeholder that must reach the signing service as ``{{submitter.link}}``
# is written with four braces on each side. Previously it was emitted as
# ``{submitter.link}``, which is not the double-brace form DocuSeal documents
# for message variables.
# NOTE(review): whitespace inside this literal is reproduced exactly as found;
# confirm whether line breaks were intended between the sentences.
MESSAGE = """ {first_name}, Seu certificado já está pronto e aguardando apenas a sua assinatura digital. [👉 Assinar agora.]({{{{submitter.link}}}}) """


@event_source(data_class=EventBridgeEvent)
@logger.inject_lambda_context
def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool:
    """Create a signature submission for the certificate in the event.

    Args:
        event: EventBridge event whose ``detail['new_image']`` carries the
            record ``id``, the ``user`` (``id``, ``name``, ``email``) and the
            certificate location under ``cert['s3_uri']``.
        context: Lambda runtime context (unused by the body itself).

    Returns:
        Always ``True`` once the submission call has returned.

    Lookup errors for missing payload keys, as well as any error raised by
    S3 or by ``create_submission_from_pdf``, are not caught here.
    """
    new_image = event.detail['new_image']
    user = new_image['user']
    # Presumably the first whitespace-delimited word of the full name;
    # see ``layercake.strutils.first_word``.
    first_name = first_word(user['name'])

    file_bytes = _get_file_bytes(new_image['cert']['s3_uri'])
    # The file is passed on as base64 text rather than raw bytes.
    file_base64 = base64.b64encode(file_bytes)

    create_submission_from_pdf(
        filename=new_image['id'],
        file=file_base64.decode('utf-8'),
        email_message={
            'subject': SUBJECT.format(first_name=first_name),
            'body': MESSAGE.format(first_name=first_name),
        },
        submitters=[
            {
                'role': 'First Party',
                'name': user['name'],
                'email': user['email'],
                'external_id': user['id'],
            },
        ],
    )

    return True


def _get_file_bytes(s3_uri: str) -> bytes:
    """Download the object referenced by *s3_uri* and return its content.

    The URI is split with ``urlparse``: the network location is used as the
    bucket and the path, without its leading slash, as the object key
    (i.e. the ``s3://bucket/key`` shape).
    """
    parsed = urlparse(s3_uri)
    bucket = parsed.netloc
    key = parsed.path.lstrip('/')

    r = s3_client.get_object(Bucket=bucket, Key=key)

    return r['Body'].read()