OpenLineage & Data Observability: Complete Gids 2026

Gepubliceerd: 13 april 2026
Leestijd: 12 minuten
Data Engineering

Leer hoe OpenLineage en data observability tools je helpen datakwaliteit te bewaken en pipeline-problemen vroegtijdig te detecteren.

```html

Wat is OpenLineage en waarom is het cruciaal in 2026?

Data pipelines zijn complexer dan ooit. Organisaties draaien tientallen, soms honderden, gekoppelde workflows in Airflow, dbt, Spark en Flink — verspreid over meerdere clouds en on-premise omgevingen. Wanneer één dataset corrupt raakt, weten teams vaak pas uren later welke downstream dashboards, ML-modellen en rapporten daardoor zijn aangetast. De kosten van deze "data downtime" lopen in grote organisaties op tot miljoenen euro's per jaar.

Definitie: OpenLineage

OpenLineage is een open standaard (Linux Foundation) voor het vastleggen en uitwisselen van data lineage-metadata. Het definieert een universeel event-model waarmee tools als Apache Airflow, dbt, Apache Spark en Flink automatisch bijhouden welke datasets worden gelezen, getransformeerd en geschreven — inclusief tijdstempels, jobparameters en kwaliteitsstatistieken.

Data Observability is het bredere concept: de mate waarin een team de gezondheid van zijn datasysteemen continu kan monitoren, begrijpen en herstellen — vergelijkbaar met applicatie-observability (Prometheus + Grafana) maar dan gericht op data.

In 2026 is data observability geen nice-to-have meer. Regelgeving zoals de EU AI Act, DORA en GDPR vereisen aantoonbare datakwaliteit en herleidbaarheid. Tegelijkertijd maken LLM-gedreven data producten het catastrofaal als trainingsdata ongemerkt degradeert. OpenLineage biedt de open, vendor-neutrale ruggengraat waarop organisaties hun observability-stack bouwen.

Automatische Lineage

Geen handmatige documentatie meer. Integrations met Airflow, dbt en Spark sturen events automatisch.

Compliance-Ready

Volledige audit trail: wie raakte welke data aan, wanneer, en met welk resultaat.

Root Cause in Minuten

Impact analyse bij data-incidenten: direct zien welke downstream assets zijn beïnvloed.

Hoe werkt OpenLineage? Het event-model uitgelegd

OpenLineage werkt via een simpel maar krachtig principe: elke data-tool die een job uitvoert, stuurt gestandaardiseerde JSON-events naar een centrale backend (de Lineage Backend). Het referentie-backend is Marquez, maar de standaard staat het gebruik van elk compatibel systeem toe (Atlan, DataHub, OpenMetadata, enz.).

1

Job Start — START Event

Wanneer een Airflow-taak of Spark-job begint, vuurt de OpenLineage-integratie een START-event af. Dit event bevat de job-naam, namespace, inputs (datasets die worden gelezen) en de run-ID.

2

Executie — RUNNING Event (optioneel)

Tijdens de uitvoering kunnen tussentijdse statistieken worden gestuurd: aantal verwerkte records, geheugengebruik, custom facets (bijv. datakwaliteitsscores van Great Expectations).

3

Afsluiting — COMPLETE of FAIL Event

Na afloop wordt een COMPLETE-event gestuurd met outputs (geschreven datasets), rij-aantallen, schema-informatie en kwaliteitsfacetten. Bij een fout: FAIL met de error-message.

4

Opslag & Visualisatie in Marquez

Marquez slaat alle events op in PostgreSQL en biedt een REST API + UI. Andere tools (dbt docs, DataHub) kunnen via deze API de lineage ophalen en in hun eigen catalogus integreren.

5

Alerting & Dashboarding

Op basis van de events kunnen teams alerts instellen: "stuur Slack-bericht als tabel orders_daily langer dan 2 uur niet is bijgewerkt" of "alarmeer als row count >20% daalt."

Het OpenLineage JSON Event-formaat

Hieronder een vereenvoudigd voorbeeld van een COMPLETE-event zoals dat door een Spark-job wordt verzonden:

{
  "eventType": "COMPLETE",
  "eventTime": "2026-03-15T08:42:00Z",
  "run": {
    "runId": "d9a3f1c2-44b7-4e8a-9c2d-fa7b831e6024",
    "facets": {
      "spark_version": { "version": "3.5.1" },
      "processing_engine": { "name": "spark", "version": "3.5.1" }
    }
  },
  "job": {
    "namespace": "data-platform-prod",
    "name": "transform_orders_daily",
    "facets": {
      "sql": {
        "_producer": "https://github.com/OpenLineage/OpenLineage",
        "query": "SELECT DATE(created_at) AS order_date, SUM(total) FROM raw.orders GROUP BY 1"
      }
    }
  },
  "inputs": [{
    "namespace": "postgres://prod-db:5432",
    "name": "raw.orders",
    "facets": {
      "dataSource": { "name": "postgres-prod", "uri": "postgres://prod-db:5432/raw" },
      "schema": {
        "fields": [
          { "name": "order_id", "type": "INTEGER" },
          { "name": "created_at", "type": "TIMESTAMP" },
          { "name": "total", "type": "NUMERIC" }
        ]
      }
    }
  }],
  "outputs": [{
    "namespace": "bigquery://my-project",
    "name": "analytics.orders_daily",
    "facets": {
      "outputStatistics": { "rowCount": 842, "size": 67234 },
      "columnLineage": {
        "fields": {
          "order_date": { "inputFields": [{ "namespace": "postgres://prod-db:5432", "name": "raw.orders", "field": "created_at" }] },
          "revenue": { "inputFields": [{ "namespace": "postgres://prod-db:5432", "name": "raw.orders", "field": "total" }] }
        }
      }
    }
  }],
  "producer": "https://github.com/OpenLineage/OpenLineage/releases/tag/1.18.0"
}

Tip: Column-Level Lineage

Activeer altijd column-level lineage via het columnLineage-facet. Dit is de gouden standaard voor GDPR-compliance: je kunt precies aantonen welk bronsysteem-veld uitkomt in welke output-kolom, ook al gaat het door meerdere transformatielagen.

Praktische implementatie: van nul naar productie

Stap 1: Marquez lokaal opstarten met Docker Compose

# docker-compose.yml
version: "3.8"
services:
  marquez:
    image: marquezproject/marquez:latest
    ports:
      - "5000:5000"   # API
      - "5001:5001"   # Admin
    environment:
      MARQUEZ_PORT: 5000
      MARQUEZ_ADMIN_PORT: 5001
    depends_on:
      - marquez-db

  marquez-web:
    image: marquezproject/marquez-web:latest
    ports:
      - "3000:3000"
    environment:
      MARQUEZ_HOST: marquez
      MARQUEZ_PORT: 5000

  marquez-db:
    image: postgres:15
    environment:
      POSTGRES_USER: marquez
      POSTGRES_PASSWORD: marquez
      POSTGRES_DB: marquez
    volumes:
      - marquez_db:/var/lib/postgresql/data

volumes:
  marquez_db:
# Opstarten
docker compose up -d

# Controleer of API bereikbaar is
curl http://localhost:5000/api/v1/namespaces

Stap 2: Apache Airflow integreren

Installeer de OpenLineage-provider voor Airflow. Voeg de volgende regels toe aan je requirements.txt en airflow.cfg:

# requirements.txt
apache-airflow-providers-openlineage==1.10.0
openlineage-airflow==1.18.0
# airflow.cfg (of als omgevingsvariabelen)
[openlineage]
transport = {"type": "http", "url": "http://marquez:5000", "endpoint": "api/v1/lineage"}
namespace = data-platform-prod
disabled_for_operators = []

Vanaf dit moment sturen alle bestaande Airflow DAGs automatisch lineage-events naar Marquez. Geen aanpassingen aan je DAG-code nodig.

# Voorbeeld DAG — OpenLineage werkt transparant op de achtergrond
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from datetime import datetime

with DAG(
    dag_id="orders_etl_pipeline",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False
) as dag:

    extract = PostgresOperator(
        task_id="extract_orders",
        postgres_conn_id="postgres_prod",
        sql="""
            INSERT INTO staging.orders_export
            SELECT order_id, customer_id, total, created_at
            FROM raw.orders
            WHERE DATE(created_at) = CURRENT_DATE - INTERVAL '1 day'
        """
    )

    load_to_bq = BigQueryInsertJobOperator(
        task_id="load_orders_to_bigquery",
        configuration={
            "query": {
                "query": """
                    SELECT
                        DATE(created_at) AS order_date,
                        COUNT(*) AS order_count,
                        SUM(total) AS revenue
                    FROM `my-project.staging.orders_export`
                    GROUP BY 1
                """,
                "destinationTable": {
                    "projectId": "my-project",
                    "datasetId": "analytics",
                    "tableId": "orders_daily"
                },
                "writeDisposition": "WRITE_TRUNCATE",
                "useLegacySql": False
            }
        },
        gcp_conn_id="google_cloud_default"
    )

    extract >> load_to_bq

Stap 3: dbt integreren

# profiles.yml — voeg OpenLineage toe als dbt meta-configuratie
# Installeer de package:
# pip install openlineage-dbt==1.18.0

# Voer dbt uit met OpenLineage transport
export OPENLINEAGE_URL=http://localhost:5000
export OPENLINEAGE_NAMESPACE=dbt-prod

dbt run --target prod
# Alternatief: gebruik de dbt-openlineage wrapper voor volledige kolom-lineage
from openlineage.dbt import DbtLocalArtifactProcessor

processor = DbtLocalArtifactProcessor(
    producer="https://mijn-data-platform.nl",
    job_namespace="dbt-prod",
    project_dir="/opt/dbt/my_project",
    profile_name="my_profile",
    target="prod"
)
processor.parse()

Stap 4: Custom Python-jobs met de OpenLineage Client

from openlineage.client import OpenLineageClient
from openlineage.client.run import (
    RunEvent, RunState, Run, Job, Dataset,
    InputDataset, OutputDataset
)
from openlineage.client.facet import (
    SchemaDatasetFacet, SchemaField,
    OutputStatisticsOutputDatasetFacet,
    DataQualityMetricsInputDatasetFacet,
    ColumnMetric
)
from openlineage.client.uuid import generate_new_uuid
from datetime import datetime, timezone

# Verbind met Marquez
client = OpenLineageClient.from_environment()
# Of direct: client = OpenLineageClient(url="http://localhost:5000")

run_id = str(generate_new_uuid())
job_name = "custom_python_transform"
namespace = "data-platform-prod"
now = datetime.now(timezone.utc).isoformat()

# ---- START event ----
client.emit(RunEvent(
    eventType=RunState.START,
    eventTime=now,
    run=Run(runId=run_id),
    job=Job(namespace=namespace, name=job_name),
    inputs=[InputDataset(
        namespace="s3://my-bucket",
        name="raw/clickstream/2026-03-15",
        facets={
            "schema": SchemaDatasetFacet(fields=[
                SchemaField(name="user_id", type="STRING"),
                SchemaField(name="event_type", type="STRING"),
                SchemaField(name="timestamp", type="TIMESTAMP"),
            ])
        }
    )],
    outputs=[]
))

# ---- Voer je transformatie uit ----
import pandas as pd
df = pd.read_parquet("s3://my-bucket/raw/clickstream/2026-03-15/")
result = df.groupby("user_id")["event_type"].count().reset_index()
result.to_parquet("s3://my-bucket/aggregated/sessions/2026-03-15/")

row_count = len(result)
null_count = result["event_type"].isna().sum()

# ---- COMPLETE event met statistieken ----
client.emit(RunEvent(
    eventType=RunState.COMPLETE,
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=Run(runId=run_id),
    job=Job(namespace=namespace, name=job_name),
    inputs=[InputDataset(
        namespace="s3://my-bucket",
        name="raw/clickstream/2026-03-15",
        facets={
            "dataQualityMetrics": DataQualityMetricsInputDatasetFacet(
                rowCount=len(df),
                columnMetrics={
                    "user_id": ColumnMetric(nullCount=df["user_id"].isna().sum())
                }
            )
        }
    )],
    outputs=[OutputDataset(
        namespace="s3://my-bucket",
        name="aggregated/sessions/2026-03-15",
        facets={
            "outputStatistics": OutputStatisticsOutputDatasetFacet(
                rowCount=row_count,
                size=result.memory_usage(deep=True).sum()
            )
        }
    )]
))

Pro Tip: Async Transport voor hoge throughput

In productie-omgevingen met honderden jobs per minuut gebruik je de async HTTP transport of de Kafka transport. Dit voorkomt dat het versturen van lineage-events je pipeline vertraagt:

export OPENLINEAGE_TRANSPORT='{
  "type": "kafka",
  "topicName": "openlineage-events",
  "config": {
    "bootstrap.servers": "kafka:9092",
    "acks": "0"
  }
}'

OpenLineage vs. alternatieven: eerlijke vergelijking

De data observability markt is in 2026 volwassen geworden. Hieronder vergelijken we OpenLineage (als open standaard + Marquez als backend) met de voornaamste commerciële en open-source alternatieven.

Criterium OpenLineage + Marquez Monte Carlo DataHub dbt + Elementary
Licentie / Kosten ✅ Open Source (gratis) ❌ Commercieel (€€€) ✅ Open Source (gratis) ✅ Open Source (gratis)
Lineage Standaard ✅ OpenLineage native ⚠️ Eigen formaat ✅ Ondersteunt OpenLineage ⚠️ Alleen dbt-graaf
Automatische Lineage ✅ Airflow, Spark, dbt, Flink ✅ Breed + ML ✅ Breed ecosysteem ⚠️ Alleen dbt
Column-level Lineage ✅ Via facets ✅ Ja ✅ Ja ✅ Ja (binnen dbt)
Alerting & Anomaly Detection ⚠️ Beperkt (zelf bouwen) ✅ AI-gedreven ⚠️ Beperkt ✅ Via Elementary
Schaalbaar naar 1000+ jobs ✅ Met Kafka transport ✅ SaaS ✅ Ja ⚠️ Beperkt
Vendor Lock-in ✅ Geen ❌ Hoog ✅ Laag ⚠️ dbt-afhankelijk
UI / Gebruiksvriendelijkheid ⚠️ Basis (Marquez UI) ✅ Uitstekend ✅ Goed ✅ Goed
Ideaal voor Open standaard als fundament Enterprise met groot budget Data Catalog + Lineage dbt-first teams

Aanbeveling 2026

Implementeer OpenLineage als event-standaard en stuur events naar zowel Marquez (voor operationele lineage) als DataHub (voor de data catalog). Monte Carlo is zinvol als je een dedicated observability budget hebt en directe AI-anomaly detection wilt zonder bouwwerk. Combineer waar mogelijk: de meeste enterprise tools ondersteunen OpenLineage als ingest-formaat.

Best practices voor productie-implementaties

Case: Retailer met 200+ dagelijkse pipelines

Een Nederlandse retailer draaide 200+ dagelijkse Airflow-pipelines vanuit meerdere bronnen (SAP, Shopify, eigen legacy systemen) naar een BigQuery data warehouse. Elke dag kwamen er "mystery failures" voor: een dashboard klopte niet, een ML-model kreeg de verkeerde invoer. Root cause analysis kostte gemiddeld 4 uur.

Na implementatie van OpenLineage + Marquez + PagerDuty-alerts op basis van lineage-events, daalde de MTTR (Mean Time To Recovery) van 4 uur naar 18 minuten. De volledige impact van een mislukte upstream job was in één klik zichtbaar via de Marquez-graaf.

1. Gebruik consistente namespaces

Een namespace is de scope waarbinnen een dataset of job bestaat. Gebruik altijd het URI-schema van het bronapparaat:

# Goed — URI-gebaseerde namespaces
postgres://prod-db.mycompany.nl:5432/analytics
bigquery://my-gcp-project
s3://my-data-lake-bucket
kafka://kafka-cluster:9092

# Slecht — niet-unieke namen
"mijn-database"
"productie"
"orders"

2. Enricheer events met custom facets voor datakwaliteit

from openlineage.client.facet import (
    DataQualityAssertionsDatasetFacet,
    Assertion
)

# Voeg Great Expectations resultaten toe als lineage-facet
gx_results = context.run_checkpoint("orders_checkpoint")
assertions = []

for result in gx_results.run_results.values():
    for validation_result in result["validation_result"]["results"]:
        assertions.append(Assertion(
            assertion=validation_result["expectation_config"]["expectation_type"],
            success=validation_result["success"],
            column=validation_result["expectation_config"]["kwargs"].get("column")
        ))

# Voeg toe aan je OutputDataset facets
output_facets = {
    "dataQualityAssertions": DataQualityAssertionsDatasetFacet(
        assertions=assertions
    )
}

3. Implementeer SLA-monitoring via Marquez API

import requests
from datetime import datetime, timedelta, timezone
import json

MARQUEZ_API = "http://marquez:5000/api/v1"
NAMESPACE = "data-platform-prod"

def check_dataset_freshness(dataset_name: str, max_age_hours: int = 2) -> dict:
    """Controleer of een dataset recent genoeg is bijgewerkt."""
    resp = requests.get(
        f"{MARQUEZ_API}/namespaces/{NAMESPACE}/datasets/{dataset_name}"
    )
    resp.raise_for_status()
    dataset = resp.json()

    last_modified = dataset.get("updatedAt")
    if not last_modified:
        return {"status": "UNKNOWN", "dataset": dataset_name}

    updated_at = datetime.fromisoformat(last_modified.replace("Z", "+00:00"))
    age = datetime.now(timezone.utc) - updated_at

    if age > timedelta(hours=max_age_hours):
        return {
            "status": "STALE",
            "dataset": dataset_name,
            "age_hours": round(age.total_seconds() / 3600, 1),
            "last_updated": last_modified
        }

    return {"status": "FRESH", "dataset": dataset_name, "age_hours": round(age.total_seconds() / 3600, 1)}

# Monitor kritieke datasets
critical_datasets = [
    "analytics.orders_daily",
    "analytics.inventory_snapshot",
    "ml.customer_features"
]

for ds in critical_datasets:
    result = check_dataset_freshness(ds, max_age_hours=3)
    if result["status"] == "STALE":
        # Stuur alert naar Slack/PagerDuty
        print(f"⚠️ ALERT: {ds} is {result['age_hours']}u oud!")
    else:
        print(f"✅ {ds}: {result['age_hours']}u oud — OK")

4. Bewaar lineage in Iceberg/Delta voor historische analyse

# Exporteer Marquez events naar je data lake voor langetermijn-analyse
# Handig voor compliance audits (GDPR, AI Act)

import requests
import json
from pyarrow import json as pa_json
import pyarrow.parquet as pq

def export_lineage_events(namespace: str, output_path: str, days: int = 90):
    """Export lineage events naar Parquet voor historische analyse."""
    events = []
    after = (datetime.now() - timedelta(days=days)).isoformat() + "Z"

    resp = requests.get(
        f"{MARQUEZ_API}/lineage",
        params={"nodeId": f"dataset:{namespace}:analytics.orders_daily"}
    )
    # Verwerk en sla op als Parquet
    with open(f"{output_path}/lineage_export_{datetime.now().date()}.json", "w") as f:
        json.dump(resp.json(), f, indent=2)

export_lineage_events("data-platform-prod", "/data/lineage-archive")

5. Governance: stel namespace-eigenaarschap vast

Registreer voor elke dataset in Marquez een eigenaar en classificatie-tag via de API:

curl -X PUT http://localhost:5000/api/v1/namespaces/