from typing import Any, Generator

import layercake.jsonl as jsonl
from elasticsearch import Elasticsearch
from layercake.dynamodb import deserialize
from tqdm import tqdm

from boto3clients import dynamodb_client

elastic_client = Elasticsearch('http://127.0.0.1:9200')

# Seed files read from the 'seeds/' directory. Each file name without its
# '.jsonl' suffix is used both as the DynamoDB table name and as the
# Elasticsearch index name.
files = (
    'test-orders.jsonl',
    'test-users.jsonl',
    'test-enrollments.jsonl',
    'test-courses.jsonl',
)


def put_item(item: dict, table_name: str, *, dynamodb_client) -> bool:
    """Write one item to a DynamoDB table on a best-effort basis.

    The item is handed to the client unchanged, so it is presumably already
    in DynamoDB attribute-value form (verify against the seed files).

    Args:
        item: Value passed as ``Item`` to ``put_item``.
        table_name: Target table name.
        dynamodb_client: Client object exposing ``put_item``.

    Returns:
        True when the call completed, False when it raised any ``Exception``.
    """
    try:
        dynamodb_client.put_item(
            TableName=table_name,
            Item=item,
        )
    except Exception:
        return False
    else:
        return True


def scan_table(table_name: str, *, dynamodb_client, **kwargs) -> Generator:
    """Yield every deserialized item of a table scan, following pagination.

    All extra keyword arguments (for example ``FilterExpression`` and
    ``ExpressionAttributeValues``) are sent with every page request, so a
    filter keeps applying after the first page.

    Args:
        table_name: Table to scan.
        dynamodb_client: Client object exposing ``scan``.
        **kwargs: Additional arguments forwarded to ``scan``.

    Yields:
        Each entry of the response's ``Items`` passed through ``deserialize``.

    Note:
        If a ``scan`` call raises, iteration stops silently; items from
        earlier pages have already been yielded.
    """
    # Copy so that adding the paging key never mutates the caller's mapping.
    params = dict(kwargs)

    while True:
        try:
            r = dynamodb_client.scan(TableName=table_name, **params)
        except Exception:
            return

        for item in r['Items']:
            yield deserialize(item)

        if 'LastEvaluatedKey' not in r:
            return

        # Continue from where this page stopped, keeping every other argument.
        params['ExclusiveStartKey'] = r['LastEvaluatedKey']


def _serialize_python_type(value: Any) -> Any:
    """Return a copy of ``value`` with sets converted to lists.

    Dicts and lists are walked recursively; a set becomes a list of its
    elements (elements themselves are not converted); any other value is
    returned unchanged.
    """
    if isinstance(value, dict):
        return {k: _serialize_python_type(v) for k, v in value.items()}

    if isinstance(value, set):
        return list(value)

    if isinstance(value, list):
        return [_serialize_python_type(v) for v in value]

    return value


def index_item(
    id: str,
    index: str,
    doc: dict,
    *,
    elastic_client: Elasticsearch,
):
    """Index one document in Elasticsearch under the given id.

    Args:
        id: Document id. The name shadows the builtin but is kept because
            callers pass it by keyword.
        index: Target index name.
        doc: Document body; sets inside it are converted to lists first.
        elastic_client: Elasticsearch client used for the request.

    Returns:
        Whatever ``elastic_client.index`` returns.
    """
    return elastic_client.index(
        index=index,
        id=id,
        document=_serialize_python_type(doc),
    )


def delete_index(index: str, *, elastic_client: Elasticsearch) -> bool:
    """Delete an Elasticsearch index on a best-effort basis.

    Returns:
        True when the delete call completed, False when it raised any
        ``Exception``.
    """
    try:
        elastic_client.indices.delete(index=index)
    except Exception:
        return False
    else:
        return True


if __name__ == '__main__':
    # Step 1: load every seed file into the table named after the file.
    for file in tqdm(files, desc='Processing files'):
        with jsonl.readlines(f'seeds/{file}') as lines:
            table_name = file.removesuffix('.jsonl')

            for line in tqdm(lines, desc=f'Processing lines in {file}'):
                # The boolean result is ignored: failed writes are skipped.
                put_item(line, table_name, dynamodb_client=dynamodb_client)

    # Step 2: rebuild one Elasticsearch index per table from a filtered scan.
    for file in tqdm(files, desc='Scanning tables'):
        table_name = file.removesuffix('.jsonl')
        # Drop any existing index first; a failure here is ignored.
        delete_index(table_name, elastic_client=elastic_client)

        # Only items whose 'sk' attribute equals the string '0' are indexed.
        for record in tqdm(
            scan_table(
                table_name,
                dynamodb_client=dynamodb_client,
                FilterExpression='sk = :sk',
                ExpressionAttributeValues={':sk': {'S': '0'}},
            ),
            desc=f'Indexing {table_name}',
        ):
            index_item(
                id=record['id'],
                index=table_name,
                doc=record,
                elastic_client=elastic_client,
            )