
PySpark Transformations: Code Your Pipeline

Write PySpark to transform data at scale: joins, aggregations, window functions, denormalization, and group-by patterns with line-by-line explanations.

PySpark in Fabric

Simple explanation

Think of PySpark as Python with superpowers.

Regular Python processes data on one computer. PySpark distributes the work across many computers in parallel. A transformation that takes 2 hours in Python might take around 3 minutes in PySpark if 40 machines share the load evenly (real speedups are usually smaller because of shuffle and coordination overhead).

In Fabric, you write PySpark in notebooks. The notebook connects to a Spark cluster automatically. You read from lakehouses, transform the data, and write back to lakehouses, all using DataFrame operations that look a lot like pandas.

Reading and writing data

Read from a lakehouse

# Read a Delta table
df_orders = spark.read.format("delta").load("Tables/FactOrders")

# Or use the table name directly
df_orders = spark.table("lakehouse.FactOrders")

# Read raw files
df_csv = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("Files/raw/orders/*.csv")

Write to a lakehouse

# Write as Delta table (overwrite)
df_result.write.format("delta") \
    .mode("overwrite") \
    .save("Tables/FactOrdersCleaned")

# Write as Delta table (append)
df_new_rows.write.format("delta") \
    .mode("append") \
    .save("Tables/FactOrders")

# Save as managed table
df_result.write.saveAsTable("lakehouse.FactOrdersCleaned")

Common transformations

Filtering rows

from pyspark.sql.functions import col, year

# Keep only completed orders from 2026
df_filtered = df_orders.filter(
    (col("status") == "completed") &    # Keep only completed orders
    (year(col("order_date")) == 2026)   # AND only those from 2026
)

Selecting and renaming columns

from pyspark.sql.functions import col, upper

df_clean = df_orders.select(
    col("order_id"),                              # Keep as-is
    col("customer_name").alias("customer"),        # Rename
    col("total_amount").cast("decimal(10,2)"),     # Cast type
    upper(col("country")).alias("country_code")    # Transform + rename
)

Joins (denormalization)

Denormalization in PySpark is a join: you combine normalised source tables into wider analytics tables.

# Join orders with customers and products
df_denormalized = df_orders \
    .join(df_customers, df_orders.customer_id == df_customers.customer_id, "left") \
    .join(df_products, df_orders.product_id == df_products.product_id, "left") \
    .select(
        df_orders.order_id,
        df_orders.order_date,
        df_customers.customer_name,      # From customers table
        df_customers.city,               # Flattened into one row
        df_products.product_name,        # From products table
        df_products.category,            # Flattened into one row
        df_orders.quantity,
        df_orders.revenue
    )

What’s happening: Three normalised tables (orders, customers, products) become one wide fact table with customer and product attributes included. This is denormalization.
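To show what the "left" join type means for the wide table, here is a minimal plain-Python sketch (no Spark involved) that emulates the two left joins on a few rows. The sample data and the `left_join` helper are hypothetical, and the sketch assumes each lookup key is unique; in Spark, duplicate keys on the right side would multiply the matching rows.

```python
# Hypothetical sample data standing in for the three source tables
orders = [
    {"order_id": 1, "customer_id": "C1", "product_id": "P1", "quantity": 2},
    {"order_id": 2, "customer_id": "C9", "product_id": "P2", "quantity": 1},  # C9 has no customer row
]
customers = {"C1": {"customer_name": "Avery", "city": "Lisbon"}}
products = {
    "P1": {"product_name": "Widget", "category": "Hardware"},
    "P2": {"product_name": "Gasket", "category": "Spares"},
}

def left_join(rows, lookup, key, columns):
    """Emulate a left join: keep every input row; unmatched lookups yield None."""
    joined = []
    for row in rows:
        match = lookup.get(row[key], {})
        joined.append({**row, **{c: match.get(c) for c in columns}})
    return joined

# Orders -> customers -> products, mirroring the chained joins in the PySpark example
wide = left_join(orders, customers, "customer_id", ["customer_name", "city"])
wide = left_join(wide, products, "product_id", ["product_name", "category"])
```

Both orders survive the joins. The order for the unknown customer keeps its row with `customer_name` and `city` set to `None`, which is the behaviour a left join gives you and an inner join would not.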

Scenario: Carlos denormalizes production data

Carlos needs to create a FactProduction table for Precision Manufacturing’s Power BI reports. The source data is normalised across 5 SAP tables: ProductionBatches, Machines, Products, Factories, Shifts.

df_fact = df_batches \
    .join(df_machines, "machine_id", "left") \
    .join(df_products, "product_id", "left") \
    .join(df_factories, "factory_id", "left") \
    .join(df_shifts, "shift_id", "left") \
    .select(
        "batch_id", "production_date",
        "factory_name", "factory_region",
        "machine_type", "product_name", "product_category",
        "shift_name", "units_produced", "units_defective"
    )

One wide table. No joins needed at query time, so Power BI reports have less work to do and load faster.

Grouping and aggregation

# Note: these imports shadow Python's built-in sum and max in this notebook
from pyspark.sql.functions import sum, avg, count, max, year, month

# Revenue by product category per month
df_summary = df_orders \
    .groupBy(
        year("order_date").alias("year"),           # Group by year
        month("order_date").alias("month"),         # and month
        "product_category"                          # and category
    ) \
    .agg(
        sum("revenue").alias("total_revenue"),      # Sum revenue
        count("order_id").alias("order_count"),     # Count orders
        avg("revenue").alias("avg_order_value")     # Average order value
    ) \
    .orderBy("year", "month", "product_category")   # Sort results

Window functions

Window functions calculate values across a set of rows related to the current row, without collapsing them (unlike groupBy).

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, lag, sum as spark_sum

# Running total of revenue per customer, ordered by date
window_spec = Window \
    .partitionBy("customer_id") \
    .orderBy("order_date") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_with_running_total = df_orders.withColumn(
    "running_revenue",
    spark_sum("revenue").over(window_spec)   # Running sum within each customer
)

# Previous order date (for calculating days between orders)
df_with_prev = df_orders.withColumn(
    "prev_order_date",
    lag("order_date", 1).over(
        Window.partitionBy("customer_id").orderBy("order_date")
    )
)

What's happening: Window functions explained

Window functions let you compute values across rows without losing detail:

  • partitionBy("customer_id"): calculate separately for each customer (like groupBy, but keeps all rows)
  • orderBy("order_date"): within each partition, rows are ordered by date
  • rowsBetween(Window.unboundedPreceding, Window.currentRow): the window frame, from the first row in the partition up to the current row
  • spark_sum("revenue").over(window_spec): running total that adds up revenue from the first order to the current one

Result: every row keeps its individual data AND gets a running total column. GroupBy would collapse rows into summaries.
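To make the "keeps every row" point concrete, here is a minimal plain-Python sketch (no Spark involved) that emulates the running total and the lag column on a handful of rows, next to a groupBy-style total. The sample rows and the function names are hypothetical and only illustrate the semantics.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical sample rows: (customer_id, order_date, revenue)
orders = [
    ("C1", "2026-01-05", 100.0),
    ("C2", "2026-01-06", 40.0),
    ("C1", "2026-01-09", 50.0),
    ("C1", "2026-02-01", 25.0),
    ("C2", "2026-02-03", 60.0),
]

def running_revenue(rows):
    """Emulate a running sum and lag(order_date, 1) per customer, ordered by date.

    Returns one output row per input row:
    (customer_id, order_date, revenue, running_revenue, prev_order_date)
    """
    out = []
    ordered = sorted(rows, key=itemgetter(0, 1))  # partition key first, then order key
    for customer_id, partition in groupby(ordered, key=itemgetter(0)):
        total = 0.0
        prev_date = None  # the first row in a partition has no previous order
        for _, order_date, revenue in partition:
            total += revenue
            out.append((customer_id, order_date, revenue, total, prev_date))
            prev_date = order_date
    return out

def revenue_by_customer(rows):
    """Emulate groupBy("customer_id").agg(sum("revenue")): one entry per customer."""
    totals = {}
    for customer_id, _, revenue in rows:
        totals[customer_id] = totals.get(customer_id, 0.0) + revenue
    return totals

windowed = running_revenue(orders)     # 5 rows in, 5 rows out
grouped = revenue_by_customer(orders)  # 5 rows in, 2 summary entries out
```

The windowed result has as many rows as the input, each carrying its own running total and previous order date, while the grouped result has one entry per customer.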

Handling nulls

# Drop rows where critical columns are null
df_clean = df_orders.dropna(subset=["customer_id", "order_date"])

# Fill nulls with defaults
df_filled = df_orders.fillna({
    "discount": 0.0,          # Missing discount = no discount
    "shipping_method": "standard"  # Default shipping
})

from pyspark.sql.functions import col, when

# Replace null status values with a label
df_fixed = df_orders.withColumn(
    "status",
    when(col("status").isNull(), "unknown")  # Null → "unknown"
    .otherwise(col("status"))                 # Keep existing
)

Writing patterns comparison

Choose the write mode based on your loading pattern

Write Mode | Behaviour | Use Case
overwrite | Replace entire table/partition | Full refresh of small tables, rebuilding fact tables
append | Add new rows to existing table | Adding new daily data, streaming micro-batches
MERGE INTO (Spark SQL) | Update existing + insert new (upsert) | Incremental loads, SCD Type 1, deduplication
overwrite (dynamic partition overwrite) | Replace only the partitions present in the write data | Refreshing specific date partitions without touching others
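The MERGE INTO pattern in the comparison above has no code elsewhere in this module, so here is a minimal sketch of how an upsert statement could be assembled. The `build_merge_sql` helper, the `staged_orders` view name, and the column list are hypothetical; the generated text follows the general Delta Lake MERGE INTO shape, not a statement taken from the course.

```python
def build_merge_sql(target, source, key, columns):
    """Build a MERGE INTO statement for a simple upsert on a single key column."""
    set_clause = ", ".join(f"t.{c} = s.{c}" for c in columns)
    insert_cols = ", ".join([key, *columns])
    insert_vals = ", ".join(f"s.{c}" for c in [key, *columns])
    return (
        f"MERGE INTO {target} AS t\n"
        f"USING {source} AS s\n"
        f"ON t.{key} = s.{key}\n"
        f"WHEN MATCHED THEN UPDATE SET {set_clause}\n"
        f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})"
    )

# Hypothetical names: FactOrders is the target table, staged_orders holds the new batch
merge_sql = build_merge_sql(
    "FactOrders", "staged_orders", "order_id", ["status", "revenue"]
)
print(merge_sql)
```

In a notebook, one way to run it would be to expose the incoming rows with `df_new_rows.createOrReplaceTempView("staged_orders")` and then call `spark.sql(merge_sql)`. Rows whose `order_id` already exists are updated and all others are inserted, which is why this pattern avoids the duplicates that append can create.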

Question

What is the difference between groupBy and a window function?

Answer

groupBy collapses rows into summary rows (one row per group). Window functions calculate across rows but KEEP every individual row: each row gets the aggregated value added as a new column. Use groupBy for summaries, window functions for running totals, rankings, and lag/lead calculations.

Question

What does denormalization look like in PySpark?

Answer

A series of joins that combine normalised source tables into one wide table. Example: df_orders.join(df_customers, 'customer_id').join(df_products, 'product_id'), which flattens customer and product attributes into the orders table.

Question

What write mode should you use for an incremental upsert in PySpark?

Answer

MERGE INTO (Spark SQL): matches source and target rows on a key, updates existing matches, and inserts non-matches. Not .mode('append') (re-loaded rows become duplicates) or .mode('overwrite') (replaces everything).

Knowledge Check

Carlos needs to calculate a 7-day rolling average of defect rates per factory, keeping every individual row in the output. Which PySpark approach should he use?

Knowledge Check

A PySpark notebook reads data from three normalised source tables (Orders, Customers, Products), joins them, and writes a single wide table to the lakehouse. What data engineering technique is this?

Next up: Transform Data with SQL & KQL. Use T-SQL and Kusto Query Language for transformations in warehouses and Eventhouses.