DataPartner365 Learning

Gratis cursussen en trainingen voor data professionals

Module 4 van 4

Spark-SQL Essentials

Complete handleiding voor SQL queries, DataFrame API en geavanceerde query technieken in Databricks

90-120 minuten
Intermediate niveau
40+ praktische voorbeelden

Cursus Modules

Module 4: Spark-SQL Essentials in Databricks

Leerdoelen

Na deze module kun je:

  • SQL queries schrijven in Databricks notebooks
  • De DataFrame API effectief gebruiken
  • Temporary views en Global Temporary Views aanmaken
  • Geavanceerde SQL functies gebruiken (window functions, UDFs)
  • Data queryen uit Delta Lake tables
  • Query performance optimaliseren
  • Complexe joins en aggregaties uitvoeren

4.1 Spark-SQL Fundamentals

Spark-SQL is de module van Apache Spark voor gestructureerde dataverwerking. Het biedt SQL-achtige query's op distributed data via DataFrame API of directe SQL queries.

Feature Beschrijving Voordelen
DataFrame API Relationele API in Scala, Java, Python, R Type-safe, IDE ondersteuning, compile-time checking
SQL Interface ANSI SQL:2003 compatibele query taal Vertrouwde syntax voor SQL gebruikers
Catalyst Optimizer Query optimizer met regel- en cost-based optimalisatie Automatische performance optimalisatie
Tungsten Engine Geheugen- en CPU-geoptimaliseerde execution engine 10-100x snellere execution dan traditionele Spark
1

Stap 1: Je Eerste SQL Query in Databricks

-- Cel 1: Maak een nieuwe notebook en selecteer SQL als default language -- Of gebruik een Python notebook met SQL magic commands -- Cel 2: Maak een sample dataset CREATE OR REPLACE TEMPORARY VIEW employees AS SELECT * FROM ( VALUES (1, 'Jan', 'Jansen', 'IT', 5000, 'Amsterdam'), (2, 'Piet', 'Pietersen', 'Sales', 4500, 'Rotterdam'), (3, 'Marie', 'de Vries', 'IT', 6000, 'Utrecht'), (4, 'Klaas', 'Bakker', 'HR', 4000, 'Den Haag'), (5, 'Lisa', 'Smit', 'Sales', 4800, 'Amsterdam'), (6, 'Thomas', 'Mulder', 'IT', 5500, 'Rotterdam'), (7, 'Emma', 'Bos', 'Marketing', 4200, 'Utrecht'), (8, 'David', 'Visser', 'IT', 5200, 'Amsterdam') ) AS t(id, first_name, last_name, department, salary, city); -- Cel 3: Eenvoudige SELECT query SELECT * FROM employees; -- Cel 4: Query met filtering en ordering SELECT first_name, last_name, department, salary FROM employees WHERE department = 'IT' ORDER BY salary DESC; -- Cel 5: Aggregatie functies SELECT department, COUNT(*) AS employee_count, AVG(salary) AS avg_salary, MAX(salary) AS max_salary, MIN(salary) AS min_salary FROM employees GROUP BY department ORDER BY avg_salary DESC;
Tip: SQL Magic Commands in Python Notebooks

In Python notebooks kun je SQL queries uitvoeren met magic commands:

# Python cel %sql SELECT * FROM employees LIMIT 5; # Of gebruik spark.sql() functie result = spark.sql("SELECT department, COUNT(*) as count FROM employees GROUP BY department") result.show()

4.2 DataFrame API vs SQL

Vergelijking: DataFrame API vs SQL

Aspect DataFrame API (Python/Scala) SQL
Syntax Method chaining, function calls Declaratieve SQL statements
Type Safety Compile-time type checking (Scala) Runtime type checking
IDE Support Autocomplete, refactoring Beperkte IDE ondersteuning
Performance Zelfde (beide gebruiken Catalyst optimizer) Zelfde (beide gebruiken Catalyst optimizer)
Learning Curve Programmeurs, data scientists Analisten, SQL experts
Best Voor Complexe business logic, ETL pipelines Ad-hoc queries, rapportage, analytics
2

Stap 2: Dezelfde Query in DataFrame API en SQL

