import re
import os
import time

from config import MILVUS_CLIENT, MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DB, MEASURE_COUNT, MYSQL_HOST_APP, MYSQL_USER_APP, MYSQL_PASSWORD_APP, MYSQL_DB_APP
import mysql.connector
import utils
from pymilvus import MilvusClient
import numpy as np
from multiprocessing import Process
from config import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD
import redis
import db_service_word
from zzb_logger import applog

# Known issues (translated maintainer notes):
# 1. Table/text extraction error: when text and a table share a page with the
#    text first and the table after, the text may not be extracted.
# 2. LLM extraction errors: 2023 operating revenue is sometimes confused with
#    main-business revenue, per-product revenue, or the change ratio.
# 3. Indicators that belong to tables are sometimes extracted as plain text.
# 4. The LLM sometimes groups semantically unrelated indicators together;
#    consider vector similarity to validate the grouping.

# Processing pipeline:
# 1. get_table_range gathers all tables and their context with multiple
#    processes, producing one complete list.
# 2. A single process merges tables that were split across pages into new
#    table objects.
# 3. The new table objects go through the original measure-extraction flow in
#    multiple processes.

# Keywords that mark a table as containing measures we want to extract.
STR_PATTERN = '营业收入|净利润|变动比例|损益|现金流量净额|现金净流量|现金流|每股收益|总资产|资产总额|收益率|货币资金|应收账款|存货|固定资产|在建工程|商誉|短期借款|应付账款|合同负债|长期借款|营业成本|销售费用|管理费用|财务费用|研发费用|研发投入'
# Keywords that mark a table we deliberately skip.
PATTERN = '品牌类型|分门店|销售渠道|行业名称|产品名称|地区名称|子公司名称|业绩快报|调整情况说明|调整年初资产负债表|计入当期损益的政府补助|主要子公司|分部|母公司资产负债表|显示服务|渠道|商品类型|合同分类|会计政策变更|地区分类|研发项目|分类产品|表头不合规的表格|内部控制评价|关联方|国内地区|国外地区|销售区域|存货库龄|外币|逾期60天以上|欧元|英镑|美元|日元'
# Tables with many '调整前' cells are restated tables and are skipped.
MUILT_PATTERN = '调整前'
# unit_pattern = re.compile(r'单位[:|:]?(百万元|千万元|亿元|万元|千元|元)')
# Relaxed unit matcher: no colon required, only a distance limit between the
# '单位'/'人民币' marker and the unit word. group(2) is the unit itself.
unit_pattern = re.compile(r'(单位|单元|人民币).{0,6}?(百万元|千万元|亿元|万元|千元|元).{0,3}?')


def get_col_num_info(array, row_num, col_num, x, y):
    """Build the left-header text for the cell at row *x*.

    Concatenates the cells of row *x* in columns [0, col_num), skipping cells
    longer than 50 characters, then strips '%' characters.
    Note: *row_num* and *y* are unused; the signature mirrors
    get_row_num_info for interface symmetry.
    """
    num_info = ""
    for j in range(col_num):
        if len(str(array[x][j])) > 50:
            continue
        num_info += str(array[x][j])
    return num_info.replace('%', '')


def get_row_num_info(array, row_num, col_num, x, y):
    """Build the top-header text for the cell at column *y*.

    Concatenates the cells of column *y* in rows [0, row_num), skipping cells
    longer than 50 characters.
    Note: *col_num* and *x* are unused; kept for interface symmetry.
    """
    num_info = ""
    for i in range(row_num):
        if len(str(array[i][y])) > 50:
            continue
        num_info += str(array[i][y])
    return num_info


def table_converter(table):
    """Flatten a 2-D table into one string: cells comma-joined, rows
    newline-joined.

    None cells become the literal 'None'; embedded newlines become spaces.

    BUG FIX: the original concatenated rows with no separator and then chopped
    the final character of the last cell with [:-1] (its own comment said it
    was removing a trailing newline that was never appended). A newline is now
    appended after every row so the final [:-1] removes exactly that newline.
    """
    table_string = ''
    # Walk the rows of the table.
    for row_num in range(len(table)):
        row = table[row_num]
        # Normalise cells: strip wrapped-line breaks, map None to 'None'.
        cleaned_row = [
            item.replace('\n', ' ') if item is not None and '\n' in item
            else 'None' if item is None
            else item
            for item in row
        ]
        # Join cells with ',' and terminate the row with '\n'.
        table_string += (','.join(cleaned_row)) + '\n'
    # Remove the trailing newline (no-op on an empty table).
    table_string = table_string[:-1]
    return table_string
