1063 lines
47 KiB
Python
1063 lines
47 KiB
Python
from datetime import datetime
|
||
import re,os,json
|
||
import utils
|
||
import ast
|
||
import time
|
||
import redis_service
|
||
from multiprocessing import Process
|
||
from config import MILVUS_CLIENT,MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DB,REDIS_HOST,REDIS_PORT,REDIS_PASSWORD,MEASURE_COUNT,MYSQL_HOST_APP,MYSQL_USER_APP,MYSQL_PASSWORD_APP,MYSQL_DB_APP
|
||
from pymilvus import MilvusClient
|
||
import mysql.connector
|
||
import redis
|
||
from zzb_logger import applog
|
||
# A parsed table measure is only embedded and stored in Milvus when its (cleaned) name
# contains at least one of these keywords (see insert_measure_data_to_milvus).
measure_name_keywords = [
    "营业", "季度", "利润", "归属于", "扣非", "经营", "现金", "活动", "损益", "收益",
    "资产", "费用", "销售", "管理", "财务", "研发", "货币资金", "应收账款", "存货", "固定资产",
    "在建工程", "商誉", "短期借款", "应付账款", "合同负债", "长期借款", "营业成本",
]
|
||
# Parse the measures extracted by the LLM and insert them into the database.
def parse_llm_measure_to_db(measure_info, type, conn, cursor):
    """Parse LLM-extracted ``"name:value"`` strings and insert new numeric ones into ``ori_measure_list``.

    Args:
        measure_info: dict with ``file_id``, ``path`` (stored as ``file_name``), ``page_num`` and
            ``llm_measure`` (iterable of ``"name:value"`` strings); ``table_index`` is read only
            when ``type == 'table'``.
        type: value written to the ``type`` column; ``'table'`` also makes the row use
            ``measure_info['table_index']``, any other value stores table index 0.
        conn: open MySQL connection; committed once after all inserts.
        cursor: cursor on ``conn``.
    """
    create_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # NOTE(review): the duplicate check always filters on type='text' (not on ``type``) and
    # ignores table_index; confirm this is intended when inserting table measures.
    check_query = '''
    select id from ori_measure_list
    WHERE file_id = %s and type = %s and page_number = %s and ori_measure_value = %s
    '''
    insert_query = '''
    INSERT INTO ori_measure_list
    (file_id, file_name, type, page_number, table_index, ori_measure_id, ori_measure_name, ori_measure_value, create_time, update_time)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    '''
    # A plain number, optionally signed and/or followed by '%'.
    numeric_value = re.compile(r'^[+-]?(\d+(\.\d*)?|\.\d+)(%?)$')

    file_id = measure_info['file_id']
    file_name = measure_info['path']
    llm_measure = measure_info['llm_measure']
    page_num = measure_info['page_num']
    table_index = measure_info['table_index'] if type == 'table' else '0'

    for measure_obj in llm_measure:
        # Drop line breaks/spaces and normalise the full-width colon (U+FF1A) to ':' so that
        # "营业收入：100" splits like "营业收入:100". The previous code replaced ':' with ':',
        # which was a no-op and silently dropped every full-width-colon line.
        measure_obj = (measure_obj.replace('\n', '').replace('\r', '')
                       .replace(' ', '').replace('\uff1a', ':'))
        if ':' not in measure_obj:
            continue

        parts = measure_obj.split(':')
        ori_measure_name = parts[0].replace('-', '')
        # Names longer than 30 characters are treated as noise.
        if len(ori_measure_name) > 30:
            continue

        ori_measure_value = parts[1].replace('+', '').replace(',', '').replace('元', '').replace('%', '')
        # Any '-' (a "-" placeholder, but also a minus sign) turns the value into "-",
        # which the numeric check below rejects.
        if '-' in ori_measure_value:
            ori_measure_value = "-"
        # Keep the second '.'-separated segment, e.g. "1.营业收入" -> "营业收入".
        if '.' in ori_measure_name:
            ori_measure_name = ori_measure_name.split('.')[1]
        ori_measure_id = utils.get_md5(ori_measure_name)

        if not numeric_value.match(ori_measure_value):
            continue

        # Skip values that are already stored for this file/page.
        cursor.execute(check_query, (file_id, 'text', int(page_num), ori_measure_value))
        if cursor.fetchall():
            continue

        cursor.execute(insert_query, (file_id, file_name, type, int(page_num), int(table_index),
                                      ori_measure_id, ori_measure_name, ori_measure_value,
                                      create_time, create_time))
    conn.commit()
|
||
|
||
def insert_measure_parser_info(parser_info,conn,cursor):
    """Write one ``(file_id, type, content)`` row to ``measure_parser_info`` and commit.

    Args:
        parser_info: dict providing ``file_id``, ``type`` and ``content``.
        conn: open MySQL connection (committed after the insert).
        cursor: cursor on ``conn``.
    """
    created_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    sql = '''
    INSERT INTO measure_parser_info
    (file_id, type, content, create_time)
    VALUES (%s, %s, %s, %s)
    '''
    params = (
        parser_info['file_id'],
        parser_info['type'],
        parser_info['content'],
        created_at,
    )
    cursor.execute(sql, params)
    conn.commit()
|
||
def insert_measure_parser_info_measure(parser_info, conn, cursor, line_text):
    """Write one row, including the source line text, to ``word_measure_parser_info_linetext``.

    Args:
        parser_info: dict providing ``file_id``, ``type`` and ``content``.
        conn: open MySQL connection (committed after the insert).
        cursor: cursor on ``conn``.
        line_text: text stored in the ``text`` column.
    """
    created_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    sql = '''
    INSERT INTO word_measure_parser_info_linetext
    (file_id, type, content,text, create_time)
    VALUES (%s, %s, %s, %s,%s)
    '''
    params = (
        parser_info['file_id'],
        parser_info['type'],
        parser_info['content'],
        line_text,
        created_at,
    )
    cursor.execute(sql, params)
    conn.commit()
