"""Pytest fixtures that run against a DynamoDB endpoint on 127.0.0.1:8000."""

import boto3
import pytest

import layercake.jsonl as jsonl
from layercake.dynamodb import DynamoDBPersistenceLayer

# Name of the throwaway table created and dropped around each test.
table_name = 'pytest'
# Endpoint every fixture-created client talks to.
dynamodb_endpoint_url = 'http://127.0.0.1:8000'


@pytest.fixture
def dynamodb_client():
    """Yield a DynamoDB client with the test table created.

    The table uses a composite primary key: ``id`` (hash) and ``sk``
    (range), both string attributes. Once the test using this fixture
    finishes, the table is deleted again.
    """
    key_schema = [
        {'AttributeName': 'id', 'KeyType': 'HASH'},
        {'AttributeName': 'sk', 'KeyType': 'RANGE'},
    ]
    attribute_definitions = [
        {'AttributeName': 'id', 'AttributeType': 'S'},
        {'AttributeName': 'sk', 'AttributeType': 'S'},
    ]
    throughput = {
        'ReadCapacityUnits': 123,
        'WriteCapacityUnits': 123,
    }

    ddb = boto3.client('dynamodb', endpoint_url=dynamodb_endpoint_url)
    ddb.create_table(
        TableName=table_name,
        AttributeDefinitions=attribute_definitions,
        KeySchema=key_schema,
        ProvisionedThroughput=throughput,
    )

    yield ddb

    # Teardown: drop the table so the next test starts from scratch.
    ddb.delete_table(TableName=table_name)


@pytest.fixture()
def dynamodb_persistence_layer(dynamodb_client) -> DynamoDBPersistenceLayer:
    """Return a ``DynamoDBPersistenceLayer`` bound to the test table and client."""
    layer = DynamoDBPersistenceLayer(table_name, dynamodb_client)
    return layer


@pytest.fixture()
def dynamodb_seeds(dynamodb_client):
    """Insert every record from ``tests/seeds.jsonl`` into the test table.

    Each record is handed to ``put_item`` unchanged.
    NOTE(review): this presumably requires the seed file to already be in
    DynamoDB attribute-value format -- confirm against ``jsonl.readlines``.
    """
    with jsonl.readlines('tests/seeds.jsonl') as records:
        for record in records:
            dynamodb_client.put_item(TableName=table_name, Item=record)