pdf_code/zzb_data_prod/app.py

232 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import os
import utils
import queue
from multiprocessing import Process,Manager
import pdf_title
import main
import time
import threading
import config
import requests
import db_service
import threading
#import pdf_company_0824
# FastAPI application object; routes are registered on it further down.
app = FastAPI()
# os.cpu_count() returns None when the CPU count cannot be determined.
# Fall back to 1 so the later comparison against the page count cannot
# raise a TypeError.
cpu_count = os.cpu_count() or 1
# Pending parse jobs; each entry is a dict with 'file_path' and 'file_id'.
job_queue = queue.Queue()
# Request body model for the parse endpoint.
class FileItem(BaseModel):
    # Location of the PDF; run_job downloads it first when it starts with 'http'.
    file_path: str
    # Identifier echoed back as 'fileId' in every status notification.
    file_id: str
def _notify_status(file_id, status):
    """Send ``status`` for ``file_id`` to ``config.NOTIFY_ADDR`` and return the HTTP response."""
    return requests.get(config.NOTIFY_ADDR, params={'fileId': file_id, 'status': status})


def _parse_tables(file_id, file_path, file_info, parent_table_pages):
    """Run ``main.dispatch_job`` in one child process per page split and wait for all of them."""
    page_num = file_info['page_count']
    # Never start more workers than there are pages or CPUs.
    p_count = min(page_num, cpu_count)
    page_list = [
        {
            'type': 'table',
            'page_num': file_info['split_parts']['table_split_parts'][i],
            'path': file_path,
            'file_id': file_id,
            'parent_table_pages': parent_table_pages,
            'page_count': file_info['page_count'],
            'tables_range': {},
        }
        for i in range(p_count)
    ]

    # Notify that parsing is starting (status 5).
    response = _notify_status(file_id, 5)
    print(f'通知pdf开始解析url:{file_id}:{response.url}')
    print(f'通知pdf开始解析状态:{file_id}:{response.text}')

    parser_start_time = time.time()
    time_dispatch_job = time.time()
    processes = []
    for job_info in page_list:
        p = Process(target=main.dispatch_job, args=(job_info,))
        processes.append(p)
        p.start()

    print('等待所有子任务完成任务ID:', file_id)
    for p in processes:
        p.join()
    print('pdf解析任务完成任务完成任务ID:', file_id)

    # Stage '1' timing: table parsing.
    time_dispatch_job_end = time.time()
    process_time = time_dispatch_job_end - time_dispatch_job
    db_service.process_time(file_id, '1', process_time, time_dispatch_job, time_dispatch_job_end)
    parser_end_time = time.time()
    print(f"解析任务 {file_id} 完成,耗时{(parser_end_time - parser_start_time):.2f} 秒。")


def _extract_and_normalize(file_id, file_path, parent_table_pages, start_time):
    """Run table measure extraction followed by measure normalization, recording both timings."""
    # Notify that measure extraction is starting (status 6).
    response = _notify_status(file_id, 6)
    print(f'通知开始抽取指标url:{file_id}:{response.url}')
    print(f'通知开始抽取指标状态:{file_id}:{response.text}')

    parser_start_time = time.time()
    print('开始表格指标抽取任务ID:', file_id)
    time_start = time.time()

    # The original code branched on "is this a Q3 report" (result == 3), but
    # both branches ran exactly the same steps. The lookup is still performed
    # so that a failure inside it aborts the job as before.
    db_service.file_type_check_v2(file_id)

    main.start_table_measure_job(file_id)
    # Stage '2' timing: measure extraction.
    time_start_end = time.time()
    process_time = time_start_end - time_start
    db_service.process_time(file_id, '2', process_time, time_start, time_start_end)
    print('表格指标抽取完成任务ID:', file_id)
    parser_end_time = time.time()
    print(f"表格指标抽取 {file_id} 完成,耗时{(parser_end_time - parser_start_time):.2f} 秒。")

    print('启动这个指标归一化任务ID-修改测试:', file_id)
    time_update = time.time()
    main.update_measure_data(file_id, file_path, parent_table_pages)
    print('归一化完成任务ID:', file_id)
    end_time = time.time()
    print(f"任务 {file_id} 完成,耗时{(end_time - start_time):.2f} 秒。")
    # Stage '3' timing: normalization.
    time_update_end = time.time()
    process_time = time_update_end - time_update
    db_service.process_time(file_id, '3', process_time, time_update, time_update_end)