|
||
def insert_table_unit_info(table_info,conn,cursor):
    """Insert the unit of one table into ``table_unit_info`` and commit.

    Args:
        table_info: dict with ``file_id``, ``page_num``, ``table_index`` (both converted with
            ``int``) and ``unit``.
        conn: open MySQL connection (committed after the insert).
        cursor: cursor on ``conn``.
    """
    sql = '''
    INSERT INTO table_unit_info
    (file_id, page_num, table_index, unit)
    VALUES (%s, %s, %s, %s)
    '''
    params = (
        table_info['file_id'],
        int(table_info['page_num']),
        int(table_info['table_index']),
        table_info['unit'],
    )
    cursor.execute(sql, params)
    conn.commit()
|
||
|
||
def insert_table_unit_info_v1(table_info, conn, cursor):
    """Upsert the unit of one table in ``table_unit_info``.

    Rows are identified by ``(file_id, page_num, table_index)``. A missing row is inserted; an
    existing row is updated only when its unit differs from the new one (an unchanged unit is
    just logged). The connection is committed in every case.

    Args:
        table_info: dict with ``file_id``, ``page_num``, ``table_index`` (both converted with
            ``int``) and ``unit``.
        conn: open MySQL connection.
        cursor: cursor on ``conn``.
    """
    file_id = table_info['file_id']
    page_num = int(table_info['page_num'])
    table_index = int(table_info['table_index'])
    unit = table_info['unit']
    key = (file_id, page_num, table_index)

    cursor.execute('''
    SELECT unit
    FROM table_unit_info
    WHERE file_id = %s AND page_num = %s AND table_index = %s
    ''', key)
    row = cursor.fetchone()

    if not row:
        # No row yet for this table: insert one.
        cursor.execute('''
        INSERT INTO table_unit_info
        (file_id, page_num, table_index, unit)
        VALUES (%s, %s, %s, %s)
        ''', key + (unit,))
    elif unit != row[0]:
        # Row exists with a different unit: overwrite it.
        cursor.execute('''
        UPDATE table_unit_info
        SET unit = %s
        WHERE file_id = %s AND page_num = %s AND table_index = %s
        ''', (unit,) + key)
    else:
        applog.info(f'No change needed. Existing unit={row[0]} is the same as new unit={unit}.')

    conn.commit()
|
||
|
||
def insert_table_text_info(table_info,conn,cursor):
    """Insert the text attached to one table into ``table_text_info`` and commit.

    Args:
        table_info: dict with ``file_id``, ``page_num``, ``table_index`` (both converted with
            ``int``) and ``text_info`` (stored in the ``text`` column).
        conn: open MySQL connection (committed after the insert).
        cursor: cursor on ``conn``.
    """
    sql = '''
    INSERT INTO table_text_info
    (file_id, page_num, table_index, text)
    VALUES (%s, %s, %s, %s)
    '''
    params = (
        table_info['file_id'],
        int(table_info['page_num']),
        int(table_info['table_index']),
        table_info['text_info'],
    )
    cursor.execute(sql, params)
    conn.commit()
|
||
|
||
def update_ori_measure(conn,cursor,file_id):
    """Attach standard measure ids to this file's raw measures and register its display measures.

    The report type and year of ``file_id`` are read from ``report_check`` and select the measure
    config table: 1 -> ``measure_config_half_year``, 2 -> ``measure_config_first_quarter``,
    3 -> ``measure_config_third_quarter``, anything else -> ``measure_config``. Then:

    1. rows of ``ori_measure_list`` for this file without a ``measure_id`` get the
       ``measure_id``/``measure_name`` of the config row (same year) sharing their
       ``ori_measure_id``;
    2. every distinct configured measure of that year is inserted into ``measure_list`` for
       this file.

    Args:
        conn: open MySQL connection; committed after each of the two steps.
        cursor: buffered cursor on ``conn``.
        file_id: id of the report in ``report_check``.

    Raises:
        IndexError: if ``report_check`` has no row for ``file_id``.
    """
    # report_type -> (measure config table, label used in the log message).
    config_by_report_type = {
        1: ("measure_config_half_year", "半年报"),
        2: ("measure_config_first_quarter", "一季报"),
        3: ("measure_config_third_quarter", "三季报"),
    }

    cursor.execute("select report_type,year from report_check where id = %s", (file_id,))
    record_select = cursor.fetchall()
    report_type = record_select[0][0]
    report_year = record_select[0][1]
    # The same table is used for the lookup and for the measure_list insert. Previously an
    # unrecognised report_type was looked up in measure_config but inserted from
    # measure_config_half_year.
    table_name, report_label = config_by_report_type.get(report_type, ("measure_config", "全年报"))

    # table_name comes from the fixed mapping above; every value is bound as a parameter.
    select_query = f'''
    SELECT t2.measure_id,t2.measure_name,t1.ori_measure_id
    FROM ori_measure_list t1
    left join
    {table_name} t2
    on t1.ori_measure_id = t2.ori_measure_id
    where t2.measure_id is not null and (t1.measure_id is null or t1.measure_id ='')
    and t1.file_id = %s
    and t2.year = %s
    '''
    start_time = time.time()
    # Bound as strings to keep the original quoted comparisons ('{file_id}', '{year}').
    cursor.execute(select_query, (str(file_id), str(report_year)))
    records = cursor.fetchall()
    end_time = time.time()
    applog.info(f"更新数据查询 {(end_time - start_time):.2f} 秒。")
    applog.info(f'update_ori_measure方法走的是{report_label}')

    update_query = '''
    UPDATE ori_measure_list
    SET measure_id = %s, measure_name = %s
    WHERE ori_measure_id = %s and file_id = %s
    '''
    start_time = time.time()
    for measure_id, measure_name, ori_measure_id in records:
        cursor.execute(update_query, (measure_id, measure_name, ori_measure_id, file_id))
    conn.commit()
    end_time = time.time()
    applog.info(f"更新数据更新 {(end_time - start_time):.2f} 秒。")

    # Add this file's display measures to measure_list.
    start_time = time.time()
    create_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    insert_query = f'''
    INSERT INTO measure_list
    (measure_id, measure_name, create_time, update_time, file_id)
    select distinct measure_id,measure_name, %s,%s,%s from {table_name}
    where year = %s
    '''
    cursor.execute(insert_query, (create_time, create_time, file_id, report_year))
    conn.commit()
    end_time = time.time()
    applog.info(f"更新数据写入 {(end_time - start_time):.2f} 秒。")