# OPTIE 1: DataFrame API (Python) from pyspark.sql import functions as F # Filteren en aggregatie result_df = (spark.table("employees") .filter(F.col("department") == "IT") .groupBy("city") .agg( F.count("*").alias("employee_count"), F.avg("salary").alias("avg_salary") ) .orderBy(F.desc("avg_salary")) ) result_df.show() # OPTIE 2: SQL (zelfde resultaat) -- In een SQL cel of met %sql magic SELECT city, COUNT(*) AS employee_count, AVG(salary) AS avg_salary FROM employees WHERE department = 'IT' GROUP BY city ORDER BY avg_salary DESC;
Conversie tussen DataFrame en SQL
# DataFrame naar Temporary View df.createOrReplaceTempView("my_temp_view") # SQL query op DataFrame via SQL sql_result = spark.sql("SELECT * FROM my_temp_view WHERE salary > 5000") # Global Temporary View (beschikbaar in alle notebooks) df.createOrReplaceGlobalTempView("my_global_view") # Toegang tot Global Temporary View global_result = spark.sql("SELECT * FROM global_temp.my_global_view")

4.3 Geavanceerde SQL Functies

3

Stap 3: Window Functions en CTEs

-- Cel 1: Common Table Expressions (CTEs) -- Handig voor complexe queries WITH department_stats AS ( SELECT department, AVG(salary) AS dept_avg_salary, COUNT(*) AS dept_count FROM employees GROUP BY department ), employee_ranking AS ( SELECT e.*, d.dept_avg_salary, d.dept_count FROM employees e JOIN department_stats d ON e.department = d.department ) SELECT * FROM employee_ranking; -- Cel 2: Window Functions voor ranking en analytics SELECT first_name, last_name, department, salary, -- Rank binnen department op basis van salary RANK() OVER ( PARTITION BY department ORDER BY salary DESC ) AS salary_rank_in_dept, -- Running total per department SUM(salary) OVER ( PARTITION BY department ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS running_total_dept, -- Vergelijk met department gemiddelde salary - AVG(salary) OVER (PARTITION BY department) AS diff_from_dept_avg, -- Percentile binnen company PERCENT_RANK() OVER ( ORDER BY salary ) AS salary_percentile FROM employees ORDER BY department, salary DESC;
-- Cel 3: Geavanceerde Date en String functies CREATE OR REPLACE TEMPORARY VIEW sales AS SELECT * FROM ( VALUES (1, '2024-01-15', 'Laptop', 1200.50), (2, '2024-01-16', 'Monitor', 350.75), (3, '2024-02-01', 'Keyboard', 89.99), (4, '2024-02-15', 'Mouse', 45.50), (5, '2024-03-10', 'Laptop', 1350.00), (6, '2024-03-15', 'Monitor', 399.99) ) AS t(id, sale_date, product, amount); SELECT id, sale_date, DATE_FORMAT(sale_date, 'yyyy-MM') AS sale_month, DAY(sale_date) AS day_of_month, MONTH(sale_date) AS month_number, YEAR(sale_date) AS sale_year, product, UPPER(product) AS product_upper, SUBSTRING(product, 1, 3) AS product_short, amount, ROUND(amount, 0) AS amount_rounded, FORMAT_NUMBER(amount, 2) AS amount_formatted FROM sales;

4.4 Joins en Complexe Queries

4

Stap 4: Alle Join Types met Voorbeelden

-- Maak sample tabellen voor joins CREATE OR REPLACE TEMPORARY VIEW dept AS SELECT * FROM ( VALUES ('IT', 'Information Technology', 'Building A'), ('Sales', 'Sales Department', 'Building B'), ('HR', 'Human Resources', 'Building C'), ('Finance', 'Finance Department', 'Building D') ) AS t(dept_id, dept_name, location); -- INNER JOIN: Alleen matching rijen SELECT e.first_name, e.last_name, e.department, d.dept_name, d.location FROM employees e INNER JOIN dept d ON e.department = d.dept_id; -- LEFT JOIN: Alle rijen van employees, matching van dept SELECT e.first_name, e.last_name, e.department, d.dept_name, d.location FROM employees e LEFT JOIN dept d ON e.department = d.dept_id; -- FULL OUTER JOIN: Alle rijen van beide tabellen SELECT COALESCE(e.department, d.dept_id) AS department_code, e.first_name, e.last_name, d.dept_name, d.location FROM employees e FULL OUTER JOIN dept d ON e.department = d.dept_id; -- CROSS JOIN: Cartesiaans product (alle combinaties) -- Wees voorzichtig met grote datasets! SELECT e.first_name, d.dept_name FROM employees e CROSS JOIN dept d LIMIT 10;
-- Complexe joins met meerdere voorwaarden CREATE OR REPLACE TEMPORARY VIEW employee_projects AS SELECT * FROM ( VALUES (1, 'Project Alpha', '2024-01-01', '2024-06-30'), (3, 'Project Beta', '2024-02-01', '2024-08-31'), (1, 'Project Gamma', '2024-03-01', '2024-09-30'), (6, 'Project Alpha', '2024-01-01', '2024-06-30'), (8, 'Project Delta', '2024-04-01', '2024-12-31') ) AS t(employee_id, project_name, start_date, end_date); -- JOIN met meerdere tabellen en complexe WHERE clause SELECT e.first_name, e.last_name, e.department, d.dept_name, p.project_name, p.start_date, p.end_date, DATEDIFF(p.end_date, p.start_date) AS project_duration_days FROM employees e INNER JOIN dept d ON e.department = d.dept_id LEFT JOIN employee_projects p ON e.id = p.employee_id WHERE e.salary > 4500 AND (p.project_name IS NOT NULL OR e.department = 'HR') ORDER BY e.department, e.salary DESC;

4.5 Werken met Delta Lake Tables

Delta Lake: Transactionele Data Lake

Delta Lake voegt ACID transactions, schema enforcement en time travel toe aan data lakes.

5

Stap 5: Delta Lake Operations met SQL

-- Cel 1: Maak een Delta Lake table CREATE OR REPLACE TABLE delta_employees USING delta AS SELECT * FROM employees; -- Cel 2: Query Delta table (zelfde als gewone table) SELECT * FROM delta_employees WHERE department = 'IT'; -- Cel 3: INSERT data in Delta table INSERT INTO delta_employees VALUES (9, 'Sara', 'Jansen', 'IT', 5300, 'Amsterdam'), (10, 'Mark', 'de Wit', 'Finance', 4800, 'Rotterdam'); -- Cel 4: UPDATE data in Delta table UPDATE delta_employees SET salary = salary * 1.05 -- 5% salarisverhoging WHERE department = 'IT'; -- Cel 5: DELETE data van Delta table DELETE FROM delta_employees WHERE salary < 4000; -- Cel 6: MERGE (UPSERT) operatie CREATE OR REPLACE TEMPORARY VIEW updates AS SELECT * FROM ( VALUES (1, 'Jan', 'Jansen', 'IT', 5250, 'Amsterdam'), -- Update bestaande (11, 'Anna', 'Koster', 'Marketing', 4100, 'Utrecht') -- Nieuwe rij ) AS t(id, first_name, last_name, department, salary, city); MERGE INTO delta_employees AS target USING updates AS source ON target.id = source.id WHEN MATCHED THEN UPDATE SET target.first_name = source.first_name, target.last_name = source.last_name, target.salary = source.salary WHEN NOT MATCHED THEN INSERT *;
-- Cel 7: Delta Lake Time Travel - historische data queryen -- Toon huidige versie DESCRIBE HISTORY delta_employees; -- Query vorige versie (bijv. versie 1) SELECT * FROM delta_employees VERSION AS OF 1; -- Query op specifiek tijdstip SELECT * FROM delta_employees TIMESTAMP AS OF '2024-01-01 10:00:00'; -- Herstel naar vorige versie RESTORE TABLE delta_employees TO VERSION AS OF 1; -- Cel 8: Delta Lake VACUUM (verwijder oude bestanden) VACUUM delta_employees RETAIN 168 HOURS; -- Bewaar data van laatste 7 dagen -- Cel 9: Delta Lake OPTIMIZE (compact kleine bestanden) OPTIMIZE delta_employees WHERE department = 'IT'; -- Z-ORDER voor betere performance

4.6 User-Defined Functions (UDFs)

6

Stap 6: Maak en Gebruik UDFs

# OPTIE 1: Python UDFs (eenvoudig maar langzaam) from pyspark.sql.functions import udf from pyspark.sql.types import StringType # Definieer een Python functie def categorize_salary(salary): if salary < 4000: return "Junior" elif salary < 5500: return "Medior" else: return "Senior" # Registreer als UDF salary_category_udf = udf(categorize_salary, StringType()) # Gebruik de UDF df_with_category = spark.table("employees")\ .withColumn("category", salary_category_udf("salary")) df_with_category.show() # OPTIE 2: Pandas UDFs (veel sneller met vectorization) import pandas as pd from pyspark.sql.functions import pandas_udf from pyspark.sql.types import DoubleType # Pandas UDF voor berekeningen @pandas_udf(DoubleType()) def calculate_bonus(salary: pd.Series) -> pd.Series: # Deze functie wordt vectorized uitgevoerd return salary * 0.10 # 10% bonus # Gebruik Pandas UDF df_with_bonus = spark.table("employees")\ .withColumn("bonus", calculate_bonus("salary")) df_with_bonus.show()
# OPTIE 3: SQL UDFs (register Python functies voor SQL gebruik) # Definieer functie def get_initials(first_name, last_name): return f"{first_name[0]}.{last_name[0]}." if first_name and last_name else "" # Registreer als SQL functie spark.udf.register("get_initials", get_initials) # Nu kan je het in SQL gebruiken! %sql SELECT first_name, last_name, get_initials(first_name, last_name) AS initials FROM employees; # OPTIE 4: Scala UDFs (snelst, voor performance kritieke code) // In een Scala notebook of cell: // Definieer Scala UDF val calculateTax: (Double) => Double = (salary: Double) => { if (salary < 5000) salary * 0.30 else salary * 0.40 } // Registreer UDF spark.udf.register("calculate_tax", calculateTax) // Gebruik in SQL spark.sql(""" SELECT salary, calculate_tax(salary) as tax FROM employees """).show()

4.7 Performance Optimalisatie

Performance Best Practices

1. Data Layout Optimalisatie:
  • Partitionering: Partitioneer op veelgebruikte filter kolommen
  • Z-Ordering: Voor multi-dimensional query versnelling
  • File Grootte: Optimaliseer naar 128MB - 1GB per bestand
2. Query Optimalisatie:
  • Predicate Pushdown: Filter vroeg in query
  • Projectie Pushdown: Selecteer alleen benodigde kolommen
  • Vermijd SELECT *: Specificeer altijd kolommen
  • Gebruik EXPLAIN: Analyseer query execution plan
3. Cache Strategieën:
  • Delta Cache: Automatisch caching voor herhaalde reads
  • Spark Cache: Voor kleine, vaak gebruikte datasets
  • Result Cache: Voor vaak draaiende rapporten
7

Stap 7: Query Performance Analyse en Optimalisatie

-- Cel 1: ANALYZE TABLE voor statistieken ANALYZE TABLE delta_employees COMPUTE STATISTICS; -- Toon tabel statistieken DESCRIBE EXTENDED delta_employees; -- Cel 2: EXPLAIN plan om query execution te begrijpen EXPLAIN EXTENDED SELECT department, AVG(salary) AS avg_salary FROM delta_employees WHERE city = 'Amsterdam' GROUP BY department; -- Cel 3: Forceer broadcast join voor kleine tabellen -- SQL manier SELECT /*+ BROADCAST(d) */ e.*, d.dept_name FROM delta_employees e JOIN dept d ON e.department = d.dept_id; -- DataFrame API manier from pyspark.sql.functions import broadcast small_df = spark.table("dept") large_df = spark.table("delta_employees") result = large_df.join(broadcast(small_df), large_df.department == small_df.dept_id)
-- Cel 4: Gebruik CACHE voor vaak gebruikte queries CACHE TABLE amsterdam_employees AS SELECT * FROM delta_employees WHERE city = 'Amsterdam'; -- Controleer of tabel gecached is SHOW TABLES; -- Uncache wanneer niet meer nodig UNCACHE TABLE IF EXISTS amsterdam_employees; -- Cel 5: Gebruik hint voor specifieke join strategie SELECT /*+ MERGEJOIN(a, b) */ a.*, b.dept_name FROM delta_employees a JOIN dept b ON a.department = b.dept_id; -- Andere hints: -- /*+ BROADCAST(t) */ voor broadcast join -- /*+ SHUFFLE_HASH(t) */ voor shuffle hash join -- /*+ MERGE(t) */ voor sort merge join -- /*+ COALESCE(3) */ voor output partitionering -- /*+ REPARTITION(10) */ voor herverdeling -- Cel 6: Monitor query performance met Spark UI -- Ga naar Clusters -> Spark UI -> SQL tab -- Zie query details, stages, en performance metrics
Performance Metrics Monitoring
# Python: Verzamel query metrics import time start_time = time.time() # Voer query uit result = spark.sql(""" SELECT department, AVG(salary) as avg_salary FROM delta_employees GROUP BY department """) # Forceer execution en meet tijd row_count = result.count() end_time = time.time() execution_time = end_time - start_time print(f"Query executed in {execution_time:.2f} seconds") print(f"Returned {row_count} rows") # Verzamel Spark metrics metrics = spark.sparkContext.getExecutorMemoryStatus print("Executor memory status:", metrics)

4.8 Databricks SQL vs Spark-SQL

Vergelijking: Databricks SQL Warehouse vs Notebook SQL

Feature Databricks SQL Warehouse Notebook Spark-SQL
Use Case BI & Analytics, ad-hoc queries Data engineering, ETL, data science
Interface Web UI, SQL Editor, BI tools Notebooks, programmatisch
Performance Geoptimaliseerd voor SQL, vectorized Algemeen Spark engine
Concurrency Hoge concurrency, query queuing Gedeelde cluster resources
BI Integratie Native Power BI, Tableau, etc. Via JDBC/ODBC
Kosten DBU/hour + compute Cluster kosten
-- Databricks SQL Voorbeeld (in SQL Warehouse) -- Maak een query in Databricks SQL Editor: -- 1. Maak een SQL Warehouse in Databricks UI -- 2. Configureer grootte (Small, Medium, Large, etc.) -- 3. Schrijf queries zoals gewoonlijk -- Voorbeeld dashboard query WITH monthly_sales AS ( SELECT DATE_TRUNC('month', sale_date) AS sale_month, product, SUM(amount) AS total_sales, COUNT(*) AS transaction_count FROM sales GROUP BY 1, 2 ) SELECT sale_month, product, total_sales, transaction_count, total_sales / transaction_count AS avg_transaction_value, SUM(total_sales) OVER ( PARTITION BY product ORDER BY sale_month ) AS running_total_product FROM monthly_sales ORDER BY sale_month, product;

4.9 Best Practices en Veelgemaakte Fouten

Veelgemaakte Fouten en Oplossingen

1. Performance Issues:
  • Fout: SELECT * op grote tabellen
  • Oplossing: Specificeer altijd benodigde kolommen
  • Fout: Cross joins op grote datasets
  • Oplossing: Gebruik voorwaarden of vermijd cross joins
  • Fout: Geen partitionering op grote tabellen
  • Oplossing: Partitioneer op veelgebruikte filter kolommen
2. Data Quality Issues:
  • Fout: NULL handling vergeten
  • Oplossing: Gebruik COALESCE, IS NULL checks
  • Fout: Geen data type validatie
  • Oplossing: Gebruik schema enforcement in Delta Lake
  • Fout: Duplicate data door joins
  • Oplossing: Gebruik DISTINCT of aggregate functies
3. Cost Issues:
  • Fout: Clusters draaien onnodig lang
  • Oplossing: Auto-termination instellen
  • Fout: Ongeoptimaliseerde queries
  • Oplossing: Gebruik EXPLAIN en monitor performance
  • Fout: Te grote clusters voor workload
  • Oplossing: Right-size clusters en gebruik auto-scaling

Praktische Opdrachten

Hands-on Oefeningen

1
Opdracht 1: Complexe Data Analyse Query

Doel: Bouw een complete business intelligence rapportage

  1. Maak een dataset met minstens 1000 rijen (gebruik generate_series of importeer een dataset)
  2. Schrijf queries met: window functions, CTEs, meerdere joins
  3. Implementeer zowel DataFrame API als SQL versies
  4. Voeg UDFs toe voor custom business logic
  5. Optimaliseer query performance met hints en caching
2
Opdracht 2: ETL Pipeline met Delta Lake

Doel: Een complete data pipeline bouwen met SQL

  1. Lees data uit meerdere bronnen (CSV, JSON, Parquet)
  2. Transformeer data met complexe SQL queries
  3. Schrijf resultaten naar Delta Lake tables
  4. Implementeer SCD Type 2 (Slowly Changing Dimensions)
  5. Voeg time travel queries toe voor historische analyse
  6. Optimaliseer met partitionering en Z-ordering
3
Opdracht 3: Performance Benchmark

Doel: Verschillende SQL benaderingen vergelijken

  1. Schrijf dezelfde query in: pure SQL, DataFrame API, Pandas UDF
  2. Meet performance voor verschillende data groottes
  3. Experimenteer met verschillende join strategieën
  4. Test caching vs geen caching
  5. Documenteer bevindingen en maak aanbevelingen

Verdiepende Bronnen

Certificering en Verdere Training

Spark en Databricks Certificeringen:

  • Databricks Certified Associate Developer for Apache Spark: Focus op Spark-SQL en DataFrame API
  • Databricks Certified Data Analyst Associate: Geavanceerde SQL en query optimalisatie
  • Apache Spark Developer Certification: Diepgaande Spark kennis inclusief SQL

DataPartner365 Vervolgcursussen: