pdf_code/zzb_data_word/DB_Trans.py

207 lines
7.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import os
from multiprocessing import Pool

import mysql.connector
import pymssql
# Logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# NOTE(review): database credentials are committed to source control. The
# environment variables below let a deployment override every setting without
# editing this file; the literal fallbacks reproduce the original values and
# should be removed (and the passwords rotated) once deployments set them.

# SQL Server (source) connection settings, read field-by-field by pymssql.connect()
sql_server_config = {
    "server": os.environ.get("DB_TRANS_MSSQL_SERVER", "203.192.15.17"),  # SQL Server host / IP
    "port": int(os.environ.get("DB_TRANS_MSSQL_PORT", "28063")),  # SQL Server TCP port
    "user": os.environ.get("DB_TRANS_MSSQL_USER", "zncbuser"),  # login name
    "password": os.environ.get("DB_TRANS_MSSQL_PASSWORD", "ZZB-Cbindex-data"),  # login password
    "database": os.environ.get("DB_TRANS_MSSQL_DATABASE", "jydb"),  # source database
}

# MySQL (target) connection settings, passed as mysql.connector.connect(**mysql_config)
mysql_config = {
    "host": os.environ.get("DB_TRANS_MYSQL_HOST", "rm-bp1f85h3xs6mvnf5e3o.mysql.rds.aliyuncs.com"),  # MySQL host
    "user": os.environ.get("DB_TRANS_MYSQL_USER", "zzb_jydb"),  # login name
    "password": os.environ.get("DB_TRANS_MYSQL_PASSWORD", "Ysdbsdjs89Yrqwp"),  # login password
    "database": os.environ.get("DB_TRANS_MYSQL_DATABASE", "zzb_jydb"),  # target database
}

# Number of rows fetched from SQL Server and inserted into MySQL per batch
BATCH_SIZE = 100000
# Number of worker processes used to sync tables (1 = tables are synced one at a time)
MAX_PROCESSES = 1
# SQL Server DATA_TYPE values that translate to one fixed MySQL column type.
_FIXED_TYPE_MAP = {
    "int": "INT",
    "datetime": "DATETIME",
    "datetime2": "DATETIME",
    "smalldatetime": "DATETIME",
    "money": "DECIMAL(19, 4)",
    "smallmoney": "DECIMAL(19, 4)",
    "image": "LONGBLOB",
    "varbinary": "LONGBLOB",
    "text": "LONGTEXT",
    "ntext": "LONGTEXT",
    # SQL Server FLOAT is an 8-byte double; MySQL FLOAT is only 4 bytes.
    "float": "DOUBLE",
    # SQL Server TINYINT ranges 0..255; a signed MySQL TINYINT stops at 127.
    "tinyint": "TINYINT UNSIGNED",
}

# Character types whose MySQL mapping depends on CHARACTER_MAXIMUM_LENGTH.
_STRING_TYPES = frozenset({"varchar", "nvarchar", "char", "nchar"})


def _mysql_ident(name):
    """Quote *name* as a MySQL identifier (backticks, embedded backticks doubled)."""
    return "`" + str(name).replace("`", "``") + "`"


def _mssql_ident(name):
    """Quote *name* as a SQL Server identifier (brackets, embedded ']' doubled)."""
    return "[" + str(name).replace("]", "]]") + "]"


def _map_column_type(data_type, char_length, numeric_precision, numeric_scale):
    """Translate one INFORMATION_SCHEMA.COLUMNS type description into a MySQL type.

    Types not handled explicitly are passed through unchanged, which only works
    when MySQL happens to accept the SQL Server type name.
    """
    if data_type in _FIXED_TYPE_MAP:
        return _FIXED_TYPE_MAP[data_type]
    if data_type in _STRING_TYPES:
        # CHARACTER_MAXIMUM_LENGTH is -1 for the (max) variants.
        if char_length is None or char_length < 0:
            return "LONGTEXT"
        if char_length > 255:
            return "TEXT"
        return "VARCHAR(255)"
    if data_type in ("decimal", "numeric"):
        # Compare against None: a scale of 0 (e.g. DECIMAL(18, 0)) is valid and
        # must not fall back to the default, which would overflow.
        if numeric_precision is not None and numeric_scale is not None:
            return f"DECIMAL({numeric_precision}, {numeric_scale})"
        return "DECIMAL(10, 2)"  # default when precision/scale are unknown
    return data_type


