15  The Data Science Pipeline for Optimization

NoteLearning Objectives
  • Design a reproducible data pipeline from raw data to OR solver input
  • Apply data validation and schema enforcement with Pandera
  • Engineer features suited to OR parameter estimation
  • Serialize fitted models and cache expensive computations with joblib
  • Structure a project so the ML layer and the OR layer are cleanly separated
  • Diagnose and repair common failure modes at the ML → OR handoff

15.1 The Pipeline as Infrastructure

There is a temptation, especially in early-stage OR projects, to treat the data pipeline as a formality — a few lines of pandas before the interesting work begins. This is a mistake with consequences. A model that produces the wrong demand forecast propagates the error into the inventory decision, the schedule, the capital plan. Garbage in, garbage out is not a cliché; it is an engineering failure mode.

The pipeline is not plumbing. It is load-bearing structure.

J.E. Gordon, writing about the mechanics of materials, observed that failures almost never originate in the main structural member — they originate at the joint, the interface, the transition between one material and another. The same pattern repeats in computational pipelines. The LP solver is rarely the source of a bad decision. The error lives upstream, at the seam between the data and the model, or between the model and the solver. A well-engineered pipeline makes these seams explicit, validated, and testable.

This chapter builds that pipeline from scratch: raw tabular data to a validated, serialized ML model whose outputs feed cleanly into an OR formulation. The techniques are domain-agnostic — they apply equally to supply chain demand forecasting, scheduling duration prediction, and energy dispatch.


15.2 Anatomy of an OR Pipeline

An end-to-end OR pipeline has five distinct stages, each with its own failure modes:

Code
import plotly.graph_objects as go

stages = [
    "1. Ingest &\nValidate",
    "2. Feature\nEngineering",
    "3. Train &\nEvaluate",
    "4. ML → OR\nHandoff",
    "5. Solve &\nAnalyse",
]
x_pos = [0, 1, 2, 3, 4]
colors = ["#4e79a7", "#f28e2b", "#59a14f", "#e15759", "#76b7b2"]

fig = go.Figure()

for i, (stage, x, col) in enumerate(zip(stages, x_pos, colors)):
    fig.add_shape(type="rect",
        x0=x - 0.38, x1=x + 0.38, y0=0.2, y1=0.8,
        fillcolor=col, opacity=0.85, line_color="white", line_width=2)
    fig.add_annotation(x=x, y=0.5, text=stage,
        showarrow=False, font=dict(color="white", size=12),
        align="center")
    if i < len(stages) - 1:
        fig.add_annotation(
            x=x + 0.42, y=0.5, ax=x + 0.58, ay=0.5,
            xref="x", yref="y", axref="x", ayref="y",
            showarrow=True, arrowhead=2, arrowsize=1.4,
            arrowcolor="#555", arrowwidth=2)

artifacts = [
    "DataFrame +\nschema",
    "Feature\nmatrix X",
    "Fitted\nmodel",
    "OR\nparameters",
    "Decision +\nsensitivity",
]
for x, art in zip(x_pos, artifacts):
    fig.add_annotation(x=x, y=0.05, text=f"<i>{art}</i>",
        showarrow=False, font=dict(color="#444", size=10), align="center")

fig.update_layout(
    xaxis=dict(visible=False, range=[-0.6, 4.6]),
    yaxis=dict(visible=False, range=[-0.1, 1.0]),
    height=220, margin=dict(l=10, r=10, t=10, b=10),
    plot_bgcolor="white", paper_bgcolor="white")
fig.show()
Figure 15.1: Five-stage OR pipeline. Each stage produces a validated artifact consumed by the next. Failures at the ML→OR interface (Stage 4) are the most common and hardest to diagnose.

Stage 4 — the ML → OR handoff — deserves special attention. It is where a regression output (a float) must be reconciled with an OR parameter (which may have integrality constraints, non-negativity requirements, or coupling with other parameters). An unchecked handoff produces infeasible models and silent errors.


15.3 Stage 1: Data Ingestion and Validation

15.3.1 Why Validate Before Modelling?

A clean training set is not a guarantee that production data will arrive clean. Schema drift — columns renamed, units changed, a categorical level added — breaks pipelines silently. Pandera enforces a schema at ingestion time, turning silent errors into loud ones.

Code
import numpy as np
import pandas as pd
import pandera as pa
from pandera import Column, DataFrameSchema, Check
import warnings
warnings.filterwarnings("ignore")

rng = np.random.default_rng(42)
n = 1_500

