pdf_code/zzb_data_word/syc_table.py

157 lines
5.5 KiB
Python

import pymssql
import mysql.connector
import logging
from multiprocessing import Pool
# Logging setup: INFO level, each record prefixed with timestamp and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# NOTE(review): the credentials below are hard-coded in source; consider
# loading them from the environment or a secrets store instead.

# Connection settings for the source SQL Server instance.
sql_server_config = dict(
    server="203.192.15.17",  # SQL Server IP address
    port=28063,  # SQL Server port
    user="zncbuser",  # user name
    password="ZZB-Cbindex-data",  # password
    database="jydb",  # database name
)

# Connection settings for the target MySQL instance; the whole mapping is
# passed as keyword arguments to mysql.connector.connect().
mysql_config = dict(
    host="rm-bp1f85h3xs6mvnf5e3o.mysql.rds.aliyuncs.com",  # MySQL host
    user="zzb_jydb",  # user name
    password="Ysdbsdjs89Yrqwp",  # password
    database="zzb_jydb",  # database name
)

# Maximum number of worker processes in the pool.
MAX_PROCESSES = 1
def _quote_mysql_identifier(name):
    """Return *name* wrapped in backticks, doubling any embedded backtick."""
    return "`" + str(name).replace("`", "``") + "`"


def _map_sqlserver_type(data_type, char_length, numeric_precision, numeric_scale):
    """Translate a SQL Server ``DATA_TYPE`` into a MySQL column type.

    Args:
        data_type: Type name as reported by ``INFORMATION_SCHEMA.COLUMNS``.
        char_length: ``CHARACTER_MAXIMUM_LENGTH`` (``None`` when not
            reported, ``-1`` for ``(max)`` columns).
        numeric_precision: ``NUMERIC_PRECISION`` or ``None``.
        numeric_scale: ``NUMERIC_SCALE`` or ``None``.

    Returns:
        The MySQL type string. Types without an explicit mapping are
        returned unchanged.
    """
    if data_type in ("varchar", "nvarchar"):
        if not char_length:
            # Length not reported: keep the historical default.
            return "VARCHAR(255)"
        if char_length < 0:
            # varchar(max) / nvarchar(max)
            return "LONGTEXT"
        if char_length > 255:
            # Wide columns become TEXT so they do not count against
            # MySQL's per-row size limit the way a wide VARCHAR would.
            return "TEXT"
        return f"VARCHAR({char_length})"
    if data_type == "int":
        return "INT"
    if data_type == "datetime":
        return "DATETIME"
    if data_type in ("decimal", "numeric"):
        # Compare against None explicitly: a scale of 0 is valid and must
        # not fall through to the default.
        if numeric_precision is not None and numeric_scale is not None:
            return f"DECIMAL({numeric_precision}, {numeric_scale})"
        return "DECIMAL(10, 2)"  # default when metadata is missing
    if data_type in ("money", "smallmoney"):
        return "DECIMAL(19, 4)"
    if data_type == "image":
        return "LONGBLOB"
    return data_type


def _build_create_table_sql(table_name, columns):
    """Build the MySQL ``CREATE TABLE`` statement for *table_name*.

    Args:
        table_name: Name of the table to create.
        columns: Non-empty sequence of 5-tuples ``(name, data_type,
            char_length, numeric_precision, numeric_scale)`` in column order.

    Returns:
        The ``CREATE TABLE`` statement. The first column is used as the
        primary key (the script assumes it is the ID column); a column
        named ``id`` (case-insensitive) is declared ``NOT NULL``.
    """
    column_defs = []
    for col_name, data_type, char_length, precision, scale in columns:
        mysql_type = _map_sqlserver_type(data_type, char_length, precision, scale)
        null_clause = " NOT NULL" if col_name.lower() == "id" else ""
        column_defs.append(
            f"{_quote_mysql_identifier(col_name)} {mysql_type}{null_clause}"
        )
    column_list = ", ".join(column_defs)
    quoted_table = _quote_mysql_identifier(table_name)
    primary_key = _quote_mysql_identifier(columns[0][0])
    return f"CREATE TABLE {quoted_table} ({column_list}, PRIMARY KEY ({primary_key}))"


