# Source listing metadata: 226 lines, 11 KiB, Python.
from fastapi import FastAPI
|
||
from pydantic import BaseModel
|
||
import os
|
||
import utils
|
||
import queue
|
||
from multiprocessing import Process
|
||
import word_title
|
||
import time
|
||
import config
|
||
import requests
|
||
import threading
|
||
from parse_word import parse_docx, split_text_table
|
||
import json
|
||
import db_service_word
|
||
import main_word
|
||
from zzb_logger import applog
|
||
|
||
|
||
# FastAPI application instance; the /parser/start route is registered on it below.
app = FastAPI()
|
||
# Number of chunks (and therefore worker processes) used when fanning out
# text/table parsing.  os.cpu_count() is documented to return None when the
# CPU count cannot be determined; fall back to a single worker in that case
# so that split_list() always receives a positive integer.
cpu_count = os.cpu_count() or 1

# Queue of pending parse jobs; each entry is a dict with the keys
# 'file_path' and 'file_id'.
job_queue = queue.Queue()
|
||
|
||
# Request body model for the parse endpoint.
# (Documented with comments rather than a docstring so the generated API
# schema is left untouched.)
class FileItem(BaseModel):
    # Location of the document to parse; run_job downloads it first when the
    # value starts with 'http', otherwise it is used as given.
    file_path: str
    # Identifier of the file; passed to the notification endpoint as
    # 'fileId' and to the database/parse helpers.
    file_id: str
|
||
|
||
def split_list(lst, n):
    """Split ``lst`` into ``n`` consecutive slices of near-equal length.

    The first ``len(lst) % n`` slices receive one extra element.  When
    ``lst`` has fewer than ``n`` items the trailing slices are empty.

    Args:
        lst: Sliceable sequence to partition.
        n: Number of slices to produce.

    Returns:
        A list of ``n`` slices of ``lst``, in original order.
    """
    base_size, remainder = divmod(len(lst), n)
    chunks = []
    start = 0
    for index in range(n):
        # The first `remainder` chunks absorb the leftover elements.
        stop = start + base_size + (1 if index < remainder else 0)
        chunks.append(lst[start:stop])
        start = stop
    return chunks
