Introductie: De Strijd om Jouw Pipeline
Als data engineer in 2026 kun je er niet omheen: workflow orchestratie is het zenuwstelsel van elke moderne data stack. Je hebt duizenden taken die op het juiste moment, in de juiste volgorde en met de juiste afhankelijkheden moeten draaien. Een fout in de orchestratielaag betekent stale dashboards, mislukte ML-modellen of — erger nog — verkeerde business beslissingen op basis van verouderde data.
Twee namen domineren het gesprek: Apache Airflow en Prefect. Airflow is de gevestigde orde, geboren bij Airbnb in 2014 en inmiddels het meest gebruikte orchestratieplatform ter wereld. Prefect is de uitdager: opgericht in 2018 met de expliciete missie om alles beter te doen dan Airflow. Beide tools hebben hun fanatieke aanhangers, en beide hebben legitieme redenen om te bestaan.
Wat is Workflow Orchestratie?
Workflow orchestratie is het automatisch plannen, uitvoeren en monitoren van complexe taaksequenties (pipelines). Een orchestrator bepaalt wanneer taken starten, beheert afhankelijkheden tussen taken, handelt fouten en retries af en biedt observability via logs en dashboards. Zonder orchestratie heb je een verzameling losse scripts met cronjobs — een recept voor nachtmerries in productie.
In 2026 zijn beide tools volwassen. Airflow 3.0 (uitgebracht in 2025) heeft de architectuur ingrijpend verbeterd. Prefect 3.x heeft zijn serverloze ambities volledig waargemaakt. De keuze is dus niet meer "gebruik Airflow want Prefect is onvolwassen" — het is een serieuze architectuurbeslissing met langdurige gevolgen. In deze blog leggen we alles naast elkaar zodat jij een weloverwogen keuze kunt maken.
Hoe Werken Ze Allebei?
Apache Airflow: DAGs als Eerste Principe
Airflow is gebouwd rond het concept van de Directed Acyclic Graph (DAG). Een DAG is een Python-bestand dat de structuur van je pipeline definieert: welke taken er zijn, in welke volgorde ze draaien en wat hun onderlinge afhankelijkheden zijn. De daadwerkelijke logica leeft in Operators (PythonOperator, BashOperator, SparkSubmitOperator, etc.).
DAG Definitie
Je schrijft een Python-bestand in de dags/ map. De Scheduler pikt dit bestand op en parseert het periodiek (elke paar seconden in Airflow 3.0 via de nieuwe DAG Processor).
Scheduling & Triggering
De Scheduler bepaalt op basis van schedule_interval of afhankelijkheden wanneer een DAG Run aangemaakt wordt. In Airflow 3.0 is de Scheduler aanzienlijk sneller en stabieler geworden.
Task Execution
De Executor (LocalExecutor, CeleryExecutor of KubernetesExecutor) pikt taken op en voert ze uit op Workers. Elke task instance wordt gelogd en bijgehouden in de metadatabase (Postgres of MySQL).
Monitoring via Webserver
De Airflow UI toont de DAG-structuur als graph, de status van alle runs en gedetailleerde logs per task. In Airflow 3.0 is de UI volledig vernieuwd naar een moderne React-frontend.
Prefect: Flows als Gewone Python
Prefect kiest een fundamenteel andere aanpak. In plaats van dat je je pipeline expliciet als een graph definieert, decoreer je gewone Python-functies met @flow en @task. Prefect inspecteert tijdens runtime de uitvoering en bouwt de dependency-graph automatisch op. Dit maakt Prefect-code significant leesbaarder en makkelijker te debuggen.
Flow & Task Decorators
Je omhult Python-functies met @task voor individuele stappen en @flow voor de orkesterende functie. Geen speciale DAG-klassen, geen Operators — gewoon Python.
Prefect Server of Prefect Cloud
Flows worden geregistreerd bij Prefect Server (self-hosted) of Prefect Cloud (managed SaaS). De server beheert scheduling, state en observability maar voert de code zelf niet uit.
Workers & Work Pools
Prefect Workers draaien in jouw infrastructuur (Docker, Kubernetes, serverless). Ze pollen de Prefect Server voor nieuwe flow runs en voeren die lokaal uit. Jij beheert de compute, Prefect beheert de orchestratie.
Observability & Automations
Prefect Cloud biedt uitgebreide monitoring, alerting via webhooks en Automations (event-driven triggers). Je kunt flows automatisch hertriggen op basis van upstream events.
Praktische Codevoorbeelden
Use Case: Dagelijkse ETL Pipeline (Airflow)
We bouwen een pipeline die data uit een REST API haalt, transformeert en naar een PostgreSQL-database schrijft.
# airflow/dags/dagelijkse_api_etl.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime, timedelta
import requests
import pandas as pd
# Standaard argumenten voor alle taken
default_args = {
"owner": "data-team",
"retries": 3,
"retry_delay": timedelta(minutes=5),
"email_on_failure": True,
"email": ["data-alerts@bedrijf.nl"],
}
def extract_api_data(**context):
"""Haalt data op van de externe API."""
execution_date = context["ds"] # YYYY-MM-DD formaat
response = requests.get(
f"https://api.voorbeeld.nl/orders",
params={"date": execution_date},
headers={"Authorization": "Bearer {{ var.value.api_token }}"},
timeout=30
)
response.raise_for_status()
# Sla data op via XCom voor de volgende taak
context["ti"].xcom_push(key="raw_orders", value=response.json())
print(f"✅ {len(response.json())} orders opgehaald voor {execution_date}")
def transform_data(**context):
"""Valideert en transformeert de ruwe orderdata."""
raw_data = context["ti"].xcom_pull(
task_ids="extract_data",
key="raw_orders"
)
df = pd.DataFrame(raw_data)
# Transformaties
df["order_date"] = pd.to_datetime(df["order_date"])
df["total_amount"] = df["quantity"] * df["unit_price"]
df = df[df["status"] != "CANCELLED"] # Filter geannuleerde orders
df = df.dropna(subset=["customer_id"])
print(f"📊 {len(df)} geldige orders na transformatie")
context["ti"].xcom_push(key="clean_orders", value=df.to_dict("records"))
def load_to_postgres(**context):
"""Laadt getransformeerde data naar PostgreSQL."""
clean_data = context["ti"].xcom_pull(
task_ids="transform_data",
key="clean_orders"
)
hook = PostgresHook(postgres_conn_id="postgres_datawarehouse")
# Upsert logica: INSERT ON CONFLICT UPDATE
insert_sql = """
INSERT INTO orders_staging (order_id, customer_id, order_date, total_amount, status)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (order_id) DO UPDATE
SET total_amount = EXCLUDED.total_amount,
status = EXCLUDED.status,
updated_at = NOW();
"""
rows = [
(r["order_id"], r["customer_id"], r["order_date"],
r["total_amount"], r["status"])
for r in clean_data
]
hook.run(insert_sql, parameters=rows)
print(f"💾 {len(rows)} orders geladen naar datawarehouse")
# DAG definitie
with DAG(
dag_id="dagelijkse_api_etl",
default_args=default_args,
description="Dagelijkse ETL van orders API naar datawarehouse",
schedule_interval="0 6 * * *", # Elke dag om 06:00
start_date=datetime(2026, 1, 1),
catchup=False, # Geen backfill voor gemiste runs
tags=["etl", "orders", "productie"],
) as dag:
extract_task = PythonOperator(
task_id="extract_data",
python_callable=extract_api_data,
)
transform_task = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
)
load_task = PythonOperator(
task_id="load_to_postgres",
python_callable=load_to_postgres,
)
# Afhankelijkheidsketen definiëren
extract_task >> transform_task >> load_task
Dezelfde Pipeline in Prefect
Nu dezelfde logica in Prefect — merk op hoe de code voelt als gewone Python:
# prefect/flows/dagelijkse_api_etl.py
from prefect import flow, task, get_run_logger
from prefect.blocks.system import Secret
from prefect_sqlalchemy import SqlAlchemyConnector
import requests
import pandas as pd
from datetime import date
@task(
retries=3,
retry_delay_seconds=300,
name="extract-api-data",
description="Haalt orders op van de externe API"
)
def extract_api_data(target_date: date) -> list[dict]:
"""Haalt data op van de externe API."""
logger = get_run_logger()
api_token = Secret.load("api-token").get()
response = requests.get(
"https://api.voorbeeld.nl/orders",
params={"date": target_date.isoformat()},
headers={"Authorization": f"Bearer {api_token}"},
timeout=30
)
response.raise_for_status()
orders = response.json()
logger.info(f"✅ {len(orders)} orders opgehaald voor {target_date}")
return orders
@task(
name="transform-data",
description="Valideert en transformeert orderdata"
)
def transform_data(raw_orders: list[dict]) -> list[dict]:
"""Valideert en transformeert de ruwe orderdata."""
logger = get_run_logger()
df = pd.DataFrame(raw_orders)
df["order_date"] = pd.to_datetime(df["order_date"])
df["total_amount"] = df["quantity"] * df["unit_price"]
df = df[df["status"] != "CANCELLED"]
df = df.dropna(subset=["customer_id"])
logger.info(f"📊 {len(df)} geldige orders na transformatie")
return df.to_dict("records")
@task(
name="load-to-postgres",
description="Laadt getransformeerde data naar PostgreSQL"
)
def load_to_postgres(clean_orders: list[dict]) -> int:
"""Laadt getransformeerde data naar PostgreSQL."""
logger = get_run_logger()
connector = SqlAlchemyConnector.load("postgres-datawarehouse")
with connector.get_connection(begin=False) as conn:
for order in clean_orders:
conn.execute("""
INSERT INTO orders_staging
(order_id, customer_id, order_date, total_amount, status)
VALUES (:order_id, :customer_id, :order_date, :total_amount, :status)
ON CONFLICT (order_id) DO UPDATE
SET total_amount = EXCLUDED.total_amount,
status = EXCLUDED.status,
updated_at = NOW()
""", order)
logger.info(f"💾 {len(clean_orders)} orders geladen naar datawarehouse")
return len(clean_orders)
# De flow: dit voelt als gewone Python!
@flow(
name="dagelijkse-api-etl",
description="Dagelijkse ETL van orders API naar datawarehouse",
log_prints=True
)
def dagelijkse_etl_flow(target_date: date = date.today()):
# Stap 1: Extract
raw_orders = extract_api_data(target_date)
# Stap 2: Transform (automatisch afhankelijk van extract)
clean_orders = transform_data(raw_orders)
# Stap 3: Load (automatisch afhankelijk van transform)
loaded_count = load_to_postgres(clean_orders)
return f"Succesvol {loaded_count} orders verwerkt"
if __name__ == "__main__":
# Lokaal uitvoeren voor testen — geen scheduler nodig!
dagelijkse_etl_flow()
Sleutelverschil in de Code
In Airflow gebruik je XCom voor data-uitwisseling tussen taken — dit vereist expliciet pushen en pullen van values via context["ti"]. In Prefect geef je data simpelweg als return-waarden en functie-argumenten door. De Prefect-code is daardoor significant intuïtiever en makkelijker te testen met standaard Python pytest.
Directe Vergelijking: Airflow vs Prefect
| Criterium | Apache Airflow | Prefect | Winnaar |
|---|---|---|---|
| Leercurve | Steil — DAG-paradigma, Operators, XCom, Connections | Laag — Python decorators, intuïtieve API | 🏆 Prefect |
| Ecosysteem / Providers | Enorm: 80+ officiële providers (AWS, GCP, Azure, Spark...) | Groeiend: 30+ integraties, minder volwassen | 🏆 Airflow |
| Lokaal Ontwikkelen | Complex — vereist docker-compose setup | Simpel — python flow.py werkt direct |
🏆 Prefect |
| Dynamische Pipelines | Beperkt (Dynamic Task Mapping in Airflow 2.3+, maar complex) | Native — gewone Python loops en conditionals | 🏆 Prefect |
| Testen | Lastig — DAGs vereisen Airflow-context | Eenvoudig — standaard Python unittest/pytest | 🏆 Prefect |
| Schaalbaarheid | Uitstekend met CeleryExecutor/KubernetesExecutor | Uitstekend met Kubernetes/serverless Work Pools | 🤝 Gelijk |
| Managed Optie | MWAA (AWS), Cloud Composer (GCP), Astronomer | Prefect Cloud (native, inclusief gratis tier) | 🏆 Prefect Cloud |
| Community & Documentatie | Volwassen — 10+ jaar aan StackOverflow vragen | Actief — goede docs, maar kleiner | 🏆 Airflow |
| UI & Observability | Volledig vernieuwd in Airflow 3.0 | Modern en intuïtief, Prefect Cloud heeft meer features | 🏆 Prefect Cloud |
| Licentie & Kosten | Apache 2.0 (open source), managed opties zijn duur | Open source core + Prefect Cloud (freemium) | 🤝 Context-afhankelijk |
| Event-driven Triggers | Dataset-gebaseerde triggers (Airflow 2.4+) | Uitgebreide Automations — webhooks, state-based | 🏆 Prefect |
| Adoptie in Enterprise | Dominant — de facto standaard | Groeiend, maar nog niet mainstream in enterprise | 🏆 Airflow |
Vergelijking met Andere Orchestrators
Airflow en Prefect zijn niet de enige spelers. In 2026 zijn er serieuze alternatieven die voor specifieke use cases beter kunnen passen:
Dagster
Asset-gebaseerde orchestratie. Ideaal als je denkt in data assets (tabellen, modellen) in plaats van taken. Uitstekende dbt-integratie. Steilere leercurve, maar superieure lineage observability. Sterke keuze voor data teams die software engineering principes centraal stellen.
Temporal
Workflow engine voor long-running, stateful processen. Niet primair gericht op data pipelines maar op business workflows. Uitzonderlijk robuust en schaalbaar. Te complex voor standaard ETL, maar onverslaanbaar voor complexe event-driven orchestratie.
Mage.ai
De nieuwkomer gericht op developer experience. Notebook-gebaseerde pipelines, ingebouwde data preview en testing. Interessant voor kleinere teams. Nog niet bewezen op enterprise-schaal, maar de snelste groei in adoptie onder startups in 2025-2026.
Praktijkvoorbeeld: Nederlandse FinTech Scale-up
Een Nederlandse betalingsverwerker met 50 engineers was jarenlang Airflow-gebruiker. Hun primaire pijnpunten: elke DAG-wijziging vereiste een deployment, lokaal testen was tijdrovend en nieuwe data engineers hadden weken nodig om productief te zijn.
Ze migreerden kritieke ML-pipelines naar Prefect, terwijl legacy ETL-pipelines op Airflow bleven. Resultaat: onboarding van nieuwe engineers daalde van 3 weken naar 4 dagen voor de Prefect-pipelines. De Airflow-pipelines — met 300+ provider-integraties — bleven stabiel op de bestaande infrastructuur. Een hybride aanpak die voor hen uitstekend werkt.
Best Practices & Production Tips
Apache Airflow in Productie
Airflow Best Practices
- Idempotente taken: Elke task moet meerdere keren uitvoerbaar zijn met hetzelfde resultaat. Gebruik
execution_dateals partitie-sleutel, nooitdatetime.now(). - Kleine XComs: XCom is voor metadata, niet voor data. Stuur nooit DataFrames via XCom — gebruik S3/GCS als tussenopslag.
- Catchup bewust uitschakelen: Zet
catchup=Falseop pipelines die niet historisch kunnen backfillen, anders overlaad je de scheduler bij een herstart. - DAG-parsing performance: Zware imports (Spark, grote ML-libraries) buiten de DAG-definitie houden. De scheduler parst DAGs constant — trage imports = trage scheduler.
- Gebruik Connections & Variables: Sla nooit credentials hardcoded op. Gebruik Airflow Connections voor database-credentials en Variables voor configuratie.
- KubernetesExecutor voor isolatie: In productie met variabele workloads geeft KubernetesPodOperator de beste resource-isolatie per task.
# airflow/dags/production_best_practice.py
# ✅ GOED: Imports buiten de module-scope, idempotente logica
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# ✅ Imports die bij task-uitvoering plaatsvinden, NIET bij DAG-parsing
def transform_data(**context):
import pandas as pd # Import hier, niet globaal!
from mypackage import transformer
ds = context["ds"] # Gebruik execution_date als partitie
# ✅ Idempotent: schrijf naar partitie gebaseerd op datum
output_path = f"s3://my-bucket/orders/date={ds}/output.parquet"
df = transformer.run(date=ds)
df.to_parquet(output_path)
# ✅ Kleine XCom: alleen het pad, niet de data
return output_path # Automatisch als XCom opgeslagen
Prefect in Productie
Prefect Best Practices
- Task cache: Gebruik
cache_key_fnvoor expensive taken die bij ongewijzigde input gecached kunnen worden — bespaart kosten en tijd bij retries. - Concurrency limieten: Stel Work Pool concurrency in om te voorkomen dat downstream systemen overbelast raken bij vele gelijktijdige runs.
- Artifacts gebruiken: Gebruik
create_table_artifact()encreate_link_artifact()voor rijke observability direct in de Prefect UI. - Deployments via CI/CD: Automatiseer
prefect deployin je GitHub Actions pipeline. Nooit handmatig deployen in productie. - Result persistence: Configureer
result_storagevoor grote flows zodat task-resultaten op S3/GCS opgeslagen worden en niet in memory.
# prefect/flows/production_best_practice.py
from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect.artifacts import create_table_artifact
from datetime import timedelta
@task(
retries=3,
retry_delay_seconds=exponential_backoff(backoff_factor=2),
# ✅ Cache: sla resultaat op voor 24 uur bij zelfde input
cache_key_fn=task_input_hash,
cache_expiration=timedelta(hours=24),
persist_result=True # ✅ Opslaan in geconfigureerde result storage
)
def expensive_transformation(date: str, config: dict) -> list[dict]:
"""Zware transformatie die gecached wordt bij identieke input."""
import pandas as pd
# ... zware berekeningen ...
return results
@flow(
name="productie-flow",
result_storage="s3-bucket/prefect-results", # Expliciete result storage
)
def productie_pipeline(target_date: str):
# ✅ Artifacts voor rijke UI observability
results = expensive_transformation(target_date, config={"env": "prod"})
create_table_artifact(
key="resultaat-samenvatting",
table=results[:10], # Preview van eerste 10 rijen
description=f"Pipeline resultaten voor {target_date}"
)
return results
Wanneer Kies Je Wat?
Na al deze vergelijkingen de praktische beslisboom voor Nederlandse data engineers:
| Situatie |
|---|