Code
%load_ext autoreload
%autoreload 2
April 6, 2026
May 15, 2026
This notebook evaluates policy-conditioned execution outcomes after second-stamped news and prepares the supervised table used in the next joint-modeling stage.
The notebook’s main outputs are:
This notebook is the last pre-modeling notebook. It evaluates policy-conditioned outcomes. It constructs the supervised learning table for later joint modeling. It does not attempt production-grade model fitting.
Assumptions in this prototype:
news.articles joined to news.articles_extracted_tickers_backfill_stage
Real inputs in this notebook:
kg.article_security_sentiment_v features using the pipeline article_id
Toggleable for methodology development:
Core questions in this notebook:
import hashlib
from pathlib import Path
from datetime import date, datetime, time, timedelta
from typing import Iterable
from trading_research.pr_graph.intraday_news_execution_market_data import (
_finalize_outcomes,
build_real_event_state_and_outcomes,
normalize_loaded_article_events,
)
from trading_research.pr_graph.intraday_news_research_io import (
load_article_events,
load_article_security_sentiment,
)
# Source table names. Each one can be overridden through the environment
# variable named in the first argument; the second argument is the default.
ARTICLES_TABLE = os.environ.get("NEWS_ARTICLES_TABLE", "news.articles")
ARTICLE_TICKERS_TABLE = os.environ.get(
    "NEWS_ARTICLE_TICKERS_TABLE",
    "news.articles_extracted_tickers_backfill_stage",
)
NEWS_FEATURES_TABLE = os.environ.get(
    "ARTICLE_SECURITY_SENTIMENT_TABLE",
    "kg.article_security_sentiment_v",
)
def ch_query_rows(sql: str, *, external_tables=None) -> pl.DataFrame:
    """Run ``sql`` through ``query_and_build_pl_df`` and return its result.

    Surrounding whitespace and any trailing semicolons are removed from the
    statement first; ``external_tables`` is forwarded unchanged.
    """
    statement = sql.strip().rstrip(";")
    return query_and_build_pl_df(statement, external_tables=external_tables)
def show_df(df: pl.DataFrame, n: int = 10) -> pl.DataFrame:
    """Return ``df`` unchanged.

    ``n`` is currently unused: the schema/row-count printing and the
    ``df.head(n)`` preview below are commented out, so the full frame is
    handed back to the caller.
    """
    # print(df.schema)
    # print("rows:", df.height)
    # return df.head(n)
    return df
def emit_warning(message: str) -> None:
    """Print ``message`` to stdout with a ``WARNING: `` prefix."""
    print("WARNING: {}".format(message))
def stable_u01(*parts: object) -> float:
    """Map ``parts`` to a reproducible pseudo-uniform value in ``[0, 1)``.

    Each part is stringified (``None`` becomes the empty string), the pieces
    are joined with ``"||"`` and hashed with MD5. The leading 48 bits of the
    digest are scaled by ``2**48``.
    """
    tokens = ["" if part is None else str(part) for part in parts]
    raw = hashlib.md5("||".join(tokens).encode("utf-8")).digest()
    # The first 6 digest bytes are exactly the first 12 hex digits (48 bits).
    return int.from_bytes(raw[:6], "big") / float(1 << 48)
def stable_normal(*parts: object, scale: float = 1.0) -> float:
    """Return a reproducible normal-like draw keyed by ``parts``.

    Two hash-derived uniforms (keyed with the extra parts ``"u1"`` and
    ``"u2"``) are combined with the Box-Muller cosine formula and multiplied
    by ``scale``. The first uniform is floored at ``1e-9`` so the logarithm
    stays finite.
    """
    radius_u = max(stable_u01(*parts, "u1"), 1e-9)
    angle_u = stable_u01(*parts, "u2")
    radius = math.sqrt(-2.0 * math.log(radius_u))
    return scale * radius * math.cos(2.0 * math.pi * angle_u)
def sha256_hex(text: str) -> str:
    """Return the hexadecimal SHA-256 digest of ``text`` encoded as UTF-8."""
    hasher = hashlib.sha256()
    hasher.update(text.encode("utf-8"))
    return hasher.hexdigest()
def make_article_id(url: str, publish_ts: datetime, download_ts: datetime) -> str:
    """Build the article id: SHA-256 hex of ``CH|url|publish_ts|download_ts``.

    The timestamps are rendered with their default string formatting.
    """
    identity = "CH|{}|{}|{}".format(url, publish_ts, download_ts)
    return sha256_hex(identity)
def segment_label(ts: datetime) -> str:
    """Classify the time of day of ``ts`` into a trading segment.

    Before 09:30 is ``"premarket"``, before 16:00 is ``"regular"`` and
    anything else is ``"after_hours"``. Only the wall-clock time of ``ts``
    is inspected.
    """
    clock = ts.time()
    boundaries = (
        (time(9, 30), "premarket"),
        (time(16, 0), "regular"),
    )
    for cutoff, label in boundaries:
        if clock < cutoff:
            return label
    return "after_hours"
def segment_end(ts: datetime) -> datetime:
    """Return the end of the trading segment that contains ``ts``.

    The result is ``ts`` with its time replaced by 09:30 (before 09:30),
    16:00 (before 16:00) or 20:00 (otherwise); the date is kept.
    """
    clock = ts.time()
    if clock < time(9, 30):
        end_hour, end_minute = 9, 30
    elif clock < time(16, 0):
        end_hour, end_minute = 16, 0
    else:
        end_hour, end_minute = 20, 0
    return ts.replace(hour=end_hour, minute=end_minute, second=0, microsecond=0)
def regular_open(ts: datetime) -> datetime:
    """Return ``ts`` moved to 09:30:00.000000 on the same date."""
    open_fields = {"hour": 9, "minute": 30, "second": 0, "microsecond": 0}
    return ts.replace(**open_fields)
def daypart_bucket(ts: datetime) -> str:
    """Classify the time of day of ``ts`` into a finer daypart bucket.

    Buckets (lower bound inclusive, upper bound exclusive):

    * ``premarket_before_8``   -- before 08:00
    * ``premarket_8_to_9_30``  -- 08:00 to 09:30
    * ``regular_before_11``    -- 09:30 to 11:00
    * ``regular_11_to_14``     -- 11:00 to 14:00
    * ``regular_after_14``     -- 14:00 to 16:00
    * ``after_hours``          -- 16:00 onwards

    The last regular-session cutoff is 16:00 so that the buckets agree with
    ``segment_label`` and ``segment_end``, which both close the regular
    session at 16:00. The previous 16:30 cutoff labelled 16:00-16:29 events
    as regular-session even though their segment is ``after_hours``.
    """
    tod = ts.time()
    cutoffs = (
        (time(8, 0), "premarket_before_8"),
        (time(9, 30), "premarket_8_to_9_30"),
        (time(11, 0), "regular_before_11"),
        (time(14, 0), "regular_11_to_14"),
        (time(16, 0), "regular_after_14"),
    )
    for upper_bound, bucket in cutoffs:
        if tod < upper_bound:
            return bucket
    return "after_hours"
def derive_event_type(title: str | None, tags: Iterable[str] | None) -> str:
title_text = (title or "").lower()
tag_text = " ".join(tags or []).lower()
haystack = f"{title_text} {tag_text}"
rules = [
("earnings_like", ["earnings", "guidance", "quarter", "results"]),
("contract_order", ["contract", "order", "award", "purchase", "agreement"]),
("financing", ["offering", "financing", "debt", "equity", "placement"]),
("product", ["launch", "product", "patent", "approval", "trial"]),
("regulatory", ["fda", "sec", "regulatory", "compliance", "permit"]),
("management", ["ceo", "cfo", "board", "director", "appoint"]),
]
for label, keywords in rules:
if any(keyword in haystack for keyword in keywords):
return label
return "other"
def ols_regression(
    df: pl.DataFrame,
    numeric_cols: list[str],
    categorical_cols: list[str],
    target_col: str,
    rng_seed: int = 7,
) -> tuple[pl.DataFrame, pl.DataFrame]:
    """Fit a least-squares linear model on a random 70/30 train/test split.

    Rows with nulls in any selected column are dropped, categorical columns
    are one-hot encoded, and predictors are standardised with the training
    mean and standard deviation (zero deviations are replaced by 1).

    Returns ``(metrics, coefs)``: a one-row frame with ``target``, ``n_obs``,
    ``train_r2`` and ``test_r2``, and a frame of coefficients (intercept
    first in construction, then sorted by absolute value, descending).
    Two empty frames are returned when fewer than 30 complete rows are
    available or the test split would hold fewer than 10 rows.
    """
    predictors = numeric_cols + categorical_cols
    complete = df.select(predictors + [target_col]).drop_nulls()
    if complete.height < 30:
        return pl.DataFrame(), pl.DataFrame()

    encoded = complete.select(predictors).to_dummies(columns=categorical_cols)
    feature_names = encoded.columns
    features = encoded.to_numpy().astype(float)
    target = complete[target_col].to_numpy().astype(float).reshape(-1, 1)

    # Seeded shuffle; the training share is 70% but never fewer than 20 rows.
    n_rows = len(features)
    shuffled = np.random.default_rng(rng_seed).permutation(n_rows)
    n_train = max(int(n_rows * 0.7), 20)
    train_rows, test_rows = shuffled[:n_train], shuffled[n_train:]
    if len(test_rows) < 10:
        return pl.DataFrame(), pl.DataFrame()

    train_x, test_x = features[train_rows], features[test_rows]
    train_y, test_y = target[train_rows], target[test_rows]

    # Standardise with training statistics only.
    center = train_x.mean(axis=0)
    spread = train_x.std(axis=0)
    spread[spread == 0.0] = 1.0

    def _with_intercept(matrix: np.ndarray) -> np.ndarray:
        return np.column_stack([np.ones(len(matrix)), matrix])

    train_design = _with_intercept((train_x - center) / spread)
    test_design = _with_intercept((test_x - center) / spread)

    solution = np.linalg.lstsq(train_design, train_y, rcond=None)
    beta = solution[0].reshape(-1)

    def _r_squared(actual: np.ndarray, fitted: np.ndarray) -> float:
        observed = actual.reshape(-1)
        total_ss = np.sum((observed - observed.mean()) ** 2)
        if total_ss <= 0:
            return float("nan")
        residual_ss = np.sum((observed - fitted.reshape(-1)) ** 2)
        return 1.0 - residual_ss / total_ss

    metrics = pl.DataFrame(
        {
            "target": [target_col],
            "n_obs": [int(len(complete))],
            "train_r2": [_r_squared(train_y, train_design @ beta)],
            "test_r2": [_r_squared(test_y, test_design @ beta)],
        }
    )
    coefs = pl.DataFrame(
        {
            "feature": ["intercept"] + feature_names,
            "coef": beta,
            "abs_coef": np.abs(beta),
        }
    ).sort("abs_coef", descending=True)
    return metrics, coefs


# --- Plot helpers ---
from trading_research.pr_graph.intraday_news_execution_viz_helpers import (
add_frontier_flag,
ensure_policy_label,
label_policy_family,
label_policy_id,
plot_capacity_overview,
plot_context_diagnostics,
plot_correlation_snapshot,
plot_execution_assumption_comparison,
plot_family_signal_overview,
plot_feature_quality_overview,
plot_feature_run_mix,
plot_fold_diagnostics,
plot_histogram,
plot_market_state_overview,
plot_monthly_policy_heatmap,
plot_policy_frontier,
plot_regime_policy_metric_grid,
plot_return_share_decomposition,
plot_sample_overview,
plot_strategy_metric_grid,
plot_stress_comparison,
plot_target_horizon_profile,
plot_transition_heatmaps_by_family,
policy_variant_label,
setup_lets_plot_darcula,
strategy_display_name,
summarize_split,
)
# setup_lets_plot_darcula()

# --- Research parameters ---
# Event sampling; LOOKBACK_DAYS and MAX_EVENTS are passed to load_article_events below.
LOOKBACK_DAYS = 30
MAX_EVENTS = 1_000_000
RNG_SEED = 7
# Policy grid: latencies (seconds, per the name suffix), limit offsets (bps)
# and requested order sizes.
LATENCIES_S = [30, 40, 50, 60, 75, 90]
TRAILING_OFFSETS_BPS = [5, 10, 25, 50]
ASK_CAP_OFFSETS_BPS = [0, 5, 10]
REQUESTED_NOTIONALS = [1_000, 2_500, 5_000, 10_000]
POLICY_NOTIONAL = 5_000
MODEL_NOTIONAL = 2_500
# Primary exit definition and the outcome column that carries its value.
PRIMARY_EXIT = "fill_to_30m_after_open"
PRIMARY_VALUE_COL = "req_fill_to_30m_after_open_bps"
SIGNAL_BUCKETS = 3
FRONTIER_SAMPLE = "clean"
# Minimum sample sizes and the thresholds used by the low-coverage warning.
MIN_MODEL_ROWS = 30
MIN_ROWS_PER_SIGNAL_BUCKET = 25
MIN_ROWS_PER_POLICY = 25
LOW_FEATURE_COVERAGE_RATE = 0.05
LOW_FEATURE_COVERED_EVENTS = 10
MODEL_ACTION_COLS = ["strategy", "latency_s", "offset_bps"]
GROUP_COL_EVENT = ["article_id", "ticker"]
ENTRY_STRATEGIES = [
    "trailing_mid",
    "buy_at_ask_cap",
    "vwap",
    "vwap_minus_1std",
]
OVERLAP_WINDOW_S = 30 * 60  # 30 minutes expressed in seconds
# Multiplier applied to per-second trade notional in summarize_fill_path.
FWD_VOLUME_PARTICIPATION = 0.0125
# Event date range passed to load_article_events below.
START_DATE = "2024-05-01"
END_DATE = "2025-01-01"
EVENT_TIMEZONE = "America/New_York"
MARKET_DATA_TIMEZONE = "UTC"
# Market-data source configuration.
MARKET_DATA_MODE = "real"
MARKET_DATA_BACKEND = "clickhouse_flight"
CLICKHOUSE_FLIGHT_URI = "grpc://localhost:9006"
CLICKHOUSE_FLIGHT_USER = "default"
# NOTE(review): credential is hard-coded in the notebook; consider reading it
# from the environment like the table names above.
CLICKHOUSE_FLIGHT_PASSWORD = "trade"
POLYGON_TRADES_TABLE = "polygon.trades_raw_v2"
POLYGON_QUOTES_TABLE = "polygon.quotes_raw"
# Exclusive upper date bound used to clamp END_DATE when MARKET_DATA_MODE is "real".
REAL_MARKET_MAX_DATE_EXCL = "2025-11-01"
# Render a subset of the active research configuration with display.JSON so
# the run's settings are visible in the notebook. CLICKHOUSE_FLIGHT_PASSWORD
# is not part of the echoed dictionary.
display.JSON(
    {
        "LOOKBACK_DAYS": LOOKBACK_DAYS,
        "MAX_EVENTS": MAX_EVENTS,
        "LATENCIES_S": LATENCIES_S,
        "TRAILING_OFFSETS_BPS": TRAILING_OFFSETS_BPS,
        "ASK_CAP_OFFSETS_BPS": ASK_CAP_OFFSETS_BPS,
        "REQUESTED_NOTIONALS": REQUESTED_NOTIONALS,
        "POLICY_NOTIONAL": POLICY_NOTIONAL,
        "MODEL_NOTIONAL": MODEL_NOTIONAL,
        "PRIMARY_EXIT": PRIMARY_EXIT,
        "PRIMARY_VALUE_COL": PRIMARY_VALUE_COL,
        "SIGNAL_BUCKETS": SIGNAL_BUCKETS,
        "FRONTIER_SAMPLE": FRONTIER_SAMPLE,
        "MIN_ROWS_PER_SIGNAL_BUCKET": MIN_ROWS_PER_SIGNAL_BUCKET,
        "MIN_ROWS_PER_POLICY": MIN_ROWS_PER_POLICY,
        "MODEL_ACTION_COLS": MODEL_ACTION_COLS,
        "GROUP_COL_EVENT": GROUP_COL_EVENT,
        "EVENT_TIMEZONE": EVENT_TIMEZONE,
        "MARKET_DATA_TIMEZONE": MARKET_DATA_TIMEZONE,
        "MARKET_DATA_MODE": MARKET_DATA_MODE,
        "MARKET_DATA_BACKEND": MARKET_DATA_BACKEND,
        "CLICKHOUSE_FLIGHT_URI": CLICKHOUSE_FLIGHT_URI,
        "CLICKHOUSE_FLIGHT_USER": CLICKHOUSE_FLIGHT_USER,
        "POLYGON_TRADES_TABLE": POLYGON_TRADES_TABLE,
        "POLYGON_QUOTES_TABLE": POLYGON_QUOTES_TABLE,
        "REAL_MARKET_MAX_DATE_EXCL": REAL_MARKET_MAX_DATE_EXCL,
    }
)
# Cell output (was fused onto the closing parenthesis in the export):
# <IPython.core.display.JSON object>
# Directory holding this notebook's parquet caches; created if it is missing.
CACHE_DIR = Path("data") / "eda_04"
CACHE_DIR.mkdir(parents=True, exist_ok=True)
def cache_path(name: str) -> Path:
    """Return the path ``CACHE_DIR/<name>.parquet`` for a cached artifact."""
    return CACHE_DIR.joinpath("{}.parquet".format(name))
