# pdf_code/zzb_data/main_1.py
# (152 lines, 4.7 KiB, Python; last changed 2024-10-31 15:35:27 +08:00)
import camelot
import re
from multiprocessing import Pool
import os, time, random
import json
from config import MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DB
from datetime import datetime
# 读取PDF
import PyPDF2
# 分析PDF的layout提取文本
from pdfminer.high_level import extract_pages
from pdfminer.layout import LTTextBoxHorizontal
import pdfplumber
import mysql.connector
import db_service
from multiprocessing import Process
from config import REDIS_HOST,REDIS_PORT,REDIS_PASSWORD
import utils
def text_in_table(top, tables_range, page_num):
    """Report whether a vertical coordinate falls inside a table on a page.

    :param top: vertical coordinate to test.
    :param tables_range: mapping of page number to a list of dicts, each
        holding a table's ``'top'`` and ``'buttom'`` coordinates.
    :param page_num: key of the page to look up in ``tables_range``.
    :return: ``True`` if ``top`` lies strictly between the ``'buttom'`` and
        ``'top'`` values of any table recorded for the page, else ``False``.
    """
    # A missing page (or an empty list) simply means "no tables here".
    page_tables = tables_range.get(page_num) or ()
    # 'buttom' is the key spelling used by the code that fills these dicts,
    # so it must be kept as-is. Both bounds are exclusive.
    return any(
        table['top'] > top > table['buttom'] for table in page_tables
    )
# Matches a bare page number such as "12" or "12/200". Compiled once at
# import time instead of on every call.
_PAGE_NUMBER_RE = re.compile(r'^\d+(/\d+)?$')


def get_text_type(text: str):
    """Classify a text fragment as page header, page footer or body text.

    All whitespace is removed from ``text`` before it is classified.

    :param text: raw text fragment.
    :return: ``'page_header'`` if the text contains ``'年度报告'``;
        ``'page_footer'`` if it is a bare page number (``N`` or ``N/M``) or
        passes the short-text check below; otherwise ``'text'``.
    """
    # After this substitution no whitespace is left, so no further
    # stripping is needed before the checks below.
    text = re.sub(r"\s", "", text)
    if '年度报告' in text:
        return 'page_header'
    if _PAGE_NUMBER_RE.match(text):
        return 'page_footer'
    # NOTE(review): the suffix literal is empty as written, so
    # ``endswith('')`` is always true and every fragment shorter than 20
    # characters is treated as a footer. A character was presumably lost
    # from this literal -- TODO confirm the intended suffix.
    if len(text) < 20 and text.endswith(''):
        return 'page_footer'
    return 'text'
# Read the text content of a PDF file (table filtering is currently disabled).
def get_text_content(pdf_path, file_id, tables_range, conn, cursor):
    """Hand the text boxes of every page of a PDF to ``db_service``.

    Each horizontal text box has all whitespace removed, is skipped when
    ``delete_flag`` returns true for it, and is otherwise passed to
    ``db_service.insert_pdf_text_info`` together with ``file_id`` and the
    1-based page number. Exceptions are printed and processing continues
    with the next element or page.

    :param pdf_path: path of the PDF passed to ``extract_pages``.
    :param file_id: identifier stored under the ``'file_id'`` key.
    :param tables_range: per-page table extents; currently unused because
        the table-region check is not applied here.
    :param conn: connection forwarded to ``db_service.insert_pdf_text_info``.
    :param cursor: cursor forwarded to ``db_service.insert_pdf_text_info``.
    :return: ``None``.
    """
    for pagenum, page in enumerate(extract_pages(pdf_path)):
        try:
            # The page layout object is iterable, so its elements are walked
            # through the public interface instead of the private ``_objs``.
            for i, element in enumerate(page):
                try:
                    # Only horizontal text boxes carry the text we store.
                    if not isinstance(element, LTTextBoxHorizontal):
                        continue
                    print(element)
                    # ``\s`` also matches newlines, so one substitution
                    # removes line breaks and all other whitespace.
                    line_text = re.sub(r"\s", "", element.get_text())
                    if delete_flag(line_text):
                        continue
                    db_service.insert_pdf_text_info({
                        'file_id': file_id,
                        'page_num': pagenum + 1,
                        'text': line_text,
                    }, conn, cursor)
                except Exception as e:
                    # Best effort: one bad element must not abort the page.
                    print(f'{pagenum}{i}处理异常')
                    print(e)
        except Exception as e:
            # Best effort: one bad page must not abort the document.
            print(f'{pagenum}页处理异常')
            print(e)
