zzck/mqreceive2.py

114 lines
3.5 KiB
Python
Raw Permalink Normal View History

2025-09-02 15:15:51 +08:00
# 将资讯数据写入 es中
import pika
import json
import logging
from config import *
from elasticsearch import Elasticsearch
from config import ES_HOST, ES_PORT, ES_USER, ES_PASSWORD
import datetime
def message_callback(ch, method, properties, body):
try:
data = json.loads(body)
write_to_es(data)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"ES写入失败: {str(e)}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
# 写入ES
def write_to_es(data):
# 初始化ES连接添加在文件顶部
es = Elasticsearch(
[f"http://{ES_HOST}:{ES_PORT}"], # 将协议直接包含在hosts中
http_auth=(ES_USER, ES_PASSWORD)
)
try:
# 插入ES核心逻辑
category_data = data.get('c', [{}])
category = lang = sourcename = ""
if category_data:
category = category_data[0].get('category', '')
b_data = category_data[0].get('b', [{}])
if b_data:
lang = b_data[0].get('lang', '')
d_data = b_data[0].get('d', [{}])
if d_data:
sourcename = d_data[0].get('sourcename', '')
es.index(
index="news_info",
id=data['id'],
body={
"news_id": data.get('id', ""),
"input_date": data.get('input_date', ""),
"words": data.get('words', ""),
"title_txt": data.get('title_txt', ""),
"key_word": data.get('key_word', ""),
"CN_content": data.get('CN_content', ""),
"EN_content": data.get('EN_content',0).replace('quot;', '"'),
"URL": data.get('URL', ""),
"abstract": data.get('abstract', ""),
"title_EN": data.get('title_EN', ""),
"category": category,
"sourcename": sourcename,
"lang": lang,
"deleted": "0",
"create_time":datetime.datetime.now(),
"update_time": datetime.datetime.now(),
"file_date": "",
"file_name": ""
}
)
print(f"成功写入ES文档ID: {data['id']}")
except Exception as e:
print(f"写入ES失败: {str(e)}")
def start_consumer():
"""启动MQ消费者"""
try:
credentials = pika.PlainCredentials(mq_user, mq_password)
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host="localhost",
credentials=credentials,
heartbeat=60
)
)
channel = connection.channel()
channel.exchange_declare(
exchange="zzck_exchange",
exchange_type="fanout",
)
# 声明队列(匹配现有队列类型) queue 的名字可以自定义
res = channel.queue_declare(
queue="to_es"
)
mq_queue = res.method.queue
channel.queue_bind(
exchange="zzck_exchange",
queue=mq_queue,
)
# 启动消费
channel.basic_consume(
queue=mq_queue,
on_message_callback=message_callback,
)
print("消费者已启动,等待消息...")
channel.start_consuming()
except Exception as e:
print(f"消费者启动失败: {str(e)}")
raise
if __name__ == "__main__":
start_consumer()