add layercake

This commit is contained in:
2025-03-20 17:46:25 -03:00
parent bc8b9805e3
commit a41f90f3a9
15 changed files with 1497 additions and 0 deletions

View File

@@ -0,0 +1,2 @@
def hello() -> str:
return "Hello from layercake!"

View File

@@ -0,0 +1,49 @@
import os
from datetime import datetime, timedelta
import pytz
TZ = os.getenv('TZ', 'UTC')
def now(tz: str = TZ) -> datetime:
"""Returns the current datetime in the specified timezone.
If no timezone is provided, returns the current datetime in the local timezone.
"""
return datetime.now(pytz.timezone(tz))
def ttl(
*,
start_dt: datetime | None = None,
tz: str = TZ,
**kwargs,
) -> int:
"""Calculates the timestamp for a datetime in the future.
If start_dt is not provided, it uses the current datetime.
Additional keyword arguments specify the timedelta to add.
"""
if not start_dt:
start_dt = now(tz)
dt = start_dt + timedelta(**kwargs)
return int(dt.timestamp())
def timestamp(dt: datetime) -> int:
"""Converts a datetime object to a Unix timestamp."""
return int(dt.timestamp())
def fromisoformat(dt: str) -> datetime | None:
"""Parses a datetime string in ISO 8601 format and returns a datetime object.
Returns None if the string is not in a valid ISO 8601 format.
"""
try:
date = datetime.fromisoformat(dt)
except ValueError:
return None
else:
return date

View File

