# pdf_code/zzb_data_prod/app.py
#
# 232 lines
# 10 KiB
# Python
# Raw Normal View History
#
# 2024-11-29 15:58:06 +08:00
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import os
import utils
import queue
from multiprocessing import Process,Manager
import pdf_title
import main
import time
import threading
import config
import requests
import db_service
import threading
#import pdf_company_0824
# FastAPI application exposing the PDF parsing endpoint.
app = FastAPI()
# Upper bound on the number of parser worker processes started per job.
# os.cpu_count() returns None when the count cannot be determined; a None value
# cannot be compared with a page count, so fall back to a single worker.
cpu_count = os.cpu_count() or 1
# Pending parse jobs: dicts with 'file_path' and 'file_id', filled by parse_pdf_route.
job_queue = queue.Queue()
# Request-body model for the POST /parser/start endpoint (parse_pdf_route).
# Only '#' comments are used here: a class docstring would change the
# generated OpenAPI schema description.
class FileItem(BaseModel):
    # Local path of the PDF, or a URL starting with 'http' that run_job downloads first.
    file_path: str
    # Identifier of the file; sent as 'fileId' in status notifications and used for DB records.
    file_id: str
def _notify(file_id, status, url_label, text_label):
    """Report *status* for *file_id* to ``config.NOTIFY_ADDR`` and log the reply.

    Sends ``GET config.NOTIFY_ADDR`` with ``fileId``/``status`` query parameters,
    then prints the final request URL and the response body, each prefixed with
    the given label.

    Returns:
        The ``requests.Response`` of the notification call.
    """
    # NOTE(review): no timeout is passed, so an unresponsive notify endpoint
    # blocks the worker thread indefinitely; consider adding one.
    response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id, 'status': status})
    print(f'{url_label}:{file_id}:{response.url}')
    print(f'{text_label}:{file_id}:{response.text}')
    return response


def _record_stage(file_id, stage, started, finished):
    """Persist the wall-clock duration of pipeline *stage* via ``db_service.process_time``."""
    db_service.process_time(file_id, stage, finished - started, started, finished)


def _build_page_jobs(file_path, file_id, file_info):
    """Build one table-parsing job description per worker process.

    The worker count is ``min(page_count, cpu_count)``. Worker ``i`` receives the
    ``i``-th entry of ``file_info['split_parts']['table_split_parts']`` as its
    ``page_num`` value.
    """
    page_count = file_info['page_count']
    # Guard against os.cpu_count() having returned None.
    worker_count = min(page_count, cpu_count or 1)
    split_parts = file_info['split_parts']['table_split_parts']
    return [
        {
            'type': 'table',
            'page_num': split_parts[i],
            'path': file_path,
            'file_id': file_id,
            'parent_table_pages': file_info['parent_table_pages'],
            'page_count': page_count,
            'tables_range': {},
        }
        for i in range(worker_count)
    ]


def _run_table_parsers(page_jobs, file_id):
    """Run ``main.dispatch_job`` for each page job in its own process and wait for all.

    The elapsed time is recorded as stage '1'.
    """
    parser_start_time = time.time()
    processes = []
    for job_info in page_jobs:
        p = Process(target=main.dispatch_job, args=(job_info,))
        processes.append(p)
        p.start()
    print('等待所有子任务完成任务ID:', file_id)
    for p in processes:
        p.join()
        # A crashed worker is otherwise invisible here; surface it in the log.
        if p.exitcode != 0:
            print(f'子进程异常退出 exitcode={p.exitcode}, 任务ID: {file_id}')
    print('pdf解析任务完成任务完成任务ID:', file_id)
    parser_end_time = time.time()
    _record_stage(file_id, '1', parser_start_time, parser_end_time)
    print(f"解析任务 {file_id} 完成,耗时{(parser_end_time - parser_start_time):.2f} 秒。")


def _extract_and_normalize(file_id, file_path, parent_table_pages, start_time):
    """Notify status 6, then run measure extraction (stage '2') and normalisation (stage '3').

    *start_time* is when the whole job started; it is used only for the
    total-duration log line.
    """
    _notify(file_id, 6, '通知开始抽取指标url', '通知开始抽取指标状态')
    print('开始表格指标抽取任务ID:', file_id)
    extract_start = time.time()
    main.start_table_measure_job(file_id)
    extract_end = time.time()
    _record_stage(file_id, '2', extract_start, extract_end)
    print('表格指标抽取完成任务ID:', file_id)
    print(f"表格指标抽取 {file_id} 完成,耗时{(extract_end - extract_start):.2f} 秒。")
    print('启动这个指标归一化任务ID-修改测试:', file_id)
    normalize_start = time.time()
    main.update_measure_data(file_id, file_path, parent_table_pages)
    normalize_end = time.time()
    print('归一化完成任务ID:', file_id)
    print(f"任务 {file_id} 完成,耗时{(normalize_end - start_time):.2f} 秒。")
    _record_stage(file_id, '3', normalize_start, normalize_end)


