import json import sqlite3 from functools import partial from pathlib import Path from typing import Generator import jsonlines from aws_lambda_powertools.shared.json_encoder import Encoder from layercake.dynamodb import deserialize from layercake.strutils import md5_hash from tqdm import tqdm class JSONEncoder(Encoder): def default(self, obj): if isinstance(obj, set): return list(obj) return super().default(obj) def readlines(dirpath: Path) -> Generator: for path in dirpath.iterdir(): if not path.is_file(): continue with jsonlines.open(path) as fp: for obj in fp: yield deserialize(obj['Item']) sqlite3.register_adapter(dict, partial(json.dumps, cls=JSONEncoder)) if __name__ == '__main__': try: input_dirpath = Path(input('šŸ“‚ Path to the folder with .jsonl files: ')) if not input_dirpath.exists() or not input_dirpath.is_dir(): print(f'āŒ Directory "{input_dirpath}" not found or is not a folder.') exit(1) table_name = input('šŸ’¾ Enter the name of the table (e.g., users): ') with sqlite3.connect('mydatabase.db') as conn: cursor = conn.cursor() cursor.execute( """ CREATE TABLE IF NOT EXISTS %s ( _id TEXT PRIMARY KEY, json JSON NOT NULL ) """ % table_name ) for record in tqdm( readlines(input_dirpath), desc=f'ā³ Inserting into table {table_name}', ): _id = md5_hash( str( { 'id': record['id'], 'sk': record['sk'], } ) ) cursor.execute( 'INSERT INTO %s (_id, json) VALUES (:_id, :json)' % table_name, { '_id': _id, 'json': record, }, ) except KeyboardInterrupt: print('\nšŸ‘‹ Cancelled by user') except Exception as e: print(f'šŸ’„ Error: {e}')