import argparse
import json
from datetime import datetime

import pymysql
import requests
from elasticsearch import Elasticsearch, helpers
from tqdm import tqdm

# Reuse the existing project configuration values.
from config import ES_HOST, ES_PORT, ES_USER, ES_PASSWORD, MYSQL_HOST_APP, MYSQL_USER_APP, MYSQL_PASSWORD_APP, MYSQL_DB_APP
from config import jx_adr
class MySQLToESSync:
    """Synchronise news rows from MySQL into the "news_info" Elasticsearch index.

    Three passes are provided:
      * sync_all     - copy missing rows of the news_info table into ES.
      * sync_tags    - attach news_tags rows to their existing ES documents.
      * sync_chooses - notify the curation endpoint about high-scoring news.
    """

    # Columns copied verbatim from the news_info table into the ES document.
    _NEWS_FIELDS = (
        "news_id", "input_date", "words", "title_txt", "key_word",
        "CN_content", "EN_content", "URL", "abstract", "title_EN",
        "category", "sourcename", "lang", "deleted", "create_time",
        "update_time", "file_date", "file_name",
    )

    def __init__(self):
        """Open the Elasticsearch client and the MySQL connection from config."""
        self.es = Elasticsearch(
            [f"http://{ES_HOST}:{ES_PORT}"],
            basic_auth=(ES_USER, ES_PASSWORD),
        )
        self.mysql_conn = pymysql.connect(
            host=MYSQL_HOST_APP,
            user=MYSQL_USER_APP,
            password=MYSQL_PASSWORD_APP,
            db=MYSQL_DB_APP,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,
        )

    def close(self):
        """Release the MySQL connection and the Elasticsearch transport."""
        try:
            self.mysql_conn.close()
        finally:
            self.es.close()

    @staticmethod
    def _parse_json_list(raw):
        """Decode a JSON-encoded list column, tolerating NULL/missing/bad values.

        The original code called json.loads(data.get(col, [])), which raises
        TypeError when the column is absent (json.loads cannot take a list)
        and ValueError on malformed text; both now fall back to [].
        """
        if not raw:
            return []
        try:
            return json.loads(raw)
        except (TypeError, ValueError):
            return []

    def _build_query(self, table_name, start_time, end_time):
        """Return (sql, params) selecting all rows of *table_name*.

        When both time bounds are given the rows are restricted to
        create_time BETWEEN start_time AND end_time, passed as bind
        parameters so the caller-supplied strings cannot inject SQL.
        *table_name* is interpolated directly and must therefore be a
        trusted internal literal (it is, for every caller in this class).
        params is None for the unfiltered query so cursor.execute() treats
        the statement exactly as a plain string.
        """
        base_query = f"SELECT * FROM {table_name}"
        if start_time and end_time:
            return f"{base_query} WHERE create_time BETWEEN %s AND %s", (start_time, end_time)
        return base_query, None

    # 更新全量表
    def sync_all(self, start_time=None, end_time=None):
        """Copy news_info rows that are missing from ES into the index.

        Existing documents are left untouched; only absent ids are indexed.
        """
        with self.mysql_conn.cursor(pymysql.cursors.DictCursor) as cursor:
            query, params = self._build_query("news_info", start_time, end_time)
            cursor.execute(query, params)
            res = cursor.fetchall()
        for data in tqdm(res, desc="同步全量表", unit="条"):
            news_id = data['news_id']
            # exists() replaces the original bare try/except around es.get():
            # it answers presence without fetching the document or swallowing
            # unrelated transport errors.
            if self.es.exists(index="news_info", id=news_id):
                continue
            self.es.index(
                index="news_info",
                id=news_id,
                document={field: data.get(field, "") for field in self._NEWS_FIELDS},
            )
            print(f"新闻 {news_id} 更新到 ES 成功")

    # 更新标签表
    def sync_tags(self, start_time=None, end_time=None):
        """Attach news_tags rows to their matching ES news_info documents.

        A document is updated only when it exists in ES and does not already
        carry a "news_tags" object. Per-row failures are printed and skipped
        so one bad row cannot abort the whole sync (best-effort, as before).
        """
        with self.mysql_conn.cursor(pymysql.cursors.DictCursor) as cursor:
            query, params = self._build_query("news_tags", start_time, end_time)
            cursor.execute(query, params)
            res = cursor.fetchall()
        for data in tqdm(res, desc="同步新闻标签进度", unit="条"):
            news_id = data['news_id']
            try:
                es_doc = self.es.get(index="news_info", id=news_id)
                news_tags = None
                if es_doc and es_doc.get('found'):
                    news_tags = es_doc['_source'].get('news_tags', None)
                if not news_tags:
                    self.es.update(
                        index="news_info",
                        id=news_id,
                        doc={
                            "news_tags": {
                                "id": news_id,
                                "abstract": data.get('abstract', ""),
                                "title": data.get('title', ""),
                                "rewrite_content": data.get('rewrite_content', ""),
                                # JSON text columns may be NULL or malformed;
                                # decode defensively instead of crashing the row.
                                "industry_label": self._parse_json_list(data.get('industry_label')),
                                "industry_confidence": self._parse_json_list(data.get('industry_confidence')),
                                "industry_score": self._parse_json_list(data.get('industry_score')),
                                "concept_label": self._parse_json_list(data.get('concept_label')),
                                "concept_confidence": self._parse_json_list(data.get('concept_confidence')),
                                "concept_score": self._parse_json_list(data.get('concept_score')),
                                "public_opinion_score": data.get('public_opinion_score', 10),
                                "China_factor": data.get('China_factor', 0.1),
                                "source": data.get('source', "其他"),
                                "source_impact": data.get('source_impact', 5),
                                "news_score": data.get('news_score', 0.0),
                                "news_id": news_id,
                                "deleted": '0',
                                "create_time": data.get('create_time', ""),
                                "update_time": data.get('update_time', ""),
                            }
                        },
                    )
                    print(f"新闻 {news_id} 标签更新到 ES 成功")
            except Exception as e:
                # Best-effort semantics preserved from the original.
                print(e)

    # 更新精选表
    def sync_chooses(self, start_time=None, end_time=None):
        """Call the curation endpoint for every news row with news_score >= 80.

        NOTE(review): start_time/end_time are accepted for interface symmetry
        with the other sync methods but were never applied to the query in the
        original code — confirm whether this pass should be time-bounded too.
        """
        with self.mysql_conn.cursor(pymysql.cursors.DictCursor) as cursor:
            cursor.execute("select * from news_tags where news_score >= 80")
            res = cursor.fetchall()
        for data in tqdm(res, desc="同步精选表数据", unit="条"):
            try:
                news_id = data['news_id']
                news_score = data['news_score']
                # jx_adr contains the literal placeholder "news_id".
                adr = jx_adr.replace("news_id", str(news_id))
                response = requests.get(adr)
                if response.status_code != 200:
                    print(f"新闻id:{news_id} 得分:{news_score}, 调用精选接口失败, 错误码:{response.status_code}")
                    # BUGFIX: the success line below used to print even after
                    # the failure line; skip to the next row instead.
                    continue
                print(f"新闻id:{news_id} 得分:{news_score}, 调用精选接口成功")
            except Exception as e:
                print(e)
|
||
|
||
if __name__ == "__main__":
    # CLI entry point: either a full sync (--full) or an incremental sync
    # bounded by --start/--end (inclusive, matched against create_time).
    parser = argparse.ArgumentParser(description='MySQL to ES Sync Tool')
    parser.add_argument('--full', action='store_true', help='Full synchronization')
    parser.add_argument('--start', type=str, help='Start time (YYYY-MM-DD HH:MM:SS)')
    parser.add_argument('--end', type=str, help='End time (YYYY-MM-DD HH:MM:SS)')

    args = parser.parse_args()

    # Exactly one of the two modes must be selected.
    if not args.full and not (args.start and args.end):
        raise ValueError("需要指定--full全量同步或--start/--end时间范围")

    sync = MySQLToESSync()
    try:
        sync.sync_all(args.start, args.end)
        sync.sync_tags(args.start, args.end)
        sync.sync_chooses(args.start, args.end)
    finally:
        # BUGFIX: the MySQL connection was never closed; release it even
        # when one of the sync passes raises.
        sync.mysql_conn.close()