Ebook · Hoofdstuk 4 van 10

ETL en ELT Processen

Data verplaatsen en transformeren — van bron tot warehouse, idempotent en betrouwbaar.

De pipeline als product

22 min leestijd Beginner-Gevorderd

Vraag tien data engineers wat ETL is en je krijgt tien antwoorden. De afkorting staat voor Extract, Transform, Load — data uit bronnen halen, transformeren naar het juiste formaat, en laden in het warehouse. Klinkt eenvoudig. In de praktijk gaan hier de meeste projecten kapot, want de hoeken zitten in robuustheid, herstartbaarheid en kosten.

ETL versus ELT

De volgorde van de operaties is veranderd door de cloud. In de pre-cloud-era was compute schaars: je transformeerde data in een tussenstap (op een ETL-server), omdat het warehouse te duur was om die werkdruk te dragen. Met Snowflake, BigQuery en consorten heeft ELT de overhand genomen: laad de ruwe data eerst in het warehouse, en transformeer met SQL in het warehouse zelf. dbt is hier de standaardtool voor.

AspectETL (traditioneel)ELT (modern)
TransformatieIn ETL-engine (Informatica, SSIS)In warehouse (SQL/dbt)
TussenopslagVaak file-based / staging serverStagingtabellen in warehouse
SchaalBeperkt door ETL-serverOnbeperkt (warehouse compute)
Schema-onWriteVaak read (raw blijft ruw)
ToolsInformatica, Talend, SSISFivetran/Airbyte + dbt
KostenmodelLicenties + serversPay-per-query

Wanneer kies je nog wel voor klassieke ETL?

ELT in het warehouse werkt geweldig zolang je ruwe data in het warehouse mág komen. Bij PII-zware bronnen waar pseudonimisering of tokenisatie verplicht is voordat data je analyse-omgeving raakt, moet je transformeren vóór het laden. Hetzelfde geldt voor compliance-trajecten met data residency-eisen, of bronnen waar de volumes te groot zijn om ruw te bewaren (telemetrie, IoT). In die gevallen is ETL — bijvoorbeeld met een streaming engine als Flink of Kafka Streams — gewoon de juiste keuze.

Extract: data uit de bron halen

De extract-stap kent drie hoofdstrategieën:

Een eenvoudige incremental extract in Python:

import pandas as pd
from sqlalchemy import create_engine

src = create_engine("postgresql://user:pwd@source-db/app")
dwh = create_engine("snowflake://user:pwd@account/warehouse")

# Lees laatste watermark uit DWH metadata-tabel
with dwh.begin() as cx:
    last_ts = cx.execute(
        "SELECT MAX(load_ts) FROM meta.last_load WHERE table_name = 'orders'"
    ).scalar() or "1900-01-01"

# Haal alleen gewijzigde rijen op
query = f"""
    SELECT * FROM orders
    WHERE updated_at > '{last_ts}'
"""
df = pd.read_sql(query, src)

# Schrijf naar staging
df.to_sql("stg_orders", dwh, if_exists="append", index=False)

# Update watermark
with dwh.begin() as cx:
    cx.execute(
        "INSERT INTO meta.last_load VALUES ('orders', :ts)",
        {"ts": df["updated_at"].max()}
    )

print(f"{len(df)} rijen geladen, watermark naar {df['updated_at'].max()}")

Pas op met watermarks

Watermarks falen als bronrijen retroactief gewijzigd worden zonder updated_at te bumpen, of als de klok van de bron-database afwijkt. Geef je watermark altijd een kleine overlap (WHERE updated_at >= last_ts - INTERVAL '5 minutes') en zorg voor idempotentie aan de loadkant.

CDC met Debezium en Kafka — een minimaal opzet

Voor een betrouwbare CDC-pipeline op een PostgreSQL-bron leg je een Debezium-connector op de transactielog (WAL) en publiceer je de wijzigingen op een Kafka-topic. Een sink-connector schrijft elke wijziging naar een staging-tabel in het warehouse:

# debezium-postgres-connector.yaml
name: orders-cdc
config:
  connector.class: io.debezium.connector.postgresql.PostgresConnector
  database.hostname: source-db.internal
  database.port: 5432
  database.user: cdc_reader
  database.dbname: app
  database.server.name: app_prod
  table.include.list: public.orders,public.customers
  plugin.name: pgoutput
  publication.name: dbz_app_pub
  slot.name: dbz_app_slot
  tombstones.on.delete: true
  topic.prefix: cdc

