pdf_code/zzb_data_prod/insert_redis.py

247 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import mysql.connector
import utils
#from config import MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DB
import re
import redis
def process_excel_and_db(input_excel_path1, input_excel_path2, output_file_path):
    """Load measure and period configuration from Excel into MySQL, then export name pairs.

    The function performs three steps:

    1. Reads ``Sheet2`` of ``input_excel_path1`` and inserts one row per record
       into ``measure_create_config`` (the row id is ``utils.get_md5`` of the
       measure name).
    2. Reads ``Sheet2`` of ``input_excel_path2`` and inserts one row per record
       into ``measure_create_period``.
    3. Queries both tables back and writes ``show_name,parser_name`` lines to
       ``output_file_path``, one line per generated name combination.

    Args:
        input_excel_path1: Path of the measure workbook. ``Sheet2`` must have
            the columns ``指标``, ``同义表述``, ``周期``, ``变动`` and ``黑名单词``.
        input_excel_path2: Path of the period workbook. ``Sheet2`` must have
            the columns ``标准表述`` and ``同义表述``.
        output_file_path: Text file to (over)write with the generated pairs,
            encoded as UTF-8.

    Note:
        Empty Excel cells are stored as the string ``'nan'`` (``str(NaN)``),
        which is why the export step compares against ``'nan'``.
    """
    # The module-level import of these settings is commented out, so they are
    # imported here; otherwise the connect() call below raises NameError.
    from config import MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DB

    # Read the first Excel file (source of the measure configuration table).
    df = pd.read_excel(input_excel_path1, sheet_name='Sheet2', header=0)
    data_list = df.to_dict(orient='records')

    conn = mysql.connector.connect(
        host=MYSQL_HOST,
        user=MYSQL_USER,
        password=MYSQL_PASSWORD,
        database=MYSQL_DB
    )
    try:
        cursor = conn.cursor()
        try:
            # Insert the measure rows into measure_create_config.
            insert_query = '''
                INSERT INTO measure_create_config
                (config_id, meta_measure, same_mean_measure, measure_period, change_type, black_list)
                VALUES (%s, %s, %s, %s, %s, %s)
            '''
            for data in data_list:
                show_measure = str(data['指标'])
                same_mean_measure = str(data['同义表述'])
                period_measure = str(data['周期'])
                change_measure = str(data['变动'])
                black_list = str(data['黑名单词'])
                config_id = utils.get_md5(show_measure)
                insert_query_data = (
                    config_id, show_measure, same_mean_measure,
                    period_measure, change_measure, black_list,
                )
                cursor.execute(insert_query, insert_query_data)
            conn.commit()

            # Read the second Excel file (source of the period table).
            df_period = pd.read_excel(input_excel_path2, sheet_name='Sheet2', header=0)
            period_list = df_period.to_dict(orient='records')

            # Insert the period rows into measure_create_period.
            period_insert_query = '''
                INSERT INTO measure_create_period
                (period_name, same_mean_period)
                VALUES (%s, %s)
            '''
            for data in period_list:
                period_name = str(data['标准表述'])
                same_mean_period = str(data['同义表述'])
                cursor.execute(period_insert_query, (period_name, same_mean_period))
            conn.commit()

            # Read both tables back. Columns are listed explicitly (same names
            # and order as the INSERT statements above) so the positional
            # indexing below does not depend on the physical column order.
            data_query = '''
                SELECT config_id, meta_measure, same_mean_measure, measure_period, change_type
                FROM measure_create_config WHERE delete_status = 0
            '''
            period_query = '''
                SELECT period_name, same_mean_period FROM measure_create_period
            '''
            cursor.execute(data_query)
            data_list = cursor.fetchall()
            cursor.execute(period_query)
            period_list = cursor.fetchall()

            # Write every (display name, parsed name) combination to the file.
            with open(output_file_path, 'w', encoding='utf-8') as file:
                for data in data_list:
                    show_measure = data[1]
                    same_mean_measure = data[2]
                    period_measure = data[3]
                    change_measure = data[4]

                    same_mean_measure_arr = []
                    if same_mean_measure != 'nan':
                        same_mean_measure_arr = same_mean_measure.split(',')
                    # NOTE(review): the standard name is treated as one of its
                    # own synonyms unconditionally, so measures without
                    # synonyms still produce output — confirm this matches the
                    # intended behavior.
                    same_mean_measure_arr.append(show_measure)

                    period_measure_arr = []
                    if period_measure != 'nan':
                        period_measure_arr = period_measure.split(',')
                    change_measure_arr = []
                    if change_measure != 'nan':
                        change_measure_arr = change_measure.split(',')
                    # Change terms are expanded together with the period terms.
                    period_measure_arr.extend(change_measure_arr)

                    for x in period_measure_arr:
                        # Change terms are appended after the measure name,
                        # period terms are prepended. The same list membership
                        # test is used for both the display and parsed names
                        # (a substring test on the raw string could disagree).
                        is_change = x in change_measure_arr
                        if is_change:
                            show_name = show_measure + x
                        else:
                            show_name = x + show_measure

                        for y in same_mean_measure_arr:
                            if is_change:
                                parser_name = y + x
                            else:
                                parser_name = x + y
                            file.write(f'{show_name},{parser_name}\n')

                            # Also emit one variant per period synonym found
                            # inside the current term.
                            for p in period_list:
                                period_exra_name = p[0]
                                period_exra_value = p[1]
                                if period_exra_name in x:
                                    for v in period_exra_value.split(','):
                                        replaced = x.replace(period_exra_name, v)
                                        if is_change:
                                            parser_name = y + replaced
                                        else:
                                            parser_name = replaced + y
                                        file.write(f'{show_name},{parser_name}\n')
        finally:
            cursor.close()
    finally:
        # Release the connection even when an insert, query or file write fails.
        conn.close()
