add replication to postgres

This commit is contained in:
2025-08-26 13:50:39 -03:00
parent 25349ff533
commit 5f3d43a421

View File

@@ -34,6 +34,8 @@ class JSONEncoder(Encoder):
logger = Logger(__name__) logger = Logger(__name__)
tracer = Tracer() tracer = Tracer()
CONNINFO = f'dbname={POSTGRES_DB} user={POSTGRES_USER} password={POSTGRES_PASSWORD} \
host={POSTGRES_HOST} port={POSTGRES_PORT}'
UPSERT_QUERY = sql.SQL(""" UPSERT_QUERY = sql.SQL("""
@@ -50,13 +52,7 @@ DELETE FROM {0} WHERE _id = ANY(%s);
@logger.inject_lambda_context @logger.inject_lambda_context
@tracer.capture_lambda_handler @tracer.capture_lambda_handler
def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext): def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
with psycopg.connect( with psycopg.connect(CONNINFO) as conn:
dbname=POSTGRES_DB,
user=POSTGRES_USER,
password=POSTGRES_PASSWORD,
host=POSTGRES_HOST,
port=POSTGRES_PORT,
) as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
upsert_batches = {} upsert_batches = {}
delete_batches = {} delete_batches = {}
@@ -84,11 +80,11 @@ def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
delete_batches[table_name].append(_id) delete_batches[table_name].append(_id)
for table_name, rows in upsert_batches.items(): for table_name, rows in upsert_batches.items():
query = UPSERT_QUERY.format(sql.Identifier(table_name)).as_string(conn) query = UPSERT_QUERY.format(sql.Identifier(table_name))
cur.executemany(query, rows) cur.executemany(query, rows)
for table_name, ids in delete_batches.items(): for table_name, ids in delete_batches.items():
query = DELETE_QUERY.format(sql.Identifier(table_name)).as_string(conn) query = DELETE_QUERY.format(sql.Identifier(table_name))
cur.execute(query, [ids]) cur.execute(query, [ids])
conn.commit() conn.commit()