# Helper: does the string contain any Chinese (CJK) character?
def is_chinese(s):
    """Return True if *s* contains at least one Chinese character."""
    return bool(re.search('[\u4e00-\u9fff]', s))


def check_table(arr):
    """Split one table array in two when a second table was merged into it.

    A row (index > 1) whose first cell is empty while its second cell contains
    Chinese text is treated as the header of a second, concatenated table.

    :param arr: 2-D numpy array of cell strings.
    :return: [arr] unchanged, or [upper_part, lower_part] when split.
    """
    split_index = None
    for i in range(arr.shape[0]):
        # only rows after the header area (i > 1) may split the table
        if arr[i, 0] == "" and is_chinese(arr[i, 1]) and i > 1:
            split_index = i
            break
    if split_index is not None:
        arr1 = arr[:split_index]
        arr2 = arr[split_index:]
        return [arr1, arr2]
    else:
        return [arr]


def safe_process_array(func, arr):
    """Apply *func* to *arr*; on any exception log it and return *arr* as-is."""
    try:
        return func(arr)
    except Exception as e:
        print(f"这个函数出现了报错{func.__name__}: {e}")
        return arr  # return the original array so later steps can continue


# Dedicated fix for merged balance-sheet rows in Q3 (third-quarter) reports.
def process_array(arr, years=['2022', '2023', '2024'], keyword='项目'):
    """Re-split rows whose columns were collapsed into a single cell.

    When the first row is one cell containing *keyword* plus one of *years*,
    it is treated as a collapsed header: it is whitespace-split into columns
    and every following single-cell row is split against that column count.

    :param arr: numpy array or list of rows.
    :return: the (possibly re-split) table as a list of rows.
    """
    # make sure the row has enough columns to hold the split values
    def ensure_columns(row, num_columns):
        while len(row) < num_columns:
            row.append('')

    # header must mention a year and the keyword ('项目')
    def is_valid_header(header, years, keyword):
        header_text = header.lower()  # lower-case for matching robustness
        return any(year in header_text for year in years) and keyword in header_text

    # clean the string before whitespace-splitting
    def clean_text(text):
        # drop spaces adjacent to '年' / '月'
        text = re.sub(r'\s*(年|月)\s*', r'\1', text)
        # drop spaces to the left of '日'
        text = re.sub(r'\s*日', '日', text)
        return text

    # convert numpy arrays to plain lists
    arr = arr.tolist() if isinstance(arr, np.ndarray) else arr
    if len(arr[0]) == 1 and is_valid_header(arr[0][0], years, keyword):
        remaining_value = arr[0][0]
        # clean the header string
        remaining_value = clean_text(remaining_value)
        parts = remaining_value.split()
        ensure_columns(arr[0], len(parts))
        for i in range(len(parts)):
            arr[0][i] = parts[i]
        header_columns = len(arr[0])
        for i in range(1, len(arr)):
            if len(arr[i]) == 1:
                remaining_value = arr[i][0]
                parts = remaining_value.split()
                if len(parts) > header_columns:
                    parts = parts[:header_columns]
                ensure_columns(arr[i], header_columns)
                for j in range(len(parts)):
                    arr[i][j] = parts[j]
                # pad with empty strings when too few values were split out
                if len(parts) < header_columns:
                    for j in range(len(parts), header_columns):
                        arr[i][j] = ''
    return arr


# Q3-report tweak: the header carries two '上年同期' columns (one for 本报告期,
# one for 年初至报告期末) — rename the first half so they are not confused.
def process_array_with_annual_comparison(arr, keywords=['本报告期', '年初至报告期末', '上年同期']):
    """Rename the first half of duplicated '上年同期' header cells.

    :param arr: numpy array or list of rows.
    :return: the table with the first-row duplicates renamed.
    """
    def contains_all_keywords(header, keywords):
        return all(keyword in header for keyword in keywords)

    def split_and_replace_occurrences(header, target, replacement):
        # indices of every occurrence of *target* in the header row
        indices = [i for i, x in enumerate(header) if x == target]
        if len(indices) > 1:
            split_index = len(indices) // 2
            for i in range(split_index):
                header[indices[i]] = replacement
        return header

    # convert numpy arrays to plain lists
    arr = arr.tolist() if isinstance(arr, np.ndarray) else arr
    if len(arr) > 0 and len(arr[0]) > 0:
        first_row = arr[0]
        if contains_all_keywords(first_row, keywords):
            # rename the duplicated '上年同期' cells
            first_row = split_and_replace_occurrences(first_row, '上年同期', '三季报中无需识别的上年同期')
            arr[0] = first_row
    return arr


