import base64
import json
import os
import urllib.parse as urlparse
from datetime import datetime
from enum import Enum
from typing import Any, Type

from glom import glom
from layercake.dateutils import now, timestamp
from layercake.dynamodb import DynamoDBPersistenceLayer, Key

# Default separator placed between prefix segments and key values.
DELIMITER = '#'
# Time zone name handed to ``now`` when stamping ``create_date``; taken from
# the ``TZ`` environment variable and falling back to 'UTC'.
TZ = os.getenv('TZ', 'UTC')


def json_b64encode(obj: dict) -> str:
    """
    Serialize ``obj`` to JSON, encode it as URL-safe Base64 and percent-quote
    the result so it can be embedded in a URL (e.g. as a pagination cursor).

    Parameters
    ----------
    obj : dict
        JSON-serializable mapping.

    Returns
    -------
    str
        Percent-quoted, URL-safe Base64 text.
    """
    raw = json.dumps(obj).encode('utf-8')
    encoded = base64.urlsafe_b64encode(raw).decode('utf-8')
    # Base64 padding ('=') is percent-encoded by ``quote``.
    return urlparse.quote(encoded)


def json_b64decode(s: str) -> dict:
    """
    Reverse :func:`json_b64encode`: unquote, Base64-decode and parse the JSON.

    Parameters
    ----------
    s : str
        Text produced by :func:`json_b64encode`.

    Returns
    -------
    dict
        The decoded JSON document.
    """
    encoded = urlparse.unquote(s).encode('utf-8')
    raw = base64.urlsafe_b64decode(encoded).decode('utf-8')
    return json.loads(raw)


class KeyLoc(Enum):
    """Selects which key attribute receives the prefix."""

    PK = 'pk'  # Partition Key
    SK = 'sk'  # Sort Key


def _join_prefix(
    prefix: str | tuple[str, ...],
    delimiter: str,
    *args,
) -> str:
    """
    Joins a prefix (or prefixes) with additional arguments using a specified
    delimiter.

    Parameters
    ----------
    prefix : str | tuple[str, ...]
        A prefix or a tuple of prefixes to be added.
    delimiter : str
        Delimiter to use for joining the strings.
    *args
        Additional strings to be joined with the prefix.

    Returns
    -------
    str
        A single string with the prefix and additional arguments joined by
        the delimiter.
    """
    if isinstance(prefix, str):
        prefix = (prefix,)

    # Unpack rather than concatenate so any sequence of strings (not only a
    # tuple) is accepted as the prefix.
    return delimiter.join((*prefix, *args))


def _keys(
    *,
    pk: str,
    sk: str,
    prefix: str | tuple[str, ...],
    keyloc_prefix: KeyLoc,
    delimiter: str,
) -> dict:
    """
    Builds the key attributes of an item, prepending the prefix to either the
    Partition Key or the Sort Key.

    Parameters
    ----------
    pk : str
        Partition Key value.
    sk : str
        Sort Key value.
    prefix : str | tuple[str, ...]
        A prefix or a tuple of prefixes to be added.
    keyloc_prefix : KeyLoc
        Specifies whether to add the prefix to the Partition Key or Sort Key.
        Any value other than ``KeyLoc.PK`` prefixes the Sort Key.
    delimiter : str
        Delimiter used to join the prefix and the key.

    Returns
    -------
    dict
        A dict with keys id and sk representing the constructed keys for the
        DynamoDB item.
    """
    if keyloc_prefix == KeyLoc.PK:
        pk = _join_prefix(prefix, delimiter, pk)
    else:
        sk = _join_prefix(prefix, delimiter, sk)

    return {
        'id': pk,
        'sk': sk,
    }


def _ttl_kwargs(ttl: datetime | int | None = None, tz=None) -> dict:
    """
    Builds the optional ``ttl`` / ``ttl_date`` attributes of an item.

    Parameters
    ----------
    ttl : datetime | int | None
        Expiration moment. An int is stored as-is in ``ttl`` and converted
        with ``datetime.fromtimestamp(ttl, tz)`` for ``ttl_date``; a datetime
        is stored in ``ttl_date`` and converted with ``timestamp`` for
        ``ttl`` (presumably to epoch seconds -- confirm against layercake).
    tz
        Time zone passed to ``datetime.fromtimestamp`` for int values.

    Returns
    -------
    dict
        Empty when ``ttl`` is falsy (``None`` or ``0``), otherwise a dict
        with ``ttl`` and ``ttl_date``.
    """
    if not ttl:
        return {}

    if isinstance(ttl, int):
        return {
            'ttl': ttl,
            'ttl_date': datetime.fromtimestamp(ttl, tz),
        }

    return {
        'ttl': timestamp(ttl),
        'ttl_date': ttl,
    }


