"""RabbitMQ consumer that labels incoming news with an LLM and re-publishes it.

Each message taken from the ``to_ai`` queue is passed to ``get_label``; the
returned labels are combined with a per-media authority score into a
``news_score`` and the enriched record is forwarded with ``send_mq``.
"""
import json
import logging
import os
import time

import pika

# Star import kept as-is; ``mq_user`` / ``mq_password`` used in
# ``create_connection`` are not defined in this file and presumably come
# from ``config``.
from config import *
from llm_process import send_mq, get_label

_logger = logging.getLogger(__name__)

# Source name used when a message carries no usable ``sourcename``.
_DEFAULT_SOURCE = "其他"


def _load_media_scores(path="media_score.txt"):
    """Read the tab-separated ``media<TAB>score`` file into a dict.

    Blank lines are skipped. A line that does not split into exactly two
    tab-separated fields, or whose score is not an integer, is reported on
    stdout and skipped.

    Args:
        path: Location of the score file.

    Returns:
        Mapping of stripped media name to integer score.
    """
    scores = {}
    with open(path, "r", encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue
            try:
                media, score = line.split("\t")
                scores[media.strip()] = int(score)
            except ValueError as e:
                print(f"解析错误: {e},行内容: {line}")
    return scores


# Module-level table of media authority scores, loaded once at import time.
media_score = _load_media_scores()

# Idempotency store: IDs of messages that were already processed successfully.
processed_ids = set()


def _extract_source(data):
    """Return ``data['c'][0]['b'][0]['d'][0]['sourcename']`` or the default.

    A missing key at any level falls back to a one-element placeholder list,
    and an empty list at any level short-circuits to the default source.
    """
    category_data = data.get('c', [{}])
    if not category_data:
        return _DEFAULT_SOURCE
    b_data = category_data[0].get('b', [{}])
    if not b_data:
        return _DEFAULT_SOURCE
    d_data = b_data[0].get('d', [{}])
    if not d_data:
        return _DEFAULT_SOURCE
    return d_data[0].get('sourcename', _DEFAULT_SOURCE)


def message_callback(ch, method, properties, body):
    """Handle one delivery: label it, score it, forward it, then ack.

    Args:
        ch: Channel the message arrived on; used for ack / nack.
        method: Delivery metadata; ``method.delivery_tag`` identifies the message.
        properties: Message properties (unused).
        body: Raw JSON payload; must contain an ``id`` field.

    Any exception raised while handling the message is reported on stdout and
    the message is nack'ed without requeue.
    """
    try:
        data = json.loads(body)
        id_str = str(data["id"])

        # Idempotency check: acknowledge and skip anything already handled.
        if id_str in processed_ids:
            print(f"跳过已处理的消息: {id_str}")
            ch.basic_ack(delivery_tag=method.delivery_tag)
            return

        content = data.get('CN_content', "").strip()
        source = _extract_source(data)
        source_impact = media_score.get(source, 5)

        tagged_news = get_label(content, source)
        # Information quality score.
        public_opinion_score = tagged_news.get("public_opinion_score", 30)
        # Relevance to the Chinese stock market.
        china_factor = tagged_news.get("China_factor", 0.2)

        news_score = source_impact * 0.04 + public_opinion_score * 0.25 + china_factor * 35
        news_score = round(news_score, 2)
        # To shift scores upward overall, one option is
        # round((news_score ** 0.5) * 10.0, 2).

        industry_confidence = tagged_news.get("industry_confidence", [])
        industry_score = [round(x * news_score, 2) for x in industry_confidence]
        concept_confidence = tagged_news.get("concept_confidence", [])
        concept_score = [round(x * news_score, 2) for x in concept_confidence]

        # Assignment order is kept stable because it determines the key
        # order of the JSON printed below.
        tagged_news["source"] = source
        tagged_news["source_impact"] = source_impact
        tagged_news["industry_score"] = industry_score
        tagged_news["concept_score"] = concept_score
        tagged_news["news_score"] = news_score
        tagged_news["id"] = id_str

        print(json.dumps(tagged_news, ensure_ascii=False))

        # Publish the labelled news record to the outgoing queue.
        send_mq(tagged_news)

        # Remember the ID only after successful processing. The set is
        # emptied wholesale once it exceeds 10000 entries to bound memory.
        processed_ids.add(id_str)
        if len(processed_ids) > 10000:
            processed_ids.clear()

        # Manual acknowledgement.
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"消息处理失败: {str(e)}")
        # Reject the message and do not put it back on the queue.
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)


def create_connection():
    """Create and return a blocking RabbitMQ connection to ``localhost``."""
    credentials = pika.PlainCredentials(mq_user, mq_password)
    return pika.BlockingConnection(
        pika.ConnectionParameters(
            host="localhost",
            credentials=credentials,
            heartbeat=600,
            connection_attempts=3,
            retry_delay=5,  # seconds between connection attempts
        )
    )


def _close_quietly(connection):
    """Best-effort close of ``connection``; ``None`` is accepted and ignored."""
    if connection is None:
        return
    try:
        if connection.is_open:
            connection.close()
    except Exception:
        # Cleanup must never mask the error that triggered it.
        _logger.debug("ignoring error while closing RabbitMQ connection", exc_info=True)


def start_consumer():
    """Run the consumer until interrupted, reconnecting after failures.

    Uses a loop rather than recursion so repeated reconnects cannot exhaust
    the call stack. Waits 5 s after a broker-initiated close, 10 s after any
    other AMQP connection error and 15 s after any other exception.
    ``KeyboardInterrupt`` ends the loop.
    """
    while True:
        # Reset on every iteration so the cleanup below never touches an
        # unbound name or a connection left over from a previous attempt.
        connection = None
        try:
            connection = create_connection()
            channel = connection.channel()

            # QoS: deliver at most one unacknowledged message at a time.
            channel.basic_qos(prefetch_count=1)

            # NOTE: ``durable`` is not passed for the exchange or the queue,
            # so both are declared with pika's default.
            channel.exchange_declare(
                exchange="zzck_exchange",
                exchange_type="fanout",
            )
            res = channel.queue_declare(queue="to_ai")

            mq_queue = res.method.queue
            channel.queue_bind(
                exchange="zzck_exchange",
                queue=mq_queue,
            )

            # Consume with manual acknowledgements.
            channel.basic_consume(
                queue=mq_queue,
                on_message_callback=message_callback,
                auto_ack=False,
            )

            print("消费者已启动,等待消息...")
            channel.start_consuming()

        except pika.exceptions.ConnectionClosedByBroker:
            # Must precede AMQPConnectionError, of which it is a subclass.
            print("连接被代理关闭,将在5秒后重试...")
            time.sleep(5)

        except pika.exceptions.AMQPConnectionError:
            print("连接失败,将在10秒后重试...")
            time.sleep(10)

        except KeyboardInterrupt:
            print("消费者被用户中断")
            # The ``finally`` clause below closes the connection before the
            # loop is left.
            break

        except Exception as e:
            print(f"消费者异常: {str(e)}")
            print("将在15秒后重试...")
            time.sleep(15)

        finally:
            _close_quietly(connection)


if __name__ == "__main__":
    start_consumer()