# --- synthetic manufacturing scheduling dataset ---
job_ids    = np.arange(1, n + 1)
machines   = rng.choice(["M1", "M2", "M3"], size=n)
operators  = rng.choice(["junior", "senior", "lead"], size=n, p=[0.4, 0.4, 0.2])
complexity = rng.integers(1, 6, size=n)
setup_min  = rng.uniform(5, 30, size=n)
true_duration = (
    8 * complexity
    + np.where(operators == "junior", 12, np.where(operators == "senior", 6, 2))
    + setup_min * 0.3
    + rng.normal(0, 4, size=n)
).clip(5, 120)

raw_df = pd.DataFrame({
    "job_id":       job_ids,
    "machine":      machines,
    "operator":     operators,
    "complexity":   complexity,
    "setup_min":    setup_min.round(1),
    "duration_min": true_duration.round(1),
})

# --- schema definition ---
job_schema = DataFrameSchema({
    "job_id":       Column(int,   Check.greater_than(0), nullable=False),
    "machine":      Column(str,   Check.isin(["M1", "M2", "M3"])),
    "operator":     Column(str,   Check.isin(["junior", "senior", "lead"])),
    "complexity":   Column(int,   Check(lambda s: s.between(1, 5).all())),
    "setup_min":    Column(float, Check.greater_than_or_equal_to(0)),
    "duration_min": Column(float, Check.greater_than(0)),
})

validated_df = job_schema.validate(raw_df)
print(f"Schema validated: {len(validated_df):,} rows, {validated_df.shape[1]} columns")
print(validated_df.dtypes.to_string())
Schema validated: 1,500 rows, 6 columns
job_id            int64
machine          object
operator         object
complexity        int64
setup_min       float64
duration_min    float64

15.3.2 Handling Schema Failures

When validation fails, the error is specific: which column, which rows, which check. This is far superior to discovering a downstream infeasibility in the solver and tracing it back to a negative processing time.

Code
bad_row = pd.DataFrame([{
    "job_id": -1, "machine": "M4", "operator": "intern",
    "complexity": 7, "setup_min": -5.0, "duration_min": 0.0
}])

try:
    job_schema.validate(bad_row, lazy=True)
except pa.errors.SchemaErrors as exc:
    print("Schema violations detected:")
    print(exc.failure_cases[["schema_context", "column", "check", "failure_case"]].to_string(index=False))
Schema violations detected:
schema_context       column                              check failure_case
        Column       job_id                    greater_than(0)           -1
        Column      machine           isin(['M1', 'M2', 'M3'])           M4
        Column     operator isin(['junior', 'senior', 'lead'])       intern
        Column    setup_min        greater_than_or_equal_to(0)         -5.0
        Column duration_min                    greater_than(0)          0.0
        Column   complexity                           <lambda>        False

15.4 Stage 2: Feature Engineering for OR

Feature engineering for OR differs subtly from feature engineering for pure prediction. The ML model’s output is not an end in itself — it is a parameter in an optimisation model. This shapes which features matter.

Processing time estimation (scheduling): features that capture operator skill, job complexity, machine condition, setup requirements. The goal is not just accuracy but calibration — the predicted distribution should match the true distribution, because stochastic programs consume quantiles, not point estimates.

Demand forecasting (inventory): temporal features (day of week, seasonality), external signals (promotions, weather), lag features. The critical ratio determines which quantile is needed, not the mean.

Code
from sklearn.preprocessing import OrdinalEncoder

df = validated_df.copy()

enc_machine  = OrdinalEncoder(categories=[["M1", "M2", "M3"]])
enc_operator = OrdinalEncoder(categories=[["junior", "senior", "lead"]])

df["machine_enc"]  = enc_machine.fit_transform(df[["machine"]]).astype(int)
df["operator_enc"] = enc_operator.fit_transform(df[["operator"]]).astype(int)

# Interaction: complexity × operator skill level
df["complexity_x_skill"] = df["complexity"] * df["operator_enc"]

# Setup time quantile bin (useful for stochastic scheduling)
df["setup_quantile"] = pd.qcut(df["setup_min"], q=4, labels=[0, 1, 2, 3]).astype(int)

feature_cols = ["machine_enc", "operator_enc", "complexity",
                "setup_min", "complexity_x_skill", "setup_quantile"]
target_col   = "duration_min"

X = df[feature_cols].values
y = df[target_col].values

print("Feature matrix shape:", X.shape)
print(f"Target — mean: {y.mean():.1f} min, std: {y.std():.1f} min, "
      f"range: [{y.min():.1f}, {y.max():.1f}]")
