%load_ext autoreload
%autoreload 2

May 4, 2026
May 15, 2026
This notebook compares training targets for a single-ticker 09:30 signal that decides whether to take a long position through 10:30. The objective is not the best row-level label fit; it is the most useful out-of-sample trading signal under the same features, model families, and deployment rule.
Recommendation: carry a compact shortlist forward rather than selecting a permanent target from this synthetic study alone.
| Role | Target | Current read |
|---|---|---|
| Default mean-Sharpe leader | Trend t-stat | Best average daily Sharpe for both Ridge and HistGBRT, but gaps are small |
| Robust / cost-aware alternative | MAE-penalized return | Close runner-up with lower trade rate and explicit path-pain control |
| Event-time alternative | Barrier reward | Useful when early actionable moves matter more than clean one-hour drift |
| Conservative lens | Tradeability score | Not a primary alpha target here; useful as a selective gating benchmark |
The decision message is deliberately conservative: the evidence narrows target choice, but final promotion should require net-cost, policy, and shifted-regime survival on non-synthetic data.
A target is better only if it improves the deployed trading rule after fitting the same features with the same model family. The rule scores the ticker at 09:30, skips non-positive scores, sizes positive scores in proportion to the score with a cap of 1.0, exits at 10:30, and measures gross PnL.
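The rule above can be sketched in a few lines. This is a minimal illustration rather than the notebook's evaluation code: the function name `long_skip_pnl_bps` and the explicit `scale` argument are assumptions made for the example, and the toy scores and returns are invented.

```python
import numpy as np

def long_skip_pnl_bps(scores, realized_returns, scale):
    """Gross PnL in bps for a long/skip rule (hypothetical helper for illustration)."""
    scores = np.asarray(scores, dtype=float)
    realized = np.asarray(realized_returns, dtype=float)
    # Non-positive scores map to a zero position; positive scores grow
    # linearly with the score and are capped at a full position of 1.0.
    position = np.clip(np.maximum(scores, 0.0) / scale, 0.0, 1.0)
    # Position times the 09:30 to 10:30 simple return, expressed in basis points.
    return 10_000.0 * position * realized

# Toy inputs: two skipped days, one partial position, one capped position.
scores = np.array([-0.5, 0.0, 0.4, 2.0])
realized = np.array([0.010, -0.020, 0.005, -0.002])
pnl_bps = long_skip_pnl_bps(scores, realized, scale=1.0)
print(pnl_bps)
```

Because skipped days contribute zero PnL rather than being dropped, a target that trades rarely is still evaluated over the same number of days as one that trades often.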
The primary metric is mean daily Sharpe of gross PnL. Mean daily PnL, trade rate, positive-run rate, drawdown, cost sensitivity, and shifted-regime behavior are supporting checks.
Scope limits: this is single-ticker, long/skip only, gross of transaction costs and execution uncertainty, and based on synthetic controlled regimes rather than live historical validation.
from lets_plot import *
LetsPlot.setup_html()
LetsPlot.set_theme(
theme_minimal()
+ theme(
axis_text=element_text(size=11),
axis_title=element_text(size=12),
plot_title=element_text(face='bold', size=14),
legend_title=element_text(size=11),
legend_text=element_text(size=10),
)
)
facet_separator_theme = theme(
panel_spacing=5.0,
panel_border=element_rect(color='#c8ced8', size=5.0),
panel_border_ontop=True,
)

from __future__ import annotations
from datetime import date, datetime, time, timedelta
from itertools import product
from pathlib import Path
import numpy as np
import polars as pl
from scipy.stats import spearmanr
from sklearn.ensemble import HistGradientBoostingRegressor
from sklearn.linear_model import Ridge
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from trading_research.target_engineering import trend_scanning_tstat
from trading_research.time_aware_cross_validation import DateEmbargoSplit
from IPython.display import display, Markdown
pl.Config.set_tbl_rows(14)
pl.Config.set_tbl_cols(30)
def plot_size(width: int, height: int):
    return ggsize(width * 2, height * 2)

CONFIG = {
'scored_days': 320,
'warmup_days': 20,
'session_minutes': 390,
'primary_horizon': 60,
'trend_min_horizon': 10,
'trend_max_horizon': 60,
'barrier_horizons': [15, 30, 60],
'stop_losses': [0.004, 0.008, 0.012],
'take_profits': [0.006, 0.010, 0.015],
'profit_factor_eps': 1e-6,
'competing_risk_tau': 15.0,
'mae_penalty_lambda': 0.5,
'downside_risk_eps': 1e-5,
'downside_target_clip': 20.0,
'tradeability_return_threshold': 0.0,
'tradeability_return_temp': 0.0020,
'tradeability_mae_temp': 0.0015,
'n_splits': 8,
'pre_embargo_days': 5,
'min_train_days': 60,
'bootstrap_reps': 2000,
'sample_path_days': 8,
}
CONFIG['tradeability_mae_cap'] = min(CONFIG['stop_losses'])
SEEDS = [
7, 11, 19, 23, 29, 31, 43, 47, 59, 71,
83, 97, 101, 109, 127, 149, 163, 181, 211, 239,
251, 263, 277, 293, 307, 331, 347, 359, 379, 397,
419, 433, 449, 467, 487, 503, 521, 541, 563, 587,
601, 613, 631, 647, 659, 673, 691, 709, 727, 739,
757, 773, 787, 809, 827, 839, 853, 877, 887, 907,
919, 937, 953, 967, 983, 997, 1013, 1031, 1049, 1061,
]
SEED_BUDGETS = [8, 16, 24, 40, 60]
SCENARIOS = [
{
'scenario_name': 'stable_balanced',
'scenario_kind': 'stable',
'event_scale': 1.00,
'observability': 1.00,
'vol_scale': 1.00,
'jump_scale': 1.00,
'decay_scale': 1.00,
'carry_persistence': 0.72,
'shift_start_frac': 0.70,
'late_event_scale_mult': 1.00,
'late_observability_mult': 1.00,
'late_vol_mult': 1.00,
'late_jump_mult': 1.00,
'late_decay_mult': 1.00,
},
{
'scenario_name': 'stable_noisy',
'scenario_kind': 'stable',
'event_scale': 0.90,
'observability': 0.88,
'vol_scale': 1.20,
'jump_scale': 1.30,
'decay_scale': 1.10,
'carry_persistence': 0.68,
'shift_start_frac': 0.70,
'late_event_scale_mult': 1.00,
'late_observability_mult': 1.00,
'late_vol_mult': 1.00,
'late_jump_mult': 1.00,
'late_decay_mult': 1.00,
},
{
'scenario_name': 'shift_signal_break',
'scenario_kind': 'shifted',
'event_scale': 1.00,
'observability': 0.98,
'vol_scale': 1.05,
'jump_scale': 1.10,
'decay_scale': 1.05,
'carry_persistence': 0.72,
'shift_start_frac': 0.58,
'late_event_scale_mult': 0.62,
'late_observability_mult': 0.58,
'late_vol_mult': 1.20,
'late_jump_mult': 1.35,
'late_decay_mult': 1.45,
},
{
'scenario_name': 'shift_jump_stress',
'scenario_kind': 'shifted',
'event_scale': 0.95,
'observability': 0.92,
'vol_scale': 1.10,
'jump_scale': 1.20,
'decay_scale': 1.10,
'carry_persistence': 0.67,
'shift_start_frac': 0.55,
'late_event_scale_mult': 0.70,
'late_observability_mult': 0.68,
'late_vol_mult': 1.45,
'late_jump_mult': 1.85,
'late_decay_mult': 1.35,
},
]
NEW_TARGET_COLS = [
'target_competing_risk_hit',
'target_mae_penalized_rate_h60',
'target_downside_adj_rate_h60',
'target_tradeability_score',
]
TARGET_COLS = [
'target_fixed_rate_h60',
'target_path_mean_rate_h60',
'target_profit_factor_h60',
'target_trend_slope',
'target_trend_tstat',
'target_barrier_rate',
*NEW_TARGET_COLS,
]
MODEL_FAMILIES = ['ridge', 'hist_gbrt']
SHORTLIST_LABELS = ['Trend t-stat', 'MAE-penalized return', 'Barrier reward', 'Tradeability score']
CACHE_DIR = Path('data/eda_01_target_engineering_v4')
CACHE_DIR.mkdir(parents=True, exist_ok=True)
def cache_path(name: str) -> Path:
    return CACHE_DIR / f'{name}.parquet'

def load_cached_parquet(name: str) -> pl.DataFrame | None:
    path = cache_path(name)
    if not path.exists():
        return None
    try:
        print(f'Loading cache: {path}')
        return pl.read_parquet(path)
    except Exception as exc:
        # A cache file that cannot be read is deleted so it is rebuilt on the next run.
        print(f'Ignoring corrupt cache: {path} ({exc})')
        path.unlink(missing_ok=True)
        return None

def write_cached_parquet(df: pl.DataFrame, name: str) -> pl.DataFrame:
    path = cache_path(name)
    df.write_parquet(path, compression='zstd')
    print(f'Saved cache: {path}')
    return df

target_method_reference = pl.DataFrame([
{
'method_name': 'target_fixed_rate_h60',
'emphasis': 'Terminal move at 60m',
'path_preference': 'Can tolerate noisy path if end result is strong',
'strength': 'Directly aligned with a terminal-horizon objective',
'risk': 'Can overvalue late reversals that look weak intrawindow',
},
{
'method_name': 'target_path_mean_rate_h60',
'emphasis': 'Average path quality through 60m',
'path_preference': 'Rewards smoother follow-through',
'strength': 'Less terminal-print dependent',
'risk': 'May underweight sharp late accelerations',
},
{
'method_name': 'target_profit_factor_h60',
'emphasis': 'Upside bar mass vs downside bar mass',
'path_preference': 'Prefers many or large up bars with limited down-bar drag',
'strength': 'Captures minute-level path asymmetry without terminal-only dependence',
'risk': 'Can overreward grind-up paths and underweight late convex breakouts',
},
{
'method_name': 'target_trend_slope',
'emphasis': 'Persistent directional drift',
'path_preference': 'Prefers linear trend-like movement',
'strength': 'Often helpful when edge decays smoothly',
'risk': 'Can miss convex or fast-jump paths',
},
{
'method_name': 'target_trend_tstat',
'emphasis': 'Drift plus path cleanliness',
'path_preference': 'Prefers lower-noise trends',
'strength': 'Can suppress noisy false positives',
'risk': 'Can underweight large but volatile opportunities',
},
{
'method_name': 'target_barrier_rate',
'emphasis': 'Early actionable path hits',
'path_preference': 'Sensitive to fast take-profit / stop behavior',
'strength': 'Captures timing asymmetry',
'risk': 'Threshold choices can dominate behavior',
},
{
'method_name': 'target_competing_risk_hit',
'emphasis': 'Which barrier hits first and how fast',
'path_preference': 'Prefers fast take-profit hits and punishes fast stop hits',
'strength': 'Directly matches first-hit event timing',
'risk': 'Barrier and decay choices can dominate behavior',
},
{
'method_name': 'target_mae_penalized_rate_h60',
'emphasis': 'Terminal return minus downside pain',
'path_preference': 'Rewards winners that avoid deep early drawdown',
'strength': 'Makes adverse excursion explicit in the label',
'risk': 'Penalty weight can overpunish volatile winners',
},
{
'method_name': 'target_downside_adj_rate_h60',
'emphasis': 'Return per unit of downside path risk',
'path_preference': 'Prefers upside earned with limited downside semivolatility',
'strength': 'Makes downside-aware path quality explicit',
'risk': 'Needs clipping when downside risk is near zero',
},
{
'method_name': 'target_tradeability_score',
'emphasis': 'Was the day worth trading at all?',
'path_preference': 'Prefers positive outcomes that stay inside a pain budget',
'strength': 'Closest to the trade-or-skip deployment rule',
'risk': 'Threshold and smoothing choices shape the label',
},
]
)

target_method_reference.select([
pl.col('method_name').replace({
'target_fixed_rate_h60': 'Fixed 60m return',
'target_path_mean_rate_h60': 'Path mean return',
'target_profit_factor_h60': 'Profit factor',
'target_trend_slope': 'Trend slope',
'target_trend_tstat': 'Trend t-stat',
'target_barrier_rate': 'Barrier reward',
'target_competing_risk_hit': 'Competing-risk hit',
'target_mae_penalized_rate_h60': 'MAE-penalized return',
'target_downside_adj_rate_h60': 'Downside-adjusted return',
'target_tradeability_score': 'Tradeability score',
}
).alias('Target'),
pl.col('emphasis').alias('What it rewards'),
pl.col('path_preference').alias('Preferred path shape'),
pl.col('risk').alias('Where it can fail'),
]
)
The ten labels reduce to four practical target families. The full target dictionary remains below for reproducibility, but the interpretation should focus on families rather than treating all ten labels as independent bets.
| Family | Representative targets | What it tests |
|---|---|---|
| Terminal / path return | Fixed 60m return, path mean, profit factor, downside-adjusted, MAE-penalized | Whether the model should learn the size and path quality of the hour-long move |
| Trend cleanliness | Trend slope, trend t-stat | Whether clean directional drift is easier to monetize than raw return |
| Event-time behavior | Barrier reward, competing-risk hit | Whether early take-profit / stop behavior matters more than terminal return |
| Selective gating | Tradeability score | Whether the model should learn when not to trade |
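For reference, a few of the labels in the table can be written compactly. The notation is introduced here for illustration and is not the notebook's own: $c_t$ is the cumulative log return $t$ minutes after 09:30, $H = 60$ is the primary horizon, $\sigma(x) = 1/(1+e^{-x})$, and $\lambda$, $\theta$, $T_r$, $T_m$, $m_{\text{cap}}$ stand for the `mae_penalty_lambda`, `tradeability_return_threshold`, `tradeability_return_temp`, `tradeability_mae_temp`, and `tradeability_mae_cap` settings. The expressions are transcribed from the helper functions `path_mean_rate`, `max_adverse_excursion`, `mae_penalized_rate`, and `tradeability_score` defined later in the notebook.

```latex
\begin{aligned}
y_{\text{fixed}} &= \frac{c_H}{H}, \\
y_{\text{path}}  &= \frac{1}{H}\sum_{t=1}^{H}\frac{c_t}{t}, \\
\mathrm{MAE}     &= \max\Bigl(0,\; -\min_{1 \le t \le H} c_t\Bigr), \\
y_{\text{mae}}   &= \frac{c_H - \lambda\,\mathrm{MAE}}{H}, \\
y_{\text{trade}} &= 2\,\sigma\!\left(\frac{c_H - \theta}{T_r}\right)
                    \sigma\!\left(\frac{m_{\text{cap}} - \mathrm{MAE}}{T_m}\right) - 1 .
\end{aligned}
```

The first four labels are per-minute rates, while the tradeability score is bounded in $(-1, 1)$ and approaches $1$ only when the terminal return is positive and the adverse excursion stays inside the pain budget.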
Each target is trained on the same feature set with Ridge and HistGBRT. The out-of-sample test uses walk-forward validation with a five-day pre-validation embargo. The reference analysis has six effective validation folds after warmup, minimum-history, and embargo constraints.
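The walk-forward scheme can be illustrated with plain NumPy. This is a sketch under stated assumptions, not the implementation of `DateEmbargoSplit`: the function `expanding_folds_with_embargo` is hypothetical, it works on day positions rather than dates, and its fold boundaries (and therefore the number of surviving folds) will generally differ from the notebook's.

```python
import numpy as np

def expanding_folds_with_embargo(n_days, n_splits, embargo_days, min_train_days):
    """Yield (train_idx, val_idx) pairs for expanding walk-forward folds (illustrative only)."""
    # Cut the day positions into n_splits + 1 contiguous blocks; the first
    # block is never validated on, so every fold has some history.
    blocks = np.array_split(np.arange(n_days), n_splits + 1)
    for val_idx in blocks[1:]:
        # Training stops embargo_days before validation starts, so labels
        # near the boundary cannot leak into the validation window.
        train_end = val_idx[0] - embargo_days
        if train_end < min_train_days:
            continue  # too little history left after applying the embargo
        yield np.arange(train_end), val_idx

folds = list(
    expanding_folds_with_embargo(n_days=320, n_splits=8, embargo_days=5, min_train_days=60)
)
for fold_id, (train_idx, val_idx) in enumerate(folds, start=1):
    gap = val_idx[0] - train_idx[-1] - 1
    print(fold_id, train_idx.size, val_idx.size, gap)
```

The point of the sketch is the interaction between the embargo and the minimum-history constraint: early folds are dropped because removing the embargo days leaves fewer than `min_train_days` of training data, which is why the number of effective folds is smaller than `n_splits`.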
Synthetic days are rerun across multiple seeds and four regimes: two stable regimes and two shifted regimes where observability falls and volatility/jump risk rises. Stable results show learnability; shifted results show fragility.
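A small sketch of how a shifted regime takes effect: each simulator scale keeps its base value until the shift day and is multiplied by its late multiplier afterwards. The helper `live_regime_params` is hypothetical and written only for this illustration; the parameter values are copied from the `shift_signal_break` scenario and the `warmup_days` and `scored_days` settings in the configuration.

```python
def live_regime_params(scenario: dict, day_idx: int, shift_start: int) -> dict:
    """Return the scales in force on a given day (hypothetical helper)."""
    # Map each base parameter to the multiplier applied after the shift.
    late_key = {
        'event_scale': 'late_event_scale_mult',
        'observability': 'late_observability_mult',
        'vol_scale': 'late_vol_mult',
        'jump_scale': 'late_jump_mult',
        'decay_scale': 'late_decay_mult',
    }
    shifted = day_idx >= shift_start
    return {
        name: scenario[name] * (scenario[mult] if shifted else 1.0)
        for name, mult in late_key.items()
    }

scenario = {
    'event_scale': 1.00, 'observability': 0.98, 'vol_scale': 1.05,
    'jump_scale': 1.10, 'decay_scale': 1.05, 'shift_start_frac': 0.58,
    'late_event_scale_mult': 0.62, 'late_observability_mult': 0.58,
    'late_vol_mult': 1.20, 'late_jump_mult': 1.35, 'late_decay_mult': 1.45,
}
warmup_days, scored_days = 20, 320
shift_start = warmup_days + int(scored_days * scenario['shift_start_frac'])

before = live_regime_params(scenario, shift_start - 1, shift_start)
after = live_regime_params(scenario, shift_start, shift_start)
print(shift_start, before['observability'], after['observability'])
```

In the stable scenarios every late multiplier is 1.00, so the same function returns the base values on every day.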
Two idealized benchmarks are used only for calibration. Oracle uses the simulator’s hidden expected edge, and Perfect uses the realized 09:30 to 10:30 return. They are not tradable target candidates; their plots remain in the appendix.
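To make the two benchmarks concrete, the sketch below feeds each one through the same long/skip sizing as any other score. Everything here is an assumption for illustration: the toy data, the helper `long_skip_positions`, and the choice to scale by the 90th percentile of positive scores within the same sample, which may not match how the notebook calibrates the scale.

```python
import numpy as np

def long_skip_positions(scores: np.ndarray) -> np.ndarray:
    """Skip non-positive scores; size positive ones by their 90th percentile, capped at 1.0."""
    scores = np.asarray(scores, dtype=float)
    positive = scores[scores > 0.0]
    scale = float(np.quantile(positive, 0.9)) if positive.size else 1.0
    return np.clip(np.maximum(scores, 0.0) / max(scale, 1e-8), 0.0, 1.0)

rng = np.random.default_rng(0)
# Hypothetical hidden expected return per day, and a noisy realized return around it.
expected_edge = rng.normal(0.0, 0.0005, size=250)
realized = expected_edge + rng.normal(0.0, 0.004, size=250)

# Oracle: score with the hidden expectation, which no deployable model can observe.
oracle_pnl = long_skip_positions(expected_edge) * realized
# Perfect: score with the realized return itself, so every trade taken is a winner.
perfect_pnl = long_skip_positions(realized) * realized

print(oracle_pnl.mean(), perfect_pnl.mean())
```

Perfect can never lose on a traded day, whereas Oracle still loses whenever noise overwhelms a positive expected edge. That gap is why the two are useful as calibration bounds rather than as candidates.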
def consecutive_dates(start_date: date, n_days: int) -> list[date]:
return [start_date + timedelta(days=offset) for offset in range(n_days)]
def logistic(x: np.ndarray | float) -> np.ndarray | float:
return 1.0 / (1.0 + np.exp(-x))
def rank_ic(y_true: np.ndarray, score: np.ndarray) -> float:
mask = np.isfinite(y_true) & np.isfinite(score)
if mask.sum() < 3:
return float('nan')
stat = spearmanr(y_true[mask], score[mask]).statistic
return float(stat) if stat is not None else float('nan')
def path_mean_rate(cumulative_log_returns: np.ndarray) -> float:
horizons = np.arange(1, cumulative_log_returns.shape[0] + 1, dtype=float)
return float(np.mean(cumulative_log_returns / horizons))
def profit_factor_log_reward(step_log_returns: np.ndarray, eps: float) -> float:
up_sum = float(np.maximum(step_log_returns, 0.0).sum())
down_sum = float(np.maximum(-step_log_returns, 0.0).sum())
return float(np.log((eps + up_sum) / (eps + down_sum)))
def max_adverse_excursion(cumulative_log_returns: np.ndarray) -> float:
return float(max(0.0, -float(np.min(cumulative_log_returns))))
def first_barrier_event(
cumulative_log_returns: np.ndarray,
stop_loss: float,
take_profit: float,
horizon: int,
) -> tuple[str, int, float]:
path = cumulative_log_returns[:horizon]
hit_take = np.flatnonzero(path >= take_profit)
hit_stop = np.flatnonzero(path <= -stop_loss)
first_take = int(hit_take[0] + 1) if hit_take.size else horizon + 1
first_stop = int(hit_stop[0] + 1) if hit_stop.size else horizon + 1
exit_minute = min(first_take, first_stop)
if exit_minute > horizon:
return 'none', horizon, float(path[-1])
if first_take < first_stop:
return 'take', exit_minute, float(take_profit)
return 'stop', exit_minute, float(-stop_loss)
def barrier_reward_rate(
cumulative_log_returns: np.ndarray,
stop_losses: list[float],
take_profits: list[float],
horizons: list[int],
) -> float:
rewards = []
for stop_loss, take_profit, horizon in product(stop_losses, take_profits, horizons):
_, exit_minute, realized_reward = first_barrier_event(
cumulative_log_returns,
stop_loss=stop_loss,
take_profit=take_profit,
horizon=horizon,
)
rewards.append(realized_reward / exit_minute)
return float(np.mean(rewards))
def competing_risk_hit_score(
cumulative_log_returns: np.ndarray,
stop_losses: list[float],
take_profits: list[float],
horizons: list[int],
tau: float,
) -> float:
scores = []
tau_safe = max(float(tau), 1e-8)
for stop_loss, take_profit, horizon in product(stop_losses, take_profits, horizons):
outcome, exit_minute, _ = first_barrier_event(
cumulative_log_returns,
stop_loss=stop_loss,
take_profit=take_profit,
horizon=horizon,
)
if outcome == 'take':
score = float(np.exp(-exit_minute / tau_safe))
elif outcome == 'stop':
score = float(-np.exp(-exit_minute / tau_safe))
else:
score = 0.0
scores.append(score)
return float(np.mean(scores))
def mae_penalized_rate(cumulative_log_returns: np.ndarray, horizon: int, penalty_lambda: float) -> float:
mae = max_adverse_excursion(cumulative_log_returns)
return float((float(cumulative_log_returns[-1]) - penalty_lambda * mae) / horizon)
def downside_semivol(step_log_returns: np.ndarray) -> float:
downside = np.minimum(np.asarray(step_log_returns, dtype=float), 0.0)
return float(np.sqrt(np.mean(downside * downside)))
def downside_adjusted_return_rate(terminal_rate: float, step_log_returns: np.ndarray, eps: float, clip: float) -> float:
downside_risk = downside_semivol(step_log_returns)
raw_value = terminal_rate / max(float(eps), downside_risk)
return float(np.clip(raw_value, -clip, clip))
def tradeability_score(
terminal_log_return: float,
max_adverse_excursion_value: float,
return_threshold: float,
return_temp: float,
mae_cap: float,
mae_temp: float,
) -> float:
return_component = float(logistic((terminal_log_return - return_threshold) / max(return_temp, 1e-8)))
pain_component = float(logistic((mae_cap - max_adverse_excursion_value) / max(mae_temp, 1e-8)))
return float(2.0 * (return_component * pain_component) - 1.0)
def ols_tstat(y: np.ndarray) -> tuple[float, float]:
x = np.arange(y.shape[0], dtype=float)
x_centered = x - x.mean()
y_centered = y - y.mean()
ss_x = float(np.dot(x_centered, x_centered))
if ss_x <= 0.0:
return float('nan'), float('nan')
slope = float(np.dot(x_centered, y_centered) / ss_x)
intercept = float(y.mean() - slope * x.mean())
residuals = y - (intercept + slope * x)
dof = y.shape[0] - 2
if dof <= 0:
return float('nan'), slope
rss = float(np.dot(residuals, residuals))
if rss <= 0.0:
return float('nan'), slope
se = float(np.sqrt((rss / dof) / ss_x))
if se == 0.0:
return float('nan'), slope
return slope / se, slope
def sharpe_ratio(values: np.ndarray) -> float:
arr = np.asarray(values, dtype=float)
arr = arr[np.isfinite(arr)]
if arr.size < 2:
return float('nan')
std = float(np.std(arr, ddof=1))
if std == 0.0:
return float('nan')
return float(np.mean(arr) / std)
def sortino_ratio(values: np.ndarray) -> float:
arr = np.asarray(values, dtype=float)
arr = arr[np.isfinite(arr)]
if arr.size < 2:
return float('nan')
downside = arr[arr < 0.0]
if downside.size < 2:
return float('nan')
downside_std = float(np.std(downside, ddof=1))
if downside_std == 0.0:
return float('nan')
return float(np.mean(arr) / downside_std)
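def max_drawdown_bps(values: np.ndarray) -> float:
    arr = np.asarray(values, dtype=float)
    arr = np.nan_to_num(arr, nan=0.0)
    # Prepend the starting equity of zero so that losses at the start of the
    # series count as drawdown instead of resetting the running peak.
    equity = np.concatenate(([0.0], np.cumsum(arr)))
    peak = np.maximum.accumulate(equity)
    drawdown = equity - peak
    return float(-np.min(drawdown))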
def bootstrap_mean_ci(values: np.ndarray, reps: int, seed: int) -> tuple[float, float]:
arr = np.asarray(values, dtype=float)
arr = arr[np.isfinite(arr)]
if arr.size == 0:
return float('nan'), float('nan')
rng = np.random.default_rng(seed)
sample_idx = rng.integers(0, arr.size, size=(reps, arr.size))
draws = arr[sample_idx].mean(axis=1)
return float(np.quantile(draws, 0.025)), float(np.quantile(draws, 0.975))
def finite_mean(values: np.ndarray) -> float:
arr = np.asarray(values, dtype=float)
arr = arr[np.isfinite(arr)]
return float(np.mean(arr)) if arr.size else float('nan')
def finite_median(values: np.ndarray) -> float:
arr = np.asarray(values, dtype=float)
arr = arr[np.isfinite(arr)]
return float(np.median(arr)) if arr.size else float('nan')
def finite_std(values: np.ndarray) -> float:
arr = np.asarray(values, dtype=float)
arr = arr[np.isfinite(arr)]
return float(np.std(arr)) if arr.size else float('nan')
def finite_count(values: np.ndarray) -> int:
return int(np.isfinite(np.asarray(values, dtype=float)).sum())
def safe_ratio(numerator: pl.Expr, denominator: pl.Expr) -> pl.Expr:
return pl.when(denominator.is_finite() & (denominator > 0.0)).then(numerator / denominator).otherwise(None)
def paired_bootstrap_diff(
left: np.ndarray,
right: np.ndarray,
reps: int,
seed: int,
) -> tuple[float, float, float, float]:
left_arr = np.asarray(left, dtype=float)
right_arr = np.asarray(right, dtype=float)
mask = np.isfinite(left_arr) & np.isfinite(right_arr)
diff = left_arr[mask] - right_arr[mask]
if diff.size == 0:
return float('nan'), float('nan'), float('nan'), float('nan')
rng = np.random.default_rng(seed)
sample_idx = rng.integers(0, diff.size, size=(reps, diff.size))
draws = diff[sample_idx].mean(axis=1)
return (
float(np.mean(diff)),
float(np.quantile(draws, 0.025)),
float(np.quantile(draws, 0.975)),
float(np.mean(draws > 0.0)),
)
def make_model(model_family: str, seed: int):
if model_family == 'ridge':
return Pipeline([
('scaler', StandardScaler()),
('model', Ridge(alpha=1.0)),
]
)
if model_family == 'hist_gbrt':
return HistGradientBoostingRegressor(
learning_rate=0.05,
max_depth=3,
max_iter=90,
min_samples_leaf=16,
l2_regularization=0.1,
random_state=seed,
)
raise KeyError(model_family)
def reference_scale(scores: np.ndarray) -> float:
arr = np.asarray(scores, dtype=float)
positive = arr[arr > 0.0]
if positive.size >= 5:
scale = float(np.quantile(positive, 0.9))
elif positive.size > 0:
scale = float(np.max(positive))
else:
scale = float(np.quantile(np.abs(arr), 0.9))
return max(scale, 1e-8)
def position_size_from_score(score: np.ndarray, scale: float) -> np.ndarray:
clipped = np.maximum(np.asarray(score, dtype=float), 0.0)
return np.clip(clipped / scale, 0.0, 1.0)
def simulate_single_ticker_panel(config: dict, scenario: dict, seed: int) -> tuple[pl.DataFrame, pl.DataFrame]:
rng = np.random.default_rng(seed)
total_days = config['warmup_days'] + config['scored_days']
trade_dates = consecutive_dates(date(2024, 1, 1), total_days)
n_minutes = config['session_minutes']
minute_idx = np.arange(n_minutes, dtype=float)
vol_profile = 0.80 + 0.95 * np.cos(np.pi * minute_idx / (n_minutes - 1)) ** 2
volume_profile = 0.92 + 1.25 * np.cos(np.pi * minute_idx / (n_minutes - 1)) ** 2
session_open = time(9, 30)
shift_start = config['warmup_days'] + int(config['scored_days'] * scenario['shift_start_frac'])
panel_rows: list[dict[str, object]] = []
sample_rows: list[dict[str, object]] = []
sample_day_idx = set(
np.linspace(
config['warmup_days'],
total_days - 1,
num=min(config['sample_path_days'], total_days - config['warmup_days']),
dtype=int,
).tolist()
)
ticker = 'SIM'
ticker_quality = float(rng.normal(0.0, 1.0))
ticker_liquidity = float(np.clip(0.2 + 0.7 * rng.beta(4, 2), 0.05, 0.98))
prev_close = float(20.0 + 120.0 * rng.random())
carry_state = float(rng.normal(0.0, 0.25))
vol_state = float(np.clip(rng.normal(1.0, 0.12), 0.65, 1.8))
for day_idx, trade_date in enumerate(trade_dates):
shifted_flag = day_idx >= shift_start
event_scale_live = scenario['event_scale'] * (scenario['late_event_scale_mult'] if shifted_flag else 1.0)
observability_live = scenario['observability'] * (scenario['late_observability_mult'] if shifted_flag else 1.0)
vol_scale_live = scenario['vol_scale'] * (scenario['late_vol_mult'] if shifted_flag else 1.0)
jump_scale_live = scenario['jump_scale'] * (scenario['late_jump_mult'] if shifted_flag else 1.0)
decay_scale_live = scenario['decay_scale'] * (scenario['late_decay_mult'] if shifted_flag else 1.0)
latent_signal = float(0.60 * carry_state + 0.22 * ticker_quality + rng.normal(0.0, 0.70))
latent_sentiment = float(np.clip(latent_signal + rng.normal(0.0, 0.35), -3.0, 3.0))
latent_direction = float(
np.clip(0.10 + 0.82 * logistic(abs(latent_signal) + rng.normal(0.0, 0.55)), 0.05, 0.99)
)
latent_materiality = float(
np.clip(0.12 + 0.78 * logistic(abs(latent_signal) + rng.normal(0.0, 0.80)), 0.05, 0.99)
)
latent_novelty = float(np.clip(rng.beta(2.0, 3.0), 0.05, 0.99))
true_edge = float(
np.tanh(latent_sentiment)
* (0.35 + 0.65 * latent_direction)
* (0.45 + 0.55 * latent_materiality)
)
sentiment_score = float(
np.clip(latent_sentiment + rng.normal(0.0, 0.60 / max(observability_live, 0.15)), -3.0, 3.0)
)
direction_strength = float(
np.clip(latent_direction + rng.normal(0.0, 0.12 / max(observability_live, 0.15)), 0.05, 0.99)
)
materiality = float(
np.clip(latent_materiality + rng.normal(0.0, 0.12 / max(observability_live, 0.15)), 0.05, 0.99)
)
novelty = float(np.clip(latent_novelty + rng.normal(0.0, 0.10 / max(observability_live, 0.15)), 0.05, 0.99))
signed_event = float(
np.tanh(sentiment_score)
* (0.35 + 0.65 * direction_strength)
* (0.45 + 0.55 * materiality)
)
overnight_gap = float(0.0005 * carry_state + rng.normal(0.0, 0.0032 * (1.1 - 0.45 * ticker_liquidity)))
day_open = float(prev_close * np.exp(overnight_gap))
gap_ret = float(np.log(day_open / prev_close))
decay = float(0.0095 * decay_scale_live * (1.0 + 0.55 * (1.0 - latent_direction) + 0.20 * rng.random()))
baseline_drift = float(0.000004 * carry_state)
event_alpha_0 = float(0.00011 * event_scale_live * true_edge + 0.000018 * carry_state)
alpha_path = event_alpha_0 * np.exp(-decay * minute_idx)
sigma_level = float((0.00055 + 0.00042 * vol_state + 0.00016 * (1.0 - ticker_liquidity)) * vol_scale_live)
sigma_path = sigma_level * vol_profile
jump_prob = float((0.0010 + 0.0040 * latent_materiality * (1.0 - ticker_liquidity)) * jump_scale_live)
shocks = rng.normal(0.0, sigma_path)
jumps = rng.normal(0.0, sigma_path * 1.8) * (rng.random(n_minutes) < jump_prob)
log_returns = baseline_drift + alpha_path + shocks + jumps
log_close = np.log(day_open) + np.cumsum(log_returns)
close_prices = np.exp(log_close)
open_prices = np.concatenate(([day_open], close_prices[:-1]))
wick = np.maximum(0.00018, np.abs(log_returns) * 0.55 + rng.uniform(0.0, sigma_path * 1.4))
high_prices = np.maximum(open_prices, close_prices) * (1.0 + wick)
low_prices = np.minimum(open_prices, close_prices) * np.maximum(1e-9, 1.0 - wick)
dollar_volume = (35_000.0 + 205_000.0 * ticker_liquidity) * (1.0 + 1.6 * latent_materiality) * volume_profile
dollar_volume = dollar_volume * rng.lognormal(mean=-0.10, sigma=0.34, size=n_minutes)
share_volume = np.maximum(100.0, dollar_volume / np.maximum(close_prices, 1.0)).astype(int)
primary_step_log_returns = log_returns[: config['primary_horizon']]
primary_cum = np.cumsum(primary_step_log_returns)
primary_path = np.concatenate(([day_open], close_prices[: config['primary_horizon']]))
primary_trend = trend_scanning_tstat(
primary_path,
min_horizon=config['trend_min_horizon'],
max_horizon=config['trend_max_horizon'],
step=5,
)
full_session_log = np.concatenate(([np.log(day_open)], log_close))
day_tstat, day_slope = ols_tstat(full_session_log)
target_fixed_rate_h60 = float(primary_cum[-1] / config['primary_horizon'])
target_path_mean_rate_h60 = path_mean_rate(primary_cum)
target_profit_factor_h60 = profit_factor_log_reward(primary_step_log_returns, eps=config['profit_factor_eps'])
target_trend_slope = float(primary_trend.slope[0])
target_trend_tstat = float(primary_trend.t_value[0])
target_barrier_rate = barrier_reward_rate(
primary_cum,
stop_losses=config['stop_losses'],
take_profits=config['take_profits'],
horizons=config['barrier_horizons'],
)
max_adverse_excursion_h60 = max_adverse_excursion(primary_cum)
target_competing_risk_hit = competing_risk_hit_score(
primary_cum,
stop_losses=config['stop_losses'],
take_profits=config['take_profits'],
horizons=config['barrier_horizons'],
tau=config['competing_risk_tau'],
)
target_mae_penalized_rate_h60 = mae_penalized_rate(
primary_cum,
horizon=config['primary_horizon'],
penalty_lambda=config['mae_penalty_lambda'],
)
target_downside_adj_rate_h60 = downside_adjusted_return_rate(
target_fixed_rate_h60,
primary_step_log_returns,
eps=config['downside_risk_eps'],
clip=config['downside_target_clip'],
)
target_tradeability_score = tradeability_score(
terminal_log_return=float(primary_cum[-1]),
max_adverse_excursion_value=max_adverse_excursion_h60,
return_threshold=config['tradeability_return_threshold'],
return_temp=config['tradeability_return_temp'],
mae_cap=config['tradeability_mae_cap'],
mae_temp=config['tradeability_mae_temp'],
)
realized_return_h60 = float(np.expm1(primary_cum[-1]))
realized_return_h60_bps = float(10_000.0 * realized_return_h60)
oracle_rate_h60 = float((baseline_drift + alpha_path[: config['primary_horizon']]).mean())
day_close = float(close_prices[-1])
daily_high = float(high_prices.max())
daily_low = float(low_prices.min())
daily_oc_ret = float(np.log(day_close / day_open))
daily_cc_ret = float(np.log(day_close / prev_close))
daily_range = float(np.log(daily_high / daily_low))
daily_realized_vol = float(np.std(log_returns))
daily_volume = int(share_volume.sum())
daily_close_loc = float((day_close - daily_low) / max(daily_high - daily_low, 1e-9))
panel_rows.append({
'scenario_name': scenario['scenario_name'],
'scenario_kind': scenario['scenario_kind'],
'seed': seed,
'run_id': f"{scenario['scenario_name']}|seed={seed}",
'ticker': ticker,
'day_idx': day_idx,
'trade_date': trade_date,
'publish_ts': datetime.combine(trade_date, session_open),
'event_time': '09:30',
'scored_flag': day_idx >= config['warmup_days'],
'shifted_flag': shifted_flag,
'shift_phase': 'shifted' if shifted_flag else 'base',
'event_scale_live': event_scale_live,
'observability_live': observability_live,
'vol_scale_live': vol_scale_live,
'jump_scale_live': jump_scale_live,
'decay_scale_live': decay_scale_live,
'sentiment_score': sentiment_score,
'direction_strength': direction_strength,
'materiality': materiality,
'novelty': novelty,
'signed_event': signed_event,
'gap_ret': gap_ret,
'observed_liquidity_score': float(
np.clip(ticker_liquidity + rng.normal(0.0, 0.05 / max(observability_live, 0.20)), 0.02, 1.0)
),
'observed_vol_state': float(
np.clip(vol_state + rng.normal(0.0, 0.10 / max(observability_live, 0.20)), 0.5, 3.0)
),
'daily_oc_ret': daily_oc_ret,
'daily_cc_ret': daily_cc_ret,
'daily_range': daily_range,
'daily_realized_vol': daily_realized_vol,
'daily_volume': daily_volume,
'daily_close_loc': daily_close_loc,
'daily_trend_tstat': float(day_tstat),
'daily_trend_slope': float(day_slope),
'realized_return_h60': realized_return_h60,
'realized_return_h60_bps': realized_return_h60_bps,
'max_adverse_excursion_h60': max_adverse_excursion_h60,
'oracle_rate_h60': oracle_rate_h60,
'target_fixed_rate_h60': target_fixed_rate_h60,
'target_path_mean_rate_h60': target_path_mean_rate_h60,
'target_profit_factor_h60': target_profit_factor_h60,
'target_trend_slope': target_trend_slope,
'target_trend_tstat': target_trend_tstat,
'target_barrier_rate': target_barrier_rate,
'target_competing_risk_hit': target_competing_risk_hit,
'target_mae_penalized_rate_h60': target_mae_penalized_rate_h60,
'target_downside_adj_rate_h60': target_downside_adj_rate_h60,
'target_tradeability_score': target_tradeability_score,
'latent_true_edge': true_edge,
'latent_event_alpha_0': event_alpha_0,
'latent_carry_state': carry_state,
}
)
if day_idx in sample_day_idx:
sample_id = f"{scenario['scenario_name']} | day {day_idx:03d}"
session_start = datetime.combine(trade_date, session_open)
sample_rows.extend(
{
'scenario_name': scenario['scenario_name'],
'day_idx': day_idx,
'sample_id': sample_id,
'ts': session_start + timedelta(minutes=int(i)),
'close': float(close_prices[i]),
'latent_alpha_per_min': float(alpha_path[i]),
'shift_phase': 'shifted' if shifted_flag else 'base',
}
for i in range(n_minutes)
)
carry_state = float(scenario['carry_persistence'] * carry_state + 0.28 * true_edge + rng.normal(0.0, 0.20))
vol_state = float(np.clip(
0.78 * vol_state + 0.22 * (0.75 + 0.85 * latent_materiality + 0.30 * abs(true_edge)) + abs(
rng.normal(0.0, 0.05)
), 0.6, 2.8
)
)
prev_close = day_close
return pl.DataFrame(panel_rows), pl.DataFrame(sample_rows)
def build_feature_frame(panel: pl.DataFrame) -> tuple[pl.DataFrame, list[str]]:
    """Derive model features from the daily panel; return the scored rows and the feature column names."""
    frame = panel.sort('day_idx').with_columns(
        pl.col('daily_volume').log1p().alias('log_daily_volume')
    ).with_columns([
        # Same-day inputs, used unshifted.
        pl.col('sentiment_score').alias('feature_sentiment_score'),
        pl.col('direction_strength').alias('feature_direction_strength'),
        pl.col('materiality').alias('feature_materiality'),
        pl.col('novelty').alias('feature_novelty'),
        pl.col('signed_event').alias('feature_signed_event'),
        pl.col('gap_ret').alias('feature_gap_ret'),
        pl.col('observed_liquidity_score').alias('feature_liquidity_score'),
        pl.col('observed_vol_state').alias('feature_vol_state'),
        # Prior-day statistics: shift(1) lags each series by one row of the day-sorted panel.
        pl.col('daily_oc_ret').shift(1).alias('feature_prev_oc_ret'),
        pl.col('daily_cc_ret').shift(1).alias('feature_prev_cc_ret'),
        pl.col('daily_range').shift(1).alias('feature_prev_range'),
        pl.col('daily_realized_vol').shift(1).alias('feature_prev_realized_vol'),
        pl.col('log_daily_volume').shift(1).alias('feature_prev_log_volume'),
        pl.col('daily_close_loc').shift(1).alias('feature_prev_close_loc'),
        pl.col('daily_trend_slope').shift(1).alias('feature_prev_trend_slope'),
        pl.col('daily_trend_tstat').shift(1).alias('feature_prev_trend_tstat'),
        # Trailing 3- and 5-row means of the lagged series.
        pl.col('daily_cc_ret').shift(1).rolling_mean(window_size=3).alias('feature_cc_ret_3d'),
        pl.col('daily_cc_ret').shift(1).rolling_mean(window_size=5).alias('feature_cc_ret_5d'),
        pl.col('daily_realized_vol').shift(1).rolling_mean(window_size=3).alias('feature_realized_vol_3d'),
        pl.col('daily_realized_vol').shift(1).rolling_mean(window_size=5).alias('feature_realized_vol_5d'),
        pl.col('daily_range').shift(1).rolling_mean(window_size=3).alias('feature_range_3d'),
        pl.col('daily_range').shift(1).rolling_mean(window_size=5).alias('feature_range_5d'),
        pl.col('log_daily_volume').shift(1).rolling_mean(window_size=3).alias('feature_log_volume_3d'),
        pl.col('log_daily_volume').shift(1).rolling_mean(window_size=5).alias('feature_log_volume_5d'),
        pl.col('daily_trend_slope').shift(1).rolling_mean(window_size=3).alias('feature_trend_slope_3d'),
        pl.col('daily_trend_slope').shift(1).rolling_mean(window_size=5).alias('feature_trend_slope_5d'),
    ]
    ).with_columns([
        # Short-window minus long-window spreads.
        (pl.col('feature_log_volume_3d') - pl.col('feature_log_volume_5d')).alias('feature_volume_trend_3v5'),
        (pl.col('feature_cc_ret_3d') - pl.col('feature_cc_ret_5d')).alias('feature_return_trend_3v5'),
        (pl.col('feature_realized_vol_3d') - pl.col('feature_realized_vol_5d')).alias('feature_vol_trend_3v5'),
    ]
    )
    feature_cols = sorted([col for col in frame.columns if col.startswith('feature_')])
    required_cols = feature_cols + TARGET_COLS + ['realized_return_h60', 'realized_return_h60_bps', 'oracle_rate_h60']
    # Keep scored rows only, then drop any row missing a feature, target, or realized outcome.
    frame = frame.filter(pl.col('scored_flag')).drop_nulls(subset=required_cols)
    return frame, feature_cols
def materialize_folds(frame: pl.DataFrame, config: dict) -> tuple[
    list[tuple[int, np.ndarray, np.ndarray]], pl.DataFrame]:
    """Build expanding-window folds with a pre-validation embargo; return them with per-fold metadata."""
    splitter = DateEmbargoSplit(
        n_splits=config['n_splits'],
        pre_embargo=config['pre_embargo_days'],
        mode='expanding',
    )
    dates = np.array(frame['trade_date'].to_list(), dtype=object)
    unique_dates = np.unique(dates)
    date_to_pos = {value: idx for idx, value in enumerate(unique_dates)}
    x_dummy = np.zeros((frame.height, 1))
    folds: list[tuple[int, np.ndarray, np.ndarray]] = []
    meta_rows: list[dict[str, object]] = []
    next_fold = 1
    for train_idx, val_idx in splitter.split(x_dummy, groups=dates):
        train_dates = np.unique(dates[train_idx])
        val_dates = np.unique(dates[val_idx])
        # Skip folds whose training window is empty or shorter than min_train_days.
        if train_idx.size == 0 or train_dates.shape[0] < config['min_train_days']:
            continue
        folds.append((next_fold, train_idx, val_idx))
        # Gap counts the dates present in the frame strictly between the last
        # training date and the first validation date.
        gap = int(date_to_pos[val_dates[0]] - date_to_pos[train_dates[-1]] - 1)
        meta_rows.append({
            'fold': next_fold,
            'train_start': train_dates[0],
            'train_end': train_dates[-1],
            'val_start': val_dates[0],
            'val_end': val_dates[-1],
            'train_days': int(train_dates.shape[0]),
            'val_days': int(val_dates.shape[0]),
            'business_day_gap': gap,
            'gap_ok': gap >= config['pre_embargo_days'],
        })
        next_fold += 1
    if not folds:
        raise ValueError('No valid folds were created. Increase history or reduce min_train_days.')
    return folds, pl.DataFrame(meta_rows)
def evaluate_run(
    frame: pl.DataFrame,
    feature_cols: list[str],
    folds: list[tuple[int, np.ndarray, np.ndarray]],
    *,
    run_id: str,
    scenario_name: str,
    scenario_kind: str,
    seed: int,
) -> tuple[pl.DataFrame, pl.DataFrame]:
    """Fit each model family on each target per fold and return (fold diagnostics, out-of-sample trade days)."""
    x = frame.select(feature_cols).to_numpy()
    n_rows = frame.height
    day_idx = frame['day_idx'].to_numpy()
    trade_dates = frame['trade_date'].to_numpy()
    shift_phase = frame['shift_phase'].to_numpy()
    realized_return_h60 = frame['realized_return_h60'].to_numpy()
    realized_return_h60_bps = frame['realized_return_h60_bps'].to_numpy()
    oracle_rate_h60 = frame['oracle_rate_h60'].to_numpy()
    target_arrays = {target_col: frame[target_col].to_numpy() for target_col in TARGET_COLS}
    # Rows that never fall in a validation fold keep -1 and are excluded from the trade frames.
    fold_assignment = np.full(n_rows, -1, dtype=int)
    for fold_id, _, val_idx in folds:
        fold_assignment[val_idx] = fold_id
    oos_mask = fold_assignment > 0
    oos_count = int(oos_mask.sum())
    diag_rows: list[dict[str, object]] = []
    trade_frames: list[pl.DataFrame] = []

    def build_trade_frame(model_family: str, method_name: str, prediction: np.ndarray,
                          position_size: np.ndarray) -> pl.DataFrame:
        position_oos = position_size[oos_mask]
        realized_bps_oos = realized_return_h60_bps[oos_mask]
        return pl.DataFrame({
            'run_id': [run_id] * oos_count,
            'scenario_name': [scenario_name] * oos_count,
            'scenario_kind': [scenario_kind] * oos_count,
            'seed': [seed] * oos_count,
            'model_family': [model_family] * oos_count,
            'method_name': [method_name] * oos_count,
            'fold': fold_assignment[oos_mask],
            'day_idx': day_idx[oos_mask],
            'trade_date': trade_dates[oos_mask].tolist(),
            'shift_phase': shift_phase[oos_mask].tolist(),
            'prediction': prediction[oos_mask],
            'position_size': position_oos,
            'trade_flag': position_oos > 0.0,
            'realized_return_h60': realized_return_h60[oos_mask],
            'realized_return_h60_bps': realized_bps_oos,
            'gross_pnl_bps': position_oos * realized_bps_oos,
        })

    for model_family in MODEL_FAMILIES:
        for target_col in TARGET_COLS:
            y_target = target_arrays[target_col]
            prediction = np.full(n_rows, np.nan, dtype=float)
            position_size = np.zeros(n_rows, dtype=float)
            for fold_id, train_idx, val_idx in folds:
                model = make_model(model_family, seed + fold_id)
                model.fit(x[train_idx], y_target[train_idx])
                train_score = model.predict(x[train_idx])
                val_score = model.predict(x[val_idx])
                # The sizing scale comes from training-fold scores, not validation scores.
                scale = reference_scale(train_score)
                val_size = position_size_from_score(val_score, scale)
                prediction[val_idx] = val_score
                position_size[val_idx] = val_size
                diag_rows.append({
                    'run_id': run_id,
                    'scenario_name': scenario_name,
                    'scenario_kind': scenario_kind,
                    'seed': seed,
                    'model_family': model_family,
                    'method_name': target_col,
                    'fold': fold_id,
                    'val_days': int(val_idx.size),
                    'prediction_ic': rank_ic(realized_return_h60[val_idx], val_score),
                    'trade_rate': float(np.mean(val_size > 0.0)),
                    'mean_position_size': float(np.mean(val_size)),
                    'mean_fold_gross_pnl_bps': float(np.mean(val_size * realized_return_h60_bps[val_idx])),
                })
            trade_frames.append(build_trade_frame(model_family, target_col, prediction, position_size))
    # Benchmarks reuse the same sizing rule, scoring with oracle_rate_h60 ('oracle')
    # and with the realized return itself ('perfect').
    for benchmark_name, benchmark_score_all in [('oracle', oracle_rate_h60), ('perfect', realized_return_h60)]:
        prediction = np.full(n_rows, np.nan, dtype=float)
        position_size = np.zeros(n_rows, dtype=float)
        for fold_id, train_idx, val_idx in folds:
            val_score = benchmark_score_all[val_idx]
            scale = reference_scale(benchmark_score_all[train_idx])
            val_size = position_size_from_score(val_score, scale)
            prediction[val_idx] = val_score
            position_size[val_idx] = val_size
            diag_rows.append({
                'run_id': run_id,
                'scenario_name': scenario_name,
                'scenario_kind': scenario_kind,
                'seed': seed,
                'model_family': 'benchmark',
                'method_name': benchmark_name,
                'fold': fold_id,
                'val_days': int(val_idx.size),
                'prediction_ic': rank_ic(realized_return_h60[val_idx], val_score),
                'trade_rate': float(np.mean(val_size > 0.0)),
                'mean_position_size': float(np.mean(val_size)),
                'mean_fold_gross_pnl_bps': float(np.mean(val_size * realized_return_h60_bps[val_idx])),
            })
        trade_frames.append(build_trade_frame('benchmark', benchmark_name, prediction, position_size))
    return pl.DataFrame(diag_rows), pl.concat(trade_frames, how='vertical')
def summarize_trade_days(trade_day_metrics: pl.DataFrame) -> pl.DataFrame:
    """Summarize per-day trades into one row per run, model family, method, and evaluation slice."""
    rows = []
    for keys, part in trade_day_metrics.partition_by(
        ['run_id', 'scenario_name', 'scenario_kind', 'seed', 'model_family', 'method_name'], as_dict=True
    ).items():
        run_id, scenario_name, scenario_kind, seed, model_family, method_name = keys
        for eval_slice, slice_part in [('all', part), ('base', part.filter(pl.col('shift_phase') == 'base')),
                                       ('shifted', part.filter(pl.col('shift_phase') == 'shifted'))]:
            # Runs without a given phase contribute no row for that slice.
            if slice_part.height == 0:
                continue
            pnl = slice_part['gross_pnl_bps'].to_numpy()
            size = slice_part['position_size'].to_numpy()
            trade_flag = slice_part['trade_flag'].cast(pl.Int64).to_numpy()
            rows.append({
                'run_id': run_id,
                'scenario_name': scenario_name,
                'scenario_kind': scenario_kind,
                'seed': seed,
                'model_family': model_family,
                'method_name': method_name,
                'eval_slice': eval_slice,
                'n_days': int(slice_part.height),
                'mean_daily_gross_pnl_bps': float(np.mean(pnl)),
                'std_daily_gross_pnl_bps': float(np.std(pnl, ddof=1)) if slice_part.height > 1 else float('nan'),
                'daily_sharpe': sharpe_ratio(pnl),
                'daily_sortino': sortino_ratio(pnl),
                'max_drawdown_bps': max_drawdown_bps(pnl),
                'trade_rate': float(np.mean(trade_flag)),
                'avg_position_size': float(np.mean(size)),
                'avg_trade_size': float(np.mean(size[trade_flag > 0])) if np.any(trade_flag > 0) else 0.0,
                'trade_hit_rate': float(np.mean(pnl[trade_flag > 0] > 0.0)) if np.any(trade_flag > 0) else float('nan'),
            })
    return pl.DataFrame(rows)
def aggregate_metric_summary(run_summary: pl.DataFrame, metric_col: str, reps: int, seed: int) -> pl.DataFrame:
    """Aggregate one run-level metric across runs, with a bootstrap CI for the mean and finite/NaN run counts."""
    rows = []
    for keys, part in run_summary.partition_by(['model_family', 'method_name', 'eval_slice'], as_dict=True).items():
        model_family, method_name, eval_slice = keys
        values = part[metric_col].to_numpy()
        ci_low, ci_high = bootstrap_mean_ci(values, reps=reps, seed=seed)
        finite_n = finite_count(values)
        rows.append({
            'model_family': model_family,
            'method_name': method_name,
            'eval_slice': eval_slice,
            f'{metric_col}_mean': finite_mean(values),
            f'{metric_col}_median': finite_median(values),
            f'{metric_col}_std': finite_std(values),
            f'{metric_col}_ci_low': ci_low,
            f'{metric_col}_ci_high': ci_high,
            'n_runs': int(part.height),
            f'{metric_col}_finite_runs': finite_n,
            f'{metric_col}_nan_runs': int(part.height - finite_n),
        })
    return pl.DataFrame(rows)
def compute_run_ranks(run_summary: pl.DataFrame) -> pl.DataFrame:
    """Rank target methods within each run and model family by daily Sharpe, breaking ties on mean daily PnL."""
    rows = []
    subset = run_summary.filter((pl.col('model_family') != 'benchmark') & (pl.col('eval_slice') == 'all'))
    for keys, part in subset.partition_by(['run_id', 'model_family'], as_dict=True).items():
        run_id, model_family = keys
        records = part.select(['scenario_name', 'scenario_kind', 'seed', 'method_name', 'daily_sharpe',
                               'mean_daily_gross_pnl_bps']
                              ).to_dicts()
        # With reverse=True, finite Sharpe values sort ahead of non-finite ones, then higher values first.
        records.sort(
            key=lambda row: (
                np.isfinite(row['daily_sharpe']),
                row['daily_sharpe'] if np.isfinite(row['daily_sharpe']) else -np.inf,
                row['mean_daily_gross_pnl_bps'] if np.isfinite(row['mean_daily_gross_pnl_bps']) else -np.inf,
            ),
            reverse=True,
        )
        for rank, row in enumerate(records, start=1):
            rows.append({
                'run_id': run_id,
                'model_family': model_family,
                'scenario_name': row['scenario_name'],
                'scenario_kind': row['scenario_kind'],
                'seed': row['seed'],
                'method_name': row['method_name'],
                'rank': rank,
                'is_winner': rank == 1,
            })
    return pl.DataFrame(rows)
def build_paired_comparison_table(run_summary: pl.DataFrame, reps: int) -> pl.DataFrame:
    """Compare each model family's mean-Sharpe leader against every other method on matched runs."""
    rows = []
    subset = run_summary.filter((pl.col('model_family') != 'benchmark') & (pl.col('eval_slice') == 'all'))
    for model_family in MODEL_FAMILIES:
        part = subset.filter(pl.col('model_family') == model_family)
        summary = (
            part.group_by('method_name')
            .agg([
                pl.col('daily_sharpe').filter(pl.col('daily_sharpe').is_finite()).mean().alias('daily_sharpe'),
                pl.col('mean_daily_gross_pnl_bps').filter(pl.col('mean_daily_gross_pnl_bps').is_finite()).mean().alias(
                    'mean_daily_gross_pnl_bps'
                ),
                pl.col('daily_sharpe').is_finite().sum().alias('finite_sharpe_runs'),
            ])
            .filter(pl.col('daily_sharpe').is_finite())
            .sort(['daily_sharpe', 'mean_daily_gross_pnl_bps'], descending=[True, True])
        )
        winner = summary.item(0, 'method_name')
        winner_part = part.filter(pl.col('method_name') == winner).sort('run_id')
        for challenger in summary['method_name'].to_list()[1:]:
            challenger_part = part.filter(pl.col('method_name') == challenger).sort('run_id')
            # The inner join on run_id keeps only runs present for both methods.
            joined = winner_part.join(
                challenger_part.select(['run_id', 'daily_sharpe', 'mean_daily_gross_pnl_bps']).rename({
                    'daily_sharpe': 'challenger_sharpe',
                    'mean_daily_gross_pnl_bps': 'challenger_mean_pnl_bps',
                }),
                on='run_id',
                how='inner',
            )
            sharpe_diff, sharpe_ci_low, sharpe_ci_high, sharpe_prob = paired_bootstrap_diff(
                joined['daily_sharpe'].to_numpy(),
                joined['challenger_sharpe'].to_numpy(),
                reps=reps,
                seed=17,
            )
            pnl_diff, pnl_ci_low, pnl_ci_high, pnl_prob = paired_bootstrap_diff(
                joined['mean_daily_gross_pnl_bps'].to_numpy(),
                joined['challenger_mean_pnl_bps'].to_numpy(),
                reps=reps,
                seed=23,
            )
            rows.append({
                'model_family': model_family,
                'winner_method': winner,
                'challenger_method': challenger,
                'sharpe_diff_mean': sharpe_diff,
                'sharpe_diff_ci_low': sharpe_ci_low,
                'sharpe_diff_ci_high': sharpe_ci_high,
                'winner_prob_beats_challenger_sharpe': sharpe_prob,
                'mean_pnl_diff_bps': pnl_diff,
                'mean_pnl_diff_ci_low': pnl_ci_low,
                'mean_pnl_diff_ci_high': pnl_ci_high,
                'winner_prob_beats_challenger_pnl': pnl_prob,
                'matched_runs': int(joined.height),
            })
    return pl.DataFrame(rows)
def build_seed_budget_table(run_summary: pl.DataFrame, seed_budgets: list[int]) -> pl.DataFrame:
    """Report the leader and runner-up by mean daily Sharpe when only the first `budget` seeds are used."""
    subset = run_summary.filter((pl.col('model_family') != 'benchmark') & (pl.col('eval_slice') == 'all'))
    rows = []
    for model_family in MODEL_FAMILIES:
        family_part = subset.filter(pl.col('model_family') == model_family)
        for budget in seed_budgets:
            active_seeds = SEEDS[:budget]
            budget_part = family_part.filter(pl.col('seed').is_in(active_seeds))
            summary = (
                budget_part.group_by('method_name')
                .agg([
                    pl.col('daily_sharpe').filter(pl.col('daily_sharpe').is_finite()).mean().alias('daily_sharpe_mean'),
                    pl.col('mean_daily_gross_pnl_bps').filter(
                        pl.col('mean_daily_gross_pnl_bps').is_finite()
                    ).mean().alias('mean_daily_gross_pnl_bps_mean'),
                    pl.col('trade_rate').filter(pl.col('trade_rate').is_finite()).mean().alias('trade_rate_mean'),
                    pl.col('daily_sharpe').is_finite().sum().alias('finite_sharpe_runs'),
                ])
                .filter(pl.col('daily_sharpe_mean').is_finite())
                .sort(['daily_sharpe_mean', 'mean_daily_gross_pnl_bps_mean'], descending=[True, True])
            )
            # Requires at least two methods with a finite mean Sharpe in this budget.
            best = summary.row(0, named=True)
            runner_up = summary.row(1, named=True)
            rows.append({
                'model_family': model_family,
                'seed_budget': budget,
                'runs_used': int(budget_part.height / len(TARGET_COLS)),
                'finite_sharpe_runs': best['finite_sharpe_runs'],
                'best_method': best['method_name'],
                'best_daily_sharpe_mean': best['daily_sharpe_mean'],
                'best_mean_daily_gross_pnl_bps_mean': best['mean_daily_gross_pnl_bps_mean'],
                'runner_up_method': runner_up['method_name'],
                'sharpe_gap_to_runner': best['daily_sharpe_mean'] - runner_up['daily_sharpe_mean'],
                'pnl_gap_to_runner_bps': best['mean_daily_gross_pnl_bps_mean'] - runner_up[
                    'mean_daily_gross_pnl_bps_mean'],
            })
    return pl.DataFrame(rows).sort(['model_family', 'seed_budget'])
def build_run_tail_table(run_summary: pl.DataFrame) -> pl.DataFrame:
    """Worst-run and median-run Sharpe and PnL per model family and method, over finite values only."""
    return (
        run_summary
        .filter((pl.col('model_family') != 'benchmark') & (pl.col('eval_slice') == 'all'))
        .group_by(['model_family', 'method_name'])
        .agg([
            pl.col('daily_sharpe').filter(pl.col('daily_sharpe').is_finite()).min().alias('worst_run_daily_sharpe'),
            pl.col('daily_sharpe').filter(pl.col('daily_sharpe').is_finite()).median().alias('median_run_daily_sharpe'),
            pl.col('mean_daily_gross_pnl_bps').filter(pl.col('mean_daily_gross_pnl_bps').is_finite()).min().alias(
                'worst_run_mean_daily_gross_pnl_bps'
            ),
            pl.col('mean_daily_gross_pnl_bps').filter(pl.col('mean_daily_gross_pnl_bps').is_finite()).median().alias(
                'median_run_mean_daily_gross_pnl_bps'
            ),
            pl.col('trade_rate').filter(pl.col('trade_rate').is_finite()).mean().alias('trade_rate_mean'),
            pl.col('daily_sharpe').is_finite().sum().alias('finite_sharpe_runs'),
            pl.col('daily_sharpe').is_nan().sum().alias('nan_sharpe_runs'),
            pl.len().alias('n_runs'),
        ])
        .sort(['median_run_daily_sharpe', 'worst_run_daily_sharpe'], descending=[True, True])
    )
def build_degenerate_strategy_table(run_summary: pl.DataFrame) -> pl.DataFrame:
    """Count undefined-Sharpe and zero-trade runs per model family and method."""
    return (
        run_summary
        .filter((pl.col('model_family') != 'benchmark') & (pl.col('eval_slice') == 'all'))
        .group_by(['model_family', 'method_name'])
        .agg([
            pl.len().alias('n_runs'),
            pl.col('daily_sharpe').is_nan().sum().alias('nan_sharpe_runs'),
            (pl.col('trade_rate') == 0.0).sum().alias('zero_trade_runs'),
            pl.mean('trade_rate').alias('mean_trade_rate'),
            pl.min('trade_rate').alias('min_trade_rate'),
            pl.max('trade_rate').alias('max_trade_rate'),
        ])
        .with_columns([
            (pl.col('nan_sharpe_runs') / pl.col('n_runs')).alias('nan_sharpe_run_rate'),
            (pl.col('zero_trade_runs') / pl.col('n_runs')).alias('zero_trade_run_rate'),
        ])
        .sort(['model_family', 'nan_sharpe_runs', 'zero_trade_runs'], descending=[False, True, True])
    )

scenario_metric_rows = []
for scenario in SCENARIOS:
    scenario_label = scenario['scenario_name'].replace('_', ' ')
    for metric_label, column in [
        ('Base event edge', 'event_scale'),
        ('Base observability', 'observability'),
        ('Base volatility', 'vol_scale'),
        ('Base jump risk', 'jump_scale'),
        ('Base decay', 'decay_scale'),
        ('Carry persistence', 'carry_persistence'),
        ('Shift start fraction', 'shift_start_frac'),
        ('Late event edge', 'late_event_scale_mult'),
        ('Late observability', 'late_observability_mult'),
        ('Late volatility', 'late_vol_mult'),
        ('Late jump risk', 'late_jump_mult'),
        ('Late decay', 'late_decay_mult'),
    ]:
        value = float(scenario[column])
        scenario_metric_rows.append({
            'scenario_label': scenario_label,
            'scenario_kind': scenario['scenario_kind'],
            'metric_label': metric_label,
            'value': value,
            'value_label': f'{value:.2f}',
        })
scenario_metric_frame = pl.DataFrame(scenario_metric_rows)

cached_panel_summary = load_cached_parquet('panel_summary')
cached_prediction_diagnostics = load_cached_parquet('prediction_diagnostics')
cached_trade_day_metrics = load_cached_parquet('trade_day_metrics')
cached_reference_frame = load_cached_parquet('reference_frame')
cached_reference_fold_meta = load_cached_parquet('reference_fold_meta')
cached_reference_minute_sample = load_cached_parquet('reference_minute_sample')
cached_reference_features = load_cached_parquet('reference_features')
if all(
    artifact is not None
    for artifact in [
        cached_panel_summary,
        cached_prediction_diagnostics,
        cached_trade_day_metrics,
        cached_reference_frame,
        cached_reference_fold_meta,
        cached_reference_minute_sample,
        cached_reference_features,
    ]
):
    panel_summary = cached_panel_summary
    prediction_diagnostics = cached_prediction_diagnostics
    trade_day_metrics = cached_trade_day_metrics
    reference_frame = cached_reference_frame
    reference_fold_meta = cached_reference_fold_meta
    reference_minute_sample = cached_reference_minute_sample
    reference_features = cached_reference_features['feature_name'].to_list()
else:
    reference_frame = None
    reference_features = None
    reference_fold_meta = None
    reference_minute_sample = None
    reference_priority = -1
    panel_summaries = []
    diag_frames = []
    trade_frames = []
    for scenario in SCENARIOS:
        for seed in SEEDS:
            panel, minute_sample = simulate_single_ticker_panel(CONFIG, scenario, seed)
            frame, feature_cols = build_feature_frame(panel)
            folds, fold_meta = materialize_folds(frame, CONFIG)
            run_id = frame.item(0, 'run_id')
            diag_df, trade_df = evaluate_run(
                frame,
                feature_cols,
                folds,
                run_id=run_id,
                scenario_name=scenario['scenario_name'],
                scenario_kind=scenario['scenario_kind'],
                seed=seed,
            )
            diag_frames.append(diag_df)
            trade_frames.append(trade_df)
            shifted_part = frame.filter(pl.col('shifted_flag'))
            panel_summaries.append({
                'run_id': run_id,
                'scenario_name': scenario['scenario_name'],
                'scenario_kind': scenario['scenario_kind'],
                'seed': seed,
                'event_rows': int(panel.height),
                'feature_rows': int(frame.height),
                'n_features': int(len(feature_cols)),
                'n_folds': int(len(folds)),
                'shifted_share': float(frame['shifted_flag'].cast(pl.Float64).mean()),
                'feature_to_oracle_corr': float(
                    np.corrcoef(frame['feature_signed_event'].to_numpy(), frame['latent_event_alpha_0'].to_numpy())[0, 1]
                ),
                'late_signal_mean': float(
                    shifted_part.select(pl.mean('event_scale_live')).item()
                ) if shifted_part.height else float('nan'),
                'late_observability_mean': float(
                    shifted_part.select(pl.mean('observability_live')).item()
                ) if shifted_part.height else float('nan'),
                'late_vol_mean': float(
                    shifted_part.select(pl.mean('vol_scale_live')).item()
                ) if shifted_part.height else float('nan'),
            })
            candidate_priority = 1 if scenario['scenario_kind'] == 'shifted' else 0
            if candidate_priority > reference_priority:
                reference_frame = frame
                reference_features = feature_cols
                reference_fold_meta = fold_meta
                reference_minute_sample = minute_sample
                reference_priority = candidate_priority
    panel_summary = pl.DataFrame(panel_summaries)
    prediction_diagnostics = pl.concat(diag_frames, how='vertical')
    trade_day_metrics = pl.concat(trade_frames, how='vertical')
    write_cached_parquet(panel_summary, 'panel_summary')
    write_cached_parquet(prediction_diagnostics, 'prediction_diagnostics')
    write_cached_parquet(trade_day_metrics, 'trade_day_metrics')
    write_cached_parquet(reference_frame, 'reference_frame')
    write_cached_parquet(reference_fold_meta, 'reference_fold_meta')
    write_cached_parquet(reference_minute_sample, 'reference_minute_sample')
    write_cached_parquet(pl.DataFrame({'feature_name': reference_features}), 'reference_features')

reference_policy = trade_day_metrics.filter(
(pl.col('run_id') == panel_summary.item(0, 'run_id'))
& (pl.col('model_family') == 'ridge')
& (pl.col('method_name') == 'target_fixed_rate_h60')
)
reference_competing_risk = reference_frame['target_competing_risk_hit'].to_numpy()
reference_fixed_rate = reference_frame['target_fixed_rate_h60'].to_numpy()
reference_mae_penalized = reference_frame['target_mae_penalized_rate_h60'].to_numpy()
reference_downside_adjusted = reference_frame['target_downside_adj_rate_h60'].to_numpy()
reference_tradeability = reference_frame['target_tradeability_score'].to_numpy()
reference_competing_risk_min = float(np.nanmin(reference_competing_risk))
reference_competing_risk_max = float(np.nanmax(reference_competing_risk))
reference_tradeability_min = float(np.nanmin(reference_tradeability))
reference_tradeability_max = float(np.nanmax(reference_tradeability))
reference_tradeability_span = float(reference_tradeability_max - reference_tradeability_min)
reference_downside_abs_max = float(np.nanmax(np.abs(reference_downside_adjusted)))
reference_mae_gap_max = float(np.nanmax(reference_mae_penalized - reference_fixed_rate))
test_rows = [
{
'test': 'Reference scored rows have no missing feature values',
'passed': bool(
reference_frame.select([pl.col(col).is_null().sum().alias(col) for col in reference_features]).row(
0
) == tuple(0 for _ in reference_features)
),
'detail': f"reference_rows={reference_frame.height}",
},
{
'test': 'All reference folds respect 5d pre-embargo',
'passed': bool(reference_fold_meta['gap_ok'].all()),
'detail': f"min_gap={int(reference_fold_meta['business_day_gap'].min())}",
},
{
'test': 'Reference fold count and OOS history are intentionally larger',
'passed': bool(
reference_fold_meta.height >= 6
and reference_frame.height >= 320
and int(reference_fold_meta['val_days'].min()) >= 20
),
'detail': f"folds={reference_fold_meta.height}; reference_rows={reference_frame.height}; min_val_days={int(reference_fold_meta['val_days'].min())}",
},
{
'test': 'Reference frame has one row per day',
'passed': bool(reference_frame.height == reference_frame['day_idx'].n_unique()),
'detail': f"n_unique_days={int(reference_frame['day_idx'].n_unique())}",
},
{
'test': 'Position size bounded and no-trade days obey score sign',
'passed': bool(
(trade_day_metrics['position_size'].min() >= 0.0)
and (trade_day_metrics['position_size'].max() <= 1.0)
and trade_day_metrics.filter((pl.col('prediction') <= 0.0) & (pl.col('position_size') > 0.0)).is_empty()
),
'detail': f"reference_no_trade_rate={(1.0 - reference_policy['trade_flag'].cast(pl.Float64).mean()):.3f}",
},
{
'test': 'Daily gross PnL matches size times realized return',
'passed': bool(np.allclose(trade_day_metrics['gross_pnl_bps'].to_numpy(),
trade_day_metrics['position_size'].to_numpy() * trade_day_metrics[
'realized_return_h60_bps'].to_numpy()
)
),
'detail': 'allclose=true',
},
{
'test': 'Shifted scenarios reduce observability late in sample',
'passed': bool(
panel_summary.filter(pl.col('scenario_kind') == 'shifted').select(pl.mean('late_observability_mean')).item()
< panel_summary.filter(pl.col('scenario_kind') == 'stable').select(
pl.mean('late_observability_mean')
).fill_null(1.0).item()
),
'detail': (
f"shifted_late_obs={panel_summary.filter(pl.col('scenario_kind') == 'shifted').select(pl.mean('late_observability_mean')).item():.3f}; "
f"stable_late_obs={panel_summary.filter(pl.col('scenario_kind') == 'stable').select(pl.mean('late_observability_mean')).fill_null(1.0).item():.3f}"
),
},
{
'test': 'Competing-risk score stays within [-1, 1]',
'passed': bool(reference_competing_risk_min >= -1.0 - 1e-9 and reference_competing_risk_max <= 1.0 + 1e-9),
'detail': f"min={reference_competing_risk_min:.3f}; max={reference_competing_risk_max:.3f}",
},
{
'test': 'MAE-penalized return never exceeds fixed return',
'passed': bool(np.all(reference_mae_penalized <= reference_fixed_rate + 1e-12)),
'detail': f"max_gap={reference_mae_gap_max:.6f}",
},
{
'test': 'Downside-adjusted return remains finite after clipping',
'passed': bool(np.isfinite(reference_downside_adjusted).all()),
'detail': f"abs_max={reference_downside_abs_max:.3f}",
},
{
'test': 'Tradeability score stays bounded and non-degenerate',
'passed': bool(
reference_tradeability_min >= -1.0 - 1e-9
and reference_tradeability_max <= 1.0 + 1e-9
and reference_tradeability_span > 1e-6
),
'detail': f"min={reference_tradeability_min:.3f}; max={reference_tradeability_max:.3f}",
},
]
tests_summary = pl.DataFrame(test_rows)
assert tests_summary['passed'].all(), 'At least one notebook validation check failed.'

run_summary = summarize_trade_days(trade_day_metrics)
oracle_summary = run_summary.filter(
(pl.col('model_family') == 'benchmark') & (pl.col('method_name') == 'oracle')
).select([
'run_id',
'eval_slice',
'daily_sharpe',
'mean_daily_gross_pnl_bps',
]
).rename({
'daily_sharpe': 'oracle_daily_sharpe',
'mean_daily_gross_pnl_bps': 'oracle_mean_daily_gross_pnl_bps',
}
)
perfect_summary = run_summary.filter(
(pl.col('model_family') == 'benchmark') & (pl.col('method_name') == 'perfect')
).select([
'run_id',
'eval_slice',
'daily_sharpe',
'mean_daily_gross_pnl_bps',
]
).rename({
'daily_sharpe': 'perfect_daily_sharpe',
'mean_daily_gross_pnl_bps': 'perfect_mean_daily_gross_pnl_bps',
}
)
model_run_summary = run_summary.filter(pl.col('model_family') != 'benchmark').join(
oracle_summary,
on=['run_id', 'eval_slice'],
how='left',
).join(
perfect_summary,
on=['run_id', 'eval_slice'],
how='left',
).with_columns([
safe_ratio(pl.col('daily_sharpe'), pl.col('oracle_daily_sharpe')).alias('positive_sharpe_ratio_to_oracle'),
safe_ratio(pl.col('mean_daily_gross_pnl_bps'), pl.col('oracle_mean_daily_gross_pnl_bps')).alias(
'positive_pnl_ratio_to_oracle'
),
safe_ratio(pl.col('daily_sharpe'), pl.col('perfect_daily_sharpe')).alias('positive_sharpe_ratio_to_perfect'),
safe_ratio(pl.col('mean_daily_gross_pnl_bps'), pl.col('perfect_mean_daily_gross_pnl_bps')).alias(
'positive_pnl_ratio_to_perfect'
),
]
)
run_ranks = compute_run_ranks(model_run_summary)
paired_comparison_table = build_paired_comparison_table(model_run_summary, reps=CONFIG['bootstrap_reps'])
seed_budget_winner_table = build_seed_budget_table(model_run_summary, seed_budgets=SEED_BUDGETS)
run_tail_table = build_run_tail_table(model_run_summary)
degenerate_strategy_table = build_degenerate_strategy_table(model_run_summary)
aggregate_sharpe = aggregate_metric_summary(model_run_summary, metric_col='daily_sharpe', reps=CONFIG['bootstrap_reps'],
seed=101
)
aggregate_pnl = aggregate_metric_summary(model_run_summary, metric_col='mean_daily_gross_pnl_bps',
reps=CONFIG['bootstrap_reps'], seed=103
)
aggregate_trade_rate = aggregate_metric_summary(model_run_summary, metric_col='trade_rate',
reps=CONFIG['bootstrap_reps'], seed=107
)
aggregate_capture = aggregate_metric_summary(model_run_summary, metric_col='positive_sharpe_ratio_to_oracle',
reps=CONFIG['bootstrap_reps'], seed=109
)
aggregate_summary = aggregate_sharpe.join(
aggregate_pnl,
on=['model_family', 'method_name', 'eval_slice', 'n_runs'],
how='left',
).join(
aggregate_trade_rate,
on=['model_family', 'method_name', 'eval_slice', 'n_runs'],
how='left',
).join(
aggregate_capture,
on=['model_family', 'method_name', 'eval_slice', 'n_runs'],
how='left',
).join(
run_ranks.group_by(['model_family', 'method_name']).agg([
pl.mean('rank').alias('mean_rank'),
pl.mean('is_winner').alias('win_rate'),
]
),
on=['model_family', 'method_name'],
how='left',
)
primary_sharpe_table = aggregate_summary.filter(
(pl.col('eval_slice') == 'all') & pl.col('daily_sharpe_mean').is_finite()
).sort(
['model_family', 'daily_sharpe_mean', 'mean_daily_gross_pnl_bps_mean'], descending=[False, True, True]
).select([
'model_family',
'method_name',
'daily_sharpe_mean',
'daily_sharpe_ci_low',
'daily_sharpe_ci_high',
'positive_sharpe_ratio_to_oracle_mean',
'mean_rank',
'win_rate',
'n_runs',
'daily_sharpe_finite_runs',
'daily_sharpe_nan_runs',
]
)
primary_pnl_table = aggregate_summary.filter(pl.col('eval_slice') == 'all').sort(
['model_family', 'daily_sharpe_mean', 'mean_daily_gross_pnl_bps_mean'], descending=[False, True, True]
).select([
'model_family',
'method_name',
'mean_daily_gross_pnl_bps_mean',
'mean_daily_gross_pnl_bps_ci_low',
'mean_daily_gross_pnl_bps_ci_high',
'trade_rate_mean',
'mean_rank',
'win_rate',
'n_runs',
'daily_sharpe_finite_runs',
'daily_sharpe_nan_runs',
]
)
run_stability_table = run_ranks.group_by(['model_family', 'method_name']).agg([
pl.mean('rank').alias('mean_rank'),
pl.mean('is_winner').alias('win_rate'),
]
).sort(['model_family', 'mean_rank'])
shift_performance_table = aggregate_summary.filter(pl.col('eval_slice').is_in(['base', 'shifted'])).select([
'model_family',
'method_name',
'eval_slice',
'daily_sharpe_mean',
'mean_daily_gross_pnl_bps_mean',
'trade_rate_mean',
]
).sort(['model_family', 'method_name', 'eval_slice'])
oracle_kind_summary = run_summary.filter(
(pl.col('model_family') == 'benchmark') & (pl.col('method_name') == 'oracle') & (
pl.col('eval_slice') == 'all')
).group_by('scenario_kind').agg([
pl.mean('daily_sharpe').alias('oracle_daily_sharpe'),
pl.mean('mean_daily_gross_pnl_bps').alias('oracle_mean_daily_gross_pnl_bps'),
]
).sort('scenario_kind')
prediction_diagnostic_summary = prediction_diagnostics.group_by(['model_family', 'method_name']).agg([
pl.mean('prediction_ic').alias('mean_prediction_ic'),
pl.mean('trade_rate').alias('mean_trade_rate'),
pl.mean('mean_position_size').alias('mean_position_size'),
]
).sort(['model_family', 'mean_prediction_ic'], descending=[False, True])
model_family_conclusion_rows = []
for model_family in MODEL_FAMILIES:
    ranked = primary_sharpe_table.filter(pl.col('model_family') == model_family)
    best = ranked.row(0, named=True)
    runner_up = ranked.row(1, named=True)
    pair_join = (
        model_run_summary
        .filter((pl.col('model_family') == model_family) & (pl.col('method_name') == best['method_name']) & (
            pl.col('eval_slice') == 'all')
        )
        .select(['run_id', 'daily_sharpe', 'mean_daily_gross_pnl_bps'])
        .rename({
            'daily_sharpe': 'best_daily_sharpe',
            'mean_daily_gross_pnl_bps': 'best_mean_daily_gross_pnl_bps',
        })
        .join(
            model_run_summary
            .filter((pl.col('model_family') == model_family) & (pl.col('method_name') == runner_up['method_name']) & (
                pl.col('eval_slice') == 'all')
            )
            .select(['run_id', 'daily_sharpe', 'mean_daily_gross_pnl_bps'])
            .rename({
                'daily_sharpe': 'runner_up_daily_sharpe',
                'mean_daily_gross_pnl_bps': 'runner_up_mean_daily_gross_pnl_bps',
            }),
            on='run_id',
            how='inner',
        )
    )
    sharpe_diff, sharpe_ci_low, sharpe_ci_high, sharpe_prob = paired_bootstrap_diff(
        pair_join['best_daily_sharpe'].to_numpy(),
        pair_join['runner_up_daily_sharpe'].to_numpy(),
        reps=CONFIG['bootstrap_reps'],
        seed=17,
    )
    pnl_diff, _, _, pnl_prob = paired_bootstrap_diff(
        pair_join['best_mean_daily_gross_pnl_bps'].to_numpy(),
        pair_join['runner_up_mean_daily_gross_pnl_bps'].to_numpy(),
        reps=CONFIG['bootstrap_reps'],
        seed=23,
    )
    run_tail_row = run_tail_table.filter(
        (pl.col('model_family') == model_family)
        & (pl.col('method_name') == best['method_name'])
    ).row(0, named=True)
    positive_run_summary = model_run_summary.filter(
        (pl.col('model_family') == model_family)
        & (pl.col('method_name') == best['method_name'])
        & (pl.col('eval_slice') == 'all')
    )
    model_family_conclusion_rows.append({
        'model_family': model_family,
        'best_method': best['method_name'],
        'runner_up_method': runner_up['method_name'],
        'best_daily_sharpe_mean': best['daily_sharpe_mean'],
        'best_mean_daily_gross_pnl_bps_mean': primary_pnl_table.filter(
            (pl.col('model_family') == model_family) & (pl.col('method_name') == best['method_name'])
        ).item(0, 'mean_daily_gross_pnl_bps_mean'),
        'runner_up_daily_sharpe_mean': runner_up['daily_sharpe_mean'],
        'sharpe_gap_to_runner': best['daily_sharpe_mean'] - runner_up['daily_sharpe_mean'],
        'paired_sharpe_prob': sharpe_prob,
        'paired_sharpe_ci_low': sharpe_ci_low,
        'paired_sharpe_ci_high': sharpe_ci_high,
        'paired_pnl_prob': pnl_prob,
        'median_run_daily_sharpe': run_tail_row['median_run_daily_sharpe'],
        'median_run_mean_daily_gross_pnl_bps': run_tail_row['median_run_mean_daily_gross_pnl_bps'],
        'positive_sharpe_run_rate': float((positive_run_summary['daily_sharpe'] > 0.0).mean()),
        'positive_pnl_run_rate': float((positive_run_summary['mean_daily_gross_pnl_bps'] > 0.0).mean()),
    })
model_family_decision_table = pl.DataFrame(model_family_conclusion_rows).sort('model_family')
analysis_validation_rows = []
for model_family in MODEL_FAMILIES:
    primary_winner = primary_sharpe_table.filter(pl.col('model_family') == model_family).item(0, 'method_name')
    paired_winner = paired_comparison_table.filter(pl.col('model_family') == model_family).item(0, 'winner_method')
    analysis_validation_rows.append({
        'test': f'Paired comparison winner matches primary Sharpe winner ({model_family})',
        'passed': paired_winner == primary_winner,
        'detail': f'paired={paired_winner}; primary={primary_winner}',
    })
analysis_validation_rows.append({
'test': 'Seed-budget winners have finite Sharpe support',
'passed': bool(seed_budget_winner_table['best_daily_sharpe_mean'].is_finite().all()),
'detail': f"rows={seed_budget_winner_table.height}",
}
)
analysis_validation_rows.append({
'test': 'Undefined-Sharpe strategy runs are explicitly reported',
'passed': bool(degenerate_strategy_table['nan_sharpe_runs'].max() >= 0),
'detail': f"max_nan_runs={int(degenerate_strategy_table['nan_sharpe_runs'].max())}",
}
)
analysis_validation_rows.append({
'test': 'Positive oracle ratio metric has no infinite values',
'passed': bool(not model_run_summary['positive_sharpe_ratio_to_oracle'].is_infinite().any()),
'detail': f"nulls={int(model_run_summary['positive_sharpe_ratio_to_oracle'].is_null().sum())}",
}
)
analysis_validation_summary = pl.DataFrame(analysis_validation_rows)
assert analysis_validation_summary['passed'].all(), 'At least one analysis validation check failed.'
representative_run_id = reference_frame.item(0, 'run_id')
representative_shift_start = int(reference_frame.filter(pl.col('shift_phase') == 'shifted').item(0, 'day_idx'))
sample_target_reference = reference_frame.filter(
pl.col('day_idx').is_between(representative_shift_start - 5, representative_shift_start + 6)
).select([
'day_idx',
'shift_phase',
'sentiment_score',
'signed_event',
'realized_return_h60_bps',
*TARGET_COLS,
]
)
representative_run_methods = model_run_summary.filter(
(pl.col('run_id') == representative_run_id) & (pl.col('eval_slice') == 'all')
).sort(['daily_sharpe', 'mean_daily_gross_pnl_bps'], descending=[True, True])
representative_model_family = representative_run_methods.item(0, 'model_family')
representative_method = representative_run_methods.item(0, 'method_name')
representative_runner_up = representative_run_methods.filter(
pl.col('model_family') == representative_model_family
).item(1, 'method_name')
representative_run_summary = run_summary.filter(pl.col('run_id') == representative_run_id).sort(
['model_family', 'method_name', 'eval_slice']
)
representative_trade_journal = trade_day_metrics.filter(
(pl.col('run_id') == representative_run_id)
& (pl.col('model_family') == representative_model_family)
& (pl.col('method_name').is_in([representative_method, representative_runner_up]))
).sort(['method_name', 'day_idx']).with_columns(
pl.col('gross_pnl_bps').cum_sum().over('method_name').alias('cumulative_gross_pnl_bps')
)
equity_curve_reference = trade_day_metrics.filter(
(pl.col('run_id') == representative_run_id)
& (pl.col('model_family') == representative_model_family)
& (pl.col('method_name').is_in(TARGET_COLS))
).sort(
['method_name', 'day_idx']
).with_columns(
pl.col('gross_pnl_bps').cum_sum().over('method_name').alias('cumulative_gross_pnl_bps')
)
risk_return_reference = aggregate_summary.filter(
(pl.col('model_family') != 'benchmark') & (pl.col('eval_slice') == 'all')
).select([
'model_family',
'method_name',
'daily_sharpe_mean',
'mean_daily_gross_pnl_bps_mean',
'trade_rate_mean',
]
)
method_labels = {
'target_fixed_rate_h60': 'Fixed 60m return',
'target_path_mean_rate_h60': 'Path mean return',
'target_profit_factor_h60': 'Profit factor',
'target_trend_slope': 'Trend slope',
'target_trend_tstat': 'Trend t-stat',
'target_barrier_rate': 'Barrier reward',
'target_competing_risk_hit': 'Competing-risk hit',
'target_mae_penalized_rate_h60': 'MAE-penalized return',
'target_downside_adj_rate_h60': 'Downside-adjusted return',
'target_tradeability_score': 'Tradeability score',
'oracle': 'Oracle',
'perfect': 'Perfect',
}
method_order = [method_labels[target_col] for target_col in TARGET_COLS]
method_palette = {
'Fixed 60m return': '#4c78a8',
'Path mean return': '#2a9d8f',
'Profit factor': '#f4a261',
'Trend slope': '#6c5ce7',
'Trend t-stat': '#b56576',
'Barrier reward': '#e45756',
'Competing-risk hit': '#9c755f',
'MAE-penalized return': '#54a24b',
'Downside-adjusted return': '#eeca3b',
'Tradeability score': '#ff9da6',
'Oracle': '#3a86ff',
'Perfect': '#1d3557',
}
model_labels = {'ridge': 'Ridge', 'hist_gbrt': 'HistGBRT', 'benchmark': 'benchmark'}
model_palette = {'Ridge': '#355070', 'HistGBRT': '#c44536'}
phase_labels = {'base': 'Base', 'shifted': 'Shifted'}
phase_palette = {'Base': '#4c78a8', 'Shifted': '#e45756'}
scenario_kind_labels = {'stable': 'Stable', 'shifted': 'Shifted'}
scenario_name_labels = {scenario['scenario_name']: scenario['scenario_name'].replace('_', ' ') for scenario in
SCENARIOS}
def relabel_frame(frame: pl.DataFrame) -> pl.DataFrame:
    # Add display-label columns for whichever key columns the frame carries.
    updates = []
    if 'method_name' in frame.columns:
        updates.append(pl.col('method_name').replace(method_labels).alias('method_label'))
    if 'model_family' in frame.columns:
        updates.append(pl.col('model_family').replace(model_labels).alias('model_label'))
    if 'shift_phase' in frame.columns:
        updates.append(pl.col('shift_phase').replace(phase_labels).alias('phase_label'))
    if 'scenario_kind' in frame.columns:
        updates.append(pl.col('scenario_kind').replace(scenario_kind_labels).alias('scenario_kind_label'))
    if 'scenario_name' in frame.columns:
        updates.append(pl.col('scenario_name').replace(scenario_name_labels).alias('scenario_label'))
    return frame.with_columns(updates) if updates else frame
sample_target_measure_pairs = [('Realized 60m return (bps)', 'realized_return_h60_bps')] + [
(method_labels[target_col], target_col) for target_col in TARGET_COLS
]
sample_target_long = pl.concat([
sample_target_reference.select([
'day_idx',
'shift_phase',
pl.lit(measure_label).alias('measure_label'),
pl.col(column).alias('value'),
]
)
for measure_label, column in sample_target_measure_pairs
], how='vertical'
).with_columns(
pl.col('shift_phase').replace(phase_labels).alias('phase_label')
)
equity_curve_plot_data = relabel_frame(equity_curve_reference)
trade_journal_plot_data = relabel_frame(representative_trade_journal)
risk_return_plot_data = relabel_frame(risk_return_reference)
minute_path_plot_data = relabel_frame(reference_minute_sample).with_columns([
((pl.col('ts').dt.hour().cast(pl.Int32) * 60 + pl.col('ts').dt.minute().cast(pl.Int32)) - (9 * 60 + 30)).alias(
'minute_from_open'
),
((pl.col('close').log() - pl.col('close').first().over('sample_id').log()) * 10_000.0).alias('path_bps'),
]
).sort(['sample_id', 'ts'])
assert int(minute_path_plot_data['minute_from_open'].min()) == 0
assert int(minute_path_plot_data['minute_from_open'].max()) == CONFIG['session_minutes'] - 1
sharpe_plot_data = relabel_frame(primary_sharpe_table)
pnl_plot_data = relabel_frame(primary_pnl_table)
stability_plot_data = relabel_frame(run_stability_table).with_columns(
((pl.col('win_rate') * 100).round(1).cast(pl.String) + pl.lit('%')).alias('win_rate_label')
)
shift_plot_data = relabel_frame(shift_performance_table).with_columns(
pl.col('eval_slice').replace({'base': 'Base', 'shifted': 'Shifted'}).alias('eval_label')
)
oracle_plot_data = pl.concat([
oracle_kind_summary.select([
pl.col('scenario_kind').replace(scenario_kind_labels).alias('scenario_kind_label'),
pl.lit('Oracle daily Sharpe').alias('metric_label'),
pl.col('oracle_daily_sharpe').alias('value'),
]
),
oracle_kind_summary.select([
pl.col('scenario_kind').replace(scenario_kind_labels).alias('scenario_kind_label'),
pl.lit('Oracle mean daily PnL (bps)').alias('metric_label'),
pl.col('oracle_mean_daily_gross_pnl_bps').alias('value'),
]
),
], how='vertical'
)
diagnostic_plot_data = relabel_frame(
prediction_diagnostic_summary.join(
aggregate_summary.filter(pl.col('eval_slice') == 'all').select([
'model_family',
'method_name',
'daily_sharpe_mean',
]
),
on=['model_family', 'method_name'],
how='left',
)
)
diagnostic_rank_plot_data = (
diagnostic_plot_data
.filter((pl.col('model_family') != 'benchmark') & pl.col('daily_sharpe_mean').is_finite())
.with_columns([
pl.col('mean_prediction_ic').rank('ordinal', descending=True).over('model_family').alias('prediction_ic_rank'),
pl.col('daily_sharpe_mean').rank('ordinal', descending=True).over('model_family').alias('trading_sharpe_rank'),
])
)
paired_plot_data = relabel_frame(paired_comparison_table).with_columns(
pl.col('challenger_method').replace(method_labels).alias('challenger_label')
)
seed_budget_rows = []
seed_subset = model_run_summary.filter(pl.col('eval_slice') == 'all')
for model_family in MODEL_FAMILIES:
    family_part = seed_subset.filter(pl.col('model_family') == model_family)
    for budget in SEED_BUDGETS:
        # Re-aggregate using only the first `budget` seeds.
        budget_part = family_part.filter(pl.col('seed').is_in(SEEDS[:budget]))
        budget_summary = budget_part.group_by('method_name').agg([
            pl.col('daily_sharpe').filter(pl.col('daily_sharpe').is_finite()).mean().alias('daily_sharpe_mean'),
            pl.col('mean_daily_gross_pnl_bps').filter(pl.col('mean_daily_gross_pnl_bps').is_finite()).mean().alias(
                'mean_daily_gross_pnl_bps_mean'
            ),
            pl.col('daily_sharpe').is_finite().sum().alias('finite_sharpe_runs'),
        ])
        for row in budget_summary.iter_rows(named=True):
            seed_budget_rows.append({
                'model_family': model_family,
                'method_name': row['method_name'],
                'seed_budget': budget,
                'daily_sharpe_mean': row['daily_sharpe_mean'],
                'mean_daily_gross_pnl_bps_mean': row['mean_daily_gross_pnl_bps_mean'],
                'finite_sharpe_runs': row['finite_sharpe_runs'],
            })
seed_budget_curve_table = relabel_frame(pl.DataFrame(seed_budget_rows))
run_tail_plot_data = relabel_frame(run_tail_table)
degenerate_strategy_plot_data = relabel_frame(degenerate_strategy_table)
winner_summary_rows = []
for row in model_family_decision_table.to_dicts():
    winner_summary_rows.append({
        'model_label': model_labels[row['model_family']],
        'best_daily_sharpe_mean': row['best_daily_sharpe_mean'],
        'runner_up_daily_sharpe_mean': row['runner_up_daily_sharpe_mean'],
        'best_label': f"Best: {method_labels[row['best_method']]}",
        'runner_label': f"Runner-up: {method_labels[row['runner_up_method']]}",
    })
winner_summary_frame = pl.DataFrame(winner_summary_rows)
The decision rests on five checks: the primary Sharpe ranking, the size of the leadership gap, shifted-regime behavior, trading style, and cost robustness. Secondary diagnostics are in the appendix.
Trend t-stat is the mean-Sharpe leader for both model families, but the uncertainty bands and small gaps argue for a shortlist rather than a single-target decision.
(
ggplot(sharpe_plot_data.with_columns(pl.col('method_label').is_in(SHORTLIST_LABELS).alias('shortlist_flag')),
aes(x='method_label', y='daily_sharpe_mean', color='model_label', alpha='shortlist_flag')
)
+ geom_hline(yintercept=0.0,
linetype='dashed',
color='#b0b7c3'
)
+ geom_segment(aes(x='method_label', xend='method_label', y='daily_sharpe_ci_low', yend='daily_sharpe_ci_high',
color='model_label'
),
size=1.5,
alpha=0.65,
)
+ geom_point(size=3.2)
+ facet_grid(y='model_label')
+ facet_separator_theme
+ coord_flip()
+ scale_x_discrete(limits=method_order)
+ scale_color_manual(values=model_palette)
+ scale_alpha_manual(values={True: 1.0, False: 0.22}, guide='none')
+ labs(title='Primary ranking: average daily Sharpe',
x='',
y='Average daily Sharpe with uncertainty band',
color='Model family',
)
+ plot_size(980, 420)
)
The key point is not that one target dominates; it is that Trend t-stat leads while several alternatives remain inside a practical tie band.
The lead over MAE-penalized return is small in both model families. The gap guides the decision path, but it does not settle the final target selection.
(
ggplot(winner_summary_frame)
+ geom_segment(aes(x='runner_up_daily_sharpe_mean',
xend='best_daily_sharpe_mean', y='model_label',
yend='model_label'
),
color='#8d99ae',
size=4,
alpha=0.8,
)
+ geom_point(aes(x='runner_up_daily_sharpe_mean', y='model_label'), color='#b56576',
size=4
)
+ geom_point(aes(x='best_daily_sharpe_mean',
y='model_label'
),
color='#2a9d8f', size=4
)
+ labs(title='Gap between the best and second-best target',
x='Mean daily Sharpe',
y='',
)
+ plot_size(980, 220)
)
MAE-penalized return is the runner-up, and its small gap behind first place is why it is carried into downstream experiments despite not being the mean-Sharpe leader.
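The leader-versus-runner-up gap is judged with a paired bootstrap over runs. The notebook's `paired_bootstrap_diff` helper is defined outside this section, so the following is only a minimal sketch of the general technique, written in plain Python. The function name `paired_bootstrap_sketch`, the 95% percentile interval, and the toy Sharpe values are assumptions for illustration, not the notebook's implementation or results.

```python
import random
from statistics import fmean


def paired_bootstrap_sketch(best, runner_up, reps=2000, seed=17):
    """Hypothetical stand-in; the notebook's paired_bootstrap_diff may differ."""
    # Pair by run first, so shared run-level noise cancels in the difference.
    diffs = [b - r for b, r in zip(best, runner_up)]
    if not diffs:
        nan = float('nan')
        return nan, nan, nan, nan
    rng = random.Random(seed)
    # Each replicate resamples runs with replacement and records the mean gap.
    boot_means = sorted(fmean(rng.choices(diffs, k=len(diffs))) for _ in range(reps))
    # Approximate 95% percentile interval by index into the sorted replicates.
    ci_low = boot_means[int(0.025 * (reps - 1))]
    ci_high = boot_means[int(0.975 * (reps - 1))]
    # Share of replicates in which the leader stays ahead of the runner-up.
    prob_ahead = sum(m > 0.0 for m in boot_means) / reps
    return fmean(diffs), ci_low, ci_high, prob_ahead


# Toy run-level Sharpe values, illustrative only (not notebook results).
toy_best = [0.12, 0.08, 0.15, 0.05, 0.11, 0.09]
toy_runner_up = [0.10, 0.09, 0.13, 0.06, 0.10, 0.07]
mean_diff, ci_low, ci_high, prob_ahead = paired_bootstrap_sketch(toy_best, toy_runner_up)
print(f'mean diff={mean_diff:.4f}, CI=({ci_low:.4f}, {ci_high:.4f}), P(ahead)={prob_ahead:.2f}')
```

An interval that straddles zero, or a probability well below 1, is the kind of evidence that argues for a shortlist rather than a single winner.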
The shifted regimes do not erase the edge, but they change which targets look resilient. We should focus on shortlist behavior, not all ten labels.
(
ggplot(shift_plot_data.filter(pl.col('method_label').is_in(SHORTLIST_LABELS)),
aes(x='eval_label', y='daily_sharpe_mean', group='method_label', color='method_label')
)
+ geom_hline(yintercept=0.0,
linetype='dashed', color='#b0b7c3'
)
+ geom_line(size=1.0, alpha=0.85)
+ geom_point(size=2.4
)
+ facet_grid(y='model_label')
+ facet_separator_theme
+ scale_color_manual(values=method_palette)
+ labs(title='Stable-to-shifted Sharpe sensitivity',
x='Evaluation slice',
y='Mean daily Sharpe',
color='Target',
)
+ plot_size(980, 420)
)
This is a stress check: targets that work only in stable slices should be demoted.
Targets change trading behavior as much as forecast accuracy. Up/right is better; bubble size shows participation. Tradeability score is the clearest low-turnover style outlier.
(
ggplot(
risk_return_plot_data.filter(pl.col('method_label').is_in(SHORTLIST_LABELS)),
aes(x='mean_daily_gross_pnl_bps_mean', y='daily_sharpe_mean', size='trade_rate_mean'),
)
+ geom_hline(yintercept=0.0, linetype='dashed', color='#b0b7c3')
+ geom_vline(xintercept=0.0, linetype='dashed',
color='#b0b7c3'
)
+ geom_point(aes(color='method_label'), alpha=0.82
)
+ geom_text(aes(label='method_label', color='method_label'),
size=7, nudge_y=0.00035, show_legend=False
)
+ facet_grid(y='model_label')
+ facet_separator_theme
+ scale_color_manual(values=method_palette)
+ labs(title='Trading outcome map: return, Sharpe, and trade rate',
subtitle='Up/right is better; larger bubbles trade more often',
x='Mean daily gross PnL (bps)',
y='Average daily Sharpe',
size='Mean trade rate',
color='Target',
)
+ plot_size(980, 600)
)
This chart explains the style trade-off: high participation can raise gross PnL but also increases cost and fill sensitivity.
Gross Sharpe is not enough. Lower-turnover alternatives become more competitive as round-trip costs rise, which is why MAE-penalized return remains on the shortlist.
COST_LEVELS_BPS = [0.0, 0.25, 0.50, 1.00, 2.00]
cost_rows = []
for cost_bps in COST_LEVELS_BPS:
    # Charge the round-trip cost only on days with a trade.
    cost_frame = trade_day_metrics.filter(pl.col('model_family') != 'benchmark').with_columns(
        (pl.col('gross_pnl_bps') - pl.lit(cost_bps) * pl.col('trade_flag').cast(pl.Float64)).alias('net_pnl_bps')
    )
    for keys, part in cost_frame.partition_by(
        ['run_id', 'scenario_name', 'scenario_kind', 'seed', 'model_family', 'method_name'], as_dict=True
    ).items():
        run_id, scenario_name, scenario_kind, seed, model_family, method_name = keys
        pnl = part['net_pnl_bps'].to_numpy()
        cost_rows.append({
            'round_trip_cost_bps': cost_bps,
            'run_id': run_id,
            'scenario_name': scenario_name,
            'scenario_kind': scenario_kind,
            'seed': seed,
            'model_family': model_family,
            'method_name': method_name,
            'net_daily_sharpe': sharpe_ratio(pnl),
            'mean_daily_net_pnl_bps': finite_mean(pnl),
            'trade_rate': float(part['trade_flag'].cast(pl.Float64).mean()),
        })
cost_sensitivity_run_table = pl.DataFrame(cost_rows)
cost_sensitivity_table = (
cost_sensitivity_run_table
.group_by(['round_trip_cost_bps', 'model_family', 'method_name'])
.agg([
pl.col('net_daily_sharpe').filter(pl.col('net_daily_sharpe').is_finite()).mean().alias('net_daily_sharpe_mean'),
pl.mean('mean_daily_net_pnl_bps').alias('mean_daily_net_pnl_bps_mean'),
pl.mean('trade_rate').alias('trade_rate_mean'),
(pl.col('net_daily_sharpe').filter(pl.col('net_daily_sharpe').is_finite()) > 0.0).mean().alias(
'positive_net_sharpe_run_rate'
),
]
)
.with_columns(
pl.col('net_daily_sharpe_mean').rank('ordinal', descending=True).over(
['round_trip_cost_bps', 'model_family']
).alias('cost_rank')
)
.sort(['model_family', 'round_trip_cost_bps', 'cost_rank'])
)
cost_winner_table = cost_sensitivity_table.filter(pl.col('cost_rank') <= 3)
cost_plot_data = relabel_frame(cost_sensitivity_table)
cost_leader_display = (
cost_sensitivity_table
.filter(pl.col('cost_rank') == 1)
.sort(['model_family', 'round_trip_cost_bps'])
)
relabel_frame(cost_leader_display).select([
pl.col('round_trip_cost_bps').alias('Cost bps'),
'model_label',
'method_label',
pl.col('net_daily_sharpe_mean').round(4).alias('net Sharpe'),
pl.col('mean_daily_net_pnl_bps_mean').round(3).alias('net PnL bps'),
pl.col('trade_rate_mean').round(3).alias('trade rate'),
])
At higher cost assumptions, lower-turnover targets become more competitive. This supports keeping MAE-penalized return as the main challenger.
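Because the cost is charged only on traded days, mean net PnL falls linearly in the trade rate: mean net = mean gross − cost × trade rate. The sketch below works through that arithmetic for two hypothetical targets. The numbers are invented for illustration and are not notebook results, and the identity holds for mean PnL only, not for net Sharpe, which the notebook ranks on.

```python
def mean_net_pnl_bps(mean_gross_pnl_bps, trade_rate, round_trip_cost_bps):
    # Averaging net = gross - cost * trade_flag over days gives this identity.
    return mean_gross_pnl_bps - round_trip_cost_bps * trade_rate


def crossover_cost_bps(high_turnover, low_turnover):
    # Cost at which the two mean-net-PnL lines intersect.
    # Assumes the two trade rates differ; equal rates never cross.
    gross_gap = high_turnover['gross'] - low_turnover['gross']
    rate_gap = high_turnover['rate'] - low_turnover['rate']
    return gross_gap / rate_gap


# Hypothetical targets: one trades often, one trades selectively.
toy_high_turnover = {'gross': 1.2, 'rate': 0.8}
toy_low_turnover = {'gross': 0.9, 'rate': 0.4}

for cost_bps in [0.0, 0.25, 0.50, 1.00, 2.00]:
    high_net = mean_net_pnl_bps(toy_high_turnover['gross'], toy_high_turnover['rate'], cost_bps)
    low_net = mean_net_pnl_bps(toy_low_turnover['gross'], toy_low_turnover['rate'], cost_bps)
    leader = 'high-turnover' if high_net > low_net else 'low-turnover'
    print(f'cost={cost_bps:.2f} bps: high={high_net:.2f}, low={low_net:.2f}, leader={leader}')

crossover = crossover_cost_bps(toy_high_turnover, toy_low_turnover)
print(f'crossover cost = {crossover:.2f} bps')
```

In this toy case the high-turnover target leads on gross PnL but loses the lead once the assumed round-trip cost passes the crossover level.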
The evidence supports a shortlist, not a permanent target. Trend t-stat is the best current default because it leads mean daily Sharpe for both model families. MAE-penalized return is the most important challenger because it is close on Sharpe, trades less, and explicitly penalizes adverse path pain. Barrier reward remains useful as an event-time alternative. Tradeability score is better interpreted as a conservative gating lens than as the primary alpha label.
Future research should test this shortlist under net costs, realistic execution assumptions, policy variants, and non-synthetic data.
These results should not be over-generalized.
Carry forward Trend t-stat, MAE-penalized return, and Barrier reward. Use Tradeability score as a conservative gating benchmark. Do not select a permanent target until the shortlist survives net costs, execution assumptions, policy sensitivity, shifted scenarios, and non-synthetic validation.
Includes technical validation, simulator diagnostics, representative examples, and exact target definitions
These figures and tables document that the synthetic test grid, walk-forward validation, and label checks behaved as intended.
ggplot(scenario_metric_frame,
aes(x='metric_label', y='scenario_label', fill='value')
) + geom_tile() + scale_fill_gradient2(low='#4c78a8',
mid='#f6f7f9',
high='#d1495b',
midpoint=1.0
) + labs(
title='How shifted regimes become harder than stable regimes',
x='',
y='Scenario',
fill='Value',
) + theme(axis_text_x=element_text(angle=35, hjust=1)) + plot_size(1100, 320)
signal_fidelity_frame = (
panel_summary.group_by(['scenario_name', 'scenario_kind']).agg([
pl.mean('feature_to_oracle_corr').alias('mean_corr'),
pl.min('feature_to_oracle_corr').alias('min_corr'),
pl.max('feature_to_oracle_corr').alias('max_corr'),
]
)
.with_columns(pl.col('scenario_name').str.replace_all('_', ' ').alias('scenario_label'))
.sort('mean_corr', descending=True)
)
ggplot(signal_fidelity_frame,
aes(x='scenario_label', y='mean_corr', color='scenario_kind')
) + geom_segment(
aes(x='scenario_label', xend='scenario_label', y='min_corr', yend='max_corr', color='scenario_kind'),
size=2.6,
alpha=0.45,
) + geom_point(size=4) + coord_flip() + scale_color_manual(values={'stable': '#2e8b57', 'shifted': '#b24c63'}) + labs(
title='How well the key observable feature tracks the hidden signal',
x='',
y='Feature-to-hidden-signal correlation',
color='Scenario kind',
) + plot_size(920, 260)
These checks are included for auditability.
fold_timeline_rows = []
for row in reference_fold_meta.iter_rows(named=True):
    fold_label = f"Fold {row['fold']}"
    # Segment boundaries in days from run start: train, then embargo, then validation.
    train_end = int(row['train_days'])
    embargo_end = train_end + int(row['business_day_gap'])
    val_end = embargo_end + int(row['val_days'])
    fold_timeline_rows.extend([
        {
            'fold_label': fold_label,
            'segment': 'Train',
            'start_idx': 0,
            'end_idx': train_end,
        },
        {
            'fold_label': fold_label,
            'segment': 'Embargo',
            'start_idx': train_end,
            'end_idx': embargo_end,
        },
        {
            'fold_label': fold_label,
            'segment': 'Validation',
            'start_idx': embargo_end,
            'end_idx': val_end,
        },
    ])
fold_timeline_frame = pl.DataFrame(fold_timeline_rows)
ggplot(fold_timeline_frame) + geom_segment(
aes(x='start_idx', xend='end_idx', y='fold_label', yend='fold_label', color='segment'),
size=10
) + scale_color_manual(values={'Train': '#355070', 'Embargo': '#d08c60', 'Validation': '#4c956c'}) + labs(
title='Walk-forward validation layout with 5-day buffer',
x='Relative day from run start',
y='',
color='Segment',
) + plot_size(980, 260)
These examples help build intuition for how the target labels and trade decisions behave in one run. They are not robustness evidence and can be excluded from the non-technical report.
(
ggplot(equity_curve_plot_data,
aes(x='day_idx', y='cumulative_gross_pnl_bps', color='method_label')
)
+ geom_vline(xintercept=representative_shift_start - 0.5, linetype='dashed', color='#5c677d')
+ geom_hline(yintercept=0.0,
linetype='dashed',
color='#b0b7c3'
)
+ geom_line(size=1.0, alpha=0.82)
+ scale_color_manual(values=method_palette)
+ labs(title='Representative run: cumulative gross PnL',
x='Synthetic day',
y='Cumulative gross PnL (bps)',
color='Target',
)
+ plot_size(980, 420)
)
(
ggplot(trade_journal_plot_data, aes(x='day_idx', y='gross_pnl_bps'))
+ geom_vline(xintercept=representative_shift_start - 0.5, linetype='dashed', color='#5c677d')
+ geom_hline(yintercept=0.0,
linetype='dashed',
color='#b0b7c3'
)
+ geom_point(aes(size='position_size', color='phase_label'), alpha=0.78)
+ facet_grid(y='method_label',
scales='free_y'
)
+ facet_separator_theme
+ scale_color_manual(values=phase_palette)
+ labs(title='Representative run: trade decisions and outcomes',
x='Synthetic day',
y='Gross PnL (bps)',
size='Position size',
color='Phase',
)
+ plot_size(980, 420)
)
These diagnostics help explain the result surface but should not be treated as the main decision rule.
These diagnostics are for analyst review.
(
ggplot(seed_budget_curve_table,
aes(x='seed_budget', y='daily_sharpe_mean', group='method_label', color='method_label')
)
+ geom_hline(yintercept=0.0, linetype='dashed', color='#b0b7c3')
+ geom_line(size=1.0, alpha=0.9)
+ geom_point(size=2.4)
+ facet_grid(y='model_label')
+ facet_separator_theme
+ scale_color_manual(values=method_palette)
+ labs(title='Seed-budget robustness of target rankings',
x='Seeds included',
y='Average daily Sharpe',
color='Target')
+ plot_size(980, 420)
)
This is a Monte Carlo stability check to support analyst confidence.
(
ggplot(stability_plot_data,
aes(x='model_label', y='method_label', fill='win_rate')
)
+ geom_tile()
+ scale_y_discrete(limits=method_order)
+ scale_fill_gradient(low='#edf2f7', high='#355070')
+ labs(title='Winner stability across runs',
x='',
y='Training target',
fill='Win share',
)
+ plot_size(760, 340)
)
Run-level win share is useful as a robustness check, but low win share alone does not disqualify a target with lower turnover or drawdown.
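To see why win share and mean Sharpe can disagree, consider a toy calculation. The definition of `win_rate` is not shown in this section, so the sketch assumes it is the share of runs in which a target has the highest daily Sharpe. The target names and values are hypothetical.

```python
from collections import Counter

# Toy run-level Sharpe by target (illustrative only, not notebook results).
toy_runs = {
    'run_1': {'target_a': 0.10, 'target_b': 0.08, 'target_c': 0.02},
    'run_2': {'target_a': 0.05, 'target_b': 0.07, 'target_c': 0.01},
    'run_3': {'target_a': 0.09, 'target_b': 0.06, 'target_c': 0.03},
    'run_4': {'target_a': 0.04, 'target_b': 0.06, 'target_c': 0.05},
}
target_names = list(toy_runs['run_1'])

# Count the top-Sharpe target in each run, then convert counts to shares.
wins = Counter(max(scores, key=scores.get) for scores in toy_runs.values())
win_share = {name: wins.get(name, 0) / len(toy_runs) for name in target_names}

# Mean Sharpe across runs, for comparison with win share.
mean_sharpe = {
    name: sum(scores[name] for scores in toy_runs.values()) / len(toy_runs)
    for name in target_names
}
print(win_share)
print(mean_sharpe)
```

Here `target_c` never wins a run yet stays positive in every run, which is the kind of profile a win-share heatmap alone would hide.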
(
ggplot(pnl_plot_data,
aes(x='method_label', y='mean_daily_gross_pnl_bps_mean', color='model_label')
)
+ geom_hline(yintercept=0.0,
linetype='dashed',
color='#b0b7c3'
)
+ geom_segment(aes(x='method_label',
xend='method_label',
y='mean_daily_gross_pnl_bps_ci_low',
yend='mean_daily_gross_pnl_bps_ci_high',
color='model_label',
),
size=1.5,
alpha=0.65
)
+ geom_point(size=3.2)
+ facet_grid(y='model_label')
+ facet_separator_theme
+ coord_flip()
+ scale_x_discrete(limits=method_order)
+ scale_color_manual(values=model_palette)
+ labs(title='Secondary ranking: mean daily gross PnL',
x='',
y='Mean daily gross PnL (bps) with uncertainty band',
color='Model family'
)
+ plot_size(980, 420)
)
Mean PnL confirms economic direction but should not override weak Sharpe separation.
PRACTICAL_SHARPE_TIE = 0.005
PRACTICAL_PNL_TIE_BPS = 0.25
run_level_extra = (
model_run_summary
.filter(pl.col('eval_slice') == 'all')
.group_by(['model_family', 'method_name'])
.agg([
(pl.col('daily_sharpe') > 0.0).mean().alias('positive_sharpe_run_rate'),
(pl.col('mean_daily_gross_pnl_bps') > 0.0).mean().alias('positive_pnl_run_rate'),
pl.mean('max_drawdown_bps').alias('max_drawdown_bps_mean'),
pl.median('max_drawdown_bps').alias('max_drawdown_bps_median'),
pl.mean('trade_hit_rate').alias('trade_hit_rate_mean'),
pl.mean('avg_trade_size').alias('avg_trade_size_mean'),
]
)
)
base_shift_summary = (
aggregate_summary
.filter(pl.col('eval_slice').is_in(['base', 'shifted']))
.select([
'model_family',
'method_name',
'eval_slice',
'daily_sharpe_mean',
'mean_daily_gross_pnl_bps_mean',
]
)
.pivot(
values=['daily_sharpe_mean', 'mean_daily_gross_pnl_bps_mean'],
index=['model_family', 'method_name'],
on='eval_slice',
)
.rename({
'daily_sharpe_mean_base': 'base_daily_sharpe_mean',
'daily_sharpe_mean_shifted': 'shifted_daily_sharpe_mean',
'mean_daily_gross_pnl_bps_mean_base': 'base_mean_daily_gross_pnl_bps',
'mean_daily_gross_pnl_bps_mean_shifted': 'shifted_mean_daily_gross_pnl_bps',
}
)
.with_columns([
(pl.col('shifted_daily_sharpe_mean') - pl.col('base_daily_sharpe_mean')).alias('shift_minus_base_sharpe'),
(pl.col('shifted_mean_daily_gross_pnl_bps') - pl.col('base_mean_daily_gross_pnl_bps')).alias(
'shift_minus_base_pnl_bps'
),
]
)
)
robust_ranking_table = (
aggregate_summary
.filter((pl.col('eval_slice') == 'all') & pl.col('daily_sharpe_mean').is_finite())
.select([
'model_family',
'method_name',
'daily_sharpe_mean',
'daily_sharpe_ci_low',
'daily_sharpe_ci_high',
'mean_daily_gross_pnl_bps_mean',
'trade_rate_mean',
'mean_rank',
'win_rate',
]
)
.join(run_level_extra, on=['model_family', 'method_name'], how='left')
.join(base_shift_summary, on=['model_family', 'method_name'], how='left')
.with_columns([
pl.col('daily_sharpe_mean').max().over('model_family').alias('family_best_daily_sharpe_mean'),
pl.col('mean_daily_gross_pnl_bps_mean').max().over('model_family').alias('family_best_pnl_mean'),
]
)
.with_columns([
(pl.col('family_best_daily_sharpe_mean') - pl.col('daily_sharpe_mean')).alias('sharpe_gap_to_family_best'),
(pl.col('family_best_pnl_mean') - pl.col('mean_daily_gross_pnl_bps_mean')).alias('pnl_gap_to_family_best_bps'),
]
)
.with_columns([
(pl.col('sharpe_gap_to_family_best') <= PRACTICAL_SHARPE_TIE).alias('inside_practical_sharpe_tie'),
(pl.col('pnl_gap_to_family_best_bps') <= PRACTICAL_PNL_TIE_BPS).alias('inside_practical_pnl_tie'),
pl.col('daily_sharpe_mean').rank('ordinal', descending=True).over('model_family').alias('mean_sharpe_rank'),
pl.col('shifted_daily_sharpe_mean').rank('ordinal', descending=True).over('model_family').alias(
'shifted_sharpe_rank'
),
pl.col('positive_sharpe_run_rate').rank('ordinal', descending=True).over('model_family').alias(
'positive_run_rank'
),
pl.col('max_drawdown_bps_mean').rank('ordinal').over('model_family').alias('drawdown_rank'),
]
)
.with_columns(
(
pl.col('mean_sharpe_rank')
+ pl.col('shifted_sharpe_rank')
+ pl.col('positive_run_rank')
+ pl.col('drawdown_rank')
).alias('robust_rank_score')
)
.sort(['model_family', 'robust_rank_score', 'daily_sharpe_mean'], descending=[False, False, True])
)
robust_shortlist_table = (
robust_ranking_table
.filter(
pl.col('inside_practical_sharpe_tie')
| (pl.col('shifted_sharpe_rank') <= 2)
| (pl.col('positive_run_rank') <= 2)
| (pl.col('drawdown_rank') <= 2)
)
.sort(['model_family', 'robust_rank_score', 'daily_sharpe_mean'], descending=[False, False, True])
)
robust_ranking_plot_data = relabel_frame(robust_ranking_table)
robust_shortlist_display = relabel_frame(robust_shortlist_table)
robust_shortlist_display.select([
'model_label',
'method_label',
pl.col('daily_sharpe_mean').round(4).alias('mean Sharpe'),
pl.col('shifted_daily_sharpe_mean').round(4).alias('shifted Sharpe'),
pl.col('mean_daily_gross_pnl_bps_mean').round(3).alias('mean PnL bps'),
pl.col('trade_rate_mean').round(3).alias('trade rate'),
pl.col('positive_sharpe_run_rate').round(3).alias('positive Sharpe run rate'),
pl.col('max_drawdown_bps_mean').round(1).alias('mean max drawdown bps'),
'inside_practical_sharpe_tie',
'robust_rank_score',
]
)
Interpret robust_rank_score as a heuristic screen, not as a formal utility function. The score uses equal-weight ordinal ranks across mean Sharpe, shifted Sharpe, positive-run rate, and drawdown.
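The equal-weight rank sum can be illustrated on three hypothetical targets. This is a plain-Python sketch of the idea rather than the notebook's polars pipeline. The target names and metric values are invented, and it assumes drawdown is stored as a positive magnitude so that a lower value ranks better.

```python
def ordinal_rank(values, descending):
    # Rank 1 is best; ties are broken by input order.
    order = sorted(range(len(values)), key=lambda i: values[i], reverse=descending)
    ranks = [0] * len(values)
    for rank, idx in enumerate(order, start=1):
        ranks[idx] = rank
    return ranks


# Hypothetical targets and metrics (not notebook results).
names = ['target_a', 'target_b', 'target_c']
mean_sharpe = [0.060, 0.058, 0.040]
shifted_sharpe = [0.030, 0.045, 0.020]
positive_run_rate = [0.70, 0.75, 0.60]
max_drawdown_bps = [120.0, 90.0, 150.0]  # assumed positive magnitude, lower is better

rank_columns = [
    ordinal_rank(mean_sharpe, descending=True),
    ordinal_rank(shifted_sharpe, descending=True),
    ordinal_rank(positive_run_rate, descending=True),
    ordinal_rank(max_drawdown_bps, descending=False),
]
# Equal-weight sum of the four ordinal ranks; a lower score is more robust.
robust_rank_score = {
    name: sum(column[i] for column in rank_columns) for i, name in enumerate(names)
}
print(robust_rank_score)
```

In this toy case `target_b` gets the lowest score even though `target_a` leads on mean Sharpe. Because ordinal ranks discard the size of each gap, a near-tie and a wide margin count the same, which is one reason to read the score as a screen only.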
def summarize_policy_frame(policy_frame: pl.DataFrame, policy_name: str) -> list[dict[str, object]]:
    # One summary row per (run, model family, target) under the given sizing policy.
    rows = []
    for keys, part in policy_frame.partition_by(
        ['run_id', 'scenario_name', 'scenario_kind', 'seed', 'model_family', 'method_name'], as_dict=True
    ).items():
        run_id, scenario_name, scenario_kind, seed, model_family, method_name = keys
        pnl = part['policy_pnl_bps'].to_numpy()
        size = part['policy_position_size'].to_numpy()
        trade_flag = size > 0.0
        rows.append({
            'policy_name': policy_name,
            'run_id': run_id,
            'scenario_name': scenario_name,
            'scenario_kind': scenario_kind,
            'seed': seed,
            'model_family': model_family,
            'method_name': method_name,
            'daily_sharpe': sharpe_ratio(pnl),
            'mean_daily_pnl_bps': finite_mean(pnl),
            'trade_rate': float(np.mean(trade_flag)),
            'avg_trade_size': float(np.mean(size[trade_flag])) if np.any(trade_flag) else 0.0,
            'trade_hit_rate': float(np.mean(pnl[trade_flag] > 0.0)) if np.any(trade_flag) else float('nan'),
        })
    return rows
policy_base = trade_day_metrics.filter(pl.col('model_family') != 'benchmark')
current_policy_frame = policy_base.select([
'run_id', 'scenario_name', 'scenario_kind', 'seed', 'model_family', 'method_name',
pl.col('position_size').alias('policy_position_size'),
pl.col('gross_pnl_bps').alias('policy_pnl_bps'),
]
)
binary_policy_frame = policy_base.with_columns(
pl.when(pl.col('prediction') > 0.0).then(1.0).otherwise(0.0).alias('policy_position_size')
).with_columns(
(pl.col('policy_position_size') * pl.col('realized_return_h60_bps')).alias('policy_pnl_bps')
).select([
'run_id', 'scenario_name', 'scenario_kind', 'seed', 'model_family', 'method_name',
'policy_position_size', 'policy_pnl_bps',
]
)
positive_prediction_cutoffs = (
policy_base
.filter(pl.col('prediction') > 0.0)
.group_by(['run_id', 'model_family', 'method_name'])
.agg(pl.col('prediction').quantile(0.50).alias('positive_prediction_median'))
)
selective_policy_frame = policy_base.join(
positive_prediction_cutoffs,
on=['run_id', 'model_family', 'method_name'],
how='left',
).with_columns(
pl.when((pl.col('prediction') > 0.0) & (pl.col('prediction') >= pl.col('positive_prediction_median')))
.then(1.0)
.otherwise(0.0)
.alias('policy_position_size')
).with_columns(
(pl.col('policy_position_size') * pl.col('realized_return_h60_bps')).alias('policy_pnl_bps')
).select([
'run_id', 'scenario_name', 'scenario_kind', 'seed', 'model_family', 'method_name',
'policy_position_size', 'policy_pnl_bps',
]
)
policy_sensitivity_run_table = pl.DataFrame(
summarize_policy_frame(current_policy_frame, 'current scaled positive-score')
+ summarize_policy_frame(binary_policy_frame, 'binary positive-score')
+ summarize_policy_frame(selective_policy_frame, 'top-half positive-score')
)
policy_sensitivity_table = (
policy_sensitivity_run_table
.group_by(['policy_name', 'model_family', 'method_name'])
.agg([
pl.col('daily_sharpe').filter(pl.col('daily_sharpe').is_finite()).mean().alias('daily_sharpe_mean'),
pl.mean('mean_daily_pnl_bps').alias('mean_daily_pnl_bps_mean'),
pl.mean('trade_rate').alias('trade_rate_mean'),
(pl.col('daily_sharpe').filter(pl.col('daily_sharpe').is_finite()) > 0.0).mean().alias(
'positive_sharpe_run_rate'
),
]
)
.with_columns(
pl.col('daily_sharpe_mean').rank('ordinal', descending=True).over(['policy_name', 'model_family']).alias(
'policy_rank'
)
)
.sort(['model_family', 'policy_name', 'policy_rank'])
)
policy_winner_table = policy_sensitivity_table.filter(pl.col('policy_rank') <= 3)
policy_plot_data = relabel_frame(policy_sensitivity_table)
relabel_frame(policy_winner_table).select([
'policy_name',
'model_label',
'method_label',
pl.col('daily_sharpe_mean').round(4).alias('mean Sharpe'),
pl.col('mean_daily_pnl_bps_mean').round(3).alias('mean PnL bps'),
pl.col('trade_rate_mean').round(3).alias('trade rate'),
pl.col('positive_sharpe_run_rate').round(3).alias('positive run rate'),
'policy_rank',
]
)
Policy changes can alter which target is most attractive. This is a required promotion check for the shortlist.
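To make the policy comparison concrete, the binary and top-half rules can be sketched in a few lines of NumPy. This is a minimal illustration, not the notebook's implementation: the function names and the toy arrays are hypothetical, the current scaled rule is left out because its scaling is not shown in this section, and `np.median` interpolates linearly, which may differ slightly from the notebook's `quantile(0.50)` cutoff.

```python
import numpy as np

def binary_positions(pred):
    """Full size on every positive score, flat otherwise."""
    return (np.asarray(pred, dtype=float) > 0.0).astype(float)

def top_half_positions(pred):
    """Full size only on positive scores at or above the median positive score."""
    pred = np.asarray(pred, dtype=float)
    positive = pred[pred > 0.0]
    if positive.size == 0:
        # No positive scores: the rule never trades.
        return np.zeros_like(pred)
    cutoff = np.median(positive)  # assumption: linearly interpolated median
    return ((pred > 0.0) & (pred >= cutoff)).astype(float)

# Hypothetical scores and realized 60-minute returns in bps (illustrative only).
pred = np.array([-0.5, 0.1, 0.2, 0.4, 0.8])
realized_bps = np.array([3.0, -2.0, 1.0, 4.0, 6.0])

binary_pnl = binary_positions(pred) * realized_bps
top_half_pnl = top_half_positions(pred) * realized_bps
```

In this toy case the top-half rule skips the two weakest positive scores, so it trades less often and avoids the one losing trade that the binary rule takes. Whether that trade-off helps on real data is exactly what the policy sensitivity table is meant to check.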
scenario_robustness_table = (
model_run_summary
.filter((pl.col('eval_slice') == 'all') & (pl.col('model_family') != 'benchmark'))
.group_by(['model_family', 'method_name', 'scenario_name', 'scenario_kind'])
.agg([
pl.mean('daily_sharpe').alias('daily_sharpe_mean'),
pl.mean('mean_daily_gross_pnl_bps').alias('mean_daily_gross_pnl_bps_mean'),
(pl.col('daily_sharpe') > 0.0).mean().alias('positive_sharpe_run_rate'),
]
)
.sort(['model_family', 'method_name', 'scenario_name'])
)
scenario_worst_case_table = (
scenario_robustness_table
.group_by(['model_family', 'method_name'])
.agg([
pl.min('daily_sharpe_mean').alias('worst_scenario_daily_sharpe_mean'),
pl.mean('daily_sharpe_mean').alias('mean_scenario_daily_sharpe_mean'),
pl.std('daily_sharpe_mean').alias('scenario_daily_sharpe_std'),
]
)
.with_columns(
pl.when(pl.col('worst_scenario_daily_sharpe_mean').is_finite())
.then(pl.col('worst_scenario_daily_sharpe_mean').rank('ordinal', descending=True).over('model_family'))
.otherwise(None)
.alias('worst_case_rank')
)
.sort(['model_family', 'worst_case_rank'])
)
def eta_squared(frame: pl.DataFrame, group_col: str, metric_col: str) -> float:
subset = (
frame
.select([group_col, metric_col])
.filter(pl.col(metric_col).is_finite())
.drop_nulls()
)
if subset.is_empty():
return float('nan')
values = subset[metric_col].to_numpy()
grand_mean = float(np.mean(values))
total = float(np.dot(values - grand_mean, values - grand_mean))
if total <= 0.0:
return float('nan')
group_summary = subset.group_by(group_col).agg([
pl.len().alias('n'),
pl.mean(metric_col).alias('group_mean'),
]
)
between = float(np.sum(
group_summary['n'].to_numpy()
* (group_summary['group_mean'].to_numpy() - grand_mean) ** 2
)
)
return between / total
variance_rows = []
variance_frame = model_run_summary.filter((pl.col('eval_slice') == 'all') & (pl.col('model_family') != 'benchmark'))
for model_family in MODEL_FAMILIES:
part = variance_frame.filter(pl.col('model_family') == model_family)
for factor_col, factor_label in [
('method_name', 'Target'),
('scenario_name', 'Scenario'),
('seed', 'Seed'),
]:
variance_rows.append({
'model_family': model_family,
'factor': factor_label,
'eta_squared_daily_sharpe': eta_squared(part, factor_col, 'daily_sharpe'),
}
)
variance_decomposition_table = pl.DataFrame(variance_rows).sort(['model_family', 'eta_squared_daily_sharpe'],
descending=[False, True]
)
scenario_plot_data = relabel_frame(scenario_robustness_table)
scenario_worst_case_display = relabel_frame(scenario_worst_case_table)
variance_plot_data = relabel_frame(variance_decomposition_table)
scenario_worst_case_display.filter(
pl.col('worst_scenario_daily_sharpe_mean').is_finite()
& pl.col('mean_scenario_daily_sharpe_mean').is_finite()
& pl.col('scenario_daily_sharpe_std').is_finite()
).select([
'model_label',
'method_label',
pl.col('worst_scenario_daily_sharpe_mean').round(4).alias('worst scenario Sharpe'),
pl.col('mean_scenario_daily_sharpe_mean').round(4).alias('mean scenario Sharpe'),
pl.col('scenario_daily_sharpe_std').round(4).alias('scenario Sharpe std'),
'worst_case_rank',
]
)
(
ggplot(
variance_plot_data.filter(pl.col('eta_squared_daily_sharpe').is_finite()),
aes(x='factor', y='eta_squared_daily_sharpe', fill='factor'),
)
+ geom_bar(stat='identity', width=0.72)
+ coord_flip()
+ facet_wrap('model_label', ncol=1)
+ facet_separator_theme
+ labs(title='What Drives Run-to-Run Sharpe Variation?',
subtitle='One-way eta-squared by model family; larger bars explain more Sharpe dispersion',
x='Factor',
y='Share of Sharpe variance',
fill='Factor',
)
+ scale_fill_manual(values={'Target': '#54a24b',
'Scenario': '#e45756',
'Seed': '#4c78a8',
})
+ plot_size(420, 280)
)
Rows with undefined scenario Sharpe are excluded from ranking interpretation. The variance decomposition is a secondary diagnostic and should not override the direct shifted-regime read.
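A small worked example may help in reading the eta-squared bars. The sketch below restates the one-way ratio (between-group sum of squares over total sum of squares) in plain NumPy. `eta_squared_np` is a hypothetical stand-in for the notebook's Polars-based `eta_squared`, and the four values are made up.

```python
import numpy as np

def eta_squared_np(groups, values):
    """One-way eta-squared: between-group SS divided by total SS."""
    groups = np.asarray(groups)
    values = np.asarray(values, dtype=float)
    keep = np.isfinite(values)  # drop NaN/inf rows, as the notebook helper does
    groups, values = groups[keep], values[keep]
    if values.size == 0:
        return float("nan")
    grand_mean = values.mean()
    total = float(np.sum((values - grand_mean) ** 2))
    if total <= 0.0:
        return float("nan")  # no dispersion to explain
    between = 0.0
    for group in np.unique(groups):
        member = values[groups == group]
        between += member.size * (member.mean() - grand_mean) ** 2
    return float(between / total)

# Toy data: group means are 2 and 6, the grand mean is 4.
# Total SS = 9 + 1 + 1 + 9 = 20; between SS = 2*4 + 2*4 = 16.
share = eta_squared_np(["a", "a", "b", "b"], [1.0, 3.0, 5.0, 7.0])
```

Here the grouping explains 16/20 = 0.8 of the dispersion. Because each factor is assessed on its own, the shares for Target, Scenario, and Seed describe separate one-way splits rather than a joint decomposition.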
target_corr_rows = []
for left, right in product(TARGET_COLS, TARGET_COLS):
left_values = reference_frame[left].to_numpy()
right_values = reference_frame[right].to_numpy()
target_corr_rows.append({
'left_method': left,
'right_method': right,
'spearman_corr': rank_ic(left_values, right_values),
}
)
target_correlation_table = pl.DataFrame(target_corr_rows).with_columns([
pl.col('left_method').replace(method_labels).alias('left_label'),
pl.col('right_method').replace(method_labels).alias('right_label'),
]
)
target_redundancy_table = (
target_correlation_table
.filter(pl.col('left_method') < pl.col('right_method'))
.with_columns(pl.col('spearman_corr').abs().alias('abs_spearman_corr'))
.sort('abs_spearman_corr', descending=True)
)
target_corr_plot_data = target_correlation_table
The high correlations are why the main focus is on target families rather than on the individual targets.
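The redundancy argument rests on rank correlation, so a short sketch of how such a statistic behaves may be useful. The helper below is a hypothetical stand-in for the notebook's `rank_ic`, whose definition is not shown in this section. It assumes average ranks for ties and a Pearson correlation of the ranks.

```python
import numpy as np

def average_ranks(x):
    """Rank values from 1..n, giving tied values their average rank."""
    x = np.asarray(x, dtype=float)
    order = np.argsort(x, kind="mergesort")
    ranks = np.empty(x.size, dtype=float)
    ranks[order] = np.arange(1, x.size + 1, dtype=float)
    for value in np.unique(x):
        tie = x == value
        ranks[tie] = ranks[tie].mean()
    return ranks

def spearman(a, b):
    """Spearman correlation on rows where both inputs are finite."""
    a = np.asarray(a, dtype=float)
    b = np.asarray(b, dtype=float)
    keep = np.isfinite(a) & np.isfinite(b)
    if keep.sum() < 2:
        return float("nan")
    rank_a, rank_b = average_ranks(a[keep]), average_ranks(b[keep])
    if rank_a.std() == 0.0 or rank_b.std() == 0.0:
        return float("nan")  # a constant input has no rank ordering
    return float(np.corrcoef(rank_a, rank_b)[0, 1])

# Hypothetical target values: a raw return and a monotone transform of it.
raw = np.array([-0.002, -0.001, 0.0005, 0.001, 0.003])
transformed = np.tanh(raw / 0.002)
same_ordering = spearman(raw, transformed)
opposite_ordering = spearman(raw, -raw)
```

Two targets that are monotone transforms of each other order days identically, so their rank correlation is 1 even when their values differ. Targets that correlate this strongly carry nearly the same ranking information, which is the motivation for comparing families instead.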
mae_lambda_grid = [0.0, 0.25, 0.50, 0.75, 1.00]
tradeability_threshold_grid = [-0.0020, 0.0, 0.0020]
tradeability_mae_cap_grid = [0.0020, CONFIG['tradeability_mae_cap'], 0.0060]
reference_terminal_log_return = np.log1p(reference_frame['realized_return_h60'].to_numpy())
reference_mae = reference_frame['max_adverse_excursion_h60'].to_numpy()
reference_fixed_target = reference_frame['target_fixed_rate_h60'].to_numpy()
reference_tradeability_target = reference_frame['target_tradeability_score'].to_numpy()
parameter_rows = []
for penalty_lambda in mae_lambda_grid:
candidate = (reference_terminal_log_return - penalty_lambda * reference_mae) / CONFIG['primary_horizon']
parameter_rows.append({
'target_family': 'MAE-penalized return',
'parameter_label': f'lambda={penalty_lambda:.2f}',
'spearman_to_current_target': rank_ic(reference_frame['target_mae_penalized_rate_h60'].to_numpy(), candidate),
'spearman_to_fixed_return': rank_ic(reference_fixed_target, candidate),
'mean_value': finite_mean(candidate),
'std_value': finite_std(candidate),
}
)
for return_threshold in tradeability_threshold_grid:
for mae_cap in tradeability_mae_cap_grid:
candidate = np.asarray([
tradeability_score(
terminal_log_return=float(ret),
max_adverse_excursion_value=float(mae),
return_threshold=return_threshold,
return_temp=CONFIG['tradeability_return_temp'],
mae_cap=mae_cap,
mae_temp=CONFIG['tradeability_mae_temp'],
)
for ret, mae in zip(reference_terminal_log_return, reference_mae)
]
)
parameter_rows.append({
'target_family': 'Tradeability score',
'parameter_label': f'ret_thr={return_threshold:.4f}; mae_cap={mae_cap:.4f}',
'spearman_to_current_target': rank_ic(reference_tradeability_target, candidate),
'spearman_to_fixed_return': rank_ic(reference_fixed_target, candidate),
'mean_value': finite_mean(candidate),
'std_value': finite_std(candidate),
}
)
target_parameter_sensitivity_table = pl.DataFrame(parameter_rows).sort(['target_family', 'parameter_label'])
parameter_plot_data = target_parameter_sensitivity_table
target_parameter_sensitivity_table.select([
'target_family',
'parameter_label',
pl.col('spearman_to_current_target').round(3).alias('corr to current target'),
pl.col('spearman_to_fixed_return').round(3).alias('corr to fixed return'),
pl.col('mean_value').round(5).alias('mean value'),
pl.col('std_value').round(5).alias('std value'),
]
)
Nearby parameter choices do not materially change rank ordering for the tested MAE and tradeability variants.
(
ggplot(oracle_plot_data, aes(x='scenario_kind_label', y='value', group='metric_label'))
+ geom_line(color='#8d99ae', size=0.9)
+ geom_point(aes(color='scenario_kind_label'), size=3.6)
+ facet_grid(y='metric_label', scales='free_y')
+ facet_separator_theme
+ scale_color_manual(values={'Stable': '#2e8b57', 'Shifted': '#b24c63'})
+ labs(title='Oracle benchmark degradation in shifted regimes',
x='',
y='Value',
color='Scenario kind', )
+ plot_size(880, 300)
)
Question: Do targets with better row-level prediction quality also become better trading targets?
Takeaway: Not reliably. Prediction IC ranks whether scores order realized 60-minute returns correctly; trading Sharpe ranks the deployed long/skip sizing policy. Points above the diagonal monetize better than their prediction rank suggests, while points below it have prediction quality that translates less cleanly into the trading rule.
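A toy case shows why ordering quality and trading quality can diverge. The sketch below is illustrative only: the function names and data are hypothetical, sizing is simplified to binary long/skip, and the daily Sharpe is assumed to be the unannualized mean over sample standard deviation (`ddof=1`), which may not match the notebook's exact definition.

```python
import numpy as np

def long_skip_pnl(score, realized_bps):
    """Take the realized return when the score is positive, otherwise stay flat."""
    score = np.asarray(score, dtype=float)
    realized_bps = np.asarray(realized_bps, dtype=float)
    return np.where(score > 0.0, realized_bps, 0.0)

def daily_sharpe(pnl):
    """Mean over sample std of daily PnL; NaN when the PnL has no dispersion."""
    pnl = np.asarray(pnl, dtype=float)
    if pnl.size < 2:
        return float("nan")
    spread = pnl.std(ddof=1)
    if spread == 0.0:
        return float("nan")  # e.g. a strategy that never trades
    return float(pnl.mean() / spread)

realized_bps = np.array([-2.0, -1.0, 1.0, 2.0, 3.0])

# Perfect ordering of returns, but every score is negative, so nothing trades.
well_ordered_score = realized_bps - 10.0
# Imperfect ordering (it goes long on the worst day), but it does trade.
rough_score = np.array([1.0, -1.0, 1.0, 1.0, 1.0])

sharpe_well_ordered = daily_sharpe(long_skip_pnl(well_ordered_score, realized_bps))
sharpe_rough = daily_sharpe(long_skip_pnl(rough_score, realized_bps))
```

The first score has a perfect rank correlation with realized returns yet produces an undefined Sharpe, because the long/skip rule depends on the sign and level of the score, not only on its ordering. The second score ranks worse but still earns a positive Sharpe in this example.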
(
ggplot(diagnostic_rank_plot_data,
aes(x='prediction_ic_rank', y='trading_sharpe_rank', size='mean_trade_rate'))
+ geom_abline(slope=1.0, intercept=0.0, linetype='dashed', color='#8d99ae')
+ geom_point(aes(color='method_label'), alpha=0.82)
+ geom_text(aes(label='method_label', color='method_label'), size=7, nudge_y=-0.18, show_legend=False)
+ facet_grid(y='model_label')
+ facet_separator_theme
+ scale_x_reverse(breaks=list(range(1, len(TARGET_COLS) + 1)))
+ scale_y_reverse(breaks=list(range(1, len(TARGET_COLS) + 1)))
+ scale_color_manual(values=method_palette)
+ labs(title='Prediction rank versus trading rank',
subtitle='Trainable targets only; up/right is better, diagonal means prediction rank equals trading rank',
x='Prediction IC rank',
y='Trading Sharpe rank',
size='Mean trade rate',
color='Target')
+ plot_size(980, 500)
)
(
ggplot(paired_plot_data,
aes(x='challenger_label', y='sharpe_diff_mean', color='model_label'))
+ geom_hline(yintercept=0.0,
linetype='dashed',
color='#b0b7c3'
)
+ geom_segment(aes(x='challenger_label',
xend='challenger_label', y='sharpe_diff_ci_low', yend='sharpe_diff_ci_high',
color='model_label'
), size=1.5,
alpha=0.65)
+ geom_point(size=3.2)
+ facet_grid(y='model_label')
+ facet_separator_theme
+ coord_flip()
+ scale_x_discrete(limits=method_order)
+ scale_color_manual(values=model_palette)
+ labs(title='Sharpe difference between the leader and alternatives',
x='Challenger target',
y='Sharpe difference with uncertainty band',
color='Model family',
)
+ plot_size(980, 420)
)
(
ggplot(run_tail_plot_data,
aes(x='method_label', y='median_run_daily_sharpe', color='model_label')
)
+ geom_hline(yintercept=0.0,
linetype='dashed',
color='#b0b7c3'
)
+ geom_segment(aes(x='method_label',
xend='method_label', y='worst_run_daily_sharpe', yend='median_run_daily_sharpe',
color='model_label'
),
size=1.8,
alpha=0.7,
)
+ geom_point(size=3.2)
+ geom_point(aes(y='worst_run_daily_sharpe'), size=2.2, alpha=0.55)
+ facet_grid(y='model_label')
+ facet_separator_theme
+ coord_flip()
+ scale_x_discrete(limits=method_order)
+ scale_color_manual(values=model_palette)
+ labs(title='Downside tail of run-level Sharpe',
x='',
y='Daily Sharpe',
color='Model family',
)
+ plot_size(980, 420)
)
(
ggplot(degenerate_strategy_plot_data,
aes(x='method_label', y='nan_sharpe_run_rate', fill='model_label'), )
+ geom_bar(stat='identity', position='dodge')
+ coord_flip()
+ scale_x_discrete(limits=method_order)
+ labs(title='Undefined-Sharpe incidence by target',
x='',
y='Share of runs with undefined daily Sharpe',
fill='Model family'
)
+ plot_size(920, 420)
)
Formula-level details are included here for reproducibility.
Implemented with the same TP/SL/horizon grid as target_barrier_rate and a single decay parameter tau=15. Each grid element scores +exp(-t_hit / tau) if take-profit hits first, -exp(-t_hit / tau) if stop-loss hits first, and 0 if neither hits.
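The scoring rule for a single grid element can be sketched as follows. This is a minimal sketch under stated assumptions: the path is a list of per-minute cumulative log returns from entry, `t_hit` is counted in minutes starting at 1, barriers are symmetric in sign convention (`+take_profit`, `-stop_loss`), and the function name is hypothetical. How the notebook aggregates scores across the grid is not shown here, so that step is omitted.

```python
import math

TAU = 15.0  # decay parameter quoted in the text

def barrier_reward(cum_returns, take_profit, stop_loss, horizon, tau=TAU):
    """Signed, time-decayed score for the first barrier touched within the horizon."""
    for minute, value in enumerate(cum_returns[:horizon], start=1):
        if value >= take_profit:
            return math.exp(-minute / tau)   # take-profit first: positive score
        if value <= -stop_loss:
            return -math.exp(-minute / tau)  # stop-loss first: negative score
    return 0.0  # neither barrier touched inside the horizon

# Hypothetical paths (cumulative log returns per minute).
early_win = barrier_reward([0.001, 0.003, 0.006], 0.005, 0.004, horizon=60)
late_win = barrier_reward([0.001] * 29 + [0.006], 0.005, 0.004, horizon=60)
early_loss = barrier_reward([-0.001, -0.005], 0.005, 0.004, horizon=60)
no_hit = barrier_reward([0.001] * 60, 0.005, 0.004, horizon=60)
```

With `tau=15`, a take-profit at minute 3 scores about 0.82 while the same hit at minute 30 scores about 0.14, so the target rewards moves that become actionable early.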
Implemented as (cum_60[-1] - 0.5 * max_adverse_excursion_h60) / 60. This keeps the terminal-return signal while explicitly penalizing paths that hurt first.
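The effect of the penalty is easiest to see on two paths that end at the same place. The sketch below assumes that `max_adverse_excursion_h60` is the non-negative magnitude of the worst cumulative return below entry. That definition, the function name, and the toy paths are assumptions for illustration.

```python
import numpy as np

def mae_penalized_rate(cum_log_returns, penalty_lambda=0.5, horizon=60):
    """(terminal return - lambda * MAE) / horizon for one cumulative-return path."""
    cum = np.asarray(cum_log_returns, dtype=float)
    # Assumption: MAE is the magnitude of the deepest dip below the entry level.
    mae = max(0.0, -float(cum.min()))
    return (float(cum[-1]) - penalty_lambda * mae) / horizon

# Two hypothetical 60-minute paths with the same terminal return of 0.004.
smooth_path = np.linspace(0.0, 0.004, 60)
painful_path = np.concatenate([
    np.linspace(0.0, -0.006, 30),    # dips to -0.006 first
    np.linspace(-0.006, 0.004, 30),  # then recovers to +0.004
])

smooth_target = mae_penalized_rate(smooth_path)
painful_target = mae_penalized_rate(painful_path)
```

Both paths have the same fixed 60-minute return, but the painful path's target is reduced by `0.5 * 0.006 / 60`, so the model is trained to prefer days that reach the same outcome with less drawdown along the way.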
Implemented as fixed 60m return rate divided by downside-only semivolatility with eps=1e-5 and symmetric clipping at 20. This makes downside-aware path quality explicit without allowing near-zero denominators to dominate.
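A rough sketch of this ratio is given below. Several details are assumptions, not the notebook's confirmed choices: semivolatility is taken as the root mean square of negative per-minute returns, `eps` is added to the denominator (it could instead be used as a floor), and the function name is hypothetical.

```python
import numpy as np

def downside_adjusted_rate(minute_returns, eps=1e-5, clip=20.0):
    """Per-minute return rate over downside semivolatility, clipped symmetrically."""
    r = np.asarray(minute_returns, dtype=float)
    rate = float(r.sum()) / r.size               # fixed-horizon return rate
    downside = np.minimum(r, 0.0)                # keep only losing minutes
    semivol = float(np.sqrt(np.mean(downside ** 2)))
    # Assumption: eps is added to the denominator to avoid division by ~0.
    return float(np.clip(rate / (semivol + eps), -clip, clip))

# Hypothetical per-minute log returns over 60 minutes.
no_downside = downside_adjusted_rate([0.001] * 60)         # ratio of 100 before clipping
choppy_gain = downside_adjusted_rate([0.002, -0.001] * 30)
steady_loss = downside_adjusted_rate([-0.001] * 60)
```

The first path has no losing minutes, so the raw ratio is set almost entirely by `eps` and is clipped to 20. That is the case the clipping is meant to contain.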
Implemented as a smooth score that combines positive 60m terminal return with staying inside a pain budget. The current defaults use return_threshold=0, mae_cap=min(stop_losses), return_temp=0.002, and mae_temp=0.0015, then map the joint score to [-1, 1].