def create_new_config(conn, cursor, table_name,old_year,new_year):
    """Generate the measure configuration of a new year from an existing year.

    Every row of ``table_name`` whose ``year`` equals ``old_year`` is copied
    back into the same table with ``year`` set to ``new_year``. If the row's
    ``ori_measure_name`` starts with a four-digit year, that leading year is
    incremented by one in the copy; all other columns are copied unchanged.

    Args:
        conn: Open DB-API connection; ``commit()`` is called once after all
            rows have been inserted.
        cursor: Cursor belonging to ``conn``.
        table_name: Name of the configuration table. It is interpolated into
            the SQL text (identifiers cannot be bound as parameters), so it
            must come from trusted code.
        old_year: Value of the ``year`` column to copy from.
        new_year: Value of the ``year`` column written to the copies.
    """
    # The year is bound as a parameter instead of being formatted into the SQL.
    select_query = f'''
        SELECT measure_id, measure_name,ori_measure_id,ori_measure_name,delete_status,measure_vector,distance,year
        FROM {table_name}
        WHERE year = %s
    '''
    cursor.execute(select_query, (old_year,))
    data_list = cursor.fetchall()

    insert_query = f'''
        INSERT INTO {table_name}
        (measure_id, measure_name,ori_measure_id,ori_measure_name,delete_status,measure_vector,distance, year)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    '''
    leading_year = re.compile(r'^\d{4}')
    for data in data_list:
        ori_measure_name = data[3]
        # A NULL name is copied as-is; re.match would raise on a non-string.
        year_match = leading_year.match(ori_measure_name) if isinstance(ori_measure_name, str) else None
        if year_match:
            year = int(year_match.group(0)) + 1
            ori_measure_name = str(year) + ori_measure_name[4:]
        # NOTE(review): ori_measure_id (data[2]) and measure_vector (data[5])
        # are copied unchanged even when the name is rewritten — confirm that
        # keeping the old id/vector for the shifted name is intended.
        insert_data = (data[0],data[1],data[2],ori_measure_name,data[4],data[5],data[6],new_year)
        cursor.execute(insert_query, insert_data)
    conn.commit()
def measure_config_to_db(conn, cursor, table_name):
    """Insert the built-in first-quarter measure mappings for each configured year.

    For every year in the hard-coded year list, a fixed set of
    ``measure,ori_measure`` pairs is generated. A pair is inserted into
    ``table_name`` only if no row of that year already has the same
    ``ori_measure_id`` (``utils.get_md5`` of the original measure name).
    Inserted rows get ``delete_status`` 0 and ``distance`` 0.94.

    Args:
        conn: Open DB-API connection; ``commit()`` is called once at the end.
        cursor: Cursor belonging to ``conn``.
        table_name: Name of the configuration table. It is interpolated into
            the SQL text (identifiers cannot be bound as parameters), so it
            must come from trusted code.
    """
    year_list = ["2021","2022","2023","2024","2025"]
    insert_query = f'''
        INSERT INTO {table_name}
        (measure_id, measure_name, ori_measure_id, ori_measure_name,delete_status,distance,year)
        VALUES (%s, %s, %s, %s,%s,%s,%s)
    '''
    # The year is bound as a parameter instead of being formatted into the SQL.
    check_query = f'''
        SELECT ori_measure_id FROM {table_name}
        WHERE year = %s
    '''
    for year in year_list:
        # Measures to add, as "measure,original measure" pairs.
        # NOTE(review): "当期营业成本 " carries a trailing space before the comma,
        # which ends up in the stored measure name and its md5 — confirm
        # whether that is intentional before normalizing it.
        lines = [
            f"当期营业收入,{year}年第一季度营业收入",
            f"当期归母净利润,{year}年第一季度归母净利润",
            f"当期扣非净利润,{year}年第一季度扣非净利润",
            f"当期经营活动现金流净额,{year}年第一季度经营活动现金流净额",
            f"当期筹资活动现金流净额,{year}年第一季度筹资活动现金流净额",
            f"当期投资活动现金流净额,{year}年第一季度投资活动现金流净额",
            f"当期非经常性损益,{year}年第一季度非经常性损益",
            f"当期基本每股收益,{year}年第一季度基本每股收益",
            f"当期稀释每股收益,{year}年第一季度稀释每股收益",
            f"当期加权平均净资产收益率,{year}年第一季度加权平均净资产收益率",
            f"当期扣非加权平均净资产收益率,{year}年第一季度扣非加权平均净资产收益率",
            f"当期营业成本 ,{year}年第一季度营业成本",
            f"当期销售费用,{year}年第一季度销售费用",
            f"当期管理费用,{year}年第一季度管理费用",
            f"当期财务费用,{year}年第一季度财务费用",
            f"当期研发费用,{year}年第一季度研发费用"]

        # Fetch the ids already stored for this year once, instead of
        # re-running the same query for every candidate line.
        cursor.execute(check_query, (year,))
        existing_ids = {record[0] for record in cursor.fetchall()}

        for line in lines:
            config_list = line.strip().split(',')
            measure = config_list[0]
            ori_measure = config_list[1]
            ori_measure_id = utils.get_md5(ori_measure)
            # Skip pairs that are already present for this year.
            if ori_measure_id in existing_ids:
                continue
            data_to_insert = (utils.get_md5(measure), measure, ori_measure_id, ori_measure,0,0.94,year)
            cursor.execute(insert_query, data_to_insert)
            # Track rows inserted in this run so the check stays accurate.
            existing_ids.add(ori_measure_id)
    conn.commit()