def load_cached_parquet(name: str) -> pl.DataFrame | None:
    """Read the cached parquet file for ``name``, or return ``None``.

    A ``Loading cache: <path>`` line is printed when the file exists. The
    cache is looked up by name only.
    """
    target = cache_path(name)
    if not target.exists():
        return None
    print(f"Loading cache: {target}")
    return pl.read_parquet(target)
def write_cached_parquet(df: pl.DataFrame, name: str) -> pl.DataFrame:
    """Write ``df`` to the cache file for ``name`` and return ``df``.

    The file is written as zstd-compressed parquet at ``cache_path(name)``
    and a ``Saved cache: <path>`` line is printed. Returning the frame lets
    the call be used inline.
    """
    path = cache_path(name)
    df.write_parquet(path, compression="zstd")
    print(f"Saved cache: {path}")
    return df


# Notebook text that was fused onto the ``return`` line in the export:
# This uses the actual article and extracted ticker tables. The article_id
# matches the pipeline convention exactly:
# sha256_hex(f"CH|{url}|{publish_ts}|{download_ts}"). PublishTime and
# DownloadTime are treated as UTC for event-to-market alignment, then
# converted into ET for session logic.
# Clamp the event window to the available market-data range in real mode.
# Dates are ISO "YYYY-MM-DD" strings, so string comparison orders them.
effective_end_date = END_DATE
if MARKET_DATA_MODE == "real" and END_DATE > REAL_MARKET_MAX_DATE_EXCL:
    effective_end_date = REAL_MARKET_MAX_DATE_EXCL
    emit_warning(
        f"Real mode clamps article events to dates before {REAL_MARKET_MAX_DATE_EXCL} so event timestamps stay inside the available Polygon market-data window."
    )

# The cache is looked up by name only: after changing the date range or the
# other loader arguments, delete the cached "events" file to force a reload.
events = load_cached_parquet("events")
if events is None:
    events = load_article_events(
        start_date=START_DATE,
        end_date=effective_end_date,
        max_events=MAX_EVENTS,
        lookback_days=LOOKBACK_DAYS,
        overlap_window_s=OVERLAP_WINDOW_S,
    )
    write_cached_parquet(events, "events")

print(events.shape)
events.select(
    "publish_ts",
    "ticker",
    "ArticleTitle",
    "NewsSite",
    "segment",
    "daypart",
    "event_type",
    "event_key",
    "is_overlap_event",
)
# Cell output (was fused onto the closing parenthesis in the export):
# Loading cache: data/eda_04/events.parquet
# (15316, 27)
# | Loading ITables v2.7.3 from the internet... (need help?) |
kg.article_security_sentiment_v features
This section loads the real feature table, keeps the latest ingested_ts row per article_id + ticker, and reports feature coverage explicitly. Uncovered events are preserved for execution research and given neutral defaults only for the synthetic market simulation path.
# Reuse cached artifacts when present; the panel is rebuilt only when its
# own cache file is missing.
event_panel = load_cached_parquet("event_panel")
feature_duplicates = load_cached_parquet("feature_duplicates")

if event_panel is None:
    sentiment_load = load_article_security_sentiment(
        events, include_duplicate_summary=True
    )
    news_features = sentiment_load.news_features
    feature_duplicates = sentiment_load.feature_duplicates

    # Identifier columns fall back to values derived from the ticker.
    _ticker = pl.col("ticker")
    _identity_fills = [
        pl.col("security_id").fill_null(_ticker).alias("security_id"),
        pl.col("company_entity_id")
        .fill_null(pl.concat_str([pl.lit("company::"), _ticker]))
        .alias("company_entity_id"),
        pl.col("company_name").fill_null(_ticker).alias("company_name"),
    ]
    # Defaults applied to events without a joined feature row.
    _score_defaults = {
        "sentiment_score": 0.0,
        "confidence": 0.5,
        "relevance": 0.5,
        "direction_strength": 0.25,
        "novelty": 0.5,
        "event_materiality": 0.35,
        "stock_prominence": 0.5,
    }
    _count_columns = ["mention_count", "headline_hit", "first_offset", "snippet_count"]
    _text_defaults = {
        "role": "missing",
        "evidence_span": "",
        "extractor_model": "missing",
        "run_id": "missing",
    }

    _joined = events.join(news_features, on=["article_id", "ticker"], how="left")
    # Coverage must be flagged before the nulls are filled in.
    _flagged = _joined.with_columns(
        pl.col("sentiment_score").is_not_null().alias("has_news_features")
    )
    event_panel = _flagged.with_columns(
        *_identity_fills,
        *(pl.col(name).fill_null(default) for name, default in _score_defaults.items()),
        *(pl.col(name).fill_null(0).cast(pl.Int64) for name in _count_columns),
        *(pl.col(name).fill_null(default) for name, default in _text_defaults.items()),
    )
    write_cached_parquet(event_panel, "event_panel")
    write_cached_parquet(feature_duplicates, "feature_duplicates")
elif feature_duplicates is None:
    # Panel cache exists but the duplicate summary does not: use an empty,
    # correctly typed frame.
    feature_duplicates = pl.DataFrame(
        schema={
            "article_id": pl.Utf8,
            "ticker": pl.Utf8,
            "n_rows": pl.Int64,
            "latest_ingested_ts": pl.Datetime("us"),
        }
    )
# Baseline news-signal columns; these are among the columns given default
# values in event_panel when an event has no joined feature row.
signal_base_cols = [
    "sentiment_score",
    "confidence",
    "relevance",
    "direction_strength",
    "novelty",
    "event_materiality",
    "stock_prominence",
]
# Placeholder baseline signal features for interaction analysis.
# Thematic features can plug in later without changing the pipeline contract.
# Count covered events per (extractor_model, run_id), largest and most
# recently ingested runs first.
_covered_events = event_panel.filter(pl.col("has_news_features"))
_run_counts = _covered_events.group_by(["extractor_model", "run_id"]).agg(
    pl.len().alias("n_rows"),
    pl.col("ingested_ts").max().alias("latest_ingested_ts"),
)
selected_feature_runs = _run_counts.sort(
    ["n_rows", "latest_ingested_ts"],
    descending=[True, True],
)
# Feature coverage per bucket along four dimensions, stacked into one long
# frame (dimension, bucket, n_events, coverage_rate). The flag marks the
# dimensions whose bucket values are cast to Utf8 before stacking.
feature_coverage = pl.concat(
    [
        event_panel.group_by(dim)
        .agg(
            pl.len().alias("n_events"),
            pl.col("has_news_features").mean().alias("coverage_rate"),
        )
        .with_columns(
            pl.lit(dim).alias("dimension"),
            (pl.col(dim).cast(pl.Utf8) if as_text else pl.col(dim)).alias("bucket"),
        )
        .select("dimension", "bucket", "n_events", "coverage_rate")
        for dim, as_text in (
            ("NewsSite", True),
            ("event_type", False),
            ("segment", False),
            ("is_overlap_event", True),
        )
    ],
    how="vertical",
)
# One-row summary of how many events carry joined news features.
event_feature_summary = event_panel.select(
    pl.len().alias("n_events"),
    pl.col("has_news_features").sum().alias("n_with_news_features"),
    pl.col("has_news_features").mean().alias("coverage_rate"),
)
event_feature_counts = event_feature_summary.row(0, named=True)
# The zero-coverage check must come first: the elif branch compares
# coverage_rate numerically and is only reached when at least one event is
# covered.
if event_feature_counts["n_with_news_features"] == 0:
    emit_warning(
        "Event/news feature join produced zero covered events. Downstream policy modeling will have no news-feature rows."
    )
elif (
    event_feature_counts["coverage_rate"] < LOW_FEATURE_COVERAGE_RATE
    or event_feature_counts["n_with_news_features"] < LOW_FEATURE_COVERED_EVENTS
):
    emit_warning(
        "Low upstream news-feature coverage: "
        f"{event_feature_counts['n_with_news_features']} of {event_feature_counts['n_events']} events "
        f"({event_feature_counts['coverage_rate']:.1%}) have joined features. Downstream models may be skipped or unstable."
    )
# Narrow view of the event panel used for display.
event_panel_preview = event_panel.select(
    "publish_ts",
    "ticker",
    "event_type",
    "has_news_features",
    "sentiment_score",
    "direction_strength",
    "novelty",
    "event_materiality",
    "is_overlap_event",
)
# Cell output (was fused onto the closing parenthesis in the export):
# Loading cache: data/eda_04/event_panel.parquet
# Loading cache: data/eda_04/feature_duplicates.parquet
# | Loading ITables v2.7.3 from the internet... (need help?) |
# | Loading ITables v2.7.3 from the internet... (need help?) |
# | Loading ITables v2.7.3 from the internet... (need help?) |
# | Loading ITables v2.7.3 from the internet... (need help?) |
# | Loading ITables v2.7.3 from the internet... (need help?) |
MARKET_DATA_MODE = "real" reads Polygon quotes and trades from ClickHouse over Arrow Flight, while "synthetic" keeps the existing fallback simulation on a full 04:00-20:00 ET ticker-date path so overlapping events still share the same history.
def _rolling_std(values: np.ndarray, window: int) -> np.ndarray:
values = np.asarray(values, dtype=float)
n_values = len(values)
if n_values == 0:
return np.empty(0, dtype=float)
c1 = np.cumsum(values, dtype=float)
c2 = np.cumsum(values * values, dtype=float)
starts = np.maximum(np.arange(n_values) - window + 1, 0)
prev = starts - 1
counts = np.arange(n_values) - starts + 1
s1 = c1.copy()
s2 = c2.copy()
valid_prev = prev >= 0
s1[valid_prev] -= c1[prev[valid_prev]]
s2[valid_prev] -= c2[prev[valid_prev]]
mean = s1 / counts
var = np.maximum(s2 / counts - mean * mean, 0.0)
return np.sqrt(var)
def _forward_window_sum(values: np.ndarray, window: int) -> np.ndarray:
values = np.asarray(values, dtype=float)
n_values = len(values)
if n_values == 0:
return np.empty(0, dtype=float)
csum = np.empty(n_values + 1, dtype=float)
csum[0] = 0.0
csum[1:] = np.cumsum(values, dtype=float)
left = np.arange(n_values)
right = np.minimum(n_values, left + window)
return csum[right] - csum[left]
def summarize_fill_path(
live_ask: np.ndarray,
limit_series: np.ndarray,
live_ask_size_usd: np.ndarray,
live_quote_update_flag: np.ndarray,
live_trade_notional_1s: np.ndarray,
*,
fwd_volume_participation: float,
) -> dict[str, np.ndarray | float | None]:
eligible_mask = (
(~np.isnan(live_ask))
& (~np.isnan(limit_series))
& (live_ask <= limit_series)
)
if not eligible_mask.any():
empty = np.empty(0, dtype=float)
return {
"cumulative_fillable_usd": empty,
"cumulative_shares": empty,
"incremental_fillable_usd": empty,
"fillable_total_usd": 0.0,
}
eligible_idx = np.flatnonzero(eligible_mask)
quote_episode_start = live_quote_update_flag.copy()
quote_episode_start[0] = True
episode_id = np.cumsum(quote_episode_start.astype(np.int64)) - 1
eligible_episode_id = episode_id[eligible_idx]
_, first_eligible_idx = np.unique(eligible_episode_id, return_index=True)
displayed_fill_idx = eligible_idx[first_eligible_idx]
incremental_fillable_usd = np.zeros(len(live_ask), dtype=float)
incremental_fillable_usd[displayed_fill_idx] += live_ask_size_usd[displayed_fill_idx]
incremental_fillable_usd += (
eligible_mask.astype(float)
* fwd_volume_participation
* live_trade_notional_1s
)
fillable_mask = incremental_fillable_usd > 0.0
if not fillable_mask.any():
empty = np.empty(0, dtype=float)
return {
"cumulative_fillable_usd": empty,
"cumulative_shares": empty,
"incremental_fillable_usd": empty,
"fillable_total_usd": 0.0,
}
cumulative_fillable_usd = np.cumsum(incremental_fillable_usd)
cumulative_shares = np.cumsum(
np.divide(
incremental_fillable_usd,
live_ask,
out=np.zeros(len(live_ask), dtype=float),
where=fillable_mask,
)
)
return {
"cumulative_fillable_usd": cumulative_fillable_usd,
"cumulative_shares": cumulative_shares,
"incremental_fillable_usd": incremental_fillable_usd,
"fillable_total_usd": float(cumulative_fillable_usd[-1]),
}
def summarize_inside25_fill_path(
live_bid: np.ndarray,
live_ask: np.ndarray,
limit_series: np.ndarray,
live_ask_size_usd: np.ndarray,
live_quote_update_flag: np.ndarray,
) -> dict[str, np.ndarray | float | None]:
eligible_mask = (
(~np.isnan(live_bid))
& (~np.isnan(live_ask))
& (~np.isnan(limit_series))
& (live_bid > 0.0)
& (live_ask > live_bid)
& (live_ask <= limit_series)
)
if not eligible_mask.any():
empty = np.empty(0, dtype=float)
return {
"cumulative_fillable_usd": empty,
"cumulative_shares": empty,
"incremental_fillable_usd": empty,
"fillable_total_usd": 0.0,
}
eligible_idx = np.flatnonzero(eligible_mask)
quote_episode_start = live_quote_update_flag.copy()
quote_episode_start[0] = True
episode_id = np.cumsum(quote_episode_start.astype(np.int64)) - 1
eligible_episode_id = episode_id[eligible_idx]
_, first_eligible_idx = np.unique(eligible_episode_id, return_index=True)
displayed_fill_idx = eligible_idx[first_eligible_idx]
displayed_fill_px = live_ask - 0.25 * (live_ask - live_bid)
displayed_fill_shares = np.divide(
live_ask_size_usd,
live_ask,
out=np.zeros(len(live_ask), dtype=float),
where=(~np.isnan(live_ask)) & (live_ask > 0.0),
)
incremental_fillable_usd = np.zeros(len(live_ask), dtype=float)
incremental_fillable_shares = np.zeros(len(live_ask), dtype=float)
incremental_fillable_shares[displayed_fill_idx] += displayed_fill_shares[displayed_fill_idx]
incremental_fillable_usd[displayed_fill_idx] += displayed_fill_shares[displayed_fill_idx] * displayed_fill_px[displayed_fill_idx]
fillable_mask = incremental_fillable_usd > 0.0
if not fillable_mask.any():
empty = np.empty(0, dtype=float)
return {
"cumulative_fillable_usd": empty,
"cumulative_shares": empty,
"incremental_fillable_usd": empty,
"fillable_total_usd": 0.0,
}
cumulative_fillable_usd = np.cumsum(incremental_fillable_usd)
cumulative_shares = np.cumsum(incremental_fillable_shares)
return {
"cumulative_fillable_usd": cumulative_fillable_usd,
"cumulative_shares": cumulative_shares,
"incremental_fillable_usd": incremental_fillable_usd,
"fillable_total_usd": float(cumulative_fillable_usd[-1]),
}
def build_next_open_proxy(fill_px: float, eod_px: float, article_id: str) -> float:
    """Return a synthetic next-open price derived from ``eod_px``.

    Two reproducible draws keyed by ``article_id`` (a gap term with scale 22
    and a carry term with scale 8, both in basis points) are added and
    applied to the end-of-day price. ``fill_px`` is accepted but not used.
    """
    total_bps = stable_normal(article_id, "next_open_gap", scale=22.0) + stable_normal(
        article_id, "next_open_carry", scale=8.0
    )
    return eod_px * (1.0 + total_bps / 10_000.0)
