zzck/main.py

252 lines
9.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import subprocess
from bs4 import BeautifulSoup
import datetime
import os
from urllib.parse import urlparse, urljoin
import time
import pymysql
from pathlib import Path
import logging
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import json
import re
from config import * # 导入配置
from mqsend import send_message # 导入消息发送函数
from elasticsearch import Elasticsearch
# Matches U+0000-U+001F, DEL (U+007F) and U+0080-U+009F; compiled once at import.
_CONTROL_CHAR_PATTERN = re.compile(r'[\x00-\x1F\x7F-\x9F]')


def clean_control_characters(text):
    """Return *text* with all control characters removed.

    The stripped range includes tab, carriage return and line feed, so the
    result never contains line breaks.
    """
    return _CONTROL_CHAR_PATTERN.sub('', text)
def _is_inside_directory(base_dir, candidate):
    """Return True when *candidate* resolves to *base_dir* or a path beneath it."""
    base_real = os.path.realpath(base_dir)
    candidate_real = os.path.realpath(candidate)
    try:
        return os.path.commonpath([base_real, candidate_real]) == base_real
    except ValueError:
        # commonpath() refuses to compare e.g. paths on different drives;
        # such a candidate cannot be inside the base directory.
        return False


def do_work(target):
    """Fetch today's listing page from *target* and download files not yet stored locally.

    The page at ``{target}/{YYYY-MM-DD}/`` is fetched with curl and parsed for
    ``<a href>`` links. Every link whose href starts with ``/`` is mapped to a
    local path under ``file_path + 'data'``; if nothing exists at that path the
    file is downloaded with curl and handed to ``sync_to_database``.

    Args:
        target: Host (optionally with a path prefix) that serves the listing.

    Returns:
        None. All errors are logged and swallowed so a scheduled run never
        raises.
    """
    # Create the data directory (relative to the current working directory).
    # NOTE(review): downloads go to file_path + 'data', which only matches this
    # directory when file_path is empty -- confirm which location is intended.
    os.makedirs('data', exist_ok=True)
    today = datetime.datetime.today().strftime('%Y-%m-%d')
    new_target = f"{target}/{today}/"
    try:
        sub_result = subprocess.run(['curl', new_target], capture_output=True, text=True)
        if sub_result.returncode != 0:
            logger.error(f"获取失败: {sub_result.stderr}")
            return

        download_root = file_path + 'data'
        soup = BeautifulSoup(sub_result.stdout, 'html.parser')
        for link in soup.find_all('a', href=True):
            href = link['href']
            if not href.startswith('/'):
                logger.warning(f"忽略非相对路径: {href}")
                continue

            # NOTE(review): new_target has no scheme, so urljoin() returns an
            # absolute href unchanged and the URL becomes "<target>//<path>".
            # Kept as-is; confirm the server's behaviour before simplifying.
            file_url = f"{target}/{urljoin(new_target, href)}"
            # Local save path (the href carries the date directory level).
            local_path = os.path.join(download_root, href.lstrip('/'))

            # The href comes from a remote page: refuse anything that would
            # resolve outside the download directory (e.g. via "..").
            if not _is_inside_directory(download_root, local_path):
                logger.warning(f"忽略越界路径: {href}")
                continue

            # Create the directory structure.
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            if os.path.exists(local_path):
                continue

            logger.info(f"文件不存在,准备下载: {file_url}")
            # --fail: exit non-zero on HTTP errors instead of saving the
            # server's error page as if it were the data file.
            download_cmd = ['curl', '-s', '--fail', '-o', local_path, file_url]
            dl_result = subprocess.run(download_cmd)
            if dl_result.returncode == 0:
                logger.info(f"下载成功: {local_path}")
                # Sync the downloaded file to the databases.
                sync_to_database(local_path)
            else:
                logger.error(f"下载失败: {file_url}")
                # Drop any partial output; otherwise the existence check
                # above would skip this file on every later run.
                if os.path.isfile(local_path):
                    try:
                        os.remove(local_path)
                    except OSError as cleanup_error:
                        logger.warning(f"清理未完成文件失败: {local_path}: {cleanup_error}")
    except Exception as e:
        logger.error(f"处理 {new_target} 时发生未知错误: {e}")