@@ -0,0 +1,400 @@
from datetime import datetime
from ipaddress import IPv4Address
from typing import Any
from aws_lambda_powertools import Logger
from boto3.dynamodb.types import TypeDeserializer, TypeSerializer
from botocore.exceptions import ClientError
logger = Logger(__name__)
def _serialize(v):
if isinstance(v, datetime):
return v.isoformat()
if isinstance(v, IPv4Address):
return str(v)
if isinstance(v, (list, tuple)):
return [_serialize(x) for x in v]
if isinstance(v, dict):
return {k: _serialize(dv) for k, dv in v.items()}
return v
def serialize(obj: dict) -> dict:
return {k: TypeSerializer().serialize(_serialize(v)) for k, v in obj.items()}
def deserialize(obj: dict) -> dict:
return {k: TypeDeserializer().deserialize(v) for k, v in obj.items()}
def Key(pk: str, sk: str) -> dict[str, str]:
return {
'id': pk,
'sk': sk,
}
class TransactItems:
"""
Documentation:
--------------
- https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html
- https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactGetItems.html
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.transact_write_items
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.transact_get_items
"""
def __init__(self, table_name: str) -> None:
self.table_name = table_name
self.items: list[dict] = []
def put(
self,
*,
item: dict,
table_name: str | None = None,
cond_expr: str | None = None,
) -> None:
attrs: dict = {}
if cond_expr:
attrs['ConditionExpression'] = cond_expr
if not table_name:
table_name = self.table_name
self.items.append(
dict(
Put=dict(
TableName=table_name,
Item=serialize(item),
**attrs,
)
)
)
def update(
self,
*,
key: dict,
update_expr: str,
cond_expr: str | None = None,
table_name: str | None = None,
expr_attr_names: dict = {},
expr_attr_values: dict = {},
) -> None:
attrs: dict = {}
if cond_expr:
attrs['ConditionExpression'] = cond_expr
if expr_attr_names:
attrs['ExpressionAttributeNames'] = expr_attr_names
if expr_attr_values:
attrs['ExpressionAttributeValues'] = serialize(expr_attr_values)
if not table_name:
table_name = self.table_name
self.items.append(
dict(
Update=dict(
TableName=table_name,
Key=serialize(key),
UpdateExpression=update_expr,
**attrs,
)
)
)
def get(
self,
*,
table_name: str | None = None,
key: dict,
expr_attr_names: str | None = None,
) -> None:
attrs: dict = {}
if expr_attr_names:
attrs['ExpressionAttributeNames'] = expr_attr_names
if not table_name:
table_name = self.table_name
self.items.append(
dict(
Get=dict(
TableName=table_name,
Key=serialize(key),
**attrs,
)
)
)
def delete(
self,
*,
key: dict,
table_name: str | None = None,
cond_expr: str | None = None,
expr_attr_names: dict = {},
expr_attr_values: dict = {},
) -> None:
attrs: dict = {}
if cond_expr:
attrs['ConditionExpression'] = cond_expr
if expr_attr_names:
attrs['ExpressionAttributeNames'] = expr_attr_names
if expr_attr_values:
attrs['ExpressionAttributeValues'] = serialize(expr_attr_values)
if not table_name:
table_name = self.table_name
self.items.append(
dict(
Delete=dict(
TableName=table_name,
Key=serialize(key),
**attrs,
)
)
)
def condition(
self,
*,
key: dict,
cond_expr: str,
table_name: str | None = None,
expr_attr_names: dict = {},
expr_attr_values: dict = {},
) -> None:
attrs: dict = {'ConditionExpression': cond_expr}
if expr_attr_names:
attrs['ExpressionAttributeNames'] = expr_attr_names
if expr_attr_values:
attrs['ExpressionAttributeValues'] = serialize(expr_attr_values)
if not table_name:
table_name = self.table_name
self.items.append(
dict(
ConditionCheck=dict(
TableName=table_name,
Key=serialize(key),
**attrs,
)
)
)
class DynamoDBPersistenceLayer:
"""
Documentation:
--------------
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.query
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.get_item
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.put_item
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.update_item
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.delete_item
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.transact_get_items
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.transact_write_items
"""
def __init__(self, table_name: str, dynamodb_client) -> None:
self.table_name = table_name
self.dynamodb_client = dynamodb_client
def query(
self,
*,
key_cond_expr: str,
expr_attr_name: dict = {},
expr_attr_values: dict = {},
start_key: dict = {},
filter_expr: str | None = None,
limit: int | None = None,
index_forward: bool = True,
) -> dict[str, Any]:
"""You must provide the name of the partition key attribute and a single value for that attribute.
Query returns all items with that partition key value.
Optionally, you can provide a sort key attribute and use a comparison operator to refine the search results.
...
A `Query` operation always returns a result set. If no matching items are found, the result set will be empty.
Queries that do not return results consume the minimum number of read capacity units for that type of read operation.
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/query.html
"""
attrs: dict = {'ScanIndexForward': index_forward}
if expr_attr_name:
attrs['ExpressionAttributeNames'] = expr_attr_name
if expr_attr_values:
attrs['ExpressionAttributeValues'] = serialize(expr_attr_values)
if start_key:
attrs['ExclusiveStartKey'] = start_key
if filter_expr:
attrs['FilterExpression'] = filter_expr
if limit:
attrs['Limit'] = limit
try:
response = self.dynamodb_client.query(
TableName=self.table_name,
KeyConditionExpression=key_cond_expr,
**attrs,
)
except ClientError as err:
logger.exception(err)
raise
else:
return dict(
items=[deserialize(v) for v in response.get('Items', [])],
last_key=response.get('LastEvaluatedKey', None),
)
def get_item(self, key: dict) -> dict:
"""The GetItem operation returns a set of attributes for the item with the given primary key.
If there is no matching item, GetItem does not return any data and there will be no Item element in the response.
"""
try:
response = self.dynamodb_client.get_item(
TableName=self.table_name,
Key=serialize(key),
)
except ClientError as err:
logger.exception(err)
raise
else:
return deserialize(response.get('Item', {}))
def put_item(self, item: dict, *, cond_expr: str | None = None) -> bool:
attrs = {}
if cond_expr:
attrs['ConditionExpression'] = cond_expr
try:
self.dynamodb_client.put_item(
TableName=self.table_name,
Item=serialize(item),
**attrs,
)
except ClientError as err:
logger.exception(err)
raise
else:
return True
def update_item(
self,
key: dict,
*,
update_expr: str,
cond_expr: str | None = None,
expr_attr_names: dict | None = None,
expr_attr_values: dict | None = None,
) -> bool:
attrs: dict = {}
if cond_expr:
attrs['ConditionExpression'] = cond_expr
if expr_attr_names:
attrs['ExpressionAttributeNames'] = expr_attr_names
if expr_attr_values:
attrs['ExpressionAttributeValues'] = serialize(expr_attr_values)
try:
self.dynamodb_client.update_item(
TableName=self.table_name,
Key=serialize(key),
UpdateExpression=update_expr,
**attrs,
)
except ClientError as err:
logger.exception(err)
raise
else:
return True
def delete_item(
self,
key: dict,
*,
cond_expr: str | None = None,
expr_attr_names: dict | None = None,
expr_attr_values: dict | None = None,
) -> bool:
"""Deletes a single item in a table by primary key. You can perform a conditional delete operation that deletes
the item if it exists, or if it has an expected attribute value.
"""
attrs: dict = {}
if cond_expr:
attrs['ConditionExpression'] = cond_expr
if expr_attr_names:
attrs['ExpressionAttributeNames'] = expr_attr_names
if expr_attr_values:
attrs['ExpressionAttributeValues'] = serialize(expr_attr_values)
try:
self.dynamodb_client.delete_item(
TableName=self.table_name, Key=serialize(key), **attrs
)
except ClientError as err:
logger.exception(err)
raise
else:
return True
def transact_get_items(self, transact_items: TransactItems) -> list[dict]:
try:
response = self.dynamodb_client.transact_get_items(
TransactItems=transact_items.items
)
except ClientError as err:
logger.exception(err)
raise
else:
return [
deserialize(response.get('Item', {}))
for response in response.get('Responses', [])
]
def transact_write_items(self, transact_items: TransactItems) -> bool:
try:
self.dynamodb_client.transact_write_items(
TransactItems=transact_items.items
)
except ClientError as err:
logger.exception(err)
raise
else:
return True