def policy_family_for_strategy(strategy: str) -> str:
    """Return the policy family for ``strategy``.

    Thin wrapper that delegates to ``label_policy_family`` from the
    visualization helpers.
    """
    return label_policy_family(strategy)
def aggressiveness_rank_for_policy(strategy: str, latency_s: int, offset_bps: int) -> float:
    """Return a numeric rank for ordering policies within a family.

    For ``trailing_mid`` and ``buy_at_ask_cap`` the rank is
    ``offset_bps * 100 + latency_s``; for every other strategy only the
    latency counts.
    """
    offset_sensitive = strategy in {"trailing_mid", "buy_at_ask_cap"}
    rank = offset_bps * 100 + latency_s if offset_sensitive else latency_s
    return float(rank)
def build_mock_event_state_and_outcomes(
event_df: pl.DataFrame,
rng_seed: int,
requested_notionals: list[int],
latencies_s: list[int],
trailing_offsets_bps: list[int],
ask_cap_offsets_bps: list[int],
) -> tuple[pl.DataFrame, pl.DataFrame]:
event_state_schema = {
**event_df.schema,
"mid": pl.Float64,
"ask": pl.Float64,
"spread_bps": pl.Float64,
"quote_age_ms": pl.Int64,
"ask_size_usd": pl.Float64,
"rolling_vwap": pl.Float64,
"rolling_vwap_std": pl.Float64,
"realized_vol_60s_bps": pl.Float64,
"future_60s_dollar_volume": pl.Float64,
"regular_open_mid": pl.Float64,
"mid_30m_after_open": pl.Float64,
"regular_close_mid": pl.Float64,
"eod_mid": pl.Float64,
"segment_end_ts": pl.Datetime("us"),
"vwap_dev_bps": pl.Float64,
}
outcomes_schema = {
"article_id": pl.Utf8,
"event_key": pl.Utf8,
"publish_ts": pl.Datetime("us"),
"trade_date": pl.Date,
"event_date": pl.Date,
"ticker": pl.Utf8,
"segment": pl.Utf8,
"daypart": pl.Utf8,
"event_type": pl.Utf8,
"is_overlap_event": pl.Boolean,
"has_prior_same_day_event": pl.Boolean,
"prior_event_gap_s": pl.Int64,
"policy_id": pl.Utf8,
"policy_family": pl.Utf8,
"aggression_rank_within_family": pl.Float64,
"strategy": pl.Utf8,
"latency_s": pl.Int64,
"offset_bps": pl.Int64,
"requested_notional": pl.Int64,
"fill_ts": pl.Datetime("us"),
"fill_px": pl.Float64,
"inside25_fill_ts": pl.Datetime("us"),
"inside25_fill_px": pl.Float64,
"anchor_px": pl.Float64,
"filled_notional": pl.Float64,
"inside25_filled_notional": pl.Float64,
"max_fillable_usd": pl.Float64,
"inside25_max_fillable_usd": pl.Float64,
"fill_fraction": pl.Float64,
"inside25_fill_fraction": pl.Float64,
"filled_flag": pl.Int8,
"inside25_filled_flag": pl.Int8,
"markout_1m_px": pl.Float64,
"inside25_markout_1m_px": pl.Float64,
"markout_5m_px": pl.Float64,
"inside25_markout_5m_px": pl.Float64,
"segment_end_px": pl.Float64,
"m30_after_open_px": pl.Float64,
"inside25_m30_after_open_px": pl.Float64,
"reg_close_px": pl.Float64,
"inside25_reg_close_px": pl.Float64,
"eod_px": pl.Float64,
"next_open_px": pl.Float64,
"inside25_next_open_px": pl.Float64,
"spread_bps_at_decision": pl.Float64,
"quote_age_ms_at_decision": pl.Int64,
"ask_size_usd_at_decision": pl.Float64,
"rolling_dollar_volume_60s_at_decision": pl.Float64,
"vwap_dev_bps_at_decision": pl.Float64,
"realized_vol_60s_bps_at_decision": pl.Float64,
"has_news_features": pl.Boolean,
"sentiment_score": pl.Float64,
"confidence": pl.Float64,
"relevance": pl.Float64,
"direction_strength": pl.Float64,
"novelty": pl.Float64,
"event_materiality": pl.Float64,
"stock_prominence": pl.Float64,
"mention_count": pl.Int64,
"headline_hit": pl.Int64,
"snippet_count": pl.Int64,
}
if event_df.height == 0:
return (
pl.DataFrame(schema=event_state_schema),
_finalize_outcomes(
pl.DataFrame(schema=outcomes_schema),
primary_exit=PRIMARY_EXIT,
primary_value_col=PRIMARY_VALUE_COL,
),
)
rng = np.random.default_rng(rng_seed)
event_state_frames = []
outcome_frames = []
session_offset_seconds = 4 * 3600
regular_open_idx = int(9.5 * 3600) - session_offset_seconds
open_plus_30m_idx = regular_open_idx + 30 * 60
regular_close_idx = int(16 * 3600) - session_offset_seconds
session_end_idx = int(20 * 3600) - session_offset_seconds
max_horizon = 60 * 60
alpha_decay = np.exp(-np.arange(max_horizon) / 600.0)
volume_decay = np.exp(-np.arange(max_horizon) / 300.0)
session_start = datetime(2000, 1, 1, 4, 0)
session_end = datetime(2000, 1, 1, 20, 0)
n_seconds = int((session_end - session_start).total_seconds()) + 1
sec_of_day = np.arange(n_seconds)
seconds_since_midnight = sec_of_day + session_offset_seconds
regular_mask = (seconds_since_midnight >= 9.5 * 3600) & (seconds_since_midnight < 16 * 3600)
pre_mask = seconds_since_midnight < 9.5 * 3600
session_shares_multiplier = np.where(regular_mask, 1.8, np.where(pre_mask, 0.65, 0.85))
spread_base = np.where(regular_mask, 4.5, np.where(pre_mask, 9.5, 8.0))
spread_liq = np.where(regular_mask, 10.0, np.where(pre_mask, 20.0, 16.0))
quote_age_base = np.where(regular_mask, 180.0, np.where(pre_mask, 600.0, 450.0))
quote_age_liq = np.where(regular_mask, 600.0, np.where(pre_mask, 2200.0, 1600.0))
ask_size_multiplier = np.where(regular_mask, 1.4, np.where(pre_mask, 0.55, 0.70))
sigma_base = np.where(regular_mask, 0.00033, np.where(pre_mask, 0.00052, 0.00045))
for group in event_df.partition_by(["ticker", "trade_date"], as_dict=False):
group = group.sort("publish_ts")
ticker = group.item(0, "ticker")
trade_date = group.item(0, "trade_date")
start_ts = datetime.combine(trade_date, time(4, 0))
base_px = 8.0 + 140.0 * stable_u01(ticker, trade_date, "base_px")
liq_score = 0.2 + 0.8 * stable_u01(ticker, "liq_score")
sigma = sigma_base * (1.15 - 0.45 * liq_score)
log_returns = rng.normal(0.0, sigma)
shares = (40 + 260 * liq_score) * session_shares_multiplier
shares = shares * (1.0 + rng.lognormal(mean=-0.25, sigma=0.30, size=n_seconds))
spread_bps = spread_base + spread_liq * (1.0 - liq_score)
spread_bps = spread_bps * (1.0 + 0.10 * rng.normal(size=n_seconds))
spread_bps = np.clip(spread_bps, 2.5, 45.0)
quote_age_ms = quote_age_base + quote_age_liq * (1.0 - liq_score)
quote_age_ms = np.clip(quote_age_ms * (1.0 + 0.25 * rng.normal(size=n_seconds)), 50, 8_000).astype(int)
ask_size_usd = np.clip(
(900 + 6_500 * liq_score) * ask_size_multiplier * (1.0 + 0.35 * rng.normal(size=n_seconds)),
250,
35_000,
)
volume_boost = np.zeros(n_seconds)
alpha_drift = np.zeros(n_seconds)
for publish_ts, sentiment_score, direction_strength, event_materiality in group.select(
["publish_ts", "sentiment_score", "direction_strength", "event_materiality"]
).iter_rows():
idx = int((publish_ts - start_ts).total_seconds())
if idx < 0 or idx >= n_seconds:
continue
signed_edge_bps = sentiment_score * (14.0 + 65.0 * direction_strength * event_materiality)
log_returns[idx] += signed_edge_bps / 10_000.0 * 0.18
horizon = min(max_horizon, n_seconds - idx - 1)
if horizon <= 0:
continue
alpha_drift[idx + 1: idx + 1 + horizon] += signed_edge_bps / 10_000.0 * 0.0012 * alpha_decay[:horizon]
volume_boost[idx: idx + horizon] += 1.5 + 5.0 * event_materiality * volume_decay[:horizon]
log_returns[idx: idx + horizon] += rng.normal(
0.0,
sigma[idx: idx + horizon] * (0.5 + 1.5 * event_materiality),
size=horizon,
)
log_price = np.log(base_px) + np.cumsum(log_returns + alpha_drift)
mid = np.exp(log_price)
ask = mid * (1.0 + spread_bps / 20_000.0)
log_mid = np.log(mid)
trade_px = mid * np.exp(rng.normal(0.0, spread_bps / 25_000.0, size=n_seconds))
shares = shares * (1.0 + volume_boost)
dollar_volume_1s = shares * trade_px
cum_shares = np.cumsum(shares)
cum_notional = np.cumsum(dollar_volume_1s)
rolling_vwap = cum_notional / np.maximum(cum_shares, 1.0)
rolling_vwap_std = _rolling_std(mid, window=300)
realized_vol_60s_bps = 10_000 * _rolling_std(np.diff(log_mid, prepend=log_mid[0]), window=60)
future_60s_dollar_volume = _forward_window_sum(dollar_volume_1s, window=60)
regular_open_mid = float(mid[regular_open_idx])
mid_30m_after_open = float(mid[open_plus_30m_idx])
regular_close_mid = float(mid[regular_close_idx])
eod_mid = float(mid[-1])
event_state_rows = []
outcome_rows = []
for event in group.iter_rows(named=True):
idx = int((event["publish_ts"] - start_ts).total_seconds())
if idx < 0 or idx >= n_seconds:
continue
if event["segment"] == "premarket":
search_end_idx = regular_open_idx
elif event["segment"] == "regular":
search_end_idx = regular_close_idx
else:
search_end_idx = session_end_idx
segment_end_ts = start_ts + timedelta(seconds=search_end_idx)
decision_mid = float(mid[idx])
decision_ask = float(ask[idx])
decision_spread_bps = float(spread_bps[idx])
decision_quote_age_ms = int(quote_age_ms[idx])
decision_ask_size_usd = float(ask_size_usd[idx])
decision_vwap = float(rolling_vwap[idx])
decision_vwap_std = float(rolling_vwap_std[idx])
decision_realized_vol_60s_bps = float(realized_vol_60s_bps[idx])
decision_future_60s_dollar_volume = float(future_60s_dollar_volume[idx])
vwap_dev_bps = ((decision_mid / decision_vwap) - 1.0) * 10_000.0
event_state_rows.append(
{
**event,
"mid": decision_mid,
"ask": decision_ask,
"spread_bps": decision_spread_bps,
"quote_age_ms": decision_quote_age_ms,
"ask_size_usd": decision_ask_size_usd,
"rolling_vwap": decision_vwap,
"rolling_vwap_std": decision_vwap_std,
"realized_vol_60s_bps": decision_realized_vol_60s_bps,
"future_60s_dollar_volume": decision_future_60s_dollar_volume,
"regular_open_mid": regular_open_mid,
"mid_30m_after_open": mid_30m_after_open,
"regular_close_mid": regular_close_mid,
"eod_mid": eod_mid,
"segment_end_ts": segment_end_ts,
"vwap_dev_bps": vwap_dev_bps,
}
)
for latency_s in latencies_s:
live_start_idx = idx + latency_s
if event["segment"] == "premarket" and live_start_idx >= regular_open_idx:
continue
if live_start_idx > search_end_idx or live_start_idx >= n_seconds:
continue
live_mid = mid[live_start_idx: search_end_idx + 1]
live_ask = ask[live_start_idx: search_end_idx + 1]
live_bid = 2.0 * live_mid - live_ask
live_ask_size_usd = ask_size_usd[live_start_idx: search_end_idx + 1]
live_quote_update_flag = np.ones(len(live_ask), dtype=bool)
live_trade_notional_1s = dollar_volume_1s[live_start_idx: search_end_idx + 1]
live_vwap = rolling_vwap[live_start_idx: search_end_idx + 1]
live_vwap_std = rolling_vwap_std[live_start_idx: search_end_idx + 1]
trailing_anchor = np.maximum.accumulate(live_mid)
strategy_specs = []
for offset_bps in trailing_offsets_bps:
strategy_specs.append(
(
"trailing_mid",
offset_bps,
decision_mid,
trailing_anchor * (1.0 - offset_bps / 10_000.0),
)
)
for offset_bps in ask_cap_offsets_bps:
strategy_specs.append(
(
"buy_at_ask_cap",
offset_bps,
decision_ask,
np.full_like(live_ask, decision_ask * (1.0 + offset_bps / 10_000.0)),
)
)
strategy_specs.append(("vwap", 0, float(live_vwap[0]), live_vwap))
strategy_specs.append(
("vwap_minus_1std", 0, float(live_vwap[0] - live_vwap_std[0]), live_vwap - live_vwap_std)
)
for strategy, offset_bps, anchor_px, limit_series in strategy_specs:
fill_summary = summarize_fill_path(
live_ask,
limit_series,
live_ask_size_usd,
live_quote_update_flag,
live_trade_notional_1s,
fwd_volume_participation=FWD_VOLUME_PARTICIPATION,
)
inside25_fill_summary = summarize_inside25_fill_path(
live_bid,
live_ask,
limit_series,
live_ask_size_usd,
live_quote_update_flag,
)
segment_end_px = float(mid[search_end_idx])
policy_family = policy_family_for_strategy(strategy)
aggression_rank = aggressiveness_rank_for_policy(strategy, latency_s, offset_bps)
for requested_notional in requested_notionals:
filled_notional = min(requested_notional, float(fill_summary["fillable_total_usd"]))
fill_fraction = filled_notional / requested_notional if requested_notional else 0.0
filled_flag = int(fill_fraction > 0.0)
if filled_flag:
cumulative_fillable_usd = fill_summary["cumulative_fillable_usd"]
cumulative_shares = fill_summary["cumulative_shares"]
fill_idx = int(np.searchsorted(cumulative_fillable_usd, filled_notional, side="left"))
fill_abs_idx = live_start_idx + fill_idx
fill_ts = start_ts + timedelta(seconds=fill_abs_idx)
prior_cum_fillable_usd = float(cumulative_fillable_usd[fill_idx - 1]) if fill_idx > 0 else 0.0
prior_cum_shares = float(cumulative_shares[fill_idx - 1]) if fill_idx > 0 else 0.0
final_slice_notional = filled_notional - prior_cum_fillable_usd
fill_px_at_idx = float(live_ask[fill_idx])
filled_shares = prior_cum_shares + final_slice_notional / fill_px_at_idx
fill_px = filled_notional / filled_shares if filled_shares > 0.0 else None
else:
fill_abs_idx = None
fill_ts = None
fill_px = None
inside25_filled_notional = min(requested_notional, float(inside25_fill_summary["fillable_total_usd"]))
inside25_fill_fraction = inside25_filled_notional / requested_notional if requested_notional else 0.0
inside25_filled_flag = int(inside25_fill_fraction > 0.0)
if inside25_filled_flag:
inside25_cumulative_fillable_usd = inside25_fill_summary["cumulative_fillable_usd"]
inside25_cumulative_shares = inside25_fill_summary["cumulative_shares"]
inside25_fill_idx = int(np.searchsorted(inside25_cumulative_fillable_usd, inside25_filled_notional, side="left"))
inside25_fill_abs_idx = live_start_idx + inside25_fill_idx
inside25_fill_ts = start_ts + timedelta(seconds=inside25_fill_abs_idx)
inside25_prior_cum_fillable_usd = float(inside25_cumulative_fillable_usd[inside25_fill_idx - 1]) if inside25_fill_idx > 0 else 0.0
inside25_prior_cum_shares = float(inside25_cumulative_shares[inside25_fill_idx - 1]) if inside25_fill_idx > 0 else 0.0
inside25_final_slice_notional = inside25_filled_notional - inside25_prior_cum_fillable_usd
inside25_fill_px_at_idx = float(live_ask[inside25_fill_idx] - 0.25 * (live_ask[inside25_fill_idx] - live_bid[inside25_fill_idx]))
inside25_filled_shares = inside25_prior_cum_shares + inside25_final_slice_notional / inside25_fill_px_at_idx
inside25_fill_px = inside25_filled_notional / inside25_filled_shares if inside25_filled_shares > 0.0 else None
else:
inside25_fill_abs_idx = None
inside25_fill_ts = None
inside25_fill_px = None
max_fillable_usd = float(fill_summary["fillable_total_usd"])
inside25_max_fillable_usd = float(inside25_fill_summary["fillable_total_usd"])
minute_1_idx = min(fill_abs_idx + 60, n_seconds - 1) if fill_abs_idx is not None else None
minute_5_idx = min(fill_abs_idx + 300, n_seconds - 1) if fill_abs_idx is not None else None
markout_1m_px = float(mid[minute_1_idx]) if minute_1_idx is not None else None
markout_5m_px = float(mid[minute_5_idx]) if minute_5_idx is not None else None
inside25_minute_1_idx = min(inside25_fill_abs_idx + 60, n_seconds - 1) if inside25_fill_abs_idx is not None else None
inside25_minute_5_idx = min(inside25_fill_abs_idx + 300, n_seconds - 1) if inside25_fill_abs_idx is not None else None
inside25_markout_1m_px = float(mid[inside25_minute_1_idx]) if inside25_minute_1_idx is not None else None
inside25_markout_5m_px = float(mid[inside25_minute_5_idx]) if inside25_minute_5_idx is not None else None
m30_after_open_px = float(mid[
open_plus_30m_idx]) if fill_abs_idx is not None and fill_abs_idx <= open_plus_30m_idx else None
inside25_m30_after_open_px = float(mid[open_plus_30m_idx]) if inside25_fill_abs_idx is not None and inside25_fill_abs_idx <= open_plus_30m_idx else None
reg_close_px = float(mid[
regular_close_idx]) if fill_abs_idx is not None and fill_abs_idx <= regular_close_idx else None
inside25_reg_close_px = float(mid[regular_close_idx]) if inside25_fill_abs_idx is not None and inside25_fill_abs_idx <= regular_close_idx else None
next_open_px = (
regular_open_mid
if event["segment"] == "premarket"
else build_next_open_proxy(fill_px or decision_mid, eod_mid, event["article_id"])
)
inside25_next_open_px = (
regular_open_mid
if event["segment"] == "premarket"
else build_next_open_proxy(inside25_fill_px or decision_mid, eod_mid, event["article_id"])
)
policy_id = label_policy_id(strategy, latency_s, offset_bps, requested_notional)
outcome_rows.append(
{
"article_id": event["article_id"],
"event_key": event["event_key"],
"publish_ts": event["publish_ts"],
"trade_date": event["trade_date"],
"event_date": event["event_date"],
"ticker": event["ticker"],
"segment": event["segment"],
"daypart": event["daypart"],
"event_type": event["event_type"],
"is_overlap_event": event["is_overlap_event"],
"has_prior_same_day_event": event["has_prior_same_day_event"],
"prior_event_gap_s": event["prior_event_gap_s"],
"policy_id": policy_id,
"policy_family": policy_family,
"aggression_rank_within_family": aggression_rank,
"strategy": strategy,
"latency_s": latency_s,
"offset_bps": offset_bps,
"requested_notional": requested_notional,
"fill_ts": fill_ts,
"fill_px": fill_px,
"inside25_fill_ts": inside25_fill_ts,
"inside25_fill_px": inside25_fill_px,
"anchor_px": float(anchor_px),
"filled_notional": filled_notional,
"inside25_filled_notional": inside25_filled_notional,
"max_fillable_usd": max_fillable_usd,
"inside25_max_fillable_usd": inside25_max_fillable_usd,
"fill_fraction": fill_fraction,
"inside25_fill_fraction": inside25_fill_fraction,
"filled_flag": filled_flag,
"inside25_filled_flag": inside25_filled_flag,
"markout_1m_px": markout_1m_px,
"inside25_markout_1m_px": inside25_markout_1m_px,
"markout_5m_px": markout_5m_px,
"inside25_markout_5m_px": inside25_markout_5m_px,
"segment_end_px": segment_end_px,
"m30_after_open_px": m30_after_open_px,
"inside25_m30_after_open_px": inside25_m30_after_open_px,
"reg_close_px": reg_close_px,
"inside25_reg_close_px": inside25_reg_close_px,
"eod_px": eod_mid,
"next_open_px": next_open_px,
"inside25_next_open_px": inside25_next_open_px,
"spread_bps_at_decision": decision_spread_bps,
"quote_age_ms_at_decision": decision_quote_age_ms,
"ask_size_usd_at_decision": decision_ask_size_usd,
"rolling_dollar_volume_60s_at_decision": decision_future_60s_dollar_volume,
"vwap_dev_bps_at_decision": vwap_dev_bps,
"realized_vol_60s_bps_at_decision": decision_realized_vol_60s_bps,
"has_news_features": event["has_news_features"],
"sentiment_score": event["sentiment_score"],
"confidence": event["confidence"],
"relevance": event["relevance"],
"direction_strength": event["direction_strength"],
"novelty": event["novelty"],
"event_materiality": event["event_materiality"],
"stock_prominence": event["stock_prominence"],
"mention_count": event["mention_count"],
"headline_hit": event["headline_hit"],
"snippet_count": event["snippet_count"],
}
)
if event_state_rows:
event_state_frames.append(pl.DataFrame(event_state_rows, schema=event_state_schema))
if outcome_rows:
outcome_frames.append(pl.DataFrame(outcome_rows, schema=outcomes_schema))
event_state = (
pl.concat(event_state_frames, how="vertical_relaxed")
if event_state_frames
else pl.DataFrame(schema=event_state_schema)
)
outcomes = (
pl.concat(outcome_frames, how="vertical_relaxed")
if outcome_frames
else pl.DataFrame(schema=outcomes_schema)
)
if outcomes.height == 0:
return event_state, _finalize_outcomes(
outcomes,
primary_exit=PRIMARY_EXIT,
primary_value_col=PRIMARY_VALUE_COL,
)
return event_state, _finalize_outcomes(
outcomes,
primary_exit=PRIMARY_EXIT,
primary_value_col=PRIMARY_VALUE_COL,
)

