Spark-SQL Essentials
Complete handleiding voor SQL queries, DataFrame API en geavanceerde query technieken in Databricks
90-120 minuten
Intermediate niveau
40+ praktische voorbeelden
Cursus Modules
Module 4: Spark-SQL Essentials in Databricks
Leerdoelen
Na deze module kun je:
- SQL queries schrijven in Databricks notebooks
- De DataFrame API effectief gebruiken
- Temporary views en Global Temporary Views aanmaken
- Geavanceerde SQL functies gebruiken (window functions, UDFs)
- Data queryen uit Delta Lake tables
- Query performance optimaliseren
- Complexe joins en aggregaties uitvoeren
4.1 Spark-SQL Fundamentals
Spark-SQL is de module van Apache Spark voor gestructureerde dataverwerking. Het biedt SQL-achtige query's op distributed data via DataFrame API of directe SQL queries.
| Feature | Beschrijving | Voordelen |
|---|---|---|
| DataFrame API | Relationele API in Scala, Java, Python, R | Type-safe, IDE ondersteuning, compile-time checking |
| SQL Interface | ANSI SQL:2003 compatibele query taal | Vertrouwde syntax voor SQL gebruikers |
| Catalyst Optimizer | Query optimizer met regel- en cost-based optimalisatie | Automatische performance optimalisatie |
| Tungsten Engine | Geheugen- en CPU-geoptimaliseerde execution engine | 10-100x snellere execution dan traditionele Spark |
1
Stap 1: Je Eerste SQL Query in Databricks
-- Cel 1: Maak een nieuwe notebook en selecteer SQL als default language
-- Of gebruik een Python notebook met SQL magic commands
-- Cel 2: Maak een sample dataset
CREATE OR REPLACE TEMPORARY VIEW employees AS
SELECT * FROM (
VALUES
(1, 'Jan', 'Jansen', 'IT', 5000, 'Amsterdam'),
(2, 'Piet', 'Pietersen', 'Sales', 4500, 'Rotterdam'),
(3, 'Marie', 'de Vries', 'IT', 6000, 'Utrecht'),
(4, 'Klaas', 'Bakker', 'HR', 4000, 'Den Haag'),
(5, 'Lisa', 'Smit', 'Sales', 4800, 'Amsterdam'),
(6, 'Thomas', 'Mulder', 'IT', 5500, 'Rotterdam'),
(7, 'Emma', 'Bos', 'Marketing', 4200, 'Utrecht'),
(8, 'David', 'Visser', 'IT', 5200, 'Amsterdam')
) AS t(id, first_name, last_name, department, salary, city);
-- Cel 3: Eenvoudige SELECT query
SELECT * FROM employees;
-- Cel 4: Query met filtering en ordering
SELECT first_name, last_name, department, salary
FROM employees
WHERE department = 'IT'
ORDER BY salary DESC;
-- Cel 5: Aggregatie functies
SELECT
department,
COUNT(*) AS employee_count,
AVG(salary) AS avg_salary,
MAX(salary) AS max_salary,
MIN(salary) AS min_salary
FROM employees
GROUP BY department
ORDER BY avg_salary DESC;
Tip: SQL Magic Commands in Python Notebooks
In Python notebooks kun je SQL queries uitvoeren met magic commands:
# Python cel
%sql
SELECT * FROM employees LIMIT 5;
# Of gebruik spark.sql() functie
result = spark.sql("SELECT department, COUNT(*) as count FROM employees GROUP BY department")
result.show()
4.2 DataFrame API vs SQL
Vergelijking: DataFrame API vs SQL
| Aspect | DataFrame API (Python/Scala) | SQL |
|---|---|---|
| Syntax | Method chaining, function calls | Declaratieve SQL statements |
| Type Safety | Compile-time type checking (Scala) | Runtime type checking |
| IDE Support | Autocomplete, refactoring | Beperkte IDE ondersteuning |
| Performance | Zelfde (beide gebruiken Catalyst optimizer) | Zelfde (beide gebruiken Catalyst optimizer) |
| Learning Curve | Programmeurs, data scientists | Analisten, SQL experts |
| Best Voor | Complexe business logic, ETL pipelines | Ad-hoc queries, rapportage, analytics |
2
Stap 2: Dezelfde Query in DataFrame API en SQL
# OPTIE 1: DataFrame API (Python)
from pyspark.sql import functions as F
# Filteren en aggregatie
result_df = (spark.table("employees")
.filter(F.col("department") == "IT")
.groupBy("city")
.agg(
F.count("*").alias("employee_count"),
F.avg("salary").alias("avg_salary")
)
.orderBy(F.desc("avg_salary"))
)
result_df.show()
# OPTIE 2: SQL (zelfde resultaat)
-- In een SQL cel of met %sql magic
SELECT
city,
COUNT(*) AS employee_count,
AVG(salary) AS avg_salary
FROM employees
WHERE department = 'IT'
GROUP BY city
ORDER BY avg_salary DESC;
Conversie tussen DataFrame en SQL
# DataFrame naar Temporary View
df.createOrReplaceTempView("my_temp_view")
# SQL query op DataFrame via SQL
sql_result = spark.sql("SELECT * FROM my_temp_view WHERE salary > 5000")
# Global Temporary View (beschikbaar in alle notebooks)
df.createOrReplaceGlobalTempView("my_global_view")
# Toegang tot Global Temporary View
global_result = spark.sql("SELECT * FROM global_temp.my_global_view")
4.3 Geavanceerde SQL Functies
3
Stap 3: Window Functions en CTEs
-- Cel 1: Common Table Expressions (CTEs)
-- Handig voor complexe queries
WITH department_stats AS (
SELECT
department,
AVG(salary) AS dept_avg_salary,
COUNT(*) AS dept_count
FROM employees
GROUP BY department
),
employee_ranking AS (
SELECT
e.*,
d.dept_avg_salary,
d.dept_count
FROM employees e
JOIN department_stats d ON e.department = d.department
)
SELECT * FROM employee_ranking;
-- Cel 2: Window Functions voor ranking en analytics
SELECT
first_name,
last_name,
department,
salary,
-- Rank binnen department op basis van salary
RANK() OVER (
PARTITION BY department
ORDER BY salary DESC
) AS salary_rank_in_dept,
-- Running total per department
SUM(salary) OVER (
PARTITION BY department
ORDER BY salary DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS running_total_dept,
-- Vergelijk met department gemiddelde
salary - AVG(salary) OVER (PARTITION BY department) AS diff_from_dept_avg,
-- Percentile binnen company
PERCENT_RANK() OVER (
ORDER BY salary
) AS salary_percentile
FROM employees
ORDER BY department, salary DESC;
-- Cel 3: Geavanceerde Date en String functies
CREATE OR REPLACE TEMPORARY VIEW sales AS
SELECT * FROM (
VALUES
(1, '2024-01-15', 'Laptop', 1200.50),
(2, '2024-01-16', 'Monitor', 350.75),
(3, '2024-02-01', 'Keyboard', 89.99),
(4, '2024-02-15', 'Mouse', 45.50),
(5, '2024-03-10', 'Laptop', 1350.00),
(6, '2024-03-15', 'Monitor', 399.99)
) AS t(id, sale_date, product, amount);
SELECT
id,
sale_date,
DATE_FORMAT(sale_date, 'yyyy-MM') AS sale_month,
DAY(sale_date) AS day_of_month,
MONTH(sale_date) AS month_number,
YEAR(sale_date) AS sale_year,
product,
UPPER(product) AS product_upper,
SUBSTRING(product, 1, 3) AS product_short,
amount,
ROUND(amount, 0) AS amount_rounded,
FORMAT_NUMBER(amount, 2) AS amount_formatted
FROM sales;
4.4 Joins en Complexe Queries
4
Stap 4: Alle Join Types met Voorbeelden
-- Maak sample tabellen voor joins
CREATE OR REPLACE TEMPORARY VIEW dept AS
SELECT * FROM (
VALUES
('IT', 'Information Technology', 'Building A'),
('Sales', 'Sales Department', 'Building B'),
('HR', 'Human Resources', 'Building C'),
('Finance', 'Finance Department', 'Building D')
) AS t(dept_id, dept_name, location);
-- INNER JOIN: Alleen matching rijen
SELECT
e.first_name,
e.last_name,
e.department,
d.dept_name,
d.location
FROM employees e
INNER JOIN dept d ON e.department = d.dept_id;
-- LEFT JOIN: Alle rijen van employees, matching van dept
SELECT
e.first_name,
e.last_name,
e.department,
d.dept_name,
d.location
FROM employees e
LEFT JOIN dept d ON e.department = d.dept_id;
-- FULL OUTER JOIN: Alle rijen van beide tabellen
SELECT
COALESCE(e.department, d.dept_id) AS department_code,
e.first_name,
e.last_name,
d.dept_name,
d.location
FROM employees e
FULL OUTER JOIN dept d ON e.department = d.dept_id;
-- CROSS JOIN: Cartesiaans product (alle combinaties)
-- Wees voorzichtig met grote datasets!
SELECT
e.first_name,
d.dept_name
FROM employees e
CROSS JOIN dept d
LIMIT 10;
-- Complexe joins met meerdere voorwaarden
CREATE OR REPLACE TEMPORARY VIEW employee_projects AS
SELECT * FROM (
VALUES
(1, 'Project Alpha', '2024-01-01', '2024-06-30'),
(3, 'Project Beta', '2024-02-01', '2024-08-31'),
(1, 'Project Gamma', '2024-03-01', '2024-09-30'),
(6, 'Project Alpha', '2024-01-01', '2024-06-30'),
(8, 'Project Delta', '2024-04-01', '2024-12-31')
) AS t(employee_id, project_name, start_date, end_date);
-- JOIN met meerdere tabellen en complexe WHERE clause
SELECT
e.first_name,
e.last_name,
e.department,
d.dept_name,
p.project_name,
p.start_date,
p.end_date,
DATEDIFF(p.end_date, p.start_date) AS project_duration_days
FROM employees e
INNER JOIN dept d ON e.department = d.dept_id
LEFT JOIN employee_projects p ON e.id = p.employee_id
WHERE e.salary > 4500
AND (p.project_name IS NOT NULL OR e.department = 'HR')
ORDER BY e.department, e.salary DESC;
4.5 Werken met Delta Lake Tables
Delta Lake: Transactionele Data Lake
Delta Lake voegt ACID transactions, schema enforcement en time travel toe aan data lakes.
5
Stap 5: Delta Lake Operations met SQL
-- Cel 1: Maak een Delta Lake table
CREATE OR REPLACE TABLE delta_employees
USING delta
AS
SELECT * FROM employees;
-- Cel 2: Query Delta table (zelfde als gewone table)
SELECT * FROM delta_employees WHERE department = 'IT';
-- Cel 3: INSERT data in Delta table
INSERT INTO delta_employees
VALUES
(9, 'Sara', 'Jansen', 'IT', 5300, 'Amsterdam'),
(10, 'Mark', 'de Wit', 'Finance', 4800, 'Rotterdam');
-- Cel 4: UPDATE data in Delta table
UPDATE delta_employees
SET salary = salary * 1.05 -- 5% salarisverhoging
WHERE department = 'IT';
-- Cel 5: DELETE data van Delta table
DELETE FROM delta_employees
WHERE salary < 4000;
-- Cel 6: MERGE (UPSERT) operatie
CREATE OR REPLACE TEMPORARY VIEW updates AS
SELECT * FROM (
VALUES
(1, 'Jan', 'Jansen', 'IT', 5250, 'Amsterdam'), -- Update bestaande
(11, 'Anna', 'Koster', 'Marketing', 4100, 'Utrecht') -- Nieuwe rij
) AS t(id, first_name, last_name, department, salary, city);
MERGE INTO delta_employees AS target
USING updates AS source
ON target.id = source.id
WHEN MATCHED THEN
UPDATE SET
target.first_name = source.first_name,
target.last_name = source.last_name,
target.salary = source.salary
WHEN NOT MATCHED THEN
INSERT *;
-- Cel 7: Delta Lake Time Travel - historische data queryen
-- Toon huidige versie
DESCRIBE HISTORY delta_employees;
-- Query vorige versie (bijv. versie 1)
SELECT * FROM delta_employees VERSION AS OF 1;
-- Query op specifiek tijdstip
SELECT * FROM delta_employees
TIMESTAMP AS OF '2024-01-01 10:00:00';
-- Herstel naar vorige versie
RESTORE TABLE delta_employees TO VERSION AS OF 1;
-- Cel 8: Delta Lake VACUUM (verwijder oude bestanden)
VACUUM delta_employees
RETAIN 168 HOURS; -- Bewaar data van laatste 7 dagen
-- Cel 9: Delta Lake OPTIMIZE (compact kleine bestanden)
OPTIMIZE delta_employees
WHERE department = 'IT'; -- Z-ORDER voor betere performance
4.6 User-Defined Functions (UDFs)
6
Stap 6: Maak en Gebruik UDFs
# OPTIE 1: Python UDFs (eenvoudig maar langzaam)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Definieer een Python functie
def categorize_salary(salary):
if salary < 4000:
return "Junior"
elif salary < 5500:
return "Medior"
else:
return "Senior"
# Registreer als UDF
salary_category_udf = udf(categorize_salary, StringType())
# Gebruik de UDF
df_with_category = spark.table("employees")\
.withColumn("category", salary_category_udf("salary"))
df_with_category.show()
# OPTIE 2: Pandas UDFs (veel sneller met vectorization)
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
# Pandas UDF voor berekeningen
@pandas_udf(DoubleType())
def calculate_bonus(salary: pd.Series) -> pd.Series:
# Deze functie wordt vectorized uitgevoerd
return salary * 0.10 # 10% bonus
# Gebruik Pandas UDF
df_with_bonus = spark.table("employees")\
.withColumn("bonus", calculate_bonus("salary"))
df_with_bonus.show()
# OPTIE 3: SQL UDFs (register Python functies voor SQL gebruik)
# Definieer functie
def get_initials(first_name, last_name):
return f"{first_name[0]}.{last_name[0]}." if first_name and last_name else ""
# Registreer als SQL functie
spark.udf.register("get_initials", get_initials)
# Nu kan je het in SQL gebruiken!
%sql
SELECT
first_name,
last_name,
get_initials(first_name, last_name) AS initials
FROM employees;
# OPTIE 4: Scala UDFs (snelst, voor performance kritieke code)
// In een Scala notebook of cell:
// Definieer Scala UDF
val calculateTax: (Double) => Double = (salary: Double) => {
if (salary < 5000) salary * 0.30 else salary * 0.40
}
// Registreer UDF
spark.udf.register("calculate_tax", calculateTax)
// Gebruik in SQL
spark.sql("""
SELECT salary, calculate_tax(salary) as tax
FROM employees
""").show()
4.7 Performance Optimalisatie
Performance Best Practices
1. Data Layout Optimalisatie:
- Partitionering: Partitioneer op veelgebruikte filter kolommen
- Z-Ordering: Voor multi-dimensional query versnelling
- File Grootte: Optimaliseer naar 128MB - 1GB per bestand
2. Query Optimalisatie:
- Predicate Pushdown: Filter vroeg in query
- Projectie Pushdown: Selecteer alleen benodigde kolommen
- Vermijd SELECT *: Specificeer altijd kolommen
- Gebruik EXPLAIN: Analyseer query execution plan
3. Cache Strategieën:
- Delta Cache: Automatisch caching voor herhaalde reads
- Spark Cache: Voor kleine, vaak gebruikte datasets
- Result Cache: Voor vaak draaiende rapporten
7
Stap 7: Query Performance Analyse en Optimalisatie
-- Cel 1: ANALYZE TABLE voor statistieken
ANALYZE TABLE delta_employees COMPUTE STATISTICS;
-- Toon tabel statistieken
DESCRIBE EXTENDED delta_employees;
-- Cel 2: EXPLAIN plan om query execution te begrijpen
EXPLAIN EXTENDED
SELECT department, AVG(salary) AS avg_salary
FROM delta_employees
WHERE city = 'Amsterdam'
GROUP BY department;
-- Cel 3: Forceer broadcast join voor kleine tabellen
-- SQL manier
SELECT /*+ BROADCAST(d) */ e.*, d.dept_name
FROM delta_employees e
JOIN dept d ON e.department = d.dept_id;
-- DataFrame API manier
from pyspark.sql.functions import broadcast
small_df = spark.table("dept")
large_df = spark.table("delta_employees")
result = large_df.join(broadcast(small_df),
large_df.department == small_df.dept_id)
-- Cel 4: Gebruik CACHE voor vaak gebruikte queries
CACHE TABLE amsterdam_employees AS
SELECT * FROM delta_employees
WHERE city = 'Amsterdam';
-- Controleer of tabel gecached is
SHOW TABLES;
-- Uncache wanneer niet meer nodig
UNCACHE TABLE IF EXISTS amsterdam_employees;
-- Cel 5: Gebruik hint voor specifieke join strategie
SELECT /*+ MERGEJOIN(a, b) */ a.*, b.dept_name
FROM delta_employees a
JOIN dept b ON a.department = b.dept_id;
-- Andere hints:
-- /*+ BROADCAST(t) */ voor broadcast join
-- /*+ SHUFFLE_HASH(t) */ voor shuffle hash join
-- /*+ MERGE(t) */ voor sort merge join
-- /*+ COALESCE(3) */ voor output partitionering
-- /*+ REPARTITION(10) */ voor herverdeling
-- Cel 6: Monitor query performance met Spark UI
-- Ga naar Clusters -> Spark UI -> SQL tab
-- Zie query details, stages, en performance metrics
Performance Metrics Monitoring
# Python: Verzamel query metrics
import time
start_time = time.time()
# Voer query uit
result = spark.sql("""
SELECT department, AVG(salary) as avg_salary
FROM delta_employees
GROUP BY department
""")
# Forceer execution en meet tijd
row_count = result.count()
end_time = time.time()
execution_time = end_time - start_time
print(f"Query executed in {execution_time:.2f} seconds")
print(f"Returned {row_count} rows")
# Verzamel Spark metrics
metrics = spark.sparkContext.getExecutorMemoryStatus
print("Executor memory status:", metrics)
4.8 Databricks SQL vs Spark-SQL
Vergelijking: Databricks SQL Warehouse vs Notebook SQL
| Feature | Databricks SQL Warehouse | Notebook Spark-SQL |
|---|---|---|
| Use Case | BI & Analytics, ad-hoc queries | Data engineering, ETL, data science |
| Interface | Web UI, SQL Editor, BI tools | Notebooks, programmatisch |
| Performance | Geoptimaliseerd voor SQL, vectorized | Algemeen Spark engine |
| Concurrency | Hoge concurrency, query queuing | Gedeelde cluster resources |
| BI Integratie | Native Power BI, Tableau, etc. | Via JDBC/ODBC |
| Kosten | DBU/hour + compute | Cluster kosten |
-- Databricks SQL Voorbeeld (in SQL Warehouse)
-- Maak een query in Databricks SQL Editor:
-- 1. Maak een SQL Warehouse in Databricks UI
-- 2. Configureer grootte (Small, Medium, Large, etc.)
-- 3. Schrijf queries zoals gewoonlijk
-- Voorbeeld dashboard query
WITH monthly_sales AS (
SELECT
DATE_TRUNC('month', sale_date) AS sale_month,
product,
SUM(amount) AS total_sales,
COUNT(*) AS transaction_count
FROM sales
GROUP BY 1, 2
)
SELECT
sale_month,
product,
total_sales,
transaction_count,
total_sales / transaction_count AS avg_transaction_value,
SUM(total_sales) OVER (
PARTITION BY product
ORDER BY sale_month
) AS running_total_product
FROM monthly_sales
ORDER BY sale_month, product;
4.9 Best Practices en Veelgemaakte Fouten
Veelgemaakte Fouten en Oplossingen
1. Performance Issues:
- Fout: SELECT * op grote tabellen
- Oplossing: Specificeer altijd benodigde kolommen
- Fout: Cross joins op grote datasets
- Oplossing: Gebruik voorwaarden of vermijd cross joins
- Fout: Geen partitionering op grote tabellen
- Oplossing: Partitioneer op veelgebruikte filter kolommen
2. Data Quality Issues:
- Fout: NULL handling vergeten
- Oplossing: Gebruik COALESCE, IS NULL checks
- Fout: Geen data type validatie
- Oplossing: Gebruik schema enforcement in Delta Lake
- Fout: Duplicate data door joins
- Oplossing: Gebruik DISTINCT of aggregate functies
3. Cost Issues:
- Fout: Clusters draaien onnodig lang
- Oplossing: Auto-termination instellen
- Fout: Ongeoptimaliseerde queries
- Oplossing: Gebruik EXPLAIN en monitor performance
- Fout: Te grote clusters voor workload
- Oplossing: Right-size clusters en gebruik auto-scaling
Praktische Opdrachten
Hands-on Oefeningen
1
Opdracht 1: Complexe Data Analyse Query
Doel: Bouw een complete business intelligence rapportage
- Maak een dataset met minstens 1000 rijen (gebruik generate_series of importeer een dataset)
- Schrijf queries met: window functions, CTEs, meerdere joins
- Implementeer zowel DataFrame API als SQL versies
- Voeg UDFs toe voor custom business logic
- Optimaliseer query performance met hints en caching
2
Opdracht 2: ETL Pipeline met Delta Lake
Doel: Een complete data pipeline bouwen met SQL
- Lees data uit meerdere bronnen (CSV, JSON, Parquet)
- Transformeer data met complexe SQL queries
- Schrijf resultaten naar Delta Lake tables
- Implementeer SCD Type 2 (Slowly Changing Dimensions)
- Voeg time travel queries toe voor historische analyse
- Optimaliseer met partitionering en Z-ordering
3
Opdracht 3: Performance Benchmark
Doel: Verschillende SQL benaderingen vergelijken
- Schrijf dezelfde query in: pure SQL, DataFrame API, Pandas UDF
- Meet performance voor verschillende data groottes
- Experimenteer met verschillende join strategieën
- Test caching vs geen caching
- Documenteer bevindingen en maak aanbevelingen
Verdiepende Bronnen
Officiële Documentatie en Guides
Spark-SQL Documentatie:
- Spark SQL Programming Guide
- Databricks SQL Documentation
- Delta Lake Documentation
- PySpark SQL API Reference
Performance Guides:
DataPartner365 Resources:
Certificering en Verdere Training
Spark en Databricks Certificeringen:
- Databricks Certified Associate Developer for Apache Spark: Focus op Spark-SQL en DataFrame API
- Databricks Certified Data Analyst Associate: Geavanceerde SQL en query optimalisatie
- Apache Spark Developer Certification: Diepgaande Spark kennis inclusief SQL
DataPartner365 Vervolgcursussen: