259 lines
8.8 KiB
Python
259 lines
8.8 KiB
Python
# 获取 标签的消息 写入数据库
|
||
import pika
|
||
import json
|
||
import logging, time
|
||
from config import *
|
||
import pymysql
|
||
from elasticsearch import Elasticsearch
|
||
import datetime
|
||
import requests
|
||
|
||
def message_callback(ch, method, properties, body):
    """Handle one tagged-news message delivered by RabbitMQ.

    The body is decoded as JSON. Messages whose ``news_score`` is negative
    (a missing score defaults to -1) are acknowledged and skipped. Every other
    message is passed to ``write_to_mysql``, ``write_to_es`` and
    ``write_to_news`` in that order and acknowledged once all three return.

    Args:
        ch: The pika channel that delivered the message.
        method: Delivery metadata; ``method.delivery_tag`` identifies the message.
        properties: Message properties (unused).
        body: Raw JSON payload.

    Any exception while processing is printed and the message is negatively
    acknowledged with ``requeue=False``: it is NOT put back on the queue, so a
    malformed message cannot be redelivered in a tight loop (RabbitMQ drops it,
    or dead-letters it if a dead-letter exchange is configured for the queue).
    """
    try:
        data = json.loads(body)
        # The score may arrive as a number or a numeric string; write_to_news
        # coerces it with float(), so do the same here instead of letting a
        # string score raise TypeError on the comparison.
        news_score = float(data.get('news_score', -1))
        if news_score < 0:
            ch.basic_ack(delivery_tag=method.delivery_tag)
            return

        # Business logic: persist to MySQL, then Elasticsearch, then the
        # featured-news endpoint.
        write_to_mysql(data)
        write_to_es(data)
        write_to_news(data)
    except Exception as e:
        print(f"消息处理失败: {str(e)}")
        # Reject WITHOUT requeueing (see docstring).
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    else:
        # Manual ack only after every write succeeded. Kept outside the try so
        # a failing ack is not followed by a second (nack) on the same tag.
        ch.basic_ack(delivery_tag=method.delivery_tag)
|
||
|
||
def write_to_news(data, *, timeout=30):
    """Notify the featured-news ("精选") endpoint about a high-scoring item.

    Items whose ``news_score`` is below 80 are ignored. Otherwise the literal
    placeholder ``news_id`` inside the configured URL template ``jx_adr`` is
    replaced by the message's ``id``, and the resulting URL is fetched with GET.

    Args:
        data: Decoded message dict; reads ``news_score`` and ``id``.
        timeout: Seconds ``requests`` waits for the endpoint before raising.
            Without a timeout, a stalled endpoint would block the consumer
            (and its heartbeat processing) indefinitely.

    Returns:
        None. A non-200 status is printed, not raised. Network errors,
        including timeouts, propagate to the caller.
    """
    news_score = data.get('news_score', 0.0)
    if float(news_score) < 80:  # only scores >= 80 are sent to the endpoint
        return

    # The id may be numeric in the payload; str.replace needs a string.
    news_id = str(data.get('id', ""))
    adr = jx_adr.replace("news_id", news_id)
    print(f"接口地址为{adr}")

    response = requests.get(adr, timeout=timeout)
    if response.status_code != 200:
        print(f"新闻id:{news_id} 得分:{news_score}, 调用精选接口失败, 错误码:{response.status_code}")
        return
    print(f"新闻id:{news_id} 得分:{news_score}, 调用精选接口成功")
