← Terug naar Downloads DataPartner365 — datapartner365.nl
DataPartner365 · Handleiding · 2026

Data Pipelines Bouwen

Van Python scripts tot enterprise-grade orchestratie: een praktische handleiding voor het bouwen van robuuste, herbruikbare en observeerbare data pipelines.

Python Airflow Azure Data Factory Databricks Gratis
1

Wat is een data pipeline & tooling kiezen

Definitie, vergelijking tools, wanneer wat gebruiken

Een data pipeline is een geautomatiseerd proces dat data verplaatst van bron naar bestemming, inclusief validatie, transformatie en foutafhandeling. Het verschil met een losse script: een pipeline is herhaalbaar, observeerbaar en herstelbaar bij fouten.

ToolTypeSterk inWanneer kiezen
Python + SQLAlchemyScriptFlexibiliteit, eenvoudKlein team, eenvoudige pipelines
Azure Data FactoryNo-code/low-codeAzure integratie, GUINiet-technische teams, Azure-first
Apache AirflowOrchestratorComplexe afhankelijkheden, PythonGrote teams, veel pipelines
Databricks DLTDeclaratiefStreaming + batch, DeltaLakehouse architectuur
dbtTransformatieSQL transformaties, testsTransformatielaag in DWH
ℹ️ Voor de meeste organisaties: begin met Python voor eenvoudige pipelines, voeg Airflow toe zodra je meer dan 5-10 pipelines hebt die van elkaar afhangen. Azure Data Factory is een goede keuze als het team geen Python wil schrijven.
2

Eenvoudige Python pipeline

requests + pandas + SQLAlchemy, API naar database

Een Python pipeline die data ophaalt van een REST API en laadt in een database is een uitstekend startpunt en leert je de fundamenten van pipeline design.