Execution is evaluated separately for:

- trailing buy limits at x bps below a trailing mid anchor
- buy at rolling intraday VWAP
- buy at rolling intraday VWAP minus 1 std
The latency ladder assumes 30s to 90s total delay from source to machine to execution. Premarket event-latency combinations are dropped once that delayed decision time reaches 09:30 ET.
The key notebook output here is the action-conditioned value table, including requested-dollar normalized targets that keep no-fill rows in the learning problem.
# Reuse the cached event-state and outcome tables when both exist; otherwise rebuild them.
event_state = load_cached_parquet("event_state")
outcomes = load_cached_parquet("outcomes")
# A miss on either table triggers a rebuild of both.
if event_state is None or outcomes is None:
if MARKET_DATA_MODE == "real":
try:
# Real mode: the builder receives the notebook's policy grid (notionals, latencies, offsets)
# plus the policy-labelling helpers, and returns warnings alongside the two tables.
event_state, outcomes, real_market_warnings = build_real_event_state_and_outcomes(
event_panel,
clickhouse_flight_uri=CLICKHOUSE_FLIGHT_URI,
clickhouse_flight_user=CLICKHOUSE_FLIGHT_USER,
clickhouse_flight_password=CLICKHOUSE_FLIGHT_PASSWORD,
quotes_table=POLYGON_QUOTES_TABLE,
trades_table=POLYGON_TRADES_TABLE,
requested_notionals=REQUESTED_NOTIONALS,
latencies_s=LATENCIES_S,
trailing_offsets_bps=TRAILING_OFFSETS_BPS,
ask_cap_offsets_bps=ASK_CAP_OFFSETS_BPS,
fwd_volume_participation=FWD_VOLUME_PARTICIPATION,
real_market_max_date_excl=REAL_MARKET_MAX_DATE_EXCL,
primary_exit=PRIMARY_EXIT,
primary_value_col=PRIMARY_VALUE_COL,
label_policy_id=label_policy_id,
policy_family_for_strategy=policy_family_for_strategy,
aggressiveness_rank_for_policy=aggressiveness_rank_for_policy,
build_next_open_proxy=build_next_open_proxy,
)
except Exception as exc:
# Any failure inside the real-data build is re-raised as RuntimeError with the cause chained.
raise RuntimeError(
f"Failed to load real Polygon market data from ClickHouse Flight at {CLICKHOUSE_FLIGHT_URI}. "
"Switch `MARKET_DATA_MODE` to `synthetic` or verify the Arrow Flight endpoint."
) from exc
# Surface every warning returned by the real-data builder.
for warning_message in real_market_warnings:
emit_warning(warning_message)
else:
# Every non-"real" mode uses the mock builder, which returns no warnings.
real_market_warnings = []
event_state, outcomes = build_mock_event_state_and_outcomes(
event_panel,
rng_seed=RNG_SEED,
requested_notionals=REQUESTED_NOTIONALS,
latencies_s=LATENCIES_S,
trailing_offsets_bps=TRAILING_OFFSETS_BPS,
ask_cap_offsets_bps=ASK_CAP_OFFSETS_BPS,
)
# NOTE(review): indentation was lost in this export, so it is not visible whether these
# cache writes run after both build modes or only inside the mock branch — confirm.
write_cached_parquet(event_state, "event_state")
write_cached_parquet(outcomes, "outcomes")
else:
# Cache hit: nothing was built, so there are no warnings to report.
real_market_warnings = []
# Narrow previews of the two tables plus row counts per policy and per requested notional.
event_state_preview = event_state.select(
    [
        "publish_ts",
        "ticker",
        "segment",
        "daypart",
        "event_type",
        "spread_bps",
        "quote_age_ms",
        "vwap_dev_bps",
        "is_overlap_event",
    ]
)
outcomes_preview = outcomes.select(
    [
        "publish_ts",
        "ticker",
        "policy_id",
        "policy_family",
        "requested_notional",
        "fill_fraction",
        "inside25_fill_fraction",
        "req_fill_to_open_bps",
        "req_fill_to_30m_after_open_bps",
        "inside25_req_fill_to_30m_after_open_bps",
        "req_fill_to_reg_close_bps",
        "req_fill_to_eod_bps",
        "primary_exit_bps",
        "primary_exit_pnl",
    ]
)
# Largest policies first.
policy_row_counts = (
    outcomes.group_by("policy_id")
    .agg(n_rows=pl.len())
    .sort(by="n_rows", descending=True)
)
# Ascending by notional size.
requested_notional_counts = (
    outcomes.group_by("requested_notional")
    .agg(n_rows=pl.len())
    .sort(by="requested_notional")
)
print("market data mode:", MARKET_DATA_MODE)
print(f"event rows: {event_state.height:,}")
print(f"outcome rows: {outcomes.height:,}")
print("unique policies:", outcomes.get_column("policy_id").n_unique() if outcomes.height else 0)| Loading ITables v2.7.3 from the internet... (need help?) |
| Loading ITables v2.7.3 from the internet... (need help?) |
| Loading ITables v2.7.3 from the internet... (need help?) |
| Loading ITables v2.7.3 from the internet... (need help?) |
| Loading ITables v2.7.3 from the internet... (need help?) |
POLICY_NOTIONAL remains a descriptive diagnostics slice. MODEL_NOTIONAL freezes the action-space size for frontier analysis and the first joint-model dataset.
# Descriptive slice at POLICY_NOTIONAL, with and without overlapping events.
policy = outcomes.filter(pl.col("requested_notional").eq(POLICY_NOTIONAL))
policy_clean = policy.filter(pl.col("is_overlap_event").not_())
# Modeling slice at MODEL_NOTIONAL: first require joined news features, then drop overlaps.
model_policy = outcomes.filter(pl.col("requested_notional").eq(MODEL_NOTIONAL))
model_policy_feature = model_policy.filter(pl.col("has_news_features"))
model_policy_clean = model_policy_feature.filter(pl.col("is_overlap_event").not_())
# Any FRONTIER_SAMPLE value other than "clean" selects the full feature sample.
if FRONTIER_SAMPLE == "clean":
    selected_policy_sample = "clean"
    model_policy_selected = model_policy_clean
else:
    selected_policy_sample = "full"
    model_policy_selected = model_policy_feature
