255 lines
11 KiB
Python
255 lines
11 KiB
Python
|
from fastapi import FastAPI, HTTPException
|
|||
|
from pydantic import BaseModel
|
|||
|
import os
|
|||
|
import utils
|
|||
|
import queue
|
|||
|
from multiprocessing import Process,Manager
|
|||
|
import pdf_title
|
|||
|
import main
|
|||
|
import time
|
|||
|
import threading
|
|||
|
import config
|
|||
|
import requests
|
|||
|
import db_service
|
|||
|
import threading
|
|||
|
from Mil_unit import create_partition_by_hour
|
|||
|
from datetime import datetime, timedelta
|
|||
|
from log_config import logger
|
|||
|
|
|||
|
# FastAPI application object; the parser route below is registered on it.
app = FastAPI()

# Upper bound on the number of table-parsing worker processes started per file
# (run_job uses the smaller of this value and the file's page count).
cpu_count = 4

# Pending parse jobs. Each entry is a dict with the keys 'file_path' and 'file_id'.
job_queue = queue.Queue()
|
|||
|
|
|||
|
# Request body model.
class FileItem(BaseModel):
    """Body of a parse request: which PDF to parse and the id to report under."""

    # Location of the PDF. run_job downloads it first when it starts with 'http'.
    file_path: str
    # Identifier sent back as 'fileId' in every status notification.
    file_id: str
|
|||
|
|
|||
|
def _notify_status(file_id, status, url_label, text_label):
    """Report ``status`` for ``file_id`` to ``config.NOTIFY_ADDR`` and log the reply.

    ``url_label`` and ``text_label`` are the log prefixes used for the request
    URL and the response body. Returns the ``requests`` response.
    """
    response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id, 'status': status})
    logger.info(f'{url_label}:{file_id}:{response.url}')
    logger.info(f'{text_label}:{file_id}:{response.text}')
    return response


def _build_page_jobs(file_info, file_path, file_id):
    """Build one table-parsing job dict per worker process (at most ``cpu_count``)."""
    parent_table_pages = file_info['parent_table_pages']
    worker_count = min(file_info['page_count'], cpu_count)
    return [
        {
            'type': 'table',
            'page_num': file_info['split_parts']['table_split_parts'][i],
            'path': file_path,
            'file_id': file_id,
            'parent_table_pages': parent_table_pages,
            'page_count': file_info['page_count'],
            'tables_range': {},
        }
        for i in range(worker_count)
    ]


def _parse_tables(page_jobs, file_id):
    """Run ``main.dispatch_job`` for each job in its own process and record stage '1' timing."""
    dispatch_started = time.time()
    workers = []
    for job_info in page_jobs:
        worker = Process(target=main.dispatch_job, args=(job_info,))
        workers.append(worker)
        worker.start()

    logger.info(f'等待所有子任务完成,任务ID:{file_id}')
    for worker in workers:
        worker.join()
    logger.info(f'pdf解析任务完成任务完成,任务ID:{file_id}')

    dispatch_finished = time.time()
    db_service.process_time(
        file_id, '1', dispatch_finished - dispatch_started, dispatch_started, dispatch_finished
    )


