| 
									
										
										
										
											2025-08-20 09:49:07 +08:00
										 |  |  |  | from fastapi import FastAPI, HTTPException | 
					
						
							|  |  |  |  | from pydantic import BaseModel | 
					
						
							|  |  |  |  | import os | 
					
						
							|  |  |  |  | import utils | 
					
						
							|  |  |  |  | import queue | 
					
						
							|  |  |  |  | from multiprocessing import Process,Manager | 
					
						
							|  |  |  |  | import pdf_title | 
					
						
							|  |  |  |  | import main | 
					
						
							|  |  |  |  | import time | 
					
						
							|  |  |  |  | import threading | 
					
						
							|  |  |  |  | import config | 
					
						
							|  |  |  |  | import requests | 
					
						
							|  |  |  |  | import db_service | 
					
						
							|  |  |  |  | import threading | 
					
						
							|  |  |  |  | from Mil_unit import create_partition_by_hour | 
					
						
							|  |  |  |  | from datetime import datetime, timedelta | 
					
						
							|  |  |  |  | from log_config import logger | 
					
						
							| 
									
										
										
										
											2025-09-09 18:59:30 +08:00
										 |  |  |  | from vector_storage import VectorStorage | 
					
						
							| 
									
										
										
										
											2025-08-20 09:49:07 +08:00
										 |  |  |  | 
 | 
					
						
# FastAPI application instance; the parser route defined in this file is registered on it.
app = FastAPI()
# Upper bound on the number of worker processes run_job starts for one file.
cpu_count = 4
# In-process FIFO of pending parse jobs: filled by parse_pdf_route, drained by run_job.
job_queue = queue.Queue()
					
						
							|  |  |  |  | 
 | 
					
						