Feature matrix shape: (1500, 6)
Target — mean: 36.8 min, std: 13.2 min, range: [5.0, 70.1]

15.5 Stage 3: Training and Evaluation

15.5.1 Train / Validation / Test Split

For OR pipelines, the test set should mirror production conditions. Temporal data (demand, job arrivals) requires time-ordered splits — random shuffling leaks future information into training. Even for non-temporal data, hold out a test set that is never touched until final evaluation.

Code
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.linear_model import QuantileRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X_tv, X_test, y_tv, y_test = train_test_split(X, y, test_size=0.15, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_tv, y_tv, test_size=0.15, random_state=42)

print(f"Train: {len(X_train):,}  Val: {len(X_val):,}  Test: {len(X_test):,}")

gbm = GradientBoostingRegressor(n_estimators=200, max_depth=4,
                                 learning_rate=0.05, random_state=42)
gbm.fit(X_train, y_train)

y_val_pred = gbm.predict(X_val)
mae  = mean_absolute_error(y_val, y_val_pred)
rmse = mean_squared_error(y_val, y_val_pred) ** 0.5
print(f"\nGBM validation  MAE: {mae:.2f} min   RMSE: {rmse:.2f} min")
Train: 1,083  Val: 192  Test: 225

GBM validation  MAE: 3.31 min   RMSE: 4.17 min

15.5.2 Quantile Regression for Stochastic OR

When the downstream OR model is stochastic — a stochastic LP, a chance-constrained program, a newsvendor — the solver needs a quantile of the processing time distribution, not the mean. Train a quantile regressor directly.

Code
tau_values = [0.25, 0.50, 0.75]
q_models = {}

for tau in tau_values:
    pipe = Pipeline([
        ("scaler", StandardScaler()),
        ("qr", QuantileRegressor(quantile=tau, alpha=0.01, solver="highs")),
    ])
    pipe.fit(X_train, y_train)
    q_models[tau] = pipe

print("Quantile coverage on validation set:")
print(f"{'Quantile':>10}  {'Target':>8}  {'Actual':>8}  {'|Error|':>8}")
for tau, pipe in q_models.items():
    q_pred   = pipe.predict(X_val)
    coverage = (y_val <= q_pred).mean()
    print(f"{tau:>10.2f}  {tau:>8.2f}  {coverage:>8.3f}  {abs(coverage - tau):>8.3f}")
Quantile coverage on validation set:
  Quantile    Target    Actual   |Error|
      0.25      0.25     0.250     0.000
      0.50      0.50     0.495     0.005
      0.75      0.75     0.734     0.016

15.5.3 Visualising Prediction Intervals

Code
idx      = np.argsort(y_val)[:200]
y_sorted = y_val[idx]
y_pt     = y_val_pred[idx]
q25      = q_models[0.25].predict(X_val)[idx]
q75      = q_models[0.75].predict(X_val)[idx]
x_plot   = np.arange(len(idx))

fig = go.Figure()
fig.add_trace(go.Scatter(
    x=np.concatenate([x_plot, x_plot[::-1]]),
    y=np.concatenate([q75, q25[::-1]]),
    fill="toself", fillcolor="rgba(78,121,167,0.15)",
    line=dict(color="rgba(0,0,0,0)"), name="25–75th pct band"))
fig.add_trace(go.Scatter(x=x_plot, y=y_sorted, mode="markers",
    marker=dict(size=4, color="gray", opacity=0.5), name="Actual"))
fig.add_trace(go.Scatter(x=x_plot, y=y_pt, mode="lines",
    line=dict(color="#4e79a7", width=1.5), name="GBM point"))
fig.add_trace(go.Scatter(x=x_plot, y=q75, mode="lines",
    line=dict(color="#e15759", width=1, dash="dash"), name="75th pct"))
fig.add_trace(go.Scatter(x=x_plot, y=q25, mode="lines",
    line=dict(color="#59a14f", width=1, dash="dash"), name="25th pct"))

fig.update_layout(
    xaxis_title="Job (sorted by true duration)",
    yaxis_title="Processing time (min)",
    template="plotly_white", height=380,
    legend=dict(x=0.01, y=0.99))
fig.show()
Figure 15.2: Processing time predictions on the validation set. The GBM point estimate (blue) tracks the true duration; quantile bands at 25th and 75th percentiles capture ~50% of observations as expected. Scheduling under uncertainty uses the 75th-percentile estimate to build in buffer against tardiness.