def summarize_policy_metrics(df: pl.DataFrame, label: str) -> pl.DataFrame:
    """Aggregate order-level outcomes into one row per policy and tag it with a sample label.

    Args:
        df: Outcome rows. Must contain the policy identifier columns
            (``policy_id``, ``policy_family``, ``strategy``, ``latency_s``,
            ``offset_bps``) and the fill / value columns aggregated below.
        label: Value written to the ``sample`` column of every output row.

    Returns:
        One row per policy with fill rates, mean fill fractions, mean
        requested-dollar and filled-only value metrics, the 10th percentile of
        ``req_fill_to_30m_after_open_bps``, and the ``inside25`` minus baseline
        deltas. An empty ``df`` yields an empty frame with the same columns in
        the same order.
    """
    # Single source of truth for the output columns, listed in the order the
    # aggregation below emits them. Vertical concatenation of several samples
    # matches columns by position, so the empty-input frame must not use a
    # different column order than the populated one.
    output_schema = {
        "policy_id": pl.Utf8,
        "policy_family": pl.Utf8,
        "strategy": pl.Utf8,
        "latency_s": pl.Int64,
        "offset_bps": pl.Int64,
        "n_orders": pl.UInt32,
        "fill_rate": pl.Float64,
        "inside25_fill_rate": pl.Float64,
        "avg_fill_fraction": pl.Float64,
        "inside25_avg_fill_fraction": pl.Float64,
        "avg_adverse_fill_bps": pl.Float64,
        "inside25_avg_adverse_fill_bps": pl.Float64,
        "avg_req_fill_to_open_bps": pl.Float64,
        "avg_req_fill_to_30m_after_open_bps": pl.Float64,
        "inside25_avg_req_fill_to_30m_after_open_bps": pl.Float64,
        "avg_req_fill_to_reg_close_bps": pl.Float64,
        "avg_req_markout_1m_bps": pl.Float64,
        "avg_req_markout_5m_bps": pl.Float64,
        "avg_req_fill_to_eod_bps": pl.Float64,
        "p10_req_fill_to_30m_after_open_bps": pl.Float64,
        "avg_fill_to_30m_after_open_bps": pl.Float64,
        "avg_fill_to_reg_close_bps": pl.Float64,
        "delta_inside25_fill_rate": pl.Float64,
        "delta_inside25_avg_adverse_fill_bps": pl.Float64,
        "delta_inside25_avg_req_fill_to_30m_after_open_bps": pl.Float64,
        "sample": pl.Utf8,
    }
    if df.height == 0:
        return pl.DataFrame(schema=output_schema)
    return (
        df.group_by(["policy_id", "policy_family", "strategy", "latency_s", "offset_bps"])
        .agg(
            pl.len().alias("n_orders"),
            pl.col("filled_flag").mean().alias("fill_rate"),
            pl.col("inside25_filled_flag").mean().alias("inside25_fill_rate"),
            pl.col("fill_fraction").mean().alias("avg_fill_fraction"),
            pl.col("inside25_fill_fraction").mean().alias("inside25_avg_fill_fraction"),
            pl.col("adverse_fill_bps").mean().alias("avg_adverse_fill_bps"),
            pl.col("inside25_adverse_fill_bps").mean().alias("inside25_avg_adverse_fill_bps"),
            pl.col("req_fill_to_open_bps").mean().alias("avg_req_fill_to_open_bps"),
            pl.col("req_fill_to_30m_after_open_bps").mean().alias("avg_req_fill_to_30m_after_open_bps"),
            pl.col("inside25_req_fill_to_30m_after_open_bps").mean().alias("inside25_avg_req_fill_to_30m_after_open_bps"),
            pl.col("req_fill_to_reg_close_bps").mean().alias("avg_req_fill_to_reg_close_bps"),
            pl.col("req_markout_1m_bps").mean().alias("avg_req_markout_1m_bps"),
            pl.col("req_markout_5m_bps").mean().alias("avg_req_markout_5m_bps"),
            pl.col("req_fill_to_eod_bps").mean().alias("avg_req_fill_to_eod_bps"),
            pl.col("req_fill_to_30m_after_open_bps").quantile(0.10).alias("p10_req_fill_to_30m_after_open_bps"),
            pl.col("fill_to_30m_after_open_bps").mean().alias("avg_fill_to_30m_after_open_bps"),
            pl.col("fill_to_reg_close_bps").mean().alias("avg_fill_to_reg_close_bps"),
        )
        .with_columns(
            # Deltas are inside25 minus the baseline fill model.
            (pl.col("inside25_fill_rate") - pl.col("fill_rate")).alias("delta_inside25_fill_rate"),
            (pl.col("inside25_avg_adverse_fill_bps") - pl.col("avg_adverse_fill_bps")).alias("delta_inside25_avg_adverse_fill_bps"),
            (pl.col("inside25_avg_req_fill_to_30m_after_open_bps") - pl.col("avg_req_fill_to_30m_after_open_bps")).alias("delta_inside25_avg_req_fill_to_30m_after_open_bps"),
        )
        .sort(["strategy", "latency_s", "offset_bps"])
        .with_columns(pl.lit(label).alias("sample"))
        # Pin the column order to the shared schema so both return paths agree.
        .select(list(output_schema))
    )
# (sample label, requested notional, frame) for each slice reported below, in display order.
_sample_frames = [
    ("policy_full", POLICY_NOTIONAL, policy),
    ("policy_clean", POLICY_NOTIONAL, policy_clean),
    ("model_feature", MODEL_NOTIONAL, model_policy_feature),
    (f"model_{selected_policy_sample}", MODEL_NOTIONAL, model_policy_selected),
]
# Row, event and policy counts per slice; empty slices report zero distinct values.
policy_universe_summary = pl.DataFrame(
    {
        "sample": [name for name, _, _ in _sample_frames],
        "requested_notional": [notional for _, notional, _ in _sample_frames],
        "n_rows": [frame.height for _, _, frame in _sample_frames],
        "n_events": [
            frame.get_column("event_key").n_unique() if frame.height else 0
            for _, _, frame in _sample_frames
        ],
        "n_policies": [
            frame.get_column("policy_id").n_unique() if frame.height else 0
            for _, _, frame in _sample_frames
        ],
    }
)
# Warn once: a missing feature join takes precedence over an empty post-overlap sample.
if model_policy_feature.height == 0:
    emit_warning(
        "The fixed-notional modeling slice has zero rows with joined news features. Frontier and joint-model sections will be empty."
    )
elif model_policy_selected.height == 0:
    emit_warning(
        f"The selected modeling sample '{selected_policy_sample}' is empty after overlap filtering."
    )
# Stack the per-policy metrics of all slices into one long table keyed by `sample`.
policy_summary = pl.concat(
    [summarize_policy_metrics(frame, name) for name, _, frame in _sample_frames],
    how="vertical_relaxed",
)
model_policy_summary = policy_summary.filter(pl.col("sample") == f"model_{selected_policy_sample}")| Loading ITables v2.7.3 from the internet... (need help?) |
| Loading ITables v2.7.3 from the internet... (need help?) |
# Rank adequately sampled policies by requested-dollar value to 30m after open, then fill rate.
policy_leaderboard = model_policy_summary.filter(
    pl.col("n_orders").ge(MIN_ROWS_PER_POLICY)
).sort(
    by=["avg_req_fill_to_30m_after_open_bps", "fill_rate"],
    descending=True,
)
show_df(
policy_leaderboard.select(
"policy_id",
"strategy",
"latency_s",
"offset_bps",
"n_orders",
"fill_rate",
"inside25_fill_rate",
"avg_fill_fraction",
"inside25_avg_fill_fraction",
"avg_adverse_fill_bps",
"inside25_avg_adverse_fill_bps",
"avg_req_fill_to_open_bps",
"avg_req_fill_to_30m_after_open_bps",
"inside25_avg_req_fill_to_30m_after_open_bps",
"delta_inside25_fill_rate",
"delta_inside25_avg_adverse_fill_bps",
"delta_inside25_avg_req_fill_to_30m_after_open_bps",
"avg_req_fill_to_reg_close_bps",
"avg_req_markout_1m_bps",
"avg_req_markout_5m_bps",
"avg_req_fill_to_eod_bps",
),
n=24,
)| Loading ITables v2.7.3 from the internet... (need help?) |
The diagnostic section keeps the tradeability and fill-quality checks, but shifts the main policy readout toward requested-dollar EV, frontier structure, and signal-conditioned aggressiveness.
plot_strategy_metric_grid(
policy,
strategies=ENTRY_STRATEGIES,
metrics=[
{
"col": "filled_flag",
"title": "Fill rate",
"cmap": "YlGnBu",
"fmt": ".0%",
"percent": True,
},
{
"col": "req_fill_to_30m_after_open_bps",
"title": "Requested EV to 30m after open",
"cmap": "RdYlGn",
"center": 0.0,
"fmt": ".2f",
},
{
"col": "adverse_fill_bps",
"title": "Adverse fill",
"cmap": "RdYlGn_r",
"fmt": ".2f",
},
],
title=f"Policy surface overview at ${POLICY_NOTIONAL:,}: fillability, value, and execution cost",
)Detailed context splits are kept for appendix diagnostics so the main flow stays focused on the policy surface and frontier.
# Appendix context splits of the POLICY_NOTIONAL slice, one per grouping-key pair.
(
    spread_split,
    quote_age_split,
    dollar_volume_split,
    session_split,
    event_type_split,
    overlap_split,
) = (
    summarize_split(policy, split_keys)
    for split_keys in (
        ["strategy", "spread_bucket"],
        ["strategy", "quote_age_bucket"],
        ["strategy", "dollar_volume_bucket"],
        ["daypart", "strategy"],
        ["event_type", "strategy"],
        ["strategy", "is_overlap_event"],
    )
)
# Orders that received any fill; the daypart diagnostics below use only these rows.
filled_policy = policy.filter(pl.col("filled_flag").eq(1))
# Mean filled-only and requested-dollar ("req_") returns per policy and daypart at each horizon.
filled_horizon_summary = (
    filled_policy.group_by(["policy_id", "policy_family", "strategy", "daypart"])
    .agg(
        pl.len().alias("n_fills"),
        *[
            pl.col(f"{prefix}{horizon}").mean().alias(f"avg_{prefix}{horizon}")
            for prefix in ("", "req_")
            for horizon in (
                "fill_to_open_bps",
                "markout_1m_bps",
                "markout_5m_bps",
                "fill_to_30m_after_open_bps",
                "fill_to_reg_close_bps",
                "fill_to_eod_bps",
            )
        ],
    )
    .sort(["daypart", "strategy", "policy_id"])
)
# Share of fills whose return changes sign between two horizons, per policy and daypart.
transition_summary = (
    filled_policy.group_by(["policy_id", "policy_family", "strategy", "daypart"])
    .agg(
        pl.len().alias("n_fills"),
        *[
            ((pl.col(positive_col) > 0) & (pl.col(negative_col) < 0)).mean().alias(share_name)
            for share_name, positive_col, negative_col in (
                ("share_pos_open_neg_close", "fill_to_open_bps", "fill_to_reg_close_bps"),
                ("share_pos_5m_neg_30m", "markout_5m_bps", "fill_to_30m_after_open_bps"),
                ("share_neg_5m_pos_30m", "fill_to_30m_after_open_bps", "markout_5m_bps"),
                ("share_neg_open_pos_close", "fill_to_reg_close_bps", "fill_to_open_bps"),
            )
        ],
    )
    .sort(["daypart", "strategy", "policy_id"])
)
# Split fill-to-regular-close PnL into three consecutive legs and express each as a share.
return_share_summary = (
    filled_policy.group_by(["policy_id", "policy_family", "strategy", "daypart"])
    .agg(
        pl.len().alias("n_fills"),
        pl.col("fill_to_reg_close_pnl").sum().alias("total_fill_to_reg_close_pnl"),
        pl.col("fill_to_open_pnl").sum().alias("pre_open_pnl"),
        (pl.col("fill_to_30m_after_open_pnl") - pl.col("fill_to_open_pnl")).sum().alias("open_to_30m_pnl"),
        (pl.col("fill_to_reg_close_pnl") - pl.col("fill_to_30m_after_open_pnl")).sum().alias("m30_to_close_pnl"),
    )
    .with_columns(
        [
            # Shares are left null when the total PnL is numerically zero.
            pl.when(pl.col("total_fill_to_reg_close_pnl").abs() > 1e-9)
            .then(pl.col(f"{leg}_pnl") / pl.col("total_fill_to_reg_close_pnl"))
            .otherwise(None)
            .alias(f"{leg}_share")
            for leg in ("pre_open", "open_to_30m", "m30_to_close")
        ]
    )
    .sort(["daypart", "strategy", "policy_id"])
)
# One wide table per (policy, daypart): horizon means, then sign-transition shares, then PnL shares.
policy_daypart_diagnostics = filled_horizon_summary.join(
    transition_summary.select(
        [
            "policy_id",
            "daypart",
            "share_pos_open_neg_close",
            "share_pos_5m_neg_30m",
            "share_neg_5m_pos_30m",
            "share_neg_open_pos_close",
        ]
    ),
    on=["policy_id", "daypart"],
    how="left",
).join(
    return_share_summary.select(
        [
            "policy_id",
            "daypart",
            "pre_open_share",
            "open_to_30m_share",
            "m30_to_close_share",
        ]
    ),
    on=["policy_id", "daypart"],
    how="left",
)
show_df(policy_daypart_diagnostics, n=40)| Loading ITables v2.7.3 from the internet... (need help?) |
# Display the sign-transition shares, then the PnL share decomposition, then the horizon profile.
show_df(
    transition_summary.select(
        [
            "policy_id",
            "daypart",
            "n_fills",
            "share_pos_open_neg_close",
            "share_pos_5m_neg_30m",
            "share_neg_5m_pos_30m",
            "share_neg_open_pos_close",
        ]
    ),
    30,
)
show_df(
    return_share_summary.select(
        [
            "policy_id",
            "daypart",
            "n_fills",
            "total_fill_to_reg_close_pnl",
            "pre_open_share",
            "open_to_30m_share",
            "m30_to_close_share",
        ]
    ),
    30,
)
plot_target_horizon_profile(filled_policy)
# Same three-leg PnL decomposition as the per-policy table, pooled by policy family and daypart.
family_return_share = (
    filled_policy.group_by(["policy_family", "daypart"])
    .agg(
        pl.col("fill_to_reg_close_pnl").sum().alias("total_fill_to_reg_close_pnl"),
        pl.col("fill_to_open_pnl").sum().alias("pre_open_pnl"),
        (pl.col("fill_to_30m_after_open_pnl") - pl.col("fill_to_open_pnl")).sum().alias("open_to_30m_pnl"),
        (pl.col("fill_to_reg_close_pnl") - pl.col("fill_to_30m_after_open_pnl")).sum().alias("m30_to_close_pnl"),
    )
    .with_columns(
        [
            # Shares are left null when the total PnL is numerically zero.
            pl.when(pl.col("total_fill_to_reg_close_pnl").abs() > 1e-9)
            .then(pl.col(f"{leg}_pnl") / pl.col("total_fill_to_reg_close_pnl"))
            .otherwise(None)
            .alias(f"{leg}_share")
            for leg in ("pre_open", "open_to_30m", "m30_to_close")
        ]
    )
    # Replace the family key with its display name before sorting.
    .with_columns(pl.col("policy_family").map_elements(strategy_display_name, return_dtype=pl.Utf8))
    .sort(["daypart", "policy_family"])
)
if family_return_share.height:
plot_return_share_decomposition(family_return_share)
print(
f"Requested-notional sweep moves to the appendix. The joint-model dataset stays frozen at MODEL_NOTIONAL=${MODEL_NOTIONAL:,}."
)
# Fill and value metrics per strategy across every requested notional in the outcome table.
capacity_curves = (
    outcomes.group_by(["strategy", "requested_notional"])
    .agg(
        fill_rate=pl.col("filled_flag").mean(),
        avg_fill_fraction=pl.col("fill_fraction").mean(),
        **{
            f"avg_{metric}": pl.col(metric).mean()
            for metric in (
                "req_fill_to_30m_after_open_bps",
                "req_fill_to_reg_close_bps",
                "fill_to_30m_after_open_bps",
                "fill_to_reg_close_bps",
                "fill_to_reg_close_pnl",
            )
        },
    )
    .sort(["strategy", "requested_notional"])
)
show_df(capacity_curves, n=20)Requested-notional sweep moves to the appendix. The joint-model dataset stays frozen at MODEL_NOTIONAL=$2,500.
| Loading ITables v2.7.3 from the internet... (need help?) |
Keep the exploratory frontier view narrow here so the main all-policy frontier remains in the decision package.
# Per-policy fill/value metrics on the selected modeling sample, limited to policies
# with at least MIN_ROWS_PER_POLICY orders.
frontier_df = (
    model_policy_selected.group_by(["policy_id", "policy_family", "strategy", "latency_s", "offset_bps"])
    .agg(
        pl.len().alias("n_orders"),
        pl.col("filled_flag").mean().alias("fill_rate"),
        pl.col("fill_fraction").mean().alias("avg_fill_fraction"),
        *[
            pl.col(metric).mean().alias(f"avg_{metric}")
            for metric in (
                "adverse_fill_bps",
                "req_fill_to_open_bps",
                "req_fill_to_30m_after_open_bps",
                "req_fill_to_reg_close_bps",
                "req_markout_1m_bps",
                "req_markout_5m_bps",
                "req_fill_to_eod_bps",
            )
        ],
        pl.col("req_fill_to_30m_after_open_bps").quantile(0.10).alias("p10_req_fill_to_30m_after_open_bps"),
    )
    .filter(pl.col("n_orders").ge(MIN_ROWS_PER_POLICY))
    .sort(["strategy", "latency_s", "offset_bps"])
)
# Flag frontier policies in the (fill rate, requested-dollar value to 30m after open) plane.
frontier_df = add_frontier_flag(
    frontier_df,
    y_col="avg_req_fill_to_30m_after_open_bps",
    x_col="fill_rate",
)
show_df(frontier_df, n=30)| Loading ITables v2.7.3 from the internet... (need help?) |
Exploratory frontier charts stay focused on within-family tradeoffs. The full all-policy frontier is shown later in section 8.4.
# Draw one within-family frontier chart for each entry in ENTRY_STRATEGIES.
for family in ENTRY_STRATEGIES:
family_frontier = frontier_df.filter(pl.col("policy_family") == family)
# Families with no rows in frontier_df are skipped.
if family_frontier.height == 0:
continue
# Build a per-row display label from (strategy, offset_bps, latency_s) via policy_variant_label.
family_frontier = family_frontier.with_columns(
pl.struct(["strategy", "offset_bps", "latency_s"]).map_elements(
lambda row: policy_variant_label(row["strategy"], row["offset_bps"], row["latency_s"]),
return_dtype=pl.Utf8,
).alias("policy_label")
)
# x = fill rate, y = mean requested-dollar value to 30m after open; color = mean adverse fill.
plot_policy_frontier(
family_frontier,
x_col="fill_rate",
y_col="avg_req_fill_to_30m_after_open_bps",
color_col="avg_adverse_fill_bps",
label_col="policy_label",
title=f"Within-family frontier to 30m after open: {strategy_display_name(family)}",
cmap="viridis_r",
annotate_all=False,
)Use one compact family-level heatmap plus a best-variant summary for each policy family so the signal interaction answers are visible without a long sequence of per-latency charts.
# Score every selected modeling row with a composite news signal, bucket rows by that
# score, and summarize each policy inside each signal bucket.
if model_policy_selected.height == 0:
    # Empty sample: keep both names defined, with an explicitly typed empty summary.
    model_policy_signal = model_policy_selected
    signal_policy_summary = pl.DataFrame(
        schema={
            "signal_bucket": pl.Utf8,
            "policy_id": pl.Utf8,
            "policy_family": pl.Utf8,
            "strategy": pl.Utf8,
            "latency_s": pl.Int64,
            "offset_bps": pl.Int64,
            "aggression_rank_within_family": pl.Float64,
            "n_orders": pl.UInt32,
            "fill_rate": pl.Float64,
            "avg_fill_fraction": pl.Float64,
            "avg_adverse_fill_bps": pl.Float64,
            "avg_req_fill_to_30m_after_open_bps": pl.Float64,
        }
    )
    emit_warning("Signal-policy interaction is empty because the selected modeling sample has no rows.")
