Van Python scripts tot enterprise-grade orchestratie: een praktische handleiding voor het bouwen van robuuste, herbruikbare en observeerbare data pipelines.
Een data pipeline is een geautomatiseerd proces dat data verplaatst van bron naar bestemming, inclusief validatie, transformatie en foutafhandeling. Het verschil met een los script: een pipeline is herhaalbaar, observeerbaar en herstelbaar bij fouten.
| Tool | Type | Sterk in | Wanneer kiezen |
|---|---|---|---|
| Python + SQLAlchemy | Script | Flexibiliteit, eenvoud | Klein team, eenvoudige pipelines |
| Azure Data Factory | No-code/low-code | Azure integratie, GUI | Niet-technische teams, Azure-first |
| Apache Airflow | Orchestrator | Complexe afhankelijkheden, Python | Grote teams, veel pipelines |
| Databricks DLT | Declaratief | Streaming + batch, Delta | Lakehouse architectuur |
| dbt | Transformatie | SQL transformaties, tests | Transformatielaag in DWH |
Een Python pipeline die data ophaalt van een REST API en laadt in een database is een uitstekend startpunt en leert je de fundamenten van pipeline design.
requests==2.31.0 pandas==2.2.0 sqlalchemy==2.0.27 psycopg2-binary==2.9.9 python-dotenv==1.0.0
import os
import logging
from datetime import datetime, timezone

import requests
import pandas as pd
from sqlalchemy import create_engine, text
from dotenv import load_dotenv

load_dotenv()

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
log = logging.getLogger(__name__)


def extract(api_url: str, api_key: str) -> list[dict]:
    """Fetch all records from a paginated REST API.

    Pages of 100 are requested until a page comes back without items.

    Args:
        api_url: Endpoint to page through.
        api_key: Bearer token sent in the Authorization header.

    Returns:
        All records from every page, in request order.

    Raises:
        requests.HTTPError: On any 4xx/5xx response.
    """
    records: list[dict] = []
    page = 1
    headers = {"Authorization": f"Bearer {api_key}"}
    while True:
        response = requests.get(
            api_url,
            headers=headers,
            params={"page": page, "per_page": 100},
            timeout=30,
        )
        response.raise_for_status()  # raises HTTPError on 4xx/5xx
        data = response.json()
        # An empty (or missing) "items" list signals the last page.
        if not data.get("items"):
            break
        records.extend(data["items"])
        log.info("Pagina %d: %d records opgehaald", page, len(data["items"]))
        page += 1
    log.info("Extract klaar: %d records totaal", len(records))
    return records


def transform(records: list[dict]) -> pd.DataFrame:
    """Validate and normalise the raw API records.

    Rows with a non-numeric or negative amount are dropped (NaN fails
    the ``>= 0`` comparison, so coercion failures are dropped too).

    Returns:
        DataFrame with a fixed column order plus a ``_loaded_at``
        UTC load timestamp.
    """
    df = pd.DataFrame(records)
    # Type casting
    df["order_date"] = pd.to_datetime(df["order_date"], utc=True)
    df["amount"] = pd.to_numeric(df["amount"], errors="coerce")
    # Validation: reject unparsable or negative amounts
    invalid = df[df["amount"].isna() | (df["amount"] < 0)]
    if not invalid.empty:
        log.warning("%d ongeldige records weggegooid", len(invalid))
    df = df[df["amount"] >= 0]
    # Load metadata
    df["_loaded_at"] = datetime.now(timezone.utc)
    return df[["order_id", "customer_id", "order_date",
               "amount", "status", "_loaded_at"]]


