"""Single-factor regression test on index constituents.

Recovered from the patch "添加 回归分析" (add regression analysis), which
creates the file ``回归分析``.

Pipeline, as implemented below:

1. Load the daily constituent lists of the index collection ``000300.XSHG``.
2. Load daily cross-section tables (size, factor, close price) and blank out
   every stock that is not an index constituent on that day.
3. Winsorize, standardize and size-neutralize the factor, one cross-section
   (one date) at a time.
4. Keep month-end values and, for every month-end, regress next-month returns
   on the factor across stocks.
5. Summarize the t-values / factor returns and plot the cumulative factor
   return.

All tables handled here are laid out as rows = dates, columns = stock codes.
"""
import os
from ast import literal_eval
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from datetime import timedelta

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import statsmodels.api as sm
from pymongo import MongoClient
from scipy import stats


# ---------------------- database connection ----------------------
def _mongo_config(auth_source):
    """Build the ``MongoClient`` keyword arguments for *auth_source*.

    SECURITY: the credentials below were committed in clear text. They are
    kept as defaults only so the script keeps running unchanged; set the
    ``MONGO_*`` environment variables instead, rotate the password, and then
    delete the literals.
    """
    return {
        'host': os.environ.get('MONGO_HOST', 'www.bldcapital.cn'),
        'port': int(os.environ.get('MONGO_PORT', '27217')),
        'username': os.environ.get('MONGO_USERNAME', 'ZhangZH'),
        'password': os.environ.get('MONGO_PASSWORD', 'S$#r)JAHE_2C'),
        'authSource': auth_source,       # authentication database
        'authMechanism': 'SCRAM-SHA-1',  # authentication mechanism
    }


# ---------------------- daily index constituents ----------------------
config = _mongo_config('index_stocks')
client = MongoClient(**config)
client.admin.command('ping')
print("成功连接到MongoDB服务器")
db1 = client['index_stocks']
col1 = db1["000300.XSHG"]

# Date window applied to every collection read by this script.
query = {
    "time": {
        "$gte": datetime(2015, 1, 1),  # from 2015-01-01, inclusive
        "$lt": datetime(2025, 1, 1),   # up to 2025-01-01, exclusive
    }
}
Projection = {'_id': 0}

stockindex = pd.DataFrame(list(col1.find(query, Projection)))
# Rewrite "xxx.XSHE" / "xxx.XSHG" as "xxx_XSHE" / "xxx_XSHG" so the codes can
# be compared with the column names of the cross-section tables
# (presumably those collections use underscore keys - verify against the DB).
stockindex['Code'] = stockindex['Code'].apply(
    lambda codes: [
        c.replace('.XSHE', '_XSHE').replace('.XSHG', '_XSHG') for c in codes
    ]
)
# The constituent lists are fully materialized; release the first connection
# before opening the second one.
client.close()

# ---------------------- cross-section data ----------------------
config = _mongo_config('CrossSectionData')
client = MongoClient(**config)
client.admin.command('ping')
print("成功连接到MongoDB服务器")
db2 = client['CrossSectionData']


def _day_index(values):
    """Return *values* parsed as datetimes and truncated to calendar days."""
    return pd.DatetimeIndex(pd.to_datetime(values)).normalize()