class MissingRecordError(ValueError):
    """Default exception raised by :func:`get_record` for a missing record."""


def get_record(
    pk: str,
    sk: str | tuple[str, ...],
    *,
    glom_spec: str | None = None,
    raise_on_missing: bool = True,
    default_on_missing: Any = None,
    missing_cls: Type[Exception] = MissingRecordError,
    delimiter: str = DELIMITER,
    persistence_layer: DynamoDBPersistenceLayer,
) -> Any:
    """
    Fetches a single record by its key, optionally extracting a value from it.

    Parameters
    ----------
    pk : str
        Partition Key value.
    sk : str | tuple[str, ...]
        Sort Key value; a tuple is joined with ``delimiter``.
    glom_spec : str | None
        When given and a record is found, the result of
        ``glom(record, glom_spec, default=default_on_missing)`` is returned
        instead of the whole record.
    raise_on_missing : bool
        Raise ``missing_cls`` when no record is found.
    default_on_missing : Any
        Value returned when the record (or the glom target) is missing and
        no exception is raised.
    missing_cls : Type[Exception]
        Exception class raised for a missing record.
    delimiter : str
        Delimiter used to join a tuple ``sk``.
    persistence_layer : DynamoDBPersistenceLayer
        Layer used to read the item.

    Returns
    -------
    Any
        The record, the value selected by ``glom_spec``, or
        ``default_on_missing``.

    Raises
    ------
    TypeError
        If ``missing_cls`` is not a subclass of ``Exception``.
    Exception
        An instance of ``missing_cls`` when ``raise_on_missing`` is true and
        no record is found.
    """
    # ``isinstance(..., type)`` first: ``issubclass`` itself raises an opaque
    # TypeError when handed something that is not a class.
    if not (isinstance(missing_cls, type) and issubclass(missing_cls, Exception)):
        raise TypeError(
            f'missing_cls must be a subclass of Exception, got {missing_cls}'
        )

    record = persistence_layer.get_item(
        Key(
            pk,
            sk if isinstance(sk, str) else delimiter.join(sk),
        )
    )

    if raise_on_missing and not record:
        raise missing_cls(f'Record with Key({pk}, {sk}) not found.')

    if glom_spec and record:
        return glom(record, glom_spec, default=default_on_missing)

    return record or default_on_missing


def add_record(
    pk: str,
    sk: str,
    prefix: str | tuple[str, ...],
    *,
    keyloc_prefix: KeyLoc = KeyLoc.SK,
    ttl: datetime | int | None = None,
    delimiter: str = DELIMITER,
    persistence_layer: DynamoDBPersistenceLayer,
    **kwargs: Any,
) -> bool:
    """
    Stores a record whose key is built from ``pk``, ``sk`` and ``prefix``.

    Parameters
    ----------
    pk : str
        Partition Key value.
    sk : str
        Sort Key value.
    prefix : str | tuple[str, ...]
        A prefix or a tuple of prefixes to be added.
    keyloc_prefix : KeyLoc
        Specifies whether to add the prefix to the Partition Key or Sort Key.
    ttl : datetime | int | None
        Optional expiration; adds ``ttl`` and ``ttl_date`` to the item.
    delimiter : str
        Delimiter used to join the prefix and the key.
    persistence_layer : DynamoDBPersistenceLayer
        Layer used to write the item.
    **kwargs
        Extra attributes stored on the item. They override the generated
        ``id``/``sk`` and are overridden by ``create_date``, ``ttl`` and
        ``ttl_date``.

    Returns
    -------
    bool
        Whatever ``persistence_layer.put_item`` returns.
    """
    current_time = now(TZ)
    keys = _keys(
        pk=pk,
        sk=sk,
        prefix=prefix,
        keyloc_prefix=keyloc_prefix,
        delimiter=delimiter,
    )

    # Merge order matters: later operands win on duplicate attribute names.
    return persistence_layer.put_item(
        item=keys
        | kwargs
        | {'create_date': current_time}
        | _ttl_kwargs(ttl, current_time.tzinfo)
    )


