实现4S完整策略历史回测程序,输出年化收益,最大回撤核心指标。
在智能证券投资的课程体系中,"4S"不仅仅指选股(Stock Selection),更包含择时(Market Timing)、仓位管理(Position Sizing)和风控(Stop Loss)。
很多开发者在编写回测程序时,容易陷入一个误区:过度拟合单一环节,忽略了策略的系统完整性。
今天,我将结合Python编程实践,为你拆解一个完整的4S策略回测系统。我们将不掺杂任何营销话术,中立地探讨如何通过代码,将"选股、择时、仓位、风控"四大模块工程化,并输出年化收益(CAGR)与最大回撤(Max Drawdown)这两个核心评价指标。
一、 实际应用场景描述
在量化投研的实际工作中,我们面临的真实场景往往不是简单的"低买高卖",而是一个复杂的系统工程:
1. 多因子选股:面对全市场5000+只股票,如何根据4S标准(如基本面、动量、质量因子)每日筛选出符合条件的股票池?
2. 动态仓位管理:选出了5只股票,是等权分配,还是根据评分加权?总资金如何在"进攻"和"防守"之间切换?
3. 严格的止损机制:当单只个股亏损触及总资金的2%时,如何强制平仓?这2%是总资金的2%,而非个股成本的2%,这中间存在杠杆撬动的差异。
4. 完整的绩效评估:策略跑完3年,到底赚了多少?最大回撤发生在什么时候?
本篇将解决的核心痛点:如何通过模块化代码,实现"选股→择时→仓位→风控"的闭环回测,并精准计算年化收益与最大回撤。
二、 引入痛点(问题结构化)
在编写4S完整策略回测时,开发者通常会遇到以下三大痛点:
1. 模块耦合度高:选股逻辑、交易逻辑、风控逻辑写在一个大循环里,代码超过500行就难以维护,更无法灵活调整参数。
2. 收益率计算失真:简单使用
"(期末-期初)/期初" 无法反映资金曲线的波动,且忽略了资金利用率(非满仓状态下的真实收益)。
3. 回撤计算盲区:只计算了单日回撤,忽略了历史最高点到当前点的累计回撤,导致对风险的描述过于乐观。
三、 核心逻辑讲解
1. 4S策略的闭环逻辑
我们的回测引擎将遵循以下执行顺序:
每日选股(Select) → 市场择时(Time) → 资金分配(Size) → 风险检查(Stop) → 记录净值(Record)
2. 年化收益(CAGR)的核心算法
公式:
CAGR = \left(\frac{V_{final}}{V_{start}}\right)^{\frac{1}{years}} - 1
代码实现要点:需要精确计算回测天数,处理闰年和非交易日,避免用"年份差"取整导致的误差。
3. 最大回撤(Max Drawdown)的核心算法
公式:
MDD = \max\left(\frac{V_{peak} - V_{trough}}{V_{peak}}\right)
代码实现要点:需要维护一个
"rolling_max"(至今最高净值)变量。当
"当前净值 < rolling_max"时,计算回撤比,并持续更新全局最大回撤值。
四、 代码模块化实现
我们将代码分为5个模块,结构清晰,注释详尽。
1. 配置文件 (
"config.yaml")
# 回测全局配置
backtest:
start_date: "2020-01-01"
end_date: "2023-12-31"
initial_capital: 1000000 # 初始资金100万
# 4S 选股参数
select:
momentum_days: 20 # 20日动量
volatility_lookback: 20 # 20日波动率
# 4S 择时参数
time:
ma_short: 5
ma_long: 20
# 4S 仓位参数
size:
max_positions: 5 # 最多持有5只
single_position_pct: 0.20 # 单只最大20%
# 4S 风控参数
stop:
max_loss_of_total_capital: 0.02 # 单只亏总资金2%强制平仓
2. 数据加载与预处理 (
"data_loader.py")
import pandas as pd
import numpy as np
def load_price_data(path: str) -> pd.DataFrame:
"""
加载日频行情数据
CSV格式: date, code, open, high, low, close, volume, amount
"""
df = pd.read_csv(path, parse_dates=['date'])
df['code'] = df['code'].astype(str).str.zfill(6)
# 确保按日期和代码排序
return df.sort_values(['date', 'code']).reset_index(drop=True)
def calculate_technical_factors(df: pd.DataFrame) -> pd.DataFrame:
"""
计算技术因子(动量、均线等)
这是4S选股的核心输入
"""
df = df.copy()
# 20日动量因子
df['momentum'] = df.groupby('code')['close'].pct_change(periods=20)
# 20日波动率因子
df['volatility'] = df.groupby('code')['close'].pct_change().rolling(window=20).std().reset_index(0, drop=True)
# 5日均线
df['ma5'] = df.groupby('code')['close'].transform(lambda x: x.rolling(window=5).mean())
# 20日均线
df['ma20'] = df.groupby('code')['close'].transform(lambda x: x.rolling(window=20).mean())
return df
3. 4S策略引擎 (
"strategy_engine.py")
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple
class FourSStrategy:
"""
★ 4S完整策略引擎
Select -> Time -> Size -> Stop
"""
def __init__(self, config: dict):
self.config = config
self.capital = config['backtest']['initial_capital']
self.initial_capital = self.capital
# 持仓状态: {code: {'qty': int, 'cost': float, 'entry_date': Timestamp}}
self.positions: Dict[str, dict] = {}
# 净值曲线记录
self.nav_history: List[Tuple[pd.Timestamp, float]] = []
# 交易记录
self.trade_log: List[dict] = []
def run_daily(self, date: pd.Timestamp, daily_data: pd.DataFrame):
"""
★ 核心逻辑:每日执行4S策略
"""
# === S1: 选股 (Select) ===
candidates = self._select_stocks(daily_data)
# === S2: 择时 (Time) ===
final_buy, final_sell = self._time_market(date, candidates, daily_data)
# === S3: 仓位 (Size) & S4: 风控 (Stop) ===
self._execute_trades(date, final_buy, final_sell, daily_data)
# === 记录当日净值 ===
self._record_nav(date, daily_data)
def _select_stocks(self, df: pd.DataFrame) -> pd.DataFrame:
"""
选股逻辑:动量 > 0 且 波动率 < 阈值 (避免高波动陷阱)
"""
if len(df) == 0:
return pd.DataFrame()
# 简单4S选股示例:动量排名前20,且波动率低于80%分位
df = df.dropna(subset=['momentum', 'volatility', 'close'])
if len(df) < 5:
return pd.DataFrame()
# 动量筛选
df = df[df['momentum'] > 0].sort_values('momentum', ascending=False)
# 波动率过滤(排除过高波动的标的)
vol_threshold = df['volatility'].quantile(0.8)
df = df[df['volatility'] < vol_threshold]
return df.head(self.config['size']['max_positions'] * 2) # 预选池
def _time_market(self, date, candidates: pd.DataFrame, daily_data: pd.DataFrame) -> Tuple[List[str], List[str]]:
"""
择时逻辑:均线金叉买入,死叉卖出
"""
buy_list = []
sell_list = []
if len(candidates) == 0:
return [], []
for _, row in candidates.iterrows():
code = row['code']
ma5 = row.get('ma5', 0)
ma20 = row.get('ma20', 0)
close = row.get('close', 0)
if pd.isna(ma5) or pd.isna(ma20) or ma5 <= 0 or ma20 <= 0:
continue
# ★ 买入信号:短均 > 长均 (金叉区域)
if ma5 > ma20 * 1.005: # 0.5%缓冲带,防止噪音
if code not in self.positions:
buy_list.append(code)
# ★ 卖出信号:短均 < 长均 (死叉区域) 或 持有标的不再在候选池
if code in self.positions:
if ma5 < ma20 * 0.995:
sell_list.append(code)
# 检查不在候选池的持仓(基本面恶化),加入卖出列表
current_codes = set(candidates['code'].tolist())
for code in list(self.positions.keys()):
if code not in current_codes:
if code not in sell_list:
sell_list.append(code)
return buy_list, sell_list
def _execute_trades(self, date, buy_list, sell_list, daily_data: pd.DataFrame):
"""
执行交易:包含仓位管理和风控检查
"""
# ★ 先执行卖出(释放资金)
for code in sell_list:
if code in self.positions:
self._sell_stock(date, code, daily_data)
# ★ 再执行买入(使用释放的资金)
available_codes = [c for c in buy_list if c not in self.positions]
if available_codes and len(self.positions) < self.config['size']['max_positions']:
self._buy_stock(date, available_codes, daily_data)
def _buy_stock(self, date, codes: List[str], daily_data: pd.DataFrame):
"""买入逻辑:等权分配可用资金"""
n_slots = self.config['size']['max_positions']
current_pos = len(self.positions)
remaining_slots = n_slots - current_pos
if remaining_slots <= 0 or not codes:
return
# 等权分配
alloc_per_stock = self.capital / n_slots
for code in codes[:remaining_slots]:
row = daily_data[daily_data['code'] == code]
if len(row) == 0:
continue
price = row.iloc[0]['close']
if price <= 0:
continue
qty = int(alloc_per_stock / price / 100) * 100 # 整百股
if qty <= 0:
continue
cost = qty * price * 1.0003 # 佣金
if cost > self.capital:
continue
self.capital -= cost
self.positions[code] = {
'qty': qty,
'cost': price,
'entry_date': date
}
self.trade_log.append({
'date': date, 'code': code, 'action': 'BUY',
'price': price, 'qty': qty, 'cost': cost
})
def _sell_stock(self, date, code: str, daily_data: pd.DataFrame):
"""卖出逻辑:含风控检查"""
if code not in self.positions:
return
pos = self.positions[code]
row = daily_data[daily_data['code'] == code]
if len(row) == 0:
return
price = row.iloc[0]['close']
if price <= 0:
price = pos['cost'] * 0.95 # 极端情况用成本价估算
revenue = pos['qty'] * price * (1 - 0.0003 - 0.001) # 佣金+印花税
self.capital += revenue
pnl = (price - pos['cost']) / pos['cost'] * 100
self.trade_log.append({
'date': date, 'code': code, 'action': 'SELL',
'price': price, 'qty': pos['qty'], 'pnl_pct': pnl
})
del self.positions[code]
def _record_nav(self, date, daily_data: pd.DataFrame):
"""记录当日净值"""
market_value = self.capital
for code, pos in self.positions.items():
row = daily_data[daily_data['code'] == code]
if len(row) > 0:
price = row.iloc[0]['close']
if price > 0:
market_value += pos['qty'] * price
self.nav_history.append((date, market_value))
def risk_check(self, daily_data: pd.DataFrame) -> List[str]:
"""
★ 风控检查:单只亏损超总资金2%,强制调出
这是4S中"Stop"的核心实现
"""
total_capital = self.initial_capital
forced_sell = []
for code, pos in self.positions.items():
row = daily_data[daily_data['code'] == code]
if len(row) == 0:
continue
current_price = row.iloc[0]['close']
if current_price <= 0:
continue
# ★ 核心公式:个股亏损对总资金的影响
loss_impact = (current_price - pos['cost']) * pos['qty'] / total_capital
# 触发风控:亏损超过总资金的2%
if loss_impact < -self.config['stop']['max_loss_of_total_capital']:
forced_sell.append(code)
pnl = (current_price - pos['cost']) / pos['cost'] * 100
print(f" 🔴 [风控] {code} 触发强制平仓: "
f"开仓 ¥{pos['cost']:.2f} → 当前 ¥{current_price:.2f}, "
f"个股亏损 {pnl:.1f}%, 占总资金 {loss_impact*100:.2f}%")
return forced_sell
4. 绩效计算模块 (
"metrics.py")
import pandas as pd
import numpy as np
def calculate_cagr(nav_series: pd.Series) -> float:
"""
★ 计算年化收益率 (CAGR)
CAGR = (V_final / V_start)^(1/years) - 1
"""
if len(nav_series) < 2:
return 0.0
start_val = nav_series.iloc[0]
end_val = nav_series.iloc[-1]
if start_val <= 0:
return 0.0
# ★ 精确计算年数(考虑实际天数)
days = (nav_series.index[-1] - nav_series.index[0]).days
if days <= 0:
return 0.0
years = days / 365.25
cagr = (end_val / start_val) ** (1.0 / years) - 1.0
return round(cagr * 100, 2)
def calculate_max_drawdown(nav_series: pd.Series) -> Tuple[float, str, str]:
"""
★ 计算最大回撤 (Max Drawdown)
MDD = max((Peak - Trough) / Peak)
返回: (最大回撤百分比, 回撤开始日期, 回撤结束日期)
"""
if len(nav_series) < 2:
return 0.0, "", ""
nav = nav_series.values
dates = nav_series.index
max_dd = 0.0
peak_date = ""
trough_date = ""
peak_val = nav[0]
peak_idx = 0
for i in range(1, len(nav)):
if nav[i] > peak_val:
peak_val = nav[i]
peak_idx = i
dd = (peak_val - nav[i]) / peak_val
if dd > max_dd:
max_dd = dd
peak_date = str(dates[peak_idx].strftime('%Y-%m-%d'))
trough_date = str(dates[i].strftime('%Y-%m-%d'))
return round(max_dd * 100, 2), peak_date, trough_date
def calculate_sharpe(nav_series: pd.Series, risk_free_rate: float = 0.025) -> float:
"""计算夏普比率"""
if len(nav_series) < 2:
return 0.0
ret = nav_series.pct_change().dropna()
if len(ret) < 2:
return 0.0
days = (nav_series.index[-1] - nav_series.index[0]).days
yrs = max(days / 365.25, 1/365.25)
ann_ret = (nav_series.iloc[-1] / nav_series.iloc[0]) ** (1/yrs) - 1
ann_vol = ret.std() * np.sqrt(252)
if ann_vol < 1e-10:
return 0.0
sharpe = (ann_ret - risk_free_rate) / ann_vol
return round(sharpe, 3)
def print_performance_report(nav_series: pd.Series, strategy_name: str = "4S策略"):
"""打印完整绩效报告"""
cagr = calculate_cagr(nav_series)
mdd, peak_d, trough_d = calculate_max_drawdown(nav_series)
sharpe = calculate_sharpe(nav_series)
total_ret = (nav_series.iloc[-1] / nav_series.iloc[0] - 1) * 100
print(f"\n{'='*60}")
print(f" {strategy_name} 绩效报告")
print(f"{'='*60}")
print(f" 期初净值: ¥{nav_series.iloc[0]:,.2f}")
print(f" 期末净值: ¥{nav_series.iloc[-1]:,.2f}")
print(f" 累计收益: {total_ret:+.2f}%")
print(f" ★ 年化收益: {cagr:+.2f}%")
print(f" ★ 最大回撤: {mdd:.2f}%")
print(f" ├─ 回撤开始: {peak_d}")
print(f" └─ 回撤探底: {trough_d}")
print(f" ★ 夏普比率: {sharpe:.3f}")
print(f"{'='*60}\n")
return {
'cagr': cagr,
'max_drawdown': mdd,
'sharpe': sharpe,
'total_return': round(total_ret, 2)
}
5. 主回测入口 (
"main.py")
import yaml
import pandas as pd
from pathlib import Path
from data_loader import load_price_data, calculate_technical_factors
from strategy_engine import FourSStrategy
from metrics import print_performance_report
def load_config(path: str = 'config.yaml') -> dict:
with open(path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
def main():
config = load_config()
# 1. 加载数据
print("📊 加载行情数据...")
df = load_price_data('data/daily_prices.csv')
df = calculate_technical_factors(df)
# 2. 初始化策略
strategy = FourSStrategy(config)
# 3. 按日循环回测
unique_dates = df['date'].unique()
print(f"\n{'='*60}")
print(f" 开始回测: {unique_dates[0].strftime('%Y-%m-%d')} ~ "
f"{unique_dates[-1].strftime('%Y-%m-%d')}")
print(f" 初始资金: ¥{config['backtest']['initial_capital']:,.0f}")
print(f"{'='*60}\n")
for i, date in enumerate(unique_dates):
daily = df[df['date'] == date]
# ★ 先检查风控(针对持仓)
forced_sell = strategy.risk_check(daily)
if forced_sell:
for code in forced_sell:
strategy._sell_stock(date, code, daily)
# ★ 再执行正常4S策略
strategy.run_daily(date, daily)
if i % 100 == 0:
nav = strategy.nav_history[-1][1] if strategy.nav_history else 0
pos_count = len(strategy.positions)
print(f" [{i+1}/{len(unique_dates)}] {date.strftime('%Y-%m-%d')} | "
f"持仓:{pos_count} | 净值:¥{nav:,.0f}")
# 4. 生成净值序列
dates = [d for d, v in strategy.nav_history]
values = [v for d, v in strategy.nav_history]
nav_series = pd.Series(values, index=dates)
# 5. 输出核心指标
metrics = print_performance_report(nav_series, "4S完整策略")
# 6. 保存结果
Path("output").mkdir(exist_ok=True)
nav_series.to_csv("output/nav_curve.csv", header=['nav'], index_label='date')
print(f"✅ 回测完成!")
print(f" 年化收益: {metrics['cagr']:+.2f}%")
print(f" 最大回撤: {metrics['max_drawdown']:.2f}%")
print(f" 夏普比率: {metrics['sharpe']:.3f}")
if __name__ == '__main__':
main()
五、 README 文件与使用说明
# 4S 完整策略历史回测系统
## 项目简介
本系统实现"选股(Select) → 择时(Time) → 仓位(Size) → 风控(Stop)"完整闭环,
输出年化收益、最大回撤、夏普比率三大核心指标。
## 安装依赖
bash
pip install pandas numpy pyyaml matplotlib
## 使用说明
1. 准备数据: `data/daily_prices.csv` (date, code, open, high, low, close, volume)
2. 修改配置: `config.yaml` (调整选股因子、仓位上限、风控线)
3. 运行回测: `python main.py`
## 核心输出
- 年化收益率 (CAGR)
- 最大回撤 (Max Drawdown) 及起止日期
- 夏普比率 (Sharpe Ratio)
- 净值曲线 CSV
## 参数调优建议
- 选股因子周期: 20日动量适合中线,5日适合短线
- 风控线: 2%为中性,激进可放宽至3%,保守收紧至1%
- 仓位上限: 5只为宜,过多分散收益,过少集中风险
六、 核心知识点卡片
┌──────────────────────────────────────────────────────┐
│ 4S 完整策略回测 — 核心知识 │
├──────────────┬───────────────────────────────────────┤
│ 选股(Select) │ 动量因子+波动率过滤,筛选优质标的 │
│ 择时(Time) │ 均线金叉/死叉,决定买卖时机 │
│ 仓位(Size) │ 等权/加权分配,控制单只最大占比 │
│ 风控(Stop) │ 个股亏损超总资金2% → 强制平仓 │
│ 年化收益(CAGR)│ (期末/期初)^(1/年数) - 1 │
│ 最大回撤(MDD) │ max((峰值-谷值)/峰值) │
│ 夏普比率 │ (年化收益-无风险利率) / 年化波动 │
│ 回测陷阱 │ 未来函数、幸存者偏差、过拟合 │
│ 核心原则 │ 风控线不是限制收益,是保住本金 │
└──────────────┴───────────────────────────────────────┘
七、 免责声明与风险提示
⚠️ 免责声明:本代码仅供学习、研究与量化教学用途,不构成任何投资建议或投资决策依据。模拟数据为随机数生成,不代表任何真实标的历史或未来表现。
⚠️ 风险提示:
- 回测结果不代表实盘表现,存在滑点、冲击成本等未建模因素
- 2%风控线为经验值,不同策略/资金规模需动态调整
- 均线择时存在滞后性,单边下跌市中可能无法及时止损
- 动量因子在震荡市中容易频繁切换,产生大量交易成本
- 本系统未进行参数优化,直接用于实盘可能导致亏损
八、 总结
编写一个完整的4S策略回测系统,核心不在于"写出多复杂的数学公式",而在于工程化的闭环思维:
1. 模块化:选股、择时、仓位、风控四权分立,便于单独调参和单元测试
2. 年化收益:用
"(期末/期初)^(1/实际年数) - 1"而非简单除法,才是真实的CAGR
3. 最大回撤:维护
"rolling_max"变量,追踪"历史最高点→当前"的回撤幅度,而非单日波动
4. 风控2%:计算的是"个股亏损/总资金",不是"个股亏损/个股成本",这是大资金管理的核心视角
核心原则:回测的意义不是"证明策略能赚钱",而是量化你在不同市场环境下的风险暴露。一个能清晰输出年化收益和最大回撤的系统,才是合格的策略研发基础设施。
本文代码仅供学习与技术交流,不构成任何投资建议,股市有风险,入市需谨慎!
利用AI解决实际问题,如果你觉得这个工具好用,欢迎关注长安牧笛!