Files
coding/回归分析
2025-08-01 16:40:53 +08:00

197 lines
6.6 KiB
Plaintext
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
from pymongo import MongoClient
import numpy as np
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import statsmodels.api as sm
from ast import literal_eval
from datetime import timedelta
import statsmodels.api as sm
from scipy import stats
import matplotlib.pyplot as plt
# Database connection settings
# ---------------------- Load the per-row constituent lists of index 000300.XSHG ----------------------
# NOTE(review): the credentials below are hard-coded in the source file;
# consider loading them from the environment or a secrets store instead.
config = {
    'host': 'www.bldcapital.cn',
    'port': 27217,
    'username': 'ZhangZH',
    'password': 'S$#r)JAHE_2C',
    'authSource': 'index_stocks',    # database used for authentication
    'authMechanism': 'SCRAM-SHA-1',  # authentication mechanism
}
client = MongoClient(**config)
client.admin.command('ping')  # ping the server before reporting success
print("成功连接到MongoDB服务器")
db1 = client['index_stocks']
col1 = db1["000300.XSHG"]

# Half-open window on the "time" field: 2015-01-01 <= time < 2025-01-01,
# i.e. everything up to and including the whole of 2024.
query = {
    "time": {
        "$gte": datetime(2015, 1, 1),
        "$lt": datetime(2025, 1, 1),
    }
}
# Exclude the "_id" field from every returned document.
Projection = {'_id': 0}

cursor1 = col1.find(query, Projection)
data1 = list(cursor1)
stockindex = pd.DataFrame(data1)
# Each 'Code' cell is a list of codes; replace the '.XSHE' / '.XSHG' suffix
# separator with an underscore (get_data compares these codes against the
# column names of the cross-section tables).
stockindex['Code'] = stockindex['Code'].apply(
    lambda codes: [
        c.replace('.XSHE', '_XSHE').replace('.XSHG', '_XSHG') for c in codes
    ]
)
# ---------------------- Connect to the stock cross-section database ----------------------
# NOTE(review): same hard-coded credentials as the first connection; consider
# loading them from the environment or a secrets store instead.
# NOTE(review): `client` is rebound here without closing the previous
# client object - confirm whether it should be closed first.
config = {
    'host': 'www.bldcapital.cn',
    'port': 27217,
    'username': 'ZhangZH',
    'password': 'S$#r)JAHE_2C',
    'authSource': 'CrossSectionData',  # database used for authentication
    'authMechanism': 'SCRAM-SHA-1',    # authentication mechanism
}
client = MongoClient(**config)
client.admin.command('ping')  # ping the server before reporting success
print("成功连接到MongoDB服务器")
db2 = client['CrossSectionData']
def get_data(fun):
    """Load collection ``fun`` and keep only index-constituent values.

    The collection is read from ``db2`` with the module-level ``query`` and
    ``Projection``.  For every row label of ``stockindex``, only the columns
    named in that row's ``'Code'`` list are copied into the result; all other
    cells stay NaN.

    Parameters:
        fun: name of the collection inside ``db2``.

    Returns:
        DataFrame whose first column is ``'time'`` (taken from
        ``stockindex['time']``, aligned on row label) followed by the code
        columns that hold at least one non-NaN value.
    """
    frame = pd.DataFrame(list(db2[fun].find(query, Projection)))

    # ------------- Restrict to codes that appear in the index at least once -------------
    universe = set(stockindex['Code'].explode())
    df_filtered = frame[[col for col in frame.columns if col in universe]]

    # Empty container with the same shape; filled row by row below.
    result = pd.DataFrame(index=df_filtered.index, columns=df_filtered.columns)

    # NOTE(review): rows of the collection and rows of `stockindex` are
    # matched by row label (position), not by date.  This assumes both were
    # returned in the same order with one row per date - confirm, or align
    # on the 'time' field instead.
    for row_label, members in stockindex['Code'].items():
        present = [code for code in members if code in df_filtered.columns]
        if row_label in df_filtered.index:
            result.loc[row_label, present] = df_filtered.loc[row_label, present]

    # Drop codes that were never a constituent on any matched row, then
    # prepend the time column from `stockindex`.
    result = result.dropna(axis=1, how='all')
    result.insert(0, 'time', stockindex['time'])
    return result