def del_record(
    pk: str,
    sk: str,
    prefix: str | tuple[str, ...],
    *,
    keyloc_prefix: KeyLoc = KeyLoc.SK,
    delimiter: str = DELIMITER,
    persistence_layer: DynamoDBPersistenceLayer,
) -> bool:
    """
    Deletes the record whose key is built from ``pk``, ``sk`` and ``prefix``.

    Parameters
    ----------
    pk : str
        Partition Key value.
    sk : str
        Sort Key value.
    prefix : str | tuple[str, ...]
        A prefix or a tuple of prefixes to be added.
    keyloc_prefix : KeyLoc
        Specifies whether to add the prefix to the Partition Key or Sort Key.
    delimiter : str
        Delimiter used to join the prefix and the key.
    persistence_layer : DynamoDBPersistenceLayer
        Layer used to delete the item.

    Returns
    -------
    bool
        Whatever ``persistence_layer.delete_item`` returns.
    """
    return persistence_layer.delete_item(
        key=_keys(
            pk=pk,
            sk=sk,
            prefix=prefix,
            keyloc_prefix=keyloc_prefix,
            delimiter=delimiter,
        )
    )


def get_records(
    pk: str,
    prefix: str | tuple[str, ...],
    *,
    keyloc_prefix: KeyLoc = KeyLoc.SK,
    expr_attr_name: dict | None = None,
    expr_attr_values: dict | None = None,
    limit: int = 10,
    start_key: str | None = None,
    filter_expr: str | None = None,
    delimiter: str = DELIMITER,
    persistence_layer: DynamoDBPersistenceLayer,
) -> dict[str, Any]:
    """
    Queries one page of records sharing a partition key and prefix.

    Parameters
    ----------
    pk : str
        Partition Key value.
    prefix : str | tuple[str, ...]
        A prefix or a tuple of prefixes to be added.
    keyloc_prefix : KeyLoc
        Specifies whether to add the prefix to the Partition Key or Sort Key.
        With ``KeyLoc.PK`` the query matches ``id`` only; otherwise it also
        requires ``sk`` to begin with the prefix followed by the delimiter.
    expr_attr_name : dict | None
        Expression attribute names forwarded to the query (``{}`` if None).
    expr_attr_values : dict | None
        Extra expression attribute values; merged over the generated
        ``:pk`` / ``:sk`` values, so they win on duplicate names.
    limit : int
        Limit forwarded to the query.
    start_key : str | None
        Cursor previously returned as ``last_key``; decoded with
        ``json_b64decode``.
    filter_expr : str | None
        Filter expression forwarded to the query.
    delimiter : str
        Delimiter used to join the prefix and the key.
    persistence_layer : DynamoDBPersistenceLayer
        Layer used to run the query.

    Returns
    -------
    dict
        ``items`` holds the query items (with the prefix stripped from ``sk``
        when the prefix lives on the Sort Key) and ``last_key`` holds the
        encoded cursor for the next page, or None when the query returned a
        falsy ``last_key``.
    """
    # None sentinels instead of shared mutable ``{}`` defaults.
    attr_names = {} if expr_attr_name is None else expr_attr_name
    extra_values = {} if expr_attr_values is None else expr_attr_values

    prefix_on_pk = keyloc_prefix == KeyLoc.PK
    prefix_ = _join_prefix(prefix, delimiter)
    sk = f'{prefix_}{delimiter}'

    if prefix_on_pk:
        pk = f'{prefix_}{delimiter}{pk}'
        key_cond_expr = 'id = :pk'
        key_values = {':pk': pk}
    else:
        key_cond_expr = 'id = :pk AND begins_with(sk, :sk)'
        key_values = {':pk': pk, ':sk': sk}

    result = persistence_layer.query(
        key_cond_expr=key_cond_expr,
        expr_attr_name=attr_names,
        expr_attr_values=key_values | extra_values,
        filter_expr=filter_expr,
        limit=limit,
        index_forward=False,
        start_key=json_b64decode(start_key) if start_key else {},
    )

    items = (
        result['items']
        if prefix_on_pk
        # Removes the prefix from sk
        else [x | {'sk': x['sk'].removeprefix(sk)} for x in result['items']]
    )

    return {
        'items': items,
        'last_key': json_b64encode(result['last_key'])
        if result['last_key']
        else None,
    }