MLOps Pipeline in der Produktion: Vom Experiment zum zuverlässigen ML-System¶
Bei CORE SYSTEMS bringen wir ML-Modelle aus Jupyter in die Produktion — zuverlassig, sicher, mit 99,9 % SLA. In uber 15 Jahren haben wir End-to-End-Datenpipelines fur Kunden wie Packeta, Ceska sporitelna und iBod aufgebaut, einschließlich der Migration von 200 Mio. Transaktionen von On-Prem Oracle zu Azure Cosmos DB ohne Ausfallzeit. Die meisten ML-Projekte scheitern nicht wegen eines schlechten Modells, sondern wegen schlechter Infrastruktur drumherum. 87 % der ML-Modelle schaffen es nie in die Produktion (Gartner, 2025). MLOps ist die Disziplin, die dieses Problem lost.
Anatomie einer produktiven ML Pipeline¶
┌────────────┐ ┌──────────────┐ ┌─────────────┐ ┌──────────────┐
│ Data │──▶│ Feature │──▶│ Training │──▶│ Model │
│ Ingestion │ │ Engineering │ │ Pipeline │ │ Registry │
└────────────┘ └──────────────┘ └─────────────┘ └──────────────┘
│ │
▼ ▼
┌──────────────┐ ┌──────────────┐
│ Feature │ │ Serving │
│ Store │──────────────────────▶│ (API/Batch) │
└──────────────┘ └──────────────┘
│
▼
┌──────────────┐
│ Monitoring │
│ & Drift │
└──────────────┘
1. Feature Store — Das Herz der ML-Infrastruktur¶
Ein Feature Store ist ein zentrales Repository für ML Features — bereinigte, transformierte Datenattribute, die sowohl für Training als auch für Inference bereit sind.
Warum ein Feature Store?¶
- Konsistenz: dieselben Features in Training und Produktion (Eliminierung von Training-Serving Skew)
- Wiederverwendbarkeit: Features werden zwischen Teams und Modellen geteilt
- Zeitliche Korrektheit: Point-in-Time Correct Joins (kein Data Leakage)
- Latenz: Online Store für Real-Time Serving (< 10ms)
Implementierung¶
# Feast (open-source feature store) — Feature-Definition
from feast import Entity, Feature, FeatureView, FileSource
from feast.types import Float32, Int64
customer = Entity(name="customer_id", value_type=Int64)
customer_features = FeatureView(
name="customer_features",
entities=[customer],
schema=[
Feature(name="total_orders_30d", dtype=Int64),
Feature(name="avg_order_value", dtype=Float32),
Feature(name="days_since_last_order", dtype=Int64),
Feature(name="churn_risk_score", dtype=Float32),
],
source=FileSource(path="s3://features/customer_daily.parquet"),
ttl=timedelta(days=1),
)
Empfohlene Tools (2026):
| Tool | Typ | Am besten für |
|---|---|---|
| Feast | Open-Source | Startups, Flexibilität |
| Tecton | Managed | Enterprise, Real-time |
| Hopsworks | Open-Source + Managed | Full MLOps Platform |
| Databricks Feature Store | Managed | Databricks-Ökosystem |
| Redis + Custom | DIY | Ultra-niedrige Latenz |
2. Training Pipeline — Reproduzierbarkeit an erster Stelle¶
Jeder Trainingslauf muss 100 % reproduzierbar sein:
# DVC pipeline (dvc.yaml)
stages:
prepare:
cmd: python src/prepare.py
deps:
- src/prepare.py
- data/raw/
outs:
- data/processed/
params:
- prepare.split_ratio
- prepare.seed
train:
cmd: python src/train.py
deps:
- src/train.py
- data/processed/
outs:
- models/latest/
params:
- train.learning_rate
- train.epochs
- train.batch_size
metrics:
- metrics/train.json:
cache: false
evaluate:
cmd: python src/evaluate.py
deps:
- src/evaluate.py
- models/latest/
- data/processed/test/
metrics:
- metrics/eval.json:
cache: false
plots:
- metrics/confusion_matrix.csv
- metrics/roc_curve.csv
Schlüsselprinzipien¶
- Datenversionierung — DVC, LakeFS oder Delta Lake
- Code-Versionierung — Git (selbstverständlich)
- Umgebungsversionierung — Docker + requirements.txt mit gepinnten Versionen
- Experiment Tracking — MLflow, Weights & Biases oder Neptune
- Hyperparameter Management — Hydra oder Config-YAML in Git
# MLflow Experiment Tracking
import mlflow
with mlflow.start_run(run_name="xgboost-v3"):
mlflow.log_params({
"learning_rate": 0.01,
"max_depth": 6,
"n_estimators": 500,
"feature_set": "customer_v3",
})
model = train_model(X_train, y_train, params)
metrics = evaluate_model(model, X_test, y_test)
mlflow.log_metrics({
"auc_roc": metrics["auc_roc"],
"precision": metrics["precision"],
"recall": metrics["recall"],
"f1": metrics["f1"],
})
mlflow.sklearn.log_model(model, "model",
registered_model_name="churn-predictor")
3. Model Registry — Governance und Lifecycle¶
Model Registry = ein zentraler Katalog aller Modelle mit Versionen, Metadaten und Lifecycle-Status.
┌──────────────────────────────────────┐
│ Model Registry │
├──────────────────────────────────────┤
│ churn-predictor │
│ v1.0 → Archived │
│ v1.1 → Archived │
│ v2.0 → Production (since 2026-01) │
│ v2.1 → Staging (canary 5%) │
│ │
│ fraud-detector │
│ v3.0 → Production │
│ v3.1 → Staging │
│ │
│ recommendation-engine │
│ v1.0 → Production │
└──────────────────────────────────────┘
Lifecycle-Status: None → Staging → Production → Archived
Governance-Checkliste vor Production: - Metriken über Schwellenwert (AUC > 0,85, Latenz < 50ms) - A/B-Test mindestens 7 Tage - Bias-Audit (Fairness-Metriken) - Data Lineage Dokumentation - Rollback-Plan getestet
4. Model Serving — API und Batch¶
Online Serving (Real-time)¶
# FastAPI + ONNX Runtime für niedrige Latenz
from fastapi import FastAPI
import onnxruntime as ort
import numpy as np
app = FastAPI()
session = ort.InferenceSession("model.onnx")
@app.post("/predict")
async def predict(features: dict):
input_array = np.array([list(features.values())], dtype=np.float32)
result = session.run(None, {"input": input_array})
return {
"prediction": int(result[0][0]),
"probability": float(result[1][0][1]),
"model_version": "v2.0",
}
Latenz-Optimierung: - ONNX Runtime (2-5x schneller als sklearn/pytorch) - Modell-Quantisierung (FP32 → INT8) - Batching (dynamisches Micro-Batching) - Feature Caching (Redis für wiederholte Abfragen)
Batch Serving¶
# Spark Batch Inference
from pyspark.sql import SparkSession
import mlflow
spark = SparkSession.builder.appName("batch-predict").getOrCreate()
model = mlflow.pyfunc.spark_udf(spark, "models:/churn-predictor/Production")
df = spark.read.parquet("s3://features/customer_daily/")
predictions = df.withColumn("churn_probability", model(*feature_columns))
predictions.write.parquet("s3://predictions/churn/2026-02-20/")
5. Monitoring — Model Decay ist unvermeidlich¶
ML-Modelle degradieren. Daten ändern sich, die Welt ändert sich. Monitoring ist kein Nice-to-have, es ist eine Notwendigkeit.
Was überwachen¶
| Kategorie | Metriken | Alert |
|---|---|---|
| Data Quality | Fehlende Werte, Schema-Drift, Volumen | > 5 % fehlend |
| Feature Drift | PSI, KS-Test, Wasserstein-Distanz | PSI > 0,2 |
| Prediction Drift | Verteilung der Ausgaben | KS p < 0,01 |
| Model Performance | AUC, Precision, Recall (mit Ground Truth) | AUC-Abfall > 5 % |
| Latenz | p50, p95, p99 | p99 > 100ms |
| Durchsatz | Requests/Sek, Error Rate | Error > 1 % |
Drift Detection¶
# Population Stability Index (PSI)
def psi(expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float:
"""PSI < 0.1 = stable, 0.1-0.2 = moderate, > 0.2 = significant drift."""
breakpoints = np.quantile(expected, np.linspace(0, 1, bins + 1))
expected_pct = np.histogram(expected, breakpoints)[0] / len(expected)
actual_pct = np.histogram(actual, breakpoints)[0] / len(actual)
# Avoid log(0)
expected_pct = np.clip(expected_pct, 0.001, None)
actual_pct = np.clip(actual_pct, 0.001, None)
return np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))
Retraining-Auslöser¶
- Geplant: wöchentliches/monatliches Retraining mit frischen Daten
- Drift-basiert: automatisches Retraining bei PSI > 0,2
- Performance-basiert: Retraining bei Metrikabfall unter Schwellenwert
- Event-basiert: Retraining nach signifikanter Business-Änderung
6. CI/CD für ML¶
# GitHub Actions — ML pipeline CI/CD
name: ML Pipeline
on:
push:
paths: ['src/**', 'configs/**', 'data/dvc.lock']
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- run: pip install -r requirements.txt
- run: pytest tests/ -v
- run: python src/validate_data.py
- run: python src/train.py --config configs/ci.yaml
- run: python src/evaluate.py --threshold-file configs/thresholds.yaml
deploy-staging:
needs: test
if: github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
steps:
- run: mlflow models serve -m "models:/churn-predictor/Staging" --port 8080 &
- run: python tests/integration/test_serving.py
- run: kubectl apply -f k8s/staging/
promote-production:
needs: deploy-staging
runs-on: ubuntu-latest
environment: production # Manual approval gate
steps:
- run: python scripts/promote_model.py --from staging --to production
- run: kubectl apply -f k8s/production/
Unsere Erfahrung: MLOps-Pipeline fur einen Retail-Kunden¶
Wir haben eine vollstandige MLOps-Pipeline fur einen Retail-Kunden aufgebaut. Konkrete Ergebnisse:
- Retraining-Zyklus von 6 Wochen auf taglich — automatisierte Pipeline mit Drift-basierten Triggern
- Inference-Latenz p99 von 800 ms auf 45 ms — Migration auf ONNX Runtime + Modell-Quantisierung
- Drift-Erkennung innerhalb von 2 Stunden — automatisiertes Monitoring mit PSI-Metriken und Alerting
- End-to-End-Delivery — vom Feature-Store-Entwurf uber die Model Registry bis zum 24/7-Betrieb
- Pipeline lauft 24/7 mit 99,9 % SLA
Fazit¶
MLOps im Jahr 2026 dreht sich nicht um Tools — es geht um Disziplin. Die Schlüsselprinzipien:
- Reproduzierbarkeit — jedes Experiment muss wiederholbar sein
- Automatisierung — manuelle Schritte = Fehlerquelle
- Monitoring — ein Modell ohne Monitoring ist eine Zeitbombe
- Governance — wer hat das Modell genehmigt, auf welchen Daten, mit welchen Metriken
CORE SYSTEMS implementiert MLOps-Pipelines vom Architekturentwurf bis zum Produktionsbetrieb. Wir helfen Unternehmen, ML-Modelle aus Jupyter in die Produktion zu bringen — zuverlässig und sicher.
Möchten Sie MLOps in Ihrer Organisation einführen? Kontaktieren Sie uns für eine Beratung.
Brauchen Sie Hilfe bei der Implementierung?
Unsere Experten helfen Ihnen bei Design, Implementierung und Betrieb. Von der Architektur bis zur Produktion.
Kontaktieren Sie uns