201 lines
		
	
	
		
			7.8 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			201 lines
		
	
	
		
			7.8 KiB
		
	
	
	
		
			Python
		
	
	
	
import contextlib
import logging
import os
import uuid

import mysql.connector
import pymssql
 | ||
| 
 | ||
# Configure logging.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Connection settings.
# Passwords are read from the environment instead of being hard-coded: the
# passwords previously stored in this file were committed in plain text and
# should be rotated. Non-secret settings keep their previous values as
# defaults and can be overridden through the environment as well.

# SQL Server (source) configuration.
sql_server_config = {
    "server": os.environ.get("JYDB_MSSQL_SERVER", "203.192.15.17"),
    "port": int(os.environ.get("JYDB_MSSQL_PORT", "28063")),
    "user": os.environ.get("JYDB_MSSQL_USER", "zncbuser"),
    "password": os.environ.get("JYDB_MSSQL_PASSWORD", ""),
    "database": os.environ.get("JYDB_MSSQL_DATABASE", "jydb"),
}

# MySQL (destination) configuration.
mysql_config = {
    "host": os.environ.get("JYDB_MYSQL_HOST", "rm-bp1f85h3xs6mvnf5e3o.mysql.rds.aliyuncs.com"),
    "user": os.environ.get("JYDB_MYSQL_USER", "zzb_jydb"),
    "password": os.environ.get("JYDB_MYSQL_PASSWORD", ""),
    "database": os.environ.get("JYDB_MYSQL_DATABASE", "zzb_jydb"),
}

# Make a missing secret visible up front instead of surfacing only as a login failure later.
_missing_password_vars = [
    name for name in ("JYDB_MSSQL_PASSWORD", "JYDB_MYSQL_PASSWORD") if not os.environ.get(name)
]
if _missing_password_vars:
    logging.warning(
        f"Environment variable(s) not set, connecting with an empty password: "
        f"{', '.join(_missing_password_vars)}"
    )
 | ||
| 
 | ||
# SQL Server type name -> MySQL column type, for types whose mapping does not
# depend on length/precision. Types not listed here (and not handled in
# _mysql_column_type) are passed through unchanged, as before.
_FIXED_MYSQL_TYPES = {
    "bigint": "BIGINT",
    "int": "INT",
    "smallint": "SMALLINT",
    "tinyint": "TINYINT UNSIGNED",  # SQL Server tinyint is 0..255
    "bit": "TINYINT(1)",
    "float": "DOUBLE",
    "real": "FLOAT",
    "money": "DECIMAL(19,4)",
    "smallmoney": "DECIMAL(10,4)",
    # Keep fractional seconds so update-time values read back from MySQL can
    # compare equal to the SQL Server values (plain DATETIME drops them).
    "datetime": "DATETIME(3)",
    "datetime2": "DATETIME(6)",
    "smalldatetime": "DATETIME",
    "date": "DATE",
    "time": "TIME(6)",
    "datetimeoffset": "DATETIME(6)",  # NOTE(review): the UTC offset is not stored — confirm acceptable
    "text": "LONGTEXT",
    "ntext": "LONGTEXT",
    "xml": "LONGTEXT",
    "image": "LONGBLOB",
    "uniqueidentifier": "CHAR(36)",
    "timestamp": "BINARY(8)",  # SQL Server rowversion
    "rowversion": "BINARY(8)",
}


def _quote_mssql(name):
    """Return *name* as a bracket-quoted SQL Server identifier."""
    return "[" + name.replace("]", "]]") + "]"


def _quote_mysql(name):
    """Return *name* as a backtick-quoted MySQL identifier."""
    return "`" + name.replace("`", "``") + "`"