|
||
|
||
def insert_table_from_vector_mul_process(parent_table_pages,file_id,file_name,records,record_range,black_array):
    """Worker process: match a slice of configured measures against this file's PDF measures.

    For every config row ``records[start:end]`` (``record_range`` is ``"start-end"``) the cached
    embedding of the config measure is read from Redis (parsed with ``ast.literal_eval``), the
    3 nearest PDF measures of ``file_id`` are searched in the ``pdf_measure_v4`` Milvus
    collection, and each hit that passes the black-list and consistency rules below is inserted
    into ``ori_measure_list``.

    Args:
        parent_table_pages: list of page numbers to exclude; the ``parent_com`` pages stored in
            ``measure_parser_info`` are appended to it in place.
        file_id: report id; must be a ``str`` (it is concatenated into the Milvus filter).
        file_name: file name stored with each inserted row.
        records: rows of ``(ori_measure_name, measure_name, distance, ori_measure_id, measure_id)``.
        record_range: ``"start-end"`` half-open index range into ``records``.
        black_array: black-list entries passed to ``utils.check_black_list``.

    Errors while setting up propagate; errors while processing records are logged and stop the
    remaining records of this range. All opened connections are closed in every case.
    """
    create_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    applog.info('Run task %s (%s)...' % (record_range, os.getpid()))
    applog.info(f"插入数据 {len(records)}")

    check_query = '''
    select id from ori_measure_list
    WHERE file_id = %s and measure_name = %s and page_number = %s and table_index = %s and ori_measure_value = %s
    '''
    insert_query = '''
    INSERT INTO ori_measure_list
    (file_id, file_name, type, page_number, table_index, ori_measure_id, ori_measure_name, ori_measure_value, create_time, update_time, distance, pdf_measure,measure_id,measure_name,unit)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    '''
    unit_query = '''
    select unit from table_unit_info
    WHERE file_id = %s and page_num = %s and table_index = %s
    '''

    client = conn = redis_client = cursor = conn_app = cursor_app = None
    try:
        client = MilvusClient(uri=MILVUS_CLIENT)
        conn = mysql.connector.connect(
            host=MYSQL_HOST,
            user=MYSQL_USER,
            password=MYSQL_PASSWORD,
            database=MYSQL_DB,
        )
        redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, db=6)
        cursor = conn.cursor(buffered=True)
        conn_app = mysql.connector.connect(
            host=MYSQL_HOST_APP,
            user=MYSQL_USER_APP,
            password=MYSQL_PASSWORD_APP,
            database=MYSQL_DB_APP,
        )
        cursor_app = conn_app.cursor(buffered=True)

        cursor.execute("select report_type,year from report_check where id = %s", (file_id,))
        record_select = cursor.fetchall()
        report_type = record_select[0][0]
        report_year = record_select[0][1]

        # Pages whose table heading mentions the parent company (excluded below).
        cursor_app.execute(
            "select distinct content from measure_parser_info WHERE file_id = %s and type='parent_com'",
            (str(file_id),),
        )
        for parent_record in cursor_app.fetchall():
            parent_table_pages.append(int(parent_record[0]))

        # Pages/tables whose heading text contains a black-listed keyword.
        cursor_app.execute(
            "select distinct content from measure_parser_info WHERE file_id = %s and type='table_index'",
            (str(file_id),),
        )
        table_index_array = [row[0] for row in cursor_app.fetchall()]

        # Pages with black-listed detail measures under a heading (checked per measure below).
        cursor_app.execute(
            "SELECT content FROM measure_parser_info_linetext WHERE file_id = %s AND type = 'measure_index'",
            (file_id,),
        )
        measure_index_array = [row[0] for row in cursor_app.fetchall()]

        # report_type 2 (first quarter) ignores both heading black lists.
        if str(report_type) == "2":
            table_index_array = []
            measure_index_array = []

        applog.info(f'黑名单的值是{parent_table_pages}和{table_index_array}以及新增的{measure_index_array}')
        applog.info(f"black_array:{black_array}")

        record_start = record_range.split('-')[0]
        record_end = record_range.split('-')[1]

        try:
            for index in range(int(record_start), int(record_end)):
                ori_measure_name, measure_name, distance, ori_measure_id, measure_id = records[index]

                measure_vector = redis_service.read_from_redis(redis_client, ori_measure_id)
                res = client.search(
                    collection_name="pdf_measure_v4",
                    data=[ast.literal_eval(measure_vector)],
                    limit=3,  # max. number of search results to return
                    search_params={"metric_type": "COSINE", "params": {}},
                    output_fields=["measure_name", "measure_value", "table_num", "table_index", "measure_unit"],
                    filter='file_id == "' + file_id + '"',
                )

                for hit in res[0]:
                    vector_distance = float(hit["distance"])
                    entity = hit["entity"]
                    pdf_measure = entity["measure_name"]
                    measure_value = entity["measure_value"]
                    table_num = entity["table_num"]
                    table_index = entity["table_index"]
                    unit = entity["measure_unit"]

                    # Hits on page 0 are skipped (the original comment says the cause is unknown).
                    if table_num == 0:
                        continue
                    # Page black-listed by its heading text.
                    if f"{table_num}" in table_index_array:
                        continue
                    # PDF measure name contains a black-listed keyword.
                    if utils.check_pdf_measure_black_list(pdf_measure):
                        continue
                    if f"{table_num}" in measure_index_array and utils.check_pdf_measure_black_list_v3(
                            file_id, table_num, table_index, pdf_measure, conn_app, cursor_app):
                        applog.info(f'经过第三层规则去除了{table_num}页的{pdf_measure}指标')
                        continue
                    # Must be similar enough and not on a parent-company page.
                    if not (vector_distance > distance and table_num not in parent_table_pages):
                        continue

                    # --- consistency rules between the configured and the PDF measure ---
                    # Same reporting period.
                    if utils.get_period_type(ori_measure_name, report_year) != utils.get_period_type(pdf_measure, report_year):
                        continue
                    # Both (or neither) are period-start values.
                    if utils.get_start_period_type(ori_measure_name) != utils.get_start_period_type(pdf_measure):
                        continue
                    # Both (or neither) are quarterly.
                    if utils.get_season_flag(ori_measure_name) != utils.get_season_flag(pdf_measure):
                        continue
                    # Both (or neither) are non-recurring-excluded (扣非) measures.
                    if utils.get_kf_flag(ori_measure_name) != utils.get_kf_flag(pdf_measure):
                        applog.info(f'扣非指标{table_num}页的{pdf_measure}指标')
                        continue
                    # Both (or neither) are percentages.
                    if utils.get_percent_flag(ori_measure_name) != utils.get_percent_flag(pdf_measure):
                        continue
                    # Both (or neither) are share / year-on-year change measures.
                    ori_growth_type = utils.get_percent_growth(ori_measure_name)
                    if ori_growth_type != utils.get_percent_growth(pdf_measure):
                        continue
                    # A ratio-type measure whose value is above 10000 in magnitude is not a ratio.
                    if ori_growth_type == '1' and abs(float(measure_value)) > 10000:
                        continue

                    # Already stored for this page/table/value.
                    cursor.execute(check_query, (file_id, measure_name, int(table_num), int(table_index), measure_value))
                    if cursor.fetchall():
                        continue
                    if utils.check_black_list(measure_name, pdf_measure, black_array):
                        continue
                    if utils.check_white_list(measure_name, pdf_measure):
                        applog.info(f"measure_name{measure_name},pdf_measure{pdf_measure}")
                        continue
                    # Both (or neither) are growth/change-rate measures.
                    if utils.get_change_rate_flag(ori_measure_name) != utils.get_change_rate_flag(pdf_measure):
                        continue
                    # Skip values reported before an adjustment / restatement.
                    if '调整前' in pdf_measure or '重述前' in pdf_measure:
                        continue
                    # Both (or neither) are report-period-start values.
                    if utils.get_report_start(ori_measure_name) != utils.get_report_start(pdf_measure):
                        continue
                    # --- end of rules ---

                    # Non-percentage measures without their own unit fall back to the table's
                    # unit, then to '元'.
                    if utils.get_percent_flag(measure_name) == '0':
                        cursor.execute(unit_query, (file_id, int(table_num), int(table_index)))
                        unit_records = cursor.fetchall()
                        if unit == '':
                            unit = unit_records[0][0] if unit_records else '元'

                    cursor.execute(insert_query, (
                        file_id, file_name, "table", int(table_num), int(table_index),
                        ori_measure_id, ori_measure_name, measure_value, create_time, create_time,
                        vector_distance, pdf_measure, measure_id, measure_name, unit,
                    ))
                    conn.commit()
        except Exception as e:
            applog.error(e)
    finally:
        # Close everything that was opened, including the app-database connection that the
        # previous version leaked; one failing close must not leak the others.
        for resource in (redis_client, cursor, conn, cursor_app, conn_app, client):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as close_error:
                applog.error(close_error)
