zzck/mqreceive2.py

114 lines
3.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Consume news/information messages from RabbitMQ and write them into Elasticsearch.
import pika
import json
import logging
from config import *
from elasticsearch import Elasticsearch
from config import ES_HOST, ES_PORT, ES_USER, ES_PASSWORD
import datetime
def message_callback(ch, method, properties, body):
    """Consume one MQ delivery: decode its JSON body and index it into ES.

    Acknowledgement policy:
      * body is not valid JSON -> nack WITHOUT requeue (it can never succeed,
        so requeueing it would only redeliver it forever);
      * write succeeded        -> ack;
      * write failed           -> nack, requeued once; a delivery that already
        carries the ``redelivered`` flag is dropped instead of looping.

    Args:
        ch: channel the message arrived on; used to ack/nack.
        method: delivery metadata; ``delivery_tag`` and ``redelivered`` are read.
        properties: message properties (unused).
        body: raw message payload, expected to be a JSON document.
    """
    try:
        data = json.loads(body)
    except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; neither can be cured by a retry.
        print(f"消息JSON解析失败, 已丢弃: {str(e)}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        return

    try:
        written = write_to_es(data)
    except Exception as e:
        # Raised before indexing started (e.g. the ES client could not be built).
        print(f"ES写入失败: {str(e)}")
        written = False

    if written:
        ch.basic_ack(delivery_tag=method.delivery_tag)
        return

    # Give a failed message one more chance, but never requeue the same
    # delivery twice so a permanently bad document cannot spin forever.
    retry = not getattr(method, "redelivered", False)
    ch.basic_nack(delivery_tag=method.delivery_tag, requeue=retry)


# Shared Elasticsearch client, created lazily on first use and then reused.
_es_client = None


def _get_es_client():
    """Return the shared Elasticsearch client, creating it on first call."""
    global _es_client
    if _es_client is None:
        _es_client = Elasticsearch(
            [f"http://{ES_HOST}:{ES_PORT}"],  # scheme is included in the host entry
            http_auth=(ES_USER, ES_PASSWORD),
        )
    return _es_client


def _first_dict(items):
    """Return ``items[0]`` when ``items`` is a non-empty list/tuple headed by a dict, else ``{}``."""
    if isinstance(items, (list, tuple)) and items and isinstance(items[0], dict):
        return items[0]
    return {}


# Write to ES
def write_to_es(data):
    """Index one news record into the ``news_info`` index.

    ``category``, ``lang`` and ``sourcename`` are read from the first element
    of the nested lists ``data['c']``, ``...['b']`` and ``...['d']``; each
    falls back to ``""`` when its level is missing or empty.

    Args:
        data: decoded message; ``data['id']`` is required and becomes the
            document id.

    Returns:
        True when the document was indexed, False when indexing failed
        (the failure is printed, not raised).
    """
    es = _get_es_client()
    try:
        c_item = _first_dict(data.get('c'))
        b_item = _first_dict(c_item.get('b'))
        d_item = _first_dict(b_item.get('d'))

        # A missing / null / non-string EN_content must not abort the write.
        en_content = data.get('EN_content') or ""
        if not isinstance(en_content, str):
            en_content = str(en_content)

        # One timestamp so create_time and update_time are identical.
        now = datetime.datetime.now()

        es.index(
            index="news_info",
            id=data['id'],
            body={
                "news_id": data.get('id', ""),
                "input_date": data.get('input_date', ""),
                "words": data.get('words', ""),
                "title_txt": data.get('title_txt', ""),
                "key_word": data.get('key_word', ""),
                "CN_content": data.get('CN_content', ""),
                "EN_content": en_content.replace('quot;', '"'),
                "URL": data.get('URL', ""),
                "abstract": data.get('abstract', ""),
                "title_EN": data.get('title_EN', ""),
                "category": c_item.get('category', ''),
                "sourcename": d_item.get('sourcename', ''),
                "lang": b_item.get('lang', ''),
                "deleted": "0",
                "create_time": now,
                "update_time": now,
                "file_date": "",
                "file_name": "",
            },
        )
        print(f"成功写入ES文档ID: {data['id']}")
        return True
    except Exception as e:
        print(f"写入ES失败: {str(e)}")
        return False
def start_consumer():
    """Start the MQ consumer and block while it processes messages.

    Connects to RabbitMQ on localhost, declares the fanout exchange
    ``zzck_exchange`` and the queue ``to_es``, binds them, and consumes with
    ``message_callback`` (manual acknowledgement). The connection is closed
    when consuming stops for any reason.

    Raises:
        Exception: any error raised while connecting, declaring or consuming
            is printed and re-raised.
    """
    connection = None
    try:
        credentials = pika.PlainCredentials(mq_user, mq_password)
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host="localhost",
                credentials=credentials,
                heartbeat=60,
            )
        )
        channel = connection.channel()
        channel.exchange_declare(
            exchange="zzck_exchange",
            exchange_type="fanout",
        )
        # Declare the queue (must match the existing queue's type);
        # the queue name itself can be chosen freely.
        res = channel.queue_declare(
            queue="to_es"
        )
        mq_queue = res.method.queue
        channel.queue_bind(
            exchange="zzck_exchange",
            queue=mq_queue,
        )
        # Start consuming
        channel.basic_consume(
            queue=mq_queue,
            on_message_callback=message_callback,
        )
        print("消费者已启动,等待消息...")
        channel.start_consuming()
    except Exception as e:
        print(f"消费者启动失败: {str(e)}")
        raise
    finally:
        # Release the broker connection on every exit path (error, Ctrl-C,
        # normal stop); skip if it was never opened or is already closed.
        if connection is not None and connection.is_open:
            connection.close()
# Script entry point: run the consumer only when executed directly, not on import.
if __name__ == "__main__":
    start_consumer()