Target Engineering Study: Optimizing 60-Minute Intraday Trading Signals

Published: May 4, 2026

Modified: May 15, 2026

Experiment Summary: 60-Minute Target Selection

This notebook compares training targets for a single-ticker 09:30 signal that decides whether to take a long position through 10:30. The objective is not the best row-level label fit; it is the most useful out-of-sample trading signal under the same features, model families, and deployment rule.

Recommendation: carry a compact shortlist forward rather than selecting a permanent target from this synthetic study alone.

| Role | Target | Current read |
| --- | --- | --- |
| Default mean-Sharpe leader | Trend t-stat | Best average daily Sharpe for both Ridge and HistGBRT, but gaps are small |
| Robust / cost-aware alternative | MAE-penalized return | Close runner-up with lower trade rate and explicit path-pain control |
| Event-time alternative | Barrier reward | Useful when early actionable moves matter more than clean one-hour drift |
| Conservative lens | Tradeability score | Not a primary alpha target here; useful as a selective gating benchmark |

The decision message is deliberately conservative: the evidence narrows target choice, but final promotion should require net-cost, policy, and shifted-regime survival on non-synthetic data.

Decision Rule and Scope

A target is better only if it improves the deployed trading rule after fitting the same features with the same model family. The rule scores the ticker at 09:30, skips non-positive scores, sizes positive scores up to 1.0, exits at 10:30, and measures gross PnL.
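
The long/skip rule described above can be sketched in a few lines. This is a minimal illustration, not the notebook's implementation: the function names, the reference scale of 1.0, and the three example days are assumptions chosen only to show how a score becomes a position and a gross PnL.

```python
def position_size(score: float, scale: float) -> float:
    """Long/skip sizing: 0 for non-positive scores, capped at 1.0."""
    if scale <= 0.0:
        raise ValueError('scale must be positive')
    return min(max(score, 0.0) / scale, 1.0)


def gross_pnl_bps(score: float, realized_return_bps: float, scale: float) -> float:
    """Gross PnL in bps for one day: position times the 09:30 to 10:30 return."""
    return position_size(score, scale) * realized_return_bps


# Hypothetical (score, realized 60-minute return in bps) pairs, for illustration only.
days = [(-0.4, 12.0), (0.5, 20.0), (2.0, -8.0)]
scale = 1.0  # assumed reference scale
pnl = [gross_pnl_bps(score, ret, scale) for score, ret in days]
```

The first day is skipped because its score is negative, the second is taken at half size, and the third is capped at full size and takes the full loss.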

The primary metric is mean daily Sharpe of gross PnL. Mean daily PnL, trade rate, positive-run rate, drawdown, cost sensitivity, and shifted-regime behavior are supporting checks.
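
One plausible reading of the primary metric is sketched below: compute an unannualized daily Sharpe (mean over sample standard deviation) for each run's daily gross PnL, then average the finite values across runs. The aggregation step and the function names are assumptions made for illustration; the example PnL series are made up.

```python
import math
from statistics import mean, stdev


def daily_sharpe(pnl: list[float]) -> float:
    """Unannualized Sharpe of daily PnL: mean divided by sample standard deviation."""
    finite = [x for x in pnl if math.isfinite(x)]
    if len(finite) < 2:
        return float('nan')
    sd = stdev(finite)
    if sd == 0.0:
        return float('nan')
    return mean(finite) / sd


def mean_daily_sharpe(runs: list[list[float]]) -> float:
    """Average the per-run daily Sharpe, ignoring runs where it is undefined."""
    values = [daily_sharpe(run) for run in runs]
    finite = [v for v in values if math.isfinite(v)]
    return mean(finite) if finite else float('nan')


# Hypothetical daily gross PnL (bps) for two runs.
runs = [[1.0, 2.0, 3.0], [3.0, 6.0, 9.0]]
summary = mean_daily_sharpe(runs)
```

Because Sharpe is scale-free, the second run (three times the PnL of the first) has the same ratio, which is why PnL level, trade rate, and drawdown are kept as separate supporting checks.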

Scope limits: this is single-ticker, long/skip only, gross of transaction costs and execution uncertainty, and based on synthetic controlled regimes rather than live historical validation.

Code
%load_ext autoreload
%autoreload 2
Code
from lets_plot import *

LetsPlot.setup_html()
LetsPlot.set_theme(
    theme_minimal()
    + theme(
        axis_text=element_text(size=11),
        axis_title=element_text(size=12),
        plot_title=element_text(face='bold', size=14),
        legend_title=element_text(size=11),
        legend_text=element_text(size=10),
    )
)

facet_separator_theme = theme(
    panel_spacing=5.0,
    panel_border=element_rect(color='#c8ced8', size=5.0),
    panel_border_ontop=True,
)
Code
from __future__ import annotations

from datetime import date, datetime, time, timedelta
from itertools import product
from pathlib import Path

import numpy as np
import polars as pl
from scipy.stats import spearmanr
from sklearn.ensemble import HistGradientBoostingRegressor
from sklearn.linear_model import Ridge
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

from trading_research.target_engineering import trend_scanning_tstat
from trading_research.time_aware_cross_validation import DateEmbargoSplit

from IPython.display import display, Markdown

pl.Config.set_tbl_rows(14)
pl.Config.set_tbl_cols(30)

def plot_size(width: int, height: int):
    return ggsize(width * 2, height * 2)
Code
from itables import init_notebook_mode

init_notebook_mode(all_interactive=True, connected=True)
Code
CONFIG = {
    'scored_days': 320,
    'warmup_days': 20,
    'session_minutes': 390,
    'primary_horizon': 60,
    'trend_min_horizon': 10,
    'trend_max_horizon': 60,
    'barrier_horizons': [15, 30, 60],
    'stop_losses': [0.004, 0.008, 0.012],
    'take_profits': [0.006, 0.010, 0.015],
    'profit_factor_eps': 1e-6,
    'competing_risk_tau': 15.0,
    'mae_penalty_lambda': 0.5,
    'downside_risk_eps': 1e-5,
    'downside_target_clip': 20.0,
    'tradeability_return_threshold': 0.0,
    'tradeability_return_temp': 0.0020,
    'tradeability_mae_temp': 0.0015,
    'n_splits': 8,
    'pre_embargo_days': 5,
    'min_train_days': 60,
    'bootstrap_reps': 2000,
    'sample_path_days': 8,
}
CONFIG['tradeability_mae_cap'] = min(CONFIG['stop_losses'])

SEEDS = [
    7, 11, 19, 23, 29, 31, 43, 47, 59, 71,
    83, 97, 101, 109, 127, 149, 163, 181, 211, 239,
    251, 263, 277, 293, 307, 331, 347, 359, 379, 397,
    419, 433, 449, 467, 487, 503, 521, 541, 563, 587,
    601, 613, 631, 647, 659, 673, 691, 709, 727, 739,
    757, 773, 787, 809, 827, 839, 853, 877, 887, 907,
    919, 937, 953, 967, 983, 997, 1013, 1031, 1049, 1061,
]
SEED_BUDGETS = [8, 16, 24, 40, 60]

SCENARIOS = [
    {
        'scenario_name': 'stable_balanced',
        'scenario_kind': 'stable',
        'event_scale': 1.00,
        'observability': 1.00,
        'vol_scale': 1.00,
        'jump_scale': 1.00,
        'decay_scale': 1.00,
        'carry_persistence': 0.72,
        'shift_start_frac': 0.70,
        'late_event_scale_mult': 1.00,
        'late_observability_mult': 1.00,
        'late_vol_mult': 1.00,
        'late_jump_mult': 1.00,
        'late_decay_mult': 1.00,
    },
    {
        'scenario_name': 'stable_noisy',
        'scenario_kind': 'stable',
        'event_scale': 0.90,
        'observability': 0.88,
        'vol_scale': 1.20,
        'jump_scale': 1.30,
        'decay_scale': 1.10,
        'carry_persistence': 0.68,
        'shift_start_frac': 0.70,
        'late_event_scale_mult': 1.00,
        'late_observability_mult': 1.00,
        'late_vol_mult': 1.00,
        'late_jump_mult': 1.00,
        'late_decay_mult': 1.00,
    },
    {
        'scenario_name': 'shift_signal_break',
        'scenario_kind': 'shifted',
        'event_scale': 1.00,
        'observability': 0.98,
        'vol_scale': 1.05,
        'jump_scale': 1.10,
        'decay_scale': 1.05,
        'carry_persistence': 0.72,
        'shift_start_frac': 0.58,
        'late_event_scale_mult': 0.62,
        'late_observability_mult': 0.58,
        'late_vol_mult': 1.20,
        'late_jump_mult': 1.35,
        'late_decay_mult': 1.45,
    },
    {
        'scenario_name': 'shift_jump_stress',
        'scenario_kind': 'shifted',
        'event_scale': 0.95,
        'observability': 0.92,
        'vol_scale': 1.10,
        'jump_scale': 1.20,
        'decay_scale': 1.10,
        'carry_persistence': 0.67,
        'shift_start_frac': 0.55,
        'late_event_scale_mult': 0.70,
        'late_observability_mult': 0.68,
        'late_vol_mult': 1.45,
        'late_jump_mult': 1.85,
        'late_decay_mult': 1.35,
    },
]

NEW_TARGET_COLS = [
    'target_competing_risk_hit',
    'target_mae_penalized_rate_h60',
    'target_downside_adj_rate_h60',
    'target_tradeability_score',
]

TARGET_COLS = [
    'target_fixed_rate_h60',
    'target_path_mean_rate_h60',
    'target_profit_factor_h60',
    'target_trend_slope',
    'target_trend_tstat',
    'target_barrier_rate',
    *NEW_TARGET_COLS,
]

MODEL_FAMILIES = ['ridge', 'hist_gbrt']

SHORTLIST_LABELS = ['Trend t-stat', 'MAE-penalized return', 'Barrier reward', 'Tradeability score']

CACHE_DIR = Path('data/eda_01_target_engineering_v4')
CACHE_DIR.mkdir(parents=True, exist_ok=True)


def cache_path(name: str) -> Path:
    return CACHE_DIR / f'{name}.parquet'


def load_cached_parquet(name: str) -> pl.DataFrame | None:
    path = cache_path(name)
    if not path.exists():
        return None
    try:
        print(f'Loading cache: {path}')
        return pl.read_parquet(path)
    except Exception as exc:
        print(f'Ignoring corrupt cache: {path} ({exc})')
        path.unlink(missing_ok=True)
        return None


def write_cached_parquet(df: pl.DataFrame, name: str) -> pl.DataFrame:
    path = cache_path(name)
    df.write_parquet(path, compression='zstd')
    print(f'Saved cache: {path}')
    return df
Code
target_method_reference = pl.DataFrame([
    {
        'method_name': 'target_fixed_rate_h60',
        'emphasis': 'Terminal move at 60m',
        'path_preference': 'Can tolerate noisy path if end result is strong',
        'strength': 'Directly aligned with a terminal-horizon objective',
        'risk': 'Can overvalue late reversals that look weak intrawindow',
    },
    {
        'method_name': 'target_path_mean_rate_h60',
        'emphasis': 'Average path quality through 60m',
        'path_preference': 'Rewards smoother follow-through',
        'strength': 'Less terminal-print dependent',
        'risk': 'May underweight sharp late accelerations',
    },
    {
        'method_name': 'target_profit_factor_h60',
        'emphasis': 'Upside bar mass vs downside bar mass',
        'path_preference': 'Prefers many or large up bars with limited down-bar drag',
        'strength': 'Captures minute-level path asymmetry without terminal-only dependence',
        'risk': 'Can overreward grind-up paths and underweight late convex breakouts',
    },
    {
        'method_name': 'target_trend_slope',
        'emphasis': 'Persistent directional drift',
        'path_preference': 'Prefers linear trend-like movement',
        'strength': 'Often helpful when edge decays smoothly',
        'risk': 'Can miss convex or fast-jump paths',
    },
    {
        'method_name': 'target_trend_tstat',
        'emphasis': 'Drift plus path cleanliness',
        'path_preference': 'Prefers lower-noise trends',
        'strength': 'Can suppress noisy false positives',
        'risk': 'Can underweight large but volatile opportunities',
    },
    {
        'method_name': 'target_barrier_rate',
        'emphasis': 'Early actionable path hits',
        'path_preference': 'Sensitive to fast take-profit / stop behavior',
        'strength': 'Captures timing asymmetry',
        'risk': 'Threshold choices can dominate behavior',
    },
    {
        'method_name': 'target_competing_risk_hit',
        'emphasis': 'Which barrier hits first and how fast',
        'path_preference': 'Prefers fast take-profit hits and punishes fast stop hits',
        'strength': 'Directly matches first-hit event timing',
        'risk': 'Barrier and decay choices can dominate behavior',
    },
    {
        'method_name': 'target_mae_penalized_rate_h60',
        'emphasis': 'Terminal return minus downside pain',
        'path_preference': 'Rewards winners that avoid deep early drawdown',
        'strength': 'Makes adverse excursion explicit in the label',
        'risk': 'Penalty weight can overpunish volatile winners',
    },
    {
        'method_name': 'target_downside_adj_rate_h60',
        'emphasis': 'Return per unit of downside path risk',
        'path_preference': 'Prefers upside earned with limited downside semivolatility',
        'strength': 'Makes downside-aware path quality explicit',
        'risk': 'Needs clipping when downside risk is near zero',
    },
    {
        'method_name': 'target_tradeability_score',
        'emphasis': 'Was the day worth trading at all?',
        'path_preference': 'Prefers positive outcomes that stay inside a pain budget',
        'strength': 'Closest to the trade-or-skip deployment rule',
        'risk': 'Threshold and smoothing choices shape the label',
    },
]
)

Targets at a Glance

Code
target_method_reference.select([
    pl.col('method_name').replace({
        'target_fixed_rate_h60': 'Fixed 60m return',
        'target_path_mean_rate_h60': 'Path mean return',
        'target_profit_factor_h60': 'Profit factor',
        'target_trend_slope': 'Trend slope',
        'target_trend_tstat': 'Trend t-stat',
        'target_barrier_rate': 'Barrier reward',
        'target_competing_risk_hit': 'Competing-risk hit',
        'target_mae_penalized_rate_h60': 'MAE-penalized return',
        'target_downside_adj_rate_h60': 'Downside-adjusted return',
        'target_tradeability_score': 'Tradeability score',
    }
    ).alias('Target'),
    pl.col('emphasis').alias('What it rewards'),
    pl.col('path_preference').alias('Preferred path shape'),
    pl.col('risk').alias('Where it can fail'),
]
)

The ten labels reduce to four practical target families. The full target dictionary remains below for reproducibility, but the interpretation should focus on families rather than treating all ten labels as independent bets.

| Family | Representative targets | What it tests |
| --- | --- | --- |
| Terminal / path return | Fixed 60m return, path mean, profit factor, downside-adjusted, MAE-penalized | Whether the model should learn the size and path quality of the hour-long move |
| Trend cleanliness | Trend slope, trend t-stat | Whether clean directional drift is easier to monetize than raw return |
| Event-time behavior | Barrier reward, competing-risk hit | Whether early take-profit / stop behavior matters more than terminal return |
| Selective gating | Tradeability score | Whether the model should learn when not to trade |
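
For grouping results by family, the table can be written as a lookup. The dictionary names below are hypothetical; the column names are the ones listed in TARGET_COLS, matched to the table rows by label.

```python
# Family -> target columns, following the family table (names here are illustrative).
TARGET_FAMILIES = {
    'Terminal / path return': [
        'target_fixed_rate_h60',
        'target_path_mean_rate_h60',
        'target_profit_factor_h60',
        'target_downside_adj_rate_h60',
        'target_mae_penalized_rate_h60',
    ],
    'Trend cleanliness': ['target_trend_slope', 'target_trend_tstat'],
    'Event-time behavior': ['target_barrier_rate', 'target_competing_risk_hit'],
    'Selective gating': ['target_tradeability_score'],
}

# Inverted view: target column -> family, convenient for labelling result rows.
FAMILY_BY_TARGET = {
    target: family
    for family, targets in TARGET_FAMILIES.items()
    for target in targets
}
```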

Study Design

Each target is trained on the same feature set with Ridge and HistGBRT. The out-of-sample test uses walk-forward validation with a five-day pre-validation embargo (pre_embargo_days = 5). CONFIG requests eight splits (n_splits = 8); the reference analysis has six effective validation folds after warmup, minimum-history, and embargo constraints.
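
The idea of an expanding walk-forward split with a pre-validation embargo can be sketched as follows. This is not the DateEmbargoSplit implementation used in the notebook, whose internals are not shown here; the function name, the equal-sized fold layout, and the toy parameters are assumptions, so fold counts from this sketch need not match the six reported above.

```python
def walk_forward_embargo_splits(
        n_days: int,
        n_splits: int,
        embargo_days: int,
        min_train_days: int,
) -> list[tuple[range, range]]:
    """Return (train, validation) day-index ranges for an expanding walk-forward split."""
    fold_size = n_days // (n_splits + 1)
    splits = []
    for fold in range(1, n_splits + 1):
        val_start = fold * fold_size
        val_stop = n_days if fold == n_splits else val_start + fold_size
        # Drop the days immediately before validation from the training window.
        train_stop = val_start - embargo_days
        if train_stop < min_train_days:
            continue  # not enough history yet, so this fold is skipped
        splits.append((range(0, train_stop), range(val_start, val_stop)))
    return splits


# Toy example: 100 days, 4 requested folds, 5-day embargo, 30 days of minimum history.
splits = walk_forward_embargo_splits(100, 4, embargo_days=5, min_train_days=30)
```

In the toy example the first requested fold is dropped because only 15 training days remain after the embargo, which is the same mechanism by which requested splits shrink to fewer effective folds.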

Synthetic days are rerun across multiple seeds and four regimes: two stable regimes and two shifted regimes where observability falls and volatility/jump risk rises. Stable results show learnability; shifted results show fragility.
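
As a worked example of what a shifted regime means numerically, the sketch below applies the late multipliers of the shift_signal_break scenario using the values from CONFIG and SCENARIOS. The helper name and the reduced dictionaries are illustrative; only three of the five shifted parameters are shown.

```python
# Values copied from CONFIG and the shift_signal_break scenario.
warmup_days, scored_days = 20, 320
shift_start_frac = 0.58
base = {'observability': 0.98, 'vol_scale': 1.05, 'jump_scale': 1.10}
late_mult = {'observability': 0.58, 'vol_scale': 1.20, 'jump_scale': 1.35}

# First day index (warmup included) on which the late multipliers apply.
shift_start = warmup_days + int(scored_days * shift_start_frac)


def live_value(name: str, day_idx: int) -> float:
    """Base parameter before the shift, base times late multiplier from the shift on."""
    mult = late_mult[name] if day_idx >= shift_start else 1.0
    return base[name] * mult


before = {name: live_value(name, shift_start - 1) for name in base}
after = {name: live_value(name, shift_start) for name in base}
```

Under these values the shift begins at day index 205: observability drops from 0.98 to roughly 0.57, while the volatility scale rises from 1.05 to 1.26 and the jump scale from 1.10 to about 1.49.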

Benchmarks

Two idealized benchmarks are used only for calibration. Oracle uses the simulator’s hidden expected edge, and Perfect uses the realized 09:30 to 10:30 return. They are not tradable target candidates; their plots remain in the appendix.

Code
def consecutive_dates(start_date: date, n_days: int) -> list[date]:
    return [start_date + timedelta(days=offset) for offset in range(n_days)]


def logistic(x: np.ndarray | float) -> np.ndarray | float:
    return 1.0 / (1.0 + np.exp(-x))


def rank_ic(y_true: np.ndarray, score: np.ndarray) -> float:
    mask = np.isfinite(y_true) & np.isfinite(score)
    if mask.sum() < 3:
        return float('nan')
    stat = spearmanr(y_true[mask], score[mask]).statistic
    return float(stat) if stat is not None else float('nan')


def path_mean_rate(cumulative_log_returns: np.ndarray) -> float:
    horizons = np.arange(1, cumulative_log_returns.shape[0] + 1, dtype=float)
    return float(np.mean(cumulative_log_returns / horizons))


def profit_factor_log_reward(step_log_returns: np.ndarray, eps: float) -> float:
    up_sum = float(np.maximum(step_log_returns, 0.0).sum())
    down_sum = float(np.maximum(-step_log_returns, 0.0).sum())
    return float(np.log((eps + up_sum) / (eps + down_sum)))


def max_adverse_excursion(cumulative_log_returns: np.ndarray) -> float:
    return float(max(0.0, -float(np.min(cumulative_log_returns))))


def first_barrier_event(
        cumulative_log_returns: np.ndarray,
        stop_loss: float,
        take_profit: float,
        horizon: int,
) -> tuple[str, int, float]:
    """Return (outcome, exit_minute, reward) for the first barrier touched within `horizon` minutes.

    `outcome` is 'take', 'stop', or 'none'. With no touch, the exit minute is the horizon
    and the reward is the cumulative log return at the end of the window.
    """
    path = cumulative_log_returns[:horizon]
    hit_take = np.flatnonzero(path >= take_profit)
    hit_stop = np.flatnonzero(path <= -stop_loss)
    first_take = int(hit_take[0] + 1) if hit_take.size else horizon + 1
    first_stop = int(hit_stop[0] + 1) if hit_stop.size else horizon + 1
    exit_minute = min(first_take, first_stop)
    if exit_minute > horizon:
        return 'none', horizon, float(path[-1])
    if first_take < first_stop:
        return 'take', exit_minute, float(take_profit)
    return 'stop', exit_minute, float(-stop_loss)


def barrier_reward_rate(
        cumulative_log_returns: np.ndarray,
        stop_losses: list[float],
        take_profits: list[float],
        horizons: list[int],
) -> float:
    rewards = []
    for stop_loss, take_profit, horizon in product(stop_losses, take_profits, horizons):
        _, exit_minute, realized_reward = first_barrier_event(
            cumulative_log_returns,
            stop_loss=stop_loss,
            take_profit=take_profit,
            horizon=horizon,
        )
        rewards.append(realized_reward / exit_minute)
    return float(np.mean(rewards))


def competing_risk_hit_score(
        cumulative_log_returns: np.ndarray,
        stop_losses: list[float],
        take_profits: list[float],
        horizons: list[int],
        tau: float,
) -> float:
    scores = []
    tau_safe = max(float(tau), 1e-8)
    for stop_loss, take_profit, horizon in product(stop_losses, take_profits, horizons):
        outcome, exit_minute, _ = first_barrier_event(
            cumulative_log_returns,
            stop_loss=stop_loss,
            take_profit=take_profit,
            horizon=horizon,
        )
        if outcome == 'take':
            score = float(np.exp(-exit_minute / tau_safe))
        elif outcome == 'stop':
            score = float(-np.exp(-exit_minute / tau_safe))
        else:
            score = 0.0
        scores.append(score)
    return float(np.mean(scores))


def mae_penalized_rate(cumulative_log_returns: np.ndarray, horizon: int, penalty_lambda: float) -> float:
    mae = max_adverse_excursion(cumulative_log_returns)
    return float((float(cumulative_log_returns[-1]) - penalty_lambda * mae) / horizon)


def downside_semivol(step_log_returns: np.ndarray) -> float:
    downside = np.minimum(np.asarray(step_log_returns, dtype=float), 0.0)
    return float(np.sqrt(np.mean(downside * downside)))


def downside_adjusted_return_rate(terminal_rate: float, step_log_returns: np.ndarray, eps: float, clip: float) -> float:
    downside_risk = downside_semivol(step_log_returns)
    raw_value = terminal_rate / max(float(eps), downside_risk)
    return float(np.clip(raw_value, -clip, clip))


def tradeability_score(
        terminal_log_return: float,
        max_adverse_excursion_value: float,
        return_threshold: float,
        return_temp: float,
        mae_cap: float,
        mae_temp: float,
) -> float:
    """Product of two logistic gates (return above threshold, MAE below cap), rescaled to (-1, 1)."""
    return_component = float(logistic((terminal_log_return - return_threshold) / max(return_temp, 1e-8)))
    pain_component = float(logistic((mae_cap - max_adverse_excursion_value) / max(mae_temp, 1e-8)))
    return float(2.0 * (return_component * pain_component) - 1.0)


def ols_tstat(y: np.ndarray) -> tuple[float, float]:
    x = np.arange(y.shape[0], dtype=float)
    x_centered = x - x.mean()
    y_centered = y - y.mean()
    ss_x = float(np.dot(x_centered, x_centered))
    if ss_x <= 0.0:
        return float('nan'), float('nan')
    slope = float(np.dot(x_centered, y_centered) / ss_x)
    intercept = float(y.mean() - slope * x.mean())
    residuals = y - (intercept + slope * x)
    dof = y.shape[0] - 2
    if dof <= 0:
        return float('nan'), slope
    rss = float(np.dot(residuals, residuals))
    if rss <= 0.0:
        return float('nan'), slope
    se = float(np.sqrt((rss / dof) / ss_x))
    if se == 0.0:
        return float('nan'), slope
    return slope / se, slope


def sharpe_ratio(values: np.ndarray) -> float:
    arr = np.asarray(values, dtype=float)
    arr = arr[np.isfinite(arr)]
    if arr.size < 2:
        return float('nan')
    std = float(np.std(arr, ddof=1))
    if std == 0.0:
        return float('nan')
    return float(np.mean(arr) / std)


def sortino_ratio(values: np.ndarray) -> float:
    arr = np.asarray(values, dtype=float)
    arr = arr[np.isfinite(arr)]
    if arr.size < 2:
        return float('nan')
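    # Downside deviation: root mean square of the negative part over all observations,
    # consistent with downside_semivol above (measured around zero, not around the mean loss).
    downside = np.minimum(arr, 0.0)
    downside_dev = float(np.sqrt(np.mean(downside * downside)))
    if downside_dev == 0.0:
        return float('nan')
    return float(np.mean(arr) / downside_dev)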


def max_drawdown_bps(values: np.ndarray) -> float:
    arr = np.asarray(values, dtype=float)
    arr = np.nan_to_num(arr, nan=0.0)
    equity = np.cumsum(arr)
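    # Floor the running peak at the starting equity of 0 so an opening loss counts as drawdown.
    peak = np.maximum(np.maximum.accumulate(equity), 0.0)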
    drawdown = equity - peak
    return float(-np.min(drawdown))


def bootstrap_mean_ci(values: np.ndarray, reps: int, seed: int) -> tuple[float, float]:
    arr = np.asarray(values, dtype=float)
    arr = arr[np.isfinite(arr)]
    if arr.size == 0:
        return float('nan'), float('nan')
    rng = np.random.default_rng(seed)
    sample_idx = rng.integers(0, arr.size, size=(reps, arr.size))
    draws = arr[sample_idx].mean(axis=1)
    return float(np.quantile(draws, 0.025)), float(np.quantile(draws, 0.975))


def finite_mean(values: np.ndarray) -> float:
    arr = np.asarray(values, dtype=float)
    arr = arr[np.isfinite(arr)]
    return float(np.mean(arr)) if arr.size else float('nan')


def finite_median(values: np.ndarray) -> float:
    arr = np.asarray(values, dtype=float)
    arr = arr[np.isfinite(arr)]
    return float(np.median(arr)) if arr.size else float('nan')


def finite_std(values: np.ndarray) -> float:
    arr = np.asarray(values, dtype=float)
    arr = arr[np.isfinite(arr)]
    return float(np.std(arr)) if arr.size else float('nan')


def finite_count(values: np.ndarray) -> int:
    return int(np.isfinite(np.asarray(values, dtype=float)).sum())


def safe_ratio(numerator: pl.Expr, denominator: pl.Expr) -> pl.Expr:
    return pl.when(denominator.is_finite() & (denominator > 0.0)).then(numerator / denominator).otherwise(None)


def paired_bootstrap_diff(
        left: np.ndarray,
        right: np.ndarray,
        reps: int,
        seed: int,
) -> tuple[float, float, float, float]:
    left_arr = np.asarray(left, dtype=float)
    right_arr = np.asarray(right, dtype=float)
    mask = np.isfinite(left_arr) & np.isfinite(right_arr)
    diff = left_arr[mask] - right_arr[mask]
    if diff.size == 0:
        return float('nan'), float('nan'), float('nan'), float('nan')
    rng = np.random.default_rng(seed)
    sample_idx = rng.integers(0, diff.size, size=(reps, diff.size))
    draws = diff[sample_idx].mean(axis=1)
    return (
        float(np.mean(diff)),
        float(np.quantile(draws, 0.025)),
        float(np.quantile(draws, 0.975)),
        float(np.mean(draws > 0.0)),
    )


def make_model(model_family: str, seed: int):
    if model_family == 'ridge':
        return Pipeline([
            ('scaler', StandardScaler()),
            ('model', Ridge(alpha=1.0)),
        ]
        )
    if model_family == 'hist_gbrt':
        return HistGradientBoostingRegressor(
            learning_rate=0.05,
            max_depth=3,
            max_iter=90,
            min_samples_leaf=16,
            l2_regularization=0.1,
            random_state=seed,
        )
    raise KeyError(model_family)


def reference_scale(scores: np.ndarray) -> float:
    arr = np.asarray(scores, dtype=float)
    positive = arr[arr > 0.0]
    if positive.size >= 5:
        scale = float(np.quantile(positive, 0.9))
    elif positive.size > 0:
        scale = float(np.max(positive))
    else:
        scale = float(np.quantile(np.abs(arr), 0.9))
    return max(scale, 1e-8)


def position_size_from_score(score: np.ndarray, scale: float) -> np.ndarray:
    clipped = np.maximum(np.asarray(score, dtype=float), 0.0)
    return np.clip(clipped / scale, 0.0, 1.0)


def simulate_single_ticker_panel(config: dict, scenario: dict, seed: int) -> tuple[pl.DataFrame, pl.DataFrame]:
    rng = np.random.default_rng(seed)
    total_days = config['warmup_days'] + config['scored_days']
    trade_dates = consecutive_dates(date(2024, 1, 1), total_days)
    n_minutes = config['session_minutes']
    minute_idx = np.arange(n_minutes, dtype=float)
    vol_profile = 0.80 + 0.95 * np.cos(np.pi * minute_idx / (n_minutes - 1)) ** 2
    volume_profile = 0.92 + 1.25 * np.cos(np.pi * minute_idx / (n_minutes - 1)) ** 2
    session_open = time(9, 30)
    shift_start = config['warmup_days'] + int(config['scored_days'] * scenario['shift_start_frac'])
    panel_rows: list[dict[str, object]] = []
    sample_rows: list[dict[str, object]] = []
    sample_day_idx = set(
        np.linspace(
            config['warmup_days'],
            total_days - 1,
            num=min(config['sample_path_days'], total_days - config['warmup_days']),
            dtype=int,
        ).tolist()
    )

    ticker = 'SIM'
    ticker_quality = float(rng.normal(0.0, 1.0))
    ticker_liquidity = float(np.clip(0.2 + 0.7 * rng.beta(4, 2), 0.05, 0.98))
    prev_close = float(20.0 + 120.0 * rng.random())
    carry_state = float(rng.normal(0.0, 0.25))
    vol_state = float(np.clip(rng.normal(1.0, 0.12), 0.65, 1.8))

    for day_idx, trade_date in enumerate(trade_dates):
        shifted_flag = day_idx >= shift_start
        event_scale_live = scenario['event_scale'] * (scenario['late_event_scale_mult'] if shifted_flag else 1.0)
        observability_live = scenario['observability'] * (scenario['late_observability_mult'] if shifted_flag else 1.0)
        vol_scale_live = scenario['vol_scale'] * (scenario['late_vol_mult'] if shifted_flag else 1.0)
        jump_scale_live = scenario['jump_scale'] * (scenario['late_jump_mult'] if shifted_flag else 1.0)
        decay_scale_live = scenario['decay_scale'] * (scenario['late_decay_mult'] if shifted_flag else 1.0)

        latent_signal = float(0.60 * carry_state + 0.22 * ticker_quality + rng.normal(0.0, 0.70))
        latent_sentiment = float(np.clip(latent_signal + rng.normal(0.0, 0.35), -3.0, 3.0))
        latent_direction = float(
            np.clip(0.10 + 0.82 * logistic(abs(latent_signal) + rng.normal(0.0, 0.55)), 0.05, 0.99)
        )
        latent_materiality = float(
            np.clip(0.12 + 0.78 * logistic(abs(latent_signal) + rng.normal(0.0, 0.80)), 0.05, 0.99)
        )
        latent_novelty = float(np.clip(rng.beta(2.0, 3.0), 0.05, 0.99))
        true_edge = float(
            np.tanh(latent_sentiment)
            * (0.35 + 0.65 * latent_direction)
            * (0.45 + 0.55 * latent_materiality)
        )

        sentiment_score = float(
            np.clip(latent_sentiment + rng.normal(0.0, 0.60 / max(observability_live, 0.15)), -3.0, 3.0)
        )
        direction_strength = float(
            np.clip(latent_direction + rng.normal(0.0, 0.12 / max(observability_live, 0.15)), 0.05, 0.99)
        )
        materiality = float(
            np.clip(latent_materiality + rng.normal(0.0, 0.12 / max(observability_live, 0.15)), 0.05, 0.99)
        )
        novelty = float(np.clip(latent_novelty + rng.normal(0.0, 0.10 / max(observability_live, 0.15)), 0.05, 0.99))
        signed_event = float(
            np.tanh(sentiment_score)
            * (0.35 + 0.65 * direction_strength)
            * (0.45 + 0.55 * materiality)
        )

        overnight_gap = float(0.0005 * carry_state + rng.normal(0.0, 0.0032 * (1.1 - 0.45 * ticker_liquidity)))
        day_open = float(prev_close * np.exp(overnight_gap))
        gap_ret = float(np.log(day_open / prev_close))

        decay = float(0.0095 * decay_scale_live * (1.0 + 0.55 * (1.0 - latent_direction) + 0.20 * rng.random()))
        baseline_drift = float(0.000004 * carry_state)
        event_alpha_0 = float(0.00011 * event_scale_live * true_edge + 0.000018 * carry_state)
        alpha_path = event_alpha_0 * np.exp(-decay * minute_idx)

        sigma_level = float((0.00055 + 0.00042 * vol_state + 0.00016 * (1.0 - ticker_liquidity)) * vol_scale_live)
        sigma_path = sigma_level * vol_profile
        jump_prob = float((0.0010 + 0.0040 * latent_materiality * (1.0 - ticker_liquidity)) * jump_scale_live)
        shocks = rng.normal(0.0, sigma_path)
        jumps = rng.normal(0.0, sigma_path * 1.8) * (rng.random(n_minutes) < jump_prob)
        log_returns = baseline_drift + alpha_path + shocks + jumps

        log_close = np.log(day_open) + np.cumsum(log_returns)
        close_prices = np.exp(log_close)
        open_prices = np.concatenate(([day_open], close_prices[:-1]))
        wick = np.maximum(0.00018, np.abs(log_returns) * 0.55 + rng.uniform(0.0, sigma_path * 1.4))
        high_prices = np.maximum(open_prices, close_prices) * (1.0 + wick)
        low_prices = np.minimum(open_prices, close_prices) * np.maximum(1e-9, 1.0 - wick)
        dollar_volume = (35_000.0 + 205_000.0 * ticker_liquidity) * (1.0 + 1.6 * latent_materiality) * volume_profile
        dollar_volume = dollar_volume * rng.lognormal(mean=-0.10, sigma=0.34, size=n_minutes)
        share_volume = np.maximum(100.0, dollar_volume / np.maximum(close_prices, 1.0)).astype(int)

        primary_step_log_returns = log_returns[: config['primary_horizon']]
        primary_cum = np.cumsum(primary_step_log_returns)
        primary_path = np.concatenate(([day_open], close_prices[: config['primary_horizon']]))
        primary_trend = trend_scanning_tstat(
            primary_path,
            min_horizon=config['trend_min_horizon'],
            max_horizon=config['trend_max_horizon'],
            step=5,
        )
        full_session_log = np.concatenate(([np.log(day_open)], log_close))
        day_tstat, day_slope = ols_tstat(full_session_log)

        target_fixed_rate_h60 = float(primary_cum[-1] / config['primary_horizon'])
        target_path_mean_rate_h60 = path_mean_rate(primary_cum)
        target_profit_factor_h60 = profit_factor_log_reward(primary_step_log_returns, eps=config['profit_factor_eps'])
        target_trend_slope = float(primary_trend.slope[0])
        target_trend_tstat = float(primary_trend.t_value[0])
        target_barrier_rate = barrier_reward_rate(
            primary_cum,
            stop_losses=config['stop_losses'],
            take_profits=config['take_profits'],
            horizons=config['barrier_horizons'],
        )
        max_adverse_excursion_h60 = max_adverse_excursion(primary_cum)
        target_competing_risk_hit = competing_risk_hit_score(
            primary_cum,
            stop_losses=config['stop_losses'],
            take_profits=config['take_profits'],
            horizons=config['barrier_horizons'],
            tau=config['competing_risk_tau'],
        )
        target_mae_penalized_rate_h60 = mae_penalized_rate(
            primary_cum,
            horizon=config['primary_horizon'],
            penalty_lambda=config['mae_penalty_lambda'],
        )
        target_downside_adj_rate_h60 = downside_adjusted_return_rate(
            target_fixed_rate_h60,
            primary_step_log_returns,
            eps=config['downside_risk_eps'],
            clip=config['downside_target_clip'],
        )
        target_tradeability_score = tradeability_score(
            terminal_log_return=float(primary_cum[-1]),
            max_adverse_excursion_value=max_adverse_excursion_h60,
            return_threshold=config['tradeability_return_threshold'],
            return_temp=config['tradeability_return_temp'],
            mae_cap=config['tradeability_mae_cap'],
            mae_temp=config['tradeability_mae_temp'],
        )

        realized_return_h60 = float(np.expm1(primary_cum[-1]))
        realized_return_h60_bps = float(10_000.0 * realized_return_h60)
        oracle_rate_h60 = float((baseline_drift + alpha_path[: config['primary_horizon']]).mean())

        day_close = float(close_prices[-1])
        daily_high = float(high_prices.max())
        daily_low = float(low_prices.min())
        daily_oc_ret = float(np.log(day_close / day_open))
        daily_cc_ret = float(np.log(day_close / prev_close))
        daily_range = float(np.log(daily_high / daily_low))
        daily_realized_vol = float(np.std(log_returns))
        daily_volume = int(share_volume.sum())
        daily_close_loc = float((day_close - daily_low) / max(daily_high - daily_low, 1e-9))

        panel_rows.append({
            'scenario_name': scenario['scenario_name'],
            'scenario_kind': scenario['scenario_kind'],
            'seed': seed,
            'run_id': f"{scenario['scenario_name']}|seed={seed}",
            'ticker': ticker,
            'day_idx': day_idx,
            'trade_date': trade_date,
            'publish_ts': datetime.combine(trade_date, session_open),
            'event_time': '09:30',
            'scored_flag': day_idx >= config['warmup_days'],
            'shifted_flag': shifted_flag,
            'shift_phase': 'shifted' if shifted_flag else 'base',
            'event_scale_live': event_scale_live,
            'observability_live': observability_live,
            'vol_scale_live': vol_scale_live,
            'jump_scale_live': jump_scale_live,
            'decay_scale_live': decay_scale_live,
            'sentiment_score': sentiment_score,
            'direction_strength': direction_strength,
            'materiality': materiality,
            'novelty': novelty,
            'signed_event': signed_event,
            'gap_ret': gap_ret,
            'observed_liquidity_score': float(
                np.clip(ticker_liquidity + rng.normal(0.0, 0.05 / max(observability_live, 0.20)), 0.02, 1.0)
            ),
            'observed_vol_state': float(
                np.clip(vol_state + rng.normal(0.0, 0.10 / max(observability_live, 0.20)), 0.5, 3.0)
            ),
            'daily_oc_ret': daily_oc_ret,
            'daily_cc_ret': daily_cc_ret,
            'daily_range': daily_range,
            'daily_realized_vol': daily_realized_vol,
            'daily_volume': daily_volume,
            'daily_close_loc': daily_close_loc,
            'daily_trend_tstat': float(day_tstat),
            'daily_trend_slope': float(day_slope),
            'realized_return_h60': realized_return_h60,
            'realized_return_h60_bps': realized_return_h60_bps,
            'max_adverse_excursion_h60': max_adverse_excursion_h60,
            'oracle_rate_h60': oracle_rate_h60,
            'target_fixed_rate_h60': target_fixed_rate_h60,
            'target_path_mean_rate_h60': target_path_mean_rate_h60,
            'target_profit_factor_h60': target_profit_factor_h60,
            'target_trend_slope': target_trend_slope,
            'target_trend_tstat': target_trend_tstat,
            'target_barrier_rate': target_barrier_rate,
            'target_competing_risk_hit': target_competing_risk_hit,
            'target_mae_penalized_rate_h60': target_mae_penalized_rate_h60,
            'target_downside_adj_rate_h60': target_downside_adj_rate_h60,
            'target_tradeability_score': target_tradeability_score,
            'latent_true_edge': true_edge,
            'latent_event_alpha_0': event_alpha_0,
            'latent_carry_state': carry_state,
        }
        )

        if day_idx in sample_day_idx:
            sample_id = f"{scenario['scenario_name']} | day {day_idx:03d}"
            session_start = datetime.combine(trade_date, session_open)
            sample_rows.extend(
                {
                    'scenario_name': scenario['scenario_name'],
                    'day_idx': day_idx,
                    'sample_id': sample_id,
                    'ts': session_start + timedelta(minutes=int(i)),
                    'close': float(close_prices[i]),
                    'latent_alpha_per_min': float(alpha_path[i]),
                    'shift_phase': 'shifted' if shifted_flag else 'base',
                }
                for i in range(n_minutes)
            )

        carry_state = float(scenario['carry_persistence'] * carry_state + 0.28 * true_edge + rng.normal(0.0, 0.20))
        vol_state = float(np.clip(
            0.78 * vol_state + 0.22 * (0.75 + 0.85 * latent_materiality + 0.30 * abs(true_edge)) + abs(
                rng.normal(0.0, 0.05)
            ), 0.6, 2.8
        )
        )
        prev_close = day_close

    return pl.DataFrame(panel_rows), pl.DataFrame(sample_rows)


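def build_feature_frame(panel: pl.DataFrame) -> tuple[pl.DataFrame, list[str]]:
    """Build the modelling frame for one simulated run.

    Same-day 09:30 observables pass through unchanged. Daily aggregates are
    lagged by one day (and rolled over 3 and 5 days after the lag), so no
    feature uses information from the session being scored. The result keeps
    only scored days with complete features, targets, and benchmark columns.
    """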
    frame = panel.sort('day_idx').with_columns(
        pl.col('daily_volume').log1p().alias('log_daily_volume')
    ).with_columns([
        pl.col('sentiment_score').alias('feature_sentiment_score'),
        pl.col('direction_strength').alias('feature_direction_strength'),
        pl.col('materiality').alias('feature_materiality'),
        pl.col('novelty').alias('feature_novelty'),
        pl.col('signed_event').alias('feature_signed_event'),
        pl.col('gap_ret').alias('feature_gap_ret'),
        pl.col('observed_liquidity_score').alias('feature_liquidity_score'),
        pl.col('observed_vol_state').alias('feature_vol_state'),
        pl.col('daily_oc_ret').shift(1).alias('feature_prev_oc_ret'),
        pl.col('daily_cc_ret').shift(1).alias('feature_prev_cc_ret'),
        pl.col('daily_range').shift(1).alias('feature_prev_range'),
        pl.col('daily_realized_vol').shift(1).alias('feature_prev_realized_vol'),
        pl.col('log_daily_volume').shift(1).alias('feature_prev_log_volume'),
        pl.col('daily_close_loc').shift(1).alias('feature_prev_close_loc'),
        pl.col('daily_trend_slope').shift(1).alias('feature_prev_trend_slope'),
        pl.col('daily_trend_tstat').shift(1).alias('feature_prev_trend_tstat'),
        pl.col('daily_cc_ret').shift(1).rolling_mean(window_size=3).alias('feature_cc_ret_3d'),
        pl.col('daily_cc_ret').shift(1).rolling_mean(window_size=5).alias('feature_cc_ret_5d'),
        pl.col('daily_realized_vol').shift(1).rolling_mean(window_size=3).alias('feature_realized_vol_3d'),
        pl.col('daily_realized_vol').shift(1).rolling_mean(window_size=5).alias('feature_realized_vol_5d'),
        pl.col('daily_range').shift(1).rolling_mean(window_size=3).alias('feature_range_3d'),
        pl.col('daily_range').shift(1).rolling_mean(window_size=5).alias('feature_range_5d'),
        pl.col('log_daily_volume').shift(1).rolling_mean(window_size=3).alias('feature_log_volume_3d'),
        pl.col('log_daily_volume').shift(1).rolling_mean(window_size=5).alias('feature_log_volume_5d'),
        pl.col('daily_trend_slope').shift(1).rolling_mean(window_size=3).alias('feature_trend_slope_3d'),
        pl.col('daily_trend_slope').shift(1).rolling_mean(window_size=5).alias('feature_trend_slope_5d'),
    ]
    ).with_columns([
        (pl.col('feature_log_volume_3d') - pl.col('feature_log_volume_5d')).alias('feature_volume_trend_3v5'),
        (pl.col('feature_cc_ret_3d') - pl.col('feature_cc_ret_5d')).alias('feature_return_trend_3v5'),
        (pl.col('feature_realized_vol_3d') - pl.col('feature_realized_vol_5d')).alias('feature_vol_trend_3v5'),
    ]
    )
    feature_cols = sorted([col for col in frame.columns if col.startswith('feature_')])
    required_cols = feature_cols + TARGET_COLS + ['realized_return_h60', 'realized_return_h60_bps', 'oracle_rate_h60']
    frame = frame.filter(pl.col('scored_flag')).drop_nulls(subset=required_cols)
    return frame, feature_cols


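def materialize_folds(frame: pl.DataFrame, config: dict) -> tuple[
    list[tuple[int, np.ndarray, np.ndarray]], pl.DataFrame]:
    """Materialize expanding-window folds and their date metadata.

    Folds with fewer than config['min_train_days'] training dates are dropped
    and the survivors are renumbered from 1. The gap between the last training
    date and the first validation date is measured in positions within the
    frame's own sorted trade dates, then checked against the pre-embargo.
    """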
    splitter = DateEmbargoSplit(
        n_splits=config['n_splits'],
        pre_embargo=config['pre_embargo_days'],
        mode='expanding',
    )
    dates = np.array(frame['trade_date'].to_list(), dtype=object)
    unique_dates = np.unique(dates)
    date_to_pos = {value: idx for idx, value in enumerate(unique_dates)}
    x_dummy = np.zeros((frame.height, 1))
    folds: list[tuple[int, np.ndarray, np.ndarray]] = []
    meta_rows: list[dict[str, object]] = []
    next_fold = 1

    for train_idx, val_idx in splitter.split(x_dummy, groups=dates):
        train_dates = np.unique(dates[train_idx])
        val_dates = np.unique(dates[val_idx])
        if train_idx.size == 0 or train_dates.shape[0] < config['min_train_days']:
            continue
        folds.append((next_fold, train_idx, val_idx))
        gap = int(date_to_pos[val_dates[0]] - date_to_pos[train_dates[-1]] - 1)
        meta_rows.append({
            'fold': next_fold,
            'train_start': train_dates[0],
            'train_end': train_dates[-1],
            'val_start': val_dates[0],
            'val_end': val_dates[-1],
            'train_days': int(train_dates.shape[0]),
            'val_days': int(val_dates.shape[0]),
            'business_day_gap': gap,
            'gap_ok': gap >= config['pre_embargo_days'],
        }
        )
        next_fold += 1

    if not folds:
        raise ValueError('No valid folds were created. Increase history or reduce min_train_days.')
    return folds, pl.DataFrame(meta_rows)


def evaluate_run(
        frame: pl.DataFrame,
        feature_cols: list[str],
        folds: list[tuple[int, np.ndarray, np.ndarray]],
        *,
        run_id: str,
        scenario_name: str,
        scenario_kind: str,
        seed: int,
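) -> tuple[pl.DataFrame, pl.DataFrame]:
    """Evaluate every (model family, target) pair on one run.

    For each fold the model is fit on the training rows, validation scores are
    converted to long/skip position sizes using a scale taken from the
    training scores, and gross PnL is size times realized 60-minute return.
    The 'oracle' and 'perfect' benchmarks skip model fitting and score with the
    latent rate and the realized return respectively, so they are upper-bound
    references rather than deployable signals.

    Returns (fold-level diagnostics, per-day out-of-sample trades).
    """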
    x = frame.select(feature_cols).to_numpy()
    n_rows = frame.height
    day_idx = frame['day_idx'].to_numpy()
    trade_dates = frame['trade_date'].to_numpy()
    shift_phase = frame['shift_phase'].to_numpy()
    realized_return_h60 = frame['realized_return_h60'].to_numpy()
    realized_return_h60_bps = frame['realized_return_h60_bps'].to_numpy()
    oracle_rate_h60 = frame['oracle_rate_h60'].to_numpy()
    target_arrays = {target_col: frame[target_col].to_numpy() for target_col in TARGET_COLS}
    fold_assignment = np.full(n_rows, -1, dtype=int)
    for fold_id, _, val_idx in folds:
        fold_assignment[val_idx] = fold_id
    oos_mask = fold_assignment > 0
    oos_count = int(oos_mask.sum())
    diag_rows: list[dict[str, object]] = []
    trade_frames: list[pl.DataFrame] = []

    def build_trade_frame(model_family: str, method_name: str, prediction: np.ndarray,
                          position_size: np.ndarray) -> pl.DataFrame:
        position_oos = position_size[oos_mask]
        realized_bps_oos = realized_return_h60_bps[oos_mask]
        return pl.DataFrame({
            'run_id': [run_id] * oos_count,
            'scenario_name': [scenario_name] * oos_count,
            'scenario_kind': [scenario_kind] * oos_count,
            'seed': [seed] * oos_count,
            'model_family': [model_family] * oos_count,
            'method_name': [method_name] * oos_count,
            'fold': fold_assignment[oos_mask],
            'day_idx': day_idx[oos_mask],
            'trade_date': trade_dates[oos_mask].tolist(),
            'shift_phase': shift_phase[oos_mask].tolist(),
            'prediction': prediction[oos_mask],
            'position_size': position_oos,
            'trade_flag': position_oos > 0.0,
            'realized_return_h60': realized_return_h60[oos_mask],
            'realized_return_h60_bps': realized_bps_oos,
            'gross_pnl_bps': position_oos * realized_bps_oos,
        }
        )

    for model_family in MODEL_FAMILIES:
        for target_col in TARGET_COLS:
            y_target = target_arrays[target_col]
            prediction = np.full(n_rows, np.nan, dtype=float)
            position_size = np.zeros(n_rows, dtype=float)
            for fold_id, train_idx, val_idx in folds:
                model = make_model(model_family, seed + fold_id)
                model.fit(x[train_idx], y_target[train_idx])
                train_score = model.predict(x[train_idx])
                val_score = model.predict(x[val_idx])
                scale = reference_scale(train_score)
                val_size = position_size_from_score(val_score, scale)
                prediction[val_idx] = val_score
                position_size[val_idx] = val_size
                diag_rows.append({
                    'run_id': run_id,
                    'scenario_name': scenario_name,
                    'scenario_kind': scenario_kind,
                    'seed': seed,
                    'model_family': model_family,
                    'method_name': target_col,
                    'fold': fold_id,
                    'val_days': int(val_idx.size),
                    'prediction_ic': rank_ic(realized_return_h60[val_idx], val_score),
                    'trade_rate': float(np.mean(val_size > 0.0)),
                    'mean_position_size': float(np.mean(val_size)),
                    'mean_fold_gross_pnl_bps': float(np.mean(val_size * realized_return_h60_bps[val_idx])),
                }
                )
            trade_frames.append(build_trade_frame(model_family, target_col, prediction, position_size))

    for benchmark_name, benchmark_score_all in [('oracle', oracle_rate_h60), ('perfect', realized_return_h60)]:
        prediction = np.full(n_rows, np.nan, dtype=float)
        position_size = np.zeros(n_rows, dtype=float)
        for fold_id, train_idx, val_idx in folds:
            val_score = benchmark_score_all[val_idx]
            scale = reference_scale(benchmark_score_all[train_idx])
            val_size = position_size_from_score(val_score, scale)
            prediction[val_idx] = val_score
            position_size[val_idx] = val_size
            diag_rows.append({
                'run_id': run_id,
                'scenario_name': scenario_name,
                'scenario_kind': scenario_kind,
                'seed': seed,
                'model_family': 'benchmark',
                'method_name': benchmark_name,
                'fold': fold_id,
                'val_days': int(val_idx.size),
                'prediction_ic': rank_ic(realized_return_h60[val_idx], val_score),
                'trade_rate': float(np.mean(val_size > 0.0)),
                'mean_position_size': float(np.mean(val_size)),
                'mean_fold_gross_pnl_bps': float(np.mean(val_size * realized_return_h60_bps[val_idx])),
            }
            )
        trade_frames.append(build_trade_frame('benchmark', benchmark_name, prediction, position_size))

    return pl.DataFrame(diag_rows), pl.concat(trade_frames, how='vertical')


def summarize_trade_days(trade_day_metrics: pl.DataFrame) -> pl.DataFrame:
    rows = []
    for keys, part in trade_day_metrics.partition_by(
            ['run_id', 'scenario_name', 'scenario_kind', 'seed', 'model_family', 'method_name'], as_dict=True
    ).items():
        run_id, scenario_name, scenario_kind, seed, model_family, method_name = keys
        for eval_slice, slice_part in [('all', part), ('base', part.filter(pl.col('shift_phase') == 'base')),
                                       ('shifted', part.filter(pl.col('shift_phase') == 'shifted'))]:
            if slice_part.height == 0:
                continue
            pnl = slice_part['gross_pnl_bps'].to_numpy()
            size = slice_part['position_size'].to_numpy()
            trade_flag = slice_part['trade_flag'].cast(pl.Int64).to_numpy()
            rows.append({
                'run_id': run_id,
                'scenario_name': scenario_name,
                'scenario_kind': scenario_kind,
                'seed': seed,
                'model_family': model_family,
                'method_name': method_name,
                'eval_slice': eval_slice,
                'n_days': int(slice_part.height),
                'mean_daily_gross_pnl_bps': float(np.mean(pnl)),
                'std_daily_gross_pnl_bps': float(np.std(pnl, ddof=1)) if slice_part.height > 1 else float('nan'),
                'daily_sharpe': sharpe_ratio(pnl),
                'daily_sortino': sortino_ratio(pnl),
                'max_drawdown_bps': max_drawdown_bps(pnl),
                'trade_rate': float(np.mean(trade_flag)),
                'avg_position_size': float(np.mean(size)),
                'avg_trade_size': float(np.mean(size[trade_flag > 0])) if np.any(trade_flag > 0) else 0.0,
                'trade_hit_rate': float(np.mean(pnl[trade_flag > 0] > 0.0)) if np.any(trade_flag > 0) else float('nan'),
            }
            )
    return pl.DataFrame(rows)


def aggregate_metric_summary(run_summary: pl.DataFrame, metric_col: str, reps: int, seed: int) -> pl.DataFrame:
    rows = []
    for keys, part in run_summary.partition_by(['model_family', 'method_name', 'eval_slice'], as_dict=True).items():
        model_family, method_name, eval_slice = keys
        values = part[metric_col].to_numpy()
        ci_low, ci_high = bootstrap_mean_ci(values, reps=reps, seed=seed)
        finite_n = finite_count(values)
        rows.append({
            'model_family': model_family,
            'method_name': method_name,
            'eval_slice': eval_slice,
            f'{metric_col}_mean': finite_mean(values),
            f'{metric_col}_median': finite_median(values),
            f'{metric_col}_std': finite_std(values),
            f'{metric_col}_ci_low': ci_low,
            f'{metric_col}_ci_high': ci_high,
            'n_runs': int(part.height),
            f'{metric_col}_finite_runs': finite_n,
            f'{metric_col}_nan_runs': int(part.height - finite_n),
        }
        )
    return pl.DataFrame(rows)


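def compute_run_ranks(run_summary: pl.DataFrame) -> pl.DataFrame:
    """Rank targets within each (run, model family) on the 'all' slice.

    Ordering is by daily Sharpe, with mean daily gross PnL as the tie-breaker.
    Targets with a non-finite Sharpe always rank after those with a finite one.
    """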
    rows = []
    subset = run_summary.filter((pl.col('model_family') != 'benchmark') & (pl.col('eval_slice') == 'all'))
    for keys, part in subset.partition_by(['run_id', 'model_family'], as_dict=True).items():
        run_id, model_family = keys
        records = part.select(['scenario_name', 'scenario_kind', 'seed', 'method_name', 'daily_sharpe',
                               'mean_daily_gross_pnl_bps']
                              ).to_dicts()
        records.sort(
            key=lambda row: (
                np.isfinite(row['daily_sharpe']),
                row['daily_sharpe'] if np.isfinite(row['daily_sharpe']) else -np.inf,
                row['mean_daily_gross_pnl_bps'] if np.isfinite(row['mean_daily_gross_pnl_bps']) else -np.inf,
            ),
            reverse=True,
        )
        for rank, row in enumerate(records, start=1):
            rows.append({
                'run_id': run_id,
                'model_family': model_family,
                'scenario_name': row['scenario_name'],
                'scenario_kind': row['scenario_kind'],
                'seed': row['seed'],
                'method_name': row['method_name'],
                'rank': rank,
                'is_winner': rank == 1,
            }
            )
    return pl.DataFrame(rows)


def build_paired_comparison_table(run_summary: pl.DataFrame, reps: int) -> pl.DataFrame:
    rows = []
    subset = run_summary.filter((pl.col('model_family') != 'benchmark') & (pl.col('eval_slice') == 'all'))
    for model_family in MODEL_FAMILIES:
        part = subset.filter(pl.col('model_family') == model_family)
        summary = (
            part.group_by('method_name')
            .agg([
                pl.col('daily_sharpe').filter(pl.col('daily_sharpe').is_finite()).mean().alias('daily_sharpe'),
                pl.col('mean_daily_gross_pnl_bps').filter(pl.col('mean_daily_gross_pnl_bps').is_finite()).mean().alias(
                    'mean_daily_gross_pnl_bps'
                ),
                pl.col('daily_sharpe').is_finite().sum().alias('finite_sharpe_runs'),
            ]
            )
            .filter(pl.col('daily_sharpe').is_finite())
            .sort(['daily_sharpe', 'mean_daily_gross_pnl_bps'], descending=[True, True])
        )
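        if summary.is_empty():
            # No target has a finite mean Sharpe for this family, so there is nothing to compare.
            continue
        winner = summary.item(0, 'method_name')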
        winner_part = part.filter(pl.col('method_name') == winner).sort('run_id')
        for challenger in summary['method_name'].to_list()[1:]:
            challenger_part = part.filter(pl.col('method_name') == challenger).sort('run_id')
            joined = winner_part.join(
                challenger_part.select(['run_id', 'daily_sharpe', 'mean_daily_gross_pnl_bps']).rename({
                    'daily_sharpe': 'challenger_sharpe',
                    'mean_daily_gross_pnl_bps': 'challenger_mean_pnl_bps',
                }
                ),
                on='run_id',
                how='inner',
            )
            sharpe_diff, sharpe_ci_low, sharpe_ci_high, sharpe_prob = paired_bootstrap_diff(
                joined['daily_sharpe'].to_numpy(),
                joined['challenger_sharpe'].to_numpy(),
                reps=reps,
                seed=17,
            )
            pnl_diff, pnl_ci_low, pnl_ci_high, pnl_prob = paired_bootstrap_diff(
                joined['mean_daily_gross_pnl_bps'].to_numpy(),
                joined['challenger_mean_pnl_bps'].to_numpy(),
                reps=reps,
                seed=23,
            )
            rows.append({
                'model_family': model_family,
                'winner_method': winner,
                'challenger_method': challenger,
                'sharpe_diff_mean': sharpe_diff,
                'sharpe_diff_ci_low': sharpe_ci_low,
                'sharpe_diff_ci_high': sharpe_ci_high,
                'winner_prob_beats_challenger_sharpe': sharpe_prob,
                'mean_pnl_diff_bps': pnl_diff,
                'mean_pnl_diff_ci_low': pnl_ci_low,
                'mean_pnl_diff_ci_high': pnl_ci_high,
                'winner_prob_beats_challenger_pnl': pnl_prob,
                'matched_runs': int(joined.height),
            }
            )
    return pl.DataFrame(rows)


def build_seed_budget_table(run_summary: pl.DataFrame, seed_budgets: list[int]) -> pl.DataFrame:
    subset = run_summary.filter((pl.col('model_family') != 'benchmark') & (pl.col('eval_slice') == 'all'))
    rows = []
    for model_family in MODEL_FAMILIES:
        family_part = subset.filter(pl.col('model_family') == model_family)
        for budget in seed_budgets:
            active_seeds = SEEDS[:budget]
            budget_part = family_part.filter(pl.col('seed').is_in(active_seeds))
            summary = (
                budget_part.group_by('method_name')
                .agg([
                    pl.col('daily_sharpe').filter(pl.col('daily_sharpe').is_finite()).mean().alias('daily_sharpe_mean'),
                    pl.col('mean_daily_gross_pnl_bps').filter(
                        pl.col('mean_daily_gross_pnl_bps').is_finite()
                    ).mean().alias('mean_daily_gross_pnl_bps_mean'),
                    pl.col('trade_rate').filter(pl.col('trade_rate').is_finite()).mean().alias('trade_rate_mean'),
                    pl.col('daily_sharpe').is_finite().sum().alias('finite_sharpe_runs'),
                ]
                )
                .filter(pl.col('daily_sharpe_mean').is_finite())
                .sort(['daily_sharpe_mean', 'mean_daily_gross_pnl_bps_mean'], descending=[True, True])
            )
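            if summary.height < 2:
                # A leader and a runner-up both need a finite mean Sharpe at this seed budget.
                continue
            best = summary.row(0, named=True)
            runner_up = summary.row(1, named=True)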
            rows.append({
                'model_family': model_family,
                'seed_budget': budget,
                'runs_used': int(budget_part['run_id'].n_unique()),
                'finite_sharpe_runs': best['finite_sharpe_runs'],
                'best_method': best['method_name'],
                'best_daily_sharpe_mean': best['daily_sharpe_mean'],
                'best_mean_daily_gross_pnl_bps_mean': best['mean_daily_gross_pnl_bps_mean'],
                'runner_up_method': runner_up['method_name'],
                'sharpe_gap_to_runner': best['daily_sharpe_mean'] - runner_up['daily_sharpe_mean'],
                'pnl_gap_to_runner_bps': best['mean_daily_gross_pnl_bps_mean'] - runner_up[
                    'mean_daily_gross_pnl_bps_mean'],
            }
            )
    return pl.DataFrame(rows).sort(['model_family', 'seed_budget'])


def build_run_tail_table(run_summary: pl.DataFrame) -> pl.DataFrame:
    return (
        run_summary
        .filter((pl.col('model_family') != 'benchmark') & (pl.col('eval_slice') == 'all'))
        .group_by(['model_family', 'method_name'])
        .agg([
            pl.col('daily_sharpe').filter(pl.col('daily_sharpe').is_finite()).min().alias('worst_run_daily_sharpe'),
            pl.col('daily_sharpe').filter(pl.col('daily_sharpe').is_finite()).median().alias('median_run_daily_sharpe'),
            pl.col('mean_daily_gross_pnl_bps').filter(pl.col('mean_daily_gross_pnl_bps').is_finite()).min().alias(
                'worst_run_mean_daily_gross_pnl_bps'
            ),
            pl.col('mean_daily_gross_pnl_bps').filter(pl.col('mean_daily_gross_pnl_bps').is_finite()).median().alias(
                'median_run_mean_daily_gross_pnl_bps'
            ),
            pl.col('trade_rate').filter(pl.col('trade_rate').is_finite()).mean().alias('trade_rate_mean'),
            pl.col('daily_sharpe').is_finite().sum().alias('finite_sharpe_runs'),
            pl.col('daily_sharpe').is_nan().sum().alias('nan_sharpe_runs'),
            pl.len().alias('n_runs'),
        ]
        )
        .sort(['median_run_daily_sharpe', 'worst_run_daily_sharpe'], descending=[True, True])
    )


def build_degenerate_strategy_table(run_summary: pl.DataFrame) -> pl.DataFrame:
    return (
        run_summary
        .filter((pl.col('model_family') != 'benchmark') & (pl.col('eval_slice') == 'all'))
        .group_by(['model_family', 'method_name'])
        .agg([
            pl.len().alias('n_runs'),
            pl.col('daily_sharpe').is_nan().sum().alias('nan_sharpe_runs'),
            (pl.col('trade_rate') == 0.0).sum().alias('zero_trade_runs'),
            pl.mean('trade_rate').alias('mean_trade_rate'),
            pl.min('trade_rate').alias('min_trade_rate'),
            pl.max('trade_rate').alias('max_trade_rate'),
        ]
        )
        .with_columns([
            (pl.col('nan_sharpe_runs') / pl.col('n_runs')).alias('nan_sharpe_run_rate'),
            (pl.col('zero_trade_runs') / pl.col('n_runs')).alias('zero_trade_run_rate'),
        ]
        )
        .sort(['model_family', 'nan_sharpe_runs', 'zero_trade_runs'], descending=[False, True, True])
    )
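The paired comparison above relies on `paired_bootstrap_diff`, which is defined elsewhere in the notebook. As a rough illustration of the idea only, the sketch below resamples matched run-level differences with replacement and reads a percentile interval off the resampled means. The function name `paired_bootstrap_sketch`, the four-value return order, the 95% level, and the toy numbers are all assumptions made for this example; the notebook's own helper may differ in every one of these details. It is written in plain Python so it runs without the notebook's dependencies.

```python
import math
import random
import statistics


def paired_bootstrap_sketch(winner, challenger, reps=2000, seed=17):
    """Bootstrap the mean per-run difference (winner minus challenger).

    Hypothetical helper for illustration. Returns
    (mean_diff, ci_low, ci_high, prob_mean_diff_positive).
    """
    # Pair by position and drop runs where either metric is non-finite.
    diffs = [w - c for w, c in zip(winner, challenger)
             if math.isfinite(w) and math.isfinite(c)]
    if not diffs:
        nan = float('nan')
        return nan, nan, nan, nan
    rng = random.Random(seed)
    # Resample the paired differences with replacement, keeping the pairing intact.
    boot_means = sorted(
        statistics.fmean(rng.choices(diffs, k=len(diffs))) for _ in range(reps)
    )
    # Percentile interval read off the sorted bootstrap means.
    ci_low = boot_means[int(0.025 * (reps - 1))]
    ci_high = boot_means[int(0.975 * (reps - 1))]
    prob_positive = sum(m > 0.0 for m in boot_means) / reps
    return statistics.fmean(diffs), ci_low, ci_high, prob_positive


# Toy run-level Sharpe values for two targets over five matched runs (made up).
toy_winner = [0.12, 0.08, 0.15, float('nan'), 0.10]
toy_challenger = [0.10, 0.09, 0.11, 0.07, 0.06]
mean_diff, ci_low, ci_high, prob = paired_bootstrap_sketch(toy_winner, toy_challenger)
print(f'mean diff={mean_diff:.4f}, CI=({ci_low:.4f}, {ci_high:.4f}), P(diff>0)={prob:.2f}')
```

Resampling the differences rather than each series separately is what makes the comparison paired: scenario and seed effects shared by both targets cancel inside each difference before any resampling happens.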
Code
scenario_metric_rows = []
for scenario in SCENARIOS:
    scenario_label = scenario['scenario_name'].replace('_', ' ')
    for metric_label, column in [
        ('Base event edge', 'event_scale'),
        ('Base observability', 'observability'),
        ('Base volatility', 'vol_scale'),
        ('Base jump risk', 'jump_scale'),
        ('Base decay', 'decay_scale'),
        ('Carry persistence', 'carry_persistence'),
        ('Shift start fraction', 'shift_start_frac'),
        ('Late event edge', 'late_event_scale_mult'),
        ('Late observability', 'late_observability_mult'),
        ('Late volatility', 'late_vol_mult'),
        ('Late jump risk', 'late_jump_mult'),
        ('Late decay', 'late_decay_mult'),
    ]:
        value = float(scenario[column])
        scenario_metric_rows.append({
            'scenario_label': scenario_label,
            'scenario_kind': scenario['scenario_kind'],
            'metric_label': metric_label,
            'value': value,
            'value_label': f'{value:.2f}',
        }
        )

scenario_metric_frame = pl.DataFrame(scenario_metric_rows)
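Each simulated run is split with `DateEmbargoSplit` in expanding mode, and `materialize_folds` records the gap between the last training date and the first validation date. The splitter itself is not shown in this section, so the following is only a sketch of how an expanding window with a pre-embargo could be laid out over day positions. The function `expanding_folds_sketch`, its equal-sized validation blocks, and the toy parameters are assumptions for illustration and are not the splitter's actual logic.

```python
def expanding_folds_sketch(n_days, n_splits, embargo, min_train_days):
    """Return (train_positions, val_positions) pairs for an expanding window.

    Hypothetical layout: the history is cut into n_splits + 1 blocks, each
    validation block is preceded by `embargo` dropped days, and folds with
    too little training history are skipped.
    """
    val_size = n_days // (n_splits + 1)
    folds = []
    for k in range(1, n_splits + 1):
        val_start = k * val_size
        # The last fold absorbs any remainder days.
        val_end = n_days if k == n_splits else val_start + val_size
        # Exclusive end of training: the `embargo` days before validation are unused.
        train_end = val_start - embargo
        if train_end < min_train_days:
            continue
        folds.append((list(range(train_end)), list(range(val_start, val_end))))
    return folds


folds = expanding_folds_sketch(n_days=120, n_splits=5, embargo=5, min_train_days=30)
# Same gap definition as materialize_folds: days strictly between train end and val start.
gaps = [val[0] - train[-1] - 1 for train, val in folds]
for fold_id, ((train, val), gap) in enumerate(zip(folds, gaps), start=1):
    print(f'fold {fold_id}: train_days={len(train)}, val_days={len(val)}, gap={gap}')
```

With these toy settings the first candidate fold is dropped for having fewer than 30 training days, which mirrors the `min_train_days` filter in `materialize_folds`.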
Code
cached_panel_summary = load_cached_parquet('panel_summary')
cached_prediction_diagnostics = load_cached_parquet('prediction_diagnostics')
cached_trade_day_metrics = load_cached_parquet('trade_day_metrics')
cached_reference_frame = load_cached_parquet('reference_frame')
cached_reference_fold_meta = load_cached_parquet('reference_fold_meta')
cached_reference_minute_sample = load_cached_parquet('reference_minute_sample')
cached_reference_features = load_cached_parquet('reference_features')

if all(
        artifact is not None
        for artifact in [
            cached_panel_summary,
            cached_prediction_diagnostics,
            cached_trade_day_metrics,
            cached_reference_frame,
            cached_reference_fold_meta,
            cached_reference_minute_sample,
            cached_reference_features,
        ]
):
    panel_summary = cached_panel_summary
    prediction_diagnostics = cached_prediction_diagnostics
    trade_day_metrics = cached_trade_day_metrics
    reference_frame = cached_reference_frame
    reference_fold_meta = cached_reference_fold_meta
    reference_minute_sample = cached_reference_minute_sample
    reference_features = cached_reference_features['feature_name'].to_list()
else:
    reference_frame = None
    reference_features = None
    reference_fold_meta = None
    reference_minute_sample = None
    reference_priority = -1
    panel_summaries = []
    diag_frames = []
    trade_frames = []

    for scenario in SCENARIOS:
        for seed in SEEDS:
            panel, minute_sample = simulate_single_ticker_panel(CONFIG, scenario, seed)
            frame, feature_cols = build_feature_frame(panel)
            folds, fold_meta = materialize_folds(frame, CONFIG)
            run_id = frame.item(0, 'run_id')
            diag_df, trade_df = evaluate_run(
                frame,
                feature_cols,
                folds,
                run_id=run_id,
                scenario_name=scenario['scenario_name'],
                scenario_kind=scenario['scenario_kind'],
                seed=seed,
            )
            diag_frames.append(diag_df)
            trade_frames.append(trade_df)
            shifted_part = frame.filter(pl.col('shifted_flag'))
            panel_summaries.append({
                'run_id': run_id,
                'scenario_name': scenario['scenario_name'],
                'scenario_kind': scenario['scenario_kind'],
                'seed': seed,
                'event_rows': int(panel.height),
                'feature_rows': int(frame.height),
                'n_features': int(len(feature_cols)),
                'n_folds': int(len(folds)),
                'shifted_share': float(frame['shifted_flag'].cast(pl.Float64).mean()),
                'feature_to_oracle_corr': float(
                    np.corrcoef(frame['feature_signed_event'].to_numpy(), frame['latent_event_alpha_0'].to_numpy())[
                        0, 1]
                ),
                'late_signal_mean': float(
                    shifted_part.select(pl.mean('event_scale_live')).item()
                ) if shifted_part.height else float('nan'),
                'late_observability_mean': float(
                    shifted_part.select(pl.mean('observability_live')).item()
                ) if shifted_part.height else float('nan'),
                'late_vol_mean': float(
                    shifted_part.select(pl.mean('vol_scale_live')).item()
                ) if shifted_part.height else float('nan'),
            }
            )
            candidate_priority = 1 if scenario['scenario_kind'] == 'shifted' else 0
            if candidate_priority > reference_priority:
                reference_frame = frame
                reference_features = feature_cols
                reference_fold_meta = fold_meta
                reference_minute_sample = minute_sample
                reference_priority = candidate_priority

    panel_summary = pl.DataFrame(panel_summaries)
    prediction_diagnostics = pl.concat(diag_frames, how='vertical')
    trade_day_metrics = pl.concat(trade_frames, how='vertical')

    write_cached_parquet(panel_summary, 'panel_summary')
    write_cached_parquet(prediction_diagnostics, 'prediction_diagnostics')
    write_cached_parquet(trade_day_metrics, 'trade_day_metrics')
    write_cached_parquet(reference_frame, 'reference_frame')
    write_cached_parquet(reference_fold_meta, 'reference_fold_meta')
    write_cached_parquet(reference_minute_sample, 'reference_minute_sample')
    write_cached_parquet(pl.DataFrame({'feature_name': reference_features}), 'reference_features')
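Every trade row produced above follows the deployment rule stated in the summary: skip non-positive scores, size positive scores up to 1.0, and book gross PnL as size times realized return. The sketch below shows one way such a rule could look on toy numbers. The notebook's own `reference_scale` and `position_size_from_score` are defined elsewhere and may use a different scale or shape; `size_from_score_sketch`, the choice of scale, and all values here are assumptions for illustration.

```python
def size_from_score_sketch(scores, scale):
    """Long/skip sizing: 0.0 for non-positive scores, score / scale capped at 1.0 otherwise.

    Hypothetical helper for illustration only.
    """
    if not scale > 0.0:
        # Without a usable positive scale, skip every day.
        return [0.0 for _ in scores]
    return [min(max(score / scale, 0.0), 1.0) for score in scores]


# Toy training scores; the scale is taken from training data only, never from validation.
train_scores = [-0.4, 0.1, 0.2, 0.5, 1.0]
positive_train = sorted(score for score in train_scores if score > 0.0)
# Assumed scale choice: the upper median of the positive training scores.
toy_scale = positive_train[len(positive_train) // 2]

val_scores = [-0.3, 0.0, 0.25, 2.0]
val_sizes = size_from_score_sketch(val_scores, toy_scale)

# Gross PnL in basis points is position size times the realized 60-minute return.
realized_bps = [12.0, -5.0, 8.0, -20.0]
gross_pnl_bps = [size * ret for size, ret in zip(val_sizes, realized_bps)]
print(val_sizes, gross_pnl_bps)
```

Two properties of this rule are what the sanity tests later in the notebook check on the real output: sizes stay within [0, 1], and no day with a non-positive prediction carries a position.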
Code
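# Use the run that supplied reference_frame so the reported no-trade rate describes the reference run.
reference_policy = trade_day_metrics.filter(
    (pl.col('run_id') == reference_frame.item(0, 'run_id'))
    & (pl.col('model_family') == 'ridge')
    & (pl.col('method_name') == 'target_fixed_rate_h60')
)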
reference_competing_risk = reference_frame['target_competing_risk_hit'].to_numpy()
reference_fixed_rate = reference_frame['target_fixed_rate_h60'].to_numpy()
reference_mae_penalized = reference_frame['target_mae_penalized_rate_h60'].to_numpy()
reference_downside_adjusted = reference_frame['target_downside_adj_rate_h60'].to_numpy()
reference_tradeability = reference_frame['target_tradeability_score'].to_numpy()
reference_competing_risk_min = float(np.nanmin(reference_competing_risk))
reference_competing_risk_max = float(np.nanmax(reference_competing_risk))
reference_tradeability_min = float(np.nanmin(reference_tradeability))
reference_tradeability_max = float(np.nanmax(reference_tradeability))
reference_tradeability_span = float(reference_tradeability_max - reference_tradeability_min)
reference_downside_abs_max = float(np.nanmax(np.abs(reference_downside_adjusted)))
reference_mae_gap_max = float(np.nanmax(reference_mae_penalized - reference_fixed_rate))

test_rows = [
    {
        'test': 'Reference scored rows have no missing feature values',
        'passed': bool(
            reference_frame.select([pl.col(col).is_null().sum().alias(col) for col in reference_features]).row(
                0
            ) == tuple(0 for _ in reference_features)
        ),
        'detail': f"reference_rows={reference_frame.height}",
    },
    {
        'test': 'All reference folds respect 5d pre-embargo',
        'passed': bool(reference_fold_meta['gap_ok'].all()),
        'detail': f"min_gap={int(reference_fold_meta['business_day_gap'].min())}",
    },
    {
        'test': 'Reference fold count and OOS history are intentionally larger',
        'passed': bool(
            reference_fold_meta.height >= 6
            and reference_frame.height >= 320
            and int(reference_fold_meta['val_days'].min()) >= 20
        ),
        'detail': f"folds={reference_fold_meta.height}; reference_rows={reference_frame.height}; min_val_days={int(reference_fold_meta['val_days'].min())}",
    },
    {
        'test': 'Reference frame has one row per day',
        'passed': bool(reference_frame.height == reference_frame['day_idx'].n_unique()),
        'detail': f"n_unique_days={int(reference_frame['day_idx'].n_unique())}",
    },
    {
        'test': 'Position size bounded and no-trade days obey score sign',
        'passed': bool(
            (trade_day_metrics['position_size'].min() >= 0.0)
            and (trade_day_metrics['position_size'].max() <= 1.0)
            and trade_day_metrics.filter((pl.col('prediction') <= 0.0) & (pl.col('position_size') > 0.0)).is_empty()
        ),
        'detail': f"reference_no_trade_rate={(1.0 - reference_policy['trade_flag'].cast(pl.Float64).mean()):.3f}",
    },
    {
        'test': 'Daily gross PnL matches size times realized return',
        'passed': bool(
            np.allclose(
                trade_day_metrics['gross_pnl_bps'].to_numpy(),
                trade_day_metrics['position_size'].to_numpy()
                * trade_day_metrics['realized_return_h60_bps'].to_numpy(),
            )
        ),
        'detail': 'allclose=true',
    },
    {
        'test': 'Shifted scenarios reduce observability late in sample',
        'passed': bool(
            panel_summary.filter(pl.col('scenario_kind') == 'shifted').select(pl.mean('late_observability_mean')).item()
            < panel_summary.filter(pl.col('scenario_kind') == 'stable').select(
                pl.mean('late_observability_mean')
            ).fill_null(1.0).item()
        ),
        'detail': (
            f"shifted_late_obs={panel_summary.filter(pl.col('scenario_kind') == 'shifted').select(pl.mean('late_observability_mean')).item():.3f}; "
            f"stable_late_obs={panel_summary.filter(pl.col('scenario_kind') == 'stable').select(pl.mean('late_observability_mean')).fill_null(1.0).item():.3f}"
        ),
    },
    {
        'test': 'Competing-risk score stays within [-1, 1]',
        'passed': bool(reference_competing_risk_min >= -1.0 - 1e-9 and reference_competing_risk_max <= 1.0 + 1e-9),
        'detail': f"min={reference_competing_risk_min:.3f}; max={reference_competing_risk_max:.3f}",
    },
    {
        'test': 'MAE-penalized return never exceeds fixed return',
        'passed': bool(np.all(reference_mae_penalized <= reference_fixed_rate + 1e-12)),
        'detail': f"max_gap={reference_mae_gap_max:.6f}",
    },
    {
        'test': 'Downside-adjusted return remains finite after clipping',
        'passed': bool(np.isfinite(reference_downside_adjusted).all()),
        'detail': f"abs_max={reference_downside_abs_max:.3f}",
    },
    {
        'test': 'Tradeability score stays bounded and non-degenerate',
        'passed': bool(
            reference_tradeability_min >= -1.0 - 1e-9
            and reference_tradeability_max <= 1.0 + 1e-9
            and reference_tradeability_span > 1e-6
        ),
        'detail': f"min={reference_tradeability_min:.3f}; max={reference_tradeability_max:.3f}",
    },
]

tests_summary = pl.DataFrame(test_rows)
assert tests_summary['passed'].all(), 'At least one notebook validation check failed.'
Code
reference_frame = reference_frame.sort('day_idx')
Code
run_summary = summarize_trade_days(trade_day_metrics)

# Benchmark rows are pulled out per run and slice so model runs can be scaled against them.
oracle_summary = run_summary.filter(
    (pl.col('model_family') == 'benchmark') & (pl.col('method_name') == 'oracle')
).select([
    'run_id',
    'eval_slice',
    'daily_sharpe',
    'mean_daily_gross_pnl_bps',
]).rename({
    'daily_sharpe': 'oracle_daily_sharpe',
    'mean_daily_gross_pnl_bps': 'oracle_mean_daily_gross_pnl_bps',
})

perfect_summary = run_summary.filter(
    (pl.col('model_family') == 'benchmark') & (pl.col('method_name') == 'perfect')
).select([
    'run_id',
    'eval_slice',
    'daily_sharpe',
    'mean_daily_gross_pnl_bps',
]).rename({
    'daily_sharpe': 'perfect_daily_sharpe',
    'mean_daily_gross_pnl_bps': 'perfect_mean_daily_gross_pnl_bps',
})

model_run_summary = run_summary.filter(pl.col('model_family') != 'benchmark').join(
    oracle_summary,
    on=['run_id', 'eval_slice'],
    how='left',
).join(
    perfect_summary,
    on=['run_id', 'eval_slice'],
    how='left',
).with_columns([
    safe_ratio(pl.col('daily_sharpe'), pl.col('oracle_daily_sharpe')).alias('positive_sharpe_ratio_to_oracle'),
    safe_ratio(pl.col('mean_daily_gross_pnl_bps'), pl.col('oracle_mean_daily_gross_pnl_bps')).alias(
        'positive_pnl_ratio_to_oracle'
    ),
    safe_ratio(pl.col('daily_sharpe'), pl.col('perfect_daily_sharpe')).alias('positive_sharpe_ratio_to_perfect'),
    safe_ratio(pl.col('mean_daily_gross_pnl_bps'), pl.col('perfect_mean_daily_gross_pnl_bps')).alias(
        'positive_pnl_ratio_to_perfect'
    ),
]
)

run_ranks = compute_run_ranks(model_run_summary)
paired_comparison_table = build_paired_comparison_table(model_run_summary, reps=CONFIG['bootstrap_reps'])
seed_budget_winner_table = build_seed_budget_table(model_run_summary, seed_budgets=SEED_BUDGETS)
run_tail_table = build_run_tail_table(model_run_summary)
degenerate_strategy_table = build_degenerate_strategy_table(model_run_summary)

aggregate_sharpe = aggregate_metric_summary(
    model_run_summary, metric_col='daily_sharpe', reps=CONFIG['bootstrap_reps'], seed=101
)
aggregate_pnl = aggregate_metric_summary(
    model_run_summary, metric_col='mean_daily_gross_pnl_bps', reps=CONFIG['bootstrap_reps'], seed=103
)
aggregate_trade_rate = aggregate_metric_summary(
    model_run_summary, metric_col='trade_rate', reps=CONFIG['bootstrap_reps'], seed=107
)
aggregate_capture = aggregate_metric_summary(
    model_run_summary, metric_col='positive_sharpe_ratio_to_oracle', reps=CONFIG['bootstrap_reps'], seed=109
)

aggregate_summary = aggregate_sharpe.join(
    aggregate_pnl,
    on=['model_family', 'method_name', 'eval_slice', 'n_runs'],
    how='left',
).join(
    aggregate_trade_rate,
    on=['model_family', 'method_name', 'eval_slice', 'n_runs'],
    how='left',
).join(
    aggregate_capture,
    on=['model_family', 'method_name', 'eval_slice', 'n_runs'],
    how='left',
).join(
    run_ranks.group_by(['model_family', 'method_name']).agg([
        pl.mean('rank').alias('mean_rank'),
        pl.mean('is_winner').alias('win_rate'),
    ]
    ),
    on=['model_family', 'method_name'],
    how='left',
)

primary_sharpe_table = aggregate_summary.filter(
    (pl.col('eval_slice') == 'all') & pl.col('daily_sharpe_mean').is_finite()
).sort(
    ['model_family', 'daily_sharpe_mean', 'mean_daily_gross_pnl_bps_mean'], descending=[False, True, True]
).select([
    'model_family',
    'method_name',
    'daily_sharpe_mean',
    'daily_sharpe_ci_low',
    'daily_sharpe_ci_high',
    'positive_sharpe_ratio_to_oracle_mean',
    'mean_rank',
    'win_rate',
    'n_runs',
    'daily_sharpe_finite_runs',
    'daily_sharpe_nan_runs',
]
)

primary_pnl_table = aggregate_summary.filter(pl.col('eval_slice') == 'all').sort(
    ['model_family', 'daily_sharpe_mean', 'mean_daily_gross_pnl_bps_mean'], descending=[False, True, True]
).select([
    'model_family',
    'method_name',
    'mean_daily_gross_pnl_bps_mean',
    'mean_daily_gross_pnl_bps_ci_low',
    'mean_daily_gross_pnl_bps_ci_high',
    'trade_rate_mean',
    'mean_rank',
    'win_rate',
    'n_runs',
    'daily_sharpe_finite_runs',
    'daily_sharpe_nan_runs',
]
)

run_stability_table = run_ranks.group_by(['model_family', 'method_name']).agg([
    pl.mean('rank').alias('mean_rank'),
    pl.mean('is_winner').alias('win_rate'),
]
).sort(['model_family', 'mean_rank'])

shift_performance_table = aggregate_summary.filter(pl.col('eval_slice').is_in(['base', 'shifted'])).select([
    'model_family',
    'method_name',
    'eval_slice',
    'daily_sharpe_mean',
    'mean_daily_gross_pnl_bps_mean',
    'trade_rate_mean',
]
).sort(['model_family', 'method_name', 'eval_slice'])

oracle_kind_summary = run_summary.filter(
    (pl.col('model_family') == 'benchmark') & (pl.col('method_name') == 'oracle') & (
            pl.col('eval_slice') == 'all')
).group_by('scenario_kind').agg([
    pl.mean('daily_sharpe').alias('oracle_daily_sharpe'),
    pl.mean('mean_daily_gross_pnl_bps').alias('oracle_mean_daily_gross_pnl_bps'),
]
).sort('scenario_kind')

prediction_diagnostic_summary = prediction_diagnostics.group_by(['model_family', 'method_name']).agg([
    pl.mean('prediction_ic').alias('mean_prediction_ic'),
    pl.mean('trade_rate').alias('mean_trade_rate'),
    pl.mean('mean_position_size').alias('mean_position_size'),
]
).sort(['model_family', 'mean_prediction_ic'], descending=[False, True])

model_family_conclusion_rows = []
for model_family in MODEL_FAMILIES:
    ranked = primary_sharpe_table.filter(pl.col('model_family') == model_family)
    best = ranked.row(0, named=True)
    runner_up = ranked.row(1, named=True)
    pair_join = (
        model_run_summary
        .filter((pl.col('model_family') == model_family) & (pl.col('method_name') == best['method_name']) & (
                pl.col('eval_slice') == 'all')
                )
        .select(['run_id', 'daily_sharpe', 'mean_daily_gross_pnl_bps'])
        .rename({
            'daily_sharpe': 'best_daily_sharpe',
            'mean_daily_gross_pnl_bps': 'best_mean_daily_gross_pnl_bps',
        }
        )
        .join(
            model_run_summary
            .filter((pl.col('model_family') == model_family) & (pl.col('method_name') == runner_up['method_name']) & (
                    pl.col('eval_slice') == 'all')
                    )
            .select(['run_id', 'daily_sharpe', 'mean_daily_gross_pnl_bps'])
            .rename({
                'daily_sharpe': 'runner_up_daily_sharpe',
                'mean_daily_gross_pnl_bps': 'runner_up_mean_daily_gross_pnl_bps',
            }
            ),
            on='run_id',
            how='inner',
        )
    )
    sharpe_diff, sharpe_ci_low, sharpe_ci_high, sharpe_prob = paired_bootstrap_diff(
        pair_join['best_daily_sharpe'].to_numpy(),
        pair_join['runner_up_daily_sharpe'].to_numpy(),
        reps=CONFIG['bootstrap_reps'],
        seed=17,
    )
    pnl_diff, _, _, pnl_prob = paired_bootstrap_diff(
        pair_join['best_mean_daily_gross_pnl_bps'].to_numpy(),
        pair_join['runner_up_mean_daily_gross_pnl_bps'].to_numpy(),
        reps=CONFIG['bootstrap_reps'],
        seed=23,
    )
    run_tail_row = run_tail_table.filter(
        (pl.col('model_family') == model_family)
        & (pl.col('method_name') == best['method_name'])
    ).row(0, named=True)
    positive_run_summary = model_run_summary.filter(
        (pl.col('model_family') == model_family)
        & (pl.col('method_name') == best['method_name'])
        & (pl.col('eval_slice') == 'all')
    )
    model_family_conclusion_rows.append({
        'model_family': model_family,
        'best_method': best['method_name'],
        'runner_up_method': runner_up['method_name'],
        'best_daily_sharpe_mean': best['daily_sharpe_mean'],
        'best_mean_daily_gross_pnl_bps_mean': primary_pnl_table.filter(
            (pl.col('model_family') == model_family) & (pl.col('method_name') == best['method_name'])
        ).item(0,
               'mean_daily_gross_pnl_bps_mean'
               ),
        'runner_up_daily_sharpe_mean': runner_up['daily_sharpe_mean'],
        'sharpe_gap_to_runner': best['daily_sharpe_mean'] - runner_up['daily_sharpe_mean'],
        'paired_sharpe_prob': sharpe_prob,
        'paired_sharpe_ci_low': sharpe_ci_low,
        'paired_sharpe_ci_high': sharpe_ci_high,
        'paired_pnl_prob': pnl_prob,
        'median_run_daily_sharpe': run_tail_row['median_run_daily_sharpe'],
        'median_run_mean_daily_gross_pnl_bps': run_tail_row['median_run_mean_daily_gross_pnl_bps'],
        'positive_sharpe_run_rate': float((positive_run_summary['daily_sharpe'] > 0.0).mean()),
        'positive_pnl_run_rate': float((positive_run_summary['mean_daily_gross_pnl_bps'] > 0.0).mean()),
    }
    )
model_family_decision_table = pl.DataFrame(model_family_conclusion_rows).sort('model_family')

analysis_validation_rows = []
for model_family in MODEL_FAMILIES:
    primary_winner = primary_sharpe_table.filter(pl.col('model_family') == model_family).item(0, 'method_name')
    paired_winner = paired_comparison_table.filter(pl.col('model_family') == model_family).item(0, 'winner_method')
    analysis_validation_rows.append({
        'test': f'Paired comparison winner matches primary Sharpe winner ({model_family})',
        'passed': paired_winner == primary_winner,
        'detail': f'paired={paired_winner}; primary={primary_winner}',
    }
    )

analysis_validation_rows.append({
    'test': 'Seed-budget winners have finite Sharpe support',
    'passed': bool(seed_budget_winner_table['best_daily_sharpe_mean'].is_finite().all()),
    'detail': f"rows={seed_budget_winner_table.height}",
}
)
analysis_validation_rows.append({
    'test': 'Undefined-Sharpe strategy runs are explicitly reported',
    # Run counts are non-negative, so this only confirms the count column is populated.
    'passed': bool(degenerate_strategy_table['nan_sharpe_runs'].max() >= 0),
    'detail': f"max_nan_runs={int(degenerate_strategy_table['nan_sharpe_runs'].max())}",
}
)
analysis_validation_rows.append({
    'test': 'Positive oracle ratio metric has no infinite values',
    'passed': bool(not model_run_summary['positive_sharpe_ratio_to_oracle'].is_infinite().any()),
    'detail': f"nulls={int(model_run_summary['positive_sharpe_ratio_to_oracle'].is_null().sum())}",
}
)
analysis_validation_summary = pl.DataFrame(analysis_validation_rows)
assert analysis_validation_summary['passed'].all(), 'At least one analysis validation check failed.'
Code
representative_run_id = reference_frame.item(0, 'run_id')
representative_shift_start = int(reference_frame.filter(pl.col('shift_phase') == 'shifted').item(0, 'day_idx'))

sample_target_reference = reference_frame.filter(
    pl.col('day_idx').is_between(representative_shift_start - 5, representative_shift_start + 6)
).select([
    'day_idx',
    'shift_phase',
    'sentiment_score',
    'signed_event',
    'realized_return_h60_bps',
    *TARGET_COLS,
]
)

representative_run_methods = model_run_summary.filter(
    (pl.col('run_id') == representative_run_id) & (pl.col('eval_slice') == 'all')
).sort(['daily_sharpe', 'mean_daily_gross_pnl_bps'], descending=[True, True])
representative_model_family = representative_run_methods.item(0, 'model_family')
representative_method = representative_run_methods.item(0, 'method_name')
representative_runner_up = representative_run_methods.filter(
    pl.col('model_family') == representative_model_family
).item(1, 'method_name')

representative_run_summary = run_summary.filter(pl.col('run_id') == representative_run_id).sort(
    ['model_family', 'method_name', 'eval_slice']
)

representative_trade_journal = trade_day_metrics.filter(
    (pl.col('run_id') == representative_run_id)
    & (pl.col('model_family') == representative_model_family)
    & (pl.col('method_name').is_in([representative_method, representative_runner_up]))
).sort(['method_name', 'day_idx']).with_columns(
    pl.col('gross_pnl_bps').cum_sum().over('method_name').alias('cumulative_gross_pnl_bps')
)

equity_curve_reference = trade_day_metrics.filter(
    (pl.col('run_id') == representative_run_id)
    & (pl.col('model_family') == representative_model_family)
    & (pl.col('method_name').is_in(TARGET_COLS))
).sort(
    ['method_name', 'day_idx']
).with_columns(
    pl.col('gross_pnl_bps').cum_sum().over('method_name').alias('cumulative_gross_pnl_bps')
)

risk_return_reference = aggregate_summary.filter(
    (pl.col('model_family') != 'benchmark') & (pl.col('eval_slice') == 'all')
).select([
    'model_family',
    'method_name',
    'daily_sharpe_mean',
    'mean_daily_gross_pnl_bps_mean',
    'trade_rate_mean',
]
)
Code
method_labels = {
    'target_fixed_rate_h60': 'Fixed 60m return',
    'target_path_mean_rate_h60': 'Path mean return',
    'target_profit_factor_h60': 'Profit factor',
    'target_trend_slope': 'Trend slope',
    'target_trend_tstat': 'Trend t-stat',
    'target_barrier_rate': 'Barrier reward',
    'target_competing_risk_hit': 'Competing-risk hit',
    'target_mae_penalized_rate_h60': 'MAE-penalized return',
    'target_downside_adj_rate_h60': 'Downside-adjusted return',
    'target_tradeability_score': 'Tradeability score',
    'oracle': 'Oracle',
    'perfect': 'Perfect',
}
method_order = [method_labels[target_col] for target_col in TARGET_COLS]
method_palette = {
    'Fixed 60m return': '#4c78a8',
    'Path mean return': '#2a9d8f',
    'Profit factor': '#f4a261',
    'Trend slope': '#6c5ce7',
    'Trend t-stat': '#b56576',
    'Barrier reward': '#e45756',
    'Competing-risk hit': '#9c755f',
    'MAE-penalized return': '#54a24b',
    'Downside-adjusted return': '#eeca3b',
    'Tradeability score': '#ff9da6',
    'Oracle': '#3a86ff',
    'Perfect': '#1d3557',
}
model_labels = {'ridge': 'Ridge', 'hist_gbrt': 'HistGBRT', 'benchmark': 'benchmark'}
model_palette = {'Ridge': '#355070', 'HistGBRT': '#c44536'}
phase_labels = {'base': 'Base', 'shifted': 'Shifted'}
phase_palette = {'Base': '#4c78a8', 'Shifted': '#e45756'}
scenario_kind_labels = {'stable': 'Stable', 'shifted': 'Shifted'}
scenario_name_labels = {scenario['scenario_name']: scenario['scenario_name'].replace('_', ' ') for scenario in
                        SCENARIOS}


def relabel_frame(frame: pl.DataFrame) -> pl.DataFrame:
    updates = []
    if 'method_name' in frame.columns:
        updates.append(pl.col('method_name').replace(method_labels).alias('method_label'))
    if 'model_family' in frame.columns:
        updates.append(pl.col('model_family').replace(model_labels).alias('model_label'))
    if 'shift_phase' in frame.columns:
        updates.append(pl.col('shift_phase').replace(phase_labels).alias('phase_label'))
    if 'scenario_kind' in frame.columns:
        updates.append(pl.col('scenario_kind').replace(scenario_kind_labels).alias('scenario_kind_label'))
    if 'scenario_name' in frame.columns:
        updates.append(pl.col('scenario_name').replace(scenario_name_labels).alias('scenario_label'))
    return frame.with_columns(updates) if updates else frame


sample_target_measure_pairs = [('Realized 60m return (bps)', 'realized_return_h60_bps')] + [
    (method_labels[target_col], target_col) for target_col in TARGET_COLS
]
sample_target_long = pl.concat([
    sample_target_reference.select([
        'day_idx',
        'shift_phase',
        pl.lit(measure_label).alias('measure_label'),
        pl.col(column).alias('value'),
    ]
    )
    for measure_label, column in sample_target_measure_pairs
], how='vertical'
).with_columns(
    pl.col('shift_phase').replace(phase_labels).alias('phase_label')
)

equity_curve_plot_data = relabel_frame(equity_curve_reference)
trade_journal_plot_data = relabel_frame(representative_trade_journal)
risk_return_plot_data = relabel_frame(risk_return_reference)
minute_path_plot_data = relabel_frame(reference_minute_sample).with_columns([
    ((pl.col('ts').dt.hour().cast(pl.Int32) * 60 + pl.col('ts').dt.minute().cast(pl.Int32)) - (9 * 60 + 30)).alias(
        'minute_from_open'
    ),
    ((pl.col('close').log() - pl.col('close').first().over('sample_id').log()) * 10_000.0).alias('path_bps'),
]
).sort(['sample_id', 'ts'])
assert int(minute_path_plot_data['minute_from_open'].min()) == 0
assert int(minute_path_plot_data['minute_from_open'].max()) == CONFIG['session_minutes'] - 1
sharpe_plot_data = relabel_frame(primary_sharpe_table)
pnl_plot_data = relabel_frame(primary_pnl_table)
stability_plot_data = relabel_frame(run_stability_table).with_columns(
    ((pl.col('win_rate') * 100).round(1).cast(pl.String) + pl.lit('%')).alias('win_rate_label')
)
shift_plot_data = relabel_frame(shift_performance_table).with_columns(
    pl.col('eval_slice').replace({'base': 'Base', 'shifted': 'Shifted'}).alias('eval_label')
)
oracle_plot_data = pl.concat([
    oracle_kind_summary.select([
        pl.col('scenario_kind').replace(scenario_kind_labels).alias('scenario_kind_label'),
        pl.lit('Oracle daily Sharpe').alias('metric_label'),
        pl.col('oracle_daily_sharpe').alias('value'),
    ]
    ),
    oracle_kind_summary.select([
        pl.col('scenario_kind').replace(scenario_kind_labels).alias('scenario_kind_label'),
        pl.lit('Oracle mean daily PnL (bps)').alias('metric_label'),
        pl.col('oracle_mean_daily_gross_pnl_bps').alias('value'),
    ]
    ),
], how='vertical'
)
diagnostic_plot_data = relabel_frame(
    prediction_diagnostic_summary.join(
        aggregate_summary.filter(pl.col('eval_slice') == 'all').select([
            'model_family',
            'method_name',
            'daily_sharpe_mean',
        ]
        ),
        on=['model_family', 'method_name'],
        how='left',
    )
)
diagnostic_rank_plot_data = (
    diagnostic_plot_data
    .filter((pl.col('model_family') != 'benchmark') & pl.col('daily_sharpe_mean').is_finite())
    .with_columns([
        pl.col('mean_prediction_ic').rank('ordinal', descending=True).over('model_family').alias('prediction_ic_rank'),
        pl.col('daily_sharpe_mean').rank('ordinal', descending=True).over('model_family').alias('trading_sharpe_rank'),
    ])
)
paired_plot_data = relabel_frame(paired_comparison_table).with_columns(
    pl.col('challenger_method').replace(method_labels).alias('challenger_label')
)
seed_budget_rows = []
seed_subset = model_run_summary.filter(pl.col('eval_slice') == 'all')
for model_family in MODEL_FAMILIES:
    family_part = seed_subset.filter(pl.col('model_family') == model_family)
    for budget in SEED_BUDGETS:
        budget_part = family_part.filter(pl.col('seed').is_in(SEEDS[:budget]))
        budget_summary = budget_part.group_by('method_name').agg([
            pl.col('daily_sharpe').filter(pl.col('daily_sharpe').is_finite()).mean().alias('daily_sharpe_mean'),
            pl.col('mean_daily_gross_pnl_bps').filter(pl.col('mean_daily_gross_pnl_bps').is_finite()).mean().alias(
                'mean_daily_gross_pnl_bps_mean'
            ),
            pl.col('daily_sharpe').is_finite().sum().alias('finite_sharpe_runs'),
        ]
        )
        for row in budget_summary.iter_rows(named=True):
            seed_budget_rows.append({
                'model_family': model_family,
                'method_name': row['method_name'],
                'seed_budget': budget,
                'daily_sharpe_mean': row['daily_sharpe_mean'],
                'mean_daily_gross_pnl_bps_mean': row['mean_daily_gross_pnl_bps_mean'],
                'finite_sharpe_runs': row['finite_sharpe_runs'],
            }
            )
seed_budget_curve_table = relabel_frame(pl.DataFrame(seed_budget_rows))
run_tail_plot_data = relabel_frame(run_tail_table)
degenerate_strategy_plot_data = relabel_frame(degenerate_strategy_table)
winner_summary_rows = []
for row in model_family_decision_table.to_dicts():
    winner_summary_rows.append({
        'model_label': model_labels[row['model_family']],
        'best_daily_sharpe_mean': row['best_daily_sharpe_mean'],
        'runner_up_daily_sharpe_mean': row['runner_up_daily_sharpe_mean'],
        'best_label': f"Best: {method_labels[row['best_method']]}",
        'runner_label': f"Runner-up: {method_labels[row['runner_up_method']]}",
    }
    )
winner_summary_frame = pl.DataFrame(winner_summary_rows)

Main Results

The decision rests on five checks: primary Sharpe ranking, size of the leadership gap, shifted-regime behavior, trading style, and cost robustness. Secondary diagnostics are in the appendix.

1. Primary Sharpe Ranking

Trend t-stat is the mean-Sharpe leader for both model families, but the uncertainty bands and small gaps argue for a shortlist rather than a single-target decision.

Code
(
        ggplot(sharpe_plot_data.with_columns(pl.col('method_label').is_in(SHORTLIST_LABELS).alias('shortlist_flag')),
               aes(x='method_label', y='daily_sharpe_mean', color='model_label', alpha='shortlist_flag')
               )
        + geom_hline(yintercept=0.0,
                     linetype='dashed',
                     color='#b0b7c3'
                     )
        + geom_segment(aes(x='method_label', xend='method_label', y='daily_sharpe_ci_low', yend='daily_sharpe_ci_high',
                           color='model_label'
                           ),
                       size=1.5,
                       alpha=0.65,
                       )
        + geom_point(size=3.2)
        + facet_grid(y='model_label')
        + facet_separator_theme
        + coord_flip()
        + scale_x_discrete(limits=method_order)
        + scale_color_manual(values=model_palette)
        + scale_alpha_manual(values={True: 1.0, False: 0.22}, guide='none')
        + labs(title='Primary ranking: average daily Sharpe',
               x='',
               y='Average daily Sharpe with uncertainty band',
               color='Model family',
               )
        + plot_size(980, 420)
)

The key point is not that one target dominates; it is that Trend t-stat leads while several alternatives remain inside a practical tie band.
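The uncertainty bands in this chart appear to be bootstrap intervals, since `aggregate_metric_summary` is called with `reps=CONFIG['bootstrap_reps']` and a seed, but that helper is not shown in this section. The following is only a minimal sketch of one common approach, a percentile bootstrap over per-run Sharpe values. The name `bootstrap_mean_ci`, its defaults, and the example numbers are assumptions for illustration, not the notebook's implementation or results.

```python
import numpy as np


def bootstrap_mean_ci(values, reps=2000, seed=0, alpha=0.05):
    """Percentile bootstrap interval for the mean of per-run metrics (hypothetical helper)."""
    values = np.asarray(values, dtype=float)
    values = values[np.isfinite(values)]  # drop runs where Sharpe is undefined
    if values.size == 0:
        return float('nan'), float('nan'), float('nan')
    rng = np.random.default_rng(seed)
    # Resample runs with replacement and recompute the mean for each replicate.
    idx = rng.integers(0, values.size, size=(reps, values.size))
    boot_means = values[idx].mean(axis=1)
    low, high = np.quantile(boot_means, [alpha / 2, 1 - alpha / 2])
    return float(values.mean()), float(low), float(high)


# Illustrative per-run Sharpe values (made up for this example, not notebook output).
example_sharpes = np.array([0.04, 0.07, -0.01, 0.05, 0.03, 0.06, np.nan, 0.02])
mean, ci_low, ci_high = bootstrap_mean_ci(example_sharpes)
```

Two targets whose intervals overlap heavily under a scheme like this are what the text calls a practical tie band.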

2. Leadership Gap

The lead over MAE-penalized return is small in both model families. The gap guides the decision path but does not settle the final target selection.

Code
(
        ggplot(winner_summary_frame)
        + geom_segment(aes(x='runner_up_daily_sharpe_mean',
                           xend='best_daily_sharpe_mean', y='model_label',
                           yend='model_label'
                           ),
                       color='#8d99ae',
                       size=4,
                       alpha=0.8,
                       )
        + geom_point(aes(x='runner_up_daily_sharpe_mean', y='model_label'), color='#b56576',
                     size=4
                     )
        + geom_point(aes(x='best_daily_sharpe_mean',
                         y='model_label'
                         ),
                     color='#2a9d8f', size=4
                     )
        + labs(title='Gap between the best and second-best target',
               x='Mean daily Sharpe',
               y='',
               )
        + plot_size(980, 220)
)

MAE-penalized return is the runner up, and the small gap behind first place is why we include it in downstream experiments despite not being the mean-Sharpe leader.
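Because both targets are evaluated on the same runs, the gap is best judged with a paired comparison rather than by comparing two separate means. The notebook calls a `paired_bootstrap_diff` helper whose body is not shown here, so the sketch below is an assumption about how such a comparison could work, not the notebook's code. The function name `paired_bootstrap_gap` and the example values are hypothetical.

```python
import numpy as np


def paired_bootstrap_gap(best, runner_up, reps=2000, seed=0, alpha=0.05):
    """Bootstrap the mean per-run difference between two targets (hypothetical helper)."""
    diffs = np.asarray(best, dtype=float) - np.asarray(runner_up, dtype=float)
    diffs = diffs[np.isfinite(diffs)]  # keep only runs where both Sharpes are defined
    if diffs.size == 0:
        return float('nan'), float('nan'), float('nan'), float('nan')
    rng = np.random.default_rng(seed)
    # Resample whole runs so each replicate keeps the best/runner-up pairing intact.
    idx = rng.integers(0, diffs.size, size=(reps, diffs.size))
    boot = diffs[idx].mean(axis=1)
    low, high = np.quantile(boot, [alpha / 2, 1 - alpha / 2])
    prob_positive = float((boot > 0.0).mean())  # share of replicates where the leader stays ahead
    return float(diffs.mean()), float(low), float(high), prob_positive


# Illustrative per-run Sharpe values (made up for this example, not notebook output).
best_runs = [0.05, 0.06, 0.02, 0.04, 0.07, 0.03]
runner_runs = [0.04, 0.06, 0.03, 0.03, 0.05, 0.03]
gap, gap_low, gap_high, prob = paired_bootstrap_gap(best_runs, runner_runs)
```

An interval that straddles zero, or a probability well short of 1, is the kind of evidence that argues for keeping the runner-up in play.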

3. Shifted-Regime Read

The shifted regimes do not erase the edge, but they change which targets look resilient. We should focus on shortlist behavior, not all ten labels.

Code
(
        ggplot(shift_plot_data.filter(pl.col('method_label').is_in(SHORTLIST_LABELS)),
               aes(x='eval_label', y='daily_sharpe_mean', group='method_label', color='method_label')
               )
        + geom_hline(yintercept=0.0, linetype='dashed', color='#b0b7c3')
        + geom_line(size=1.0, alpha=0.85)
        + geom_point(size=2.4)
        + facet_grid(y='model_label')
        + facet_separator_theme
        + scale_color_manual(values=method_palette)
        + labs(title='Base-to-shifted Sharpe sensitivity',
               x='Evaluation slice',
               y='Mean daily Sharpe',
               color='Target',
               )
        + plot_size(980, 420)
)

This is a stress check: a target that holds up only in the base slice, and not in the shifted slice, should be demoted.
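One way to make this stress check explicit is to record, per target, the change in mean daily Sharpe between slices and whether the edge stays positive in both. The sketch below is a hypothetical illustration: the function `shift_sensitivity`, the target names, and the numbers are assumptions, not the notebook's demotion rule or its results.

```python
def shift_sensitivity(base_sharpe, shifted_sharpe):
    """Return the base-to-shifted Sharpe change and whether the edge is positive in both slices."""
    change = shifted_sharpe - base_sharpe
    survives = base_sharpe > 0.0 and shifted_sharpe > 0.0
    return change, survives


# Illustrative (base, shifted) mean daily Sharpe pairs, made up for this example.
example_targets = {
    'target_a': (0.05, 0.02),   # weaker after the shift, but still positive
    'target_b': (0.04, -0.01),  # edge disappears after the shift
}
shift_report = {
    name: shift_sensitivity(base, shifted)
    for name, (base, shifted) in example_targets.items()
}
```

In this made-up example `target_b` would be the demotion candidate, because its shifted-slice Sharpe turns negative.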

4. Trading Style Map

Targets change trading behavior as much as forecast accuracy. Up/right is better; bubble size shows participation. Tradeability score is the clearest low-turnover style outlier.

Code
(
        ggplot(
            risk_return_plot_data.filter(pl.col('method_label').is_in(SHORTLIST_LABELS)),
            aes(x='mean_daily_gross_pnl_bps_mean', y='daily_sharpe_mean', size='trade_rate_mean'),
        )
        + geom_hline(yintercept=0.0, linetype='dashed', color='#b0b7c3')
        + geom_vline(xintercept=0.0, linetype='dashed',
                     color='#b0b7c3'
                     )
        + geom_point(aes(color='method_label'), alpha=0.82
                     )
        + geom_text(aes(label='method_label', color='method_label'),
                    size=7, nudge_y=0.00035, show_legend=False
                    )
        + facet_grid(y='model_label')
        + facet_separator_theme
        + scale_color_manual(values=method_palette)
        + labs(title='Trading outcome map: return, Sharpe, and trade rate',
               subtitle='Up/right is better; larger bubbles trade more often',
               x='Mean daily gross PnL (bps)',
               y='Average daily Sharpe',
               size='Mean trade rate',
               color='Target',
               )
        + plot_size(980, 600)
)

This chart explains the style trade-off: high participation can raise gross PnL but also increases cost and fill sensitivity.

5. Cost Robustness

Gross Sharpe is not enough. Lower-turnover alternatives become more competitive as round-trip costs rise, which is why MAE-penalized return remains on the shortlist.

Code
COST_LEVELS_BPS = [0.0, 0.25, 0.50, 1.00, 2.00]

cost_rows = []
for cost_bps in COST_LEVELS_BPS:
    cost_frame = trade_day_metrics.filter(pl.col('model_family') != 'benchmark').with_columns(
        (pl.col('gross_pnl_bps') - pl.lit(cost_bps) * pl.col('trade_flag').cast(pl.Float64)).alias('net_pnl_bps')
    )
    for keys, part in cost_frame.partition_by(
            ['run_id', 'scenario_name', 'scenario_kind', 'seed', 'model_family', 'method_name'], as_dict=True
    ).items():
        run_id, scenario_name, scenario_kind, seed, model_family, method_name = keys
        pnl = part['net_pnl_bps'].to_numpy()
        cost_rows.append({
            'round_trip_cost_bps': cost_bps,
            'run_id': run_id,
            'scenario_name': scenario_name,
            'scenario_kind': scenario_kind,
            'seed': seed,
            'model_family': model_family,
            'method_name': method_name,
            'net_daily_sharpe': sharpe_ratio(pnl),
            'mean_daily_net_pnl_bps': finite_mean(pnl),
            'trade_rate': float(part['trade_flag'].cast(pl.Float64).mean()),
        }
        )

cost_sensitivity_run_table = pl.DataFrame(cost_rows)
cost_sensitivity_table = (
    cost_sensitivity_run_table
    .group_by(['round_trip_cost_bps', 'model_family', 'method_name'])
    .agg([
        pl.col('net_daily_sharpe').filter(pl.col('net_daily_sharpe').is_finite()).mean().alias('net_daily_sharpe_mean'),
        pl.mean('mean_daily_net_pnl_bps').alias('mean_daily_net_pnl_bps_mean'),
        pl.mean('trade_rate').alias('trade_rate_mean'),
        (pl.col('net_daily_sharpe').filter(pl.col('net_daily_sharpe').is_finite()) > 0.0).mean().alias(
            'positive_net_sharpe_run_rate'
        ),
    ]
    )
    .with_columns(
        pl.col('net_daily_sharpe_mean').rank('ordinal', descending=True).over(
            ['round_trip_cost_bps', 'model_family']
        ).alias('cost_rank')
    )
    .sort(['model_family', 'round_trip_cost_bps', 'cost_rank'])
)

cost_winner_table = cost_sensitivity_table.filter(pl.col('cost_rank') <= 3)
cost_plot_data = relabel_frame(cost_sensitivity_table)
Code
cost_leader_display = (
    cost_sensitivity_table
    .filter(pl.col('cost_rank') == 1)
    .sort(['model_family', 'round_trip_cost_bps'])
)

relabel_frame(cost_leader_display).select([
    pl.col('round_trip_cost_bps').alias('Cost bps'),
    'model_label',
    'method_label',
    pl.col('net_daily_sharpe_mean').round(4).alias('net Sharpe'),
    pl.col('mean_daily_net_pnl_bps_mean').round(3).alias('net PnL bps'),
    pl.col('trade_rate_mean').round(3).alias('trade rate'),
])

At higher cost assumptions, lower-turnover targets become more competitive. This supports keeping MAE-penalized return as the main challenger.
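
The arithmetic behind this can be sketched in a few lines. Because the sweep charges the cost only on traded days, mean net PnL equals mean gross PnL minus cost times trade rate, so two targets swap places at a break-even cost. The helper names `mean_net_pnl_bps` and `break_even_cost_bps` and all numbers below are hypothetical, chosen only to illustrate a crossover; they are not results from this study, and the sketch covers mean PnL only, not Sharpe.

```python
def mean_net_pnl_bps(mean_gross_pnl_bps: float, trade_rate: float, cost_bps: float) -> float:
    """Mean daily net PnL when the round-trip cost is charged only on traded days."""
    return mean_gross_pnl_bps - cost_bps * trade_rate


def break_even_cost_bps(gross_a: float, rate_a: float, gross_b: float, rate_b: float) -> float:
    """Round-trip cost at which targets A and B have equal mean net PnL."""
    if rate_a == rate_b:
        raise ValueError("equal trade rates: cost shifts both targets by the same amount")
    return (gross_a - gross_b) / (rate_a - rate_b)


# Hypothetical inputs: the first target earns more gross but trades more often.
high_turnover = {"gross": 1.20, "rate": 0.60}
low_turnover = {"gross": 1.00, "rate": 0.40}

crossover = break_even_cost_bps(
    high_turnover["gross"], high_turnover["rate"],
    low_turnover["gross"], low_turnover["rate"],
)
print(f"break-even round-trip cost: {crossover:.2f} bps")
```

Below the break-even cost the higher-turnover target keeps its lead on mean net PnL; above it, the lower-turnover target is ahead.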

Interpretation

The evidence supports a shortlist, not a permanent target. Trend t-stat is the best current default because it leads mean daily Sharpe for both model families. MAE-penalized return is the most important challenger because it is close on Sharpe, trades less, and explicitly penalizes adverse path pain. Barrier reward remains useful as an event-time alternative. Tradeability score is better interpreted as a conservative gating lens than as the primary alpha label.

Future research should test this shortlist under net costs, realistic execution assumptions, policy variants, and non-synthetic data.

Limitations

These results should not be over-generalized.

  1. The experiment is single-ticker and one-event-per-day only.
  2. Results are gross of transaction costs and execution uncertainty except for the simple cost sweep.
  3. The market design is synthetic, even though it includes stable and shifted regimes.
  4. Daily Sharpe is measured on synthetic day-level PnL, not a live portfolio.
  5. The top target gaps are small, so current winners are best interpreted as shortlist candidates.

Decision

Carry forward Trend t-stat, MAE-penalized return, and Barrier reward. Use Tradeability score as a conservative gating benchmark. Do not select a permanent target until the shortlist survives net costs, execution assumptions, policy sensitivity, shifted scenarios, and non-synthetic validation.

Appendix

Includes technical validation, simulator diagnostics, representative examples, and exact target definitions.

Appendix A: Simulation Design and Validation

These figures and tables document that the synthetic test grid, walk-forward validation, and label checks behaved as intended.

Code
ggplot(scenario_metric_frame,
       aes(x='metric_label', y='scenario_label', fill='value')
       ) + geom_tile() + scale_fill_gradient2(low='#4c78a8',
                                              mid='#f6f7f9',
                                              high='#d1495b',
                                              midpoint=1.0
                                              ) + labs(
    title='How shifted regimes become harder than stable regimes',
    x='',
    y='Scenario',
    fill='Value',
) + theme(axis_text_x=element_text(angle=35, hjust=1)) + plot_size(1100, 320)
Code
signal_fidelity_frame = (
    panel_summary.group_by(['scenario_name', 'scenario_kind']).agg([
        pl.mean('feature_to_oracle_corr').alias('mean_corr'),
        pl.min('feature_to_oracle_corr').alias('min_corr'),
        pl.max('feature_to_oracle_corr').alias('max_corr'),
    ]
    )
    .with_columns(pl.col('scenario_name').str.replace_all('_', ' ').alias('scenario_label'))
    .sort('mean_corr', descending=True)
)

ggplot(signal_fidelity_frame,
       aes(x='scenario_label', y='mean_corr', color='scenario_kind')
       ) + geom_segment(
    aes(x='scenario_label', xend='scenario_label', y='min_corr', yend='max_corr', color='scenario_kind'),
    size=2.6,
    alpha=0.45,
) + geom_point(size=4) + coord_flip() + scale_color_manual(values={'stable': '#2e8b57', 'shifted': '#b24c63'}) + labs(
    title='How well the key observable feature tracks the hidden signal',
    x='',
    y='Feature-to-hidden-signal correlation',
    color='Scenario kind',
) + plot_size(920, 260)

Appendix A1: Validation Checks

These checks are included for auditability.

Code
tests_summary.select([
    pl.col('test').alias('Check'),
    pl.when(pl.col('passed')).then(pl.lit('Passed')).otherwise(pl.lit('Failed')).alias('Status'),
    pl.col('detail').alias('Detail'),
]
)
Code
fold_timeline_rows = []
for row in reference_fold_meta.iter_rows(named=True):
    fold_label = f"Fold {row['fold']}"
    train_end = int(row['train_days'])
    embargo_end = train_end + int(row['business_day_gap'])
    val_end = embargo_end + int(row['val_days'])
    fold_timeline_rows.extend([
        {
            'fold_label': fold_label,
            'segment': 'Train',
            'start_idx': 0,
            'end_idx': train_end,
        },
        {
            'fold_label': fold_label,
            'segment': 'Embargo',
            'start_idx': train_end,
            'end_idx': embargo_end,
        },
        {
            'fold_label': fold_label,
            'segment': 'Validation',
            'start_idx': embargo_end,
            'end_idx': val_end,
        },
    ]
    )

fold_timeline_frame = pl.DataFrame(fold_timeline_rows)

ggplot(fold_timeline_frame) + geom_segment(
    aes(x='start_idx', xend='end_idx', y='fold_label', yend='fold_label', color='segment'),
    size=10
) + scale_color_manual(values={'Train': '#355070', 'Embargo': '#d08c60', 'Validation': '#4c956c'}) + labs(
    title='Walk-forward validation layout with 5-day buffer',
    x='Relative day from run start',
    y='',
    color='Segment',
) + plot_size(980, 260)
Code
analysis_validation_summary.select([
    pl.col('test').alias('Check'),
    pl.when(pl.col('passed')).then(pl.lit('Passed')).otherwise(pl.lit('Failed')).alias('Status'),
    pl.col('detail').alias('Detail'),
]
)

Appendix B: Representative Examples

These examples help build intuition for how the target labels and trade decisions behave in one run. They are not robustness evidence and can be excluded from the non-technical report.

Code
(
        ggplot(equity_curve_plot_data,
               aes(x='day_idx', y='cumulative_gross_pnl_bps', color='method_label')
               )
        + geom_vline(xintercept=representative_shift_start - 0.5, linetype='dashed', color='#5c677d')
        + geom_hline(yintercept=0.0,
                     linetype='dashed',
                     color='#b0b7c3'
                     )
        + geom_line(size=1.0, alpha=0.82)
        + scale_color_manual(values=method_palette)
        + labs(title='Representative run: cumulative gross PnL',
               x='Synthetic day',
               y='Cumulative gross PnL (bps)',
               color='Target',
               )
        + plot_size(980, 420)
)
Code
(
        ggplot(trade_journal_plot_data, aes(x='day_idx', y='gross_pnl_bps'))
        + geom_vline(xintercept=representative_shift_start - 0.5, linetype='dashed', color='#5c677d')
        + geom_hline(yintercept=0.0,
                     linetype='dashed',
                     color='#b0b7c3'
                     )
        + geom_point(aes(size='position_size', color='phase_label'), alpha=0.78)
        + facet_grid(y='method_label',
                     scales='free_y'
                     )
        + facet_separator_theme
        + scale_color_manual(values=phase_palette)
        + labs(title='Representative run: trade decisions and outcomes',
               x='Synthetic day',
               y='Gross PnL (bps)',
               size='Position size',
               color='Phase',
               )
        + plot_size(980, 420)
)

Appendix C: Secondary Diagnostics

These diagnostics help explain the result surface but should not be treated as the main decision rule.

Appendix C1: Seed-Budget Robustness

These diagnostics are for analyst review.

Code
(
        ggplot(seed_budget_curve_table,
               aes(x='seed_budget', y='daily_sharpe_mean', group='method_label', color='method_label')
               )
        + geom_hline(yintercept=0.0, linetype='dashed', color='#b0b7c3')
        + geom_line(size=1.0, alpha=0.9)
        + geom_point(size=2.4)
        + facet_grid(y='model_label')
        + facet_separator_theme
        + scale_color_manual(values=method_palette)
        + labs(title='Seed-budget robustness of target rankings',
               x='Seeds included',
               y='Average daily Sharpe',
               color='Target')
        + plot_size(980, 420)
)

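This is a Monte Carlo stability check: if the averages and their ordering stop moving as more seeds are included, the ranking is less likely to be an artifact of a small seed sample.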
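
One way to picture a seed-budget curve is as a running average over the first k seeds. The sketch below assumes that construction; how `seed_budget_curve_table` is actually built is not shown in this section, and the per-seed values are hypothetical.

```python
from itertools import accumulate


def seed_budget_curve(run_sharpes: list[float]) -> list[float]:
    """Average Sharpe over the first k seeds, for k = 1..len(run_sharpes)."""
    running_totals = accumulate(run_sharpes)
    return [total / k for k, total in enumerate(running_totals, start=1)]


# Hypothetical per-seed daily Sharpe values for one target and model family.
per_seed_sharpe = [0.10, 0.02, 0.06, 0.04, 0.03]
curve = seed_budget_curve(per_seed_sharpe)
print([round(value, 3) for value in curve])
```

A curve that flattens as seeds are added suggests the average no longer depends much on any single seed.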

Appendix C2: Winner Stability Across Runs

Code
(
        ggplot(stability_plot_data,
               aes(x='model_label', y='method_label', fill='win_rate')
               )
        + geom_tile()
        + scale_y_discrete(limits=method_order)
        + scale_fill_gradient(low='#edf2f7', high='#355070')
        + labs(title='Winner stability across runs',
               x='',
               y='Training target',
               fill='Win share',
               )
        + plot_size(760, 340)
)

Run-level win share is useful as a robustness check, but low win share alone does not disqualify a target with lower turnover or drawdown.

Appendix C3: Mean Daily Gross PnL

Code
(
        ggplot(pnl_plot_data,
               aes(x='method_label', y='mean_daily_gross_pnl_bps_mean', color='model_label')
               )
        + geom_hline(yintercept=0.0,
                     linetype='dashed',
                     color='#b0b7c3'
                     )
        + geom_segment(aes(x='method_label',
                           xend='method_label',
                           y='mean_daily_gross_pnl_bps_ci_low',
                           yend='mean_daily_gross_pnl_bps_ci_high',
                           color='model_label',
                           ),
                       size=1.5,
                       alpha=0.65
                       )
        + geom_point(size=3.2)
        + facet_grid(y='model_label')
        + facet_separator_theme
        + coord_flip()
        + scale_x_discrete(limits=method_order)
        + scale_color_manual(values=model_palette)
        + labs(title='Secondary ranking: mean daily gross PnL',
               x='',
               y='Mean daily gross PnL (bps) with uncertainty band',
               color='Model family'
               )
        + plot_size(980, 420)
)

Mean PnL confirms economic direction but should not override weak Sharpe separation.

Appendix C4: Multi-Objective Ranking Heuristic

Code
PRACTICAL_SHARPE_TIE = 0.005
PRACTICAL_PNL_TIE_BPS = 0.25

run_level_extra = (
    model_run_summary
    .filter(pl.col('eval_slice') == 'all')
    .group_by(['model_family', 'method_name'])
    .agg([
        (pl.col('daily_sharpe') > 0.0).mean().alias('positive_sharpe_run_rate'),
        (pl.col('mean_daily_gross_pnl_bps') > 0.0).mean().alias('positive_pnl_run_rate'),
        pl.mean('max_drawdown_bps').alias('max_drawdown_bps_mean'),
        pl.median('max_drawdown_bps').alias('max_drawdown_bps_median'),
        pl.mean('trade_hit_rate').alias('trade_hit_rate_mean'),
        pl.mean('avg_trade_size').alias('avg_trade_size_mean'),
    ]
    )
)

base_shift_summary = (
    aggregate_summary
    .filter(pl.col('eval_slice').is_in(['base', 'shifted']))
    .select([
        'model_family',
        'method_name',
        'eval_slice',
        'daily_sharpe_mean',
        'mean_daily_gross_pnl_bps_mean',
    ]
    )
    .pivot(
        values=['daily_sharpe_mean', 'mean_daily_gross_pnl_bps_mean'],
        index=['model_family', 'method_name'],
        on='eval_slice',
    )
    .rename({
        'daily_sharpe_mean_base': 'base_daily_sharpe_mean',
        'daily_sharpe_mean_shifted': 'shifted_daily_sharpe_mean',
        'mean_daily_gross_pnl_bps_mean_base': 'base_mean_daily_gross_pnl_bps',
        'mean_daily_gross_pnl_bps_mean_shifted': 'shifted_mean_daily_gross_pnl_bps',
    }
    )
    .with_columns([
        (pl.col('shifted_daily_sharpe_mean') - pl.col('base_daily_sharpe_mean')).alias('shift_minus_base_sharpe'),
        (pl.col('shifted_mean_daily_gross_pnl_bps') - pl.col('base_mean_daily_gross_pnl_bps')).alias(
            'shift_minus_base_pnl_bps'
        ),
    ]
    )
)

robust_ranking_table = (
    aggregate_summary
    .filter((pl.col('eval_slice') == 'all') & pl.col('daily_sharpe_mean').is_finite())
    .select([
        'model_family',
        'method_name',
        'daily_sharpe_mean',
        'daily_sharpe_ci_low',
        'daily_sharpe_ci_high',
        'mean_daily_gross_pnl_bps_mean',
        'trade_rate_mean',
        'mean_rank',
        'win_rate',
    ]
    )
    .join(run_level_extra, on=['model_family', 'method_name'], how='left')
    .join(base_shift_summary, on=['model_family', 'method_name'], how='left')
    .with_columns([
        pl.col('daily_sharpe_mean').max().over('model_family').alias('family_best_daily_sharpe_mean'),
        pl.col('mean_daily_gross_pnl_bps_mean').max().over('model_family').alias('family_best_pnl_mean'),
    ]
    )
    .with_columns([
        (pl.col('family_best_daily_sharpe_mean') - pl.col('daily_sharpe_mean')).alias('sharpe_gap_to_family_best'),
        (pl.col('family_best_pnl_mean') - pl.col('mean_daily_gross_pnl_bps_mean')).alias('pnl_gap_to_family_best_bps'),
    ]
    )
    .with_columns([
        (pl.col('sharpe_gap_to_family_best') <= PRACTICAL_SHARPE_TIE).alias('inside_practical_sharpe_tie'),
        (pl.col('pnl_gap_to_family_best_bps') <= PRACTICAL_PNL_TIE_BPS).alias('inside_practical_pnl_tie'),
        pl.col('daily_sharpe_mean').rank('ordinal', descending=True).over('model_family').alias('mean_sharpe_rank'),
        pl.col('shifted_daily_sharpe_mean').rank('ordinal', descending=True).over('model_family').alias(
            'shifted_sharpe_rank'
        ),
        pl.col('positive_sharpe_run_rate').rank('ordinal', descending=True).over('model_family').alias(
            'positive_run_rank'
        ),
        pl.col('max_drawdown_bps_mean').rank('ordinal').over('model_family').alias('drawdown_rank'),
    ]
    )
    .with_columns(
        (
                pl.col('mean_sharpe_rank')
                + pl.col('shifted_sharpe_rank')
                + pl.col('positive_run_rank')
                + pl.col('drawdown_rank')
        ).alias('robust_rank_score')
    )
    .sort(['model_family', 'robust_rank_score', 'daily_sharpe_mean'], descending=[False, False, True])
)

robust_shortlist_table = (
    robust_ranking_table
    .filter(
        pl.col('inside_practical_sharpe_tie')
        | (pl.col('shifted_sharpe_rank') <= 2)
        | (pl.col('positive_run_rank') <= 2)
        | (pl.col('drawdown_rank') <= 2)
    )
    .sort(['model_family', 'robust_rank_score', 'daily_sharpe_mean'], descending=[False, False, True])
)

robust_ranking_plot_data = relabel_frame(robust_ranking_table)
robust_shortlist_display = relabel_frame(robust_shortlist_table)
Code
robust_shortlist_display.select([
    'model_label',
    'method_label',
    pl.col('daily_sharpe_mean').round(4).alias('mean Sharpe'),
    pl.col('shifted_daily_sharpe_mean').round(4).alias('shifted Sharpe'),
    pl.col('mean_daily_gross_pnl_bps_mean').round(3).alias('mean PnL bps'),
    pl.col('trade_rate_mean').round(3).alias('trade rate'),
    pl.col('positive_sharpe_run_rate').round(3).alias('positive Sharpe run rate'),
    pl.col('max_drawdown_bps_mean').round(1).alias('mean max drawdown bps'),
    'inside_practical_sharpe_tie',
    'robust_rank_score',
]
)

Interpret robust_rank_score as a heuristic screen, not as a formal utility function. The score uses equal-weight ordinal ranks across mean Sharpe, shifted Sharpe, positive-run rate, and drawdown.
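
A minimal sketch of the equal-weight rank sum, using plain Python rather than the Polars pipeline above. The three targets and their metric values are hypothetical, and drawdown is assumed to be a positive magnitude where smaller is better, matching the ascending rank used for `drawdown_rank`.

```python
def ordinal_rank(values: dict[str, float], higher_is_better: bool) -> dict[str, int]:
    """Rank 1 = best; ties are broken by input order, as with an ordinal rank."""
    ordered = sorted(values, key=lambda name: values[name], reverse=higher_is_better)
    return {name: position for position, name in enumerate(ordered, start=1)}


# Hypothetical metrics for three targets within one model family.
metrics = {
    "mean_sharpe": {"A": 0.050, "B": 0.048, "C": 0.030},
    "shifted_sharpe": {"A": 0.020, "B": 0.035, "C": 0.010},
    "positive_run_rate": {"A": 0.70, "B": 0.75, "C": 0.55},
    "max_drawdown_bps": {"A": 120.0, "B": 90.0, "C": 150.0},
}
higher_is_better = {
    "mean_sharpe": True,
    "shifted_sharpe": True,
    "positive_run_rate": True,
    "max_drawdown_bps": False,  # smaller drawdown ranks first
}

ranks = {name: ordinal_rank(vals, higher_is_better[name]) for name, vals in metrics.items()}
robust_rank_score = {
    target: sum(ranks[name][target] for name in metrics) for target in ["A", "B", "C"]
}
print(robust_rank_score)
```

In this toy case target B has the lowest (best) score even though A leads on mean Sharpe, which is the kind of disagreement the screen is meant to surface.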

Appendix C5: Policy and Sizing Sensitivity

Code
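def summarize_policy_frame(policy_frame: pl.DataFrame, policy_name: str) -> list[dict[str, object]]:
    """Summarize one sizing policy per run: daily Sharpe, mean PnL, trade rate, average size, and hit rate."""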
    rows = []
    for keys, part in policy_frame.partition_by(
            ['run_id', 'scenario_name', 'scenario_kind', 'seed', 'model_family', 'method_name'], as_dict=True
    ).items():
        run_id, scenario_name, scenario_kind, seed, model_family, method_name = keys
        pnl = part['policy_pnl_bps'].to_numpy()
        size = part['policy_position_size'].to_numpy()
        trade_flag = size > 0.0
        rows.append({
            'policy_name': policy_name,
            'run_id': run_id,
            'scenario_name': scenario_name,
            'scenario_kind': scenario_kind,
            'seed': seed,
            'model_family': model_family,
            'method_name': method_name,
            'daily_sharpe': sharpe_ratio(pnl),
            'mean_daily_pnl_bps': finite_mean(pnl),
            'trade_rate': float(np.mean(trade_flag)),
            'avg_trade_size': float(np.mean(size[trade_flag])) if np.any(trade_flag) else 0.0,
            'trade_hit_rate': float(np.mean(pnl[trade_flag] > 0.0)) if np.any(trade_flag) else float('nan'),
        }
        )
    return rows


policy_base = trade_day_metrics.filter(pl.col('model_family') != 'benchmark')
current_policy_frame = policy_base.select([
    'run_id', 'scenario_name', 'scenario_kind', 'seed', 'model_family', 'method_name',
    pl.col('position_size').alias('policy_position_size'),
    pl.col('gross_pnl_bps').alias('policy_pnl_bps'),
]
)

binary_policy_frame = policy_base.with_columns(
    pl.when(pl.col('prediction') > 0.0).then(1.0).otherwise(0.0).alias('policy_position_size')
).with_columns(
    (pl.col('policy_position_size') * pl.col('realized_return_h60_bps')).alias('policy_pnl_bps')
).select([
    'run_id', 'scenario_name', 'scenario_kind', 'seed', 'model_family', 'method_name',
    'policy_position_size', 'policy_pnl_bps',
]
)

positive_prediction_cutoffs = (
    policy_base
    .filter(pl.col('prediction') > 0.0)
    .group_by(['run_id', 'model_family', 'method_name'])
    .agg(pl.col('prediction').quantile(0.50).alias('positive_prediction_median'))
)
selective_policy_frame = policy_base.join(
    positive_prediction_cutoffs,
    on=['run_id', 'model_family', 'method_name'],
    how='left',
).with_columns(
    pl.when((pl.col('prediction') > 0.0) & (pl.col('prediction') >= pl.col('positive_prediction_median')))
    .then(1.0)
    .otherwise(0.0)
    .alias('policy_position_size')
).with_columns(
    (pl.col('policy_position_size') * pl.col('realized_return_h60_bps')).alias('policy_pnl_bps')
).select([
    'run_id', 'scenario_name', 'scenario_kind', 'seed', 'model_family', 'method_name',
    'policy_position_size', 'policy_pnl_bps',
]
)

policy_sensitivity_run_table = pl.DataFrame(
    summarize_policy_frame(current_policy_frame, 'current scaled positive-score')
    + summarize_policy_frame(binary_policy_frame, 'binary positive-score')
    + summarize_policy_frame(selective_policy_frame, 'top-half positive-score')
)

policy_sensitivity_table = (
    policy_sensitivity_run_table
    .group_by(['policy_name', 'model_family', 'method_name'])
    .agg([
        pl.col('daily_sharpe').filter(pl.col('daily_sharpe').is_finite()).mean().alias('daily_sharpe_mean'),
        pl.mean('mean_daily_pnl_bps').alias('mean_daily_pnl_bps_mean'),
        pl.mean('trade_rate').alias('trade_rate_mean'),
        (pl.col('daily_sharpe').filter(pl.col('daily_sharpe').is_finite()) > 0.0).mean().alias(
            'positive_sharpe_run_rate'
        ),
    ]
    )
    .with_columns(
        pl.col('daily_sharpe_mean').rank('ordinal', descending=True).over(['policy_name', 'model_family']).alias(
            'policy_rank'
        )
    )
    .sort(['model_family', 'policy_name', 'policy_rank'])
)

policy_winner_table = policy_sensitivity_table.filter(pl.col('policy_rank') <= 3)
policy_plot_data = relabel_frame(policy_sensitivity_table)
Code
relabel_frame(policy_winner_table).select([
    'policy_name',
    'model_label',
    'method_label',
    pl.col('daily_sharpe_mean').round(4).alias('mean Sharpe'),
    pl.col('mean_daily_pnl_bps_mean').round(3).alias('mean PnL bps'),
    pl.col('trade_rate_mean').round(3).alias('trade rate'),
    pl.col('positive_sharpe_run_rate').round(3).alias('positive run rate'),
    'policy_rank',
]
)

Policy changes can alter which target is most attractive. This is a required promotion check for the shortlist.
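
To make the difference between the two alternative policies concrete, here is a small sketch on one hypothetical group of days. It uses `statistics.median` for the cutoff, which can differ from the Polars `quantile(0.50)` call above when the number of positive scores is even; the scores and returns are invented for illustration.

```python
from statistics import median


def binary_sizes(predictions: list[float]) -> list[float]:
    """Full size on every positive score, zero otherwise."""
    return [1.0 if p > 0.0 else 0.0 for p in predictions]


def top_half_sizes(predictions: list[float]) -> list[float]:
    """Full size only on positive scores at or above the median positive score."""
    positives = [p for p in predictions if p > 0.0]
    if not positives:
        return [0.0] * len(predictions)
    cutoff = median(positives)
    return [1.0 if p > 0.0 and p >= cutoff else 0.0 for p in predictions]


# Hypothetical scores and realized 60-minute returns in bps.
predictions = [-0.2, 0.1, 0.3, 0.5, 0.0]
realized_bps = [4.0, -3.0, 2.0, 6.0, 1.0]

binary_pnl = [s * r for s, r in zip(binary_sizes(predictions), realized_bps)]
top_half_pnl = [s * r for s, r in zip(top_half_sizes(predictions), realized_bps)]
print(sum(binary_pnl), sum(top_half_pnl))
```

The top-half rule trades on fewer days, so a target whose weakest positive scores are unreliable can look better under it than under the binary rule.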

Appendix C6: Scenario Robustness

Code
scenario_robustness_table = (
    model_run_summary
    .filter((pl.col('eval_slice') == 'all') & (pl.col('model_family') != 'benchmark'))
    .group_by(['model_family', 'method_name', 'scenario_name', 'scenario_kind'])
    .agg([
        pl.mean('daily_sharpe').alias('daily_sharpe_mean'),
        pl.mean('mean_daily_gross_pnl_bps').alias('mean_daily_gross_pnl_bps_mean'),
        (pl.col('daily_sharpe') > 0.0).mean().alias('positive_sharpe_run_rate'),
    ]
    )
    .sort(['model_family', 'method_name', 'scenario_name'])
)

scenario_worst_case_table = (
    scenario_robustness_table
    .group_by(['model_family', 'method_name'])
    .agg([
        pl.min('daily_sharpe_mean').alias('worst_scenario_daily_sharpe_mean'),
        pl.mean('daily_sharpe_mean').alias('mean_scenario_daily_sharpe_mean'),
        pl.std('daily_sharpe_mean').alias('scenario_daily_sharpe_std'),
    ]
    )
    .with_columns(
        pl.when(pl.col('worst_scenario_daily_sharpe_mean').is_finite())
        .then(pl.col('worst_scenario_daily_sharpe_mean').rank('ordinal', descending=True).over('model_family'))
        .otherwise(None)
        .alias('worst_case_rank')
    )
    .sort(['model_family', 'worst_case_rank'])
)


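def eta_squared(frame: pl.DataFrame, group_col: str, metric_col: str) -> float:
    """One-way eta-squared: between-group sum of squares of metric_col over its total sum of squares.

    Returns NaN when no finite rows remain or the metric has no variation.
    """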
    subset = (
        frame
        .select([group_col, metric_col])
        .filter(pl.col(metric_col).is_finite())
        .drop_nulls()
    )
    if subset.is_empty():
        return float('nan')
    values = subset[metric_col].to_numpy()
    grand_mean = float(np.mean(values))
    total = float(np.dot(values - grand_mean, values - grand_mean))
    if total <= 0.0:
        return float('nan')
    group_summary = subset.group_by(group_col).agg([
        pl.len().alias('n'),
        pl.mean(metric_col).alias('group_mean'),
    ]
    )
    between = float(np.sum(
        group_summary['n'].to_numpy()
        * (group_summary['group_mean'].to_numpy() - grand_mean) ** 2
    )
    )
    return between / total


variance_rows = []
variance_frame = model_run_summary.filter((pl.col('eval_slice') == 'all') & (pl.col('model_family') != 'benchmark'))
for model_family in MODEL_FAMILIES:
    part = variance_frame.filter(pl.col('model_family') == model_family)
    for factor_col, factor_label in [
        ('method_name', 'Target'),
        ('scenario_name', 'Scenario'),
        ('seed', 'Seed'),
    ]:
        variance_rows.append({
            'model_family': model_family,
            'factor': factor_label,
            'eta_squared_daily_sharpe': eta_squared(part, factor_col, 'daily_sharpe'),
        }
        )
variance_decomposition_table = pl.DataFrame(variance_rows).sort(['model_family', 'eta_squared_daily_sharpe'],
                                                                descending=[False, True]
                                                                )

scenario_plot_data = relabel_frame(scenario_robustness_table)
scenario_worst_case_display = relabel_frame(scenario_worst_case_table)
variance_plot_data = relabel_frame(variance_decomposition_table)
Code
scenario_worst_case_display.filter(
    pl.col('worst_scenario_daily_sharpe_mean').is_finite()
    & pl.col('mean_scenario_daily_sharpe_mean').is_finite()
    & pl.col('scenario_daily_sharpe_std').is_finite()
).select([
    'model_label',
    'method_label',
    pl.col('worst_scenario_daily_sharpe_mean').round(4).alias('worst scenario Sharpe'),
    pl.col('mean_scenario_daily_sharpe_mean').round(4).alias('mean scenario Sharpe'),
    pl.col('scenario_daily_sharpe_std').round(4).alias('scenario Sharpe std'),
    'worst_case_rank',
]
)
Code
(
        ggplot(
            variance_plot_data.filter(pl.col('eta_squared_daily_sharpe').is_finite()),
            aes(x='factor', y='eta_squared_daily_sharpe', fill='factor'),
        )
        + geom_bar(stat='identity', width=0.72)
        + coord_flip()
        + facet_wrap('model_label', ncol=1)
        + facet_separator_theme
        + labs(title='What Drives Run-to-Run Sharpe Variation?',
               subtitle='One-way eta-squared by model family; larger bars explain more Sharpe dispersion',
               x='Factor',
               y='Share of Sharpe variance',
               fill='Factor',
               )
        + scale_fill_manual(values={'Target': '#54a24b',
                                    'Scenario': '#e45756',
                                    'Seed': '#4c78a8',
                                    })
        + plot_size(420, 280)
)

Rows with undefined scenario Sharpe are excluded from ranking interpretation. The variance decomposition is a secondary diagnostic and should not override the direct shifted-regime read.
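
For readers who want to check the eta-squared arithmetic by hand, the following is a plain-Python sketch of the same one-way ratio on a toy grouping. The group names and values are hypothetical and far larger than realistic daily Sharpe values; they are chosen so the sums are easy to verify.

```python
def eta_squared(groups: dict[str, list[float]]) -> float:
    """One-way eta-squared: between-group sum of squares over total sum of squares."""
    values = [v for members in groups.values() for v in members]
    if not values:
        return float("nan")
    grand_mean = sum(values) / len(values)
    total_ss = sum((v - grand_mean) ** 2 for v in values)
    if total_ss <= 0.0:
        return float("nan")
    between_ss = sum(
        len(members) * (sum(members) / len(members) - grand_mean) ** 2
        for members in groups.values()
    )
    return between_ss / total_ss


# Hypothetical run-level values grouped by scenario kind.
by_scenario = {"stable": [1.0, 2.0, 3.0], "shifted": [4.0, 5.0, 6.0]}
share = eta_squared(by_scenario)
print(round(share, 4))
```

Here the grand mean is 3.5, the total sum of squares is 17.5, and the between-group sum of squares is 13.5, so the grouping accounts for 13.5 / 17.5 of the dispersion.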

Appendix C7: Target Correlation and Redundancy

Code
target_corr_rows = []
for left, right in product(TARGET_COLS, TARGET_COLS):
    left_values = reference_frame[left].to_numpy()
    right_values = reference_frame[right].to_numpy()
    target_corr_rows.append({
        'left_method': left,
        'right_method': right,
        'spearman_corr': rank_ic(left_values, right_values),
    }
    )

target_correlation_table = pl.DataFrame(target_corr_rows).with_columns([
    pl.col('left_method').replace(method_labels).alias('left_label'),
    pl.col('right_method').replace(method_labels).alias('right_label'),
]
)

target_redundancy_table = (
    target_correlation_table
    .filter(pl.col('left_method') < pl.col('right_method'))
    .with_columns(pl.col('spearman_corr').abs().alias('abs_spearman_corr'))
    .sort('abs_spearman_corr', descending=True)
)

target_corr_plot_data = target_correlation_table
Code
target_redundancy_table.select([
    'left_label',
    'right_label',
    pl.col('spearman_corr').round(3).alias('Spearman correlation'),
]
).head(20)

These high correlations are why the main focus is on target families rather than on each target as an independent candidate.
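
The redundancy point can be illustrated with a small rank-correlation sketch. It assumes the `rank_ic` helper used above is a Spearman correlation (the output column is named `spearman_corr`), and it implements that with average ranks in plain Python. The label values are hypothetical.

```python
def average_ranks(values: list[float]) -> list[float]:
    """1-based ranks, with tied values sharing their average rank."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    ranks = [0.0] * len(values)
    start = 0
    while start < len(order):
        end = start
        while end + 1 < len(order) and values[order[end + 1]] == values[order[start]]:
            end += 1
        shared = (start + end) / 2.0 + 1.0
        for position in range(start, end + 1):
            ranks[order[position]] = shared
        start = end + 1
    return ranks


def spearman(left: list[float], right: list[float]) -> float:
    """Pearson correlation of the average ranks of two equal-length series."""
    rank_left, rank_right = average_ranks(left), average_ranks(right)
    n = len(left)
    mean_l, mean_r = sum(rank_left) / n, sum(rank_right) / n
    cov = sum((a - mean_l) * (b - mean_r) for a, b in zip(rank_left, rank_right))
    var_l = sum((a - mean_l) ** 2 for a in rank_left)
    var_r = sum((b - mean_r) ** 2 for b in rank_right)
    if var_l == 0.0 or var_r == 0.0:
        return float("nan")
    return cov / (var_l * var_r) ** 0.5


# Hypothetical labels: the second is a monotone transform of the first.
raw_return = [0.4, -1.2, 2.5, 0.9, -0.3]
squashed = [r ** 3 for r in raw_return]
rho = spearman(raw_return, squashed)
print(rho)
```

Two labels that are monotone transforms of each other order the days identically, so in rank terms they carry the same information even when their values look different.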

Appendix C8: Target-Parameter Sensitivity

Code
mae_lambda_grid = [0.0, 0.25, 0.50, 0.75, 1.00]
tradeability_threshold_grid = [-0.0020, 0.0, 0.0020]
tradeability_mae_cap_grid = [0.0020, CONFIG['tradeability_mae_cap'], 0.0060]

reference_terminal_log_return = np.log1p(reference_frame['realized_return_h60'].to_numpy())
reference_mae = reference_frame['max_adverse_excursion_h60'].to_numpy()
reference_fixed_target = reference_frame['target_fixed_rate_h60'].to_numpy()
reference_tradeability_target = reference_frame['target_tradeability_score'].to_numpy()

parameter_rows = []
for penalty_lambda in mae_lambda_grid:
    candidate = (reference_terminal_log_return - penalty_lambda * reference_mae) / CONFIG['primary_horizon']
    parameter_rows.append({
        'target_family': 'MAE-penalized return',
        'parameter_label': f'lambda={penalty_lambda:.2f}',
        'spearman_to_current_target': rank_ic(reference_frame['target_mae_penalized_rate_h60'].to_numpy(), candidate),
        'spearman_to_fixed_return': rank_ic(reference_fixed_target, candidate),
        'mean_value': finite_mean(candidate),
        'std_value': finite_std(candidate),
    }
    )

for return_threshold in tradeability_threshold_grid:
    for mae_cap in tradeability_mae_cap_grid:
        candidate = np.asarray([
            tradeability_score(
                terminal_log_return=float(ret),
                max_adverse_excursion_value=float(mae),
                return_threshold=return_threshold,
                return_temp=CONFIG['tradeability_return_temp'],
                mae_cap=mae_cap,
                mae_temp=CONFIG['tradeability_mae_temp'],
            )
            for ret, mae in zip(reference_terminal_log_return, reference_mae)
        ]
        )
        parameter_rows.append({
            'target_family': 'Tradeability score',
            'parameter_label': f'ret_thr={return_threshold:.4f}; mae_cap={mae_cap:.4f}',
            'spearman_to_current_target': rank_ic(reference_tradeability_target, candidate),
            'spearman_to_fixed_return': rank_ic(reference_fixed_target, candidate),
            'mean_value': finite_mean(candidate),
            'std_value': finite_std(candidate),
        }
        )

target_parameter_sensitivity_table = pl.DataFrame(parameter_rows).sort(['target_family', 'parameter_label'])
parameter_plot_data = target_parameter_sensitivity_table
Code
target_parameter_sensitivity_table.select([
    'target_family',
    'parameter_label',
    pl.col('spearman_to_current_target').round(3).alias('corr to current target'),
    pl.col('spearman_to_fixed_return').round(3).alias('corr to fixed return'),
    pl.col('mean_value').round(5).alias('mean value'),
    pl.col('std_value').round(5).alias('std value'),
]
)

Nearby parameter choices do not materially change rank ordering for the tested MAE and tradeability variants.
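
The MAE-penalized candidate in the loop above is the terminal log return minus lambda times the maximum adverse excursion, divided by the horizon. The sketch below applies that formula to two hypothetical days to show what lambda trades off. It assumes the excursion is stored as a positive magnitude and uses a 60-minute horizon as a stand-in for `CONFIG['primary_horizon']`.

```python
import math

HORIZON_MINUTES = 60  # assumed value, standing in for CONFIG['primary_horizon']

# Hypothetical days: (60-minute simple return, max adverse excursion), both as fractions.
days = {
    "smooth": (0.0030, 0.0005),
    "choppy": (0.0040, 0.0040),
}


def mae_penalized_rate(simple_return: float, mae: float, penalty_lambda: float, horizon: int) -> float:
    """Per-period log return after subtracting lambda times the max adverse excursion."""
    return (math.log1p(simple_return) - penalty_lambda * mae) / horizon


def preferred_day(penalty_lambda: float) -> str:
    """Name of the hypothetical day with the higher label value at this lambda."""
    scores = {
        name: mae_penalized_rate(ret, mae, penalty_lambda, HORIZON_MINUTES)
        for name, (ret, mae) in days.items()
    }
    return max(scores, key=scores.get)


for penalty_lambda in [0.0, 0.25, 0.50, 1.00]:
    print(penalty_lambda, preferred_day(penalty_lambda))
```

A single close pair like this can flip as lambda grows while the aggregate rank correlation across all days stays high, which is what the table's correlation columns summarize.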

Code
(
        ggplot(oracle_plot_data, aes(x='scenario_kind_label', y='value', group='metric_label'))
        + geom_line(color='#8d99ae', size=0.9)
        + geom_point(aes(color='scenario_kind_label'), size=3.6)
        + facet_grid(y='metric_label', scales='free_y')
        + facet_separator_theme
        + scale_color_manual(values={'Stable': '#2e8b57', 'Shifted': '#b24c63'})
        + labs(title='Oracle benchmark degradation in shifted regimes',
               x='',
               y='Value',
               color='Scenario kind')
        + plot_size(880, 300)
)

Prediction Rank Versus Trading Rank

Question: Do targets with better row-level prediction quality also become better trading targets?

Takeaway: Not reliably. Prediction IC ranks whether scores order realized 60-minute returns correctly; trading Sharpe ranks the deployed long/skip sizing policy. Points above the diagonal monetize better than their prediction rank suggests, while points below it have prediction quality that translates less cleanly into the trading rule.
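
The comparison reduces to two ordinal rankings and the gap between them. The sketch below uses three hypothetical targets with invented IC and Sharpe values; a positive gap corresponds to a point above the diagonal in a plot with both rank axes reversed.

```python
def ordinal_rank_desc(values: dict[str, float]) -> dict[str, int]:
    """Rank 1 = highest value; ties are broken by input order."""
    ordered = sorted(values, key=lambda name: values[name], reverse=True)
    return {name: position for position, name in enumerate(ordered, start=1)}


# Hypothetical per-target metrics; not results from this study.
prediction_ic = {"A": 0.040, "B": 0.055, "C": 0.030}
trading_sharpe = {"A": 0.060, "B": 0.045, "C": 0.050}

ic_rank = ordinal_rank_desc(prediction_ic)
sharpe_rank = ordinal_rank_desc(trading_sharpe)

# Positive gap: the target trades better than its prediction rank suggests.
rank_gap = {name: ic_rank[name] - sharpe_rank[name] for name in prediction_ic}
print(rank_gap)
```

In this toy case target B has the best prediction rank but the worst trading rank, the pattern the takeaway warns about.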

Code
(
        ggplot(diagnostic_rank_plot_data,
               aes(x='prediction_ic_rank', y='trading_sharpe_rank', size='mean_trade_rate'))
        + geom_abline(slope=1.0, intercept=0.0, linetype='dashed', color='#8d99ae')
        + geom_point(aes(color='method_label'), alpha=0.82)
        + geom_text(aes(label='method_label', color='method_label'), size=7, nudge_y=-0.18, show_legend=False)
        + facet_grid(y='model_label')
        + facet_separator_theme
        + scale_x_reverse(breaks=list(range(1, len(TARGET_COLS) + 1)))
        + scale_y_reverse(breaks=list(range(1, len(TARGET_COLS) + 1)))
        + scale_color_manual(values=method_palette)
        + labs(title='Prediction rank versus trading rank',
               subtitle='Trainable targets only; up/right is better, diagonal means prediction rank equals trading rank',
               x='Prediction IC rank',
               y='Trading Sharpe rank',
               size='Mean trade rate',
               color='Target')
        + plot_size(980, 500)
)
Code
(
        ggplot(paired_plot_data,
               aes(x='challenger_label', y='sharpe_diff_mean', color='model_label'))
        + geom_hline(yintercept=0.0,
                     linetype='dashed',
                     color='#b0b7c3'
                     )
        + geom_segment(aes(x='challenger_label',
                           xend='challenger_label', y='sharpe_diff_ci_low', yend='sharpe_diff_ci_high',
                           color='model_label'
                           ), size=1.5,
                       alpha=0.65)
        + geom_point(size=3.2)
        + facet_grid(y='model_label')
        + facet_separator_theme
        + coord_flip()
        + scale_x_discrete(limits=method_order)
        + scale_color_manual(values=model_palette)
        + labs(title='Sharpe difference between the leader and alternatives',
               x='Challenger target',
               y='Sharpe difference with uncertainty band',
               color='Model family',
               )
        + plot_size(980, 420)
)
Code
(
        ggplot(run_tail_plot_data,
               aes(x='method_label', y='median_run_daily_sharpe', color='model_label')
               )
        + geom_hline(yintercept=0.0,
                     linetype='dashed',
                     color='#b0b7c3'
                     )
        + geom_segment(aes(x='method_label',
                           xend='method_label', y='worst_run_daily_sharpe', yend='median_run_daily_sharpe',
                           color='model_label'
                           ),
                       size=1.8,
                       alpha=0.7,
                       )
        + geom_point(size=3.2)
        + geom_point(aes(y='worst_run_daily_sharpe'), size=2.2, alpha=0.55)
        + facet_grid(y='model_label')
        + facet_separator_theme
        + coord_flip()
        + scale_x_discrete(limits=method_order)
        + scale_color_manual(values=model_palette)
        + labs(title='Downside tail of run-level Sharpe',
               x='',
               y='Daily Sharpe',
               color='Model family',
               )
        + plot_size(980, 420)
)
Code
(
        ggplot(degenerate_strategy_plot_data,
               aes(x='method_label', y='nan_sharpe_run_rate', fill='model_label'))
        + geom_bar(stat='identity', position='dodge')
        + coord_flip()
        + scale_x_discrete(limits=method_order)
        + labs(title='Undefined-Sharpe incidence by target',
               x='',
               y='Share of runs with undefined daily Sharpe',
               fill='Model family'
               )
        + plot_size(920, 420)
)

Appendix D: Exact Target Definitions

Formula-level details are included here for reproducibility.

1. Competing-Risk Hit Score

Implemented with the same TP/SL/horizon grid as target_barrier_rate and a single decay parameter tau=15. Each grid element scores +exp(-t_hit / tau) if take-profit hits first, -exp(-t_hit / tau) if stop-loss hits first, and 0 if neither hits within the horizon.
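The per-element rule above can be sketched as follows. This is an illustration under stated assumptions, not the notebook's implementation: `t_hit` is taken as the 1-based bar index of the first hit, barriers are expressed as cumulative-return levels, and grid elements are averaged. The source does not state how grid elements are aggregated, and the function names are hypothetical.

```python
import math


def hit_score(cum_returns, take_profit, stop_loss, horizon, tau=15.0):
    """Score one (TP, SL, horizon) grid element on a cumulative-return path.

    Assumption: t_hit is the 1-based index of the first bar that touches
    either barrier; stop_loss is given as a positive magnitude.
    """
    for t, r in enumerate(cum_returns[:horizon], start=1):
        if r >= take_profit:
            return math.exp(-t / tau)   # take-profit hit first
        if r <= -stop_loss:
            return -math.exp(-t / tau)  # stop-loss hit first
    return 0.0                          # neither barrier hit within the horizon


def competing_risk_hit_score(cum_returns, grid, tau=15.0):
    """Average the element scores over the grid (averaging is an assumption)."""
    scores = [hit_score(cum_returns, tp, sl, h, tau) for tp, sl, h in grid]
    return sum(scores) / len(scores)


# Toy path: the +0.2% barrier is touched on bar 2.
path = [0.001, 0.003, -0.002]
print(hit_score(path, take_profit=0.002, stop_loss=0.002, horizon=3))
```

Earlier hits decay less, so a take-profit touched on bar 2 scores closer to +1 than one touched near the end of the horizon.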

2. MAE-Penalized Return

Implemented as (cum_60[-1] - 0.5 * max_adverse_excursion_h60) / 60. This keeps the terminal-return signal while explicitly penalizing paths that move against the position before the 60-minute exit.
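A small sketch of the formula above, assuming `cum_60` is the cumulative-return path of a long position over the 60 bars and that the maximum adverse excursion is the positive magnitude of the lowest point on that path. That MAE convention and the function name are assumptions made for illustration.

```python
def mae_penalized_return(cum_60, penalty=0.5, horizon=60):
    """(terminal return - penalty * MAE) / horizon, as in the stated formula.

    Assumption: MAE is max(0, -min(cum_60)), a non-negative magnitude.
    """
    max_adverse_excursion = max(0.0, -min(cum_60))
    return (cum_60[-1] - penalty * max_adverse_excursion) / horizon


# Toy path: dips to -0.2% on the first bar, then finishes at +0.4%.
cum_60 = [-0.002] + [0.0] * 58 + [0.004]
print(mae_penalized_return(cum_60))
```

Two paths with the same terminal return get different targets when one of them draws down further along the way.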

3. Downside-Adjusted Return

Implemented as the fixed 60m return rate divided by downside-only semivolatility, with eps=1e-5 and symmetric clipping to [-20, 20]. This makes downside-aware path quality explicit without allowing near-zero denominators to dominate.
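One way to read this definition, as a hedged sketch: the return rate is the mean per-bar return, the semivolatility is the root mean square of the negative per-bar returns, and eps is added to the denominator. None of these three conventions is confirmed by the source, and the function name is hypothetical.

```python
import math


def downside_adjusted_return(minute_returns, eps=1e-5, clip=20.0):
    """Return rate over downside semivolatility, clipped to [-clip, clip].

    Assumptions: rate = mean per-bar return; semivolatility = RMS of the
    negative per-bar returns; eps is added to the denominator.
    """
    n = len(minute_returns)
    rate = sum(minute_returns) / n
    semivol = math.sqrt(sum(min(r, 0.0) ** 2 for r in minute_returns) / n)
    raw = rate / (semivol + eps)
    return max(-clip, min(clip, raw))


# A path with no down bars has zero semivolatility, so the clip binds.
print(downside_adjusted_return([0.001] * 60))
```

The all-positive case shows why clipping matters: without it, the eps-only denominator would push the ratio to 100.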

4. Tradeability Score

Implemented as a smooth score that combines positive 60m terminal return with staying inside a pain budget. The current defaults use return_threshold=0, mae_cap=min(stop_losses), return_temp=0.002, and mae_temp=0.0015, then map the joint score to [-1, 1].
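The exact functional form is not given above, so the following is only one plausible sketch: two logistic gates, one for terminal return above `return_threshold` and one for MAE below `mae_cap`, multiplied and rescaled to [-1, 1]. The product-of-sigmoids form and the function names are assumptions; only the parameter names and defaults come from the text. `mae_cap` is passed in directly because `stop_losses` is defined elsewhere in the notebook.

```python
import math


def _sigmoid(x):
    """Numerically stable logistic function."""
    if x >= 0:
        return 1.0 / (1.0 + math.exp(-x))
    z = math.exp(x)
    return z / (1.0 + z)


def tradeability_score(terminal_return, mae, mae_cap,
                       return_threshold=0.0, return_temp=0.002, mae_temp=0.0015):
    """Assumed form: 2 * P(return ok) * P(pain within budget) - 1."""
    p_return = _sigmoid((terminal_return - return_threshold) / return_temp)
    p_pain = _sigmoid((mae_cap - mae) / mae_temp)
    return 2.0 * p_return * p_pain - 1.0


# Hypothetical inputs: +1% terminal return, no drawdown, 0.5% pain budget.
print(tradeability_score(0.01, mae=0.0, mae_cap=0.005))
```

Under this form the score approaches +1 only when both conditions hold comfortably, and falls toward -1 when either the return is clearly negative or the pain budget is clearly exceeded.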