[策略编写系列四] Python实现均值回归策略(打分法实战)
概要:
在 [策略编写系列三] 中,详细的讲述了如何用 Python 语言中的字典编写出一个简单的动量策略。本章内容讲解如何用 Python 语言编写简单的均值回归策略,其中使用到了打分法,希望给有需要的同学提供一些帮助。内容主要分为:均值回归介绍、构建简单均值回归策略、运用 Python 编写出策略(本节打分法)、策略回测结果分析。
本章内容会涉及到 [策略编写系列三] 的内容,点击链接查看: http://quant.10jqka.com.cn/platform/html/article.html#id/87859787
正文:
一、均值回归介绍
均值回归是金融学的一个重要概念,也是一种数学方法,市场中指的是:个股股价偏离其内在价值过高或者过低时,就会改变原有趋势,而形成靠拢内在价值的趋势。酷似价值规律。
均值回归是否与动量效应冲突?从形式上看,动量效应认为趋势会延续,均值回归认为趋势会改变,但是从内在逻辑上看,均值回归通过计算个股内在价值,从而研判个股内在趋势,但是动量效应与均值回归不同,它不存在一个内在价值,或者说动量效应是研究价格的一种方式,而均值回归是研究价值的一种方式,因此两者形式相冲,但内在逻辑不冲突。
均值回归的核心:准确计算出个股内在价值。均值回归之所以能经久不衰,靠的是价值这口气活着,如果投资者在运用均值回归时,无法准确计算出个股的价值,那么其结果必定是惨烈的,不是在股价上涨时提前抛售,就是在股价下跌时过早买入。
均值回归的价值计算方式:说实话,计算个股内在价值的方式有很多种,并且每个投资者都有自己的观点和看法,因此本文为了演示策略,挑选了三种个股内在价值的计算方法:
第一种市盈率法,目前一年期的货币基金利率为 3%,对应股市中的平均市盈率为 33 倍。
第二种市净率法,一般而言,市净率小于 1,表明公司股价被严重低估,1-3 是合理投资区间,超过 3 就表明公司股价偏高。
第三种市销率,市销率越低,说明该公司股票目前的投资价值越大,一般情况下,市销率小于 2,则股票具有投资价值,超过 10,则说明风险较大。
二、构建简单均值回归策略
根据上一节的内容,我们构建一个简单的均值回归策略:
第一步:
通过以上三种方法来判断其内在价值是否为低估,如果低估,则可能在一段时间内进行均值回归,也就是价格上涨至价值区。其中市盈率,市销率,市净率是我们判断的指标,本策略采用经典打分法,也就是说,我们把单纯的市盈率指标进行解剖,上文已经算出市盈率均值为 33.因此将市盈率 33 以下的个股给予 10 分,反之则不给分。市净率小于 1 的个股给予 10 分,在 1 至 3 区间的给予 5 分,反之不给分。市销率小于 2 的给予 10 分,在 2-10 区间的给予 5 分,反之不给分。采用以上打分法,得分较高的个股说明其股价被低估的越严重,越有可能进行均值回复!
第二步:
确定调仓周期:一个月,确定最大持股数量:30 只。
第三步:
设置风险措施,个股亏损超过 10%,则进行止损。止盈为时间止盈,满一个月止盈。
三、运用 Python 编写出策略(本节打分法)
第一步:导入编写过程中需要用到的代码包。
from datetime import timedelta, date
#1.导入时间数据包
import pandas as pd
#2.导入 pandas 数据包,快捷使用为 pd
第二步:设置初始条件
# 初始化函数 ##################################################################
def initialize(account):
account.n = 30 # 最大持股数
#调仓月份设置,为 1-12 月
account.trade_date = range(1,13,1)
## 按月调仓,调仓日为月末最后一个交易日
run_monthly(trade,date_rule=2)
#运用 I 问财进行股票筛选
get_iwencai('未停牌,上市时间超过 2 年')
其中 get_iwencai()是 MindGo 平台特有的自然语言选股器,输入文字即可。选出的股票池放到 account.iwencai_securities,可以直接调用。
第三步:全市场股票打分,筛选得分最高的 30 只。
# 1. 筛选股票列表
def stocks_jz(account,data):
date= get_datetime().strftime('%Y%m%d')
df = {'security': [], 1:[], 2:[], 3:[], 'score':[]}
stocks=account.iwencai_securities
for security in stocks:
q=query(profit.symbol,valuation.pe_ttm,valuation.pb,valuation.ps_ttm).filter(profit.symbol==security)
yz = get_fundamentals(q, date=date).fillna(0)
log.info(yz)
log.info(df)
df['security'].append(security)
if (not (yz['valuation_pe_ttm'].empty or yz['valuation_ps_ttm'].empty or yz['valuation_pb'].empty)):
if yz['valuation_pe_ttm'][0]<33 :
df[1].append(10)
else:
df[1].append(0)
if yz['valuation_ps_ttm'][0]<2.01:
df[2].append(10)
elif yz['valuation_ps_ttm'][0]<10.01 and yz['valuation_ps_ttm'][0]>2.01:
df[2].append(5)
else:
df[2].append(0)
if yz['valuation_pb'][0]<1.01:
df[3].append(10)
elif yz['valuation_pb'][0]<3.01 and yz['valuation_pb'][0]>1.01:
df[3].append(5)
else:
df[3].append(0)
else:
df[1].append(0)
df[2].append(0)
df[3].append(0)
for i in range(len(df['security'])):
s = (df[1][i]+df[2][i]+df[3][i])
df['score'].append(s)
df = pd.DataFrame(df).sort_values(by ='score', ascending=False)
account.sample = df['security'][:30]
return account.sample
1.def stocks_jz(account,data):
这行代码是自定义函数,用于获取相应股票列表,后缀是参数。
2.date= get_datetime().strftime('%Y%m%d')
这行代码用来获取当前时间,并转化成年月日的格式。
3.df = {'security': [], 1:[], 2:[], 3:[], 'score':[]}
这行代码用来创建字典,4 个项。security 用来存放股票,1、2、3 用来存在分数,score 用来计算总分
4.stocks=account.iwencai_securities
这行代码将问财选股后的结果植入 stocks
5.for security in stocks:
for in 循环函数,从 stocks 从逐一取股票 6.q=query(profit.symbol,valuation.pe_ttm,valuation.pb,valuation.ps_ttm).filter(profit.symbol==security)
q=query().filter()是获取因子的子函数,此处为第 5 行代码选出的个股,以及它的市盈率市净率市销率。
7.yz = get_fundamentals(q, date=date).fillna(0)
get_fundamentals ()是获取因子主函数,参数 q 就是第 6 行代码,date 是第 2 行代码,fillna(0)是将其中的缺失值填充为 0,防止数据缺失。
8.log.info(yz)
log.info ()是打印数据,这里是打印第 7 行代码的 yz
9.log.info(df)
同 8,这里是打印字典数据。
10.df['security'].append(security)
将个股股票植入字典的 security 项中。
11.if (not (yz['valuation_pe_ttm'].empty or yz['valuation_ps_ttm'].empty or yz['valuation_pb'].empty)):
if 判断函数,用来判断第 7 行代码中的数据空值,如果没有空值,则进行下一步。
12.if yz['valuation_pe_ttm'][0]<33 :
if 判断函数,如果市盈率小于 33,则下一步
13.df[1].append(10)
在字典的第一项中输入 10 分
14.else:
接收个股市盈率大于 33 的情况
15.df[1].append(0)
在字典的第一项中输入 0 分
16.if yz['valuation_ps_ttm'][0]<2.01:
if 判断函数,市销率小于 2,则加 10 分,
17.df[2].append(10)
18.elif yz['valuation_ps_ttm'][0]<10.01 and yz['valuation_ps_ttm'][0]>2.01:
elif 函数,在 if 判断完后,继续判断,这里是市销率在 2-10 区间,则加 5 分
19.df[2].append(5)
20.else:
接收上述两个条件都不满足的,则加 0 分。
21.df[2].append(0)
22.if yz['valuation_pb'][0]<1.01:
同 16
23.df[3].append(10)
24.elif yz['valuation_pb'][0]<3.01 and yz['valuation_pb'][0]>1.01:
同 18
25.df[3].append(5)
26.else:
27.df[3].append(0)
28.else:
接收 11 行中含空值的情况,则在字典的全部三项输入 0.
29.df[1].append(0)
30.df[2].append(0)
31.df[3].append(0)
32.for i in range(len(df['security'])):
得出字典中 security 项的股票个数,然后形成自然数数列,以此从中取数
33.s = (df[1][i]+df[2][i]+df[3][i])
将 1、2、3 项的分数累加
34.df['score'].append(s)
将累加值 s 输入到字典的 score 项中。
35.df = pd.DataFrame(df).sort_values(by ='score', ascending=False)
将字典转化成 dataframe 形式,并以 score 项为基准,进行降序排序。
36.account.sample = df['security'][:30]
排序后,截取字典中个股的前 30 只。
37.return account.sample
输出函数结果。
第四步:设置交易函数:
def trade(account, data):
date = get_datetime()
months = get_datetime().month
if months in account.trade_date:
##获得 50 只股票列表
jz_list = stocks_jz(account,data)
## 获得满足每种条件的股票池
stock_list = list(set(jz_list))
## 卖出
if len(account.positions) > 0:
for stock in list(account.positions):
if stock not in stock_list:
order_target(stock, 0)
## 买入
if len(stock_list) > 0:
for stock in stock_list:
if stock not in list(account.positions):
if len(account.positions) < account.n :
number = account.n - len(account.positions)
order_value(stock,account.cash/number)
else:
order_value(stock,account.cash)
else:
pass
1.def trade(account, data):
这行代码用来自定义交易函数,与选股函数同理。
2.date = get_datetime()
这行代码用来获取当前时间
3.months = get_datetime().month
这行代码用来获取当前时间的月份
4.if months in account.trade_date:
这行代码用来判断当前月份是否符合调仓月份要求,后半段 account.trade_date 是初始设置条件之一。如果不满足直接跳到 19 行。
5.jz_list = stocks_jz(account,data)
这行代码用来获取股票列表,将选股函数的结果输出到列表上
6.stock_list = list(set(jz_list))
这行代码用来将列表的股票转移到新的列表,用来交易,其中 list()是列表形式,set()用来创建集合。
7.if len(account.positions) > 0:
这行代码用来判断目前持仓股票数量,如果有数量,则进行下一步。
8.for stock in list(account.positions)
for in 是一个循环函数,将持仓股票逐一选出,并逐一进行下一步
9.if stock not in stock_list:
if 判断函数,如果选出的股票不在股票列表,则表明,个股经过一个月后不在是市盈率最低的 15 只了。需要进行下一步卖出。
10.order_target(stock, 0)
order_target 是下单函数,用于买卖股票,参数 stock 是交易对象,参数 0 代表将股票清仓。具体下单函数可以阅读 MINDGO 的 API 文档,进行学习。http://quant.10jqka.com.cn/platform/html/help-api.html#7/145
11.if len(stock_list) > 0:
if 判断函数,用来判断股票列表中个股数量是否大于 0,符合则进行下一步。
12.for stock in stock_list:
for in 循环函数,将股票列表中的个股逐一选股,并逐一进行下一步。
13.if stock not in list(account.positions):
if 函数,逐一拿出来的股票是否在当前持仓中,如果不在当前持仓则进到下一步。
14.if len(account.positions) < account.n :
if 函数,判断当前持仓数量是否小于最大持股数,如果小于则下一步。如果不满足,则跳到 17 行。
15.number = account.n - len(account.positions)
计算出最大持股数与当前持仓数量的差值。
16.order_value(stock,account.cash/number)
order_valuse 是下单函数,参数 stock 是交易对象,也就是逐一选的股票,参数 account.cash 是当前可用资金,number 是 15 行计算的结果,整合就是买入的资金,即当前可用资金平均分配到每个个股。
17.else:
用来接收 14 行中,if 函数不满足的个股
18.order_value(stock,account.cash)
order_valuse 是下单函数,参数 stock 是交易对象,参数 accunt.cash 是买入金额,即当然可用资金。
19.else:
用于接收第 4 行代码中不符合的情况发生,进行下一步。
20pass
当出现第 4 行代码不符合的情况,则 pass,跳过。
第五步:设置风控条件
# 每日检查止损条件 #############################################################
def handle_data(account,data):
## 个股止损
if len(account.positions) > 0:
# 止损:个股跌幅超过 10%,卖出
securities = list(account.positions)
for stock in securities:
price = data.attribute_history(stock, ['close'], 1, '1d', skip_paused=False, fq='pre')
if account.positions[stock].cost_basis /price['close'][0]-1 < -0.1:
order_target(stock, 0)
1.def handle_data(account,data):
这行代码是函数创建。
2.if len(account.positions) > 0:
if 判断函数,用来判断目前是否有持仓。
3.securities = list(account.positions)
如果有持仓,则将持仓股票植入 securities
4.for stock in securities:
for in 循环函数,从持仓股票池中逐一取出股票,进行操作。
5.price = data.attribute_history(stock, ['close'], 1, '1d', skip_paused=False, fq='pre')
data.attribute_history ()获取数据的函数,参数 stock 是对象,['close']获取的数据为收盘价。详细可以参考 MindGo 的 API 文档: http://quant.10jqka.com.cn/platform/html/help-api.html?t=data#3/0
6.if account.positions[stock].cost_basis /price['close'][0]-1 < -0.1:
if 判断函数,account.positions[stock].cost_basis 是持仓个股的成本价,price['close'][0]是收盘价,这行代码是用来止损,当个股亏损超过 10%时,执行下一步。
7.order_target(stock, 0)
order_target 是下单函数,参数 stock 是操作对象,0 代表清仓。
至此我们已经编写完整个策略,进行历史行情回测。
四、策略回测结果分析。
从回测结果看,均值回归在中国股市确实有着非常强大的获利能力,也就是说在中国股市投资哪些被低估的个股是可以稳稳的赚钱获利的,其中可能的原因就是 A 股市场是一个散户为主导,投资充满非理性的市场,而低估值的股票往往不受投资者的喜爱,但不合理的投资造成的不合理的价格必然会被修复,由此看:长期投资者低估值的股票,可以说是稳健获利的方式。最后也得出结论:价格确实是围绕价值上下波动,这在中国股市的股票上也不例外,投资那些价值被低估的个股是一个稳健的投资方式。
>>>详细点击 http://quant.10jqka.com.cn/platform/html/article.html#id/87894149
[策略编写系列四] Python实现均值回归策略(打分法实战)
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
class MeanReversionScoring:
"""
均值回归打分法策略
核心逻辑:对多个技术指标进行标准化打分,综合判断超买超卖状态
"""
def __init__(self, lookback_period=20, zscore_threshold=2.0):
self.lookback = lookback_period
self.threshold = zscore_threshold
self.position = 0 # 持仓状态:0空仓,1多仓,-1空仓
def calculate_indicators(self, df):
"""计算所有技术指标"""
# 价格序列
close = df['close']
# 1. RSI指标
delta = close.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
df['rsi'] = 100 - (100 / (1 + rs))
# 2. 布林带
df['bb_middle'] = close.rolling(window=self.lookback).mean()
bb_std = close.rolling(window=self.lookback).std()
df['bb_upper'] = df['bb_middle'] + 2 * bb_std
df['bb_lower'] = df['bb_middle'] - 2 * bb_std
df['bb_position'] = (close - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'])
# 3. 价格Z-Score
df['price_mean'] = close.rolling(window=self.lookback).mean()
df['price_std'] = close.rolling(window=self.lookback).std()
df['price_zscore'] = (close - df['price_mean']) / df['price_std']
# 4. 成交量加权价格
df['vwap'] = (df['volume'] * df['close']).cumsum() / df['volume'].cumsum()
df['vwap_ratio'] = close / df['vwap']
return df.dropna()
def score_signal(self, row):
"""对当前K线进行综合打分"""
scores = []
# RSI打分 (30-70为中性区间)
if row['rsi'] < 30:
scores.append(1) # 超卖,看多
elif row['rsi'] > 70:
scores.append(-1) # 超买,看空
else:
scores.append(0)
# 布林带打分
if row['bb_position'] < 0.2:
scores.append(1) # 接近下轨
elif row['bb_position'] > 0.8:
scores.append(-1) # 接近上轨
else:
scores.append(0)
# Z-Score打分
if row['price_zscore'] < -self.threshold:
scores.append(1) # 显著低于均值
elif row['price_zscore'] > self.threshold:
scores.append(-1) # 显著高于均值
else:
scores.append(0)
# VWAP相对位置
if row['vwap_ratio'] < 0.98:
scores.append(0.5) # 价格低于平均成本
elif row['vwap_ratio'] > 1.02:
scores.append(-0.5) # 价格高于平均成本
else:
scores.append(0)
# 综合得分
total_score = sum(scores)
# 生成交易信号
if total_score >= 2:
return 1 # 强烈看多
elif total_score <= -2:
return -1 # 强烈看空
else:
return 0 # 观望
def generate_signals(self, df):
"""生成交易信号"""
df = self.calculate_indicators(df.copy())
df['signal'] = df.apply(self.score_signal, axis=1)
# 信号过滤:避免频繁交易
df['position'] = 0
for i in range(1, len(df)):
if df['signal'].iloc[i] == 1 and df['position'].iloc[i-1] <= 0:
df.loc[df.index[i], 'position'] = 1
elif df['signal'].iloc[i] == -1 and df['position'].iloc[i-1] >= 0:
df.loc[df.index[i], 'position'] = -1
else:
df.loc[df.index[i], 'position'] = df['position'].iloc[i-1]
return df
# 使用示例
if __name__ == "__main__":
# 模拟数据(实际使用时替换为真实数据)
dates = pd.date_range(start='2023-01-01', end='2023-06-01', freq='D')
np.random.seed(42)
prices = 100 + np.cumsum(np.random.randn(len(dates)) * 0.5)
volumes = np.random.randint(1000, 10000, len(dates))
sample_data = pd.DataFrame({
'date': dates,
'close': prices,
'volume': volumes
}).set_index('date')
# 运行策略
strategy = MeanReversionScoring(lookback_period=20, zscore_threshold=1.5)
result = strategy.generate_signals(sample_data)
# 查看结果
print("策略信号统计:")
print(result['position'].value_counts())
print("\n最近5个交易日的信号:")
print(result[['close', 'rsi', 'price_zscore', 'position']].tail())
这个实现的核心是通过多个指标综合打分来判断均值回归机会。RSI看超买超卖,布林带看价格通道位置,Z-Score量化偏离程度,VWAP看市场平均成本。当多个指标同时发出信号时(总分≥2),才产生交易信号,避免假信号。
策略参数可以根据不同品种调整,比如lookback_period改变均值计算周期,zscore_threshold调整触发阈值。实际使用时要回测验证参数有效性。
简单说就是多指标投票,一致性高时才交易。