# Q3-report tweak for the non-recurring gains/losses table.
def process_array_with_grants(arr, keywords=['本报告期', '年初至报告期'], target='计入当期损益的政府补助', replacement='非经常性损益'):
    """Rename the '合计' row to *replacement* when the table is the
    non-recurring P&L table (header holds all *keywords* and the first column
    contains *target*).
    """
    # does the first row contain every keyword (in any of its cells)?
    def contains_all_keywords(header, keywords):
        # return all(keyword in header for keyword in keywords)
        return all(any(keyword in str(cell) for cell in header) for keyword in keywords)

    # does the first column contain the target text?
    def contains_target_in_first_column(arr, target):
        return any(target in str(item[0]) for item in arr)

    # replace an exact value in the first column
    def replace_in_first_column(arr, target, replacement):
        for i in range(len(arr)):
            if arr[i][0] == target:
                arr[i][0] = replacement
        return arr

    # convert numpy arrays to plain lists
    arr = arr.tolist() if isinstance(arr, np.ndarray) else arr
    if len(arr) > 0 and len(arr[0]) > 0:
        first_row = arr[0]
        # check the first-row and first-column conditions
        if contains_all_keywords(first_row, keywords) and contains_target_in_first_column(arr, target):
            # replace '合计' in the first column
            arr = replace_in_first_column(arr, '合计', replacement)
    return arr


# Process table data: clean, split, filter, and persist.
def process_table(file_id, tables):
    """Clean every table, split merged tables, and store the raw version in
    word_parse_data plus the measure-bearing version in word_parse_process.

    :param file_id: report id (report_check.id).
    :param tables: iterable of dicts with 'data' (2-D cells) and 'index'
        (document position) — assumed from usage here; confirm with caller.
    """
    applog.info('Run task %s (%s)...' % (f'处理word文件中的table file_id:{file_id}', os.getpid()))
    start = time.time()
    conn = mysql.connector.connect(
        host=MYSQL_HOST,
        user=MYSQL_USER,
        password=MYSQL_PASSWORD,
        database=MYSQL_DB
    )
    # cursor used for all SQL below
    cursor = conn.cursor(buffered=True)
    for t in tables:
        try:
            arr = np.array(t["data"])
            arr = safe_process_array(process_array, arr)  # collapsed balance-sheet rows
            arr = safe_process_array(process_array_with_annual_comparison, arr)  # duplicated '上年同期' headers
            arr = safe_process_array(process_array_with_grants, arr)  # Q3 non-recurring P&L
            # strip spaces, newlines and thousands separators from every cell
            arr = np.char.replace(arr, ' ', '')
            arr = np.char.replace(arr, '\n', '')
            arr = np.char.replace(arr, ',', '')
            arr_list = check_table(arr)
            for a in arr_list:
                new_data = a.tolist()  # list form used when saving to the DB
                new_data = utils.check_black_table_list(new_data)
                rows, cols = a.shape
                if rows == 1 and cols == 1:
                    continue
                arr_str = ''.join([''.join(map(str, row)) for row in a])
                # store the full data in word_parse_data first
                db_service_word.insert_word_parse_process({
                    'file_id': file_id,
                    'page_num': t["index"],
                    'page_count': 100,
                    'type': 'table',
                    'content': {
                        'page_num': t["index"],
                        'table_index': t["index"],
                        "type": "table",
                        "data": new_data,
                    }}, conn, cursor, "word_parse_data")
                # keep only tables that contain measures we need to extract
                # NOTE(review): 'pattern' is currently unused — see the
                # commented-out condition below.
                matches = re.findall(STR_PATTERN, arr_str)
                pattern = re.findall(PATTERN, arr_str)
                muilt_pattern = re.findall(MUILT_PATTERN, arr_str)
                if len(matches) > 0 and len(muilt_pattern) < 5:
                # if len(matches) > 0 and len(pattern) == 0 and len(muilt_pattern) < 5:
                    db_service_word.insert_word_parse_process({
                        'file_id': file_id,
                        'page_num': t["index"],
                        'page_count': 100,
                        'type': 'parse_table',
                        'content': {
                            'page_num': t["index"],
                            'table_index': t["index"],
                            "type": "table",
                            "data": new_data,
                        }}, conn, cursor, "word_parse_process")
        except Exception as e:
            applog.info(f'解析表格时出现了异常 {e} 内容为{t}')
    cursor.close()
    conn.close()
    end = time.time()
    applog.info('Task %s runs %0.2f seconds.' % (f'解析表格{file_id}', (end - start)))