else:
    model_policy_signal = model_policy_selected.with_columns(
        (pl.col("sentiment_score") * pl.col("direction_strength")).alias("signed_direction_strength")
    )
    signal_score_cols = [
        "sentiment_score",
        "signed_direction_strength",
        "confidence",
        "relevance",
        "novelty",
        "event_materiality",
        "stock_prominence",
    ]
    # Z-score each component over the selected sample and clip to [-3, 3].
    zscore_exprs = []
    for col in signal_score_cols:
        mean_val = model_policy_signal.get_column(col).mean()
        std_val = model_policy_signal.get_column(col).std()
        # Missing or non-finite statistics fall back to mean 0 / std 1 so the
        # division below never produces NaN or infinity for a degenerate column.
        mean_val = 0.0 if mean_val is None or not np.isfinite(mean_val) else float(mean_val)
        std_val = 1.0 if std_val is None or not np.isfinite(std_val) or float(std_val) == 0.0 else float(std_val)
        zscore_exprs.append(
            ((pl.col(col) - mean_val) / std_val).clip(-3.0, 3.0).alias(f"z_{col}")
        )
    # The composite signal is the row-wise mean of the clipped z-scores.
    model_policy_signal = model_policy_signal.with_columns(zscore_exprs).with_columns(
        pl.mean_horizontal([pl.col(f"z_{col}") for col in signal_score_cols]).alias("signal_score")
    )
    model_policy_signal = model_policy_signal.with_columns(
        (
            (
                # method="min" gives tied scores the same rank. Every policy row of
                # one event carries the same signal_score, so an ordinal rank would
                # hand those rows distinct ranks and could place a single event in
                # two different buckets at a bucket boundary.
                (pl.col("signal_score").rank(method="min") - 1)
                * SIGNAL_BUCKETS
                / pl.len()
            )
            .floor()
            .clip(0, SIGNAL_BUCKETS - 1)
            .cast(pl.Int64)
        ).alias("signal_bucket_idx")
    )
    # NOTE(review): the labels below cover exactly three buckets; any index >= 2
    # is labelled "high" — confirm SIGNAL_BUCKETS is 3.
    model_policy_signal = model_policy_signal.with_columns(
        pl.when(pl.col("signal_bucket_idx") == 0)
        .then(pl.lit("low"))
        .when(pl.col("signal_bucket_idx") == 1)
        .then(pl.lit("med"))
        .otherwise(pl.lit("high"))
        .alias("signal_bucket")
    )
    signal_policy_summary = (
        model_policy_signal.group_by(
            [
                "signal_bucket",
                "policy_id",
                "policy_family",
                "strategy",
                "latency_s",
                "offset_bps",
                "aggression_rank_within_family",
            ]
        )
        .agg(
            pl.len().alias("n_orders"),
            pl.col("filled_flag").mean().alias("fill_rate"),
            pl.col("fill_fraction").mean().alias("avg_fill_fraction"),
            pl.col("adverse_fill_bps").mean().alias("avg_adverse_fill_bps"),
            pl.col("req_fill_to_30m_after_open_bps").mean().alias("avg_req_fill_to_30m_after_open_bps"),
        )
        .sort(["signal_bucket", "strategy", "latency_s", "offset_bps"])
    )
# Row count per (signal bucket, policy) cell; typed empty frame when there is nothing to count.
signal_bucket_coverage = (
    model_policy_signal.group_by(["signal_bucket", "policy_id"])
    .agg(pl.len().alias("n_orders"))
    .sort(["signal_bucket", "policy_id"])
    if model_policy_signal.height
    else pl.DataFrame(schema={"signal_bucket": pl.Utf8, "policy_id": pl.Utf8, "n_orders": pl.UInt32})
)
# Cells with fewer than MIN_ROWS_PER_SIGNAL_BUCKET rows are reported in a single warning.
thin_signal_cells = (
    signal_bucket_coverage.filter(pl.col("n_orders") < MIN_ROWS_PER_SIGNAL_BUCKET)
    if signal_bucket_coverage.height
    else signal_bucket_coverage
)
if thin_signal_cells.height:
    emit_warning(
        f"{thin_signal_cells.height} signal_bucket x policy cells have fewer than {MIN_ROWS_PER_SIGNAL_BUCKET} rows."
    )
show_df(signal_bucket_coverage, n=30)| Loading ITables v2.7.3 from the internet... (need help?) |
| Loading ITables v2.7.3 from the internet... (need help?) |
This notebook now stops at the modeling table. The goal is to freeze the policy/value target definition, action space, and walk-forward folds before any serious model fitting.
# Load the supervised modeling table from cache, or build it from the signal-scored policy rows.
joint_model_df = load_cached_parquet("joint_model_df")
if joint_model_df is None:
if model_policy_signal.height == 0:
# Nothing to model: fall back to an empty, column-less frame and warn.
joint_model_df = pl.DataFrame()
emit_warning("joint_model_df is empty because the selected modeling sample has no rows.")
else:
# Keep only policies with at least MIN_ROWS_PER_POLICY rows in the selected sample.
supported_policies = (
model_policy_signal.group_by(["policy_id", "policy_family", "strategy", "latency_s", "offset_bps"])
.agg(pl.len().alias("n_rows"))
.filter(pl.col("n_rows") >= MIN_ROWS_PER_POLICY)
)
# Restrict rows to the supported policies and derive the calendar fields used for folds.
joint_model_base = (
model_policy_signal
.join(supported_policies.select("policy_id"), on="policy_id", how="inner")
.with_columns(
# Null prior-event gaps are replaced with the constant 99_999.
pl.col("prior_event_gap_s").fill_null(99_999),
pl.col("publish_ts").dt.date().alias("publish_date"),
pl.col("publish_ts").dt.strftime("%Y-%m").alias("publish_month"),
)
)
# One row per event_key. walk_forward_fold is the dense rank of the "%Y-%m" publish month;
# train_test_fold is that month string itself (renamed from publish_month).
event_folds = (
joint_model_base.select("event_key", "publish_ts", "publish_date", "publish_month")
.unique(subset=["event_key"])
.sort("publish_ts")
.with_columns(
pl.col("publish_month").rank(method="dense").cast(pl.Int64).alias("walk_forward_fold")
)
.rename({"publish_month": "train_test_fold"})
)
# Attach the fold labels to every policy row and freeze the modeling table's column set.
joint_model_df = (
joint_model_base.join(
event_folds.select("event_key", "train_test_fold", "walk_forward_fold"),
on="event_key",
how="left",
)
.select(
"event_key",
"article_id",
"ticker",
"publish_ts",
"publish_date",
"train_test_fold",
"walk_forward_fold",
"policy_id",
"policy_family",
"aggression_rank_within_family",
"strategy",
"latency_s",
"offset_bps",
"requested_notional",
"spread_bps_at_decision",
"quote_age_ms_at_decision",
"ask_size_usd_at_decision",
"rolling_dollar_volume_60s_at_decision",
"vwap_dev_bps_at_decision",
"realized_vol_60s_bps_at_decision",
"prior_event_gap_s",
"segment",
"daypart",
"event_type",
"is_overlap_event",
"sentiment_score",
"confidence",
"relevance",
"direction_strength",
"novelty",
"event_materiality",
"stock_prominence",
"mention_count",
"headline_hit",
"snippet_count",
"signal_score",
"signal_bucket",
PRIMARY_VALUE_COL,
"primary_exit_bps",
"primary_exit_pnl",
"filled_flag",
"fill_fraction",
"adverse_fill_bps",
"fill_to_open_bps",
"fill_to_30m_after_open_bps",
"fill_to_reg_close_bps",
"fill_to_eod_bps",
"markout_1m_bps",
"markout_5m_bps",
"req_fill_to_open_bps",
"req_fill_to_reg_close_bps",
"req_markout_1m_bps",
"req_markout_5m_bps",
"req_fill_to_eod_bps",
)
.sort(["publish_ts", "event_key", "policy_id"])
)
# NOTE(review): indentation was lost in this export, so it is not visible whether this
# write also runs for the empty fallback frame or only after a successful build — confirm.
write_cached_parquet(joint_model_df, "joint_model_df")
# One-row summary of the modeling table: which sample/notional/exit it was
# built for and how many rows, events, and policies it contains.
if isinstance(joint_model_df, pl.DataFrame):
    if joint_model_df.height:
        _summary_n_events = joint_model_df.get_column("event_key").n_unique()
        _summary_n_policies = joint_model_df.get_column("policy_id").n_unique()
    else:
        # An empty table may have no columns at all, so do not touch them.
        _summary_n_events = 0
        _summary_n_policies = 0
    joint_model_summary = pl.DataFrame(
        {
            "selected_sample": [selected_policy_sample],
            "model_notional": [MODEL_NOTIONAL],
            "primary_exit": [PRIMARY_EXIT],
            "primary_value_col": [PRIMARY_VALUE_COL],
            "n_rows": [joint_model_df.height],
            "n_events": [_summary_n_events],
            "n_policies": [_summary_n_policies],
        }
    )
else:
    joint_model_summary = pl.DataFrame()
# Narrow preview of the modeling table (keys, target, fill outcome, folds).
_preview_columns = [
    "event_key",
    "policy_id",
    "signal_bucket",
    PRIMARY_VALUE_COL,
    "filled_flag",
    "fill_fraction",
    "train_test_fold",
    "walk_forward_fold",
]
if joint_model_df.height:
    joint_model_preview = joint_model_df.select(_preview_columns)
else:
    joint_model_preview = pl.DataFrame()
# [notebook output] Loading cache: data/eda_04/joint_model_df.parquet
| Loading ITables v2.7.3 from the internet... (need help?) |
| Loading ITables v2.7.3 from the internet... (need help?) |
# Dataset sanity checks on the modeling table: row counts by fold / policy /
# signal bucket, average target by fold, target coverage, and a few plots.
# When the table is empty, typed empty frames are created instead so later
# cells can still reference every name.
_fold_key_schema = {"train_test_fold": pl.Utf8, "walk_forward_fold": pl.Int64}
if not joint_model_df.height:
    emit_warning("Dataset sanity checks skipped because joint_model_df is empty.")
    rows_by_fold = pl.DataFrame(
        schema={**_fold_key_schema, "n_rows": pl.UInt32, "n_events": pl.UInt32}
    )
    rows_by_policy = pl.DataFrame(
        schema={
            "policy_id": pl.Utf8,
            "policy_family": pl.Utf8,
            "strategy": pl.Utf8,
            "latency_s": pl.Int64,
            "offset_bps": pl.Int64,
            "n_rows": pl.UInt32,
        }
    )
    rows_by_signal_policy = pl.DataFrame(
        schema={"signal_bucket": pl.Utf8, "policy_id": pl.Utf8, "n_rows": pl.UInt32}
    )
    avg_target_by_fold = pl.DataFrame(
        schema={**_fold_key_schema, "avg_primary_value": pl.Float64, "fill_rate": pl.Float64}
    )
    target_coverage = pl.DataFrame(
        schema={"target": pl.Utf8, "non_null_rows": pl.Int64, "coverage_rate": pl.Float64}
    )
else:
    _fold_keys = ["train_test_fold", "walk_forward_fold"]
    _row_count = pl.len().alias("n_rows")

    rows_by_fold = (
        joint_model_df.group_by(_fold_keys)
        .agg(_row_count, pl.col("event_key").n_unique().alias("n_events"))
        .sort("walk_forward_fold")
    )
    rows_by_policy = (
        joint_model_df.group_by(["policy_id", "policy_family", "strategy", "latency_s", "offset_bps"])
        .agg(_row_count)
        .sort(["strategy", "latency_s", "offset_bps"])
    )
    rows_by_signal_policy = (
        joint_model_df.group_by(["signal_bucket", "policy_id"])
        .agg(_row_count)
        .sort(["signal_bucket", "n_rows"], descending=[False, True])
    )
    avg_target_by_fold = (
        joint_model_df.group_by(_fold_keys)
        .agg(
            pl.col(PRIMARY_VALUE_COL).mean().alias("avg_primary_value"),
            pl.col("filled_flag").mean().alias("fill_rate"),
        )
        .sort("walk_forward_fold")
    )

    # Share of rows with a non-null value for each candidate target column.
    coverage_cols = [
        PRIMARY_VALUE_COL,
        "primary_exit_bps",
        "primary_exit_pnl",
        "fill_to_reg_close_bps",
        "fill_to_eod_bps",
        "markout_5m_bps",
    ]
    _total_rows = joint_model_df.height
    _non_null_counts = [
        _total_rows - joint_model_df.get_column(_target).null_count()
        for _target in coverage_cols
    ]
    target_coverage = pl.DataFrame(
        {
            "target": coverage_cols,
            "non_null_rows": _non_null_counts,
            "coverage_rate": [_count / _total_rows for _count in _non_null_counts],
        }
    )

    plot_fold_diagnostics(rows_by_fold, avg_target_by_fold, PRIMARY_VALUE_COL)
    plot_histogram(
        joint_model_df,
        PRIMARY_VALUE_COL,
        hue="policy_family",
        title=f"Distribution of {PRIMARY_VALUE_COL}",
        bins=40,
        stat="density",
    )

    # Correlation snapshot uses only rows where every listed column is present.
    corr_cols = [
        PRIMARY_VALUE_COL,
        "req_fill_to_reg_close_bps",
        "filled_flag",
        "fill_fraction",
        "adverse_fill_bps",
        "fill_to_reg_close_bps",
        "markout_5m_bps",
    ]
    corr_pdf = joint_model_df.select(corr_cols).drop_nulls().to_pandas()
    if not corr_pdf.empty:
        plot_correlation_snapshot(corr_pdf)
