pdf_code/zzb_data_word/db_update.py

201 lines
7.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import contextlib
import logging
import os

import mysql.connector
import pymssql
# Logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# SECURITY NOTE(review): live database credentials are committed to source here.
# Every setting can now be overridden through an environment variable; the
# literal fallbacks only keep existing deployments working and should be
# rotated and removed from version control.

# SQL Server (source database) settings, passed as keyword arguments to pymssql.connect().
sql_server_config = {
    "server": os.environ.get("ZZB_MSSQL_SERVER", "203.192.15.17"),
    "port": int(os.environ.get("ZZB_MSSQL_PORT", "28063")),
    "user": os.environ.get("ZZB_MSSQL_USER", "zncbuser"),
    "password": os.environ.get("ZZB_MSSQL_PASSWORD", "ZZB-Cbindex-data"),
    "database": os.environ.get("ZZB_MSSQL_DATABASE", "jydb"),
}

# MySQL (target database) settings, passed as keyword arguments to mysql.connector.connect().
mysql_config = {
    "host": os.environ.get("ZZB_MYSQL_HOST", "rm-bp1f85h3xs6mvnf5e3o.mysql.rds.aliyuncs.com"),
    "user": os.environ.get("ZZB_MYSQL_USER", "zzb_jydb"),
    "password": os.environ.get("ZZB_MYSQL_PASSWORD", "Ysdbsdjs89Yrqwp"),
    "database": os.environ.get("ZZB_MYSQL_DATABASE", "zzb_jydb"),
}
# Longest CHAR/VARCHAR/BINARY/VARBINARY length emitted inline in the MySQL DDL.
# Longer (or "(max)") columns become LONGTEXT/LONGBLOB because MySQL limits the
# combined size of non-TEXT/BLOB columns in one row to 65,535 bytes, and utf8mb4
# text can take up to 4 bytes per character.
_MAX_INLINE_LENGTH = 1000

# SQL Server types whose MySQL counterpart does not depend on length/precision.
_FIXED_TYPE_MAP = {
    "bigint": "BIGINT",
    "int": "INT",
    "smallint": "SMALLINT",
    "tinyint": "TINYINT UNSIGNED",  # SQL Server tinyint is 0..255
    "bit": "TINYINT(1)",
    "money": "DECIMAL(19,4)",
    "smallmoney": "DECIMAL(10,4)",
    "float": "DOUBLE",
    "real": "FLOAT",
    "date": "DATE",
    # Keep fractional seconds so update-time values read back from MySQL can
    # compare equal to the SQL Server values in the change-detection step.
    "datetime": "DATETIME(6)",
    "datetime2": "DATETIME(6)",
    "smalldatetime": "DATETIME",
    "time": "TIME(6)",
    "text": "LONGTEXT",
    "ntext": "LONGTEXT",
    "xml": "LONGTEXT",
    "image": "LONGBLOB",
    "timestamp": "BINARY(8)",
    "rowversion": "BINARY(8)",
    # NOTE(review): the SQL Server driver may return uuid objects for this type;
    # they might need str() conversion before insert — TODO confirm.
    "uniqueidentifier": "CHAR(36)",
}