def text_in_table(top, tables_range, page_num):
    """Return True when vertical position *top* falls inside any table range
    recorded for *page_num*.

    NOTE(review): the loop variable 'range' shadows the builtin and 'buttom'
    is a historical key-name typo for 'bottom' — both kept for compatibility
    with whoever builds *tables_range*.
    """
    if tables_range.get(page_num):
        for range in tables_range[page_num]:
            if top < range['top'] and top > range['buttom']:
                return True
    return False


def get_text_type(text: str):
    """Classify a line as 'page_header', 'page_footer' or plain 'text'."""
    text = re.sub(r"\s", "", text)
    first_re = '年度报告'
    # bare page numbers like '12' or '12/300'
    page_number_pattern = re.compile(r'^\d+(/\d+)?$')
    if re.search(first_re, text.strip()):
        return 'page_header'
    if page_number_pattern.match(text.strip()):
        return 'page_footer'
    if len(text) < 20 and text.endswith('页'):
        return 'page_footer'
    return 'text'


def check_report_type(file_id):
    """Look up (report_type, year) for *file_id* in report_check.

    :return: (int(report_type), year) or None when no row matches.
    NOTE(review): when no record is found the cursor/connection are not
    closed; the SQL interpolates *file_id* directly — confirm it is always a
    trusted numeric id.
    """
    conn = mysql.connector.connect(
        host=MYSQL_HOST,
        user=MYSQL_USER,
        password=MYSQL_PASSWORD,
        database=MYSQL_DB
    )
    # cursor for the lookup below
    cursor = conn.cursor(buffered=True)
    """
    :return: 返回pdf文件中文本内容,不包括表格
    """
    select_year_select = f"""select report_type,year from report_check where id = {file_id}"""
    cursor.execute(select_year_select)
    record_select = cursor.fetchall()
    if record_select:
        report_type = record_select[0][0]
        report_year = record_select[0][1]
        cursor.close()
        conn.close()
        return int(report_type), report_year
    else:
        return None


# Find the nearest following table index for a text line, validating the
# amount and length of text between the two.
def get_next_table_index(text_index, texts, tables):
    """Return the next table's index when it is close to *text_index*
    (fewer than 10 positions away and under 50 interleaving characters),
    otherwise return *text_index* itself.

    NOTE(review): falls through with an implicit None when no table follows
    *text_index*; callers appear to store the value as-is — confirm intended.
    """
    try:
        for table in tables:
            if table["index"] > text_index and table["type"] == "table":
                table_index = table["index"]
                total_len = sum(len(texts.get(key).get("data").replace(" ", "")) for key in range(text_index + 1, table_index))
                # nearest table must be within 10 positions...
                if (table_index - text_index) < 10 and total_len < 50:
                    # ...and the in-between text must total under 50 chars
                    return table_index
                else:
                    return text_index
    except StopIteration:
        applog.error("Target not found")
        return text_index