def _extract_measures(file_id, file_path, parent_table_pages, job_started):
    """Run measure extraction (stage '2') and normalisation (stage '3') for one file.

    ``job_started`` is the timestamp at which the whole job began; it is only
    used for the total-duration log line.
    """
    # Tell the coordinator that measure extraction is starting.
    _notify_status(file_id, 6, '通知开始抽取指标url', '通知开始抽取指标状态')

    stage_started = time.time()
    logger.info(f'开始表格指标抽取,任务ID:{file_id}')
    extract_started = time.time()

    partition_name = f"partition_{file_id}"
    create_partition_by_hour(file_id)
    # NOTE(review): presumably gives the new partition time to become usable - confirm.
    time.sleep(10)

    # The third-quarter-report check used to select between two branches whose
    # bodies were identical. The lookup is still performed so that an error
    # raised by it keeps aborting the job exactly as before.
    db_service.file_type_check_v2(file_id)

    main.start_table_measure_job(file_id, partition_name)
    extract_finished = time.time()
    db_service.process_time(
        file_id, '2', extract_finished - extract_started, extract_started, extract_finished
    )
    logger.info(f'表格指标抽取完成,任务ID:{file_id}')
    stage_finished = time.time()
    logger.info(f"表格指标抽取 {file_id} 完成,耗时{(stage_finished - stage_started):.2f} 秒。")

    logger.info(f'启动这个指标归一化任务ID-修改测试:{file_id}')
    update_started = time.time()
    main.update_measure_data(file_id, file_path, parent_table_pages, partition_name)
    logger.info(f'归一化完成任务ID:{file_id}')
    job_finished = time.time()
    logger.info(f"任务 {file_id} 完成,耗时{(job_finished - job_started):.2f} 秒。")
    update_finished = time.time()
    db_service.process_time(
        file_id, '3', update_finished - update_started, update_started, update_finished
    )


def run_job():
    """Take one job from ``job_queue`` and run the PDF parsing pipeline for it.

    Steps: download the PDF when ``file_path`` starts with 'http', build the
    text outline, delete existing rows for the file id, parse tables in up to
    ``cpu_count`` processes, then (unless ``db_service.file_type_check`` is
    truthy) extract and normalise measures. Progress is reported to
    ``config.NOTIFY_ADDR`` with these status codes: 5 = parsing started,
    6 = measure extraction started, 1 = finished, 7 = outline extraction
    failed, 4 = any other failure.

    Returns ``None``. Exceptions raised inside the pipeline are caught,
    reported and logged rather than propagated. When the queue is empty the
    function only logs and returns.
    """
    # get_nowait() replaces the former empty()/get() pair: it cannot block, and
    # the empty case no longer formats a not-yet-assigned ``file_path``.
    try:
        job_config = job_queue.get_nowait()
    except queue.Empty:
        logger.info("job_queue为空")
        logger.info("有任务运行中,需要等待.....")
        return

    file_path = job_config['file_path']
    file_id = job_config['file_id']
    job_status = True
    continue_execution = True
    try:
        # Download the PDF when a URL was supplied.
        start_time = time.time()
        logger.info(f"开始启动文件解析任务: {file_path}")
        if file_path.startswith('http'):
            file_path = utils.save_pdf_from_url(file_path, config.FILE_PATH)

        try:
            file_info = pdf_title.create_text_outline(file_path, file_id)
        except Exception as e:
            _notify_status(file_id, 7, '通知任务状态url', '通知任务状态任务')
            logger.info(f"{file_id}运行失败: {e}")
            job_status = False
            continue_execution = False

        # NOTE(review): existing rows for this file id are removed whether or not
        # the outline step succeeded - confirm this matches the intended nesting.
        db_service.delete_MYSQL_DB_APP(file_id)
        db_service.delete_MYSQL_DB(file_id)

        if continue_execution:
            parent_table_pages = file_info['parent_table_pages']
            page_jobs = _build_page_jobs(file_info, file_path, file_id)

            # Tell the coordinator that parsing is starting.
            _notify_status(file_id, 5, '通知pdf开始解析url', '通知pdf开始解析状态')
            parser_start_time = time.time()
            _parse_tables(page_jobs, file_id)
            parser_end_time = time.time()
            logger.info(f"解析任务 {file_id} 完成,耗时{(parser_end_time - parser_start_time):.2f} 秒。")

            # Decide whether the measure stages are needed for this file.
            if db_service.file_type_check(file_id):
                logger.info("文本较真表格生成已结束")
            else:
                _extract_measures(file_id, file_path, parent_table_pages, start_time)

            # Report completion and record how long the notification took.
            # NOTE(review): sent for both branches above - confirm intended nesting.
            response_time = time.time()
            _notify_status(file_id, 1, '通知任务状态url', '通知任务状态任务')
            response_time_end = time.time()
            db_service.process_time(
                file_id, '4', response_time_end - response_time, response_time, response_time_end
            )
    except Exception as e:
        job_status = False
        # Report the failure. The former division-by-zero special case sent
        # the same status as the general case, so one request covers both.
        response_time = time.time()
        response = requests.get(config.NOTIFY_ADDR, params={'fileId': file_id, 'status': 4})
        response_time_end = time.time()
        db_service.process_time(
            file_id, '4', response_time_end - response_time, response_time, response_time_end
        )
        logger.info(f'通知任务状态url:{file_id}:{response.url}')
        logger.info(f'通知任务状态任务:{file_id}:{response.text}')
        logger.info(f"Response status code: {response.status_code}")
        logger.info(f"{file_id}运行失败: {e}")
    finally:
        logger.info(f"任务 {file_id} 完成,运行状态:{job_status}")
