diff --git a/zzb_data_prod/1.PDF b/zzb_data_prod/1.PDF new file mode 100644 index 0000000..6fd4fd5 Binary files /dev/null and b/zzb_data_prod/1.PDF differ diff --git a/zzb_data_prod/Mil_unit.py b/zzb_data_prod/Mil_unit.py new file mode 100644 index 0000000..ba33782 --- /dev/null +++ b/zzb_data_prod/Mil_unit.py @@ -0,0 +1,87 @@ +from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection,MilvusClient +from config import MILVUS_CLIENT +import time +from datetime import datetime, timedelta + +def create_partition_by_hour(): + # 连接到 Milvus 服务器 + connections.connect("default",uri=MILVUS_CLIENT) + # 获取集合 + collection_name = "pdf_measure_v4" + collection = Collection(collection_name) + + # 获取当前时间 + now = datetime.now() + current_hour = now.strftime("%Y%m%d%H") + + # 创建当前小时的分区 + partition_name = f"partition_{current_hour}" + if not collection.has_partition(partition_name): + collection.create_partition(partition_name) + print(f"Created partition: {partition_name}") + + # 删除前一个小时的分区 + previous_hour = (now - timedelta(hours=1)).strftime("%Y%m%d%H") + previous_partition_name = f"partition_{previous_hour}" + if collection.has_partition(previous_partition_name): + + pre_partition = collection.partition(previous_partition_name) + pre_partition.release() + collection.drop_partition(previous_partition_name) + print(f"Dropped partition: {previous_partition_name}") + + partition = collection.partition(partition_name) + partition.load() + + return collection, partition + + +# res = partition.search( +# # collection_name="pdf_measure_v4", # Replace with the actual name of your collection +# # Replace with your query vector +# data=data, +# limit=3, # Max. number of search results to return +# anns_field="vector", +# param={"metric_type": "COSINE", "params": {}}, # Search parameters +# output_fields=["measure_name","measure_value","table_num","table_index","measure_unit"], +# # filter=filter_str, +# expr=query +# ) + + + + + +# from pymilvus import connections, CollectionSchema, Collection,utility,FieldSchema,DataType +# # 连接到 B 服务器上的 Milvus +# # connections.connect(host='124.70.129.232', port='19530')# 测试服务器 +# connections.connect(host='1.94.60.103', port='19530')# 测试服务器 +# # # 获取集合列表 +# utility.drop_collection("pdf_measure_v4") +# +# # 定义字段 +# fields = [ +# FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), +# FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=1536), +# FieldSchema(name="table_num", dtype=DataType.INT16), +# FieldSchema(name="table_index", dtype=DataType.INT16), +# FieldSchema(name="measure_name", dtype=DataType.VARCHAR, max_length=200), +# FieldSchema(name="measure_value", dtype=DataType.VARCHAR, max_length=200), +# FieldSchema(name="file_id", dtype=DataType.VARCHAR, max_length=200), +# FieldSchema(name="measure_unit", dtype=DataType.VARCHAR, max_length=200) +# ] +# +# # 定义集合的 schema +# schema = CollectionSchema(fields=fields, description="My Milvus collection") +# +# # 创建集合 +# collection = Collection(name="pdf_measure_v4", schema=schema) +# +# collection = Collection("pdf_measure_v4") +# index_params = { +# "index_type": "IVF_FLAT", +# "metric_type": "COSINE", +# "params": {"nlist": 128} +# } +# collection.create_index(field_name="vector", index_params=index_params) +# collection.load() \ No newline at end of file diff --git a/zzb_data_prod/app.py b/zzb_data_prod/app.py index c1eaa52..e4c21db 100644 --- a/zzb_data_prod/app.py +++ b/zzb_data_prod/app.py @@ -219,13 +219,13 @@ app.post("/parser/start", # 运行 FastAPI 应用 if __name__ == "__main__": # 服务器启动服务 - import uvicorn - uvicorn.run(app, host="0.0.0.0", port=config.PORT) + # import uvicorn + # uvicorn.run(app, host="0.0.0.0", port=config.PORT) # 本地调试任务 - #job_queue.put({ - #'file_path' : '6281.pdf', - #'file_id' : '6281' - #}) + job_queue.put({ + 'file_path' : '1.pdf', + 'file_id' : '2122' + }) - #run_job() + run_job() diff --git a/zzb_data_prod/config.py b/zzb_data_prod/config.py index 1e10365..c335d49 100644 --- a/zzb_data_prod/config.py +++ b/zzb_data_prod/config.py @@ -1,23 +1,28 @@ -MILVUS_CLIENT='http://127.0.0.1:19530' -MILVUS_HOST = '127.0.0.1' -MILVUS_PORT = 19530 -MYSQL_HOST = '192.168.0.142' +MILVUS_CLIENT='http://124.70.129.232:19530' +#MILVUS_CLIENT='http://60.204.228.154:19530' +MYSQL_HOST = '121.37.185.246' MYSQL_PORT = 3306 -MYSQL_USER = 'financial_prod' -MYSQL_PASSWORD = 'mmTFncqmDal5HLRGY0BV' -MYSQL_DB = 'financial_report_prod' -NOTIFY_ADDR = 'http://192.168.0.166:8100/api/tenant/report/notify' -FILE_PATH = '/root/pdf_parser/pdf/' -REDIS_HOST = '192.168.0.172' +MYSQL_USER = 'financial' +MYSQL_PASSWORD = 'financial_8000' +MYSQL_DB = 'financial_report' +NOTIFY_ADDR = 'http://127.0.0.1:8100/api/tenant/report/notify' +NOTIFY_ADDR_DIS = 'http://127.0.0.1:8100/api/tenant/info/notify' +REDIS_HOST = '123.60.153.169' REDIS_PORT = 6379 REDIS_PASSWORD = 'Xgf_redis' +FILE_PATH = '/root/pdf_parser/pdf/' PORT = 8000 MEASURE_COUNT = 8 -MYSQL_HOST_APP = '192.168.0.201' +MYSQL_HOST_APP = '121.37.185.246' MYSQL_PORT_APP = 3306 -MYSQL_USER_APP = 'root' -MYSQL_PASSWORD_APP = 'mmTFncqmDal5HLRGY0BV' -MYSQL_DB_APP = 'financial_report_prod' +MYSQL_USER_APP = 'financial' +MYSQL_PASSWORD_APP = 'financial_8000' +MYSQL_DB_APP = 'financial_report' +#MYSQL_HOST_APP = '192.168.0.201' +#MYSQL_PORT_APP = 3306 +#MYSQL_USER_APP = 'root' +#MYSQL_PASSWORD_APP = 'mmTFncqmDal5HLRGY0BV' +#MYSQL_DB_APP = 'financial_report_prod' diff --git a/zzb_data_prod/db_service.py b/zzb_data_prod/db_service.py index 29d3a4b..29fb0ad 100644 --- a/zzb_data_prod/db_service.py +++ b/zzb_data_prod/db_service.py @@ -10,6 +10,7 @@ from pymilvus import MilvusClient import mysql.connector import threading import redis +from Mil_unit import create_partition_by_hour measure_name_keywords = ["营业","季度","利润","归属于","扣非","经营","现金","活动","损益","收益","资产","费用","销售","管理","财务","研发","货币资金","应收账款","存货","固定资产","在建工程","商誉","短期借款","应付账款","合同负债","长期借款","营业成本"] # 解析大模型抽取的指标,并插入到数据库 def parse_llm_measure_to_db(measure_info,type,conn,cursor): @@ -275,9 +276,8 @@ def insert_table_from_vector_mul_process(parent_table_pages,file_id,file_name,re print('Run task %s (%s)...' % (record_range, os.getpid())) print(f"插入数据 {len(records)}") - client = MilvusClient( - uri=MILVUS_CLIENT - ) + + _,partition = create_partition_by_hour() conn = mysql.connector.connect( host = MYSQL_HOST, @@ -357,6 +357,10 @@ def insert_table_from_vector_mul_process(parent_table_pages,file_id,file_name,re #print(f'黑名单的值是{parent_table_pages}和{table_index_array}') record_start = record_range.split('-')[0] record_end = record_range.split('-')[1] + + now = datetime.now() + current_hour = now.strftime("%Y%m%d%H") + try: for index in range(int(record_start),int(record_end)): record = records[index] @@ -370,14 +374,21 @@ def insert_table_from_vector_mul_process(parent_table_pages,file_id,file_name,re data = [measure_list] # data.append(measure_list) filter_str = 'file_id == "'+file_id+'"' - res = client.search( - collection_name="pdf_measure_v4", # Replace with the actual name of your collection + + + # 定义查询条件 + + + res = partition.search( + # collection_name="pdf_measure_v4", # Replace with the actual name of your collection # Replace with your query vector data=data, limit=3, # Max. number of search results to return - search_params={"metric_type": "COSINE", "params": {}}, # Search parameters + anns_field="vector", + param={"metric_type": "COSINE", "params": {}}, # Search parameters output_fields=["measure_name","measure_value","table_num","table_index","measure_unit"], - filter=filter_str + # filter=filter_str, + expr=filter_str ) # Convert the output to a formatted JSON string @@ -532,7 +543,6 @@ def insert_table_from_vector_mul_process(parent_table_pages,file_id,file_name,re redis_client.close() cursor.close() conn.close() - client.close() def insert_table_measure_from_vector_async_process(cursor,parent_table_pages,file_id,file_name): select_year_select = f"""select report_type,year from report_check where id = {file_id}""" @@ -707,7 +717,7 @@ def insert_table_measure_from_vector(conn,cursor,client,parent_table_pages,file_ start_time = time.time() -def insert_measure_data_to_milvus(client,table_info,cursor,conn): +def insert_measure_data_to_milvus(milvus_partition,table_info,cursor,conn): insert_query = ''' INSERT INTO measure_parse_process (file_id, page_num, content) @@ -732,8 +742,8 @@ def insert_measure_data_to_milvus(client,table_info,cursor,conn): measure_name = measure_name.replace(quarter * 2, quarter) pattern_dup = re.compile(r'(\w{3,})\1+')#去掉任意超过两个字且重复的字符 matches = pattern_dup.findall(measure_name) - for match in matches: - print(f"被删除的字符: {match * 2}") + # for match in matches: + # print(f"被删除的字符: {match * 2}") measure_name = pattern_dup.sub(r'\1', measure_name) measure_name_1 = measure_name.replace('调整后','').replace('上年期末数','上年期末').replace('上年期末','上年年末') measure_unit = measure['measure_unit'] @@ -748,6 +758,8 @@ def insert_measure_data_to_milvus(client,table_info,cursor,conn): measure_data['measure_value'] = measure_value measure_data['measure_unit'] = measure_unit measure_data['file_id'] = file_id + + data.append(measure_data) # 指标数据写入指标解析过程表,用于前端展示 @@ -775,6 +787,7 @@ def insert_measure_data_to_milvus(client,table_info,cursor,conn): measure_data['measure_value'] = measure_value measure_data['measure_unit'] = measure_unit measure_data['file_id'] = file_id + data.append(measure_data) # 指标数据写入指标解析过程表,用于前端展示 @@ -785,8 +798,8 @@ def insert_measure_data_to_milvus(client,table_info,cursor,conn): else: pass#print(f"数据值的格式错误:{measure_value}。或者字段名不在名单内{measure_name}") - res = client.insert( - collection_name="pdf_measure_v4", + + res = milvus_partition.insert( data=data ) diff --git a/zzb_data_prod/main.py b/zzb_data_prod/main.py index 797ecd4..068825f 100644 --- a/zzb_data_prod/main.py +++ b/zzb_data_prod/main.py @@ -13,7 +13,7 @@ from pdfminer.layout import LTTextBoxHorizontal import pdfplumber import mysql.connector import utils -from pymilvus import MilvusClient +from Mil_unit import create_partition_by_hour import llm_service import db_service import pdf_title @@ -725,9 +725,9 @@ def get_table_measure(file_id, pdf_tables, record_range): report_type = record_select[0][0] report_year = record_select[0][1] - client = MilvusClient( - uri= MILVUS_CLIENT - ) + # 获取milvus 连接 + _, milvus_partition = create_partition_by_hour() + print('提取指标任务 %s (%s)...' % (record_range, os.getpid())) start = time.time() @@ -843,7 +843,7 @@ def get_table_measure(file_id, pdf_tables, record_range): data_dict["page_num"] = f"{str(t['page_num'])}_{str(t['table_index'])}" data_dict['file_id'] = file_id measure_obj.append(data_dict) - db_service.insert_measure_data_to_milvus(client,measure_obj,cursor_app,conn_app) + db_service.insert_measure_data_to_milvus(milvus_partition,measure_obj,cursor_app,conn_app) except Exception as e: print(f"循环获取表格数据这里报错了,数据是{t['data']},位置在{index}") print(f"错误是:{e}") @@ -862,11 +862,8 @@ def get_table_measure(file_id, pdf_tables, record_range): arr = np.array(t['data']) except Exception as e: print(f'这个错误是{e}的arr的值是{arr}') - - finally: redis_client.close() - client.close() cursor.close() conn.close() cursor_app.close()