def run_job():
    """Take one job from ``job_queue`` and run the full PDF pipeline for it.

    Steps: download the PDF when ``file_path`` is a URL, build the text
    outline, parse tables in child processes, then (unless
    ``db_service.file_type_check`` says otherwise) extract and normalize
    measures. Progress is reported through ``config.NOTIFY_ADDR``:
    status 5 before parsing, 6 before extraction, 1 on completion,
    7 when outline creation raises, 4 when any other step raises.

    Returns None. When the queue is empty the function only logs and returns.
    """
    # get_nowait() avoids the race between empty() and a blocking get() when
    # several worker threads are started at once.
    try:
        job_config = job_queue.get_nowait()
    except queue.Empty:
        # The original message interpolated file_path, which is not assigned
        # yet on this path and raised UnboundLocalError.
        print("job_queue为空")
        print("有任务运行中,需要等待.....")
        return

    file_path = job_config['file_path']
    file_id = job_config['file_id']
    job_status = True
    try:
        # Download the PDF when a URL was supplied.
        start_time = time.time()
        print(f"开始启动文件解析任务: {file_path}")
        if file_path.startswith('http'):
            file_path = utils.save_pdf_from_url(file_path, config.FILE_PATH)

        try:
            file_info = pdf_title.create_text_outline(file_path, file_id)
        except Exception as e:
            # Outline creation failed: report status 7 and stop this job.
            response = _notify_status(file_id, 7)
            print(f'通知任务状态url:{file_id}:{response.url}')
            print(f'通知任务状态任务:{file_id}:{response.text}')
            print(f"{file_id}运行失败: {e}")
            job_status = False
            return

        print(cpu_count)
        parent_table_pages = file_info['parent_table_pages']
        print('parent_table_pages的值是')
        print(parent_table_pages)
        print(cpu_count)
        print('测试')

        _parse_tables(file_id, file_path, file_info, parent_table_pages)

        # Decide whether the extraction/normalization stages are needed.
        if db_service.file_type_check(file_id):
            print("文本较真表格生成已结束")
        else:
            _extract_and_normalize(file_id, file_path, parent_table_pages, start_time)

        # Notify completion (status 1) and record stage '4' timing.
        # NOTE(review): the source lost its indentation; this assumes the
        # completion notification is sent for both outcomes of
        # file_type_check - confirm against the original file.
        response_time = time.time()
        response = _notify_status(file_id, 1)
        print(f'通知任务状态url:{file_id}:{response.url}')
        print(f'通知任务状态任务:{file_id}:{response.text}')
        response_time_end = time.time()
        process_time = response_time_end - response_time
        db_service.process_time(file_id, '4', process_time, response_time, response_time_end)
    except Exception as e:
        job_status = False
        # Report failure (status 4). The original had a special case for
        # "integer division or modulo by zero" that sent the same status.
        response_time = time.time()
        response = _notify_status(file_id, 4)
        response_time_end = time.time()
        process_time = response_time_end - response_time
        db_service.process_time(file_id, '4', process_time, response_time, response_time_end)
        print(f'通知任务状态url:{file_id}:{response.url}')
        print(f'通知任务状态任务:{file_id}:{response.text}')
        print(f"Response status code: {response.status_code}")
        print(f"{file_id}运行失败: {e}")
    finally:
        print(f"任务 {file_id} 完成,运行状态:{job_status}")
        #pdf_company_0824.name_code_fix(file_id,file_path)
        #print('公司名与编码填充完毕')
def parse_pdf_route(fileItem: FileItem):
    # Enqueue the requested file, then start a thread that runs run_job.
    # A new thread is started on every call, so the queue alone does not
    # stop jobs from overlapping.
    job_entry = {
        'file_path': fileItem.file_path,
        'file_id': fileItem.file_id,
    }
    job_queue.put(job_entry)
    print(f"增加 {fileItem.file_id} 到队列.")

    worker = threading.Thread(target=run_job)
    worker.start()
    return {"success": True, "msg": "文件解析开始"}
# Register parse_pdf_route as the handler for POST /parser/start.
app.post("/parser/start", tags=["parser"], summary="解析Pdf文件")(parse_pdf_route)
# Script entry point.
if __name__ == "__main__":
    # Server start-up (currently disabled):
    # import uvicorn
    # uvicorn.run(app, host="0.0.0.0", port=config.PORT)

    # Local debugging: enqueue one hard-coded job and run it in this thread.
    job_queue.put(dict(file_path='1.pdf', file_id='2122'))
    run_job()