|
|||
|
|
|||
|
@app.post("/parser/start", tags=["parser"], summary="解析Pdf文件")
def parse_pdf_route(fileItem: FileItem):
    """Queue a parse job for the given file and start a thread to process it.

    Returns a small success payload immediately; the parsing itself runs in
    the background thread via ``run_job``.
    """
    # Hand the job over through the shared queue.
    # NOTE(review): the original comment says the queue ensures one parse at a
    # time, but a new thread is started per request - confirm whether jobs are
    # really meant to be serialised.
    job = {
        'file_path': fileItem.file_path,
        'file_id': fileItem.file_id,
    }
    job_queue.put(job)
    logger.info(f"增加 {fileItem.file_id} 到队列.")

    worker = threading.Thread(target=run_job, args=())
    worker.start()

    return {"success": True, "msg": "文件解析开始"}
|
|||
|
|
|||
|
def get_local_ip():
    """Return the local address used for outbound traffic, or "127.0.0.1" on failure.

    The address is read from a UDP socket connected to 8.8.8.8:80. Any
    exception is logged and the loopback address is returned instead.
    """
    # Imported here because the module does not import socket at the top.
    import socket

    try:
        # The context manager closes the socket on every path; the previous
        # finally-block close failed when the socket had never been created.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            # Connect to an external address (Google's public DNS server).
            s.connect(("8.8.8.8", 80))
            # The socket's own address is the local IP.
            return s.getsockname()[0]
    except Exception as e:
        logger.info(f"获取内网 IP 失败: {e}")
        return "127.0.0.1"
|
|||
|
|
|||
|
# Run the FastAPI application.
if __name__ == "__main__":
    import uvicorn

    # Start the server.
    # NOTE(review): uvicorn.run() normally blocks until the server stops, so the
    # restart notification below would only be sent after shutdown - confirm
    # that this ordering is intended.
    uvicorn.run(app, host="0.0.0.0", port=config.PORT)
    try:
        # Look up the intranet IP of this host.
        ip = get_local_ip()
        logger.info(f"内网IP地址: {ip}")

        # Replace everything after the last '/' of NOTIFY_ADDR with the restart endpoint.
        # NOTE(review): the URL already ends in '?address' and params adds
        # 'address' again - confirm the receiver expects this.
        notify_base = config.NOTIFY_ADDR.rpartition('/')[0]
        url = notify_base + '/restart?address'
        address = f"{ip}:{config.PORT}"
        logger.info(address)

        response = requests.get(url, params={'address': address})
        logger.info(f"Response status code: {response.status_code}")
    except KeyboardInterrupt:
        logger.info("Shutdown server")
|
|||
|
|
|||
|
# Local debugging job (kept commented out)
|
|||
|
# job_queue.put({
|
|||
|
# 'file_path' : '1.pdf',
|
|||
|
# 'file_id' : '2222222'
|
|||
|
# })
|
|||
|
|
|||
|
# run_job()
|