Chapter 7: Time Series Models
Part 3: Time Series in Practice: Stock Prediction with Time Series Models
A complete stock quantitative-prediction pipeline you can run, understand, and adapt as-is
This article covers, in one pass: data acquisition (both a scraper and an API option), data cleaning and feature engineering, multi-model training (statistical models + machine learning + deep learning + N-BEATS), model evaluation, a simple prediction-driven quantitative strategy backtest, and advice on model selection and productionization. The code is Python (PyTorch for LSTM/N-BEATS) and modular, so it reproduces quickly on a local machine and extends to real quantitative research.
Note: financial markets carry risk. The backtests and strategy examples here are for research and teaching only and do not constitute investment advice.
Stock Quantitative Prediction in Practice: The Complete Engineering Pipeline (reproducible code + commentary)
Overall steps (high level)
Acquire data (scraper / API / historical data)
Clean and align data (fill gaps, convert frequency, split train/test sets)
Feature engineering (prices / technical indicators / time-series features / external factors)
Modeling (ARIMA, XGBoost, LSTM, N-BEATS)
Evaluation (RMSE/MAE/MAPE + direction accuracy + backtest return/Sharpe)
Strategy backtest (build timing or position signals from predictions)
Model comparison and deployment advice
Detailed step-by-step implementations and runnable code templates follow (copy, run, and swap in your own ticker / data source).
0. Environment dependencies (recommended)
pip install pandas numpy matplotlib seaborn scikit-learn statsmodels yfinance ta xgboost torch tqdm pyarrow
# Optional: prophet, timesfm (as needed)
1. Data acquisition: two approaches (the yfinance API library is recommended; a scraper example is also provided)
1.1 Recommended: yfinance (stable, no API key required)
# src/data_fetch.py
import yfinance as yf
import pandas as pd
def fetch_yfinance(ticker: str, start='2010-01-01', end=None, interval='1d'):
    """Return a DataFrame with a DatetimeIndex and Open/High/Low/Close/Adj_Close/Volume columns."""
    df = yf.download(ticker, start=start, end=end, interval=interval,
                     auto_adjust=False, progress=False)
    df = df.rename(columns={'Adj Close': 'Adj_Close'})
    return df
# Example
if __name__ == "__main__":
    df = fetch_yfinance("AAPL", start="2015-01-01", end="2024-10-31")
    print(df.tail())
    df.to_parquet("data/AAPL_daily.parquet")
1.2 Scraper example (fallback) — for sites without an API (example: scraping the Yahoo Finance history table)
Note: site structure changes over time; respect the site's robots.txt and terms of use, and throttle scrapers carefully in production.
# src/scraper_yahoo.py (example, teaching use only)
import requests, pandas as pd, time
from bs4 import BeautifulSoup
def fetch_yahoo_history_html(ticker, period1, period2):
    url = (f"https://finance.yahoo.com/quote/{ticker}/history"
           f"?period1={period1}&period2={period2}&interval=1d&filter=history")
    r = requests.get(url, headers={"User-Agent": "Mozilla/5.0"})
    soup = BeautifulSoup(r.text, "lxml")
    # Parse the table... (omitted: prefer yfinance instead)
2. Data cleaning and basic features (price, returns, log returns)
# src/preprocess.py
import pandas as pd
import numpy as np
def basic_preprocess(df: pd.DataFrame):
    # Ensure a datetime index
    if not isinstance(df.index, pd.DatetimeIndex):
        df.index = pd.to_datetime(df.index)
    df = df.sort_index()
    # Fill gaps (forward fill, then backward fill)
    df = df.ffill().bfill()
    # Base price column: prefer the adjusted close
    df['Close'] = df['Adj_Close'].fillna(df['Close'])
    df['return'] = df['Close'].pct_change()
    df['log_return'] = np.log(df['Close']).diff()
    return df
# usage
# df = pd.read_parquet("data/AAPL_daily.parquet")
# df = basic_preprocess(df)
3. Feature engineering (time-series features + technical indicators + lags + window statistics)
We use the ta library plus hand-rolled lag/rolling features.
# src/features.py
import pandas as pd
import numpy as np
import ta # pip install ta
def add_technical_features(df: pd.DataFrame):
    df = df.copy()
    # Momentum
    df['rsi_14'] = ta.momentum.RSIIndicator(df['Close'], window=14).rsi()
    df['roc_9'] = ta.momentum.ROCIndicator(df['Close'], window=9).roc()
    # Trend
    df['ma_5'] = df['Close'].rolling(5).mean()
    df['ma_20'] = df['Close'].rolling(20).mean()
    df['ma_ratio'] = df['ma_5'] / df['ma_20']
    # Volatility
    df['vol_10'] = df['log_return'].rolling(10).std()
    # Volume-based
    df['vol_ma_20'] = df['Volume'].rolling(20).mean()
    # Lag features
    for lag in [1, 2, 3, 5, 10]:
        df[f'lag_close_{lag}'] = df['Close'].shift(lag)
        df[f'lag_ret_{lag}'] = df['log_return'].shift(lag)
    # Rolling stats
    df['roll_mean_7'] = df['Close'].rolling(7).mean()
    df['roll_std_14'] = df['Close'].rolling(14).std()
    df = df.dropna()
    return df
# usage
# df_feats = add_technical_features(df)
Note: feature design is critical. Different model families (statistical vs. ML vs. DL) call for different feature sets; deep models can consume raw sequence windows directly.
4. Dataset splitting (sample construction, supervised-learning setup)
Two modeling setups:
One-step prediction: predict the next trading day's price/return;
Multi-step (horizon) prediction: predict the next H days (e.g. with N-BEATS / seq2seq).
Example: building supervised samples with a sliding window
# src/data_window.py
import numpy as np
def create_windows(df, feature_cols, target_col='Close', window_size=30, horizon=1):
    X, y, dates = [], [], []
    arr = df[feature_cols].values
    target = df[target_col].values
    for i in range(window_size, len(df) - horizon + 1):
        X.append(arr[i - window_size:i])
        y.append(target[i + horizon - 1])  # predict the price at t + horizon - 1
        dates.append(df.index[i + horizon - 1])
    return np.array(X), np.array(y), np.array(dates)
5. Modeling: train several model families
We show four model families: ARIMA (statistical), XGBoost (a tree model on sliding-window features), LSTM (a sequential deep model), and N-BEATS (an interpretable SOTA deep model).
5.1 Baselines: naive hold, moving average, linear regression
(Brief details omitted; keep these baselines for comparison.)
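As a hedged sketch of the omitted baselines (the `naive_forecast` and `ma_forecast` names are illustrative, not part of the project code):

```python
import numpy as np

def naive_forecast(prices):
    """Naive baseline: the prediction for day t+1 is the price at day t."""
    return prices[:-1]

def ma_forecast(prices, window=5):
    """Moving-average baseline: predict the mean of the last `window` prices."""
    return np.array([prices[i - window:i].mean() for i in range(window, len(prices))])

prices = np.array([100., 101., 103., 102., 104., 105., 107.])
print(naive_forecast(prices))        # yesterday's price, shifted by one day
print(ma_forecast(prices, window=3)) # 3-day moving-average predictions
```

Evaluating these with the same metrics as the learned models gives a floor any serious model must beat.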
5.2 Statistical: ARIMA (statsmodels)
Suited to short-horizon one-step forecasting; requires differencing/stationarization. Example:
# src/models_arima.py
from statsmodels.tsa.arima.model import ARIMA
def train_arima(series, order=(5, 1, 0)):
    model = ARIMA(series, order=order)
    res = model.fit()
    return res  # use res.predict / res.forecast
5.3 Machine learning: XGBoost (on hand-crafted features)
# src/models_xgb.py
import xgboost as xgb
from sklearn.model_selection import train_test_split
def train_xgb(X, y):
    X_train, X_val, y_train, y_val = train_test_split(X, y, shuffle=False, test_size=0.2)
    # Note: xgboost >= 2.0 takes early_stopping_rounds in the constructor, not fit()
    model = xgb.XGBRegressor(n_estimators=200, learning_rate=0.05, max_depth=5,
                             early_stopping_rounds=20)
    model.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=False)
    return model
5.4 Deep learning: LSTM (PyTorch)
The most common sequence model: sliding windows as input (shape: B, T, features).
# src/models_lstm.py
import torch, torch.nn as nn
class LSTMRegressor(nn.Module):
    def __init__(self, input_size, hidden_size=64, num_layers=2, dropout=0.2):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                            batch_first=True, dropout=dropout)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        # x: [B, T, F]
        out, (h, c) = self.lstm(x)
        return self.fc(out[:, -1, :]).squeeze(-1)
# Training loop (simplified)
def train_lstm(model, train_loader, val_loader, epochs=20, lr=1e-3, device='cpu'):
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.MSELoss()
    model.to(device)
    for ep in range(epochs):
        model.train()
        for X_batch, y_batch in train_loader:
            X_batch = X_batch.to(device).float()
            y_batch = y_batch.to(device).float()
            pred = model(X_batch)
            loss = loss_fn(pred, y_batch)
            opt.zero_grad(); loss.backward(); opt.step()
        # validation omitted for brevity
    return model
5.5 SOTA: N-BEATS (simplified implementation)
# src/models_nbeats.py (minimal block)
import torch.nn as nn
class NBeatsBlock(nn.Module):
    def __init__(self, input_size, theta_size, hidden_size=128):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_size, hidden_size), nn.ReLU(),
            nn.Linear(hidden_size, hidden_size), nn.ReLU(),
            nn.Linear(hidden_size, theta_size))

    def forward(self, x):
        theta = self.fc(x)
        # assume theta splits into backcast and forecast; simplified
        backcast = theta[:, :x.shape[1]]
        forecast = theta[:, x.shape[1]:]
        return backcast, forecast
# Compose multiple blocks as earlier in our N-BEATS example
6. Evaluation: accuracy + direction + trading backtest
6.1 Prediction accuracy metrics
RMSE, MAE, MAPE (use MAPE with caution when prices are near zero, since it divides by the true value)
from sklearn.metrics import mean_squared_error, mean_absolute_error
import numpy as np
def regression_metrics(y_true, y_pred):
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    mae = mean_absolute_error(y_true, y_pred)
    mape = np.mean(np.abs((y_true - y_pred) / y_true)) * 100
    return {'RMSE': rmse, 'MAE': mae, 'MAPE': mape}
6.2 Direction accuracy (sign accuracy)
def direction_accuracy(y_true, y_pred):
    # compare the sign of consecutive changes
    true_ret = np.sign(np.diff(y_true))
    pred_ret = np.sign(np.diff(y_pred))
    return (true_ret == pred_ret).mean()
6.3 Backtest example (a simple prediction-driven timing strategy)
Simple strategy: if the predicted next-day return > threshold, go long and hold for one day. For simplicity, this example backtest uses close-to-close returns and ignores fees and slippage.
# src/backtest.py
import numpy as np, pandas as pd
def simple_signal_backtest(dates, close_prices, pred_prices,
                           initial_capital=100000, threshold=0.0):
    """Signal: if the predicted next-day return > threshold, go long for one day.
    For simplicity we enter and exit at close prices (close-to-close hold)
    and ignore fees and slippage."""
    df = pd.DataFrame({'date': dates, 'close': close_prices, 'pred': pred_prices})
    df['pred_ret'] = df['pred'].pct_change().shift(-1)  # predicted next-day return
    df['signal'] = (df['pred_ret'] > threshold).astype(int)
    capital = initial_capital
    capitals = []
    for i in range(len(df) - 1):
        if df['signal'].iloc[i] == 1:
            # one-day hold: capture the close-to-close change
            ret = (df['close'].iloc[i + 1] / df['close'].iloc[i]) - 1
            capital = capital * (1 + ret)
        capitals.append(capital)  # record equity on flat days as well
    # compute metrics
    returns = pd.Series(capitals).pct_change().fillna(0)
    cumulative_return = capital / initial_capital - 1
    sharpe = returns.mean() / returns.std() * np.sqrt(252) if returns.std() > 0 else np.nan
    return {'cumulative_return': cumulative_return, 'sharpe': sharpe, 'equity_curve': capitals}
Note: an industrial-grade backtest must use next-day open prices and model slippage, fees, available lot sizes, position limits, and max-drawdown controls; this is a teaching demo only.
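As a hedged first step toward that, here is a minimal sketch of folding per-trade costs into the toy backtest; the `backtest_with_costs` name and the `cost_bps` parameter are assumptions for illustration, not part of the project code:

```python
import numpy as np

def backtest_with_costs(closes, signals, initial_capital=100_000, cost_bps=10):
    """Toy long-only backtest that charges `cost_bps` basis points on every
    entry and exit. A teaching sketch, not a production backtester."""
    capital = initial_capital
    in_market = False
    equity = [capital]
    for i in range(len(closes) - 1):
        ret = closes[i + 1] / closes[i] - 1
        if signals[i]:
            if not in_market:                 # entering: pay one-way cost
                capital *= 1 - cost_bps / 1e4
                in_market = True
            capital *= 1 + ret
        elif in_market:                       # exiting: pay one-way cost
            capital *= 1 - cost_bps / 1e4
            in_market = False
        equity.append(capital)
    return capital, equity

closes = np.array([100., 101., 102., 101., 103.])
signals = [1, 1, 0, 1]
final, curve = backtest_with_costs(closes, signals)
```

Even a few basis points per trade noticeably erodes the return of a high-turnover signal, which is exactly why cost-free backtests overstate performance.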
7. Model comparison and selection (how to decide)
Recommended procedure:
Offline metric comparison (RMSE/MAE/MAPE + direction accuracy)
Offline backtest: compare strategy return, Sharpe, and Max Drawdown under identical backtest logic
Interpretability & stability (generalization across time periods and market regimes)
Compute and latency: can the model run in real time or in daily batch inference?
Risk control: the model's failure modes in extreme conditions (e.g. high-volatility regimes)
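The Max Drawdown used in the comparison above can be computed directly from an equity curve; a minimal sketch:

```python
import numpy as np

def max_drawdown(equity):
    """Largest peak-to-trough decline of an equity curve, as a (negative) fraction."""
    equity = np.asarray(equity, dtype=float)
    running_peak = np.maximum.accumulate(equity)   # highest equity seen so far
    drawdowns = (equity - running_peak) / running_peak
    return drawdowns.min()  # most negative value; -0.25 means a 25% drawdown

curve = [100, 120, 90, 110, 80, 130]
print(max_drawdown(curve))  # peak 120 -> trough 80, i.e. -1/3
```

This plugs straight into the `equity_curve` returned by `simple_signal_backtest`.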
A typical conclusion template:
For short-term timing and trading, direction accuracy and the Information Ratio matter more; complex DL models only pay off with large samples and high-frequency data.
For medium/long-term value prediction or supplementary signals, ARIMA/exponential smoothing plus N-BEATS are robust choices; XGBoost enables fast iteration on engineered feature combinations.
8. Engineering advice (must-dos before production/live trading)
Data consistency: training and online features must match exactly (timestamps, fill strategy, aggregation rules).
Simulate real trading costs in backtests: slippage, fees, volume constraints.
Model monitoring: track prediction-distribution drift online, alert on metric decay, and support automatic rollback.
Ensembles: a weighted blend of statistical + ML + DL models (stacking / blending) is usually more robust than any single model.
Online A/B and shadow traffic: run small-scale trials under strict risk controls.
Reproducible CI/CD: log training runs, hyperparameters, and data versions (DVC/Parquet + git), and automate export of deployment artifacts (ONNX/TorchScript/TF SavedModel).
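The weighted-blend ensemble mentioned above can be sketched in a few lines; the weights here are illustrative assumptions and would in practice be tuned on a validation period:

```python
import numpy as np

def blend_predictions(preds: dict, weights: dict):
    """Weighted average of per-model prediction arrays (a minimal blend)."""
    total = sum(weights.values())
    return sum(weights[name] * np.asarray(p) for name, p in preds.items()) / total

preds = {'arima': np.array([101., 102.]),
         'xgb':   np.array([103., 101.]),
         'lstm':  np.array([102., 103.])}
weights = {'arima': 1.0, 'xgb': 2.0, 'lstm': 1.0}
print(blend_predictions(preds, weights))  # [102.25, 101.75]
```

A full stacking setup would instead fit a meta-model on out-of-fold predictions, but even this simple average often reduces variance across market regimes.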
9. End-to-end example: from download to evaluation (simplified entry script)
# scripts/run_pipeline.py (pseudocode flow)
from data_fetch import fetch_yfinance
from preprocess import basic_preprocess
from features import add_technical_features
from data_window import create_windows
from models_xgb import train_xgb
from models_lstm import LSTMRegressor, train_lstm
from eval import regression_metrics, direction_accuracy
from backtest import simple_signal_backtest
# 1. get data
df = fetch_yfinance("AAPL", start="2015-01-01", end="2024-10-31")
df = basic_preprocess(df)
df = add_technical_features(df)
# 2. prepare windows
feature_cols = ['log_return','rsi_14','ma_5','ma_20','vol_10'] + [f'lag_ret_{l}' for l in [1,2,3]]
X, y, dates = create_windows(df, feature_cols, target_col='Close', window_size=30)
# 3. train xgboost (flatten windows)
nsamples, T, F = X.shape
X_flat = X.reshape(nsamples, T*F)
xgb_model = train_xgb(X_flat, y)
# 4. predict and evaluate
y_pred = xgb_model.predict(X_flat[-len(y)//5:]) # simplified split
y_true = y[-len(y)//5:]
print(regression_metrics(y_true, y_pred))
print("Direction acc:", direction_accuracy(y_true, y_pred))
# 5. backtest
bt = simple_signal_backtest(dates[-len(y)//5:], y_true, y_pred, initial_capital=100000)
print(bt['cumulative_return'], bt['sharpe'])
10. FAQ
Q: Is LSTM better than XGBoost?
A: Not always. LSTM is better at modeling long-range dependencies in raw sequences, but XGBoost is very strong on small samples or technical-indicator inputs. In practice, compare them with cross-validation and backtests.
Q: How do we prevent data leakage?
A: Split train/validation/test strictly by time, and never let future information into training (e.g. a rolling statistic that looks ahead).
Q: Can TimesFM / N-BEATS be used for transfer learning?
A: Yes. N-BEATS and a pretrained TimesFM (if available) perform well in few-shot/zero-shot settings; TimesFM requires a large pretrained model and matching resources.
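The time-ordered splitting discipline from the leakage answer above can be sketched with scikit-learn's `TimeSeriesSplit` (scikit-learn is already in the dependency list):

```python
import numpy as np
from sklearn.model_selection import TimeSeriesSplit

X = np.arange(20).reshape(-1, 1)  # stand-in for time-ordered features
tscv = TimeSeriesSplit(n_splits=4)
for train_idx, test_idx in tscv.split(X):
    # every test index comes strictly after every train index: no look-ahead
    assert train_idx.max() < test_idx.min()
    print(f"train {train_idx[0]}..{train_idx[-1]} -> test {test_idx[0]}..{test_idx[-1]}")
```

Each fold trains only on the past and evaluates only on the future, which is the property an ordinary shuffled K-fold split destroys.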
11. Conclusion (how to pick the best model)
Start with baselines (naive, MA, ARIMA) to gauge the difficulty of the problem.
Use XGBoost for fast, interpretable feature-engineering experiments as a strong baseline.
With long histories and complex patterns to capture, use LSTM or N-BEATS; if a pretrained TimesFM is available, try zero-shot inference or fine-tuning.
Select the model on combined offline error + direction accuracy + backtest performance (including Sharpe and MaxDrawdown).
Before going live, validate with a strict backtest (fees/slippage/volume limits) and run a shadow-traffic test.
Stock Quantitative Prediction Project (runnable)
A locally runnable, GitHub-style project layout covering data acquisition, preprocessing, feature engineering, model training (ARIMA / XGBoost / LSTM / N-BEATS), evaluation, and backtest scripts.
Project overview
stock_quant_pipeline_project/
├── README.md
├── requirements.txt
├── data/ # sample data (Parquet / CSV)
├── models/ # trained model files
├── notebooks/ # optional experiment notebooks
├── scripts/
│ └── run_pipeline.py # pipeline entry: download through evaluation
├── src/
│ ├── data_fetch.py
│ ├── preprocess.py
│ ├── features.py
│ ├── data_window.py
│ ├── models_arima.py
│ ├── models_xgb.py
│ ├── models_lstm.py
│ ├── models_nbeats.py
│ ├── train_lstm.py
│ ├── train_xgb.py
│ ├── train_nbeats.py
│ ├── eval.py
│ └── backtest.py
└── examples/
    └── run_demo.sh
Quick start (README excerpt)
Create a virtual environment and install dependencies:
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
pip install -r requirements.txt
Download sample data (AAPL as an example):
python -c "from src.data_fetch import fetch_yfinance; df=fetch_yfinance('AAPL', start='2015-01-01'); df.to_parquet('data/AAPL.parquet')"
Run the full pipeline (training and evaluation):
python scripts/run_pipeline.py --ticker AAPL
requirements.txt
pandas
numpy
matplotlib
seaborn
scikit-learn
statsmodels
pmdarima
yfinance
ta
xgboost
torch
tqdm
pyarrow
joblib
Core code files
The core scripts under src/ are listed in full below, separated by filename for easy copying.
src/data_fetch.py
import yfinance as yf
import pandas as pd
def fetch_yfinance(ticker: str, start='2010-01-01', end=None, interval='1d'):
    df = yf.download(ticker, start=start, end=end, interval=interval, progress=False)
    if 'Adj Close' in df.columns:
        df = df.rename(columns={'Adj Close': 'Adj_Close'})
    df = df.sort_index()
    return df
if __name__ == '__main__':
    df = fetch_yfinance('AAPL', start='2015-01-01')
    print(df.tail())
src/preprocess.py
import pandas as pd
import numpy as np
def basic_preprocess(df: pd.DataFrame):
    df = df.copy()
    if not isinstance(df.index, pd.DatetimeIndex):
        df.index = pd.to_datetime(df.index)
    df = df.sort_index()
    df = df.ffill().bfill()
    if 'Adj_Close' in df.columns:
        df['Close'] = df['Adj_Close']
    df['return'] = df['Close'].pct_change()
    df['log_return'] = np.log(df['Close']).diff()
    return df
if __name__ == '__main__':
    df = pd.read_parquet('data/AAPL.parquet')
    df = basic_preprocess(df)
    df.to_parquet('data/AAPL_preprocessed.parquet')
src/features.py
import pandas as pd
import numpy as np
import ta
def add_technical_features(df: pd.DataFrame):
    df = df.copy()
    df['rsi_14'] = ta.momentum.RSIIndicator(df['Close'], window=14).rsi()
    df['roc_9'] = ta.momentum.ROCIndicator(df['Close'], window=9).roc()
    df['ma_5'] = df['Close'].rolling(5).mean()
    df['ma_20'] = df['Close'].rolling(20).mean()
    df['ma_ratio'] = df['ma_5'] / df['ma_20']
    df['vol_10'] = df['log_return'].rolling(10).std()
    df['vol_ma_20'] = df['Volume'].rolling(20).mean()
    for lag in [1, 2, 3, 5, 10]:
        df[f'lag_close_{lag}'] = df['Close'].shift(lag)
        df[f'lag_ret_{lag}'] = df['log_return'].shift(lag)
    df['roll_mean_7'] = df['Close'].rolling(7).mean()
    df['roll_std_14'] = df['Close'].rolling(14).std()
    df = df.dropna()
    return df
if __name__ == '__main__':
    df = pd.read_parquet('data/AAPL_preprocessed.parquet')
    df = add_technical_features(df)
    df.to_parquet('data/AAPL_features.parquet')
src/data_window.py
import numpy as np
import pandas as pd
def create_windows(df, feature_cols, target_col='Close', window_size=30, horizon=1):
    X, y, dates = [], [], []
    arr = df[feature_cols].values
    target = df[target_col].values
    for i in range(window_size, len(df) - horizon + 1):
        X.append(arr[i - window_size:i])
        y.append(target[i + horizon - 1])
        dates.append(df.index[i + horizon - 1])
    return np.array(X), np.array(y), np.array(dates)
src/models_arima.py
from statsmodels.tsa.arima.model import ARIMA
def train_arima(series, order=(5, 1, 0)):
    model = ARIMA(series, order=order)
    res = model.fit()
    return res
src/models_xgb.py
import xgboost as xgb
from sklearn.model_selection import train_test_split
def train_xgb(X, y, test_size=0.2, seed=42):
    X_train, X_val, y_train, y_val = train_test_split(X, y, shuffle=False, test_size=test_size)
    # Note: xgboost >= 2.0 takes early_stopping_rounds in the constructor, not fit()
    model = xgb.XGBRegressor(n_estimators=200, learning_rate=0.05, max_depth=5,
                             random_state=seed, early_stopping_rounds=20)
    model.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=False)
    return model
src/models_lstm.py
import torch
import torch.nn as nn
class LSTMRegressor(nn.Module):
    def __init__(self, input_size, hidden_size=64, num_layers=2, dropout=0.2):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                            batch_first=True, dropout=dropout)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        out, (h, c) = self.lstm(x)
        return self.fc(out[:, -1, :]).squeeze(-1)
# training helper omitted; use train_lstm.py
src/models_nbeats.py
import torch
import torch.nn as nn
class NBeatsBlock(nn.Module):
    def __init__(self, input_size, theta_size, hidden_size=128):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_size, hidden_size), nn.ReLU(),
            nn.Linear(hidden_size, hidden_size), nn.ReLU(),
            nn.Linear(hidden_size, theta_size))

    def forward(self, x):
        theta = self.fc(x)
        backcast = theta[:, :x.shape[1]]
        forecast = theta[:, x.shape[1]:]
        return backcast, forecast
class NBeatsModel(nn.Module):
    def __init__(self, input_size, hidden_size=128, theta_size=None, num_blocks=3):
        super().__init__()
        if theta_size is None:
            theta_size = input_size * 2
        self.blocks = nn.ModuleList(
            [NBeatsBlock(input_size, theta_size, hidden_size) for _ in range(num_blocks)])

    def forward(self, x):
        forecast = torch.zeros(x.size(0), x.size(1), device=x.device)
        residual = x
        for block in self.blocks:
            backcast, block_forecast = block(residual)
            residual = residual - backcast
            forecast = forecast + block_forecast
        return forecast
src/train_lstm.py (simplified training script)
import torch
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn
from models_lstm import LSTMRegressor
def train_lstm_model(X, y, epochs=20, batch_size=64, lr=1e-3, device='cpu'):
    X_t = torch.tensor(X, dtype=torch.float32)
    y_t = torch.tensor(y, dtype=torch.float32)
    ds = TensorDataset(X_t, y_t)
    loader = DataLoader(ds, batch_size=batch_size, shuffle=False)
    model = LSTMRegressor(input_size=X.shape[2]).to(device)
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.MSELoss()
    for ep in range(epochs):
        model.train()
        total = 0
        for xb, yb in loader:
            xb, yb = xb.to(device), yb.to(device)
            pred = model(xb)
            loss = loss_fn(pred, yb)
            opt.zero_grad(); loss.backward(); opt.step()
            total += loss.item() * xb.size(0)
        print(f"Epoch {ep} loss {total/len(ds):.6f}")
    return model
src/train_xgb.py
import numpy as np
from models_xgb import train_xgb
# expects X_flat, y
def train_xgb_pipeline(X, y):
    model = train_xgb(X, y)
    return model
src/train_nbeats.py (simplified)
import torch
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn
from models_nbeats import NBeatsModel
def train_nbeats(X, y, epochs=30, batch_size=64, device='cpu'):
    X_t = torch.tensor(X, dtype=torch.float32)
    y_t = torch.tensor(y, dtype=torch.float32)
    ds = TensorDataset(X_t, y_t)
    loader = DataLoader(ds, batch_size=batch_size, shuffle=False)
    # N-BEATS expects a flat vector input, so input_size is T * F
    model = NBeatsModel(input_size=X.shape[1] * X.shape[2]).to(device)
    opt = torch.optim.Adam(model.parameters(), lr=1e-3)
    loss_fn = nn.MSELoss()
    for ep in range(epochs):
        model.train()
        total = 0
        for xb, yb in loader:
            xb, yb = xb.to(device), yb.to(device)
            xb_flat = xb.view(xb.size(0), -1)  # flatten the time dimension
            pred = model(xb_flat)
            loss = loss_fn(pred[:, -1], yb)  # simplified shape alignment: last forecast step
            opt.zero_grad(); loss.backward(); opt.step()
            total += loss.item() * xb.size(0)
        print(f"Epoch {ep} loss {total/len(ds):.6f}")
    return model
src/eval.py
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error
def regression_metrics(y_true, y_pred):
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    mae = mean_absolute_error(y_true, y_pred)
    mape = np.mean(np.abs((y_true - y_pred) / (y_true + 1e-9))) * 100
    return {'RMSE': rmse, 'MAE': mae, 'MAPE': mape}

def direction_accuracy(y_true, y_pred):
    true_ret = np.sign(np.diff(y_true))
    pred_ret = np.sign(np.diff(y_pred))
    return (true_ret == pred_ret).mean()
src/backtest.py
import numpy as np
import pandas as pd
def simple_signal_backtest(dates, close_prices, pred_prices, initial_capital=100000, threshold=0.0):
    df = pd.DataFrame({'date': dates, 'close': close_prices, 'pred': pred_prices})
    df = df.reset_index(drop=True)
    df['pred_ret'] = df['pred'].pct_change().shift(-1)
    df['signal'] = (df['pred_ret'] > threshold).astype(int)
    capital = initial_capital
    capitals = [capital]
    for i in range(len(df) - 1):
        if df['signal'].iloc[i] == 1:
            ret = (df['close'].iloc[i + 1] / df['close'].iloc[i]) - 1
            capital = capital * (1 + ret)
        capitals.append(capital)
    returns = pd.Series(capitals).pct_change().fillna(0)
    cumulative_return = capital / initial_capital - 1
    sharpe = returns.mean() / (returns.std() + 1e-9) * np.sqrt(252) if returns.std() > 0 else np.nan
    return {'cumulative_return': cumulative_return, 'sharpe': sharpe, 'equity_curve': capitals}
scripts/run_pipeline.py
import argparse
import pandas as pd
import numpy as np
from src.data_fetch import fetch_yfinance
from src.preprocess import basic_preprocess
from src.features import add_technical_features
from src.data_window import create_windows
from src.train_xgb import train_xgb_pipeline
from src.train_lstm import train_lstm_model
from src.train_nbeats import train_nbeats
from src.eval import regression_metrics, direction_accuracy
from src.backtest import simple_signal_backtest
def main(ticker='AAPL'):
    print('Fetch data...')
    df = fetch_yfinance(ticker, start='2015-01-01')
    df = basic_preprocess(df)
    df = add_technical_features(df)
    df.to_parquet(f'data/{ticker}_features.parquet')
    feature_cols = ['log_return', 'rsi_14', 'ma_5', 'ma_20', 'vol_10'] + \
                   [f'lag_ret_{l}' for l in [1, 2, 3]]
    X, y, dates = create_windows(df, feature_cols, window_size=30)

    # XGBoost (flatten windows)
    ns, T, F = X.shape
    X_flat = X.reshape(ns, T * F)
    print('Train XGBoost...')
    xgb = train_xgb_pipeline(X_flat, y)
    y_pred_xgb = xgb.predict(X_flat[-len(y)//5:])
    y_true = y[-len(y)//5:]
    print('XGB metrics:', regression_metrics(y_true, y_pred_xgb))
    print('XGB dir acc:', direction_accuracy(y_true, y_pred_xgb))

    # LSTM
    print('Train LSTM...')
    lstm = train_lstm_model(X[:-len(y)//5], y[:-len(y)//5], epochs=5)
    # inference (very simplified)
    import torch
    X_t = torch.tensor(X[-len(y)//5:], dtype=torch.float32)
    y_pred_lstm = lstm(X_t).detach().numpy()
    print('LSTM metrics:', regression_metrics(y_true, y_pred_lstm))
    print('LSTM dir acc:', direction_accuracy(y_true, y_pred_lstm))

    # Backtest using the XGBoost predictions
    bt = simple_signal_backtest(dates[-len(y)//5:], y_true, y_pred_xgb)
    print('Backtest XGB:', bt['cumulative_return'], bt['sharpe'])
if __name__ == '__main__':
    # matches the README usage: python scripts/run_pipeline.py --ticker AAPL
    parser = argparse.ArgumentParser()
    parser.add_argument('--ticker', default='AAPL')
    args = parser.parse_args()
    main(args.ticker)
License and notes
This project is for teaching and research use only. Financial markets carry risk; backtest results are not investment advice.
Web scraping must comply with the target site's robots.txt and terms of use; use official APIs in production.