def _mysql_column_type(data_type, char_len, precision, scale):
    """Translate one SQL Server column type into a MySQL column type.

    Args:
        data_type: INFORMATION_SCHEMA.COLUMNS.DATA_TYPE, e.g. ``'nvarchar'``.
        char_len: CHARACTER_MAXIMUM_LENGTH; ``-1`` means ``(max)``, ``None`` when
            not applicable.
        precision: NUMERIC_PRECISION, or ``None``.
        scale: NUMERIC_SCALE, or ``None``.

    Returns:
        A MySQL type string. Unrecognised types are returned unchanged, which is
        what the script emitted for every type before this mapping existed.
    """
    kind = data_type.lower()
    if kind in _FIXED_TYPE_MAP:
        return _FIXED_TYPE_MAP[kind]
    if kind in ("char", "nchar", "varchar", "nvarchar", "binary", "varbinary"):
        is_binary = kind.endswith("binary")
        if char_len is None or char_len < 0 or char_len > _MAX_INLINE_LENGTH:
            return "LONGBLOB" if is_binary else "LONGTEXT"
        # MySQL CHAR/BINARY top out at 255; longer fixed-width columns become variable-width.
        fixed_width = kind in ("char", "nchar", "binary") and char_len <= 255
        if is_binary:
            return f"{'BINARY' if fixed_width else 'VARBINARY'}({char_len})"
        return f"{'CHAR' if fixed_width else 'VARCHAR'}({char_len})"
    if kind in ("decimal", "numeric"):
        # MySQL DECIMAL allows precision <= 65 and scale <= 30 (and scale <= precision).
        digits = min(precision if precision is not None else 18, 65)
        places = min(scale if scale is not None else 0, 30, digits)
        return f"DECIMAL({digits},{places})"
    return data_type


def _quote_mysql_ident(name):
    """Quote *name* as a MySQL identifier (backticks; embedded backticks doubled)."""
    return "`" + str(name).replace("`", "``") + "`"


def _quote_mssql_ident(name):
    """Quote *name* as a SQL Server identifier (brackets; embedded ']' doubled)."""
    return "[" + str(name).replace("]", "]]") + "]"