def _report_failure(file_id, error):
    """Notify status 4 for a failed job, record the notify time as stage '4', and log *error*."""
    notify_start = time.time()
    response = _notify(file_id, 4, '通知任务状态url', '通知任务状态任务')
    _record_stage(file_id, '4', notify_start, time.time())
    print(f"Response status code: {response.status_code}")
    print(f"{file_id}运行失败: {error}")


def run_job():
    """Take one job from ``job_queue`` and run the full PDF parsing pipeline for it.

    A job is a dict with ``file_path`` (local path, or a URL starting with
    ``http``) and ``file_id``. The steps, with the status codes sent to
    ``config.NOTIFY_ADDR``, are:

    1. Download the PDF when ``file_path`` is a URL.
    2. Build the outline with ``pdf_title.create_text_outline``; on failure,
       notify status 7 and stop.
    3. Notify status 5 and parse table pages in parallel child processes.
    4. Unless ``db_service.file_type_check(file_id)`` is truthy, notify status 6
       and run measure extraction plus normalisation.
    5. Notify status 1.

    Any other exception notifies status 4. When the queue is empty, only logs
    and returns.

    Returns:
        None.
    """
    # get_nowait() replaces the empty()/get() check-then-act pair. It also avoids
    # the old empty-queue log line, which read the not-yet-assigned local
    # ``file_path`` and raised UnboundLocalError.
    try:
        job_config = job_queue.get_nowait()
    except queue.Empty:
        print("job_queue为空")
        print("有任务运行中,需要等待.....")
        return
    # NOTE(review): nothing here serialises jobs. Each request starts its own
    # thread, which immediately takes a job, so several files can be processed
    # concurrently. Add a module-level lock if one-at-a-time execution is required.
    file_path = job_config['file_path']
    file_id = job_config['file_id']
    job_status = True
    try:
        start_time = time.time()
        print(f"开始启动文件解析任务: {file_path}")
        if file_path.startswith('http'):
            file_path = utils.save_pdf_from_url(file_path, config.FILE_PATH)
        try:
            file_info = pdf_title.create_text_outline(file_path, file_id)
        except Exception as e:
            job_status = False
            _notify(file_id, 7, '通知任务状态url', '通知任务状态任务')
            print(f"{file_id}运行失败: {e}")
            return  # the finally clause below still logs the final status
        parent_table_pages = file_info['parent_table_pages']
        print('parent_table_pages的值是')
        print(parent_table_pages)
        page_jobs = _build_page_jobs(file_path, file_id, file_info)
        # Notify that parsing has started.
        _notify(file_id, 5, '通知pdf开始解析url', '通知pdf开始解析状态')
        _run_table_parsers(page_jobs, file_id)
        # Decide whether measure extraction should follow.
        if db_service.file_type_check(file_id):
            print("文本较真表格生成已结束")
        else:
            # Q3 reports and annual/half-year reports used to run two identical
            # branches selected by file_type_check_v2(file_id) == 3; they share
            # one path now.
            _extract_and_normalize(file_id, file_path, parent_table_pages, start_time)
        # Notify that the job finished.
        notify_start = time.time()
        _notify(file_id, 1, '通知任务状态url', '通知任务状态任务')
        _record_stage(file_id, '4', notify_start, time.time())
    except Exception as e:
        job_status = False
        _report_failure(file_id, e)
    finally:
        print(f"任务 {file_id} 完成,运行状态:{job_status}")
def parse_pdf_route(fileItem: FileItem):
    """Enqueue a PDF parsing job and start a worker thread to process it.

    The request body supplies ``file_path`` and ``file_id``. They are put on
    ``job_queue`` as a dict, and a new thread runs ``run_job``. The response is
    returned immediately, before parsing finishes.
    """
    # NOTE(review): this function itself does not serialise anything. Every call
    # starts a new thread, so "one parse at a time" depends entirely on run_job.
    queued_job = {
        'file_path': fileItem.file_path,
        'file_id': fileItem.file_id,
    }
    job_queue.put(queued_job)
    print(f"增加 {fileItem.file_id} 到队列.")
    worker = threading.Thread(target=run_job, args=())
    worker.start()
    return {"success": True, "msg": "文件解析开始"}
# Register parse_pdf_route as the POST /parser/start endpoint; this is
# equivalent to decorating the function with @app.post(...).
app.post(
    "/parser/start",
    summary="解析Pdf文件",
    tags=["parser"],
)(parse_pdf_route)
# Entry point: serve the FastAPI application with uvicorn.
if __name__ == "__main__":
    import uvicorn

    # Listen on all interfaces, on the port taken from config.PORT.
    uvicorn.run(app, port=config.PORT, host="0.0.0.0")

    # Local debugging without the HTTP server: enqueue a job and run it inline.
    # job_queue.put({
    #     'file_path': '6281.pdf',
    #     'file_id': '6281'
    # })
    # run_job()