def _extract_classification(data):
    """Return (category, sourcename, lang) read from the nested 'c' -> 'b' -> 'd' lists of *data*."""
    category = lang = sourcename = ""
    category_data = data.get('c', [{}])
    if category_data:
        category = category_data[0].get('category', '')
        b_data = category_data[0].get('b', [{}])
        if b_data:
            lang = b_data[0].get('lang', '')
            d_data = b_data[0].get('d', [{}])
            if d_data:
                sourcename = d_data[0].get('sourcename', '')
    return category, sourcename, lang


def write_to_mysql(data_list, local_path):
    """Upsert every record of *data_list* into ``news_info`` and notify the MQ.

    All rows are written in a single transaction. MQ messages are sent only
    after the commit succeeded, so consumers are never told about rows that
    were rolled back. A failing MQ send is logged and does not undo the commit.

    Args:
        data_list: Iterable of dicts; each must contain ``'id'``.
        local_path: Path of the source file. Its parent directory name must be
            a ``YYYY-MM-DD`` date (stored as ``file_date``) and its base name
            is stored as ``file_name``.

    Returns:
        True when the batch was committed, False when it failed (the error is
        logged and nothing is committed).

    Raises:
        Whatever ``pymysql.connect`` raises when the connection cannot be
        established.
    """
    conn = pymysql.connect(
        host=MYSQL_HOST_APP,
        port=MYSQL_PORT_APP,
        user=MYSQL_USER_APP,
        password=MYSQL_PASSWORD_APP,
        db=MYSQL_DB_APP,
        charset='utf8mb4'
    )
    # On conflict only the classification columns are refreshed.
    sql = """INSERT INTO news_info
        (news_id, input_date, words, title_txt, key_word,
        CN_content, EN_content, URL, abstract, title_EN,
        category, sourcename, lang, deleted, create_time,
        update_time, file_date, file_name)
        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
        ON DUPLICATE KEY UPDATE
        category=VALUES(category),
        sourcename=VALUES(sourcename),
        lang=VALUES(lang)"""
    pending_messages = []
    committed = False
    try:
        # File metadata is the same for every row; derive it once.
        source = Path(local_path)
        file_date = datetime.datetime.strptime(source.parent.name, '%Y-%m-%d').date()
        file_name = source.name
        with conn.cursor() as cursor:
            for data in data_list:
                category, sourcename, lang = _extract_classification(data)
                # A missing or null EN_content becomes an empty string; the
                # previous default of 0 raised AttributeError on .replace().
                en_content = (data.get('EN_content') or "").replace('quot;', '"')
                now = datetime.datetime.now()
                values = (
                    data['id'],
                    data.get('input_date', ""),
                    data.get('words', ""),
                    data.get('title_txt', ""),
                    data.get('key_word', ""),
                    data.get('CN_content', ""),
                    en_content,
                    data.get('URL', ""),
                    data.get('abstract', ""),
                    data.get('title_EN', ""),
                    category,
                    sourcename,
                    lang,
                    0,
                    now,
                    now,
                    file_date,
                    file_name,
                )
                cursor.execute(sql, values)
                logger.info(f"成功写入mysql: {data['id']}")
                # Serialise now, send after the commit.
                pending_messages.append(json.dumps(data))
        conn.commit()
        committed = True
    except Exception as e:
        logger.error(f"写入mysql失败: {str(e)}")
    finally:
        conn.close()

    if committed:
        for message in pending_messages:
            try:
                send_message(message)
            except Exception as e:
                logger.error(f"发送MQ消息失败: {str(e)}")
    return committed
