# PDF table-extraction script: scans a PDF report for financial-metric
# tables and records each candidate table's page / bounding-box position.
import camelot
import re
#from multiprocessing import Pool
import os, time, random
import json
#from config import MILVUS_CLIENT,MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DB,MEASURE_COUNT
from datetime import datetime
# PDF reading
import PyPDF2
# PDF layout analysis / text extraction
from pdfminer.high_level import extract_pages
from pdfminer.layout import LTTextBoxHorizontal
import pdfplumber
import mysql.connector
#import utils
from pymilvus import MilvusClient
#import llm_service
#import db_service
#import pdf_title
import numpy as np
#from multiprocessing import Process
import logging

logger = logging.getLogger(__name__)
# Keywords: a table must match at least one of these financial-metric terms
# (revenue, net profit, cash flow, ...) to be kept by get_table_range_test.
STR_PATTERN = '营业收入|净利润|变动比例|损益|现金流量净额|现金净流量|现金流|每股收益|总资产|资产总额|收益率|货币资金|应收账款|存货|固定资产|在建工程|商誉|短期借款|应付账款|合同负债|长期借款|营业成本|销售费用|管理费用|财务费用|研发费用|研发投入'

# Blacklist: if any of these strings appears anywhere in a table's text,
# the whole table is discarded.
PATTERN = '品牌类型|分门店|销售渠道|行业名称|产品名称|地区名称|子公司名称|业绩快报|调整情况说明|调整年初资产负债表|计入当期损益的政府补助|主要子公司|分部|母公司资产负债表|显示服务|渠道|商品类型|合同分类|会计政策变更|地区分类'

#unit_pattern = re.compile(r'单位[:|:]?(百万元|千万元|亿元|万元|千元|元)')

# "Before adjustment" marker; tables repeating it 5 or more times are skipped
# (see the filter condition in get_table_range_test).
MUILT_PATTERN = '调整前'

