Wat is OpenLineage en waarom is het cruciaal in 2026?
Data pipelines zijn complexer dan ooit. Organisaties draaien tientallen, soms honderden, gekoppelde workflows in Airflow, dbt, Spark en Flink — verspreid over meerdere clouds en on-premise omgevingen. Wanneer één dataset corrupt raakt, weten teams vaak pas uren later welke downstream dashboards, ML-modellen en rapporten daardoor zijn aangetast. De kosten van deze "data downtime" lopen in grote organisaties op tot miljoenen euro's per jaar.
Definitie: OpenLineage
OpenLineage is een open standaard (Linux Foundation) voor het vastleggen en uitwisselen van data lineage-metadata. Het definieert een universeel event-model waarmee tools als Apache Airflow, dbt, Apache Spark en Flink automatisch bijhouden welke datasets worden gelezen, getransformeerd en geschreven — inclusief tijdstempels, jobparameters en kwaliteitsstatistieken.
Data Observability is het bredere concept: de mate waarin een team de gezondheid van zijn datasysteemen continu kan monitoren, begrijpen en herstellen — vergelijkbaar met applicatie-observability (Prometheus + Grafana) maar dan gericht op data.
In 2026 is data observability geen nice-to-have meer. Regelgeving zoals de EU AI Act, DORA en GDPR vereisen aantoonbare datakwaliteit en herleidbaarheid. Tegelijkertijd maken LLM-gedreven data producten het catastrofaal als trainingsdata ongemerkt degradeert. OpenLineage biedt de open, vendor-neutrale ruggengraat waarop organisaties hun observability-stack bouwen.
Automatische Lineage
Geen handmatige documentatie meer. Integrations met Airflow, dbt en Spark sturen events automatisch.
Compliance-Ready
Volledige audit trail: wie raakte welke data aan, wanneer, en met welk resultaat.
Root Cause in Minuten
Impact analyse bij data-incidenten: direct zien welke downstream assets zijn beïnvloed.
Hoe werkt OpenLineage? Het event-model uitgelegd
OpenLineage werkt via een simpel maar krachtig principe: elke data-tool die een job uitvoert, stuurt gestandaardiseerde JSON-events naar een centrale backend (de Lineage Backend). Het referentie-backend is Marquez, maar de standaard staat het gebruik van elk compatibel systeem toe (Atlan, DataHub, OpenMetadata, enz.).
Job Start — START Event
Wanneer een Airflow-taak of Spark-job begint, vuurt de OpenLineage-integratie een START-event
af. Dit event bevat de job-naam, namespace, inputs (datasets die worden gelezen) en de run-ID.
Executie — RUNNING Event (optioneel)
Tijdens de uitvoering kunnen tussentijdse statistieken worden gestuurd: aantal verwerkte records, geheugengebruik, custom facets (bijv. datakwaliteitsscores van Great Expectations).
Afsluiting — COMPLETE of FAIL Event
Na afloop wordt een COMPLETE-event gestuurd met outputs (geschreven datasets), rij-aantallen,
schema-informatie en kwaliteitsfacetten. Bij een fout: FAIL met de error-message.
Opslag & Visualisatie in Marquez
Marquez slaat alle events op in PostgreSQL en biedt een REST API + UI. Andere tools (dbt docs, DataHub) kunnen via deze API de lineage ophalen en in hun eigen catalogus integreren.
Alerting & Dashboarding
Op basis van de events kunnen teams alerts instellen: "stuur Slack-bericht als tabel
orders_daily langer dan 2 uur niet is bijgewerkt" of "alarmeer als row count >20% daalt."
Het OpenLineage JSON Event-formaat
Hieronder een vereenvoudigd voorbeeld van een COMPLETE-event zoals dat door een Spark-job wordt verzonden:
{
"eventType": "COMPLETE",
"eventTime": "2026-03-15T08:42:00Z",
"run": {
"runId": "d9a3f1c2-44b7-4e8a-9c2d-fa7b831e6024",
"facets": {
"spark_version": { "version": "3.5.1" },
"processing_engine": { "name": "spark", "version": "3.5.1" }
}
},
"job": {
"namespace": "data-platform-prod",
"name": "transform_orders_daily",
"facets": {
"sql": {
"_producer": "https://github.com/OpenLineage/OpenLineage",
"query": "SELECT DATE(created_at) AS order_date, SUM(total) FROM raw.orders GROUP BY 1"
}
}
},
"inputs": [{
"namespace": "postgres://prod-db:5432",
"name": "raw.orders",
"facets": {
"dataSource": { "name": "postgres-prod", "uri": "postgres://prod-db:5432/raw" },
"schema": {
"fields": [
{ "name": "order_id", "type": "INTEGER" },
{ "name": "created_at", "type": "TIMESTAMP" },
{ "name": "total", "type": "NUMERIC" }
]
}
}
}],
"outputs": [{
"namespace": "bigquery://my-project",
"name": "analytics.orders_daily",
"facets": {
"outputStatistics": { "rowCount": 842, "size": 67234 },
"columnLineage": {
"fields": {
"order_date": { "inputFields": [{ "namespace": "postgres://prod-db:5432", "name": "raw.orders", "field": "created_at" }] },
"revenue": { "inputFields": [{ "namespace": "postgres://prod-db:5432", "name": "raw.orders", "field": "total" }] }
}
}
}
}],
"producer": "https://github.com/OpenLineage/OpenLineage/releases/tag/1.18.0"
}
Tip: Column-Level Lineage
Activeer altijd column-level lineage via het columnLineage-facet. Dit is de
gouden standaard voor GDPR-compliance: je kunt precies aantonen welk bronsysteem-veld uitkomt in welke
output-kolom, ook al gaat het door meerdere transformatielagen.
Praktische implementatie: van nul naar productie
Stap 1: Marquez lokaal opstarten met Docker Compose
# docker-compose.yml
version: "3.8"
services:
marquez:
image: marquezproject/marquez:latest
ports:
- "5000:5000" # API
- "5001:5001" # Admin
environment:
MARQUEZ_PORT: 5000
MARQUEZ_ADMIN_PORT: 5001
depends_on:
- marquez-db
marquez-web:
image: marquezproject/marquez-web:latest
ports:
- "3000:3000"
environment:
MARQUEZ_HOST: marquez
MARQUEZ_PORT: 5000
marquez-db:
image: postgres:15
environment:
POSTGRES_USER: marquez
POSTGRES_PASSWORD: marquez
POSTGRES_DB: marquez
volumes:
- marquez_db:/var/lib/postgresql/data
volumes:
marquez_db:
# Opstarten
docker compose up -d
# Controleer of API bereikbaar is
curl http://localhost:5000/api/v1/namespaces
Stap 2: Apache Airflow integreren
Installeer de OpenLineage-provider voor Airflow. Voeg de volgende regels toe aan je
requirements.txt en airflow.cfg:
# requirements.txt
apache-airflow-providers-openlineage==1.10.0
openlineage-airflow==1.18.0
# airflow.cfg (of als omgevingsvariabelen)
[openlineage]
transport = {"type": "http", "url": "http://marquez:5000", "endpoint": "api/v1/lineage"}
namespace = data-platform-prod
disabled_for_operators = []
Vanaf dit moment sturen alle bestaande Airflow DAGs automatisch lineage-events naar Marquez. Geen aanpassingen aan je DAG-code nodig.
# Voorbeeld DAG — OpenLineage werkt transparant op de achtergrond
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from datetime import datetime
with DAG(
dag_id="orders_etl_pipeline",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False
) as dag:
extract = PostgresOperator(
task_id="extract_orders",
postgres_conn_id="postgres_prod",
sql="""
INSERT INTO staging.orders_export
SELECT order_id, customer_id, total, created_at
FROM raw.orders
WHERE DATE(created_at) = CURRENT_DATE - INTERVAL '1 day'
"""
)
load_to_bq = BigQueryInsertJobOperator(
task_id="load_orders_to_bigquery",
configuration={
"query": {
"query": """
SELECT
DATE(created_at) AS order_date,
COUNT(*) AS order_count,
SUM(total) AS revenue
FROM `my-project.staging.orders_export`
GROUP BY 1
""",
"destinationTable": {
"projectId": "my-project",
"datasetId": "analytics",
"tableId": "orders_daily"
},
"writeDisposition": "WRITE_TRUNCATE",
"useLegacySql": False
}
},
gcp_conn_id="google_cloud_default"
)
extract >> load_to_bq
Stap 3: dbt integreren
# profiles.yml — voeg OpenLineage toe als dbt meta-configuratie
# Installeer de package:
# pip install openlineage-dbt==1.18.0
# Voer dbt uit met OpenLineage transport
export OPENLINEAGE_URL=http://localhost:5000
export OPENLINEAGE_NAMESPACE=dbt-prod
dbt run --target prod
# Alternatief: gebruik de dbt-openlineage wrapper voor volledige kolom-lineage
from openlineage.dbt import DbtLocalArtifactProcessor
processor = DbtLocalArtifactProcessor(
producer="https://mijn-data-platform.nl",
job_namespace="dbt-prod",
project_dir="/opt/dbt/my_project",
profile_name="my_profile",
target="prod"
)
processor.parse()
Stap 4: Custom Python-jobs met de OpenLineage Client
from openlineage.client import OpenLineageClient
from openlineage.client.run import (
RunEvent, RunState, Run, Job, Dataset,
InputDataset, OutputDataset
)
from openlineage.client.facet import (
SchemaDatasetFacet, SchemaField,
OutputStatisticsOutputDatasetFacet,
DataQualityMetricsInputDatasetFacet,
ColumnMetric
)
from openlineage.client.uuid import generate_new_uuid
from datetime import datetime, timezone
# Verbind met Marquez
client = OpenLineageClient.from_environment()
# Of direct: client = OpenLineageClient(url="http://localhost:5000")
run_id = str(generate_new_uuid())
job_name = "custom_python_transform"
namespace = "data-platform-prod"
now = datetime.now(timezone.utc).isoformat()
# ---- START event ----
client.emit(RunEvent(
eventType=RunState.START,
eventTime=now,
run=Run(runId=run_id),
job=Job(namespace=namespace, name=job_name),
inputs=[InputDataset(
namespace="s3://my-bucket",
name="raw/clickstream/2026-03-15",
facets={
"schema": SchemaDatasetFacet(fields=[
SchemaField(name="user_id", type="STRING"),
SchemaField(name="event_type", type="STRING"),
SchemaField(name="timestamp", type="TIMESTAMP"),
])
}
)],
outputs=[]
))
# ---- Voer je transformatie uit ----
import pandas as pd
df = pd.read_parquet("s3://my-bucket/raw/clickstream/2026-03-15/")
result = df.groupby("user_id")["event_type"].count().reset_index()
result.to_parquet("s3://my-bucket/aggregated/sessions/2026-03-15/")
row_count = len(result)
null_count = result["event_type"].isna().sum()
# ---- COMPLETE event met statistieken ----
client.emit(RunEvent(
eventType=RunState.COMPLETE,
eventTime=datetime.now(timezone.utc).isoformat(),
run=Run(runId=run_id),
job=Job(namespace=namespace, name=job_name),
inputs=[InputDataset(
namespace="s3://my-bucket",
name="raw/clickstream/2026-03-15",
facets={
"dataQualityMetrics": DataQualityMetricsInputDatasetFacet(
rowCount=len(df),
columnMetrics={
"user_id": ColumnMetric(nullCount=df["user_id"].isna().sum())
}
)
}
)],
outputs=[OutputDataset(
namespace="s3://my-bucket",
name="aggregated/sessions/2026-03-15",
facets={
"outputStatistics": OutputStatisticsOutputDatasetFacet(
rowCount=row_count,
size=result.memory_usage(deep=True).sum()
)
}
)]
))
Pro Tip: Async Transport voor hoge throughput
In productie-omgevingen met honderden jobs per minuut gebruik je de async HTTP transport of de Kafka transport. Dit voorkomt dat het versturen van lineage-events je pipeline vertraagt:
export OPENLINEAGE_TRANSPORT='{
"type": "kafka",
"topicName": "openlineage-events",
"config": {
"bootstrap.servers": "kafka:9092",
"acks": "0"
}
}'
OpenLineage vs. alternatieven: eerlijke vergelijking
De data observability markt is in 2026 volwassen geworden. Hieronder vergelijken we OpenLineage (als open standaard + Marquez als backend) met de voornaamste commerciële en open-source alternatieven.
| Criterium | OpenLineage + Marquez | Monte Carlo | DataHub | dbt + Elementary |
|---|---|---|---|---|
| Licentie / Kosten | ✅ Open Source (gratis) | ❌ Commercieel (€€€) | ✅ Open Source (gratis) | ✅ Open Source (gratis) |
| Lineage Standaard | ✅ OpenLineage native | ⚠️ Eigen formaat | ✅ Ondersteunt OpenLineage | ⚠️ Alleen dbt-graaf |
| Automatische Lineage | ✅ Airflow, Spark, dbt, Flink | ✅ Breed + ML | ✅ Breed ecosysteem | ⚠️ Alleen dbt |
| Column-level Lineage | ✅ Via facets | ✅ Ja | ✅ Ja | ✅ Ja (binnen dbt) |
| Alerting & Anomaly Detection | ⚠️ Beperkt (zelf bouwen) | ✅ AI-gedreven | ⚠️ Beperkt | ✅ Via Elementary |
| Schaalbaar naar 1000+ jobs | ✅ Met Kafka transport | ✅ SaaS | ✅ Ja | ⚠️ Beperkt |
| Vendor Lock-in | ✅ Geen | ❌ Hoog | ✅ Laag | ⚠️ dbt-afhankelijk |
| UI / Gebruiksvriendelijkheid | ⚠️ Basis (Marquez UI) | ✅ Uitstekend | ✅ Goed | ✅ Goed |
| Ideaal voor | Open standaard als fundament | Enterprise met groot budget | Data Catalog + Lineage | dbt-first teams |
Aanbeveling 2026
Implementeer OpenLineage als event-standaard en stuur events naar zowel Marquez (voor operationele lineage) als DataHub (voor de data catalog). Monte Carlo is zinvol als je een dedicated observability budget hebt en directe AI-anomaly detection wilt zonder bouwwerk. Combineer waar mogelijk: de meeste enterprise tools ondersteunen OpenLineage als ingest-formaat.
Best practices voor productie-implementaties
Case: Retailer met 200+ dagelijkse pipelines
Een Nederlandse retailer draaide 200+ dagelijkse Airflow-pipelines vanuit meerdere bronnen (SAP, Shopify, eigen legacy systemen) naar een BigQuery data warehouse. Elke dag kwamen er "mystery failures" voor: een dashboard klopte niet, een ML-model kreeg de verkeerde invoer. Root cause analysis kostte gemiddeld 4 uur.
Na implementatie van OpenLineage + Marquez + PagerDuty-alerts op basis van lineage-events, daalde de MTTR (Mean Time To Recovery) van 4 uur naar 18 minuten. De volledige impact van een mislukte upstream job was in één klik zichtbaar via de Marquez-graaf.
1. Gebruik consistente namespaces
Een namespace is de scope waarbinnen een dataset of job bestaat. Gebruik altijd het URI-schema van het bronapparaat:
# Goed — URI-gebaseerde namespaces
postgres://prod-db.mycompany.nl:5432/analytics
bigquery://my-gcp-project
s3://my-data-lake-bucket
kafka://kafka-cluster:9092
# Slecht — niet-unieke namen
"mijn-database"
"productie"
"orders"
2. Enricheer events met custom facets voor datakwaliteit
from openlineage.client.facet import (
DataQualityAssertionsDatasetFacet,
Assertion
)
# Voeg Great Expectations resultaten toe als lineage-facet
gx_results = context.run_checkpoint("orders_checkpoint")
assertions = []
for result in gx_results.run_results.values():
for validation_result in result["validation_result"]["results"]:
assertions.append(Assertion(
assertion=validation_result["expectation_config"]["expectation_type"],
success=validation_result["success"],
column=validation_result["expectation_config"]["kwargs"].get("column")
))
# Voeg toe aan je OutputDataset facets
output_facets = {
"dataQualityAssertions": DataQualityAssertionsDatasetFacet(
assertions=assertions
)
}
3. Implementeer SLA-monitoring via Marquez API
import requests
from datetime import datetime, timedelta, timezone
import json
MARQUEZ_API = "http://marquez:5000/api/v1"
NAMESPACE = "data-platform-prod"
def check_dataset_freshness(dataset_name: str, max_age_hours: int = 2) -> dict:
"""Controleer of een dataset recent genoeg is bijgewerkt."""
resp = requests.get(
f"{MARQUEZ_API}/namespaces/{NAMESPACE}/datasets/{dataset_name}"
)
resp.raise_for_status()
dataset = resp.json()
last_modified = dataset.get("updatedAt")
if not last_modified:
return {"status": "UNKNOWN", "dataset": dataset_name}
updated_at = datetime.fromisoformat(last_modified.replace("Z", "+00:00"))
age = datetime.now(timezone.utc) - updated_at
if age > timedelta(hours=max_age_hours):
return {
"status": "STALE",
"dataset": dataset_name,
"age_hours": round(age.total_seconds() / 3600, 1),
"last_updated": last_modified
}
return {"status": "FRESH", "dataset": dataset_name, "age_hours": round(age.total_seconds() / 3600, 1)}
# Monitor kritieke datasets
critical_datasets = [
"analytics.orders_daily",
"analytics.inventory_snapshot",
"ml.customer_features"
]
for ds in critical_datasets:
result = check_dataset_freshness(ds, max_age_hours=3)
if result["status"] == "STALE":
# Stuur alert naar Slack/PagerDuty
print(f"⚠️ ALERT: {ds} is {result['age_hours']}u oud!")
else:
print(f"✅ {ds}: {result['age_hours']}u oud — OK")
4. Bewaar lineage in Iceberg/Delta voor historische analyse
# Exporteer Marquez events naar je data lake voor langetermijn-analyse
# Handig voor compliance audits (GDPR, AI Act)
import requests
import json
from pyarrow import json as pa_json
import pyarrow.parquet as pq
def export_lineage_events(namespace: str, output_path: str, days: int = 90):
"""Export lineage events naar Parquet voor historische analyse."""
events = []
after = (datetime.now() - timedelta(days=days)).isoformat() + "Z"
resp = requests.get(
f"{MARQUEZ_API}/lineage",
params={"nodeId": f"dataset:{namespace}:analytics.orders_daily"}
)
# Verwerk en sla op als Parquet
with open(f"{output_path}/lineage_export_{datetime.now().date()}.json", "w") as f:
json.dump(resp.json(), f, indent=2)
export_lineage_events("data-platform-prod", "/data/lineage-archive")
5. Governance: stel namespace-eigenaarschap vast
Registreer voor elke dataset in Marquez een eigenaar en classificatie-tag via de API:
curl -X PUT http://localhost:5000/api/v1/namespaces/