# Process the text lines of the document.
def process_text_content(file_id, texts, tables, full_texts, type=0):
    """Classify and persist every text line of a parsed word document.

    Records parent-company markers, table-title blacklist hits, measure
    markers and unit declarations, then stores the line itself for later
    sensitive-word checks.

    :param file_id: report id.
    :param texts: text objects with 'index' and 'data'.
    :param tables: table objects; also dumped line-by-line to word_text_info.
    :param full_texts: full text list used to build the index -> object map.
    :param type: 1 writes lines to id_text_info instead of word_text_info.
    NOTE(review): *type* shadows the builtin and neither conn nor cursor_dict
    is closed — kept as-is.
    """
    applog.info('Run task %s (%s)...' % (f'处理word文件中的 text file_id:{file_id}', os.getpid()))
    conn = mysql.connector.connect(
        host=MYSQL_HOST,
        user=MYSQL_USER,
        password=MYSQL_PASSWORD,
        database=MYSQL_DB
    )
    # cursor for the SQL below
    cursor = conn.cursor(buffered=True)
    """
    :return: 返回pdf文件中文本内容,不包括表格
    """
    report_type, report_year = check_report_type(file_id)
    texts_dict = {t["index"]: t for t in full_texts}
    query = "SELECT title_list,button_list FROM table_title_list WHERE report_year = %s"
    cursor_dict = conn.cursor(dictionary=True)
    cursor_dict.execute(query, (report_year,))
    result = cursor_dict.fetchone()
    title_list = result['title_list']
    button_list = result['button_list']
    try:
        for t in texts:
            line_text = t["data"]
            line_text = re.sub(r"\s", "", line_text)
            line_text = re.sub(r":", ":", line_text)  # full-width -> ASCII colon
            index = t["index"]
            # parent-company / cash-flow-supplement marker
            if len(re.findall('母公司|现金流量表补充', line_text)) > 0:
                db_service_word.insert_measure_parser_info({
                    'file_id': file_id,
                    'content': get_next_table_index(index, texts_dict, tables),
                    'type': 'parent_com',
                }, conn, cursor)
            # keep the text just above each table: it carries title and unit
            table_info = {}
            if (utils.check_table_title_black_list(line_text, title_list) or utils.check_table_title_black_list_button(line_text, button_list)):
                db_service_word.insert_measure_parser_info({
                    'file_id': file_id,
                    'content': get_next_table_index(index, texts_dict, tables),
                    'type': 'table_index',
                }, conn, cursor)
            if utils.check_table_title_black_list_measure(line_text):
                db_service_word.insert_measure_parser_info_measure({
                    'file_id': file_id,
                    'content': get_next_table_index(index, texts_dict, tables),
                    'type': 'measure_index',
                }, conn, cursor, line_text)
            if re.findall(unit_pattern, line_text):
                # the line declares a measure unit
                table_info = get_table_unit_info(file_id, line_text, t["index"], t["index"] + 1)
                db_service_word.insert_table_unit_info_v1(table_info, conn, cursor)
            # NOTE(review): duplicates the measure check above but stores an
            # "<index>_1" content value — confirm both inserts are intended.
            if utils.check_table_title_black_list_measure(line_text):
                db_service_word.insert_measure_parser_info_measure({
                    'file_id': file_id,
                    'content': f"{t['index']}_1",
                    'type': 'measure_index',
                }, conn, cursor, line_text)
            if not utils.pdf_text_flag(line_text):
                if utils.check_line_text(line_text):
                    db_service_word.insert_word_parse_process({
                        'file_id': file_id,
                        'page_num': t["index"],
                        'page_count': 100,
                        'type': 'parse_table',
                        'content': {
                            'page_num': t["index"],
                            'table_index': t["index"],
                            "type": "text",
                            'content': line_text,
                        }}, conn, cursor, "word_parse_process")
                # stored for the sensitive-word validation
                db_service_word.insert_word_parse_process({
                    'file_id': file_id,
                    'page_num': t["index"],
                    'page_count': 100,
                    'type': 'text',
                    'content': {
                        'page_num': t["index"],
                        'table_index': t["index"],
                        "type": "text",
                        'content': line_text,
                    }}, conn, cursor, "word_parse_data")
                table_name = "word_text_info"
                if type == 1:
                    table_name = "id_text_info"
                # write the line itself into the table chosen above
                db_service_word.batch_insert_page_text({
                    'file_id': file_id,
                    'page_num': t["index"],
                    'text': line_text
                }, conn, cursor, table_name)
        for t in tables:
            page_num = t["index"]
            for lines in t["data"]:
                lines = list(set(lines))  # de-duplicate cells within a row
                for line in lines:
                    if len(line) == 0:
                        continue
                    db_service_word.batch_insert_page_text({
                        'file_id': file_id,
                        'page_num': page_num,
                        'text': line
                    }, conn, cursor, "word_text_info")
    except Exception as e:
        applog.error(f'文本处理异常{e}')


def get_table_unit_info(file_id, line_text, page_num, table_index):
    """Extract the measure unit from a '单位…元' line into a dict."""
    table_info = {}
    table_info['file_id'] = file_id
    match = unit_pattern.search(line_text)
    if match:
        unit = match.group(2)  # group(2) of unit_pattern is the unit word
        table_info['unit'] = unit
    table_info['page_num'] = page_num
    table_info['table_index'] = table_index
    return table_info


def get_table_text_info(file_id, line_text, page_num, table_index):
    """Wrap a text line plus its location into a dict."""
    table_info = {}
    table_info['file_id'] = file_id
    table_info['text_info'] = line_text
    table_info['page_num'] = page_num
    table_info['table_index'] = table_index
    return table_info