|
||
|
||
# Fan the vector matching of one report out to worker processes.
def insert_table_measure_from_vector_async_process(cursor,parent_table_pages,file_id,file_name):
    """Split the measure-config rows of this report into ranges and match them in parallel.

    The report type and year of ``file_id`` are read from ``report_check``. They select the
    active black list (``measure_black_list`` rows whose ``year``/``flag`` sets contain them)
    and the config table: 1 -> ``measure_config_half_year``, 2 ->
    ``measure_config_first_quarter``, 3 -> ``measure_config_third_quarter``, anything else ->
    ``measure_config``. The config rows of that year are split with
    ``utils.get_range(len(records), MEASURE_COUNT)``. Each range is handled by
    ``insert_table_from_vector_mul_process`` in its own ``multiprocessing.Process``, and this
    function waits for all of them.

    Args:
        cursor: buffered MySQL cursor.
        parent_table_pages: list of excluded page numbers, passed to every worker.
        file_id: report id.
        file_name: file name stored with inserted rows.

    Raises:
        IndexError: if ``report_check`` has no row for ``file_id``.
    """
    # report_type -> (measure config table, label used in the log message).
    config_by_report_type = {
        1: ("measure_config_half_year", "半年报"),
        2: ("measure_config_first_quarter", "一季报"),
        3: ("measure_config_third_quarter", "三季报"),
    }

    cursor.execute("select report_type,year from report_check where id = %s", (file_id,))
    record_select = cursor.fetchall()
    report_type = record_select[0][0]
    report_year = record_select[0][1]
    table_name, report_label = config_by_report_type.get(report_type, ("measure_config", "全年报"))

    # Black list entries have the form "<measure_name>:<comma-separated keywords>".
    cursor.execute(
        "SELECT measure_name, keywords FROM measure_black_list "
        "where isdel = 0 and find_in_set(%s,year) and find_in_set(%s,flag)",
        (str(report_year), str(report_type)),
    )
    black_array = [f"{category}:{keywords}" for category, keywords in cursor.fetchall()]

    # table_name comes from the fixed mapping above; the year is bound as a parameter.
    select_query = f'''
    SELECT ori_measure_name,measure_name,distance,ori_measure_id,measure_id FROM {table_name}
    where year = %s
    '''
    start_time = time.time()
    cursor.execute(select_query, (str(report_year),))
    records = cursor.fetchall()
    end_time = time.time()
    applog.info(f"向量配置数据查询 {(end_time - start_time):.2f} 秒。")
    applog.info(f'insert_table_measure_from_vector_async_process方法走的{report_label}')

    start_time = time.time()
    processes = []
    for record_range in utils.get_range(len(records), MEASURE_COUNT):
        p = Process(
            target=insert_table_from_vector_mul_process,
            args=(parent_table_pages, file_id, file_name, records, record_range, black_array),
        )
        processes.append(p)
        p.start()

    applog.info(f'等待所有子任务完成,任务ID:{file_id}')
    for p in processes:
        p.join()
    applog.info(f'所有子任务完成,任务ID:{file_id}')
    applog.info(f'启动指标归一化任务ID:{file_id}')
    end_time = time.time()
    applog.info(f"向量更新时间 {(end_time - start_time):.2f} 秒。")
