⏱️ 60 min

Building Robust Data Pipelines

Design and implement reliable data ingestion, validation, and transformation pipelines

What Makes a Pipeline 'Robust'?

A data pipeline that works on your laptop during development will fail in production — not because your code is wrong, but because production data is unpredictable. Files arrive late. Schemas change without notice. A sensor goes offline. A third-party API returns a 500 error. Robust pipelines are designed to handle failure gracefully. They validate data at every stage, emit structured logs, retry transient errors, and surface problems immediately rather than silently corrupting downstream models.
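Retrying transient errors and emitting structured logs can be sketched with the standard library alone. The example below is a minimal illustration, not a production implementation: `TransientError`, `log_event`, and `with_retries` are hypothetical names introduced here, and the backoff values are arbitrary.

```python
import json
import logging
import time

logger = logging.getLogger("pipeline")


class TransientError(Exception):
    """Hypothetical marker for errors worth retrying (e.g. an HTTP 500)."""


def log_event(stage, event, **fields):
    """Emit one JSON log line so monitoring tools can parse it."""
    logger.info(json.dumps({"stage": stage, "event": event, **fields}, sort_keys=True))


def with_retries(func, stage, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call func(); retry TransientError with exponential backoff.

    Any other exception propagates immediately, so real bugs are not retried.
    """
    for attempt in range(1, attempts + 1):
        try:
            result = func()
            log_event(stage, "success", attempt=attempt)
            return result
        except TransientError as exc:
            log_event(stage, "transient_failure", attempt=attempt, error=str(exc))
            if attempt == attempts:
                raise  # out of retries: surface the failure
            sleep(base_delay * 2 ** (attempt - 1))


# Demonstration: a call that fails twice, then succeeds.
calls = {"count": 0}


def flaky_fetch():
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientError("HTTP 500")
    return "payload"


delays = []  # record the requested delays instead of actually sleeping
fetched = with_retries(flaky_fetch, stage="extract", sleep=delays.append)
```

Passing `sleep` as a parameter keeps the sketch easy to test, because the delays can be recorded rather than waited out.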

The four properties of a robust pipeline

- **Idempotent**: Running the pipeline twice produces the same result, so it is safe to retry on failure
- **Observable**: Every stage emits metrics and logs you can monitor in production
- **Schema-enforced**: Data that doesn't match the expected schema is rejected, not passed through
- **Fail-fast**: Errors surface immediately at the stage that caused them, not ten steps later
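To make the schema-enforced and fail-fast properties concrete, here is a small standard-library sketch. The schema, the `SchemaError` class, and the `enforce_schema` function are assumptions made for illustration; a real pipeline would more likely rely on a validation library.

```python
# Hypothetical schema for illustration: column name -> required Python type.
EXPECTED_SCHEMA = {"user_id": str, "amount": float}


class SchemaError(ValueError):
    """Raised when a row does not match the expected schema."""


def enforce_schema(rows, schema=EXPECTED_SCHEMA):
    """Return rows unchanged if every row matches; raise on the first mismatch."""
    for index, row in enumerate(rows):
        missing = schema.keys() - row.keys()
        if missing:
            raise SchemaError(f"row {index}: missing columns {sorted(missing)}")
        for column, expected in schema.items():
            value = row[column]
            # Strict check: an int amount is rejected rather than silently coerced.
            if not isinstance(value, expected):
                raise SchemaError(
                    f"row {index}: {column!r} expected {expected.__name__}, "
                    f"got {type(value).__name__}"
                )
    return rows


good_rows = [{"user_id": "u1", "amount": 12.5}]
checked = enforce_schema(good_rows)
```

Because the function raises at the first bad row, the error message points at the stage and the record that caused the problem instead of surfacing later as a confusing downstream failure.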

Validating Data with Great Expectations

Great Expectations is a widely used library for data validation in Python pipelines. You define 'expectations' (assertions about your data), and it checks them on every run.

python
import great_expectations as gx

context = gx.get_context()

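# Load the CSV into a validator (fluent API from Great Expectations 0.16-0.18;
# newer releases changed this entry point, so check your installed version)
validator = context.sources.pandas_default.read_csv("data/raw/transactions.csv")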

# Basic expectations
validator.expect_column_to_exist("user_id")
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_of_type("amount", "float")
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=100_000)

# Run validation and get a report
results = validator.validate()
print(f"Validation passed: {results.success}")
print(f"Failing checks: {results.statistics['unsuccessful_expectations']}")
Output:
Validation passed: True
Failing checks: 0
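Printing the result is useful while exploring, but a pipeline usually needs to stop when validation fails. One possible way to do that is sketched below. It uses only the `success` and `statistics` fields shown above; `ValidationFailed` and `require_valid` are hypothetical names, and the `SimpleNamespace` objects are stand-ins for the value returned by `validator.validate()`.

```python
from types import SimpleNamespace


class ValidationFailed(RuntimeError):
    """Raised to halt the pipeline when validation does not pass."""


def require_valid(results):
    """Return results if validation passed; otherwise raise and stop the run."""
    if not results.success:
        failed = results.statistics["unsuccessful_expectations"]
        raise ValidationFailed(
            f"{failed} expectation(s) failed; stopping before downstream stages"
        )
    return results


# Stand-in objects for demonstration only.
passing = SimpleNamespace(success=True, statistics={"unsuccessful_expectations": 0})
failing = SimpleNamespace(success=False, statistics={"unsuccessful_expectations": 2})

accepted = require_valid(passing)
```

Calling `require_valid(failing)` raises `ValidationFailed`, so bad data never reaches the transformation stage.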

Building a Reusable Pipeline with Prefect

Prefect turns Python functions into observable, retryable pipeline tasks with a single decorator.

python
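from prefect import flow, task
import numpy as np
import pandas as pd

@task(retries=3, retry_delay_seconds=10)
def extract(source_path: str) -> pd.DataFrame:
    # Retried up to 3 times, 10 seconds apart, if the read fails
    return pd.read_csv(source_path)

@task
def validate(df: pd.DataFrame) -> pd.DataFrame:
    # Raise explicitly: assert statements are skipped when Python runs with -O
    if df["user_id"].isna().any():
        raise ValueError("user_id has nulls")
    if not (df["amount"] >= 0).all():
        raise ValueError("amount has negative or missing values")
    return df

@task
def transform(df: pd.DataFrame) -> pd.DataFrame:
    # Work on a copy so the caller's DataFrame is not mutated
    df = df.copy()
    # log1p(x) = log(1 + x) is defined at 0; clip guards against negatives
    df["amount_log"] = np.log1p(df["amount"].clip(lower=0))
    df["date"] = pd.to_datetime(df["timestamp"]).dt.date
    return df[["user_id", "amount_log", "date"]]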

@task
def load(df: pd.DataFrame, dest_path: str):
    df.to_parquet(dest_path, index=False)

@flow(name="transaction-pipeline")
def run_pipeline(source: str, dest: str):
    raw = extract(source)
    valid = validate(raw)
    processed = transform(valid)
    load(processed, dest)

if __name__ == "__main__":
    run_pipeline("data/raw/transactions.csv", "data/processed/features.parquet")
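Retries only help if rerunning a step is safe. For the load step, one common approach is to write to a temporary file and then rename it over the destination, so a crash or a retry never leaves a half-written file behind. The sketch below uses only the standard library and writes raw bytes; `atomic_write` is a hypothetical helper, and adapting it to `df.to_parquet` (by pointing that call at the temporary path) is left as an assumption.

```python
import os
import tempfile


def atomic_write(dest_path: str, data: bytes) -> None:
    """Write data to dest_path so that a rerun or crash never leaves a partial file."""
    dest_dir = os.path.dirname(os.path.abspath(dest_path))
    os.makedirs(dest_dir, exist_ok=True)
    # Create the temp file in the destination directory so the final rename
    # stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=dest_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(data)
        # os.replace overwrites any existing file in a single step.
        os.replace(tmp_path, dest_path)
    except BaseException:
        # Clean up the temp file if anything went wrong before the rename.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


# Running the load twice leaves exactly one file with the same content.
demo_dir = tempfile.mkdtemp()
demo_path = os.path.join(demo_dir, "processed", "features.bin")
atomic_write(demo_path, b"example")
atomic_write(demo_path, b"example")
```

Because the destination is only ever replaced by a complete file, running the pipeline a second time produces the same result as running it once.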