add partition key to env var

This commit is contained in:
2025-03-21 12:14:00 -03:00
parent 1e874bf106
commit b22644a165
7 changed files with 170 additions and 27 deletions

View File

@@ -1,7 +1,8 @@
import os
from abc import ABC, abstractmethod
from datetime import datetime
from ipaddress import IPv4Address
from typing import Any, Type
from typing import Any, Type, TypedDict
from aws_lambda_powertools import Logger
from boto3.dynamodb.types import TypeDeserializer, TypeSerializer
@@ -10,6 +11,11 @@ from botocore.exceptions import ClientError
from .dateutils import now, timestamp
TZ = os.getenv('TZ', 'UTC')
DELIMITER = os.getenv('DELIMITER', '#')
LIMIT = int(os.getenv('LIMIT', 25))
PARTITION_KEY = os.getenv('PARTITION_KEY', 'pk')
SORT_KEY = os.getenv('SORT_KEY', 'sk')
logger = Logger(__name__)
@@ -40,11 +46,11 @@ def deserialize(value: dict) -> dict:
return {k: deserializer.deserialize(v) for k, v in value.items()}
def Key(
def ComposeKey(
keyparts: str | tuple[str, ...],
*,
prefix: str | None = None,
delimiter: str = '#',
delimiter: str = DELIMITER,
) -> str:
"""Creates a composite key by joining string parts with a specified delimiter.
If a prefix is provided, it is added at the beginning of the key parts.
@@ -67,14 +73,49 @@ def Key(
return delimiter.join(keyparts)
class KeyPair(dict):
class PrimaryKey(ABC, dict):
@abstractmethod
def expr_attr_name(self) -> dict: ...
@abstractmethod
def expr_attr_values(self) -> dict: ...
class PartitionKey(PrimaryKey):
"""Represents a partition key for DynamoDB queries"""
def __init__(self, pk: str) -> None:
super().__init__(**{PARTITION_KEY: pk})
def expr_attr_name(self) -> dict:
return {'#pk': PARTITION_KEY}
def expr_attr_values(self) -> dict:
return {':pk': self[PARTITION_KEY]}
class KeyPair(PrimaryKey):
"""Represents a composite key (partition key and sort key) for DynamoDB queries"""
def __init__(self, pk: str, sk: str) -> None:
super().__init__(id=pk, sk=sk)
super().__init__(**{PARTITION_KEY: pk, SORT_KEY: sk})
def __repr__(self) -> str:
pk, sk = self.values()
return f'KeyPair({pk!r}, {sk!r})'
def expr_attr_name(self) -> dict:
return {
'#pk': PARTITION_KEY,
'#sk': SORT_KEY,
}
def expr_attr_values(self) -> dict:
return {
':pk': self[PARTITION_KEY],
':sk': self[SORT_KEY],
}
class TransactItems:
"""
@@ -440,20 +481,40 @@ class DynamoDBCollection:
"""
Example
-------
Get an item using a composed sort key
**Get an item using a composed sort key**
collect = DynamoDBCollection(...)
collect.get_item(
key=KeyPair(
pk='5OxmMjL-ujoR5IMGegQz',
sk=Key('sergio@somosbeta.com.br', prefix='emails'),
KeyPair(
'b3511b5a-cb32-4833-a373-f8223f2088d4',
ComposeKey('sergio@somosbeta.com.br', prefix='emails'),
),
)
**Get items using a composed partition key**
collect = DynamoDBCollection(...)
collect.get_items(
PartitionKey(
ComposeKey('b3511b5a-cb32-4833-a373-f8223f2088d4', prefix='logs')
),
)
**Get items using a key pair**
collect = DynamoDBCollection(...)
collect.get_items(
KeyPair('b3511b5a-cb32-4833-a373-f8223f2088d4', 'emails),
)
"""
class MissingError(ValueError):
pass
class PaginatedResult(TypedDict):
items: list[dict]
last_key: str | None
def __init__(
self,
persistence_layer: DynamoDBPersistenceLayer,
@@ -533,4 +594,37 @@ class DynamoDBCollection:
expr_attr_values=expr_attr_values,
)
def get_items(self): ...
def get_items(
self,
key: PartitionKey | KeyPair,
*,
expr_attr_name: dict = {},
expr_attr_values: dict = {},
start_key: str | None = None,
filter_expr: str | None = None,
index_forward: bool = False,
limit: int = LIMIT,
) -> PaginatedResult:
key_cond_expr = (
'#pk = :pk AND begins_with(#sk, :sk)'
if isinstance(key, KeyPair)
else '#pk = :pk'
)
expr_attr_name.update(key.expr_attr_name())
expr_attr_values.update(key.expr_attr_values())
response = self.persistence_layer.query(
key_cond_expr=key_cond_expr,
expr_attr_name=expr_attr_name,
expr_attr_values=expr_attr_values,
filter_expr=filter_expr,
index_forward=index_forward,
limit=limit,
# start_key=start_key if start_key else {},
)
return {
'items': response['items'],
'last_key': response['last_key'] if 'last_key' in response else None,
}