|
||
|
||
def insert_table_measure_from_vector(conn,cursor,client,parent_table_pages,file_id,file_name):
    """Single-process matcher: compare every ``measure_config`` row with this file's PDF measures.

    For each config row the cached embedding is read from Redis, the 3 nearest PDF measures of
    ``file_id`` are searched in ``pdf_measure_v4``, and hits that are similar enough, not on an
    excluded page, have the same period and percentage type, and are not already stored are
    inserted into ``ori_measure_list``. Errors are logged and stop the remaining records.

    Args:
        conn: open MySQL connection (committed after each insert).
        cursor: buffered cursor on ``conn``.
        client: Milvus client used for the vector search.
        parent_table_pages: page numbers to exclude.
        file_id: report id; must be a ``str`` (it is concatenated into the Milvus filter).
        file_name: file name stored with each inserted row.
    """
    create_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # NOTE(review): unlike insert_table_measure_from_vector_async_process this reads all years
    # of measure_config regardless of the report type; confirm whether that is still wanted.
    select_query = '''
    SELECT ori_measure_name,measure_name,distance,ori_measure_id,measure_id FROM measure_config
    '''
    check_query = '''
    select id from ori_measure_list
    WHERE file_id = %s and measure_name = %s and page_number = %s and table_index = %s and ori_measure_value = %s
    '''
    insert_query = '''
    INSERT INTO ori_measure_list
    (file_id, file_name, type, page_number, table_index, ori_measure_id, ori_measure_name, ori_measure_value, create_time, update_time, distance, pdf_measure,measure_id,measure_name,unit)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ,%s)
    '''

    cursor.execute("select report_type,year from report_check where id = %s", (file_id,))
    record_select = cursor.fetchall()
    report_year = record_select[0][1]

    start_time = time.time()
    cursor.execute(select_query)
    records = cursor.fetchall()
    end_time = time.time()
    applog.info(f"向量配置数据查询 {(end_time - start_time):.2f} 秒。")

    start_time = time.time()
    # read_from_redis is called with the Redis client first, as in
    # insert_table_from_vector_mul_process; the previous one-argument call did not match that
    # usage, so every record failed into the except below.
    redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, db=6)
    try:
        for ori_measure_name, measure_name, distance, ori_measure_id, measure_id in records:
            measure_vector = redis_service.read_from_redis(redis_client, ori_measure_id)
            res = client.search(
                collection_name="pdf_measure_v4",
                data=[ast.literal_eval(measure_vector)],
                limit=3,  # max. number of search results to return
                search_params={"metric_type": "COSINE", "params": {}},
                output_fields=["measure_name", "measure_value", "table_num", "table_index", "measure_unit"],
                filter='file_id == "' + file_id + '"',
            )

            for hit in res[0]:
                vector_distance = float(hit["distance"])
                entity = hit["entity"]
                pdf_measure = entity["measure_name"]
                measure_value = entity["measure_value"]
                table_num = entity["table_num"]
                table_index = entity["table_index"]
                measure_unit = entity["measure_unit"]

                # Must be similar enough and not on an excluded page.
                if not (vector_distance > distance and table_num not in parent_table_pages):
                    continue
                # Same reporting period.
                if utils.get_period_type(ori_measure_name, report_year) != utils.get_period_type(pdf_measure, report_year):
                    continue
                # Both (or neither) are percentages.
                if utils.get_percent_flag(ori_measure_name) != utils.get_percent_flag(pdf_measure):
                    continue
                # Already stored for this page/table/value.
                cursor.execute(check_query, (file_id, measure_name, int(table_num), int(table_index), measure_value))
                if cursor.fetchall():
                    continue

                cursor.execute(insert_query, (
                    file_id, file_name, "table", int(table_num), int(table_index),
                    ori_measure_id, ori_measure_name, measure_value, create_time, create_time,
                    vector_distance, pdf_measure, measure_id, measure_name, measure_unit,
                ))
                conn.commit()
    except Exception as e:
        applog.error(e)
    finally:
        redis_client.close()

    end_time = time.time()
    applog.info(f"向量更新数据时间 {(end_time - start_time):.2f} 秒。")
