Files
coding/回归分析

197 lines
6.6 KiB
Plaintext
Raw Normal View History

2025-08-01 16:40:53 +08:00
import pandas as pd
from pymongo import MongoClient
import numpy as np
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import statsmodels.api as sm
from ast import literal_eval
from datetime import timedelta
import statsmodels.api as sm
from scipy import stats
import matplotlib.pyplot as plt
# 数据库连接配置
#----------------------获取每一天的沪深500--------------------------
config = {
'host': 'www.bldcapital.cn',
'port': 27217,
'username': 'ZhangZH',
'password': 'S$#r)JAHE_2C',
'authSource': 'index_stocks', # 指定认证数据库
'authMechanism': 'SCRAM-SHA-1' # 指定认证机制
}
client = MongoClient(**config)
client.admin.command('ping')
print("成功连接到MongoDB服务器")
db1= client['index_stocks']
col1 = db1["000300.XSHG"]
query = {
"time": { # 替换为你的日期字段名
"$gte": datetime(2015, 1, 1), # 大于等于2020年1月1日
"$lt": datetime(2025 , 1, 1) # 小于2025年1月1日包含2024全年
}
}
Projection={
'_id':0
}
cursor1= col1.find(query,Projection)
data1= list(cursor1)
stockindex=pd.DataFrame(data1)
stockindex['Code'] = stockindex['Code'].apply(
lambda codes: [c.replace('.XSHE', '_XSHE').replace('.XSHG', '_XSHG') for c in codes]
)
#----------------------获取股票数据--------------------------
config = {
'host': 'www.bldcapital.cn',
'port': 27217,
'username': 'ZhangZH',
'password': 'S$#r)JAHE_2C',
'authSource': 'CrossSectionData', # 指定认证数据库
'authMechanism': 'SCRAM-SHA-1' # 指定认证机制
}
client = MongoClient(**config)
client.admin.command('ping')
print("成功连接到MongoDB服务器")
db2 = client['CrossSectionData']
def get_data(fun):
col2=db2[fun]
cursor2= col2.find(query,Projection)
data2= list(cursor2)
df=pd.DataFrame(data2)
#----------------------筛选每一天的沪深500--------------------------
valid_stocks = set(stockindex['Code'].explode())
df_filtered = df[[c for c in df.columns if c in valid_stocks]]
result = pd.DataFrame(index=df_filtered.index, columns=df_filtered.columns)
# 5. 按日期匹配注意stockindex['time'] 是日期列)
for idx in stockindex.index:
codes = stockindex.loc[idx, 'Code']
codes_in_df = [c for c in codes if c in df_filtered.columns]
# 找到 df 中对应日期的行(这里假设 df 和 stockindex 是按顺序一一对应)
# 如果顺序不对,需要按日期对齐(见下方说明)
if idx in df_filtered.index:
result.loc[idx, codes_in_df] = df_filtered.loc[idx, codes_in_df]
# 6. 添加时间列(从 stockindex
result = result.dropna(axis=1, how='all')
result.insert(0, 'time', stockindex['time'])
return result
#X
cap=get_data('stock_lcap')
five=get_data('stock_5mDayRetVola')
#Y
closereal=get_data('stock_close_real')
cap=cap.set_index(cap.columns[0])
five=five.set_index(five.columns[0])
closereal=closereal.set_index(closereal.columns[0])
monthly_close = closereal.resample('M').last()
# 计算月末收益率并去除第一行缺失值
monthly_return = monthly_close.pct_change(axis=0)
##对X做数值处理
#---------------------去极值-------------------------
def extreme_3sigma(dt,n=3):
mean = dt.mean() # 截面数据均值
std = dt.std() # 截面数据标准差
dt_up = mean + n*std # 上限
dt_down = mean - n*std # 下限
return dt.clip(dt_down, dt_up, axis=1) # 超出上下限的值,赋值为上下限
df1=extreme_3sigma(five)
#---------------------标准化-------------------------
def standardize_z(dt):
mean = dt.mean() # 截面数据均值
std = dt.std() # 截面数据标准差
return (dt - mean)/std # 标准化处理
df2 = standardize_z(df1)
#----------------------市值中性化--------------------------
def neutralization_size(factor, market_cap):
Y = factor.astype(float)
M = market_cap.astype(float)
df = pd.DataFrame(index=Y.index, columns=Y.columns, dtype=float)
for date in Y.index: # 1. 逐日
y = Y.loc[date].dropna() # 2. 当天因子
m = M.loc[date].dropna()
m = m[m > 0]
common = y.index.intersection(m.index)
if len(common) < 2: # 样本太少
df.loc[date, common] = y.loc[common]
continue
y = y.loc[common]
x = pd.Series(np.log(m.loc[common]), index=common) # 3. 确保是 Series
X = sm.add_constant(x)
try:
res = sm.OLS(y, X).fit().resid
df.loc[date, res.index] = res
except Exception as e:
print(f"{date} 中性化失败: {e}")
df.loc[date, y.index] = y
return df
df3=neutralization_size(df2, cap)
#----------------筛选月末数据--------------------
monthly_df = df3.resample('M').last()
#----------------回归--------------
def factortest_regression(factor, stock):
# 1. 防除 0
stock = stock.where(stock > 0, np.nan)
# 2. 计算次月收益
stock_return = stock.shift(-1, axis=1) / stock - 1
stock_return = stock_return.iloc[:, :-1]
# 3. 统一把 inf 变 NaN
stock_return = stock_return.replace([np.inf, -np.inf], np.nan)
# 4. 准备与原始索引对齐的容器
tickers = factor.index
factor_return = pd.Series(index=tickers, dtype='float64')
tvalue = pd.Series(index=tickers, dtype='float64')
# 5. 逐只股票循环(或逐列,取决于你的 shape
for tic in tickers:
x = factor.loc[tic] # 该股票所有月份的因子值
y = stock_return.loc[tic] # 该股票所有月份的次月收益
mask = x.notna() & y.notna()
if mask.sum() < 10: # 样本不足,用 NaN 占位
continue
res = sm.OLS(y.loc[mask], sm.add_constant(x.loc[mask])).fit()
factor_return[tic] = res.params[1]
tvalue[tic] = res.tvalues[1]
# 6. 拼成 DataFrame索引与原表一致
return pd.DataFrame({'factor_return': factor_return,
'tvalue': tvalue})
fr = factortest_regression(monthly_df,monthly_return).dropna()
# t值绝对值序列平均值
t_ma = fr['tvalue'].abs().mean()
# t值序列绝对值大于2 的占比
t_ratio = len(fr[(fr['tvalue'].abs()>2)])/len(fr['tvalue'])
# 因子收益率序列平均值
factors_ma = fr['factor_return'].mean()
# t值序列均值的绝对值除以t值序列的标准差
t_div= abs(fr['tvalue'].mean())/fr['tvalue'].std()
# 因子收益率累积曲线
fig = plt.figure()
fr['factor_return'].cumsum().plot(kind = 'line',label = 'factor_return')
plt.legend()
plt.show()