DataPartner365

Your partner for data-driven growth and insights

Delta Live Tables Complete Guide: Declarative ETL Pipelines for SMBs

Published: 25 January 2026
Reading time: 16 minutes
delta live tables, databricks dlt, declarative etl, data pipelines, streaming, data quality, medallion architecture, pipeline orchestration
Level: Intermediate to Advanced

Delta Live Tables (DLT) is Databricks' declarative ETL engine, which turns data engineering from complex code into simple declarative statements. Learn how to use DLT to build robust, scalable, and maintainable data pipelines for small and medium-sized businesses (SMBs).

What is Delta Live Tables? (Beyond the Basics)

Delta Live Tables is not just another ETL tool - it is a **declarative data pipeline engine** that automatically handles dependencies, scheduling, monitoring, and error handling. Instead of programming how data should flow, you declare what you want to achieve.

DLT Architecture: Three Key Principles

1. Declarative Syntax
  • What, not How: declare results, not steps
  • Automatic Orchestration: dependencies are inferred automatically
  • Simplified Code: typically 70-80% less code
  • Self-documenting: the code is the documentation
2. Built-in Quality
  • Data Expectations: built-in data quality checks
  • Automatic Monitoring: real-time pipeline monitoring
  • Error Handling: automatic retries and recovery
  • Lineage Tracking: full data lineage
3. Unified Batch & Streaming
  • Single Codebase: the same code for batch and streaming
  • Automatic Optimization: query optimization by the engine
  • Scalable Execution: automatic scaling on demand
  • Cost Efficient: pay only for the resources you use

Core DLT Concepts and Syntax

Basic DLT Syntax and Annotations

# Python DLT basic syntax
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

# 1. DECORATORS - define pipeline objects
@dlt.table(
    name="bron_klant_data",                # table name
    comment="Source customer data from CRM",  # description
    table_properties={                     # metadata
        "quality": "bronze",
        "pipelines.autoOptimize.managed": "true"
    }
)
def laad_bron_data():
    """Load source customer data"""
    return spark.read.format("csv") \
        .option("header", "true") \
        .load("/mnt/bron/klanten/*.csv")

# 2. EXPECTATIONS - data quality checks
@dlt.table(
    name="gevalideerde_klant_data",
    comment="Validated customer data with quality checks"
)
@dlt.expect("valid_email", "email LIKE '%@%'")           # log violations only
@dlt.expect_or_drop("valid_postcode", r"postcode RLIKE '^[1-9][0-9]{3}\s?[A-Z]{2}$'")  # drop invalid rows
@dlt.expect_or_fail("positive_amount", "bedrag > 0")    # fail the pipeline if violated
@dlt.expect_all({
    "valid_bsn": "bsn RLIKE '^[1-9][0-9]{8}$'",         # 9 digits
    "valid_telefoon": "telefoon RLIKE '^[0-9]{10}$'"    # 10 digits
})
def transformeer_klant_data():
    """Transform customer data with validations"""
    df = dlt.read("bron_klant_data")

    return df \
        .withColumn("ingest_timestamp", current_timestamp()) \
        .withColumn("geboortedatum", to_date(col("geboortedatum_str"), "dd-MM-yyyy")) \
        .withColumn("leeftijd",
            floor(months_between(current_date(), col("geboortedatum")) / 12))

# 3. VIEWS - logical transformations
@dlt.view(
    name="klant_segmenten",
    comment="Customer segmentation view"
)
def bereken_klant_segmenten():
    """Compute customer segments"""
    df = dlt.read("gevalideerde_klant_data")
    
    return df \
        .withColumn("segment",
            when(col("leeftijd") < 30, "JONG")
            .when(col("leeftijd") < 50, "MIDDEN")
            .otherwise("SENIOR")) \
        .withColumn("waarde_segment",
            when(col("totaal_omzet") > 10000, "PREMIUM")
            .when(col("totaal_omzet") > 5000, "MIDDEN")
            .otherwise("BASIC"))

# 4. LIVE TABLES - materialized tables
@dlt.table(
    name="klant_360",
    comment="Customer 360 overview for analytics",
    partition_cols=["land"],              # partition column
    table_properties={
        "quality": "gold",
        "pipelines.autoOptimize.zOrderCols": "klant_id"
    }
)
def maak_klant_360():
    """Build the customer 360 overview"""
    segmenten = dlt.read("klant_segmenten")
    
    return segmenten \
        .groupBy("klant_id", "land", "segment", "waarde_segment") \
        .agg(
            count("*").alias("aantal_transacties"),
            sum("bedrag").alias("totaal_omzet"),
            avg("bedrag").alias("gemiddelde_omzet"),
            max("ingest_timestamp").alias("laatste_transactie")
        )

DLT Object Types

  • @dlt.table: physical Delta table
  • @dlt.view: logical view (no storage)
  • @dlt.table(temporary=True): temporary table, not published to the catalog
  • @dlt.expect: data quality expectation (log only)
  • @dlt.expect_or_drop: drop rows that fail
  • @dlt.expect_or_fail: fail the pipeline on a violation

Pipeline Execution Modes

  • Triggered: run on demand or on a schedule
  • Continuous: real-time streaming
  • Development: cluster reuse and relaxed retries for fast iteration
  • Production: optimized execution
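Triggered pipelines can also be started from outside the workspace through the Databricks Pipelines REST API (`POST /api/2.0/pipelines/{pipeline_id}/updates`). A minimal sketch using only the standard library; the host, token, and pipeline ID are placeholders for your own workspace values:

```python
# Sketch: start a triggered DLT pipeline update via the Pipelines REST API.
# Host, token, and pipeline_id are placeholders, not real workspace values.
import json
import urllib.request

def build_update_request(host: str, pipeline_id: str, token: str,
                         full_refresh: bool = False):
    """Build the (url, headers, body) for a pipeline update call."""
    url = f"{host}/api/2.0/pipelines/{pipeline_id}/updates"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    body = json.dumps({"full_refresh": full_refresh}).encode("utf-8")
    return url, headers, body

def start_update(host, pipeline_id, token, full_refresh=False):
    """Fire the request; the API responds with an update_id."""
    url, headers, body = build_update_request(host, pipeline_id, token, full_refresh)
    req = urllib.request.Request(url, data=body, headers=headers, method="POST")
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```

In practice you would schedule this from an orchestrator (or use the `databricks` CLI / SDK) rather than raw `urllib`, but the request shape is the same.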

Built-in Monitoring

  • Expectation Metrics: pass/fail rates
  • Execution Metrics: runtime, resources
  • Data Quality: completeness, accuracy
  • Cost Metrics: DBU usage, efficiency
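The expectation metrics boil down to simple ratios over the pass/fail counters DLT records per expectation. A minimal helper showing the arithmetic; the 90%/99% thresholds are our own illustrative choices, not DLT defaults:

```python
# Sketch: derive a pass rate and an alert status from expectation counters.
# The 90% (alert) and 99% (healthy) thresholds are illustrative choices.
def expectation_health(records_processed: int, records_passed: int):
    """Return (pass_rate, status) for one expectation."""
    if records_processed == 0:
        return 0.0, "NO_DATA"
    pass_rate = records_passed / records_processed
    if pass_rate < 0.90:
        status = "ALERT"
    elif pass_rate < 0.99:
        status = "WARNING"
    else:
        status = "OK"
    return round(pass_rate, 4), status

print(expectation_health(1000, 985))  # → (0.985, 'WARNING')
```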

Practical DLT Pipeline for SMBs

Complete E-commerce Pipeline

# Complete e-commerce DLT pipeline for SMBs
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from datetime import datetime, timedelta
# === BRONZE LAYER: Raw Data Ingestion ===
@dlt.table(
    name="bronze_orders",
    comment="Raw order data from the webshop",
    table_properties={
        "quality": "bronze",
        "pipelines.reset.allowed": "true"
    }
)
def laad_orders():
    """Load order data from multiple sources"""
    # Webhook/API data
    api_orders = spark.read.format("json") \
        .load("/mnt/bron/orders/api/*.json")

    # Database export (in production, read the password from a secret scope)
    db_orders = spark.read.format("jdbc") \
        .option("url", "jdbc:postgresql://db.example.com:5432/ecommerce") \
        .option("dbtable", "orders") \
        .option("user", "readonly") \
        .option("password", "***") \
        .load()

    # CSV uploads (back office)
    csv_orders = spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("/mnt/bron/orders/csv/*.csv")

    # Union all sources by column name; schemas may differ per source
    return api_orders \
        .unionByName(db_orders, allowMissingColumns=True) \
        .unionByName(csv_orders, allowMissingColumns=True) \
        .withColumn("ingest_timestamp", current_timestamp()) \
        .withColumn("bron_type", lit("orders"))

@dlt.table(
    name="bronze_customers",
    comment="Raw customer data from the CRM",
    table_properties={"quality": "bronze"}
)
@dlt.expect_or_drop("valid_email_format", r"email RLIKE '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$'")
def laad_customers():
    """Load customer data with email validation"""
    return spark.read.format("parquet") \
        .load("/mnt/bron/customers/*.parquet") \
        .withColumn("ingest_timestamp", current_timestamp()) \
        .withColumn("bron_type", lit("customers"))

@dlt.table(
    name="bronze_products",
    comment="Raw product catalog",
    table_properties={"quality": "bronze"}
)
def laad_products():
    """Load product data"""
    return spark.read.format("delta") \
        .load("/mnt/bron/products/") \
        .withColumn("ingest_timestamp", current_timestamp()) \
        .withColumn("bron_type", lit("products"))

# === SILVER LAYER: Cleaned & Enriched Data ===
@dlt.table(
    name="silver_orders",
    comment="Cleaned order data with validations",
    partition_cols=["order_date"],
    table_properties={"quality": "silver"}
)
@dlt.expect_all({
    "positive_quantity": "quantity > 0",
    "positive_price": "price > 0",
    "valid_order_date": "order_date >= '2020-01-01'"
})
@dlt.expect_or_drop("valid_order_status", "status IN ('pending', 'paid', 'shipped', 'delivered', 'cancelled')")
def transform_orders():
    """Transform order data"""
    orders = dlt.read("bronze_orders")
    
    # Data cleaning
    cleaned = orders \
        .withColumn("order_date", to_date(col("order_timestamp"))) \
        .withColumn("order_year", year(col("order_date"))) \
        .withColumn("order_month", month(col("order_date"))) \
        .withColumn("order_week", weekofyear(col("order_date"))) \
        .withColumn("order_weekday", date_format(col("order_date"), "EEEE")) \
        .withColumn("order_amount", col("quantity") * col("price")) \
        .withColumn("vat_amount", col("order_amount") * 0.21)  # Dutch VAT (21%)
    
    # Handle missing values
    return cleaned \
        .fillna({"discount": 0.0}) \
        .withColumn("customer_id", 
            when(col("customer_id").isNull(), lit("UNKNOWN"))
            .otherwise(col("customer_id")))

@dlt.table(
    name="silver_customers",
    comment="Cleaned customer data with PII masking",
    table_properties={
        "quality": "silver",
        "pii.masked": "true"
    }
)
@dlt.expect_or_drop("valid_postcode_nl", r"postcode RLIKE '^[1-9][0-9]{3}\s?[A-Z]{2}$'")
@dlt.expect_or_drop("valid_phone_nl", "telefoon RLIKE '^[0-9]{10}$' OR telefoon IS NULL")
def transform_customers():
    """Transform customer data with PII protection"""
    customers = dlt.read("bronze_customers")

    # PII masking for internal use.
    # Note: the province mapping below is a rough approximation; Dutch postcode
    # ranges do not align exactly with province borders.
    return customers \
        .withColumn("email_masked",
            regexp_replace(col("email"), "(?<=.).(?=.*@)", "*")) \
        .withColumn("telefoon_masked",
            when(col("telefoon").isNotNull(),
                 concat(lit("******"), substring(col("telefoon"), 7, 4)))
            .otherwise(None)) \
        .withColumn("geboortedatum",
            to_date(col("geboortedatum_str"), "dd-MM-yyyy")) \
        .withColumn("leeftijd",
            floor(months_between(current_date(), col("geboortedatum")) / 12)) \
        .withColumn("provincie",
            when(substring(col("postcode"), 1, 1) == "1", "Noord-Holland")
            .when(substring(col("postcode"), 1, 1) == "2", "Zuid-Holland")
            .when(substring(col("postcode"), 1, 1) == "3", "Utrecht")
            .when(substring(col("postcode"), 1, 1) == "4", "Zeeland")
            .when(substring(col("postcode"), 1, 1) == "5", "Noord-Brabant")
            .when(substring(col("postcode"), 1, 1) == "6", "Limburg")
            .when(substring(col("postcode"), 1, 1) == "7", "Gelderland")
            .when(substring(col("postcode"), 1, 1) == "8", "Overijssel")
            .when(substring(col("postcode"), 1, 1) == "9", "Friesland/Groningen/Drenthe")
            .otherwise("Onbekend"))
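A long `when`-chain like this is easier to maintain when the lookup lives in a plain dict. A minimal pure-Python sketch of the same (approximate) first-digit mapping; in PySpark the dict can be folded into a `when`-chain with `functools.reduce`, keeping the mapping in one place:

```python
# Sketch: drive the province lookup from a dict instead of a hand-written
# when-chain. The first-digit mapping is approximate, as in the pipeline above.
PROVINCIE_BY_DIGIT = {
    "1": "Noord-Holland", "2": "Zuid-Holland", "3": "Utrecht",
    "4": "Zeeland", "5": "Noord-Brabant", "6": "Limburg",
    "7": "Gelderland", "8": "Overijssel",
    "9": "Friesland/Groningen/Drenthe",
}

def provincie_for_postcode(postcode: str) -> str:
    """Map a Dutch postcode like '1012 AB' to an approximate province."""
    if not postcode:
        return "Onbekend"
    return PROVINCIE_BY_DIGIT.get(postcode.strip()[0], "Onbekend")
```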

@dlt.table(
    name="silver_products",
    comment="Cleaned product data met categorisatie",
    table_properties={"quality": "silver"}
)
@dlt.expect("positive_stock", "stock >= 0")
@dlt.expect("positive_price", "price > 0")
def transform_products():
    """Transform product data"""
    products = dlt.read("bronze_products")
    
    # Categorization logic
    return products \
        .withColumn("categorie",
            when(col("product_name").contains("boek"), "Boeken")
            .when(col("product_name").contains("kleding"), "Kleding")
            .when(col("product_name").contains("elektron"), "Elektronica")
            .when(col("product_name").contains("voeding"), "Voeding")
            .otherwise("Overig")) \
        .withColumn("prijs_klasse",
            when(col("price") < 10, "Budget")
            .when(col("price") < 50, "Midden")
            .when(col("price") < 200, "Premium")
            .otherwise("Luxe")) \
        .withColumn("btw_tarief",
            when(col("categorie").isin("Boeken", "Voeding"), 0.09)  # reduced VAT rate
            .otherwise(0.21))  # standard VAT rate
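The VAT rule above is worth isolating so it can be unit-tested outside the pipeline. A minimal sketch mirroring the 9%/21% split (function and set names are our own):

```python
# Sketch: the Dutch VAT rule from the pipeline as standalone, testable functions.
# 9% is the reduced rate (books, food); 21% is the standard rate.
REDUCED_VAT_CATEGORIES = {"Boeken", "Voeding"}

def btw_tarief(categorie: str) -> float:
    """Return the VAT rate for a product category."""
    return 0.09 if categorie in REDUCED_VAT_CATEGORIES else 0.21

def btw_bedrag(netto: float, categorie: str) -> float:
    """VAT amount on a net price, rounded to cents."""
    return round(netto * btw_tarief(categorie), 2)

print(btw_bedrag(100.0, "Boeken"))  # → 9.0
```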

# === GOLD LAYER: Business Ready Data ===
@dlt.table(
    name="gold_customer_orders",
    comment="Customer order aggregations for analytics",
    partition_cols=["order_year", "order_month"],
    table_properties={
        "quality": "gold",
        "pipelines.autoOptimize.zOrderCols": "customer_id,order_date"
    }
)
def create_customer_orders():
    """Build customer order aggregations"""
    orders = dlt.read("silver_orders")
    customers = dlt.read("silver_customers")

    # Join orders with customers (silver_customers defines no segment column,
    # so only geographic and demographic attributes are joined in)
    customer_orders = orders.join(
        customers.select("customer_id", "provincie", "leeftijd"),
        "customer_id",
        "left"
    )
    
    return customer_orders \
        .groupBy(
            "customer_id",
            "provincie",
            "order_year",
            "order_month",
            date_trunc("week", col("order_date")).alias("order_week")
        ) \
        .agg(
            count("*").alias("aantal_orders"),
            sum("order_amount").alias("totaal_omzet"),
            sum("vat_amount").alias("totaal_btw"),
            avg("order_amount").alias("gemiddelde_order"),
            countDistinct("product_id").alias("unieke_producten"),
            min("order_date").alias("eerste_order"),
            max("order_date").alias("laatste_order")
        ) \
        .withColumn("order_frequency_days",
            datediff(col("laatste_order"), col("eerste_order")) / col("aantal_orders"))

@dlt.table(
    name="gold_product_performance",
    comment="Product performance metrics",
    table_properties={"quality": "gold"}
)
def create_product_performance():
    """Compute product performance metrics"""
    orders = dlt.read("silver_orders")
    products = dlt.read("silver_products")

    # Join orders with products (product_name comes from the product catalog)
    product_orders = orders.join(
        products.select("product_id", "product_name", "categorie", "prijs_klasse", "btw_tarief"),
        "product_id",
        "left"
    )
    
    return product_orders \
        .groupBy(
            "product_id",
            "product_name",
            "categorie",
            "prijs_klasse"
        ) \
        .agg(
            count("*").alias("aantal_verkopen"),
            sum("quantity").alias("totaal_aantal"),
            sum("order_amount").alias("totaal_omzet"),
            sum(col("order_amount") * col("btw_tarief")).alias("totaal_btw"),
            avg("price").alias("gemiddelde_prijs"),
            countDistinct("customer_id").alias("unieke_klanten"),
            min("order_date").alias("eerste_verkoop"),
            max("order_date").alias("laatste_verkoop")
        ) \
        .withColumn("dagen_verkrijgbaar",
            datediff(current_date(), col("eerste_verkoop"))) \
        .withColumn("verkopen_per_dag",
            col("aantal_verkopen") / greatest(col("dagen_verkrijgbaar"), 1)) \
        .withColumn("omzet_per_dag",
            col("totaal_omzet") / greatest(col("dagen_verkrijgbaar"), 1))

@dlt.table(
    name="gold_daily_kpis",
    comment="Daily business KPI dashboard",
    partition_cols=["kpi_date"],
    table_properties={"quality": "gold"}
)
def create_daily_kpis():
    """Compute daily KPIs"""
    orders = dlt.read("silver_orders")
    
    daily_stats = orders \
        .groupBy("order_date") \
        .agg(
            count("*").alias("dagelijkse_orders"),
            countDistinct("customer_id").alias("unieke_klanten"),
            sum("order_amount").alias("dagelijkse_omzet"),
            sum("vat_amount").alias("dagelijkse_btw"),
            avg("order_amount").alias("gemiddelde_order_waarde"),
            sum("quantity").alias("totaal_producten")
        ) \
        .withColumnRenamed("order_date", "kpi_date")
    
    # Rolling averages (7-day, 30-day)
    window_7d = Window.orderBy("kpi_date").rowsBetween(-6, 0)
    window_30d = Window.orderBy("kpi_date").rowsBetween(-29, 0)
    
    return daily_stats \
        .withColumn("omzet_7d_gemiddelde",
            avg("dagelijkse_omzet").over(window_7d)) \
        .withColumn("orders_7d_gemiddelde",
            avg("dagelijkse_orders").over(window_7d)) \
        .withColumn("omzet_30d_gemiddelde",
            avg("dagelijkse_omzet").over(window_30d)) \
        .withColumn("orders_30d_gemiddelde",
            avg("dagelijkse_orders").over(window_30d))

# === AUDIT & MONITORING ===
@dlt.table(
    name="audit_pipeline_metrics",
    comment="Pipeline execution metrics for monitoring",
    table_properties={"quality": "audit"}
)
def create_audit_metrics():
    """Generate audit metrics for pipeline monitoring"""
    pipeline_info = spark.createDataFrame([
        ("bronze_orders", "bronze", "orders", current_timestamp()),
        ("bronze_customers", "bronze", "customers", current_timestamp()),
        ("bronze_products", "bronze", "products", current_timestamp()),
        ("silver_orders", "silver", "orders", current_timestamp()),
        ("silver_customers", "silver", "customers", current_timestamp()),
        ("silver_products", "silver", "products", current_timestamp()),
        ("gold_customer_orders", "gold", "customer_orders", current_timestamp()),
        ("gold_product_performance", "gold", "product_performance", current_timestamp()),
        ("gold_daily_kpis", "gold", "daily_kpis", current_timestamp())
    ], ["table_name", "layer", "domain", "audit_timestamp"])
    
    return pipeline_info

@dlt.view(
    name="quality_metrics_dashboard",
    comment="Data quality metrics dashboard"
)
def create_quality_dashboard():
    """Generate a data quality metrics dashboard"""
    # In practice you would query the DLT event log or system tables;
    # this is a simplified, hard-coded example
    return spark.createDataFrame([
        ("bronze_orders", "valid_order_status", 0.98, 0.02, current_timestamp()),
        ("silver_customers", "valid_postcode_nl", 0.95, 0.05, current_timestamp()),
        ("silver_products", "positive_price", 1.00, 0.00, current_timestamp())
    ], ["table_name", "expectation", "pass_rate", "fail_rate", "check_time"])

# Pipeline Configuration
pipeline_config = {
    "name": "E-commerce Data Pipeline",
    "storage": "/mnt/dlt/ecommerce_pipeline",
    "target": "main.ecommerce",
    "configuration": {
        "pipelines.enableOptimization": "true",
        "pipelines.trigger.interval": "1 hour",
        "pipelines.maxConcurrentRuns": 1,
        "spark.sql.adaptive.enabled": "true",
        "spark.databricks.delta.optimizeWrite.enabled": "true",
        "spark.databricks.delta.autoCompact.enabled": "true"
    },
    "libraries": [
        {"maven": {"coordinates": "io.delta:delta-core_2.12:2.3.0"}}
    ],
    "clusters": [
        {
            "label": "default",
            "node_type_id": "Standard_DS3_v2",
            "num_workers": 2,
            "autoscale": {
                "min_workers": 2,
                "max_workers": 8
            }
        }
    ],
    "development": False,
    "continuous": False,
    "photon": True
}
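A config dict like `pipeline_config` is easy to mis-type and is only validated when it reaches the API. A minimal sanity-check helper; the rules here are our own minimal checks, not an official schema:

```python
# Sketch: sanity-check a DLT pipeline config dict before submitting it.
# The checked rules are illustrative, not the official API schema.
def validate_pipeline_config(cfg: dict) -> list:
    """Return a list of problems found; an empty list means the config looks sane."""
    problems = []
    # Required top-level fields
    for key in ("name", "storage", "target"):
        if not cfg.get(key):
            problems.append(f"missing required field: {key}")
    # Autoscale bounds must be consistent
    for cluster in cfg.get("clusters", []):
        auto = cluster.get("autoscale")
        if auto and auto["min_workers"] > auto["max_workers"]:
            problems.append("autoscale: min_workers > max_workers")
    return problems
```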

Real-time Streaming Pipelines

Streaming E-commerce Pipeline

# Real-time streaming DLT pipeline
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# === STREAMING BRONZE: Real-time Ingestion ===
@dlt.table(
    name="streaming_bronze_orders",
    comment="Real-time order stream from Kafka",
    table_properties={"quality": "bronze"}
)
def streaming_orders():
    """Stream order data from Kafka"""
    return spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka-broker:9092") \
        .option("subscribe", "ecommerce-orders") \
        .option("startingOffsets", "latest") \
        .load() \
        .select(
            from_json(col("value").cast("string"), 
                StructType([
                    StructField("order_id", StringType()),
                    StructField("customer_id", StringType()),
                    StructField("product_id", StringType()),
                    StructField("quantity", IntegerType()),
                    StructField("price", DecimalType(10, 2)),
                    StructField("order_timestamp", TimestampType())
                ])
            ).alias("data")
        ) \
        .select("data.*") \
        .withColumn("ingest_timestamp", current_timestamp())

@dlt.table(
    name="streaming_bronze_pageviews",
    comment="Real-time website pageviews",
    table_properties={"quality": "bronze"}
)
def streaming_pageviews():
    """Stream website pageview data"""
    return spark.readStream \
        .format("cloudFiles") \
        .option("cloudFiles.format", "json") \
        .option("cloudFiles.schemaLocation", "/mnt/dlt/schemas/pageviews") \
        .load("/mnt/streaming/pageviews/*.json") \
        .withColumn("ingest_timestamp", current_timestamp())

# === STREAMING SILVER: Real-time Processing ===
@dlt.table(
    name="streaming_silver_orders",
    comment="Real-time processed orders",
    table_properties={"quality": "silver"}
)
@dlt.expect("valid_quantity", "quantity > 0")
@dlt.expect_or_drop("valid_price", "price > 0")
def process_streaming_orders():
    """Process streaming orders in real time"""
    orders = dlt.read_stream("streaming_bronze_orders")
    
    return orders \
        .withWatermark("order_timestamp", "5 minutes") \
        .withColumn("order_date", to_date(col("order_timestamp"))) \
        .withColumn("order_amount", col("quantity") * col("price"))

@dlt.table(
    name="streaming_silver_sessions",
    comment="Real-time user sessions",
    table_properties={"quality": "silver"}
)
def create_user_sessions():
    """Create user sessions from pageviews"""
    pageviews = dlt.read_stream("streaming_bronze_pageviews")

    return pageviews \
        .withWatermark("event_timestamp", "10 minutes") \
        .groupBy(
            col("user_id"),
            # Session window: a session closes after 30 minutes of inactivity
            # (window() would create fixed tumbling windows instead)
            session_window(col("event_timestamp"), "30 minutes")
        ) \
        .agg(
            count("*").alias("pageviews"),
            collect_list("page_url").alias("visited_pages"),
            min("event_timestamp").alias("session_start"),
            max("event_timestamp").alias("session_end")
        ) \
        .withColumn("session_duration",
            (unix_timestamp(col("session_end")) - 
             unix_timestamp(col("session_start"))) / 60)  # in minutes
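The session-window semantics are easy to misread, so here is the same rule in plain Python: consecutive events belong to one session until a gap of more than 30 minutes occurs. A minimal, testable sketch:

```python
# Sketch of session-window semantics: events stay in the same session until
# the gap to the previous event exceeds gap_minutes.
from datetime import datetime, timedelta

def sessionize(timestamps, gap_minutes: int = 30):
    """Group event timestamps into sessions; returns a list of sessions (lists)."""
    sessions = []
    gap = timedelta(minutes=gap_minutes)
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)   # still within the inactivity gap
        else:
            sessions.append([ts])     # gap exceeded: start a new session
    return sessions
```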

# === STREAMING GOLD: Real-time Analytics ===
@dlt.table(
    name="streaming_gold_revenue",
    comment="Real-time revenue dashboard",
    table_properties={
        "quality": "gold",
        "pipelines.trigger.interval": "1 minute"
    }
)
def create_revenue_dashboard():
    """Real-time revenue aggregations"""
    orders = dlt.read_stream("streaming_silver_orders")

    # Tumbling window: one row per minute
    return orders \
        .withWatermark("order_timestamp", "10 minutes") \
        .groupBy(
            window(col("order_timestamp"), "1 minute").alias("time_window"),
            "order_date"
        ) \
        .agg(
            count("*").alias("orders_per_minute"),
            sum("order_amount").alias("revenue_per_minute"),
            countDistinct("customer_id").alias("unique_customers"),
            approx_count_distinct("product_id").alias("unique_products")
        ) \
        .withColumn("window_start", col("time_window.start")) \
        .withColumn("window_end", col("time_window.end"))

@dlt.table(
    name="streaming_gold_top_products",
    comment="Real-time top products",
    table_properties={
        "quality": "gold",
        "pipelines.trigger.interval": "5 minutes"
    }
)
def create_top_products():
    """Top selling products, recomputed on every trigger"""
    # Batch read: ranking functions are not supported on streaming DataFrames,
    # so this table is fully recomputed each trigger instead of updated incrementally
    orders = dlt.read("streaming_silver_orders")

    # Sliding window: last 1 hour, advancing every 5 minutes
    return orders \
        .groupBy(
            window(col("order_timestamp"), "1 hour", "5 minutes").alias("time_window"),
            "product_id"
        ) \
        .agg(
            count("*").alias("sales_count"),
            sum("quantity").alias("total_quantity"),
            sum("order_amount").alias("total_revenue")
        ) \
        .withColumn("rank",
            rank().over(Window.partitionBy("time_window")
                .orderBy(col("total_revenue").desc()))) \
        .filter(col("rank") <= 10)  # Top 10 products

# === CONTINUOUS PROCESSING ===
continuous_config = {
    "name": "Real-time E-commerce Pipeline",
    "storage": "/mnt/dlt/streaming_pipeline",
    "target": "main.streaming",
    "configuration": {
        "pipelines.trigger.interval": "continuous",
        "pipelines.maxConcurrentRuns": 1,
        "pipelines.continuous": "true",
        "spark.sql.streaming.stateStore.providerClass": 
            "com.databricks.sql.streaming.state.RocksDBStateStoreProvider",
        "spark.sql.shuffle.partitions": 200
    },
    "libraries": [
        {"maven": {"coordinates": "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0"}}
    ],
    "clusters": [
        {
            "label": "default",
            "node_type_id": "Standard_DS3_v2",
            "num_workers": 4,
            "autoscale": {
                "min_workers": 4,
                "max_workers": 16
            },
            "spark_conf": {
                "spark.sql.streaming.stateStore.maintenanceInterval": "1m",
                "spark.sql.streaming.metricsEnabled": "true"
            }
        }
    ],
    "development": False,
    "continuous": True,
    "photon": True
}

Data Quality and Monitoring

Expectation Types

# DLT expectation patterns
# (stacked here on a single illustrative table definition)
@dlt.table(
    name="quality_monitored_table"
)
# Basic expectation (log violations only)
@dlt.expect("valid_email", "email LIKE '%@%'")
# Drop invalid rows
@dlt.expect_or_drop("valid_postcode",
    r"postcode RLIKE '^[1-9][0-9]{3}\s?[A-Z]{2}$'")
# Fail the pipeline on a violation
@dlt.expect_or_fail("positive_amount", "amount > 0")
# Freshness check (flag rows older than 90 days)
@dlt.expect("retention_policy",
    "order_date > current_date() - 90")
# Completeness check
@dlt.expect("required_fields",
    "customer_id IS NOT NULL AND order_date IS NOT NULL")
# Consistency check
@dlt.expect("consistent_dates",
    "order_date <= delivery_date OR delivery_date IS NULL")
# Business rule
@dlt.expect("discount_limit",
    "discount_percentage <= 30 OR customer_tier = 'PREMIUM'")
# Multiple expectations at once
@dlt.expect_all({
    "valid_quantity": "quantity > 0",
    "valid_price": "price BETWEEN 0.01 AND 10000",
    "valid_status": "status IN ('pending', 'completed', 'cancelled')"
})
def quality_monitored():
    return dlt.read("silver_orders")

# Dynamic threshold: expectation strings can be built at definition time,
# e.g. @dlt.expect("high_value_order", f"amount > {threshold}")
threshold = 1000
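A common complement to `expect_or_drop` is a quarantine table that keeps the rejected rows for inspection instead of silently dropping them. A hedged sketch of that pattern; the rules dict and table names are our own, and the DLT part is shown as comments because it only runs inside a pipeline:

```python
# Sketch: quarantine pattern - route rows that fail the quality rules to a
# separate table. Rule names and table names below are illustrative.
RULES = {
    "valid_quantity": "quantity > 0",
    "valid_price": "price BETWEEN 0.01 AND 10000",
}

def quarantine_predicate(rules: dict) -> str:
    """SQL expression that is true for rows failing at least one rule."""
    return "NOT ({})".format(" AND ".join(f"({r})" for r in rules.values()))

print(quarantine_predicate(RULES))
# → NOT ((quantity > 0) AND (price BETWEEN 0.01 AND 10000))

# Inside the DLT pipeline (runs on Databricks; shown as comments here):
#
# import dlt
# from pyspark.sql.functions import expr
#
# @dlt.table(name="orders_clean")
# @dlt.expect_all_or_drop(RULES)
# def orders_clean():
#     return dlt.read("bronze_orders")
#
# @dlt.table(name="orders_quarantine", comment="Rows that failed the quality rules")
# def orders_quarantine():
#     return dlt.read("bronze_orders").where(expr(quarantine_predicate(RULES)))
```

Because both tables derive the predicate from the same `RULES` dict, the clean and quarantine tables can never drift apart.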

Monitoring Dashboard

-- SQL: DLT monitoring dashboard
-- NOTE: the system table names below are illustrative; in practice, query
-- the DLT event log for expectation and execution metrics.
CREATE OR REPLACE VIEW main.monitoring.dlt_dashboard AS
WITH pipeline_metrics AS (
  SELECT 
    pipeline_name,
    table_name,
    expectation,
    -- Metrics from DLT system tables
    SUM(records_processed) as total_records,
    SUM(records_passed) as passed_records,
    SUM(records_failed) as failed_records,
    -- Compute pass rates
    ROUND(SUM(records_passed) * 100.0 / 
          NULLIF(SUM(records_processed), 0), 2) as pass_rate,
    -- Trend analysis
    AVG(records_processed) OVER (
      PARTITION BY pipeline_name, table_name 
      ORDER BY timestamp 
      ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) as avg_records_7runs
  FROM system.pipeline_metrics
  WHERE timestamp > CURRENT_TIMESTAMP() - INTERVAL '7 days'
  GROUP BY 1, 2, 3
),
execution_metrics AS (
  SELECT 
    pipeline_name,
    execution_id,
    start_time,
    end_time,
    -- Performance metrics
    TIMESTAMPDIFF(SECOND, start_time, end_time) as duration_seconds,
    total_dbus,
    -- Efficiency metrics
    total_dbus / NULLIF(
      TIMESTAMPDIFF(SECOND, start_time, end_time), 0
    ) as dbus_per_second,
    -- Status
    CASE 
      WHEN state = 'COMPLETED' THEN 'SUCCESS'
      WHEN state = 'FAILED' THEN 'FAILED'
      ELSE 'RUNNING'
    END as status
  FROM system.pipeline_executions
)
)
SELECT 
  pm.pipeline_name,
  pm.table_name,
  pm.expectation,
  pm.total_records,
  pm.passed_records,
  pm.failed_records,
  pm.pass_rate,
  em.duration_seconds,
  em.total_dbus,
  em.status,
  -- Health score (0-100)
  CASE 
    WHEN pm.pass_rate >= 99 THEN 100
    WHEN pm.pass_rate >= 95 THEN 90
    WHEN pm.pass_rate >= 90 THEN 80
    WHEN pm.pass_rate >= 80 THEN 70
    ELSE 50
  END as quality_score,
  -- Alert flag
  CASE 
    WHEN pm.pass_rate < 90 THEN 'ALERT'
    WHEN em.duration_seconds > 3600 THEN 'WARNING'
    ELSE 'OK'
  END as alert_status
FROM pipeline_metrics pm
JOIN execution_metrics em
  ON pm.pipeline_name = em.pipeline_name
ORDER BY pm.pass_rate ASC, em.duration_seconds DESC;

Costs and Performance Optimization

DLT Cost Breakdown for SMBs (indicative figures)

Batch Pipeline (Per Month)
  • Compute (DBUs): €0.30/DBU × 500 DBUs = €150
  • DLT Runtime: €0.10/DBU × 500 DBUs = €50
  • Storage (Delta): 100 GB × €0.023 = €2.30
  • Unity Catalog: €0.20/table × 20 tables = €4
  • Total: ~€206/month
Streaming Pipeline (Per Month)
  • Compute (DBUs): €0.30/DBU × 2,000 DBUs = €600
  • DLT Runtime: €0.10/DBU × 2,000 DBUs = €200
  • Storage (Delta): 200 GB × €0.023 = €4.60
  • Unity Catalog: €0.20/table × 15 tables = €3
  • Total: ~€808/month
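The two estimates above follow from one formula; a small helper reproducing the arithmetic. The €/DBU and €/GB rates are the indicative figures from this article, not official list prices:

```python
# Sketch: reproduce the monthly cost estimates above. All rates are the
# article's indicative figures, not official Databricks list prices.
def monthly_cost(dbus: float, storage_gb: float, tables: int,
                 compute_rate: float = 0.30, dlt_rate: float = 0.10,
                 storage_rate: float = 0.023, catalog_rate: float = 0.20) -> float:
    """Estimated monthly pipeline cost in euros."""
    return round(
        dbus * compute_rate          # cluster compute
        + dbus * dlt_rate            # DLT runtime surcharge
        + storage_gb * storage_rate  # Delta storage
        + tables * catalog_rate,     # Unity Catalog tables
        2,
    )

print(monthly_cost(500, 100, 20))    # batch pipeline → 206.3
print(monthly_cost(2000, 200, 15))   # streaming pipeline → 807.6
```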
Performance Tips
  • Auto-scaling: set min/max workers
  • Photon Engine: enable for a 2-5× speedup
  • Z-ordering: optimize file layout for query performance
  • Incremental Processing: process only new data
  • Cluster Pools: reduce cluster startup time
DLT Performance Configuration
# Optimized DLT configuration for SMBs
optimal_config = {
    "name": "Optimized MKB Pipeline",
    "storage": "/mnt/dlt/optimized",
    "target": "main.optimized",
    "configuration": {
        # Performance optimizations
        "pipelines.enableOptimization": "true",
        "pipelines.autoOptimize.managed": "true",
        "pipelines.trigger.interval": "1 hour",
        "pipelines.maxConcurrentRuns": 1,
        
        # Spark optimizations
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        "spark.sql.adaptive.skewJoin.enabled": "true",
        "spark.sql.shuffle.partitions": "200",
        
        # Delta optimizations
        "spark.databricks.delta.optimizeWrite.enabled": "true",
        "spark.databricks.delta.autoCompact.enabled": "true",
        "spark.databricks.delta.properties.defaults.autoOptimize.autoCompact": "true",
        "spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite": "true",
        
        # Cost guardrails (illustrative keys, not official DLT settings;
        # enforce real budgets via cluster policies and budget alerts)
        "pipelines.costControl.enabled": "true",
        "pipelines.costControl.maxDBUsPerRun": "100",
        "pipelines.costControl.maxRuntimeMinutes": "60"
    },
    "clusters": [
        {
            "label": "default",
            "node_type_id": "Standard_DS3_v2",  # good price/performance ratio
            "num_workers": 2,
            "autoscale": {
                "min_workers": 2,
                "max_workers": 8
            },
            "spark_conf": {
                "spark.databricks.cluster.profile": "serverless",
                "spark.sql.adaptive.enabled": "true",
                "spark.sql.adaptive.coalescePartitions.enabled": "true"
            },
            "azure_attributes": {
                # Standard_DS3_v2 is an Azure node type, so use Azure attributes
                "availability": "SPOT_WITH_FALLBACK_AZURE",  # 60-70% cost savings with spot VMs
                "first_on_demand": 1
            }
        }
    ],
    "libraries": [
        {"maven": {"coordinates": "io.delta:delta-core_2.12:2.3.0"}}
    ],
    "development": False,
    "continuous": False,
    "photon": True,  # Photon is enabled via this flag, not via a Maven library
    "edition": "advanced"
}

Best Practices for SMBs

SMB DLT Implementation Checklist

Week 1-2
Phase 1: Foundation
  • ✅ Build a simple batch pipeline
  • ✅ Implement basic expectations
  • ✅ Set up the medallion architecture
  • ✅ Unity Catalog integration
  • ✅ Local testing and debugging
  • Costs: €200-€500 setup
Week 3-4
Phase 2: Advanced Features
  • ✅ Implement incremental processing
  • ✅ Advanced expectations (PII, business rules)
  • ✅ Build a monitoring dashboard
  • ✅ Configure cost controls and alerts
  • ✅ Team training and documentation
  • Costs: €300-€700/month
Week 5-8
Phase 3: Scaling & Optimization
  • ✅ Add streaming pipelines
  • ✅ Performance tuning (Photon, Z-ordering)
  • ✅ Auto-scaling and cluster pools
  • ✅ Disaster recovery procedures
  • ✅ Implement a CI/CD pipeline
  • Costs: €500-€1,500/month

Common SMB Mistakes

  • Too many expectations: start with 5-10 critical checks
  • No incremental processing: full refreshes are expensive
  • Forgetting to monitor: use the DLT monitoring dashboard
  • No cost controls: set DBU limits from day one
  • Complex transformations in bronze: keep bronze simple
  • No error handling: implement retry logic
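The "retry logic" point can be as small as a decorator around flaky steps (for example, JDBC reads against an external database), on top of the retries DLT performs itself. A minimal sketch with illustrative parameters:

```python
# Sketch: simple retry-with-backoff decorator for flaky helper steps.
# Parameter defaults are illustrative; DLT also has its own flow-level retries.
import time
from functools import wraps

def retry(times: int = 3, delay_seconds: float = 1.0, backoff: float = 2.0):
    """Retry a function on any exception, growing the delay each attempt."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            wait = delay_seconds
            for attempt in range(1, times + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == times:
                        raise          # exhausted: surface the real error
                    time.sleep(wait)
                    wait *= backoff    # exponential backoff
        return wrapper
    return decorator
```

Usage: decorate the fragile call, e.g. `@retry(times=3, delay_seconds=5)` on a function wrapping `spark.read.format("jdbc")...`.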

Conclusion: DLT as a Data Engineering Game-Changer for SMBs

ROI Analysis for SMBs

Traditional ETL (Hand-built)
  • Development time: 3-6 months
  • Maintenance: 20-40 hours/month
  • Error rate: 5-10% data issues
  • Time-to-insight: 24+ hours of latency
  • Total cost: €50,000-€100,000/year
DLT Pipeline
  • Development time: 2-8 weeks
  • Maintenance: 2-5 hours/month
  • Error rate: < 1% with expectations
  • Time-to-insight: real-time to 1 hour
  • Total cost: €5,000-€18,000/year
ROI Summary
  • Cost savings: €32,000-€82,000/year
  • Productivity gain: 10-20× faster development
  • Data quality: 90-95% fewer errors
  • Time-to-value: 2-4× faster insights
  • Payback period: 3-6 months

Your Next Steps

Step 1: Free DLT Assessment
Use our free DLT Assessment to analyze your current ETL processes and determine your DLT potential.

Step 2: Proof of Concept
Start with a 30-day PoC on Databricks with DLT (free credits available).

Step 3: Phased Migration
Migrate 1-2 critical pipelines to DLT and measure the impact before going further.

"Delta Live Tables turns data engineering from a complex programming challenge into simple declarative statements. Where robust pipelines used to take months, SMBs can now implement enterprise-grade data pipelines within weeks - pipelines that pay for themselves within months."

Abdullah Özisik - DLT Expert

👨‍💻 About the author

Abdullah Özisik - Databricks Certified Architect and DLT specialist. Has helped dozens of Dutch SMBs with DLT implementations. "Delta Live Tables is the biggest game-changer for SMB data engineering since the cloud. Where dedicated data engineers used to be essential, business analysts and developers can now build production-ready pipelines within weeks. My passion is making this technology accessible to every SMB."