|
||
|
||
|
||
def insert_measure_data_to_milvus(client,table_info,cursor,conn):
    """Clean, embed and store the measures parsed from each table.

    ``table_info`` is an iterable of dicts with ``file_id``, ``page_num`` (``"<page>_<index>"``)
    and ``measure_list`` (dicts with ``measure_name``, ``measure_value``, ``measure_unit``).
    A measure is kept when its cleaned name contains one of ``measure_name_keywords`` and its
    value is a plain number (optionally with ``%``) or an
    ``增加/减少/下降/上升[了] X.Y [个]百分点`` phrase (stored as ``X.Y`` / ``-X.Y``). Each kept measure:

    * is embedded with ``utils.embed_with_str`` (name with 调整后 removed and 上年期末 -> 上年年末),
    * is recorded as ``name:value`` in ``word_measure_parse_process`` for the front end,
    * is inserted, with the other kept measures of its table, into ``pdf_measure_v4``.

    Errors are logged per table, so one bad table does not stop the others.
    """
    insert_query = '''
    INSERT INTO word_measure_parse_process
    (file_id, page_num, content)
    VALUES (%s, %s, %s)
    '''
    # Measures that are never stored.
    black_list = ["营业总成本"]
    # Tokens whose immediate doubling (e.g. "年年") is collapsed to a single occurrence.
    quarters = ['第一季度', '第二季度', '第三季度', '第四季度', '增减', '2023年', '2022年', '2021年', '年']
    # A run of 3+ word characters repeated back-to-back, e.g. "营业收入营业收入".
    pattern_dup = re.compile(r'(\w{3,})\1+')
    number_pattern = re.compile(r'^[+-]?(\d+(\.\d*)?|\.\d+)(%?)$')
    point_change_pattern = re.compile(r'(增加|减少|下降|上升)[了]?(\d+\.\d+)[个]?百分点')

    for table in table_info:
        try:
            data = []
            table_num = table['page_num'].split("_")[0]
            file_id = table['file_id']
            table_index = table['page_num'].split("_")[1]

            for measure in table['measure_list']:
                measure_name = measure['measure_name']
                if any(black in measure_name for black in black_list):
                    continue

                measure_value = measure['measure_value'].replace("(", "").replace(")", "")

                measure_name = utils.get_clean_text(measure_name)
                # Appending 年 may produce "2023年年"; the doubling pass below collapses it.
                # The brackets are stripped again because some survive get_clean_text.
                measure_name = measure_name.replace('2023', '2023年').replace('2022', '2022年').replace('(', '').replace(')', '')
                for quarter in quarters:
                    measure_name = measure_name.replace(quarter * 2, quarter)
                for match in pattern_dup.findall(measure_name):
                    applog.info(f"被删除的字符: {match * 2}")
                measure_name = pattern_dup.sub(r'\1', measure_name)

                # Name variant used only for the embedding.
                embed_name = measure_name.replace('调整后', '').replace('上年期末数', '上年期末').replace('上年期末', '上年年末')
                measure_unit = measure['measure_unit']

                if not any(key_word in measure_name for key_word in measure_name_keywords):
                    continue
                if not number_pattern.match(measure_value):
                    # "增加了1.5个百分点" -> "1.5", "减少1.5个百分点" -> "-1.5"; anything else is dropped.
                    point_match = point_change_pattern.match(measure_value)
                    if point_match is None:
                        continue
                    crease_type = point_match.group(1)
                    measure_value = point_match.group(2)
                    if crease_type == '减少' or crease_type == '下降':
                        measure_value = f'-{point_match.group(2)}'

                vector = utils.embed_with_str(embed_name).output["embeddings"][0]["embedding"]
                data.append({
                    'vector': vector,
                    'table_num': int(table_num),
                    'table_index': int(table_index),
                    'measure_name': measure_name,
                    'measure_value': measure_value,
                    'measure_unit': measure_unit,
                    'file_id': file_id,
                })

                # Record the measure in the parse-process table for display in the front end.
                cursor.execute(insert_query, (file_id, table_num, f"{measure_name}:{measure_value}"))
                conn.commit()

            # Nothing to write for tables without a kept measure.
            if data:
                client.insert(collection_name="pdf_measure_v4", data=data)
        except Exception as e:
            applog.error(f"异常信息=={e}")
|
||
|
||
def runing_job():
    """Report whether more than one pending, non-deleted report_check row exists.

    Queries ``report_check`` for rows with ``status = 0`` and ``isdel = 0``.
    The connection and cursor are always closed, even if the query fails.

    Returns:
        bool: True when the query returns more than one row, otherwise False.
    """
    select_query = '''
    SELECT * FROM report_check where status = 0 and isdel=0
    '''
    conn = mysql.connector.connect(
        host=MYSQL_HOST,
        user=MYSQL_USER,
        password=MYSQL_PASSWORD,
        database=MYSQL_DB,
    )
    try:
        # Buffered cursor so fetchall() drains the result before closing.
        cursor = conn.cursor(buffered=True)
        try:
            cursor.execute(select_query)
            records = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()
    # NOTE(review): the threshold is "> 1", not "> 0", so a single pending row
    # yields False. Presumably deliberate (e.g. the caller's own row); confirm.
    return len(records) > 1