def _escape_like(value):
    """Escape MySQL LIKE wildcards so *value* only matches itself literally."""
    return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def _batches(items, size):
    """Yield consecutive slices of the sequence *items*, each at most *size* long."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def _fetch_columns(sql_server_cursor, table_name):
    """Return (name, type, char_len, precision, scale) rows for *table_name*, in column order."""
    # ORDER BY is required: INFORMATION_SCHEMA rows have no guaranteed order, and
    # this list drives the DDL and every SELECT/INSERT/UPDATE column list below.
    # NOTE(review): not filtered by TABLE_SCHEMA, so a name present in several
    # schemas would return the columns of all of them — confirm that cannot happen.
    sql_server_cursor.execute(
        """
        SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH,
               NUMERIC_PRECISION, NUMERIC_SCALE
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME = %s
        ORDER BY ORDINAL_POSITION
        """,
        (table_name,),
    )
    return sql_server_cursor.fetchall()


def _create_table_sql(table_name, columns):
    """Build a MySQL CREATE TABLE statement mirroring the SQL Server *columns*."""
    column_defs = ", ".join(
        f"{_quote_mysql_ident(name)} {_mysql_column_type(data_type, char_len, precision, scale)}"
        for name, data_type, char_len, precision, scale in columns
    )
    return f"CREATE TABLE {_quote_mysql_ident(table_name)} ({column_defs})"


def _insert_missing_rows(sql_server_cursor, mysql_conn, mysql_cursor, table_name, columns, batch_size):
    """Copy rows whose key (first column) exists in SQL Server but not in MySQL."""
    col_names = [col[0] for col in columns]
    ss_table, my_table = _quote_mssql_ident(table_name), _quote_mysql_ident(table_name)
    ss_key, my_key = _quote_mssql_ident(col_names[0]), _quote_mysql_ident(col_names[0])

    # Collect all keys on both sides
    sql_server_cursor.execute(f"SELECT {ss_key} FROM {ss_table}")
    sql_server_ids = {row[0] for row in sql_server_cursor.fetchall()}
    mysql_cursor.execute(f"SELECT {my_key} FROM {my_table}")
    mysql_ids = {row[0] for row in mysql_cursor.fetchall()}

    ids_to_insert = sql_server_ids - mysql_ids
    logging.info(f"Found {len(ids_to_insert)} new rows to insert.")

    # Explicit column lists keep SELECT and INSERT positions aligned.
    ss_select_cols = ", ".join(_quote_mssql_ident(name) for name in col_names)
    insert_sql = (
        f"INSERT INTO {my_table} ({', '.join(_quote_mysql_ident(name) for name in col_names)}) "
        f"VALUES ({', '.join(['%s'] * len(col_names))})"
    )
    for batch_ids in _batches(list(ids_to_insert), batch_size):
        # Bound parameters quote string/datetime keys correctly (map(str) did not).
        placeholders = ", ".join(["%s"] * len(batch_ids))
        sql_server_cursor.execute(
            f"SELECT {ss_select_cols} FROM {ss_table} WHERE {ss_key} IN ({placeholders})",
            tuple(batch_ids),
        )
        rows_to_insert = sql_server_cursor.fetchall()
        if rows_to_insert:
            mysql_cursor.executemany(insert_sql, rows_to_insert)
            mysql_conn.commit()
            logging.info(f"Inserted {len(rows_to_insert)} rows into {table_name}.")


def _update_changed_rows(sql_server_cursor, mysql_conn, mysql_cursor, table_name, columns,
                         update_time_field, batch_size, update_since):
    """Re-copy rows whose update-time value differs between the two databases.

    Only SQL Server rows with ``update_time_field > update_since`` that already
    exist in MySQL are considered.
    """
    logging.info(f"Checking for updates based on {update_time_field} field in table: {table_name}")
    col_names = [col[0] for col in columns]
    if len(col_names) < 2:
        # Only a key column: there is nothing besides the key to SET.
        return
    ss_table, my_table = _quote_mssql_ident(table_name), _quote_mysql_ident(table_name)
    ss_key, my_key = _quote_mssql_ident(col_names[0]), _quote_mysql_ident(col_names[0])
    ss_upd, my_upd = _quote_mssql_ident(update_time_field), _quote_mysql_ident(update_time_field)

    sql_server_cursor.execute(
        f"SELECT {ss_key}, {ss_upd} FROM {ss_table} WHERE {ss_upd} > %s",
        (update_since,),
    )
    sql_server_update_data = {row[0]: row[1] for row in sql_server_cursor.fetchall()}
    mysql_cursor.execute(f"SELECT {my_key}, {my_upd} FROM {my_table}")
    mysql_update_data = {row[0]: row[1] for row in mysql_cursor.fetchall()}

    ids_to_update = [
        row_id
        for row_id, ss_time in sql_server_update_data.items()
        if row_id in mysql_update_data and ss_time != mysql_update_data[row_id]
    ]
    logging.info(f"Found {len(ids_to_update)} rows to update.")

    ss_select_cols = ", ".join(_quote_mssql_ident(name) for name in col_names)
    # SET every non-key column; the key goes last to fill the WHERE placeholder.
    update_sql = (
        f"UPDATE {my_table} SET "
        + ", ".join(f"{_quote_mysql_ident(name)} = %s" for name in col_names[1:])
        + f" WHERE {my_key} = %s"
    )
    for batch_ids in _batches(ids_to_update, batch_size):
        placeholders = ", ".join(["%s"] * len(batch_ids))
        sql_server_cursor.execute(
            f"SELECT {ss_select_cols} FROM {ss_table} "
            f"WHERE {ss_key} IN ({placeholders}) AND {ss_upd} > %s",
            (*batch_ids, update_since),
        )
        rows_to_update = sql_server_cursor.fetchall()
        if rows_to_update:
            update_values = [list(row[1:]) + [row[0]] for row in rows_to_update]
            mysql_cursor.executemany(update_sql, update_values)
            mysql_conn.commit()
            logging.info(f"Updated {len(rows_to_update)} rows in table {table_name}.")


def sync_table(table_name, batch_size=10000, update_since='2023-11-12 20:23:23'):
    """Create or incrementally sync one SQL Server table into MySQL.

    If the table does not exist in MySQL it is only created; no rows are copied
    on that run. Otherwise, rows whose key is missing from MySQL are inserted.
    When the table also has an ``XGRQ``/``UpdateTime`` column, rows whose update
    time is later than *update_since* and differs from MySQL are re-copied.

    The first column (by ordinal position) is treated as the row key.

    Errors are logged with a traceback and swallowed, so a caller looping over
    many tables continues with the next one.

    Args:
        table_name: SQL Server table name; the MySQL table uses the same name.
        batch_size: Maximum number of keys fetched/written per round trip.
        update_since: Lower bound for the update-time column, passed to the SQL
            Server driver as a query parameter (default is a string literal).

    Raises:
        ValueError: If *batch_size* is less than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    try:
        # ExitStack closes cursors and connections even when a step fails.
        with contextlib.ExitStack() as stack:
            sql_server_conn = stack.enter_context(
                contextlib.closing(pymssql.connect(**sql_server_config)))
            sql_server_cursor = stack.enter_context(contextlib.closing(sql_server_conn.cursor()))
            mysql_conn = stack.enter_context(
                contextlib.closing(mysql.connector.connect(**mysql_config)))
            mysql_cursor = stack.enter_context(contextlib.closing(mysql_conn.cursor()))
            logging.info(f"Processing table: {table_name}")

            # Escape LIKE wildcards ('_' is common in table names) and read every
            # row so no unread result blocks the next query on this cursor.
            mysql_cursor.execute("SHOW TABLES LIKE %s", (_escape_like(table_name),))
            table_exists = mysql_cursor.fetchall()

            columns = _fetch_columns(sql_server_cursor, table_name)
            if not columns:
                logging.warning(f"No columns found for table {table_name} in SQL Server; skipping.")
                return

            # First column named XGRQ or UpdateTime (case-insensitive), if any
            update_time_field = next(
                (col[0] for col in columns if col[0].lower() in ('xgrq', 'updatetime')),
                None,
            )
            logging.info(f"Table {table_name} has update time field: {update_time_field}")

            if not table_exists:
                create_table_sql = _create_table_sql(table_name, columns)
                logging.info(f"Create table SQL: {create_table_sql}")
                mysql_cursor.execute(create_table_sql)
                logging.info(f"Table {table_name} created in MySQL.")
            else:
                logging.info(f"Table {table_name} already exists in MySQL. Updating data...")
                _insert_missing_rows(sql_server_cursor, mysql_conn, mysql_cursor,
                                     table_name, columns, batch_size)
                if update_time_field:
                    _update_changed_rows(sql_server_cursor, mysql_conn, mysql_cursor,
                                         table_name, columns, update_time_field,
                                         batch_size, update_since)
            logging.info(f"Sync completed for table: {table_name}")
    except Exception as e:
        logging.exception(f"Failed to sync table {table_name}. Error: {e}")
