pdf_code/zzb_data/test.py

220 lines
8.7 KiB
Python
Raw Normal View History

2024-10-31 15:35:27 +08:00
import camelot
import os
import tempfile
import matplotlib.pyplot as plt
import numpy as np
#from camelot.plotting import plot_contour
#单独针对三季报的资产负债表识别合并问题
import re
def process_array(arr, years=('2022', '2023', '2024'), keyword='项目'):
    """Split rows that were extracted as one whitespace-separated cell.

    If the first row holds exactly one cell, and that cell contains
    ``keyword`` plus at least one entry of ``years``, it is treated as a
    collapsed header: the cell is cleaned and split on whitespace into
    separate columns. Every later row that also holds exactly one cell is
    split the same way, then truncated or padded with ``''`` so it has as
    many columns as the header. Tables whose first row does not match
    (including empty tables) are returned untouched.

    Args:
        arr: Table data as a list of row lists, or a ``numpy.ndarray``
            (converted with ``tolist()``). List input is modified in place.
        years: Year substrings; at least one must occur in the header cell.
        keyword: Substring that must also occur in the header cell.

    Returns:
        The table as a list of rows (the same list object when a list was
        passed in).
    """
    def is_valid_header(header):
        # Lower-casing only affects Latin letters in the header; the
        # keyword and years are compared as given.
        header_text = header.lower()
        return any(year in header_text for year in years) and keyword in header_text

    def clean_text(text):
        # Drop whitespace on either side of "年" / "月" so a date such as
        # "2023 年 9 月 30" survives the whitespace split as one token.
        text = re.sub(r'\s*(年|月)\s*', r'\1', text)
        # NOTE(review): this removes the "日" character itself together with
        # the whitespace before it, although the original comment only
        # mentioned the whitespace. Behaviour kept as-is -- confirm intent.
        text = re.sub(r'\s*日', '', text)
        return text

    # Work on plain lists so rows can grow.
    arr = arr.tolist() if isinstance(arr, np.ndarray) else arr

    # Guard clauses: empty table, or first row is not a collapsed header.
    if not arr or len(arr[0]) != 1 or not is_valid_header(arr[0][0]):
        return arr

    header_row = arr[0]
    header_parts = clean_text(header_row[0]).split()
    if header_parts:
        # Slice assignment keeps the original row object (in-place update).
        header_row[:] = header_parts
    header_columns = len(header_row)

    for row in arr[1:]:
        if len(row) != 1:
            continue
        # Surplus tokens are discarded; missing ones become empty strings.
        parts = row[0].split()[:header_columns]
        row[:] = parts + [''] * (header_columns - len(parts))
    return arr
