Compare commits

..

2 Commits

Author SHA1 Message Date
qian cheng 6da5105825 Notes 2024-10-31 15:37:44 +08:00
qian cheng 4a38375f7e PDF code migration 1031 2024-10-31 15:35:27 +08:00
200 changed files with 6124 additions and 170257 deletions

View File

@ -1,39 +1,8 @@
# pdf_code
#### Introduction
{**The following is the Gitee platform boilerplate; you can replace this introduction**
Gitee is a Git-based code hosting platform launched by OSCHINA (with SVN support as well), providing developers with a stable, efficient, and secure cloud platform for collaborative software development.
Whether you are an individual, a team, or an enterprise, Gitee lets you host code, manage projects, and collaborate on development. For enterprise projects, see [https://gitee.com/enterprises](https://gitee.com/enterprises)}
{**PDF code for the financial reports**
Each code update is made on a newly created branch, named after the update time
#### Software Architecture
Software architecture description
#### Architecture
#### Installation
1. xxxx
2. xxxx
3. xxxx
#### Usage
1. xxxx
2. xxxx
3. xxxx
#### Contributing
1. Fork this repository
2. Create a Feat_xxx branch
3. Commit your code
4. Open a Pull Request
#### Gitee Features
1. Use Readme\_XXX.md to support different languages, e.g. Readme\_en.md, Readme\_zh.md
2. Official Gitee blog: [blog.gitee.com](https://blog.gitee.com)
3. Explore outstanding open-source projects on Gitee at [https://gitee.com/explore](https://gitee.com/explore)
4. [GVP](https://gitee.com/gvp), short for Gitee Most Valuable Project, is a distinction for excellent open-source projects selected by comprehensive evaluation
5. The official Gitee user manual: [https://gitee.com/help](https://gitee.com/help)
6. Gitee Cover People is a column showcasing Gitee members: [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/)

View File

@ -1,58 +0,0 @@
import socket
import subprocess
import time
from datetime import datetime
def get_time():
return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
def check_port(host, port):
# returns connect_ex's errno (0 means the port is open), or False if the check itself fails
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
result = sock.connect_ex((host, port))
sock.close()
return result
except Exception as e:
print(f"[{get_time()}] port check error: {str(e)}")
return False
def restart_service():
try:
subprocess.run("bash /root/docker/milvus/standalone_embed.sh restart", shell=True)
# correct example:
# subprocess.run(["bash", "standalone_embed.sh", "restart"])
print(f"[{get_time()}] milvus service restarted successfully")
return True
except subprocess.CalledProcessError as e:
print(f"[{get_time()}] service restart failed: {str(e)}")
return False
def restart_zzbservice():
try:
# note: this "cd" runs in its own shell and does not change the directory for the next call
subprocess.run("cd /root/pdf_parser/zzb_data_prod", shell=True)
subprocess.run("nohup python3 app.py > app.log 2>&1 &", shell=True)
print("zzb service restarted successfully")
return True
except subprocess.CalledProcessError as e:
print(f"[{get_time()}] zzb service restart failed: {str(e)}")
if __name__ == '__main__':
print(f"[{get_time()}] starting Milvus monitor")
port_ok = check_port("127.0.0.1", 19530)
if port_ok not in [0,True]:
print("Milvus service appears to be down, attempting restart...")
restart_service()
print(f"[{get_time()}] starting zzb monitor")
port_ok = check_port("127.0.0.1", 8000)
if port_ok not in [0,True]:
print("zzb service appears to be down, attempting restart...")
restart_zzbservice()
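
Note: in restart_zzbservice above, the "cd" runs in its own shell and does not affect the following subprocess.run call. A minimal sketch (not part of this commit) of issuing the launch from the right directory in a single call, reusing the path and entry point shown above:

import subprocess

def restart_zzbservice_sketch():
    # run app.py from the service directory via cwd=, instead of a separate "cd" command
    subprocess.run(
        "nohup python3 app.py > app.log 2>&1 &",
        shell=True,
        cwd="/root/pdf_parser/zzb_data_prod",
    )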

zzb_data/.gitignore (new file, 3 lines)

@ -0,0 +1,3 @@
*.pyc
*.vscode
__pycache__/

zzb_data/1084_test_1.pdf (new binary file, not shown)

zzb_data/1151_test_1.pdf (new binary file, not shown)

zzb_data/1151_test_2.pdf (new binary file, not shown)

View File

@ -7,12 +7,12 @@ from multiprocessing import Process,Manager
import pdf_title
import main
import time
import threading
import config
import requests
import db_service
import threading
#import pdf_company_0824
app = FastAPI()
cpu_count = os.cpu_count()
@ -28,7 +28,7 @@ def run_job():
if_run = True
if job_queue.empty():
print(f"job_queue为空:")
print(f"job_queue为空: {file_path}")
if_run = False
if if_run:
@ -62,6 +62,8 @@ def run_job():
# '1-3',
# '4-6',
# ]
print(cpu_count)
print('test')
page_num = file_info['page_count']
if page_num < cpu_count:
p_count = page_num
@ -219,7 +221,7 @@ def run_disclosure():
if_run = True
if job_queue.empty():
print(f"job_queue为空")
print(f"job_queue为空: {file_path}")
if_run = False
if if_run:
@ -350,7 +352,6 @@ app.post("/parser/disclosure",
tags=["parser"],
summary="disclosure document parsing",
)(disclosure)
# run the FastAPI app
if __name__ == "__main__":
# start the service on the server
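
Aside on the run_job hunk above: the commented-out ranges ('1-3', '4-6') and the p_count logic suggest the page count is split into contiguous per-process ranges capped by cpu_count. A small illustrative sketch of such a split; split_pages is a hypothetical helper, not a function from this repository:

import os

def split_pages(page_count, cpu_count=None):
    # divide pages 1..page_count into contiguous "start-end" ranges, at most one per CPU
    if page_count <= 0:
        return []
    cpu_count = cpu_count or os.cpu_count() or 1
    p_count = min(page_count, cpu_count)
    base, extra = divmod(page_count, p_count)
    ranges, start = [], 1
    for i in range(p_count):
        end = start + base - 1 + (1 if i < extra else 0)
        ranges.append(f"{start}-{end}")
        start = end + 1
    return ranges

# split_pages(7, 3) -> ['1-3', '4-5', '6-7']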

zzb_data/combined_v1.pdf (new binary file, not shown)

zzb_data/config.py (new file, 21 lines)

@ -0,0 +1,21 @@
MILVUS_CLIENT='http://127.0.0.1:19530'
#MILVUS_CLIENT='http://60.204.228.154:19530'
MYSQL_HOST = '192.168.0.107'
MYSQL_PORT = 3306
MYSQL_USER = 'financial'
MYSQL_PASSWORD = 'financial_8000'
MYSQL_DB = 'financial_report'
NOTIFY_ADDR = 'http://192.168.0.175:8100/api/tenant/report/notify'
NOTIFY_ADDR_DIS = 'http://192.168.0.175:8100/api/tenant/info/notify'
REDIS_HOST = '192.168.0.175'
REDIS_PORT = 6379
REDIS_PASSWORD = 'Xgf_redis'
FILE_PATH = '/root/pdf_parser/pdf/'
PORT = 8000
MEASURE_COUNT = 8
MYSQL_HOST_APP = '192.168.0.201'#192.168.0.201
MYSQL_PORT_APP = 3306
MYSQL_USER_APP = 'root'
MYSQL_PASSWORD_APP = 'mmTFncqmDal5HLRGY0BV'
MYSQL_DB_APP = 'financial_report_prod'

View File

@ -11,9 +11,9 @@ import json,time
# import db_service
import ast
import numpy as np
import config_p
import config
import redis_service
from config_p import MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DB,REDIS_HOST,REDIS_PORT,REDIS_PASSWORD
from config import MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DB,REDIS_HOST,REDIS_PORT,REDIS_PASSWORD
# import main
import redis
@ -27,7 +27,7 @@ def measure_config_to_db(conn,cursor):
VALUES (%s, %s, %s, %s, %s)
'''
# open the text file
with open('measure_config_all.txt', 'r',encoding='utf-8') as file:
with open('/Users/zhengfei/work/zzb_data/measure_config_all.txt', 'r') as file:
# read all lines into a list
lines = file.readlines()
@ -44,15 +44,11 @@ def measure_config_to_db(conn,cursor):
def insert_measure_vector(conn,cursor):
# redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, db=6)
redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=6)
redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, db=6)
# execute SQL to update the data
select_query = '''
SELECT ori_measure_id,ori_measure_name FROM measure_config_half_year where year='2024'
'''
select_query = '''
SELECT ori_measure_id,ori_measure_name FROM measure_config where year='2023'
'''
cursor.execute(select_query)
records = cursor.fetchall()
for record in records:
@ -156,7 +152,7 @@ if __name__ == "__main__":
measure_config_to_db(conn,cursor)
# insert_measure_vector(conn,cursor)
insert_measure_vector(conn,cursor)
# cursor.close()
# conn.close()

View File

@ -270,6 +270,34 @@ def update_ori_measure(conn,cursor,file_id):
end_time = time.time()
print(f"更新数据写入 {(end_time - start_time):.2f} 秒。")
def update_ori_measure_name(conn, cursor, file_id):
try:
update_query = '''
UPDATE ori_measure_list AS oml1
JOIN (
SELECT file_id, page_number, table_index, pdf_measure, MIN(id) AS min_id
FROM ori_measure_list
WHERE file_id = %s
GROUP BY file_id, page_number, table_index, pdf_measure
) AS oml2
ON oml1.file_id = oml2.file_id AND oml1.page_number = oml2.page_number AND oml1.table_index = oml2.table_index AND oml1.pdf_measure = oml2.pdf_measure
SET oml1.keep_flag = IF(oml1.id = oml2.min_id, 1, 0)
WHERE oml1.file_id = %s;
'''
delete_query = '''
DELETE FROM ori_measure_list
WHERE file_id = %s AND keep_flag = 0;
'''
cursor.execute(update_query, (file_id, file_id))
cursor.execute(delete_query, (file_id,))
conn.commit()
print("更新和删除操作成功完成。")
except Exception as e:
conn.rollback()
print(f"更新和删除操作失败: {e}")
def insert_table_from_vector_mul_process(parent_table_pages,file_id,file_name,records,record_range,black_array):
create_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
@ -918,7 +946,7 @@ def batch_insert_page_text(table_info, conn, cursor):
text_lines = table_info['text']
# 1. check whether rows already exist for this file and page
check_if_empty_query = f"SELECT COUNT(*) FROM pdf_text_info where file_id = '{file_id}' and page_num = {page_num}"
check_if_empty_query = f"SELECT COUNT(*) FROM pdf_text_info where file_id = {file_id} and page_num = {page_num}"
cursor.execute(check_if_empty_query)
is_table_empty = cursor.fetchone()[0] == 0
@ -934,6 +962,41 @@ def batch_insert_page_text(table_info, conn, cursor):
else:
pass
conn.commit()
def batch_insert_page_text_nocheck_disclosure(table_info, conn, cursor):
file_id = table_info['file_id']
page_num = int(table_info['page_num'])
text_lines = table_info['text']
insert_query = '''
INSERT INTO pdf_text_info_disclosure
(file_id, page_num, text)
VALUES (%s, %s, %s)
'''
data_to_insert = [(file_id, page_num, text) for text in text_lines]
cursor.executemany(insert_query, data_to_insert)
conn.commit()
def batch_insert_page_text_disclosure(table_info, conn, cursor):
file_id = table_info['file_id']
page_num = int(table_info['page_num'])
text_lines = table_info['text']
# 1. check whether rows already exist for this file and page
check_if_empty_query = f"SELECT COUNT(*) FROM pdf_text_info_disclosure where file_id = {file_id} and page_num = {page_num}"
cursor.execute(check_if_empty_query)
is_table_empty = cursor.fetchone()[0] == 0
if is_table_empty:
# no existing rows for this page, insert directly
insert_query = '''
INSERT INTO pdf_text_info_disclosure
(file_id, page_num, text)
VALUES (%s, %s, %s)
'''
data_to_insert = [(file_id, page_num, text) for text in text_lines]
cursor.executemany(insert_query, data_to_insert)
else:
pass
conn.commit()
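
Note on the two insert helpers above: the existence check is built with an f-string around file_id and page_num. A parameterized sketch of the same check (an illustration, not the repo's code) that lets the MySQL driver handle quoting:

# hypothetical parameterized variant of the existence check used above
check_if_empty_query = (
    "SELECT COUNT(*) FROM pdf_text_info_disclosure "
    "WHERE file_id = %s AND page_num = %s"
)
cursor.execute(check_if_empty_query, (file_id, page_num))
is_table_empty = cursor.fetchone()[0] == 0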
def file_type_check(file_id):
conn = mysql.connector.connect(
host= MYSQL_HOST,

View File

@ -1,7 +1,7 @@
import pandas as pd
import json
import utils
from config_p import MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DB
from config import MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DB
import mysql.connector
# read the Excel file

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

zzb_data/file/docx/1.docx (new binary file, not shown)

zzb_data/file/docx/test.txt (new file, 5460 lines; diff suppressed because it is too large)

zzb_data/foo.csv (new file, 7 lines)

@ -0,0 +1,7 @@
"项目","本报告期","本报告期比上年同期增减变动幅度(%)","年初至报告期末","年初至报告期末比上年同期增减变动幅度(%)"
"营业收入","1190016393.52","66.52","3039822089.50","31.66"
"归属于上市公司股东的净利润","20825380.35","77.78","33183058.04","3207.76"
"归属于上市公司股东的扣除非经常性损益的净利润","14366478.21","40.63","21692930.24","不适用"
"经营活动产生的现金流量净额","不适用","不适用","-957159937.33","不适用"
"基本每股收益(元/股)","0.06","77.70","0.10","3250.51"
"稀释每股收益(元/股)","0.06","77.70","0.10","3250.51"

zzb_data/foo.zip (new binary file, not shown)

zzb_data/foodata.xlsx (new binary file, not shown)

View File

@ -1,7 +1,7 @@
import pandas as pd
import json
import utils
from config_p import MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DB
from config import MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DB
import mysql.connector

View File

@ -512,8 +512,8 @@ def get_text_content(pdf_path,file_id,tables_range,pages,conn,cursor,redis_clien
line_text = re.sub(r"\s", "", line_text)
# extract qualifying text and write it to pdf_text_info, used to detect writing errors in the text
if not utils.pdf_text_flag(line_text):
line_texts.append(line_text)
#if not utils.pdf_text_flag(line_text):
line_texts.append(line_text)
#db_service.insert_pdf_text_info({
# 'file_id': file_id,
# 'page_num' : pagenum+1,
@ -667,7 +667,78 @@ def get_text_content(pdf_path,file_id,tables_range,pages,conn,cursor,redis_clien
print(f'error while processing page {pagenum}')
print(e)
def get_text_content_disclosure(pdf_path,file_id,tables_range,pages,conn,cursor,redis_client, conn_app, cursor_app):
"""
:return: 返回pdf文件中文本内容不包括表格
"""
#print(f'tables_range 的值为{tables_range}')
#print('----------------')
#print(pages)
page_start = pages.split('-')[0]
page_end = pages.split('-')[1]
print(f'pages的值为{pages}')
# select_year_select = f"""select report_type,year from report_check where id = {file_id}"""
# cursor.execute(select_year_select)
# record_select = cursor.fetchall()
# report_type = record_select[0][0]
# report_year = record_select[0][1]
select_pdf_text_check = f"""select count(1) from pdf_text_info_disclosure where file_id = {file_id}"""
#check_if_empty_query = f"SELECT COUNT(*) FROM pdf_text_info where file_id = {file_id} and page_num = {page_num}"
cursor.execute(select_pdf_text_check)
is_empty = cursor.fetchone()[0] == 0
# extract the pages from the PDF, e.g. page_numbers=[4,5,6]
for pagenum, page in enumerate(extract_pages(pdf_path)):
try:
if pagenum+1 < int(page_start) or pagenum+1 > int(page_end):
continue
# update the parsed-page counter in redis
if not redis_client.exists(f'parsed_page_count_{file_id}'):
redis_client.set(f'parsed_page_count_{file_id}', 0)
redis_client.incr(f'parsed_page_count_{file_id}')
# collect all elements on the page
page_elements = [(element.y1, element) for element in page._objs]
# walk the elements that make up the page
line_texts = []
#if not utils.pdf_text_flag(line_text):
# line_texts.append(line_text)
for i,component in enumerate(page_elements):
# get the layout element
element = component[1]
# check whether the element is a text element
if isinstance(element, LTTextBoxHorizontal):
# check whether the text appears inside a table
line_text = element.get_text().replace('\n','')
line_text = re.sub(r"\s", "", line_text)
# extract qualifying text and write it to pdf_text_info, used to detect writing errors in the text
#if not utils.pdf_text_flag(line_text):
line_texts.append(line_text)
#db_service.insert_pdf_text_info({
# 'file_id': file_id,
# 'page_num' : pagenum+1,
# 'text' : line_text
# },conn,cursor)
if is_empty:
db_service.batch_insert_page_text_nocheck_disclosure({
'file_id': file_id,
'page_num' : pagenum+1,
'text' : line_texts
},conn,cursor)
#print('text was not re-parsed here')
else:
db_service.batch_insert_page_text_disclosure({
'file_id': file_id,
'page_num' : pagenum+1,
'text' : line_texts
},conn,cursor)
except Exception as e:
print(f'error while processing page {pagenum}')
print(e)
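
For context, get_text_content_disclosure increments a parsed_page_count_{file_id} counter in Redis once per page. A minimal sketch of reading that counter to report progress; get_parse_progress is an illustrative helper, not part of this commit:

def get_parse_progress(redis_client, file_id, page_count):
    # read the per-file counter incremented once per parsed page
    raw = redis_client.get(f'parsed_page_count_{file_id}')
    parsed = int(raw) if raw is not None else 0
    return parsed / page_count if page_count else 0.0

# example (using the Redis settings from config):
# r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, db=6)
# print(f"{get_parse_progress(r, file_id, page_count):.0%} of pages parsed")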
def get_table_unit_info(file_id,line_text,page_num,table_index):
table_info = {}
table_info['file_id'] = file_id
@ -885,7 +956,36 @@ def dispatch_job(job_info):
except Exception as e:
print(e)
def dispatch_disclosure(job_info):
try:
type = job_info['type']
path = job_info['path']
file_id = job_info['file_id']
page_num = job_info['page_num']
tables_range = job_info['tables_range']
conn = mysql.connector.connect(
host= MYSQL_HOST,
user= MYSQL_USER,
password= MYSQL_PASSWORD,
database= MYSQL_DB
)
# create a cursor to execute SQL statements
cursor = conn.cursor(buffered=True)
conn_app = mysql.connector.connect(
host= MYSQL_HOST_APP,
user= MYSQL_USER_APP,
password= MYSQL_PASSWORD_APP,
database= MYSQL_DB_APP
)
cursor_app = conn_app.cursor(buffered=True)
redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, db=6)
if type == 'table':
get_text_content_disclosure(path,file_id,tables_range,page_num,conn,cursor,redis_client, conn_app, cursor_app)
except Exception as e:
print(e)
# measure normalization
def update_measure_data(file_id,file_path,parent_table_pages):
@ -915,6 +1015,8 @@ def update_measure_data(file_id,file_path,parent_table_pages):
# # measure normalization
db_service.update_ori_measure(conn,cursor,file_id)
#db_service.delete_database(conn_app,cursor_app,file_id)
# ensure measures from the same table on the same page appear only once in the UI
db_service.update_ori_measure_name(conn,cursor,file_id)
cursor.close()
conn.close()
cursor_app.close()

View File

@ -7,7 +7,7 @@ import redis
def process_excel_and_db(input_excel_path1, input_excel_path2, output_file_path):
# 读取第一个 Excel 文件
df = pd.read_excel(input_excel_path1, sheet_name='Sheet7', header=0)# corresponds to the ttt table
df = pd.read_excel(input_excel_path1, sheet_name='Sheet8', header=0)# corresponds to the ttt table
# convert the DataFrame into a list of dicts
data_list = df.to_dict(orient='records')
@ -121,13 +121,13 @@ def process_excel_and_db(input_excel_path1, input_excel_path2, output_file_path)
def measure_config_to_db(conn, cursor, file_path):
insert_query = '''
INSERT INTO measure_config_third_quarter
INSERT INTO measure_config_1024
(measure_id, measure_name, ori_measure_id, ori_measure_name)
VALUES (%s, %s, %s, %s)
'''
check_query = '''
SELECT ori_measure_id FROM measure_config_third_quarter
'''
# check_query = '''
# SELECT ori_measure_id FROM measure_config_1024
# '''
# open the text file
with open(file_path, 'r', encoding='utf-8') as file:
@ -142,10 +142,10 @@ def measure_config_to_db(conn, cursor, file_path):
ori_measure_id = utils.get_md5(ori_measure)
# check whether the record already exists in the database
cursor.execute(check_query)
check_records = cursor.fetchall()
#if any(record[0] == ori_measure_id for record in check_records):
# continue
# cursor.execute(check_query)
# check_records = cursor.fetchall()
# if any(record[0] == ori_measure_id for record in check_records):
# continue
data_to_insert = (utils.get_md5(measure), measure, ori_measure_id, ori_measure)
cursor.execute(insert_query, data_to_insert)
@ -153,10 +153,10 @@ def measure_config_to_db(conn, cursor, file_path):
def insert_measure_vector(conn,cursor):
redis_client = redis.Redis(host='192.168.0.172', port=6379, password='Xgf_redis', db=6)# 192.168.0.172 # test: 123.60.153.169
redis_client = redis.Redis(host='123.60.153.169', port=6379, password='Xgf_redis', db=6)# 192.168.0.172 # test: 123.60.153.169
# execute SQL to update the data
select_query = '''
SELECT ori_measure_id,ori_measure_name FROM measure_config_1024
SELECT ori_measure_id,ori_measure_name FROM measure_config_third_quarter
'''
cursor.execute(select_query)
records = cursor.fetchall()
@ -180,11 +180,11 @@ if __name__ == "__main__":
MYSQL_DB = 'financial_report'
# the local measure_create_config and measure_create_period tables must be cleared first
process_excel_and_db(
'ttt_1.xlsx',# ttt file
'period_1.xlsx',# period file
'out_2022_new_year.txt'# output file
)
# process_excel_and_db(
# 'F:\\11_pdf\\ttt_1.xlsx',# ttt file
# 'F:\\11_pdf\\period_1.xlsx',# period file
# 'F:\\11_pdf\\out_2022_new_year.txt'# output file
# )
conn = mysql.connector.connect(
host=MYSQL_HOST,
user=MYSQL_USER,
@ -192,7 +192,6 @@ if __name__ == "__main__":
database=MYSQL_DB
)
cursor = conn.cursor()
file_path = 'out_2022_new_year.txt'
measure_config_to_db(conn, cursor, file_path)
insert_measure_vector(conn,cursor)
# file_path = r'F:\\11_pdf\\out_2022_new_year.txt'
# measure_config_to_db(conn, cursor, file_path)
insert_measure_vector(conn,cursor)

View File

@ -3,7 +3,7 @@ import re
from multiprocessing import Pool
import os, time, random
import json
from config_p import MILVUS_CLIENT,MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DB,MEASURE_COUNT,MYSQL_HOST_APP,MYSQL_USER_APP,MYSQL_PASSWORD_APP,MYSQL_DB_APP
from config import MILVUS_CLIENT,MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DB,MEASURE_COUNT,MYSQL_HOST_APP,MYSQL_USER_APP,MYSQL_PASSWORD_APP,MYSQL_DB_APP
from datetime import datetime
# read the PDF
import PyPDF2
@ -19,7 +19,7 @@ import db_service
import pdf_title
import numpy as np
from multiprocessing import Process
from config_p import REDIS_HOST,REDIS_PORT,REDIS_PASSWORD
from config import REDIS_HOST,REDIS_PORT,REDIS_PASSWORD
import redis

View File

@ -0,0 +1,122 @@
from docx import Document
import json
from docx.oxml.table import CT_Tbl
from docx.oxml.text.paragraph import CT_P
from lxml import etree
import os
RESULT_TYPE_TEXT = 'text'
RESULT_TYPE_TABLE = 'table'
def build_result(result_type, index, data):
return {
'type': result_type,
'index': index,
'data': data
}
def build_catalog_result(index, depth, data):
return {
'index': index,
'depth': depth,
'data': data
}
def parse_paragraph(paragraph, index):
paragraph_text = paragraph.text.strip() if paragraph else ''
if paragraph_text:
return build_result(RESULT_TYPE_TEXT, index, paragraph_text)
return None
def parse_table(table, index):
table_data = []
for row in table.rows:
row_data = [cell.text for cell in row.cells]
table_data.append(row_data)
return build_result(RESULT_TYPE_TABLE, index, table_data)
def parse_docx(docx_path):
try:
document = Document(docx_path)
except Exception as e:
print(f"Error loading document: {e}")
return '[]', '[]'  # return empty JSON so the code below does not hit a NameError on `document`
doc_content = [] # content (text + tables)
catalog_content = [] # table of contents
current_index = 1 # global running index
paragraph_index = 0
table_index = 0
# get the XML content of the whole document
xml_root = document.part.element
namespaces = xml_root.nsmap
# iterate over all elements in the document body
for i, element in enumerate(document.element.body):
if element.tag.endswith('p'): # paragraph
# append the paragraph content
paragraph = document.paragraphs[paragraph_index]
paragraph_index += 1
paragraph_result = parse_paragraph(paragraph, current_index)
if paragraph_result:
doc_content.append(paragraph_result)
# check whether the paragraph is a heading; if so, add it to the catalog
p_element = paragraph._element
# convert the docx element into an lxml element
p_element = etree.fromstring(p_element.xml)
outlineLvl = p_element.xpath('.//w:outlineLvl', namespaces=namespaces)
if outlineLvl:
level = int(outlineLvl[0].get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val'))
text = paragraph.text
catalog_content.append(build_catalog_result(current_index, level + 1, text))
else:
style_name = paragraph.style.name
if style_name.startswith('Heading'):
level = int(style_name[-1])
text = paragraph.text
catalog_content.append(build_catalog_result(current_index, level + 1, text))
current_index += 1 # advance the index
# table element
elif element.tag.endswith('tbl'):
table = document.tables[table_index]
table_index += 1
table_result = parse_table(table, current_index)
if table_result:
doc_content.append(table_result)
current_index += 1 # advance the index
return json.dumps(doc_content, indent=4, ensure_ascii=False),json.dumps(catalog_content, indent=4, ensure_ascii=False)
def split_text_table(json_data):
# split into text and table groups
text_elements = [element for element in json_data if element['type'] == 'text']
table_elements = [element for element in json_data if element['type'] == 'table']
# convert to JSON strings
text_elements_json = json.dumps(text_elements, ensure_ascii=False, indent=4)
table_elements_json = json.dumps(table_elements, ensure_ascii=False, indent=4)
return text_elements_json, table_elements_json
def append_to_file(file_path, text):
try:
with open(file_path, 'a', encoding='utf-8') as file:
file.write(text + '\n')
except Exception as e:
print(f"Error writing to file: {e}")
if __name__ == "__main__":
current_directory = os.getcwd()
docx_relative_path = 'file/docx/1.docx'
file_relative_path = 'file/docx/test.txt'
docx_path = os.path.join(current_directory, docx_relative_path)
file_path = os.path.join(current_directory, file_relative_path)
parsed_content,catalog_content = parse_docx(docx_path)
json_parsed_content = json.loads(parsed_content)
text_elements_json, table_elements_json = split_text_table(json_parsed_content)
append_to_file(file_path, text_elements_json)
append_to_file(file_path, table_elements_json)
append_to_file(file_path, catalog_content)

View File

@ -100,7 +100,7 @@ def get_company_code(file_path):
def llm_service(user_prompt):
system_prompt = '''
从以下数据报告中提取公司全称只需要提取中文公司全称不要增加其他内容如果提取不到公司全称请返回-不要返回其他任何内容
从以下数据报告中提取公司全称只需要提取中文公司全称不要增加其他内容如果提取不到公司全称请返回-
<数据报告>
<user_prompt>
</数据报告>
@ -165,53 +165,6 @@ def update_company_name(file_id, company_name,company_code, cursor, conn):
'''
cursor.execute(update_sql)
conn.commit()
def name_code_fix(file_id,file_path):
conn = mysql.connector.connect(
host = MYSQL_HOST,
user = MYSQL_USER,
password = MYSQL_PASSWORD,
database = MYSQL_DB
)
# 创建一个cursor对象来执行SQL语句
cursor = conn.cursor()
try:
# file_id = data[0]
# # production environment path
# file_path = f'/usr/local/zhanglei/financial{data[1]}'
# # test environment path
# # file_path_1 = f'/root/pdf_parser/pdf/{data[1]}'
# # file_path = file_path_1.replace('/upload/file/','')
# print(f'parsing of financial report {file_id} started')
# #file_id = '305'
# #file_path = r"F:\11_pdf\7874.pdf"
company_name = get_company_name(file_path)
contains_newline = '\n' in company_name
if contains_newline:
lines = company_name.splitlines(True)
company_name = lines[0]
company_code = get_company_code(file_path)
contains_newline1 = '\n' in company_code
if contains_newline1:
lines = company_code.splitlines(True)
company_code = lines[0]
if company_name != "llm_error" or company_code != "llm_error":
#print(company_code)
pattern = re.compile(r'^(\d{6}|\d{6}(,\d{6})*)$')
if not pattern.match(company_code):
company_code = '-'
if len(company_name) > 15 or company_name == '-':
company_name = ''
update_company_name(file_id, company_name,company_code, cursor, conn)
except Exception as e:
print(f'financial report parsing failed', e)
cursor.close()
conn.close()
if __name__ == '__main__':
conn = mysql.connector.connect(
@ -220,7 +173,7 @@ if __name__ == '__main__':
password = MYSQL_PASSWORD,
database = MYSQL_DB
)
# create a cursor to execute SQL statements
cursor = conn.cursor()
@ -265,4 +218,4 @@ if __name__ == '__main__':
print(f'financial report parsing failed', e)
cursor.close()
conn.close()
conn.close()

View File

@ -168,10 +168,9 @@ def create_text_outline(pdf_path, file_id):
return file_info
def create_text_outline_disclosure(pdf_path, file_id):
# print('Running the script for [%s] with padding [%d]' % (pdf_path, page_number_padding))
# creating an object
# creating an object
with open(pdf_path, 'rb') as file:
file_info = {}
fileReader = PyPDF2.PdfReader(file)
@ -183,7 +182,7 @@ def create_text_outline_disclosure(pdf_path, file_id):
info = {
'page_count': page_count,
'all_pages': {},
'current_page_id': 1,
'current_page_id': 1,
'padding': 0
}
@ -197,7 +196,7 @@ def create_text_outline_disclosure(pdf_path, file_id):
title_array = get_tree_pages(fileReader.outline, info, 0, [])
#db_service.pdf_title_insert_mysql(file_id,title_array)
#title_array = db_service.get_file_info_from_mysql(file_id)
parent_table_pages_local = {}
parent_table_pages_local[file_id] = []
print(f'{file_id}:{len(title_array)}')
@ -215,14 +214,14 @@ def create_text_outline_disclosure(pdf_path, file_id):
else:
page_end = page_count
print(f'pages dropped during outline parsing: {page_start}-{page_end}')
# when the title is the notes to main items of the parent company's financial statements ('财务报表主要项目注释'), do not drop the last page, so the core ROE measures can still be recalled
if len(re.findall('财务报表主要项目注释', title)) == 0:
page_end = page_end - 1
# print(title,page_start,page_end)
for i in range(page_start, page_end + 1):
# append each page number to the list
parent_table_pages_local[file_id].append(i)
parent_table_pages_local[file_id].append(i)
file_info['page_count'] = page_count
file_info['parent_table_pages'] = parent_table_pages_local[file_id]
file_info['split_parts'] = get_file_split(page_count)

View File

@ -10,7 +10,7 @@ def read_from_redis(redis_client,ori_measure_id):
return redis_client.hget('measure_config',ori_measure_id).decode()
if __name__ == "__main__":
redis_client = redis.Redis(host='192.168.0.175', port=6379, password='Xgf_redis', db=6)
redis_client = redis.Redis(host='123.60.153.169', port=6379, password='Xgf_redis', db=6)
value = read_from_redis(redis_client,"bb3cf43f3dba147373c706c6567b5a")
print(value)
value = read_from_redis(redis_client,"92b44ffb50b6ab2068f5de447c9925")
print(value)

View File

@ -10,5 +10,4 @@ pydantic
uvicorn
redis
ghostscript
opencv-python-headless
python-docx
opencv-python-headless

Some files were not shown because too many files have changed in this diff.