# The Data Science Pipeline for Optimization {#sec-ds-pipeline}
::: {.callout-note appearance="simple"}
## Learning Objectives
- Design a reproducible data pipeline from raw data to OR solver input
- Apply data validation and schema enforcement with Pandera
- Engineer features suited to OR parameter estimation
- Serialize fitted models and cache expensive computations with joblib
- Structure a project so the ML layer and the OR layer are cleanly separated
- Diagnose and repair common failure modes at the ML → OR handoff
:::
## The Pipeline as Infrastructure {#sec-pipeline-intro}
There is a temptation, especially in early-stage OR projects, to treat the data
pipeline as a formality — a few lines of pandas before the interesting work begins.
This is a mistake with consequences. A model that produces the wrong demand forecast
propagates the error into the inventory decision, the schedule, the capital plan.
Garbage in, garbage out is not a cliché; it is an engineering failure mode.
The pipeline is not plumbing. It is load-bearing structure.
J.E. Gordon, writing about the mechanics of materials, observed that failures almost
never originate in the main structural member — they originate at the joint, the
interface, the transition between one material and another. The same pattern repeats
in computational pipelines. The LP solver is rarely the source of a bad decision. The
error lives upstream, at the seam between the data and the model, or between the model
and the solver. A well-engineered pipeline makes these seams explicit, validated, and
testable.
This chapter builds that pipeline from scratch: raw tabular data to a validated,
serialized ML model whose outputs feed cleanly into an OR formulation. The techniques
are domain-agnostic — they apply equally to supply chain demand forecasting, scheduling
duration prediction, and energy dispatch.
---
## Anatomy of an OR Pipeline {#sec-pipeline-anatomy}
An end-to-end OR pipeline has five distinct stages, each with its own failure modes:
```{python}
#| label: fig-pipeline-diagram
#| fig-cap: "Five-stage OR pipeline. Each stage produces a validated artifact consumed by the next. Failures at the ML→OR interface (Stage 4) are the most common and hardest to diagnose."
import plotly.graph_objects as go
stages = [
"1. Ingest &\nValidate",
"2. Feature\nEngineering",
"3. Train &\nEvaluate",
"4. ML → OR\nHandoff",
"5. Solve &\nAnalyse",
]
x_pos = [0, 1, 2, 3, 4]
colors = ["#4e79a7", "#f28e2b", "#59a14f", "#e15759", "#76b7b2"]
fig = go.Figure()
for i, (stage, x, col) in enumerate(zip(stages, x_pos, colors)):
fig.add_shape(type="rect",
x0=x - 0.38, x1=x + 0.38, y0=0.2, y1=0.8,
fillcolor=col, opacity=0.85, line_color="white", line_width=2)
fig.add_annotation(x=x, y=0.5, text=stage,
showarrow=False, font=dict(color="white", size=12),
align="center")
if i < len(stages) - 1:
fig.add_annotation(
x=x + 0.42, y=0.5, ax=x + 0.58, ay=0.5,
xref="x", yref="y", axref="x", ayref="y",
showarrow=True, arrowhead=2, arrowsize=1.4,
arrowcolor="#555", arrowwidth=2)
artifacts = [
"DataFrame +\nschema",
"Feature\nmatrix X",
"Fitted\nmodel",
"OR\nparameters",
"Decision +\nsensitivity",
]
for x, art in zip(x_pos, artifacts):
fig.add_annotation(x=x, y=0.05, text=f"<i>{art}</i>",
showarrow=False, font=dict(color="#444", size=10), align="center")
fig.update_layout(
xaxis=dict(visible=False, range=[-0.6, 4.6]),
yaxis=dict(visible=False, range=[-0.1, 1.0]),
height=220, margin=dict(l=10, r=10, t=10, b=10),
plot_bgcolor="white", paper_bgcolor="white")
fig.show()
```
Stage 4 — the ML → OR handoff — deserves special attention. It is where a regression
output (a float) must be reconciled with an OR parameter (which may have integrality
constraints, non-negativity requirements, or coupling with other parameters). An
unchecked handoff produces infeasible models and silent errors.
---
## Stage 1: Data Ingestion and Validation {#sec-ingest}
### Why Validate Before Modelling?
A clean training set is not a guarantee that production data will arrive clean. Schema
drift — columns renamed, units changed, a categorical level added — breaks pipelines
silently. Pandera enforces a schema at ingestion time, turning silent errors into loud
ones.
```{python}
#| label: sec-pandera-schema
import numpy as np
import pandas as pd
import pandera as pa
from pandera import Column, DataFrameSchema, Check
import warnings
warnings.filterwarnings("ignore")
rng = np.random.default_rng(42)
n = 1_500
# --- synthetic manufacturing scheduling dataset ---
job_ids = np.arange(1, n + 1)
machines = rng.choice(["M1", "M2", "M3"], size=n)
operators = rng.choice(["junior", "senior", "lead"], size=n, p=[0.4, 0.4, 0.2])
complexity = rng.integers(1, 6, size=n)
setup_min = rng.uniform(5, 30, size=n)
true_duration = (
8 * complexity
+ np.where(operators == "junior", 12, np.where(operators == "senior", 6, 2))
+ setup_min * 0.3
+ rng.normal(0, 4, size=n)
).clip(5, 120)
raw_df = pd.DataFrame({
"job_id": job_ids,
"machine": machines,
"operator": operators,
"complexity": complexity,
"setup_min": setup_min.round(1),
"duration_min": true_duration.round(1),
})
# --- schema definition ---
job_schema = DataFrameSchema({
"job_id": Column(int, Check.greater_than(0), nullable=False),
"machine": Column(str, Check.isin(["M1", "M2", "M3"])),
"operator": Column(str, Check.isin(["junior", "senior", "lead"])),
"complexity": Column(int, Check(lambda s: s.between(1, 5).all())),
"setup_min": Column(float, Check.greater_than_or_equal_to(0)),
"duration_min": Column(float, Check.greater_than(0)),
})
validated_df = job_schema.validate(raw_df)
print(f"Schema validated: {len(validated_df):,} rows, {validated_df.shape[1]} columns")
print(validated_df.dtypes.to_string())
```
### Handling Schema Failures
When validation fails, the error is specific: which column, which rows, which check.
This is far superior to discovering a downstream infeasibility in the solver and
tracing it back to a negative processing time.
```{python}
#| label: sec-schema-failure
bad_row = pd.DataFrame([{
"job_id": -1, "machine": "M4", "operator": "intern",
"complexity": 7, "setup_min": -5.0, "duration_min": 0.0
}])
try:
job_schema.validate(bad_row, lazy=True)
except pa.errors.SchemaErrors as exc:
print("Schema violations detected:")
print(exc.failure_cases[["schema_context", "column", "check", "failure_case"]].to_string(index=False))
```
---
## Stage 2: Feature Engineering for OR {#sec-features}
Feature engineering for OR differs subtly from feature engineering for pure prediction.
The ML model's output is not an end in itself — it is a parameter in an optimisation
model. This shapes which features matter.
**Processing time estimation** (scheduling): features that capture operator skill,
job complexity, machine condition, setup requirements. The goal is not just accuracy
but *calibration* — the predicted distribution should match the true distribution,
because stochastic programs consume quantiles, not point estimates.
**Demand forecasting** (inventory): temporal features (day of week, seasonality),
external signals (promotions, weather), lag features. The critical ratio determines
which quantile is needed, not the mean.
```{python}
#| label: sec-feature-engineering
from sklearn.preprocessing import OrdinalEncoder
df = validated_df.copy()
enc_machine = OrdinalEncoder(categories=[["M1", "M2", "M3"]])
enc_operator = OrdinalEncoder(categories=[["junior", "senior", "lead"]])
df["machine_enc"] = enc_machine.fit_transform(df[["machine"]]).astype(int)
df["operator_enc"] = enc_operator.fit_transform(df[["operator"]]).astype(int)
# Interaction: complexity × operator skill level
df["complexity_x_skill"] = df["complexity"] * df["operator_enc"]
# Setup time quantile bin (useful for stochastic scheduling)
df["setup_quantile"] = pd.qcut(df["setup_min"], q=4, labels=[0, 1, 2, 3]).astype(int)
feature_cols = ["machine_enc", "operator_enc", "complexity",
"setup_min", "complexity_x_skill", "setup_quantile"]
target_col = "duration_min"
X = df[feature_cols].values
y = df[target_col].values
print("Feature matrix shape:", X.shape)
print(f"Target — mean: {y.mean():.1f} min, std: {y.std():.1f} min, "
f"range: [{y.min():.1f}, {y.max():.1f}]")
```
---
## Stage 3: Training and Evaluation {#sec-train}
### Train / Validation / Test Split
For OR pipelines, the test set should mirror production conditions. Temporal data
(demand, job arrivals) requires time-ordered splits — random shuffling leaks future
information into training. Even for non-temporal data, hold out a test set that is
never touched until final evaluation.
```{python}
#| label: sec-train-eval
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.linear_model import QuantileRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
X_tv, X_test, y_tv, y_test = train_test_split(X, y, test_size=0.15, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_tv, y_tv, test_size=0.15, random_state=42)
print(f"Train: {len(X_train):,} Val: {len(X_val):,} Test: {len(X_test):,}")
gbm = GradientBoostingRegressor(n_estimators=200, max_depth=4,
learning_rate=0.05, random_state=42)
gbm.fit(X_train, y_train)
y_val_pred = gbm.predict(X_val)
mae = mean_absolute_error(y_val, y_val_pred)
rmse = mean_squared_error(y_val, y_val_pred) ** 0.5
print(f"\nGBM validation MAE: {mae:.2f} min RMSE: {rmse:.2f} min")
```
### Quantile Regression for Stochastic OR
When the downstream OR model is stochastic — a stochastic LP, a chance-constrained
program, a newsvendor — the solver needs a quantile of the processing time distribution,
not the mean. Train a quantile regressor directly.
```{python}
#| label: sec-quantile-reg
tau_values = [0.25, 0.50, 0.75]
q_models = {}
for tau in tau_values:
pipe = Pipeline([
("scaler", StandardScaler()),
("qr", QuantileRegressor(quantile=tau, alpha=0.01, solver="highs")),
])
pipe.fit(X_train, y_train)
q_models[tau] = pipe
print("Quantile coverage on validation set:")
print(f"{'Quantile':>10} {'Target':>8} {'Actual':>8} {'|Error|':>8}")
for tau, pipe in q_models.items():
q_pred = pipe.predict(X_val)
coverage = (y_val <= q_pred).mean()
print(f"{tau:>10.2f} {tau:>8.2f} {coverage:>8.3f} {abs(coverage - tau):>8.3f}")
```
### Visualising Prediction Intervals
```{python}
#| label: fig-prediction-intervals
#| fig-cap: "Processing time predictions on the validation set. The GBM point estimate (blue) tracks the true duration; quantile bands at 25th and 75th percentiles capture ~50% of observations as expected. Scheduling under uncertainty uses the 75th-percentile estimate to build in buffer against tardiness."
idx = np.argsort(y_val)[:200]
y_sorted = y_val[idx]
y_pt = y_val_pred[idx]
q25 = q_models[0.25].predict(X_val)[idx]
q75 = q_models[0.75].predict(X_val)[idx]
x_plot = np.arange(len(idx))
fig = go.Figure()
fig.add_trace(go.Scatter(
x=np.concatenate([x_plot, x_plot[::-1]]),
y=np.concatenate([q75, q25[::-1]]),
fill="toself", fillcolor="rgba(78,121,167,0.15)",
line=dict(color="rgba(0,0,0,0)"), name="25–75th pct band"))
fig.add_trace(go.Scatter(x=x_plot, y=y_sorted, mode="markers",
marker=dict(size=4, color="gray", opacity=0.5), name="Actual"))
fig.add_trace(go.Scatter(x=x_plot, y=y_pt, mode="lines",
line=dict(color="#4e79a7", width=1.5), name="GBM point"))
fig.add_trace(go.Scatter(x=x_plot, y=q75, mode="lines",
line=dict(color="#e15759", width=1, dash="dash"), name="75th pct"))
fig.add_trace(go.Scatter(x=x_plot, y=q25, mode="lines",
line=dict(color="#59a14f", width=1, dash="dash"), name="25th pct"))
fig.update_layout(
xaxis_title="Job (sorted by true duration)",
yaxis_title="Processing time (min)",
template="plotly_white", height=380,
legend=dict(x=0.01, y=0.99))
fig.show()
```
---
## Stage 4: The ML → OR Handoff {#sec-handoff}
This is where most pipelines quietly break.
The handoff produces a dictionary of OR parameters. Keeping it as a dedicated function
— rather than inline assignment — makes it testable, loggable, and replaceable.
```{python}
#| label: sec-handoff-function
from dataclasses import dataclass
from typing import Dict
@dataclass
class SchedulingParams:
"""OR-ready parameters for a single-machine scheduling problem."""
job_ids: list
processing_times: Dict[int, float]
processing_p75: Dict[int, float]
processing_p25: Dict[int, float]
due_dates: Dict[int, float]
weights: Dict[int, float]
def build_or_params(
job_df: pd.DataFrame,
gbm_model,
q_models: dict,
feature_cols: list,
) -> SchedulingParams:
"""Convert a validated job DataFrame into OR-ready scheduling parameters."""
X_new = job_df[feature_cols].values
# Enforce non-negativity — OR models cannot have negative processing times
p_point = np.maximum(gbm_model.predict(X_new), 1.0)
p_p75 = np.maximum(q_models[0.75].predict(X_new), 1.0)
p_p25 = np.maximum(q_models[0.25].predict(X_new), 1.0)
ids = job_df["job_id"].tolist()
rng2 = np.random.default_rng(0)
due_dates = {jid: pt * rng2.uniform(1.5, 3.0)
for jid, pt in zip(ids, p_point)}
weights = {jid: rng2.choice([1, 2, 3], p=[0.5, 0.35, 0.15])
for jid in ids}
return SchedulingParams(
job_ids=ids,
processing_times=dict(zip(ids, p_point.round(2))),
processing_p75=dict(zip(ids, p_p75.round(2))),
processing_p25=dict(zip(ids, p_p25.round(2))),
due_dates={k: round(v, 2) for k, v in due_dates.items()},
weights=weights,
)
test_df = df.iloc[-len(X_test):].reset_index(drop=True)
params = build_or_params(test_df, gbm, q_models, feature_cols)
sample_ids = params.job_ids[:5]
print("Sample OR parameters (first 5 jobs):")
print(f"{'Job':>6} {'p_point':>8} {'p_p25':>7} {'p_p75':>7} {'due':>8} {'w':>3}")
for jid in sample_ids:
print(f"{jid:>6} {params.processing_times[jid]:>8.1f} "
f"{params.processing_p25[jid]:>7.1f} {params.processing_p75[jid]:>7.1f} "
f"{params.due_dates[jid]:>8.1f} {params.weights[jid]:>3}")
```
### Common Failure Modes at the Handoff
::: {.callout-warning}
**The five silent killers at the ML → OR interface**
1. **Negative predictions** — regression models can predict below zero; LP solvers
accept the input and return nonsensical solutions. Always clip to valid domain.
2. **Unit mismatch** — model trained on minutes, OR model expects hours. The solver
will find a solution; it will be wrong by a factor of 60.
3. **Dimension mismatch** — feature matrix built from a different job ordering than the
index array passed to the solver. Results are silently mis-assigned.
4. **Distribution shift** — model trained on historical jobs; production jobs have a
different complexity mix. Monitor calibration in production.
5. **Quantile inversion** — if Q25 > Q75 for some jobs (possible with separate
regressors), stochastic constraints become inconsistent. Enforce monotonicity
after prediction.
:::
```{python}
#| label: sec-monotonicity-check
p25_arr = np.array(list(params.processing_p25.values()))
p75_arr = np.array(list(params.processing_p75.values()))
inversions = (p25_arr > p75_arr).sum()
print(f"Quantile inversions (Q25 > Q75): {inversions} of {len(p25_arr)} jobs")
if inversions > 0:
mid = (p25_arr + p75_arr) / 2
p25_arr = np.minimum(p25_arr, mid)
p75_arr = np.maximum(p75_arr, mid)
print("Repaired via isotonic projection.")
```
---
## Stage 5: Model Serialisation and Caching {#sec-serialise}
Fitted models are expensive to retrain. Two persistence patterns cover most needs:
- **joblib** for scikit-learn models and numpy arrays — fast binary serialisation
- **pickle** for lightweight Python objects — dicts, dataclasses, small DataFrames
```{python}
#| label: sec-serialise-models
import joblib, os, pickle, tempfile
tmpdir = tempfile.mkdtemp()
joblib.dump(gbm, os.path.join(tmpdir, "gbm_duration.joblib"))
for tau, model in q_models.items():
joblib.dump(model, os.path.join(tmpdir, f"qr_{int(tau*100)}.joblib"))
with open(os.path.join(tmpdir, "or_params.pkl"), "wb") as f:
pickle.dump(params, f)
loaded_gbm = joblib.load(os.path.join(tmpdir, "gbm_duration.joblib"))
loaded_params = pickle.load(open(os.path.join(tmpdir, "or_params.pkl"), "rb"))
pred_orig = gbm.predict(X_test[:3]).round(3)
pred_loaded = loaded_gbm.predict(X_test[:3]).round(3)
print("Serialisation round-trip check (first 3 predictions):")
print(f" Original : {pred_orig}")
print(f" Reloaded : {pred_loaded}")
print(f" Match : {(pred_orig == pred_loaded).all()}")
```
### Caching Expensive Pipeline Stages
Use `joblib.Memory` to cache any pure function whose inputs are stable between runs —
data loading, feature extraction, cross-validation grids.
```{python}
#| label: sec-joblib-cache
from joblib import Memory
cache_dir = os.path.join(tmpdir, "cache")
memory = Memory(cache_dir, verbose=0)
@memory.cache
def extract_features(df: pd.DataFrame, feature_cols: list) -> np.ndarray:
"""Cached feature extraction — recomputed only when inputs change."""
return df[feature_cols].values.copy()
X_cached = extract_features(df, feature_cols)
X_cached2 = extract_features(df, feature_cols) # cache hit
print(f"Feature matrix from cache: shape {X_cached2.shape}")
print(f"Shapes match: {X_cached.shape == X_cached2.shape}")
```
---
## Putting It Together: The Pipeline Object {#sec-pipeline-object}
Wrapping all five stages in a single class makes the pipeline reusable and testable.
The OR solver calls `pipeline.get_params(new_jobs_df)` and receives a `SchedulingParams`
object — it never touches the ML code directly.
```{python}
#| label: sec-pipeline-class
class ORPipeline:
"""
End-to-end pipeline: validate → engineer → predict → handoff.
The OR solver interacts only with `get_params`.
"""
def __init__(self, schema, feature_cols, target_col):
self.schema = schema
self.feature_cols = feature_cols
self.target_col = target_col
self.gbm = None
self.q_models = {}
self._fitted = False
def fit(self, df: pd.DataFrame):
validated = self.schema.validate(df)
X = validated[self.feature_cols].values
y = validated[self.target_col].values
X_tr, _, y_tr, _ = train_test_split(X, y, test_size=0.2, random_state=42)
self.gbm = GradientBoostingRegressor(
n_estimators=200, max_depth=4, learning_rate=0.05, random_state=42)
self.gbm.fit(X_tr, y_tr)
for tau in [0.25, 0.50, 0.75]:
pipe = Pipeline([
("sc", StandardScaler()),
("qr", QuantileRegressor(quantile=tau, alpha=0.01, solver="highs")),
])
pipe.fit(X_tr, y_tr)
self.q_models[tau] = pipe
self._fitted = True
return self
def get_params(self, df: pd.DataFrame) -> SchedulingParams:
if not self._fitted:
raise RuntimeError("Pipeline must be fitted before calling get_params.")
validated = self.schema.validate(df)
return build_or_params(validated, self.gbm, self.q_models, self.feature_cols)
def save(self, path: str):
joblib.dump(self, path)
@classmethod
def load(cls, path: str) -> "ORPipeline":
return joblib.load(path)
pipeline = ORPipeline(job_schema, feature_cols, target_col)
pipeline.fit(df)
sample_params = pipeline.get_params(df.head(10))
print("Pipeline fitted. Sample handoff (10 jobs):")
print(f" Jobs : {sample_params.job_ids}")
print(f" p_point range: [{min(sample_params.processing_times.values()):.1f}, "
f"{max(sample_params.processing_times.values()):.1f}] min")
```
---
## Benchmarking the Pipeline: Does Better Prediction Help? {#sec-pipeline-benchmark}
A pipeline is only justified if better ML predictions translate to better OR decisions.
The benchmark compares three approaches on single-machine scheduling under SPT dispatching:
| Approach | Processing times used |
|---|---|
| **Nominal** | Noisy group means (historical averages) |
| **GBM point** | GBM predicted duration |
| **GBM p75** | 75th-percentile estimate (buffer scheduling) |
```{python}
#| label: fig-pipeline-benchmark
#| fig-cap: "Scheduling under three processing-time estimates on 50 test jobs. SPT dispatching using GBM point predictions reduces total weighted tardiness vs. nominal means. The 75th-percentile (buffer) estimate further reduces tardiness — the right choice depends on the penalty structure for late jobs."
rng3 = np.random.default_rng(99)
n_bench = 50
bench_idx = rng3.choice(len(df), size=n_bench, replace=False)
bench_df = df.iloc[bench_idx].reset_index(drop=True)
bp = pipeline.get_params(bench_df)
true_p = bench_df["duration_min"].values
ids = bench_df["job_id"].values
def weighted_tardiness(sequence, proc_times, due_dates, weights):
t, wt = 0.0, 0.0
for jid in sequence:
t += proc_times[jid]
wt += weights[jid] * max(t - due_dates[jid], 0)
return wt
def spt_sequence(proc_times, ids):
return sorted(ids, key=lambda j: proc_times[j])
true_proc = dict(zip(ids, true_p))
true_due = {jid: pt * rng3.uniform(1.5, 3.0) for jid, pt in true_proc.items()}
true_wts = {jid: int(rng3.choice([1, 2, 3], p=[0.5, 0.35, 0.15])) for jid in ids}
nom_proc = {jid: bench_df.loc[bench_df.job_id == jid, "duration_min"].values[0]
* rng3.uniform(0.85, 1.15) for jid in ids}
results = {}
for name, proc in [("Nominal", nom_proc),
("GBM point", bp.processing_times),
("GBM p75", bp.processing_p75)]:
seq = spt_sequence(proc, ids)
wt = weighted_tardiness(seq, true_proc, true_due, true_wts)
results[name] = wt
fig = go.Figure(go.Bar(
x=list(results.keys()),
y=list(results.values()),
marker_color=["#aec7e8", "#4e79a7", "#e15759"],
text=[f"{v:,.0f}" for v in results.values()],
textposition="outside",
))
fig.update_layout(
yaxis_title="Total weighted tardiness (min·units)",
template="plotly_white", height=380,
showlegend=False)
fig.show()
print("\nTotal weighted tardiness:")
for name, val in results.items():
print(f" {name:<12}: {val:>10,.1f}")
```
---
## Summary {#sec-pipeline-summary}
A robust OR pipeline is not a sequence of ad hoc scripts — it is a typed, validated,
testable data product. The key design principles:
- **Validate at ingestion, not discovery**: Pandera schema enforcement turns silent
errors into loud ones before they reach the solver.
- **Engineer for calibration, not just accuracy**: OR models consume quantiles and
distributional parameters, not MSE-minimising means.
- **Isolate the handoff**: a dedicated function converts ML outputs into OR parameters,
enforcing domain constraints (non-negativity, monotonicity) explicitly.
- **Serialise the pipeline, not just the model**: the `ORPipeline` object encapsulates
schema, features, models, and handoff logic — one artefact to version and deploy.
- **Benchmark on decisions, not predictions**: the pipeline earns its complexity only
if better ML inputs produce better OR outputs.
These principles are domain-agnostic. The supply chain pipeline (Chapter 14), the
scheduling pipeline (Chapter 15), and the capstone (Chapter 16) all share this
five-stage skeleton — the domain changes, the structure does not.
## Further Reading {#sec-pipeline-reading}
- Molnar, C. *Interpretable Machine Learning* (2nd ed., 2022) — Ch. 8 on calibration.
- Pedregosa et al. "Scikit-learn: Machine Learning in Python." *JMLR* 12 (2011).
- Pandera documentation: schema inference, hypothesis testing, and data synthesis.
- Elmachtoub & Grigas (2022). "Smart 'Predict, then Optimize'." *Management Science* 68(1).
- Lam et al. (2016). "Quantile Forecasting for Inventory Decisions." *Operations Research.*