76 lines
1.7 KiB
Python
76 lines
1.7 KiB
Python
from typing import TYPE_CHECKING, Optional, TypedDict

from aws_lambda_powertools import Logger
from layercake.dateutils import now
from layercake.dynamodb import DynamoDBPersistenceLayer
from layercake.email_ import Message
from layercake.strutils import first_word, truncate_str
|
|
|
|
# Module-level logger; the module name is passed as its first argument.
logger = Logger(__name__)

if TYPE_CHECKING:
    # Only needed by static type checkers; never imported at runtime.
    from mypy_boto3_sesv2 import SESV2Client
else:
    # Runtime stand-in so annotations naming SESV2Client still resolve.
    SESV2Client = object


# Shape of the event consumed by send_email: both keys are strings.
class Event(TypedDict):
    id: str
    sk: str
|
|
|
|
|
|
def send_email(
    to: tuple[str, str],
    subject: str,
    message: str,
    context: Optional[dict] = None,
    *,
    sender: tuple[str, str],
    event: Event,
    sesv2_client: SESV2Client,
    dynamodb_persistence_layer: DynamoDBPersistenceLayer,
) -> bool:
    """Render an email from templates, send it and record the outcome.

    The subject and body are ``str.format`` templates. The message is sent
    through ``sesv2_client.send_email`` as raw content, and a marker item is
    then written through ``dynamodb_persistence_layer`` describing whether
    the attempt succeeded.

    Args:
        to: Two-item tuple for the recipient. The first item is used as the
            recipient's name (presumably ``(name, email address)`` --
            confirm against ``Message``).
        subject: Template formatted with ``course`` (the course value passed
            through ``truncate_str``).
        message: Template formatted with ``first_name`` (the first word of
            the recipient's name) and ``course``.
        context: Mapping that must contain the key ``'course'``. ``None``
            (the default) is treated as an empty mapping.
        sender: Two-item tuple passed to ``Message`` as ``from_``.
        event: Source event; its ``id`` and ``sk`` are used to key the
            marker item.
        sesv2_client: Client used to send the message.
        dynamodb_persistence_layer: Layer used to store the marker item.

    Returns:
        ``True`` when both the send and the ``#EXECUTED`` write succeeded.
        ``False`` when either of them raised; in that case the exception is
        logged and a ``#FAILED`` marker is written instead.

    Raises:
        KeyError: If ``context`` has no ``'course'`` key. Templates are
            rendered before the guarded section, so this is not caught.
        Exception: Whatever the ``#FAILED`` write itself raises, since that
            call is not guarded.
    """
    # Use a fresh dict per call rather than a shared mutable default.
    context = {} if context is None else context

    # Captured once so either marker carries the same timestamp.
    now_ = now()
    name, _ = to

    emailmsg = Message(
        from_=sender,
        to=to,
        subject=subject.format(
            course=truncate_str(context['course']),
        ),
    )
    emailmsg.add_alternative(
        message.format(
            first_name=first_word(name),
            course=context['course'],
        )
    )

    # NOTE(review): the try covers both the send and the '#EXECUTED' write,
    # so a failed write after a successful send is recorded as '#FAILED'.
    # Confirm this is intended before narrowing the guarded section.
    try:
        sesv2_client.send_email(
            Content={
                'Raw': {
                    'Data': emailmsg.as_bytes(),
                },
            }
        )
        dynamodb_persistence_layer.put_item(
            item={
                'id': event['id'],
                'sk': f'{event["sk"]}#EXECUTED',
                'created_at': now_,
            }
        )
        logger.info('Email sent')
    except Exception as exc:
        logger.exception(exc)

        dynamodb_persistence_layer.put_item(
            item={
                'id': event['id'],
                'sk': f'{event["sk"]}#FAILED',
                'created_at': now_,
            }
        )

        return False

    return True
|