def sync_table_structure(table_name):
    """Create *table_name* in MySQL from its SQL Server column metadata.

    Opens one connection to each database, checks whether the table already
    exists in MySQL and, if not, reads the column definitions from SQL
    Server's ``INFORMATION_SCHEMA.COLUMNS`` and executes the generated
    ``CREATE TABLE`` statement. Only the structure is created; no rows are
    copied.

    Args:
        table_name: Name of the SQL Server table to mirror.

    Returns:
        None. Any exception is caught and logged so that one failing table
        does not abort the caller; connections are always closed.
    """
    sql_server_conn = sql_server_cursor = None
    mysql_conn = mysql_cursor = None
    try:
        # Connect to SQL Server.
        sql_server_conn = pymssql.connect(
            server=sql_server_config["server"],
            port=sql_server_config["port"],
            user=sql_server_config["user"],
            password=sql_server_config["password"],
            database=sql_server_config["database"],
        )
        sql_server_cursor = sql_server_conn.cursor()
        # Connect to MySQL.
        mysql_conn = mysql.connector.connect(**mysql_config)
        mysql_cursor = mysql_conn.cursor()
        logging.info(f"Processing table: {table_name}")

        # "_" and "%" are wildcards in LIKE; escape them so only the exact
        # table name matches.
        like_pattern = (
            table_name.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        )
        mysql_cursor.execute("SHOW TABLES LIKE %s", (like_pattern,))
        # fetchall() drains the result set so the cursor can be reused.
        table_exists = bool(mysql_cursor.fetchall())

        if table_exists:
            logging.info(f"Table {table_name} already exists in MySQL. Skipping...")
        else:
            # ORDER BY keeps the column order (and therefore the primary-key
            # choice) deterministic.
            sql_server_cursor.execute(
                """
                SELECT
                    COLUMN_NAME,
                    DATA_TYPE,
                    CHARACTER_MAXIMUM_LENGTH,
                    NUMERIC_PRECISION,
                    NUMERIC_SCALE
                FROM INFORMATION_SCHEMA.COLUMNS
                WHERE TABLE_NAME = %s
                ORDER BY ORDINAL_POSITION
                """,
                (table_name,),
            )
            columns = sql_server_cursor.fetchall()
            if not columns:
                raise ValueError(f"No column metadata found for table {table_name}")

            create_table_sql = _build_create_table_sql(table_name, columns)
            logging.info(f"Create table SQL: {create_table_sql}")
            mysql_cursor.execute(create_table_sql)
            logging.info(f"Table {table_name} created in MySQL.")

        logging.info(f"Sync completed for table: {table_name}")
    except Exception as e:
        logging.error(f"Failed to sync table {table_name}. Error: {e}")
    finally:
        # Release every resource that was actually opened, even on failure.
        for resource in (sql_server_cursor, sql_server_conn, mysql_cursor, mysql_conn):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as close_error:
                logging.warning(
                    f"Error while closing connection for table {table_name}: {close_error}"
                )
def _fetch_source_table_names():
    """Return the names of all base tables in the SQL Server database, sorted by name."""
    sql_server_conn = pymssql.connect(
        server=sql_server_config["server"],
        port=sql_server_config["port"],
        user=sql_server_config["user"],
        password=sql_server_config["password"],
        database=sql_server_config["database"],
    )
    try:
        sql_server_cursor = sql_server_conn.cursor()
        try:
            sql_server_cursor.execute(
                "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE' ORDER BY TABLE_NAME"
            )
            return [row[0] for row in sql_server_cursor.fetchall()]
        finally:
            sql_server_cursor.close()
    finally:
        # Always release the connection, even if the query fails.
        sql_server_conn.close()


def main():
    """Sync the structure of every SQL Server base table into MySQL.

    Reads the list of base tables from SQL Server, then runs
    ``sync_table_structure`` for each name in a pool of ``MAX_PROCESSES``
    worker processes. Any exception raised here is caught and logged.

    Returns:
        None.
    """
    try:
        # The listing connection is closed before the pool starts: it is
        # only needed to enumerate the tables, and each worker opens its own.
        table_names = _fetch_source_table_names()

        # Process the tables with a pool of worker processes.
        with Pool(processes=MAX_PROCESSES) as pool:
            pool.map(sync_table_structure, table_names)
        # NOTE(review): sync_table_structure logs and swallows its own
        # errors, so this message is emitted even if some tables failed.
        logging.info("All tables synced successfully!")
    except Exception as e:
        logging.error(f"Main function failed. Error: {e}")
# Run the sync when this file is executed as a script.
if __name__ == "__main__":
    main()