# NOTE(review): this module was recovered from a git patch; the mail/diff
# wrapper has been stripped so the file is runnable Python.
import pandas as pd
from pymongo import MongoClient
import numpy as np
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import statsmodels.api as sm

# ---------------------------------------------------------------------------
# MongoDB connection configuration.
# SECURITY(review): credentials are hard-coded in source; move them to
# environment variables or a config file kept out of version control.
# ---------------------------------------------------------------------------
config = {
    'host': 'www.bldcapital.cn',
    'port': 27217,
    'username': 'ZhangZH',
    'password': 'S$#r)JAHE_2C',
    'authSource': 'Alpha101',        # authentication database
    'authMechanism': 'SCRAM-SHA-1',  # authentication mechanism
}
client = MongoClient(**config)
client.admin.command('ping')  # fail fast if the server is unreachable
print("成功连接到MongoDB服务器")
db = client['Alpha101']

# Universe: CSI 500 constituents read from a local CSV.
stocklist = pd.read_csv("C:/Users/86185/Desktop/实习/500stock.csv")
selected = stocklist['Stock Code'].tolist()

START = datetime(2020, 1, 1)
END = datetime(2021, 1, 1)
projection = {
    '_id': 0,
    'alpha004': 1,
    'time': 1,
}


def fetch(code: str) -> pd.DataFrame:
    """Pull one stock's alpha004 series for [START, END) from MongoDB.

    Returns a DataFrame with columns ['alpha004', 'time', 'Code'].
    """
    cur = db[code].find({'time': {'$gte': START, '$lt': END}}, projection)
    df = pd.DataFrame(list(cur))
    df['Code'] = code
    return df


# I/O-bound fan-out: one Mongo query per stock code.
with ThreadPoolExecutor(max_workers=32) as pool:
    dfs = list(pool.map(fetch, selected))

# Merge into one long frame, then pivot to a wide matrix:
# rows = stock codes, columns = dates, values = factor.
df_all = pd.concat(dfs, ignore_index=True)
df_all['time'] = pd.to_datetime(df_all['time'])
df_all_ = df_all.pivot(index='Code', columns='time', values='alpha004')
df_all_.info()


# --------------------- 1. Winsorization (3-sigma clip) ---------------------
def extreme_3sigma(dt, n=3):
    """Clip each date's cross-section to mean +/- n standard deviations.

    Parameters
    ----------
    dt : DataFrame with rows = stocks, columns = dates.
    n  : number of standard deviations defining the clip bounds.
    """
    mean = dt.mean()          # per-date cross-sectional mean
    std = dt.std()            # per-date cross-sectional std
    dt_up = mean + n * std    # upper bound per date
    dt_down = mean - n * std  # lower bound per date
    # axis=1 aligns the per-date bound Series with the date columns.
    return dt.clip(dt_down, dt_up, axis=1)


df1 = extreme_3sigma(df_all_, 3)
print('step1 finished')


# --------------------- 2. Z-score standardization --------------------------
def standardize_z(dt):
    """Standardize each date's cross-section to zero mean, unit variance."""
    mean = dt.mean()  # per-date cross-sectional mean
    std = dt.std()    # per-date cross-sectional std
    return (dt - mean) / std


df2 = standardize_z(df1)
print('step2 finished')


# --------------------- 3. Industry neutralization ---------------------------
def generate_industry_dummy_matrix(csv_file_path):
    """Build a 0/1 stock-by-industry dummy matrix from a mapping CSV.

    The CSV must contain 'code' and 'industry_name' columns.
    """
    mapping = pd.read_csv(csv_file_path)
    # All-zero matrix: rows = unique stock codes, columns = industries.
    dummy_matrix = pd.DataFrame(
        0,
        index=mapping['code'].unique(),
        columns=mapping['industry_name'].unique(),
    )
    # BUGFIX(review): the original loop rebound its own DataFrame variable
    # ('industry') to a row value; distinct names remove that shadowing.
    for _, row in mapping.iterrows():
        dummy_matrix.loc[row['code'], row['industry_name']] = 1
    return dummy_matrix


dummy_matrix = generate_industry_dummy_matrix('C:/Users/86185/Desktop/实习/industry.csv')

# Restrict factor, industry and market-cap data to their common universe.
market = pd.read_csv('C:/Users/86185/Desktop/实习/market.csv')
market.set_index(market.columns[0], inplace=True)
# BUGFIX(review): sorted() makes the row order deterministic across runs;
# plain set iteration order is not reproducible.
common_stocks = sorted(
    set(df2.index) & set(dummy_matrix.index) & set(market.index)
)
df2_f = df2.loc[common_stocks].copy()
industry_f = dummy_matrix.loc[common_stocks].copy()
market_f = market.loc[common_stocks].copy().astype(float)


def neutralization(factor, industry_matrix):
    """Regress each date's factor cross-section on industry dummies.

    Returns the OLS residuals (industry-neutral factor values); if a
    regression fails, that date keeps its un-neutralized values.
    """
    Y = factor
    df = pd.DataFrame(index=Y.index, columns=Y.columns)
    for i in range(Y.shape[1]):
        y = Y.iloc[:, i].dropna()         # this date's factor values
        X = industry_matrix.loc[y.index]  # matching industry dummies
        X = sm.add_constant(X)            # intercept term
        try:
            result = sm.OLS(y, X).fit()
            # Residuals are the industry-neutralized factor values.
            df.loc[y.index, Y.columns[i]] = result.resid
        except Exception as e:
            print(f"日期 {Y.columns[i]} 中性化失败: {e}")
            # Fall back to the original values for this date.
            df.loc[y.index, Y.columns[i]] = y
    return df
df3 = neutralization(df2_f, industry_f)
print('step3 finished')


# --------------------- 4. Market-cap neutralization -------------------------
def neutralization_size(factor, market_cap):
    """Regress each date's factor cross-section on log market cap.

    Parameters
    ----------
    factor     : DataFrame, rows = stocks, columns = dates.
    market_cap : DataFrame sharing `factor`'s row index; its columns are
                 matched to `factor`'s columns POSITIONALLY (iloc[:, i]).
                 NOTE(review): positional matching silently misaligns if
                 the two frames' column orders differ — confirm upstream.

    Returns the OLS residuals (size-neutral factor values); if a regression
    fails, that date keeps its un-neutralized values.
    """
    Y = factor.astype(float)
    df = pd.DataFrame(index=Y.index, columns=Y.columns)
    for i in range(Y.shape[1]):
        y = Y.iloc[:, i].dropna()  # this date's factor values
        # Log market cap; non-positive caps produce -inf/NaN and are dropped.
        ln_mkt_cap = np.log(market_cap.iloc[:, i])
        ln_mkt_cap = ln_mkt_cap.replace([np.inf, -np.inf], np.nan).dropna()
        # BUGFIX(review): sorted() makes the index order deterministic
        # across runs; plain set iteration order is not reproducible.
        common_idx = sorted(set(y.index) & set(ln_mkt_cap.index))
        y = y.loc[common_idx]
        ln_mkt_cap = ln_mkt_cap.loc[common_idx]
        X = sm.add_constant(ln_mkt_cap)  # intercept term
        try:
            result = sm.OLS(y, X).fit()
            # Residuals are the size-neutralized factor values.
            df.loc[common_idx, Y.columns[i]] = result.resid
        except Exception as e:
            print(f"日期 {Y.columns[i]} 中性化失败: {e}")
            # Fall back to the original values for this date.
            df.loc[common_idx, Y.columns[i]] = y

    return df


df4 = neutralization_size(df3, market_f)
df4.to_csv('df4.csv')
print('step4 finished')