def _restrict_to_constituents(df, members):
    """Keep, for every day, only the values of that day's index constituents.

    Args:
        df: table with a ``time`` column and one column per stock code.
        members: table with a ``time`` column and a ``Code`` column holding
            the list of constituent codes for that day.

    Returns:
        DataFrame whose first column is ``time`` followed by one float column
        per stock that was a constituent on at least one day with data.
        Non-constituent cells are NaN.
    """
    values = df.drop(columns='time')
    values.index = _day_index(df['time'])
    values = values[~values.index.duplicated(keep='last')].sort_index()

    codes_by_day = members['Code'].set_axis(_day_index(members['time']))
    codes_by_day = codes_by_day[~codes_by_day.index.duplicated(keep='last')]

    # Only stocks that were in the index at some point are worth keeping.
    universe = set(codes_by_day.explode().dropna())
    values = values.loc[:, [c for c in values.columns if c in universe]]
    values = values.astype(float)
    known_columns = set(values.columns)

    result = pd.DataFrame(np.nan, index=values.index, columns=values.columns)
    # Match by calendar day rather than by row position: the two collections
    # are read without a sort order and need not contain the same days.
    for day, codes in codes_by_day.items():
        if day not in values.index:
            continue
        present = [c for c in codes if c in known_columns]
        result.loc[day, present] = values.loc[day, present]

    result = result.dropna(axis=1, how='all')
    result.index.name = 'time'
    return result.reset_index()


def get_data(fun):
    """Load collection *fun* and mask it to the daily index constituents.

    Args:
        fun: name of a collection in the ``CrossSectionData`` database.

    Returns:
        DataFrame with ``time`` as first column and one column per stock.

    Raises:
        ValueError: if the collection yields no documents with a ``time``
            field for the configured date window.
    """
    df = pd.DataFrame(list(db2[fun].find(query, Projection)))
    if df.empty or 'time' not in df.columns:
        raise ValueError(
            f"collection {fun!r} returned no dated documents for the query"
        )
    return _restrict_to_constituents(df, stockindex)


# X
# NOTE(review): the name 'stock_lcap' suggests the values may already be log
# market caps, while neutralization_size() takes another log - confirm.
cap = get_data('stock_lcap')
five = get_data('stock_5mDayRetVola')

# Y
closereal = get_data('stock_close_real')
client.close()  # every collection has been read

cap = cap.set_index(cap.columns[0])
five = five.set_index(five.columns[0])
closereal = closereal.set_index(closereal.columns[0])

monthly_close = closereal.resample('M').last()

# Month-over-month return of the month-end close (first row is NaN).
# Kept for inspection; the regression derives next-month returns itself.
monthly_return = monthly_close.pct_change(axis=0)


# ---------------------- factor preprocessing ----------------------
def extreme_3sigma(dt, n=3):
    """Winsorize each cross-section at mean +/- *n* standard deviations.

    Args:
        dt: DataFrame, rows = dates, columns = stocks.
        n: half-width of the allowed band in standard deviations.

    Returns:
        DataFrame of the same shape with out-of-band values set to the band
        limits. Rows whose band is undefined (NaN) are left unchanged.
    """
    mean = dt.mean(axis=1)  # cross-sectional mean, one value per date
    std = dt.std(axis=1)    # cross-sectional standard deviation
    # Statistics are taken per date so no future information leaks in.
    return dt.clip(mean - n * std, mean + n * std, axis=0)


df1 = extreme_3sigma(five)


def standardize_z(dt):
    """Z-score each cross-section (row) of *dt*.

    Args:
        dt: DataFrame, rows = dates, columns = stocks.

    Returns:
        DataFrame of the same shape; rows with zero or undefined standard
        deviation become NaN instead of +/-inf.
    """
    mean = dt.mean(axis=1)
    std = dt.std(axis=1).replace(0, np.nan)
    return dt.sub(mean, axis=0).div(std, axis=0)


df2 = standardize_z(df1)


