add docseal

This commit is contained in:
2025-11-03 18:10:17 -03:00
parent d6c26df63b
commit eca3ac42dc
11 changed files with 205 additions and 8 deletions

View File

@@ -0,0 +1,64 @@
import base64
from urllib.parse import urlparse
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import (
EventBridgeEvent,
event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from layercake.dynamodb import DynamoDBPersistenceLayer
from layercake.strutils import first_word
from boto3clients import dynamodb_client, s3_client
from config import ENROLLMENT_TABLE
from docseal import create_submission_from_pdf
logger = Logger(__name__)
dyn = DynamoDBPersistenceLayer(ENROLLMENT_TABLE, dynamodb_client)
SUBJECT = '{first_name}, assine seu certificado agora!'
MESSAGE = """
{first_name},
Seu certificado já está pronto e aguardando apenas a sua assinatura digital.
[👉 Assinar agora.]({{submitter.link}})
"""
@event_source(data_class=EventBridgeEvent)
@logger.inject_lambda_context
def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool:
new_image = event.detail['new_image']
user = new_image['user']
first_name = first_word(user['name'])
file_bytes = _get_file_bytes(new_image['cert']['s3_uri'])
file_base64 = base64.b64encode(file_bytes)
create_submission_from_pdf(
filename=new_image['id'],
file=file_base64.decode('utf-8'),
email_message={
'subject': SUBJECT.format(first_name=first_name),
'body': MESSAGE.format(first_name=first_name),
},
submitters=[
{
'role': 'Aluno',
'name': user['name'],
'email': user['email'],
},
],
)
return True
def _get_file_bytes(s3_uri: str) -> bytes:
parsed = urlparse(s3_uri)
bucket = parsed.netloc
key = parsed.path.lstrip('/')
r = s3_client.get_object(Bucket=bucket, Key=key)
return r['Body'].read()

View File

@@ -17,7 +17,6 @@ from config import (
BUCKET_NAME,
COURSE_TABLE,
ENROLLMENT_TABLE,
ESIGN_URI,
PAPERFORGE_API,
)
@@ -57,7 +56,7 @@ def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool:
if cert.get('exp_interval', 0) > 0
else None
)
s3_uri = _gen_cert(
s3_uri = _generate_cert(
enrollment_id,
cert=cert,
user=new_image['user'],
@@ -112,7 +111,7 @@ User = TypedDict('User', {'name': str, 'cpf': str})
Cert = TypedDict('Cert', {'s3_uri': NotRequired[str]})
def _gen_cert(
def _generate_cert(
id: str,
*,
score: int | float,
@@ -135,7 +134,6 @@ def _gen_cert(
data=json.dumps(
{
'template_uri': cert['s3_uri'],
'sign_uri': ESIGN_URI,
'args': {
'name': user['name'],
'cpf': _cpffmt(user['cpf']),
@@ -150,7 +148,7 @@ def _gen_cert(
},
},
),
timeout=5,
timeout=6,
)
r.raise_for_status()