"""Consume news messages from RabbitMQ and write them into Elasticsearch."""

import datetime
import json
import logging

import pika
from elasticsearch import Elasticsearch

# The wildcard import is kept from the original file; mq_user and mq_password
# used in start_consumer() are not defined here, so they presumably come from it.
from config import *  # noqa: F401,F403
from config import ES_HOST, ES_PORT, ES_USER, ES_PASSWORD

_ES_INDEX = "news_info"
_EXCHANGE = "zzck_exchange"
_QUEUE = "to_es"  # the queue name can be chosen freely

# Shared Elasticsearch client, created on first use by _get_es_client().
_es_client = None


def _get_es_client():
    """Return the shared Elasticsearch client, creating it on first use.

    Building the client once avoids constructing a new client (and its
    connection pool) for every consumed message.
    """
    global _es_client
    if _es_client is None:
        _es_client = Elasticsearch(
            [f"http://{ES_HOST}:{ES_PORT}"],  # scheme is part of the host URL
            http_auth=(ES_USER, ES_PASSWORD),
        )
    return _es_client


def _first_dict(value):
    """Return ``value[0]`` if ``value`` is a non-empty list/tuple headed by a dict, else ``{}``."""
    if isinstance(value, (list, tuple)) and value and isinstance(value[0], dict):
        return value[0]
    return {}


def _extract_classification(data):
    """Return ``(category, lang, sourcename)`` from the nested ``c`` -> ``b`` -> ``d`` lists.

    Only the first element of each level is consulted.  Any missing, empty or
    wrongly shaped level yields ``""`` for the fields at and below it.
    """
    category_entry = _first_dict(data.get("c"))
    lang_entry = _first_dict(category_entry.get("b"))
    source_entry = _first_dict(lang_entry.get("d"))
    return (
        category_entry.get("category", ""),
        lang_entry.get("lang", ""),
        source_entry.get("sourcename", ""),
    )


def _build_document(data):
    """Build the Elasticsearch document body for one news message."""
    category, lang, sourcename = _extract_classification(data)

    # A missing or null EN_content becomes "" instead of crashing on .replace().
    en_content = data.get("EN_content") or ""
    if isinstance(en_content, str):
        # NOTE(review): this replaces the bare text 'quot;' and leaves any
        # preceding '&' in place -- confirm whether '&quot;' was intended.
        en_content = en_content.replace('quot;', '"')

    # One timestamp so create_time and update_time are identical on insert.
    now = datetime.datetime.now()

    return {
        "news_id": data.get("id", ""),
        "input_date": data.get("input_date", ""),
        "words": data.get("words", ""),
        "title_txt": data.get("title_txt", ""),
        "key_word": data.get("key_word", ""),
        "CN_content": data.get("CN_content", ""),
        "EN_content": en_content,
        "URL": data.get("URL", ""),
        "abstract": data.get("abstract", ""),
        "title_EN": data.get("title_EN", ""),
        "category": category,
        "sourcename": sourcename,
        "lang": lang,
        "deleted": "0",
        "create_time": now,
        "update_time": now,
        "file_date": "",
        "file_name": "",
    }


def write_to_es(data):
    """Index one news message into the ``news_info`` index.

    Args:
        data: Decoded message payload (a dict).  ``data["id"]`` is used as the
            Elasticsearch document id.

    Returns:
        ``True`` if the document was indexed, ``False`` if anything failed.
        Failures are printed and never raised, so existing callers that
        ignore the return value keep working.
    """
    try:
        doc_id = data["id"]
        _get_es_client().index(
            index=_ES_INDEX,
            id=doc_id,
            body=_build_document(data),
        )
    except Exception as e:
        print(f"写入ES失败: {str(e)}")
        return False

    print(f"成功写入ES文档ID: {doc_id}")
    return True


def message_callback(ch, method, properties, body):
    """Handle one delivery: decode it, index it, then ack or nack.

    * Bodies that are not valid JSON, or that do not decode to an object with
      an ``id`` key, can never be indexed; they are rejected WITHOUT requeue so
      they are not redelivered in an endless loop.
    * If indexing fails the message is nacked WITH requeue so it is retried.
    * The message is acked only after a successful write.
    """
    delivery_tag = method.delivery_tag

    try:
        data = json.loads(body)
    except ValueError as e:  # JSONDecodeError and UnicodeDecodeError both derive from it
        print(f"ES写入失败: {str(e)}")
        ch.basic_nack(delivery_tag=delivery_tag, requeue=False)
        return

    if not isinstance(data, dict) or "id" not in data:
        print("ES写入失败: 消息不是带有 id 字段的 JSON 对象")
        ch.basic_nack(delivery_tag=delivery_tag, requeue=False)
        return

    if write_to_es(data):
        ch.basic_ack(delivery_tag=delivery_tag)
    else:
        # write_to_es has already printed the failure reason.
        ch.basic_nack(delivery_tag=delivery_tag, requeue=True)


def start_consumer():
    """Start the MQ consumer and block while consuming messages.

    Declares the fanout exchange and the queue, binds them, and hands every
    delivery to ``message_callback``.

    Raises:
        Exception: Any error raised during setup or consuming is printed and
            re-raised.  The connection is closed on the way out.
    """
    connection = None
    try:
        credentials = pika.PlainCredentials(mq_user, mq_password)
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host="localhost",
                credentials=credentials,
                heartbeat=60,
            )
        )
        channel = connection.channel()

        channel.exchange_declare(
            exchange=_EXCHANGE,
            exchange_type="fanout",
        )

        # Declare the queue (its type must match the already existing queue).
        declared = channel.queue_declare(queue=_QUEUE)
        mq_queue = declared.method.queue

        channel.queue_bind(
            exchange=_EXCHANGE,
            queue=mq_queue,
        )

        # Start consuming.
        channel.basic_consume(
            queue=mq_queue,
            on_message_callback=message_callback,
        )

        print("消费者已启动,等待消息...")
        channel.start_consuming()
    except Exception as e:
        print(f"消费者启动失败: {str(e)}")
        raise
    finally:
        # Release the broker connection however the consumer stops.
        if connection is not None and connection.is_open:
            connection.close()


if __name__ == "__main__":
    start_consumer()