# Module-level script inputs (sample values used by the top-level run).
file_path = r"combined_v61.pdf"
file_id = 1
pages = '1-2'
tables_range = {}  # page_num -> list of table bounding-box dicts
|
||
# def get_table_range_test(file_path, file_id, pages, tables_range):
|
||
|
||
# print('Run task %s (%s)...' % (f'解析表格{pages}', os.getpid()))
|
||
# #(f'file_path: {file_path},file_id:{file_id},pages:{pages},tables_range:{tables_range}')
|
||
# start = time.time()
|
||
# import tempfile
|
||
# temp_dir_path = "F:\\temp"
|
||
|
||
# # 检查并创建临时文件夹
|
||
# if not os.path.exists(temp_dir_path):
|
||
# os.makedirs(temp_dir_path)
|
||
|
||
# # 创建临时文件夹
|
||
# temp_dir = tempfile.mkdtemp(prefix="camelot_temp_", dir=temp_dir_path)
|
||
# # 设置全局临时文件夹路径
|
||
# os.environ["TMP"] = temp_dir
|
||
# os.environ["TEMP"] = temp_dir
|
||
# # conn = mysql.connector.connect(
|
||
# # host= MYSQL_HOST,
|
||
# # user= MYSQL_USER,
|
||
# # password= MYSQL_PASSWORD,
|
||
# # database= MYSQL_DB
|
||
# # )
|
||
|
||
# # 创建一个cursor对象来执行SQL语句
|
||
# #print(f'file_path的值是{file_path}')
|
||
# #cursor = conn.cursor()
|
||
# # try:
|
||
# # tables = camelot.read_pdf(file_path, pages=pages, strip_text=' ,\n', copy_text=['h'])
|
||
# # print('读取成功')
|
||
# # except Exception as e:
|
||
# # print(f'错误在{e}')
|
||
# #print(f'file_path的值是{file_path}')
|
||
# #file_path = "F:\\11_pdf\\688670-2023-nb-nb.pdf"
|
||
# os.environ["GHOSTSCRIPT_BINARY"] = "gswin64c"
|
||
# try:
|
||
# # 确保 file_path 是正确的,并且文件是可访问的
|
||
# if not os.path.exists(file_path):
|
||
# print(f'文件路径不正确或文件不存在: {file_path}')
|
||
# raise FileNotFoundError(f"文件不存在:{file_path}")
|
||
# else:
|
||
# pass#(f'file_path是存在的就是{file_path}')
|
||
|
||
# # 读取 PDF 文件
|
||
# #tables = camelot.read_pdf(file_path, pages=pages, strip_text=' ,\n')#, copy_text=['h']
|
||
# #tables = camelot.read_pdf(file_path, pages=pages, flavor='lattice', strip_text=' ,\n', temp_dir=temp_dir)
|
||
# tables = camelot.read_pdf(file_path, pages=pages, strip_text=' ,\n', copy_text=['h'], temp_dir=temp_dir)#line_scale=10,
|
||
|
||
# print('读取成功')
|
||
# print("检测到的表格数量:", tables.n)
|
||
# except FileNotFoundError as fe:
|
||
# print(fe)
|
||
# except Exception as e:
|
||
# print(f'处理PDF时出错: {e}')
|
||
# for t in tables:
|
||
|
||
# top = t._bbox[3]
|
||
# buttom = t._bbox[1]
|
||
# page_num = int(t.page)
|
||
# table_index = int(t.order)
|
||
# arr = np.array(t.data)
|
||
# #recent_value = None
|
||
# #这里开始对可能解析错误的值做判断:
|
||
# for i, row in enumerate(arr):
|
||
# if len(row) >= 4:
|
||
# # first_value = row[0]
|
||
# # if ("2023年度" in first_value or "2022年度" in first_value) and len(first_value) <= 12:
|
||
# # recent_value = first_value
|
||
# # if first_value == '' and recent_value:
|
||
# # row[0] = recent_value
|
||
# # 检查条件:第一列不为数字,第二列和第四列为空,第三列有三个小数点【三列的数字被识别到一起了】
|
||
# if (not row[0].replace('.', '', 1).isdigit()) and (row[1] == '') and (len(row[2].split('.')) == 4 and len(row[2].rsplit('.', 1)[-1]) == 2) and (row[3] == ''):
|
||
# split_values = row[2].split('.')
|
||
# # 确保可以正确拆分成三个数值
|
||
# if len(split_values) == 4:
|
||
# new_value1 = f"{split_values[0]}.{split_values[1][:2]}"
|
||
# new_value2 = f"{split_values[1][2:]}.{split_values[2][:2]}"
|
||
# new_value3 = f"{split_values[2][2:]}.{split_values[3]}"
|
||
# row[1] = new_value1
|
||
# row[2] = new_value2
|
||
# row[3] = new_value3
|
||
# #检查条件:第一列不为数字,第二列第四列为空,第三列两个小数点,第五列两个小数点【两列的数字被识别到一起了】
|
||
# if len(row) >= 5 and (not row[0].replace('.', '', 1).isdigit()) and (row[1] == '') and (len(row[2].split('.')) == 3) and (row[3] == '') and (len(row[4].split('.')) == 3) and len(row[2].rsplit('.', 1)[-1]) == 2 and len(row[4].rsplit('.', 1)[-1]) == 2:
|
||
# split_value_3 = row[2].split('.')
|
||
# split_value_5 = row[4].split('.')
|
||
|
||
# if len(split_value_3) == 3:
|
||
# new_value2 = f"{split_value_3[0]}.{split_value_3[1][:2]}"
|
||
# new_value3 = f"{split_value_3[1][2:]}.{split_value_3[2]}"
|
||
|
||
# if len(split_value_5) == 3:
|
||
# new_value4 = f"{split_value_5[0]}.{split_value_5[1][:2]}"
|
||
# new_value5 = f"{split_value_5[1][2:]}.{split_value_5[2]}"
|
||
|
||
# row[1] = new_value2
|
||
# row[2] = new_value3
|
||
# row[3] = new_value4
|
||
# row[4] = new_value5
|
||
# #检查条件:第一列不为数字,第二列为空,第三列有两个小数点,第四列为正常数字【两列的数字被识别到一起了】
|
||
# if len(row) >= 4 and (not row[0].replace('.', '', 1).isdigit()) and (row[1] == '') and (len(row[2].split('.')) == 3) and len(row[2].rsplit('.', 1)[-1]) == 2 and (row[3].replace('-', '', 1).replace('.', '', 1).isdigit()):
|
||
# split_values = row[2].split('.')
|
||
# if len(split_values) == 3:
|
||
# new_value2 = f"{split_values[0]}.{split_values[1][:2]}"
|
||
# new_value3 = f"{split_values[1][2:]}.{split_values[2]}"
|
||
# row[1] = new_value2
|
||
# row[2] = new_value3
|
||
# #检查条件:第一列不位数字,后面有一列中的值存在“%”并且"%"不是结尾,就进行拆分
|
||
# if not row[0].replace('.', '', 1).isdigit():
|
||
# for i in range(1, len(row) - 1):
|
||
# if row[i] == '' and '%' in row[i + 1] and len(row[i + 1].split('%')) == 2:
|
||
# split_values = row[i + 1].split('%')
|
||
# new_value1 = f"{split_values[0]}%"
|
||
# new_value2 = f"{split_values[1]}"
|
||
# row[i] = new_value1
|
||
# row[i + 1] = new_value2
|
||
# break
|
||
|
||
# #检查条件:当一个列表中同时出现了2022年12月31日和2023年1月1日时【并且都只出现1次】,在2022年12月31日后面增加“调整前”字段
|
||
# # if sum(1 for item in row if item.strip() == "2023年1月1日") == 1 and sum(1 for item in row if item.strip() == "2022年12月31日") == 1:
|
||
# # for i, item in enumerate(row):
|
||
# # stripped_item = item.strip() #去空格
|
||
# # if stripped_item == "2022年12月31日":
|
||
# # row[i] = stripped_item + '调整前'
|
||
|
||
# new_data = arr.tolist()#用于后面保存到数据库中
|
||
|
||
|
||
# rows, cols = arr.shape
|
||
# if rows == 1 and cols == 1:
|
||
# continue
|
||
# arr_str = ''.join([''.join(map(str, row)) for row in arr])
|
||
# #print(f'arr_str的值是 {arr_str}')
|
||
# #过滤掉不包含需抽取指标表格的文本
|
||
# matches = re.findall(STR_PATTERN, arr_str)
|
||
# pattern = re.findall(PATTERN,arr_str)
|
||
# muilt_pattern = re.findall(MUILT_PATTERN,arr_str)
|
||
# if len(matches) > 0 and len(pattern) == 0 and len(muilt_pattern)<5:
|
||
# if not tables_range.get(page_num):
|
||
# tables_range[page_num] = []
|
||
|
||
# tables_range[page_num].append({
|
||
# 'top' : top,
|
||
# 'buttom' : buttom,
|
||
# 'table_index' : table_index,
|
||
# 'page_num' : page_num,
|
||
# })
|
||
# print(f"tables_range的值是{tables_range}")
|
||
# #(f'file_id是{file_id}')
|
||
|
||
# # db_service.insert_pdf_parse_process({
|
||
# # 'file_id': file_id,
|
||
# # 'page_num' : page_num,
|
||
# # 'page_count' : 100,
|
||
# # 'type' : 'parse_table',
|
||
# # 'content':{
|
||
# # 'top' : top,
|
||
# # 'buttom' : buttom,
|
||
# # 'page_num' : page_num,
|
||
# # 'table_index' : table_index,
|
||
# # "type" : "table",
|
||
# # "data" : new_data,
|
||
# # 'sort_num' : page_num*1000 - top
|
||
# # }},conn,cursor)
|
||
|
||
# #get_text_content(file_path, file_id, tables_range, pages, conn, cursor)
|
||
|
||
# # cursor.close()
|
||
# # conn.close()
|
||
|
||
# end = time.time()
|
||
# print('Task %s runs %0.2f seconds.' % (f'解析表格{pages}', (end - start)))
|
||
def get_table_range_test(file_path, file_id, pages, tables_range):
|
||
|
||
logger.info('Run task %s (%s)...' % (f'解析表格{pages}', os.getpid()))
|
||
start = time.time()
|
||
|
||
# conn = mysql.connector.connect(
|
||
# host= MYSQL_HOST,
|
||
# user= MYSQL_USER,
|
||
# password= MYSQL_PASSWORD,
|
||
# database= MYSQL_DB
|
||
# )
|
||
|
||
# 创建一个cursor对象来执行SQL语句
|
||
#cursor = conn.cursor()
|
||
|
||
#redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, db=6)
|
||
|
||
tables = camelot.read_pdf(file_path, pages=pages, strip_text=' ,\n', copy_text=['h'])
|
||
for t in tables:
|
||
|
||
top = t._bbox[3]
|
||
buttom = t._bbox[1]
|
||
page_num = int(t.page)
|
||
table_index = int(t.order)
|
||
arr = np.array(t.data)
|
||
#这里开始对可能解析错误的值做判断:
|
||
for i, row in enumerate(arr):
|
||
if len(row) >= 4:
|
||
# 检查条件:第一列不为数字,第二列和第四列为空,第三列有三个小数点【三列的数字被识别到一起了】
|
||
if (not row[0].replace('.', '', 1).isdigit()) and (row[1] == '') and (len(row[2].split('.')) == 4 and len(row[2].rsplit('.', 1)[-1]) == 2) and (row[3] == ''):
|
||
split_values = row[2].split('.')
|
||
# 确保可以正确拆分成三个数值
|
||
if len(split_values) == 4:
|
||
new_value1 = f"{split_values[0]}.{split_values[1][:2]}"
|
||
new_value2 = f"{split_values[1][2:]}.{split_values[2][:2]}"
|
||
new_value3 = f"{split_values[2][2:]}.{split_values[3]}"
|
||
row[1] = new_value1
|
||
row[2] = new_value2
|
||
row[3] = new_value3
|
||
#检查条件:第一列不为数字,第二列第四列为空,第三列两个小数点,第五列两个小数点【两列的数字被识别到一起了】
|
||
if len(row) >= 5 and (not row[0].replace('.', '', 1).isdigit()) and (row[1] == '') and (len(row[2].split('.')) == 3) and (row[3] == '') and (len(row[4].split('.')) == 3) and len(row[2].rsplit('.', 1)[-1]) == 2 and len(row[4].rsplit('.', 1)[-1]) == 2:
|
||
split_value_3 = row[2].split('.')
|
||
split_value_5 = row[4].split('.')
|
||
|
||
if len(split_value_3) == 3:
|
||
new_value2 = f"{split_value_3[0]}.{split_value_3[1][:2]}"
|
||
new_value3 = f"{split_value_3[1][2:]}.{split_value_3[2]}"
|
||
|
||
if len(split_value_5) == 3:
|
||
new_value4 = f"{split_value_5[0]}.{split_value_5[1][:2]}"
|
||
new_value5 = f"{split_value_5[1][2:]}.{split_value_5[2]}"
|
||
|
||
row[1] = new_value2
|
||
row[2] = new_value3
|
||
row[3] = new_value4
|
||
row[4] = new_value5
|
||
#检查条件:第一列不为数字,第二列为空,第三列有两个小数点,第四列为正常数字【两列的数字被识别到一起了】
|
||
if len(row) >= 4 and (not row[0].replace('.', '', 1).isdigit()) and (row[1] == '') and (len(row[2].split('.')) == 3) and len(row[2].rsplit('.', 1)[-1]) == 2 and (row[3].replace('-', '', 1).replace('.', '', 1).isdigit()):
|
||
split_values = row[2].split('.')
|
||
if len(split_values) == 3:
|
||
new_value2 = f"{split_values[0]}.{split_values[1][:2]}"
|
||
new_value3 = f"{split_values[1][2:]}.{split_values[2]}"
|
||
row[1] = new_value2
|
||
row[2] = new_value3
|
||
#检查条件:第一列不位数字,后面有一列中的值存在“%”并且"%"不是结尾,就进行拆分
|
||
if not row[0].replace('.', '', 1).isdigit():
|
||
for i in range(1, len(row) - 1):
|
||
if row[i] == '' and '%' in row[i + 1] and len(row[i + 1].split('%')) == 2:
|
||
split_values = row[i + 1].split('%')
|
||
new_value1 = f"{split_values[0]}%"
|
||
new_value2 = f"{split_values[1]}"
|
||
row[i] = new_value1
|
||
row[i + 1] = new_value2
|
||
break
|
||
|
||
new_data = arr.tolist()#用于后面保存到数据库中
|
||
rows, cols = arr.shape
|
||
if rows == 1 and cols == 1:
|
||
continue
|
||
arr_str = ''.join([''.join(map(str, row)) for row in arr])
|
||
|
||
#过滤掉不包含需抽取指标表格的文本
|
||
matches = re.findall(STR_PATTERN, arr_str)
|
||
pattern = re.findall(PATTERN,arr_str)
|
||
muilt_pattern = re.findall(MUILT_PATTERN,arr_str)
|
||
if len(matches) > 0 and len(pattern) == 0 and len(muilt_pattern)<5:
|
||
if not tables_range.get(page_num):
|
||
tables_range[page_num] = []
|
||
|
||
tables_range[page_num].append({
|
||
'top' : top,
|
||
'buttom' : buttom,
|
||
'table_index' : table_index,
|
||
'page_num' : page_num,
|
||
})
|
||
logger.debug(f"tables_range的值是{tables_range}")
|
||
|
||
# db_service.insert_pdf_parse_process({
|
||
# 'file_id': file_id,
|
||
# 'page_num' : page_num,
|
||
# 'page_count' : 100,
|
||
# 'type' : 'parse_table',
|
||
# 'content':{
|
||
# 'top' : top,
|
||
# 'buttom' : buttom,
|
||
# 'page_num' : page_num,
|
||
# 'table_index' : table_index,
|
||
# "type" : "table",
|
||
# "data" : new_data,
|
||
# 'sort_num' : page_num*1000 - top
|
||
# }},conn,cursor)
|
||
|
||
# get_text_content(file_path, file_id, tables_range, pages, conn, cursor, redis_client)
|
||
|
||
# cursor.close()
|
||
# conn.close()
|
||
# redis_client.close()
|
||
|
||
end = time.time()
|
||
logger.info('Task %s runs %0.2f seconds.' % (f'解析表格{pages}', (end - start)))
|
# Script entry point: run the table scan on the module-level sample inputs.
# Guarded so importing this module for its functions does not parse the PDF.
if __name__ == "__main__":
    get_table_range_test(file_path, file_id, pages, tables_range)