|
||
|
||
def insert_word_parse_process(parser_info,conn,cursor,table_name):
    """Insert one parse-progress row into ``table_name`` and commit it.

    Args:
        parser_info: mapping providing ``file_id``, ``page_num``,
            ``page_count``, ``content`` and ``type``. ``page_num`` and
            ``page_count`` must be int-convertible.
        conn: open DB connection; committed after the insert.
        cursor: cursor on ``conn`` that executes the INSERT.
        table_name: destination table. It is interpolated into the SQL text
            (identifiers cannot be bound), so it must come from trusted code.
    """
    sql = f'''
    INSERT INTO {table_name}
    (file_id, page_num, page_count, content, type)
    VALUES (%s, %s, %s, %s, %s)
    '''
    row = (
        parser_info['file_id'],
        int(parser_info['page_num']),
        int(parser_info['page_count']),
        # Serialised as JSON; ensure_ascii=False keeps non-ASCII text readable.
        json.dumps(parser_info['content'], ensure_ascii=False),
        parser_info['type'],
    )
    cursor.execute(sql, row)
    conn.commit()
|
||
|
||
|
||
def delete_database(file_id):
    """Delete every parse artefact stored for ``file_id``.

    Removes the file's rows from measure_parse_process, measure_parser_info,
    ori_measure_list, measure_list, word_parse_process and table_unit_info,
    committing after each DELETE. Failures are logged via ``applog`` and not
    re-raised (best-effort cleanup). The connection and cursor are always
    closed.

    Args:
        file_id: id of the file whose rows should be removed.
    """
    truncate_query = [
        "delete from measure_parse_process where file_id = %s;",
        "delete from measure_parser_info where file_id = %s;",
        "delete from ori_measure_list where file_id = %s;",
        "delete from measure_list where file_id = %s;",
        "delete from word_parse_process where file_id = %s;",
        "delete from table_unit_info where file_id = %s;",
    ]
    conn = None
    cursor = None
    try:
        conn = mysql.connector.connect(
            host=MYSQL_HOST,
            user=MYSQL_USER,
            password=MYSQL_PASSWORD,
            database=MYSQL_DB,
        )
        # Create a cursor to run the SQL statements.
        cursor = conn.cursor(buffered=True)
        for truncate in truncate_query:
            cursor.execute(truncate, (file_id,))
            conn.commit()
    except Exception as e:
        applog.error(f'删除失败,原因是{e}')
    finally:
        # Previously neither object was ever closed, leaking a connection per call.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
|
||
def delete_to_run(conn,cursor,file_id):
    """Remove previously extracted measure rows for ``file_id`` before a re-run.

    Uses the caller's connection to delete the file's rows from
    ori_measure_list, measure_list, check_measure_list and
    check_measure_detail_list, committing after each statement. Any exception
    is logged via ``applog`` and swallowed.

    Args:
        conn: open DB connection; committed after every DELETE.
        cursor: cursor on ``conn`` used to execute the DELETEs.
        file_id: id of the file whose rows are removed.
    """
    target_tables = (
        "ori_measure_list",
        "measure_list",
        "check_measure_list",
        "check_measure_detail_list",
    )
    try:
        for table in target_tables:
            cursor.execute(f"delete from {table} where file_id = %s;", (file_id,))
            conn.commit()
    except Exception as e:
        applog.error(f'删除失败,原因是{e}')
|
||
|
||
def insert_word_text_info(file_id,table_info):
    """Bulk-insert text lines of a document into ``word_text_info``.

    Args:
        file_id: id of the source file.
        table_info: iterable of mappings with ``index`` (int-convertible,
            stored in ``page_num``) and ``data`` (stored as-is in ``text``).

    An empty ``table_info`` is a no-op and opens no connection. The
    connection and cursor are always closed.
    """
    # ``data`` goes into the ``text`` column. It was previously passed through
    # int(), which raised ValueError for any non-numeric text. Other inserts in
    # this module store line["data"] unconverted.
    data_to_insert = [(file_id, int(line["index"]), line["data"]) for line in table_info]
    if not data_to_insert:
        return

    insert_query = '''
    INSERT INTO word_text_info
    (file_id, page_num, text)
    VALUES (%s, %s, %s)
    '''
    conn = mysql.connector.connect(
        host=MYSQL_HOST,
        user=MYSQL_USER,
        password=MYSQL_PASSWORD,
        database=MYSQL_DB,
    )
    try:
        cursor = conn.cursor(buffered=True)
        try:
            cursor.executemany(insert_query, data_to_insert)
            conn.commit()
        finally:
            cursor.close()
    finally:
        conn.close()