# Read the tables and merge each value cell with its headers,
# e.g. "2022年1季度营业收入为xxxxx".
def get_table_measure(file_id, word_tables, record_range):
    """Extract measures from word_tables[start:end] and push them to Milvus.

    :param file_id: report id.
    :param word_tables: merged table groups from merge_consecutive_arrays
        (each element is a list; the first item holds the table dict).
    :param record_range: 'start-end' slice of indices this worker handles.
    """
    try:
        redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, db=6)
        conn = mysql.connector.connect(
            host=MYSQL_HOST,
            user=MYSQL_USER,
            password=MYSQL_PASSWORD,
            database=MYSQL_DB
        )
        # cursor for the report lookup
        cursor = conn.cursor(buffered=True)
        conn_app = mysql.connector.connect(
            host=MYSQL_HOST_APP,
            user=MYSQL_USER_APP,
            password=MYSQL_PASSWORD_APP,
            database=MYSQL_DB_APP
        )
        # cursor for the app-side (vector) database
        cursor_app = conn_app.cursor(buffered=True)
        select_year_select = f"""select report_type,year from report_check where id = {file_id}"""
        cursor.execute(select_year_select)
        record_select = cursor.fetchall()
        report_type = record_select[0][0]
        report_year = record_select[0][1]
        client = MilvusClient(
            uri=MILVUS_CLIENT
        )
        applog.info('提取指标任务 %s (%s)...' % (record_range, os.getpid()))
        start = time.time()
        record_start = record_range.split('-')[0]
        record_end = record_range.split('-')[1]
        for index in range(int(record_start), int(record_end)):
            t = word_tables[index][0]  # first element of the merged group
            measure_obj = []
            data_dict = {}
            measure_list = []
            try:
                arr = np.array(t["data"])
                rows, cols = arr.shape
                if rows == 1 and cols == 1:
                    continue
                row_num, col_num = -1, -1
                # nested scan for the first numeric cell: the origin of the
                # data region (for-else breaks out of both loops on a hit)
                for i in range(rows):
                    for j in range(cols):
                        if j == 0 or i == 0:  # never treat row 0 / column 0 as data
                            continue
                        measure_value_config = str(arr[i, j]).replace('(', '').replace(')', '')
                        if re.match(r'^[+-]?(\d+(\.\d*)?|\.\d+)(%?)$', measure_value_config):
                            if j == cols - 1:
                                row_num, col_num = i, j
                                break
                            elif (re.match(r'^[+-]?(\d+(\.\d*)?|\.\d+)(%?)$', measure_value_config) or measure_value_config == '-'):
                                row_num, col_num = i, j
                                break
                    else:
                        continue
                    break  # origin found: exit the outer loop too
                # walk the numeric region, turning every cell into a named measure
                if row_num != -1 and col_num != -1:
                    for i in range(row_num, arr.shape[0]):
                        for j in range(col_num, arr.shape[1]):
                            measure_value = str(arr[i, j]).replace('%', '').replace('(', '-').replace(')', '')
                            if measure_value == '-' or measure_value == '' or len(measure_value) > 20:
                                continue
                            else:
                                row_num_info = get_row_num_info(arr, row_num, col_num, i, j)
                                col_num_info = get_col_num_info(arr, row_num, col_num, i, j)
                                # an empty top header means the table was
                                # truncated — skip the cell
                                if row_num_info in ('', '-', ')', ')'):
                                    continue
                                # when 非经常性损益合计 and …净额 both exist,
                                # keep only the 净额 row
                                if col_num_info == '非经常性损益合计':
                                    continue
                                if utils.check_pdf_measure_black_list(f"{col_num_info}{row_num_info}"):
                                    continue
                                # drop measures that carry no period
                                if utils.check_pdf_measure(f"{col_num_info}{row_num_info}"):
                                    continue
                                # top and left headers must agree on the period
                                row_period = utils.get_period_type_other(row_num_info, report_year)
                                col_period = utils.get_period_type_other(col_num_info, report_year)
                                if (row_period != col_period and row_period != 'c_n' and col_period != 'c_n'):
                                    continue
                                units_mapping = {
                                    "百万元": "百万元",
                                    "千万元": "千万元",
                                    "亿元": "亿元",
                                    "万元": "万元",
                                    "千元": "千元",
                                    "元": "元",
                                    "元/股": "元"
                                }
                                row_num_info = row_num_info.replace('%', '增减')
                                # num_info = f"{col_num_info}{row_num_info}".replace('()','').replace('加:','').replace('减:','').replace('%','')
                                num_info = utils.get_clean_text(f"{row_num_info}{col_num_info}")
                                num_info_bak = utils.get_clean_text(f"{col_num_info}{row_num_info}")
                                measure_unit = ''
                                # "%": "同期增减"
                                combined_info = f"{row_num_info} {col_num_info}"
                                # for unit in units_mapping:
                                #     if unit in row_num_info:
                                #         measure_unit = units_mapping[unit]
                                #         break
                                if utils.get_percent_flag(row_num_info) == '1':
                                    measure_unit = ''
                                else:
                                    # unit either appears parenthesised in the
                                    # headers, or anywhere when row 0 says '单位'
                                    for unit in units_mapping:
                                        if re.search(rf'\(\s*{unit}(\s*人民币)?\s*\)|\(\s*{unit}(\s*人民币)?\s*\)', combined_info) or (re.search(rf'{unit}', combined_info) and any(re.search('单位', item) for item in arr[0])):
                                            measure_unit = units_mapping[unit]
                                            break
                                # store both header orders for later matching
                                measure_list.append({
                                    'measure_name': num_info,
                                    'measure_value': measure_value,
                                    'measure_unit': measure_unit,
                                })
                                measure_list.append({
                                    'measure_name': num_info_bak,
                                    'measure_value': measure_value,
                                    'measure_unit': measure_unit,
                                })
                # progress counter read elsewhere to track parsed tables
                if not redis_client.exists(f'parsed_measure_count_{file_id}'):
                    redis_client.set(f'parsed_measure_count_{file_id}', 0)
                redis_client.incr(f'parsed_measure_count_{file_id}')
                if len(measure_list) > 0:
                    data_dict["measure_list"] = measure_list
                    data_dict["page_num"] = f"{str(t['page_num'])}_{str(t['table_index'])}"
                    data_dict['file_id'] = file_id
                    measure_obj.append(data_dict)
                    db_service_word.insert_measure_data_to_milvus(client, measure_obj, cursor_app, conn_app)
            except Exception as e:
                applog.error(f"循环获取表格数据这里报错了,数据是{t['data']},位置在{index}")
                applog.error(f"错误是:{e}")
        end = time.time()
        applog.info('提取指标 %s runs %0.2f seconds.' % (record_range, (end - start)))
    except Exception as e:
        applog.error(f'这个错误是{e},所在的位置是{record_start}-{record_end}')
        record_start = record_range.split('-')[0]
        record_end = record_range.split('-')[1]
        # best-effort dump of the offending tables for debugging
        # NOTE(review): indexes word_tables[index] without [0], unlike the
        # main loop above — confirm which shape is intended.
        for index in range(int(record_start), int(record_end)):
            t = word_tables[index]
            try:
                arr = np.array(t['data'])
            except Exception as e:
                applog.error(f'这个错误是{e}的arr的值是{arr}')
    finally:
        redis_client.close()
        client.close()
        cursor.close()
        conn.close()
        cursor_app.close()
        conn_app.close()