View File

@@ -0,0 +1,158 @@
import re
from datetime import date
from typing import TYPE_CHECKING, Annotated, Any
import ftfy
from pycpfcnpj import cpfcnpj
from pydantic import BaseModel, Field, GetCoreSchemaHandler
from pydantic_core import CoreSchema, core_schema
from pydantic_extra_types.payment import PaymentCardNumber
if TYPE_CHECKING:
NameStr = Annotated[str, ...]
else:
class NameStr(str):
"""
>>> class User(BaseModel):
... name: NameStr
>>> User(name='Sérgio R Siqueira ').name
'Sérgio R Siqueira'
>>> User(name=' Sérgio R Siqueira ').name
'Sérgio R Siqueira'
>>> User(name='Siqueira')
Traceback (most recent call last):
...
"""
@classmethod
def __get_pydantic_core_schema__(
cls, _source: type[Any], _handler: GetCoreSchemaHandler
) -> CoreSchema:
return core_schema.no_info_after_validator_function(
cls._validate, core_schema.str_schema()
)
@classmethod
def _validate(cls, __input_value: str) -> str:
name = ftfy.fix_text(__input_value.strip())
if ' ' not in name:
raise ValueError('Invalid name.')
return name
class PaymentCardValidation:
"""
>>> class CreditCard(BaseModel):
... exp: PaymentCardValidation
>>> CreditCard(exp='20/23')
Traceback (most recent call last):
...
pydantic_core._pydantic_core.ValidationError: 1 validation error for CreditCard
exp
Value error, month must be in 1..12 [type=value_error, input_value='20/23', input_type=str]
For further information visit https://errors.pydantic.dev/2.4/v/value_error
>>> CreditCard(exp='12/23')
CreditCard(exp=datetime.date(2023, 12, 1))
>>> CreditCard(exp='12/2024')
CreditCard(exp=datetime.date(2024, 12, 1))
>>> CreditCard(exp='2024-12-02')
CreditCard(exp=datetime.date(2024, 12, 2))
"""
@classmethod
def __get_pydantic_core_schema__(
cls, _source: type[Any], _handler: GetCoreSchemaHandler
) -> CoreSchema:
return core_schema.no_info_after_validator_function(
cls._validate, core_schema.str_schema()
)
@classmethod
def _validate(cls, __input_value: str) -> date:
if '/' in __input_value:
month, year = __input_value.split('/')
return date(int(f'20{year[-2:]}'), int(month), 1)
try:
return date.fromisoformat(__input_value)
except Exception:
raise ValueError('Invalid card expiration date.')
class CreditCard(BaseModel):
name: NameStr
number: PaymentCardNumber
cvv: str = Field(..., min_length=3)
exp: PaymentCardValidation
@property
def brand(self) -> str:
return self.number.brand
@property
def last4(self) -> str:
return self.number.last4
@property
def first_name(self) -> str:
first_name, _ = self.name.split(' ', 1)
return first_name
@property
def last_name(self) -> str:
_, last_name = self.name.split(' ', 1)
return last_name
class CpfCnpj:
@classmethod
def __get_pydantic_core_schema__(
cls, _source: type[Any], _handler: GetCoreSchemaHandler
) -> CoreSchema:
return core_schema.no_info_after_validator_function(
cls._validate, core_schema.str_schema()
)
@classmethod
def _validate(cls, __input_value: str) -> str | None:
if __input_value == '':
return None
cpfcnpj_ = re.sub(r'\D', '', __input_value)
if not cpfcnpj.validate(cpfcnpj_):
raise ValueError(f'Invalid {cls.__name__}.')
return cpfcnpj_
if TYPE_CHECKING:
CpfStr = Annotated[str, ...]
CnpjStr = Annotated[str, ...]
else:
class CpfStr(CpfCnpj):
...
class CnpjStr(CpfCnpj):
...
if __name__ == '__main__':
import doctest
doctest.testmod()

View File

@@ -0,0 +1,20 @@
from typing import Any
def pick(
keys: list[str] | tuple[str, ...],
dct: dict[str, Any],
exclude_none: bool = True,
default: Any = None,
) -> dict[str, Any]:
"""Returns a partial copy of an object containing only the keys specified."""
return {
k: dct.get(k, default) for k in keys if k in dct or not exclude_none
}
def omit(
keys: list[str] | tuple[str, ...], dct: dict[str, Any]
) -> dict[str, Any]:
"""Returns a partial copy of an object omitting the keys specified."""
return {k: dct[k] for k in dct.keys() if k not in keys}