import json import sqlite3 from functools import partial from pathlib import Path from typing import Generator import jsonlines from aws_lambda_powertools.shared.json_encoder import Encoder from layercake.dynamodb import deserialize from tqdm import tqdm class JSONEncoder(Encoder): def default(self, obj): if isinstance(obj, set): return list(obj) return super().default(obj) def readlines(dirpath: Path) -> Generator: for path in dirpath.iterdir(): if not path.is_file(): continue with jsonlines.open(path) as fp: for obj in fp: yield deserialize(obj['Item']) sqlite3.register_adapter(dict, partial(json.dumps, cls=JSONEncoder)) if __name__ == '__main__': try: input_dirpath = Path(input('šŸ“‚ Path to the folder with .jsonl files: ')) if not input_dirpath.exists() or not input_dirpath.is_dir(): print(f'āŒ Directory "{input_dirpath}" not found or is not a folder.') exit(1) table_name = input('šŸ’¾ Enter the name of the table (e.g., users): ') with sqlite3.connect('mydatabase.db') as conn: cursor = conn.cursor() cursor.execute( 'CREATE TABLE IF NOT EXISTS %s (pk TEXT, sk TEXT, json JSON)' % table_name ) for record in tqdm( readlines(input_dirpath), desc=f'ā³ Inserting into table {table_name}', ): cursor.execute( 'INSERT INTO %s (pk, sk, json) VALUES (:pk, :sk, :json)' % table_name, { 'pk': record['id'], 'sk': record['sk'], 'json': record, }, ) except KeyboardInterrupt: print('\nšŸ‘‹ Cancelled by user') except Exception as e: print(f'šŸ’„ Error: {e}')