def delete_flag(text: str):
    """Decide whether a text fragment should be skipped.

    The checks run in order and the first one that applies wins.

    :param text: text fragment to inspect.
    :return: ``True`` when the fragment should be dropped, else ``False``.
    """
    # Delegated check from the project's ``utils`` module.
    if utils.under_non_alpha_ratio(text):
        return True
    # NOTE(review): as written the pattern contains empty alternatives, so
    # it matches every string and this branch can never fire. Characters
    # were presumably lost from the literal -- TODO restore them.
    if re.search(',||。|、||', text) is None:
        return True
    # Fragment mentions both '适用' and '不适用'.
    if '适用' in text and '不适用' in text:
        return True
    # NOTE(review): both literals are empty as written, so this expression
    # is always true and every fragment reaching this point is dropped.
    # Characters were presumably lost here as well -- TODO restore them.
    return '' in text and '' in text
def get_table_range(file_path, file_id, pages, tables_range):
    """Record the extents of a PDF's tables, then process the PDF's text.

    Opens a MySQL connection from the ``MYSQL_*`` config values, reads the
    tables with ``camelot.read_pdf``, appends one dict per table to
    ``tables_range`` (keyed by page number), and finally calls
    ``get_text_content`` with the same connection and cursor. Start and
    elapsed-time messages are printed.

    :param file_path: path of the PDF to read.
    :param file_id: identifier forwarded to ``get_text_content``.
    :param pages: page selection forwarded to ``camelot.read_pdf``.
    :param tables_range: dict mutated in place; maps page number to a list
        of ``{'top', 'buttom', 'table_index', 'page_num'}`` dicts.
    :return: ``None``.
    """
    task_name = f'解析表格{pages}'
    print('Run task %s (%s)...' % (task_name, os.getpid()))
    start = time.time()
    conn = mysql.connector.connect(
        host=MYSQL_HOST,
        user=MYSQL_USER,
        password=MYSQL_PASSWORD,
        database=MYSQL_DB,
    )
    # The cursor and connection are released even if table detection or
    # text extraction raises; the exception itself still propagates.
    try:
        # Cursor object used to execute SQL statements.
        cursor = conn.cursor(buffered=True)
        try:
            tables = camelot.read_pdf(
                file_path,
                pages=pages,
                strip_text=',\n',
                copy_text=['v', 'h'],
                shift_text=['l'],
            )
            for t in tables:
                top = t._bbox[3]
                buttom = t._bbox[1]
                page_num = int(t.page)
                table_index = int(t.order)
                # Create the per-page list on first use, then append.
                tables_range.setdefault(page_num, []).append({
                    'top': top,
                    'buttom': buttom,
                    'table_index': table_index,
                    'page_num': page_num,
                })
            get_text_content(file_path, file_id, tables_range, conn, cursor)
        finally:
            cursor.close()
    finally:
        conn.close()
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (task_name, (end - start)))
if __name__ == "__main__":
    # Manual run against a single hard-coded local PDF path.
    sample_pdf = "/Users/zhengfei/Desktop/cb/002315-2023-nb-nb.pdf"
    # Passed to get_table_range, which fills it in place per page.
    page_tables = {}
    get_table_range(sample_pdf, '5555', 'all', page_tables)