# Sync every base table from the SQL Server "jydb" database into MySQL.
import logging
import os
from multiprocessing import Pool

import mysql.connector
import pymssql

# Logging configuration.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# NOTE(review): database credentials used to be hard-coded here. Every value
# can now be supplied through an environment variable; the literal fallbacks
# are kept only so existing deployments keep working. These secrets have been
# exposed in source control and should be rotated, after which the fallbacks
# should be removed.

# SQL Server (source) connection settings.
sql_server_config = {
    "server": os.environ.get("MSSQL_SERVER", "203.192.15.17"),  # host / IP address
    "port": int(os.environ.get("MSSQL_PORT", "28063")),  # TCP port
    "user": os.environ.get("MSSQL_USER", "zncbuser"),
    "password": os.environ.get("MSSQL_PASSWORD", "ZZB-Cbindex-data"),
    "database": os.environ.get("MSSQL_DATABASE", "jydb"),
}

# MySQL (target) connection settings, passed as keyword arguments to
# mysql.connector.connect.
mysql_config = {
    "host": os.environ.get("MYSQL_HOST", "rm-bp1f85h3xs6mvnf5e3o.mysql.rds.aliyuncs.com"),
    "user": os.environ.get("MYSQL_USER", "zzb_jydb"),
    "password": os.environ.get("MYSQL_PASSWORD", "Ysdbsdjs89Yrqwp"),
    "database": os.environ.get("MYSQL_DATABASE", "zzb_jydb"),
}

# Number of rows fetched from SQL Server and inserted into MySQL per batch.
BATCH_SIZE = 100000

# Number of worker processes used to sync tables concurrently.
MAX_PROCESSES = 1

# SQL Server data types that translate to one fixed MySQL type regardless of
# the length / precision metadata reported by INFORMATION_SCHEMA.
_FIXED_TYPE_MAP = {
    "int": "INT",
    "bigint": "BIGINT",
    "smallint": "SMALLINT",
    "tinyint": "TINYINT UNSIGNED",  # SQL Server tinyint is 0..255
    "bit": "TINYINT(1)",
    "float": "DOUBLE",  # SQL Server float defaults to 53-bit (double) precision
    "real": "FLOAT",
    "datetime": "DATETIME",
    "smalldatetime": "DATETIME",
    "datetime2": "DATETIME(6)",
    "date": "DATE",
    "time": "TIME(6)",
    "money": "DECIMAL(19, 4)",
    "smallmoney": "DECIMAL(19, 4)",
    "image": "LONGBLOB",
    "text": "LONGTEXT",
    "ntext": "LONGTEXT",
    "xml": "LONGTEXT",
    "uniqueidentifier": "CHAR(36)",
    "timestamp": "BINARY(8)",  # SQL Server timestamp == rowversion, not a date
    "rowversion": "BINARY(8)",
}


def mysql_column_type(data_type, char_length=None, numeric_precision=None, numeric_scale=None):
    """Translate a SQL Server column type into a MySQL column type.

    Args:
        data_type: INFORMATION_SCHEMA DATA_TYPE value, e.g. "varchar".
        char_length: CHARACTER_MAXIMUM_LENGTH; -1 denotes a (MAX) type.
        numeric_precision: NUMERIC_PRECISION, or None.
        numeric_scale: NUMERIC_SCALE, or None.

    Returns:
        A MySQL type string for use in CREATE TABLE. Types not handled here
        are returned unchanged.
    """
    sql_type = (data_type or "").lower()

    if sql_type in ("decimal", "numeric"):
        # Compare with None rather than truthiness: a scale of 0 (e.g.
        # decimal(18, 0)) is valid and must not fall back to the default.
        if numeric_precision is not None and numeric_scale is not None:
            return f"DECIMAL({numeric_precision}, {numeric_scale})"
        return "DECIMAL(10, 2)"

    if sql_type in ("varchar", "nvarchar", "char", "nchar"):
        if char_length is None or char_length < 0:
            return "LONGTEXT"  # (n)varchar(MAX)
        if char_length <= 255:
            prefix = "CHAR" if sql_type in ("char", "nchar") else "VARCHAR"
            return f"{prefix}({char_length})"
        # Longer strings go to TEXT to stay clear of MySQL's row-size limit.
        return "TEXT"

    if sql_type in ("varbinary", "binary"):
        if char_length is None or char_length < 0:
            return "LONGBLOB"  # varbinary(MAX)
        if char_length <= 255:
            prefix = "BINARY" if sql_type == "binary" else "VARBINARY"
            return f"{prefix}({char_length})"
        return "BLOB"

    return _FIXED_TYPE_MAP.get(sql_type, data_type)