15.6 Stage 4: The ML → OR Handoff

This is where most pipelines quietly break.

The handoff produces a dictionary of OR parameters. Keeping it as a dedicated function — rather than inline assignment — makes it testable, loggable, and replaceable.

Code
from dataclasses import dataclass
from typing import Dict

@dataclass
class SchedulingParams:
    """OR-ready parameters for a single-machine scheduling problem."""
    job_ids:          list
    processing_times: Dict[int, float]
    processing_p75:   Dict[int, float]
    processing_p25:   Dict[int, float]
    due_dates:        Dict[int, float]
    weights:          Dict[int, float]

def build_or_params(
    job_df: pd.DataFrame,
    gbm_model,
    q_models: dict,
    feature_cols: list,
) -> SchedulingParams:
    """Convert a validated job DataFrame into OR-ready scheduling parameters."""
    X_new = job_df[feature_cols].values

    # Enforce non-negativity — OR models cannot have negative processing times
    p_point = np.maximum(gbm_model.predict(X_new), 1.0)
    p_p75   = np.maximum(q_models[0.75].predict(X_new), 1.0)
    p_p25   = np.maximum(q_models[0.25].predict(X_new), 1.0)

    ids = job_df["job_id"].tolist()

    rng2      = np.random.default_rng(0)
    due_dates = {jid: pt * rng2.uniform(1.5, 3.0)
                 for jid, pt in zip(ids, p_point)}
    weights   = {jid: rng2.choice([1, 2, 3], p=[0.5, 0.35, 0.15])
                 for jid in ids}

    return SchedulingParams(
        job_ids=ids,
        processing_times=dict(zip(ids, p_point.round(2))),
        processing_p75=dict(zip(ids, p_p75.round(2))),
        processing_p25=dict(zip(ids, p_p25.round(2))),
        due_dates={k: round(v, 2) for k, v in due_dates.items()},
        weights=weights,
    )

test_df = df.iloc[-len(X_test):].reset_index(drop=True)
params  = build_or_params(test_df, gbm, q_models, feature_cols)

sample_ids = params.job_ids[:5]
print("Sample OR parameters (first 5 jobs):")
print(f"{'Job':>6}  {'p_point':>8}  {'p_p25':>7}  {'p_p75':>7}  {'due':>8}  {'w':>3}")
for jid in sample_ids:
    print(f"{jid:>6}  {params.processing_times[jid]:>8.1f}  "
          f"{params.processing_p25[jid]:>7.1f}  {params.processing_p75[jid]:>7.1f}  "
          f"{params.due_dates[jid]:>8.1f}  {params.weights[jid]:>3}")
Sample OR parameters (first 5 jobs):
   Job   p_point    p_p25    p_p75       due    w
  1276      53.7     51.6     56.3     131.8    2
  1277      21.8     19.1     24.8      41.5    1
  1278      28.4     25.2     29.8      44.4    1
  1279      41.2     39.5     45.0      62.8    2
  1280      21.8     18.6     23.8      59.3    1

15.6.1 Common Failure Modes at the Handoff

Warning

The five silent killers at the ML → OR interface

  1. Negative predictions — regression models can predict below zero; LP solvers accept the input and return nonsensical solutions. Always clip to valid domain.

  2. Unit mismatch — model trained on minutes, OR model expects hours. The solver will find a solution; it will be wrong by a factor of 60.

  3. Dimension mismatch — feature matrix built from a different job ordering than the index array passed to the solver. Results are silently mis-assigned.

  4. Distribution shift — model trained on historical jobs; production jobs have a different complexity mix. Monitor calibration in production.

  5. Quantile inversion — if Q25 > Q75 for some jobs (possible with separate regressors), stochastic constraints become inconsistent. Enforce monotonicity after prediction.

Code
p25_arr = np.array(list(params.processing_p25.values()))
p75_arr = np.array(list(params.processing_p75.values()))
inversions = (p25_arr > p75_arr).sum()
print(f"Quantile inversions (Q25 > Q75): {inversions} of {len(p25_arr)} jobs")

if inversions > 0:
    mid    = (p25_arr + p75_arr) / 2
    p25_arr = np.minimum(p25_arr, mid)
    p75_arr = np.maximum(p75_arr, mid)
    print("Repaired via isotonic projection.")
Quantile inversions (Q25 > Q75): 0 of 225 jobs

15.7 Stage 5: Model Serialisation and Caching

