Design and implement reliable data ingestion, validation, and transformation pipelines
A data pipeline that works on your laptop during development can still fail in production, often not because your code is wrong but because production data is unpredictable. Files arrive late. Schemas change without notice. A sensor goes offline. A third-party API returns a 500 error. Robust pipelines are designed to handle failure gracefully: they validate data at every stage, emit structured logs, retry transient errors, and surface problems immediately rather than silently corrupting downstream models.
- **Idempotent**: Running the pipeline twice produces the same result, so it is safe to retry on failure
- **Observable**: Every stage emits metrics and logs you can monitor in production
- **Schema-enforced**: Data that doesn't match the expected schema is rejected, not passed through
- **Fail-fast**: Errors surface immediately at the stage that caused them, not ten steps later
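A minimal, dependency-free sketch of the idempotent property, assuming the load step writes a file. The hypothetical helper `write_atomically` writes to a temporary file in the destination directory and then swaps it into place with `os.replace`. A retried run overwrites the previous output rather than appending to it, and a crash mid-write never leaves a half-written destination file for downstream readers.

```python
import os
import tempfile
from pathlib import Path


def write_atomically(dest: str, content: str) -> None:
    """Hypothetical helper: replace `dest` with `content` in one atomic step.

    Readers see either the old file or the new one, never a partial write,
    and re-running with the same input yields the same file.
    """
    dest_path = Path(dest)
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    # The temp file must be on the same filesystem as dest for the rename to be atomic.
    fd, tmp_path = tempfile.mkstemp(dir=str(dest_path.parent), suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(content)
        os.replace(tmp_path, dest_path)  # overwrites dest if it already exists
    except BaseException:
        os.unlink(tmp_path)  # don't leave the temp file behind on failure
        raise


if __name__ == "__main__":
    out = Path(tempfile.mkdtemp()) / "processed" / "features.csv"
    for _ in range(2):  # simulate a retry: run the same load twice
        write_atomically(str(out), "user_id,amount\n1,9.5\n")
    print(out.read_text(encoding="utf-8"))  # one copy of the data, not two
```

The same pattern applies to Parquet or any other output format: write the complete result to a temporary path in the target directory, then replace.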
Great Expectations (GX) is a widely used library for data validation in Python pipelines. You define 'expectations' (declarative assertions about your data), and GX checks them on every run.
```python
import great_expectations as gx

# This uses the pre-1.0 fluent API (as in GX 0.17 and 0.18). GX 1.x replaced
# context.sources with context.data_sources and changed how expectations run.
context = gx.get_context()

# Read the CSV into a Validator bound to the default pandas datasource
validator = context.sources.pandas_default.read_csv("data/raw/transactions.csv")

# Basic expectations: each call checks the data and records the expectation
validator.expect_column_to_exist("user_id")
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_of_type("amount", "float")
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=100_000)

# Run every recorded expectation and get a report
results = validator.validate()
print(f"Validation passed: {results.success}")
print(f"Failing checks: {results.statistics['unsuccessful_expectations']}")

# Example output:
# Validation passed: True
# Failing checks: 0
```
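`validate()` returns a result object rather than raising, so on its own a failed check will not stop the pipeline. A minimal sketch of a fail-fast gate, assuming only the two attributes used above (`success` and `statistics['unsuccessful_expectations']`); the names `require_success` and `DataValidationError` are hypothetical, and the stand-in objects let the sketch run without GX installed.

```python
from types import SimpleNamespace


class DataValidationError(Exception):
    """Hypothetical exception type for a batch that failed validation."""


def require_success(results, stage: str = "validation"):
    """Raise immediately if a validation result reports any failed checks.

    `results` is anything with a boolean `success` attribute and a
    `statistics` dict, such as the object returned by validator.validate().
    """
    if not results.success:
        failed = results.statistics.get("unsuccessful_expectations", "unknown")
        raise DataValidationError(f"{stage} failed: {failed} expectation(s) did not pass")
    return results


if __name__ == "__main__":
    # Stand-ins shaped like a validation result, so this runs without GX.
    ok = SimpleNamespace(success=True, statistics={"unsuccessful_expectations": 0})
    bad = SimpleNamespace(success=False, statistics={"unsuccessful_expectations": 2})
    require_success(ok)
    try:
        require_success(bad, stage="transactions")
    except DataValidationError as exc:
        print(exc)  # transactions failed: 2 expectation(s) did not pass
```

In the GX example this would be `require_success(validator.validate(), stage="transactions")`, which stops the run at the validation stage instead of letting bad data flow downstream.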
Prefect turns ordinary Python functions into observable, retryable pipeline steps: `@task` marks each stage, and `@flow` composes the stages into a single pipeline run.
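```python
from pathlib import Path

import numpy as np
import pandas as pd
from prefect import flow, task

REQUIRED_COLUMNS = {"user_id", "amount", "timestamp"}

@task(retries=3, retry_delay_seconds=10)
def extract(source_path: str) -> pd.DataFrame:
    # Reads are safe to retry: on failure Prefect reruns this task up to 3 more times
    return pd.read_csv(source_path)

@task
def validate(df: pd.DataFrame) -> pd.DataFrame:
    # Raise explicit errors rather than using assert, which `python -O` strips out
    missing = REQUIRED_COLUMNS - set(df.columns)
    if missing:
        raise ValueError(f"missing columns: {sorted(missing)}")
    if df["user_id"].isna().any():
        raise ValueError("user_id has nulls")
    if (df["amount"] < 0).any():
        raise ValueError("negative amounts found")
    return df

@task
def transform(df: pd.DataFrame) -> pd.DataFrame:
    # Build new columns without mutating the input frame
    out = df.assign(
        amount_log=np.log1p(df["amount"]),  # log(1 + amount); amounts validated >= 0
        date=pd.to_datetime(df["timestamp"]).dt.date,
    )
    return out[["user_id", "amount_log", "date"]]

@task
def load(df: pd.DataFrame, dest_path: str) -> None:
    # Overwriting the same path on every run keeps the load idempotent.
    # to_parquet requires pyarrow or fastparquet to be installed.
    Path(dest_path).parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(dest_path, index=False)

@flow(name="transaction-pipeline")
def run_pipeline(source: str, dest: str) -> None:
    raw = extract(source)
    valid = validate(raw)
    processed = transform(valid)
    load(processed, dest)

if __name__ == "__main__":
    run_pipeline("data/raw/transactions.csv", "data/processed/features.parquet")
```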
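In the flow above, `retries=3, retry_delay_seconds=10` asks Prefect to rerun a failed `extract` up to three more times (four attempts in total), waiting ten seconds between attempts. To make that behavior concrete without Prefect installed, here is a hypothetical `retry` decorator; it illustrates the idea and is not Prefect's implementation. One deliberate design choice: it retries only exceptions listed in `retry_on` (assumed here to be transient `OSError`s such as `ConnectionError`), so a permanent failure like bad data fails fast instead of being retried.

```python
import functools
import time


def retry(retries: int = 3, delay_seconds: float = 10.0, retry_on=(OSError,)):
    """Hypothetical decorator mimicking @task(retries=..., retry_delay_seconds=...).

    Makes up to `retries + 1` attempts in total. Only exceptions in `retry_on`
    trigger a retry; anything else propagates immediately.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):
                try:
                    return func(*args, **kwargs)
                except retry_on:
                    if attempt == retries:
                        raise  # out of retries: surface the last error
                    time.sleep(delay_seconds)
        return wrapper
    return decorator


if __name__ == "__main__":
    calls = {"n": 0}

    @retry(retries=3, delay_seconds=0)
    def flaky_extract() -> str:
        calls["n"] += 1
        if calls["n"] < 3:
            raise ConnectionError("simulated HTTP 500 from upstream API")
        return "rows"

    print(flaky_extract(), "after", calls["n"], "attempts")  # rows after 3 attempts
```

Retrying is only safe because `extract` is a read with no side effects; wrapping a non-idempotent step the same way could duplicate its effects.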