|
||
|
||
|
||
|
||
def write_to_es(data, es=None):
    """Write the message's tags into the ``news_tags`` field of a ``news_info`` document.

    Performs a partial update (``es.update(..., doc=...)``) of the document
    whose id is the message's ``id``. Missing keys fall back to the defaults
    below.

    Args:
        data: Decoded message dict.
        es: Optional Elasticsearch client to reuse. When omitted, a client is
            built from ES_HOST / ES_PORT / ES_USER / ES_PASSWORD for this call
            and closed afterwards, so per-message calls no longer leak
            connection pools.

    Raises:
        Whatever ``es.update`` raises (e.g. a not-found error if no
        ``news_info`` document with that id exists); nothing is caught here.
    """
    owns_client = es is None
    if owns_client:
        es = Elasticsearch(
            [f"http://{ES_HOST}:{ES_PORT}"],  # scheme included directly in the host URL
            basic_auth=(ES_USER, ES_PASSWORD),
        )
    news_id = data.get('id', "")
    # Single timestamp so create_time and update_time match for this write.
    # NOTE(review): as a partial update, this overwrites create_time on every
    # call — confirm that is intended.
    now = datetime.datetime.now()
    try:
        es.update(
            index="news_info",
            id=news_id,
            doc={
                "news_tags": {
                    "id": news_id,
                    "abstract": data.get('abstract', ""),
                    "title": data.get('title', ""),
                    "rewrite_content": data.get('rewrite_content', ""),
                    "industry_label": data.get('industry_label', []),
                    "industry_confidence": data.get('industry_confidence', []),
                    "industry_score": data.get('industry_score', []),
                    "concept_label": data.get('concept_label', []),
                    "concept_confidence": data.get('concept_confidence', []),
                    "concept_score": data.get('concept_score', []),
                    "public_opinion_score": data.get('public_opinion_score', 10),
                    "China_factor": data.get('China_factor', 0.1),
                    "source": data.get('source', "其他"),
                    "source_impact": data.get('source_impact', 5),
                    "news_score": data.get('news_score', 0.0),
                    "news_id": news_id,
                    "deleted": '0',
                    "create_time": now,
                    "update_time": now,
                }
            },
        )
    finally:
        # Only close clients this call created; injected clients are the caller's.
        if owns_client:
            es.close()
    print(f"news_id:{news_id} 得分:{data.get('news_score', 0.0)}, 写入ES成功")
|
||
|
||
|
||
|
||
|
||
def write_to_mysql(data):
    """Insert one tagged-news record into the MySQL ``news_tags`` table.

    List-valued fields (labels, confidences, scores) are stored as JSON text
    via ``json.dumps``. Missing keys fall back to the defaults below.

    Args:
        data: Decoded message dict; ``id`` is stored as ``news_id``.

    Returns:
        True if the row was inserted and committed. False if the INSERT or
        commit failed; such failures are printed and rolled back rather than
        raised (best-effort write). Errors from ``pymysql.connect`` itself are
        not caught and propagate to the caller.
    """
    conn = pymysql.connect(
        host=MYSQL_HOST_APP,
        port=MYSQL_PORT_APP,
        user=MYSQL_USER_APP,
        password=MYSQL_PASSWORD_APP,
        db=MYSQL_DB_APP,
        charset='utf8mb4',
    )
    try:
        with conn.cursor() as cursor:
            sql = """INSERT INTO news_tags
                (abstract, title, rewrite_content, industry_label, industry_confidence, industry_score, concept_label, concept_confidence, concept_score, public_opinion_score, China_factor, source, source_impact, news_score, news_id)
                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) """
            values = (
                data.get('abstract', ""),
                data.get('title', ""),
                data.get('rewrite_content', ""),
                json.dumps(data.get('industry_label', [])),
                json.dumps(data.get('industry_confidence', [])),
                json.dumps(data.get('industry_score', [])),
                json.dumps(data.get('concept_label', [])),
                json.dumps(data.get('concept_confidence', [])),
                json.dumps(data.get('concept_score', [])),
                data.get('public_opinion_score', 10),
                data.get('China_factor', 0.1),
                data.get('source', "其他"),
                data.get('source_impact', 5),
                data.get('news_score', 0.0),
                data.get('id', ""),
            )
            # Parameterized query: the driver escapes every value.
            cursor.execute(sql, values)
        conn.commit()
    except Exception as e:
        print(f"写入news_tags失败: {str(e)}")
        # Discard the failed transaction; if the connection is already broken
        # the rollback itself may fail, which must not turn this best-effort
        # write into a crash.
        try:
            conn.rollback()
        except Exception:
            pass
        return False
    finally:
        conn.close()

    news_id = data.get('id', "")
    industry_label = data.get('industry_label', [])
    concept_label = data.get('concept_label', [])
    print(f"{news_id} {industry_label} {concept_label}, 写入news_tags 表成功")
    return True
|
||
|
||
def create_connection(*, host="localhost", heartbeat=600, connection_attempts=3, retry_delay=5):
    """Create and return a blocking RabbitMQ connection.

    Credentials come from the module-level ``mq_user`` / ``mq_password``
    (presumably supplied by ``from config import *``). All keyword arguments
    default to the values this function has always used, so a bare
    ``create_connection()`` call is unchanged.

    Args:
        host: Broker hostname.
        heartbeat: Heartbeat timeout in seconds to negotiate with the broker.
            NOTE: a BlockingConnection only services heartbeats while pika is
            processing I/O, so a message callback that blocks longer than this
            window risks the broker dropping the connection.
        connection_attempts: Number of connection attempts pika makes.
        retry_delay: Seconds pika waits between connection attempts.

    Returns:
        A ``pika.BlockingConnection``.
    """
    credentials = pika.PlainCredentials(mq_user, mq_password)
    return pika.BlockingConnection(
        pika.ConnectionParameters(
            host=host,
            credentials=credentials,
            heartbeat=heartbeat,
            connection_attempts=connection_attempts,
            retry_delay=retry_delay,
        )
    )
