"""Single-factor regression test on the constituents of the 000300.XSHG index.

Pipeline:
    1. Load the daily index constituents and three daily stock panels
       (market cap, factor, close price) from MongoDB.
    2. Keep each stock's value only on days when it is an index member.
    3. Winsorize, standardize and size-neutralize the factor per trading day.
    4. Sample at month end and, for every month, regress next-month stock
       returns on the factor cross-section.
    5. Summarise the t-value / factor-return series and plot the cumulative
       factor return.
"""
import os
from ast import literal_eval
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import statsmodels.api as sm
from pymongo import MongoClient
from scipy import stats


# ---------------------- Database connection ----------------------
def _mongo_config(auth_source):
    """Return ``MongoClient`` keyword arguments authenticating on *auth_source*.

    Every setting can be overridden through a ``BLD_MONGO_*`` environment
    variable; the literals below are only fallbacks.
    """
    # NOTE(review): credentials are committed in plain text. Rotate them and
    # drop the fallbacks once the environment variables are deployed.
    return {
        'host': os.environ.get('BLD_MONGO_HOST', 'www.bldcapital.cn'),
        'port': int(os.environ.get('BLD_MONGO_PORT', '27217')),
        'username': os.environ.get('BLD_MONGO_USER', 'ZhangZH'),
        'password': os.environ.get('BLD_MONGO_PASSWORD', 'S$#r)JAHE_2C'),
        'authSource': auth_source,       # authentication database
        'authMechanism': 'SCRAM-SHA-1',  # authentication mechanism
    }


def _trade_dates(times):
    """Return *times* (a Series) as timestamps normalized to midnight.

    Used as the join key between collections so that a differing time-of-day
    component cannot prevent two records of the same day from matching.
    """
    return pd.to_datetime(times).dt.normalize()


# ---------------------- Daily index constituents ----------------------
config = _mongo_config('index_stocks')
client = MongoClient(**config)
client.admin.command('ping')
print("成功连接到MongoDB服务器")
db1 = client['index_stocks']
col1 = db1["000300.XSHG"]

# Documents whose "time" field lies in [2015-01-01, 2025-01-01).
query = {
    "time": {
        "$gte": datetime(2015, 1, 1),
        "$lt": datetime(2025, 1, 1),
    }
}
Projection = {'_id': 0}

cursor1 = col1.find(query, Projection)
data1 = list(cursor1)
stockindex = pd.DataFrame(data1)
# The stock panels use '_' instead of '.' in their column names.
stockindex['Code'] = stockindex['Code'].apply(
    lambda codes: [
        c.replace('.XSHE', '_XSHE').replace('.XSHG', '_XSHG') for c in codes
    ]
)
client.close()  # everything needed from this database is in memory now

# Boolean (trade date x stock code) matrix: True where the stock is an index
# member on that date. Built once and shared by every get_data() call.
_members = stockindex[['time', 'Code']].explode('Code', ignore_index=True)
_members = _members.dropna(subset=['Code'])
index_membership = (
    pd.crosstab(_trade_dates(_members['time']), _members['Code']) > 0
)

# ---------------------- Stock panels ----------------------
config = _mongo_config('CrossSectionData')
client = MongoClient(**config)
client.admin.command('ping')
print("成功连接到MongoDB服务器")
db2 = client['CrossSectionData']


def get_data(fun):
    """Load collection *fun* and keep only index members on each trade date.

    Rows are matched to the constituent list by trade date (not by row
    position), so the two collections may differ in order or length.

    Args:
        fun: Name of a collection in the ``CrossSectionData`` database whose
            documents carry a ``time`` field plus one field per stock code.

    Returns:
        DataFrame sorted by date with ``time`` as the first column followed by
        one float column per stock. A cell is NaN when the stock is not an
        index member on that date; stocks that are never members (or never
        have data) are dropped.

    Raises:
        ValueError: If the collection returns no documents for ``query``.
    """
    raw = pd.DataFrame(list(db2[fun].find(query, Projection)))
    if raw.empty:
        raise ValueError(f"collection {fun!r} returned no documents")

    raw['time'] = _trade_dates(raw['time'])
    raw = raw.sort_values('time')

    stock_cols = [c for c in raw.columns if c in index_membership.columns]
    values = raw.set_index('time')[stock_cols].astype(float)

    # Dates without constituent information are treated as "no members".
    is_member = index_membership.reindex(
        index=values.index, columns=stock_cols, fill_value=False
    )
    result = values.where(is_member.to_numpy())
    result = result.dropna(axis=1, how='all')
    return result.reset_index()


# X
# NOTE(review): the collection name 'stock_lcap' suggests the values may
# already be log market cap, while neutralization_size() applies np.log again
# - confirm against the data dictionary.
cap = get_data('stock_lcap')
five = get_data('stock_5mDayRetVola')
# Y
closereal = get_data('stock_close_real')
client.close()  # all panels are loaded

cap = cap.set_index(cap.columns[0])
five = five.set_index(five.columns[0])
closereal = closereal.set_index(closereal.columns[0])

monthly_close = closereal.resample('M').last()
# Month-over-month return; the first row is NaN by construction.
monthly_return = monthly_close.pct_change(axis=0)