# [notebook output] | Loading ITables v2.7.3 from the internet... (need help?) |
| Loading ITables v2.7.3 from the internet... (need help?) |
| Loading ITables v2.7.3 from the internet... (need help?) |
| Loading ITables v2.7.3 from the internet... (need help?) |
| Loading ITables v2.7.3 from the internet... (need help?) |
This final section turns the diagnostic notebook into a decision-ready package: a per-policy scoreboard, monthly stability checks, regime maps, frontier views, explicit stress tests, and a literal gating decision.
decision_policy_enriched = load_cached_parquet("decision_policy_enriched")
if decision_policy_enriched is None:
# Enrich the selected policy rows in two passes. The second pass depends on
# columns created by the first (decision_price, available_alpha_30m_bps,
# turnover_since_release_x), so the two `with_columns` calls must stay separate.
_is_filled = pl.col("filled_flag") == 1
_has_requested_notional = pl.col("requested_notional") > 0

# ---- Pass 1: fill timing, deployment share, reference price, alpha, turnover
_fill_delay_s = (
    pl.when(_is_filled)
    .then((pl.col("fill_ts") - pl.col("publish_ts")).dt.total_seconds())
    .otherwise(None)
    .alias("fill_delay_s")
)
_fill_after_920_flag = (
    pl.when(_is_filled)
    .then((pl.col("fill_ts").dt.time() >= time(9, 20)).cast(pl.Int8))
    .otherwise(None)
    .alias("fill_after_920_flag")
)
_deployed_notional_share = (
    pl.when(_has_requested_notional)
    .then(pl.col("filled_notional") / pl.col("requested_notional"))
    .otherwise(0.0)
    .fill_nan(0.0)
    .fill_null(0.0)
    .alias("deployed_notional_share")
)
# Fill price when available, otherwise the anchor price.
_decision_price = pl.coalesce([pl.col("fill_px"), pl.col("anchor_px")]).alias("decision_price")
_available_alpha_30m_bps = (
    pl.when(pl.col("anchor_px").is_not_null() & pl.col("m30_after_open_px").is_not_null())
    .then(((pl.col("m30_after_open_px") / pl.col("anchor_px")) - 1.0) * 10_000.0)
    .otherwise(None)
    .alias("available_alpha_30m_bps")
)
_turnover_since_release_x = (
    pl.when(_has_requested_notional)
    .then(pl.col("latency_s") * pl.col("rolling_dollar_volume_60s_at_decision") / 60.0 / pl.col("requested_notional"))
    .otherwise(None)
    .alias("turnover_since_release_x")
)

# ---- Pass 2: regime buckets, alpha capture, stress-test PnL variants
_price_bucket = pl.col("decision_price").cut(
    breaks=[5.0, 10.0, 25.0, 50.0, 100.0],
    labels=["<=5", "5-10", "10-25", "25-50", "50-100", ">100"],
).alias("price_bucket")

# Release-time bucket: first cutoff that the publish time does not exceed.
_publish_time = pl.col("publish_ts").dt.time()
_release_cutoffs = [
    (time(8, 0), "<=08:00"),
    (time(9, 30), "08:00-09:30"),
    (time(11, 0), "09:30-11:00"),
    (time(14, 0), "11:00-14:00"),
    (time(16, 0), "14:00-16:00"),
]
_first_cutoff, _first_label = _release_cutoffs[0]
_release_time_chain = pl.when(_publish_time <= _first_cutoff).then(pl.lit(_first_label))
for _cutoff, _label in _release_cutoffs[1:]:
    _release_time_chain = _release_time_chain.when(_publish_time <= _cutoff).then(pl.lit(_label))
_release_time_bucket = _release_time_chain.otherwise(pl.lit(">16:00")).alias("release_time_bucket")

_update_density_bucket = pl.col("rolling_dollar_volume_60s_at_decision").cut(
    breaks=[50_000, 100_000, 250_000, 500_000],
    labels=["<=50k", "50k-100k", "100k-250k", "250k-500k", ">500k"],
).alias("update_density_bucket")
_turnover_bucket = pl.col("turnover_since_release_x").cut(
    breaks=[1.0, 2.0, 5.0, 10.0],
    labels=["<=1x", "1x-2x", "2x-5x", "5x-10x", ">10x"],
).alias("turnover_since_release_bucket")

# Capture ratio is only defined when the available move is at least 5 bps in
# magnitude; the ratio is clipped to [-2, 2].
_alpha_capture_ratio_30m = (
    pl.when(pl.col("available_alpha_30m_bps").abs() >= 5.0)
    .then((pl.col("req_fill_to_30m_after_open_bps") / pl.col("available_alpha_30m_bps")).clip(-2.0, 2.0))
    .otherwise(None)
    .alias("alpha_capture_ratio_30m")
)

_exit_pnl = pl.col("primary_exit_pnl")
_filled_notional = pl.col("filled_notional")
_quote_age_ms = pl.col("quote_age_ms_at_decision")
# Stress 1: charge a flat 5 bps of filled notional.
_stress_worse_fill_pnl = (_exit_pnl - _filled_notional * 5.0 / 10_000.0).alias("stress_worse_fill_pnl")
# Stress 2: charge 3 bps plus 1 bp per 1000 ms of quote age on filled notional.
_stress_worse_quote_pnl = (
    _exit_pnl
    - _filled_notional * (3.0 + _quote_age_ms / 1000.0) / 10_000.0
).alias("stress_worse_quote_pnl")
# Stress 3: keep PnL only when the quote is at most 750 ms old, else 0.
_stress_no_stale_quote_pnl = (
    pl.when(_quote_age_ms <= 750)
    .then(_exit_pnl)
    .otherwise(0.0)
    .alias("stress_no_stale_quote_pnl")
)

decision_policy_enriched = model_policy_selected.with_columns(
    _fill_delay_s,
    _fill_after_920_flag,
    _deployed_notional_share,
    _decision_price,
    _available_alpha_30m_bps,
    _turnover_since_release_x,
).with_columns(
    _price_bucket,
    _release_time_bucket,
    _update_density_bucket,
    _turnover_bucket,
    _alpha_capture_ratio_30m,
    _stress_worse_fill_pnl,
    _stress_worse_quote_pnl,
    _stress_no_stale_quote_pnl,
)
write_cached_parquet(decision_policy_enriched, "decision_policy_enriched")
# Typed schemas for the empty-sample fallbacks of the decision tables.
# Key order is significant: it fixes the column order of the empty frames.
_policy_key_schema = {
    "policy_id": pl.Utf8,
    "policy_family": pl.Utf8,
    "strategy": pl.Utf8,
    "latency_s": pl.Int64,
    "offset_bps": pl.Int64,
}
_scoreboard_float_cols = [
    "fill_rate",
    "deployed_notional_pct",
    "avg_fill_time_s",
    "pct_fills_after_920",
    "avg_realized_entry_slippage_bps",
    "avg_pnl_open",
    "avg_pnl_30m_after_open",
    "avg_pnl_reg_close",
    "avg_pnl_eod",
    "avg_primary_exit_pnl",
    "median_pnl",
    "win_rate_pct",
    "p10_pnl",
    "p05_pnl",
    "worst_trade_pnl",
    "avg_alpha_capture_ratio",
    "avg_req_fill_to_30m_after_open_bps",
    "p10_req_fill_to_30m_after_open_bps",
    "price_quality_bps",
    "slightly_worse_fills_pnl",
    "worse_quote_handling_pnl",
    "tighter_no_stale_quote_pnl",
    "later_start_pnl",
    "stress_test_pnl",
]
policy_scoreboard_schema = {
    **_policy_key_schema,
    "trade_count": pl.UInt32,
    **dict.fromkeys(_scoreboard_float_cols, pl.Float64),
}

monthly_top_policy_schema = {
    "policy_id": pl.Utf8,
    "policy_label": pl.Utf8,
    "strategy": pl.Utf8,
    "latency_s": pl.Int64,
    "offset_bps": pl.Int64,
    "train_test_fold": pl.Utf8,
    **dict.fromkeys(
        ["monthly_expectancy_pnl", "monthly_expectancy_bps", "monthly_fill_rate"],
        pl.Float64,
    ),
    "monthly_trade_count": pl.UInt32,
}

regime_policy_schema = {
    **dict.fromkeys(["policy_id", "policy_label", "bucket"], pl.Utf8),
    "trade_count": pl.UInt32,
    **dict.fromkeys(
        ["fill_rate", "avg_primary_exit_pnl", "avg_alpha_capture_ratio"],
        pl.Float64,
    ),
}
if decision_policy_enriched.height == 0:
# Empty-sample fallbacks: every decision table exists with the right schema
# but no rows, so later cells can run without special-casing.
policy_scoreboard = pl.DataFrame(schema=policy_scoreboard_schema)
top_policy_scoreboard = policy_scoreboard
top_policy_ids = []
monthly_top_policy = pl.DataFrame(schema=monthly_top_policy_schema)

# (bucket column, display label) pairs for the regime breakdowns.
regime_dimensions = [
    ("spread_bucket", "Spread bucket"),
    ("quote_age_bucket", "Quote freshness bucket"),
    ("update_density_bucket", "Update density bucket"),
    ("turnover_since_release_bucket", "Turnover since release bucket"),
    ("price_bucket", "Price bucket"),
    ("release_time_bucket", "Release time bucket"),
]
_regime_bucket_cols = [_dimension[0] for _dimension in regime_dimensions]

# The pooled regime table has the per-policy schema minus the policy columns.
empty_regime = pl.DataFrame(
    schema={
        _name: _dtype
        for _name, _dtype in regime_policy_schema.items()
        if _name not in ("policy_id", "policy_label")
    }
)
# All pooled entries share the single empty frame; per-policy entries each
# get their own empty frame.
regime_breakdowns = dict.fromkeys(_regime_bucket_cols, empty_regime)
regime_policy_breakdowns = {
    _bucket: pl.DataFrame(schema=regime_policy_schema) for _bucket in _regime_bucket_cols
}
regime_focus_lines = []
else:
# Per-policy scoreboard: counts, fill behaviour, PnL at each exit horizon,
# distribution statistics of the primary-exit PnL, and requested-EV summaries.
policy_group_cols = ["policy_id", "policy_family", "strategy", "latency_s", "offset_bps"]

_primary_pnl = pl.col("primary_exit_pnl")
_requested_ev_30m = pl.col("req_fill_to_30m_after_open_bps")
_horizon_pnl_means = [
    pl.col(_source).mean().alias(_target)
    for _source, _target in (
        ("fill_to_open_pnl", "avg_pnl_open"),
        ("fill_to_30m_after_open_pnl", "avg_pnl_30m_after_open"),
        ("fill_to_reg_close_pnl", "avg_pnl_reg_close"),
        ("fill_to_eod_pnl", "avg_pnl_eod"),
    )
]
policy_scoreboard = decision_policy_enriched.group_by(policy_group_cols).agg(
    pl.len().alias("trade_count"),
    pl.col("filled_flag").mean().alias("fill_rate"),
    (100.0 * pl.col("deployed_notional_share").mean()).alias("deployed_notional_pct"),
    pl.col("fill_delay_s").mean().alias("avg_fill_time_s"),
    (100.0 * pl.col("fill_after_920_flag").cast(pl.Float64).mean()).alias("pct_fills_after_920"),
    pl.col("adverse_fill_bps").mean().alias("avg_realized_entry_slippage_bps"),
    *_horizon_pnl_means,
    _primary_pnl.mean().alias("avg_primary_exit_pnl"),
    _primary_pnl.median().alias("median_pnl"),
    (100.0 * _primary_pnl.gt(0.0).cast(pl.Float64).mean()).alias("win_rate_pct"),
    _primary_pnl.quantile(0.10).alias("p10_pnl"),
    _primary_pnl.quantile(0.05).alias("p05_pnl"),
    _primary_pnl.min().alias("worst_trade_pnl"),
    pl.col("alpha_capture_ratio_30m").mean().alias("avg_alpha_capture_ratio"),
    _requested_ev_30m.mean().alias("avg_req_fill_to_30m_after_open_bps"),
    _requested_ev_30m.quantile(0.10).alias("p10_req_fill_to_30m_after_open_bps"),
)

# Mean PnL per policy under each row-level stress scenario.
_stress_scenarios = {
    "stress_worse_fill_pnl": "slightly_worse_fills_pnl",
    "stress_worse_quote_pnl": "worse_quote_handling_pnl",
    "stress_no_stale_quote_pnl": "tighter_no_stale_quote_pnl",
}
stress_policy_summary = decision_policy_enriched.group_by(policy_group_cols).agg(
    [pl.col(_source).mean().alias(_target) for _source, _target in _stress_scenarios.items()]
)

# Lookup of baseline PnL keyed by (strategy, offset, latency); used to read
# off the PnL of the same policy started at a later latency.
later_start_lookup = policy_scoreboard.select(
    "strategy",
    "offset_bps",
    pl.col("latency_s").alias("later_latency_s"),
    pl.col("avg_primary_exit_pnl").alias("later_start_pnl"),
)
# "Later start" stress: compare each policy with the same strategy/offset at
# the next slower configured latency. The mapping is derived from LATENCIES_S
# itself (each entry -> its successor, slowest -> itself) instead of indexing
# positions 1 and 2 directly, which raised IndexError for fewer than three
# latencies and skipped steps for longer lists.
_latency_steps = list(LATENCIES_S)
if _latency_steps:
    # Default covers the slowest latency and any value not in LATENCIES_S.
    _later_latency_expr = pl.lit(_latency_steps[-1])
else:
    # No configured latencies: fall back to comparing a policy with itself.
    _later_latency_expr = pl.col("latency_s")
# Build the chain back-to-front so the first configured latency is tested
# first, matching the original first-match-wins order.
for _current_latency, _next_latency in reversed(list(zip(_latency_steps, _latency_steps[1:]))):
    _later_latency_expr = (
        pl.when(pl.col("latency_s") == _current_latency)
        .then(pl.lit(_next_latency))
        .otherwise(_later_latency_expr)
    )

stress_policy_summary = (
    stress_policy_summary.join(
        policy_scoreboard.select(policy_group_cols + ["avg_primary_exit_pnl"]),
        on=policy_group_cols,
        how="left",
    )
    .with_columns(_later_latency_expr.alias("later_latency_s"))
    .join(
        later_start_lookup,
        on=["strategy", "offset_bps", "later_latency_s"],
        how="left",
    )
    .with_columns(
        # When no later-latency policy exists, fall back to the policy's own
        # baseline PnL.
        pl.coalesce([pl.col("later_start_pnl"), pl.col("avg_primary_exit_pnl")]).alias("later_start_pnl"),
        # Stress floor = worst of the four scenarios.
        # NOTE(review): both expressions live in one with_columns call, so this
        # minimum appears to read the pre-coalesce later_start_pnl — confirm
        # that is intended.
        pl.min_horizontal(
            "slightly_worse_fills_pnl",
            "worse_quote_handling_pnl",
            "tighter_no_stale_quote_pnl",
            "later_start_pnl",
        ).alias("stress_test_pnl"),
    )
    .select(policy_group_cols + [
        "slightly_worse_fills_pnl",
        "worse_quote_handling_pnl",
        "tighter_no_stale_quote_pnl",
        "later_start_pnl",
        "stress_test_pnl",
    ])
)
# Attach the stress columns, derive price quality as the negated entry
# slippage, and rank policies best-first by stress floor, then baseline PnL,
# then fill rate.
# nulls_last=True keeps policies whose ranking metrics are null at the bottom;
# with the default placement they can sort to the top and be picked as
# "top policies" by the head() that follows.
policy_scoreboard = (
    policy_scoreboard.join(stress_policy_summary, on=policy_group_cols, how="left")
    .with_columns((-pl.col("avg_realized_entry_slippage_bps")).alias("price_quality_bps"))
    .sort(
        ["stress_test_pnl", "avg_primary_exit_pnl", "fill_rate"],
        descending=[True, True, True],
        nulls_last=True,
    )
)
# Top policies = first (up to) five rows of the ranked scoreboard.
top_policy_count = min(5, policy_scoreboard.height)
top_policy_scoreboard = policy_scoreboard.head(top_policy_count)
top_policy_ids = top_policy_scoreboard.get_column("policy_id").to_list()

