commit af405071f2351c2cfd073e9cf5a74097ca329a32
Author: dswu <452725667@qq.com>
Date:   Tue Sep 2 15:15:51 2025 +0800

    first commit

diff --git a/config.py b/config.py
new file mode 100644
index 0000000..ea203b1
--- /dev/null
+++ b/config.py
@@ -0,0 +1,33 @@
+# Production environment
+MYSQL_HOST_APP = '10.127.2.207'
+MYSQL_PORT_APP = 3306
+MYSQL_USER_APP = 'financial_prod'
+MYSQL_PASSWORD_APP = 'mmTFncqmDal5HLRGY0BV'
+MYSQL_DB_APP = 'reference'
+
+# Test environment
+#MYSQL_HOST_APP = '121.37.185.246'
+#MYSQL_PORT_APP = 3306
+#MYSQL_USER_APP = 'root'
+#MYSQL_PASSWORD_APP = 'Xgf_8000'
+#MYSQL_DB_APP = 'reference'
+
+# Test environment
+#jx_adr = "http://123.60.153.169:8040/admin/common/sync/news_id/WBysu6N1z26AbA12l"
+
+# Production environment
+jx_adr = "http://10.127.2.205:13579/admin/common/sync/news_id/WBysu6N1z26AbA12l"
+
+mq_user = 'admin'
+mq_password = 'Aa123456'
+
+ES_HOST = "localhost"  # replace with your Elasticsearch host, e.g. "localhost" or "your_elasticsearch_host"
+ES_PORT = 9200
+ES_USER = "elastic"
+ES_PASSWORD = "ZxE,3VM@Thk0"
+
+file_path = "/mnt/vdb/"
+
+
+
+
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..d6d7a00
--- /dev/null
+++ b/main.py
@@ -0,0 +1,251 @@
+import subprocess
+from bs4 import BeautifulSoup
+import datetime
+import os
+from urllib.parse import urlparse, urljoin
+import time
+import pymysql
+from pathlib import Path
+import logging
+from apscheduler.schedulers.blocking import BlockingScheduler
+from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
+import json
+import re
+from config import *  # import configuration
+from mqsend import send_message  # import the message-publishing helper
+from elasticsearch import Elasticsearch
+
+
+def clean_control_characters(text):
+    return re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text)
+
+def do_work(target):
+    # create the data directory
+    os.makedirs('data', exist_ok=True)
+
+    today = datetime.datetime.today().strftime('%Y-%m-%d')
+    new_target = f"{target}/{today}/"
+    try:
+        sub_result = subprocess.run(['curl', new_target], capture_output=True, text=True)
+
+        if sub_result.returncode == 0:
+            soup = BeautifulSoup(sub_result.stdout, 'html.parser')
+            links = soup.find_all('a', href=True)
+
+            for link in links:
+                href = link['href']
+                if href.startswith('/'):
+
+                    file_url = f"{target}/{urljoin(new_target, href)}"
+                    # build the local save path
+                    local_path = os.path.join(file_path + 'data', href.lstrip('/'))  # keeps the date directory level
+                    # create the directory structure
+                    os.makedirs(os.path.dirname(local_path), exist_ok=True)
+                    # download the file
+                    if not os.path.exists(local_path):
+                        logger.info(f"File not present, downloading: {file_url}")
+                        download_cmd = ['curl', '-s', '-o', local_path, file_url]
+                        dl_result = subprocess.run(download_cmd)
+                        if dl_result.returncode == 0:
+                            logger.info(f"Download succeeded: {local_path}")
+                            # sync into MySQL
+                            sync_to_database(local_path)
+                        else:
+                            logger.error(f"Download failed: {file_url}")
+
+                else:
+                    logger.warning(f"Ignoring non-relative path: {href}")
+        else:
+            logger.error(f"Fetch failed: {sub_result.stderr}")
+    except Exception as e:
+        logger.error(f"Unexpected error while processing {new_target}: {e}")
+
+
+def write_to_mysql(data_list, local_path):
+    conn = pymysql.connect(
+        host=MYSQL_HOST_APP,
+        port=MYSQL_PORT_APP,
+        user=MYSQL_USER_APP,
+        password=MYSQL_PASSWORD_APP,
+        db=MYSQL_DB_APP,
+        charset='utf8mb4'
+    )
+
+    try:
+        with conn.cursor() as cursor:
+            for data in data_list:
+                # parse the nested JSON structure
+                category_data = data.get('c', [{}])
+                category = lang = sourcename = ""
+                if category_data:
+                    category = category_data[0].get('category', '')
+                    b_data = category_data[0].get('b', [{}])
+                    if b_data:
+                        lang = b_data[0].get('lang', '')
+                        d_data = b_data[0].get('d', [{}])
+                        if d_data:
+                            sourcename = d_data[0].get('sourcename', '')
+
+                # unescape special characters (default to "" so .replace() never runs on a non-string)
+                en_content = data.get('EN_content', "").replace('quot;', '"')
+
+                # revised SQL statement (upsert)
+                sql = """INSERT INTO news_info
+                    (news_id, input_date, words, title_txt, key_word,
+                     CN_content, EN_content, URL, abstract, title_EN,
+                     category, sourcename, lang, deleted, create_time,
+                     update_time, file_date, file_name)
+                    VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
+                    ON DUPLICATE KEY UPDATE
+                        category=VALUES(category),
+                        sourcename=VALUES(sourcename),
+                        lang=VALUES(lang)"""
+
+                values = (
+                    data['id'],
+                    data.get('input_date', ""),
+                    data.get('words', ""),
+                    data.get('title_txt', ""),
+                    data.get('key_word', ""),
+                    data.get('CN_content', ""),
+                    en_content,  # use the unescaped English content
+                    data.get('URL', ""),
+                    data.get('abstract', ""),
+                    data.get('title_EN', ""),
+                    category,
+                    sourcename,
+                    lang,
+                    0,
+                    datetime.datetime.now(),
+                    datetime.datetime.now(),
+                    datetime.datetime.strptime(Path(local_path).parent.name, '%Y-%m-%d').date(),
+                    Path(local_path).name
+                )
+                cursor.execute(sql, values)
+                logger.info(f"Wrote to MySQL: {data['id']}")
+
+                #
+
+                # publish the message to MQ
+                send_message(json.dumps(data))
+            conn.commit()
+    except Exception as e:
+        logger.error(f"MySQL write failed: {str(e)}")
+    finally:
+        conn.close()
+        return True
+
+# write to ES
+def write_to_es(data_list, local_path):
+    # initialize the ES connection
+    es = Elasticsearch(
+        [f"http://{ES_HOST}:{ES_PORT}"],  # include the scheme directly in the host entry
+        basic_auth=(ES_USER, ES_PASSWORD)
+    )
+    try:
+        for data in data_list:
+            # core ES indexing logic
+            category_data = data.get('c', [{}])
+            category = lang = sourcename = ""
+            if category_data:
+                category = category_data[0].get('category', '')
+                b_data = category_data[0].get('b', [{}])
+                if b_data:
+                    lang = b_data[0].get('lang', '')
+                    d_data = b_data[0].get('d', [{}])
+                    if d_data:
+                        sourcename = d_data[0].get('sourcename', '')
+
+            es.index(
+                index="news_info",
+                id=data['id'],
+                document={
+                    "news_id": data.get('id', ""),
+                    "input_date": data.get('input_date', ""),
+                    "words": data.get('words', ""),
+                    "title_txt": data.get('title_txt', ""),
+                    "key_word": data.get('key_word', ""),
+                    "CN_content": data.get('CN_content', ""),
+                    "EN_content": data.get('EN_content', "").replace('quot;', '"'),
+                    "URL": data.get('URL', ""),
+                    "abstract": data.get('abstract', ""),
+                    "title_EN": data.get('title_EN', ""),
+                    "category": category,
+                    "sourcename": sourcename,
+                    "lang": lang,
+                    "deleted": "0",
+                    "create_time": datetime.datetime.now(),
+                    "update_time": datetime.datetime.now(),
+                    "file_date": datetime.datetime.strptime(Path(local_path).parent.name, '%Y-%m-%d').date(),
+                    "file_name": Path(local_path).name
+                }
+            )
+            logger.info(f"Indexed ES document ID: {data['id']}")
+
+    except Exception as e:
+        logger.error(f"ES write failed: {str(e)}")
+
+def sync_to_database(local_path):
+    try:
+        with open(local_path, 'r', encoding='utf-16') as f:  # source files are UTF-16 encoded
+            raw_data = f.read()
+            cleaned_data = clean_control_characters(raw_data)
+            data_list = json.loads(cleaned_data)
+            write_to_mysql(data_list, local_path)  # pass the local path through
+            write_to_es(data_list, local_path)  # pass the local path through
+
+            return True
+
+    except Exception as e:
+        logger.error(f"Unexpected error while processing file {local_path}: {e}")
+        return False
+
+
+
+
+# initialize logging (placed near the top of the file)
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
+
+# file log
+file_handler = logging.FileHandler('zzck_nohup.log')
+file_handler.setFormatter(formatter)
+logger.addHandler(file_handler)
+
+# console log
+console_handler = logging.StreamHandler()
+console_handler.setFormatter(formatter)
+logger.addHandler(console_handler)
+
+# scheduler job listener (defined after the job function)
+def scheduler_listener(event):
+    if
event.exception: + logger.error("任务执行失败!", exc_info=event.exception) + else: + logger.info("任务执行完成") + +if __name__ == "__main__": + scheduler = BlockingScheduler() + target = "zzbtw.ckxx.net" # 确保target变量在此处定义 + + # 修改调度任务添加方式 + scheduler.add_job( + do_work, + 'interval', + minutes=1, + id='zzck_sync_job', + args=[target], # 添加参数传递 + max_instances=1 + ) + + # 添加事件监听 + scheduler.add_listener(scheduler_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) + + try: + logger.info("程序启动,调度器开始运行...") + scheduler.start() + except (KeyboardInterrupt, SystemExit): + logger.info("收到停止信号,正在关闭调度器...") + scheduler.shutdown() + logger.info("调度器已关闭") diff --git a/mqreceive.py b/mqreceive.py new file mode 100644 index 0000000..a2ed7da --- /dev/null +++ b/mqreceive.py @@ -0,0 +1,173 @@ +import pika +import json +import logging +import time +import os +from config import * +from llm_process import send_mq, get_label + +# 声明一个全局变量,存媒体的权威度打分 +media_score = {} +with open("media_score.txt", "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + media, score = line.split("\t") + media_score[media.strip()] = int(score) + except ValueError as e: + print(f"解析错误: {e},行内容: {line}") + continue + +# 幂等性存储 - 记录已处理消息ID +processed_ids = set() + +def message_callback(ch, method, properties, body): + """消息处理回调函数""" + + + try: + data = json.loads(body) + id_str = str(data["id"]) + + # ch.basic_ack(delivery_tag=method.delivery_tag) + # print(f"接收到消息: {id_str}") + # return + + # 幂等性检查:如果消息已处理过,直接确认并跳过 + if id_str in processed_ids: + print(f"跳过已处理的消息: {id_str}") + ch.basic_ack(delivery_tag=method.delivery_tag) + return + + # 在此处添加业务处理逻辑 + content = data.get('CN_content', "").strip() + source = "其他" + category_data = data.get('c', [{}]) + category = "" + if category_data: + category = category_data[0].get('category', '') + b_data = category_data[0].get('b', [{}]) + if b_data: + d_data = b_data[0].get('d', [{}]) + if d_data: + source = d_data[0].get('sourcename', "其他") + source_impact = media_score.get(source, 5) + tagged_news = get_label(content, source) + public_opinion_score = tagged_news.get("public_opinion_score", 30) #资讯质量分 + China_factor = tagged_news.get("China_factor", 0.2) #中国股市相关度 + news_score = source_impact * 0.04 + public_opinion_score * 0.25 + China_factor * 35 + news_score = round(news_score, 2) + #如果想让分数整体偏高可以开根号乘10 + #news_score = round((news_score**0.5) * 10.0, 2) + + industry_confidence = tagged_news.get("industry_confidence", []) + industry_score = list(map(lambda x: round(x * news_score, 2), industry_confidence)) + concept_confidence = tagged_news.get("concept_confidence", []) + concept_score = list(map(lambda x: round(x * news_score, 2), concept_confidence)) + + tagged_news["source"] = source + tagged_news["source_impact"] = source_impact + tagged_news["industry_score"] = industry_score + tagged_news["concept_score"] = concept_score + tagged_news["news_score"] = news_score + tagged_news["id"] = id_str + + print(json.dumps(tagged_news, ensure_ascii=False)) + + # 发送百炼大模型标注过的新闻json到队列 + send_mq(tagged_news) + + # 处理成功后记录消息ID + processed_ids.add(id_str) + if len(processed_ids) > 10000: + processed_ids.clear() + + # 手动确认消息 + ch.basic_ack(delivery_tag=method.delivery_tag) + except Exception as e: + print(f"消息处理失败: {str(e)}") + # 拒绝消息, 不重新入队 + ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) + +def create_connection(): + """创建并返回RabbitMQ连接""" + credentials = pika.PlainCredentials(mq_user, mq_password) + return pika.BlockingConnection( + 
pika.ConnectionParameters(
+            host="localhost",
+            credentials=credentials,
+            heartbeat=600,
+            connection_attempts=3,
+            retry_delay=5  # retry delay of 5 seconds
+        )
+    )
+
+def start_consumer():
+    """Start the MQ consumer"""
+    while True:  # loop instead of recursion to avoid recursion-depth problems
+        try:
+            connection = create_connection()
+            channel = connection.channel()
+
+            # set QoS so only one message is fetched at a time
+            channel.basic_qos(prefetch_count=1)
+
+            channel.exchange_declare(
+                exchange="zzck_exchange",
+                exchange_type="fanout"
+                #durable=True  # make the exchange durable
+            )
+
+            # declare the persistent queue
+            res = channel.queue_declare(
+                queue="to_ai"
+                # durable=True  # make the queue durable
+            )
+
+            mq_queue = res.method.queue
+            channel.queue_bind(
+                exchange="zzck_exchange",
+                queue=mq_queue,
+            )
+
+            # start consuming with automatic ACK disabled
+            channel.basic_consume(
+                queue=mq_queue,
+                on_message_callback=message_callback,
+                auto_ack=False  # disable automatic acknowledgement
+            )
+
+            print("Consumer started, waiting for messages...")
+            channel.start_consuming()
+
+        except pika.exceptions.ConnectionClosedByBroker:
+            # the broker closed the connection; possibly a transient error
+            print("Connection closed by broker, retrying in 5 seconds...")
+            time.sleep(5)
+        except pika.exceptions.AMQPConnectionError:
+            # connection error
+            print("Connection failed, retrying in 10 seconds...")
+            time.sleep(10)
+        except KeyboardInterrupt:
+            print("Consumer interrupted by user")
+            try:
+                if connection and connection.is_open:
+                    connection.close()
+            except:
+                pass
+            break
+        except Exception as e:
+            print(f"Consumer error: {str(e)}")
+            print("Retrying in 15 seconds...")
+            time.sleep(15)
+        finally:
+            try:
+                if connection and connection.is_open:
+                    connection.close()
+            except:
+                pass
+
+if __name__ == "__main__":
+    start_consumer()
diff --git a/mqreceive2.py b/mqreceive2.py
new file mode 100644
index 0000000..c6342f5
--- /dev/null
+++ b/mqreceive2.py
@@ -0,0 +1,114 @@
+# write news data into ES
+import pika
+import json
+import logging
+from config import *
+from elasticsearch import Elasticsearch
+from config import ES_HOST, ES_PORT, ES_USER, ES_PASSWORD
+import datetime
+
+
+
+def message_callback(ch, method, properties, body):
+    try:
+        data = json.loads(body)
+        write_to_es(data)
+        ch.basic_ack(delivery_tag=method.delivery_tag)
+    except Exception as e:
+        print(f"ES write failed: {str(e)}")
+        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
+
+# write to ES
+def write_to_es(data):
+    # initialize the ES connection
+    es = Elasticsearch(
+        [f"http://{ES_HOST}:{ES_PORT}"],  # include the scheme directly in the host entry
+        http_auth=(ES_USER, ES_PASSWORD)
+    )
+    try:
+        # core ES indexing logic
+        category_data = data.get('c', [{}])
+        category = lang = sourcename = ""
+        if category_data:
+            category = category_data[0].get('category', '')
+            b_data = category_data[0].get('b', [{}])
+            if b_data:
+                lang = b_data[0].get('lang', '')
+                d_data = b_data[0].get('d', [{}])
+                if d_data:
+                    sourcename = d_data[0].get('sourcename', '')
+
+        es.index(
+            index="news_info",
+            id=data['id'],
+            body={
+                "news_id": data.get('id', ""),
+                "input_date": data.get('input_date', ""),
+                "words": data.get('words', ""),
+                "title_txt": data.get('title_txt', ""),
+                "key_word": data.get('key_word', ""),
+                "CN_content": data.get('CN_content', ""),
+                "EN_content": data.get('EN_content', "").replace('quot;', '"'),
+                "URL": data.get('URL', ""),
+                "abstract": data.get('abstract', ""),
+                "title_EN": data.get('title_EN', ""),
+                "category": category,
+                "sourcename": sourcename,
+                "lang": lang,
+                "deleted": "0",
+                "create_time": datetime.datetime.now(),
+                "update_time": datetime.datetime.now(),
+                "file_date": "",
+                "file_name": ""
+            }
+        )
+        print(f"Indexed ES document ID: {data['id']}")
+    except Exception as e:
+        print(f"ES write failed: {str(e)}")
+
+
+def start_consumer():
+    """Start the MQ consumer"""
+    try:
+        credentials = pika.PlainCredentials(mq_user, mq_password)
+        connection = pika.BlockingConnection(
+            pika.ConnectionParameters(
+                host="localhost",
+                credentials=credentials,
+                heartbeat=60
+            )
+        )
+        channel = connection.channel()
+        channel.exchange_declare(
+            exchange="zzck_exchange",
+            exchange_type="fanout",
+        )
+
+        # declare the queue (must match the existing queue type); the queue name can be customized
+        res = channel.queue_declare(
+            queue="to_es"
+        )
+
+        mq_queue = res.method.queue
+        channel.queue_bind(
+            exchange="zzck_exchange",
+            queue=mq_queue,
+        )
+
+
+        # start consuming
+        channel.basic_consume(
+            queue=mq_queue,
+            on_message_callback=message_callback,
+
+        )
+
+        print("Consumer started, waiting for messages...")
+        channel.start_consuming()
+
+    except Exception as e:
+        print(f"Consumer failed to start: {str(e)}")
+        raise
+
+if __name__ == "__main__":
+    start_consumer()
\ No newline at end of file
diff --git a/mqreceivefromllm.py b/mqreceivefromllm.py
new file mode 100644
index 0000000..7927f08
--- /dev/null
+++ b/mqreceivefromllm.py
@@ -0,0 +1,258 @@
+# consume tagged messages and write them to the database
+import pika
+import json
+import logging, time
+from config import *
+import pymysql
+from elasticsearch import Elasticsearch
+import datetime
+import requests
+
+def message_callback(ch, method, properties, body):
+    """Message handling callback"""
+    try:
+        data = json.loads(body)
+        news_score = data.get('news_score', -1)
+        if news_score < 0:
+            ch.basic_ack(delivery_tag=method.delivery_tag)
+            return
+        # business logic: write into the MySQL database
+        write_to_mysql(data)
+        # write the data into ES
+        write_to_es(data)
+
+        # write the data into the curated-news table
+        write_to_news(data)
+
+
+
+        # acknowledge the message manually
+        ch.basic_ack(delivery_tag=method.delivery_tag)
+    except Exception as e:
+        print(f"Message handling failed: {str(e)}")
+        # reject the message without requeueing it
+        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
+
+def write_to_news(data):
+    news_score = data.get('news_score', 0.0)
+    if float(news_score) < 80:  # drop messages with a news_score below 80
+        return
+
+    # take the news id from the payload
+    news_id = data.get('id', "")
+    adr = jx_adr.replace("news_id", news_id)
+    print(f"Endpoint address: {adr}")
+    response = requests.get(adr)
+    if response.status_code != 200:
+        print(f"News id: {news_id}, score: {news_score}, curated-news API call failed, status code: {response.status_code}")
+        return
+    print(f"News id: {news_id}, score: {news_score}, curated-news API call succeeded")
+
+
+
+def write_to_es(data):
+    """Write to ES"""
+    # initialize the ES connection
+    es = Elasticsearch(
+        [f"http://{ES_HOST}:{ES_PORT}"],  # include the scheme directly in the host entry
+        basic_auth=(ES_USER, ES_PASSWORD)
+    )
+    news_id = data.get('id', "")
+    es.update(
+        index="news_info",
+        id=news_id,
+        doc={
+            "news_tags": {
+                "id": news_id,
+                "abstract": data.get('abstract', ""),
+                "title": data.get('title', ""),
+                "rewrite_content": data.get('rewrite_content', ""),
+                "industry_label": data.get('industry_label', []),
+                "industry_confidence": data.get('industry_confidence', []),
+                "industry_score": data.get('industry_score', []),
+                "concept_label": data.get('concept_label', []),
+                "concept_confidence": data.get('concept_confidence', []),
+                "concept_score": data.get('concept_score', []),
+                "public_opinion_score": data.get('public_opinion_score', 10),
+                "China_factor": data.get('China_factor', 0.1),
+                "source": data.get('source', "其他"),
+                "source_impact": data.get('source_impact', 5),
+                "news_score": data.get('news_score', 0.0),
+                "news_id": news_id,
+                "deleted": '0',
+                "create_time": datetime.datetime.now(),
+                "update_time": datetime.datetime.now()
+            }
+        }
+    )
+    print(f"news_id: {news_id}, score: {data.get('news_score', 0.0)}, ES write succeeded")
+
+
+
+
+def write_to_mysql(data):
+    conn = pymysql.connect(
+        host=MYSQL_HOST_APP,
+        port=MYSQL_PORT_APP,
+        user=MYSQL_USER_APP,
+        password=MYSQL_PASSWORD_APP,
+        db=MYSQL_DB_APP,
+        charset='utf8mb4'
+    )
+    try:
+        with conn.cursor() as cursor:
+            # parse the nested JSON structure
+            # revised SQL statement
+            sql = """INSERT INTO news_tags
+                (abstract, title,
rewrite_content, industry_label, industry_confidence, industry_score, concept_label, concept_confidence, concept_score, public_opinion_score, China_factor, source, source_impact, news_score, news_id)
+                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) """
+
+            values = (data.get('abstract', ""),
+                      data.get('title', ""),
+                      data.get('rewrite_content', ""),
+                      json.dumps(data.get('industry_label', [])),
+                      json.dumps(data.get('industry_confidence', [])),
+                      json.dumps(data.get('industry_score', [])),
+                      json.dumps(data.get('concept_label', [])),
+                      json.dumps(data.get('concept_confidence', [])),
+                      json.dumps(data.get('concept_score', [])),
+                      data.get('public_opinion_score', 10),
+                      data.get('China_factor', 0.1),
+                      data.get('source', "其他"),
+                      data.get('source_impact', 5),
+                      data.get('news_score', 0.0),
+                      data.get('id', "")
+                      )
+            cursor.execute(sql, values)
+            conn.commit()
+            id = data.get('id', "")
+            industry_label = data.get('industry_label', [])
+            concept_label = data.get('concept_label', [])
+            print(f"{id} {industry_label} {concept_label}, wrote to the news_tags table successfully")
+
+    except Exception as e:
+        print(f"news_tags write failed: {str(e)}")
+    finally:
+        conn.close()
+        return True
+
+def create_connection():
+    """Create and return a RabbitMQ connection"""
+    credentials = pika.PlainCredentials(mq_user, mq_password)
+    return pika.BlockingConnection(
+        pika.ConnectionParameters(
+            host="localhost",
+            credentials=credentials,
+            heartbeat=600,
+            connection_attempts=3,
+            retry_delay=5  # retry delay of 5 seconds
+        )
+    )
+
+def start_consumer():
+    """Start the MQ consumer"""
+    while True:  # loop instead of recursion to avoid recursion-depth problems
+        try:
+            connection = create_connection()
+            channel = connection.channel()
+
+            # set QoS so only one message is fetched at a time
+            channel.basic_qos(prefetch_count=1)
+
+            channel.exchange_declare(
+                exchange="zzck_llm_exchange",
+                exchange_type="fanout"
+            )
+
+            # declare the persistent queue
+            res = channel.queue_declare(
+                queue="from_ai_to_mysql"
+            )
+
+            mq_queue = res.method.queue
+            channel.queue_bind(
+                exchange="zzck_llm_exchange",
+                queue=mq_queue,
+            )
+
+            # start consuming with automatic ACK disabled
+            channel.basic_consume(
+                queue=mq_queue,
+                on_message_callback=message_callback,
+                auto_ack=False  # disable automatic acknowledgement
+            )
+
+            print("Consumer started, waiting for messages...")
+            channel.start_consuming()
+
+        except pika.exceptions.ConnectionClosedByBroker:
+            # the broker closed the connection; possibly a transient error
+            print("Connection closed by broker, retrying in 5 seconds...")
+            time.sleep(5)
+        except pika.exceptions.AMQPConnectionError:
+            # connection error
+            print("Connection failed, retrying in 10 seconds...")
+            time.sleep(10)
+        except KeyboardInterrupt:
+            print("Consumer interrupted by user")
+            try:
+                if connection and connection.is_open:
+                    connection.close()
+            except:
+                pass
+            break
+        except Exception as e:
+            print(f"Consumer error: {str(e)}")
+            print("Retrying in 15 seconds...")
+            time.sleep(15)
+        finally:
+            try:
+                if connection and connection.is_open:
+                    connection.close()
+            except:
+                pass
+
+# def start_consumer():
+#     """Start the MQ consumer"""
+#     try:
+#         credentials = pika.PlainCredentials(mq_user, mq_password)
+#         connection = pika.BlockingConnection(
+#             pika.ConnectionParameters(
+#                 host="localhost",
+#                 credentials=credentials,
+#                 heartbeat=600
+#             )
+#         )
+#         channel = connection.channel()
+#         channel.exchange_declare(
+#             exchange="zzck_exchange",
+#             exchange_type="fanout",
+#         )

+#         # declare the queue (must match the existing queue type); the queue name can be customized
+#         res = channel.queue_declare(
+#             queue="from_ai_to_mysql"
+#         )

+#         mq_queue = res.method.queue
+#         channel.queue_bind(
+#             exchange="zzck_llm_exchange",
+#             queue=mq_queue,
+#         )

+#         # start consuming
+#         channel.basic_consume(
+#             queue=mq_queue,
+#             on_message_callback=message_callback,

+#         )

+#         print("Consumer started, waiting for messages...")
+#         channel.start_consuming()

+#     except Exception as e:
+#         print(f"Consumer failed to start: {str(e)}")
+#         start_consumer()

+if __name__ == "__main__":
+    start_consumer()
diff --git a/mqsend.py b/mqsend.py
new file mode 100644
index 0000000..c2ef453
--- /dev/null
+++ b/mqsend.py
@@ -0,0 +1,26 @@
+import pika
+from config import *
+
+
+
+def send_message(message):
+    # connect to RabbitMQ
+    credentials = pika.PlainCredentials(mq_user, mq_password)
+    connection = pika.BlockingConnection(
+        pika.ConnectionParameters(host='localhost', credentials=credentials)
+    )
+    channel = connection.channel()
+
+    channel.exchange_declare(exchange='zzck_exchange', exchange_type='fanout')
+    # declare the queue
+    # channel.queue_declare(queue=mq_queue)
+
+    # publish the message
+    channel.basic_publish(exchange='zzck_exchange',
+        routing_key='',
+        body=message,
+        properties=pika.BasicProperties(
+            expiration='10'  # message expires after 10 ms (the AMQP expiration value is in milliseconds)
+        )
+    )
+    connection.close()
\ No newline at end of file
diff --git a/readme.md b/readme.md
new file mode 100644
index 0000000..a74a992
--- /dev/null
+++ b/readme.md
@@ -0,0 +1,2 @@
+1 中证参考 (China Securities Reference): parse the host file
+2 the host address is zzbtw.ckxx.net
\ No newline at end of file
diff --git a/schema.sql b/schema.sql
new file mode 100644
index 0000000..fbd1d60
--- /dev/null
+++ b/schema.sql
@@ -0,0 +1,33 @@
+CREATE TABLE news_info (
+    id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY COMMENT 'unique identifier',
+    news_id INT UNSIGNED NOT NULL COMMENT 'news ID',
+    input_date DATETIME NOT NULL COMMENT 'data input time',
+    words MEDIUMINT UNSIGNED NOT NULL COMMENT 'word count',
+    title_txt VARCHAR(255) NOT NULL COMMENT 'Chinese title',
+    key_word VARCHAR(255) NOT NULL COMMENT 'keyword list (semicolon-separated)',
+    CN_content TEXT NOT NULL COMMENT 'Chinese body text',
+    EN_content TEXT NOT NULL COMMENT 'English body text',
+    URL VARCHAR(512) NOT NULL COMMENT 'source URL',
+    abstract VARCHAR(512) NOT NULL COMMENT 'abstract',
+    title_EN VARCHAR(255) NOT NULL COMMENT 'English title',
+    category VARCHAR(255) NOT NULL COMMENT 'category info (JSON format)',
+    sourcename VARCHAR(255) NOT NULL COMMENT 'data source name or identifier',
+    lang VARCHAR(255) NOT NULL COMMENT 'language',
+    deleted TINYINT(5) DEFAULT 0 COMMENT 'deletion flag: 0 normal, 1 deleted',
+    create_time DATETIME NOT NULL COMMENT 'creation time',
+    update_time DATETIME DEFAULT NULL COMMENT 'last update time',
+    file_date DATE NOT NULL COMMENT 'file date (taken from the directory structure)',
+    file_name VARCHAR(255) NOT NULL COMMENT 'full file name (with extension)'
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+
+
+CREATE TABLE IF NOT EXISTS news_tags (
+    id INT AUTO_INCREMENT PRIMARY KEY COMMENT 'unique identifier',
+    abstract VARCHAR(500) NOT NULL COMMENT 'abstract content',
+    industry_label JSON COMMENT 'industry label array',
+    concept_label JSON COMMENT 'concept label array',
+    news_id INT NOT NULL COMMENT 'associated news ID',
+    deleted TINYINT(5) DEFAULT 0 COMMENT 'deletion flag: 0 normal, 1 deleted',
+    create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
+    update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time'
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
\ No newline at end of file
diff --git a/start.sh b/start.sh
new file mode 100644
index 0000000..a0d14d4
--- /dev/null
+++ b/start.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+# background startup script
+cd /root/zzck/
+echo "Killing all processes running main.py, mqreceive_multithread.py or mqreceivefromllm.py"
+ps aux | grep -E 'python.*(main|mqreceive_multithread|mqreceivefromllm)\.py' | awk '{print $2}' | xargs kill -9
+echo "Killed all processes running main.py, mqreceive_multithread.py or mqreceivefromllm.py"
+nohup python -u main.py > zzck_nohup.log 2>&1 &
+nohup python -u mqreceive_multithread.py > mqreceive_multithread.log 2>&1 &
+nohup python -u mqreceivefromllm.py > mqreceivefromllm.log 2>&1 &
+echo "Services started in the background"
diff --git a/sync_mysql_to_es.py b/sync_mysql_to_es.py
new file mode 100644
index 0000000..a5fec90
--- /dev/null
+++ b/sync_mysql_to_es.py
@@ -0,0 +1,176 @@
+import argparse
+from datetime import
datetime +import pymysql +from elasticsearch import Elasticsearch, helpers +import json +# 复用现有配置 +from config import ES_HOST, ES_PORT, ES_USER, ES_PASSWORD, MYSQL_HOST_APP, MYSQL_USER_APP, MYSQL_PASSWORD_APP, MYSQL_DB_APP +from tqdm import tqdm +import requests +from config import jx_adr + +class MySQLToESSync: + def __init__(self): + self.es = Elasticsearch( + [f"http://{ES_HOST}:{ES_PORT}"], + basic_auth=(ES_USER, ES_PASSWORD) + ) + self.mysql_conn = pymysql.connect( + host=MYSQL_HOST_APP, + user=MYSQL_USER_APP, + password=MYSQL_PASSWORD_APP, + db=MYSQL_DB_APP, + charset='utf8mb4', + cursorclass=pymysql.cursors.DictCursor + ) + + # 更新全量表 + def sync_all(self, start_time=None, end_time=None): + # 更新全量表到 es + with self.mysql_conn.cursor(pymysql.cursors.DictCursor) as cursor: + query = self._build_query("news_info",start_time, end_time) + cursor.execute(query) + # 支持两种同步模式 + res = cursor.fetchall() + # 判断es中是否有该news_id + for data in tqdm(res, desc="同步全量表", unit="条"): + news_id = data['news_id'] + # 从es中查询 + try: + exists = self.es.get(index="news_info", id=news_id) + except: + exists = None + if exists: + pass + else: + # es中不存在,直接新增 + self.es.index( + index="news_info", + id=news_id, + document={ + "news_id": data.get('news_id', ""), + "input_date": data.get('input_date', ""), + "words": data.get('words', ""), + "title_txt": data.get('title_txt', ""), + "key_word": data.get('key_word', ""), + "CN_content": data.get('CN_content', ""), + "EN_content": data.get('EN_content',""), + "URL": data.get('URL', ""), + "abstract": data.get('abstract', ""), + "title_EN": data.get('title_EN', ""), + "category": data.get('category', ""), + "sourcename": data.get('sourcename', ""), + "lang": data.get('lang', ""), + "deleted": data.get('deleted', ""), + "create_time": data.get('create_time', ""), + "update_time": data.get('update_time', ""), + "file_date": data.get('file_date', ""), + "file_name": data.get('file_name', ""), + } + ) + print(f"新闻 {news_id} 更新到 ES 成功") + + + # 更新标签表 + def sync_tags(self, start_time=None, end_time=None): + + with self.mysql_conn.cursor(pymysql.cursors.DictCursor) as cursor: + query = self._build_query("news_tags",start_time, end_time) + cursor.execute(query) + + res = cursor.fetchall() + # 判断es中是否有该news_id + for data in tqdm(res, desc="同步新闻标签进度", unit="条"): + news_id = data['news_id'] + # 从es中查询 + try: + # 更新全量表到 es + es_doc = self.es.get(index="news_info", id=news_id) + news_tags = None + if es_doc and es_doc.get('found'): + news_tags = es_doc['_source'].get('news_tags', None) + if not news_tags: + # 将mysql的数据写入es + self.es.update( + index="news_info", + id=news_id, + doc={ + "news_tags": { + "id": news_id, + "abstract": data.get('abstract', ""), + "title": data.get('title', ""), + "rewrite_content": data.get('rewrite_content', ""), + "industry_label": json.loads(data.get('industry_label', [])), + "industry_confidence": json.loads(data.get('industry_confidence', [])), + "industry_score": json.loads(data.get('industry_score', [])), + "concept_label": json.loads(data.get('concept_label', [])), + "concept_confidence": json.loads(data.get('concept_confidence', [])), + "concept_score": json.loads(data.get('concept_score', [])), + "public_opinion_score": data.get('public_opinion_score', 10), + "China_factor": data.get('China_factor', 0.1), + "source": data.get('source', "其他"), + "source_impact": data.get('source_impact', 5), + "news_score": data.get('news_score', 0.0), + "news_id": news_id, + "deleted": '0', + "create_time": data.get('create_time', ""), + "update_time": 
data.get('update_time', "") + } + } + ) + print(f"新闻 {news_id} 标签更新到 ES 成功") + + + except Exception as e: + print(e) + + # 更新精选表 + def sync_chooses(self, start_time=None, end_time=None): + with self.mysql_conn.cursor(pymysql.cursors.DictCursor) as cursor: + query = "select * from news_tags where news_score >= 80" + cursor.execute(query) + res = cursor.fetchall() + # 判断es中是否有该news_id + for data in tqdm(res, desc="同步精选表数据", unit="条"): + # 从es中查询 + try: + news_id = data['news_id'] + news_score = data['news_score'] + # 获取返回数据里面的 新闻id + adr = jx_adr.replace("news_id", str(news_id)) + response = requests.get(adr) + if response.status_code != 200: + print(f"新闻id:{news_id} 得分:{news_score}, 调用精选接口失败, 错误码:{response.status_code}") + print(f"新闻id:{news_id} 得分:{news_score}, 调用精选接口成功") + except Exception as e: + print(e) + + + + + + def _build_query(self, table_name, start_time, end_time): + base_query = f""" + SELECT * + FROM {table_name} + """ + if start_time and end_time: + return f"{base_query} WHERE create_time BETWEEN '{start_time}' AND '{end_time}'" + return base_query + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='MySQL to ES Sync Tool') + parser.add_argument('--full', action='store_true', help='Full synchronization') + parser.add_argument('--start', type=str, help='Start time (YYYY-MM-DD HH:MM:SS)') + parser.add_argument('--end', type=str, help='End time (YYYY-MM-DD HH:MM:SS)') + + args = parser.parse_args() + + if not args.full and not (args.start and args.end): + raise ValueError("需要指定--full全量同步或--start/--end时间范围") + + sync = MySQLToESSync() + sync.sync_all(args.start, args.end) + sync.sync_tags(args.start, args.end) + sync.sync_chooses(args.start, args.end) + \ No newline at end of file diff --git a/table_init.py b/table_init.py new file mode 100644 index 0000000..2634c98 --- /dev/null +++ b/table_init.py @@ -0,0 +1,18 @@ +from config import * +import pymysql +import json +import logging +import datetime + +conn = pymysql.connect( + host=MYSQL_HOST_APP, + port=MYSQL_PORT_APP, + user=MYSQL_USER_APP, + password=MYSQL_PASSWORD_APP, + db=MYSQL_DB_APP, + charset='utf8mb4' + ) +with conn.cursor() as cursor: + with open('schema.sql', 'r') as schema_file: + create_table_sql = schema_file.read().replace('\n', ' ') # Remove line breaks + cursor.execute(create_table_sql) \ No newline at end of file diff --git a/test.py b/test.py new file mode 100644 index 0000000..269daad --- /dev/null +++ b/test.py @@ -0,0 +1,42 @@ +import os +from urllib.parse import urlparse, urljoin +import time +import pymysql +from pathlib import Path +import logging +from apscheduler.schedulers.blocking import BlockingScheduler +from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR +import json +import re +from config import * # 导入配置 +import datetime + +def clean_control_characters(text): + return re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text) + # 修改文件编码为UTF-16 +with open("data\\2025-05-20\\01-11-04.txt", 'r', encoding='utf-16') as f: # 修改编码方式 + raw_data = f.read() + cleaned_data = clean_control_characters(raw_data) + data_list = json.loads(cleaned_data) + for data in data_list: # 遍历数组中的每个对象 + # 处理特殊字符转义 + en_content = data['EN_content'].replace('quot;', '"') + for k,v in data.items(): + print(k) + values = ( + data['id'], + data['input_date'], + data['words'], + data['title_txt'], + data['key_word'], + data['CN_content'], + en_content, # 使用处理后的英文内容 + data['URL'], + data['abstract'], + data['title_EN'], + data.get('data_source', 'zzck'), + json.dumps(data['c']), + 0, + + ) + \ No newline at 
end of file
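Note: for reference, a minimal standalone sketch of the scoring step performed in mqreceive.py's message_callback. The weights (0.04, 0.25, 35), the defaults and the confidence-scaling rule are taken from the diff above; the function names and the sample inputs are invented for illustration only.

def score_news(source_impact=5, public_opinion_score=30, China_factor=0.2):
    """Combine media authority, opinion quality and China-market relevance into one score."""
    news_score = source_impact * 0.04 + public_opinion_score * 0.25 + China_factor * 35
    # optional variant kept as a comment in the source: skew scores higher with a square root
    # news_score = (news_score ** 0.5) * 10.0
    return round(news_score, 2)

def label_scores(confidences, news_score):
    """Per-label scores are the label confidences scaled by the overall news score."""
    return [round(c * news_score, 2) for c in confidences]

if __name__ == "__main__":
    s = score_news(source_impact=8, public_opinion_score=70, China_factor=0.9)
    print(s)                            # 49.32
    print(label_scores([0.6, 0.3], s))  # [29.59, 14.8]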
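Note: _build_query in sync_mysql_to_es.py interpolates the --start/--end values directly into the SQL text. Below is a possible parameterized variant; only the table names and the create_time column come from the diff, everything else (function name, signature) is illustrative.

import pymysql
from typing import Optional

def fetch_rows(conn, table_name: str, start_time: Optional[str], end_time: Optional[str]):
    """Fetch rows from news_info or news_tags, binding the time window as query parameters."""
    if table_name not in ("news_info", "news_tags"):  # whitelist: identifiers cannot be bound
        raise ValueError(f"unexpected table: {table_name}")
    query = f"SELECT * FROM {table_name}"
    params = ()
    if start_time and end_time:
        query += " WHERE create_time BETWEEN %s AND %s"
        params = (start_time, end_time)
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        cursor.execute(query, params)
        return cursor.fetchall()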