# 获取 标签的消息 写入数据库 import pika import json import logging, time from config import * import pymysql from elasticsearch import Elasticsearch import datetime import requests def message_callback(ch, method, properties, body): """消息处理回调函数""" try: data = json.loads(body) news_score = data.get('news_score', -1) if news_score < 0: ch.basic_ack(delivery_tag=method.delivery_tag) return # 在此处添加业务处理逻辑 写入mysql数据库 write_to_mysql(data) # 数据写入es write_to_es(data) # 数据写入资讯精选表 write_to_news(data) # 手动确认消息 ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: print(f"消息处理失败: {str(e)}") # 拒绝消息并重新入队 ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) def write_to_news(data): news_score = data.get('news_score', 0.0) if float(news_score) < 80: # 过滤掉news_score小于80的消息 return # 获取返回数据里面的 新闻id news_id = data.get('id', "") adr = jx_adr.replace("news_id", news_id) print(f"接口地址为{adr}") response = requests.get(adr) if response.status_code != 200: print(f"新闻id:{news_id} 得分:{news_score}, 调用精选接口失败, 错误码:{response.status_code}") return print(f"新闻id:{news_id} 得分:{news_score}, 调用精选接口成功") def write_to_es(data): """写入ES""" # 初始化ES连接(添加在文件顶部) es = Elasticsearch( [f"http://{ES_HOST}:{ES_PORT}"], # 将协议直接包含在hosts中 basic_auth=(ES_USER, ES_PASSWORD) ) news_id = data.get('id', "") es.update( index="news_info", id=news_id, doc={ "news_tags": { "id": news_id, "abstract": data.get('abstract', ""), "title": data.get('title', ""), "rewrite_content": data.get('rewrite_content', ""), "industry_label": data.get('industry_label', []), "industry_confidence": data.get('industry_confidence', []), "industry_score": data.get('industry_score', []), "concept_label": data.get('concept_label', []), "concept_confidence": data.get('concept_confidence', []), "concept_score": data.get('concept_score', []), "public_opinion_score": data.get('public_opinion_score', 10), "China_factor": data.get('China_factor', 0.1), "source": data.get('source', "其他"), "source_impact": data.get('source_impact', 5), "news_score": data.get('news_score', 0.0), "news_id": news_id, "deleted": '0', "create_time": datetime.datetime.now(), "update_time": datetime.datetime.now() } } ) print(f"news_id:{news_id} 得分:{data.get('news_score', 0.0)}, 写入ES成功") def write_to_mysql(data): conn = pymysql.connect( host=MYSQL_HOST_APP, port=MYSQL_PORT_APP, user=MYSQL_USER_APP, password=MYSQL_PASSWORD_APP, db=MYSQL_DB_APP, charset='utf8mb4' ) try: with conn.cursor() as cursor: # 新增JSON结构解析逻辑 # 修改后的SQL语句 sql = """INSERT INTO news_tags (abstract, title, rewrite_content, industry_label, industry_confidence, industry_score, concept_label, concept_confidence, concept_score, public_opinion_score, China_factor, source, source_impact, news_score, news_id) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) """ values = (data.get('abstract', ""), data.get('title', ""), data.get('rewrite_content', ""), json.dumps(data.get('industry_label', [])), json.dumps(data.get('industry_confidence', [])), json.dumps(data.get('industry_score', [])), json.dumps(data.get('concept_label', [])), json.dumps(data.get('concept_confidence', [])), json.dumps(data.get('concept_score', [])), data.get('public_opinion_score', 10), data.get('China_factor', 0.1), data.get('source', "其他"), data.get('source_impact', 5), data.get('news_score', 0.0), data.get('id', "") ) cursor.execute(sql, values) conn.commit() id = data.get('id', "") industry_label = data.get('industry_label', []) concept_label = data.get('concept_label', []) print(f"{id} {industry_label} {concept_label}, 写入news_tags 表成功") except Exception as e: print(f"写入news_tags失败: {str(e)}") finally: conn.close() return True def create_connection(): """创建并返回RabbitMQ连接""" credentials = pika.PlainCredentials(mq_user, mq_password) return pika.BlockingConnection( pika.ConnectionParameters( host="localhost", credentials=credentials, heartbeat=600, connection_attempts=3, retry_delay=5 # 重试延迟5秒 ) ) def start_consumer(): """启动MQ消费者""" while True: # 使用循环而不是递归,避免递归深度问题 try: connection = create_connection() channel = connection.channel() # 设置QoS,限制每次只取一条消息 channel.basic_qos(prefetch_count=1) channel.exchange_declare( exchange="zzck_llm_exchange", exchange_type="fanout" ) # 声明持久化队列 res = channel.queue_declare( queue="from_ai_to_mysql" ) mq_queue = res.method.queue channel.queue_bind( exchange="zzck_llm_exchange", queue=mq_queue, ) # 启动消费,关闭自动ACK channel.basic_consume( queue=mq_queue, on_message_callback=message_callback, auto_ack=False # 关闭自动确认 ) print("消费者已启动,等待消息...") channel.start_consuming() except pika.exceptions.ConnectionClosedByBroker: # 代理主动关闭连接,可能是临时错误 print("连接被代理关闭,将在5秒后重试...") time.sleep(5) except pika.exceptions.AMQPConnectionError: # 连接错误 print("连接失败,将在10秒后重试...") time.sleep(10) except KeyboardInterrupt: print("消费者被用户中断") try: if connection and connection.is_open: connection.close() except: pass break except Exception as e: print(f"消费者异常: {str(e)}") print("将在15秒后重试...") time.sleep(15) finally: try: if connection and connection.is_open: connection.close() except: pass # def start_consumer(): # """启动MQ消费者""" # try: # credentials = pika.PlainCredentials(mq_user, mq_password) # connection = pika.BlockingConnection( # pika.ConnectionParameters( # host="localhost", # credentials=credentials, # heartbeat=600 # ) # ) # channel = connection.channel() # channel.exchange_declare( # exchange="zzck_exchange", # exchange_type="fanout", # ) # # 声明队列(匹配现有队列类型) queue 的名字可以自定义 # res = channel.queue_declare( # queue="from_ai_to_mysql" # ) # mq_queue = res.method.queue # channel.queue_bind( # exchange="zzck_llm_exchange", # queue=mq_queue, # ) # # 启动消费 # channel.basic_consume( # queue=mq_queue, # on_message_callback=message_callback, # ) # print("消费者已启动,等待消息...") # channel.start_consuming() # except Exception as e: # print(f"消费者启动失败: {str(e)}") # start_consumer() if __name__ == "__main__": start_consumer()