def _escape_like(value):
    """Escape MySQL LIKE wildcards so *value* only matches itself (``_`` is common in table names)."""
    return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def _mysql_column_type(data_type, char_len=None, precision=None, scale=None):
    """Translate a SQL Server column type (INFORMATION_SCHEMA.COLUMNS) into a MySQL type.

    Args:
        data_type: DATA_TYPE, e.g. "nvarchar".
        char_len: CHARACTER_MAXIMUM_LENGTH; -1 means "(max)".
        precision: NUMERIC_PRECISION.
        scale: NUMERIC_SCALE.

    Returns:
        A MySQL column type string. Unknown types are returned unchanged.
    """
    kind = data_type.lower()
    if kind in ("char", "nchar", "varchar", "nvarchar"):
        if char_len is None or char_len < 0:
            return "LONGTEXT"
        # Wide strings go to TEXT so several of them cannot exceed MySQL's row-size limit.
        return "TEXT" if char_len > 2000 else f"VARCHAR({char_len})"
    if kind in ("binary", "varbinary"):
        if char_len is None or char_len < 0:
            return "LONGBLOB"
        return "BLOB" if char_len > 2000 else f"VARBINARY({char_len})"
    if kind in ("decimal", "numeric"):
        return f"DECIMAL({precision or 18},{scale or 0})"
    return _FIXED_MYSQL_TYPES.get(kind, data_type)


def _to_mysql_row(row):
    """Convert values MySQL Connector cannot bind (uuid.UUID) into bindable ones; returns a tuple."""
    return tuple(str(value) if isinstance(value, uuid.UUID) else value for value in row)