# ---------------------- Factor preprocessing ----------------------
# --------------------- Winsorize -------------------------
def extreme_3sigma(dt, n=3):
    """Clip every column of *dt* to ``mean +/- n * std`` of that column.

    Statistics are column-wise, so pass a (stocks x dates) frame to winsorize
    each date's cross-section.
    """
    mean = dt.mean()
    std = dt.std()
    upper = mean + n * std
    lower = mean - n * std
    # axis=1 aligns the per-column bounds with the frame's columns.
    return dt.clip(lower, upper, axis=1)


# The panel is (dates x stocks); transpose so each column is one date's
# cross-section. Without this the bounds would be per-stock statistics over
# the whole sample, which uses future information.
df1 = extreme_3sigma(five.T).T


# --------------------- Standardize -------------------------
def standardize_z(dt):
    """Return the column-wise z-score of *dt*.

    Statistics are column-wise, so pass a (stocks x dates) frame to
    standardize each date's cross-section.
    """
    mean = dt.mean()
    std = dt.std()
    return (dt - mean) / std


df2 = standardize_z(df1.T).T


# ---------------------- Size neutralization ----------------------
def neutralization_size(factor, market_cap):
    """Replace each date's factor values by their residual on log market cap.

    For every date an OLS of the factor on ``log(market_cap)`` plus a constant
    is fitted over the stocks having both values and a positive cap.

    Args:
        factor: (dates x stocks) factor panel.
        market_cap: (dates x stocks) market-cap panel.

    Returns:
        Float DataFrame shaped like *factor* holding the regression residuals.
        When fewer than two stocks are usable on a date, or the regression
        fails, the raw factor values are kept for that date.
    """
    Y = factor.astype(float)
    # Dates missing from the cap panel become all-NaN rows instead of raising
    # KeyError inside the loop.
    M = market_cap.astype(float).reindex(Y.index)
    neutral = pd.DataFrame(index=Y.index, columns=Y.columns, dtype=float)

    for date in Y.index:
        y = Y.loc[date].dropna()
        m = M.loc[date].dropna()
        m = m[m > 0]  # log is undefined for non-positive caps
        common = y.index.intersection(m.index)

        if len(common) < 2:  # too few observations for a regression
            neutral.loc[date, common] = y.loc[common]
            continue

        y = y.loc[common]
        X = sm.add_constant(np.log(m.loc[common]).rename('log_cap'))
        try:
            resid = sm.OLS(y, X).fit().resid
            neutral.loc[date, resid.index] = resid
        except Exception as e:  # deliberate best effort: keep the raw factor
            print(f"{date} 中性化失败: {e}")
            neutral.loc[date, y.index] = y

    return neutral


df3 = neutralization_size(df2, cap)

# ---------------- Month-end factor values --------------------
monthly_df = df3.resample('M').last()


# ---------------- Regression test --------------
def factortest_regression(factor, stock, min_obs=10):
    """Regress next-period returns on the factor, one cross-section per date.

    Args:
        factor: (dates x stocks) factor exposures.
        stock: (dates x stocks) prices observed on the same dates as *factor*.
        min_obs: Minimum number of stocks with both a factor value and a
            next-period return for a date to be estimated.

    Returns:
        DataFrame indexed like ``factor.index`` with columns
        ``factor_return`` (slope) and ``tvalue`` (its t-statistic); dates that
        could not be estimated, including the last one, hold NaN.
    """
    # Non-positive prices would give meaningless or infinite returns.
    prices = stock.where(stock > 0, np.nan)

    # Next-period return: shift along the date axis (rows).
    forward_return = prices.shift(-1) / prices - 1
    forward_return = forward_return.replace([np.inf, -np.inf], np.nan)
    # Same labels as the factor so the boolean mask below aligns.
    forward_return = forward_return.reindex(
        index=factor.index, columns=factor.columns
    )

    dates = factor.index
    factor_return = pd.Series(index=dates, dtype='float64')
    tvalue = pd.Series(index=dates, dtype='float64')

    for date in dates:
        x = factor.loc[date]
        y = forward_return.loc[date]
        usable = x.notna() & y.notna()
        if usable.sum() < min_obs:  # not enough stocks: leave NaN
            continue

        # has_constant='add' guarantees the intercept is column 0 and the
        # factor is column 1 even if the factor happens to be constant.
        exog = sm.add_constant(x[usable], has_constant='add')
        res = sm.OLS(y[usable], exog).fit()
        factor_return[date] = res.params.iloc[1]
        tvalue[date] = res.tvalues.iloc[1]

    return pd.DataFrame({'factor_return': factor_return, 'tvalue': tvalue})


# Prices, not returns, go in: the function derives next-month returns itself.
fr = factortest_regression(monthly_df, monthly_close).dropna()

# Mean of |t|
t_ma = fr['tvalue'].abs().mean()
# Share of periods with |t| > 2 (NaN rather than ZeroDivisionError if empty)
t_ratio = (fr['tvalue'].abs() > 2).mean()
# Mean factor return
factors_ma = fr['factor_return'].mean()
# |mean(t)| / std(t)
t_div = abs(fr['tvalue'].mean()) / fr['tvalue'].std()

# Cumulative factor-return curve
fig = plt.figure()
fr['factor_return'].cumsum().plot(kind='line', label='factor_return')
plt.legend()
plt.show()