# X: explanatory inputs
cap = get_data('stock_lcap')
five = get_data('stock_5mDayRetVola')
# Y: price series the returns are derived from
closereal = get_data('stock_close_real')

# Index every table by its first column (get_data inserts 'time' at position 0).
cap, five, closereal = (
    table.set_index(table.columns[0]) for table in (cap, five, closereal)
)

# Last available close of each calendar month.
monthly_close = closereal.resample('M').last()
# Month-over-month return of the month-end close.  The first row is all NaN
# and is NOT dropped here.
monthly_return = monthly_close.pct_change(axis=0)
## Numeric preprocessing of X
# --------------------- Winsorization ---------------------
def extreme_3sigma(dt, n=3):
    """Clip every column of ``dt`` to its own mean +/- ``n`` standard deviations.

    Parameters:
        dt: DataFrame; statistics are computed per column.
        n: half-width of the allowed band, in standard deviations.

    Returns:
        DataFrame of the same shape with out-of-band values replaced by the
        nearest bound.
    """
    center = dt.mean()
    spread = dt.std()
    lower = center - n * spread
    upper = center + n * spread
    # axis=1 aligns the per-column bounds with the columns of `dt`.
    return dt.clip(lower=lower, upper=upper, axis=1)
# Winsorize per date (cross-section).  `five` has one row per date and one
# column per stock, while extreme_3sigma computes its bounds per column, so
# the table is transposed on the way in and transposed back on the way out.
# Without the transpose each stock would be clipped against its own
# full-history mean/std instead of against the other stocks on the same date.
df1 = extreme_3sigma(five.T).T
# --------------------- Standardization ---------------------
def standardize_z(dt):
    """Return the per-column z-score of ``dt``.

    Each column has its own mean subtracted and is then divided by its own
    standard deviation.

    Parameters:
        dt: DataFrame; statistics are computed per column.

    Returns:
        DataFrame of the same shape holding the z-scores.
    """
    centered = dt - dt.mean()
    return centered / dt.std()
# Standardize per date (cross-section).  `df1` has one row per date and one
# column per stock, while standardize_z works per column, so the table is
# transposed on the way in and transposed back on the way out.  Without the
# transpose each stock would be z-scored against its own full history
# instead of against the other stocks on the same date.
df2 = standardize_z(df1.T).T
# ---------------------- Market-cap neutralization ----------------------
def neutralization_size(factor, market_cap):
    """Replace each row of ``factor`` by its residual against log market cap.

    For every index label of ``factor`` the non-missing factor values are
    regressed (OLS with a constant) on the log of the strictly positive,
    non-missing ``market_cap`` values of the same label; the residuals are
    stored in the result.

    Parameters:
        factor: DataFrame of factor values; one regression per row.
        market_cap: DataFrame of market caps, looked up by the same row
            labels as ``factor``.

    Returns:
        Float DataFrame with the index and columns of ``factor``.  Cells
        without a usable factor/market-cap pair stay NaN.  When fewer than
        two columns are usable, or when the regression raises, the raw
        factor values are kept for that row.
    """
    factor_values = factor.astype(float)
    cap_values = market_cap.astype(float)
    neutral = pd.DataFrame(
        index=factor_values.index, columns=factor_values.columns, dtype=float
    )

    for date in factor_values.index:
        exposures = factor_values.loc[date].dropna()
        caps = cap_values.loc[date].dropna()
        caps = caps[caps > 0]  # the log is only taken of positive caps

        shared = exposures.index.intersection(caps.index)
        exposures = exposures.loc[shared]

        if len(shared) >= 2:
            log_cap = pd.Series(np.log(caps.loc[shared]), index=shared)
            design = sm.add_constant(log_cap)
            try:
                resid = sm.OLS(exposures, design).fit().resid
                neutral.loc[date, resid.index] = resid
            except Exception as e:
                # Best effort: report the failure and keep the raw values.
                print(f"{date} 中性化失败: {e}")
                neutral.loc[date, exposures.index] = exposures
        else:
            # Too few observations to regress: keep the raw values.
            neutral.loc[date, shared] = exposures

    return neutral
