commit 9e62ca18853616642c326acef0cb69d478409783 Author: 朱思南 <15083356+zhu-sinan@user.noreply.gitee.com> Date: Wed Jun 11 10:49:55 2025 +0800 增加多线程版本代码,设置媒体影响力得分,修复bug diff --git a/config.py b/config.py new file mode 100644 index 0000000..97cd586 --- /dev/null +++ b/config.py @@ -0,0 +1,31 @@ +# 生产环境 +MYSQL_HOST_APP = '10.127.2.207' +MYSQL_PORT_APP = 3306 +MYSQL_USER_APP = 'financial_prod' +MYSQL_PASSWORD_APP = 'mmTFncqmDal5HLRGY0BV' +MYSQL_DB_APP = 'reference' + +# 测试环境 +#MYSQL_HOST_APP = '121.37.185.246' +#MYSQL_PORT_APP = 3306 +#MYSQL_USER_APP = 'root' +#MYSQL_PASSWORD_APP = 'Xgf_8000' +#MYSQL_DB_APP = 'reference' + +# 测试环境 +#jx_adr = "http://123.60.153.169:8040/admin/common/sync/news_id/WBysu6N1z26AbA12l" + +# 生产环境 +jx_adr = "http://10.127.2.205:13579/admin/common/sync/news_id/WBysu6N1z26AbA12l" + +mq_user = 'admin' +mq_password = 'Aa123456' + +ES_HOST = "localhost" # 替换为你的 Elasticsearch 主机地址,例如 "localhost" 或 "your_elasticsearch_host" +ES_PORT = 9200 +ES_USER = "elastic" +ES_PASSWORD = "ZxE,3VM@Thk0" + + + + diff --git a/llm_process.py b/llm_process.py new file mode 100644 index 0000000..eeef630 --- /dev/null +++ b/llm_process.py @@ -0,0 +1,74 @@ +# -*- coding: utf-8 -*- +from dashscope import Application +import pika +import json +import re +from config import * + +news1 = "越捷航空(VietJet Aviation JSC)已将空客SE宽体飞机的订单增加一倍,再订购20架a330neo,以支持其未来10年的扩张计划。根据该航空公司的一份声明,20架a330 -900neo飞机的新订单是在法国总统埃马纽埃尔·马克龙访问河内期间签署的。越捷航空董事长阮氏芳邵在声明中说:“我们对现代化、环保的机队的长期投资反映了我们加强越南和法国之间经济和技术联系的承诺。”这笔交易将使越捷积压的空客飞机数量达到近140架。该航空公司还订购了200架波音737 Max飞机。该公司去年订购了20架a330neo,并在新加坡航展上签署了初步协议。每架A330-900neo的标价为3.74亿美元,在不考虑行业惯例折扣的情况下,最新交易的价格为75亿美元。越捷航空拥有115架空客飞机,主要服务于国内航线和中国、泰国、韩国以及澳大利亚、印度和哈萨克斯坦等地区目的地。据彭博亿万富翁指数(Bloomberg Billionaires Index)显示,该航空公司的多数股权由Thao持有,他的身价约为15亿美元。然而,这家越南最大的私营航空公司正面临债权人的麻烦。上个月,该航空公司在伦敦高等法院的一起案件中败诉,并被命令支付约1.8亿美元,用于收购菲茨沃尔特资本有限公司的飞机租赁部门,以解决在新冠肺炎大流行期间开始的长达数年的法律纠纷。上周,越捷航空在上诉期间未能支付法院要求支付的三笔6,050万美元赔偿金中的第一笔。在诉讼过程中,法官对越捷航空无力支付FitzWalter Capital的问题提出了质疑,法官指出,越捷航空仍有能力签署协议,包括购买飞机。ZHXG" + +def get_label(news, source): + def _clean_text(text: str) -> str: + """清理文本,去除HTML标签和特殊字符""" + # 移除HTML标签 + text = re.sub(r'<[^>]+>', '', text) + # 移除特殊字符 + text = re.sub(r'[^\w\s\u4e00-\u9fff]', '', text) + return text.strip() + news = _clean_text(news) + if not news: + return None + biz_params = { + "news_source": source + } + """获取新闻的标签""" + try: + response = Application.call( + api_key="sk-23d05aac75f4458a94fc053d036de6eb", + app_id='024c0273115e4018a451a2393066e8d5',# 替换为实际的应用 ID + prompt=news, + biz_params=biz_params) + result_str = response.output.text.strip() + # 处理返回的字符串,去掉代码块标记 + if result_str.startswith('```json'): + result_str = result_str[7:] + if result_str.endswith('```'): + result_str = result_str[:-3] + result = json.loads(result_str.strip()) + return result if isinstance(result, dict) else {} + except Exception as e: + print(f"百炼工具流打标签失败: {str(e)}") + return None + +def send_mq(news): + # 注意这里的news是一个字典,而且必须包含id + if not isinstance(news, dict) or 'id' not in news: + raise ValueError("news must be a dictionary and contain 'id' key") + try: + # 连接 RabbitMQ + credentials = pika.PlainCredentials(mq_user, mq_password) + connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost', credentials=credentials, heartbeat=300) + ) + channel = connection.channel() + + channel.exchange_declare(exchange='zzck_llm_exchange', exchange_type='fanout') + + # 发送消息 + channel.basic_publish( exchange='zzck_llm_exchange', + routing_key='', + body=json.dumps(news), + properties=pika.BasicProperties( + delivery_mode=2, # 确保消息持久化 + ) + ) + connection.close() + except Exception as e: + print(f"发送消息到MQ失败: {str(e)}") + return + + +if __name__ == "__main__": + # 测试 + print("测试新闻1:") + print(get_label(news1, "彭博社")) diff --git a/main.py b/main.py new file mode 100644 index 0000000..aa93188 --- /dev/null +++ b/main.py @@ -0,0 +1,249 @@ +import subprocess +from bs4 import BeautifulSoup +import datetime +import os +from urllib.parse import urlparse, urljoin +import time +import pymysql +from pathlib import Path +import logging +from apscheduler.schedulers.blocking import BlockingScheduler +from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR +import json +import re +from config import * # 导入配置 +from mqsend import send_message # 导入消息发送函数 +from elasticsearch import Elasticsearch + + +def clean_control_characters(text): + return re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text) + +def do_work(target): + # 创建data目录 + os.makedirs('data', exist_ok=True) + + today = datetime.datetime.today().strftime('%Y-%m-%d') + new_target = f"{target}/{today}/" + try: + sub_result = subprocess.run(['curl', new_target], capture_output=True, text=True) + + if sub_result.returncode == 0: + soup = BeautifulSoup(sub_result.stdout, 'html.parser') + links = soup.find_all('a', href=True) + + for link in links: + href = link['href'] + if href.startswith('/'): + + file_url = f"{target}/{urljoin(new_target, href)}" + # 生成本地保存路径 + local_path = os.path.join('data', href.lstrip('/')) # 添加日期目录层级 + # 创建目录结构 + os.makedirs(os.path.dirname(local_path), exist_ok=True) + # 下载文件 + if not os.path.exists(local_path): + logger.info(f"文件不存在,准备下载: {file_url}") + download_cmd = ['curl', '-s', '-o', local_path, file_url] + dl_result = subprocess.run(download_cmd) + if dl_result.returncode == 0: + logger.info(f"下载成功: {local_path}") + # 同步到MySQL + sync_to_database(local_path) + else: + logger.error(f"下载失败: {file_url}") + + else: + logger.warning(f"忽略非相对路径: {href}") + else: + logger.error(f"获取失败: {sub_result.stderr}") + except Exception as e: + logger.error(f"处理 {new_target} 时发生未知错误: {e}") + + +def write_to_mysql(data_list, local_path): + conn = pymysql.connect( + host=MYSQL_HOST_APP, + port=MYSQL_PORT_APP, + user=MYSQL_USER_APP, + password=MYSQL_PASSWORD_APP, + db=MYSQL_DB_APP, + charset='utf8mb4' + ) + + try: + with conn.cursor() as cursor: + for data in data_list: + # 新增JSON结构解析逻辑 + category_data = data.get('c', [{}]) + category = lang = sourcename = "" + if category_data: + category = category_data[0].get('category', '') + b_data = category_data[0].get('b', [{}]) + if b_data: + lang = b_data[0].get('lang', '') + d_data = b_data[0].get('d', [{}]) + if d_data: + sourcename = d_data[0].get('sourcename', '') + + # 处理特殊字符转义 + en_content = data.get('EN_content',0).replace('quot;', '"') + + # 修改后的SQL语句 + sql = """INSERT INTO news_info + (news_id, input_date, words, title_txt, key_word, + CN_content, EN_content, URL, abstract, title_EN, + category, sourcename, lang, deleted, create_time, + update_time, file_date, file_name) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) + ON DUPLICATE KEY UPDATE + category=VALUES(category), + sourcename=VALUES(sourcename), + lang=VALUES(lang)""" + + values = ( + data['id'], + data.get('input_date', ""), + data.get('words', ""), + data.get('title_txt', ""), + data.get('key_word', ""), + data.get('CN_content', ""), + en_content, # 使用处理后的英文内容 + data.get('URL', ""), + data.get('abstract', ""), + data.get('title_EN', ""), + category, + sourcename, + lang, + 0, + datetime.datetime.now(), + datetime.datetime.now(), + datetime.datetime.strptime(Path(local_path).parent.name, '%Y-%m-%d').date(), + Path(local_path).name + ) + cursor.execute(sql, values) + logger.info(f"成功写入mysql: {data['id']}") + + # + + # 发送消息到MQ + send_message(json.dumps(data)) + conn.commit() + finally: + conn.close() + return True + +# 写入ES +def write_to_es(data_list, local_path): + # 初始化ES连接(添加在文件顶部) + es = Elasticsearch( + [f"http://{ES_HOST}:{ES_PORT}"], # 将协议直接包含在hosts中 + basic_auth=(ES_USER, ES_PASSWORD) + ) + try: + for data in data_list: + # 插入ES核心逻辑 + category_data = data.get('c', [{}]) + category = lang = sourcename = "" + if category_data: + category = category_data[0].get('category', '') + b_data = category_data[0].get('b', [{}]) + if b_data: + lang = b_data[0].get('lang', '') + d_data = b_data[0].get('d', [{}]) + if d_data: + sourcename = d_data[0].get('sourcename', '') + + es.index( + index="news_info", + id=data['id'], + document={ + "news_id": data.get('id', ""), + "input_date": data.get('input_date', ""), + "words": data.get('words', ""), + "title_txt": data.get('title_txt', ""), + "key_word": data.get('key_word', ""), + "CN_content": data.get('CN_content', ""), + "EN_content": data.get('EN_content',0).replace('quot;', '"'), + "URL": data.get('URL', ""), + "abstract": data.get('abstract', ""), + "title_EN": data.get('title_EN', ""), + "category": category, + "sourcename": sourcename, + "lang": lang, + "deleted": "0", + "create_time":datetime.datetime.now(), + "update_time": datetime.datetime.now(), + "file_date": datetime.datetime.strptime(Path(local_path).parent.name, '%Y-%m-%d').date(), + "file_name": Path(local_path).name + } + ) + logger.info(f"成功写入ES文档ID: {data['id']}") + + except Exception as e: + logger.error(f"写入ES失败: {str(e)}") + +def sync_to_database(local_path): + try: + with open(local_path, 'r', encoding='utf-16') as f: # 修改编码方式 + raw_data = f.read() + cleaned_data = clean_control_characters(raw_data) + data_list = json.loads(cleaned_data) + write_to_mysql(data_list, local_path) # 传递本地路径 + write_to_es(data_list, local_path) # 传递本地路径 + + return True + + except Exception as e: + logger.error(f"处理文件 {local_path} 时发生未知错误: {e}") + return False + + + + +# 初始化日志配置(添加在文件顶部) +logger = logging.getLogger() +logger.setLevel(logging.INFO) +formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + +# 文件日志 +file_handler = logging.FileHandler('zzck_sync.log') +file_handler.setFormatter(formatter) +logger.addHandler(file_handler) + +# 控制台日志 +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) +logger.addHandler(console_handler) + +# 调度任务监听器(添加在job函数后) +def scheduler_listener(event): + if event.exception: + logger.error("任务执行失败!", exc_info=event.exception) + else: + logger.info("任务执行完成") + +if __name__ == "__main__": + scheduler = BlockingScheduler() + target = "zzbtw.ckxx.net" # 确保target变量在此处定义 + + # 修改调度任务添加方式 + scheduler.add_job( + do_work, + 'interval', + minutes=1, + id='zzck_sync_job', + args=[target], # 添加参数传递 + max_instances=1 + ) + + # 添加事件监听 + scheduler.add_listener(scheduler_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) + + try: + logger.info("程序启动,调度器开始运行...") + scheduler.start() + except (KeyboardInterrupt, SystemExit): + logger.info("收到停止信号,正在关闭调度器...") + scheduler.shutdown() + logger.info("调度器已关闭") diff --git a/media_score.txt b/media_score.txt new file mode 100644 index 0000000..b52a1f3 --- /dev/null +++ b/media_score.txt @@ -0,0 +1,613 @@ +美联社 998 +路透社 998 +纽约时报 995 +法新社 995 +英国广播公司 990 +华尔街日报 990 +金融时报 985 +华盛顿邮报 970 +经济学人 970 +共同社中文网 970 +日-共同社 970 +德-今日新闻 965 +日-日本广播协会 960 +“自然”杂志 960 +彭博社 960 +德-德国电视二台 950 +联合国官网 950 +法-世界报(法国) 950 +西-埃菲社 950 +半岛电视台 940 +日-日本经济新闻 940 +澳大利亚广播公司 940 +加拿大广播公司 940 +韩联社 930 +华尔街日报中文网 930 +德-明镜(德国) 930 +德-南德意志报 930 +德-法兰克福汇报 920 +俄-塔斯社 920 +塔通社 920 +洛杉矶时报 910 +国际货币基金组织 910 +德-时代周刊(德国) 910 +世界银行 900 +布鲁金斯学会 900 +日-读卖新闻 900 +安莎通讯社 900 +《外交》双月刊 900 +卫报(英国) 900 +哥伦比亚广播公司新闻部 890 +法-法国国际广播电台 890 +NBC新闻网 890 +日-朝日新闻 890 +ABC新闻 890 +泰晤士报 890 +西-国家报(西班牙) 880 +外交关系协会 880 +德-德国之声广播电台 880 +印度报业托拉斯 880 +爱尔兰广播电视台 880 +法-费加罗报 870 +“法国24小时”网站 870 +德-商报(德国) 870 +时事通讯社(日本) 870 +环球邮报 870 +政治报(美国) 860 +日-每日新闻(日本) 860 +卡内基国际和平研究院 860 +海峡时报 860 +德-新苏黎世报 860 +葡-圣保罗报 860 +西-世界报(西班牙) 850 +战略与国际问题研究中心(美国) 850 +全国公共广播电台 850 +新华网(外语) 850 +葡-环球在线网站(巴西) 850 +悉尼先驱晨报 850 +哈佛商业评论 850 +有线电视新闻网 830 +南华早报 830 +兰德公司 830 +大西洋月刊 820 +波士顿环球报 820 +世界贸易组织 820 +每日电讯报(英国) 820 +韩-中央日报 820 +阿纳多卢通讯社 800 +时代周刊(美国) 800 +法-回声报(法国) 800 +法-法国新闻电台 800 +葡-环球新闻电视频道(巴西) 800 +世界经济论坛 800 +外交政策(美国) 800 +韩-朝鲜日报 800 +印度教徒报 800 +法-新闻报(加拿大) 800 +德-世界报(德国) 800 +消费者新闻与商业频道 790 +澳大利亚金融评论报 790 +马新社 790 +西-先锋报(西班牙) 790 +“天空”网站 780 +韩-KBS新闻 780 +西-号角报(阿根廷) 780 +越通社 780 +印度时报 780 +时代报(澳) 770 +韩国中央日报中文网 770 +芝加哥论坛报 760 +安塔拉通讯社 760 +连线(美国) 750 +爱尔兰时报 750 +沙特通讯社 750 +“财富”杂志 750 +亚洲新闻台 750 +亚洲开发银行 750 +法-法兰西西部报 750 +印度快报 750 +法-观点(法国) 730 +西-阿贝赛报 720 +法-快报(法国) 720 +澳大利亚人报 720 +福布斯 720 +临界点(美) 720 +日-TBS电视台 700 +法-新观察家(法国) 700 +新西兰先驱报 700 +东亚日报 700 +大西洋理事会 700 +联合早报 700 +法-商业调频电视台 700 +欧洲新闻台 700 +独立报(英国) 700 +国会山报 700 +麦肯锡全球研究院 700 +多伦多星报 700 +技术纪事网 700 +今日印度 700 +阿克西奥斯新闻网 700 +法-法国欧洲第一广播电台 680 +纽约杂志 680 +葡-公众日报(葡萄牙) 680 +韩-韩国日报 680 +商业时报(新加坡) 680 +每日新闻报(瑞士) 680 +福布斯中文网 680 +法-晚报(比利时) 680 +世界报业辛迪加 680 +法-巴黎人报 650 +德-每日镜报 650 +德-新闻报(奥地利) 650 +今日美国 650 +“对话”网站(澳大利亚) 650 +俄-俄新社 650 +卡塔尔新闻社 600 +波士顿新闻网 600 +巴基斯坦联合通讯社 600 +黎明报 600 +韩-韩民族日报 580 +德-焦点(德国) 580 +星洲日报(马来) 580 +雅加达邮报 580 +外交学者 580 +全国邮报 580 +印度商界报 580 +商业旗帜报 580 +韩-京乡新闻 560 +日-产经新闻 550 +瘾科技 550 +菲律宾每日询问者报 550 +胡佛研究所 550 +欧盟委员会 550 +俄-国际文传电讯社(俄罗斯) 550 +法-时报(瑞士) 550 +市场观察网站 550 +西-宇宙报(墨西哥) 550 +MSNBC新闻 550 +韩国先驱报 550 +论坛快报 550 +曼谷邮报 530 +新美国组织 530 +西-机密报 530 +国际新闻(巴基斯坦) 530 +韩国时报 530 +赫芬顿邮报 520 +石英财经网站 520 +日-东洋经济网站 520 +金融快报(印度) 520 +菲律宾星报 520 +耶路撒冷邮报 520 +法-青年非洲(法国) 520 +法-十字架报(法国) 520 +海湾新闻(阿联酋) 520 +联合国减少灾害风险办公室 500 +日本时报 500 +基督教科学箴言报 500 +明特报 500 +新闻周刊 500 +民族日报(肯尼亚) 500 +法-蒙特利尔日报 500 +布鲁盖尔研究所 500 +日-东京新闻 500 +《时代》周刊(印尼) 500 +西-每日新闻网站(西班牙) 500 +法-自由比利时报 500 +西-日报(西班牙) 500 +新美国安全中心 500 +法-日内瓦论坛报 500 +德-新闻电视频道 500 +旁观者(英) 500 +俄-生意人报 500 +德-法兰克福评论报 500 +南洋理工大学拉惹勒南国际研究院 490 +商业内幕网站(美国) 480 +法-星期日报 480 +西-千年报(墨西哥) 480 +法-论坛报(法国) 480 +圣何塞信使新闻(美国) 480 +墨卡托中国研究所 480 +美国新闻中心 480 +渥太华公民报 480 +尤索夫伊萨东南亚研究院 480 +西-欧洲新闻社 480 +科学新闻网站 480 +抨击报(尼日利亚) 480 +西-商报(秘鲁) 480 +西-加泰罗尼亚报 480 +点名 480 +每日报(希腊) 480 +星报(马来西亚) 460 +旗帜报(肯尼亚) 460 +西-至上报(墨西哥) 460 +英国发展研究所 460 +亚洲时报 460 +自由时报 460 +印度斯坦时报 450 +德-明星(德国) 450 +洛伊研究所 450 +航空周刊 450 +卫报(尼日利亚) 450 +国家利益 450 +台北时报 450 +韩-文化日报(韩国) 450 +德-“侧面”杂志 450 +海外发展研究所 450 +中时新闻网 450 +西-西班牙人报 450 +日-钻石周刊 450 +东非人报 450 +葡-观察者报(葡萄牙) 450 +“货币控制”网站 450 +西-布宜诺斯艾利斯经济新闻网 430 +环球报(以色列) 430 +海湾时报 430 +印度展望网站 430 +集锦报 430 +西-经济学家报(西班牙) 430 +法-队报(法国) 420 +国家安全研究所(以色列) 420 +西-第三版时报(智利) 420 +法-巴黎竞赛画报 420 +新海峡时报 420 +“印刷”网站 420 +俄-导报(俄罗斯) 420 +美国进步研究中心 420 +堪培拉时报 420 +法-国际报刊 400 +国民报(阿联酋) 400 +德-德国日报 400 +俄-俄罗斯商业咨询日报 400 +GMA电视网 400 +纽约每日新闻 400 +德国马歇尔基金会 400 +法-玛丽安娜 400 +俄-新报 400 +VietnamPlus新闻网 400 +亚洲技术网 400 +布里斯班时报 400 +德干先驱报 400 +法-自由南方报 400 +“战争地带”网站 400 +韩-国民日报(韩国) 400 +美国白宫 400 +韩-首尔新闻 400 +德-《资本》月刊 400 +法-法国国际关系与战略研究所 390 +国际可持续发展研究所 390 +西-皇家埃尔卡诺研究所 390 +全球能源政策中心 390 +一周周刊(美国) 380 +观察家研究基金会 380 +德-商报(瑞士) 380 +阿拉伯新闻网 380 +德-德国外交关系协会 380 +国际发展中心 380 +非洲首页报 380 +全非新闻网 380 +韩-韩国世宗研究所 380 +省报(加拿大) 380 +民族报(泰国) 380 +联合国大学 380 +“内幕新闻”网站 380 +日-现代商业网站 380 +加德满都邮报 380 +全球发展中心 380 +国际环境与发展研究所 370 +传统基金会 360 +莱斯大学贝克公共政策研究所 360 +东亚论坛 360 +马尼拉时报 360 +政治家报(印度) 360 +海湾时报(卡塔尔) 360 +电子时报(台湾) 360 +澳大利亚国际事务研究所 360 +英为财情 350 +英国经济政策研究中心 350 +《国家报》(巴基斯坦) 350 +日本商业新闻网站 350 +乌克兰通讯社 350 +葡-“权力360”新闻网站(巴西) 350 +一周周刊(印度) 350 +马来邮报(马来西亚) 350 +预算与政策优先中心 350 +新景报 350 +西-宇宙报(厄瓜多尔) 350 +布宜诺斯艾利斯时报 350 +德干纪事报 350 +法-科学与生活(法国) 350 +德-柏林晨邮报 350 +亚洲协会 350 +亚洲经济(韩国) 350 +德-慕尼黑信使报 330 +先锋报(印度) 320 +西-公众报(西班牙) 320 +华盛顿月刊 320 +自由日报(土耳其) 320 +西-金融家报(墨西哥) 300 +商业纪录报 300 +天才男孩报道网站 300 +法-进步报(法国) 300 +合众社 300 +每日邮报(英) 300 +德-维也纳日报 300 +德-西塞罗在线 300 +德-皇冠报(奥地利) 300 +中国日报 300 +“嗡嗡喂”网站 300 +韩-世界日报(韩国) 300 +星期日旗帜报 300 +菲律宾通讯社 300 +大众科学(美国) 300 +俄-哈萨克国际通讯社 290 +美国研究中心 290 +欧洲政策中心 290 +肯尼亚通讯社 280 +全国经济研究所 280 +《国际财经日报》(美) 280 +俄-莫斯科共青团员报 280 +俄-独立报(俄罗斯) 280 +加利福尼亚公共政策研究所 280 +昆色尔 280 +全国经济与社会研究所 280 +南非国际事务研究所 280 +华盛顿时报 280 +俄-俄罗斯连塔网 280 +阿基新闻社 280 +星期日邮报(英国) 280 +“雷迪夫”网站 280 +贺维研究所 280 +麦克唐纳—劳里尔研究所 270 +印度国际经济关系研究会 270 +曼哈顿政策研究所 260 +西-宇宙报(委内瑞拉) 260 +印度世界事务委员会 260 +落基山研究所 260 +阿塞拜疆国家新闻社 260 +俄-消息报(俄罗斯) 260 +纽约邮报 260 +全球政策 260 +日-经济产业研究所 260 +国际财经日报(英国) 260 +越南新闻报 250 +马诺哈尔·帕里卡尔国防分析研究所 250 +美国商务部 250 +布鲁塞尔时报 250 +俄-论据与事实 250 +福克斯新闻网 250 +美国MSN网站 250 +日-发展中国家经济研究所 250 +约旦时报 250 +爱沙尼亚公共广播公司 250 +俄-亚美尼亚通讯社 250 +镜报(英国) 250 +进步政策研究所 250 +太阳日报(马来西亚) 250 +阿卜杜拉国王石油研究中心 250 +全球主义者 250 +俄-“专家”杂志 250 +星条旗报 240 +巴基斯坦观察家报 240 +俄-白通社 240 +亚洲时代(印度) 240 +新经济思维研究所 240 +阿曼时报 240 +西-彩色ABC 240 +可持续发展与国际关系研究所 240 +海湾日报 240 +金边邮报 230 +公民报(坦桑尼亚) 230 +欧洲之友协会 230 +德-星期五 230 +石油价格网站 230 +每日时报(巴基斯坦) 230 +“尼泊尔在线”新闻网站 230 +埃及每日新闻报 230 +共同梦想 230 +美国信息技术与创新基金会 220 +金融快报(孟加拉国) 220 +拉美社 220 +今日巴基斯坦报 220 +科威特时报 220 +西-新闻报(巴拿马) 220 +西-拉美社 220 +冲绳时报 220 +非洲经济转型中心 220 +牙买加观察家报 220 +俄-俄罗斯日报 220 +俄-俄罗斯报 220 +法-最近一小时报 200 +世界新闻网 200 +日-日本首相官邸 200 +朝鲜新闻网 200 +马来亚商业洞察 200 +福克斯商业新闻电视台 200 +环球电讯社(美国) 200 +矿业信息网 200 +加纳头条网 200 +俄-共青团真理报 200 +人民报(越南) 200 +科技时代网 200 +安第斯通讯社 200 +《契约》杂志 200 +可持续发展政策研究所 200 +里贾纳先驱邮报 200 +气候与能源解决方案中心 190 +趋势通讯社 190 +马斯喀特日报 190 +詹姆斯敦基金会 190 +星期五时报(巴基斯坦) 190 +商业与金融时报 190 +约旦新闻报 190 +印度防务评论 190 +《美国保守派》双月刊 190 +今日埃及 190 +非洲经济研究联盟 180 +华盛顿观察家报 180 +晨报(冰岛) 180 +俄-世界经济与国际关系 180 +斯塔布鲁克报 180 +葡-巴西247新闻网 180 +亚洲防务评论 180 +美国司法部 180 +阿曼观察家报 180 +“罗博报告”网站 180 +“本地”新闻网(瑞典) 180 +蒙古通讯社 180 +韩-韩国科学技术企划评价院 180 +每日快报 180 +埃塞俄比亚通讯社 180 +新西兰倡议组织 170 +每日新闻(坦桑尼亚) 170 +今日新闻报(塔吉) 170 +俄-“亚洲快讯”通讯社(塔吉) 170 +俄-哈萨克斯坦今日通讯社 170 +俄-列格努姆通讯社 170 +卢萨卡时报 170 +俄-乌兹别克斯坦通讯社 170 +法尔斯通讯社 160 +俄-“侧面”周刊(俄罗斯) 160 +新经济基金会 160 +金融镜报 160 +亚当·斯密研究所 160 +文莱第一新闻网 160 +老挝新闻网 160 +巴勒斯坦通讯社 160 +西-自由与发展研究中心 160 +西-马卡报 150 +莫斯科时报 150 +“电信大全”网站 150 +弗里德里希·瑙曼基金会 150 +巴拿马美洲报 150 +欧亚时报网站 150 +美国在线公司 150 +德-新德意志报 150 +伊马尼政策与教育中心 150 +南非政府新闻署 150 +俄-哈萨克斯坦实业报 150 +人民军队报(越南) 150 +事实(乌克兰) 150 +古巴新闻社 150 +德黑兰时报 140 +独立研究所 140 +现代外交 140 +新兴尼泊尔报 140 +富士晚刊 140 +独立报(马耳他) 140 +赞比亚时报 140 +全球建设评论 140 +国民报(巴新) 140 +法-博爱晨报 140 +哥本哈根邮报 140 +新闻报(洪都拉斯) 130 +国际日报(印尼) 130 +俄-乌拉网 130 +今日格鲁吉亚 130 +捍卫公众利益组织 130 +信使邮报(巴布亚) 130 +“进步”组织 130 +边疆邮报(巴基斯坦) 130 +赫尔辛基时报 130 +波罗的海时报 130 +法-新闻报(突尼斯) 130 +“荷兰新闻”网站 120 +荷兰时报 120 +国家法律评论 120 +法-马赛曲报 120 +西-最新消息报(委内瑞拉) 120 +“珍珠与刺激”网站 120 +公平观察者 120 +维韦卡南达国际基金会 120 +太平洋研究学会 110 +泰格新闻网 110 +以色列全国新闻网站 110 +印度叙事网站 110 +“反击”杂志 110 +阿斯塔纳时报 100 +“环法视野”网站 100 +路德维希·冯·米塞斯研究所 100 +经济教育基金会 100 +柬中时报 100 +阿克顿宗教与自由研究所 100 +竞争企业协会 100 +俄-国际事务(俄罗斯) 100 +喀麦隆论坛报 100 +俄-哈萨克斯坦真理报 100 +市政厅网站(美国) 100 +《Wedge》月刊电子版 100 +南亚杂志 90 +俄-劳动报(俄罗斯) 90 +克罗地亚综合新闻 90 +自由市场基金会 90 +法-信使报 90 +斯洛文尼亚时报 90 +万象时报 90 +伊拉克财经新闻 90 +“Scheerpost”网站 90 +地拉那时报 90 +俄-“自由媒体”网站 90 +俄-阿维斯塔通讯社(塔吉) 90 +华沙之声 80 +俄-详实网(乌兹) 80 +国民先驱报(印) 80 +联邦政府纪事 80 +圭亚那纪事报 80 +老挝时报 80 +法-现代价值 80 +每日新闻(博茨瓦纳) 80 +巴拿马新闻中心 80 +《环球时报》英文版 80 +乌兰巴托邮报 80 +哥斯达黎加新闻网 80 +松迪亚塔邮报 80 +俄-议会报 80 +塔斯尼姆通讯社 80 +葡-“其他声音”网站(巴西) 70 +斐济太阳报 70 +俄-大篷车 70 +哈特兰研究所 70 +欧洲标准化委员会 70 +俄-时事评论网 70 +”新闻是我的事“网站 70 +俄-观点报 70 +叙利亚通讯社 70 +PSM新闻 60 +俄-今日白俄罗斯报 60 +阿尔及利亚时报 60 +缅甸通讯社 60 +科布登中心 60 +俄-俄罗斯百年网 60 +论点 60 +东方真理报(乌兹) 60 +俄-军事观察网 60 +俄-欧亚专家网 60 +金融监管局 60 +乌兹别克斯坦日报 60 +法-埃及进步报 60 +缅甸环球新光报 50 +俄-晨报 50 +文莱新闻网 50 +发展报(马里) 50 +阿尔加维每日新闻网站 50 +多哥新闻报 50 +先锋报(尼日) 50 +不结盟运动新闻网 50 +革新 50 +世界中的巴基斯坦 40 +俄-红星电视台 40 +凤凰星报 40 +美国之音中文网 40 +美国民主党全国委员会 30 +俄-今日俄罗斯电视台 30 +西-格拉玛报 30 +俄-东方网(土库) 30 +世界社会主义网站 30 +参议院民主党 30 +自由亚洲电台(美) 25 +俄罗斯卫星通讯社中文网 25 +卫星通讯社 25 +喀布尔时报 20 +德-我们的时代 20 +布赖特巴特新闻网 20 +推特 10 +其他 5 diff --git a/mqreceive.py b/mqreceive.py new file mode 100644 index 0000000..99f13f1 --- /dev/null +++ b/mqreceive.py @@ -0,0 +1,178 @@ +import pika +import json +import logging +import time +import os +from config import * +from llm_process import send_mq, get_label + +# 声明一个全局变量,存媒体的权威度打分 +media_score = {} +with open("media_score.txt", "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + media, score = line.split("\t") + media_score[media.strip()] = int(score) + except ValueError as e: + print(f"解析错误: {e},行内容: {line}") + continue + +# 幂等性存储 - 记录已处理消息ID +processed_ids = set() + +def message_callback(ch, method, properties, body): + """消息处理回调函数""" + + + try: + start_time = time.time() + data = json.loads(body) + id_str = str(data["id"]) + input_date = data["input_date"] + print(id_str, input_date) + + # ch.basic_ack(delivery_tag=method.delivery_tag) + # print(f"接收到消息: {id_str}") + # return + + # 幂等性检查:如果消息已处理过,直接确认并跳过 + if id_str in processed_ids: + print(f"跳过已处理的消息: {id_str}") + ch.basic_ack(delivery_tag=method.delivery_tag) + return + + # 在此处添加业务处理逻辑 + content = data.get('CN_content', "").strip() + source = "其他" + category_data = data.get('c', [{}]) + category = "" + if category_data: + category = category_data[0].get('category', '') + b_data = category_data[0].get('b', [{}]) + if b_data: + d_data = b_data[0].get('d', [{}]) + if d_data: + source = d_data[0].get('sourcename', "其他") + source_impact = media_score.get(source, 5) + tagged_news = get_label(content, source) + public_opinion_score = tagged_news.get("public_opinion_score", 30) #资讯质量分 + China_factor = tagged_news.get("China_factor", 0.2) #中国股市相关度 + news_score = source_impact * 0.04 + public_opinion_score * 0.25 + China_factor * 35 + news_score = round(news_score, 2) + #如果想让分数整体偏高可以开根号乘10 + #news_score = round((news_score**0.5) * 10.0, 2) + + industry_confidence = tagged_news.get("industry_confidence", []) + industry_score = list(map(lambda x: round(x * news_score, 2), industry_confidence)) + concept_confidence = tagged_news.get("concept_confidence", []) + concept_score = list(map(lambda x: round(x * news_score, 2), concept_confidence)) + + tagged_news["source"] = source + tagged_news["source_impact"] = source_impact + tagged_news["industry_score"] = industry_score + tagged_news["concept_score"] = concept_score + tagged_news["news_score"] = news_score + tagged_news["id"] = id_str + + print(json.dumps(tagged_news, ensure_ascii=False)) + + # 发送百炼大模型标注过的新闻json到队列 + send_mq(tagged_news) + + # 处理成功后记录消息ID + processed_ids.add(id_str) + if len(processed_ids) > 10000: + processed_ids.clear() + + # 手动确认消息 + ch.basic_ack(delivery_tag=method.delivery_tag) + duration = time.time() - start_time + print(id_str + "\t" + "duration: {:.3f}".format(duration)) + except Exception as e: + print(f"消息处理失败: {str(e)}") + # 拒绝消息, 不重新入队 + ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) + +def create_connection(): + """创建并返回RabbitMQ连接""" + credentials = pika.PlainCredentials(mq_user, mq_password) + return pika.BlockingConnection( + pika.ConnectionParameters( + host="localhost", + credentials=credentials, + heartbeat=600, + connection_attempts=3, + retry_delay=5 # 重试延迟5秒 + ) + ) + +def start_consumer(): + """启动MQ消费者""" + while True: # 使用循环而不是递归,避免递归深度问题 + try: + connection = create_connection() + channel = connection.channel() + + # 设置QoS,限制每次只取一条消息 + channel.basic_qos(prefetch_count=50) + + channel.exchange_declare( + exchange="zzck_exchange", + exchange_type="fanout" + #durable=True # 确保交换器持久化 + ) + + # 声明持久化队列 + res = channel.queue_declare( + queue="to_ai" + # durable=True # 队列持久化 + ) + + mq_queue = res.method.queue + channel.queue_bind( + exchange="zzck_exchange", + queue=mq_queue, + ) + + # 启动消费,关闭自动ACK + channel.basic_consume( + queue=mq_queue, + on_message_callback=message_callback, + auto_ack=False # 关闭自动确认 + ) + + print("消费者已启动,等待消息...") + channel.start_consuming() + + except pika.exceptions.ConnectionClosedByBroker: + # 代理主动关闭连接,可能是临时错误 + print("连接被代理关闭,将在5秒后重试...") + time.sleep(5) + except pika.exceptions.AMQPConnectionError: + # 连接错误 + print("连接失败,将在10秒后重试...") + time.sleep(10) + except KeyboardInterrupt: + print("消费者被用户中断") + try: + if connection and connection.is_open: + connection.close() + except: + pass + break + except Exception as e: + print(f"消费者异常: {str(e)}") + print("将在15秒后重试...") + time.sleep(15) + finally: + try: + if connection and connection.is_open: + connection.close() + except: + pass + +if __name__ == "__main__": + start_consumer() diff --git a/mqreceive_multithread.py b/mqreceive_multithread.py new file mode 100644 index 0000000..a4dd016 --- /dev/null +++ b/mqreceive_multithread.py @@ -0,0 +1,264 @@ +import pika +import json +import logging +import time +import os +import threading +from concurrent.futures import ThreadPoolExecutor +from queue import Queue +from config import * +from llm_process import send_mq, get_label + +# 声明一个全局变量,存媒体的权威度打分 +media_score = {} +with open("media_score.txt", "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + media, score = line.split("\t") + media_score[media.strip()] = int(score) + except ValueError as e: + print(f"解析错误: {e},行内容: {line}") + continue + +# 幂等性存储 - 记录已处理消息ID (使用线程安全的集合) +processed_ids = set() +processed_ids_lock = threading.Lock() # 用于同步对processed_ids的访问 + +# 创建消息队列用于批量处理 +message_queue = Queue() +BATCH_SIZE = 24 # 每批处理的消息数量 +MAX_WORKERS = 24 # 线程池最大工作线程数 +MIN_BATCH_SIZE = 12 # 最小批量处理消息数量 +PROCESS_INTERVAL = 10 # 处理间隔(秒) + +def process_single_message(data): + """处理单条消息的业务逻辑""" + try: + id_str = str(data["id"]) + input_date = data["input_date"] + # print(id_str + "\t" + str(input_date)) + + # 幂等性检查 + with processed_ids_lock: + if id_str in processed_ids: + print(f"跳过已处理的消息: {id_str}") + return None, True # 返回None表示不需要发送,True表示已处理 + # 先标记为已处理,防止重复 + processed_ids.add(id_str) + if len(processed_ids) > 10000: + processed_ids.clear() + + content = data.get('CN_content', "").strip() + source = "其他" + category_data = data.get('c', [{}]) + category = "" + if category_data: + category = category_data[0].get('category', '') + b_data = category_data[0].get('b', [{}]) + if b_data: + d_data = b_data[0].get('d', [{}]) + if d_data: + source = d_data[0].get('sourcename', "其他") + source_impact = media_score.get(source, 5) + tagged_news = get_label(content, source) + public_opinion_score = tagged_news.get("public_opinion_score", 30) #资讯质量分 + China_factor = tagged_news.get("China_factor", 0.2) #中国股市相关度 + news_score = source_impact * 0.04 + public_opinion_score * 0.25 + China_factor * 35 + news_score = round(news_score, 2) + + industry_confidence = tagged_news.get("industry_confidence", []) + industry_score = list(map(lambda x: round(x * news_score, 2), industry_confidence)) + concept_confidence = tagged_news.get("concept_confidence", []) + concept_score = list(map(lambda x: round(x * news_score, 2), concept_confidence)) + + # 确保最终展示的分数是两位小数 + industry_confidence = list(map(lambda x: round(x, 2), industry_confidence)) + concept_confidence = list(map(lambda x: round(x, 2), concept_confidence)) + + tagged_news["source"] = source + tagged_news["source_impact"] = source_impact + tagged_news["industry_score"] = industry_score + tagged_news["concept_score"] = concept_score + tagged_news["news_score"] = news_score + tagged_news["id"] = id_str + + #print(json.dumps(tagged_news, ensure_ascii=False)) + print(tagged_news["id"], tagged_news["title"], tagged_news["news_score"], tagged_news["industry_label"], input_date) + return tagged_news, True + + except Exception as e: + print(f"处理消息时出错: {str(e)}") + # 处理失败,从已处理集合中移除 + with processed_ids_lock: + if id_str in processed_ids: + processed_ids.remove(id_str) + return None, False + +def process_message_batch(batch): + start_time = time.time() + """并行处理一批消息""" + results = [] + # 使用线程池并行处理 + with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + futures = [] + for data in batch: + futures.append(executor.submit(process_single_message, data)) + + for future in futures: + try: + result, success = future.result() + if result: + results.append(result) + except Exception as e: + print(f"处理消息时发生异常: {str(e)}") + + # 发送处理结果到MQ + for result in results: + try: + send_mq(result) + except Exception as e: + print(f"发送消息到MQ失败: {str(e)}") + + duration = time.time() - start_time + print(f"批量处理 {len(batch)} 条消息, 耗时: {duration:.2f}s, " + f"平均: {duration/len(batch):.3f}s/条") + + +def message_callback(ch, method, properties, body): + """消息处理回调函数(只负责入队)""" + try: + data = json.loads(body) + # 将消息和delivery_tag一起放入队列 + message_queue.put((data, method.delivery_tag)) + except Exception as e: + print(f"消息处理失败: {str(e)}") + # 拒绝消息, 不重新入队 + ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) + +def create_connection(): + """创建并返回RabbitMQ连接""" + credentials = pika.PlainCredentials(mq_user, mq_password) + return pika.BlockingConnection( + pika.ConnectionParameters( + host="localhost", + credentials=credentials, + heartbeat=600, + connection_attempts=3, + retry_delay=5 # 重试延迟5秒 + ) + ) + +def start_consumer(): + """启动MQ消费者(批量版本)""" + while True: + try: + connection = create_connection() + channel = connection.channel() + + # 设置QoS,一次预取足够数量的消息 + channel.basic_qos(prefetch_count=BATCH_SIZE * 3) + + channel.exchange_declare( + exchange="zzck_exchange", + exchange_type="fanout" + ) + + # 声明队列 + res = channel.queue_declare(queue="to_ai") + # res = channel.queue_declare(queue='', exclusive=True) + mq_queue = res.method.queue + channel.queue_bind( + exchange="zzck_exchange", + queue=mq_queue, + ) + + # 启动消费,关闭自动ACK + channel.basic_consume( + queue=mq_queue, + on_message_callback=message_callback, + auto_ack=False + ) + + print(f"消费者已启动,批量大小: {BATCH_SIZE}, 工作线程: {MAX_WORKERS}, 等待消息...") + + last_process_time = time.time() + # 主循环 + while True: + # 处理网络事件 + connection.process_data_events(time_limit=0.1) # 非阻塞处理 + + current_time = time.time() + + queue_size = message_queue.qsize() + # 双重触发机制:达到批量大小或超过处理间隔 + if queue_size >= BATCH_SIZE or \ + (current_time - last_process_time >= PROCESS_INTERVAL and queue_size >= MIN_BATCH_SIZE): + + batch = [] + delivery_tags = [] + # 获取一批消息(最多BATCH_SIZE条) + while not message_queue.empty() and len(batch) < BATCH_SIZE: + data, delivery_tag = message_queue.get() + batch.append(data) + delivery_tags.append(delivery_tag) + + if batch: + # 处理批量消息 + process_message_batch(batch) + + # 确认消息 + for tag in delivery_tags: + channel.basic_ack(tag) + + last_process_time = current_time + + # 如果队列很小但等待时间过长,确保不会永远不处理 + elif current_time - last_process_time >= PROCESS_INTERVAL * 5 and queue_size > 0: + # 处理剩余的所有消息 + batch = [] + delivery_tags = [] + while not message_queue.empty(): + data, delivery_tag = message_queue.get() + batch.append(data) + delivery_tags.append(delivery_tag) + + if batch: + process_message_batch(batch) + for tag in delivery_tags: + channel.basic_ack(tag) + last_process_time = current_time + + # 检查连接是否关闭 + if not connection or connection.is_closed: + break + + except pika.exceptions.ConnectionClosedByBroker: + print("连接被代理关闭,将在5秒后重试...") + time.sleep(5) + except pika.exceptions.AMQPConnectionError: + print("连接失败,将在10秒后重试...") + time.sleep(10) + except KeyboardInterrupt: + print("消费者被用户中断") + try: + if connection and connection.is_open: + connection.close() + except: + pass + break + except Exception as e: + print(f"消费者异常: {str(e)}") + print("将在15秒后重试...") + time.sleep(15) + finally: + try: + if connection and connection.is_open: + connection.close() + except: + pass + +if __name__ == "__main__": + start_consumer() diff --git a/mqreceivefromllm.py b/mqreceivefromllm.py new file mode 100644 index 0000000..02574f3 --- /dev/null +++ b/mqreceivefromllm.py @@ -0,0 +1,256 @@ +# 获取 标签的消息 写入数据库 +import pika +import json +import logging, time +from config import * +import pymysql +from elasticsearch import Elasticsearch +import datetime +import requests + +def message_callback(ch, method, properties, body): + """消息处理回调函数""" + try: + data = json.loads(body) + news_score = data.get('news_score', -1) + if news_score < 0: + ch.basic_ack(delivery_tag=method.delivery_tag) + return + # 在此处添加业务处理逻辑 写入mysql数据库 + write_to_mysql(data) + # 数据写入es + write_to_es(data) + + # 数据写入资讯精选表 + write_to_news(data) + + + + # 手动确认消息 + ch.basic_ack(delivery_tag=method.delivery_tag) + except Exception as e: + print(f"消息处理失败: {str(e)}") + # 拒绝消息并重新入队 + ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) + +def write_to_news(data): + news_score = data.get('news_score', 0.0) + if float(news_score) < 80: # 过滤掉news_score小于80的消息 + return + + # 获取返回数据里面的 新闻id + news_id = data.get('id', "") + adr = jx_adr.replace("news_id", news_id) + print(f"接口地址为{adr}") + response = requests.get(adr) + if response.status_code != 200: + print(f"新闻id:{news_id} 得分:{news_score}, 调用精选接口失败, 错误码:{response.status_code}") + return + print(f"新闻id:{news_id} 得分:{news_score}, 调用精选接口成功") + + + +def write_to_es(data): + """写入ES""" + # 初始化ES连接(添加在文件顶部) + es = Elasticsearch( + [f"http://{ES_HOST}:{ES_PORT}"], # 将协议直接包含在hosts中 + basic_auth=(ES_USER, ES_PASSWORD) + ) + news_id = data.get('id', "") + es.update( + index="news_info", + id=news_id, + doc={ + "news_tags": { + "id": news_id, + "abstract": data.get('abstract', ""), + "title": data.get('title', ""), + "rewrite_content": data.get('rewrite_content', ""), + "industry_label": json.dumps(data.get('industry_label', [])), + "industry_confidence": data.get('industry_confidence', []), + "industry_score": data.get('industry_score', ""), + "concept_label": data.get('concept_label', []), + "concept_confidence": data.get('concept_confidence', []), + "concept_score": data.get('concept_score', []), + "public_opinion_score": data.get('public_opinion_score', 10), + "China_factor": data.get('China_factor', 0.1), + "source": data.get('source', "其他"), + "source_impact": data.get('source_impact', 5), + "news_score": data.get('news_score', 0.0), + "news_id": news_id, + "deleted": '0', + "create_time": datetime.datetime.now(), + "update_time": datetime.datetime.now() + } + } + ) + print(f"news_id:{news_id} 得分:{data.get('news_score', 0.0)}, 写入ES成功") + + + + +def write_to_mysql(data): + conn = pymysql.connect( + host=MYSQL_HOST_APP, + port=MYSQL_PORT_APP, + user=MYSQL_USER_APP, + password=MYSQL_PASSWORD_APP, + db=MYSQL_DB_APP, + charset='utf8mb4' + ) + try: + with conn.cursor() as cursor: + # 新增JSON结构解析逻辑 + # 修改后的SQL语句 + sql = """INSERT INTO news_tags + (abstract, title, rewrite_content, industry_label, industry_confidence, industry_score, concept_label, concept_confidence, concept_score, public_opinion_score, China_factor, source, source_impact, news_score, news_id) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) """ + + values = (data.get('abstract', ""), + data.get('title', ""), + data.get('rewrite_content', ""), + json.dumps(data.get('industry_label', [])), + json.dumps(data.get('industry_confidence', [])), + json.dumps(data.get('industry_score', [])), + json.dumps(data.get('concept_label', [])), + json.dumps(data.get('concept_confidence', [])), + json.dumps(data.get('concept_score', [])), + data.get('public_opinion_score', 10), + data.get('China_factor', 0.1), + data.get('source', "其他"), + data.get('source_impact', 5), + data.get('news_score', 0.0), + data.get('id', "") + ) + cursor.execute(sql, values) + conn.commit() + abstract = data.get('abstract', "") + print(f"{abstract}, 写入news_tags 表成功") + + except Exception as e: + print(f"写入news_tags失败: {str(e)}") + finally: + conn.close() + return True + +def create_connection(): + """创建并返回RabbitMQ连接""" + credentials = pika.PlainCredentials(mq_user, mq_password) + return pika.BlockingConnection( + pika.ConnectionParameters( + host="localhost", + credentials=credentials, + heartbeat=600, + connection_attempts=3, + retry_delay=5 # 重试延迟5秒 + ) + ) + +def start_consumer(): + """启动MQ消费者""" + while True: # 使用循环而不是递归,避免递归深度问题 + try: + connection = create_connection() + channel = connection.channel() + + # 设置QoS,限制每次只取一条消息 + channel.basic_qos(prefetch_count=1) + + channel.exchange_declare( + exchange="zzck_llm_exchange", + exchange_type="fanout" + ) + + # 声明持久化队列 + res = channel.queue_declare( + queue="from_ai_to_mysql" + ) + + mq_queue = res.method.queue + channel.queue_bind( + exchange="zzck_llm_exchange", + queue=mq_queue, + ) + + # 启动消费,关闭自动ACK + channel.basic_consume( + queue=mq_queue, + on_message_callback=message_callback, + auto_ack=False # 关闭自动确认 + ) + + print("消费者已启动,等待消息...") + channel.start_consuming() + + except pika.exceptions.ConnectionClosedByBroker: + # 代理主动关闭连接,可能是临时错误 + print("连接被代理关闭,将在5秒后重试...") + time.sleep(5) + except pika.exceptions.AMQPConnectionError: + # 连接错误 + print("连接失败,将在10秒后重试...") + time.sleep(10) + except KeyboardInterrupt: + print("消费者被用户中断") + try: + if connection and connection.is_open: + connection.close() + except: + pass + break + except Exception as e: + print(f"消费者异常: {str(e)}") + print("将在15秒后重试...") + time.sleep(15) + finally: + try: + if connection and connection.is_open: + connection.close() + except: + pass + +# def start_consumer(): +# """启动MQ消费者""" +# try: +# credentials = pika.PlainCredentials(mq_user, mq_password) +# connection = pika.BlockingConnection( +# pika.ConnectionParameters( +# host="localhost", +# credentials=credentials, +# heartbeat=600 +# ) +# ) +# channel = connection.channel() +# channel.exchange_declare( +# exchange="zzck_exchange", +# exchange_type="fanout", +# ) + +# # 声明队列(匹配现有队列类型) queue 的名字可以自定义 +# res = channel.queue_declare( +# queue="from_ai_to_mysql" +# ) + +# mq_queue = res.method.queue +# channel.queue_bind( +# exchange="zzck_llm_exchange", +# queue=mq_queue, +# ) + +# # 启动消费 +# channel.basic_consume( +# queue=mq_queue, +# on_message_callback=message_callback, + +# ) + +# print("消费者已启动,等待消息...") +# channel.start_consuming() + +# except Exception as e: +# print(f"消费者启动失败: {str(e)}") +# start_consumer() + +if __name__ == "__main__": + start_consumer() diff --git a/mqsend.py b/mqsend.py new file mode 100644 index 0000000..cdfb3b7 --- /dev/null +++ b/mqsend.py @@ -0,0 +1,26 @@ +import pika +from config import * + + + +def send_message(message): + # 连接 RabbitMQ + credentials = pika.PlainCredentials(mq_user, mq_password) + connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost', credentials=credentials) + ) + channel = connection.channel() + + channel.exchange_declare(exchange='zzck_exchange', exchange_type='fanout') + # 声明队列 + # channel.queue_declare(queue=mq_queue) + + # 发送消息 + channel.basic_publish( exchange='zzck_exchange', + routing_key='', + body=message, + properties=pika.BasicProperties( + expiration='40000' # 消息40秒后过期 + ) + ) + connection.close() diff --git a/start.sh b/start.sh new file mode 100644 index 0000000..a0d14d4 --- /dev/null +++ b/start.sh @@ -0,0 +1,10 @@ +#!/bin/bash +# 后台启动脚本 +cd /root/zzck/ +echo "正在杀死所有包含main.py或mqreceive_multithread.py或mqreceivefromllm.py的进程" +ps aux | grep -E 'python.*(main|mqreceive_multithread|mqreceivefromllm)\.py' | awk '{print $2}' | xargs kill -9 +echo "已杀死所有包含main.py或mqreceive_multithread.py或mqreceivefromllm.py的进程" +nohup python -u main.py > zzck_nohup.log 2>&1 & +nohup python -u mqreceive_multithread.py > mqreceive_multithread.log 2>&1 & +nohup python -u mqreceivefromllm.py > mqreceivefromllm.log 2>&1 & +echo "程序已后台启动"