Fitted models are expensive to retrain. Two persistence patterns cover most needs:

  • joblib for scikit-learn models and numpy arrays — fast binary serialisation
  • pickle for lightweight Python objects — dicts, dataclasses, small DataFrames
Code
import joblib, os, pickle, tempfile

tmpdir = tempfile.mkdtemp()

joblib.dump(gbm, os.path.join(tmpdir, "gbm_duration.joblib"))
for tau, model in q_models.items():
    joblib.dump(model, os.path.join(tmpdir, f"qr_{int(tau*100)}.joblib"))

with open(os.path.join(tmpdir, "or_params.pkl"), "wb") as f:
    pickle.dump(params, f)

loaded_gbm    = joblib.load(os.path.join(tmpdir, "gbm_duration.joblib"))
loaded_params = pickle.load(open(os.path.join(tmpdir, "or_params.pkl"), "rb"))

pred_orig   = gbm.predict(X_test[:3]).round(3)
pred_loaded = loaded_gbm.predict(X_test[:3]).round(3)
print("Serialisation round-trip check (first 3 predictions):")
print(f"  Original : {pred_orig}")
print(f"  Reloaded : {pred_loaded}")
print(f"  Match    : {(pred_orig == pred_loaded).all()}")
Serialisation round-trip check (first 3 predictions):
  Original : [41.252 53.779 44.154]
  Reloaded : [41.252 53.779 44.154]
  Match    : True

15.7.1 Caching Expensive Pipeline Stages

Use joblib.Memory to cache any pure function whose inputs are stable between runs — data loading, feature extraction, cross-validation grids.

Code
from joblib import Memory

cache_dir = os.path.join(tmpdir, "cache")
memory    = Memory(cache_dir, verbose=0)

@memory.cache
def extract_features(df: pd.DataFrame, feature_cols: list) -> np.ndarray:
    """Cached feature extraction — recomputed only when inputs change."""
    return df[feature_cols].values.copy()

X_cached  = extract_features(df, feature_cols)
X_cached2 = extract_features(df, feature_cols)   # cache hit

print(f"Feature matrix from cache: shape {X_cached2.shape}")
print(f"Shapes match: {X_cached.shape == X_cached2.shape}")
Feature matrix from cache: shape (1500, 6)
Shapes match: True

15.8 Putting It Together: The Pipeline Object

Wrapping all five stages in a single class makes the pipeline reusable and testable. The OR solver calls pipeline.get_params(new_jobs_df) and receives a SchedulingParams object — it never touches the ML code directly.

Code
class ORPipeline:
    """
    End-to-end pipeline: validate → engineer → predict → handoff.
    The OR solver interacts only with `get_params`.
    """

    def __init__(self, schema, feature_cols, target_col):
        self.schema       = schema
        self.feature_cols = feature_cols
        self.target_col   = target_col
        self.gbm          = None
        self.q_models     = {}
        self._fitted      = False

    def fit(self, df: pd.DataFrame):
        validated = self.schema.validate(df)
        X = validated[self.feature_cols].values
        y = validated[self.target_col].values
        X_tr, _, y_tr, _ = train_test_split(X, y, test_size=0.2, random_state=42)

        self.gbm = GradientBoostingRegressor(
            n_estimators=200, max_depth=4, learning_rate=0.05, random_state=42)
        self.gbm.fit(X_tr, y_tr)

        for tau in [0.25, 0.50, 0.75]:
            pipe = Pipeline([
                ("sc", StandardScaler()),
                ("qr", QuantileRegressor(quantile=tau, alpha=0.01, solver="highs")),
            ])
            pipe.fit(X_tr, y_tr)
            self.q_models[tau] = pipe

        self._fitted = True
        return self

    def get_params(self, df: pd.DataFrame) -> SchedulingParams:
        if not self._fitted:
            raise RuntimeError("Pipeline must be fitted before calling get_params.")
        validated = self.schema.validate(df)
        return build_or_params(validated, self.gbm, self.q_models, self.feature_cols)

    def save(self, path: str):
        joblib.dump(self, path)

    @classmethod
    def load(cls, path: str) -> "ORPipeline":
        return joblib.load(path)


pipeline = ORPipeline(job_schema, feature_cols, target_col)
pipeline.fit(df)
sample_params = pipeline.get_params(df.head(10))

print("Pipeline fitted. Sample handoff (10 jobs):")
print(f"  Jobs         : {sample_params.job_ids}")
print(f"  p_point range: [{min(sample_params.processing_times.values()):.1f}, "
      f"{max(sample_params.processing_times.values()):.1f}] min")
