pdf_code/zzb_data_word/app_word.py
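
"""FastAPI service that accepts Word (.docx) parse jobs over HTTP, queues them,
fans parsing out across CPU cores with multiprocessing, extracts and normalizes
table measures, and reports progress to the configured notify endpoint."""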

from fastapi import FastAPI
from pydantic import BaseModel
import os
import utils
import queue
from multiprocessing import Process
import word_title
import time
import config
import requests
import threading
from parse_word import parse_docx, split_text_table
import json
import db_service_word
import main_word
from zzb_logger import applog
app = FastAPI()
cpu_count = os.cpu_count()
job_queue = queue.Queue()
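
# Note: job_queue is an in-process, thread-safe queue.Queue; queued jobs exist
# only for the lifetime of this process and are not shared across workers.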


# Request body model
class FileItem(BaseModel):
    file_path: str
    file_id: str
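# Example request body (illustrative values only):
#   {"file_path": "http://example.com/report.docx", "file_id": "201837"}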


def split_list(lst, n):
    # Split lst into n contiguous chunks whose lengths differ by at most one;
    # the first len(lst) % n chunks each receive one extra element.
    k, m = divmod(len(lst), n)
    return [lst[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)]
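# For example, split_list([1, 2, 3, 4, 5, 6, 7], 3) yields
# [[1, 2, 3], [4, 5], [6, 7]].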


def run_job():
    # Check whether there is a queued job to execute
    if_run = True
    if job_queue.empty():
        applog.info("job_queue is empty")
        if_run = False
    if if_run:
        job_config = job_queue.get()
        file_path = job_config['file_path']
        file_id = job_config['file_id']
        continue_execution = True
        try:
            start_time = time.time()
            applog.info(f"Starting file parse job: {file_path}")
            if file_path.startswith('http'):
                file_path = utils.save_pdf_from_url(file_path, config.FILE_PATH)
            try:
                time_dispatch_job = time.time()
                # Notify that parsing has started (status 5)
                response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id, 'status': 5})
                applog.info(f'Parse-start notification URL for {file_id}: {response.url}')
                applog.info(f'Parse-start notification response for {file_id}: {response.text}')
                parsed_content, catalog_content = parse_docx(file_path)
                json_parsed_content = json.loads(parsed_content)
                json_catalog_content = json.loads(catalog_content)
                db_service_word.word_title_insert_mysql(file_id, json_catalog_content)
                parent_table_pages = word_title.get_parent_table_pages(json_catalog_content, file_id)
                text_elements_json, table_elements_json = split_text_table(json_parsed_content)
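                # Fan the text elements out across all CPU cores: each worker
                # process handles one contiguous slice (see split_list above).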
                processes = []
                text_list = split_list(json.loads(text_elements_json), cpu_count)
                applog.info(f'Text job ID: {file_id}')
                for job_info in text_list:
                    p = Process(target=main_word.process_text_content,
                                args=(file_id, job_info, json.loads(table_elements_json), json.loads(text_elements_json)))
                    processes.append(p)
                    p.start()
                applog.info(f'Waiting for all text subtasks to finish, job ID: {file_id}')
                for p in processes:
                    p.join()
                applog.info(f'Text parsing of the Word document finished, job ID: {file_id}')
                # Same fan-out for the table elements
                processes = []
                table_list = split_list(json.loads(table_elements_json), cpu_count)
                applog.info(f'Starting to parse tables in the Word document, job ID: {file_id}')
                for job_info in table_list:
                    p = Process(target=main_word.process_table, args=(file_id, job_info))
                    processes.append(p)
                    p.start()
                applog.info(f'Waiting for all table subtasks to finish, job ID: {file_id}')
                for p in processes:
                    p.join()
                applog.info(f'Table parsing of the Word document finished, job ID: {file_id}')
                time_dispatch_job_end = time.time()
                process_time = time_dispatch_job_end - time_dispatch_job
                db_service_word.process_time(file_id, '1', process_time, time_dispatch_job, time_dispatch_job_end)
                parser_end_time = time.time()
                applog.info(f"Parse job {file_id} finished in {(parser_end_time - time_dispatch_job):.2f} s.")
            except Exception as e:
                # Notify that parsing failed (status 7)
                response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id, 'status': 7})
                applog.info(f'Job-status notification URL for {file_id}: {response.url}')
                applog.info(f'Job-status notification response for {file_id}: {response.text}')
                applog.error(f"{file_id} failed: {e}")
                continue_execution = False
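            # Notify statuses used in this module (inferred from usage here):
            # 5 = parsing started, 6 = measure extraction started,
            # 1 = job succeeded, 7 = parse failure, 4 = job failure.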
            if continue_execution:
                # Decide whether measure extraction should proceed.
                if db_service_word.file_type_check(file_id):
                    applog.info("Text comparison table generation has finished")
                else:
                    # Notify that measure extraction is starting (status 6)
                    response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id, 'status': 6})
                    applog.info(f'Measure-extraction notification URL for {file_id}: {response.url}')
                    applog.info(f'Measure-extraction notification response for {file_id}: {response.text}')
                    parser_start_time = time.time()
                    applog.info(f'Starting table measure extraction, job ID: {file_id}')
                    time_start = time.time()
                    # Q3 reports (file_type_check_v2(file_id) == 3) and
                    # annual/semi-annual reports ran identical code here, so the
                    # two duplicated branches are merged into one path.
                    main_word.start_table_measure_job(file_id)
                    time_start_end = time.time()
                    process_time = time_start_end - time_start
                    db_service_word.process_time(file_id, '2', process_time, time_start, time_start_end)
                    applog.info(f'Table measure extraction finished, job ID: {file_id}')
                    parser_end_time = time.time()
                    applog.info(f"Table measure extraction for {file_id} finished in {(parser_end_time - parser_start_time):.2f} s.")
                    applog.info(f'Starting measure normalization, job ID: {file_id}')
                    time_update = time.time()
                    main_word.update_measure_data(file_id, file_path, parent_table_pages)
                    applog.info(f'Normalization finished, job ID: {file_id}')
                    end_time = time.time()
                    applog.info(f"Job {file_id} finished in {(end_time - start_time):.2f} s.")
                    time_update_end = time.time()
                    process_time = time_update_end - time_update
                    db_service_word.process_time(file_id, '3', process_time, time_update, time_update_end)
                # Notify that the job completed successfully (status 1)
                response_time = time.time()
                response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id, 'status': 1})
                applog.info(f'Job-status notification URL for {file_id}: {response.url}')
                applog.info(f'Job-status notification response for {file_id}: {response.text}')
                response_time_end = time.time()
                process_time = response_time_end - response_time
                db_service_word.process_time(file_id, '4', process_time, response_time, response_time_end)
        except Exception as e:
            # Notify that the job failed (status 4)
            response_time = time.time()
            response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id, 'status': 4})
            response_time_end = time.time()
            process_time = response_time_end - response_time
            db_service_word.process_time(file_id, '4', process_time, response_time, response_time_end)
            applog.info(f'Job-status notification URL for {file_id}: {response.url}')
            applog.info(f'Job-status notification response for {file_id}: {response.text}')
            applog.info(f"Response status code: {response.status_code}")
            applog.error(f"{file_id} failed: {e}")
        finally:
            applog.info(f"Job {file_id} finished")
    else:
        applog.info("A job is already running; waiting...")

def parse_route(fileItem: FileItem):
    # Enqueue the job; the single queue ensures parse tasks run one at a time
    job_queue.put({
        'file_path': fileItem.file_path,
        'file_id': fileItem.file_id,
        # 'type': fileItem.type
    })
    applog.info(f"Added {fileItem.file_id} to the queue.")
    threading.Thread(target=run_job).start()
    return {"success": True, "msg": "File parsing started"}


app.post(
    "/parser/start",
    tags=["parser"],
    summary="Parse a Word document",
)(parse_route)
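
# A minimal client call, assuming the server listens on port 8000 (the actual
# port comes from config.PORT; path and id below are illustrative values):
#   curl -X POST http://localhost:8000/parser/start \
#        -H 'Content-Type: application/json' \
#        -d '{"file_path": "http://example.com/report.docx", "file_id": "201837"}'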

# Run the FastAPI application
if __name__ == "__main__":
    # Start the HTTP server
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=config.PORT)
    # Local debugging (bypasses the HTTP layer):
    # file_id = "201837"
    # job_queue.put({
    #     'file_path': '西部建设.docx',
    #     'file_id': file_id,
    # })
    # db_service_word.delete_database(file_id)
    # run_job()