# Label/metadata lookup for the top policies (typed empty frame if none).
_top_meta_cols = ["policy_id", "policy_label", "strategy", "latency_s", "offset_bps"]
if top_policy_scoreboard.height:
    top_policy_meta = ensure_policy_label(top_policy_scoreboard).select(_top_meta_cols)
else:
    top_policy_meta = pl.DataFrame(
        schema={
            "policy_id": pl.Utf8,
            "policy_label": pl.Utf8,
            "strategy": pl.Utf8,
            "latency_s": pl.Int64,
            "offset_bps": pl.Int64,
        }
    )

# Month-by-month expectancy and fill rate for each top policy, taken from the
# modeling table (train_test_fold holds the publish month).
if joint_model_df.height and top_policy_ids:
    _monthly_metrics = [
        pl.col("primary_exit_pnl").mean().alias("monthly_expectancy_pnl"),
        pl.col(PRIMARY_VALUE_COL).mean().alias("monthly_expectancy_bps"),
        pl.col("filled_flag").mean().alias("monthly_fill_rate"),
        pl.len().alias("monthly_trade_count"),
    ]
    _top_policy_rows = joint_model_df.filter(pl.col("policy_id").is_in(top_policy_ids))
    monthly_top_policy = (
        _top_policy_rows.group_by(["policy_id", "train_test_fold"])
        .agg(_monthly_metrics)
        .join(top_policy_meta, on="policy_id", how="left")
        .sort(["policy_label", "train_test_fold"])
        .select(
            [
                "policy_id",
                "policy_label",
                "strategy",
                "latency_s",
                "offset_bps",
                "train_test_fold",
                "monthly_expectancy_pnl",
                "monthly_expectancy_bps",
                "monthly_fill_rate",
                "monthly_trade_count",
            ]
        )
    )
else:
    monthly_top_policy = pl.DataFrame(schema=monthly_top_policy_schema)
# Regime breakdowns for the top policies: for every bucket dimension build a
# pooled table (all top policies together) and a per-policy table, and record
# the best pooled bucket as a "focus regime" line for the gating summary.
regime_dimensions = [
    ("spread_bucket", "Spread bucket"),
    ("quote_age_bucket", "Quote freshness bucket"),
    ("update_density_bucket", "Update density bucket"),
    ("turnover_since_release_bucket", "Turnover since release bucket"),
    ("price_bucket", "Price bucket"),
    ("release_time_bucket", "Release time bucket"),
]
decision_top_policy = decision_policy_enriched.filter(pl.col("policy_id").is_in(top_policy_ids))
regime_breakdowns = {}
regime_policy_breakdowns = {}
regime_focus_lines = []
for bucket_col, bucket_label in regime_dimensions:
    if decision_top_policy.height == 0:
        # No rows for the top policies: emit typed empty tables.
        bucket_df = pl.DataFrame(
            schema={
                "bucket": pl.Utf8,
                "trade_count": pl.UInt32,
                "fill_rate": pl.Float64,
                "avg_primary_exit_pnl": pl.Float64,
                "avg_alpha_capture_ratio": pl.Float64,
            }
        )
        bucket_policy_df = pl.DataFrame(schema=regime_policy_schema)
    else:
        bucket_df = (
            decision_top_policy.group_by(bucket_col)
            .agg(
                pl.len().alias("trade_count"),
                pl.col("filled_flag").mean().alias("fill_rate"),
                pl.col("primary_exit_pnl").mean().alias("avg_primary_exit_pnl"),
                pl.col("alpha_capture_ratio_30m").mean().alias("avg_alpha_capture_ratio"),
            )
            .rename({bucket_col: "bucket"})
            # Best bucket first. nulls_last=True keeps buckets without a PnL
            # value from occupying row 0, which would otherwise suppress the
            # focus line even when other buckets have valid PnL.
            .sort(
                ["avg_primary_exit_pnl", "fill_rate"],
                descending=[True, True],
                nulls_last=True,
            )
        )
        bucket_policy_df = (
            decision_top_policy.group_by(["policy_id", bucket_col])
            .agg(
                pl.len().alias("trade_count"),
                pl.col("filled_flag").mean().alias("fill_rate"),
                pl.col("primary_exit_pnl").mean().alias("avg_primary_exit_pnl"),
                pl.col("alpha_capture_ratio_30m").mean().alias("avg_alpha_capture_ratio"),
            )
            .rename({bucket_col: "bucket"})
            .join(top_policy_meta.select("policy_id", "policy_label"), on="policy_id", how="left")
            .sort(["bucket", "policy_label"])
            .select(
                "policy_id",
                "policy_label",
                "bucket",
                "trade_count",
                "fill_rate",
                "avg_primary_exit_pnl",
                "avg_alpha_capture_ratio",
            )
        )
    regime_breakdowns[bucket_col] = bucket_df
    regime_policy_breakdowns[bucket_col] = bucket_policy_df
    if bucket_df.height:
        # Row 0 is the highest-PnL bucket after the sort above.
        focus = bucket_df.row(0, named=True)
        if focus["avg_primary_exit_pnl"] is not None:
            regime_focus_lines.append(
                f"{bucket_label}: {focus['bucket']} (avg pnl={focus['avg_primary_exit_pnl']:.2f}, fill={focus['fill_rate']:.2%})"
            )
write_cached_parquet(policy_scoreboard, "policy_scoreboard")Loading cache: data/eda_04/decision_policy_enriched.parquet
Saved cache: data/eda_04/policy_scoreboard.parquet
| Loading ITables v2.7.3 from the internet... (need help?) |
Policy scoreboard at MODEL_NOTIONAL=$2,500 on the clean modeling sample.
# Display the scoreboard restricted to the headline columns.
_scoreboard_display_cols = [
    "policy_id",
    "strategy",
    "latency_s",
    "offset_bps",
    "trade_count",
    "fill_rate",
    "deployed_notional_pct",
    "avg_fill_time_s",
    "pct_fills_after_920",
    "avg_realized_entry_slippage_bps",
    "avg_req_fill_to_30m_after_open_bps",
    "p10_req_fill_to_30m_after_open_bps",
    "avg_alpha_capture_ratio",
    "median_pnl",
    "win_rate_pct",
    "p10_pnl",
    "p05_pnl",
    "worst_trade_pnl",
    "price_quality_bps",
    "stress_test_pnl",
]
show_df(policy_scoreboard.select(_scoreboard_display_cols), n=40)
# [notebook output] | Loading ITables v2.7.3 from the internet... (need help?) |
Monthly stability heatmaps for the current top stress-adjusted policies, plus the underlying table.
Per-policy heatmaps first, then the combined regime tables used for the final gating readout.
# Two regime heatmap grids (average primary-exit PnL, then fill rate),
# followed by the pooled regime table for each dimension.
_regime_map_specs = [
    {
        "value_col": "avg_primary_exit_pnl",
        "title": "Regime map: average primary-exit PnL by top policy",
        "cmap": "RdYlGn",
        "fmt": ".2f",
        "center": 0.0,
    },
    {
        "value_col": "fill_rate",
        "title": "Regime map: fill rate by top policy",
        "cmap": "YlGnBu",
        "fmt": ".0%",
        "vmin": 0.0,
        "vmax": 1.0,
    },
]
for _map_spec in _regime_map_specs:
    plot_regime_policy_metric_grid(regime_policy_breakdowns, regime_dimensions, **_map_spec)

for bucket_col, bucket_label in regime_dimensions:
    _regime_heading = display.Markdown(f"#### {bucket_label}")
    display.display(_regime_heading)
    display.display(regime_breakdowns[bucket_col].head(12))
# [notebook output] | Loading ITables v2.7.3 from the internet... (need help?) |
| Loading ITables v2.7.3 from the internet... (need help?) |
| Loading ITables v2.7.3 from the internet... (need help?) |
| Loading ITables v2.7.3 from the internet... (need help?) |
| Loading ITables v2.7.3 from the internet... (need help?) |
| Loading ITables v2.7.3 from the internet... (need help?) |
Primary-target, downside-aware, and stress frontiers for the full policy set, followed by alpha-capture as a supporting view.
# Frontier views over the full policy set. Every view plots fill rate on the
# x-axis against a different y metric; the flagged frame for each view is kept
# under its own name.
if policy_scoreboard.height:
    frontier_scoreboard = ensure_policy_label(policy_scoreboard)
    if "policy_label" in frontier_scoreboard.columns:
        frontier_label_col = "policy_label"
    else:
        frontier_label_col = "policy_id"

    def _flag_and_plot_frontier(y_col, color_col, title, cmap, cbar_label):
        """Flag frontier policies for `y_col` vs fill rate, plot them, and return the flagged frame."""
        flagged = add_frontier_flag(
            frontier_scoreboard,
            x_col="fill_rate",
            y_col=y_col,
        )
        plot_policy_frontier(
            flagged,
            x_col="fill_rate",
            y_col=y_col,
            color_col=color_col,
            label_col=frontier_label_col,
            title=title,
            cmap=cmap,
            cbar_label=cbar_label,
        )
        return flagged

    primary_frontier = _flag_and_plot_frontier(
        "avg_req_fill_to_30m_after_open_bps",
        "stress_test_pnl",
        "Frontier: fill rate vs requested EV to 30m after open",
        "RdYlGn",
        "Stress test pnl",
    )
    downside_frontier = _flag_and_plot_frontier(
        "p10_req_fill_to_30m_after_open_bps",
        "avg_req_fill_to_30m_after_open_bps",
        "Downside frontier: fill rate vs 10th pct requested EV",
        "RdYlGn",
        "Average requested EV",
    )
    stress_frontier = _flag_and_plot_frontier(
        "stress_test_pnl",
        "avg_req_fill_to_30m_after_open_bps",
        "Stress frontier: fill rate vs stress-test pnl",
        "RdYlGn",
        "Average requested EV",
    )
    alpha_frontier = _flag_and_plot_frontier(
        "avg_alpha_capture_ratio",
        "price_quality_bps",
        "Support view: fill rate vs alpha-capture ratio",
        "viridis_r",
        "Price quality (bps)",
    )
# [notebook text] Top policies under slightly worse fills, later start, worse quote handling, and tighter no-stale-quote rules.”
The notebook ends with a literal gate so the research can be advanced, narrowed to regimes, held back, or stopped.
# Literal gate: turn the best stress-ranked policy into one of four outcomes
# ("Proceed", "Proceed, but only in these regimes", "Do not advance yet",
# "Abandon this direction") plus a one-sentence justification.
def _format_gate_metric(value, spec):
    """Format a scoreboard metric with `spec`, or return "n/a" when it is None."""
    return "n/a" if value is None else format(value, spec)


if policy_scoreboard.height == 0:
    final_gating_decision = "Do not advance yet"
    final_gating_reason = "The selected modeling sample has no policy rows after the current filters."
    final_gating_regimes = []
else:
    # Row 0 of the top-policy scoreboard is the best policy by the ranking.
    best_policy = top_policy_scoreboard.row(0, named=True)
    best_monthly = (
        monthly_top_policy.filter(pl.col("policy_id") == best_policy["policy_id"])
        if monthly_top_policy.height
        else pl.DataFrame()
    )
    positive_month_share = (
        best_monthly.select((pl.col("monthly_expectancy_pnl") > 0.0).cast(pl.Float64).mean()).item()
        if best_monthly.height
        else 0.0
    )
    # The mean is None when every monthly expectancy is null; treat that as
    # "no positive months" rather than crashing on the comparison below.
    if positive_month_share is None:
        positive_month_share = 0.0

    # Null metrics count as 0.0 for the threshold checks.
    robust_positive = (best_policy["stress_test_pnl"] or 0.0) > 0.0
    baseline_positive = (best_policy["avg_primary_exit_pnl"] or 0.0) > 0.0
    fill_ok = (best_policy["fill_rate"] or 0.0) >= 0.85
    monthly_ok = positive_month_share >= 0.50

    if robust_positive and fill_ok and monthly_ok:
        final_gating_decision = "Proceed"
    elif baseline_positive and ((best_policy["stress_test_pnl"] or 0.0) > -5.0 or monthly_ok):
        final_gating_decision = "Proceed, but only in these regimes"
    elif baseline_positive:
        final_gating_decision = "Do not advance yet"
    else:
        final_gating_decision = "Abandon this direction"

    # Null metrics render as "n/a" instead of raising on the format spec;
    # non-null values render exactly as before.
    final_gating_reason = (
        f"Best policy {best_policy['policy_id']} has avg primary-exit pnl={_format_gate_metric(best_policy['avg_primary_exit_pnl'], '.2f')}, "
        f"stress floor={_format_gate_metric(best_policy['stress_test_pnl'], '.2f')}, fill rate={_format_gate_metric(best_policy['fill_rate'], '.2%')}, "
        f"and positive-month share={positive_month_share:.2%}."
    )
    final_gating_regimes = regime_focus_lines[:4]
# Render the gating decision, its reason, and any focus regimes as markdown.
summary_lines = ["## Final gating decision", "", final_gating_decision, "", final_gating_reason]
if final_gating_regimes:
    summary_lines.append("")
    summary_lines.append("Focus regimes:")
    summary_lines.extend(f"- {line}" for line in final_gating_regimes)
_gating_markdown = "\n".join(summary_lines)
display.display(display.Markdown(_gating_markdown))
# [notebook output] Abandon this direction
Best policy vwap_minus_1std|lat=90|off=0|n=2500 has avg primary-exit pnl=-21.99, stress floor=-109.76, fill rate=51.47%, and positive-month share=0.00%.
Focus regimes: - Spread bucket: 5-10 (avg pnl=-6.73, fill=71.06%) - Quote freshness bucket: 1500-3000 (avg pnl=-14.33, fill=67.47%) - Update density bucket: <=50k (avg pnl=-18.13, fill=48.13%) - Turnover since release bucket: <=1x (avg pnl=-12.54, fill=40.94%)
These charts keep secondary diagnostics available without breaking the main narrative flow, including execution-assumption comparisons, the requested-notional capacity sweep, and setup diagnostics.
# Appendix: compare the two fill assumptions per policy family. The second
# assumption reads the same metrics from the `inside25_`-prefixed columns.
def _summarize_execution_assumption(column_prefix, assumption_label):
    """Aggregate fill rate, adverse fill, and requested EV per policy family for one assumption."""
    return (
        model_policy_selected.group_by("policy_family")
        .agg(
            pl.col(f"{column_prefix}filled_flag").mean().alias("fill_rate"),
            pl.col(f"{column_prefix}adverse_fill_bps").mean().alias("avg_adverse_fill_bps"),
            pl.col(f"{column_prefix}req_fill_to_30m_after_open_bps").mean().alias("avg_req_fill_to_30m_after_open_bps"),
        )
        .with_columns(pl.lit(assumption_label).alias("assumption"))
    )


_assumption_frames = [
    _summarize_execution_assumption("", "Conservative ask"),
    _summarize_execution_assumption("inside25_", "Ask - 25% spread"),
]
execution_assumption_summary = (
    pl.concat(_assumption_frames, how="vertical_relaxed")
    .with_columns(
        pl.col("policy_family").map_elements(strategy_display_name, return_dtype=pl.Utf8).alias("policy_family_label")
    )
    .sort(["policy_family_label", "assumption"])
)

display.display(display.Markdown("### Appendix: execution-assumption comparison"))
plot_execution_assumption_comparison(execution_assumption_summary)
show_df(execution_assumption_summary, n=12)

# Requested-notional capacity sweep.
plot_capacity_overview(capacity_curves)
show_df(capacity_curves, n=20)

# Setup diagnostics.
plot_feature_run_mix(selected_feature_runs)
plot_histogram(
    event_state,
    "vwap_dev_bps",
    hue="daypart",
    title="Appendix: VWAP deviation at decision by premarket daypart",
    bins=30,
    stat="density",
)
plot_context_diagnostics(policy)