def insert_measure_vector(conn,cursor,table_name):
    """Ensure every original measure in ``table_name`` has a vector in Redis.

    All ``(ori_measure_id, ori_measure_name)`` rows are read from
    ``table_name``. For each id that is not yet a field of the Redis hash
    ``measure_config`` (database 6), the name is embedded with
    ``utils.embed_with_str`` and the first embedding is stored, as a string,
    under that id. Existing fields are left untouched.

    Args:
        conn: Open DB-API connection. It is closed by this function before
            returning, including when an error occurs.
        cursor: Cursor belonging to ``conn``.
        table_name: Name of the configuration table. It is interpolated into
            the SQL text, so it must come from trusted code.
    """
    from config import REDIS_HOST,REDIS_PASSWORD,REDIS_PORT
    # Hosts noted by the original author: 192.168.0.172, test: 123.60.153.169.
    redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, db=6)
    try:
        try:
            select_query = f'''
                SELECT ori_measure_id,ori_measure_name FROM {table_name}
            '''
            cursor.execute(select_query)
            records = cursor.fetchall()
            print(f"总计{len(records)}条数据")
            for ori_measure_id, ori_measure_name in records:
                # Entries that already exist keep their stored vector; there
                # is no need to fetch the value just to leave it unchanged.
                if redis_client.hexists('measure_config', ori_measure_id):
                    continue
                print('新增指标',ori_measure_name)
                vector_obj = utils.embed_with_str(ori_measure_name)
                measure_vector = str(vector_obj.output["embeddings"][0]["embedding"])
                redis_client.hset('measure_config', ori_measure_id, measure_vector)
        finally:
            redis_client.close()
    finally:
        # Release the MySQL connection even if embedding or Redis failed.
        conn.close()
#from config import MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DB
if __name__ == "__main__":
    # Optional one-off step (disabled): import the Excel configuration.
    # The local measure_create_config and measure_create_period tables must
    # be emptied before running it.
    # process_excel_and_db(
    #     'F:\\11_pdf\\ttt_1.xlsx',               # measure (ttt) workbook
    #     'F:\\11_pdf\\period_1.xlsx',            # period workbook
    #     'F:\\11_pdf\\out_2022_new_year.txt'     # output file
    # )
    from config import MYSQL_HOST_APP, MYSQL_USER_APP, MYSQL_PASSWORD_APP, MYSQL_DB_APP

    # Connect to the application database.
    connection_settings = {
        "host": MYSQL_HOST_APP,
        "user": MYSQL_USER_APP,
        "password": MYSQL_PASSWORD_APP,
        "database": MYSQL_DB_APP,
    }
    conn = mysql.connector.connect(**connection_settings)
    cursor = conn.cursor()

    # Table holding the measure configuration whose vectors are refreshed.
    table_name = 'measure_config'

    # Optional step (disabled): write the built-in first-quarter measures to MySQL.
    # measure_config_to_db(conn, cursor, table_name)

    # Derive the 2024 configuration from the 2023 rows.
    create_new_config(conn, cursor, table_name, '2023', '2024')

    # Push missing measure vectors to Redis (this call also closes conn).
    insert_measure_vector(conn, cursor, table_name)