"""Copy rows from a GaussDB table into a Hive table.

Rows are read from GaussDB with psycopg2 and written to Hive in batches as
multi-row ``INSERT INTO <table> VALUES (...), (...)`` statements sent through
``hive_connector.Connector.run_sql``.
"""

import json
from contextlib import closing
from datetime import date, datetime

import psycopg2

from hive_connector import Connector

# NOTE(review): SRC_TABLE is not referenced anywhere; the SELECT below reads the
# hard-coded `employees` table. Confirm which source table is intended.
SRC_TABLE = "aaa"
TARGET_TABLE = "bbb"
BATCH_SIZE = 500  # number of source rows packed into each INSERT statement


def _hive_string_literal(text):
    """Return *text* as a single-quoted, escaped Hive string literal.

    Hive string literals use C-style backslash escapes, so backslashes are
    doubled first and embedded single quotes are then backslash-escaped.
    Without this a value such as ``O'Brien`` breaks (or injects into) the SQL.
    """
    escaped = text.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def to_hive_value(v):
    """Render a Python value as a Hive SQL literal for a VALUES clause.

    - ``None`` -> ``NULL``
    - ``str`` -> escaped, single-quoted string literal
    - ``datetime`` -> ``'YYYY-MM-DD HH:MM:SS'`` (fractional seconds are dropped)
    - ``date`` -> ``'YYYY-MM-DD'``
    - ``dict`` -> its JSON serialisation as a string literal
    - ``list``/``tuple`` -> ``array(...)`` with each element rendered recursively
    - anything else (ints, floats, Decimals, ...) -> ``str(v)``
    """
    if v is None:
        return "NULL"
    if isinstance(v, str):
        return _hive_string_literal(v)
    # datetime is a subclass of date, so it has to be tested first.
    if isinstance(v, datetime):
        return _hive_string_literal(v.strftime("%Y-%m-%d %H:%M:%S"))
    if isinstance(v, date):
        return _hive_string_literal(v.strftime("%Y-%m-%d"))
    if isinstance(v, dict):
        # Stored as a JSON string rather than a Hive map(); serialising the
        # content (instead of a constant '{}') keeps the data intact.
        return _hive_string_literal(json.dumps(v, ensure_ascii=False))
    if isinstance(v, (list, tuple)):
        # Assumes the target column is a Hive array type — TODO confirm schema.
        return f"array({', '.join(to_hive_value(x) for x in v)})"
    return str(v)


def _build_insert_sql(rows):
    """Return one multi-row INSERT statement for *rows*, or None if nothing to send."""
    values_list = [
        f"({', '.join(to_hive_value(v) for v in row)})" for row in rows if row
    ]
    if not values_list:
        return None
    # DO NOT add a ';' at the end.
    return f"INSERT INTO {TARGET_TABLE} VALUES {', '.join(values_list)}"


# ----------------------------
# Step 1: Connect to GaussDB
# ----------------------------
# closing() guarantees the cursor and connection are released even when a
# conversion or Hive insert fails part-way through the copy.
with closing(
    psycopg2.connect(
        host="1.2.3.4",
        port="8000",
        database="source_db",
        user="user1",
        password="password1",
    )
) as src_conn, closing(src_conn.cursor()) as src_cur:
    # NOTE(review): psycopg2's default (client-side) cursor transfers the whole
    # result set on execute(); fetchmany() below only bounds the INSERT size.
    # For very large tables consider a named (server-side) cursor.
    src_cur.execute("SELECT id, name, salary FROM employees;")

    # ----------------------------
    # Step 2: Initialize Hive connector
    # ----------------------------
    hive_conn = Connector()
    # TODO(review): release hive_conn here if the Connector API exposes a close method.

    # ----------------------------
    # Step 3: Fetch & Insert
    # ----------------------------
    while True:
        rows = src_cur.fetchmany(BATCH_SIZE)
        if not rows:
            break
        # Send a single INSERT statement to Hive per batch.
        insert_sql = _build_insert_sql(rows)
        if insert_sql is not None:
            hive_conn.run_sql(insert_sql)

# ----------------------------
# Step 4: Cleanup
# ----------------------------
# The source cursor and connection are closed by the `with` block above.