De CDC-events bevatten op ("c" = create, "u" = update, "d" = delete) en zowel de before- als after-state van elke rij. Daarmee kun je in silver een correcte SCD2-historie reconstrueren, ook bij deletes — iets wat een watermark-extract structureel mist.

Transform: opschonen en harmoniseren

De transform-laag doet meer dan formatteren. Typische taken:

Hier een dbt-model dat orders dedupliceert en business logic toepast:

-- models/silver/orders.sql
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge'
) }}

WITH source AS (
    SELECT * FROM {{ ref('stg_orders') }}
    {% if is_incremental() %}
      WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
    {% endif %}
),
deduped AS (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY updated_at DESC) AS rn
    FROM source
)
SELECT
    order_id,
    customer_id,
    order_date::DATE                    AS order_date,
    UPPER(TRIM(status))                 AS status,
    COALESCE(total_amount, 0)           AS total_amount,
    CASE WHEN total_amount > 1000 THEN 'high'
         WHEN total_amount > 100  THEN 'medium'
         ELSE 'low'
    END                                 AS order_size_tier,
    updated_at
FROM deduped
WHERE rn = 1

Load: idempotency is alles

Een load is idempotent als je hem twee keer kunt draaien zonder dat de uitkomst verandert. Bij een retry, een netwerkstoring of een herstart: de tabel moet er hetzelfde uit blijven zien als bij één geslaagde run.

Drie veelgebruikte loadstrategieën:

De universele MERGE-statement (werkt in Snowflake, BigQuery, Redshift, Synapse):

MERGE INTO dim_customer AS tgt
USING stg_customers AS src
   ON tgt.customer_id = src.customer_id

WHEN MATCHED AND (
       tgt.name      <> src.name
    OR tgt.segment   <> src.segment
    OR tgt.country   <> src.country
) THEN UPDATE SET
       name      = src.name,
       segment   = src.segment,
       country   = src.country,
       updated_at = CURRENT_TIMESTAMP

WHEN NOT MATCHED THEN INSERT (customer_id, name, segment, country, created_at)
   VALUES (src.customer_id, src.name, src.segment, src.country, CURRENT_TIMESTAMP);

De WHEN MATCHED AND-clausule is een belangrijk detail: zonder die check update je elke rij elke run, ook als er niets veranderd is. Dat geeft onnodige churn, lastigere debugging en hogere kosten.

Slowly Changing Dimensions in praktijk

Voor SCD Type 2 (historie bewaren) wordt het complexer. Hoofdstuk 5 gaat hier diep op in, maar als voorproefje:

-- 1. Sluit verlopen versies
UPDATE dim_customer
SET    valid_to = CURRENT_DATE - 1,
       is_current = FALSE
FROM   stg_customers s
WHERE  dim_customer.customer_id = s.customer_id
  AND  dim_customer.is_current = TRUE
  AND  (dim_customer.segment <> s.segment OR dim_customer.country <> s.country);

-- 2. Voeg nieuwe versie toe
INSERT INTO dim_customer (customer_id, name, segment, country, valid_from, valid_to, is_current)
SELECT s.customer_id, s.name, s.segment, s.country, CURRENT_DATE, '9999-12-31', TRUE
FROM   stg_customers s
LEFT JOIN dim_customer d
  ON d.customer_id = s.customer_id AND d.is_current = TRUE
WHERE  d.customer_id IS NULL
   OR  d.segment <> s.segment
   OR  d.country <> s.country;

Orkestratie

Je pipelines moeten ergens worden gestart, in de juiste volgorde, met dependencies. Populaire orkestrators:

Een Airflow-DAG voor een dagelijkse load:

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

with DAG(
    dag_id="dwh_daily_load",
    start_date=datetime(2026, 1, 1),
    schedule="0 4 * * *",          # elke nacht om 04:00
    catchup=False,
    max_active_runs=1,             # voorkom overlappende runs
    default_args={"retries": 2, "retry_delay": 300},
) as dag:

    extract_orders   = PythonOperator(task_id="extract_orders",   python_callable=run_extract, op_args=["orders"])
    extract_customers = PythonOperator(task_id="extract_customers", python_callable=run_extract, op_args=["customers"])

    transform = DbtCloudRunJobOperator(task_id="dbt_build", job_id=12345)

    quality = PythonOperator(task_id="quality_checks", python_callable=run_ge_suite)

    [extract_orders, extract_customers] >> transform >> quality