|
||
|
||
def run_job():
    """Take one job from ``job_queue`` and run the Word parsing pipeline on it.

    Steps, as visible in this function:

    1. If the queue is empty, log and return without doing any work.
    2. Pop a job dict (keys ``'file_path'`` and ``'file_id'``); if the path
       starts with ``'http'`` it is first fetched via
       ``utils.save_pdf_from_url``.
    3. Parse stage: send status 5 to ``config.NOTIFY_ADDR``, parse the docx,
       store the catalog, then fan out text and table processing over
       ``cpu_count`` child processes and record the elapsed time as
       stage ``'1'``.  Any exception here sends status 7 and stops the job.
    4. Measure stage (skipped when ``db_service_word.file_type_check`` is
       truthy): send status 6, run table-measure extraction (stage ``'2'``)
       and measure normalisation (stage ``'3'``).
    5. Send status 1 and record the notification time as stage ``'4'``.

    Any exception escaping steps 2-5 (including one raised inside the
    parse-stage error handler) is caught by the outer handler, which sends
    status 4 and logs the error.

    NOTE(review): the listing this code was recovered from had lost its
    indentation.  Nesting was reconstructed from syntax; where syntax does
    not decide it (see the note above the completion notification) the
    placement is an assumption -- verify against the repository copy.
    """
    # Determine whether there is a job to run.
    if_run = True

    if job_queue.empty():
        applog.info(f"job_queue为空:")
        if_run = False

    if if_run:
        job_config = job_queue.get()
        file_path = job_config['file_path']
        file_id = job_config['file_id']
        # Cleared by the parse-stage error handler to skip the measure stage.
        continue_execution = True
        try:

            start_time = time.time()
            applog.info(f"开始启动文件解析任务: {file_path}")
            if file_path.startswith('http'):
                # Remote file: download it and continue with the returned path.
                file_path = utils.save_pdf_from_url(file_path, config.FILE_PATH)
            try:
                time_dispatch_job = time.time()
                # Notify that parsing has started.  (The original comment said
                # "not notifying for now", but the request below is live.)
                response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id,'status': 5})
                applog.info(f'通知pdf开始解析url:{file_id}:{response.url}')
                applog.info(f'通知pdf开始解析状态:{file_id}:{response.text}')
                parsed_content, catalog_content = parse_docx(file_path)  # catalog_content (the catalog) must be written to the database

                json_parsed_content = json.loads(parsed_content)
                json_catalog_content = json.loads(catalog_content)

                db_service_word.word_title_insert_mysql(file_id, json_catalog_content)

                parent_table_pages = word_title.get_parent_table_pages(json_catalog_content,file_id)

                text_elements_json, table_elements_json = split_text_table(json_parsed_content)
                #
                # Fan out text processing: one child process per chunk.
                processes = []
                text_list = split_list(json.loads(text_elements_json), cpu_count)
                applog.info(f'text,任务ID:{file_id}')
                for job_info in text_list:
                    p = Process(target=main_word.process_text_content, args=(file_id, job_info,json.loads(table_elements_json),json.loads(text_elements_json)))
                    processes.append(p)
                    p.start()
                applog.info(f'等待所有子任务完成,任务ID:{file_id}')
                for p in processes:
                    p.join()
                applog.info(f'word表格中 text解析完成,任务ID:{file_id}',)

                # Fan out table processing the same way.
                processes = []
                table_list = split_list(json.loads(table_elements_json), cpu_count)
                applog.info(f'开始解析word表表格中的table,任务ID:{file_id}')
                for job_info in table_list:
                    p = Process(target=main_word.process_table, args=(file_id, job_info,))
                    processes.append(p)
                    p.start()
                applog.info(f'等待所有子任务完成,任务ID:{file_id}' )
                for p in processes:
                    p.join()

                # main_word.process_table(file_id, json.loads(table_elements_json))
                applog.info(f'word表格中 table解析完成,任务ID:{file_id}')


                # Record the duration of the parse stage (stage '1').
                time_dispatch_job_end = time.time()
                process_time = time_dispatch_job_end - time_dispatch_job
                db_service_word.process_time(file_id, '1', process_time, time_dispatch_job, time_dispatch_job_end)
                parser_end_time = time.time()
                applog.info(f"解析任务 {file_id} 完成,耗时{(parser_end_time - time_dispatch_job):.2f} 秒。")

            except Exception as e:
                # Parse stage failed: report status 7 and skip the measure stage.
                # An exception raised by this request itself propagates to the
                # outer handler below.
                response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id,'status': 7})
                applog.info(f'通知任务状态url:{file_id}:{response.url}')
                applog.info(f'通知任务状态任务:{file_id}:{response.text}')
                applog.info(f"{file_id}运行失败: {e}")
                continue_execution = False
            if continue_execution :
                # Decide here whether the remaining stages are needed.
                if db_service_word.file_type_check(file_id):
                    applog.info("文本较真表格生成已结束")
                else:
                    # Notify that measure extraction is starting ------------------
                    response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id,'status': 6})
                    applog.info(f'通知开始抽取指标url:{file_id}:{response.url}')
                    applog.info(f'通知开始抽取指标状态:{file_id}:{response.text}')

                    parser_start_time = time.time()
                    applog.info(f'开始表格指标抽取,任务ID:{file_id}')
                    time_start = time.time()
                    # Both branches below currently run the same statements.
                    if db_service_word.file_type_check_v2(file_id) == 3 : # check whether this is a third-quarter report
                        main_word.start_table_measure_job(file_id)
                        #time_start_end = time.time()
                        #process_time = time_start_end - time_start
                        #db_service.process_time(file_id,'2',process_time)
                        time_start_end = time.time()
                        process_time = time_start_end - time_start
                        db_service_word.process_time(file_id,'2',process_time,time_start,time_start_end)
                        applog.info(f'表格指标抽取完成,任务ID:{file_id}')
                        parser_end_time = time.time()
                        applog.info(f"表格指标抽取 {file_id} 完成,耗时{(parser_end_time - parser_start_time):.2f} 秒。")

                        applog.info(f'启动这个指标归一化任务ID-修改测试:{file_id}')
                        time_update = time.time()
                        main_word.update_measure_data(file_id,file_path,parent_table_pages)
                        #time_update_end = time.time()
                        #process_time = time_update_end - time_update
                        #db_service.process_time(file_id,'3',process_time)
                        applog.info(f'归一化完成任务ID:{file_id}')
                        end_time = time.time()
                        applog.info(f"任务 {file_id} 完成,耗时{(end_time - start_time):.2f} 秒。")
                        time_update_end = time.time()
                        process_time = time_update_end - time_update
                        db_service_word.process_time(file_id,'3',process_time,time_update,time_update_end)
                    else: # not a third-quarter report: follow the annual / half-year report flow
                        main_word.start_table_measure_job(file_id)
                        #time_start_end = time.time()
                        #process_time = time_start_end - time_start
                        #db_service.process_time(file_id,'2',process_time)
                        time_start_end = time.time()
                        process_time = time_start_end - time_start
                        db_service_word.process_time(file_id,'2',process_time,time_start,time_start_end)
                        applog.info(f'表格指标抽取完成,任务ID:{file_id}' )
                        parser_end_time = time.time()
                        applog.info(f"表格指标抽取 {file_id} 完成,耗时{(parser_end_time - parser_start_time):.2f} 秒。")

                        applog.info(f'启动这个指标归一化任务ID-修改测试:{file_id}' )
                        time_update = time.time()
                        main_word.update_measure_data(file_id,file_path,parent_table_pages)
                        #time_update_end = time.time()
                        #process_time = time_update_end - time_update
                        #db_service.process_time(file_id,'3',process_time)
                        applog.info(f'归一化完成任务ID:{file_id}')
                        end_time = time.time()
                        applog.info(f"任务 {file_id} 完成,耗时{(end_time - start_time):.2f} 秒。")
                        time_update_end = time.time()
                        process_time = time_update_end - time_update
                        db_service_word.process_time(file_id,'3',process_time,time_update,time_update_end)
                # Notify that the job is complete.
                # NOTE(review): assumed to run for every job whose parse stage
                # succeeded (i.e. directly under `if continue_execution`); it
                # may instead belong inside the `else` branch above -- confirm.
                response_time = time.time()

                response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id,'status': 1})
                applog.info(f'通知任务状态url:{file_id}:{response.url}')
                applog.info(f'通知任务状态任务:{file_id}:{response.text}')

                response_time_end = time.time()
                process_time = response_time_end - response_time
                db_service_word.process_time(file_id,'4',process_time,response_time,response_time_end)
        except Exception as e:
            # Unhandled failure anywhere above: report status 4 and log it.
            response_time = time.time()
            response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id,'status': 4})
            response_time_end = time.time()
            process_time = response_time_end - response_time
            db_service_word.process_time(file_id,'4',process_time,response_time,response_time_end)
            applog.info(f'通知任务状态url:{file_id}:{response.url}')
            applog.info(f'通知任务状态任务:{file_id}:{response.text}')
            applog.info(f"Response status code: {response.status_code}")
            applog.info(f"{file_id}运行失败: {e}")
        finally:
            applog.info(f"任务 {file_id} 完成")

    else:
        # Reached only when the queue was empty at the check above, even
        # though the message reads "a task is running, need to wait".
        applog.info("有任务运行中,需要等待.....")
