pdf_code/zzb_data_word/redis_trans.py

82 lines
3.5 KiB
Python

import logging
import os

import redis
# Configure root logging: INFO and above, one "time - level - message" line per record.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
def _connect(host, port, password, db):
    """Return a text-decoding client bound to logical database ``db``."""
    # Binding the database at construction time makes every pooled connection
    # (including one re-opened after a network drop) target ``db``. A SELECT
    # sent through a shared client only affects the one connection it ran on.
    # NOTE(review): decode_responses=True assumes keys and values are valid
    # UTF-8 text; a binary value makes the read raise, and the key is then
    # logged and skipped by the per-key handler. Confirm no binary payloads.
    return redis.StrictRedis(host=host, port=port, password=password, db=db,
                             decode_responses=True)


def _queue_key_copy(source_redis, pipeline, key, key_type, db):
    """Read ``key`` from the source and queue the commands that recreate it.

    Every source read happens before anything is queued, so a failing read
    can never leave a partial command sequence (for example a DELETE without
    its matching write) in the pipeline.

    Returns:
        True when commands were queued, False when the key was skipped
        (unsupported type, or the key disappeared while being read).
    """
    if key_type == 'string':
        data = source_redis.get(key)
    elif key_type == 'hash':
        data = source_redis.hgetall(key)
    elif key_type == 'list':
        data = source_redis.lrange(key, 0, -1)
    elif key_type == 'set':
        data = source_redis.smembers(key)
    elif key_type == 'zset':
        # A single {member: score} mapping lets us queue one ZADD per key.
        data = dict(source_redis.zrange(key, 0, -1, withscores=True))
    else:
        logging.warning(f"Unsupported key type: {key_type} for key: {key} in DB {db}")
        return False

    # An empty string is a legitimate value; an empty collection cannot exist
    # in Redis, so it means the key was removed after TYPE was called.
    vanished = data is None if key_type == 'string' else not data
    # PTTL: -2 = key no longer exists, -1 = no expiry, otherwise milliseconds.
    ttl = None if vanished else source_redis.pttl(key)
    if vanished or ttl == -2:
        logging.warning(f"Key: {key} in DB {db} disappeared during migration, skipped")
        return False

    if key_type == 'string':
        pipeline.set(key, data)  # SET replaces any previous value and TTL
    else:
        # SCAN may yield a key more than once and the script may be re-run.
        # Without this DELETE an RPUSH would append a duplicate copy of the
        # list, and hashes/sets/zsets would keep members the source dropped.
        pipeline.delete(key)
        if key_type == 'hash':
            pipeline.hset(key, mapping=data)
        elif key_type == 'list':
            pipeline.rpush(key, *data)
        elif key_type == 'set':
            pipeline.sadd(key, *data)
        else:
            pipeline.zadd(key, data)

    if ttl and ttl > 0:
        # Carry the remaining time-to-live over; otherwise expiring keys
        # would become permanent on the target.
        pipeline.pexpire(key, ttl)
    return True


def _migrate_db(source_redis, target_redis, db, batch_size=1000):
    """Copy every key of one logical database; return how many keys failed.

    Writes are sent to the target in batches of ``batch_size`` keys so memory
    use and the size of each MULTI/EXEC stay bounded on large databases. An
    error raised while flushing a batch propagates to the caller.
    """
    pipeline = target_redis.pipeline()
    failed_keys = 0
    queued = 0
    for key in source_redis.scan_iter():
        try:
            key_type = source_redis.type(key)
            logging.info(f"Migrating key: {key} (Type: {key_type}) in DB {db}")
            if _queue_key_copy(source_redis, pipeline, key, key_type, db):
                queued += 1
        except Exception as e:
            # Best effort: one unreadable key must not stop the whole DB.
            logging.error(f"Failed to migrate key: {key} in DB {db}. Error: {e}")
            failed_keys += 1
            continue
        if queued >= batch_size:
            pipeline.execute()
            queued = 0
    # Flush whatever is left from the final, partially filled batch.
    pipeline.execute()
    return failed_keys


def migrate_redis(source_host, source_port, source_password, target_host, target_port, target_password):
    """Copy all keys from every logical database of one Redis server to another.

    Strings, hashes, lists, sets and sorted sets are copied together with
    their remaining time-to-live. Keys of any other type are logged and
    skipped. A key that already exists on the target is replaced.

    Args:
        source_host: Host name or IP of the Redis server to read from.
        source_port: TCP port of the source server.
        source_password: Password for the source server.
        target_host: Host name or IP of the Redis server to write to.
        target_port: TCP port of the target server.
        target_password: Password for the target server.

    Returns:
        None. The function never raises; every failure is reported through
        ``logging`` and the final log line summarises the outcome.
    """
    try:
        # The database count comes from the source server's configuration.
        probe = _connect(source_host, source_port, source_password, 0)
        try:
            db_count = int(probe.config_get('databases')['databases'])
        finally:
            probe.connection_pool.disconnect()
        logging.info(f"Total databases in source Redis: {db_count}")

        failed_dbs = []
        failed_keys = 0
        for db in range(db_count):
            source_redis = target_redis = None
            try:
                source_redis = _connect(source_host, source_port, source_password, db)
                target_redis = _connect(target_host, target_port, target_password, db)
                logging.info(f"Migrating data from DB {db}")
                failed_keys += _migrate_db(source_redis, target_redis, db)
                logging.info(f"Migration completed for DB {db}")
            except Exception as e:
                logging.error(f"Failed to migrate DB {db}. Error: {e}")
                failed_dbs.append(db)
            finally:
                # Release the sockets held by this database's clients.
                for client in (source_redis, target_redis):
                    if client is not None:
                        client.connection_pool.disconnect()

        if failed_dbs or failed_keys:
            logging.warning(
                f"Migration finished with errors: failed DBs {failed_dbs}, "
                f"{failed_keys} key(s) not migrated"
            )
        else:
            logging.info("All databases migrated successfully!")
    except Exception as e:
        logging.error(f"Migration failed. Error: {e}")
# Connection settings for the source and target Redis servers. Each value can
# be overridden through the environment variable named below; the defaults
# are the endpoints this script was originally written for.
# NOTE(review): the default passwords are committed in source control. Rotate
# them and supply the real values through the environment instead.
source_host = os.environ.get('REDIS_SOURCE_HOST', '10.127.2.206')
source_port = int(os.environ.get('REDIS_SOURCE_PORT', '6379'))
source_password = os.environ.get('REDIS_SOURCE_PASSWORD', "Xgf_redis")
target_host = os.environ.get('REDIS_TARGET_HOST', '10.127.2.209')
target_port = int(os.environ.get('REDIS_TARGET_PORT', '6379'))
target_password = os.environ.get('REDIS_TARGET_PASSWORD', "dMrt4kmwiW6LDJXy")

# Run the migration only when executed as a script, so importing this module
# (for reuse or testing) never touches the Redis servers.
if __name__ == '__main__':
    migrate_redis(source_host, source_port, source_password, target_host, target_port, target_password)