add dynamodb collection

This commit is contained in:
2025-03-21 09:20:57 -03:00
parent 76277d17b9
commit a58e782ddb
6 changed files with 201 additions and 50 deletions

View File

@@ -1,6 +1,6 @@
from datetime import datetime
from ipaddress import IPv4Address
from typing import Any
from typing import Any, Type
from aws_lambda_powertools import Logger
from boto3.dynamodb.types import TypeDeserializer, TypeSerializer
@@ -9,53 +9,66 @@ from botocore.exceptions import ClientError
logger = Logger(__name__)
def _serialize_python_types(obj: Any) -> str | dict | list:
match obj:
def _serialize_python_type(value: Any) -> str | dict | list:
match value:
case datetime():
return obj.isoformat()
return value.isoformat()
case IPv4Address():
return str(obj)
return str(value)
case list() | tuple():
return [_serialize_python_types(v) for v in obj]
return [_serialize_python_type(v) for v in value]
case dict():
return {k: _serialize_python_types(v) for k, v in obj.items()}
return {k: _serialize_python_type(v) for k, v in value.items()}
case _:
return obj
return value
def serialize(obj: dict) -> dict:
def serialize(value: dict) -> dict:
serializer = TypeSerializer()
return {k: serializer.serialize(_serialize_python_types(v)) for k, v in obj.items()}
return {
k: serializer.serialize(_serialize_python_type(v)) for k, v in value.items()
}
def deserialize(obj: dict) -> dict:
def deserialize(value: dict) -> dict:
deserializer = TypeDeserializer()
return {k: deserializer.deserialize(v) for k, v in obj.items()}
return {k: deserializer.deserialize(v) for k, v in value.items()}
def Key(
val: str | tuple[str, ...],
keyparts: str | tuple[str, ...],
*,
prefix: str | None = None,
delimiter: str = '#',
) -> str:
if not prefix and not isinstance(val, tuple):
return val
"""Creates a composite key by joining string parts with a specified delimiter.
If a prefix is provided, it is added at the beginning of the key parts.
if isinstance(val, str):
val = (val,)
Example
-------
>>> Key(('abc', 'xyz'), prefix='examples', delimiter='#')
'examples#abc#xyz'
"""
if not prefix and not isinstance(keyparts, tuple):
return keyparts
if isinstance(keyparts, str):
keyparts = (keyparts,)
if prefix:
val = (prefix,) + val
keyparts = (prefix,) + keyparts
return delimiter.join(val)
return delimiter.join(keyparts)
def KeyPair(pk: str, sk: str) -> dict[str, str]:
return {
'id': pk,
'sk': sk,
}
class KeyPair(dict):
def __init__(self, pk: str, sk: str) -> None:
super().__init__(id=pk, sk=sk)
def __repr__(self) -> str:
pk, sk = self.values()
return f'KeyPair({pk!r}, {sk!r})'
class TransactItems:
@@ -416,3 +429,42 @@ class DynamoDBPersistenceLayer:
raise
else:
return True
class DynamoDBCollection:
class MissingError(ValueError):
pass
def __init__(
self,
persistence_layer: DynamoDBPersistenceLayer,
exception_cls: Type[ValueError] = MissingError,
) -> None:
if not issubclass(exception_cls, ValueError):
raise TypeError(
f'exception_cls must be a subclass of ValueError, got {exception_cls}'
)
self.persistence_layer = persistence_layer
self.exception_cls = exception_cls
def get_item(
self,
key: KeyPair,
path_spec: str | None = None,
raise_if_missing: bool = True,
default: Any = None,
delimiter: str = '#',
) -> Any:
exc_cls = self.exception_cls
data = self.persistence_layer.get_item(key)
if raise_if_missing and not data:
raise exc_cls(f'Item with {key} not found.')
if path_spec and data:
from glom import glom
return glom(data, path_spec, default=default)
return data or default

View File

@@ -0,0 +1,18 @@
import json
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Generator
@contextmanager
def readlines(path: Path | str) -> Generator[Any, None, None]:
"""Return the lines from a JSON."""
if isinstance(path, str):
path = Path(path)
if not path.exists():
yield iter(())
return None
with open(path) as fp:
yield (json.loads(line) for line in fp)