def process_array_with_annual_comparison(arr, keywords=('本报告期', '年初至报告期末', '上年同期'), target='上年同期', replacement='三季报中无需识别的上年同期'):
    """Relabel the first half of the duplicated ``target`` header cells.

    The header (first row) qualifies only when every entry of ``keywords``
    appears in it as an exact cell value. If ``target`` then occurs more
    than once, the first ``count // 2`` occurrences are overwritten with
    ``replacement``; the remaining occurrences keep their text. A header
    with zero or one occurrence is left unchanged.

    Args:
        arr: Table data as a list of row lists, or a ``numpy.ndarray``
            (converted with ``tolist()``). List input is modified in place.
        keywords: Cell values that must all be present in the header.
        target: Header cell value whose leading duplicates are relabelled.
        replacement: Text written over those leading duplicates.

    Returns:
        The table as a list of rows.
    """
    arr = arr.tolist() if isinstance(arr, np.ndarray) else arr

    # Nothing to inspect without a non-empty first row.
    if len(arr) == 0 or len(arr[0]) == 0:
        return arr

    header = arr[0]
    # Membership on the row is an exact cell comparison, not a substring test.
    if not all(keyword in header for keyword in keywords):
        return arr

    positions = [index for index, cell in enumerate(header) if cell == target]
    if len(positions) > 1:
        for index in positions[:len(positions) // 2]:
            header[index] = replacement
    return arr
def process_array_with_grants(arr, keywords=('本报告期', '年初至报告期'), target='计入当期损益的政府补助', replacement='非经常性损益', total_label='合计'):
    """Rename the total row of a table that lists ``target`` in column one.

    The table qualifies when every entry of ``keywords`` occurs as a
    substring of at least one header cell and ``target`` occurs as a
    substring of at least one first-column cell. In a qualifying table,
    every row whose first cell equals ``total_label`` exactly gets that
    cell replaced by ``replacement``.

    Args:
        arr: Table data as a list of row lists, or a ``numpy.ndarray``
            (converted with ``tolist()``). List input is modified in place.
        keywords: Substrings that must each appear somewhere in the header.
        target: Substring that must appear in some first-column cell.
        replacement: New text for the matching total cell(s).
        total_label: Exact first-column value to replace.

    Returns:
        The table as a list of rows.
    """
    arr = arr.tolist() if isinstance(arr, np.ndarray) else arr

    # Nothing to inspect without a non-empty first row.
    if len(arr) == 0 or len(arr[0]) == 0:
        return arr

    header = arr[0]
    header_matches = all(
        any(keyword in str(cell) for cell in header) for keyword in keywords
    )
    # Empty rows (ragged tables) have no first column and are skipped rather
    # than raising IndexError.
    if header_matches and any(target in str(row[0]) for row in arr if len(row) > 0):
        for row in arr:
            if len(row) > 0 and row[0] == total_label:
                row[0] = replacement
    return arr
# ---------------------------------------------------------------------------
# Ad-hoc driver: read page 1 of one sample PDF with camelot, print each table
# before and after the header clean-up helpers, then dump the first table to
# CSV and convert that CSV to an Excel workbook.
# ---------------------------------------------------------------------------
import pandas as pd

temp_dir_path = "F:\\temp"
# Create the scratch root if it is missing; exist_ok avoids the
# check-then-create race of a separate os.path.exists() test.
os.makedirs(temp_dir_path, exist_ok=True)

# Other sample inputs used during development:
#file_path = "F:\\11_pdf\\603636-2024-0630-0803.pdf"
#file_path = r"C:\Users\钱程\Downloads\天德钰深圳天德钰科技股份有限公司2024年半年度报告.PDF"
#file_path = r"C:\Users\钱程\Downloads\600239-2023-nb-nb.pdf"
#file_path = "F:\\11_pdf\\688670-2023-nb-nb.pdf"
file_path = r"C:\Users\钱程\Downloads\航发科技中国航发航空科技股份有限公司2023年第三季度报告.PDF"

# Fresh per-run directory, exported through TMP/TEMP so that code honouring
# those variables writes its temporary files there.
temp_dir = tempfile.mkdtemp(prefix="camelot_temp_", dir=temp_dir_path)
os.environ["TMP"] = temp_dir
os.environ["TEMP"] = temp_dir
os.environ["GHOSTSCRIPT_BINARY"] = "gswin64c"

# Fail early with a readable message if the sample file is missing.
if not os.path.exists(file_path):
    print(f'文件路径不正确或文件不存在: {file_path}')
    raise FileNotFoundError(f"文件不存在:{file_path}")

tables = camelot.read_pdf(file_path, pages='1-1', strip_text=',\n', copy_text=['v', 'h'], shift_text=['l'])
if len(tables) == 0:
    # Without this guard the export step below fails with a bare IndexError.
    raise RuntimeError(f'未在 PDF 中检测到任何表格: {file_path}')
#camelot.plot(tables[0], kind='grid').savefig('contour_plot.png')

for t in tables:
    arr = np.array(t.data)
    print('=======')
    print(arr)
    arr = process_array_with_annual_comparison(arr)
    arr = process_array_with_grants(arr)

    header = arr[0] if arr else []
    merged = header[0] if header else ''
    # A four-column header whose cells all carry the same merged text, e.g.
    # "项目" + "附注" + two period labels repeated in every cell.
    if len(header) == 4 and all(cell == merged for cell in header) and "项目" in merged and "附注" in merged:
        initial_value = merged.replace(' ', '')
        # Whatever remains after removing the two fixed labels is assumed to
        # be two period labels of equal length; split it down the middle.
        remaining_value = initial_value.replace("项目", "", 1).replace("附注", "", 1)
        split_index = len(remaining_value) // 2
        first_half = remaining_value[:split_index]
        second_half = remaining_value[split_index:]
        # Only look up positions when both halves are contiguous substrings
        # of the merged text (str.index would raise otherwise).
        if first_half in initial_value and second_half in initial_value:
            project_index = initial_value.index("项目")
            year_index = initial_value.index(first_half)
            year_index_2 = initial_value.index(second_half)
            print('条件满足')
            # Swap the halves when "项目" sits between them in the merged text.
            if year_index < project_index < year_index_2:
                first_half, second_half = second_half, first_half
        # NOTE(review): the source's indentation was lost; this assignment is
        # placed inside the merged-header branch but outside the substring
        # guard above -- confirm against the original file.
        arr[0] = ["项目", "附注", first_half, second_half]
    print(arr)

# NOTE: the cleaned `arr` values above are only printed; the exports below
# write camelot's own table objects.
csv_file_path = 'foo.csv'  # CSV written for the first table
excel_file_path = 'foodata.xlsx'  # Excel workbook produced from that CSV

tables.export('foo.csv', f='csv', compress=True)  # json, excel, html, markdown, sqlite
first_table = tables[0]
first_table.to_csv(csv_file_path)  # to_json, to_excel, to_html, to_markdown, to_sqlite

# Round-trip through pandas to turn the CSV into an Excel workbook.
df = pd.read_csv(csv_file_path)
df.to_excel(excel_file_path, index=False)
print(f"CSV 文件已成功转换为 Excel 文件:{excel_file_path}")