Files
saladeaula.digital/user-management/app/events/email_receiving.py

46 lines
1.2 KiB
Python

import urllib.parse as urllib_parse
from email.utils import parseaddr
from typing import Any, Iterator
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import SESEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext
# Module-level logger; lambda_handler's inject_lambda_context decorator hangs off it.
logger = Logger(__name__)
@logger.inject_lambda_context
@event_source(data_class=SESEvent)
def lambda_handler(event: SESEvent, context: LambdaContext) -> dict:
    """Decide whether SES should keep processing a received email.

    The sender address is extracted from the message's ``From`` header and
    compared against the single allowed address.

    Args:
        event: The SES receiving event, wrapped in the ``SESEvent`` data class.
        context: The Lambda runtime context; not read directly in the body.

    Returns:
        ``{'disposition': 'CONTINUE'}`` when the sender is the allowed
        address, otherwise ``{'disposition': 'STOP_RULE_SET'}``.

    Raises:
        ValueError: If the message has no ``From`` header.
    """
    mail = event.record.ses.mail

    # parseaddr splits 'Display Name <user@host>' into (name, address);
    # only the address part matters for the allow-list check.
    _, email_from = parseaddr(get_header_value(mail.headers, 'from'))

    # Compare case-insensitively so 'Sergio@SomosBeta.com.br' is treated the
    # same as the lowercase form of the address.
    if email_from.lower() == 'sergio@somosbeta.com.br':
        return {'disposition': 'CONTINUE'}

    return {'disposition': 'STOP_RULE_SET'}
def get_header_value(
    headers: Iterator,
    header_name: str,
    *,
    default: Any = None,
    raise_on_missing: bool = True,
) -> str:
    """Return the value of the first header whose name matches *header_name*.

    The match is case-insensitive on both sides, so ``'From'``, ``'from'``
    and ``'FROM'`` all find the same header.

    Args:
        headers: Header objects exposing ``name`` and ``value`` attributes.
        header_name: Name of the header to look up.
        default: Value returned when the header is absent and
            ``raise_on_missing`` is false.
        raise_on_missing: When true (the default), a missing header raises
            instead of returning ``default``.

    Returns:
        The matching header's value, or ``default`` when nothing matched and
        ``raise_on_missing`` is false.

    Raises:
        ValueError: If no header matched and ``raise_on_missing`` is true.
    """
    # Normalise once, outside the loop, so callers may pass any casing.
    wanted = header_name.lower()

    for header in headers:
        if header.name.lower() == wanted:
            return header.value

    if raise_on_missing:
        raise ValueError(f'{header_name} not found.')

    return default