Keep models fresh with scheduled and trigger-based retraining pipelines
There are three triggers for retraining, and you should use all three:
Scheduled retraining: retrain on a fixed cadence (daily, weekly, monthly) regardless of performance signals. It is simple to operate and works well when the world changes at a predictable rate. The risk is cost: you pay for training runs even when the model hasn't degraded.
Performance-triggered retraining: monitor a business metric or a held-out evaluation set and retrain when the metric drops below a threshold. This is more efficient than a fixed schedule, but it requires reliable ground-truth labels in near-real-time.
Drift-triggered retraining: retrain when input feature drift exceeds a threshold (for example, a Population Stability Index (PSI) above 0.2 for key features). It doesn't require labels and can flag likely degradation before it becomes measurable in performance metrics.
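The pipeline that follows calls a `compute_psi` helper without defining it. As a minimal sketch of what such a helper might compute for one numeric feature, the stdlib-only Python below bins the reference data at its deciles (10 quantile bins is an assumption; other binning schemes are common) and sums (cur - ref) × ln(cur / ref) over the bins. The function names are hypothetical, and the small `eps` floor is an assumed guard that keeps empty bins from producing log(0).

```python
import math
from bisect import bisect_right
from typing import Sequence


def quantile_edges(reference: Sequence[float], n_bins: int = 10) -> list[float]:
    """Interior bin edges taken at reference-set quantiles (hypothetical helper)."""
    ordered = sorted(reference)
    n = len(ordered)
    edges = [ordered[min(n - 1, (i * n) // n_bins)] for i in range(1, n_bins)]
    return sorted(set(edges))  # collapse duplicate edges caused by repeated values


def bin_fractions(values: Sequence[float], edges: list[float], eps: float = 1e-6) -> list[float]:
    """Fraction of values in each bin, floored at eps so no bin is exactly zero."""
    counts = [0] * (len(edges) + 1)
    for v in values:
        counts[bisect_right(edges, v)] += 1
    total = len(values)
    return [max(c / total, eps) for c in counts]


def population_stability_index(reference: Sequence[float], current: Sequence[float],
                               n_bins: int = 10) -> float:
    """PSI = sum over bins of (cur - ref) * ln(cur / ref); bins come from the reference data."""
    edges = quantile_edges(reference, n_bins)
    ref = bin_fractions(reference, edges)
    cur = bin_fractions(current, edges)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref, cur))


ref = [float(x) for x in range(1000)]
print(population_stability_index(ref, ref))                    # identical data: 0.0
print(population_stability_index(ref, [x + 500 for x in ref]))  # shifted data: well above 0.2
```

In a real pipeline you would run this per key feature and trigger retraining when any of them crosses the 0.2 threshold.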
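A Prefect flow that implements the drift trigger, followed by a quality gate that blocks deployment of an underperforming model:

```python
from prefect import flow, task
import mlflow
import mlflow.sklearn

# compute_psi, train_model, evaluate and promote_to_production are
# project-specific helpers assumed to be defined elsewhere.

PSI_THRESHOLD = 0.2  # drift trigger
MIN_AUC = 0.88       # quality gate for the retrained model


@task
def check_drift(reference_path: str, current_path: str) -> bool:
    """Return True when feature drift between the two datasets exceeds the threshold."""
    psi = compute_psi(reference_path, current_path)
    return psi > PSI_THRESHOLD


@task
def retrain(training_data_path: str) -> str:
    """Train and evaluate a new model; log it to MLflow only if it passes the quality gate."""
    with mlflow.start_run():
        model = train_model(training_data_path)
        auc = evaluate(model)
        mlflow.log_metric("auc", auc)
        if auc <= MIN_AUC:
            # Raising fails the task (and the flow), so deploy() never runs
            raise ValueError(f"Model quality below threshold: AUC={auc:.3f}")
        return mlflow.sklearn.log_model(model, "model").model_uri


@task
def deploy(model_uri: str) -> None:
    promote_to_production(model_uri)


@flow(name="continuous-training")
def ct_pipeline():
    should_retrain = check_drift("data/reference.parquet", "data/current.parquet")
    if should_retrain:
        model_uri = retrain("data/training_latest.parquet")
        deploy(model_uri)


if __name__ == "__main__":
    ct_pipeline()
```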
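The flow above acts only on drift, while the text recommends combining all three triggers. A minimal sketch of one way to fold the three signals into a single decision, assuming a hypothetical `RetrainPolicy` with illustrative thresholds (a 7-day maximum model age, a 0.85 minimum live metric, and the 0.2 PSI limit). The live metric is optional because ground-truth labels often arrive late; `None` means "no fresh labels yet".

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class RetrainPolicy:
    """Hypothetical thresholds; tune them per model."""
    max_model_age: timedelta = timedelta(days=7)  # scheduled trigger
    min_metric: float = 0.85                      # performance trigger (e.g. AUC on fresh labels)
    max_psi: float = 0.2                          # drift trigger


def retrain_reasons(policy: RetrainPolicy, trained_at: datetime, now: datetime,
                    live_metric: Optional[float], feature_psi: dict[str, float]) -> list[str]:
    """Return the list of triggers that fired; an empty list means no retraining."""
    reasons = []
    if now - trained_at >= policy.max_model_age:
        reasons.append("schedule")
    # Skip the performance check when labels are not yet available
    if live_metric is not None and live_metric < policy.min_metric:
        reasons.append("performance")
    drifted = sorted(name for name, psi in feature_psi.items() if psi > policy.max_psi)
    if drifted:
        reasons.append("drift:" + ",".join(drifted))
    return reasons


utc = timezone.utc
print(retrain_reasons(RetrainPolicy(), datetime(2024, 6, 1, tzinfo=utc),
                      datetime(2024, 6, 10, tzinfo=utc), 0.90,
                      {"income": 0.27, "age": 0.05}))  # ['schedule', 'drift:income']
```

Inside the Prefect flow, a non-empty result could replace the `check_drift` condition, and recording the reasons with each run makes it clear afterwards why retraining fired.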