|
||
|
||
def parse_route(fileItem: FileItem):
    # Handler for the parse endpoint: enqueue the requested file, start a
    # background thread running run_job, and return immediately.
    # (Documented with comments rather than a docstring so the generated API
    # description is left untouched.)
    job = {
        'file_path': fileItem.file_path,
        'file_id': fileItem.file_id,
    }
    job_queue.put(job)
    applog.info(f"增加 {fileItem.file_id} 到队列.")

    # NOTE(review): a new worker thread is started per request, so the queue
    # alone does not appear to limit processing to one file at a time --
    # confirm whether that is intended.
    worker = threading.Thread(target=run_job)
    worker.start()

    return {"success": True, "msg": "文件解析开始"}
|
||
|
||
# Register parse_route as the handler for POST /parser/start.
app.post("/parser/start", tags=["parser"], summary="解析Pdf文件")(parse_route)
|
||
|
||
# Run the FastAPI application.
if __name__ == "__main__":
    # Start the HTTP server, listening on all interfaces at config.PORT.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=config.PORT)
    # Local debugging: enqueue a single job and run it directly.
    # file_id = "201837"
    # job_queue.put({
    #     'file_path': '西部建设.docx',
    #     'file_id': file_id,
    # })
    # db_service_word.delete_database(file_id)
    # run_job()
|