def _build_create_table_sql(table_name, columns):
    """Build a MySQL CREATE TABLE statement from SQL Server column metadata.

    *columns* are (name, data_type, char_length, precision, scale) rows in
    ordinal order; the first column becomes the PRIMARY KEY.
    """
    definitions = []
    for col_name, data_type, char_length, precision, scale in columns:
        mysql_type = _map_column_type(data_type, char_length, precision, scale)
        # A column named "id" is explicitly NOT NULL; others are nullable.
        null_spec = " NOT NULL" if col_name.lower() == "id" else ""
        definitions.append(f"{_mysql_ident(col_name)} {mysql_type}{null_spec}")
    definitions.append(f"PRIMARY KEY ({_mysql_ident(columns[0][0])})")
    return f"CREATE TABLE {_mysql_ident(table_name)} ({', '.join(definitions)})"


def _fetch_columns(sql_server_cursor, table_name):
    """Return the source table's column metadata rows, ordered by ordinal position."""
    # ORDER BY is required: the INSERT column list must line up with the
    # SELECT value order, and the first column is used as the sync key.
    # NOTE(review): no TABLE_SCHEMA filter, so a table name present in several
    # schemas would yield duplicate columns — confirm only one schema is used.
    sql_server_cursor.execute(
        """
        SELECT
            COLUMN_NAME,
            DATA_TYPE,
            CHARACTER_MAXIMUM_LENGTH,
            NUMERIC_PRECISION,
            NUMERIC_SCALE
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME = %s
        ORDER BY ORDINAL_POSITION
        """,
        (table_name,),
    )
    return sql_server_cursor.fetchall()


def _mysql_table_exists(mysql_cursor, table_name):
    """Return True if MySQL already has a table named *table_name*."""
    # Escape LIKE wildcards so e.g. "A_B" does not also match "AXB".
    pattern = table_name.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    mysql_cursor.execute("SHOW TABLES LIKE %s", (pattern,))
    # fetchall() drains the result so the next query on this cursor is not
    # rejected for an unread result set.
    return bool(mysql_cursor.fetchall())


def _copy_new_rows(sql_server_cursor, mysql_conn, mysql_cursor, table_name, column_names):
    """Copy source rows whose key exceeds the target's current MAX key, BATCH_SIZE at a time."""
    key_name = column_names[0]
    mysql_key = _mysql_ident(key_name)
    mssql_key = _mssql_ident(key_name)
    mysql_table = _mysql_ident(table_name)

    mysql_cursor.execute(f"SELECT MAX({mysql_key}) FROM {mysql_table}")
    # None means the target is empty: copy everything (no key filter), so keys
    # <= 0 and non-numeric keys are not skipped.
    last_key = mysql_cursor.fetchone()[0]
    logging.info(f"Target table {table_name} has max ID: {last_key}")

    # Explicit column list keeps the value order identical to the INSERT list.
    select_list = ", ".join(_mssql_ident(name) for name in column_names)
    select_sql = f"SELECT TOP ({BATCH_SIZE}) {select_list} FROM {_mssql_ident(table_name)}"
    order_sql = f" ORDER BY {mssql_key}"
    insert_sql = (
        f"INSERT INTO {mysql_table} ({', '.join(_mysql_ident(name) for name in column_names)}) "
        f"VALUES ({', '.join(['%s'] * len(column_names))})"
    )

    processed = 0
    while True:
        # Keyset pagination: each batch starts after the last key already seen,
        # so the source never re-numbers rows it has already returned.
        if last_key is None:
            sql_server_cursor.execute(select_sql + order_sql)
        else:
            sql_server_cursor.execute(f"{select_sql} WHERE {mssql_key} > %s{order_sql}", (last_key,))
        rows = sql_server_cursor.fetchall()
        if not rows:
            logging.info(f"表:{table_name} 数据已经是最新的,不需要更新")
            break  # no more rows to copy

        try:
            mysql_cursor.executemany(insert_sql, rows)
            mysql_conn.commit()
            logging.info(f"Inserted {len(rows)} rows into {table_name}.")
        except mysql.connector.errors.DataError as e:
            # NOTE(review): the failed batch is skipped. Later batches still
            # raise MAX(key) above it, so future runs will not retry these rows.
            logging.error(f"DataError: {e}")
            mysql_conn.rollback()

        last_key = rows[-1][0]  # key is the first selected column
        processed += len(rows)
        logging.info(f"Processed {processed} rows in {table_name}...")