def neutralization_size(factor, market_cap):
    """Remove the size exposure of *factor* by per-date OLS on log(cap).

    For every date the factor is regressed on a constant and the log of
    *market_cap* across stocks; the residuals replace the factor values.

    Args:
        factor: DataFrame, rows = dates, columns = stocks.
        market_cap: DataFrame in the same layout; only positive values are
            used.

    Returns:
        Float DataFrame shaped like *factor*. Stocks without a usable cap on
        a date are NaN; if fewer than two stocks are usable, or the
        regression fails, the raw factor values are kept for that date.
    """
    Y = factor.astype(float)
    # Align on the factor's dates so a date missing from the cap table
    # yields an empty cross-section instead of a KeyError.
    M = market_cap.astype(float).reindex(Y.index)

    neutral = pd.DataFrame(index=Y.index, columns=Y.columns, dtype=float)

    for date in Y.index:
        y = Y.loc[date].dropna()
        m = M.loc[date].dropna()
        m = m[m > 0]  # log is only defined for positive caps

        common = y.index.intersection(m.index)
        if len(common) < 2:  # too few observations to fit a line
            neutral.loc[date, common] = y.loc[common]
            continue

        y = y.loc[common]
        x = pd.Series(np.log(m.loc[common]), index=common)
        X = sm.add_constant(x)

        try:
            res = sm.OLS(y, X).fit().resid
            neutral.loc[date, res.index] = res
        except Exception as e:
            # Deliberate best effort: report and fall back to the raw factor.
            print(f"{date} 中性化失败: {e}")
            neutral.loc[date, y.index] = y

    return neutral


df3 = neutralization_size(df2, cap)

# ---------------------- month-end factor values ----------------------
monthly_df = df3.resample('M').last()


# ---------------------- regression ----------------------
def factortest_regression(factor, stock):
    """Regress next-period returns on the factor, one cross-section per date.

    Args:
        factor: DataFrame of factor values, rows = dates in chronological
            order, columns = stocks.
        stock: DataFrame of prices in the same layout. Non-positive prices
            are treated as missing.

    Returns:
        DataFrame indexed like *factor* with columns ``factor_return`` (slope
        of the cross-sectional regression) and ``tvalue`` (its t-statistic).
        Dates with fewer than 10 complete observations are NaN.
    """
    # 1. Guard against division by zero.
    price = stock.where(stock > 0)

    # 2. Return from this period's price to the next one, along the date
    #    axis. The last date has no successor and is therefore all NaN.
    next_return = price.shift(-1) / price - 1

    # 3. Any remaining infinities count as missing.
    next_return = next_return.replace([np.inf, -np.inf], np.nan)

    # 4. Align on the factor's dates; unknown dates become empty rows.
    next_return = next_return.reindex(factor.index)

    dates = factor.index
    factor_return = pd.Series(index=dates, dtype='float64')
    tvalue = pd.Series(index=dates, dtype='float64')

    # 5. One cross-sectional regression per date.
    for date in dates:
        # concat aligns the two cross-sections on stock code, so differing
        # column sets cannot produce an unalignable mask.
        sample = pd.concat(
            [factor.loc[date], next_return.loc[date]],
            axis=1,
            keys=['x', 'y'],
        ).astype(float).dropna()
        if len(sample) < 10:  # too few observations, leave NaN
            continue

        # has_constant='add' guarantees both 'const' and 'x' exist even when
        # the factor happens to be constant on this date.
        design = sm.add_constant(sample['x'], has_constant='add')
        res = sm.OLS(sample['y'], design).fit()
        factor_return[date] = res.params['x']
        tvalue[date] = res.tvalues['x']

    # 6. Assemble the result on the original index.
    return pd.DataFrame({'factor_return': factor_return,
                         'tvalue': tvalue})


# The regression expects prices, so it receives the month-end closes.
fr = factortest_regression(monthly_df, monthly_close).dropna()

# Mean of the absolute t-values.
t_ma = fr['tvalue'].abs().mean()
# Share of t-values with |t| > 2 (NaN instead of an error when fr is empty).
t_ratio = (fr['tvalue'].abs() > 2).mean()
# Mean of the factor-return series.
factors_ma = fr['factor_return'].mean()
# |mean of t-values| divided by the standard deviation of the t-values.
t_div = abs(fr['tvalue'].mean()) / fr['tvalue'].std()

# Cumulative factor-return curve.
fig = plt.figure()
fr['factor_return'].cumsum().plot(kind='line', label='factor_return')
plt.legend()
plt.show()