zzck/sync_mysql_to_es.py

176 lines
7.9 KiB
Python
Raw Permalink Normal View History

2025-09-02 15:15:51 +08:00
import argparse
from datetime import datetime
import pymysql
from elasticsearch import Elasticsearch, helpers
import json
# 复用现有配置
from config import ES_HOST, ES_PORT, ES_USER, ES_PASSWORD, MYSQL_HOST_APP, MYSQL_USER_APP, MYSQL_PASSWORD_APP, MYSQL_DB_APP
from tqdm import tqdm
import requests
from config import jx_adr
class MySQLToESSync:
    """Copy news rows from the application MySQL database into Elasticsearch.

    Three passes are offered:

    * ``sync_all``     -- index ``news_info`` rows that are not in ES yet.
    * ``sync_tags``    -- attach ``news_tags`` rows to existing ES documents.
    * ``sync_chooses`` -- call the ``jx_adr`` HTTP endpoint for high-scoring news.
    """

    # Elasticsearch index holding one document per news item (id == news_id).
    ES_INDEX = "news_info"

    # Columns of ``news_info`` copied one-to-one into the ES document.
    NEWS_FIELDS = (
        "news_id",
        "input_date",
        "words",
        "title_txt",
        "key_word",
        "CN_content",
        "EN_content",
        "URL",
        "abstract",
        "title_EN",
        "category",
        "sourcename",
        "lang",
        "deleted",
        "create_time",
        "update_time",
        "file_date",
        "file_name",
    )

    def __init__(self):
        """Open the Elasticsearch client and the MySQL connection from config."""
        self.es = Elasticsearch(
            [f"http://{ES_HOST}:{ES_PORT}"],
            basic_auth=(ES_USER, ES_PASSWORD)
        )
        self.mysql_conn = pymysql.connect(
            host=MYSQL_HOST_APP,
            user=MYSQL_USER_APP,
            password=MYSQL_PASSWORD_APP,
            db=MYSQL_DB_APP,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor
        )

    def close(self):
        """Close the MySQL connection held by this instance."""
        self.mysql_conn.close()

    def sync_all(self, start_time=None, end_time=None):
        """Index every ``news_info`` row that has no ES document yet.

        Rows whose ``news_id`` already exists in the index are left untouched.

        Args:
            start_time: Lower bound for ``create_time`` (inclusive), or None.
            end_time: Upper bound for ``create_time`` (inclusive), or None.
                The time filter is applied only when both bounds are given.
        """
        query, params = self._build_query("news_info", start_time, end_time)
        rows = self._fetch_rows(query, params)
        for row in tqdm(rows, desc="同步全量表", unit=""):
            news_id = row['news_id']
            # An existence check replaces "get + bare except": a missing
            # document is reported as False, while real failures (connection,
            # auth, ...) propagate instead of being mistaken for "not found".
            if self.es.exists(index=self.ES_INDEX, id=news_id):
                continue
            self.es.index(
                index=self.ES_INDEX,
                id=news_id,
                document=self._build_news_document(row),
            )
            print(f"新闻 {news_id} 更新到 ES 成功")

    def sync_tags(self, start_time=None, end_time=None):
        """Attach ``news_tags`` rows to ES documents that have no tags yet.

        Processing is best-effort per row: any error (including the ES
        document not existing) is printed and the loop moves on.

        Args:
            start_time: Lower bound for ``create_time`` (inclusive), or None.
            end_time: Upper bound for ``create_time`` (inclusive), or None.
                The time filter is applied only when both bounds are given.
        """
        query, params = self._build_query("news_tags", start_time, end_time)
        rows = self._fetch_rows(query, params)
        for row in tqdm(rows, desc="同步新闻标签进度", unit=""):
            news_id = row['news_id']
            try:
                es_doc = self.es.get(index=self.ES_INDEX, id=news_id)
                existing_tags = None
                if es_doc and es_doc.get('found'):
                    existing_tags = es_doc['_source'].get('news_tags', None)
                if existing_tags:
                    # Tags already present in ES are never overwritten.
                    continue
                self.es.update(
                    index=self.ES_INDEX,
                    id=news_id,
                    doc={"news_tags": self._build_tags_document(news_id, row)},
                )
                print(f"新闻 {news_id} 标签更新到 ES 成功")
            except Exception as e:
                # Deliberate best-effort: one bad row must not stop the batch.
                print(e)

    def sync_chooses(self, start_time=None, end_time=None):
        """Call the ``jx_adr`` endpoint for every tag row scoring at least 80.

        NOTE(review): ``start_time`` and ``end_time`` are accepted for
        signature parity with the other sync methods but are not applied to
        the query -- confirm whether that is intended.

        Args:
            start_time: Unused.
            end_time: Unused.
        """
        rows = self._fetch_rows("select * from news_tags where news_score >= 80")
        for row in tqdm(rows, desc="同步精选表数据", unit=""):
            try:
                news_id = row['news_id']
                news_score = row['news_score']
                # The configured URL carries a literal "news_id" placeholder.
                adr = jx_adr.replace("news_id", str(news_id))
                # NOTE(review): no timeout is set, so a stalled endpoint
                # blocks the whole run -- consider adding one.
                response = requests.get(adr)
                if response.status_code != 200:
                    print(f"新闻id:{news_id} 得分:{news_score}, 调用精选接口失败, 错误码:{response.status_code}")
                    # Skip the success message for a failed call.
                    continue
                print(f"新闻id:{news_id} 得分:{news_score}, 调用精选接口成功")
            except Exception as e:
                # Deliberate best-effort: one bad row must not stop the batch.
                print(e)

    def _fetch_rows(self, query, params=None):
        """Execute *query* and return all rows as a list of dicts.

        The cursor is closed before returning, so no cursor stays open while
        the caller talks to Elasticsearch or the HTTP endpoint.
        """
        with self.mysql_conn.cursor(pymysql.cursors.DictCursor) as cursor:
            cursor.execute(query, params)
            return cursor.fetchall()

    def _build_query(self, table_name, start_time, end_time):
        """Build the SELECT statement for *table_name*.

        Args:
            table_name: Table to read. Interpolated into the SQL text, so it
                must be a trusted identifier (callers pass constants).
            start_time: Lower bound for ``create_time``, or None.
            end_time: Upper bound for ``create_time``, or None.

        Returns:
            A ``(query, params)`` pair for ``cursor.execute``. ``params`` is
            None when no time filter applies; otherwise the bounds are passed
            as driver parameters rather than formatted into the SQL string,
            so user-supplied timestamps cannot alter the statement.
        """
        base_query = f"SELECT * FROM {table_name}"
        if start_time and end_time:
            return (
                f"{base_query} WHERE create_time BETWEEN %s AND %s",
                (start_time, end_time),
            )
        return base_query, None

    @classmethod
    def _build_news_document(cls, row):
        """Return the ES document for one ``news_info`` row.

        Columns absent from *row* default to an empty string.
        """
        return {field: row.get(field, "") for field in cls.NEWS_FIELDS}

    @staticmethod
    def _load_json_list(raw):
        """Decode a JSON-encoded column, mapping NULL (None) to an empty list."""
        if raw is None:
            return []
        return json.loads(raw)

    @classmethod
    def _build_tags_document(cls, news_id, row):
        """Return the ``news_tags`` sub-document for one ``news_tags`` row."""
        load = cls._load_json_list
        return {
            "id": news_id,
            "abstract": row.get('abstract', ""),
            "title": row.get('title', ""),
            "rewrite_content": row.get('rewrite_content', ""),
            "industry_label": load(row.get('industry_label')),
            "industry_confidence": load(row.get('industry_confidence')),
            "industry_score": load(row.get('industry_score')),
            "concept_label": load(row.get('concept_label')),
            "concept_confidence": load(row.get('concept_confidence')),
            "concept_score": load(row.get('concept_score')),
            "public_opinion_score": row.get('public_opinion_score', 10),
            "China_factor": row.get('China_factor', 0.1),
            "source": row.get('source', "其他"),
            "source_impact": row.get('source_impact', 5),
            "news_score": row.get('news_score', 0.0),
            "news_id": news_id,
            "deleted": '0',
            "create_time": row.get('create_time', ""),
            "update_time": row.get('update_time', ""),
        }
if __name__ == "__main__":
    # Command-line entry point: run the three sync passes in order.
    # Either --full or both --start and --end must be supplied. The start/end
    # values are forwarded as given, so a time range still applies when it is
    # combined with --full.
    parser = argparse.ArgumentParser(description='MySQL to ES Sync Tool')
    parser.add_argument('--full', action='store_true', help='Full synchronization')
    parser.add_argument('--start', type=str, help='Start time (YYYY-MM-DD HH:MM:SS)')
    parser.add_argument('--end', type=str, help='End time (YYYY-MM-DD HH:MM:SS)')
    args = parser.parse_args()
    if not args.full and not (args.start and args.end):
        # Report a usage error (usage text + exit status 2) instead of a
        # raw ValueError traceback.
        parser.error("需要指定--full全量同步或--start/--end时间范围")
    sync = MySQLToESSync()
    try:
        sync.sync_all(args.start, args.end)
        sync.sync_tags(args.start, args.end)
        sync.sync_chooses(args.start, args.end)
    finally:
        # Release the MySQL connection even when a pass fails midway.
        sync.mysql_conn.close()