From af405071f2351c2cfd073e9cf5a74097ca329a32 Mon Sep 17 00:00:00 2001 From: dswu <452725667@qq.com> Date: Tue, 2 Sep 2025 15:15:51 +0800 Subject: [PATCH] first commit --- config.py | 33 ++++++ main.py | 251 ++++++++++++++++++++++++++++++++++++++++++ mqreceive.py | 173 +++++++++++++++++++++++++++++ mqreceive2.py | 114 ++++++++++++++++++++ mqreceivefromllm.py | 258 ++++++++++++++++++++++++++++++++++++++++++++ mqsend.py | 26 +++++ readme.md | 2 + schema.sql | 33 ++++++ start.sh | 10 ++ sync_mysql_to_es.py | 176 ++++++++++++++++++++++++++++++ table_init.py | 18 ++++ test.py | 42 ++++++++ 12 files changed, 1136 insertions(+) create mode 100644 config.py create mode 100644 main.py create mode 100644 mqreceive.py create mode 100644 mqreceive2.py create mode 100644 mqreceivefromllm.py create mode 100644 mqsend.py create mode 100644 readme.md create mode 100644 schema.sql create mode 100644 start.sh create mode 100644 sync_mysql_to_es.py create mode 100644 table_init.py create mode 100644 test.py diff --git a/config.py b/config.py new file mode 100644 index 0000000..ea203b1 --- /dev/null +++ b/config.py @@ -0,0 +1,33 @@ +# 生产环境 +MYSQL_HOST_APP = '10.127.2.207' +MYSQL_PORT_APP = 3306 +MYSQL_USER_APP = 'financial_prod' +MYSQL_PASSWORD_APP = 'mmTFncqmDal5HLRGY0BV' +MYSQL_DB_APP = 'reference' + +# 测试环境 +#MYSQL_HOST_APP = '121.37.185.246' +#MYSQL_PORT_APP = 3306 +#MYSQL_USER_APP = 'root' +#MYSQL_PASSWORD_APP = 'Xgf_8000' +#MYSQL_DB_APP = 'reference' + +# 测试环境 +#jx_adr = "http://123.60.153.169:8040/admin/common/sync/news_id/WBysu6N1z26AbA12l" + +# 生产环境 +jx_adr = "http://10.127.2.205:13579/admin/common/sync/news_id/WBysu6N1z26AbA12l" + +mq_user = 'admin' +mq_password = 'Aa123456' + +ES_HOST = "localhost" # 替换为你的 Elasticsearch 主机地址,例如 "localhost" 或 "your_elasticsearch_host" +ES_PORT = 9200 +ES_USER = "elastic" +ES_PASSWORD = "ZxE,3VM@Thk0" + +file_path = "/mnt/vdb/" + + + + diff --git a/main.py b/main.py new file mode 100644 index 0000000..d6d7a00 --- /dev/null +++ b/main.py @@ -0,0 +1,251 @@ +import subprocess +from bs4 import BeautifulSoup +import datetime +import os +from urllib.parse import urlparse, urljoin +import time +import pymysql +from pathlib import Path +import logging +from apscheduler.schedulers.blocking import BlockingScheduler +from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR +import json +import re +from config import * # 导入配置 +from mqsend import send_message # 导入消息发送函数 +from elasticsearch import Elasticsearch + + +def clean_control_characters(text): + return re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text) + +def do_work(target): + # 创建data目录 + os.makedirs('data', exist_ok=True) + + today = datetime.datetime.today().strftime('%Y-%m-%d') + new_target = f"{target}/{today}/" + try: + sub_result = subprocess.run(['curl', new_target], capture_output=True, text=True) + + if sub_result.returncode == 0: + soup = BeautifulSoup(sub_result.stdout, 'html.parser') + links = soup.find_all('a', href=True) + + for link in links: + href = link['href'] + if href.startswith('/'): + + file_url = f"{target}/{urljoin(new_target, href)}" + # 生成本地保存路径 + local_path = os.path.join(file_path+'data', href.lstrip('/')) # 添加日期目录层级 + # 创建目录结构 + os.makedirs(os.path.dirname(local_path), exist_ok=True) + # 下载文件 + if not os.path.exists(local_path): + logger.info(f"文件不存在,准备下载: {file_url}") + download_cmd = ['curl', '-s', '-o', local_path, file_url] + dl_result = subprocess.run(download_cmd) + if dl_result.returncode == 0: + logger.info(f"下载成功: {local_path}") + # 同步到MySQL + sync_to_database(local_path) + else: + 
logger.error(f"下载失败: {file_url}") + + else: + logger.warning(f"忽略非相对路径: {href}") + else: + logger.error(f"获取失败: {sub_result.stderr}") + except Exception as e: + logger.error(f"处理 {new_target} 时发生未知错误: {e}") + + +def write_to_mysql(data_list, local_path): + conn = pymysql.connect( + host=MYSQL_HOST_APP, + port=MYSQL_PORT_APP, + user=MYSQL_USER_APP, + password=MYSQL_PASSWORD_APP, + db=MYSQL_DB_APP, + charset='utf8mb4' + ) + + try: + with conn.cursor() as cursor: + for data in data_list: + # 新增JSON结构解析逻辑 + category_data = data.get('c', [{}]) + category = lang = sourcename = "" + if category_data: + category = category_data[0].get('category', '') + b_data = category_data[0].get('b', [{}]) + if b_data: + lang = b_data[0].get('lang', '') + d_data = b_data[0].get('d', [{}]) + if d_data: + sourcename = d_data[0].get('sourcename', '') + + # 处理特殊字符转义 + en_content = data.get('EN_content',0).replace('quot;', '"') + + # 修改后的SQL语句 + sql = """INSERT INTO news_info + (news_id, input_date, words, title_txt, key_word, + CN_content, EN_content, URL, abstract, title_EN, + category, sourcename, lang, deleted, create_time, + update_time, file_date, file_name) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) + ON DUPLICATE KEY UPDATE + category=VALUES(category), + sourcename=VALUES(sourcename), + lang=VALUES(lang)""" + + values = ( + data['id'], + data.get('input_date', ""), + data.get('words', ""), + data.get('title_txt', ""), + data.get('key_word', ""), + data.get('CN_content', ""), + en_content, # 使用处理后的英文内容 + data.get('URL', ""), + data.get('abstract', ""), + data.get('title_EN', ""), + category, + sourcename, + lang, + 0, + datetime.datetime.now(), + datetime.datetime.now(), + datetime.datetime.strptime(Path(local_path).parent.name, '%Y-%m-%d').date(), + Path(local_path).name + ) + cursor.execute(sql, values) + logger.info(f"成功写入mysql: {data['id']}") + + # + + # 发送消息到MQ + send_message(json.dumps(data)) + conn.commit() + except Exception as e: + logger.error(f"写入mysql失败: {str(e)}") + finally: + conn.close() + return True + +# 写入ES +def write_to_es(data_list, local_path): + # 初始化ES连接(添加在文件顶部) + es = Elasticsearch( + [f"http://{ES_HOST}:{ES_PORT}"], # 将协议直接包含在hosts中 + basic_auth=(ES_USER, ES_PASSWORD) + ) + try: + for data in data_list: + # 插入ES核心逻辑 + category_data = data.get('c', [{}]) + category = lang = sourcename = "" + if category_data: + category = category_data[0].get('category', '') + b_data = category_data[0].get('b', [{}]) + if b_data: + lang = b_data[0].get('lang', '') + d_data = b_data[0].get('d', [{}]) + if d_data: + sourcename = d_data[0].get('sourcename', '') + + es.index( + index="news_info", + id=data['id'], + document={ + "news_id": data.get('id', ""), + "input_date": data.get('input_date', ""), + "words": data.get('words', ""), + "title_txt": data.get('title_txt', ""), + "key_word": data.get('key_word', ""), + "CN_content": data.get('CN_content', ""), + "EN_content": data.get('EN_content',0).replace('quot;', '"'), + "URL": data.get('URL', ""), + "abstract": data.get('abstract', ""), + "title_EN": data.get('title_EN', ""), + "category": category, + "sourcename": sourcename, + "lang": lang, + "deleted": "0", + "create_time":datetime.datetime.now(), + "update_time": datetime.datetime.now(), + "file_date": datetime.datetime.strptime(Path(local_path).parent.name, '%Y-%m-%d').date(), + "file_name": Path(local_path).name + } + ) + logger.info(f"成功写入ES文档ID: {data['id']}") + + except Exception as e: + logger.error(f"写入ES失败: {str(e)}") + +def sync_to_database(local_path): + try: + with 
open(local_path, 'r', encoding='utf-16') as f: # 修改编码方式 + raw_data = f.read() + cleaned_data = clean_control_characters(raw_data) + data_list = json.loads(cleaned_data) + write_to_mysql(data_list, local_path) # 传递本地路径 + write_to_es(data_list, local_path) # 传递本地路径 + + return True + + except Exception as e: + logger.error(f"处理文件 {local_path} 时发生未知错误: {e}") + return False + + + + +# 初始化日志配置(添加在文件顶部) +logger = logging.getLogger() +logger.setLevel(logging.INFO) +formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + +# 文件日志 +file_handler = logging.FileHandler('zzck_nohup.log') +file_handler.setFormatter(formatter) +logger.addHandler(file_handler) + +# 控制台日志 +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) +logger.addHandler(console_handler) + +# 调度任务监听器(添加在job函数后) +def scheduler_listener(event): + if event.exception: + logger.error("任务执行失败!", exc_info=event.exception) + else: + logger.info("任务执行完成") + +if __name__ == "__main__": + scheduler = BlockingScheduler() + target = "zzbtw.ckxx.net" # 确保target变量在此处定义 + + # 修改调度任务添加方式 + scheduler.add_job( + do_work, + 'interval', + minutes=1, + id='zzck_sync_job', + args=[target], # 添加参数传递 + max_instances=1 + ) + + # 添加事件监听 + scheduler.add_listener(scheduler_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) + + try: + logger.info("程序启动,调度器开始运行...") + scheduler.start() + except (KeyboardInterrupt, SystemExit): + logger.info("收到停止信号,正在关闭调度器...") + scheduler.shutdown() + logger.info("调度器已关闭") diff --git a/mqreceive.py b/mqreceive.py new file mode 100644 index 0000000..a2ed7da --- /dev/null +++ b/mqreceive.py @@ -0,0 +1,173 @@ +import pika +import json +import logging +import time +import os +from config import * +from llm_process import send_mq, get_label + +# 声明一个全局变量,存媒体的权威度打分 +media_score = {} +with open("media_score.txt", "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + media, score = line.split("\t") + media_score[media.strip()] = int(score) + except ValueError as e: + print(f"解析错误: {e},行内容: {line}") + continue + +# 幂等性存储 - 记录已处理消息ID +processed_ids = set() + +def message_callback(ch, method, properties, body): + """消息处理回调函数""" + + + try: + data = json.loads(body) + id_str = str(data["id"]) + + # ch.basic_ack(delivery_tag=method.delivery_tag) + # print(f"接收到消息: {id_str}") + # return + + # 幂等性检查:如果消息已处理过,直接确认并跳过 + if id_str in processed_ids: + print(f"跳过已处理的消息: {id_str}") + ch.basic_ack(delivery_tag=method.delivery_tag) + return + + # 在此处添加业务处理逻辑 + content = data.get('CN_content', "").strip() + source = "其他" + category_data = data.get('c', [{}]) + category = "" + if category_data: + category = category_data[0].get('category', '') + b_data = category_data[0].get('b', [{}]) + if b_data: + d_data = b_data[0].get('d', [{}]) + if d_data: + source = d_data[0].get('sourcename', "其他") + source_impact = media_score.get(source, 5) + tagged_news = get_label(content, source) + public_opinion_score = tagged_news.get("public_opinion_score", 30) #资讯质量分 + China_factor = tagged_news.get("China_factor", 0.2) #中国股市相关度 + news_score = source_impact * 0.04 + public_opinion_score * 0.25 + China_factor * 35 + news_score = round(news_score, 2) + #如果想让分数整体偏高可以开根号乘10 + #news_score = round((news_score**0.5) * 10.0, 2) + + industry_confidence = tagged_news.get("industry_confidence", []) + industry_score = list(map(lambda x: round(x * news_score, 2), industry_confidence)) + concept_confidence = tagged_news.get("concept_confidence", []) + concept_score = list(map(lambda x: 
round(x * news_score, 2), concept_confidence)) + + tagged_news["source"] = source + tagged_news["source_impact"] = source_impact + tagged_news["industry_score"] = industry_score + tagged_news["concept_score"] = concept_score + tagged_news["news_score"] = news_score + tagged_news["id"] = id_str + + print(json.dumps(tagged_news, ensure_ascii=False)) + + # 发送百炼大模型标注过的新闻json到队列 + send_mq(tagged_news) + + # 处理成功后记录消息ID + processed_ids.add(id_str) + if len(processed_ids) > 10000: + processed_ids.clear() + + # 手动确认消息 + ch.basic_ack(delivery_tag=method.delivery_tag) + except Exception as e: + print(f"消息处理失败: {str(e)}") + # 拒绝消息, 不重新入队 + ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) + +def create_connection(): + """创建并返回RabbitMQ连接""" + credentials = pika.PlainCredentials(mq_user, mq_password) + return pika.BlockingConnection( + pika.ConnectionParameters( + host="localhost", + credentials=credentials, + heartbeat=600, + connection_attempts=3, + retry_delay=5 # 重试延迟5秒 + ) + ) + +def start_consumer(): + """启动MQ消费者""" + while True: # 使用循环而不是递归,避免递归深度问题 + try: + connection = create_connection() + channel = connection.channel() + + # 设置QoS,限制每次只取一条消息 + channel.basic_qos(prefetch_count=1) + + channel.exchange_declare( + exchange="zzck_exchange", + exchange_type="fanout" + #durable=True # 确保交换器持久化 + ) + + # 声明持久化队列 + res = channel.queue_declare( + queue="to_ai" + # durable=True # 队列持久化 + ) + + mq_queue = res.method.queue + channel.queue_bind( + exchange="zzck_exchange", + queue=mq_queue, + ) + + # 启动消费,关闭自动ACK + channel.basic_consume( + queue=mq_queue, + on_message_callback=message_callback, + auto_ack=False # 关闭自动确认 + ) + + print("消费者已启动,等待消息...") + channel.start_consuming() + + except pika.exceptions.ConnectionClosedByBroker: + # 代理主动关闭连接,可能是临时错误 + print("连接被代理关闭,将在5秒后重试...") + time.sleep(5) + except pika.exceptions.AMQPConnectionError: + # 连接错误 + print("连接失败,将在10秒后重试...") + time.sleep(10) + except KeyboardInterrupt: + print("消费者被用户中断") + try: + if connection and connection.is_open: + connection.close() + except: + pass + break + except Exception as e: + print(f"消费者异常: {str(e)}") + print("将在15秒后重试...") + time.sleep(15) + finally: + try: + if connection and connection.is_open: + connection.close() + except: + pass + +if __name__ == "__main__": + start_consumer() diff --git a/mqreceive2.py b/mqreceive2.py new file mode 100644 index 0000000..c6342f5 --- /dev/null +++ b/mqreceive2.py @@ -0,0 +1,114 @@ +# 将资讯数据写入 es中 +import pika +import json +import logging +from config import * +from elasticsearch import Elasticsearch +from config import ES_HOST, ES_PORT, ES_USER, ES_PASSWORD +import datetime + + + +def message_callback(ch, method, properties, body): + try: + data = json.loads(body) + write_to_es(data) + ch.basic_ack(delivery_tag=method.delivery_tag) + except Exception as e: + print(f"ES写入失败: {str(e)}") + ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) + +# 写入ES +def write_to_es(data): + # 初始化ES连接(添加在文件顶部) + es = Elasticsearch( + [f"http://{ES_HOST}:{ES_PORT}"], # 将协议直接包含在hosts中 + http_auth=(ES_USER, ES_PASSWORD) + ) + try: + # 插入ES核心逻辑 + category_data = data.get('c', [{}]) + category = lang = sourcename = "" + if category_data: + category = category_data[0].get('category', '') + b_data = category_data[0].get('b', [{}]) + if b_data: + lang = b_data[0].get('lang', '') + d_data = b_data[0].get('d', [{}]) + if d_data: + sourcename = d_data[0].get('sourcename', '') + + es.index( + index="news_info", + id=data['id'], + body={ + "news_id": data.get('id', ""), + "input_date": 
data.get('input_date', ""), + "words": data.get('words', ""), + "title_txt": data.get('title_txt', ""), + "key_word": data.get('key_word', ""), + "CN_content": data.get('CN_content', ""), + "EN_content": data.get('EN_content',0).replace('quot;', '"'), + "URL": data.get('URL', ""), + "abstract": data.get('abstract', ""), + "title_EN": data.get('title_EN', ""), + "category": category, + "sourcename": sourcename, + "lang": lang, + "deleted": "0", + "create_time":datetime.datetime.now(), + "update_time": datetime.datetime.now(), + "file_date": "", + "file_name": "" + } + ) + print(f"成功写入ES文档ID: {data['id']}") + except Exception as e: + print(f"写入ES失败: {str(e)}") + + +def start_consumer(): + """启动MQ消费者""" + try: + credentials = pika.PlainCredentials(mq_user, mq_password) + connection = pika.BlockingConnection( + pika.ConnectionParameters( + host="localhost", + credentials=credentials, + heartbeat=60 + ) + ) + channel = connection.channel() + channel.exchange_declare( + exchange="zzck_exchange", + exchange_type="fanout", + ) + + # 声明队列(匹配现有队列类型) queue 的名字可以自定义 + res = channel.queue_declare( + queue="to_es" + ) + + mq_queue = res.method.queue + channel.queue_bind( + exchange="zzck_exchange", + queue=mq_queue, + ) + + + # 启动消费 + channel.basic_consume( + queue=mq_queue, + on_message_callback=message_callback, + + ) + + print("消费者已启动,等待消息...") + channel.start_consuming() + + except Exception as e: + print(f"消费者启动失败: {str(e)}") + raise + +if __name__ == "__main__": + start_consumer() \ No newline at end of file diff --git a/mqreceivefromllm.py b/mqreceivefromllm.py new file mode 100644 index 0000000..7927f08 --- /dev/null +++ b/mqreceivefromllm.py @@ -0,0 +1,258 @@ +# 获取 标签的消息 写入数据库 +import pika +import json +import logging, time +from config import * +import pymysql +from elasticsearch import Elasticsearch +import datetime +import requests + +def message_callback(ch, method, properties, body): + """消息处理回调函数""" + try: + data = json.loads(body) + news_score = data.get('news_score', -1) + if news_score < 0: + ch.basic_ack(delivery_tag=method.delivery_tag) + return + # 在此处添加业务处理逻辑 写入mysql数据库 + write_to_mysql(data) + # 数据写入es + write_to_es(data) + + # 数据写入资讯精选表 + write_to_news(data) + + + + # 手动确认消息 + ch.basic_ack(delivery_tag=method.delivery_tag) + except Exception as e: + print(f"消息处理失败: {str(e)}") + # 拒绝消息并重新入队 + ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) + +def write_to_news(data): + news_score = data.get('news_score', 0.0) + if float(news_score) < 80: # 过滤掉news_score小于80的消息 + return + + # 获取返回数据里面的 新闻id + news_id = data.get('id', "") + adr = jx_adr.replace("news_id", news_id) + print(f"接口地址为{adr}") + response = requests.get(adr) + if response.status_code != 200: + print(f"新闻id:{news_id} 得分:{news_score}, 调用精选接口失败, 错误码:{response.status_code}") + return + print(f"新闻id:{news_id} 得分:{news_score}, 调用精选接口成功") + + + +def write_to_es(data): + """写入ES""" + # 初始化ES连接(添加在文件顶部) + es = Elasticsearch( + [f"http://{ES_HOST}:{ES_PORT}"], # 将协议直接包含在hosts中 + basic_auth=(ES_USER, ES_PASSWORD) + ) + news_id = data.get('id', "") + es.update( + index="news_info", + id=news_id, + doc={ + "news_tags": { + "id": news_id, + "abstract": data.get('abstract', ""), + "title": data.get('title', ""), + "rewrite_content": data.get('rewrite_content', ""), + "industry_label": data.get('industry_label', []), + "industry_confidence": data.get('industry_confidence', []), + "industry_score": data.get('industry_score', []), + "concept_label": data.get('concept_label', []), + "concept_confidence": 
data.get('concept_confidence', []), + "concept_score": data.get('concept_score', []), + "public_opinion_score": data.get('public_opinion_score', 10), + "China_factor": data.get('China_factor', 0.1), + "source": data.get('source', "其他"), + "source_impact": data.get('source_impact', 5), + "news_score": data.get('news_score', 0.0), + "news_id": news_id, + "deleted": '0', + "create_time": datetime.datetime.now(), + "update_time": datetime.datetime.now() + } + } + ) + print(f"news_id:{news_id} 得分:{data.get('news_score', 0.0)}, 写入ES成功") + + + + +def write_to_mysql(data): + conn = pymysql.connect( + host=MYSQL_HOST_APP, + port=MYSQL_PORT_APP, + user=MYSQL_USER_APP, + password=MYSQL_PASSWORD_APP, + db=MYSQL_DB_APP, + charset='utf8mb4' + ) + try: + with conn.cursor() as cursor: + # 新增JSON结构解析逻辑 + # 修改后的SQL语句 + sql = """INSERT INTO news_tags + (abstract, title, rewrite_content, industry_label, industry_confidence, industry_score, concept_label, concept_confidence, concept_score, public_opinion_score, China_factor, source, source_impact, news_score, news_id) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) """ + + values = (data.get('abstract', ""), + data.get('title', ""), + data.get('rewrite_content', ""), + json.dumps(data.get('industry_label', [])), + json.dumps(data.get('industry_confidence', [])), + json.dumps(data.get('industry_score', [])), + json.dumps(data.get('concept_label', [])), + json.dumps(data.get('concept_confidence', [])), + json.dumps(data.get('concept_score', [])), + data.get('public_opinion_score', 10), + data.get('China_factor', 0.1), + data.get('source', "其他"), + data.get('source_impact', 5), + data.get('news_score', 0.0), + data.get('id', "") + ) + cursor.execute(sql, values) + conn.commit() + id = data.get('id', "") + industry_label = data.get('industry_label', []) + concept_label = data.get('concept_label', []) + print(f"{id} {industry_label} {concept_label}, 写入news_tags 表成功") + + except Exception as e: + print(f"写入news_tags失败: {str(e)}") + finally: + conn.close() + return True + +def create_connection(): + """创建并返回RabbitMQ连接""" + credentials = pika.PlainCredentials(mq_user, mq_password) + return pika.BlockingConnection( + pika.ConnectionParameters( + host="localhost", + credentials=credentials, + heartbeat=600, + connection_attempts=3, + retry_delay=5 # 重试延迟5秒 + ) + ) + +def start_consumer(): + """启动MQ消费者""" + while True: # 使用循环而不是递归,避免递归深度问题 + try: + connection = create_connection() + channel = connection.channel() + + # 设置QoS,限制每次只取一条消息 + channel.basic_qos(prefetch_count=1) + + channel.exchange_declare( + exchange="zzck_llm_exchange", + exchange_type="fanout" + ) + + # 声明持久化队列 + res = channel.queue_declare( + queue="from_ai_to_mysql" + ) + + mq_queue = res.method.queue + channel.queue_bind( + exchange="zzck_llm_exchange", + queue=mq_queue, + ) + + # 启动消费,关闭自动ACK + channel.basic_consume( + queue=mq_queue, + on_message_callback=message_callback, + auto_ack=False # 关闭自动确认 + ) + + print("消费者已启动,等待消息...") + channel.start_consuming() + + except pika.exceptions.ConnectionClosedByBroker: + # 代理主动关闭连接,可能是临时错误 + print("连接被代理关闭,将在5秒后重试...") + time.sleep(5) + except pika.exceptions.AMQPConnectionError: + # 连接错误 + print("连接失败,将在10秒后重试...") + time.sleep(10) + except KeyboardInterrupt: + print("消费者被用户中断") + try: + if connection and connection.is_open: + connection.close() + except: + pass + break + except Exception as e: + print(f"消费者异常: {str(e)}") + print("将在15秒后重试...") + time.sleep(15) + finally: + try: + if connection and connection.is_open: + connection.close() + except: + 
pass + +# def start_consumer(): +# """启动MQ消费者""" +# try: +# credentials = pika.PlainCredentials(mq_user, mq_password) +# connection = pika.BlockingConnection( +# pika.ConnectionParameters( +# host="localhost", +# credentials=credentials, +# heartbeat=600 +# ) +# ) +# channel = connection.channel() +# channel.exchange_declare( +# exchange="zzck_exchange", +# exchange_type="fanout", +# ) + +# # 声明队列(匹配现有队列类型) queue 的名字可以自定义 +# res = channel.queue_declare( +# queue="from_ai_to_mysql" +# ) + +# mq_queue = res.method.queue +# channel.queue_bind( +# exchange="zzck_llm_exchange", +# queue=mq_queue, +# ) + +# # 启动消费 +# channel.basic_consume( +# queue=mq_queue, +# on_message_callback=message_callback, + +# ) + +# print("消费者已启动,等待消息...") +# channel.start_consuming() + +# except Exception as e: +# print(f"消费者启动失败: {str(e)}") +# start_consumer() + +if __name__ == "__main__": + start_consumer() diff --git a/mqsend.py b/mqsend.py new file mode 100644 index 0000000..c2ef453 --- /dev/null +++ b/mqsend.py @@ -0,0 +1,26 @@ +import pika +from config import * + + + +def send_message(message): + # 连接 RabbitMQ + credentials = pika.PlainCredentials(mq_user, mq_password) + connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost', credentials=credentials) + ) + channel = connection.channel() + + channel.exchange_declare(exchange='zzck_exchange', exchange_type='fanout') + # 声明队列 + # channel.queue_declare(queue=mq_queue) + + # 发送消息 + channel.basic_publish( exchange='zzck_exchange', + routing_key='', + body=message, + properties=pika.BasicProperties( + expiration='10' # 消息1秒后过期 + ) + ) + connection.close() \ No newline at end of file diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..a74a992 --- /dev/null +++ b/readme.md @@ -0,0 +1,2 @@ +1 中证参考 解析host 文件 +2 host 地址为 zzbtw.ckxx.net \ No newline at end of file diff --git a/schema.sql b/schema.sql new file mode 100644 index 0000000..fbd1d60 --- /dev/null +++ b/schema.sql @@ -0,0 +1,33 @@ +CREATE TABLE news_info ( + id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY COMMENT '唯一标识符', + news_id INT UNSIGNED NOT NULL COMMENT '新闻ID', + input_date DATETIME NOT NULL COMMENT '数据输入时间', + words MEDIUMINT UNSIGNED NOT NULL COMMENT '字数统计', + title_txt VARCHAR(255) NOT NULL COMMENT '中文标题', + key_word VARCHAR(255) NOT NULL COMMENT '关键词列表(分号分隔)', + CN_content TEXT NOT NULL COMMENT '中文正文内容', + EN_content TEXT NOT NULL COMMENT '英文正文内容', + URL VARCHAR(512) NOT NULL COMMENT '原文链接', + abstract VARCHAR(512) NOT NULL COMMENT '摘要', + title_EN VARCHAR(255) NOT NULL COMMENT '英文标题', + category VARCHAR(255) NOT NULL COMMENT '分类信息(JSON格式)', + sourcename VARCHAR(255) NOT NULL COMMENT '数据来源名称或标识', + lang VARCHAR(255) NOT NULL COMMENT '语言', + deleted TINYINT(5) DEFAULT 0 COMMENT '是否删除 0正常,1已删除', + create_time DATETIME NOT NULL COMMENT '创建时间', + update_time DATETIME DEFAULT NULL COMMENT '修改时间', + file_date DATE NOT NULL COMMENT '文件日期(取自目录结构)', + file_name VARCHAR(255) NOT NULL COMMENT '完整文件名(含扩展名)' +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + + +CREATE TABLE IF NOT EXISTS news_tags ( + id INT AUTO_INCREMENT PRIMARY KEY COMMENT '唯一标识', + abstract VARCHAR(500) NOT NULL COMMENT '摘要内容', + industry_label JSON COMMENT '行业标签数组', + concept_label JSON COMMENT '概念标签数组', + news_id INT NOT NULL COMMENT '关联的新闻ID', + deleted TINYINT(5) DEFAULT 0 COMMENT '是否删除 0正常,1已删除', + create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间' +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_unicode_ci; \ No newline at end of file diff --git a/start.sh b/start.sh new file mode 100644 index 0000000..a0d14d4 --- /dev/null +++ b/start.sh @@ -0,0 +1,10 @@ +#!/bin/bash +# 后台启动脚本 +cd /root/zzck/ +echo "正在杀死所有包含main.py或mqreceive_multithread.py或mqreceivefromllm.py的进程" +ps aux | grep -E 'python.*(main|mqreceive_multithread|mqreceivefromllm)\.py' | awk '{print $2}' | xargs kill -9 +echo "已杀死所有包含main.py或mqreceive_multithread.py或mqreceivefromllm.py的进程" +nohup python -u main.py > zzck_nohup.log 2>&1 & +nohup python -u mqreceive_multithread.py > mqreceive_multithread.log 2>&1 & +nohup python -u mqreceivefromllm.py > mqreceivefromllm.log 2>&1 & +echo "程序已后台启动" diff --git a/sync_mysql_to_es.py b/sync_mysql_to_es.py new file mode 100644 index 0000000..a5fec90 --- /dev/null +++ b/sync_mysql_to_es.py @@ -0,0 +1,176 @@ +import argparse +from datetime import datetime +import pymysql +from elasticsearch import Elasticsearch, helpers +import json +# 复用现有配置 +from config import ES_HOST, ES_PORT, ES_USER, ES_PASSWORD, MYSQL_HOST_APP, MYSQL_USER_APP, MYSQL_PASSWORD_APP, MYSQL_DB_APP +from tqdm import tqdm +import requests +from config import jx_adr + +class MySQLToESSync: + def __init__(self): + self.es = Elasticsearch( + [f"http://{ES_HOST}:{ES_PORT}"], + basic_auth=(ES_USER, ES_PASSWORD) + ) + self.mysql_conn = pymysql.connect( + host=MYSQL_HOST_APP, + user=MYSQL_USER_APP, + password=MYSQL_PASSWORD_APP, + db=MYSQL_DB_APP, + charset='utf8mb4', + cursorclass=pymysql.cursors.DictCursor + ) + + # 更新全量表 + def sync_all(self, start_time=None, end_time=None): + # 更新全量表到 es + with self.mysql_conn.cursor(pymysql.cursors.DictCursor) as cursor: + query = self._build_query("news_info",start_time, end_time) + cursor.execute(query) + # 支持两种同步模式 + res = cursor.fetchall() + # 判断es中是否有该news_id + for data in tqdm(res, desc="同步全量表", unit="条"): + news_id = data['news_id'] + # 从es中查询 + try: + exists = self.es.get(index="news_info", id=news_id) + except: + exists = None + if exists: + pass + else: + # es中不存在,直接新增 + self.es.index( + index="news_info", + id=news_id, + document={ + "news_id": data.get('news_id', ""), + "input_date": data.get('input_date', ""), + "words": data.get('words', ""), + "title_txt": data.get('title_txt', ""), + "key_word": data.get('key_word', ""), + "CN_content": data.get('CN_content', ""), + "EN_content": data.get('EN_content',""), + "URL": data.get('URL', ""), + "abstract": data.get('abstract', ""), + "title_EN": data.get('title_EN', ""), + "category": data.get('category', ""), + "sourcename": data.get('sourcename', ""), + "lang": data.get('lang', ""), + "deleted": data.get('deleted', ""), + "create_time": data.get('create_time', ""), + "update_time": data.get('update_time', ""), + "file_date": data.get('file_date', ""), + "file_name": data.get('file_name', ""), + } + ) + print(f"新闻 {news_id} 更新到 ES 成功") + + + # 更新标签表 + def sync_tags(self, start_time=None, end_time=None): + + with self.mysql_conn.cursor(pymysql.cursors.DictCursor) as cursor: + query = self._build_query("news_tags",start_time, end_time) + cursor.execute(query) + + res = cursor.fetchall() + # 判断es中是否有该news_id + for data in tqdm(res, desc="同步新闻标签进度", unit="条"): + news_id = data['news_id'] + # 从es中查询 + try: + # 更新全量表到 es + es_doc = self.es.get(index="news_info", id=news_id) + news_tags = None + if es_doc and es_doc.get('found'): + news_tags = es_doc['_source'].get('news_tags', None) + if not news_tags: + # 将mysql的数据写入es + self.es.update( + index="news_info", + id=news_id, + doc={ + "news_tags": { + "id": news_id, + 
"abstract": data.get('abstract', ""), + "title": data.get('title', ""), + "rewrite_content": data.get('rewrite_content', ""), + "industry_label": json.loads(data.get('industry_label', [])), + "industry_confidence": json.loads(data.get('industry_confidence', [])), + "industry_score": json.loads(data.get('industry_score', [])), + "concept_label": json.loads(data.get('concept_label', [])), + "concept_confidence": json.loads(data.get('concept_confidence', [])), + "concept_score": json.loads(data.get('concept_score', [])), + "public_opinion_score": data.get('public_opinion_score', 10), + "China_factor": data.get('China_factor', 0.1), + "source": data.get('source', "其他"), + "source_impact": data.get('source_impact', 5), + "news_score": data.get('news_score', 0.0), + "news_id": news_id, + "deleted": '0', + "create_time": data.get('create_time', ""), + "update_time": data.get('update_time', "") + } + } + ) + print(f"新闻 {news_id} 标签更新到 ES 成功") + + + except Exception as e: + print(e) + + # 更新精选表 + def sync_chooses(self, start_time=None, end_time=None): + with self.mysql_conn.cursor(pymysql.cursors.DictCursor) as cursor: + query = "select * from news_tags where news_score >= 80" + cursor.execute(query) + res = cursor.fetchall() + # 判断es中是否有该news_id + for data in tqdm(res, desc="同步精选表数据", unit="条"): + # 从es中查询 + try: + news_id = data['news_id'] + news_score = data['news_score'] + # 获取返回数据里面的 新闻id + adr = jx_adr.replace("news_id", str(news_id)) + response = requests.get(adr) + if response.status_code != 200: + print(f"新闻id:{news_id} 得分:{news_score}, 调用精选接口失败, 错误码:{response.status_code}") + print(f"新闻id:{news_id} 得分:{news_score}, 调用精选接口成功") + except Exception as e: + print(e) + + + + + + def _build_query(self, table_name, start_time, end_time): + base_query = f""" + SELECT * + FROM {table_name} + """ + if start_time and end_time: + return f"{base_query} WHERE create_time BETWEEN '{start_time}' AND '{end_time}'" + return base_query + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='MySQL to ES Sync Tool') + parser.add_argument('--full', action='store_true', help='Full synchronization') + parser.add_argument('--start', type=str, help='Start time (YYYY-MM-DD HH:MM:SS)') + parser.add_argument('--end', type=str, help='End time (YYYY-MM-DD HH:MM:SS)') + + args = parser.parse_args() + + if not args.full and not (args.start and args.end): + raise ValueError("需要指定--full全量同步或--start/--end时间范围") + + sync = MySQLToESSync() + sync.sync_all(args.start, args.end) + sync.sync_tags(args.start, args.end) + sync.sync_chooses(args.start, args.end) + \ No newline at end of file diff --git a/table_init.py b/table_init.py new file mode 100644 index 0000000..2634c98 --- /dev/null +++ b/table_init.py @@ -0,0 +1,18 @@ +from config import * +import pymysql +import json +import logging +import datetime + +conn = pymysql.connect( + host=MYSQL_HOST_APP, + port=MYSQL_PORT_APP, + user=MYSQL_USER_APP, + password=MYSQL_PASSWORD_APP, + db=MYSQL_DB_APP, + charset='utf8mb4' + ) +with conn.cursor() as cursor: + with open('schema.sql', 'r') as schema_file: + create_table_sql = schema_file.read().replace('\n', ' ') # Remove line breaks + cursor.execute(create_table_sql) \ No newline at end of file diff --git a/test.py b/test.py new file mode 100644 index 0000000..269daad --- /dev/null +++ b/test.py @@ -0,0 +1,42 @@ +import os +from urllib.parse import urlparse, urljoin +import time +import pymysql +from pathlib import Path +import logging +from apscheduler.schedulers.blocking import BlockingScheduler +from 
apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
+import json
+import re
+from config import *  # import configuration
+import datetime
+
+def clean_control_characters(text):
+    return re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text)
+
+# The source file is UTF-16 encoded
+with open("data\\2025-05-20\\01-11-04.txt", 'r', encoding='utf-16') as f:  # note the UTF-16 encoding
+    raw_data = f.read()
+    cleaned_data = clean_control_characters(raw_data)
+    data_list = json.loads(cleaned_data)
+    for data in data_list:  # iterate over every object in the array
+        # handle special character escapes
+        en_content = data['EN_content'].replace('quot;', '"')
+        for k, v in data.items():
+            print(k)
+        values = (
+            data['id'],
+            data['input_date'],
+            data['words'],
+            data['title_txt'],
+            data['key_word'],
+            data['CN_content'],
+            en_content,  # use the processed English content
+            data['URL'],
+            data['abstract'],
+            data['title_EN'],
+            data.get('data_source', 'zzck'),
+            json.dumps(data['c']),
+            0,
+        )
\ No newline at end of file
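
The nested source-metadata parsing (c[0].category, then b[0].lang, then d[0].sourcename) is duplicated in main.py, mqreceive.py and mqreceive2.py. Below is a minimal sketch of that extraction pulled into one helper; the function name parse_source_fields and the sample record are illustrative, not part of the files above.

def parse_source_fields(data):
    # Extract category, lang and sourcename from the nested c -> b -> d structure.
    category = lang = sourcename = ""
    category_data = data.get('c', [{}])
    if category_data:
        category = category_data[0].get('category', '')
        b_data = category_data[0].get('b', [{}])
        if b_data:
            lang = b_data[0].get('lang', '')
            d_data = b_data[0].get('d', [{}])
            if d_data:
                sourcename = d_data[0].get('sourcename', '')
    return category, lang, sourcename

sample = {"c": [{"category": "财经", "b": [{"lang": "CN", "d": [{"sourcename": "新华社"}]}]}]}
print(parse_source_fields(sample))  # ('财经', 'CN', '新华社')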
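
mqreceive.py combines the media authority score and the LLM output into a single news_score with fixed weights (source_impact * 0.04 + public_opinion_score * 0.25 + China_factor * 35), then scales every industry/concept confidence by that score; mqreceivefromllm.py only forwards items with a score of 80 or more to the featured-news endpoint. A small self-contained sketch of the arithmetic, using the same fallback values the consumer applies when a field is missing (source_impact=5, public_opinion_score=30, China_factor=0.2):

def compute_news_score(source_impact=5, public_opinion_score=30, china_factor=0.2):
    score = source_impact * 0.04 + public_opinion_score * 0.25 + china_factor * 35
    # mqreceive.py keeps an optional rescaling commented out: round((score ** 0.5) * 10.0, 2)
    return round(score, 2)

print(compute_news_score())             # 0.2 + 7.5 + 7.0 = 14.7 with the fallback values
print(compute_news_score(10, 80, 0.9))  # 0.4 + 20.0 + 31.5 = 51.9

# Per-label scores multiply each confidence by the overall news_score:
news_score = compute_news_score(10, 80, 0.9)
print([round(c * news_score, 2) for c in [0.8, 0.3]])  # [41.52, 15.57]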
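
Both consumers rely on manual acknowledgement (auto_ack=False with prefetch_count=1) plus an in-memory processed_ids set for deduplication; the set is cleared once it grows past 10,000 entries and is lost on restart, so it only guards against short-term redelivery. A stripped-down sketch of that consume/ack pattern, using the exchange and queue names declared in mqreceive.py and a placeholder where the LLM tagging would run:

import json
import pika
from config import mq_user, mq_password

processed_ids = set()

def message_callback(ch, method, properties, body):
    try:
        data = json.loads(body)
        id_str = str(data["id"])
        if id_str in processed_ids:            # idempotency: skip already-seen ids
            ch.basic_ack(delivery_tag=method.delivery_tag)
            return
        # ... business logic (LLM tagging, scoring, send_mq) would go here ...
        processed_ids.add(id_str)
        if len(processed_ids) > 10000:         # bound memory; dedup is best-effort
            processed_ids.clear()
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as exc:
        print(f"message handling failed: {exc}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host="localhost", credentials=pika.PlainCredentials(mq_user, mq_password)))
channel = connection.channel()
channel.basic_qos(prefetch_count=1)            # hand out one unacked message at a time
channel.exchange_declare(exchange="zzck_exchange", exchange_type="fanout")
queue = channel.queue_declare(queue="to_ai").method.queue
channel.queue_bind(exchange="zzck_exchange", queue=queue)
channel.basic_consume(queue=queue, on_message_callback=message_callback, auto_ack=False)
channel.start_consuming()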
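
A note for reading mqsend.py: RabbitMQ interprets the per-message expiration property as a string of milliseconds, so expiration='10' gives a 10 ms TTL rather than the one second the inline comment describes. A minimal publish sketch with an explicit 1-second TTL for comparison (the message body is illustrative):

import pika
from config import mq_user, mq_password

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host="localhost", credentials=pika.PlainCredentials(mq_user, mq_password)))
channel = connection.channel()
channel.exchange_declare(exchange="zzck_exchange", exchange_type="fanout")
channel.basic_publish(
    exchange="zzck_exchange",
    routing_key="",
    body='{"id": "demo"}',
    properties=pika.BasicProperties(expiration="1000"),  # TTL is in milliseconds -> 1 second
)
connection.close()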
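
sync_mysql_to_es.py expects either --full or a --start/--end window in 'YYYY-MM-DD HH:MM:SS' format on the command line (for example: python sync_mysql_to_es.py --start "2025-09-01 00:00:00" --end "2025-09-02 00:00:00"); the class behind it can also be driven directly from Python. A short programmatic sketch with made-up timestamps:

from sync_mysql_to_es import MySQLToESSync

sync = MySQLToESSync()  # connects to MySQL and Elasticsearch using config.py settings
sync.sync_all("2025-09-01 00:00:00", "2025-09-02 00:00:00")   # backfill news_info documents into ES
sync.sync_tags("2025-09-01 00:00:00", "2025-09-02 00:00:00")  # attach news_tags to existing ES docs
sync.sync_chooses()  # re-push rows with news_score >= 80 to the featured-news API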