|
||
|
||
def process_time(file_id,type,time,start_time,end_time):
    """Record the duration of one processing stage in ``word_process_time``.

    Args:
        file_id: id of the processed file.
        type: stage identifier, stored in the ``type`` column.
        time: elapsed duration (units as supplied by the caller); rounded to
            2 decimals before storing.
        start_time: POSIX timestamp of the stage start, stored as local
            ``YYYY-MM-DD HH:MM:SS``.
        end_time: POSIX timestamp of the stage end, same format.

    The connection and cursor are always closed.
    """
    # NOTE: ``type`` and ``time`` shadow a builtin and the module-level
    # ``time`` import inside this function; names kept for caller compatibility.
    duration = round(time, 2)
    fmt = '%Y-%m-%d %H:%M:%S'
    start_str = datetime.fromtimestamp(start_time).strftime(fmt)
    end_str = datetime.fromtimestamp(end_time).strftime(fmt)
    insert_query = '''
    insert into word_process_time
    (file_id,type,time,start_time,end_time)
    values (%s, %s, %s,%s,%s)
    '''
    conn = mysql.connector.connect(
        host=MYSQL_HOST,
        user=MYSQL_USER,
        password=MYSQL_PASSWORD,
        database=MYSQL_DB,
    )
    try:
        cursor = conn.cursor(buffered=True)
        try:
            cursor.execute(insert_query, (file_id, type, duration, start_str, end_str))
            conn.commit()
        finally:
            cursor.close()
    finally:
        conn.close()
|
||
|
||
|
||
def batch_insert_page_text(table_info, conn, cursor, table_name):
    """Insert a page's text into ``table_name`` unless that page is already stored.

    Args:
        table_info: mapping with ``file_id``, ``page_num`` (int-convertible)
            and ``text``.
        conn: open DB connection; committed at the end in every case.
        cursor: cursor on ``conn``; must support ``execute``/``fetchone``.
        table_name: destination table. It is interpolated into the SQL text
            (identifiers cannot be bound), so it must come from trusted code.
    """
    file_id = table_info['file_id']
    page_num = int(table_info['page_num'])
    text_lines = table_info['text']

    # Check whether this (file_id, page_num) already has a row so re-runs do
    # not duplicate pages. Values are bound parameters; they used to be
    # formatted straight into the SQL string.
    check_if_empty_query = (
        f"SELECT COUNT(*) FROM {table_name} WHERE file_id = %s AND page_num = %s"
    )
    cursor.execute(check_if_empty_query, (file_id, page_num))
    if cursor.fetchone()[0] == 0:
        insert_query = f'''
        INSERT INTO {table_name}
        (file_id, page_num, text)
        VALUES (%s, %s, %s)
        '''
        cursor.execute(insert_query, (file_id, page_num, text_lines))
    conn.commit()
|
||
def file_type_check(file_id):
    """Tell whether report_check row ``file_id`` has ``report_type`` 5.

    Returns:
        bool: True when the row exists and its report_type equals 5; False
        when the row is missing or has another report_type.

    The cursor and connection are closed before returning.
    """
    conn = mysql.connector.connect(
        host=MYSQL_HOST,
        user=MYSQL_USER,
        password=MYSQL_PASSWORD,
        database=MYSQL_DB,
    )
    cursor = conn.cursor(buffered=True)
    try:
        cursor.execute(
            '''
        SELECT report_type FROM report_check WHERE id = %s
        ''',
            (file_id,),
        )
        row = cursor.fetchone()
        return bool(row and row[0] == 5)
    finally:
        cursor.close()
        conn.close()
|
||
def file_type_check_v2(file_id):
    """Return the ``report_type`` of report_check row ``file_id``.

    Returns:
        The row's report_type value, or None when no row has this id.
        Previously a missing row raised ``TypeError`` from ``None[0]``.

    The cursor and connection are always closed.
    """
    select_query = '''
    SELECT report_type FROM report_check WHERE id = %s
    '''
    conn = mysql.connector.connect(
        host=MYSQL_HOST,
        user=MYSQL_USER,
        password=MYSQL_PASSWORD,
        database=MYSQL_DB,
    )
    try:
        # Cursor creation is inside the outer try so a failure here cannot leak conn.
        cursor = conn.cursor(buffered=True)
        try:
            cursor.execute(select_query, (file_id,))
            record = cursor.fetchone()
            return record[0] if record else None
        finally:
            cursor.close()
    finally:
        conn.close()
|
||
|
||
def word_title_insert_mysql(file_id,title_array):
    """Bulk-insert document titles into ``word_title_info``.

    Args:
        file_id: id of the source file.
        title_array: iterable of mappings with ``data`` (stored as
            ``title``), ``index`` (stored as ``page_num``) and ``depth``;
            ``index`` and ``depth`` must be int-convertible.

    The cursor and connection are closed even if the insert fails. Before,
    they were closed only on the success path.
    """
    # Build rows first so malformed input fails before a connection is opened.
    data_to_insert = [
        (file_id, line["data"], int(line["index"]), int(line["depth"]))
        for line in title_array
    ]
    insert_query = """
    INSERT INTO word_title_info (file_id, title, page_num, depth)
    VALUES (%s, %s, %s, %s)
    """
    conn = mysql.connector.connect(
        host=MYSQL_HOST,
        user=MYSQL_USER,
        password=MYSQL_PASSWORD,
        database=MYSQL_DB,
    )
    try:
        cursor = conn.cursor(buffered=True)
        try:
            cursor.executemany(insert_query, data_to_insert)
            conn.commit()
        finally:
            cursor.close()
    finally:
        conn.close()
|
||
|
||
|
||
def get_file_info_from_mysql(file_id):
    """Fetch the ``pdf_title_info`` rows recorded for ``file_id``.

    Returns:
        list[dict]: one dict per row with keys ``title``, ``page_num`` and
        ``depth``, produced by a dictionary cursor.

    The cursor and connection are closed even if the query fails. Before,
    they were closed only on the success path.
    """
    select_query = """
    SELECT title, page_num, depth
    FROM pdf_title_info
    WHERE file_id = %s
    """
    conn = mysql.connector.connect(
        host=MYSQL_HOST,
        user=MYSQL_USER,
        password=MYSQL_PASSWORD,
        database=MYSQL_DB,
    )
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(select_query, (file_id,))
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()
|