This article presents a complete framework for quantitative trading strategy development, covering data collection, strategy implementation, statistical validation, simulation, and final evaluation. Using a Donchian Channel Breakout strategy as the running example, it walks through data cleaning and preparation, strategy implementation, parameter optimization, Monte Carlo permutation testing, and walk-forward analysis to build, test, and validate a trading strategy, avoid overfitting, and ultimately decide whether the strategy should be deployed.
From Data Collection to Statistical Validation: A Rigorous Framework for Developing Profitable Trading Algorithms
Developing a successful quantitative trading strategy takes more than a good idea. It requires a systematic, scientifically rigorous approach that separates genuine market inefficiencies from statistical noise. This guide provides a complete framework that any engineer can follow to build, test, and validate trading strategies with statistical confidence.
We will work through the entire process step by step.
By the end, you will have a complete, reproducible strategy development framework of the kind professional quants use in practice.
We will demonstrate the framework with a classic trend-following strategy: the Donchian Channel Breakout. The logic is simple: go long when the close breaks above the highest high of the previous N bars, and go short when it breaks below the lowest low of the previous N bars.
The strategy assumes that a breakout of the recent range signals the start of a new trend.
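As a preview, the rule fits in a few lines of pandas (a minimal sketch; the complete, instrumented implementation follows in the strategy section below):

import pandas as pd

def donchian_signal(df: pd.DataFrame, lookback: int = 20) -> pd.Series:
    # Minimal sketch of the breakout rule; assumes df has 'high', 'low' and 'close' columns.
    upper = df['high'].rolling(lookback).max().shift(1)   # highest high of the previous N bars
    lower = df['low'].rolling(lookback).min().shift(1)    # lowest low of the previous N bars
    signal = pd.Series(0, index=df.index)
    signal[df['close'] > upper] = 1     # breakout above the channel: go long
    signal[df['close'] < lower] = -1    # breakdown below the channel: go short
    return signal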
High-quality data is the foundation of any reliable backtest. We will use the CCXT library to fetch cryptocurrency data from multiple exchanges.
pip install ccxt pandas numpy scipy matplotlib seaborn yfinance
import ccxt
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import time
import os
from typing import List, Tuple, Optional
class CryptoDataCollector:
    """
    A robust cryptocurrency data collector built on CCXT.
    Handles rate limiting, data validation, and storage.
    """

    def __init__(self, exchange_name: str = 'binance'):
        """Initialize the data collector with the specified exchange."""
        self.exchange = getattr(ccxt, exchange_name)({
            'rateLimit': 1200,  # stay conservative with rate limits
            'enableRateLimit': True,
        })

    def fetch_ohlcv_data(self,
                         symbol: str = 'BTC/USDT',
                         timeframe: str = '1h',
                         start_date: str = '2020-01-01',
                         end_date: Optional[str] = None) -> pd.DataFrame:
        """
        Fetch historical OHLCV data with proper error handling and validation.

        Args:
            symbol: trading pair (e.g. 'BTC/USDT')
            timeframe: candle timeframe ('1m', '5m', '1h', '1d')
            start_date: start date in 'YYYY-MM-DD' format
            end_date: end date in 'YYYY-MM-DD' format (None means up to now)

        Returns:
            pandas.DataFrame with OHLCV data
        """
        print(f"Fetching {symbol} {timeframe} data from {start_date}...")

        # Convert dates to timestamps
        since_timestamp = self.exchange.parse8601(f"{start_date}T00:00:00Z")
        end_timestamp = None
        if end_date:
            end_timestamp = self.exchange.parse8601(f"{end_date}T23:59:59Z")

        all_candles = []
        current_timestamp = since_timestamp

        while True:
            try:
                # Fetch data in chunks
                candles = self.exchange.fetch_ohlcv(
                    symbol, timeframe, current_timestamp, limit=1000
                )

                if not candles or len(candles) == 0:
                    break

                # Filter by end date if one was specified
                if end_timestamp:
                    candles = [c for c in candles if c[0] <= end_timestamp]
                    if not candles:
                        break

                all_candles.extend(candles)

                # Advance the timestamp for the next iteration
                last_timestamp = candles[-1][0]
                current_timestamp = last_timestamp + self._timeframe_to_ms(timeframe)

                # Stop once the end date has been reached
                if end_timestamp and current_timestamp > end_timestamp:
                    break

                print(f"Fetched {len(candles)} candles, total: {len(all_candles)}")

                # Respect the exchange rate limit
                time.sleep(self.exchange.rateLimit / 1000)

            except Exception as e:
                print(f"Error fetching data: {e}")
                time.sleep(5)  # wait before retrying
                continue

        # Convert to a DataFrame and clean
        df = self._process_raw_data(all_candles)

        # Save to file
        filename = f"{symbol.replace('/', '')}_{timeframe}_{start_date}_{end_date or 'latest'}.parquet"
        df.to_parquet(filename)
        print(f"Data saved to {filename}")

        return df

    def _timeframe_to_ms(self, timeframe: str) -> int:
        """Convert a timeframe string to milliseconds."""
        timeframes = {
            '1m': 60 * 1000,
            '5m': 5 * 60 * 1000,
            '15m': 15 * 60 * 1000,
            '1h': 60 * 60 * 1000,
            '4h': 4 * 60 * 60 * 1000,
            '1d': 24 * 60 * 60 * 1000,
        }
        return timeframes.get(timeframe, 60 * 60 * 1000)

    def _process_raw_data(self, raw_data: List) -> pd.DataFrame:
        """Process raw OHLCV data into a clean DataFrame."""
        df = pd.DataFrame(raw_data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])

        # Convert the timestamp and set it as the index
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        df.set_index('timestamp', inplace=True)

        # Ensure numeric types
        numeric_columns = ['open', 'high', 'low', 'close', 'volume']
        df[numeric_columns] = df[numeric_columns].astype(float)

        # Drop duplicates and sort
        df = df[~df.index.duplicated(keep='first')]
        df.sort_index(inplace=True)

        # Basic data validation
        self._validate_data(df)

        return df

    def _validate_data(self, df: pd.DataFrame) -> None:
        """Validate the integrity of the OHLCV data."""
        # Check for missing values
        if df.isnull().any().any():
            print("Warning: Found missing values in data")

        # Check for impossible price relationships
        invalid_rows = (df['high'] < df['low']) | (df['high'] < df['open']) | \
                       (df['high'] < df['close']) | (df['low'] > df['open']) | \
                       (df['low'] > df['close'])
        if invalid_rows.any():
            print(f"Warning: Found {invalid_rows.sum()} rows with invalid OHLC relationships")

        # Check for negative prices or volumes
        if (df[['open', 'high', 'low', 'close', 'volume']] < 0).any().any():
            print("Warning: Found negative prices or volumes")

        print(f"Data validation complete. Shape: {df.shape}")
        print(f"Date range: {df.index.min()} to {df.index.max()}")


# Usage example
if __name__ == "__main__":
    collector = CryptoDataCollector('binance')

    # Fetch four years of hourly Bitcoin data
    btc_data = collector.fetch_ohlcv_data(
        symbol='BTC/USDT',
        timeframe='1h',
        start_date='2020-01-01',
        end_date='2024-01-01'
    )

    print("\nSample data:")
    print(btc_data.head())
    print(f"\nData shape: {btc_data.shape}")
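The collector above only warns about data problems; it does not repair them. A small helper along these lines (a sketch, assuming hourly candles and a deliberately conservative fill policy) can close gaps in the candle index before backtesting:

def fill_missing_candles(df: pd.DataFrame, freq: str = '1H') -> pd.DataFrame:
    """Sketch of a gap-filling step: reindex onto a complete time grid and fill conservatively."""
    full_index = pd.date_range(df.index.min(), df.index.max(), freq=freq)
    out = df.reindex(full_index)
    out['close'] = out['close'].ffill()               # carry the last known close forward
    for col in ['open', 'high', 'low']:
        out[col] = out[col].fillna(out['close'])      # represent missing bars as flat candles
    out['volume'] = out['volume'].fillna(0.0)         # assume no trading during the gap
    return out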
Now let's implement the Donchian Channel Breakout strategy with proper signal generation and performance tracking.
import pandas as pd
import numpy as np
from typing import Tuple, Dict, List
import matplotlib.pyplot as plt
import seaborn as sns
class DonchianStrategy:
    """
    Implementation of the Donchian Channel Breakout strategy with comprehensive analysis.
    """

    def __init__(self, lookback_period: int = 20):
        """
        Initialize the strategy with the specified lookback period.

        Args:
            lookback_period: number of periods used for the highest-high / lowest-low calculation
        """
        self.lookback_period = lookback_period
        self.signals = None
        self.returns = None
        self.positions = None

    def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Generate trading signals based on Donchian Channel breakouts.

        Args:
            data: DataFrame containing OHLCV data

        Returns:
            DataFrame with signals and related calculations
        """
        df = data.copy()

        # Calculate the Donchian Channels
        df['upper_channel'] = df['high'].rolling(window=self.lookback_period, min_periods=self.lookback_period).max()
        df['lower_channel'] = df['low'].rolling(window=self.lookback_period, min_periods=self.lookback_period).min()

        # Generate signals
        df['signal'] = 0  # 0 = flat, 1 = long, -1 = short

        # Long signal: close breaks above the upper channel
        long_condition = df['close'] > df['upper_channel'].shift(1)
        df.loc[long_condition, 'signal'] = 1

        # Short signal: close breaks below the lower channel
        short_condition = df['close'] < df['lower_channel'].shift(1)
        df.loc[short_condition, 'signal'] = -1

        # Forward-fill the signal (hold the position until a new signal arrives)
        df['position'] = df['signal'].replace(0, np.nan).ffill().fillna(0)

        # Calculate returns
        df['price_return'] = df['close'].pct_change()
        df['strategy_return'] = df['position'].shift(1) * df['price_return']

        # Store the results
        self.signals = df
        self.returns = df['strategy_return'].dropna()
        self.positions = df['position']

        return df

    def calculate_performance_metrics(self) -> Dict[str, float]:
        """Calculate comprehensive performance metrics."""
        if self.returns is None:
            raise ValueError("Must generate signals first")

        returns = self.returns.dropna()

        # Basic return metrics
        total_return = (1 + returns).prod() - 1
        annualized_return = (1 + returns.mean()) ** (252 * 24) - 1  # assumes hourly data, 252 trading days
        volatility = returns.std() * np.sqrt(252 * 24)
        sharpe_ratio = annualized_return / volatility if volatility > 0 else 0

        # Drawdown analysis
        cumulative_returns = (1 + returns).cumprod()
        rolling_max = cumulative_returns.expanding().max()
        drawdown = (cumulative_returns - rolling_max) / rolling_max
        max_drawdown = drawdown.min()

        # Win/loss analysis
        winning_returns = returns[returns > 0]
        losing_returns = returns[returns < 0]

        win_rate = len(winning_returns) / len(returns) if len(returns) > 0 else 0
        avg_win = winning_returns.mean() if len(winning_returns) > 0 else 0
        avg_loss = losing_returns.mean() if len(losing_returns) > 0 else 0

        # Profit factor
        gross_profit = winning_returns.sum()
        gross_loss = abs(losing_returns.sum())
        profit_factor = gross_profit / gross_loss if gross_loss > 0 else np.inf

        # Other metrics
        calmar_ratio = annualized_return / abs(max_drawdown) if max_drawdown != 0 else 0

        return {
            'total_return': total_return,
            'annualized_return': annualized_return,
            'volatility': volatility,
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': max_drawdown,
            'calmar_ratio': calmar_ratio,
            'win_rate': win_rate,
            'avg_win': avg_win,
            'avg_loss': avg_loss,
            'profit_factor': profit_factor,
            'total_trades': len(returns),  # counts return bars, not round-trip trades
            'winning_trades': len(winning_returns),
            'losing_trades': len(losing_returns)
        }

    def plot_strategy_performance(self, data: pd.DataFrame) -> None:
        """Plot strategy performance and signals."""
        fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(15, 12))

        # Price and channels
        ax1.plot(data.index, data['close'], label='Close', alpha=0.7)
        ax1.plot(data.index, data['upper_channel'], label='Upper channel', linestyle='--', alpha=0.7)
        ax1.plot(data.index, data['lower_channel'], label='Lower channel', linestyle='--', alpha=0.7)

        # Mark the signals
        long_signals = data[data['signal'] == 1]
        short_signals = data[data['signal'] == -1]

        ax1.scatter(long_signals.index, long_signals['close'], color='green', marker='^', s=50, label='Long signal')
        ax1.scatter(short_signals.index, short_signals['close'], color='red', marker='v', s=50, label='Short signal')

        ax1.set_title(f'Donchian Channel Breakout (lookback: {self.lookback_period})')
        ax1.legend()
        ax1.grid(True, alpha=0.3)

        # Positions
        ax2.plot(data.index, data['position'], label='Position', linewidth=2)
        ax2.set_title('Position over time')
        ax2.set_ylabel('Position')
        ax2.legend()
        ax2.grid(True, alpha=0.3)

        # Cumulative returns
        cumulative_returns = (1 + data['strategy_return'].fillna(0)).cumprod()
        cumulative_buy_hold = (1 + data['price_return'].fillna(0)).cumprod()

        ax3.plot(data.index, cumulative_returns, label='Strategy', linewidth=2)
        ax3.plot(data.index, cumulative_buy_hold, label='Buy & hold', alpha=0.7)
        ax3.set_title('Cumulative return comparison')
        ax3.set_ylabel('Cumulative return')
        ax3.legend()
        ax3.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()


# Usage example
def test_single_strategy():
    """Test the strategy with a single parameter set."""
    # Load your data (replace with actual data loading)
    # btc_data = pd.read_parquet('BTCUSDT_1h_2020-01-01_2024-01-01.parquet')

    # For demonstration, create synthetic data
    np.random.seed(42)
    dates = pd.date_range('2020-01-01', '2024-01-01', freq='1H')
    price = 100 * np.exp(np.cumsum(np.random.normal(0.0001, 0.02, len(dates))))

    synthetic_data = pd.DataFrame({
        'open': price * (1 + np.random.normal(0, 0.001, len(dates))),
        'high': price * (1 + np.abs(np.random.normal(0, 0.005, len(dates)))),
        'low': price * (1 - np.abs(np.random.normal(0, 0.005, len(dates)))),
        'close': price,
        'volume': np.random.uniform(1000, 10000, len(dates))
    }, index=dates)

    # Test the strategy
    strategy = DonchianStrategy(lookback_period=20)
    signals_df = strategy.generate_signals(synthetic_data)
    metrics = strategy.calculate_performance_metrics()

    print("Performance Metrics:")
    for key, value in metrics.items():
        print(f"{key}: {value:.4f}")

    strategy.plot_strategy_performance(signals_df)


if __name__ == "__main__":
    test_single_strategy()
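One detail worth pausing on is how generate_signals turns sparse breakout signals into a held position: zeros carry no new information, so the last non-zero signal is carried forward. A tiny standalone illustration of that transformation:

import numpy as np
import pandas as pd

# Illustration only: zeros become NaN, the last non-zero signal is forward-filled,
# and any leading NaNs (before the first signal) become a flat position.
signal = pd.Series([0, 1, 0, 0, -1, 0, 0, 1, 0])
position = signal.replace(0, np.nan).ffill().fillna(0)
print(position.tolist())  # [0.0, 1.0, 1.0, 1.0, -1.0, -1.0, -1.0, 1.0, 1.0]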
Next, we will build a robust optimization framework that tests multiple parameter combinations and tracks the results.
import itertools
from concurrent.futures import ProcessPoolExecutor
import warnings
warnings.filterwarnings('ignore')
class StrategyOptimizer:
    """
    Comprehensive strategy optimization with parallel processing and detailed tracking.
    """

    def __init__(self, data: pd.DataFrame):
        """
        Initialize the optimizer with market data.

        Args:
            data: DataFrame containing OHLCV data
        """
        self.data = data
        self.optimization_results = None

    def optimize_parameters(self,
                            lookback_range: range = range(5, 51, 5),
                            objective_function: str = 'profit_factor',
                            n_jobs: int = 4) -> pd.DataFrame:
        """
        Optimize strategy parameters using parallel processing.

        Args:
            lookback_range: range of lookback periods to test
            objective_function: metric to optimize ('profit_factor', 'sharpe_ratio', etc.)
            n_jobs: number of parallel processes

        Returns:
            DataFrame with optimization results
        """
        print(f"Optimizing parameters over {len(lookback_range)} combinations...")

        # Prepare the parameter combinations
        param_combinations = [(lookback,) for lookback in lookback_range]

        # Run the optimization in parallel
        with ProcessPoolExecutor(max_workers=n_jobs) as executor:
            results = list(executor.map(
                self._test_parameter_combination,
                param_combinations
            ))

        # Convert to a DataFrame
        results_df = pd.DataFrame(results)
        results_df = results_df.sort_values(objective_function, ascending=False)

        self.optimization_results = results_df

        print(f"Optimization complete. Best {objective_function}: {results_df.iloc[0][objective_function]:.4f}")
        print(f"Best parameters: Lookback = {results_df.iloc[0]['lookback_period']}")

        return results_df

    def _test_parameter_combination(self, params: Tuple) -> Dict:
        """Test a single parameter combination."""
        lookback_period = params[0]

        try:
            strategy = DonchianStrategy(lookback_period=lookback_period)
            strategy.generate_signals(self.data)
            metrics = strategy.calculate_performance_metrics()

            # Attach the parameter information
            metrics['lookback_period'] = lookback_period
            return metrics

        except Exception as e:
            print(f"Error testing lookback {lookback_period}: {e}")
            return {
                'lookback_period': lookback_period,
                'profit_factor': 0,
                'sharpe_ratio': 0,
                'total_return': 0,
                'max_drawdown': -1,
                'win_rate': 0
            }

    def plot_optimization_results(self, top_n: int = 10) -> None:
        """Plot the optimization results."""
        if self.optimization_results is None:
            raise ValueError("Must run optimization first")

        results = self.optimization_results.head(top_n)

        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))

        # Profit factor
        ax1.bar(range(len(results)), results['profit_factor'])
        ax1.set_title('Profit factor by rank')
        ax1.set_xlabel('Rank')
        ax1.set_ylabel('Profit factor')
        ax1.grid(True, alpha=0.3)

        # Sharpe ratio
        ax2.bar(range(len(results)), results['sharpe_ratio'])
        ax2.set_title('Sharpe ratio by rank')
        ax2.set_xlabel('Rank')
        ax2.set_ylabel('Sharpe ratio')
        ax2.grid(True, alpha=0.3)

        # Total return
        ax3.bar(range(len(results)), results['total_return'] * 100)
        ax3.set_title('Total return by rank (%)')
        ax3.set_xlabel('Rank')
        ax3.set_ylabel('Total return (%)')
        ax3.grid(True, alpha=0.3)

        # Max drawdown
        ax4.bar(range(len(results)), results['max_drawdown'] * 100)
        ax4.set_title('Max drawdown by rank (%)')
        ax4.set_xlabel('Rank')
        ax4.set_ylabel('Max drawdown (%)')
        ax4.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()


# Usage example
def run_optimization():
    """Run an example parameter optimization."""
    # Load or create data
    np.random.seed(42)
    dates = pd.date_range('2020-01-01', '2023-01-01', freq='1H')
    price = 100 * np.exp(np.cumsum(np.random.normal(0.0001, 0.02, len(dates))))

    data = pd.DataFrame({
        'open': price * (1 + np.random.normal(0, 0.001, len(dates))),
        'high': price * (1 + np.abs(np.random.normal(0, 0.005, len(dates)))),
        'low': price * (1 - np.abs(np.random.normal(0, 0.005, len(dates)))),
        'close': price,
        'volume': np.random.uniform(1000, 10000, len(dates))
    }, index=dates)

    # Run the optimization
    optimizer = StrategyOptimizer(data)
    results = optimizer.optimize_parameters(
        lookback_range=range(5, 51, 5),
        objective_function='profit_factor'
    )

    print("\nTop 5 Results:")
    print(results[['lookback_period', 'profit_factor', 'sharpe_ratio', 'total_return', 'max_drawdown']].head())

    optimizer.plot_optimization_results()


if __name__ == "__main__":
    run_optimization()
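Keep in mind that optimization alone introduces selection bias: even when every parameter set has zero true edge, the best of ten noisy backtests will tend to look attractive. A small synthetic illustration of why the validation step that follows is necessary:

import numpy as np

rng = np.random.default_rng(0)
# Ten candidate parameter sets with no true edge: profit factors scattered around 1.0.
random_pfs = rng.normal(1.0, 0.15, size=(100_000, 10))
print(f"mean profit factor:          {random_pfs.mean():.3f}")              # ~1.0
print(f"mean of the best-of-10 pick: {random_pfs.max(axis=1).mean():.3f}")  # noticeably above 1.0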
This is the critical step that separates a real edge from a statistical fluke. We will implement a comprehensive permutation test to validate our results.
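Here is the idea in miniature (a toy sketch that uses a plain shuffle and a trivial one-bar momentum statistic; the validator below instead uses a block bootstrap and re-runs the full Donchian optimization on every permuted series):

import numpy as np

rng = np.random.default_rng(42)
returns = rng.normal(0.0002, 0.02, 5_000)  # synthetic bar returns

def momentum_stat(r: np.ndarray) -> float:
    """Mean return from holding the sign of the previous bar's return."""
    return float(np.mean(np.sign(r[:-1]) * r[1:]))

actual = momentum_stat(returns)

# Shuffling destroys any serial dependence while preserving the return distribution.
permuted = np.array([momentum_stat(rng.permutation(returns)) for _ in range(1_000)])

# Empirical p-value: the fraction of shuffled series that perform at least as well.
p_value = np.mean(permuted >= actual)
print(f"actual={actual:.6f}, p-value={p_value:.3f}")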
from scipy import stats
import random
from typing import Callable
class MonteCarloValidator:
    """
    Monte Carlo permutation testing for strategy validation.
    Determines whether strategy performance is statistically significant.
    """

    def __init__(self, data: pd.DataFrame, n_simulations: int = 1000):
        """
        Initialize the validator with market data.

        Args:
            data: DataFrame containing OHLCV data
            n_simulations: number of Monte Carlo simulations
        """
        self.data = data
        self.n_simulations = n_simulations
        self.permutation_results = None

    def run_permutation_test(self,
                             optimal_lookback: int,
                             test_statistic: str = 'profit_factor') -> Dict[str, float]:
        """
        Run a Monte Carlo permutation test on the optimized strategy.

        Args:
            optimal_lookback: best lookback period from the optimization
            test_statistic: metric whose significance is being tested

        Returns:
            Dictionary with test results, including the p-value
        """
        print(f"Running {self.n_simulations} Monte Carlo simulations...")

        # Calculate the actual strategy performance
        actual_strategy = DonchianStrategy(lookback_period=optimal_lookback)
        actual_strategy.generate_signals(self.data)
        actual_metrics = actual_strategy.calculate_performance_metrics()
        actual_test_stat = actual_metrics[test_statistic]

        print(f"Actual {test_statistic}: {actual_test_stat:.4f}")

        # Run the permutation test
        permutation_stats = []

        for i in range(self.n_simulations):
            if i % 100 == 0:
                print(f"Simulation {i}/{self.n_simulations}")

            # Create permuted data
            permuted_data = self._permute_data()

            # Run the full optimization on the permuted data
            permuted_optimizer = StrategyOptimizer(permuted_data)
            permuted_results = permuted_optimizer.optimize_parameters(
                lookback_range=range(5, 51, 5),
                objective_function=test_statistic,
                n_jobs=1  # use a single process for the permutation test
            )

            # Take the best result from the permuted optimization
            best_permuted_stat = permuted_results.iloc[0][test_statistic]
            permutation_stats.append(best_permuted_stat)

        # Calculate the p-value
        permutation_stats = np.array(permutation_stats)
        p_value = np.mean(permutation_stats >= actual_test_stat)

        # Store the results
        self.permutation_results = {
            'actual_statistic': actual_test_stat,
            'permutation_stats': permutation_stats,
            'p_value': p_value,
            'test_statistic': test_statistic,
            'mean_permuted': np.mean(permutation_stats),
            'std_permuted': np.std(permutation_stats),
            'percentile_95': np.percentile(permutation_stats, 95),
            'percentile_99': np.percentile(permutation_stats, 99)
        }

        print(f"\nMonte Carlo Permutation Test Results:")
        print(f"Actual {test_statistic}: {actual_test_stat:.4f}")
        print(f"Mean of permuted results: {np.mean(permutation_stats):.4f}")
        print(f"95th percentile of permuted results: {np.percentile(permutation_stats, 95):.4f}")
        print(f"P-value: {p_value:.4f}")

        return self.permutation_results

    def _permute_data(self) -> pd.DataFrame:
        """
        Create a permuted version of the data that preserves its statistical
        properties but destroys any genuine patterns.
        """
        permuted_data = self.data.copy()

        # Method 1: block bootstrap (preserves short-term correlation)
        block_size = 24  # 24 hours for hourly data
        n_blocks = len(permuted_data) // block_size

        # Create random block indices
        block_indices = []
        for _ in range(n_blocks + 1):  # +1 makes sure we cover all the data
            start_idx = random.randint(0, len(permuted_data) - block_size - 1)
            block_indices.extend(range(start_idx, start_idx + block_size))

        # Trim to the original length
        block_indices = block_indices[:len(permuted_data)]

        # Apply the permutation to returns while preserving the price structure
        returns = permuted_data['close'].pct_change().dropna()
        permuted_returns = returns.iloc[block_indices[1:]].values  # skip the first NaN

        # Rebuild the price series from the permuted returns
        permuted_prices = [permuted_data['close'].iloc[0]]
        for ret in permuted_returns:
            permuted_prices.append(permuted_prices[-1] * (1 + ret))

        # Scale every price column proportionally
        price_ratio = np.array(permuted_prices) / permuted_data['close'].iloc[:len(permuted_prices)].values
        idx = permuted_data.index[:len(permuted_prices)]
        for col in ['open', 'high', 'low', 'close']:
            permuted_data.loc[idx, col] = permuted_data.loc[idx, col] * price_ratio

        return permuted_data

    def plot_permutation_results(self) -> None:
        """Plot a histogram of the permutation test results."""
        if self.permutation_results is None:
            raise ValueError("Must run permutation test first")

        results = self.permutation_results

        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

        # Histogram of the permuted results
        ax1.hist(results['permutation_stats'], bins=50, alpha=0.7, color='skyblue',
                 density=True, label='Permuted results')
        ax1.axvline(results['actual_statistic'], color='red', linestyle='--', linewidth=2,
                    label=f'Actual strategy ({results["actual_statistic"]:.4f})')
        ax1.axvline(results['percentile_95'], color='orange', linestyle='--',
                    label=f'95th percentile ({results["percentile_95"]:.4f})')
        ax1.set_xlabel(results['test_statistic'].replace('_', ' ').title())
        ax1.set_ylabel('Density')
        ax1.set_title(f'Monte Carlo Permutation Test\nP-value: {results["p_value"]:.4f}')
        ax1.legend()
        ax1.grid(True, alpha=0.3)

        # Q-Q plot to check the normality of the permuted results
        stats.probplot(results['permutation_stats'], dist="norm", plot=ax2)
        ax2.set_title('Q-Q plot of permutation results')
        ax2.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()


# Usage example
def run_monte_carlo_test():
    """Run a complete Monte Carlo validation example."""
    # Create synthetic data with a mild trend
    np.random.seed(42)
    dates = pd.date_range('2020-01-01', '2023-01-01', freq='1H')

    # Add a slight trend to make it more realistic
    trend = np.linspace(0, 0.5, len(dates))
    noise = np.random.normal(0, 0.02, len(dates))
    returns = trend / len(dates) + noise

    price = 100 * np.exp(np.cumsum(returns))

    data = pd.DataFrame({
        'open': price * (1 + np.random.normal(0, 0.001, len(dates))),
        'high': price * (1 + np.abs(np.random.normal(0, 0.005, len(dates)))),
        'low': price * (1 - np.abs(np.random.normal(0, 0.005, len(dates)))),
        'close': price,
        'volume': np.random.uniform(1000, 10000, len(dates))
    }, index=dates)

    # Optimize first
    print("Step 1: Parameter Optimization")
    optimizer = StrategyOptimizer(data)
    optimization_results = optimizer.optimize_parameters(
        lookback_range=range(10, 31, 5),
        objective_function='profit_factor'
    )

    best_lookback = optimization_results.iloc[0]['lookback_period']
    best_profit_factor = optimization_results.iloc[0]['profit_factor']

    print(f"\nBest parameters: Lookback = {best_lookback}, Profit Factor = {best_profit_factor:.4f}")

    # Then run the Monte Carlo test
    print("\nStep 2: Monte Carlo Validation")
    validator = MonteCarloValidator(data, n_simulations=100)  # reduced for the demo
    mc_results = validator.run_permutation_test(
        optimal_lookback=int(best_lookback),
        test_statistic='profit_factor'
    )

    validator.plot_permutation_results()

    # Interpret the results
    print(f"\nInterpretation:")
    if mc_results['p_value'] < 0.01:
        print("✅ Strategy is statistically significant (p < 0.01)")
    elif mc_results['p_value'] < 0.05:
        print("⚠️ Strategy is marginally significant (p < 0.05)")
    else:
        print("❌ Strategy is NOT statistically significant")
        print("   This suggests the performance may be due to overfitting")


if __name__ == "__main__":
    run_monte_carlo_test()
Walk-forward analysis simulates real-world trading by continuously re-optimizing parameters over rolling windows.
class WalkForwardAnalyzer:
    """
    Walk-forward analysis with rolling optimization windows.
    """

    def __init__(self, data: pd.DataFrame):
        self.data = data
        self.results = []

    def run_walk_forward(self,
                         train_months: int = 12,
                         test_months: int = 1,
                         step_months: int = 1) -> pd.DataFrame:
        """
        Run walk-forward analysis with rolling optimization windows.

        Args:
            train_months: training window size in months
            test_months: testing window size in months
            step_months: step size between windows in months
        """
        print("Running Walk-Forward Analysis...")

        start_date = self.data.index[0]
        end_date = self.data.index[-1]

        current_date = start_date + pd.DateOffset(months=train_months)
        results = []

        while current_date + pd.DateOffset(months=test_months) <= end_date:
            # Define the train and test periods
            train_start = current_date - pd.DateOffset(months=train_months)
            train_end = current_date
            test_start = current_date
            test_end = current_date + pd.DateOffset(months=test_months)

            # Get the train and test data
            train_data = self.data[train_start:train_end]
            test_data = self.data[test_start:test_end]

            if len(train_data) < 100 or len(test_data) < 10:
                current_date += pd.DateOffset(months=step_months)
                continue

            # Optimize on the training data
            optimizer = StrategyOptimizer(train_data)
            opt_results = optimizer.optimize_parameters(
                lookback_range=range(5, 51, 5),
                objective_function='profit_factor',
                n_jobs=1
            )

            if len(opt_results) == 0:
                current_date += pd.DateOffset(months=step_months)
                continue

            best_lookback = opt_results.iloc[0]['lookback_period']

            # Test on the out-of-sample data
            strategy = DonchianStrategy(lookback_period=int(best_lookback))
            test_signals = strategy.generate_signals(test_data)
            test_metrics = strategy.calculate_performance_metrics()

            # Store the results
            result = {
                'train_start': train_start,
                'train_end': train_end,
                'test_start': test_start,
                'test_end': test_end,
                'optimal_lookback': best_lookback,
                'oos_profit_factor': test_metrics['profit_factor'],
                'oos_sharpe_ratio': test_metrics['sharpe_ratio'],
                'oos_total_return': test_metrics['total_return'],
                'oos_max_drawdown': test_metrics['max_drawdown'],
                'oos_win_rate': test_metrics['win_rate']
            }
            results.append(result)

            print(f"Period {test_start.date()} to {test_end.date()}: "
                  f"PF={test_metrics['profit_factor']:.3f}, "
                  f"SR={test_metrics['sharpe_ratio']:.3f}")

            current_date += pd.DateOffset(months=step_months)

        self.results = pd.DataFrame(results)
        return self.results

    def plot_walk_forward_results(self):
        """Plot walk-forward analysis results."""
        if len(self.results) == 0:
            raise ValueError("Must run walk-forward analysis first")

        fig, axes = plt.subplots(2, 2, figsize=(15, 10))

        # Profit factor over time
        axes[0, 0].plot(self.results['test_start'], self.results['oos_profit_factor'], 'o-')
        axes[0, 0].axhline(y=1.0, color='red', linestyle='--', alpha=0.7)
        axes[0, 0].set_title('Out-of-Sample Profit Factor')
        axes[0, 0].set_ylabel('Profit Factor')
        axes[0, 0].grid(True, alpha=0.3)

        # Sharpe ratio
        axes[0, 1].plot(self.results['test_start'], self.results['oos_sharpe_ratio'], 'o-', color='green')
        axes[0, 1].axhline(y=0, color='red', linestyle='--', alpha=0.7)
        axes[0, 1].set_title('Out-of-Sample Sharpe Ratio')
        axes[0, 1].set_ylabel('Sharpe Ratio')
        axes[0, 1].grid(True, alpha=0.3)

        # Total return
        axes[1, 0].plot(self.results['test_start'], self.results['oos_total_return'] * 100, 'o-', color='purple')
        axes[1, 0].axhline(y=0, color='red', linestyle='--', alpha=0.7)
        axes[1, 0].set_title('Out-of-Sample Total Return (%)')
        axes[1, 0].set_ylabel('Return (%)')
        axes[1, 0].grid(True, alpha=0.3)

        # Max drawdown
        axes[1, 1].plot(self.results['test_start'], self.results['oos_max_drawdown'] * 100, 'o-', color='orange')
        axes[1, 1].set_title('Out-of-Sample Max Drawdown (%)')
        axes[1, 1].set_ylabel('Drawdown (%)')
        axes[1, 1].grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

        # Summary statistics
        print("\nWalk-Forward Analysis Summary:")
        print(f"Total periods: {len(self.results)}")
        print(f"Average OOS Profit Factor: {self.results['oos_profit_factor'].mean():.3f}")
        print(f"Average OOS Sharpe Ratio: {self.results['oos_sharpe_ratio'].mean():.3f}")
        print(f"Profitable periods: {(self.results['oos_total_return'] > 0).sum()}/{len(self.results)}")
        print(f"Win rate: {(self.results['oos_total_return'] > 0).mean() * 100:.1f}%")
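The walk-forward module has no standalone usage example of its own; a minimal sketch, reusing the synthetic-data pattern from the earlier sections, might look like this:

# Hypothetical usage sketch for WalkForwardAnalyzer, mirroring the synthetic-data examples above.
if __name__ == "__main__":
    np.random.seed(42)
    dates = pd.date_range('2020-01-01', '2024-01-01', freq='1H')
    price = 100 * np.exp(np.cumsum(np.random.normal(0.0001, 0.02, len(dates))))
    data = pd.DataFrame({
        'open': price * (1 + np.random.normal(0, 0.001, len(dates))),
        'high': price * (1 + np.abs(np.random.normal(0, 0.005, len(dates)))),
        'low': price * (1 - np.abs(np.random.normal(0, 0.005, len(dates)))),
        'close': price,
        'volume': np.random.uniform(1000, 10000, len(dates))
    }, index=dates)

    analyzer = WalkForwardAnalyzer(data)
    wf_results = analyzer.run_walk_forward(train_months=12, test_months=1, step_months=1)
    analyzer.plot_walk_forward_results()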
Finally, let’s implement a realistic simulation that includes transaction costs, slippage, and position sizing.
class StrategySimulator:
    """
    Realistic strategy simulation with transaction costs and risk management.
    """

    def __init__(self, initial_capital: float = 100000):
        self.initial_capital = initial_capital
        self.current_capital = initial_capital
        self.positions = []
        self.trades = []

    def simulate_strategy(self,
                          data: pd.DataFrame,
                          strategy: DonchianStrategy,
                          transaction_cost: float = 0.001,  # 0.1% per trade
                          slippage: float = 0.0005,         # 0.05% slippage
                          max_position_size: float = 0.95) -> Dict:
        """
        Run a complete strategy simulation with realistic trading costs.

        Args:
            data: market data
            strategy: configured strategy instance
            transaction_cost: transaction cost as a fraction
            slippage: slippage as a fraction
            max_position_size: maximum position size as a fraction of capital
        """
        signals_df = strategy.generate_signals(data)

        portfolio_value = [self.initial_capital]
        positions = []
        current_position = 0
        entry_price = 0

        for i, (timestamp, row) in enumerate(signals_df.iterrows()):
            if i == 0:
                continue

            prev_signal = signals_df.iloc[i - 1]['position']
            curr_signal = row['position']
            price = row['close']

            # Check for a signal change
            if curr_signal != prev_signal:
                # Close the existing position
                if current_position != 0:
                    trade_return = self._execute_trade(
                        current_position, entry_price, price,
                        transaction_cost, slippage, 'close'
                    )
                    self.current_capital *= (1 + trade_return)

                # Open a new position
                if curr_signal != 0:
                    position_size = curr_signal * max_position_size
                    entry_price = price * (1 + curr_signal * slippage)  # adjust for slippage
                    current_position = position_size
                else:
                    current_position = 0
                    entry_price = 0

            # Calculate the portfolio value
            if current_position != 0:
                unrealized_pnl = current_position * (price - entry_price) / entry_price
                portfolio_value.append(self.current_capital * (1 + unrealized_pnl))
            else:
                portfolio_value.append(self.current_capital)

            positions.append(current_position)

        # Calculate final metrics
        returns = pd.Series(portfolio_value).pct_change().dropna()

        metrics = {
            'total_return': (portfolio_value[-1] / self.initial_capital) - 1,
            'annualized_return': (portfolio_value[-1] / self.initial_capital) ** (252 * 24 / len(data)) - 1,
            'volatility': returns.std() * np.sqrt(252 * 24),
            'sharpe_ratio': (returns.mean() * 252 * 24) / (returns.std() * np.sqrt(252 * 24)),
            'max_drawdown': self._calculate_max_drawdown(portfolio_value),
            'final_capital': portfolio_value[-1],
            'total_trades': len([p for p in positions if p != 0])
        }

        return {
            'metrics': metrics,
            'portfolio_values': portfolio_value,
            'positions': positions
        }

    def _execute_trade(self, position_size, entry_price, exit_price,
                       transaction_cost, slippage, trade_type):
        """Execute a trade with costs applied."""
        if trade_type == 'close':
            gross_return = position_size * (exit_price - entry_price) / entry_price
            total_cost = transaction_cost + slippage
            net_return = gross_return - total_cost
            return net_return
        return 0

    def _calculate_max_drawdown(self, portfolio_values):
        """Calculate the maximum drawdown."""
        peak = portfolio_values[0]
        max_dd = 0

        for value in portfolio_values:
            if value > peak:
                peak = value
            dd = (peak - value) / peak
            if dd > max_dd:
                max_dd = dd

        return max_dd


Putting It All Together: The Complete Pipeline

def complete_strategy_validation_pipeline():
    """
    Complete end-to-end strategy validation pipeline.
    """
    print("=== QUANTITATIVE STRATEGY VALIDATION PIPELINE ===\n")

    # Step 1: Load data
    print("1. Loading market data...")
    np.random.seed(42)
    dates = pd.date_range('2020-01-01', '2024-01-01', freq='1H')
    trend = np.linspace(0, 0.3, len(dates))
    noise = np.random.normal(0, 0.02, len(dates))
    returns = trend / len(dates) + noise
    price = 100 * np.exp(np.cumsum(returns))

    data = pd.DataFrame({
        'open': price * (1 + np.random.normal(0, 0.001, len(dates))),
        'high': price * (1 + np.abs(np.random.normal(0, 0.005, len(dates)))),
        'low': price * (1 - np.abs(np.random.normal(0, 0.005, len(dates)))),
        'close': price,
        'volume': np.random.uniform(1000, 10000, len(dates))
    }, index=dates)

    print(f"✓ Data loaded: {len(data)} candles from {data.index[0]} to {data.index[-1]}")

    # Step 2: In-sample optimization
    print("\n2. In-sample parameter optimization...")
    train_data = data['2020-01-01':'2023-01-01']
    test_data = data['2023-01-01':'2024-01-01']

    optimizer = StrategyOptimizer(train_data)
    opt_results = optimizer.optimize_parameters(
        lookback_range=range(10, 31, 5),
        objective_function='profit_factor'
    )

    best_lookback = int(opt_results.iloc[0]['lookback_period'])
    best_pf = opt_results.iloc[0]['profit_factor']

    print(f"✓ Best parameters: Lookback={best_lookback}, Profit Factor={best_pf:.3f}")

    # Step 3: Monte Carlo validation
    print("\n3. Monte Carlo permutation testing...")
    validator = MonteCarloValidator(train_data, n_simulations=50)  # reduced for the demo
    mc_results = validator.run_permutation_test(
        optimal_lookback=best_lookback,
        test_statistic='profit_factor'
    )

    is_significant = mc_results['p_value'] < 0.05
    print(f"✓ Statistical significance: {'PASS' if is_significant else 'FAIL'} (p={mc_results['p_value']:.3f})")

    # Step 4: Walk-forward analysis
    print("\n4. Walk-forward analysis...")
    wf_analyzer = WalkForwardAnalyzer(data)
    wf_results = wf_analyzer.run_walk_forward(
        train_months=12,
        test_months=1,
        step_months=1
    )

    avg_oos_pf = wf_results['oos_profit_factor'].mean()
    print(f"✓ Average out-of-sample profit factor: {avg_oos_pf:.3f}")

    # Step 5: Final simulation
    print("\n5. Realistic trading simulation...")
    strategy = DonchianStrategy(lookback_period=best_lookback)
    simulator = StrategySimulator(initial_capital=100000)

    sim_results = simulator.simulate_strategy(
        test_data,
        strategy,
        transaction_cost=0.001,
        slippage=0.0005
    )

    final_return = sim_results['metrics']['total_return']
    sharpe = sim_results['metrics']['sharpe_ratio']
    max_dd = sim_results['metrics']['max_drawdown']

    print(f"✓ Out-of-sample simulation complete:")
    print(f"  Total Return: {final_return*100:.2f}%")
    print(f"  Sharpe Ratio: {sharpe:.3f}")
    print(f"  Max Drawdown: {max_dd*100:.2f}%")

    # Final decision
    print("\n=== FINAL ASSESSMENT ===")

    criteria = {
        'Statistical Significance': is_significant,
        'Positive Walk-Forward': avg_oos_pf > 1.0,
        'Positive Out-of-Sample': final_return > 0,
        'Acceptable Sharpe': sharpe > 0.5,
        'Manageable Drawdown': max_dd < 0.20
    }

    passed_criteria = sum(criteria.values())
    total_criteria = len(criteria)

    print(f"\nCriteria passed: {passed_criteria}/{total_criteria}")
    for criterion, passed in criteria.items():
        status = "✓ PASS" if passed else "✗ FAIL"
        print(f"  {criterion}: {status}")

    recommendation = "DEPLOY" if passed_criteria >= 4 else "REJECT"
    print(f"\nRECOMMENDATION: {recommendation} this strategy")

    if recommendation == "REJECT":
        print("\nReasons for rejection:")
        for criterion, passed in criteria.items():
            if not passed:
                print(f"  - {criterion}")

    return {
        'recommendation': recommendation,
        'criteria_passed': passed_criteria,
        'total_criteria': total_criteria,
        'final_metrics': sim_results['metrics']
    }


# Run the complete pipeline
if __name__ == "__main__":
    results = complete_strategy_validation_pipeline()
This comprehensive framework provides a scientifically rigorous approach to quantitative strategy development. The key principles are:
This framework has successfully identified our Donchian strategy as statistically insignificant, saving us from deploying a strategy that would likely lose money in live trading. The process is designed to be conservative — it’s better to reject a potentially profitable strategy than to deploy one that will lose money.
By following this systematic approach, you can build confidence in your quantitative strategies and avoid the common pitfalls that cause most algorithmic trading ventures to fail. Remember: in quantitative trading, the absence of evidence is not evidence of absence, but the presence of statistical significance is strong evidence of a real edge.
- Original article: extremelysunnyyk.medium....