114 lines
3.5 KiB
Python
114 lines
3.5 KiB
Python
|
# Consume news messages from the message queue and write them into Elasticsearch (ES).
|
|||
|
import pika
|
|||
|
import json
|
|||
|
import logging
|
|||
|
from config import *
|
|||
|
from elasticsearch import Elasticsearch
|
|||
|
from config import ES_HOST, ES_PORT, ES_USER, ES_PASSWORD
|
|||
|
import datetime
|
|||
|
|
|||
|
|
|||
|
|
|||
|
def message_callback(ch, method, properties, body):
    """Handle one MQ delivery: decode it and index it into Elasticsearch.

    Args:
        ch: the channel the message arrived on; used to ack/nack it.
        method: delivery metadata; only ``delivery_tag`` is read.
        properties: message properties (unused).
        body: raw message payload, expected to be a JSON object.

    A payload that is not valid JSON, or is JSON but not an object, can never
    be processed, so it is nacked WITHOUT requeue; requeueing it would make the
    broker redeliver it forever. Any failure while writing is nacked with
    requeue so the message is retried. The message is acked only on success.
    """
    try:
        data = json.loads(body)
    except ValueError as e:  # json.JSONDecodeError and UnicodeDecodeError are ValueErrors
        print(f"消息解析失败,已丢弃: {str(e)}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        return

    if not isinstance(data, dict):
        print(f"消息格式错误(非JSON对象),已丢弃: {type(data).__name__}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        return

    try:
        write_to_es(data)
    except Exception as e:
        print(f"ES写入失败: {str(e)}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
    else:
        # Ack outside the try so an ack failure is not reported as an ES failure.
        ch.basic_ack(delivery_tag=method.delivery_tag)
|
|||
|
|
|||
|
# Write to ES

# Shared Elasticsearch client, created lazily on first use instead of once per
# message (the original code built a new, never-closed client for every call).
# No locking: assumes single-threaded use, as with the blocking consumer in this
# file. Revisit if write_to_es is ever called from multiple threads.
_es_client = None


def _get_es_client():
    """Return the shared Elasticsearch client, creating it on first call."""
    global _es_client
    if _es_client is None:
        _es_client = Elasticsearch(
            [f"http://{ES_HOST}:{ES_PORT}"],  # scheme included directly in the hosts entry
            http_auth=(ES_USER, ES_PASSWORD),
        )
    return _es_client


def _build_news_doc(data):
    """Map a decoded news message onto the ``news_info`` document body.

    ``category``, ``lang`` and ``sourcename`` are read from the first element
    of the nested lists ``data['c']``, ``c[0]['b']`` and ``b[0]['d']``. A
    missing or empty level leaves that field and all deeper ones as ``""``.

    Raises:
        AttributeError, KeyError, IndexError or TypeError: when *data* or one
        of the nested levels has an unexpected type.
    """
    category = lang = sourcename = ""
    category_data = data.get('c', [{}])
    if category_data:
        category = category_data[0].get('category', '')
        b_data = category_data[0].get('b', [{}])
        if b_data:
            lang = b_data[0].get('lang', '')
            d_data = b_data[0].get('d', [{}])
            if d_data:
                sourcename = d_data[0].get('sourcename', '')

    # A missing/None EN_content becomes "" (the old default of 0 crashed on .replace).
    # NOTE(review): only the bare "quot;" fragment is replaced, so an HTML
    # entity "&quot;" would become '&"'. Confirm what upstream actually sends.
    en_content = (data.get('EN_content') or "").replace('quot;', '"')

    # Take one timestamp so create_time and update_time match on insert.
    now = datetime.datetime.now()
    return {
        "news_id": data.get('id', ""),
        "input_date": data.get('input_date', ""),
        "words": data.get('words', ""),
        "title_txt": data.get('title_txt', ""),
        "key_word": data.get('key_word', ""),
        "CN_content": data.get('CN_content', ""),
        "EN_content": en_content,
        "URL": data.get('URL', ""),
        "abstract": data.get('abstract', ""),
        "title_EN": data.get('title_EN', ""),
        "category": category,
        "sourcename": sourcename,
        "lang": lang,
        "deleted": "0",
        "create_time": now,
        "update_time": now,
        "file_date": "",
        "file_name": "",
    }


def write_to_es(data):
    """Index one decoded news message into the ``news_info`` index.

    The document id is ``data['id']``.

    Args:
        data: decoded message; expected to be a dict containing ``'id'``.

    Malformed input (not a dict, no ``'id'``, or nested fields of the wrong
    type) can never be indexed. It is printed and skipped without contacting
    Elasticsearch. Errors raised while creating the client or indexing are
    printed and then re-raised, so the caller can retry instead of treating a
    failed write as done.
    """
    try:
        doc_id = data['id']
        body = _build_news_doc(data)
    except (KeyError, IndexError, TypeError, AttributeError) as e:
        print(f"写入ES失败: {str(e)}")
        return

    try:
        _get_es_client().index(index="news_info", id=doc_id, body=body)
    except Exception as e:
        print(f"写入ES失败: {str(e)}")
        raise

    print(f"成功写入ES文档ID: {doc_id}")
|
|||
|
|
|||
|
|
|||
|
def start_consumer(*, host="localhost", exchange="zzck_exchange", queue="to_es"):
    """Start the MQ consumer (启动MQ消费者) and block until consuming stops.

    Connects with ``mq_user``/``mq_password`` and declares a fanout exchange.
    It then declares a queue, binds the queue to the exchange, and dispatches
    every delivery to ``message_callback``.

    Args:
        host: broker host; defaults to the previously hard-coded "localhost".
        exchange: name of the fanout exchange to declare and bind to.
        queue: name of the queue to declare and consume from; any name works.

    Raises:
        Exception: any setup or consuming error is printed and re-raised.
    """
    connection = None
    try:
        credentials = pika.PlainCredentials(mq_user, mq_password)
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=host,
                credentials=credentials,
                heartbeat=60,
            )
        )
        channel = connection.channel()
        channel.exchange_declare(
            exchange=exchange,
            exchange_type="fanout",
        )

        # Declare the queue with arguments matching the existing queue.
        res = channel.queue_declare(queue=queue)
        mq_queue = res.method.queue
        channel.queue_bind(
            exchange=exchange,
            queue=mq_queue,
        )

        # Start consuming.
        channel.basic_consume(
            queue=mq_queue,
            on_message_callback=message_callback,
        )
        print("消费者已启动,等待消息...")
        channel.start_consuming()
    except Exception as e:
        print(f"消费者启动失败: {str(e)}")
        raise
    finally:
        # Close the connection on every exit path (setup error, consume error,
        # KeyboardInterrupt, or consuming stopped) so it is not left open.
        if connection is not None and connection.is_open:
            connection.close()
|
|||
|
|
|||
|
if __name__ == "__main__":
    # Script entry point: start consuming only when run directly, not on import.
    start_consumer()
|