From 5f3d43a42175f19320f02cac24b97455c9f59f29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9rgio=20Rafael=20Siqueira?= Date: Tue, 26 Aug 2025 13:50:39 -0300 Subject: [PATCH] add replication to postgres --- .../app/events/replicate_into_postgres.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/streams-events/app/events/replicate_into_postgres.py b/streams-events/app/events/replicate_into_postgres.py index 6dc9034..09049ba 100644 --- a/streams-events/app/events/replicate_into_postgres.py +++ b/streams-events/app/events/replicate_into_postgres.py @@ -34,6 +34,8 @@ class JSONEncoder(Encoder): logger = Logger(__name__) tracer = Tracer() +CONNINFO = f'dbname={POSTGRES_DB} user={POSTGRES_USER} password={POSTGRES_PASSWORD} \ + host={POSTGRES_HOST} port={POSTGRES_PORT}' UPSERT_QUERY = sql.SQL(""" @@ -50,13 +52,7 @@ DELETE FROM {0} WHERE _id = ANY(%s); @logger.inject_lambda_context @tracer.capture_lambda_handler def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext): - with psycopg.connect( - dbname=POSTGRES_DB, - user=POSTGRES_USER, - password=POSTGRES_PASSWORD, - host=POSTGRES_HOST, - port=POSTGRES_PORT, - ) as conn: + with psycopg.connect(CONNINFO) as conn: with conn.cursor() as cur: upsert_batches = {} delete_batches = {} @@ -84,11 +80,11 @@ def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext): delete_batches[table_name].append(_id) for table_name, rows in upsert_batches.items(): - query = UPSERT_QUERY.format(sql.Identifier(table_name)).as_string(conn) + query = UPSERT_QUERY.format(sql.Identifier(table_name)) cur.executemany(query, rows) for table_name, ids in delete_batches.items(): - query = DELETE_QUERY.format(sql.Identifier(table_name)).as_string(conn) + query = DELETE_QUERY.format(sql.Identifier(table_name)) cur.execute(query, [ids]) conn.commit()