# Measure normalisation.
def update_measure_data(file_id, file_path, parent_table_pages):
    """Run vector-based measure matching, then normalise the raw measures.

    :param parent_table_pages: table indexes blacklisted as parent-company data.
    """
    conn = mysql.connector.connect(
        host=MYSQL_HOST,
        user=MYSQL_USER,
        password=MYSQL_PASSWORD,
        database=MYSQL_DB
    )
    # cursor for the main database
    cursor = conn.cursor(buffered=True)
    # app-side connection used for vector lookups
    conn_app = mysql.connector.connect(
        host=MYSQL_HOST_APP,
        user=MYSQL_USER_APP,
        password=MYSQL_PASSWORD_APP,
        database=MYSQL_DB_APP
    )
    cursor_app = conn_app.cursor(buffered=True)
    applog.info(f'目录黑名单为:{parent_table_pages}')
    # db_service_word.delete_to_run(conn,cursor,file_id)
    db_service_word.insert_table_measure_from_vector_async_process(cursor, parent_table_pages, file_id, file_path)
    # normalise the extracted measures
    db_service_word.update_ori_measure(conn, cursor, file_id)
    # db_service.delete_database(conn_app,cursor_app,file_id)
    cursor.close()
    conn.close()
    cursor_app.close()
    conn_app.close()


