import subprocess
from bs4 import BeautifulSoup
import datetime
import os
from urllib.parse import urlparse, urljoin
import time
import pymysql
from pathlib import Path
import logging
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import json
import re
from config import *  # load configuration (DB/ES credentials, file_path, ...)
from mqsend import send_message  # message-queue publish helper
from elasticsearch import Elasticsearch

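# This script polls a remote daily directory (target/YYYY-MM-DD/) for JSON
# files, downloads anything new, and syncs each record into MySQL and
# Elasticsearch, publishing a copy to a message queue along the way.
#
# A minimal sketch of what config.py is assumed to provide, inferred from the
# names used below -- all values here are hypothetical placeholders:
#
#   MYSQL_HOST_APP = "127.0.0.1"
#   MYSQL_PORT_APP = 3306
#   MYSQL_USER_APP = "app"
#   MYSQL_PASSWORD_APP = "secret"
#   MYSQL_DB_APP = "news"
#   ES_HOST = "127.0.0.1"
#   ES_PORT = 9200
#   ES_USER = "elastic"
#   ES_PASSWORD = "secret"
#   file_path = "/opt/zzck/"   # base directory; the code appends 'data'
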
def clean_control_characters(text):
    # Strip C0 and C1 control characters (including raw newlines and tabs)
    # so the downloaded payload can be parsed by json.loads()
    return re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text)

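# Illustrative behaviour (example not from the original source): a NUL byte
# embedded in a JSON string is dropped before parsing:
#   clean_control_characters('{"a": "x\x00y"}')  ->  '{"a": "xy"}'
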
def do_work(target):
    # Make sure the local data directory exists
    os.makedirs('data', exist_ok=True)

    today = datetime.datetime.today().strftime('%Y-%m-%d')
    new_target = f"{target}/{today}/"
    try:
        sub_result = subprocess.run(['curl', new_target], capture_output=True, text=True)

        if sub_result.returncode == 0:
            soup = BeautifulSoup(sub_result.stdout, 'html.parser')
            links = soup.find_all('a', href=True)

            for link in links:
                href = link['href']
                if href.startswith('/'):
                    file_url = f"{target}/{urljoin(new_target, href)}"
                    # Build the local save path; the href already carries the
                    # date directory level (file_path comes from config)
                    local_path = os.path.join(file_path + 'data', href.lstrip('/'))
                    # Create the directory structure
                    os.makedirs(os.path.dirname(local_path), exist_ok=True)
                    # Download only files we have not seen before
                    if not os.path.exists(local_path):
                        logger.info(f"File not present locally, downloading: {file_url}")
                        download_cmd = ['curl', '-s', '-o', local_path, file_url]
                        dl_result = subprocess.run(download_cmd)
                        if dl_result.returncode == 0:
                            logger.info(f"Download succeeded: {local_path}")
                            # Sync into MySQL and ES
                            sync_to_database(local_path)
                        else:
                            logger.error(f"Download failed: {file_url}")
                else:
                    logger.warning(f"Ignoring non-relative path: {href}")
        else:
            logger.error(f"Fetch failed: {sub_result.stderr}")
    except Exception as e:
        logger.error(f"Unexpected error while processing {new_target}: {e}")

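# Illustrative walk-through of the path handling above (date and filename
# hypothetical, file_path assumed to end with '/'):
#   target     = "zzbtw.ckxx.net"
#   new_target = "zzbtw.ckxx.net/2025-01-01/"
#   href       = "/2025-01-01/news_001.json"
#   file_url   = "zzbtw.ckxx.net//2025-01-01/news_001.json"
#                (urljoin keeps the absolute href because the base has no scheme)
#   local_path = file_path + "data/2025-01-01/news_001.json"
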
def write_to_mysql(data_list, local_path):
    conn = pymysql.connect(
        host=MYSQL_HOST_APP,
        port=MYSQL_PORT_APP,
        user=MYSQL_USER_APP,
        password=MYSQL_PASSWORD_APP,
        db=MYSQL_DB_APP,
        charset='utf8mb4'
    )

    try:
        with conn.cursor() as cursor:
            for data in data_list:
                # Walk the nested JSON structure: c[0] holds the category,
                # c[0].b[0] the language, c[0].b[0].d[0] the source name
                category_data = data.get('c', [{}])
                category = lang = sourcename = ""
                if category_data:
                    category = category_data[0].get('category', '')
                    b_data = category_data[0].get('b', [{}])
                    if b_data:
                        lang = b_data[0].get('lang', '')
                        d_data = b_data[0].get('d', [{}])
                        if d_data:
                            sourcename = d_data[0].get('sourcename', '')

                # Un-escape quote entities; default to "" (not 0) so .replace()
                # does not fail when the field is missing
                en_content = data.get('EN_content', "").replace('quot;', '"')

                sql = """INSERT INTO news_info
                    (news_id, input_date, words, title_txt, key_word,
                     CN_content, EN_content, URL, abstract, title_EN,
                     category, sourcename, lang, deleted, create_time,
                     update_time, file_date, file_name)
                    VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
                    ON DUPLICATE KEY UPDATE
                        category=VALUES(category),
                        sourcename=VALUES(sourcename),
                        lang=VALUES(lang)"""

                values = (
                    data['id'],
                    data.get('input_date', ""),
                    data.get('words', ""),
                    data.get('title_txt', ""),
                    data.get('key_word', ""),
                    data.get('CN_content', ""),
                    en_content,  # use the cleaned English content
                    data.get('URL', ""),
                    data.get('abstract', ""),
                    data.get('title_EN', ""),
                    category,
                    sourcename,
                    lang,
                    0,
                    datetime.datetime.now(),
                    datetime.datetime.now(),
                    # parent directory is named YYYY-MM-DD
                    datetime.datetime.strptime(Path(local_path).parent.name, '%Y-%m-%d').date(),
                    Path(local_path).name
                )
                cursor.execute(sql, values)
                logger.info(f"Wrote to MySQL: {data['id']}")

                # Publish the record to the message queue
                send_message(json.dumps(data))
        conn.commit()
    except Exception as e:
        logger.error(f"MySQL write failed: {str(e)}")
    finally:
        conn.close()
    return True

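# Both write_to_mysql() above and write_to_es() below expect each record in
# data_list to be shaped roughly like the sketch below; the c[0] -> b[0] -> d[0]
# nesting is inferred from the parsing code, and all values are hypothetical:
#
# {
#     "id": "20250101-0001",
#     "input_date": "2025-01-01",
#     "words": "1200",
#     "title_txt": "...",
#     "key_word": "...",
#     "CN_content": "...",
#     "EN_content": "...quot;...",
#     "URL": "http://example.com/news/1",
#     "abstract": "...",
#     "title_EN": "...",
#     "c": [{
#         "category": "politics",
#         "b": [{
#             "lang": "en",
#             "d": [{"sourcename": "Example Daily"}]
#         }]
#     }]
# }
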
# Write to ES
def write_to_es(data_list, local_path):
    # Initialize the ES connection
    es = Elasticsearch(
        [f"http://{ES_HOST}:{ES_PORT}"],  # include the protocol directly in the host string
        basic_auth=(ES_USER, ES_PASSWORD)
    )
    try:
        for data in data_list:
            # Same nested c/b/d parsing as in write_to_mysql()
            category_data = data.get('c', [{}])
            category = lang = sourcename = ""
            if category_data:
                category = category_data[0].get('category', '')
                b_data = category_data[0].get('b', [{}])
                if b_data:
                    lang = b_data[0].get('lang', '')
                    d_data = b_data[0].get('d', [{}])
                    if d_data:
                        sourcename = d_data[0].get('sourcename', '')

            es.index(
                index="news_info",
                id=data['id'],
                document={
                    "news_id": data.get('id', ""),
                    "input_date": data.get('input_date', ""),
                    "words": data.get('words', ""),
                    "title_txt": data.get('title_txt', ""),
                    "key_word": data.get('key_word', ""),
                    "CN_content": data.get('CN_content', ""),
                    # default to "" (not 0) so .replace() cannot fail
                    "EN_content": data.get('EN_content', "").replace('quot;', '"'),
                    "URL": data.get('URL', ""),
                    "abstract": data.get('abstract', ""),
                    "title_EN": data.get('title_EN', ""),
                    "category": category,
                    "sourcename": sourcename,
                    "lang": lang,
                    "deleted": "0",
                    "create_time": datetime.datetime.now(),
                    "update_time": datetime.datetime.now(),
                    "file_date": datetime.datetime.strptime(Path(local_path).parent.name, '%Y-%m-%d').date(),
                    "file_name": Path(local_path).name
                }
            )
            logger.info(f"Indexed ES document ID: {data['id']}")

    except Exception as e:
        logger.error(f"ES write failed: {str(e)}")

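# A quick way to spot-check a freshly indexed record from a Python shell
# (illustrative only; document ID hypothetical):
#   es.get(index="news_info", id="20250101-0001")
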
def sync_to_database(local_path):
    try:
        with open(local_path, 'r', encoding='utf-16') as f:  # source files are UTF-16 encoded
            raw_data = f.read()
            cleaned_data = clean_control_characters(raw_data)
            data_list = json.loads(cleaned_data)
            write_to_mysql(data_list, local_path)  # pass the local path along
            write_to_es(data_list, local_path)

        return True

    except Exception as e:
        logger.error(f"Unexpected error while processing file {local_path}: {e}")
        return False

# Logging setup: INFO level, mirrored to a file and to the console
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

# File log
file_handler = logging.FileHandler('zzck_nohup.log')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

# Console log
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)


# Listener for scheduled-job events
def scheduler_listener(event):
    if event.exception:
        logger.error("Job execution failed!", exc_info=event.exception)
    else:
        logger.info("Job completed")

if __name__ == "__main__":
    scheduler = BlockingScheduler()
    target = "zzbtw.ckxx.net"  # make sure target is defined here

    # Run do_work(target) every minute, never more than one instance at a time
    scheduler.add_job(
        do_work,
        'interval',
        minutes=1,
        id='zzck_sync_job',
        args=[target],  # pass the target through to the job
        max_instances=1
    )

    # Register the event listener
    scheduler.add_listener(scheduler_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

    try:
        logger.info("Program started, scheduler running...")
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        logger.info("Stop signal received, shutting down scheduler...")
        scheduler.shutdown()
        logger.info("Scheduler stopped")
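
# The log filename 'zzck_nohup.log' suggests this script is meant to run
# detached; a plausible invocation (script name hypothetical):
#   nohup python zzck_sync.py &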