def _chunks(items, size):
    """Yield consecutive slices of *items* of at most *size* elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def _create_mysql_table(mysql_cursor, table_name, columns):
    """Create *table_name* in MySQL from SQL Server column metadata rows."""
    definitions = [
        f"{_quote_mysql(name)} {_mysql_column_type(data_type, char_len, precision, scale)}"
        for name, data_type, char_len, precision, scale in columns
    ]
    # Index the id column: every UPDATE below filters on it, and without an
    # index each one would scan the whole table. Skip types MySQL cannot index
    # without a prefix length, and strings too long for the index key limit.
    id_name, id_data_type, id_len, id_precision, id_scale = columns[0]
    id_type = _mysql_column_type(id_data_type, id_len, id_precision, id_scale)
    if not id_type.endswith(("TEXT", "BLOB")) and (id_len is None or id_len <= 768):
        definitions.append(f"KEY ({_quote_mysql(id_name)})")

    create_table_sql = f"CREATE TABLE {_quote_mysql(table_name)} ({', '.join(definitions)})"
    logging.info(f"Create table SQL: {create_table_sql}")
    mysql_cursor.execute(create_table_sql)
    logging.info(f"Table {table_name} created in MySQL.")


def _insert_missing_rows(sql_server_cursor, mysql_conn, mysql_cursor, table_name, col_names, batch_size):
    """Copy rows whose id exists in SQL Server but not yet in MySQL, in batches."""
    ms_table, ms_id = _quote_mssql(table_name), _quote_mssql(col_names[0])
    my_table, my_id = _quote_mysql(table_name), _quote_mysql(col_names[0])
    ms_columns = ", ".join(_quote_mssql(name) for name in col_names)

    sql_server_cursor.execute(f"SELECT {ms_id} FROM {ms_table}")
    sql_server_ids = {row[0] for row in sql_server_cursor.fetchall()}
    mysql_cursor.execute(f"SELECT {my_id} FROM {my_table}")
    mysql_ids = {row[0] for row in mysql_cursor.fetchall()}

    # A NULL id can never be matched by "IN (...)", so it cannot be copied this way.
    ids_to_insert = [row_id for row_id in sql_server_ids - mysql_ids if row_id is not None]
    logging.info(f"Found {len(ids_to_insert)} new rows to insert.")

    insert_sql = (
        f"INSERT INTO {my_table} ({', '.join(_quote_mysql(name) for name in col_names)}) "
        f"VALUES ({', '.join(['%s'] * len(col_names))})"
    )
    for batch_ids in _chunks(ids_to_insert, batch_size):
        placeholders = ", ".join(["%s"] * len(batch_ids))
        # Explicit column list keeps value order aligned with insert_sql.
        sql_server_cursor.execute(
            f"SELECT {ms_columns} FROM {ms_table} WHERE {ms_id} IN ({placeholders})",
            tuple(batch_ids),
        )
        rows_to_insert = sql_server_cursor.fetchall()
        if rows_to_insert:
            mysql_cursor.executemany(insert_sql, [_to_mysql_row(row) for row in rows_to_insert])
            mysql_conn.commit()
            logging.info(f"Inserted {len(rows_to_insert)} rows into {table_name}.")


def _update_changed_rows(sql_server_cursor, mysql_conn, mysql_cursor, table_name, col_names,
                         update_time_field, since, batch_size):
    """Re-copy rows whose update-time value differs between SQL Server and MySQL.

    Only rows already present in MySQL and with ``update_time_field > since``
    in SQL Server are considered.
    """
    if len(col_names) < 2:
        # Nothing besides the id column to SET.
        return
    logging.info(f"Checking for updates based on {update_time_field} field in table: {table_name}")

    ms_table, ms_id = _quote_mssql(table_name), _quote_mssql(col_names[0])
    my_table, my_id = _quote_mysql(table_name), _quote_mysql(col_names[0])
    ms_updated, my_updated = _quote_mssql(update_time_field), _quote_mysql(update_time_field)
    ms_columns = ", ".join(_quote_mssql(name) for name in col_names)

    sql_server_cursor.execute(
        f"SELECT {ms_id}, {ms_updated} FROM {ms_table} WHERE {ms_updated} > %s", (since,)
    )
    sql_server_update_data = {row[0]: row[1] for row in sql_server_cursor.fetchall()}
    mysql_cursor.execute(f"SELECT {my_id}, {my_updated} FROM {my_table}")
    mysql_update_data = {row[0]: row[1] for row in mysql_cursor.fetchall()}

    # NOTE(review): tables created by earlier versions of this script stored
    # datetime columns without fractional seconds; for those, any source value
    # with fractional seconds differs on every run and is re-copied each time.
    ids_to_update = [
        row_id
        for row_id, source_time in sql_server_update_data.items()
        if row_id is not None and row_id in mysql_update_data and source_time != mysql_update_data[row_id]
    ]
    logging.info(f"Found {len(ids_to_update)} rows to update.")

    update_sql = (
        f"UPDATE {my_table} SET "
        + ", ".join(f"{_quote_mysql(name)} = %s" for name in col_names[1:])
        + f" WHERE {my_id} = %s"
    )
    for batch_ids in _chunks(ids_to_update, batch_size):
        placeholders = ", ".join(["%s"] * len(batch_ids))
        sql_server_cursor.execute(
            f"SELECT {ms_columns} FROM {ms_table} "
            f"WHERE {ms_id} IN ({placeholders}) AND {ms_updated} > %s",
            (*batch_ids, since),
        )
        rows_to_update = sql_server_cursor.fetchall()
        if rows_to_update:
            # SET values are every column but the id; the id goes last for the WHERE clause.
            update_values = []
            for row in rows_to_update:
                converted = _to_mysql_row(row)
                update_values.append(converted[1:] + converted[:1])
            mysql_cursor.executemany(update_sql, update_values)
            mysql_conn.commit()
            logging.info(f"Updated {len(rows_to_update)} rows in table {table_name}.")


def sync_table(table_name, batch_size=10000, since="2023-11-12 20:23:23"):
    """Copy one SQL Server table into MySQL: create it if needed, insert new rows, refresh changed ones.

    The first column (by ordinal position) is used as the row identifier.
    NOTE(review): this assumes that column uniquely identifies a row in every
    synced table — confirm per table.

    If the table has an ``XGRQ`` or ``UpdateTime`` column (case-insensitive),
    rows already in MySQL whose value of that column differs from SQL Server
    (restricted to source values later than *since*) are re-copied.

    Args:
        table_name: Table name as listed in SQL Server's INFORMATION_SCHEMA.
        batch_size: Number of rows fetched and written per round trip.
        since: Lower bound (exclusive) on the update-time column when looking
            for changed rows; a string or datetime, passed as a query parameter.

    Returns:
        True on success, False if the sync failed. The error is logged with a traceback.
    """
    try:
        with contextlib.ExitStack() as stack:
            sql_server_conn = stack.enter_context(contextlib.closing(pymssql.connect(**sql_server_config)))
            sql_server_cursor = stack.enter_context(contextlib.closing(sql_server_conn.cursor()))
            mysql_conn = stack.enter_context(contextlib.closing(mysql.connector.connect(**mysql_config)))
            mysql_cursor = stack.enter_context(contextlib.closing(mysql_conn.cursor()))

            logging.info(f"Processing table: {table_name}")

            # Check whether the table already exists in MySQL.
            mysql_cursor.execute("SHOW TABLES LIKE %s", (_escape_like(table_name),))
            table_exists = mysql_cursor.fetchone() is not None

            # Column metadata in ordinal order, so it lines up with the explicit column lists used below.
            sql_server_cursor.execute(
                """
                SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH, NUMERIC_PRECISION, NUMERIC_SCALE
                FROM INFORMATION_SCHEMA.COLUMNS
                WHERE TABLE_NAME = %s
                ORDER BY ORDINAL_POSITION
                """,
                (table_name,),
            )
            columns = sql_server_cursor.fetchall()
            if not columns:
                logging.error(f"Table {table_name} has no columns in SQL Server (missing table?).")
                return False
            col_names = [col[0] for col in columns]

            # First column named XGRQ or UpdateTime, if any, drives change detection.
            update_time_field = next(
                (name for name in col_names if name.lower() in ("xgrq", "updatetime")), None
            )
            logging.info(f"Table {table_name} has update time field: {update_time_field}")

            if not table_exists:
                _create_mysql_table(mysql_cursor, table_name, columns)
            else:
                logging.info(f"Table {table_name} already exists in MySQL. Updating data...")

            _insert_missing_rows(sql_server_cursor, mysql_conn, mysql_cursor, table_name, col_names, batch_size)

            if update_time_field:
                _update_changed_rows(sql_server_cursor, mysql_conn, mysql_cursor, table_name,
                                     col_names, update_time_field, since, batch_size)

            logging.info(f"Sync completed for table: {table_name}")
            return True
    except Exception as e:
        logging.exception(f"Failed to sync table {table_name}. Error: {e}")
        return False
 | ||
| 
 | ||
def main(table_names=("lc_mainshlistnew",)):
    """List SQL Server base tables and sync the selected ones into MySQL.

    Args:
        table_names: Iterable of table names to sync, matched case-insensitively
            against INFORMATION_SCHEMA.TABLES. A single string is treated as one
            name. Pass None to sync every base table. The default is the single
            table this script has always synced.
    """
    if isinstance(table_names, str):
        table_names = (table_names,)
    wanted = None if table_names is None else {name.lower() for name in table_names}

    try:
        # This connection is only needed for the table list, so it is closed
        # before syncing starts. sync_table opens its own connections.
        with contextlib.closing(pymssql.connect(**sql_server_config)) as sql_server_conn, \
                contextlib.closing(sql_server_conn.cursor()) as sql_server_cursor:
            sql_server_cursor.execute("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE' ORDER BY TABLE_NAME")
            tables = [row[0] for row in sql_server_cursor.fetchall()]

        failed = []
        for table in tables:
            if wanted is not None and table.lower() not in wanted:
                continue
            # sync_table logs its own errors; an explicit False signals failure.
            if sync_table(table) is False:
                failed.append(table)

        if failed:
            logging.error(f"{len(failed)} table(s) failed to sync: {', '.join(failed)}")
        else:
            logging.info("All tables synced successfully!")
    except Exception as e:
        logging.exception(f"Main function failed. Error: {e}")


# Start the main function.
if __name__ == "__main__":
    main()