def _quote_mysql(name):
    """Return *name* as a backtick-quoted MySQL identifier."""
    return "`" + str(name).replace("`", "``") + "`"


def _quote_mssql(name):
    """Return *name* as a bracket-quoted SQL Server identifier."""
    return "[" + str(name).replace("]", "]]") + "]"


def _fetch_source_columns(sql_server_cursor, table_name):
    """Return (name, type, char_len, precision, scale) rows for *table_name* in column order."""
    # ORDER BY ORDINAL_POSITION is required: without it the row order is not
    # guaranteed, yet the INSERT column list must match the order of SELECT *.
    # NOTE(review): filtered by table name only; a same-named table in another
    # schema would contribute duplicate columns — confirm that cannot happen.
    sql_server_cursor.execute(
        """
        SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH,
               NUMERIC_PRECISION, NUMERIC_SCALE
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME = %s
        ORDER BY ORDINAL_POSITION
        """,
        (table_name,),
    )
    return sql_server_cursor.fetchall()


def _mysql_table_exists(mysql_cursor, table_name):
    """Return True if *table_name* exists in the connected MySQL database."""
    # Exact comparison instead of SHOW TABLES LIKE, where "_" and "%" in a
    # table name would act as wildcards and could match other tables.
    mysql_cursor.execute(
        "SELECT 1 FROM information_schema.TABLES "
        "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s",
        (table_name,),
    )
    return bool(mysql_cursor.fetchall())


def _create_mysql_table(mysql_cursor, table_name, columns):
    """Create *table_name* in MySQL from SQL Server column metadata; first column is the primary key."""
    column_defs = []
    for position, (col_name, data_type, char_length, precision, scale) in enumerate(columns):
        col_type = mysql_column_type(data_type, char_length, precision, scale)
        # The key column, and any column named "id", is declared NOT NULL.
        not_null = " NOT NULL" if position == 0 or col_name.lower() == "id" else ""
        column_defs.append(f"{_quote_mysql(col_name)} {col_type}{not_null}")
    # Assumes the first column uniquely identifies a row in the source table.
    column_defs.append(f"PRIMARY KEY ({_quote_mysql(columns[0][0])})")
    create_table_sql = f"CREATE TABLE {_quote_mysql(table_name)} ({', '.join(column_defs)})"
    logging.info(f"Create table SQL: {create_table_sql}")
    mysql_cursor.execute(create_table_sql)


def _copy_new_rows(sql_server_cursor, mysql_conn, mysql_cursor, table_name, column_names):
    """Copy source rows whose key exceeds the target's MAX(key); return the number copied.

    Uses keyset pagination (TOP n ... WHERE key > last ORDER BY key), so each
    batch only reads rows past the last key seen instead of re-numbering the
    whole remainder with ROW_NUMBER() on every iteration.
    """
    key_name = column_names[0]
    target_table = _quote_mysql(table_name)

    mysql_cursor.execute(f"SELECT MAX({_quote_mysql(key_name)}) FROM {target_table}")
    last_key = mysql_cursor.fetchone()[0]
    logging.info(f"Target table {table_name} has max ID: {0 if last_key is None else last_key}")

    source_table = _quote_mssql(table_name)
    source_key = _quote_mssql(key_name)
    insert_sql = (
        f"INSERT INTO {target_table} ({', '.join(_quote_mysql(c) for c in column_names)}) "
        f"VALUES ({', '.join(['%s'] * len(column_names))})"
    )

    copied = 0
    while True:
        if last_key is None:
            # Empty target: start from the very first row (keys <= 0 included).
            sql_server_cursor.execute(
                f"SELECT TOP ({BATCH_SIZE}) * FROM {source_table} ORDER BY {source_key}"
            )
        else:
            # Bind the key as a parameter so string / date keys also work.
            sql_server_cursor.execute(
                f"SELECT TOP ({BATCH_SIZE}) * FROM {source_table} "
                f"WHERE {source_key} > %s ORDER BY {source_key}",
                (last_key,),
            )
        rows = sql_server_cursor.fetchall()
        if not rows:
            break

        try:
            mysql_cursor.executemany(insert_sql, rows)
            mysql_conn.commit()
        except mysql.connector.errors.DataError as e:
            logging.error(f"DataError: {e}")
            mysql_conn.rollback()
            # Stop rather than skip the batch: later batches would raise
            # MAX(key) past the failed rows and no future run would retry them.
            raise

        copied += len(rows)
        logging.info(f"Inserted {len(rows)} rows into {table_name}.")
        logging.info(f"Processed {copied} rows in {table_name}...")
        # SELECT * returns columns in ordinal order, so the key is element 0.
        last_key = rows[-1][0]

    if copied == 0:
        logging.info(f"表:{table_name} 数据已经是最新的,不需要更新")
    return copied