Pipeline fitted. Sample handoff (10 jobs):
  Jobs         : [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  p_point range: [18.1, 55.6] min

15.9 Benchmarking the Pipeline: Does Better Prediction Help?

A pipeline is only justified if better ML predictions translate to better OR decisions. The benchmark compares three approaches on single-machine scheduling under SPT dispatching:

Approach Processing times used
Nominal Noisy group means (historical averages)
GBM point GBM predicted duration
GBM p75 75th-percentile estimate (buffer scheduling)
Code
rng3 = np.random.default_rng(99)
n_bench = 50

bench_idx = rng3.choice(len(df), size=n_bench, replace=False)
bench_df  = df.iloc[bench_idx].reset_index(drop=True)
bp        = pipeline.get_params(bench_df)

true_p = bench_df["duration_min"].values
ids    = bench_df["job_id"].values

def weighted_tardiness(sequence, proc_times, due_dates, weights):
    t, wt = 0.0, 0.0
    for jid in sequence:
        t  += proc_times[jid]
        wt += weights[jid] * max(t - due_dates[jid], 0)
    return wt

def spt_sequence(proc_times, ids):
    return sorted(ids, key=lambda j: proc_times[j])

true_proc = dict(zip(ids, true_p))
true_due  = {jid: pt * rng3.uniform(1.5, 3.0) for jid, pt in true_proc.items()}
true_wts  = {jid: int(rng3.choice([1, 2, 3], p=[0.5, 0.35, 0.15])) for jid in ids}

nom_proc = {jid: bench_df.loc[bench_df.job_id == jid, "duration_min"].values[0]
            * rng3.uniform(0.85, 1.15) for jid in ids}

results = {}
for name, proc in [("Nominal", nom_proc),
                   ("GBM point", bp.processing_times),
                   ("GBM p75",   bp.processing_p75)]:
    seq = spt_sequence(proc, ids)
    wt  = weighted_tardiness(seq, true_proc, true_due, true_wts)
    results[name] = wt

fig = go.Figure(go.Bar(
    x=list(results.keys()),
    y=list(results.values()),
    marker_color=["#aec7e8", "#4e79a7", "#e15759"],
    text=[f"{v:,.0f}" for v in results.values()],
    textposition="outside",
))
fig.update_layout(
    yaxis_title="Total weighted tardiness (min·units)",
    template="plotly_white", height=380,
    showlegend=False)
fig.show()

print("\nTotal weighted tardiness:")
for name, val in results.items():
    print(f"  {name:<12}: {val:>10,.1f}")
Figure 15.3: Scheduling under three processing-time estimates on 50 test jobs. SPT dispatching using GBM point predictions reduces total weighted tardiness vs. nominal means. The 75th-percentile (buffer) estimate further reduces tardiness — the right choice depends on the penalty structure for late jobs.

Total weighted tardiness:
  Nominal     :   55,706.6
  GBM point   :   54,336.9
  GBM p75     :   54,631.3

15.10 Summary

A robust OR pipeline is not a sequence of ad hoc scripts — it is a typed, validated, testable data product. The key design principles:

  • Validate at ingestion, not discovery: Pandera schema enforcement turns silent errors into loud ones before they reach the solver.
  • Engineer for calibration, not just accuracy: OR models consume quantiles and distributional parameters, not MSE-minimising means.
  • Isolate the handoff: a dedicated function converts ML outputs into OR parameters, enforcing domain constraints (non-negativity, monotonicity) explicitly.
  • Serialise the pipeline, not just the model: the ORPipeline object encapsulates schema, features, models, and handoff logic — one artefact to version and deploy.
  • Benchmark on decisions, not predictions: the pipeline earns its complexity only if better ML inputs produce better OR outputs.

These principles are domain-agnostic. The supply chain pipeline (Chapter 14), the scheduling pipeline (Chapter 15), and the capstone (Chapter 16) all share this five-stage skeleton — the domain changes, the structure does not.

15.11 Further Reading

  • Molnar, C. Interpretable Machine Learning (2nd ed., 2022) — Ch. 8 on calibration.
  • Pedregosa et al. “Scikit-learn: Machine Learning in Python.” JMLR 12 (2011).
  • Pandera documentation: schema inference, hypothesis testing, and data synthesis.
  • Elmachtoub & Grigas (2022). “Smart ‘Predict, then Optimize’.” Management Science 68(1).
  • Lam et al. (2016). “Quantile Forecasting for Inventory Decisions.” Operations Research.