def sync_table(table_name):
    """Copy new rows of one SQL Server table into the MySQL table of the same name.

    If the MySQL table does not exist, it is first created from the SQL Server
    column metadata. Then every source row whose first column is greater than
    the current MAX of that column in MySQL is copied in batches of BATCH_SIZE.
    The first column is assumed to be a unique, ordered key; it becomes the
    PRIMARY KEY when the table is created here.

    Every exception is logged rather than raised, so a failing table does not
    propagate out of Pool.map in main(). Both connections are always closed.

    Args:
        table_name: Table name, used unchanged on both servers.
    """
    sql_server_conn = None
    mysql_conn = None
    try:
        sql_server_conn = pymssql.connect(
            server=sql_server_config["server"],
            port=sql_server_config["port"],
            user=sql_server_config["user"],
            password=sql_server_config["password"],
            database=sql_server_config["database"],
        )
        sql_server_cursor = sql_server_conn.cursor()
        mysql_conn = mysql.connector.connect(**mysql_config)
        mysql_cursor = mysql_conn.cursor()
        logging.info(f"Processing table: {table_name}")

        columns = _fetch_columns(sql_server_cursor, table_name)
        if not columns:
            raise ValueError("no columns found in INFORMATION_SCHEMA.COLUMNS")

        if not _mysql_table_exists(mysql_cursor, table_name):
            create_table_sql = _build_create_table_sql(table_name, columns)
            logging.info(f"Create table SQL: {create_table_sql}")
            mysql_cursor.execute(create_table_sql)
            logging.info(f"Table {table_name} created in MySQL.")
        else:
            logging.info(f"Table {table_name} already exists in MySQL. Updating data...")

        _copy_new_rows(
            sql_server_cursor, mysql_conn, mysql_cursor, table_name, [col[0] for col in columns]
        )
        logging.info(f"Sync completed for table: {table_name}")
    except Exception as e:
        logging.error(f"Failed to sync table {table_name}. Error: {e}")
    finally:
        # Always release both connections. Closing a connection also releases
        # its cursors. A failing close is logged, not raised.
        for conn in (mysql_conn, sql_server_conn):
            if conn is not None:
                try:
                    conn.close()
                except Exception as e:
                    logging.warning(f"Error closing connection for table {table_name}: {e}")
def _list_source_tables():
    """Return the names of all SQL Server base tables, sorted by name.

    The connection is closed before returning, so it is not held open while
    the per-table sync runs.
    """
    conn = pymssql.connect(
        server=sql_server_config["server"],
        port=sql_server_config["port"],
        user=sql_server_config["user"],
        password=sql_server_config["password"],
        database=sql_server_config["database"],
    )
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE' ORDER BY TABLE_NAME")
        return [row[0] for row in cursor.fetchall()]
    finally:
        conn.close()


def main():
    """Sync every SQL Server base table into MySQL using MAX_PROCESSES worker processes.

    Only failures of the table-listing step or of the pool itself are logged
    here; each table's sync_table call handles its own errors.
    """
    try:
        table_names = _list_source_tables()
        with Pool(processes=MAX_PROCESSES) as pool:
            pool.map(sync_table, table_names)
        # NOTE(review): logged even if individual tables failed, because
        # sync_table logs and swallows its own errors.
        logging.info("All tables synced successfully!")
    except Exception as e:
        logging.error(f"Main function failed. Error: {e}")


# Entry point (the guard is also required for multiprocessing on spawn-based platforms)
if __name__ == "__main__":
    main()