# Write to Elasticsearch
def write_to_es(data_list, local_path):
    """Index every record of *data_list* into the ``news_info`` Elasticsearch index.

    Documents are indexed one by one under ``id=data['id']``. The first
    failure is logged and stops the remaining documents. The client created
    for this call is always closed before returning.

    Args:
        data_list: Iterable of dicts; each must contain ``'id'``.
        local_path: Path of the source file. Its parent directory name must be
            a ``YYYY-MM-DD`` date (stored as ``file_date``) and its base name
            is stored as ``file_name``.

    Returns:
        None.
    """
    es = Elasticsearch(
        [f"http://{ES_HOST}:{ES_PORT}"],  # protocol is part of the host entry
        basic_auth=(ES_USER, ES_PASSWORD)
    )
    try:
        # File metadata is the same for every document; derive it once.
        source = Path(local_path)
        file_date = datetime.datetime.strptime(source.parent.name, '%Y-%m-%d').date()
        file_name = source.name
        for data in data_list:
            # Classification lives in nested single-element lists: c -> b -> d.
            category_data = data.get('c', [{}])
            category = lang = sourcename = ""
            if category_data:
                category = category_data[0].get('category', '')
                b_data = category_data[0].get('b', [{}])
                if b_data:
                    lang = b_data[0].get('lang', '')
                    d_data = b_data[0].get('d', [{}])
                    if d_data:
                        sourcename = d_data[0].get('sourcename', '')
            # A missing or null EN_content becomes an empty string; the
            # previous default of 0 raised AttributeError on .replace().
            en_content = (data.get('EN_content') or "").replace('quot;', '"')
            now = datetime.datetime.now()
            es.index(
                index="news_info",
                id=data['id'],
                document={
                    "news_id": data.get('id', ""),
                    "input_date": data.get('input_date', ""),
                    "words": data.get('words', ""),
                    "title_txt": data.get('title_txt', ""),
                    "key_word": data.get('key_word', ""),
                    "CN_content": data.get('CN_content', ""),
                    "EN_content": en_content,
                    "URL": data.get('URL', ""),
                    "abstract": data.get('abstract', ""),
                    "title_EN": data.get('title_EN', ""),
                    "category": category,
                    "sourcename": sourcename,
                    "lang": lang,
                    "deleted": "0",
                    "create_time": now,
                    "update_time": now,
                    "file_date": file_date,
                    "file_name": file_name,
                }
            )
            logger.info(f"成功写入ES文档ID: {data['id']}")
    except Exception as e:
        logger.error(f"写入ES失败: {str(e)}")
    finally:
        # A new client is built on every call; release its connections.
        es.close()
def sync_to_database(local_path):
    """Load the UTF-16 JSON file at *local_path* and write its records to MySQL and ES.

    The file text is stripped of control characters before JSON parsing.

    Returns:
        True when reading, parsing and both writer calls finished without
        raising; False when any of them raised (the error is logged).
    """
    try:
        raw_text = Path(local_path).read_text(encoding='utf-16')
        records = json.loads(clean_control_characters(raw_text))
        # Both writers receive the local path to derive file date and name.
        write_to_mysql(records, local_path)
        write_to_es(records, local_path)
    except Exception as e:
        logger.error(f"处理文件 {local_path} 时发生未知错误: {e}")
        return False
    return True
# Logging configuration: the root logger writes INFO and above to a file and
# to the console, using one shared format.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
# File log. The encoding is explicit because the log messages contain Chinese
# text, which the locale's default encoding may be unable to represent.
file_handler = logging.FileHandler('zzck_nohup.log', encoding='utf-8')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# Console log.
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# Scheduler event listener: logs the outcome of each job run.
def scheduler_listener(event):
    """Log an error with traceback when *event* carries an exception, else log completion."""
    failure = event.exception
    if not failure:
        logger.info("任务执行完成")
        return
    logger.error("任务执行失败!", exc_info=failure)
if __name__ == "__main__":
    # Host whose dated listing pages are polled; passed to do_work on each run.
    target = "zzbtw.ckxx.net"
    scheduler = BlockingScheduler()
    # Run do_work every minute under a fixed job id, with max_instances=1.
    scheduler.add_job(do_work, 'interval', minutes=1, id='zzck_sync_job',
                      args=[target], max_instances=1)
    # Report both successful and failed job executions.
    scheduler.add_listener(scheduler_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
    try:
        logger.info("程序启动,调度器开始运行...")
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        logger.info("收到停止信号,正在关闭调度器...")
        scheduler.shutdown()
        logger.info("调度器已关闭")