What is Delta Live Tables? (Beyond the Basics)
Delta Live Tables is not just another ETL tool: it is a **declarative data pipeline engine** that automatically handles dependencies, scheduling, monitoring, and error handling. Instead of programming how data should flow, you declare what you want to achieve.
DLT Architecture: Three Key Principles
1. Declarative Syntax
- What, not How: declare results, not steps
- Automatic Orchestration: dependencies are inferred automatically
- Simplified Code: typically 70-80% less code needed
- Self-documenting: the code is the documentation
2. Built-in Quality
- Data Expectations: built-in data quality checks
- Automatic Monitoring: real-time pipeline monitoring
- Error Handling: automatic retries and recovery
- Lineage Tracking: full data lineage
3. Unified Batch & Streaming
- Single Codebase: the same code for batch and streaming
- Automatic Optimization: query optimization by the engine
- Scalable Execution: automatic scaling as needed
- Cost Efficient: pay only for the resources you use
Core DLT Concepts and Syntax
Basic DLT Syntax and Annotations
# Basic DLT syntax in Python
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

# 1. DECORATORS - define pipeline objects
@dlt.table(
    name="bron_klant_data",                       # Table name
    comment="Source customer data from the CRM",  # Description
    table_properties={                            # Metadata
        "quality": "bronze",
        "pipelines.autoOptimize.managed": "true"
    }
)
def laad_bron_data():
    """Load the source customer data"""
    return spark.read.format("csv") \
        .option("header", "true") \
        .load("/mnt/bron/klanten/*.csv")

# 2. EXPECTATIONS - data quality checks
@dlt.table(
    name="gevalideerde_klant_data",
    comment="Validated customer data with quality checks"
)
@dlt.expect("valid_email", "email LIKE '%@%'")  # Must contain @
@dlt.expect_or_drop("valid_postcode", "postcode RLIKE '^[1-9][0-9]{3}\\s?[A-Z]{2}$'")  # Drop invalid rows
@dlt.expect_or_fail("positive_amount", "bedrag > 0")  # Fail the pipeline on violation
@dlt.expect_all({
    "valid_bsn": "bsn RLIKE '^[1-9][0-9]{8}$'",       # 9 digits
    "valid_telefoon": "telefoon RLIKE '^[0-9]{10}$'"  # 10 digits
})
def transformeer_klant_data():
    """Transform customer data with validations"""
    df = dlt.read("bron_klant_data")
    return df \
        .withColumn("ingest_timestamp", current_timestamp()) \
        .withColumn("geboortedatum", to_date(col("geboortedatum_str"), "dd-MM-yyyy")) \
        .withColumn("leeftijd",
                    floor(months_between(current_date(), col("geboortedatum")) / 12))

# 3. VIEWS - logical transformations
@dlt.view(
    name="klant_segmenten",
    comment="Customer segmentation view"
)
def bereken_klant_segmenten():
    """Derive customer segments"""
    df = dlt.read("gevalideerde_klant_data")
    return df \
        .withColumn("segment",
                    when(col("leeftijd") < 30, "JONG")
                    .when(col("leeftijd") < 50, "MIDDEN")
                    .otherwise("SENIOR")) \
        .withColumn("waarde_segment",
                    when(col("totaal_omzet") > 10000, "PREMIUM")
                    .when(col("totaal_omzet") > 5000, "MIDDEN")
                    .otherwise("BASIC"))

# 4. LIVE TABLES - materialized tables
@dlt.table(
    name="klant_360",
    comment="Customer 360 overview for analytics",
    partition_cols=["land"],                      # Partitioning
    table_properties={
        "quality": "gold",
        "pipelines.autoOptimize.zOrderCols": "klant_id"
    }
)
def maak_klant_360():
    """Build the customer 360 overview"""
    segmenten = dlt.read("klant_segmenten")
    return segmenten \
        .groupBy("klant_id", "land", "segment", "waarde_segment") \
        .agg(
            count("*").alias("aantal_transacties"),
            sum("bedrag").alias("totaal_omzet"),
            avg("bedrag").alias("gemiddelde_omzet"),
            max("ingest_timestamp").alias("laatste_transactie")
        )
DLT Object Types
- @dlt.table: physical Delta table (materialized)
- @dlt.view: logical view, scoped to the pipeline (no storage)
- @dlt.expect: data quality expectation (log only)
- @dlt.expect_or_drop: drop rows that fail the check
- @dlt.expect_or_fail: fail the pipeline when the check fails
- @dlt.expect_all: apply multiple expectations at once
Pipeline Execution Modes
- Triggered: run on demand or on a schedule
- Continuous: real-time streaming
- Development: iterative testing (clusters kept warm, no automatic retries)
- Production: optimized execution with retries and automatic shutdown
Built-in Monitoring
- Expectation Metrics: pass/fail rates
- Execution Metrics: runtime, resources
- Data Quality: completeness, accuracy
- Cost Metrics: DBU usage, efficiency
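DLT records expectation results in the pipeline event log. The payload below is a simplified, hypothetical sample (the field names follow the shape of the event log's `flow_progress.data_quality` details), but the pass-rate arithmetic behind those metrics can be shown in plain Python:

```python
import json

# Hypothetical sample of one event-log 'details' payload; the real event
# log is a Delta table and the exact schema may differ per DLT release.
event_details = json.dumps({
    "flow_progress": {
        "data_quality": {
            "expectations": [
                {"name": "valid_email", "passed_records": 980, "failed_records": 20},
                {"name": "positive_amount", "passed_records": 1000, "failed_records": 0},
            ]
        }
    }
})

def expectation_pass_rates(details_json: str) -> dict:
    """Compute the pass rate per expectation from a 'details' payload."""
    expectations = (json.loads(details_json)
                    ["flow_progress"]["data_quality"]["expectations"])
    rates = {}
    for exp in expectations:
        total = exp["passed_records"] + exp["failed_records"]
        rates[exp["name"]] = exp["passed_records"] / total if total else None
    return rates

print(expectation_pass_rates(event_details))
# → {'valid_email': 0.98, 'positive_amount': 1.0}
```

The same calculation is what the monitoring dashboard later in this article aggregates per table and per expectation.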
A Practical DLT Pipeline for SMBs
Complete E-commerce Pipeline
# Complete e-commerce DLT pipeline for SMBs
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from datetime import datetime

# === BRONZE LAYER: raw data ingestion ===
@dlt.table(
    name="bronze_orders",
    comment="Raw order data from the webshop",
    table_properties={
        "quality": "bronze",
        "pipelines.reset.allowed": "true"
    }
)
def laad_orders():
    """Load order data from multiple sources"""
    # Webhook/API data
    api_orders = spark.read.format("json") \
        .load("/mnt/bron/orders/api/*.json")
    # Database exports
    db_orders = spark.read.format("jdbc") \
        .option("url", "jdbc:postgresql://db.example.com:5432/ecommerce") \
        .option("dbtable", "orders") \
        .option("user", "readonly") \
        .option("password", "***") \
        .load()
    # CSV uploads (back office)
    csv_orders = spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("/mnt/bron/orders/csv/*.csv")
    # Union all sources by column name; JSON, JDBC and CSV schemas
    # are not guaranteed to line up positionally
    return api_orders \
        .unionByName(db_orders, allowMissingColumns=True) \
        .unionByName(csv_orders, allowMissingColumns=True) \
        .withColumn("ingest_timestamp", current_timestamp()) \
        .withColumn("bron_type", lit("orders"))
@dlt.table(
    name="bronze_customers",
    comment="Raw customer data from the CRM",
    table_properties={"quality": "bronze"}
)
@dlt.expect_or_drop("valid_email_format", "email RLIKE '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'")
def laad_customers():
    """Load customer data with email validation"""
    return spark.read.format("parquet") \
        .load("/mnt/bron/customers/*.parquet") \
        .withColumn("ingest_timestamp", current_timestamp()) \
        .withColumn("bron_type", lit("customers"))

@dlt.table(
    name="bronze_products",
    comment="Raw product catalog",
    table_properties={"quality": "bronze"}
)
def laad_products():
    """Load product data"""
    return spark.read.format("delta") \
        .load("/mnt/bron/products/") \
        .withColumn("ingest_timestamp", current_timestamp()) \
        .withColumn("bron_type", lit("products"))
# === SILVER LAYER: cleaned & enriched data ===
@dlt.table(
    name="silver_orders",
    comment="Cleaned order data with validations",
    partition_cols=["order_date"],
    table_properties={"quality": "silver"}
)
@dlt.expect_all({
    "positive_quantity": "quantity > 0",
    "positive_price": "price > 0",
    "valid_order_date": "order_date >= '2020-01-01'"
})
@dlt.expect_or_drop("valid_order_status", "status IN ('pending', 'paid', 'shipped', 'delivered', 'cancelled')")
def transform_orders():
    """Transform order data"""
    orders = dlt.read("bronze_orders")
    # Data cleaning
    cleaned = orders \
        .withColumn("order_date", to_date(col("order_timestamp"))) \
        .withColumn("order_year", year(col("order_date"))) \
        .withColumn("order_month", month(col("order_date"))) \
        .withColumn("order_week", weekofyear(col("order_date"))) \
        .withColumn("order_weekday", date_format(col("order_date"), "EEEE")) \
        .withColumn("order_amount", col("quantity") * col("price")) \
        .withColumn("vat_amount", col("order_amount") * 0.21)  # Dutch VAT (21%)
    # Handle missing values
    return cleaned \
        .fillna({"discount": 0.0}) \
        .withColumn("customer_id",
                    when(col("customer_id").isNull(), lit("UNKNOWN"))
                    .otherwise(col("customer_id")))
@dlt.table(
    name="silver_customers",
    comment="Cleaned customer data with PII masking",
    table_properties={
        "quality": "silver",
        "pii.masked": "true"
    }
)
@dlt.expect_or_drop("valid_postcode_nl", "postcode RLIKE '^[1-9][0-9]{3}\\s?[A-Z]{2}$'")
@dlt.expect_or_drop("valid_phone_nl", "telefoon RLIKE '^[0-9]{10}$' OR telefoon IS NULL")
def transform_customers():
    """Transform customer data with PII protection"""
    customers = dlt.read("bronze_customers")
    # PII masking for internal use
    return customers \
        .withColumn("email_masked",
                    regexp_replace(col("email"), "(?<=.).(?=.*@)", "*")) \
        .withColumn("telefoon_masked",
                    when(col("telefoon").isNotNull(),
                         concat(lit("******"), substring(col("telefoon"), 7, 4)))
                    .otherwise(None)) \
        .withColumn("geboortedatum",
                    to_date(col("geboortedatum_str"), "dd-MM-yyyy")) \
        .withColumn("leeftijd",
                    floor(months_between(current_date(), col("geboortedatum")) / 12)) \
        .withColumn("provincie",
                    # Rough approximation by first postcode digit; the real
                    # postcode-to-province mapping is finer-grained than this
                    when(substring(col("postcode"), 1, 1) == "1", "Noord-Holland")
                    .when(substring(col("postcode"), 1, 1) == "2", "Zuid-Holland")
                    .when(substring(col("postcode"), 1, 1) == "3", "Zuid-Holland/Utrecht")
                    .when(substring(col("postcode"), 1, 1) == "4", "Zeeland/Noord-Brabant")
                    .when(substring(col("postcode"), 1, 1) == "5", "Noord-Brabant")
                    .when(substring(col("postcode"), 1, 1) == "6", "Limburg/Gelderland")
                    .when(substring(col("postcode"), 1, 1) == "7", "Gelderland/Overijssel")
                    .when(substring(col("postcode"), 1, 1) == "8", "Overijssel/Flevoland/Friesland")
                    .when(substring(col("postcode"), 1, 1) == "9", "Groningen/Friesland/Drenthe")
                    .otherwise("Onbekend"))
@dlt.table(
    name="silver_products",
    comment="Cleaned product data with categorization",
    table_properties={"quality": "silver"}
)
@dlt.expect("positive_stock", "stock >= 0")
@dlt.expect("positive_price", "price > 0")
def transform_products():
    """Transform product data"""
    products = dlt.read("bronze_products")
    # Categorization logic
    return products \
        .withColumn("categorie",
                    when(col("product_name").contains("boek"), "Boeken")
                    .when(col("product_name").contains("kleding"), "Kleding")
                    .when(col("product_name").contains("elektron"), "Elektronica")
                    .when(col("product_name").contains("voeding"), "Voeding")
                    .otherwise("Overig")) \
        .withColumn("prijs_klasse",
                    when(col("price") < 10, "Budget")
                    .when(col("price") < 50, "Midden")
                    .when(col("price") < 200, "Premium")
                    .otherwise("Luxe")) \
        .withColumn("btw_tarief",
                    when(col("categorie").isin("Boeken", "Voeding"), 0.09)  # Reduced VAT rate
                    .otherwise(0.21))  # Standard VAT rate
# === GOLD LAYER: business-ready data ===
@dlt.table(
    name="gold_customer_orders",
    comment="Customer order aggregations for analytics",
    partition_cols=["order_year", "order_month"],
    table_properties={
        "quality": "gold",
        "pipelines.autoOptimize.zOrderCols": "customer_id,order_date"
    }
)
def create_customer_orders():
    """Build customer order aggregations"""
    orders = dlt.read("silver_orders")
    customers = dlt.read("silver_customers")
    # Join orders with the customer attributes that silver_customers provides
    customer_orders = orders.join(
        customers.select("customer_id", "provincie", "leeftijd"),
        "customer_id",
        "left"
    )
    return customer_orders \
        .groupBy(
            "customer_id",
            "provincie",
            "order_year",
            "order_month",
            date_trunc("week", col("order_date")).alias("order_week")
        ) \
        .agg(
            count("*").alias("aantal_orders"),
            sum("order_amount").alias("totaal_omzet"),
            sum("vat_amount").alias("totaal_btw"),
            avg("order_amount").alias("gemiddelde_order"),
            countDistinct("product_id").alias("unieke_producten"),
            min("order_date").alias("eerste_order"),
            max("order_date").alias("laatste_order")
        ) \
        .withColumn("order_frequency_days",
                    datediff(col("laatste_order"), col("eerste_order")) / col("aantal_orders"))
@dlt.table(
    name="gold_product_performance",
    comment="Product performance metrics",
    table_properties={"quality": "gold"}
)
def create_product_performance():
    """Compute product performance metrics"""
    orders = dlt.read("silver_orders")
    products = dlt.read("silver_products")
    # Join orders with products (product_name comes from the catalog)
    product_orders = orders.join(
        products.select("product_id", "product_name", "categorie",
                        "prijs_klasse", "btw_tarief"),
        "product_id",
        "left"
    )
    return product_orders \
        .groupBy(
            "product_id",
            "product_name",
            "categorie",
            "prijs_klasse"
        ) \
        .agg(
            count("*").alias("aantal_verkopen"),
            sum("quantity").alias("totaal_aantal"),
            sum("order_amount").alias("totaal_omzet"),
            sum(col("order_amount") * col("btw_tarief")).alias("totaal_btw"),
            avg("price").alias("gemiddelde_prijs"),
            countDistinct("customer_id").alias("unieke_klanten"),
            min("order_date").alias("eerste_verkoop"),
            max("order_date").alias("laatste_verkoop")
        ) \
        .withColumn("dagen_verkrijgbaar",
                    datediff(current_date(), col("eerste_verkoop"))) \
        .withColumn("verkopen_per_dag",
                    col("aantal_verkopen") / greatest(col("dagen_verkrijgbaar"), lit(1))) \
        .withColumn("omzet_per_dag",
                    col("totaal_omzet") / greatest(col("dagen_verkrijgbaar"), lit(1)))
from pyspark.sql.window import Window  # needed for the rolling averages below

@dlt.table(
    name="gold_daily_kpis",
    comment="Daily business KPI dashboard",
    partition_cols=["kpi_date"],
    table_properties={"quality": "gold"}
)
def create_daily_kpis():
    """Compute daily KPIs"""
    orders = dlt.read("silver_orders")
    daily_stats = orders \
        .groupBy("order_date") \
        .agg(
            count("*").alias("dagelijkse_orders"),
            countDistinct("customer_id").alias("unieke_klanten"),
            sum("order_amount").alias("dagelijkse_omzet"),
            sum("vat_amount").alias("dagelijkse_btw"),
            avg("order_amount").alias("gemiddelde_order_waarde"),
            sum("quantity").alias("totaal_producten")
        ) \
        .withColumnRenamed("order_date", "kpi_date")
    # Rolling averages (7-day, 30-day)
    window_7d = Window.orderBy("kpi_date").rowsBetween(-6, 0)
    window_30d = Window.orderBy("kpi_date").rowsBetween(-29, 0)
    return daily_stats \
        .withColumn("omzet_7d_gemiddelde",
                    avg("dagelijkse_omzet").over(window_7d)) \
        .withColumn("orders_7d_gemiddelde",
                    avg("dagelijkse_orders").over(window_7d)) \
        .withColumn("omzet_30d_gemiddelde",
                    avg("dagelijkse_omzet").over(window_30d)) \
        .withColumn("orders_30d_gemiddelde",
                    avg("dagelijkse_orders").over(window_30d))
# === AUDIT & MONITORING ===
@dlt.table(
    name="audit_pipeline_metrics",
    comment="Pipeline execution metrics for monitoring",
    table_properties={"quality": "audit"}
)
def create_audit_metrics():
    """Generate audit metrics for pipeline monitoring"""
    from datetime import datetime
    # createDataFrame needs a Python value, not the current_timestamp() Column
    now = datetime.now()
    pipeline_info = spark.createDataFrame([
        ("bronze_orders", "bronze", "orders", now),
        ("bronze_customers", "bronze", "customers", now),
        ("bronze_products", "bronze", "products", now),
        ("silver_orders", "silver", "orders", now),
        ("silver_customers", "silver", "customers", now),
        ("silver_products", "silver", "products", now),
        ("gold_customer_orders", "gold", "customer_orders", now),
        ("gold_product_performance", "gold", "product_performance", now),
        ("gold_daily_kpis", "gold", "daily_kpis", now)
    ], ["table_name", "layer", "domain", "audit_timestamp"])
    return pipeline_info

@dlt.view(
    name="quality_metrics_dashboard",
    comment="Data quality metrics dashboard"
)
def create_quality_dashboard():
    """Generate a quality metrics dashboard"""
    # In practice you would query the pipeline event log;
    # this is a simplified, hard-coded example
    from datetime import datetime
    now = datetime.now()
    return spark.createDataFrame([
        ("bronze_orders", "valid_order_status", 0.98, 0.02, now),
        ("silver_customers", "valid_postcode_nl", 0.95, 0.05, now),
        ("silver_products", "positive_price", 1.00, 0.00, now)
    ], ["table_name", "expectation", "pass_rate", "fail_rate", "check_time"])
# Pipeline configuration (the JSON settings as a Python dict)
pipeline_config = {
    "name": "E-commerce Data Pipeline",
    "storage": "/mnt/dlt/ecommerce_pipeline",
    "target": "main.ecommerce",
    "configuration": {
        "pipelines.enableOptimization": "true",
        "pipelines.trigger.interval": "1 hour",
        "pipelines.maxConcurrentRuns": 1,
        "spark.sql.adaptive.enabled": "true",
        "spark.databricks.delta.optimizeWrite.enabled": "true",
        "spark.databricks.delta.autoCompact.enabled": "true"
    },
    # DLT pipelines reference notebooks or files, not Maven packages
    "libraries": [
        {"notebook": {"path": "/Pipelines/ecommerce_dlt"}}  # illustrative path
    ],
    "clusters": [
        {
            "label": "default",
            "node_type_id": "Standard_DS3_v2",
            "autoscale": {
                "min_workers": 2,
                "max_workers": 8
            }
        }
    ],
    "development": False,
    "continuous": False,
    "photon": True
}
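For completeness: a config dict like the one above is normally submitted through the Databricks Pipelines REST API (`POST /api/2.0/pipelines` creates a pipeline). The sketch below only builds the HTTP request and does not send it; the workspace URL and token are placeholders.

```python
import json
from urllib.request import Request

def build_pipeline_request(workspace_url: str, token: str, config: dict) -> Request:
    """Build (but do not send) the request that creates a DLT pipeline."""
    return Request(
        url=f"{workspace_url}/api/2.0/pipelines",
        data=json.dumps(config).encode("utf-8"),
        headers={"Authorization": f"Bearer {token}",
                 "Content-Type": "application/json"},
        method="POST",
    )

req = build_pipeline_request(
    "https://adb-123.azuredatabricks.net",  # placeholder workspace URL
    "dapi-XXXX",                            # placeholder token
    {"name": "E-commerce Data Pipeline", "continuous": False},
)
print(req.get_method(), req.full_url)
```

Sending it with `urllib.request.urlopen(req)` (or the `databricks` CLI / Terraform) is the usual next step; in day-to-day work most teams manage these settings from the pipeline UI instead.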
Real-time Streaming Pipelines
Streaming E-commerce Pipeline
# Real-time streaming DLT pipeline
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# === STREAMING BRONZE: real-time ingestion ===
@dlt.table(
    name="streaming_bronze_orders",
    comment="Real-time order stream from Kafka",
    table_properties={"quality": "bronze"}
)
def streaming_orders():
    """Stream order data from Kafka"""
    return spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka-broker:9092") \
        .option("subscribe", "ecommerce-orders") \
        .option("startingOffsets", "latest") \
        .load() \
        .select(
            from_json(col("value").cast("string"),
                StructType([
                    StructField("order_id", StringType()),
                    StructField("customer_id", StringType()),
                    StructField("product_id", StringType()),
                    StructField("quantity", IntegerType()),
                    StructField("price", DecimalType(10, 2)),
                    StructField("order_timestamp", TimestampType())
                ])
            ).alias("data")
        ) \
        .select("data.*") \
        .withColumn("ingest_timestamp", current_timestamp())
@dlt.table(
    name="streaming_bronze_pageviews",
    comment="Real-time website pageviews",
    table_properties={"quality": "bronze"}
)
def streaming_pageviews():
    """Stream website pageview data via Auto Loader"""
    return spark.readStream \
        .format("cloudFiles") \
        .option("cloudFiles.format", "json") \
        .option("cloudFiles.schemaLocation", "/mnt/dlt/schemas/pageviews") \
        .load("/mnt/streaming/pageviews/") \
        .withColumn("ingest_timestamp", current_timestamp())
# === STREAMING SILVER: real-time processing ===
@dlt.table(
    name="streaming_silver_orders",
    comment="Real-time processed orders",
    table_properties={"quality": "silver"}
)
@dlt.expect("valid_quantity", "quantity > 0")
@dlt.expect_or_drop("valid_price", "price > 0")
def process_streaming_orders():
    """Process streaming orders in real time"""
    # read_stream keeps this table incremental; the watermark
    # bounds the state kept for late data
    orders = dlt.read_stream("streaming_bronze_orders")
    return orders \
        .withWatermark("order_timestamp", "5 minutes") \
        .withColumn("order_date", to_date(col("order_timestamp"))) \
        .withColumn("order_amount", col("quantity") * col("price"))
@dlt.table(
    name="streaming_silver_sessions",
    comment="Real-time user sessions",
    table_properties={"quality": "silver"}
)
def create_user_sessions():
    """Create user sessions from pageviews"""
    pageviews = dlt.read_stream("streaming_bronze_pageviews")
    return pageviews \
        .withWatermark("event_timestamp", "10 minutes") \
        .groupBy(
            col("user_id"),
            # True session window: closes after 30 minutes of inactivity
            session_window(col("event_timestamp"), "30 minutes")
        ) \
        .agg(
            count("*").alias("pageviews"),
            collect_list("page_url").alias("visited_pages"),
            min("event_timestamp").alias("session_start"),
            max("event_timestamp").alias("session_end")
        ) \
        .withColumn("session_duration",
                    (unix_timestamp(col("session_end")) -
                     unix_timestamp(col("session_start"))) / 60)  # in minutes
# === STREAMING GOLD: real-time analytics ===
@dlt.table(
    name="streaming_gold_revenue",
    comment="Real-time revenue dashboard",
    table_properties={
        "quality": "gold",
        "pipelines.trigger.interval": "1 minute"
    }
)
def create_revenue_dashboard():
    """Real-time revenue aggregations"""
    orders = dlt.read_stream("streaming_silver_orders")
    # Tumbling window: one bucket per minute
    return orders \
        .withWatermark("order_timestamp", "10 minutes") \
        .groupBy(
            window(col("order_timestamp"), "1 minute").alias("time_window"),
            "order_date"
        ) \
        .agg(
            count("*").alias("orders_per_minute"),
            sum("order_amount").alias("revenue_per_minute"),
            # exact countDistinct is not supported in streaming aggregations
            approx_count_distinct("customer_id").alias("unique_customers"),
            approx_count_distinct("product_id").alias("unique_products")
        ) \
        .withColumn("window_start", col("time_window.start")) \
        .withColumn("window_end", col("time_window.end"))
from pyspark.sql.window import Window  # for the ranking below

@dlt.table(
    name="streaming_gold_top_products",
    comment="Near-real-time top products",
    table_properties={
        "quality": "gold",
        "pipelines.trigger.interval": "5 minutes"
    }
)
def create_top_products():
    """Top-selling products over a sliding window"""
    # rank().over is not supported inside a streaming aggregation, so this
    # table is recomputed as a materialized view on each trigger (batch read)
    orders = dlt.read("streaming_silver_orders")
    # Sliding window: last hour, advancing every 5 minutes
    return orders \
        .groupBy(
            window(col("order_timestamp"), "1 hour", "5 minutes").alias("time_window"),
            "product_id"
        ) \
        .agg(
            count("*").alias("sales_count"),
            sum("quantity").alias("total_quantity"),
            sum("order_amount").alias("total_revenue")
        ) \
        .withColumn("rank",
                    rank().over(Window.partitionBy("time_window")
                                .orderBy(col("total_revenue").desc()))) \
        .filter(col("rank") <= 10)  # Top 10 products
# === CONTINUOUS PROCESSING ===
continuous_config = {
    "name": "Real-time E-commerce Pipeline",
    "storage": "/mnt/dlt/streaming_pipeline",
    "target": "main.streaming",
    "configuration": {
        "pipelines.maxConcurrentRuns": 1,
        "spark.sql.streaming.stateStore.providerClass":
            "com.databricks.sql.streaming.state.RocksDBStateStoreProvider",
        "spark.sql.shuffle.partitions": 200
    },
    # Kafka support is part of the Databricks runtime;
    # no extra Maven library is needed in a DLT pipeline
    "clusters": [
        {
            "label": "default",
            "node_type_id": "Standard_DS3_v2",
            "autoscale": {
                "min_workers": 4,
                "max_workers": 16
            },
            "spark_conf": {
                "spark.sql.streaming.stateStore.maintenanceInterval": "1m",
                "spark.sql.streaming.metricsEnabled": "true"
            }
        }
    ],
    "development": False,
    "continuous": True,  # continuous mode replaces any trigger interval
    "photon": True
}
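The tumbling and sliding windows used in the gold tables above can be illustrated without Spark. A minimal pure-Python sketch of the bucketing arithmetic (epoch-second timestamps; sizes chosen for readability, not taken from the pipeline):

```python
def tumbling_window(ts: int, size: int) -> tuple:
    """Assign an epoch-seconds timestamp to its tumbling window [start, end)."""
    start = (ts // size) * size
    return (start, start + size)

def sliding_windows(ts: int, size: int, slide: int) -> list:
    """All windows of `size`, advancing every `slide`, that contain ts."""
    wins = []
    # earliest slide-aligned start whose window still contains ts
    start = max(((ts - size) // slide + 1) * slide, 0)
    while start <= ts:
        wins.append((start, start + size))
        start += slide
    return wins

# An event at t=125s falls into exactly one 60s tumbling window...
print(tumbling_window(125, 60))      # → (120, 180)
# ...but into several overlapping 60s windows sliding every 20s.
print(sliding_windows(125, 60, 20))  # → [(80, 140), (100, 160), (120, 180)]
```

This is why a sliding-window aggregate (like `streaming_gold_top_products`) counts each order in multiple window rows, while a tumbling-window aggregate (like `streaming_gold_revenue`) counts it once.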
Data Quality and Monitoring
Expectation Types
# DLT expectation patterns, all attached to a single table function
# (decorators must sit on a function to be valid Python)
threshold = 1000  # dynamic threshold, resolved when the pipeline is parsed

@dlt.table(
    name="quality_monitored_table"
)
# Basic expectation (log only)
@dlt.expect("valid_email", "email LIKE '%@%'")
# Drop invalid rows
@dlt.expect_or_drop("valid_postcode",
                    "postcode RLIKE '^[1-9][0-9]{3}\\s?[A-Z]{2}$'")
# Fail the pipeline on a violation
@dlt.expect_or_fail("positive_amount", "amount > 0")
# Retention policy (keep at most 90 days)
@dlt.expect("retention_policy",
            "order_date > current_date() - 90")
# Completeness check
@dlt.expect("required_fields",
            "customer_id IS NOT NULL AND order_date IS NOT NULL")
# Consistency check
@dlt.expect("consistent_dates",
            "order_date <= delivery_date OR delivery_date IS NULL")
# Business rule
@dlt.expect("discount_limit",
            "discount_percentage <= 30 OR customer_tier = 'PREMIUM'")
# Multiple expectations at once
@dlt.expect_all({
    "valid_quantity": "quantity > 0",
    "valid_price": "price BETWEEN 0.01 AND 10000",
    "valid_status": "status IN ('pending', 'completed', 'cancelled')"
})
# Dynamic threshold via an f-string
@dlt.expect("high_value_order", f"amount > {threshold}")
def quality_monitored_table():
    """All expectation patterns applied to one source table"""
    return dlt.read("silver_orders")
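Expectation conditions are Spark SQL predicates, but the Dutch postcode and phone patterns used above can be sanity-checked in plain Python before deploying them. A small sketch with the same regexes:

```python
import re

# Same patterns as the RLIKE expectations; anchored with ^...$ because
# RLIKE (like re.search) otherwise matches anywhere in the string.
POSTCODE_NL = re.compile(r"^[1-9][0-9]{3}\s?[A-Z]{2}$")  # e.g. "1234 AB"
PHONE_NL = re.compile(r"^[0-9]{10}$")                    # e.g. "0612345678"

assert POSTCODE_NL.match("1234 AB")
assert POSTCODE_NL.match("1234AB")       # the space is optional
assert not POSTCODE_NL.match("0123 AB")  # may not start with 0
assert PHONE_NL.match("0612345678")
assert not PHONE_NL.match("06-1234567")  # digits only
print("all postcode/phone checks passed")
```

Testing predicates this way catches escaping mistakes (single vs. double backslashes in Python strings) before they silently drop valid rows in production.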
Monitoring Dashboard
-- SQL: DLT monitoring dashboard
-- Note: the system table names below are illustrative; in practice
-- these metrics come from the pipeline event log
CREATE OR REPLACE VIEW main.monitoring.dlt_dashboard AS
WITH pipeline_metrics AS (
    SELECT
        pipeline_name,
        table_name,
        expectation,
        -- Metrics aggregated over the last 7 days
        SUM(records_processed) AS total_records,
        SUM(records_passed) AS passed_records,
        SUM(records_failed) AS failed_records,
        -- Derived rates
        ROUND(SUM(records_passed) * 100.0 /
              NULLIF(SUM(records_processed), 0), 2) AS pass_rate
    FROM system.pipeline_metrics
    WHERE timestamp > CURRENT_TIMESTAMP() - INTERVAL '7 days'
    GROUP BY 1, 2, 3
),
execution_metrics AS (
    SELECT
        pipeline_name,
        execution_id,
        start_time,
        end_time,
        -- Performance metrics
        TIMESTAMPDIFF(SECOND, start_time, end_time) AS duration_seconds,
        total_dbus,
        -- Efficiency metrics
        total_dbus / NULLIF(
            TIMESTAMPDIFF(SECOND, start_time, end_time), 0
        ) AS dbus_per_second,
        -- Status
        CASE
            WHEN state = 'COMPLETED' THEN 'SUCCESS'
            WHEN state = 'FAILED' THEN 'FAILED'
            ELSE 'RUNNING'
        END AS status
    FROM system.pipeline_executions
)
SELECT
    pm.pipeline_name,
    pm.table_name,
    pm.expectation,
    pm.total_records,
    pm.passed_records,
    pm.failed_records,
    pm.pass_rate,
    em.duration_seconds,
    em.total_dbus,
    em.status,
    -- Health score (0-100)
    CASE
        WHEN pm.pass_rate >= 99 THEN 100
        WHEN pm.pass_rate >= 95 THEN 90
        WHEN pm.pass_rate >= 90 THEN 80
        WHEN pm.pass_rate >= 80 THEN 70
        ELSE 50
    END AS quality_score,
    -- Alert flag
    CASE
        WHEN pm.pass_rate < 90 THEN 'ALERT'
        WHEN em.duration_seconds > 3600 THEN 'WARNING'
        ELSE 'OK'
    END AS alert_status
FROM pipeline_metrics pm
JOIN execution_metrics em
    ON pm.pipeline_name = em.pipeline_name
ORDER BY pm.pass_rate ASC, em.duration_seconds DESC;
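The health-score CASE expression above is easy to mirror in Python, for example in an alerting script that runs outside SQL. The thresholds below are copied directly from the dashboard view:

```python
def quality_score(pass_rate: float) -> int:
    """Map an expectation pass rate (0-100) to a health score,
    using the same thresholds as the SQL CASE expression."""
    if pass_rate >= 99:
        return 100
    if pass_rate >= 95:
        return 90
    if pass_rate >= 90:
        return 80
    if pass_rate >= 80:
        return 70
    return 50

def alert_status(pass_rate: float, duration_seconds: int) -> str:
    """ALERT on low quality, WARNING on slow runs, otherwise OK."""
    if pass_rate < 90:
        return "ALERT"
    if duration_seconds > 3600:
        return "WARNING"
    return "OK"

print(quality_score(97.5), alert_status(97.5, 1200))  # → 90 OK
print(quality_score(85.0), alert_status(85.0, 4000))  # → 70 ALERT
```

Keeping the thresholds in one place (a config file or table read by both the SQL view and the script) avoids the two copies drifting apart.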
Cost and Performance Optimization
DLT Cost Breakdown for SMBs
Batch Pipeline (per month)
- Compute (DBUs): €0.30/DBU × 500 DBU = €150
- DLT Runtime: €0.10/DBU × 500 DBU = €50
- Storage (Delta): 100 GB × €0.023 = €2.30
- Unity Catalog: €0.20/table × 20 tables = €4
- Total: ~€206/month
Streaming Pipeline (per month)
- Compute (DBUs): €0.30/DBU × 2,000 DBU = €600
- DLT Runtime: €0.10/DBU × 2,000 DBU = €200
- Storage (Delta): 200 GB × €0.023 = €4.60
- Unity Catalog: €0.20/table × 15 tables = €3
- Total: ~€808/month
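Both totals follow one simple formula. A small helper makes the arithmetic reproducible; the rates are the example figures used in this breakdown, not official pricing:

```python
def monthly_cost(dbus: float, storage_gb: float, tables: int,
                 compute_rate: float = 0.30,    # EUR/DBU, example figure
                 dlt_rate: float = 0.10,        # EUR/DBU DLT runtime, example
                 storage_rate: float = 0.023,   # EUR/GB/month, example
                 catalog_rate: float = 0.20) -> float:  # EUR/table, example
    """Estimated monthly pipeline cost in EUR."""
    return (dbus * (compute_rate + dlt_rate)
            + storage_gb * storage_rate
            + tables * catalog_rate)

print(round(monthly_cost(500, 100, 20), 2))   # batch pipeline → 206.3
print(round(monthly_cost(2000, 200, 15), 2))  # streaming pipeline → 807.6
```

Plugging in your own DBU consumption is the quickest way to see whether a continuous pipeline is worth roughly 4× the cost of an hourly triggered one for your use case.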
Performance Tips
- Auto-scaling: set minimum and maximum workers
- Photon Engine: enable for a 2-5× speedup
- Z-ordering: optimize query performance
- Incremental Processing: process only new data
- Cluster Pools: reduce cluster startup time
DLT Performance Configuration
# Optimized DLT configuration for SMBs
optimal_config = {
    "name": "Optimized SMB Pipeline",
    "storage": "/mnt/dlt/optimized",
    "target": "main.optimized",
    "configuration": {
        # Performance optimizations
        "pipelines.enableOptimization": "true",
        "pipelines.autoOptimize.managed": "true",
        "pipelines.trigger.interval": "1 hour",
        "pipelines.maxConcurrentRuns": 1,
        # Spark optimizations
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        "spark.sql.adaptive.skewJoin.enabled": "true",
        "spark.sql.shuffle.partitions": "200",
        # Delta optimizations
        "spark.databricks.delta.optimizeWrite.enabled": "true",
        "spark.databricks.delta.autoCompact.enabled": "true",
        "spark.databricks.delta.properties.defaults.autoOptimize.autoCompact": "true",
        "spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite": "true",
        # Cost guardrails (illustrative keys; verify what your
        # workspace actually supports before relying on them)
        "pipelines.costControl.enabled": "true",
        "pipelines.costControl.maxDBUsPerRun": "100",
        "pipelines.costControl.maxRuntimeMinutes": "60"
    },
    "clusters": [
        {
            "label": "default",
            "node_type_id": "Standard_DS3_v2",  # Good price/performance
            "autoscale": {
                "min_workers": 2,
                "max_workers": 8
            },
            "spark_conf": {
                "spark.sql.adaptive.enabled": "true",
                "spark.sql.adaptive.coalescePartitions.enabled": "true"
            },
            "aws_attributes": {
                "availability": "SPOT_WITH_FALLBACK",  # 60-70% cost savings
                "zone_id": "eu-west-1a"
            }
        }
    ],
    "development": False,
    "continuous": False,
    "photon": True,  # Photon is enabled here, not via a library
    "edition": "advanced"
}
Best Practices for SMBs
SMB DLT Implementation Checklist
Phase 1: Foundation
- ✅ Build a simple batch pipeline
- ✅ Implement basic expectations
- ✅ Set up the medallion architecture
- ✅ Integrate with Unity Catalog
- ✅ Local testing and debugging
- Cost: €200-€500 setup
Phase 2: Advanced Features
- ✅ Implement incremental processing
- ✅ Advanced expectations (PII, business rules)
- ✅ Build a monitoring dashboard
- ✅ Configure cost controls and alerts
- ✅ Team training and documentation
- Cost: €300-€700/month
Phase 3: Scaling & Optimization
- ✅ Add streaming pipelines
- ✅ Performance tuning (Photon, Z-ordering)
- ✅ Auto-scaling and cluster pools
- ✅ Disaster recovery procedures
- ✅ Implement a CI/CD pipeline
- Cost: €500-€1,500/month
Common SMB Mistakes
- Too many expectations: start with 5-10 critical checks
- No incremental processing: full refreshes are expensive
- Forgetting to monitor: use the DLT monitoring dashboard
- No cost controls: set DBU limits from day one
- Complex transformations in bronze: keep bronze simple
- No error handling: implement retry logic
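On the last point: inside DLT, retries are automatic, but ingestion scripts and API calls that run outside the pipeline still need their own retry logic. A minimal stdlib sketch with exponential backoff (attempt counts and delays are illustrative):

```python
import time
from functools import wraps

def retry(max_attempts: int = 3, base_delay: float = 0.1):
    """Retry a function with exponential backoff; re-raise after the last attempt."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise
                    time.sleep(base_delay * 2 ** (attempt - 1))
        return wrapper
    return decorator

calls = {"n": 0}

@retry(max_attempts=3, base_delay=0.01)
def flaky_load():
    """Simulated source that fails twice before succeeding."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "loaded"

print(flaky_load(), "after", calls["n"], "attempts")  # → loaded after 3 attempts
```

In production you would typically retry only specific exception types and log each attempt rather than swallowing it silently.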
Conclusion: DLT as an SMB Data Engineering Game-Changer
ROI Analysis for SMBs
Traditional ETL (hand-built)
- Development time: 3-6 months
- Maintenance: 20-40 hours/month
- Error rate: 5-10% data issues
- Time-to-insight: 24+ hours of latency
- Total cost: €50,000-€100,000/year
DLT Pipeline
- Development time: 2-8 weeks
- Maintenance: 2-5 hours/month
- Error rate: < 1% with expectations
- Time-to-insight: real-time to 1 hour
- Total cost: €5,000-€18,000/year
ROI Summary
- Cost savings: €32,000-€82,000/year
- Productivity gain: 10-20× faster development
- Data quality: 90-95% fewer errors
- Time-to-value: 2-4× faster insights
- Payback period: 3-6 months
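The summary figures can be reproduced with a small helper. The yearly costs below are the low-end values from the profiles above; the €10,000 one-off implementation cost is an assumed example, not a figure from the analysis:

```python
def roi(traditional_yearly: float, dlt_yearly: float,
        implementation_cost: float) -> tuple:
    """Yearly savings (EUR) and payback period (months)."""
    savings = traditional_yearly - dlt_yearly
    payback_months = implementation_cost / (savings / 12)
    return savings, payback_months

# Low end: EUR 50,000 traditional vs EUR 18,000 DLT per year,
# with an assumed EUR 10,000 one-off implementation cost.
savings, months = roi(50_000, 18_000, 10_000)
print(f"savings €{savings:,.0f}/year, payback {months:.1f} months")
# → savings €32,000/year, payback 3.8 months
```

Substituting the high-end figures (€100,000 vs €18,000) gives €82,000/year in savings, matching the range in the summary and a payback well inside the stated 3-6 months.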
Your Next Steps
Step 1: Free DLT Assessment
Use our free DLT Assessment to analyze your current ETL processes and determine your DLT potential.
Step 2: Proof of Concept
Start with a 30-day PoC on Databricks with DLT (free credits available).
Step 3: Phased Migration
Migrate one or two critical pipelines to DLT and measure the impact before going further.
"Delta Live Tables turns data engineering from a complex programming challenge into simple declarative statements. Where robust pipelines used to take months to build, SMBs can now implement enterprise-grade data pipelines in weeks, and those pipelines pay for themselves within months."