def sync_table(table_name):
    """Create (if missing) and incrementally fill one MySQL table from SQL Server.

    The source table's first column (by ordinal position) is treated as its
    key: it becomes the MySQL primary key when the table is created, and rows
    whose key is greater than the current MAX(key) in MySQL are inserted in
    batches of BATCH_SIZE. Only new rows are inserted; changes to rows that
    already exist in MySQL are not propagated.

    Every error is logged (with traceback) and swallowed so that one failing
    table does not stop the others. Both connections are always closed.

    Args:
        table_name: Table name as listed in SQL Server INFORMATION_SCHEMA.TABLES.
    """
    sql_server_conn = None
    mysql_conn = None
    try:
        sql_server_conn = pymssql.connect(
            server=sql_server_config["server"],
            port=sql_server_config["port"],
            user=sql_server_config["user"],
            password=sql_server_config["password"],
            database=sql_server_config["database"],
        )
        sql_server_cursor = sql_server_conn.cursor()

        mysql_conn = mysql.connector.connect(**mysql_config)
        mysql_cursor = mysql_conn.cursor()

        logging.info(f"Processing table: {table_name}")

        columns = _fetch_source_columns(sql_server_cursor, table_name)
        if not columns:
            raise ValueError(f"No columns found in SQL Server for table {table_name}")

        if _mysql_table_exists(mysql_cursor, table_name):
            logging.info(f"Table {table_name} already exists in MySQL. Updating data...")
        else:
            _create_mysql_table(mysql_cursor, table_name, columns)
            logging.info(f"Table {table_name} created in MySQL.")

        _copy_new_rows(
            sql_server_cursor, mysql_conn, mysql_cursor, table_name, [col[0] for col in columns]
        )

        logging.info(f"Sync completed for table: {table_name}")
    except Exception as e:
        logging.exception(f"Failed to sync table {table_name}. Error: {e}")
    finally:
        # Close on every path; previously an exception leaked both connections.
        for conn in (sql_server_conn, mysql_conn):
            if conn is not None:
                try:
                    conn.close()
                except Exception:
                    logging.warning(
                        f"Could not close a connection for table {table_name}", exc_info=True
                    )

def main():
    """Sync every base table of the SQL Server database into MySQL.

    Lists base tables via INFORMATION_SCHEMA.TABLES (ordered by name), then
    hands each table name to sync_table in a process pool of MAX_PROCESSES
    workers. Any failure while listing tables or running the pool is logged
    with a traceback instead of being raised.
    """
    try:
        # This connection is only needed to list the tables; close it before
        # the (potentially long) sync instead of holding it idle for the run.
        sql_server_conn = pymssql.connect(
            server=sql_server_config["server"],
            port=sql_server_config["port"],
            user=sql_server_config["user"],
            password=sql_server_config["password"],
            database=sql_server_config["database"],
        )
        try:
            sql_server_cursor = sql_server_conn.cursor()
            sql_server_cursor.execute(
                "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE' ORDER BY TABLE_NAME"
            )
            table_names = [row[0] for row in sql_server_cursor.fetchall()]
        finally:
            sql_server_conn.close()

        # Each worker opens its own connections inside sync_table.
        with Pool(processes=MAX_PROCESSES) as pool:
            pool.map(sync_table, table_names)

        # NOTE: sync_table logs and swallows per-table failures, so reaching
        # this line does not guarantee that every table synced cleanly.
        logging.info("All tables synced successfully!")
    except Exception as e:
        logging.exception(f"Main function failed. Error: {e}")

# Entry point. Under the "spawn" start method, multiprocessing workers import
# this module, and the __main__ guard keeps them from re-running main().
if __name__ == "__main__":
    main()