Nathan’s Research

Contents

  • Research design
  • 1) Load article events from ClickHouse
  • 2) Load real kg.article_security_sentiment_v features
  • 3) Build 1-second market state around the real article events
  • 4) Build policy-conditioned event outcomes
  • 5) Policy summary tables and action-space definition
  • 6) Diagnostic plots, Pareto frontier, and signal-policy interaction
  • 6.5) Exploratory within-family frontiers
  • 6.6) Signal-vs-aggressiveness by policy family
  • 7) Joint-model dataset construction
  • 8) Execution decision package
    • 8.1) Core scoreboard
    • 8.2) Month-by-month breakdown
    • 8.3) Regime breakdown
    • 8.4) Frontier charts
    • 8.5) Stress table
    • 8.6) Final gating decision
  • Appendix - diagnostic detail
    • Appendix: capacity and setup diagnostics

Intraday Execution After News Releases (EDA 04)

pr-graph
execution
news
Published

April 6, 2026

Modified

May 15, 2026

This notebook evaluates policy-conditioned execution outcomes after second-stamped news and prepares the supervised table used in the next joint-modeling stage.

The notebook’s main outputs are:

  • policy frontier / Pareto analysis
  • aggressiveness-vs-alpha interaction
  • joint-model dataset construction

This is the last pre-modeling notebook: it evaluates policy-conditioned outcomes and constructs the supervised learning table for later joint modeling, but it does not attempt production-grade model fitting.

Research design

Assumptions in this prototype:

  • long-only execution research
  • 1-second market state from 04:00 to 20:00 ET
  • article events sourced from news.articles joined to news.articles_extracted_tickers_backfill_stage
  • minimum source-to-machine-to-execution delay is modeled on a 30s to 90s grid
  • premarket event-latency combinations are excluded once the delayed decision time reaches 09:30 ET
  • holding metrics reported to segment end and same-day extended-session end
  • fill-to-open uses the same-day 09:30 open for premarket events and a synthetic next-day regular open proxy otherwise
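
The latency grid and the premarket cutoff from the assumptions above can be sketched as follows. This is a minimal illustration, not the notebook's implementation: `eligible_latencies` is a hypothetical helper, and it assumes the event timestamp is already a naive ET datetime.

```python
from datetime import datetime, time, timedelta

# Latency grid taken from the notebook's research parameters.
LATENCIES_S = [30, 40, 50, 60, 75, 90]
REGULAR_OPEN = time(9, 30)


def eligible_latencies(publish_ts_et: datetime, latencies_s=LATENCIES_S) -> list[int]:
    """Return the latencies that survive the premarket cutoff rule.

    Hypothetical helper: only the premarket rule is applied here, so events
    published at or after 09:30 ET keep the full grid.
    """
    if publish_ts_et.time() >= REGULAR_OPEN:
        return list(latencies_s)
    open_ts = publish_ts_et.replace(hour=9, minute=30, second=0, microsecond=0)
    # Drop a combination once the delayed decision time reaches 09:30 ET.
    return [
        latency
        for latency in latencies_s
        if publish_ts_et + timedelta(seconds=latency) < open_ts
    ]


late_premarket = eligible_latencies(datetime(2024, 5, 1, 9, 29, 0))
early_premarket = eligible_latencies(datetime(2024, 5, 1, 8, 0, 0))
```

Under these assumptions, an article published at 09:29:00 ET keeps only the 30s, 40s, and 50s latencies, because the 60s decision time lands exactly on the open.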

Real inputs in this notebook:

  • article events and tickers from ClickHouse
  • kg.article_security_sentiment_v features using the pipeline article_id

Toggleable for methodology development:

  • 1-second quote/trade state can come from ClickHouse Polygon raw tables or the synthetic fallback

Core questions in this notebook:

  • Which policies are non-dominated on fillability vs value?
  • How does optimal aggressiveness change with coarse signal quality?
  • What is the correct policy-value target for joint modeling?
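
For the first question, "non-dominated" can be made concrete with a small sketch. It assumes each policy is summarized by two higher-is-better numbers, a fill rate and a mean value in bps. `PolicyPoint`, `pareto_frontier`, and the example figures are illustrative; they are not the notebook's plotting helpers or its results.

```python
from typing import NamedTuple


class PolicyPoint(NamedTuple):
    policy_id: str
    fill_rate: float  # share of requested notional filled, higher is better
    value_bps: float  # mean policy value in bps, higher is better


def pareto_frontier(points: list[PolicyPoint]) -> list[PolicyPoint]:
    """Keep policies that no other policy matches on both axes and beats on one."""
    frontier = []
    for p in points:
        dominated = any(
            q.fill_rate >= p.fill_rate
            and q.value_bps >= p.value_bps
            and (q.fill_rate > p.fill_rate or q.value_bps > p.value_bps)
            for q in points
        )
        if not dominated:
            frontier.append(p)
    return sorted(frontier, key=lambda p: p.fill_rate)


# Illustrative numbers only.
example = [
    PolicyPoint("a", 0.90, 2.0),
    PolicyPoint("b", 0.60, 6.0),
    PolicyPoint("c", 0.55, 5.0),  # dominated by "b" on both axes
    PolicyPoint("d", 0.30, 9.0),
]
frontier_ids = [p.policy_id for p in pareto_frontier(example)]
```

The pairwise check is quadratic in the number of policies, which is acceptable for a policy grid of this size.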
Code
%load_ext autoreload
%autoreload 2
Code
from lets_plot import *

LetsPlot.setup_html()
LetsPlot.set_theme(
    theme_minimal()
    + theme(
        axis_text=element_text(size=11),
        axis_title=element_text(size=12),
        plot_title=element_text(face='bold', size=14),
        legend_title=element_text(size=11),
        legend_text=element_text(size=10),
    )
)
Code
from trading_research.research_setup import *
Code
import IPython.display as display
Code
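import hashlib
import math  # used by stable_normal; imported explicitly rather than relying on the star import
import os  # used by the os.getenv table overrides below
from pathlib import Path
from datetime import date, datetime, time, timedelta
from typing import Iterable

import numpy as np
import polars as pl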

from trading_research.pr_graph.intraday_news_execution_market_data import (
    _finalize_outcomes,
    build_real_event_state_and_outcomes,
    normalize_loaded_article_events,
)
from trading_research.pr_graph.intraday_news_research_io import (
    load_article_events,
    load_article_security_sentiment,
)

ARTICLES_TABLE = os.getenv("NEWS_ARTICLES_TABLE", "news.articles")
ARTICLE_TICKERS_TABLE = os.getenv(
    "NEWS_ARTICLE_TICKERS_TABLE",
    "news.articles_extracted_tickers_backfill_stage",
)
NEWS_FEATURES_TABLE = os.getenv(
    "ARTICLE_SECURITY_SENTIMENT_TABLE",
    "kg.article_security_sentiment_v",
)


def ch_query_rows(sql: str, *, external_tables=None) -> pl.DataFrame:
    return query_and_build_pl_df(
        sql.strip().rstrip(";"),
        external_tables=external_tables,
    )


def show_df(df: pl.DataFrame, n: int = 10) -> pl.DataFrame:
    # itables renders the full frame interactively, so `n` is accepted for
    # call-site compatibility but is intentionally not applied.
    return df


def emit_warning(message: str) -> None:
    print(f"WARNING: {message}")


def stable_u01(*parts: object) -> float:
    key = "||".join("" if part is None else str(part) for part in parts)
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return int(digest[:12], 16) / float(16 ** 12)


def stable_normal(*parts: object, scale: float = 1.0) -> float:
    u1 = max(stable_u01(*parts, "u1"), 1e-9)
    u2 = stable_u01(*parts, "u2")
    return scale * math.sqrt(-2.0 * math.log(u1)) * math.cos(2.0 * math.pi * u2)


def sha256_hex(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def make_article_id(url: str, publish_ts: datetime, download_ts: datetime) -> str:
    return sha256_hex(f"CH|{url}|{publish_ts}|{download_ts}")


def segment_label(ts: datetime) -> str:
    tod = ts.time()
    if tod < time(9, 30):
        return "premarket"
    if tod < time(16, 0):
        return "regular"
    return "after_hours"


def segment_end(ts: datetime) -> datetime:
    if ts.time() < time(9, 30):
        return ts.replace(hour=9, minute=30, second=0, microsecond=0)
    if ts.time() < time(16, 0):
        return ts.replace(hour=16, minute=0, second=0, microsecond=0)
    return ts.replace(hour=20, minute=0, second=0, microsecond=0)


def regular_open(ts: datetime) -> datetime:
    return ts.replace(hour=9, minute=30, second=0, microsecond=0)


def daypart_bucket(ts: datetime) -> str:
    tod = ts.time()
    if tod < time(8, 0):
        return "premarket_before_8"
    if tod < time(9, 30):
        return "premarket_8_to_9_30"
    if tod < time(11, 0):
        return "regular_before_11"
    if tod < time(14, 0):
        return "regular_11_to_14"
    if tod < time(16, 0):
        return "regular_after_14"
    return "after_hours"


def derive_event_type(title: str | None, tags: Iterable[str] | None) -> str:
    title_text = (title or "").lower()
    tag_text = " ".join(tags or []).lower()
    haystack = f"{title_text} {tag_text}"
    rules = [
        ("earnings_like", ["earnings", "guidance", "quarter", "results"]),
        ("contract_order", ["contract", "order", "award", "purchase", "agreement"]),
        ("financing", ["offering", "financing", "debt", "equity", "placement"]),
        ("product", ["launch", "product", "patent", "approval", "trial"]),
        ("regulatory", ["fda", "sec", "regulatory", "compliance", "permit"]),
        ("management", ["ceo", "cfo", "board", "director", "appoint"]),
    ]
    for label, keywords in rules:
        if any(keyword in haystack for keyword in keywords):
            return label
    return "other"


def ols_regression(
        df: pl.DataFrame,
        numeric_cols: list[str],
        categorical_cols: list[str],
        target_col: str,
        rng_seed: int = 7,
) -> tuple[pl.DataFrame, pl.DataFrame]:
    work = df.select(numeric_cols + categorical_cols + [target_col]).drop_nulls()
    if work.height < 30:
        return pl.DataFrame(), pl.DataFrame()

    design = work.select(numeric_cols + categorical_cols).to_dummies(columns=categorical_cols)
    feature_cols = design.columns
    X = design.to_numpy().astype(float)
    y = work[target_col].to_numpy().astype(float).reshape(-1, 1)

    rng = np.random.default_rng(rng_seed)
    order = rng.permutation(len(X))
    split = max(int(len(X) * 0.7), 20)
    train_idx = order[:split]
    test_idx = order[split:]
    if len(test_idx) < 10:
        return pl.DataFrame(), pl.DataFrame()

    X_train = X[train_idx]
    X_test = X[test_idx]
    y_train = y[train_idx]
    y_test = y[test_idx]

    mean = X_train.mean(axis=0)
    std = X_train.std(axis=0)
    std[std == 0.0] = 1.0

    X_train_z = (X_train - mean) / std
    X_test_z = (X_test - mean) / std

    beta = np.linalg.lstsq(
        np.column_stack([np.ones(len(X_train_z)), X_train_z]),
        y_train,
        rcond=None,
    )[0].reshape(-1)

    def _predict(matrix: np.ndarray) -> np.ndarray:
        return np.column_stack([np.ones(len(matrix)), matrix]) @ beta

    train_pred = _predict(X_train_z)
    test_pred = _predict(X_test_z)

    def _r2(actual: np.ndarray, pred: np.ndarray) -> float:
        actual = actual.reshape(-1)
        pred = pred.reshape(-1)
        denom = np.sum((actual - actual.mean()) ** 2)
        if denom <= 0:
            return float("nan")
        return 1.0 - np.sum((actual - pred) ** 2) / denom

    metrics = pl.DataFrame(
        {
            "target": [target_col],
            "n_obs": [int(len(work))],
            "train_r2": [_r2(y_train, train_pred)],
            "test_r2": [_r2(y_test, test_pred)],
        }
    )
    coefs = pl.DataFrame(
        {
            "feature": ["intercept"] + feature_cols,
            "coef": beta,
            "abs_coef": np.abs(beta),
        }
    ).sort("abs_coef", descending=True)
    return metrics, coefs
Code
# --- Plot helpers ---
from trading_research.pr_graph.intraday_news_execution_viz_helpers import (
    add_frontier_flag,
    ensure_policy_label,
    label_policy_family,
    label_policy_id,
    plot_capacity_overview,
    plot_context_diagnostics,
    plot_correlation_snapshot,
    plot_execution_assumption_comparison,
    plot_family_signal_overview,
    plot_feature_quality_overview,
    plot_feature_run_mix,
    plot_fold_diagnostics,
    plot_histogram,
    plot_market_state_overview,
    plot_monthly_policy_heatmap,
    plot_policy_frontier,
    plot_regime_policy_metric_grid,
    plot_return_share_decomposition,
    plot_sample_overview,
    plot_strategy_metric_grid,
    plot_stress_comparison,
    plot_target_horizon_profile,
    plot_transition_heatmaps_by_family,
    policy_variant_label,
    setup_lets_plot_darcula,
    strategy_display_name,
    summarize_split,
)

# setup_lets_plot_darcula()
Code
from itables import init_notebook_mode
init_notebook_mode(all_interactive=True, connected=True)
Code
# --- Research parameters ---
LOOKBACK_DAYS = 30
MAX_EVENTS = 1_000_000
RNG_SEED = 7

LATENCIES_S = [30, 40, 50, 60, 75, 90]
TRAILING_OFFSETS_BPS = [5, 10, 25, 50]
ASK_CAP_OFFSETS_BPS = [0, 5, 10]
REQUESTED_NOTIONALS = [1_000, 2_500, 5_000, 10_000]
POLICY_NOTIONAL = 5_000
MODEL_NOTIONAL = 2_500
PRIMARY_EXIT = "fill_to_30m_after_open"
PRIMARY_VALUE_COL = "req_fill_to_30m_after_open_bps"
SIGNAL_BUCKETS = 3
FRONTIER_SAMPLE = "clean"
MIN_MODEL_ROWS = 30
MIN_ROWS_PER_SIGNAL_BUCKET = 25
MIN_ROWS_PER_POLICY = 25
LOW_FEATURE_COVERAGE_RATE = 0.05
LOW_FEATURE_COVERED_EVENTS = 10

MODEL_ACTION_COLS = ["strategy", "latency_s", "offset_bps"]
GROUP_COL_EVENT = ["article_id", "ticker"]

ENTRY_STRATEGIES = [
    "trailing_mid",
    "buy_at_ask_cap",
    "vwap",
    "vwap_minus_1std",
]

OVERLAP_WINDOW_S = 30 * 60
FWD_VOLUME_PARTICIPATION = 0.0125

START_DATE = "2024-05-01"
END_DATE = "2025-01-01"
EVENT_TIMEZONE = "America/New_York"
MARKET_DATA_TIMEZONE = "UTC"
MARKET_DATA_MODE = "real"
MARKET_DATA_BACKEND = "clickhouse_flight"
CLICKHOUSE_FLIGHT_URI = "grpc://localhost:9006"
CLICKHOUSE_FLIGHT_USER = "default"
CLICKHOUSE_FLIGHT_PASSWORD = "trade"
POLYGON_TRADES_TABLE = "polygon.trades_raw_v2"
POLYGON_QUOTES_TABLE = "polygon.quotes_raw"
REAL_MARKET_MAX_DATE_EXCL = "2025-11-01"

display.JSON({
    "LOOKBACK_DAYS": LOOKBACK_DAYS,
    "MAX_EVENTS": MAX_EVENTS,
    "LATENCIES_S": LATENCIES_S,
    "TRAILING_OFFSETS_BPS": TRAILING_OFFSETS_BPS,
    "ASK_CAP_OFFSETS_BPS": ASK_CAP_OFFSETS_BPS,
    "REQUESTED_NOTIONALS": REQUESTED_NOTIONALS,
    "POLICY_NOTIONAL": POLICY_NOTIONAL,
    "MODEL_NOTIONAL": MODEL_NOTIONAL,
    "PRIMARY_EXIT": PRIMARY_EXIT,
    "PRIMARY_VALUE_COL": PRIMARY_VALUE_COL,
    "SIGNAL_BUCKETS": SIGNAL_BUCKETS,
    "FRONTIER_SAMPLE": FRONTIER_SAMPLE,
    "MIN_ROWS_PER_SIGNAL_BUCKET": MIN_ROWS_PER_SIGNAL_BUCKET,
    "MIN_ROWS_PER_POLICY": MIN_ROWS_PER_POLICY,
    "MODEL_ACTION_COLS": MODEL_ACTION_COLS,
    "GROUP_COL_EVENT": GROUP_COL_EVENT,
    "EVENT_TIMEZONE": EVENT_TIMEZONE,
    "MARKET_DATA_TIMEZONE": MARKET_DATA_TIMEZONE,
    "MARKET_DATA_MODE": MARKET_DATA_MODE,
    "MARKET_DATA_BACKEND": MARKET_DATA_BACKEND,
    "CLICKHOUSE_FLIGHT_URI": CLICKHOUSE_FLIGHT_URI,
    "CLICKHOUSE_FLIGHT_USER": CLICKHOUSE_FLIGHT_USER,
    "POLYGON_TRADES_TABLE": POLYGON_TRADES_TABLE,
    "POLYGON_QUOTES_TABLE": POLYGON_QUOTES_TABLE,
    "REAL_MARKET_MAX_DATE_EXCL": REAL_MARKET_MAX_DATE_EXCL,
})
Code
CACHE_DIR = Path("data/eda_04")
CACHE_DIR.mkdir(parents=True, exist_ok=True)


def cache_path(name: str) -> Path:
    return CACHE_DIR / f"{name}.parquet"


def load_cached_parquet(name: str) -> pl.DataFrame | None:
    path = cache_path(name)
    if path.exists():
        print(f"Loading cache: {path}")
        return pl.read_parquet(path)
    return None


def write_cached_parquet(df: pl.DataFrame, name: str) -> pl.DataFrame:
    path = cache_path(name)
    df.write_parquet(path, compression="zstd")
    print(f"Saved cache: {path}")
    return df

1) Load article events from ClickHouse

This uses the actual article and extracted ticker tables. The article_id matches the pipeline convention exactly: sha256_hex(f"CH|{url}|{publish_ts}|{download_ts}"). PublishTime and DownloadTime are treated as UTC for event-to-market alignment, then converted into ET for session logic.
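
The ID convention and the UTC-to-ET step can be sketched with the standard library. The URL and timestamps below are made up, `to_eastern` is a hypothetical helper, and the sketch assumes the timestamps are naive datetimes whose default string form matches what the pipeline hashes.

```python
import hashlib
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

ET = ZoneInfo("America/New_York")


def sha256_hex(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def make_article_id(url: str, publish_ts: datetime, download_ts: datetime) -> str:
    # Same string template as the notebook's helper.
    return sha256_hex(f"CH|{url}|{publish_ts}|{download_ts}")


def to_eastern(naive_utc: datetime) -> datetime:
    """Interpret a naive timestamp as UTC, then convert it to ET for session logic."""
    return naive_utc.replace(tzinfo=timezone.utc).astimezone(ET)


# Hypothetical event.
url = "https://example.com/press-release"
publish_ts = datetime(2024, 7, 1, 13, 15, 0)
download_ts = datetime(2024, 7, 1, 13, 15, 20)

article_id = make_article_id(url, publish_ts, download_ts)
publish_et = to_eastern(publish_ts)
```

Here 13:15 UTC in July maps to 09:15 ET, so the event falls in the premarket segment. Using `zoneinfo` keeps the conversion correct across daylight-saving changes.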

Code
effective_end_date = END_DATE
if MARKET_DATA_MODE == "real" and END_DATE > REAL_MARKET_MAX_DATE_EXCL:
    effective_end_date = REAL_MARKET_MAX_DATE_EXCL
    emit_warning(
        f"Real mode clamps article events to dates before {REAL_MARKET_MAX_DATE_EXCL} so event timestamps stay inside the available Polygon market-data window."
    )

events = load_cached_parquet("events")
if events is None:
    events = load_article_events(
        start_date=START_DATE,
        end_date=effective_end_date,
        max_events=MAX_EVENTS,
        lookback_days=LOOKBACK_DAYS,
        overlap_window_s=OVERLAP_WINDOW_S,
    )
    write_cached_parquet(events, "events")

print(events.shape)

events.select(
    "publish_ts",
    "ticker",
    "ArticleTitle",
    "NewsSite",
    "segment",
    "daypart",
    "event_type",
    "event_key",
    "is_overlap_event",
)
Loading cache: data/eda_04/events.parquet
(15316, 27)
Code
plot_sample_overview(events)

Sample composition overview

2) Load real kg.article_security_sentiment_v features

This section loads the real feature table, keeps the latest ingested_ts row per article_id + ticker, and reports feature coverage explicitly. Uncovered events are preserved for execution research and given neutral defaults only for the synthetic market simulation path.
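
The "latest row per key, then neutral defaults" logic can be illustrated without the ClickHouse loader. This is a plain-Python sketch under stated assumptions: `latest_per_key` and `attach_features` are hypothetical names, the rows are invented, and only three of the notebook's default values are shown.

```python
from datetime import datetime

# Subset of the neutral defaults applied in this notebook's fill_null step.
NEUTRAL_DEFAULTS = {"sentiment_score": 0.0, "confidence": 0.5, "relevance": 0.5}


def latest_per_key(rows: list[dict]) -> dict[tuple[str, str], dict]:
    """Keep the row with the greatest ingested_ts per (article_id, ticker)."""
    latest: dict[tuple[str, str], dict] = {}
    for row in rows:
        key = (row["article_id"], row["ticker"])
        kept = latest.get(key)
        if kept is None or row["ingested_ts"] > kept["ingested_ts"]:
            latest[key] = row
    return latest


def attach_features(event: dict, latest: dict) -> dict:
    """Left-join features onto an event and flag coverage before filling defaults."""
    features = latest.get((event["article_id"], event["ticker"]), {})
    covered = features.get("sentiment_score") is not None
    filled = {
        name: default if features.get(name) is None else features[name]
        for name, default in NEUTRAL_DEFAULTS.items()
    }
    return {**event, **filled, "has_news_features": covered}


# Invented feature rows: two ingests of the same article/ticker pair.
feature_rows = [
    {"article_id": "a1", "ticker": "XYZ", "ingested_ts": datetime(2024, 6, 1, 12, 0),
     "sentiment_score": 0.2, "confidence": 0.6, "relevance": 0.7},
    {"article_id": "a1", "ticker": "XYZ", "ingested_ts": datetime(2024, 6, 2, 12, 0),
     "sentiment_score": 0.8, "confidence": 0.9, "relevance": 0.7},
]
latest = latest_per_key(feature_rows)
covered_event = attach_features({"article_id": "a1", "ticker": "XYZ"}, latest)
uncovered_event = attach_features({"article_id": "a2", "ticker": "XYZ"}, latest)
```

Computing the coverage flag before the defaults are filled matters: afterwards a defaulted `sentiment_score` of 0.0 is indistinguishable from a genuinely neutral one.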

Code
event_panel = load_cached_parquet("event_panel")
feature_duplicates = load_cached_parquet("feature_duplicates")
if event_panel is None:
    sentiment_load = load_article_security_sentiment(
        events,
        include_duplicate_summary=True,
    )
    news_features = sentiment_load.news_features
    feature_duplicates = sentiment_load.feature_duplicates
    event_panel = (
        events.join(news_features, on=["article_id", "ticker"], how="left")
        .with_columns(
            pl.col("sentiment_score").is_not_null().alias("has_news_features")
        )
        .with_columns(
            pl.col("security_id").fill_null(pl.col("ticker")).alias("security_id"),
            pl.col("company_entity_id").fill_null(pl.concat_str([pl.lit("company::"), pl.col("ticker")])).alias(
                "company_entity_id"),
            pl.col("company_name").fill_null(pl.col("ticker")).alias("company_name"),
            pl.col("sentiment_score").fill_null(0.0),
            pl.col("confidence").fill_null(0.5),
            pl.col("relevance").fill_null(0.5),
            pl.col("direction_strength").fill_null(0.25),
            pl.col("novelty").fill_null(0.5),
            pl.col("event_materiality").fill_null(0.35),
            pl.col("stock_prominence").fill_null(0.5),
            pl.col("mention_count").fill_null(0).cast(pl.Int64),
            pl.col("headline_hit").fill_null(0).cast(pl.Int64),
            pl.col("first_offset").fill_null(0).cast(pl.Int64),
            pl.col("snippet_count").fill_null(0).cast(pl.Int64),
            pl.col("role").fill_null("missing"),
            pl.col("evidence_span").fill_null(""),
            pl.col("extractor_model").fill_null("missing"),
            pl.col("run_id").fill_null("missing"),
        )
    )
    write_cached_parquet(event_panel, "event_panel")
    write_cached_parquet(feature_duplicates, "feature_duplicates")
elif feature_duplicates is None:
    feature_duplicates = pl.DataFrame(
        schema={
            "article_id": pl.Utf8,
            "ticker": pl.Utf8,
            "n_rows": pl.Int64,
            "latest_ingested_ts": pl.Datetime("us"),
        }
    )

signal_base_cols = [
    "sentiment_score",
    "confidence",
    "relevance",
    "direction_strength",
    "novelty",
    "event_materiality",
    "stock_prominence",
]
# Placeholder baseline signal features for interaction analysis.
# Thematic features can plug in later without changing the pipeline contract.

selected_feature_runs = (
    event_panel.filter(pl.col("has_news_features"))
    .group_by(["extractor_model", "run_id"])
    .agg(
        pl.len().alias("n_rows"),
        pl.col("ingested_ts").max().alias("latest_ingested_ts"),
    )
    .sort(["n_rows", "latest_ingested_ts"], descending=[True, True])
)

feature_coverage = pl.concat([
    event_panel.group_by("NewsSite").agg(
        pl.len().alias("n_events"),
        pl.col("has_news_features").mean().alias("coverage_rate"),
    ).with_columns(
        pl.lit("NewsSite").alias("dimension"),
        pl.col("NewsSite").cast(pl.Utf8).alias("bucket"),
    ).select("dimension", "bucket", "n_events", "coverage_rate"),
    event_panel.group_by("event_type").agg(
        pl.len().alias("n_events"),
        pl.col("has_news_features").mean().alias("coverage_rate"),
    ).with_columns(
        pl.lit("event_type").alias("dimension"),
        pl.col("event_type").alias("bucket"),
    ).select("dimension", "bucket", "n_events", "coverage_rate"),
    event_panel.group_by("segment").agg(
        pl.len().alias("n_events"),
        pl.col("has_news_features").mean().alias("coverage_rate"),
    ).with_columns(
        pl.lit("segment").alias("dimension"),
        pl.col("segment").alias("bucket"),
    ).select("dimension", "bucket", "n_events", "coverage_rate"),
    event_panel.group_by("is_overlap_event").agg(
        pl.len().alias("n_events"),
        pl.col("has_news_features").mean().alias("coverage_rate"),
    ).with_columns(
        pl.lit("is_overlap_event").alias("dimension"),
        pl.col("is_overlap_event").cast(pl.Utf8).alias("bucket"),
    ).select("dimension", "bucket", "n_events", "coverage_rate"),
], how="vertical")

event_feature_summary = event_panel.select(
    pl.len().alias("n_events"),
    pl.col("has_news_features").sum().alias("n_with_news_features"),
    pl.col("has_news_features").mean().alias("coverage_rate"),
)
event_feature_counts = event_feature_summary.row(0, named=True)
if event_feature_counts["n_with_news_features"] == 0:
    emit_warning(
        "Event/news feature join produced zero covered events. Downstream policy modeling will have no news-feature rows."
    )
elif (
        event_feature_counts["coverage_rate"] < LOW_FEATURE_COVERAGE_RATE
        or event_feature_counts["n_with_news_features"] < LOW_FEATURE_COVERED_EVENTS
):
    emit_warning(
        "Low upstream news-feature coverage: "
        f"{event_feature_counts['n_with_news_features']} of {event_feature_counts['n_events']} events "
        f"({event_feature_counts['coverage_rate']:.1%}) have joined features. Downstream models may be skipped or unstable."
    )

event_panel_preview = event_panel.select(
    "publish_ts",
    "ticker",
    "event_type",
    "has_news_features",
    "sentiment_score",
    "direction_strength",
    "novelty",
    "event_materiality",
    "is_overlap_event",
)
Loading cache: data/eda_04/event_panel.parquet
Loading cache: data/eda_04/feature_duplicates.parquet
Code
show_df(feature_duplicates, n=12)
Code
show_df(event_feature_summary, n=1)
Code
show_df(selected_feature_runs, n=12)
Code
show_df(feature_coverage, n=24)
Code
show_df(event_panel_preview, n=12)
Code
covered_features = event_panel.filter(pl.col("has_news_features"))
plot_feature_quality_overview(
    feature_coverage,
    covered_features,
    total_rows=event_panel.height,
)

Feature coverage overview (78% of event rows covered)

Covered-row feature distributions

3) Build 1-second market state around the real article events

MARKET_DATA_MODE = "real" reads Polygon quotes and trades from ClickHouse over Arrow Flight. MARKET_DATA_MODE = "synthetic" keeps the existing fallback simulation, which runs on one full 04:00-20:00 ET path per ticker-date so that overlapping events share the same price history.

Code
def _rolling_std(values: np.ndarray, window: int) -> np.ndarray:
    # Trailing population std over up to `window` points (shorter at the start),
    # computed from cumulative sums of x and x^2.
    values = np.asarray(values, dtype=float)
    n_values = len(values)
    if n_values == 0:
        return np.empty(0, dtype=float)

    c1 = np.cumsum(values, dtype=float)
    c2 = np.cumsum(values * values, dtype=float)
    starts = np.maximum(np.arange(n_values) - window + 1, 0)
    prev = starts - 1
    counts = np.arange(n_values) - starts + 1

    s1 = c1.copy()
    s2 = c2.copy()
    valid_prev = prev >= 0
    s1[valid_prev] -= c1[prev[valid_prev]]
    s2[valid_prev] -= c2[prev[valid_prev]]

    mean = s1 / counts
    var = np.maximum(s2 / counts - mean * mean, 0.0)
    return np.sqrt(var)


def _forward_window_sum(values: np.ndarray, window: int) -> np.ndarray:
    # Sum of values[i : i + window] for each i, truncated at the end of the array.
    values = np.asarray(values, dtype=float)
    n_values = len(values)
    if n_values == 0:
        return np.empty(0, dtype=float)

    csum = np.empty(n_values + 1, dtype=float)
    csum[0] = 0.0
    csum[1:] = np.cumsum(values, dtype=float)
    left = np.arange(n_values)
    right = np.minimum(n_values, left + window)
    return csum[right] - csum[left]


def summarize_fill_path(
        live_ask: np.ndarray,
        limit_series: np.ndarray,
        live_ask_size_usd: np.ndarray,
        live_quote_update_flag: np.ndarray,
        live_trade_notional_1s: np.ndarray,
        *,
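        fwd_volume_participation: float,
) -> dict[str, np.ndarray | float | None]:
    # Cumulative fillable notional for a buy limit order.
    # A second is eligible when the ask is at or below the limit. Displayed ask
    # size is counted once per quote episode (at its first eligible second), and
    # every eligible second adds `fwd_volume_participation` of that second's
    # traded notional.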
    eligible_mask = (
        (~np.isnan(live_ask))
        & (~np.isnan(limit_series))
        & (live_ask <= limit_series)
    )
    if not eligible_mask.any():
        empty = np.empty(0, dtype=float)
        return {
            "cumulative_fillable_usd": empty,
            "cumulative_shares": empty,
            "incremental_fillable_usd": empty,
            "fillable_total_usd": 0.0,
        }

    eligible_idx = np.flatnonzero(eligible_mask)
    quote_episode_start = live_quote_update_flag.copy()
    quote_episode_start[0] = True
    episode_id = np.cumsum(quote_episode_start.astype(np.int64)) - 1
    eligible_episode_id = episode_id[eligible_idx]
    _, first_eligible_idx = np.unique(eligible_episode_id, return_index=True)
    displayed_fill_idx = eligible_idx[first_eligible_idx]

    incremental_fillable_usd = np.zeros(len(live_ask), dtype=float)
    incremental_fillable_usd[displayed_fill_idx] += live_ask_size_usd[displayed_fill_idx]
    incremental_fillable_usd += (
        eligible_mask.astype(float)
        * fwd_volume_participation
        * live_trade_notional_1s
    )

    fillable_mask = incremental_fillable_usd > 0.0
    if not fillable_mask.any():
        empty = np.empty(0, dtype=float)
        return {
            "cumulative_fillable_usd": empty,
            "cumulative_shares": empty,
            "incremental_fillable_usd": empty,
            "fillable_total_usd": 0.0,
        }

    cumulative_fillable_usd = np.cumsum(incremental_fillable_usd)
    cumulative_shares = np.cumsum(
        np.divide(
            incremental_fillable_usd,
            live_ask,
            out=np.zeros(len(live_ask), dtype=float),
            where=fillable_mask,
        )
    )
    return {
        "cumulative_fillable_usd": cumulative_fillable_usd,
        "cumulative_shares": cumulative_shares,
        "incremental_fillable_usd": incremental_fillable_usd,
        "fillable_total_usd": float(cumulative_fillable_usd[-1]),
    }


def summarize_inside25_fill_path(
        live_bid: np.ndarray,
        live_ask: np.ndarray,
        limit_series: np.ndarray,
        live_ask_size_usd: np.ndarray,
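        live_quote_update_flag: np.ndarray,
) -> dict[str, np.ndarray | float | None]:
    # Variant that fills displayed size 25% of the spread inside the ask.
    # Only displayed ask size is counted (once per quote episode); there is no
    # volume-participation term, and locked or crossed quotes are ineligible.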
    eligible_mask = (
        (~np.isnan(live_bid))
        & (~np.isnan(live_ask))
        & (~np.isnan(limit_series))
        & (live_bid > 0.0)
        & (live_ask > live_bid)
        & (live_ask <= limit_series)
    )
    if not eligible_mask.any():
        empty = np.empty(0, dtype=float)
        return {
            "cumulative_fillable_usd": empty,
            "cumulative_shares": empty,
            "incremental_fillable_usd": empty,
            "fillable_total_usd": 0.0,
        }

    eligible_idx = np.flatnonzero(eligible_mask)
    quote_episode_start = live_quote_update_flag.copy()
    quote_episode_start[0] = True
    episode_id = np.cumsum(quote_episode_start.astype(np.int64)) - 1
    eligible_episode_id = episode_id[eligible_idx]
    _, first_eligible_idx = np.unique(eligible_episode_id, return_index=True)
    displayed_fill_idx = eligible_idx[first_eligible_idx]

    displayed_fill_px = live_ask - 0.25 * (live_ask - live_bid)
    displayed_fill_shares = np.divide(
        live_ask_size_usd,
        live_ask,
        out=np.zeros(len(live_ask), dtype=float),
        where=(~np.isnan(live_ask)) & (live_ask > 0.0),
    )
    incremental_fillable_usd = np.zeros(len(live_ask), dtype=float)
    incremental_fillable_shares = np.zeros(len(live_ask), dtype=float)
    incremental_fillable_shares[displayed_fill_idx] += displayed_fill_shares[displayed_fill_idx]
    incremental_fillable_usd[displayed_fill_idx] += displayed_fill_shares[displayed_fill_idx] * displayed_fill_px[displayed_fill_idx]

    fillable_mask = incremental_fillable_usd > 0.0
    if not fillable_mask.any():
        empty = np.empty(0, dtype=float)
        return {
            "cumulative_fillable_usd": empty,
            "cumulative_shares": empty,
            "incremental_fillable_usd": empty,
            "fillable_total_usd": 0.0,
        }

    cumulative_fillable_usd = np.cumsum(incremental_fillable_usd)
    cumulative_shares = np.cumsum(incremental_fillable_shares)
    return {
        "cumulative_fillable_usd": cumulative_fillable_usd,
        "cumulative_shares": cumulative_shares,
        "incremental_fillable_usd": incremental_fillable_usd,
        "fillable_total_usd": float(cumulative_fillable_usd[-1]),
    }


def build_next_open_proxy(fill_px: float, eod_px: float, article_id: str) -> float:
    gap_bps = stable_normal(article_id, "next_open_gap", scale=22.0)
    carry_bps = stable_normal(article_id, "next_open_carry", scale=8.0)
    return eod_px * (1.0 + (gap_bps + carry_bps) / 10_000.0)


def policy_family_for_strategy(strategy: str) -> str:
    return label_policy_family(strategy)


def aggressiveness_rank_for_policy(strategy: str, latency_s: int, offset_bps: int) -> float:
    if strategy in {"trailing_mid", "buy_at_ask_cap"}:
        return float(offset_bps * 100 + latency_s)
    return float(latency_s)


def build_mock_event_state_and_outcomes(
        event_df: pl.DataFrame,
        rng_seed: int,
        requested_notionals: list[int],
        latencies_s: list[int],
        trailing_offsets_bps: list[int],
        ask_cap_offsets_bps: list[int],
) -> tuple[pl.DataFrame, pl.DataFrame]:
    event_state_schema = {
        **event_df.schema,
        "mid": pl.Float64,
        "ask": pl.Float64,
        "spread_bps": pl.Float64,
        "quote_age_ms": pl.Int64,
        "ask_size_usd": pl.Float64,
        "rolling_vwap": pl.Float64,
        "rolling_vwap_std": pl.Float64,
        "realized_vol_60s_bps": pl.Float64,
        "future_60s_dollar_volume": pl.Float64,
        "regular_open_mid": pl.Float64,
        "mid_30m_after_open": pl.Float64,
        "regular_close_mid": pl.Float64,
        "eod_mid": pl.Float64,
        "segment_end_ts": pl.Datetime("us"),
        "vwap_dev_bps": pl.Float64,
    }
    outcomes_schema = {
        "article_id": pl.Utf8,
        "event_key": pl.Utf8,
        "publish_ts": pl.Datetime("us"),
        "trade_date": pl.Date,
        "event_date": pl.Date,
        "ticker": pl.Utf8,
        "segment": pl.Utf8,
        "daypart": pl.Utf8,
        "event_type": pl.Utf8,
        "is_overlap_event": pl.Boolean,
        "has_prior_same_day_event": pl.Boolean,
        "prior_event_gap_s": pl.Int64,
        "policy_id": pl.Utf8,
        "policy_family": pl.Utf8,
        "aggression_rank_within_family": pl.Float64,
        "strategy": pl.Utf8,
        "latency_s": pl.Int64,
        "offset_bps": pl.Int64,
        "requested_notional": pl.Int64,
        "fill_ts": pl.Datetime("us"),
        "fill_px": pl.Float64,
        "inside25_fill_ts": pl.Datetime("us"),
        "inside25_fill_px": pl.Float64,
        "anchor_px": pl.Float64,
        "filled_notional": pl.Float64,
        "inside25_filled_notional": pl.Float64,
        "max_fillable_usd": pl.Float64,
        "inside25_max_fillable_usd": pl.Float64,
        "fill_fraction": pl.Float64,
        "inside25_fill_fraction": pl.Float64,
        "filled_flag": pl.Int8,
        "inside25_filled_flag": pl.Int8,
        "markout_1m_px": pl.Float64,
        "inside25_markout_1m_px": pl.Float64,
        "markout_5m_px": pl.Float64,
        "inside25_markout_5m_px": pl.Float64,
        "segment_end_px": pl.Float64,
        "m30_after_open_px": pl.Float64,
        "inside25_m30_after_open_px": pl.Float64,
        "reg_close_px": pl.Float64,
        "inside25_reg_close_px": pl.Float64,
        "eod_px": pl.Float64,
        "next_open_px": pl.Float64,
        "inside25_next_open_px": pl.Float64,
        "spread_bps_at_decision": pl.Float64,
        "quote_age_ms_at_decision": pl.Int64,
        "ask_size_usd_at_decision": pl.Float64,
        "rolling_dollar_volume_60s_at_decision": pl.Float64,
        "vwap_dev_bps_at_decision": pl.Float64,
        "realized_vol_60s_bps_at_decision": pl.Float64,
        "has_news_features": pl.Boolean,
        "sentiment_score": pl.Float64,
        "confidence": pl.Float64,
        "relevance": pl.Float64,
        "direction_strength": pl.Float64,
        "novelty": pl.Float64,
        "event_materiality": pl.Float64,
        "stock_prominence": pl.Float64,
        "mention_count": pl.Int64,
        "headline_hit": pl.Int64,
        "snippet_count": pl.Int64,
    }
    if event_df.height == 0:
        return (
            pl.DataFrame(schema=event_state_schema),
            _finalize_outcomes(
                pl.DataFrame(schema=outcomes_schema),
                primary_exit=PRIMARY_EXIT,
                primary_value_col=PRIMARY_VALUE_COL,
            ),
        )

    rng = np.random.default_rng(rng_seed)
    event_state_frames = []
    outcome_frames = []

    session_offset_seconds = 4 * 3600
    regular_open_idx = int(9.5 * 3600) - session_offset_seconds
    open_plus_30m_idx = regular_open_idx + 30 * 60
    regular_close_idx = int(16 * 3600) - session_offset_seconds
    session_end_idx = int(20 * 3600) - session_offset_seconds
    max_horizon = 60 * 60
    alpha_decay = np.exp(-np.arange(max_horizon) / 600.0)
    volume_decay = np.exp(-np.arange(max_horizon) / 300.0)

    session_start = datetime(2000, 1, 1, 4, 0)
    session_end = datetime(2000, 1, 1, 20, 0)
    n_seconds = int((session_end - session_start).total_seconds()) + 1
    sec_of_day = np.arange(n_seconds)
    seconds_since_midnight = sec_of_day + session_offset_seconds

    regular_mask = (seconds_since_midnight >= 9.5 * 3600) & (seconds_since_midnight < 16 * 3600)
    pre_mask = seconds_since_midnight < 9.5 * 3600
    session_shares_multiplier = np.where(regular_mask, 1.8, np.where(pre_mask, 0.65, 0.85))
    spread_base = np.where(regular_mask, 4.5, np.where(pre_mask, 9.5, 8.0))
    spread_liq = np.where(regular_mask, 10.0, np.where(pre_mask, 20.0, 16.0))
    quote_age_base = np.where(regular_mask, 180.0, np.where(pre_mask, 600.0, 450.0))
    quote_age_liq = np.where(regular_mask, 600.0, np.where(pre_mask, 2200.0, 1600.0))
    ask_size_multiplier = np.where(regular_mask, 1.4, np.where(pre_mask, 0.55, 0.70))
    sigma_base = np.where(regular_mask, 0.00033, np.where(pre_mask, 0.00052, 0.00045))

    for group in event_df.partition_by(["ticker", "trade_date"], as_dict=False):
        group = group.sort("publish_ts")
        ticker = group.item(0, "ticker")
        trade_date = group.item(0, "trade_date")
        start_ts = datetime.combine(trade_date, time(4, 0))

        base_px = 8.0 + 140.0 * stable_u01(ticker, trade_date, "base_px")
        liq_score = 0.2 + 0.8 * stable_u01(ticker, "liq_score")

        sigma = sigma_base * (1.15 - 0.45 * liq_score)
        log_returns = rng.normal(0.0, sigma)

        shares = (40 + 260 * liq_score) * session_shares_multiplier
        shares = shares * (1.0 + rng.lognormal(mean=-0.25, sigma=0.30, size=n_seconds))

        spread_bps = spread_base + spread_liq * (1.0 - liq_score)
        spread_bps = spread_bps * (1.0 + 0.10 * rng.normal(size=n_seconds))
        spread_bps = np.clip(spread_bps, 2.5, 45.0)

        quote_age_ms = quote_age_base + quote_age_liq * (1.0 - liq_score)
        quote_age_ms = np.clip(quote_age_ms * (1.0 + 0.25 * rng.normal(size=n_seconds)), 50, 8_000).astype(int)

        ask_size_usd = np.clip(
            (900 + 6_500 * liq_score) * ask_size_multiplier * (1.0 + 0.35 * rng.normal(size=n_seconds)),
            250,
            35_000,
        )

        volume_boost = np.zeros(n_seconds)
        alpha_drift = np.zeros(n_seconds)
        for publish_ts, sentiment_score, direction_strength, event_materiality in group.select(
                ["publish_ts", "sentiment_score", "direction_strength", "event_materiality"]
        ).iter_rows():
            idx = int((publish_ts - start_ts).total_seconds())
            if idx < 0 or idx >= n_seconds:
                continue

            signed_edge_bps = sentiment_score * (14.0 + 65.0 * direction_strength * event_materiality)
            log_returns[idx] += signed_edge_bps / 10_000.0 * 0.18

            horizon = min(max_horizon, n_seconds - idx - 1)
            if horizon <= 0:
                continue

            alpha_drift[idx + 1: idx + 1 + horizon] += signed_edge_bps / 10_000.0 * 0.0012 * alpha_decay[:horizon]
            # Parenthesized so the baseline boost decays together with the materiality-scaled part.
            volume_boost[idx: idx + horizon] += (1.5 + 5.0 * event_materiality) * volume_decay[:horizon]
            log_returns[idx: idx + horizon] += rng.normal(
                0.0,
                sigma[idx: idx + horizon] * (0.5 + 1.5 * event_materiality),
                size=horizon,
            )

        log_price = np.log(base_px) + np.cumsum(log_returns + alpha_drift)
        mid = np.exp(log_price)
        ask = mid * (1.0 + spread_bps / 20_000.0)
        log_mid = np.log(mid)
        trade_px = mid * np.exp(rng.normal(0.0, spread_bps / 25_000.0, size=n_seconds))

        shares = shares * (1.0 + volume_boost)
        dollar_volume_1s = shares * trade_px
        cum_shares = np.cumsum(shares)
        cum_notional = np.cumsum(dollar_volume_1s)
        rolling_vwap = cum_notional / np.maximum(cum_shares, 1.0)
        rolling_vwap_std = _rolling_std(mid, window=300)
        realized_vol_60s_bps = 10_000 * _rolling_std(np.diff(log_mid, prepend=log_mid[0]), window=60)
        future_60s_dollar_volume = _forward_window_sum(dollar_volume_1s, window=60)

        regular_open_mid = float(mid[regular_open_idx])
        mid_30m_after_open = float(mid[open_plus_30m_idx])
        regular_close_mid = float(mid[regular_close_idx])
        eod_mid = float(mid[-1])

        event_state_rows = []
        outcome_rows = []
        for event in group.iter_rows(named=True):
            idx = int((event["publish_ts"] - start_ts).total_seconds())
            if idx < 0 or idx >= n_seconds:
                continue

            if event["segment"] == "premarket":
                search_end_idx = regular_open_idx
            elif event["segment"] == "regular":
                search_end_idx = regular_close_idx
            else:
                search_end_idx = session_end_idx
            segment_end_ts = start_ts + timedelta(seconds=search_end_idx)

            decision_mid = float(mid[idx])
            decision_ask = float(ask[idx])
            decision_spread_bps = float(spread_bps[idx])
            decision_quote_age_ms = int(quote_age_ms[idx])
            decision_ask_size_usd = float(ask_size_usd[idx])
            decision_vwap = float(rolling_vwap[idx])
            decision_vwap_std = float(rolling_vwap_std[idx])
            decision_realized_vol_60s_bps = float(realized_vol_60s_bps[idx])
            decision_future_60s_dollar_volume = float(future_60s_dollar_volume[idx])
            vwap_dev_bps = ((decision_mid / decision_vwap) - 1.0) * 10_000.0

            event_state_rows.append(
                {
                    **event,
                    "mid": decision_mid,
                    "ask": decision_ask,
                    "spread_bps": decision_spread_bps,
                    "quote_age_ms": decision_quote_age_ms,
                    "ask_size_usd": decision_ask_size_usd,
                    "rolling_vwap": decision_vwap,
                    "rolling_vwap_std": decision_vwap_std,
                    "realized_vol_60s_bps": decision_realized_vol_60s_bps,
                    "future_60s_dollar_volume": decision_future_60s_dollar_volume,
                    "regular_open_mid": regular_open_mid,
                    "mid_30m_after_open": mid_30m_after_open,
                    "regular_close_mid": regular_close_mid,
                    "eod_mid": eod_mid,
                    "segment_end_ts": segment_end_ts,
                    "vwap_dev_bps": vwap_dev_bps,
                }
            )

            for latency_s in latencies_s:
                live_start_idx = idx + latency_s
                if event["segment"] == "premarket" and live_start_idx >= regular_open_idx:
                    continue
                if live_start_idx > search_end_idx or live_start_idx >= n_seconds:
                    continue

                live_mid = mid[live_start_idx: search_end_idx + 1]
                live_ask = ask[live_start_idx: search_end_idx + 1]
                live_bid = 2.0 * live_mid - live_ask
                live_ask_size_usd = ask_size_usd[live_start_idx: search_end_idx + 1]
                live_quote_update_flag = np.ones(len(live_ask), dtype=bool)
                live_trade_notional_1s = dollar_volume_1s[live_start_idx: search_end_idx + 1]
                live_vwap = rolling_vwap[live_start_idx: search_end_idx + 1]
                live_vwap_std = rolling_vwap_std[live_start_idx: search_end_idx + 1]

                trailing_anchor = np.maximum.accumulate(live_mid)
                strategy_specs = []
                for offset_bps in trailing_offsets_bps:
                    strategy_specs.append(
                        (
                            "trailing_mid",
                            offset_bps,
                            decision_mid,
                            trailing_anchor * (1.0 - offset_bps / 10_000.0),
                        )
                    )
                for offset_bps in ask_cap_offsets_bps:
                    strategy_specs.append(
                        (
                            "buy_at_ask_cap",
                            offset_bps,
                            decision_ask,
                            np.full_like(live_ask, decision_ask * (1.0 + offset_bps / 10_000.0)),
                        )
                    )
                strategy_specs.append(("vwap", 0, float(live_vwap[0]), live_vwap))
                strategy_specs.append(
                    ("vwap_minus_1std", 0, float(live_vwap[0] - live_vwap_std[0]), live_vwap - live_vwap_std)
                )

                for strategy, offset_bps, anchor_px, limit_series in strategy_specs:
                    fill_summary = summarize_fill_path(
                        live_ask,
                        limit_series,
                        live_ask_size_usd,
                        live_quote_update_flag,
                        live_trade_notional_1s,
                        fwd_volume_participation=FWD_VOLUME_PARTICIPATION,
                    )
                    inside25_fill_summary = summarize_inside25_fill_path(
                        live_bid,
                        live_ask,
                        limit_series,
                        live_ask_size_usd,
                        live_quote_update_flag,
                    )
                    segment_end_px = float(mid[search_end_idx])
                    policy_family = policy_family_for_strategy(strategy)
                    aggression_rank = aggressiveness_rank_for_policy(strategy, latency_s, offset_bps)

                    for requested_notional in requested_notionals:
                        filled_notional = min(requested_notional, float(fill_summary["fillable_total_usd"]))
                        fill_fraction = filled_notional / requested_notional if requested_notional else 0.0
                        filled_flag = int(fill_fraction > 0.0)
                        if filled_flag:
                            cumulative_fillable_usd = fill_summary["cumulative_fillable_usd"]
                            cumulative_shares = fill_summary["cumulative_shares"]
                            # First second at which cumulative fillable dollars cover the filled notional.
                            fill_idx = int(np.searchsorted(cumulative_fillable_usd, filled_notional, side="left"))
                            fill_abs_idx = live_start_idx + fill_idx
                            fill_ts = start_ts + timedelta(seconds=fill_abs_idx)
                            prior_cum_fillable_usd = float(cumulative_fillable_usd[fill_idx - 1]) if fill_idx > 0 else 0.0
                            prior_cum_shares = float(cumulative_shares[fill_idx - 1]) if fill_idx > 0 else 0.0
                            # Earlier seconds fill in full; the final second contributes only the
                            # remaining dollars, priced at that second's ask.
                            final_slice_notional = filled_notional - prior_cum_fillable_usd
                            fill_px_at_idx = float(live_ask[fill_idx])
                            filled_shares = prior_cum_shares + final_slice_notional / fill_px_at_idx
                            # Average fill price: total dollars over total shares.
                            fill_px = filled_notional / filled_shares if filled_shares > 0.0 else None
                        else:
                            fill_abs_idx = None
                            fill_ts = None
                            fill_px = None

                        inside25_filled_notional = min(requested_notional, float(inside25_fill_summary["fillable_total_usd"]))
                        inside25_fill_fraction = inside25_filled_notional / requested_notional if requested_notional else 0.0
                        inside25_filled_flag = int(inside25_fill_fraction > 0.0)
                        if inside25_filled_flag:
                            inside25_cumulative_fillable_usd = inside25_fill_summary["cumulative_fillable_usd"]
                            inside25_cumulative_shares = inside25_fill_summary["cumulative_shares"]
                            inside25_fill_idx = int(np.searchsorted(inside25_cumulative_fillable_usd, inside25_filled_notional, side="left"))
                            inside25_fill_abs_idx = live_start_idx + inside25_fill_idx
                            inside25_fill_ts = start_ts + timedelta(seconds=inside25_fill_abs_idx)
                            inside25_prior_cum_fillable_usd = float(inside25_cumulative_fillable_usd[inside25_fill_idx - 1]) if inside25_fill_idx > 0 else 0.0
                            inside25_prior_cum_shares = float(inside25_cumulative_shares[inside25_fill_idx - 1]) if inside25_fill_idx > 0 else 0.0
                            inside25_final_slice_notional = inside25_filled_notional - inside25_prior_cum_fillable_usd
                            inside25_fill_px_at_idx = float(live_ask[inside25_fill_idx] - 0.25 * (live_ask[inside25_fill_idx] - live_bid[inside25_fill_idx]))
                            inside25_filled_shares = inside25_prior_cum_shares + inside25_final_slice_notional / inside25_fill_px_at_idx
                            inside25_fill_px = inside25_filled_notional / inside25_filled_shares if inside25_filled_shares > 0.0 else None
                        else:
                            inside25_fill_abs_idx = None
                            inside25_fill_ts = None
                            inside25_fill_px = None

                        max_fillable_usd = float(fill_summary["fillable_total_usd"])
                        inside25_max_fillable_usd = float(inside25_fill_summary["fillable_total_usd"])
                        minute_1_idx = min(fill_abs_idx + 60, n_seconds - 1) if fill_abs_idx is not None else None
                        minute_5_idx = min(fill_abs_idx + 300, n_seconds - 1) if fill_abs_idx is not None else None
                        markout_1m_px = float(mid[minute_1_idx]) if minute_1_idx is not None else None
                        markout_5m_px = float(mid[minute_5_idx]) if minute_5_idx is not None else None
                        inside25_minute_1_idx = min(inside25_fill_abs_idx + 60, n_seconds - 1) if inside25_fill_abs_idx is not None else None
                        inside25_minute_5_idx = min(inside25_fill_abs_idx + 300, n_seconds - 1) if inside25_fill_abs_idx is not None else None
                        inside25_markout_1m_px = float(mid[inside25_minute_1_idx]) if inside25_minute_1_idx is not None else None
                        inside25_markout_5m_px = float(mid[inside25_minute_5_idx]) if inside25_minute_5_idx is not None else None
                        m30_after_open_px = float(mid[open_plus_30m_idx]) if fill_abs_idx is not None and fill_abs_idx <= open_plus_30m_idx else None
                        inside25_m30_after_open_px = float(mid[open_plus_30m_idx]) if inside25_fill_abs_idx is not None and inside25_fill_abs_idx <= open_plus_30m_idx else None
                        reg_close_px = float(mid[regular_close_idx]) if fill_abs_idx is not None and fill_abs_idx <= regular_close_idx else None
                        inside25_reg_close_px = float(mid[regular_close_idx]) if inside25_fill_abs_idx is not None and inside25_fill_abs_idx <= regular_close_idx else None
                        next_open_px = (
                            regular_open_mid
                            if event["segment"] == "premarket"
                            else build_next_open_proxy(fill_px or decision_mid, eod_mid, event["article_id"])
                        )
                        inside25_next_open_px = (
                            regular_open_mid
                            if event["segment"] == "premarket"
                            else build_next_open_proxy(inside25_fill_px or decision_mid, eod_mid, event["article_id"])
                        )
                        policy_id = label_policy_id(strategy, latency_s, offset_bps, requested_notional)
                        outcome_rows.append(
                            {
                                "article_id": event["article_id"],
                                "event_key": event["event_key"],
                                "publish_ts": event["publish_ts"],
                                "trade_date": event["trade_date"],
                                "event_date": event["event_date"],
                                "ticker": event["ticker"],
                                "segment": event["segment"],
                                "daypart": event["daypart"],
                                "event_type": event["event_type"],
                                "is_overlap_event": event["is_overlap_event"],
                                "has_prior_same_day_event": event["has_prior_same_day_event"],
                                "prior_event_gap_s": event["prior_event_gap_s"],
                                "policy_id": policy_id,
                                "policy_family": policy_family,
                                "aggression_rank_within_family": aggression_rank,
                                "strategy": strategy,
                                "latency_s": latency_s,
                                "offset_bps": offset_bps,
                                "requested_notional": requested_notional,
                                "fill_ts": fill_ts,
                                "fill_px": fill_px,
                                "inside25_fill_ts": inside25_fill_ts,
                                "inside25_fill_px": inside25_fill_px,
                                "anchor_px": float(anchor_px),
                                "filled_notional": filled_notional,
                                "inside25_filled_notional": inside25_filled_notional,
                                "max_fillable_usd": max_fillable_usd,
                                "inside25_max_fillable_usd": inside25_max_fillable_usd,
                                "fill_fraction": fill_fraction,
                                "inside25_fill_fraction": inside25_fill_fraction,
                                "filled_flag": filled_flag,
                                "inside25_filled_flag": inside25_filled_flag,
                                "markout_1m_px": markout_1m_px,
                                "inside25_markout_1m_px": inside25_markout_1m_px,
                                "markout_5m_px": markout_5m_px,
                                "inside25_markout_5m_px": inside25_markout_5m_px,
                                "segment_end_px": segment_end_px,
                                "m30_after_open_px": m30_after_open_px,
                                "inside25_m30_after_open_px": inside25_m30_after_open_px,
                                "reg_close_px": reg_close_px,
                                "inside25_reg_close_px": inside25_reg_close_px,
                                "eod_px": eod_mid,
                                "next_open_px": next_open_px,
                                "inside25_next_open_px": inside25_next_open_px,
                                "spread_bps_at_decision": decision_spread_bps,
                                "quote_age_ms_at_decision": decision_quote_age_ms,
                                "ask_size_usd_at_decision": decision_ask_size_usd,
                                "rolling_dollar_volume_60s_at_decision": decision_future_60s_dollar_volume,
                                "vwap_dev_bps_at_decision": vwap_dev_bps,
                                "realized_vol_60s_bps_at_decision": decision_realized_vol_60s_bps,
                                "has_news_features": event["has_news_features"],
                                "sentiment_score": event["sentiment_score"],
                                "confidence": event["confidence"],
                                "relevance": event["relevance"],
                                "direction_strength": event["direction_strength"],
                                "novelty": event["novelty"],
                                "event_materiality": event["event_materiality"],
                                "stock_prominence": event["stock_prominence"],
                                "mention_count": event["mention_count"],
                                "headline_hit": event["headline_hit"],
                                "snippet_count": event["snippet_count"],
                            }
                        )

        if event_state_rows:
            event_state_frames.append(pl.DataFrame(event_state_rows, schema=event_state_schema))
        if outcome_rows:
            outcome_frames.append(pl.DataFrame(outcome_rows, schema=outcomes_schema))

    event_state = (
        pl.concat(event_state_frames, how="vertical_relaxed")
        if event_state_frames
        else pl.DataFrame(schema=event_state_schema)
    )
    outcomes = (
        pl.concat(outcome_frames, how="vertical_relaxed")
        if outcome_frames
        else pl.DataFrame(schema=outcomes_schema)
    )

    return event_state, _finalize_outcomes(
        outcomes,
        primary_exit=PRIMARY_EXIT,
        primary_value_col=PRIMARY_VALUE_COL,
    )

4) Build policy-conditioned event outcomes

Execution is evaluated separately for:

  • trailing buy limits at x bps below a trailing mid anchor
  • buy at the decision-time ask plus a capped offset of x bps
  • buy at rolling intraday VWAP
  • buy at rolling intraday VWAP minus 1 std
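The limit-price paths behind these entry strategies can be illustrated on a toy series. The sketch below mirrors the construction used in the mock builder above (running-maximum mid anchor, flat ask cap, VWAP and VWAP minus one standard deviation). The helper name `build_limit_series` and the toy inputs are assumptions made for illustration, not part of the notebook.

```python
from itertools import accumulate


def build_limit_series(mid, vwap, vwap_std, decision_ask, offset_bps):
    """Return per-second buy-limit paths per strategy (hypothetical helper)."""
    # Trailing anchor: running maximum of the mid since the order went live.
    trailing_anchor = list(accumulate(mid, max))
    below = 1.0 - offset_bps / 10_000.0
    above = 1.0 + offset_bps / 10_000.0
    return {
        # Limit sits offset_bps below the trailing anchor.
        "trailing_mid": [a * below for a in trailing_anchor],
        # Flat cap at the decision-time ask plus offset_bps.
        "buy_at_ask_cap": [decision_ask * above] * len(mid),
        "vwap": list(vwap),
        "vwap_minus_1std": [v - s for v, s in zip(vwap, vwap_std)],
    }


# Toy inputs (made up for illustration).
limits = build_limit_series(
    mid=[100.0, 101.0, 100.5],
    vwap=[100.0, 100.4, 100.5],
    vwap_std=[0.2, 0.2, 0.3],
    decision_ask=100.05,
    offset_bps=10,
)
for name, series in limits.items():
    print(name, [round(px, 4) for px in series])
```

Note that the trailing limit never moves down: once the mid has printed 101.0, the limit stays anchored there even when the mid falls back to 100.5.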

The latency ladder assumes 30s to 90s total delay from source to machine to execution. Premarket event-latency combinations are dropped once that delayed decision time reaches 09:30 ET.
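A minimal sketch of the premarket cutoff, assuming the same 04:00 ET index origin as the 1-second grid. The helper `live_latencies` and the `(30, 60, 90)` grid are illustrative assumptions; the notebook only states that the ladder spans 30s to 90s.

```python
SESSION_START_S = 4 * 3600  # 04:00 ET is index 0 of the 1-second grid
REGULAR_OPEN_IDX = int(9.5 * 3600) - SESSION_START_S  # 09:30 ET


def live_latencies(publish_idx, segment, latencies_s=(30, 60, 90)):
    """Return the latencies that survive the premarket cutoff (hypothetical helper)."""
    kept = []
    for latency_s in latencies_s:
        live_start_idx = publish_idx + latency_s
        # A premarket event whose delayed decision lands at or after 09:30 is dropped.
        if segment == "premarket" and live_start_idx >= REGULAR_OPEN_IDX:
            continue
        kept.append(latency_s)
    return kept


# An article published at 09:29:15 ET only keeps the 30s rung.
publish_idx = (9 * 3600 + 29 * 60 + 15) - SESSION_START_S
print(live_latencies(publish_idx, "premarket"))  # -> [30]
```

The comparison is inclusive, so a delayed decision time of exactly 09:30:00 is also excluded.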

The key notebook output here is the action-conditioned value table, including requested-dollar normalized targets that keep no-fill rows in the learning problem.
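To illustrate why requested-dollar normalization keeps no-fill rows in play, here is a minimal sketch. It assumes the requested-dollar return equals the fill fraction times the return on filled dollars, with unfilled dollars earning zero. The actual definition lives in `_finalize_outcomes`, which is not shown here, so both function names and the formula are assumptions.

```python
def fill_to_exit_bps(fill_px, exit_px):
    """Return on filled dollars in bps; None when nothing filled."""
    if fill_px is None or exit_px is None:
        return None
    return (exit_px / fill_px - 1.0) * 10_000.0


def req_fill_to_exit_bps(fill_px, exit_px, fill_fraction):
    """Return on requested dollars in bps; unfilled dollars earn zero (assumed)."""
    filled_bps = fill_to_exit_bps(fill_px, exit_px)
    if filled_bps is None:
        return 0.0
    return fill_fraction * filled_bps


# Two toy orders: one half-filled at 100 and marked at 101, one never filled.
rows = [
    req_fill_to_exit_bps(100.0, 101.0, 0.5),
    req_fill_to_exit_bps(None, 101.0, 0.0),
]
print(sum(rows) / len(rows))
```

Under this assumption the fill-conditional average of the toy orders is about 100 bps, while the requested-dollar average is about 25 bps, because the unfilled order still counts as a row with zero return.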

Code
event_state = load_cached_parquet("event_state")
outcomes = load_cached_parquet("outcomes")
if event_state is None or outcomes is None:
    if MARKET_DATA_MODE == "real":
        try:
            event_state, outcomes, real_market_warnings = build_real_event_state_and_outcomes(
                event_panel,
                clickhouse_flight_uri=CLICKHOUSE_FLIGHT_URI,
                clickhouse_flight_user=CLICKHOUSE_FLIGHT_USER,
                clickhouse_flight_password=CLICKHOUSE_FLIGHT_PASSWORD,
                quotes_table=POLYGON_QUOTES_TABLE,
                trades_table=POLYGON_TRADES_TABLE,
                requested_notionals=REQUESTED_NOTIONALS,
                latencies_s=LATENCIES_S,
                trailing_offsets_bps=TRAILING_OFFSETS_BPS,
                ask_cap_offsets_bps=ASK_CAP_OFFSETS_BPS,
                fwd_volume_participation=FWD_VOLUME_PARTICIPATION,
                real_market_max_date_excl=REAL_MARKET_MAX_DATE_EXCL,
                primary_exit=PRIMARY_EXIT,
                primary_value_col=PRIMARY_VALUE_COL,
                label_policy_id=label_policy_id,
                policy_family_for_strategy=policy_family_for_strategy,
                aggressiveness_rank_for_policy=aggressiveness_rank_for_policy,
                build_next_open_proxy=build_next_open_proxy,
            )
        except Exception as exc:
            raise RuntimeError(
                f"Failed to load real Polygon market data from ClickHouse Flight at {CLICKHOUSE_FLIGHT_URI}. "
                "Switch `MARKET_DATA_MODE` to `synthetic` or verify the Arrow Flight endpoint."
            ) from exc
        for warning_message in real_market_warnings:
            emit_warning(warning_message)
    else:
        real_market_warnings = []
        event_state, outcomes = build_mock_event_state_and_outcomes(
            event_panel,
            rng_seed=RNG_SEED,
            requested_notionals=REQUESTED_NOTIONALS,
            latencies_s=LATENCIES_S,
            trailing_offsets_bps=TRAILING_OFFSETS_BPS,
            ask_cap_offsets_bps=ASK_CAP_OFFSETS_BPS,
        )
    write_cached_parquet(event_state, "event_state")
    write_cached_parquet(outcomes, "outcomes")
else:
    real_market_warnings = []

event_state_preview = event_state.select(
    "publish_ts",
    "ticker",
    "segment",
    "daypart",
    "event_type",
    "spread_bps",
    "quote_age_ms",
    "vwap_dev_bps",
    "is_overlap_event",
)
outcomes_preview = outcomes.select(
    "publish_ts",
    "ticker",
    "policy_id",
    "policy_family",
    "requested_notional",
    "fill_fraction",
    "inside25_fill_fraction",
    "req_fill_to_open_bps",
    "req_fill_to_30m_after_open_bps",
    "inside25_req_fill_to_30m_after_open_bps",
    "req_fill_to_reg_close_bps",
    "req_fill_to_eod_bps",
    "primary_exit_bps",
    "primary_exit_pnl",
)
policy_row_counts = outcomes.group_by("policy_id").agg(pl.len().alias("n_rows")).sort("n_rows", descending=True)
requested_notional_counts = (
    outcomes.group_by("requested_notional")
    .agg(pl.len().alias("n_rows"))
    .sort("requested_notional")
)

print("market data mode:", MARKET_DATA_MODE)
print("event rows:", f"{event_state.height:,}")
print("outcome rows:", f"{outcomes.height:,}")
print("unique policies:", outcomes.get_column("policy_id").n_unique() if outcomes.height else 0)
Code
show_df(event_state_preview, n=12)
Code
show_df(outcomes_preview, n=12)
Code
show_df(policy_row_counts, n=12)
Code
show_df(requested_notional_counts, n=12)
Code
plot_market_state_overview(event_state)

Decision-time market state overview

Code
show_df(
    outcomes.select(
        "policy_id",
        "strategy",
        "latency_s",
        "offset_bps",
        "requested_notional",
        "filled_flag",
        "fill_fraction",
        "adverse_fill_bps",
        "req_fill_to_30m_after_open_bps",
        "req_fill_to_reg_close_bps",
        "fill_to_30m_after_open_bps",
        "fill_to_reg_close_bps",
    ),
    n=12,
)

5) Policy summary tables and action-space definition

POLICY_NOTIONAL remains a descriptive diagnostics slice. MODEL_NOTIONAL freezes the action-space size for frontier analysis and the first joint-model dataset.

Code
policy = outcomes.filter(pl.col("requested_notional") == POLICY_NOTIONAL)
policy_clean = policy.filter(~pl.col("is_overlap_event"))

model_policy = outcomes.filter(pl.col("requested_notional") == MODEL_NOTIONAL)
model_policy_feature = model_policy.filter(pl.col("has_news_features"))
model_policy_clean = model_policy_feature.filter(~pl.col("is_overlap_event"))
selected_policy_sample = "clean" if FRONTIER_SAMPLE == "clean" else "full"
model_policy_selected = model_policy_clean if selected_policy_sample == "clean" else model_policy_feature


def summarize_policy_metrics(df: pl.DataFrame, label: str) -> pl.DataFrame:
    if df.height == 0:
        return pl.DataFrame(
            schema={
                "policy_id": pl.Utf8,
                "policy_family": pl.Utf8,
                "strategy": pl.Utf8,
                "latency_s": pl.Int64,
                "offset_bps": pl.Int64,
                "n_orders": pl.UInt32,
                "fill_rate": pl.Float64,
                "inside25_fill_rate": pl.Float64,
                "avg_fill_fraction": pl.Float64,
                "inside25_avg_fill_fraction": pl.Float64,
                "avg_adverse_fill_bps": pl.Float64,
                "inside25_avg_adverse_fill_bps": pl.Float64,
                "avg_req_fill_to_open_bps": pl.Float64,
                "avg_req_fill_to_30m_after_open_bps": pl.Float64,
                "inside25_avg_req_fill_to_30m_after_open_bps": pl.Float64,
                "avg_req_fill_to_reg_close_bps": pl.Float64,
                "avg_req_markout_1m_bps": pl.Float64,
                "avg_req_markout_5m_bps": pl.Float64,
                "avg_req_fill_to_eod_bps": pl.Float64,
                "delta_inside25_fill_rate": pl.Float64,
                "delta_inside25_avg_adverse_fill_bps": pl.Float64,
                "delta_inside25_avg_req_fill_to_30m_after_open_bps": pl.Float64,
                "p10_req_fill_to_30m_after_open_bps": pl.Float64,
                "avg_fill_to_30m_after_open_bps": pl.Float64,
                "avg_fill_to_reg_close_bps": pl.Float64,
                "sample": pl.Utf8,
            }
        )
    return (
        df.group_by(["policy_id", "policy_family", "strategy", "latency_s", "offset_bps"])
        .agg(
            pl.len().alias("n_orders"),
            pl.col("filled_flag").mean().alias("fill_rate"),
            pl.col("inside25_filled_flag").mean().alias("inside25_fill_rate"),
            pl.col("fill_fraction").mean().alias("avg_fill_fraction"),
            pl.col("inside25_fill_fraction").mean().alias("inside25_avg_fill_fraction"),
            pl.col("adverse_fill_bps").mean().alias("avg_adverse_fill_bps"),
            pl.col("inside25_adverse_fill_bps").mean().alias("inside25_avg_adverse_fill_bps"),
            pl.col("req_fill_to_open_bps").mean().alias("avg_req_fill_to_open_bps"),
            pl.col("req_fill_to_30m_after_open_bps").mean().alias("avg_req_fill_to_30m_after_open_bps"),
            pl.col("inside25_req_fill_to_30m_after_open_bps").mean().alias("inside25_avg_req_fill_to_30m_after_open_bps"),
            pl.col("req_fill_to_reg_close_bps").mean().alias("avg_req_fill_to_reg_close_bps"),
            pl.col("req_markout_1m_bps").mean().alias("avg_req_markout_1m_bps"),
            pl.col("req_markout_5m_bps").mean().alias("avg_req_markout_5m_bps"),
            pl.col("req_fill_to_eod_bps").mean().alias("avg_req_fill_to_eod_bps"),
            pl.col("req_fill_to_30m_after_open_bps").quantile(0.10).alias("p10_req_fill_to_30m_after_open_bps"),
            pl.col("fill_to_30m_after_open_bps").mean().alias("avg_fill_to_30m_after_open_bps"),
            pl.col("fill_to_reg_close_bps").mean().alias("avg_fill_to_reg_close_bps"),
        )
        .with_columns(
            (pl.col("inside25_fill_rate") - pl.col("fill_rate")).alias("delta_inside25_fill_rate"),
            (pl.col("inside25_avg_adverse_fill_bps") - pl.col("avg_adverse_fill_bps")).alias("delta_inside25_avg_adverse_fill_bps"),
            (pl.col("inside25_avg_req_fill_to_30m_after_open_bps") - pl.col("avg_req_fill_to_30m_after_open_bps")).alias("delta_inside25_avg_req_fill_to_30m_after_open_bps"),
        )
        .sort(["strategy", "latency_s", "offset_bps"])
        .with_columns(pl.lit(label).alias("sample"))
    )


policy_universe_summary = pl.DataFrame(
    {
        "sample": ["policy_full", "policy_clean", "model_feature", f"model_{selected_policy_sample}"],
        "requested_notional": [POLICY_NOTIONAL, POLICY_NOTIONAL, MODEL_NOTIONAL, MODEL_NOTIONAL],
        "n_rows": [policy.height, policy_clean.height, model_policy_feature.height, model_policy_selected.height],
        "n_events": [
            policy.get_column("event_key").n_unique() if policy.height else 0,
            policy_clean.get_column("event_key").n_unique() if policy_clean.height else 0,
            model_policy_feature.get_column("event_key").n_unique() if model_policy_feature.height else 0,
            model_policy_selected.get_column("event_key").n_unique() if model_policy_selected.height else 0,
        ],
        "n_policies": [
            policy.get_column("policy_id").n_unique() if policy.height else 0,
            policy_clean.get_column("policy_id").n_unique() if policy_clean.height else 0,
            model_policy_feature.get_column("policy_id").n_unique() if model_policy_feature.height else 0,
            model_policy_selected.get_column("policy_id").n_unique() if model_policy_selected.height else 0,
        ],
    }
)

if model_policy_feature.height == 0:
    emit_warning(
        "The fixed-notional modeling slice has zero rows with joined news features. Frontier and joint-model sections will be empty."
    )
elif model_policy_selected.height == 0:
    emit_warning(
        f"The selected modeling sample '{selected_policy_sample}' is empty after overlap filtering."
    )

policy_summary = pl.concat(
    [
        summarize_policy_metrics(policy, "policy_full"),
        summarize_policy_metrics(policy_clean, "policy_clean"),
        summarize_policy_metrics(model_policy_feature, "model_feature"),
        summarize_policy_metrics(model_policy_selected, f"model_{selected_policy_sample}"),
    ],
    how="vertical_relaxed",
)
model_policy_summary = policy_summary.filter(pl.col("sample") == f"model_{selected_policy_sample}")
Code
show_df(policy_universe_summary, n=10)
Code
show_df(model_policy_summary, n=24)
Code
policy_leaderboard = (
    model_policy_summary
    .filter(pl.col("n_orders") >= MIN_ROWS_PER_POLICY)
    .sort(["avg_req_fill_to_30m_after_open_bps", "fill_rate"], descending=[True, True])
)
show_df(
    policy_leaderboard.select(
        "policy_id",
        "strategy",
        "latency_s",
        "offset_bps",
        "n_orders",
        "fill_rate",
        "inside25_fill_rate",
        "avg_fill_fraction",
        "inside25_avg_fill_fraction",
        "avg_adverse_fill_bps",
        "inside25_avg_adverse_fill_bps",
        "avg_req_fill_to_open_bps",
        "avg_req_fill_to_30m_after_open_bps",
        "inside25_avg_req_fill_to_30m_after_open_bps",
        "delta_inside25_fill_rate",
        "delta_inside25_avg_adverse_fill_bps",
        "delta_inside25_avg_req_fill_to_30m_after_open_bps",
        "avg_req_fill_to_reg_close_bps",
        "avg_req_markout_1m_bps",
        "avg_req_markout_5m_bps",
        "avg_req_fill_to_eod_bps",
    ),
    n=24,
)

6) Diagnostic plots, Pareto frontier, and signal-policy interaction

The diagnostic section keeps the tradeability and fill-quality checks, but shifts the main policy readout toward requested-dollar EV, frontier structure, and signal-conditioned aggressiveness.

Code
plot_strategy_metric_grid(
    policy,
    strategies=ENTRY_STRATEGIES,
    metrics=[
        {
            "col": "filled_flag",
            "title": "Fill rate",
            "cmap": "YlGnBu",
            "fmt": ".0%",
            "percent": True,
        },
        {
            "col": "req_fill_to_30m_after_open_bps",
            "title": "Requested EV to 30m after open",
            "cmap": "RdYlGn",
            "center": 0.0,
            "fmt": ".2f",
        },
        {
            "col": "adverse_fill_bps",
            "title": "Adverse fill",
            "cmap": "RdYlGn_r",
            "fmt": ".2f",
        },
    ],
    title=f"Policy surface overview at ${POLICY_NOTIONAL:,}: fillability, value, and execution cost",
)

Policy surface overview at $5,000: fillability, value, and execution cost

Detailed context splits are kept for appendix diagnostics so the main flow stays focused on the policy surface and frontier.

Code
spread_split = summarize_split(policy, ["strategy", "spread_bucket"])
quote_age_split = summarize_split(policy, ["strategy", "quote_age_bucket"])
dollar_volume_split = summarize_split(policy, ["strategy", "dollar_volume_bucket"])
session_split = summarize_split(policy, ["daypart", "strategy"])
event_type_split = summarize_split(policy, ["event_type", "strategy"])
overlap_split = summarize_split(policy, ["strategy", "is_overlap_event"])
Code
filled_policy = policy.filter(pl.col("filled_flag") == 1)

filled_horizon_summary = (
    filled_policy.group_by(["policy_id", "policy_family", "strategy", "daypart"])
    .agg(
        pl.len().alias("n_fills"),
        pl.col("fill_to_open_bps").mean().alias("avg_fill_to_open_bps"),
        pl.col("markout_1m_bps").mean().alias("avg_markout_1m_bps"),
        pl.col("markout_5m_bps").mean().alias("avg_markout_5m_bps"),
        pl.col("fill_to_30m_after_open_bps").mean().alias("avg_fill_to_30m_after_open_bps"),
        pl.col("fill_to_reg_close_bps").mean().alias("avg_fill_to_reg_close_bps"),
        pl.col("fill_to_eod_bps").mean().alias("avg_fill_to_eod_bps"),
        pl.col("req_fill_to_open_bps").mean().alias("avg_req_fill_to_open_bps"),
        pl.col("req_markout_1m_bps").mean().alias("avg_req_markout_1m_bps"),
        pl.col("req_markout_5m_bps").mean().alias("avg_req_markout_5m_bps"),
        pl.col("req_fill_to_30m_after_open_bps").mean().alias("avg_req_fill_to_30m_after_open_bps"),
        pl.col("req_fill_to_reg_close_bps").mean().alias("avg_req_fill_to_reg_close_bps"),
        pl.col("req_fill_to_eod_bps").mean().alias("avg_req_fill_to_eod_bps"),
    )
    .sort(["daypart", "strategy", "policy_id"])
)

transition_summary = (
    filled_policy.group_by(["policy_id", "policy_family", "strategy", "daypart"])
    .agg(
        pl.len().alias("n_fills"),
        ((pl.col("fill_to_open_bps") > 0) & (pl.col("fill_to_reg_close_bps") < 0)).mean().alias(
            "share_pos_open_neg_close"),
        ((pl.col("markout_5m_bps") > 0) & (pl.col("fill_to_30m_after_open_bps") < 0)).mean().alias(
            "share_pos_5m_neg_30m"),
        ((pl.col("markout_5m_bps") < 0) & (pl.col("fill_to_30m_after_open_bps") > 0)).mean().alias(
            "share_neg_5m_pos_30m"),
        ((pl.col("fill_to_open_bps") < 0) & (pl.col("fill_to_reg_close_bps") > 0)).mean().alias(
            "share_neg_open_pos_close"),
    )
    .sort(["daypart", "strategy", "policy_id"])
)

return_share_summary = (
    filled_policy.group_by(["policy_id", "policy_family", "strategy", "daypart"])
    .agg(
        pl.len().alias("n_fills"),
        pl.col("fill_to_reg_close_pnl").sum().alias("total_fill_to_reg_close_pnl"),
        pl.col("fill_to_open_pnl").sum().alias("pre_open_pnl"),
        (pl.col("fill_to_30m_after_open_pnl") - pl.col("fill_to_open_pnl")).sum().alias("open_to_30m_pnl"),
        (pl.col("fill_to_reg_close_pnl") - pl.col("fill_to_30m_after_open_pnl")).sum().alias("m30_to_close_pnl"),
    )
    .with_columns(
        pl.when(pl.col("total_fill_to_reg_close_pnl").abs() > 1e-9)
        .then(pl.col("pre_open_pnl") / pl.col("total_fill_to_reg_close_pnl"))
        .otherwise(None)
        .alias("pre_open_share"),
        pl.when(pl.col("total_fill_to_reg_close_pnl").abs() > 1e-9)
        .then(pl.col("open_to_30m_pnl") / pl.col("total_fill_to_reg_close_pnl"))
        .otherwise(None)
        .alias("open_to_30m_share"),
        pl.when(pl.col("total_fill_to_reg_close_pnl").abs() > 1e-9)
        .then(pl.col("m30_to_close_pnl") / pl.col("total_fill_to_reg_close_pnl"))
        .otherwise(None)
        .alias("m30_to_close_share"),
    )
    .sort(["daypart", "strategy", "policy_id"])
)

policy_daypart_diagnostics = (
    filled_horizon_summary
    .join(
        transition_summary.select(
            "policy_id",
            "daypart",
            "share_pos_open_neg_close",
            "share_pos_5m_neg_30m",
            "share_neg_5m_pos_30m",
            "share_neg_open_pos_close",
        ),
        on=["policy_id", "daypart"],
        how="left",
    )
    .join(
        return_share_summary.select(
            "policy_id",
            "daypart",
            "pre_open_share",
            "open_to_30m_share",
            "m30_to_close_share",
        ),
        on=["policy_id", "daypart"],
        how="left",
    )
)

show_df(policy_daypart_diagnostics, n=40)
Code
show_df(
    transition_summary.select(
        "policy_id",
        "daypart",
        "n_fills",
        "share_pos_open_neg_close",
        "share_pos_5m_neg_30m",
        "share_neg_5m_pos_30m",
        "share_neg_open_pos_close",
    ),
    n=30,
)
show_df(
    return_share_summary.select(
        "policy_id",
        "daypart",
        "n_fills",
        "total_fill_to_reg_close_pnl",
        "pre_open_share",
        "open_to_30m_share",
        "m30_to_close_share",
    ),
    n=30,
)

plot_target_horizon_profile(filled_policy)

family_return_share = (
    filled_policy.group_by(["policy_family", "daypart"])
    .agg(
        pl.col("fill_to_reg_close_pnl").sum().alias("total_fill_to_reg_close_pnl"),
        pl.col("fill_to_open_pnl").sum().alias("pre_open_pnl"),
        (pl.col("fill_to_30m_after_open_pnl") - pl.col("fill_to_open_pnl")).sum().alias("open_to_30m_pnl"),
        (pl.col("fill_to_reg_close_pnl") - pl.col("fill_to_30m_after_open_pnl")).sum().alias("m30_to_close_pnl"),
    )
    .with_columns(
        pl.when(pl.col("total_fill_to_reg_close_pnl").abs() > 1e-9)
        .then(pl.col("pre_open_pnl") / pl.col("total_fill_to_reg_close_pnl"))
        .otherwise(None)
        .alias("pre_open_share"),
        pl.when(pl.col("total_fill_to_reg_close_pnl").abs() > 1e-9)
        .then(pl.col("open_to_30m_pnl") / pl.col("total_fill_to_reg_close_pnl"))
        .otherwise(None)
        .alias("open_to_30m_share"),
        pl.when(pl.col("total_fill_to_reg_close_pnl").abs() > 1e-9)
        .then(pl.col("m30_to_close_pnl") / pl.col("total_fill_to_reg_close_pnl"))
        .otherwise(None)
        .alias("m30_to_close_share"),
    )
    .with_columns(pl.col("policy_family").map_elements(strategy_display_name, return_dtype=pl.Utf8))
    .sort(["daypart", "policy_family"])
)
if family_return_share.height:
    plot_return_share_decomposition(family_return_share)

print(
    f"Requested-notional sweep moves to the appendix. The joint-model dataset stays frozen at MODEL_NOTIONAL=${MODEL_NOTIONAL:,}."
)

capacity_curves = (
    outcomes.group_by(["strategy", "requested_notional"])
    .agg(
        pl.col("filled_flag").mean().alias("fill_rate"),
        pl.col("fill_fraction").mean().alias("avg_fill_fraction"),
        pl.col("req_fill_to_30m_after_open_bps").mean().alias("avg_req_fill_to_30m_after_open_bps"),
        pl.col("req_fill_to_reg_close_bps").mean().alias("avg_req_fill_to_reg_close_bps"),
        pl.col("fill_to_30m_after_open_bps").mean().alias("avg_fill_to_30m_after_open_bps"),
        pl.col("fill_to_reg_close_bps").mean().alias("avg_fill_to_reg_close_bps"),
        pl.col("fill_to_reg_close_pnl").mean().alias("avg_fill_to_reg_close_pnl"),
    )
    .sort(["strategy", "requested_notional"])
)
show_df(capacity_curves, n=20)

Primary-target selection: requested EV by horizon

Requested-notional sweep moves to the appendix. The joint-model dataset stays frozen at MODEL_NOTIONAL=$2,500.

6.5) Exploratory within-family frontiers

This exploratory view is deliberately narrow and limited to within-family tradeoffs. The main all-policy frontier is part of the decision package in section 8.4.
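
The frontier flag used in the next cell comes from `add_frontier_flag`, which is defined elsewhere in the notebook and not shown in this section. As a rough illustration of the idea only, and not the notebook's implementation, a Pareto flag over two "higher is better" axes such as `fill_rate` and `avg_req_fill_to_30m_after_open_bps` could be sketched as follows. The function name `pareto_flags` and the example numbers are hypothetical.

```python
from typing import Sequence


def pareto_flags(points: Sequence[tuple[float, float]]) -> list[bool]:
    """Return True for each (x, y) point that no other point dominates.

    A point is dominated when another point is at least as good on both
    axes and strictly better on at least one (higher is better on both).
    """
    flags = []
    for i, (xi, yi) in enumerate(points):
        dominated = any(
            (xj >= xi and yj >= yi) and (xj > xi or yj > yi)
            for j, (xj, yj) in enumerate(points)
            if j != i
        )
        flags.append(not dominated)
    return flags


# Hypothetical (fill_rate, avg requested EV in bps) pairs for four policy variants.
example_policies = [(0.90, 1.0), (0.60, 4.0), (0.55, 3.0), (0.30, 6.0)]
example_flags = pareto_flags(example_policies)
```

In this toy input the third variant is dominated by the second (lower fill rate and lower EV), so it is the only point off the frontier. The quadratic scan is fine for a few dozen policy variants; a sort-based sweep would be the usual choice for larger grids.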

Code
frontier_df = (
    model_policy_selected.group_by(["policy_id", "policy_family", "strategy", "latency_s", "offset_bps"])
    .agg(
        pl.len().alias("n_orders"),
        pl.col("filled_flag").mean().alias("fill_rate"),
        pl.col("fill_fraction").mean().alias("avg_fill_fraction"),
        pl.col("adverse_fill_bps").mean().alias("avg_adverse_fill_bps"),
        pl.col("req_fill_to_open_bps").mean().alias("avg_req_fill_to_open_bps"),
        pl.col("req_fill_to_30m_after_open_bps").mean().alias("avg_req_fill_to_30m_after_open_bps"),
        pl.col("req_fill_to_reg_close_bps").mean().alias("avg_req_fill_to_reg_close_bps"),
        pl.col("req_markout_1m_bps").mean().alias("avg_req_markout_1m_bps"),
        pl.col("req_markout_5m_bps").mean().alias("avg_req_markout_5m_bps"),
        pl.col("req_fill_to_eod_bps").mean().alias("avg_req_fill_to_eod_bps"),
        pl.col("req_fill_to_30m_after_open_bps").quantile(0.10).alias("p10_req_fill_to_30m_after_open_bps"),
    )
    .filter(pl.col("n_orders") >= MIN_ROWS_PER_POLICY)
    .sort(["strategy", "latency_s", "offset_bps"])
)
frontier_df = add_frontier_flag(
    frontier_df,
    x_col="fill_rate",
    y_col="avg_req_fill_to_30m_after_open_bps",
)
show_df(frontier_df, n=30)

Exploratory frontier charts stay focused on within-family tradeoffs. The full all-policy frontier is shown later in section 8.4.

Code
for family in ENTRY_STRATEGIES:
    family_frontier = frontier_df.filter(pl.col("policy_family") == family)
    if family_frontier.height == 0:
        continue
    family_frontier = family_frontier.with_columns(
        pl.struct(["strategy", "offset_bps", "latency_s"]).map_elements(
            lambda row: policy_variant_label(row["strategy"], row["offset_bps"], row["latency_s"]),
            return_dtype=pl.Utf8,
        ).alias("policy_label")
    )
    plot_policy_frontier(
        family_frontier,
        x_col="fill_rate",
        y_col="avg_req_fill_to_30m_after_open_bps",
        color_col="avg_adverse_fill_bps",
        label_col="policy_label",
        title=f"Within-family frontier to 30m after open: {strategy_display_name(family)}",
        cmap="viridis_r",
        annotate_all=False,
    )

6.6) Signal-vs-aggressiveness by policy family

Each policy family gets one compact heatmap plus a best-variant summary, so the signal-policy interaction is visible without a long sequence of per-latency charts.
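
The cell below builds `signal_score` as the row-wise mean of clipped z-scores and then cuts it into equal-count buckets from ordinal ranks. A minimal standard-library sketch of that same arithmetic, on hypothetical data with only two features, may make the steps easier to follow. It assumes three buckets, which is what the low/med/high labels imply; the actual value of `SIGNAL_BUCKETS` is set elsewhere in the notebook. The helper names `clipped_zscores` and `rank_buckets` are illustrative.

```python
import math
from statistics import mean, stdev


def clipped_zscores(values, clip=3.0):
    """Z-score a list, falling back to std=1.0 when the spread is degenerate."""
    mu = mean(values)
    sd = stdev(values) if len(values) > 1 else 0.0  # sample std, ddof=1
    if not math.isfinite(sd) or sd == 0.0:
        sd = 1.0
    return [max(-clip, min(clip, (v - mu) / sd)) for v in values]


def rank_buckets(scores, n_buckets):
    """Equal-count buckets from zero-based ordinal ranks: floor(rank * n_buckets / n)."""
    order = sorted(range(len(scores)), key=lambda i: scores[i])
    buckets = [0] * len(scores)
    for rank0, idx in enumerate(order):
        buckets[idx] = min(n_buckets - 1, rank0 * n_buckets // len(scores))
    return buckets


# Hypothetical feature columns for six rows (the notebook uses seven features).
features = {
    "sentiment_score": [0.1, 0.4, -0.2, 0.9, 0.0, 0.5],
    "confidence": [0.5, 0.7, 0.2, 0.9, 0.4, 0.6],
}
z_cols = [clipped_zscores(col) for col in features.values()]
signal_score = [mean(row) for row in zip(*z_cols)]
bucket_idx = rank_buckets(signal_score, n_buckets=3)
bucket_label = [("low", "med", "high")[b] for b in bucket_idx]
```

Because the buckets come from ranks rather than fixed thresholds, each bucket holds the same number of rows up to rounding, which keeps the per-cell counts checked against `MIN_ROWS_PER_SIGNAL_BUCKET` roughly balanced.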

Code
if model_policy_selected.height == 0:
    model_policy_signal = model_policy_selected
    signal_policy_summary = pl.DataFrame(
        schema={
            "signal_bucket": pl.Utf8,
            "policy_id": pl.Utf8,
            "policy_family": pl.Utf8,
            "strategy": pl.Utf8,
            "latency_s": pl.Int64,
            "offset_bps": pl.Int64,
            "aggression_rank_within_family": pl.Float64,
            "n_orders": pl.UInt32,
            "fill_rate": pl.Float64,
            "avg_fill_fraction": pl.Float64,
            "avg_adverse_fill_bps": pl.Float64,
            "avg_req_fill_to_30m_after_open_bps": pl.Float64,
        }
    )
    emit_warning("Signal-policy interaction is empty because the selected modeling sample has no rows.")
else:
    model_policy_signal = model_policy_selected.with_columns(
        (pl.col("sentiment_score") * pl.col("direction_strength")).alias("signed_direction_strength")
    )

    signal_score_cols = [
        "sentiment_score",
        "signed_direction_strength",
        "confidence",
        "relevance",
        "novelty",
        "event_materiality",
        "stock_prominence",
    ]

    zscore_exprs = []
    for col in signal_score_cols:
        mean_val = model_policy_signal.get_column(col).mean()
        std_val = model_policy_signal.get_column(col).std()
        mean_val = 0.0 if mean_val is None or not np.isfinite(mean_val) else float(mean_val)
        std_val = 1.0 if std_val is None or not np.isfinite(std_val) or float(std_val) == 0.0 else float(std_val)
        zscore_exprs.append(
            ((pl.col(col) - mean_val) / std_val).clip(-3.0, 3.0).alias(f"z_{col}")
        )

    model_policy_signal = model_policy_signal.with_columns(zscore_exprs).with_columns(
        pl.mean_horizontal([pl.col(f"z_{col}") for col in signal_score_cols]).alias("signal_score")
    )
    model_policy_signal = model_policy_signal.with_columns(
        (
            (
                    (pl.col("signal_score").rank(method="ordinal") - 1)
                    * SIGNAL_BUCKETS
                    / pl.len()
            )
            .floor()
            .clip(0, SIGNAL_BUCKETS - 1)
            .cast(pl.Int64)
        ).alias("signal_bucket_idx")
    )
    # The low/med/high labels assume SIGNAL_BUCKETS == 3; any bucket index above 1 is labeled "high".
    model_policy_signal = model_policy_signal.with_columns(
        pl.when(pl.col("signal_bucket_idx") == 0)
        .then(pl.lit("low"))
        .when(pl.col("signal_bucket_idx") == 1)
        .then(pl.lit("med"))
        .otherwise(pl.lit("high"))
        .alias("signal_bucket")
    )

    signal_policy_summary = (
        model_policy_signal.group_by(
            [
                "signal_bucket",
                "policy_id",
                "policy_family",
                "strategy",
                "latency_s",
                "offset_bps",
                "aggression_rank_within_family",
            ]
        )
        .agg(
            pl.len().alias("n_orders"),
            pl.col("filled_flag").mean().alias("fill_rate"),
            pl.col("fill_fraction").mean().alias("avg_fill_fraction"),
            pl.col("adverse_fill_bps").mean().alias("avg_adverse_fill_bps"),
            pl.col("req_fill_to_30m_after_open_bps").mean().alias("avg_req_fill_to_30m_after_open_bps"),
        )
        .sort(["signal_bucket", "strategy", "latency_s", "offset_bps"])
    )
Code
signal_bucket_coverage = (
    model_policy_signal.group_by(["signal_bucket", "policy_id"])
    .agg(pl.len().alias("n_orders"))
    .sort(["signal_bucket", "policy_id"])
    if model_policy_signal.height
    else pl.DataFrame(schema={"signal_bucket": pl.Utf8, "policy_id": pl.Utf8, "n_orders": pl.UInt32})
)
thin_signal_cells = (
    signal_bucket_coverage.filter(pl.col("n_orders") < MIN_ROWS_PER_SIGNAL_BUCKET)
    if signal_bucket_coverage.height
    else signal_bucket_coverage
)
if thin_signal_cells.height:
    emit_warning(
        f"{thin_signal_cells.height} signal_bucket x policy cells have fewer than {MIN_ROWS_PER_SIGNAL_BUCKET} rows."
    )
show_df(signal_bucket_coverage, n=30)
Code
signal_policy_plot = signal_policy_summary.filter(pl.col("n_orders") >= MIN_ROWS_PER_SIGNAL_BUCKET)

for family in ENTRY_STRATEGIES:
    plot_family_signal_overview(
        signal_policy_plot.filter(pl.col("policy_family") == family),
        family,
    )

show_df(signal_policy_summary, n=40)

Signal strength vs aggressiveness: Trailing-mid

Signal strength vs aggressiveness: Buy-at-ask cap

Signal strength vs aggressiveness: VWAP

Signal strength vs aggressiveness: VWAP - 1 std


7) Joint-model dataset construction

This notebook now stops at the modeling table. The goal is to freeze the policy/value target definition, action space, and walk-forward folds before any serious model fitting.
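
The fold columns in the cell below are derived from the calendar month of `publish_ts`: the month label becomes `train_test_fold` and its dense rank becomes `walk_forward_fold`. The sketch below reproduces that mapping with the standard library and then shows one way a later notebook might consume the folds, using expanding-window splits. The expanding scheme and the names `month_folds` and `expanding_splits` are assumptions for illustration; this notebook only freezes the fold labels.

```python
from datetime import datetime


def month_folds(publish_ts):
    """Map each timestamp to (month label, 1-based dense rank of that month)."""
    labels = [ts.strftime("%Y-%m") for ts in publish_ts]
    rank_of = {m: i + 1 for i, m in enumerate(sorted(set(labels)))}
    return [(m, rank_of[m]) for m in labels]


def expanding_splits(fold_ids, min_train_folds=1):
    """Yield (train_folds, test_fold) pairs where training uses only earlier folds."""
    folds = sorted(set(fold_ids))
    for k in range(min_train_folds, len(folds)):
        yield folds[:k], folds[k]


# Hypothetical event timestamps spanning three calendar months.
events = [
    datetime(2026, 1, 5, 8, 30),
    datetime(2026, 1, 20, 7, 15),
    datetime(2026, 2, 3, 9, 0),
    datetime(2026, 3, 11, 6, 45),
]
assigned = month_folds(events)
splits = list(expanding_splits([fold for _, fold in assigned]))
```

Assigning folds per event rather than per row, as the notebook does through `event_key`, keeps every policy variant of the same article in the same fold, so no event appears on both sides of a split.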

Code
joint_model_df = load_cached_parquet("joint_model_df")
if joint_model_df is None:
    if model_policy_signal.height == 0:
        joint_model_df = pl.DataFrame()
        emit_warning("joint_model_df is empty because the selected modeling sample has no rows.")
    else:
        supported_policies = (
            model_policy_signal.group_by(["policy_id", "policy_family", "strategy", "latency_s", "offset_bps"])
            .agg(pl.len().alias("n_rows"))
            .filter(pl.col("n_rows") >= MIN_ROWS_PER_POLICY)
        )
        joint_model_base = (
            model_policy_signal
            .join(supported_policies.select("policy_id"), on="policy_id", how="inner")
            .with_columns(
                pl.col("prior_event_gap_s").fill_null(99_999),
                pl.col("publish_ts").dt.date().alias("publish_date"),
                pl.col("publish_ts").dt.strftime("%Y-%m").alias("publish_month"),
            )
        )

        event_folds = (
            joint_model_base.select("event_key", "publish_ts", "publish_date", "publish_month")
            .unique(subset=["event_key"])
            .sort("publish_ts")
            .with_columns(
                pl.col("publish_month").rank(method="dense").cast(pl.Int64).alias("walk_forward_fold")
            )
            .rename({"publish_month": "train_test_fold"})
        )

        joint_model_df = (
            joint_model_base.join(
                event_folds.select("event_key", "train_test_fold", "walk_forward_fold"),
                on="event_key",
                how="left",
            )
            .select(
                "event_key",
                "article_id",
                "ticker",
                "publish_ts",
                "publish_date",
                "train_test_fold",
                "walk_forward_fold",
                "policy_id",
                "policy_family",
                "aggression_rank_within_family",
                "strategy",
                "latency_s",
                "offset_bps",
                "requested_notional",
                "spread_bps_at_decision",
                "quote_age_ms_at_decision",
                "ask_size_usd_at_decision",
                "rolling_dollar_volume_60s_at_decision",
                "vwap_dev_bps_at_decision",
                "realized_vol_60s_bps_at_decision",
                "prior_event_gap_s",
                "segment",
                "daypart",
                "event_type",
                "is_overlap_event",
                "sentiment_score",
                "confidence",
                "relevance",
                "direction_strength",
                "novelty",
                "event_materiality",
                "stock_prominence",
                "mention_count",
                "headline_hit",
                "snippet_count",
                "signal_score",
                "signal_bucket",
                PRIMARY_VALUE_COL,
                "primary_exit_bps",
                "primary_exit_pnl",
                "filled_flag",
                "fill_fraction",
                "adverse_fill_bps",
                "fill_to_open_bps",
                "fill_to_30m_after_open_bps",
                "fill_to_reg_close_bps",
                "fill_to_eod_bps",
                "markout_1m_bps",
                "markout_5m_bps",
                "req_fill_to_open_bps",
                "req_fill_to_reg_close_bps",
                "req_markout_1m_bps",
                "req_markout_5m_bps",
                "req_fill_to_eod_bps",
            )
            .sort(["publish_ts", "event_key", "policy_id"])
        )
    write_cached_parquet(joint_model_df, "joint_model_df")

joint_model_summary = (
    pl.DataFrame(
        {
            "selected_sample": [selected_policy_sample],
            "model_notional": [MODEL_NOTIONAL],
            "primary_exit": [PRIMARY_EXIT],
            "primary_value_col": [PRIMARY_VALUE_COL],
            "n_rows": [joint_model_df.height],
            "n_events": [joint_model_df.get_column("event_key").n_unique() if joint_model_df.height else 0],
            "n_policies": [joint_model_df.get_column("policy_id").n_unique() if joint_model_df.height else 0],
        }
    )
    if isinstance(joint_model_df, pl.DataFrame)
    else pl.DataFrame()
)
joint_model_preview = (
    joint_model_df.select(
        "event_key",
        "policy_id",
        "signal_bucket",
        PRIMARY_VALUE_COL,
        "filled_flag",
        "fill_fraction",
        "train_test_fold",
        "walk_forward_fold",
    )
    if joint_model_df.height
    else pl.DataFrame()
)
Loading cache: data/eda_04/joint_model_df.parquet
Code
show_df(joint_model_summary, n=1)
Code
show_df(joint_model_preview, n=20)
Code
if joint_model_df.height:
    rows_by_fold = (
        joint_model_df.group_by(["train_test_fold", "walk_forward_fold"])
        .agg(
            pl.len().alias("n_rows"),
            pl.col("event_key").n_unique().alias("n_events"),
        )
        .sort("walk_forward_fold")
    )
    rows_by_policy = (
        joint_model_df.group_by(["policy_id", "policy_family", "strategy", "latency_s", "offset_bps"])
        .agg(pl.len().alias("n_rows"))
        .sort(["strategy", "latency_s", "offset_bps"])
    )
    rows_by_signal_policy = (
        joint_model_df.group_by(["signal_bucket", "policy_id"])
        .agg(pl.len().alias("n_rows"))
        .sort(["signal_bucket", "n_rows"], descending=[False, True])
    )
    avg_target_by_fold = (
        joint_model_df.group_by(["train_test_fold", "walk_forward_fold"])
        .agg(
            pl.col(PRIMARY_VALUE_COL).mean().alias("avg_primary_value"),
            pl.col("filled_flag").mean().alias("fill_rate"),
        )
        .sort("walk_forward_fold")
    )
    coverage_cols = [
        PRIMARY_VALUE_COL,
        "primary_exit_bps",
        "primary_exit_pnl",
        "fill_to_reg_close_bps",
        "fill_to_eod_bps",
        "markout_5m_bps",
    ]
    target_coverage = pl.DataFrame(
        {
            "target": coverage_cols,
            "non_null_rows": [
                joint_model_df.filter(pl.col(col).is_not_null()).height
                for col in coverage_cols
            ],
            "coverage_rate": [
                joint_model_df.filter(pl.col(col).is_not_null()).height / joint_model_df.height
                for col in coverage_cols
            ],
        }
    )

    plot_fold_diagnostics(rows_by_fold, avg_target_by_fold, PRIMARY_VALUE_COL)
    plot_histogram(
        joint_model_df,
        PRIMARY_VALUE_COL,
        hue="policy_family",
        title=f"Distribution of {PRIMARY_VALUE_COL}",
        bins=40,
        stat="density",
    )

    corr_cols = [
        PRIMARY_VALUE_COL,
        "req_fill_to_reg_close_bps",
        "filled_flag",
        "fill_fraction",
        "adverse_fill_bps",
        "fill_to_reg_close_bps",
        "markout_5m_bps",
    ]
    corr_pdf = joint_model_df.select(corr_cols).drop_nulls().to_pandas()
    if not corr_pdf.empty:
        plot_correlation_snapshot(corr_pdf)
else:
    emit_warning("Dataset sanity checks skipped because joint_model_df is empty.")
    rows_by_fold = pl.DataFrame(
        schema={"train_test_fold": pl.Utf8, "walk_forward_fold": pl.Int64, "n_rows": pl.UInt32, "n_events": pl.UInt32}
    )
    rows_by_policy = pl.DataFrame(
        schema={
            "policy_id": pl.Utf8,
            "policy_family": pl.Utf8,
            "strategy": pl.Utf8,
            "latency_s": pl.Int64,
            "offset_bps": pl.Int64,
            "n_rows": pl.UInt32,
        }
    )
    rows_by_signal_policy = pl.DataFrame(schema={"signal_bucket": pl.Utf8, "policy_id": pl.Utf8, "n_rows": pl.UInt32})
    avg_target_by_fold = pl.DataFrame(
        schema={"train_test_fold": pl.Utf8, "walk_forward_fold": pl.Int64, "avg_primary_value": pl.Float64,
                "fill_rate": pl.Float64}
    )
    target_coverage = pl.DataFrame(
        schema={"target": pl.Utf8, "non_null_rows": pl.Int64, "coverage_rate": pl.Float64}
    )

Joint-model fold diagnostics

Code
show_df(rows_by_fold, n=20)
Code
show_df(rows_by_policy, n=40)
Code
show_df(rows_by_signal_policy, n=30)
Code
show_df(avg_target_by_fold, n=20)
Code
show_df(target_coverage, n=10)

8) Execution decision package

This final section turns the diagnostic notebook into a decision-ready package: a per-policy scoreboard, monthly stability checks, regime maps, frontier views, explicit stress tests, and a literal gating decision.
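
The stress columns built in the next cell are simple per-order haircuts on `primary_exit_pnl`. As a worked example of that arithmetic for a single order, the sketch below uses the same constants that appear in the cell (5 bps, 3 bps plus 1 bp per second of quote age, and a 750 ms stale-quote cutoff). The function name `stress_pnls` and the example order are hypothetical.

```python
def stress_pnls(primary_exit_pnl, filled_notional, quote_age_ms,
                fill_haircut_bps=5.0, quote_base_bps=3.0, stale_cutoff_ms=750):
    """Return the three stressed PnL variants for one order, in dollars."""
    # Worse fills: a flat haircut in bps charged on the filled notional.
    worse_fill = primary_exit_pnl - filled_notional * fill_haircut_bps / 10_000.0
    # Worse quote handling: a base haircut plus 1 bp per second of quote age.
    quote_bps = quote_base_bps + quote_age_ms / 1000.0
    worse_quote = primary_exit_pnl - filled_notional * quote_bps / 10_000.0
    # Stale-quote filter: orders decided on quotes older than the cutoff earn nothing.
    no_stale = primary_exit_pnl if quote_age_ms <= stale_cutoff_ms else 0.0
    return {
        "worse_fill": worse_fill,
        "worse_quote": worse_quote,
        "no_stale_quote": no_stale,
    }


# Hypothetical order: $2,500 fully filled, $4.00 PnL, quote one second old.
example = stress_pnls(primary_exit_pnl=4.0, filled_notional=2500.0, quote_age_ms=1000.0)
```

For this order the flat haircut costs $1.25, the quote-age haircut costs $1.00 (4 bps on $2,500), and the stale-quote filter zeroes the trade because the quote is older than 750 ms.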

Code
decision_policy_enriched = load_cached_parquet("decision_policy_enriched")
if decision_policy_enriched is None:
    decision_policy_enriched = model_policy_selected.with_columns(
        pl.when(pl.col("filled_flag") == 1)
        .then((pl.col("fill_ts") - pl.col("publish_ts")).dt.total_seconds())
        .otherwise(None)
        .alias("fill_delay_s"),
        pl.when(pl.col("filled_flag") == 1)
        .then((pl.col("fill_ts").dt.time() >= time(9, 20)).cast(pl.Int8))
        .otherwise(None)
        .alias("fill_after_920_flag"),
        pl.when(pl.col("requested_notional") > 0)
        .then(pl.col("filled_notional") / pl.col("requested_notional"))
        .otherwise(0.0)
        .fill_nan(0.0)
        .fill_null(0.0)
        .alias("deployed_notional_share"),
        pl.coalesce([pl.col("fill_px"), pl.col("anchor_px")]).alias("decision_price"),
        pl.when(pl.col("anchor_px").is_not_null() & pl.col("m30_after_open_px").is_not_null())
        .then(((pl.col("m30_after_open_px") / pl.col("anchor_px")) - 1.0) * 10_000.0)
        .otherwise(None)
        .alias("available_alpha_30m_bps"),
        pl.when(pl.col("requested_notional") > 0)
        .then(pl.col("latency_s") * pl.col("rolling_dollar_volume_60s_at_decision") / 60.0 / pl.col("requested_notional"))
        .otherwise(None)
        .alias("turnover_since_release_x"),
    ).with_columns(
        pl.col("decision_price").cut(
            breaks=[5.0, 10.0, 25.0, 50.0, 100.0],
            labels=["<=5", "5-10", "10-25", "25-50", "50-100", ">100"],
        ).alias("price_bucket"),
        pl.when(pl.col("publish_ts").dt.time() <= time(8, 0))
        .then(pl.lit("<=08:00"))
        .when(pl.col("publish_ts").dt.time() <= time(9, 30))
        .then(pl.lit("08:00-09:30"))
        .when(pl.col("publish_ts").dt.time() <= time(11, 0))
        .then(pl.lit("09:30-11:00"))
        .when(pl.col("publish_ts").dt.time() <= time(14, 0))
        .then(pl.lit("11:00-14:00"))
        .when(pl.col("publish_ts").dt.time() <= time(16, 0))
        .then(pl.lit("14:00-16:00"))
        .otherwise(pl.lit(">16:00"))
        .alias("release_time_bucket"),
        # "Update density" is bucketed on rolling 60-second dollar volume at decision time.
        pl.col("rolling_dollar_volume_60s_at_decision").cut(
            breaks=[50_000, 100_000, 250_000, 500_000],
            labels=["<=50k", "50k-100k", "100k-250k", "250k-500k", ">500k"],
        ).alias("update_density_bucket"),
        pl.col("turnover_since_release_x").cut(
            breaks=[1.0, 2.0, 5.0, 10.0],
            labels=["<=1x", "1x-2x", "2x-5x", "5x-10x", ">10x"],
        ).alias("turnover_since_release_bucket"),
        pl.when(pl.col("available_alpha_30m_bps").abs() >= 5.0)
        .then((pl.col("req_fill_to_30m_after_open_bps") / pl.col("available_alpha_30m_bps")).clip(-2.0, 2.0))
        .otherwise(None)
        .alias("alpha_capture_ratio_30m"),
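        # Stress 1: every fill is 5 bps worse, charged on filled notional.
        (pl.col("primary_exit_pnl") - pl.col("filled_notional") * 5.0 / 10_000.0).alias("stress_worse_fill_pnl"),
        # Stress 2: 3 bps base haircut plus 1 bp per second of quote age at decision.
        (
                pl.col("primary_exit_pnl")
                - pl.col("filled_notional") * (3.0 + pl.col("quote_age_ms_at_decision") / 1000.0) / 10_000.0
        ).alias("stress_worse_quote_pnl"),
        # Stress 3: orders decided on quotes older than 750 ms contribute zero PnL.
        pl.when(pl.col("quote_age_ms_at_decision") <= 750)
        .then(pl.col("primary_exit_pnl"))
        .otherwise(0.0)
        .alias("stress_no_stale_quote_pnl"),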
    )
    write_cached_parquet(decision_policy_enriched, "decision_policy_enriched")
policy_scoreboard_schema = {
    "policy_id": pl.Utf8,
    "policy_family": pl.Utf8,
    "strategy": pl.Utf8,
    "latency_s": pl.Int64,
    "offset_bps": pl.Int64,
    "trade_count": pl.UInt32,
    "fill_rate": pl.Float64,
    "deployed_notional_pct": pl.Float64,
    "avg_fill_time_s": pl.Float64,
    "pct_fills_after_920": pl.Float64,
    "avg_realized_entry_slippage_bps": pl.Float64,
    "avg_pnl_open": pl.Float64,
    "avg_pnl_30m_after_open": pl.Float64,
    "avg_pnl_reg_close": pl.Float64,
    "avg_pnl_eod": pl.Float64,
    "avg_primary_exit_pnl": pl.Float64,
    "median_pnl": pl.Float64,
    "win_rate_pct": pl.Float64,
    "p10_pnl": pl.Float64,
    "p05_pnl": pl.Float64,
    "worst_trade_pnl": pl.Float64,
    "avg_alpha_capture_ratio": pl.Float64,
    "avg_req_fill_to_30m_after_open_bps": pl.Float64,
    "p10_req_fill_to_30m_after_open_bps": pl.Float64,
    "price_quality_bps": pl.Float64,
    "slightly_worse_fills_pnl": pl.Float64,
    "worse_quote_handling_pnl": pl.Float64,
    "tighter_no_stale_quote_pnl": pl.Float64,
    "later_start_pnl": pl.Float64,
    "stress_test_pnl": pl.Float64,
}

monthly_top_policy_schema = {
    "policy_id": pl.Utf8,
    "policy_label": pl.Utf8,
    "strategy": pl.Utf8,
    "latency_s": pl.Int64,
    "offset_bps": pl.Int64,
    "train_test_fold": pl.Utf8,
    "monthly_expectancy_pnl": pl.Float64,
    "monthly_expectancy_bps": pl.Float64,
    "monthly_fill_rate": pl.Float64,
    "monthly_trade_count": pl.UInt32,
}

regime_policy_schema = {
    "policy_id": pl.Utf8,
    "policy_label": pl.Utf8,
    "bucket": pl.Utf8,
    "trade_count": pl.UInt32,
    "fill_rate": pl.Float64,
    "avg_primary_exit_pnl": pl.Float64,
    "avg_alpha_capture_ratio": pl.Float64,
}

if decision_policy_enriched.height == 0:
    policy_scoreboard = pl.DataFrame(schema=policy_scoreboard_schema)
    monthly_top_policy = pl.DataFrame(schema=monthly_top_policy_schema)
    top_policy_scoreboard = policy_scoreboard
    top_policy_ids = []
    regime_dimensions = [
        ("spread_bucket", "Spread bucket"),
        ("quote_age_bucket", "Quote freshness bucket"),
        ("update_density_bucket", "Update density bucket"),
        ("turnover_since_release_bucket", "Turnover since release bucket"),
        ("price_bucket", "Price bucket"),
        ("release_time_bucket", "Release time bucket"),
    ]
    empty_regime = pl.DataFrame(
        schema={
            "bucket": pl.Utf8,
            "trade_count": pl.UInt32,
            "fill_rate": pl.Float64,
            "avg_primary_exit_pnl": pl.Float64,
            "avg_alpha_capture_ratio": pl.Float64,
        }
    )
    regime_breakdowns = {bucket_col: empty_regime for bucket_col, _ in regime_dimensions}
    regime_policy_breakdowns = {bucket_col: pl.DataFrame(schema=regime_policy_schema) for bucket_col, _ in
                                regime_dimensions}
    regime_focus_lines = []
else:
    policy_group_cols = ["policy_id", "policy_family", "strategy", "latency_s", "offset_bps"]
    policy_scoreboard = (
        decision_policy_enriched.group_by(policy_group_cols)
        .agg(
            pl.len().alias("trade_count"),
            pl.col("filled_flag").mean().alias("fill_rate"),
            (100.0 * pl.col("deployed_notional_share").mean()).alias("deployed_notional_pct"),
            pl.col("fill_delay_s").mean().alias("avg_fill_time_s"),
            (100.0 * pl.col("fill_after_920_flag").cast(pl.Float64).mean()).alias("pct_fills_after_920"),
            pl.col("adverse_fill_bps").mean().alias("avg_realized_entry_slippage_bps"),
            pl.col("fill_to_open_pnl").mean().alias("avg_pnl_open"),
            pl.col("fill_to_30m_after_open_pnl").mean().alias("avg_pnl_30m_after_open"),
            pl.col("fill_to_reg_close_pnl").mean().alias("avg_pnl_reg_close"),
            pl.col("fill_to_eod_pnl").mean().alias("avg_pnl_eod"),
            pl.col("primary_exit_pnl").mean().alias("avg_primary_exit_pnl"),
            pl.col("primary_exit_pnl").median().alias("median_pnl"),
            (100.0 * pl.col("primary_exit_pnl").gt(0.0).cast(pl.Float64).mean()).alias("win_rate_pct"),
            pl.col("primary_exit_pnl").quantile(0.10).alias("p10_pnl"),
            pl.col("primary_exit_pnl").quantile(0.05).alias("p05_pnl"),
            pl.col("primary_exit_pnl").min().alias("worst_trade_pnl"),
            pl.col("alpha_capture_ratio_30m").mean().alias("avg_alpha_capture_ratio"),
            pl.col("req_fill_to_30m_after_open_bps").mean().alias("avg_req_fill_to_30m_after_open_bps"),
            pl.col("req_fill_to_30m_after_open_bps").quantile(0.10).alias("p10_req_fill_to_30m_after_open_bps"),
        )
    )

    stress_policy_summary = (
        decision_policy_enriched.group_by(policy_group_cols)
        .agg(
            pl.col("stress_worse_fill_pnl").mean().alias("slightly_worse_fills_pnl"),
            pl.col("stress_worse_quote_pnl").mean().alias("worse_quote_handling_pnl"),
            pl.col("stress_no_stale_quote_pnl").mean().alias("tighter_no_stale_quote_pnl"),
        )
    )

    later_start_lookup = policy_scoreboard.select(
        "strategy",
        "offset_bps",
        pl.col("latency_s").alias("later_latency_s"),
        pl.col("avg_primary_exit_pnl").alias("later_start_pnl"),
    )

    stress_policy_summary = (
        stress_policy_summary.join(
            policy_scoreboard.select(policy_group_cols + ["avg_primary_exit_pnl"]),
            on=policy_group_cols,
            how="left",
        )
        .with_columns(
            pl.when(pl.col("latency_s") == LATENCIES_S[0])
            .then(pl.lit(LATENCIES_S[min(1, len(LATENCIES_S) - 1)]))
            .when(pl.col("latency_s") == LATENCIES_S[1])
            .then(pl.lit(LATENCIES_S[min(2, len(LATENCIES_S) - 1)]))
            .when(pl.col("latency_s") == LATENCIES_S[2])
            .then(pl.lit(LATENCIES_S[min(3, len(LATENCIES_S) - 1)]))
            .otherwise(pl.lit(LATENCIES_S[-1]))
            .alias("later_latency_s")
        )
        .join(
            later_start_lookup,
            on=["strategy", "offset_bps", "later_latency_s"],
            how="left",
        )
        .with_columns(
            pl.coalesce([pl.col("later_start_pnl"), pl.col("avg_primary_exit_pnl")]).alias("later_start_pnl"),
            pl.min_horizontal(
                "slightly_worse_fills_pnl",
                "worse_quote_handling_pnl",
                "tighter_no_stale_quote_pnl",
                "later_start_pnl",
            ).alias("stress_test_pnl"),
        )
        .select(policy_group_cols + [
            "slightly_worse_fills_pnl",
            "worse_quote_handling_pnl",
            "tighter_no_stale_quote_pnl",
            "later_start_pnl",
            "stress_test_pnl",
        ])
    )

    policy_scoreboard = (
        policy_scoreboard.join(stress_policy_summary, on=policy_group_cols, how="left")
        .with_columns((-pl.col("avg_realized_entry_slippage_bps")).alias("price_quality_bps"))
        .sort(["stress_test_pnl", "avg_primary_exit_pnl", "fill_rate"], descending=[True, True, True])
    )

    top_policy_count = min(5, policy_scoreboard.height)
    top_policy_scoreboard = policy_scoreboard.head(top_policy_count)
    top_policy_ids = top_policy_scoreboard.get_column("policy_id").to_list()
    top_policy_meta = ensure_policy_label(top_policy_scoreboard).select(
        "policy_id",
        "policy_label",
        "strategy",
        "latency_s",
        "offset_bps",
    ) if top_policy_scoreboard.height else pl.DataFrame(schema={
        "policy_id": pl.Utf8,
        "policy_label": pl.Utf8,
        "strategy": pl.Utf8,
        "latency_s": pl.Int64,
        "offset_bps": pl.Int64,
    })

    if joint_model_df.height and top_policy_ids:
        monthly_top_policy = (
            joint_model_df.filter(pl.col("policy_id").is_in(top_policy_ids))
            .group_by(["policy_id", "train_test_fold"])
            .agg(
                pl.col("primary_exit_pnl").mean().alias("monthly_expectancy_pnl"),
                pl.col(PRIMARY_VALUE_COL).mean().alias("monthly_expectancy_bps"),
                pl.col("filled_flag").mean().alias("monthly_fill_rate"),
                pl.len().alias("monthly_trade_count"),
            )
            .join(top_policy_meta, on="policy_id", how="left")
            .sort(["policy_label", "train_test_fold"])
            .select(
                "policy_id",
                "policy_label",
                "strategy",
                "latency_s",
                "offset_bps",
                "train_test_fold",
                "monthly_expectancy_pnl",
                "monthly_expectancy_bps",
                "monthly_fill_rate",
                "monthly_trade_count",
            )
        )
    else:
        monthly_top_policy = pl.DataFrame(schema=monthly_top_policy_schema)

    regime_dimensions = [
        ("spread_bucket", "Spread bucket"),
        ("quote_age_bucket", "Quote freshness bucket"),
        ("update_density_bucket", "Update density bucket"),
        ("turnover_since_release_bucket", "Turnover since release bucket"),
        ("price_bucket", "Price bucket"),
        ("release_time_bucket", "Release time bucket"),
    ]
    decision_top_policy = decision_policy_enriched.filter(pl.col("policy_id").is_in(top_policy_ids))
    regime_breakdowns = {}
    regime_policy_breakdowns = {}
    regime_focus_lines = []
    for bucket_col, bucket_label in regime_dimensions:
        if decision_top_policy.height == 0:
            bucket_df = pl.DataFrame(
                schema={
                    "bucket": pl.Utf8,
                    "trade_count": pl.UInt32,
                    "fill_rate": pl.Float64,
                    "avg_primary_exit_pnl": pl.Float64,
                    "avg_alpha_capture_ratio": pl.Float64,
                }
            )
            bucket_policy_df = pl.DataFrame(schema=regime_policy_schema)
        else:
            bucket_df = (
                decision_top_policy.group_by(bucket_col)
                .agg(
                    pl.len().alias("trade_count"),
                    pl.col("filled_flag").mean().alias("fill_rate"),
                    pl.col("primary_exit_pnl").mean().alias("avg_primary_exit_pnl"),
                    pl.col("alpha_capture_ratio_30m").mean().alias("avg_alpha_capture_ratio"),
                )
                .rename({bucket_col: "bucket"})
                .sort(["avg_primary_exit_pnl", "fill_rate"], descending=[True, True])
            )
            bucket_policy_df = (
                decision_top_policy.group_by(["policy_id", bucket_col])
                .agg(
                    pl.len().alias("trade_count"),
                    pl.col("filled_flag").mean().alias("fill_rate"),
                    pl.col("primary_exit_pnl").mean().alias("avg_primary_exit_pnl"),
                    pl.col("alpha_capture_ratio_30m").mean().alias("avg_alpha_capture_ratio"),
                )
                .rename({bucket_col: "bucket"})
                .join(top_policy_meta.select("policy_id", "policy_label"), on="policy_id", how="left")
                .sort(["bucket", "policy_label"])
                .select(
                    "policy_id",
                    "policy_label",
                    "bucket",
                    "trade_count",
                    "fill_rate",
                    "avg_primary_exit_pnl",
                    "avg_alpha_capture_ratio",
                )
            )
        regime_breakdowns[bucket_col] = bucket_df
        regime_policy_breakdowns[bucket_col] = bucket_policy_df
        if bucket_df.height:
            focus = bucket_df.row(0, named=True)
            if focus["avg_primary_exit_pnl"] is not None:
                regime_focus_lines.append(
                    f"{bucket_label}: {focus['bucket']} (avg pnl={focus['avg_primary_exit_pnl']:.2f}, fill={focus['fill_rate']:.2%})"
                )

write_cached_parquet(policy_scoreboard, "policy_scoreboard")
Loading cache: data/eda_04/decision_policy_enriched.parquet
Saved cache: data/eda_04/policy_scoreboard.parquet

8.1) Core scoreboard

Code
display.display(display.Markdown(
    f"""Policy scoreboard at MODEL_NOTIONAL=${MODEL_NOTIONAL:,} on the {selected_policy_sample} modeling sample."""
))

Policy scoreboard at MODEL_NOTIONAL=$2,500 on the clean modeling sample.

Code
show_df(
    policy_scoreboard.select(
        "policy_id",
        "strategy",
        "latency_s",
        "offset_bps",
        "trade_count",
        "fill_rate",
        "deployed_notional_pct",
        "avg_fill_time_s",
        "pct_fills_after_920",
        "avg_realized_entry_slippage_bps",
        "avg_req_fill_to_30m_after_open_bps",
        "p10_req_fill_to_30m_after_open_bps",
        "avg_alpha_capture_ratio",
        "median_pnl",
        "win_rate_pct",
        "p10_pnl",
        "p05_pnl",
        "worst_trade_pnl",
        "price_quality_bps",
        "stress_test_pnl",
    ),
    n=40,
)

8.2) Month-by-month breakdown

Monthly stability heatmaps for the current top stress-adjusted policies, plus the underlying table.

Code
plot_monthly_policy_heatmap(monthly_top_policy, top_policy_scoreboard)
show_df(monthly_top_policy, n=60)

Monthly stability of top policies
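
The monthly view feeds the positive-month share used later in the gate. A minimal sketch of that calculation follows, assuming each trade carries a month label (the notebook groups by `train_test_fold`, which is taken here to be a month-like key). The function name and the sample trades are hypothetical.

```python
from collections import defaultdict
from statistics import fmean


def positive_month_share(trades: list[tuple[str, float]]) -> float:
    """Share of months whose mean trade PnL is strictly positive."""
    by_month: dict[str, list[float]] = defaultdict(list)
    for month, pnl in trades:
        by_month[month].append(pnl)
    if not by_month:
        return 0.0  # mirrors the notebook's fallback when there are no monthly rows
    monthly_mean = [fmean(pnls) for pnls in by_month.values()]
    return sum(1 for value in monthly_mean if value > 0.0) / len(monthly_mean)


# Hypothetical (month, primary-exit PnL) pairs for a single policy.
sample_trades = [
    ("2026-01", 5.0), ("2026-01", -1.0),
    ("2026-02", -3.0),
    ("2026-03", 2.0), ("2026-03", 2.0),
    ("2026-04", -4.0),
]
share = positive_month_share(sample_trades)
```

Averaging within each month before counting keeps one heavily traded month from dominating the share.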


8.3) Regime breakdown

Per-policy heatmaps first, then the combined regime tables used for the final gating readout.

Code
plot_regime_policy_metric_grid(
    regime_policy_breakdowns,
    regime_dimensions,
    value_col="avg_primary_exit_pnl",
    title="Regime map: average primary-exit PnL by top policy",
    cmap="RdYlGn",
    fmt=".2f",
    center=0.0,
)
plot_regime_policy_metric_grid(
    regime_policy_breakdowns,
    regime_dimensions,
    value_col="fill_rate",
    title="Regime map: fill rate by top policy",
    cmap="YlGnBu",
    fmt=".0%",
    vmin=0.0,
    vmax=1.0,
)
for bucket_col, bucket_label in regime_dimensions:
    display.display(display.Markdown(f"#### {bucket_label}"))
    display.display(regime_breakdowns[bucket_col].head(12))

Regime map: average primary-exit PnL by top policy

Regime map: fill rate by top policy

Spread bucket

Quote freshness bucket

Update density bucket

Turnover since release bucket

Price bucket

Release time bucket

8.4) Frontier charts

Primary-target, downside-aware, and stress frontiers for the full policy set, followed by alpha-capture as a supporting view.

Code
if policy_scoreboard.height:
    frontier_scoreboard = ensure_policy_label(policy_scoreboard)
    frontier_label_col = "policy_label" if "policy_label" in frontier_scoreboard.columns else "policy_id"

    primary_frontier = add_frontier_flag(
        frontier_scoreboard,
        x_col="fill_rate",
        y_col="avg_req_fill_to_30m_after_open_bps",
    )
    plot_policy_frontier(
        primary_frontier,
        x_col="fill_rate",
        y_col="avg_req_fill_to_30m_after_open_bps",
        color_col="stress_test_pnl",
        label_col=frontier_label_col,
        title="Frontier: fill rate vs requested EV to 30m after open",
        cmap="RdYlGn",
        cbar_label="Stress test pnl",
    )

    downside_frontier = add_frontier_flag(
        frontier_scoreboard,
        x_col="fill_rate",
        y_col="p10_req_fill_to_30m_after_open_bps",
    )
    plot_policy_frontier(
        downside_frontier,
        x_col="fill_rate",
        y_col="p10_req_fill_to_30m_after_open_bps",
        color_col="avg_req_fill_to_30m_after_open_bps",
        label_col=frontier_label_col,
        title="Downside frontier: fill rate vs 10th pct requested EV",
        cmap="RdYlGn",
        cbar_label="Average requested EV",
    )

    stress_frontier = add_frontier_flag(
        frontier_scoreboard,
        x_col="fill_rate",
        y_col="stress_test_pnl",
    )
    plot_policy_frontier(
        stress_frontier,
        x_col="fill_rate",
        y_col="stress_test_pnl",
        color_col="avg_req_fill_to_30m_after_open_bps",
        label_col=frontier_label_col,
        title="Stress frontier: fill rate vs stress-test pnl",
        cmap="RdYlGn",
        cbar_label="Average requested EV",
    )

    alpha_frontier = add_frontier_flag(
        frontier_scoreboard,
        x_col="fill_rate",
        y_col="avg_alpha_capture_ratio",
    )
    plot_policy_frontier(
        alpha_frontier,
        x_col="fill_rate",
        y_col="avg_alpha_capture_ratio",
        color_col="price_quality_bps",
        label_col=frontier_label_col,
        title="Support view: fill rate vs alpha-capture ratio",
        cmap="viridis_r",
        cbar_label="Price quality (bps)",
    )
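
The body of `add_frontier_flag` is not shown in this section. As a rough illustration of what a frontier flag on two maximized axes could look like, the sketch below marks a policy as on the frontier when no other policy is at least as good on both axes and strictly better on one. The function name and the sample numbers are hypothetical, and the notebook's helper may use a different rule.

```python
from typing import Sequence


def pareto_frontier_flags(xs: Sequence[float], ys: Sequence[float]) -> list[bool]:
    """Return True for each point that is not dominated when both axes are maximized."""
    points = list(zip(xs, ys))
    flags = []
    for i, (xi, yi) in enumerate(points):
        dominated = any(
            xj >= xi and yj >= yi and (xj > xi or yj > yi)
            for j, (xj, yj) in enumerate(points)
            if j != i
        )
        flags.append(not dominated)
    return flags


# Hypothetical (fill_rate, average requested EV in bps) values for four policies.
fill_rates = [0.50, 0.70, 0.90, 0.60]
ev_bps = [12.0, 8.0, 3.0, 7.0]
frontier = pareto_frontier_flags(fill_rates, ev_bps)
```

Here the fourth policy is dropped because the second one fills more often and has a higher EV. The pairwise scan is quadratic in the number of policies, which is acceptable for a grid of a few dozen.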

8.5) Stress table

Top policies under four stress scenarios: slightly worse fills, a later start, worse quote handling, and tighter no-stale-quote rules. The stress_test_pnl column is the minimum of the four scenario averages.

Code
plot_stress_comparison(top_policy_scoreboard)
show_df(
    top_policy_scoreboard.select(
        "policy_id",
        "strategy",
        "latency_s",
        "offset_bps",
        "avg_primary_exit_pnl",
        "slightly_worse_fills_pnl",
        "later_start_pnl",
        "worse_quote_handling_pnl",
        "tighter_no_stale_quote_pnl",
        "stress_test_pnl",
    ),
    n=10,
)

Stress comparison for top policies
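
The stress floor logic can be sketched in plain Python. This is an illustration rather than the notebook's code: the "later start" scenario takes the baseline PnL of the same policy at the next slower latency on the grid, and the floor is the minimum across scenarios. The function names, the example grid, and the PnL values are hypothetical, and missing scenarios are assumed to be skipped.

```python
from typing import Optional


def later_latency(latency_s: int, grid: list[int]) -> int:
    """Next slower grid point; the slowest latency maps to itself."""
    ordered = sorted(grid)
    idx = ordered.index(latency_s)  # raises ValueError for an off-grid latency
    return ordered[min(idx + 1, len(ordered) - 1)]


def stress_floor(scenario_pnls: dict[str, Optional[float]]) -> Optional[float]:
    """Worst scenario PnL, ignoring scenarios with no value."""
    values = [v for v in scenario_pnls.values() if v is not None]
    return min(values) if values else None


# Hypothetical latency grid and scenario averages for one policy.
grid = [30, 60, 90]
scenarios = {
    "slightly_worse_fills_pnl": -3.0,
    "worse_quote_handling_pnl": None,  # scenario unavailable in this example
    "tighter_no_stale_quote_pnl": -1.5,
    "later_start_pnl": -10.5,
}
floor = stress_floor(scenarios)
```

Indexing into the sorted grid handles any grid length, so the mapping needs no per-latency branch.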


8.6) Final gating decision

The notebook ends with a literal gate so the research can be advanced, narrowed to regimes, held back, or stopped.

Code
if policy_scoreboard.height == 0:
    final_gating_decision = "Do not advance yet"
    final_gating_reason = "The selected modeling sample has no policy rows after the current filters."
    final_gating_regimes = []
else:
    best_policy = top_policy_scoreboard.row(0, named=True)
    best_monthly = monthly_top_policy.filter(
        pl.col("policy_id") == best_policy["policy_id"]) if monthly_top_policy.height else pl.DataFrame()
    positive_month_share = (
        best_monthly.select((pl.col("monthly_expectancy_pnl") > 0.0).cast(pl.Float64).mean()).item()
        if best_monthly.height
        else 0.0
    )
    robust_positive = (best_policy["stress_test_pnl"] or 0.0) > 0.0
    baseline_positive = (best_policy["avg_primary_exit_pnl"] or 0.0) > 0.0
    fill_ok = (best_policy["fill_rate"] or 0.0) >= 0.85
    monthly_ok = positive_month_share >= 0.50

    if robust_positive and fill_ok and monthly_ok:
        final_gating_decision = "Proceed"
    elif baseline_positive and ((best_policy["stress_test_pnl"] or 0.0) > -5.0 or monthly_ok):
        final_gating_decision = "Proceed, but only in these regimes"
    elif baseline_positive:
        final_gating_decision = "Do not advance yet"
    else:
        final_gating_decision = "Abandon this direction"

    final_gating_reason = (
        f"Best policy {best_policy['policy_id']} has avg primary-exit pnl={best_policy['avg_primary_exit_pnl']:.2f}, "
        f"stress floor={best_policy['stress_test_pnl']:.2f}, fill rate={best_policy['fill_rate']:.2%}, "
        f"and positive-month share={positive_month_share:.2%}."
    )
    final_gating_regimes = regime_focus_lines[:4]

summary_lines = [
    "## Final gating decision",
    "",
    final_gating_decision,
    "",
    final_gating_reason,
]
if final_gating_regimes:
    # Blank line after the label so the Markdown renderer treats the items as a bullet list.
    summary_lines.extend(["", "Focus regimes:", ""] + [f"- {line}" for line in final_gating_regimes])
display.display(display.Markdown("\n".join(summary_lines)))

Final gating decision

Abandon this direction

Best policy vwap_minus_1std|lat=90|off=0|n=2500 has avg primary-exit pnl=-21.99, stress floor=-109.76, fill rate=51.47%, and positive-month share=0.00%.

Focus regimes:

  • Spread bucket: 5-10 (avg pnl=-6.73, fill=71.06%)
  • Quote freshness bucket: 1500-3000 (avg pnl=-14.33, fill=67.47%)
  • Update density bucket: <=50k (avg pnl=-18.13, fill=48.13%)
  • Turnover since release bucket: <=1x (avg pnl=-12.54, fill=40.94%)
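
The gate's branch order can be isolated as a small pure function, which makes it easier to check against the readout above. The sketch below copies the thresholds from the gating cell (fill rate of at least 0.85, positive-month share of at least 0.50, stress floor above -5.0). The function name `gate_decision` is hypothetical, and the null handling of the original cell is left out.

```python
def gate_decision(
    avg_pnl: float,
    stress_pnl: float,
    fill_rate: float,
    positive_month_share: float,
) -> str:
    """Map the best policy's summary metrics to one of the four gate outcomes."""
    robust_positive = stress_pnl > 0.0
    baseline_positive = avg_pnl > 0.0
    fill_ok = fill_rate >= 0.85
    monthly_ok = positive_month_share >= 0.50

    if robust_positive and fill_ok and monthly_ok:
        return "Proceed"
    if baseline_positive and (stress_pnl > -5.0 or monthly_ok):
        return "Proceed, but only in these regimes"
    if baseline_positive:
        return "Do not advance yet"
    return "Abandon this direction"


# Metrics reported for the best policy in this run.
decision = gate_decision(
    avg_pnl=-21.99,
    stress_pnl=-109.76,
    fill_rate=0.5147,
    positive_month_share=0.0,
)
```

With a negative baseline PnL, the first three branches cannot fire, so the result is "Abandon this direction" regardless of fill rate or monthly stability.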

Appendix - diagnostic detail

These charts keep secondary diagnostics available without breaking the main narrative flow, including execution-assumption comparisons, the requested-notional capacity sweep, and setup diagnostics.

Appendix: capacity and setup diagnostics

Code
execution_assumption_summary = pl.concat([
    model_policy_selected.group_by("policy_family").agg(
        pl.col("filled_flag").mean().alias("fill_rate"),
        pl.col("adverse_fill_bps").mean().alias("avg_adverse_fill_bps"),
        pl.col("req_fill_to_30m_after_open_bps").mean().alias("avg_req_fill_to_30m_after_open_bps"),
    ).with_columns(pl.lit("Conservative ask").alias("assumption")),
    model_policy_selected.group_by("policy_family").agg(
        pl.col("inside25_filled_flag").mean().alias("fill_rate"),
        pl.col("inside25_adverse_fill_bps").mean().alias("avg_adverse_fill_bps"),
        pl.col("inside25_req_fill_to_30m_after_open_bps").mean().alias("avg_req_fill_to_30m_after_open_bps"),
    ).with_columns(pl.lit("Ask - 25% spread").alias("assumption")),
], how="vertical_relaxed").with_columns(
    pl.col("policy_family").map_elements(strategy_display_name, return_dtype=pl.Utf8).alias("policy_family_label")
).sort(["policy_family_label", "assumption"])
display.display(display.Markdown("### Appendix: execution-assumption comparison"))
plot_execution_assumption_comparison(execution_assumption_summary)
show_df(execution_assumption_summary, n=12)
plot_capacity_overview(capacity_curves)
show_df(capacity_curves, n=20)
plot_feature_run_mix(selected_feature_runs)
plot_histogram(
    event_state,
    "vwap_dev_bps",
    hue="daypart",
    title="Appendix: VWAP deviation at decision by premarket daypart",
    bins=30,
    stat="density",
)
plot_context_diagnostics(policy)

Appendix: execution-assumption comparison

Appendix: execution-assumption comparison by policy family

Capacity overview

Appendix: requested EV by context bucket

Code
plot_transition_heatmaps_by_family(transition_summary)

Appendix: transition rates by policy family