Foutafhandeling die echt werkt

Backfill: hoe je historie veilig opnieuw laadt

Vroeg of laat ontdek je een bug in je transformaties die het laatste kwartaal aan data corrumpeert. Of een bronsysteem doet retroactieve correcties op rijen van vorige maand. In beide gevallen heb je een betrouwbare backfill-strategie nodig:

  1. Parametriseer je pipeline op een datumbereik, niet op "gisteren". Een DAG die --start-date en --end-date accepteert is herbruikbaar voor backfills, hercorrupted dagen en initial loads.
  2. Bewaar bronhistorie in bronze. Als bronze append-only is, kun je elke transformatie deterministisch opnieuw draaien zonder de bron opnieuw te bevragen.
  3. Gebruik partitioning op load-datum in silver/gold. Een backfill kan dan veilig één partitie overschrijven zonder de rest te raken — bijvoorbeeld via Snowflake's DELETE + INSERT in transactie, of BigQuery's partition-overwrite.
  4. Schaal compute tijdens de backfill omhoog. Op Snowflake een grotere warehouse-grootte, op Databricks meer workers. Backfills zijn intensiever dan reguliere loads.

Performance-tips voor zware loads

Anti-patterns

Key takeaways

  • ELT is de moderne standaard — laad ruw, transformeer met SQL/dbt.
  • CDC vangt deletes; watermark-extracts niet. Kies bewust.
  • Idempotentie is geen luxe; bouw alle loads zo dat ze veilig herstart kunnen worden.
  • MERGE is je beste vriend voor mutable tabellen — met expliciete change-detection.
  • Atomic swaps voorkomen halve states tijdens loads.
  • Goede orkestratie maakt of breekt operationele rust.

Veelgestelde vragen

Wat is het verschil tussen ETL en ELT?

Bij ETL transformeer je data buiten het warehouse, op een aparte ETL-server, en laad je het resultaat. Bij ELT laad je de ruwe data direct in het warehouse en doe je de transformatie binnen het warehouse, meestal met SQL en dbt. ELT is de moderne standaard omdat cloud-warehouses als Snowflake en BigQuery enorm veel compute leveren tegen lage kosten.

Wat is Change Data Capture (CDC)?

Change Data Capture leest wijzigingen direct uit de transactielog van de bron-database (bijvoorbeeld via Debezium, AWS DMS of Fivetran). Het detecteert inserts, updates en deletes near-real-time en zet ze op een stream. Het grote voordeel ten opzichte van een watermark-extract: deletes worden ook gedetecteerd, en de bron wordt niet belast met queries.

Wat betekent idempotentie in een ETL-pipeline?

Een idempotente load is een load die je twee keer of vaker kunt draaien met exact dezelfde uitkomst. Bij retries, herstarts en netwerkstoringen blijft de doeltabel ongewijzigd na een geslaagde rerun. Praktisch betekent dit: gebruik MERGE in plaats van blinde INSERT, gebruik unique keys, en doe atomische swaps in plaats van TRUNCATE+INSERT.

Welke orkestratietool is het beste?

Apache Airflow is de defacto standaard met het breedste ecosysteem. Prefect en Dagster zijn moderner, type-safe en hebben betere developer experience. Azure Data Factory past goed in de Microsoft-stack. Voor SQL-only workloads is dbt Cloud of dbt + cron vaak voldoende.

Hoe voorkom je dubbele rijen bij retries?

Gebruik MERGE (UPSERT) met een unique key zoals een natural key of hash. Bij append-only tabellen voeg je een unieke run_id of dedupliceer je achteraf met ROW_NUMBER. Op streaming-pipelines werk je met exactly-once semantics via tools als Kafka Streams of Flink. Bouw retries en idempotentie in vanaf dag één.

Wanneer gebruik je full extract en wanneer incremental?

Full extract is geschikt voor kleine, statische bronnen (referentietabellen, metadata) en als de bron geen betrouwbaar updated_at-veld heeft. Incremental extract — via een watermark of CDC — gebruik je voor grote, vaak wijzigende tabellen om bandbreedte en kosten te besparen. Een hybride aanpak (wekelijks full, dagelijks incremental) lost vaak corruptie en gemiste deletes op.