def load(df: pd.DataFrame, engine, table: str) -> None:
    """Upsert ``df`` into ``table`` via a staging table.

    ``table`` may be schema-qualified ("schema.name"). The schema must be
    passed to pandas separately: the original passed the dotted name
    straight to ``to_sql``, which creates a table literally named
    "schema.name_staging" in the default schema, while the INSERT below
    reads the schema-qualified staging table — a silent mismatch.
    """
    schema, _, name = table.rpartition(".")
    schema = schema or None
    staging = f"{name}_staging"
    staging_qualified = f"{schema}.{staging}" if schema else staging
    # engine.begin() wraps staging load + upsert in one transaction.
    with engine.begin() as conn:
        # Temporary staging table, replaced on every run (idempotent).
        df.to_sql(staging, conn, schema=schema,
                  if_exists="replace", index=False)
        # Upsert: insert, update status/_loaded_at on key conflict.
        conn.execute(text(f"""
            INSERT INTO {table}
                (order_id, customer_id, order_date, amount, status, _loaded_at)
            SELECT order_id, customer_id, order_date, amount, status, _loaded_at
            FROM {staging_qualified}
            ON CONFLICT (order_id) DO UPDATE
                SET status = EXCLUDED.status,
                    _loaded_at = EXCLUDED._loaded_at
        """))
    log.info("Load klaar: %d rijen in '%s'", len(df), table)


if __name__ == "__main__":
    # os.environ[...] fails fast with the missing key name instead of
    # passing None further down the pipeline.
    engine = create_engine(os.environ["DATABASE_URL"])
    raw = extract(os.environ["API_URL"], os.environ["API_KEY"])
    clean = transform(raw)
    load(clean, engine, "bronze.orders")
Azure Data Factory (ADF) is Microsofts cloud-ETL-service met een grafische interface. Pipelines worden opgeslagen als JSON en kunnen worden beheerd via Git.
{
"name": "ls_rest_api_orders",
"type": "RestService",
"properties": {
"url": "https://api.example.com",
"enableServerCertificateValidation": true,
"authenticationType": "Anonymous",
"authHeaders": {
"Authorization": {
"type": "AzureKeyVaultSecret",
"store": { "referenceName": "ls_keyvault" },
"secretName": "api-token"
}
}
}
}
{
"name": "pl_ingest_orders",
"parameters": {
"start_date": { "type": "string" },
"end_date": { "type": "string" }
},
"activities": [
{
"name": "copy_orders_to_adls",
"type": "Copy",
"inputs": [{
"referenceName": "ds_rest_orders",
"parameters": {
"start": "@pipeline().parameters.start_date",
"end": "@pipeline().parameters.end_date"
}
}],
"outputs": [{ "referenceName": "ds_adls_bronze_orders" }],
"typeProperties": {
"source": { "type": "RestSource" },
"sink": {
"type": "ParquetSink",
"storeSettings": { "type": "AzureBlobFSWriteSettings" }
}
}
}
]
}
Apache Airflow is de meest gebruikte open-source orchestratietool. DAGs (Directed Acyclic Graphs) definiëren de taken en hun afhankelijkheden.
from __future__ import annotations from datetime import datetime, timedelta from airflow.decorators import dag, task from airflow.providers.http.sensors.http import HttpSensor from airflow.providers.postgres.hooks.postgres import PostgresHook default_args = { "owner": "data-team", "retries": 3, "retry_delay": timedelta(minutes=5), "email_on_failure": True, "email": ["alerts@bedrijf.nl"], } @dag( dag_id="ingest_orders_daily", schedule="0 6 * * *", # elke dag om 06:00 UTC start_date=datetime(2026, 1, 1), catchup=False, default_args=default_args, tags=["bronze", "orders"], ) def ingest_orders_daily(): # Sensor: wacht tot API beschikbaar is api_ready = HttpSensor( task_id="wait_for_api", http_conn_id="orders_api", endpoint="/health", poke_interval=60, timeout=600, ) @task() def extract(execution_date=None) -> list[dict]: import requests date_str = execution_date.strftime("%Y-%m-%d") response = requests.get( f"https://api.example.com/orders?date={date_str}", timeout=30 ) response.raise_for_status() return response.json()["items"] @task() def load_to_postgres(records: list[dict]) -> int: hook = PostgresHook(postgres_conn_id="postgres_dwh") with hook.get_conn() as conn, conn.cursor() as cur: cur.executemany( """INSERT INTO bronze.orders (order_id, customer_id, amount, status, order_date) VALUES (%(order_id)s, %(customer_id)s, %(amount)s, %(status)s, %(order_date)s) ON CONFLICT (order_id) DO NOTHING""", records ) return len(records) # Definieer afhankelijkheden raw_data = extract() row_count = load_to_postgres(raw_data) api_ready >> raw_data ingest_orders_daily()
Gebruik voor lokaal ontwikkelen een Apache Airflow Docker Compose setup. In productie: gebruik AWS MWAA, Google Cloud Composer of Astronomer voor managed Airflow.
Delta Live Tables (DLT) is Databricks' declaratieve pipeline framework. Je beschrijft wat de data moet zijn, niet hoe het berekend moet worden.
import dlt
from pyspark.sql import functions as F


# Bronze: raw ingestion via Auto Loader (cloudFiles).
@dlt.table(
    name="bronze_orders",
    comment="Ruwe orders vanuit ADLS landing zone",
    table_properties={"pipelines.autoOptimize.managed": "true"},
)
def bronze_orders():
    """Incrementally stream raw JSON order files from the landing zone."""
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("abfss://landing@storage.dfs.core.windows.net/orders/")
    )