# Request body model for the parser route.
class FileItem(BaseModel):
    # Location of the PDF; run_job downloads it first when the value starts with 'http'.
    file_path: str
    # Identifier sent back as `fileId` in every status notification.
    file_id: str
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | def run_job(): | 
					
						
							|  |  |  |  |     #判断是否有任务在执行 | 
					
						
							|  |  |  |  |     if_run = True | 
					
						
							|  |  |  |  |      | 
					
						
							|  |  |  |  |     if job_queue.empty(): | 
					
						
							|  |  |  |  |         logger.info(f"job_queue为空: {file_path}") | 
					
						
							|  |  |  |  |         if_run = False | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     if if_run: | 
					
						
							|  |  |  |  |         job_config = job_queue.get() | 
					
						
							|  |  |  |  |         page_list = [] | 
					
						
							|  |  |  |  |         file_path = job_config['file_path'] | 
					
						
							|  |  |  |  |         file_id = job_config['file_id'] | 
					
						
							|  |  |  |  |         job_status = True | 
					
						
							|  |  |  |  |         continue_execution = True | 
					
						
							|  |  |  |  |         try: | 
					
						
							|  |  |  |  |             #下载pdf | 
					
						
							|  |  |  |  |             start_time = time.time() | 
					
						
							|  |  |  |  |             logger.info(f"开始启动文件解析任务: {file_path}") | 
					
						
							|  |  |  |  |             if file_path.startswith('http'): | 
					
						
							|  |  |  |  |                 file_path = utils.save_pdf_from_url(file_path, config.FILE_PATH) | 
					
						
							|  |  |  |  |             try: | 
					
						
							|  |  |  |  |                 file_info = pdf_title.create_text_outline(file_path,file_id) | 
					
						
							|  |  |  |  |             except Exception as e: | 
					
						
							|  |  |  |  |                 response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id,'status': 7}) | 
					
						
							|  |  |  |  |                 logger.info(f'通知任务状态url:{file_id}:{response.url}') | 
					
						
							|  |  |  |  |                 logger.info(f'通知任务状态任务:{file_id}:{response.text}') | 
					
						
							|  |  |  |  |                 logger.info(f"{file_id}运行失败: {e}") | 
					
						
							|  |  |  |  |                 continue_execution = False | 
					
						
							|  |  |  |  |             if continue_execution: | 
					
						
							|  |  |  |  |                 parent_table_pages = file_info['parent_table_pages'] | 
					
						
							|  |  |  |  |                 page_num = file_info['page_count'] | 
					
						
							|  |  |  |  |                 if page_num < cpu_count: | 
					
						
							|  |  |  |  |                     p_count = page_num | 
					
						
							|  |  |  |  |                 else : | 
					
						
							|  |  |  |  |                     p_count = cpu_count | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |                 for i in range(p_count): | 
					
						
							|  |  |  |  |                     page_list.append({ | 
					
						
							|  |  |  |  |                         'type': 'table', | 
					
						
							|  |  |  |  |                         'page_num': file_info['split_parts']['table_split_parts'][i], | 
					
						
							|  |  |  |  |                         # 'page_num': page_nums[i], | 
					
						
							|  |  |  |  |                         'path': file_path, | 
					
						
							|  |  |  |  |                         'file_id': file_id, | 
					
						
							|  |  |  |  |                         'parent_table_pages': parent_table_pages, | 
					
						
							|  |  |  |  |                         'page_count': file_info['page_count'], | 
					
						
							|  |  |  |  |                         'tables_range': {}, | 
					
						
							|  |  |  |  |                     }) | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |              | 
					
						
							|  |  |  |  |                 # 通知开始解析 | 
					
						
							|  |  |  |  |                 response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id,'status': 5}) | 
					
						
							|  |  |  |  |                 logger.info(f'通知pdf开始解析url:{file_id}:{response.url}') | 
					
						
							|  |  |  |  |                 logger.info(f'通知pdf开始解析状态:{file_id}:{response.text}') | 
					
						
							|  |  |  |  |                 parser_start_time = time.time() | 
					
						
							|  |  |  |  |                 processes = [] | 
					
						
							|  |  |  |  |                 time_dispatch_job = time.time() | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |                 for job_info in page_list: | 
					
						
							|  |  |  |  |                     p = Process(target=main.dispatch_job, args=(job_info,)) | 
					
						
							|  |  |  |  |                     processes.append(p) | 
					
						
							|  |  |  |  |                     p.start() | 
					
						
							|  |  |  |  |            | 
					
						
							|  |  |  |  |                 logger.info(f'等待所有子任务完成,任务ID:{file_id}') | 
					
						
							|  |  |  |  |                 for p in processes: | 
					
						
							|  |  |  |  |                     p.join() | 
					
						
							|  |  |  |  |                 logger.info(f'pdf解析任务完成任务完成,任务ID:{file_id}') | 
					
						
							|  |  |  |  |                 time_dispatch_job_end = time.time() | 
					
						
							|  |  |  |  |                 process_time = time_dispatch_job_end - time_dispatch_job | 
					
						
							|  |  |  |  |                 db_service.process_time(file_id,'1',process_time,time_dispatch_job,time_dispatch_job_end) | 
					
						
							|  |  |  |  |                 parser_end_time = time.time() | 
					
						
							|  |  |  |  |                 logger.info(f"解析任务 {file_id} 完成,耗时{(parser_end_time - parser_start_time):.2f} 秒。") | 
					
						
							|  |  |  |  |                 #这里做一步判断,看看是否还要继续。 | 
					
						
							|  |  |  |  |                 if db_service.file_type_check(file_id): | 
					
						
							|  |  |  |  |                     logger.info(f"文本较真表格生成已结束") | 
					
						
							|  |  |  |  |                 else: | 
					
						
							|  |  |  |  |                     # 通知抽取指标 | 
					
						
							|  |  |  |  |                     response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id,'status': 6}) | 
					
						
							|  |  |  |  |                     logger.info(f'通知开始抽取指标url:{file_id}:{response.url}') | 
					
						
							|  |  |  |  |                     logger.info(f'通知开始抽取指标状态:{file_id}:{response.text}') | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |                     parser_start_time = time.time() | 
					
						
							|  |  |  |  |                     logger.info(f'开始表格指标抽取,任务ID:{file_id}') | 
					
						
							|  |  |  |  |                     time_start = time.time() | 
					
						
							|  |  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-09-09 18:59:30 +08:00
										 |  |  |  |                     # 初始化向量存储类 | 
					
						
							|  |  |  |  |                     dim = 1024 | 
					
						
							|  |  |  |  |                     max_vectors = 5000 | 
					
						
							|  |  |  |  |                     shared_storage = VectorStorage(dim, max_vectors) | 
					
						
							| 
									
										
										
										
											2025-08-20 09:49:07 +08:00
										 |  |  |  |                     # 判断是否为3季报 | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |                     if db_service.file_type_check_v2(file_id) == 3: | 
					
						
							| 
									
										
										
										
											2025-09-09 18:59:30 +08:00
										 |  |  |  |                         main.start_table_measure_job(file_id, shared_storage) | 
					
						
							| 
									
										
										
										
											2025-08-20 09:49:07 +08:00
										 |  |  |  |                         time_start_end = time.time() | 
					
						
							|  |  |  |  |                         process_time = time_start_end - time_start | 
					
						
							|  |  |  |  |                         db_service.process_time(file_id,'2',process_time,time_start,time_start_end) | 
					
						
							|  |  |  |  |                         logger.info(f'表格指标抽取完成,任务ID:{file_id}') | 
					
						
							|  |  |  |  |                         parser_end_time = time.time() | 
					
						
							|  |  |  |  |                         logger.info(f"表格指标抽取 {file_id} 完成,耗时{(parser_end_time - parser_start_time):.2f} 秒。") | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |                         logger.info(f'启动这个指标归一化任务ID-修改测试:{file_id}') | 
					
						
							|  |  |  |  |                         time_update = time.time() | 
					
						
							| 
									
										
										
										
											2025-09-09 18:59:30 +08:00
										 |  |  |  |                         main.update_measure_data(file_id,file_path,parent_table_pages,shared_storage) | 
					
						
							| 
									
										
										
										
											2025-08-20 09:49:07 +08:00
										 |  |  |  | 
 | 
					
						
							|  |  |  |  |                         logger.info(f'归一化完成任务ID:{file_id}') | 
					
						
							|  |  |  |  |                         end_time = time.time() | 
					
						
							|  |  |  |  |                         logger.info(f"任务 {file_id} 完成,耗时{(end_time - start_time):.2f} 秒。") | 
					
						
							|  |  |  |  |                         time_update_end = time.time() | 
					
						
							|  |  |  |  |                         process_time = time_update_end - time_update | 
					
						
							|  |  |  |  |                         db_service.process_time(file_id,'3',process_time,time_update,time_update_end) | 
					
						
							|  |  |  |  |                     # 不是三季报就直接按照年报和半年报走 | 
					
						
							|  |  |  |  |                     else: | 
					
						
							| 
									
										
										
										
											2025-09-09 18:59:30 +08:00
										 |  |  |  |                         main.start_table_measure_job(file_id, shared_storage) | 
					
						
							| 
									
										
										
										
											2025-08-20 09:49:07 +08:00
										 |  |  |  |                         time_start_end = time.time() | 
					
						
							|  |  |  |  |                         process_time = time_start_end - time_start | 
					
						
							|  |  |  |  |                         db_service.process_time(file_id,'2',process_time,time_start,time_start_end) | 
					
						
							|  |  |  |  |                         logger.info(f'表格指标抽取完成,任务ID:{file_id}') | 
					
						
							|  |  |  |  |                         parser_end_time = time.time() | 
					
						
							|  |  |  |  |                         logger.info(f"表格指标抽取 {file_id} 完成,耗时{(parser_end_time - parser_start_time):.2f} 秒。") | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |                         logger.info(f'启动这个指标归一化任务ID-修改测试:{file_id}') | 
					
						
							|  |  |  |  |                         time_update = time.time() | 
					
						
							| 
									
										
										
										
											2025-09-09 18:59:30 +08:00
										 |  |  |  |                         main.update_measure_data(file_id,file_path,parent_table_pages, shared_storage) | 
					
						
							| 
									
										
										
										
											2025-08-20 09:49:07 +08:00
										 |  |  |  | 
 | 
					
						
							|  |  |  |  |                         logger.info(f'归一化完成任务ID:{file_id}') | 
					
						
							|  |  |  |  |                         end_time = time.time() | 
					
						
							|  |  |  |  |                         logger.info(f"任务 {file_id} 完成,耗时{(end_time - start_time):.2f} 秒。") | 
					
						
							|  |  |  |  |                         time_update_end = time.time() | 
					
						
							|  |  |  |  |                         process_time = time_update_end - time_update | 
					
						
							|  |  |  |  |                         db_service.process_time(file_id,'3',process_time,time_update,time_update_end) | 
					
						
							|  |  |  |  |                 #通知任务完成 | 
					
						
							|  |  |  |  |                 response_time = time.time() | 
					
						
							|  |  |  |  |                 response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id,'status': 1}) | 
					
						
							|  |  |  |  |                 logger.info(f'通知任务状态url:{file_id}:{response.url}') | 
					
						
							|  |  |  |  |                 logger.info(f'通知任务状态任务:{file_id}:{response.text}') | 
					
						
							|  |  |  |  |                 response_time_end = time.time() | 
					
						
							|  |  |  |  |                 process_time = response_time_end - response_time | 
					
						
							|  |  |  |  |                 db_service.process_time(file_id,'4',process_time,response_time,response_time_end) | 
					
						
							|  |  |  |  |         except Exception as e: | 
					
						
							|  |  |  |  |             #通知任务完成 | 
					
						
							|  |  |  |  |             response_time = time.time() | 
					
						
							|  |  |  |  |             if "integer division or modulo by zero" in str(e): | 
					
						
							|  |  |  |  |                 response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id, 'status': 4}) | 
					
						
							|  |  |  |  |             else: | 
					
						
							|  |  |  |  |                 response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id, 'status': 4}) | 
					
						
							|  |  |  |  |             #response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id,'status': 4}) | 
					
						
							|  |  |  |  |             response_time_end = time.time() | 
					
						
							|  |  |  |  |             process_time = response_time_end - response_time | 
					
						
							|  |  |  |  |             db_service.process_time(file_id,'4',process_time,response_time,response_time_end) | 
					
						
							|  |  |  |  |             logger.info(f'通知任务状态url:{file_id}:{response.url}') | 
					
						
							|  |  |  |  |             logger.info(f'通知任务状态任务:{file_id}:{response.text}') | 
					
						
							|  |  |  |  |             logger.info(f"Response status code: {response.status_code}") | 
					
						
							|  |  |  |  |             logger.info(f"{file_id}运行失败: {e}") | 
					
						
							|  |  |  |  |         finally: | 
					
						
							|  |  |  |  |             logger.info(f"任务 {file_id} 完成,运行状态:{job_status}") | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |             #pdf_company_0824.name_code_fix(file_id,file_path) | 
					
						
							|  |  |  |  |             #print('公司名与编码填充完毕') | 
					
						
							|  |  |  |  |     else: | 
					
						
							|  |  |  |  |         logger.info(f"有任务运行中,需要等待.....") | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | def parse_pdf_route(fileItem: FileItem): | 
					
						
							|  |  |  |  |      | 
					
						
							|  |  |  |  |     # 创建一个队列,保证每次只执行一个文件解析任务 | 
					
						
							|  |  |  |  |     job_queue.put({ | 
					
						
							|  |  |  |  |         'file_path' : fileItem.file_path, | 
					
						
							|  |  |  |  |         'file_id' : fileItem.file_id | 
					
						
							|  |  |  |  |     }) | 
					
						
							|  |  |  |  |     logger.info(f"增加 {fileItem.file_id} 到队列.") | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     threading.Thread(target=run_job, args=()).start() | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     return {"success": True, "msg": "文件解析开始"} | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | app.post("/parser/start", | 
					
						
							|  |  |  |  |         tags=["parser"], | 
					
						
							|  |  |  |  |         summary="解析Pdf文件",  | 
					
						
							|  |  |  |  |         )(parse_pdf_route) | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | def get_local_ip(): | 
					
						
							|  |  |  |  |     try: | 
					
						
							|  |  |  |  |         # 创建一个 UDP 套接字 | 
					
						
							|  |  |  |  |         s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | 
					
						
							|  |  |  |  |         # 连接到一个外部地址(这里使用 Google 的公共 DNS 服务器) | 
					
						
							|  |  |  |  |         s.connect(("8.8.8.8", 80)) | 
					
						
							|  |  |  |  |         # 获取本地套接字的 IP 地址 | 
					
						
							|  |  |  |  |         local_ip = s.getsockname()[0] | 
					
						
							|  |  |  |  |     except Exception as e: | 
					
						
							|  |  |  |  |         logger.info(f"获取内网 IP 失败: {e}") | 
					
						
							|  |  |  |  |         local_ip = "127.0.0.1"  # 如果失败,返回本地回环地址 | 
					
						
							|  |  |  |  |     finally: | 
					
						
							|  |  |  |  |         s.close()  # 关闭套接字 | 
					
						
							|  |  |  |  |     return local_ip | 
					
						
							|  |  |  |  | 
 | 
					
						
# Run the FastAPI application
if __name__ == "__main__":
    # Start the server
    import uvicorn
    # NOTE(review): uvicorn.run() normally blocks until the server stops, so the
    # code below would only execute after shutdown — confirm this is intended.
    uvicorn.run(app, host="0.0.0.0", port=config.PORT)
    try:
        # Get the LAN IP
        ip = get_local_ip()
        # NOTE(review): the URL has no scheme or host, so requests is expected to
        # reject it (MissingSchema); that error is not caught by the
        # KeyboardInterrupt handler below. The intended base URL is not visible
        # in this file — TODO confirm and fix.
        response = requests.get(f"/api/tenant/report/restart?address={ip}:{config.PORT}")
    except KeyboardInterrupt:
        logger.info("Shutdown server")

    # Local debugging job
    # job_queue.put({
    #  'file_path' : '1.pdf',
    #  'file_id' : '2122'
    #  })
    #
    # run_job()