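"""Scheduled sync job for zzbtw.ckxx.net: once a minute, fetch the current
day's directory listing with curl, download any JSON files not yet present
locally, and load every record into MySQL, Elasticsearch, and a message
queue."""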
import subprocess
from bs4 import BeautifulSoup
import datetime
import os
from urllib.parse import urlparse, urljoin
import time
import pymysql
from pathlib import Path
import logging
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import json
import re
from config import *  # load configuration
from mqsend import send_message  # message-queue publish helper
from elasticsearch import Elasticsearch
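
# `from config import *` is expected to supply at least (a sketch; names are
# inferred from their uses in this file, values are placeholders):
#   MYSQL_HOST_APP, MYSQL_PORT_APP, MYSQL_USER_APP, MYSQL_PASSWORD_APP,
#   MYSQL_DB_APP, ES_HOST, ES_PORT, ES_USER, ES_PASSWORD,
#   file_path  # string prefix for the local download root, e.g. '/srv/zzck/'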


def clean_control_characters(text):
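    """Remove C0/C1 control characters that would make json.loads() fail."""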
    return re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text)


def do_work(target):
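    """Fetch today's listing from `target`, download any new files, and
    hand each downloaded file to sync_to_database()."""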
    # make sure the local data directory exists
    os.makedirs('data', exist_ok=True)

    today = datetime.datetime.today().strftime('%Y-%m-%d')
    new_target = f"{target}/{today}/"
    try:
        sub_result = subprocess.run(['curl', new_target], capture_output=True, text=True)

        if sub_result.returncode == 0:
            soup = BeautifulSoup(sub_result.stdout, 'html.parser')
            links = soup.find_all('a', href=True)

            for link in links:
                href = link['href']
                if href.startswith('/'):
                    file_url = f"{target}/{urljoin(new_target, href)}"
                    # local save path; the href itself carries the date directory level
                    local_path = os.path.join(file_path + 'data', href.lstrip('/'))
                    # create the directory structure
                    os.makedirs(os.path.dirname(local_path), exist_ok=True)
                    # download the file only if it is not already present
                    if not os.path.exists(local_path):
                        logger.info(f"File not present locally, downloading: {file_url}")
                        download_cmd = ['curl', '-s', '-o', local_path, file_url]
                        dl_result = subprocess.run(download_cmd)
                        if dl_result.returncode == 0:
                            logger.info(f"Download succeeded: {local_path}")
                            # sync into MySQL (and onward to ES/MQ)
                            sync_to_database(local_path)
                        else:
                            logger.error(f"Download failed: {file_url}")
                else:
                    logger.warning(f"Skipping link that is not a root-relative path: {href}")
        else:
            logger.error(f"Fetch failed: {sub_result.stderr}")
    except Exception as e:
        logger.error(f"Unexpected error while processing {new_target}: {e}")
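

# Record shape expected by the two writers below (a sketch inferred from the
# parsing logic; anything beyond the fields actually read here is unknown):
#   {"id": ..., "input_date": ..., "title_txt": ..., "EN_content": ...,
#    "c": [{"category": ...,
#           "b": [{"lang": ...,
#                  "d": [{"sourcename": ...}]}]}]}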
def write_to_mysql(data_list, local_path):
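    """Upsert each record into the news_info table and publish it to the MQ."""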
    conn = pymysql.connect(
        host=MYSQL_HOST_APP,
        port=MYSQL_PORT_APP,
        user=MYSQL_USER_APP,
        password=MYSQL_PASSWORD_APP,
        db=MYSQL_DB_APP,
        charset='utf8mb4'
    )

    try:
        with conn.cursor() as cursor:
            for data in data_list:
                # unpack the nested JSON structure
                category_data = data.get('c', [{}])
                category = lang = sourcename = ""
                if category_data:
                    category = category_data[0].get('category', '')
                    b_data = category_data[0].get('b', [{}])
                    if b_data:
                        lang = b_data[0].get('lang', '')
                        d_data = b_data[0].get('d', [{}])
                        if d_data:
                            sourcename = d_data[0].get('sourcename', '')

                # un-escape special characters; default to "" so .replace()
                # never runs on a non-string
                en_content = data.get('EN_content', "").replace('quot;', '"')

                # upsert statement
                sql = """INSERT INTO news_info
                    (news_id, input_date, words, title_txt, key_word,
                     CN_content, EN_content, URL, abstract, title_EN,
                     category, sourcename, lang, deleted, create_time,
                     update_time, file_date, file_name)
                    VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
                    ON DUPLICATE KEY UPDATE
                        category=VALUES(category),
                        sourcename=VALUES(sourcename),
                        lang=VALUES(lang)"""

                values = (
                    data['id'],
                    data.get('input_date', ""),
                    data.get('words', ""),
                    data.get('title_txt', ""),
                    data.get('key_word', ""),
                    data.get('CN_content', ""),
                    en_content,  # the cleaned English content
                    data.get('URL', ""),
                    data.get('abstract', ""),
                    data.get('title_EN', ""),
                    category,
                    sourcename,
                    lang,
                    0,
                    datetime.datetime.now(),
                    datetime.datetime.now(),
                    datetime.datetime.strptime(Path(local_path).parent.name, '%Y-%m-%d').date(),
                    Path(local_path).name
                )
                cursor.execute(sql, values)
                logger.info(f"Wrote to MySQL: {data['id']}")

                # publish the record to the message queue
                send_message(json.dumps(data))
        conn.commit()
    except Exception as e:
        logger.error(f"MySQL write failed: {str(e)}")
    finally:
        conn.close()
    return True
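

# Plausible news_info DDL (a sketch only; column types and the unique key on
# news_id are assumptions inferred from the upsert above):
#   CREATE TABLE news_info (
#       news_id VARCHAR(64) NOT NULL UNIQUE,
#       input_date VARCHAR(32), words TEXT, title_txt TEXT, key_word TEXT,
#       CN_content LONGTEXT, EN_content LONGTEXT, URL VARCHAR(512),
#       abstract TEXT, title_EN TEXT, category VARCHAR(64),
#       sourcename VARCHAR(128), lang VARCHAR(16), deleted TINYINT,
#       create_time DATETIME, update_time DATETIME,
#       file_date DATE, file_name VARCHAR(255)
#   );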


# Write to Elasticsearch
def write_to_es(data_list, local_path):
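    """Index each record into the news_info ES index, keyed by record id."""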
    # initialize the ES connection
    es = Elasticsearch(
        [f"http://{ES_HOST}:{ES_PORT}"],  # scheme included directly in the hosts entry
        basic_auth=(ES_USER, ES_PASSWORD)
    )
    try:
        for data in data_list:
            # unpack the nested JSON structure (same as in write_to_mysql)
            category_data = data.get('c', [{}])
            category = lang = sourcename = ""
            if category_data:
                category = category_data[0].get('category', '')
                b_data = category_data[0].get('b', [{}])
                if b_data:
                    lang = b_data[0].get('lang', '')
                    d_data = b_data[0].get('d', [{}])
                    if d_data:
                        sourcename = d_data[0].get('sourcename', '')

            es.index(
                index="news_info",
                id=data['id'],
                document={
                    "news_id": data.get('id', ""),
                    "input_date": data.get('input_date', ""),
                    "words": data.get('words', ""),
                    "title_txt": data.get('title_txt', ""),
                    "key_word": data.get('key_word', ""),
                    "CN_content": data.get('CN_content', ""),
                    # default "" avoids calling .replace() on a non-string
                    "EN_content": data.get('EN_content', "").replace('quot;', '"'),
                    "URL": data.get('URL', ""),
                    "abstract": data.get('abstract', ""),
                    "title_EN": data.get('title_EN', ""),
                    "category": category,
                    "sourcename": sourcename,
                    "lang": lang,
                    "deleted": "0",
                    "create_time": datetime.datetime.now(),
                    "update_time": datetime.datetime.now(),
                    "file_date": datetime.datetime.strptime(Path(local_path).parent.name, '%Y-%m-%d').date(),
                    "file_name": Path(local_path).name
                }
            )
            logger.info(f"Indexed ES document id: {data['id']}")

    except Exception as e:
        logger.error(f"ES write failed: {str(e)}")
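

# Note: es.index() with an explicit id replaces any existing document with
# that id wholesale, so re-processing a file is idempotent on the ES side
# (the SQL upsert above, by contrast, only refreshes three columns).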


def sync_to_database(local_path):
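    """Read one downloaded JSON file and fan its records out to MySQL and ES."""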
    try:
        with open(local_path, 'r', encoding='utf-16') as f:  # feed files are UTF-16 encoded
            raw_data = f.read()
            cleaned_data = clean_control_characters(raw_data)
            data_list = json.loads(cleaned_data)
            write_to_mysql(data_list, local_path)  # pass the local path along
            write_to_es(data_list, local_path)

        return True

    except Exception as e:
        logger.error(f"Unexpected error while processing file {local_path}: {e}")
        return False


# Logging setup (root logger: file + console handlers)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

# file handler
file_handler = logging.FileHandler('zzck_nohup.log')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

# console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)


# Scheduler job listener
def scheduler_listener(event):
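    """Log the success or failure of each scheduled run."""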
    if event.exception:
        logger.error("Job execution failed!", exc_info=event.exception)
    else:
        logger.info("Job completed")


if __name__ == "__main__":
    scheduler = BlockingScheduler()
    target = "zzbtw.ckxx.net"  # source host to mirror

    # register the sync job
    scheduler.add_job(
        do_work,
        'interval',
        minutes=1,
        id='zzck_sync_job',
        args=[target],  # pass the target host into the job
        max_instances=1  # at most one run at a time
    )

    # listen for job results
    scheduler.add_listener(scheduler_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

    try:
        logger.info("Program started; scheduler running...")
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        logger.info("Stop signal received, shutting down scheduler...")
        scheduler.shutdown()
        logger.info("Scheduler shut down")