def merge_consecutive_arrays(word_info):
    """Group consecutive table objects, merging continuation fragments.

    A table whose first data row (beyond column 0) is all-Chinese starts a new
    group; a table whose first row is not all-Chinese is treated as the
    continuation of the previous table and its rows are appended to it.

    :return: list of groups (each group is a list of table dicts).
    NOTE(review): the continuation branch clears temp_list before re-adding
    the merged object, so a merged group only reaches merged_objects via the
    next header table or the final flush — confirm intended.
    """
    merged_objects = []
    temp_list = []
    for info_obj in word_info:
        try:
            if info_obj['type'] == 'table':
                # table object: decide whether it starts or continues a group
                data = info_obj['data']
                if not data:
                    continue
                first_row = data[0]
                if all(re.search(r'[\u4e00-\u9fa5]', cell) for cell in first_row[1:]) and len(temp_list) == 0:
                    temp_list.append(info_obj)
                elif all(re.search(r'[\u4e00-\u9fa5]', cell) for cell in first_row[1:]) and len(temp_list) > 0:
                    # new header table: flush the previous group
                    merged_objects.append(temp_list)
                    temp_list = []
                    temp_list.append(info_obj)
                elif not all(re.search(r'[\u4e00-\u9fa5]', cell) for cell in first_row[1:]) and len(temp_list) > 0:
                    # continuation fragment: append its rows to the last table
                    temp_data = temp_list[-1]['data']
                    temp_data = list(temp_data)
                    for row in list(info_obj['data']):
                        temp_data.append(row)
                    info_obj['data'] = temp_data
                    temp_list.clear()
                    temp_list.append(info_obj)
        except Exception as e:
            applog.error(f"解析数据错误: {e}")
    if temp_list:
        merged_objects.append(temp_list)
    return merged_objects


def merge_consecutive_arrays_v1(pdf_info):
    """Merge consecutive same-dimension tables into single table objects.

    Unlike merge_consecutive_arrays, this returns a flat list of table dicts
    and uses per-row length equality to decide whether to merge.
    """
    merged_objects = []
    temp_array = {}

    def is_same_dimension(data1, data2):
        # are the two tables row-for-row the same width?
        if len(data1) != len(data2):
            return False
        return all(len(row1) == len(row2) for row1, row2 in zip(data1, data2))

    for info_obj in pdf_info:
        try:
            if info_obj['type'] == 'table':
                if not temp_array:
                    # initialise the pending table
                    temp_array = info_obj
                else:
                    # same dimensions as the pending table?
                    if is_same_dimension(temp_array['data'], info_obj['data']):
                        # yes: merge the rows
                        temp_array['data'].extend(info_obj['data'])
                    else:
                        # no: flush the pending table and start a new one
                        merged_objects.append(temp_array)
                        temp_array = info_obj
            else:
                # non-table object: flush any pending table
                if temp_array:
                    merged_objects.append(temp_array)
                    temp_array = {}  # reset the pending table
        except Exception as e:
            applog.error(f"解析数据错误: {e}")
    # final flush after the loop
    if temp_array:
        merged_objects.append(temp_array)
    return merged_objects


def start_table_measure_job(file_id):
    """Load parsed tables for *file_id*, merge page-split tables, then fan
    out measure extraction across MEASURE_COUNT worker processes."""
    conn_app = mysql.connector.connect(
        host=MYSQL_HOST_APP,
        user=MYSQL_USER_APP,
        password=MYSQL_PASSWORD_APP,
        database=MYSQL_DB_APP
    )
    # cursor for fetching the parsed tables
    cursor_app = conn_app.cursor(buffered=True)
    select_process_query = '''
        select DISTINCT content from word_parse_process WHERE file_id = '{file_id}' and type='parse_table' order by page_num
    '''.format(file_id=file_id)
    cursor_app.execute(select_process_query)
    records = cursor_app.fetchall()
    word_info = []
    for record in records:
        # NOTE(review): eval() on DB content — acceptable only because these
        # rows are written by this pipeline; never reuse on untrusted data.
        word_info.append(eval(record[0]))
    # build the merged table groups
    word_tables = merge_consecutive_arrays(word_info)
    redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, db=6)
    redis_client.set(f'measure_count_{file_id}', len(word_tables))
    cursor_app.close()
    conn_app.close()
    redis_client.close()
    records_range_parts = utils.get_range(len(word_tables), MEASURE_COUNT)
    processes = []
    for record_range in records_range_parts:
        p = Process(target=get_table_measure, args=(file_id, word_tables, record_range,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()