pdf_code/zzb_data_prod/app.py

255 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Standard library
import os
import queue
import socket
import threading
import time
from datetime import datetime, timedelta
from multiprocessing import Process, Manager

# Third-party
import requests
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

# Local application modules
import config
import db_service
import main
import pdf_title
import utils
from log_config import logger
from Mil_unit import create_partition_by_hour
app = FastAPI()
cpu_count = 4
job_queue = queue.Queue()
# 定义请求体模型
class FileItem(BaseModel):
file_path: str
file_id: str
def run_job():
#判断是否有任务在执行
if_run = True
if job_queue.empty():
logger.info(f"job_queue为空: {file_path}")
if_run = False
if if_run:
job_config = job_queue.get()
page_list = []
file_path = job_config['file_path']
file_id = job_config['file_id']
job_status = True
continue_execution = True
try:
#下载pdf
start_time = time.time()
logger.info(f"开始启动文件解析任务: {file_path}")
if file_path.startswith('http'):
file_path = utils.save_pdf_from_url(file_path, config.FILE_PATH)
try:
file_info = pdf_title.create_text_outline(file_path,file_id)
except Exception as e:
response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id,'status': 7})
logger.info(f'通知任务状态url:{file_id}:{response.url}')
logger.info(f'通知任务状态任务:{file_id}:{response.text}')
logger.info(f"{file_id}运行失败: {e}")
continue_execution = False
#
db_service.delete_MYSQL_DB_APP(file_id)
db_service.delete_MYSQL_DB(file_id)
if continue_execution:
parent_table_pages = file_info['parent_table_pages']
page_num = file_info['page_count']
if page_num < cpu_count:
p_count = page_num
else :
p_count = cpu_count
for i in range(p_count):
page_list.append({
'type': 'table',
'page_num': file_info['split_parts']['table_split_parts'][i],
# 'page_num': page_nums[i],
'path': file_path,
'file_id': file_id,
'parent_table_pages': parent_table_pages,
'page_count': file_info['page_count'],
'tables_range': {},
})
# 通知开始解析
response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id,'status': 5})
logger.info(f'通知pdf开始解析url:{file_id}:{response.url}')
logger.info(f'通知pdf开始解析状态:{file_id}:{response.text}')
parser_start_time = time.time()
processes = []
time_dispatch_job = time.time()
for job_info in page_list:
p = Process(target=main.dispatch_job, args=(job_info,))
processes.append(p)
p.start()
logger.info(f'等待所有子任务完成任务ID:{file_id}')
for p in processes:
p.join()
logger.info(f'pdf解析任务完成任务完成任务ID:{file_id}')
time_dispatch_job_end = time.time()
process_time = time_dispatch_job_end - time_dispatch_job
db_service.process_time(file_id,'1',process_time,time_dispatch_job,time_dispatch_job_end)
parser_end_time = time.time()
logger.info(f"解析任务 {file_id} 完成,耗时{(parser_end_time - parser_start_time):.2f} 秒。")
#这里做一步判断,看看是否还要继续。
if db_service.file_type_check(file_id):
logger.info(f"文本较真表格生成已结束")
else:
# 通知抽取指标
response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id,'status': 6})
logger.info(f'通知开始抽取指标url:{file_id}:{response.url}')
logger.info(f'通知开始抽取指标状态:{file_id}:{response.text}')
parser_start_time = time.time()
logger.info(f'开始表格指标抽取任务ID:{file_id}')
time_start = time.time()
# 获取当前时间
partition_name = f"partition_{file_id}"
# 判断是否创建新的分区
create_partition_by_hour(file_id)
time.sleep(10)
# 判断是否为3季报
if db_service.file_type_check_v2(file_id) == 3:
main.start_table_measure_job(file_id,partition_name)
time_start_end = time.time()
process_time = time_start_end - time_start
db_service.process_time(file_id,'2',process_time,time_start,time_start_end)
logger.info(f'表格指标抽取完成任务ID:{file_id}')
parser_end_time = time.time()
logger.info(f"表格指标抽取 {file_id} 完成,耗时{(parser_end_time - parser_start_time):.2f} 秒。")
logger.info(f'启动这个指标归一化任务ID-修改测试:{file_id}')
time_update = time.time()
main.update_measure_data(file_id,file_path,parent_table_pages,partition_name)
logger.info(f'归一化完成任务ID:{file_id}')
end_time = time.time()
logger.info(f"任务 {file_id} 完成,耗时{(end_time - start_time):.2f} 秒。")
time_update_end = time.time()
process_time = time_update_end - time_update
db_service.process_time(file_id,'3',process_time,time_update,time_update_end)
# 不是三季报就直接按照年报和半年报走
else:
main.start_table_measure_job(file_id,partition_name)
time_start_end = time.time()
process_time = time_start_end - time_start
db_service.process_time(file_id,'2',process_time,time_start,time_start_end)
logger.info(f'表格指标抽取完成任务ID:{file_id}')
parser_end_time = time.time()
logger.info(f"表格指标抽取 {file_id} 完成,耗时{(parser_end_time - parser_start_time):.2f} 秒。")
logger.info(f'启动这个指标归一化任务ID-修改测试:{file_id}')
time_update = time.time()
main.update_measure_data(file_id,file_path,parent_table_pages,partition_name)
logger.info(f'归一化完成任务ID:{file_id}')
end_time = time.time()
logger.info(f"任务 {file_id} 完成,耗时{(end_time - start_time):.2f} 秒。")
time_update_end = time.time()
process_time = time_update_end - time_update
db_service.process_time(file_id,'3',process_time,time_update,time_update_end)
#通知任务完成
response_time = time.time()
response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id,'status': 1})
logger.info(f'通知任务状态url:{file_id}:{response.url}')
logger.info(f'通知任务状态任务:{file_id}:{response.text}')
response_time_end = time.time()
process_time = response_time_end - response_time
db_service.process_time(file_id,'4',process_time,response_time,response_time_end)
except Exception as e:
#通知任务完成
response_time = time.time()
if "integer division or modulo by zero" in str(e):
response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id, 'status': 4})
else:
response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id, 'status': 4})
#response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id,'status': 4})
response_time_end = time.time()
process_time = response_time_end - response_time
db_service.process_time(file_id,'4',process_time,response_time,response_time_end)
logger.info(f'通知任务状态url:{file_id}:{response.url}')
logger.info(f'通知任务状态任务:{file_id}:{response.text}')
logger.info(f"Response status code: {response.status_code}")
logger.info(f"{file_id}运行失败: {e}")
finally:
logger.info(f"任务 {file_id} 完成,运行状态:{job_status}")
#pdf_company_0824.name_code_fix(file_id,file_path)
#print('公司名与编码填充完毕')
else:
logger.info(f"有任务运行中,需要等待.....")
def parse_pdf_route(fileItem: FileItem):
# 创建一个队列,保证每次只执行一个文件解析任务
job_queue.put({
'file_path' : fileItem.file_path,
'file_id' : fileItem.file_id
})
logger.info(f"增加 {fileItem.file_id} 到队列.")
threading.Thread(target=run_job, args=()).start()
return {"success": True, "msg": "文件解析开始"}
app.post("/parser/start",
tags=["parser"],
summary="解析Pdf文件",
)(parse_pdf_route)
def get_local_ip():
try:
# 创建一个 UDP 套接字
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 连接到一个外部地址(这里使用 Google 的公共 DNS 服务器)
s.connect(("8.8.8.8", 80))
# 获取本地套接字的 IP 地址
local_ip = s.getsockname()[0]
except Exception as e:
logger.info(f"获取内网 IP 失败: {e}")
local_ip = "127.0.0.1" # 如果失败,返回本地回环地址
finally:
s.close() # 关闭套接字
return local_ip
# 运行 FastAPI 应用
if __name__ == "__main__":
# 服务器启动服务
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=config.PORT)
try:
# 获取内网IP
ip = get_local_ip()
logger.info(f"内网IP地址: {ip}")
# 假设 config.NOTIFY_ADDR 是一个字符串,我们可以使用 rpartition 方法来替换最后一个 / 后面的值
url = config.NOTIFY_ADDR.rpartition('/')[0] + '/restart?address'
address = f"{ip}:{config.PORT}"
logger.info(address)
response = requests.get(url, params={'address':address})
logger.info(f"Response status code: {response.status_code}")
except KeyboardInterrupt:
logger.info("Shutdown server")
# 本地调试任务
# job_queue.put({
# 'file_path' : '1.pdf',
# 'file_id' : '2222222'
# })
# run_job()