python — requirements.txt
requests==2.31.0 pandas==2.2.0 sqlalchemy==2.0.27 psycopg2-binary==2.9.9 python-dotenv==1.0.0
python — pipeline.py
import os import logging from datetime import datetime, timezone import requests import pandas as pd from sqlalchemy import create_engine, text from dotenv import load_dotenv load_dotenv() logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s" ) log = logging.getLogger(__name__) def extract(api_url: str, api_key: str) -> list[dict]: """Haal data op van REST API met paginering.""" records = [] page = 1 headers = {"Authorization": f"Bearer {api_key}"} while True: response = requests.get( api_url, headers=headers, params={"page": page, "per_page": 100}, timeout=30 ) response.raise_for_status() # gooit HTTPError bij 4xx/5xx data = response.json() if not data.get("items"): break records.extend(data["items"]) log.info(f"Pagina {page}: {len(data['items'])} records opgehaald") page += 1 log.info(f"Extract klaar: {len(records)} records totaal") return records def transform(records: list[dict]) -> pd.DataFrame: """Valideer en normaliseer de ruwe API data.""" df = pd.DataFrame(records) # Type casting df["order_date"] = pd.to_datetime(df["order_date"], utc=True) df["amount"] = pd.to_numeric(df["amount"], errors="coerce") # Validatie invalid = df[df["amount"].isna() | (df["amount"] < 0)] if not invalid.empty: log.warning(f"{len(invalid)} ongeldige records weggegooid") df = df[df["amount"] >= 0] # Metadata toevoegen df["_loaded_at"] = datetime.now(timezone.utc) return df[["order_id", "customer_id", "order_date", "amount", "status", "_loaded_at"]] def load(df: pd.DataFrame, engine, table: str) -> None: """Laad data via upsert in de database.""" with engine.begin() as conn: # Tijdelijke staging tabel df.to_sql(f"{table}_staging", conn, if_exists="replace", index=False) # Upsert: insert + update bij conflict conn.execute(text(f""" INSERT INTO {table} (order_id, customer_id, order_date, amount, status, _loaded_at) SELECT order_id, customer_id, order_date, amount, status, _loaded_at FROM {table}_staging ON CONFLICT (order_id) DO UPDATE SET status = EXCLUDED.status, _loaded_at = EXCLUDED._loaded_at """)) log.info(f"Load klaar: {len(df)} rijen in '{table}'") if __name__ == "__main__": engine = create_engine(os.getenv("DATABASE_URL")) raw = extract(os.getenv("API_URL"), os.getenv("API_KEY")) clean = transform(raw) load(clean, engine, "bronze.orders")
3

Azure Data Factory pipeline

Copy Activity, Linked Services, parameters, triggers

Azure Data Factory (ADF) is Microsoft's cloud ETL service met een grafische interface. Pipelines worden opgeslagen als JSON en kunnen worden beheerd via Git.

Linked Service configureren (REST API bron)

json — linked service definitie
{ "name": "ls_rest_api_orders", "type": "RestService", "properties": { "url": "https://api.example.com", "enableServerCertificateValidation": true, "authenticationType": "Anonymous", "authHeaders": { "Authorization": { "type": "AzureKeyVaultSecret", "store": { "referenceName": "ls_keyvault" }, "secretName": "api-token" } } } }

Pipeline JSON (Copy Activity)

json — pipeline met parameters
{ "name": "pl_ingest_orders", "parameters": { "start_date": { "type": "string" }, "end_date": { "type": "string" } }, "activities": [ { "name": "copy_orders_to_adls", "type": "Copy", "inputs": [{ "referenceName": "ds_rest_orders", "parameters": { "start": "@pipeline().parameters.start_date", "end": "@pipeline().parameters.end_date" } }], "outputs": [{ "referenceName": "ds_adls_bronze_orders" }], "typeProperties": { "source": { "type": "RestSource" }, "sink": { "type": "ParquetSink", "storeSettings": { "type": "AzureBlobFSWriteSettings" } } } } ] }
Sla gevoelige verbindingsgegevens (wachtwoorden, API keys) altijd op in Azure Key Vault. Gebruik Managed Identity voor ADF om zonder opgeslagen credentials te authenticeren bij Azure resources.
4

Airflow DAG schrijven

Python Operator, Sensors, TaskFlow API, scheduling

Apache Airflow is de meest gebruikte open-source orchestrationtool. DAGs (Directed Acyclic Graphs) definiëren de taken en hun afhankelijkheden.

python — airflow dag (taskflow api)
from __future__ import annotations from datetime import datetime, timedelta from airflow.decorators import dag, task from airflow.providers.http.sensors.http import HttpSensor from airflow.providers.postgres.hooks.postgres import PostgresHook default_args = { "owner": "data-team", "retries": 3, "retry_delay": timedelta(minutes=5), "email_on_failure": True, "email": ["alerts@bedrijf.nl"], } @dag( dag_id="ingest_orders_daily", schedule="0 6 * * *", # elke dag om 06:00 UTC start_date=datetime(2026, 1, 1), catchup=False, default_args=default_args, tags=["bronze", "orders"], ) def ingest_orders_daily(): # Sensor: wacht tot API beschikbaar is api_ready = HttpSensor( task_id="wait_for_api", http_conn_id="orders_api", endpoint="/health", poke_interval=60, timeout=600, ) @task() def extract(execution_date=None) -> list[dict]: import requests date_str = execution_date.strftime("%Y-%m-%d") response = requests.get( f"https://api.example.com/orders?date={date_str}", timeout=30 ) response.raise_for_status() return response.json()["items"] @task() def load_to_postgres(records: list[dict]) -> int: hook = PostgresHook(postgres_conn_id="postgres_dwh") with hook.get_conn() as conn, conn.cursor() as cur: cur.executemany( """INSERT INTO bronze.orders (order_id, customer_id, amount, status, order_date) VALUES (%(order_id)s, %(customer_id)s, %(amount)s, %(status)s, %(order_date)s) ON CONFLICT (order_id) DO NOTHING""", records ) return len(records) # Definieer afhankelijkheden raw_data = extract() row_count = load_to_postgres(raw_data) api_ready >> raw_data ingest_orders_daily()
ℹ️ Gebruik Astro CLI (Astronomer) of de officiële apache-airflow Docker Compose setup voor lokaal ontwikkelen. In productie: gebruik AWS MWAA, Google Cloud Composer of Astronomer voor managed Airflow.
5

Databricks Delta Live Tables

Declaratieve pipelines, streaming + batch, expectations

Delta Live Tables (DLT) is Databricks' declaratieve pipeline framework. Je beschrijft wat de data moet zijn, niet hoe het berekend moet worden.

python — delta live tables pipeline
import dlt from pyspark.sql import functions as F from pyspark.sql.types import StructType, StructField, StringType, DoubleType # Bronze: ruwe data inlezen @dlt.table( name="bronze_orders", comment="Ruwe orders vanuit ADLS landing zone", table_properties={"pipelines.autoOptimize.managed": "true"} ) def bronze_orders(): return ( spark.readStream .format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load("abfss://landing@storage.dfs.core.windows.net/orders/") ) # Silver: validatie met DLT Expectations @dlt.expect_or_drop("order_id niet null", "order_id IS NOT NULL") @dlt.expect_or_drop("positief bedrag", "amount > 0") @dlt.expect("geldige status", "status IN ('open', 'closed', 'pending')") @dlt.table(name="silver_orders") def silver_orders(): return ( dlt.read_stream("bronze_orders") .select( F.col("order_id"), F.col("customer_id"), F.to_date("order_date").alias("order_date"), F.col("amount").cast("double"), F.lower("status").alias("status"), F.current_timestamp().alias("_processed_at") ) )
6

Error handling & Retry logica

Exceptions, exponential backoff, dead letter queues

Pipelines falen. Netwerk timeouts, API rate limits, schema wijzigingen — allemaal voorspelbare problemen waarvoor je van tevoren een strategie moet hebben.

python — retry met exponential backoff
import time import logging from functools import wraps log = logging.getLogger(__name__) def retry(max_attempts: int = 3, backoff_seconds: float = 2.0, exceptions: tuple = (Exception,)): """Decorator voor automatische retry met exponential backoff.""" def decorator(func): @wraps(func) def wrapper(*args, **kwargs): for attempt in range(1, max_attempts + 1): try: return func(*args, **kwargs) except exceptions as e: if attempt == max_attempts: log.error(f"Alle {max_attempts} pogingen mislukt: {e}") raise wait = backoff_seconds ** attempt log.warning( f"Poging {attempt} mislukt ({e}). " f"Wachten {wait:.0f}s voor volgende poging..." ) time.sleep(wait) return wrapper return decorator # Gebruik: @retry(max_attempts=3, backoff_seconds=2.0, exceptions=(requests.Timeout, requests.ConnectionError)) def fetch_page(url: str, page: int) -> dict: response = requests.get(url, params={"page": page}, timeout=30) response.raise_for_status() return response.json()
Gebruik de tenacity library voor geavanceerde retry logica met jitter (willekeurige vertraging) om thundering herd problemen te voorkomen: pip install tenacity
7

Idempotentie & Checkpointing

Veilig herstarten, watermarks, state management

Een idempotente pipeline geeft hetzelfde resultaat ongeacht hoe vaak je hem uitvoert. Dit maakt veilig herstarten na fouten mogelijk.

Idempotentie patronen

  • Upsert (MERGE): INSERT + UPDATE bij conflict — geen duplicaten mogelijk
  • Truncate + Insert: Verwijder alles, herlaad. Eenvoudig maar traag voor grote tabellen
  • Partitie-overwrite: Vervang alleen de betreffende tijdspartitie
  • Watermark: Sla de laatste verwerkte waarde op, begin altijd daar
python — checkpoint/watermark patroon
import json from pathlib import Path class Checkpoint: """Sla pipeline voortgang op in een JSON bestand.""" def __init__(self, path: str): self.path = Path(path) def get(self, key: str, default=None): if not self.path.exists(): return default data = json.loads(self.path.read_text()) return data.get(key, default) def set(self, key: str, value) -> None: data = json.loads(self.path.read_text()) if self.path.exists() else {} data[key] = value self.path.write_text(json.dumps(data, default=str)) # Gebruik in pipeline: checkpoint = Checkpoint("pipeline_state.json") last_loaded = checkpoint.get("last_order_id", default=0) new_records = fetch_orders_since(last_loaded) load_to_db(new_records) # Sla voortgang op na succesvolle load if new_records: checkpoint.set("last_order_id", new_records[-1]["order_id"])
8

Monitoring & Alerting

Logging, metrics, Slack alerts, SLA tracking

Observeerbaarheid is de sleutel tot betrouwbare pipelines. Je moet altijd weten: wat is de laatste succesvolle run, hoeveel records zijn verwerkt, en wat gaat er mis?

python — slack alert bij fout
import requests import os def send_slack_alert(pipeline_name: str, error: str) -> None: """Stuur een Slack bericht bij pipeline fout.""" webhook_url = os.getenv("SLACK_WEBHOOK_URL") payload = { "blocks": [ { "type": "header", "text": {"type": "plain_text", "text": f"🚨 Pipeline fout: {pipeline_name}"} }, { "type": "section", "text": {"type": "mrkdwn", "text": f"*Fout:* `{error}`\n*Tijd:* {datetime.now()}"} } ] } requests.post(webhook_url, json=payload, timeout=10) # In je pipeline hoofd-loop: try: run_pipeline() except Exception as e: send_slack_alert("ingest_orders", str(e)) raise
ℹ️ Gebruik OpenTelemetry voor gestandaardiseerde metrics en traces. In Airflow: gebruik de ingebouwde SLA Miss callbacks. In Azure Monitor: configureer alerts op ADF pipeline failures via Azure Monitor alerts.
9

Checklist

Verplichte controles voor productie deployment
Robuustheid
  • Retry logica met exponential backoff geïmplementeerd
  • Timeouts ingesteld op alle externe aanroepen
  • Foutmeldingen zijn informatief en bevatten context
  • Dead letter opslag voor afgewezen records
Idempotentie
  • Dubbel uitvoeren geeft hetzelfde resultaat
  • Upsert of partitie-overwrite gebruikt (geen pure INSERT)
  • Checkpointing/watermark geïmplementeerd
  • Pipeline handmatig herstarten getest na gesimuleerde fout
Observeerbaarheid
  • Structured logging actief (JSON format)
  • Verwerkt recordaantal gelogd per run
  • Alerting geconfigureerd bij fout
  • SLA monitoring actief (maximale vertraging)
Beveiliging
  • Secrets in Key Vault / environment variables (niet in code)
  • Minimale permissies voor service accounts
  • Verbindingen lopen via VNet of Private Endpoint