def main(tables_to_sync=("lc_mainshlistnew",)):
    """List SQL Server base tables and sync the selected ones into MySQL.

    Args:
        tables_to_sync: Iterable of table names (or a single name), matched
            case-insensitively against the SQL Server base tables. ``None``
            syncs every base table. The default keeps the single table this
            script has always synced.
    """
    if isinstance(tables_to_sync, str):
        tables_to_sync = (tables_to_sync,)
    wanted = None if tables_to_sync is None else {name.lower() for name in tables_to_sync}
    try:
        # Hold this connection only long enough to list tables; sync_table
        # opens its own connections for each table it processes.
        with contextlib.closing(pymssql.connect(**sql_server_config)) as sql_server_conn:
            with contextlib.closing(sql_server_conn.cursor()) as sql_server_cursor:
                sql_server_cursor.execute("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE' ORDER BY TABLE_NAME")
                table_names = [row[0] for row in sql_server_cursor.fetchall()]

        for table_name in table_names:
            if wanted is None or table_name.lower() in wanted:
                sync_table(table_name)
        # NOTE(review): sync_table logs and swallows its own errors, so this
        # message is printed even if an individual table failed.
        logging.info("All tables synced successfully!")
    except Exception as e:
        logging.exception(f"Main function failed. Error: {e}")


# Run the sync only when executed as a script, not when imported.
if __name__ == "__main__":
    main()