zzck/sync_mysql_to_es.py

176 lines
7.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import argparse
from datetime import datetime
import pymysql
from elasticsearch import Elasticsearch, helpers
import json
# 复用现有配置
from config import ES_HOST, ES_PORT, ES_USER, ES_PASSWORD, MYSQL_HOST_APP, MYSQL_USER_APP, MYSQL_PASSWORD_APP, MYSQL_DB_APP
from tqdm import tqdm
import requests
from config import jx_adr
class MySQLToESSync:
    """Sync news rows from MySQL into the ``news_info`` Elasticsearch index.

    Three best-effort passes are exposed:

    * :meth:`sync_all`     -- create ES documents for ``news_info`` rows that
      are not indexed yet (existing documents are never overwritten).
    * :meth:`sync_tags`    -- attach the matching ``news_tags`` row to each ES
      document as a nested ``news_tags`` object.
    * :meth:`sync_chooses` -- notify the "featured news" HTTP endpoint for
      every news item scoring >= 80.
    """

    # Columns copied verbatim from a news_info row into the ES document.
    _NEWS_FIELDS = (
        "news_id", "input_date", "words", "title_txt", "key_word",
        "CN_content", "EN_content", "URL", "abstract", "title_EN",
        "category", "sourcename", "lang", "deleted", "create_time",
        "update_time", "file_date", "file_name",
    )

    def __init__(self):
        """Open the ES client and the MySQL connection from config values."""
        self.es = Elasticsearch(
            [f"http://{ES_HOST}:{ES_PORT}"],
            basic_auth=(ES_USER, ES_PASSWORD),
        )
        self.mysql_conn = pymysql.connect(
            host=MYSQL_HOST_APP,
            user=MYSQL_USER_APP,
            password=MYSQL_PASSWORD_APP,
            db=MYSQL_DB_APP,
            charset='utf8mb4',
            # DictCursor: rows come back as {column_name: value} dicts.
            cursorclass=pymysql.cursors.DictCursor,
        )

    @staticmethod
    def _json_list(data, key):
        """Parse a JSON-encoded list column; missing/NULL/empty becomes [].

        BUGFIX: the previous code did ``json.loads(data.get(key, []))``,
        which raises TypeError when the column is absent (default ``[]`` is
        not a JSON string) or NULL, silently skipping the whole row.
        """
        raw = data.get(key)
        return json.loads(raw) if raw else []

    def sync_all(self, start_time=None, end_time=None):
        """Index every ``news_info`` row that is not yet in ES.

        Existing documents are left untouched; only missing ids are created.
        When both *start_time* and *end_time* are given, only rows whose
        ``create_time`` falls in that range are considered.
        """
        with self.mysql_conn.cursor(pymysql.cursors.DictCursor) as cursor:
            cursor.execute(self._build_query("news_info", start_time, end_time))
            rows = cursor.fetchall()
        for data in tqdm(rows, desc="同步全量表", unit=""):
            news_id = data['news_id']
            # BUGFIX: exists() returns a plain boolean.  The old get() inside
            # a bare ``except:`` treated ANY failure (including ES connection
            # errors) as "document missing".
            if self.es.exists(index="news_info", id=news_id):
                continue
            self.es.index(
                index="news_info",
                id=news_id,
                document={field: data.get(field, "") for field in self._NEWS_FIELDS},
            )
            print(f"新闻 {news_id} 更新到 ES 成功")

    def sync_tags(self, start_time=None, end_time=None):
        """Attach ``news_tags`` rows to their ES documents.

        Documents that already carry a non-empty ``news_tags`` object are
        skipped.  Per-row errors are printed and the loop continues
        (best-effort sync, preserving the original behaviour).
        """
        with self.mysql_conn.cursor(pymysql.cursors.DictCursor) as cursor:
            cursor.execute(self._build_query("news_tags", start_time, end_time))
            rows = cursor.fetchall()
        for data in tqdm(rows, desc="同步新闻标签进度", unit=""):
            news_id = data['news_id']
            try:
                es_doc = self.es.get(index="news_info", id=news_id)
                news_tags = None
                if es_doc and es_doc.get('found'):
                    news_tags = es_doc['_source'].get('news_tags', None)
                if not news_tags:
                    self.es.update(
                        index="news_info",
                        id=news_id,
                        doc={
                            "news_tags": {
                                "id": news_id,
                                "abstract": data.get('abstract', ""),
                                "title": data.get('title', ""),
                                "rewrite_content": data.get('rewrite_content', ""),
                                "industry_label": self._json_list(data, 'industry_label'),
                                "industry_confidence": self._json_list(data, 'industry_confidence'),
                                "industry_score": self._json_list(data, 'industry_score'),
                                "concept_label": self._json_list(data, 'concept_label'),
                                "concept_confidence": self._json_list(data, 'concept_confidence'),
                                "concept_score": self._json_list(data, 'concept_score'),
                                "public_opinion_score": data.get('public_opinion_score', 10),
                                "China_factor": data.get('China_factor', 0.1),
                                "source": data.get('source', "其他"),
                                "source_impact": data.get('source_impact', 5),
                                "news_score": data.get('news_score', 0.0),
                                "news_id": news_id,
                                "deleted": '0',
                                "create_time": data.get('create_time', ""),
                                "update_time": data.get('update_time', ""),
                            }
                        },
                    )
                    print(f"新闻 {news_id} 标签更新到 ES 成功")
            except Exception as e:
                # Best-effort: report and move on to the next row.
                print(e)

    def sync_chooses(self, start_time=None, end_time=None):
        """Call the featured-news endpoint for every item with score >= 80.

        BUGFIX: *start_time*/*end_time* used to be accepted but silently
        ignored; they now narrow the selection by ``create_time``, matching
        the other sync passes.
        """
        query = "select * from news_tags where news_score >= 80"
        if start_time and end_time:
            query += f" AND create_time BETWEEN '{start_time}' AND '{end_time}'"
        with self.mysql_conn.cursor(pymysql.cursors.DictCursor) as cursor:
            cursor.execute(query)
            rows = cursor.fetchall()
        for data in tqdm(rows, desc="同步精选表数据", unit=""):
            try:
                news_id = data['news_id']
                news_score = data['news_score']
                # The endpoint template embeds the literal token "news_id";
                # substitute the real id into the URL.
                adr = jx_adr.replace("news_id", str(news_id))
                # BUGFIX: a timeout keeps a dead endpoint from hanging the
                # sync forever (timeouts raise and are caught below).
                response = requests.get(adr, timeout=30)
                if response.status_code != 200:
                    print(f"新闻id:{news_id} 得分:{news_score}, 调用精选接口失败, 错误码:{response.status_code}")
                    # BUGFIX: do not also print the success message below.
                    continue
                print(f"新闻id:{news_id} 得分:{news_score}, 调用精选接口成功")
            except Exception as e:
                print(e)

    def _build_query(self, table_name, start_time, end_time):
        """Return a SELECT over *table_name*, optionally time-bounded.

        NOTE(review): values are interpolated directly into the SQL string.
        Acceptable for the trusted CLI arguments used here, but switch to
        parameter binding (``cursor.execute(sql, params)``) if these values
        ever come from untrusted users.
        """
        base_query = f"""
        SELECT *
        FROM {table_name}
        """
        if start_time and end_time:
            return f"{base_query} WHERE create_time BETWEEN '{start_time}' AND '{end_time}'"
        return base_query
def main():
    """CLI entry point: validate the sync-mode flags, then run all passes."""
    parser = argparse.ArgumentParser(description='MySQL to ES Sync Tool')
    parser.add_argument('--full', action='store_true', help='Full synchronization')
    parser.add_argument('--start', type=str, help='Start time (YYYY-MM-DD HH:MM:SS)')
    parser.add_argument('--end', type=str, help='End time (YYYY-MM-DD HH:MM:SS)')
    args = parser.parse_args()

    # Either --full, or a complete --start/--end time window, is required.
    if not args.full and not (args.start and args.end):
        raise ValueError("需要指定--full全量同步或--start/--end时间范围")

    sync = MySQLToESSync()
    sync.sync_all(args.start, args.end)
    sync.sync_tags(args.start, args.end)
    sync.sync_chooses(args.start, args.end)


if __name__ == "__main__":
    main()