"""Seed local DynamoDB tables from JSONL fixtures and mirror them into Elasticsearch.

When run as a script, every file in ``jsonl_files`` is read from ``seeds/``.
Each file is loaded into a DynamoDB table named after it (minus the
``.jsonl`` suffix). Each table is then scanned, and its matching items are
upserted into an Elasticsearch index with that same name.
"""

import logging
from typing import Any, Generator

import boto3
from elasticsearch import Elasticsearch
import jsonlines
from layercake.dynamodb import deserialize
from tqdm import tqdm

logger = logging.getLogger(__name__)

elastic_client = Elasticsearch('http://127.0.0.1:9200')
dynamodb_client = boto3.client('dynamodb', endpoint_url='http://127.0.0.1:8000')

jsonl_files = (
    'test-orders.jsonl',
    'test-users.jsonl',
    'test-enrollments.jsonl',
    'test-courses.jsonl',
)


def put_item(item: dict, table_name: str, /, dynamodb_client) -> bool:
    """Write ``item`` to ``table_name``.

    ``item`` is passed to the low-level ``PutItem`` call unchanged. It must
    therefore already be in DynamoDB attribute-value form, for example
    ``{'id': {'S': '...'}}``.

    Returns True on success and False on failure. Failures are logged rather
    than raised, so one bad seed row does not abort the whole run.
    """
    try:
        dynamodb_client.put_item(TableName=table_name, Item=item)
    except Exception as exc:
        logger.warning('put_item into %r failed: %s', table_name, exc)
        return False
    return True


def scan_table(
    table_name: str, /, dynamodb_client, **kwargs
) -> Generator[dict, None, None]:
    """Yield every item of ``table_name``, converted with ``deserialize``.

    Extra keyword arguments (e.g. ``FilterExpression`` and
    ``ExpressionAttributeValues``) are forwarded to *every* ``Scan``
    request. This keeps them in effect on later pages, not just the first.

    Pagination follows ``LastEvaluatedKey`` in a loop rather than by
    recursion, so large tables cannot exhaust the recursion limit.

    Scanning is best-effort: if a ``Scan`` request fails, the error is logged
    and iteration stops after the items already yielded.
    """
    request = dict(kwargs)
    while True:
        try:
            page = dynamodb_client.scan(TableName=table_name, **request)
        except Exception as exc:
            logger.warning('scan of %r failed: %s', table_name, exc)
            return
        for raw_item in page.get('Items', ()):
            yield deserialize(raw_item)
        last_key = page.get('LastEvaluatedKey')
        if last_key is None:
            return
        # Continue from where the previous page stopped, keeping the caller's
        # filter/projection arguments in place.
        request['ExclusiveStartKey'] = last_key


class Elastic:
    """Small wrapper around an ``Elasticsearch`` client used by this script."""

    def __init__(self, client: Elasticsearch) -> None:
        self.client = client

    def index_item(
        self,
        /,
        id: str,
        index: str,
        doc: dict,
    ):
        """Upsert ``doc`` as document ``id`` in ``index`` and return the client response.

        This uses a partial ``update`` with ``doc_as_upsert=True``. A missing
        document is created from ``doc``; an existing one has ``doc``'s fields
        merged in. Containers in ``doc`` are first normalised to
        JSON-friendly types.
        """
        return self.client.update(
            index=index,
            id=id,
            doc=_serialize_to_basic_types(doc),
            doc_as_upsert=True,
        )

    def delete_index(self, index: str) -> bool:
        """Delete ``index``; return True on success, False if the call failed.

        This is best-effort because the index may legitimately be absent,
        e.g. on the first run. The failure is logged instead of raised.
        """
        try:
            self.client.indices.delete(index=index)
        except Exception as exc:
            logger.debug('delete of index %r failed: %s', index, exc)
            return False
        return True


def _serialize_to_basic_types(value: Any) -> Any:
    """Recursively convert ``value`` into plain JSON-friendly containers.

    Dicts are rebuilt with converted values. Lists, tuples, sets and
    frozensets all become lists with converted elements; sets presumably
    come from DynamoDB set attributes via ``deserialize``.
    Any other value is returned unchanged.
    """
    if isinstance(value, dict):
        return {k: _serialize_to_basic_types(v) for k, v in value.items()}
    if isinstance(value, (list, tuple, set, frozenset)):
        return [_serialize_to_basic_types(v) for v in value]
    return value


if __name__ == '__main__':
    elastic = Elastic(elastic_client)

    # Populate DynamoDB tables with data from JSONL files.
    # Each table is named after its seed file, minus the ".jsonl" suffix.
    for file in tqdm(jsonl_files, desc='Processing files'):
        table_name = file.removesuffix('.jsonl')
        with open(f'seeds/{file}', encoding='utf-8') as fp:
            # skip_invalid=True drops lines that fail to parse instead of raising.
            reader = jsonlines.Reader(fp).iter(skip_invalid=True)
            for line in tqdm(reader, desc=f'Processing lines in {file}'):
                put_item(line, table_name, dynamodb_client)  # type: ignore

    # Scan DynamoDB tables and index the data into Elasticsearch.
    for file in tqdm(jsonl_files, desc='Scanning tables'):
        table_name = file.removesuffix('.jsonl')
        # Rebuild each index from scratch so stale documents do not linger.
        elastic.delete_index(table_name)
        # Only items whose ``sk`` attribute equals "0" are indexed.
        # NOTE(review): presumably ``sk`` is the sort key and "0" marks the
        # primary record of each entity — confirm.
        docs = scan_table(
            table_name,
            dynamodb_client,
            FilterExpression='sk = :sk',
            ExpressionAttributeValues={':sk': {'S': '0'}},
        )
        for doc in tqdm(docs, desc=f'Indexing {table_name}'):
            elastic.index_item(id=doc['id'], index=table_name, doc=doc)