# Silver: typed and validated layer. @dlt.table is placed outermost,
# matching the documented decorator order; expectations follow it.
# expect_or_drop removes failing rows, expect only records the metric.
@dlt.table(name="silver_orders")
@dlt.expect_or_drop("order_id niet null", "order_id IS NOT NULL")
@dlt.expect_or_drop("positief bedrag", "amount > 0")
@dlt.expect("geldige status", "status IN ('open', 'closed', 'pending')")
def silver_orders():
    """Cast types, normalise status casing, stamp processing time."""
    return (
        dlt.read_stream("bronze_orders")
        .select(
            F.col("order_id"),
            F.col("customer_id"),
            F.to_date("order_date").alias("order_date"),
            F.col("amount").cast("double"),
            F.lower("status").alias("status"),
            F.current_timestamp().alias("_processed_at"),
        )
    )
Pipelines falen. Netwerk timeouts, API rate limits, schema wijzigingen — allemaal voorspelbare problemen waarvoor je van tevoren een strategie moet hebben.
import time
import logging
from functools import wraps

log = logging.getLogger(__name__)


def retry(max_attempts: int = 3, backoff_seconds: float = 2.0,
          exceptions: tuple = (Exception,)):
    """Decorator: automatic retry with exponential backoff.

    Waits ``backoff_seconds * 2 ** (attempt - 1)`` seconds between
    attempts (base, 2x, 4x, ...). The original computed
    ``backoff_seconds ** attempt``, which is not exponential backoff
    for bases <= 1: base 1.0 never waits longer and a base below 1
    waits *less* on every retry.

    Args:
        max_attempts: Total number of tries before re-raising.
        backoff_seconds: Initial wait; doubles on each retry.
        exceptions: Exception types that trigger a retry; anything
            else propagates immediately.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        log.error("Alle %d pogingen mislukt: %s",
                                  max_attempts, e)
                        raise
                    wait = backoff_seconds * (2 ** (attempt - 1))
                    log.warning(
                        "Poging %d mislukt (%s). "
                        "Wachten %.0fs voor volgende poging...",
                        attempt, e, wait,
                    )
                    time.sleep(wait)
        return wrapper
    return decorator


# Usage (requires `import requests`; the original snippet referenced
# requests without importing it and would raise NameError as written):
#
# @retry(max_attempts=3, backoff_seconds=2.0,
#        exceptions=(requests.Timeout, requests.ConnectionError))
# def fetch_page(url: str, page: int) -> dict:
#     response = requests.get(url, params={"page": page}, timeout=30)
#     response.raise_for_status()
#     return response.json()
pip install tenacity
Een idempotente pipeline geeft hetzelfde resultaat ongeacht hoe vaak je hem uitvoert. Dit maakt veilig herstarten na fouten mogelijk.
import json
import os
from pathlib import Path


class Checkpoint:
    """Persist pipeline progress in a small JSON file.

    Enables idempotent restarts: a pipeline reads the last processed
    key on startup and only writes it back after a successful load.
    """

    def __init__(self, path: str):
        # File is created lazily on the first set(); get() tolerates
        # a missing file.
        self.path = Path(path)

    def get(self, key: str, default=None):
        """Return the stored value for *key*, or *default* if absent."""
        if not self.path.exists():
            return default
        data = json.loads(self.path.read_text())
        return data.get(key, default)

    def set(self, key: str, value) -> None:
        """Store *value* under *key* (read-modify-write of the file).

        The write goes through a temp file and ``os.replace`` so a
        crash mid-write cannot leave a truncated/corrupt state file —
        the original wrote in place, which could destroy the checkpoint
        it exists to protect. ``default=str`` stringifies non-JSON
        values (e.g. datetimes) on serialisation.
        """
        data = json.loads(self.path.read_text()) if self.path.exists() else {}
        data[key] = value
        tmp = self.path.with_suffix(self.path.suffix + ".tmp")
        tmp.write_text(json.dumps(data, default=str))
        os.replace(tmp, self.path)  # atomic on POSIX and Windows


# Usage in a pipeline (fetch_orders_since / load_to_db are the
# pipeline's own functions — the original ran these lines at module
# level, which raises NameError as a standalone snippet):
#
# checkpoint = Checkpoint("pipeline_state.json")
# last_loaded = checkpoint.get("last_order_id", default=0)
# new_records = fetch_orders_since(last_loaded)
# load_to_db(new_records)
# if new_records:  # persist progress only after a successful load
#     checkpoint.set("last_order_id", new_records[-1]["order_id"])
Observeerbaarheid is de sleutel tot betrouwbare pipelines. Je moet altijd weten: wat is de laatste succesvolle run, hoeveel records zijn verwerkt, en wat gaat er mis?
import os
from datetime import datetime, timezone

import requests


def send_slack_alert(pipeline_name: str, error: str) -> None:
    """Post a failure alert for *pipeline_name* to a Slack webhook.

    Reads the webhook URL from SLACK_WEBHOOK_URL. Two fixes vs. the
    original: ``datetime`` is now imported (the original raised
    NameError exactly when an alert fired), and a missing webhook URL
    skips the alert instead of posting to ``None``.

    Args:
        pipeline_name: Shown in the alert header.
        error: Error text shown in the message body.
    """
    webhook_url = os.getenv("SLACK_WEBHOOK_URL")
    if not webhook_url:
        # No webhook configured: alerting is best-effort, don't crash
        # the error path of the pipeline itself.
        return
    payload = {
        "blocks": [
            {
                "type": "header",
                "text": {"type": "plain_text",
                         "text": f"🚨 Pipeline fout: {pipeline_name}"},
            },
            {
                "type": "section",
                "text": {"type": "mrkdwn",
                         "text": f"*Fout:* `{error}`\n"
                                 f"*Tijd:* {datetime.now(timezone.utc)}"},
            },
        ]
    }
    requests.post(webhook_url, json=payload, timeout=10)


# In the pipeline's main loop (run_pipeline is the pipeline's own entry
# point — the original ran this at module level, raising NameError as a
# standalone snippet):
#
# try:
#     run_pipeline()
# except Exception as e:
#     send_slack_alert("ingest_orders", str(e))
#     raise  # alerting must not swallow the failure