# Neutralize the standardized factor against market cap.
df3 = neutralization_size(factor=df2, market_cap=cap)
# ---------------- Keep the last observation of each calendar month ----------------
monthly_df = df3.resample('M').last()
# ---------------- Regression ----------------
def factortest_regression(factor, stock, min_obs=10):
    """Cross-sectional regression of next-period returns on factor values.

    Both inputs are laid out with one row per period (e.g. month-end date)
    and one column per security.  For every period the return of each
    security from that period to the next one is regressed, with an
    intercept, on the factor value observed in that period.

    Parameters:
        factor: DataFrame of factor values (rows = periods, columns =
            securities).
        stock: DataFrame of prices in the same layout.  Non-positive prices
            are treated as missing.
        min_obs: minimum number of securities with both a factor value and a
            next-period return required to run the regression for a period.

    Returns:
        DataFrame indexed like ``factor`` with columns ``'factor_return'``
        (regression slope) and ``'tvalue'`` (slope divided by its standard
        error).  Periods that are skipped - too few observations, a constant
        factor, or no following period - hold NaN in both columns.
    """
    factor = factor.astype(float)
    prices = stock.astype(float)

    # 1. Guard against division by zero: non-positive prices become NaN.
    prices = prices.where(prices > 0)

    # 2. Return from each period to the NEXT one.  The shift runs along the
    #    rows (time); the last period has no successor and is all NaN.
    next_return = prices.shift(-1) / prices - 1

    # 3. Normalize any remaining +/-inf to NaN.
    next_return = next_return.replace([np.inf, -np.inf], np.nan)

    # 4. Align with the factor table so rows and columns match by position;
    #    periods or securities missing from the price table become NaN.
    next_return = next_return.reindex(index=factor.index, columns=factor.columns)

    periods = factor.index
    factor_return = pd.Series(np.nan, index=periods, dtype='float64')
    tvalue = pd.Series(np.nan, index=periods, dtype='float64')

    # 5. One cross-sectional regression per period.
    x_rows = factor.to_numpy()
    y_rows = next_return.to_numpy()
    for pos, (x_row, y_row) in enumerate(zip(x_rows, y_rows)):
        valid = np.isfinite(x_row) & np.isfinite(y_row)
        if valid.sum() < min_obs:
            continue  # not enough observations: leave NaN
        x = x_row[valid]
        y = y_row[valid]
        if np.ptp(x) == 0:
            continue  # constant factor: the slope is not identified
        # Simple linear regression with intercept; slope and slope/stderr are
        # the coefficient and t-statistic of the corresponding OLS fit.
        fit = stats.linregress(x, y)
        factor_return.iloc[pos] = fit.slope
        tvalue.iloc[pos] = fit.slope / fit.stderr

    # 6. Assemble the result, indexed like the factor table.
    return pd.DataFrame({'factor_return': factor_return,
                         'tvalue': tvalue})
# factortest_regression treats its second argument as PRICES (it masks
# non-positive values and derives the return itself), so it is given the
# month-end closes.  Passing the already-computed monthly returns would make
# the `> 0` mask discard every non-positive return.
fr = factortest_regression(monthly_df, monthly_close).dropna()
# Mean of the absolute t-values
t_ma = fr['tvalue'].abs().mean()
# Share of t-values whose absolute value exceeds 2
# (NaN instead of ZeroDivisionError when `fr` is empty)
t_ratio = (fr['tvalue'].abs() > 2).mean()
# Mean of the factor-return series
factors_ma = fr['factor_return'].mean()
# Absolute mean of the t-values divided by their standard deviation
t_div = abs(fr['tvalue'].mean()) / fr['tvalue'].std()
# Cumulative factor-return curve
fig = plt.figure()
fr['factor_return'].cumsum().plot(kind='line', label='factor_return')
plt.legend()
plt.show()