VectorBT: Training and Backtesting a Stock Model with PyTorch + LSTM (Advanced, Part 1)
This article shows how to train and backtest on stock data with PyTorch and an LSTM model. It covers data preprocessing, feature selection, LSTM model construction, model training and validation, a dynamic-threshold strategy for generating trading signals, and backtesting with performance analysis in VectorBT.
The content is intended for technical learning and coding practice only. Markets are uncertain, technical analysis must be validated with care, and nothing here constitutes investment advice. It is suited to quant beginners building a systematic understanding and a foundation for strategy development.
This article is the advanced follow-up to 🚀 VectorBT: Python Quantitative Trading Strategy Development and Backtest Evaluation Explained 🔥 — reading that first for the basics is recommended‼️
1. Overview of Topics
This article shows how to train a PyTorch LSTM model on stock data and backtest it with VectorBT.
It covers the following topics:
- Data preprocessing
- Feature selection
- LSTM model construction
- Model training and validation
- Dynamic threshold strategy
- Backtesting and performance analysis
2. Key Steps Explained
2.1 Data Preprocessing
Data preprocessing is a critical step in machine learning, covering data loading, handling missing values, and scaling.
- Pandas: data handling and manipulation.
- MinMaxScaler: scales each feature into the [0, 1] range.
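A minimal sketch of the scaling step, using made-up prices rather than the article's dataset:

```python
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

# Toy price data (hypothetical values for illustration)
df = pd.DataFrame({"close": [10.0, 12.0, 11.0, 15.0]})
df = df.dropna()  # drop rows with missing values before scaling

scaler = MinMaxScaler()  # maps each column's min to 0 and max to 1
scaled = scaler.fit_transform(df[["close"]])
print(scaled.ravel())  # → [0.  0.4 0.2 1. ]
```

Fitting the scaler returns an object that can later invert the transform with `inverse_transform`, which is how predictions are mapped back to return space later in the article.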
2.2 Feature Selection
Feature selection reduces the feature dimension and improves model performance. Here the mutual information method is used.
- mutual_info_regression: computes the mutual information between each feature and the target variable.
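To see how mutual information separates informative from uninformative features, here is a small synthetic sketch (the data is random; only feature 0 actually drives the target):

```python
import numpy as np
from sklearn.feature_selection import mutual_info_regression

rng = np.random.default_rng(0)
X = rng.normal(size=(500, 2))                       # two candidate features
y = X[:, 0] * 2 + rng.normal(scale=0.1, size=500)   # target depends only on feature 0

mi = mutual_info_regression(X, y, random_state=0)
print(mi)  # feature 0 scores far higher than feature 1
```

In the article's pipeline, features whose score falls below a fixed cutoff (0.05) are dropped.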
2.3 LSTM Model Construction
LSTM (Long Short-Term Memory) is a specialized RNN well suited to time-series prediction.
- EnhancedLSTM: a custom LSTM model with an attention mechanism.
- nn.LSTM: the LSTM layer in PyTorch.
- nn.Sequential: a sequential container for stacking network layers.
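A quick shape check for nn.LSTM with batch_first=True (the dimensions here are illustrative, not the article's):

```python
import torch
import torch.nn as nn

# batch_first=True means the input is [batch, seq_len, features]
lstm = nn.LSTM(input_size=8, hidden_size=16, num_layers=2, batch_first=True)
x = torch.randn(4, 30, 8)          # 4 samples, 30 time steps, 8 features
out, (h_n, c_n) = lstm(x)
print(out.shape)   # torch.Size([4, 30, 16]) -- hidden state at every time step
print(h_n.shape)   # torch.Size([2, 4, 16]) -- final hidden state per layer
```

The attention mechanism in EnhancedLSTM operates on `out`, weighting all 30 per-step hidden states instead of using only the final one.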
2.4 Model Training and Validation
- AdamW optimizer: a gradient-descent-based optimizer with decoupled weight decay.
- OneCycleLR scheduler: dynamically adjusts the learning rate over the course of training.
- HuberLoss: a loss function that is robust to outliers.
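These three pieces can be wired together in one minimal training step; nn.Linear here is a stand-in for the LSTM, and all hyperparameters are illustrative:

```python
import torch
import torch.nn as nn
import torch.optim as optim

model = nn.Linear(8, 1)  # stand-in for the LSTM model
optimizer = optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-5)
# OneCycleLR warms the learning rate up toward max_lr, then anneals it down
scheduler = optim.lr_scheduler.OneCycleLR(optimizer, max_lr=1e-3, total_steps=100)
criterion = nn.HuberLoss()  # quadratic near zero, linear for large errors

x, y = torch.randn(16, 8), torch.randn(16, 1)
loss = criterion(model(x), y)
loss.backward()
optimizer.step()
scheduler.step()  # one scheduler step per optimizer step
print(optimizer.param_groups[0]["lr"])
```

Note that OneCycleLR is stepped per batch, not per epoch, which is why `total_steps` in the article is computed as epochs times batches per epoch.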
2.5 Dynamic Threshold Strategy
The dynamic threshold strategy generates trading signals from rolling statistics of historical data.
- AdaptiveStrategy: generates trading signals from predicted returns and volatility.
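The rolling-quantile idea behind the dynamic thresholds can be sketched on mock predictions; the 0.7/0.3 quantiles mirror the AdaptiveStrategy class, but the data here is random:

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
pred_returns = pd.Series(rng.normal(0, 0.01, 200))  # mock predicted returns

lookback = 60
upper_q = pred_returns.rolling(lookback).quantile(0.7)  # entry threshold
lower_q = pred_returns.rolling(lookback).quantile(0.3)  # exit threshold

signals = pd.Series(0, index=pred_returns.index)
signals[pred_returns > upper_q] = 1    # long when above the rolling 70th percentile
signals[pred_returns < lower_q] = -1   # exit when below the rolling 30th percentile
print(signals.value_counts())
```

Because the quantiles adapt to the recent distribution of predictions, the strategy keeps firing entries and exits even if the model's output drifts over time.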
2.6 Backtesting and Performance Analysis
- VectorBT: the library used for backtesting and performance analysis.
- Portfolio.from_signals: builds a portfolio from trading signals.
3. Code Implementation
3.1 Environment Setup
import torch
import vectorbt as vbt

print(f"PyTorch version: {torch.__version__}")
print(f"VectorBT version: {vbt.__version__}")

vbt.settings.array_wrapper["freq"] = "D"
vbt.settings.plotting["layout"]["template"] = "vbt_dark"
vbt.settings.plotting["layout"]["width"] = 1200
vbt.settings.portfolio["init_cash"] = 100000.0  # 100,000 CNY
vbt.settings.portfolio["fees"] = 0.0025  # 0.25%
vbt.settings.portfolio["slippage"] = 0.0025  # 0.25%

device = torch.device(
    "cuda" if torch.cuda.is_available()
    else "mps" if torch.backends.mps.is_available()
    else "cpu"
)
3.2 Data Preparation
import pandas as pd

# Stock code: Kweichow Moutai (600519.SH)
ts_code = "600519.SH"

# Read the preprocessed Parquet file
df = pd.read_parquet(f"./data/processed_{ts_code}.parquet")
df["trade_date"] = pd.to_datetime(df["trade_date"], format="%Y%m%d")
df.set_index("trade_date", inplace=True)
df.dropna(inplace=True)

print(df.head())
print(df.shape)
3.3 Data Preprocessing
- Feature screening
- Target variable
- Stationarity test
- Chronological split
- Scaling
- Sequence construction
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import optuna
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.feature_selection import mutual_info_regression
from statsmodels.tsa.stattools import adfuller
from tqdm.auto import tqdm

# Sliding-window length
seq_length = 30


def prepare_data(df):
    # Candidate features (example)
    features = [
        "open", "high", "low", "vol", "close",
        "ma5", "ma10", "ma20", "rsi",
        "macd", "macdsignal", "macdhist",
        "bb_upper", "bb_middle", "bb_lower",
        "momentum", "roc", "atr", "obv",
    ]

    # Feature screening via mutual information
    target = df["close"].pct_change().shift(-1).dropna()
    selected_features = []
    for feature in features:
        mi = mutual_info_regression(
            df.iloc[:-1][feature].values.reshape(-1, 1),
            target.iloc[: len(df)].values,
        )
        if mi[0] > 0.05:
            selected_features.append(feature)
    if len(selected_features) > 0:
        features = selected_features
    print(f"Feature count: {len(features)}, columns: {features}")

    # Target variable: next-day return
    df["returns"] = df["close"].pct_change().shift(-1)
    df.dropna(inplace=True)

    # Stationarity test; first-difference if non-stationary
    adf_result = adfuller(df["returns"].dropna())
    if adf_result[1] > 0.05:
        df["returns"] = df["returns"].diff().fillna(0)

    # Chronological split: train/val/test
    split_ratios = (0.6, 0.2, 0.2)
    splits = (np.cumsum(split_ratios) * len(df)).astype(int)
    train_df = df.iloc[: splits[0]]
    val_df = df.iloc[splits[0] : splits[1]]
    test_df = df.iloc[splits[1] :]

    # Scale using statistics from the training set only
    X_scaler = MinMaxScaler().fit(train_df[features])
    y_scaler = MinMaxScaler().fit(train_df[["returns"]])

    # Build sliding-window sequences
    def create_sequences(data, target):
        X, y = [], []
        for i in range(len(data) - seq_length):
            X.append(data[i : i + seq_length])    # sliding window
            y.append(target[i + seq_length - 1])  # next-step target
        return np.array(X), np.array(y)

    X_train, y_train = create_sequences(
        X_scaler.transform(train_df[features]),
        y_scaler.transform(train_df[["returns"]]),
    )
    X_val, y_val = create_sequences(
        X_scaler.transform(val_df[features]),
        y_scaler.transform(val_df[["returns"]]),
    )
    X_test, y_test = create_sequences(
        X_scaler.transform(test_df[features]),
        y_scaler.transform(test_df[["returns"]]),
    )

    print(f"Train shapes: X{X_train.shape} y{y_train.shape}")
    print(f"Val shapes: X{X_val.shape} y{y_val.shape}")
    print(f"Test shapes: X{X_test.shape} y{y_test.shape}")
    print(f"Total samples: {len(X_train) + len(X_val) + len(X_test)}")

    return (
        (X_train, y_train),
        (X_val, y_val),
        (X_test, y_test),
        X_scaler,
        y_scaler,
        test_df,
    )
3.4 LSTM Model Definition
class EnhancedLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim=128, num_layers=2, dropout=0.3):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
        )
        # Attention over the per-step hidden states
        self.attention = nn.Sequential(
            nn.Linear(hidden_dim, 16), nn.Tanh(), nn.Linear(16, 1), nn.Softmax(dim=1)
        )
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim, 32), nn.ReLU(), nn.Dropout(dropout), nn.Linear(32, 1)
        )

    def forward(self, x):
        out, _ = self.lstm(x)                            # [batch, seq_len, hidden]
        attn_weights = self.attention(out)               # [batch, seq_len, 1]
        context = torch.sum(attn_weights * out, dim=1)   # [batch, hidden]
        return self.fc(context)
3.5 Dynamic Threshold Strategy
class AdaptiveStrategy:
    def __init__(self, pred_returns, volatility, params):
        self.pred_returns = pred_returns
        self.volatility = volatility
        self.params = params

    def generate_signals(self):
        # Dynamic thresholds from rolling quantiles
        lookback = self.params["threshold_lookback"]
        upper_q = self.pred_returns.rolling(lookback).quantile(0.7)
        lower_q = self.pred_returns.rolling(lookback).quantile(0.3)

        # Volatility-adjusted position size (a Kelly-style variant)
        position_size = 0.5 * self.pred_returns.abs() / (self.volatility + 1e-6)
        position_size = position_size.clip(0.1, 0.8)

        signals = pd.Series(0, index=self.pred_returns.index)
        long_signals = (self.pred_returns > upper_q) & (position_size > 0.15)
        exit_signals = self.pred_returns < lower_q
        signals[long_signals] = 1
        signals[exit_signals] = -1
        return signals, position_size
3.6 Model Training and Evaluation
# Model training
def train_model(config, train_data, val_data):
    X_train, y_train = train_data
    input_dim = X_train.shape[-1]
    hidden_dim = config["hidden_dim"]
    num_layers = config["num_layers"]
    dropout = config["dropout"]
    batch_size = config["batch_size"]
    lr = config["lr"]
    weight_decay = config["weight_decay"]
    epochs = config["epochs"]

    # Initialize model, optimizer, scheduler, and loss
    model = EnhancedLSTM(input_dim, hidden_dim, num_layers, dropout).to(device)
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
    scheduler = optim.lr_scheduler.OneCycleLR(
        optimizer,
        max_lr=lr,
        total_steps=epochs * len(X_train) // batch_size,
    )
    criterion = nn.HuberLoss()

    train_loader = DataLoader(
        TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train)),
        batch_size=batch_size,
        shuffle=True,
        drop_last=True,
    )

    # Training loop
    best_loss = float("inf")
    early_stop_counter = 0
    for epoch in tqdm(range(epochs), desc="Training"):
        model.train()
        train_loss = 0
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            optimizer.zero_grad()
            preds = model(X_batch)
            loss = criterion(preds, y_batch)
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            scheduler.step()
            train_loss += loss.item()

        # Validation
        val_loss = evaluate_model(model, val_data)
        if (epoch + 1) % 10 == 0:
            print(
                f"Epoch [{epoch+1}/{epochs}], "
                f"Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}"
            )

        # Early stopping
        if val_loss < best_loss:
            best_loss = val_loss
            early_stop_counter = 0
        else:
            early_stop_counter += 1
            if early_stop_counter >= 15:
                break

    # Save the model together with its hyperparameters
    model_path = "./models/pytorch_lstm_model.pth"
    torch.save(
        {
            "model_state_dict": model.state_dict(),
            "input_dim": input_dim,
            "hidden_dim": hidden_dim,
            "num_layers": num_layers,
            "dropout": dropout,
            "batch_size": batch_size,
            "lr": lr,
            "weight_decay": weight_decay,
            "epochs": epochs,
        },
        model_path,
    )
    print(f"PyTorch LSTM model and parameters saved to {model_path}")
    return model


# Model evaluation
def evaluate_model(model, val_data):
    X_val, y_val = val_data
    val_loader = DataLoader(
        TensorDataset(torch.FloatTensor(X_val), torch.FloatTensor(y_val)),
        batch_size=256,
        shuffle=False,
    )
    model.eval()
    total_loss = 0
    criterion = nn.HuberLoss()
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            preds = model(X_batch)
            loss = criterion(preds, y_batch)
            total_loss += loss.item()
    return total_loss / len(val_loader)
3.7 Backtesting Engine
def backtest_strategy(model, test_data, X_scaler, y_scaler, test_df):
    X_test, y_test = test_data

    # Generate predictions
    model.eval()
    with torch.no_grad():
        test_tensor = torch.FloatTensor(X_test).to(device)
        preds = model(test_tensor).cpu().numpy()

    # Inverse-transform back to return space
    pred_returns = y_scaler.inverse_transform(preds.reshape(-1, 1)).flatten()

    # Align the time index
    test_dates = test_df.index[seq_length:-1]
    df = pd.DataFrame(
        {
            "close": test_df["close"].iloc[seq_length:-1],
            "pred_returns": pred_returns[:-1],
            "volatility": test_df["atr"].iloc[seq_length:-1]
            / test_df["close"].iloc[seq_length:-1],
        },
        index=test_dates,
    )

    # Generate signals
    strategy = AdaptiveStrategy(
        pred_returns=df["pred_returns"],
        volatility=df["volatility"],
        params={"threshold_lookback": 60},
    )
    signals, position_size = strategy.generate_signals()

    # Input validation
    assert len(df["close"]) == len(signals), "Length mismatch"
    assert signals.isin([-1, 0, 1]).all(), "Invalid signal values"

    # Build the portfolio
    pf = vbt.Portfolio.from_signals(
        close=df["close"],
        size=np.abs(position_size),  # direction is controlled by entries/exits
        size_type="percent",
        entries=signals == 1,
        exits=signals == -1,
        freq="D",
        # Additional parameters
        accumulate=False,  # do not accumulate positions
        log=True,          # record a trade log
        call_seq="auto",   # resolve order sequencing automatically
    )
    return pf, df
3.8 Main Program
# Data preparation
train_data, val_data, test_data, X_scaler, y_scaler, test_df = prepare_data(df)


# Optuna hyperparameter optimization
def objective(trial):
    config = {
        "hidden_dim": trial.suggest_int("hidden_dim", 64, 256),
        "num_layers": trial.suggest_int("num_layers", 1, 3),
        "dropout": trial.suggest_float("dropout", 0.1, 0.5),
        "batch_size": trial.suggest_categorical("batch_size", [32, 64, 128]),
        "lr": trial.suggest_float("lr", 1e-4, 1e-3, log=True),
        "weight_decay": trial.suggest_float("weight_decay", 1e-6, 1e-4),
        "epochs": 100,
    }
    model = train_model(config, train_data, val_data)
    val_loss = evaluate_model(model, val_data)
    return val_loss


study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=10, show_progress_bar=True, timeout=3600)  # 1-hour timeout

# Load the best model
checkpoint = torch.load("./models/pytorch_lstm_model.pth", map_location=device)
best_model = EnhancedLSTM(
    input_dim=train_data[0].shape[-1],
    **{
        k: v
        for k, v in checkpoint.items()
        if k in ["hidden_dim", "num_layers", "dropout"]
    },
).to(device)
best_model.load_state_dict(checkpoint["model_state_dict"])

# Backtest
pf, result_df = backtest_strategy(best_model, test_data, X_scaler, y_scaler, test_df)

# Performance analysis
print(pf.stats())
pf.plot().show()
4. Key Classes and Functions
4.1 `prepare_data` function
- Purpose: data preprocessing, including feature selection, scaling, and chronological splitting.
- Parameters:
  - `df`: raw DataFrame.
- Returns:
  - `train_data`: training set.
  - `val_data`: validation set.
  - `test_data`: test set.
  - `X_scaler`: feature scaler.
  - `y_scaler`: target-variable scaler.
  - `test_df`: test DataFrame.
4.2 `EnhancedLSTM` class
- Purpose: defines an enhanced LSTM model with an attention mechanism.
- Parameters:
  - `input_dim`: input feature dimension.
  - `hidden_dim`: hidden size, default 128.
  - `num_layers`: number of LSTM layers, default 2.
  - `dropout`: dropout probability, default 0.3.
- Methods:
  - `forward`: forward pass.
4.3 `AdaptiveStrategy` class
- Purpose: generates trading signals from predicted returns and volatility.
- Parameters:
  - `pred_returns`: predicted returns.
  - `volatility`: volatility series.
  - `params`: parameter dict containing `threshold_lookback`.
- Methods:
  - `generate_signals`: produce the trading signals.
4.4 `train_model` function
- Purpose: trains the LSTM model.
- Parameters:
  - `config`: configuration dict with model hyperparameters.
  - `train_data`: training set.
  - `val_data`: validation set.
- Returns:
  - `model`: the trained model.
4.5 `evaluate_model` function
- Purpose: evaluates model performance on the validation set.
- Parameters:
  - `model`: the model.
  - `val_data`: validation set.
- Returns:
  - `val_loss`: validation loss.
4.6 `backtest_strategy` function
- Purpose: backtests the strategy and builds the portfolio.
- Parameters:
  - `model`: the model.
  - `test_data`: test set.
  - `X_scaler`: feature scaler.
  - `y_scaler`: target-variable scaler.
  - `test_df`: test DataFrame.
- Returns:
  - `pf`: the portfolio.
  - `result_df`: the result DataFrame.
Risk Notice and Disclaimer
This article is compiled from public information for research purposes and does not constitute investment advice of any kind. Past performance is no guarantee of future returns, and markets carry unforeseeable volatility risk. Investors must make decisions independently, based on their own financial situation and risk tolerance, and bear the outcomes of their trades themselves. Neither the author nor the publisher accepts legal liability for losses arising from acting on this article. Markets are risky; invest with caution.