|
||
|
||
def _close_quietly(connection):
    """Close a RabbitMQ connection if it exists and is still open, ignoring errors."""
    if connection is None:
        return
    try:
        if connection.is_open:
            connection.close()
    except Exception:
        # Best effort: the connection may already be broken; nothing more to do.
        pass


def start_consumer():
    """Run the RabbitMQ consumer until interrupted.

    Each iteration opens a fresh connection, binds queue ``from_ai_to_mysql``
    to the fanout exchange ``zzck_llm_exchange`` and consumes with manual
    acknowledgement and prefetch 1, dispatching each message to
    ``message_callback``.

    Errors are printed and the loop retries: after 5 s when the broker closed
    the connection, 10 s on other connection errors, and 15 s on any other
    exception. Ctrl-C (KeyboardInterrupt) ends the loop. The connection opened
    by each iteration is closed in a ``finally`` before the loop repeats.
    """
    while True:  # a loop rather than recursion, so retries cannot exhaust the stack
        # Reset every iteration so cleanup never touches a previous (already
        # closed) connection and the name is always bound, even if connecting fails.
        connection = None
        try:
            connection = create_connection()
            channel = connection.channel()

            # Fetch at most one unacknowledged message at a time.
            channel.basic_qos(prefetch_count=1)

            channel.exchange_declare(
                exchange="zzck_llm_exchange",
                exchange_type="fanout",
            )

            # Declared with default (non-durable) settings. Re-declaring an
            # existing queue with different arguments (e.g. durable=True) is
            # rejected by the broker, so keep this in sync with the live queue.
            res = channel.queue_declare(queue="from_ai_to_mysql")
            mq_queue = res.method.queue
            channel.queue_bind(
                exchange="zzck_llm_exchange",
                queue=mq_queue,
            )

            # Manual acknowledgement: message_callback acks or nacks each message.
            channel.basic_consume(
                queue=mq_queue,
                on_message_callback=message_callback,
                auto_ack=False,
            )

            print("消费者已启动,等待消息...")
            channel.start_consuming()

        # ConnectionClosedByBroker subclasses AMQPConnectionError, so it must come first.
        except pika.exceptions.ConnectionClosedByBroker:
            # The broker closed the connection; possibly a transient condition.
            print("连接被代理关闭,将在5秒后重试...")
            time.sleep(5)
        except pika.exceptions.AMQPConnectionError:
            print("连接失败,将在10秒后重试...")
            time.sleep(10)
        except KeyboardInterrupt:
            print("消费者被用户中断")
            break  # the finally clause below closes the connection
        except Exception as e:
            print(f"消费者异常: {str(e)}")
            print("将在15秒后重试...")
            time.sleep(15)
        finally:
            _close_quietly(connection)
|
||
|
||
# def start_consumer():
|
||
# """启动MQ消费者"""
|
||
# try:
|
||
# credentials = pika.PlainCredentials(mq_user, mq_password)
|
||
# connection = pika.BlockingConnection(
|
||
# pika.ConnectionParameters(
|
||
# host="localhost",
|
||
# credentials=credentials,
|
||
# heartbeat=600
|
||
# )
|
||
# )
|
||
# channel = connection.channel()
|
||
# channel.exchange_declare(
|
||
# exchange="zzck_exchange",
|
||
# exchange_type="fanout",
|
||
# )
|
||
|
||
# # 声明队列(匹配现有队列类型) queue 的名字可以自定义
|
||
# res = channel.queue_declare(
|
||
# queue="from_ai_to_mysql"
|
||
# )
|
||
|
||
# mq_queue = res.method.queue
|
||
# channel.queue_bind(
|
||
# exchange="zzck_llm_exchange",
|
||
# queue=mq_queue,
|
||
# )
|
||
|
||
# # 启动消费
|
||
# channel.basic_consume(
|
||
# queue=mq_queue,
|
||
# on_message_callback=message_callback,
|
||
|
||
# )
|
||
|
||
# print("消费者已启动,等待消息...")
|
||
# channel.start_consuming()
|
||
|
||
# except Exception as e:
|
||
# print(f"消费者启动失败: {str(e)}")
|
||
# start_consumer()
|
||
|
||
if __name__ == "__main__":
    # Script entry point: blocks in the consumer loop until Ctrl-C.
    start_consumer()
|