# Week 3 - Extract, transform, and load (ETL)

This week, we will practice the ETL process, particularly data extraction and transformation (**Layer 2 in the OLAP Architecture below**).

This week, the lab sheet contains 3 parts.

* Part 1: Self-reading. The aim of Part 1 is to help you understand the basic concepts of ETL and data quality.
* Part 2: Practice ETL skills on a simple dataset.
* Part 3: Build a Databricks ETL pipeline with PySpark for the same dataset.

## OLAP Architecture

<figure><img src="/files/kzCUGRZn2OZKB5vjEkmP" alt=""><figcaption><p>OLAP Architecture</p></figcaption></figure>

## Part 1: Theoretical Concepts

### What is ETL?

* **Extract**
  * The process of fetching relevant data from the source systems.
* **Transform**
  * The process of applying transformations such as
    * aggregation, joining with other datasets, applying rules, splitting or merging results, lookup, pivot, and
    * data reduction techniques, among many others.
* **Load**

  * The process of loading data into data warehouse destination tables such as fact and dimension tables.
  * Building additional structures to improve system performance.

  The source data should be loaded into the data warehouse in a **consistent** way and be **reliable**.
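
The three stages can be sketched in plain Python. This is only a minimal illustration, assuming a tiny hypothetical CSV source (`SOURCE_CSV`) and an in-memory SQLite database standing in for the data warehouse; the table name `customer_totals` is made up for the example.

```python
import csv
import io
import sqlite3

# Hypothetical source data standing in for an export from a real source system.
SOURCE_CSV = """order_id,customer,amount
1, Alice ,10.50
2,Bob,20.00
3, Alice ,5.25
"""

def extract(text):
    """Extract: read the raw rows from the source as dictionaries."""
    return list(csv.DictReader(io.StringIO(text)))

def transform(rows):
    """Transform: trim strings, cast types, then aggregate per customer."""
    totals = {}
    for row in rows:
        customer = row["customer"].strip()
        totals[customer] = totals.get(customer, 0.0) + float(row["amount"])
    return sorted(totals.items())

def load(conn, records):
    """Load: write the transformed records into a destination table."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS customer_totals "
        "(customer TEXT PRIMARY KEY, total REAL)"
    )
    conn.executemany("INSERT OR REPLACE INTO customer_totals VALUES (?, ?)", records)
    conn.commit()

conn = sqlite3.connect(":memory:")  # stand-in for the warehouse
load(conn, transform(extract(SOURCE_CSV)))
result = conn.execute(
    "SELECT customer, total FROM customer_totals ORDER BY customer"
).fetchall()
print(result)
```

Keeping each stage in its own function makes it easy to test the transformation rules separately from the source and the destination.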

### ETL Overview

Architecture of data warehousing:

Data sources $$\rightarrow$$ Data staging area $$\rightarrow$$ Data warehouse

<figure><img src="/files/fILtGZriXCRukkocNa1r" alt=""><figcaption><p>ETL Overview</p></figcaption></figure>

### ETL Lifecycle

<figure><img src="/files/CrZSlbu8GJnUaoKyMyLV" alt=""><figcaption><p>ETL Lifecycle</p></figcaption></figure>

### Extract and Transform in ETL

Data transfer or ETL is an important component of the BI and data warehousing system.

* The extract step should be able to pick data from all kinds of data sources.
  * Data extraction is the process of fetching relevant data from the source systems. Data might come from different sources such as an Excel file, a .csv file, a relational database, or a website.
* The transformation step should ensure that this data is consistent.
  * Build keys; apply data-cleansing rules on the dataset.
* Fetch the data incrementally.
  * Data in the source system might be removed or replaced by newer data records.
  * Some data rows may appear only once in the data source before being replaced with new data.

### Loading Data to the Data Warehouse

<figure><img src="/files/zh4hR4W1VL3oAbOu1xP8" alt=""><figcaption><p>Loading data to the data warehouse</p></figcaption></figure>

### ETL Major Steps

* Determine the data you need and don't need.
* Determine all the data sources, both internal and external.
* Establish comprehensive data extraction, cleansing, and transformation rules.
* Plan for aggregated tables.
* Organise data staging area and test tools.
* Write a procedure for all data loads.
* Load dimension tables and fact tables.

### Data Staging

A staging area ensures that

* The extract step does not spend extra effort converting or transforming data, and
* The data is not loaded into memory all at once.

Extracting data from sources into an integrated database comes with some challenges.

* For very big datasets, fetching data from sources shouldn't be combined with any kind of conversion or transformation.
  * The main reason is that doing so reduces performance and slows down the whole ETL process.
  * Loading a very large dataset into memory requires a large amount of server resources, which are not available at all times.
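
As a rough sketch of the second point, the extract step can copy rows into staging in fixed-size chunks rather than reading the whole source at once. The helper `read_in_chunks`, the five-row source, and the `staging` list (a stand-in for a staging table) are all hypothetical.

```python
import csv
import io
from itertools import islice

def read_in_chunks(file_obj, chunk_size):
    """Yield lists of at most chunk_size rows so only one chunk is read at a time."""
    reader = csv.DictReader(file_obj)
    while True:
        chunk = list(islice(reader, chunk_size))
        if not chunk:
            break
        yield chunk

# Hypothetical source with five rows; a real source would be a large file on disk.
source = io.StringIO("id,value\n1,a\n2,b\n3,c\n4,d\n5,e\n")

staging = []       # stands in for a staging table
chunk_sizes = []
for chunk in read_in_chunks(source, chunk_size=2):
    staging.extend(chunk)           # copy rows as-is: no conversion during extraction
    chunk_sizes.append(len(chunk))

print(chunk_sizes)
```

Note that values stay as raw strings in staging; type casting and cleaning are left to the transform step.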

### Logical Data Map

The logical data map describes the relationship between the extreme starting points and the extreme ending points of the ETL system.

#### Components of Logical Data Map

The logical data map is usually presented in a table or spreadsheet format and includes the following specific components:

* **Target table name**: The physical name of the table as it appears in a data warehouse.
* **Target column name**: The name of the column in a data warehouse table.
* **Table type**: Indicates if the table is a fact or dimension.
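
One way to picture a logical data map is as a list of records, one per target column. The sketch below uses the three components listed above and the star schema from Part 3; the `source_column` and `transformation` fields are assumptions added for illustration and are not part of the component list.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class MapEntry:
    target_table: str     # physical table name in the data warehouse
    target_column: str    # column name in that table
    table_type: str       # "fact" or "dimension"
    source_column: str    # assumption: where the value comes from
    transformation: str   # assumption: rule applied on the way in

logical_data_map = [
    MapEntry("Dim_Customer", "CustomerName", "dimension", "Customer_Name", "trim whitespace"),
    MapEntry("Dim_Product", "Category", "dimension", "Product_Category", "trim whitespace"),
    MapEntry("Fact_Sales", "TotalPrice", "fact", "Unit_Price, Quantity", "Unit_Price * Quantity"),
]

# Example query against the map: which target columns belong to fact tables?
fact_columns = [e.target_column for e in logical_data_map if e.table_type == "fact"]
print(fact_columns)
```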

#### The Logical Data Map Example

<figure><img src="/files/4tT9EwgZPEO4SwWew3k8" alt=""><figcaption><p>The logical data map example</p></figcaption></figure>

#### Hints of Logical Data Map

* **Have a plan**: The logical data map is the foundation of the metadata.
* **Identify data source candidates**: Identify the likely candidate data sources you believe will support the decisions needed by the business community.
* **Analyse source systems with a data-profiling tool**: Detected data anomalies must be documented, and best efforts must be made to apply appropriate business rules to rectify data before it is loaded into the data warehouse.

### Data Extraction: Data Discovery Phase

**Context**: The analysis of the source system is usually divided into two major phases:

* The data discovery phase
* The anomaly detection phase

A key criterion for the success of the data warehouse is the cleanliness and cohesiveness of the data within it.

Once you understand what the target needs to look like, you need to identify and examine the data sources.

#### Physical Data Extraction

**Online Extraction**

* Data is extracted directly from the source system.
* May access source tables through an intermediate system.
* The intermediate system is usually similar to the source system.

**Offline Extraction**

* Data is NOT extracted directly from the source system; instead, it is staged explicitly outside the original source system.
* Data is either already structured or was created by an extraction routine.

#### Logical Data Extraction

**Full Extraction**

* The data is extracted completely from the source system.
* No need to keep track of changes.
* Source data is made available as-is without any additional information.

**Incremental Extraction**

* Data is extracted after a well-defined point/event in time.
* A mechanism (column or table) is used to reflect/record the temporal changes in data.
* Sometimes entire tables are off-loaded from a source system into the data warehouse.
* This can have significant performance impacts on the data warehouse server.
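
The difference between the two logical extraction methods can be sketched as follows. This is a simplified illustration, assuming the source exposes a hypothetical `updated_at` column as its change-tracking mechanism and that the last extraction time (the "watermark") is stored between runs.

```python
from datetime import datetime

# Hypothetical source rows; "updated_at" plays the role of the change-tracking column.
source_rows = [
    {"id": 1, "updated_at": datetime(2021, 3, 1, 9, 0)},
    {"id": 2, "updated_at": datetime(2021, 3, 2, 9, 0)},
    {"id": 3, "updated_at": datetime(2021, 3, 3, 9, 0)},
]

def full_extract(rows):
    """Full extraction: return everything; no change tracking is needed."""
    return list(rows)

def incremental_extract(rows, last_extracted_at):
    """Incremental extraction: return rows changed after the previous run,
    plus the new watermark to store for the next run."""
    changed = [r for r in rows if r["updated_at"] > last_extracted_at]
    new_watermark = max((r["updated_at"] for r in changed), default=last_extracted_at)
    return changed, new_watermark

watermark = datetime(2021, 3, 1, 12, 0)   # time of the previous extraction
changed, watermark = incremental_extract(source_rows, watermark)
changed_ids = [r["id"] for r in changed]
print(changed_ids, watermark)
```

Running `incremental_extract` again with the updated watermark returns no rows until the source changes, which is what keeps repeated runs cheap.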

### Data Transformation

The data after transformation should be

* Correct
* Unambiguous
* Consistent
* Complete

Data quality checks are run at 2 places

* after extraction and
* after cleaning and confirming

#### Data Transformation: Cleaning Data

**Anomaly Detection**

* E.g. a row significantly different from the rest

**Column Property Enforcement**

* Null values in required columns
* Numeric values that fall outside of expected highs and lows
* The length of an attribute is exceptionally short/long
* Columns with certain values outside of discrete valid value sets
* Adherence to a required pattern/member of a set of patterns
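
The column property checks above could be expressed as a small rule function. The sketch borrows column names from the transaction dataset in Part 2, but the thresholds, the `T` + three digits ID pattern, and the function name `check_row` are assumptions chosen for illustration.

```python
import re

VALID_GENDERS = {"M", "F"}                          # discrete valid value set
TRANSACTION_ID_PATTERN = re.compile(r"^T\d{3}$")    # hypothetical ID format

def check_row(row):
    """Return a list of column-property violations found in one row."""
    problems = []
    name = row.get("Customer_Name")
    if not name:
        problems.append("Customer_Name is null")
    quantity = row.get("Quantity")
    if quantity is None or not (1 <= quantity <= 1000):
        problems.append("Quantity out of range")
    if name and not (2 <= len(name) <= 50):
        problems.append("Customer_Name length unusual")
    if row.get("Gender") not in VALID_GENDERS:
        problems.append("Gender not in valid set")
    if not TRANSACTION_ID_PATTERN.match(row.get("Transaction_ID") or ""):
        problems.append("Transaction_ID does not match pattern")
    return problems

good = {"Transaction_ID": "T001", "Customer_Name": "Aleck Stevenson", "Gender": "M", "Quantity": 3}
bad = {"Transaction_ID": "1", "Customer_Name": None, "Gender": "X", "Quantity": 5000}

print(check_row(good))
print(check_row(bad))
```

Returning the list of problems, rather than stopping at the first one, makes it easier to document every anomaly found in a row.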

#### Transformation - Confirming

Confirmation can be done in various ways

* Structure enforcement
  * Tables have proper primary and foreign keys
  * Obey referential integrity
* Data and rule value enforcement
  * Simple business rules
  * Logical data checks
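
Structure enforcement can be sketched with two small checks: one for duplicate primary keys and one for foreign keys that point to no dimension row. The sample rows and the helper names `duplicate_keys` and `orphan_rows` are hypothetical; the column names follow the star schema used in Part 3.

```python
def duplicate_keys(rows, key):
    """Primary-key check: return key values that occur more than once."""
    seen, dupes = set(), set()
    for row in rows:
        value = row[key]
        if value in seen:
            dupes.add(value)
        seen.add(value)
    return dupes

def orphan_rows(fact_rows, fk, dim_rows, pk):
    """Referential-integrity check: return fact rows whose foreign key
    has no matching primary key in the dimension."""
    valid = {row[pk] for row in dim_rows}
    return [row for row in fact_rows if row[fk] not in valid]

# Hypothetical sample data.
dim_product = [{"ProductID": 1, "ProductName": "Laptop"},
               {"ProductID": 2, "ProductName": "Mouse"}]
fact_sales = [{"SalesID": 1, "ProductID": 1},
              {"SalesID": 2, "ProductID": 3}]   # ProductID 3 does not exist

print(duplicate_keys(dim_product, "ProductID"))
print(orphan_rows(fact_sales, "ProductID", dim_product, "ProductID"))
```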

### What is data quality?

Data quality is a perception or an assessment of data’s fitness to serve its purpose in a given context.

<figure><img src="/files/tJIYfJ006Fc27HhqFsFh" alt=""><figcaption><p>Data quality lifecycle</p></figcaption></figure>

Data quality is described by several dimensions

* Correctness/Accuracy
* Consistency
* Completeness
* Timeliness
* Integrity/Validity
  * Integrity ensures that all data in a database can be traced and connected to other data

### Data Cleansing and Matching

Data cleansing is the process of identifying and correcting dirty data. Dirty data means incomplete, wrong, duplicate, or out-of-date data. The data warehousing community often uses the words cleanse or scrub rather than clean.

Data matching is to determine that one data item is the same as another data item. Data matching is used to identify duplicate records. Matching is particularly relevant for character-based data types.

* Numeric Data Types
* Datetime Data Types
  * For example, is 03/01/2021 the same as 01/03/2021? Is it the same as 2021-03-01T00:00:00Z+06?
  * We need some logic here to match them, such as by comparing the components, i.e. the day, month, and year.
* Character-based Data Types
  * For example, a customer named “Mr. Aleck Stevenson”. We need to match/recognise that “Mr. Aleck Stevenson” is the same as “Mr. S Aleck”, “Mr. Stevenson Aleck” and “Mr. Alec Stevenson”.
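
A rough sketch of both kinds of matching is shown below. For dates, each string is parsed with the format its source is assumed to use and the components are compared. For names, titles are dropped, tokens are sorted, and `difflib.SequenceMatcher` gives a similarity score; the title list and the 0.85 threshold are arbitrary assumptions, not a recommended setting.

```python
from datetime import datetime
from difflib import SequenceMatcher

def parse_date(text, fmt):
    """Parse a date string using the format its source system is known to use."""
    return datetime.strptime(text, fmt).date()

a = parse_date("03/01/2021", "%d/%m/%Y")   # source A writes day/month/year
b = parse_date("01/03/2021", "%m/%d/%Y")   # source B writes month/day/year
same_date = (a.day, a.month, a.year) == (b.day, b.month, b.year)

TITLES = {"mr", "mrs", "ms", "dr"}         # hypothetical list of titles to ignore

def normalise_name(name):
    """Lowercase, drop titles and dots, and sort tokens so word order is ignored."""
    tokens = [t.strip(".").lower() for t in name.split()]
    return " ".join(sorted(t for t in tokens if t and t not in TITLES))

def names_match(x, y, threshold=0.85):
    """Treat two names as the same person if their normalised forms are similar enough."""
    ratio = SequenceMatcher(None, normalise_name(x), normalise_name(y)).ratio()
    return ratio >= threshold

print(same_date)
print(names_match("Mr. Aleck Stevenson", "Mr. Stevenson Aleck"))
print(names_match("Mr. Aleck Stevenson", "Mr. Alec Stevenson"))
```

This simple approach handles reordered words and small spelling differences, but an abbreviated form such as “Mr. S Aleck” falls below the threshold and would need extra rules (for example, matching initials).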

### Data Quality Validation

Data quality rules (data validation) are filters to prevent dirty data from getting into the warehouse. Based on the data location, there are three kinds of validation:

* Incoming data validation
  * Checking the incoming data only on its own, without referring to the data already in the warehouse.
  * These rules verify that the data from the source systems is valid, e.g. within the expected range and in the right format.
* Cross-Reference Validation
  * Cross-reference validation is where we check the incoming data against the data in the warehouse.
  * This ensures that the value of the incoming data is within a certain range that is calculated based on data in the warehouse.
  * Like incoming data validation, cross-reference validation is performed on the fly when the data is being loaded into the warehouse.
* Data Warehouse Internal Validation
  * Data warehouse internal validation is where we check the data already in the warehouse.
    * We don't check the incoming data
  * Unlike the previous two, data warehouse internal validation is performed after the incoming data is fully loaded into the warehouse.
  * The purpose of doing this is to verify the quality of the data in the warehouse at the aggregate level.
    * This can be done by comparing the totals over a period of time against a known standard value.
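
The three kinds of validation can be contrasted in a short sketch. The amounts, the 0 to 10,000 range, the 50% tolerance around the warehouse average, and the expected total are all made-up values for illustration.

```python
warehouse_amounts = [100.0, 120.0, 110.0]   # hypothetical data already in the warehouse

def incoming_valid(row):
    """Incoming data validation: check the row on its own (type and range)."""
    amount = row["amount"]
    return isinstance(amount, (int, float)) and 0 <= amount <= 10_000

def cross_reference_valid(row, existing, tolerance=0.5):
    """Cross-reference validation: compare with a range derived from warehouse data."""
    average = sum(existing) / len(existing)
    return average * (1 - tolerance) <= row["amount"] <= average * (1 + tolerance)

def internal_valid(existing, expected_total, allowed_gap=0.01):
    """Internal validation: after loading, compare an aggregate with a known standard value."""
    return abs(sum(existing) - expected_total) <= allowed_gap * expected_total

incoming = [{"amount": 115.0}, {"amount": 900.0}, {"amount": -5.0}]

# The first two checks run on the fly, while the data is being loaded.
loaded = [r for r in incoming
          if incoming_valid(r) and cross_reference_valid(r, warehouse_amounts)]
warehouse_amounts.extend(r["amount"] for r in loaded)

# The third check runs after loading, at the aggregate level.
totals_ok = internal_valid(warehouse_amounts, expected_total=445.0)
print(loaded, totals_ok)
```

Here 900.0 passes the incoming check but is rejected by the cross-reference check, and -5.0 is rejected by the incoming check alone.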

## Part 2: ETL Practices

Consider the workflow and try to practice the solution for the scenario below.

### Description

The dataset below is an original transaction dataset. The file contains 20 transactions with the following columns:

* Transaction\_ID: Unique identifier for each transaction
* Date: Date of purchase
* Customer\_Name: Name of the customer
* Gender: Gender of the customer (M/F)
* Customer\_Type: Type of customer (Regular/Premium)
* Product: Name of the product purchased
* Product\_Category: Category of the product
* Unit\_Price: Price per unit of the product
* Quantity: Number of units purchased
* Discount: Discount amount applied to the transaction
* City: The city where the purchase was made
* State: State/province where the purchase was made
* Country: The country where the purchase was made

{% file src="/files/zm4ORFxptMXAc2JtbIbw" %}
Transaction Dataset
{% endfile %}

The dimension tables and fact table below were processed from the transaction dataset above.

{% file src="/files/nTEzrE0d4F8ItPYnRwlE" %}
Customer Dimension Table
{% endfile %}

{% file src="/files/OVjP8IqUdinqXOQXdPbn" %}
Date Dimension Table
{% endfile %}

{% file src="/files/XjyOAyc0zfmwgu9ZqjkV" %}
Location Dimension Table
{% endfile %}

{% file src="/files/RkrluDgNncNaLCQGEfMo" %}
Product Dimension Table
{% endfile %}

{% file src="/files/xhL2rRHzbgNOg1617qDf" %}
Sales Fact Table
{% endfile %}

### Task

**Can you reproduce these dimension tables and the fact table from the original transaction dataset?**

### Task Demonstration

A demonstration video is available in the [subpage](/data-warehousing-lab-sheets/week-3-extract-transform-and-load-etl/task-demonstration-video-python.md).

{% hint style="warning" %}
You can use any programming language, such as Python or R, or Excel to complete the above task.
{% endhint %}

## Part 3: Databricks ETL Pipeline

### Overview

In this section, we implement a simple ETL pipeline using **Databricks Free Edition** and **PySpark**. We ingest the **raw transaction dataset** from Part 2 (`Transaction Dataset.csv`) and transform it into a small **star schema** consisting of four Dimension tables and one Fact table:

* `Dim_Customer(CustomerID, CustomerName, Gender, CustomerType)`
* `Dim_Date(DateID, Date, Month, Year)`
* `Dim_Location(LocationID, City, State, Country)`
* `Dim_Product(ProductID, ProductName, Category)`
* `Fact_Sales(SalesID, ProductID, CustomerID, DateID, LocationID, Quantity, TotalPrice, Discount, ActualPrice)`

The workflow follows the standard ETL stages **Extract → Transform → Load → Validate**.

### Step 1: Initialize Environment

**Note**: Databricks Free Edition now manages compute resources automatically. You do not need to create a cluster manually.

1. Click **+ New** (top-left sidebar) -> **Notebook**.
2. Wait for the notebook to load.
3. Check the top-right corner:
   * Once you see a **green circle/indicator** (e.g., labeled **"Serverless"**), your environment is ready.

**Verification**: Type `print(spark.version)` in the first cell and run it. If it prints a version number (e.g., 3.x.x or 4.x.x), you are ready to proceed.

<figure><img src="/files/yonsh81TExLUPpSndZRl" alt=""><figcaption></figcaption></figure>

### Step 2: Upload Data

1. Navigate to **+ New** -> **Add or upload data**.
2. Click on **Create or modify table** (the icon with a blue upload arrow).
3. Click **Browse** and select your local `Transaction Dataset.csv` file.
4. Once the preview appears, click **Create table** at the bottom right.
   * *Note: This action uploads the file to the default storage and registers it as a table.*
5. **Return to your Notebook** for the next step.

<figure><img src="/files/S42n331u07UfzEeXz8IF" alt=""><figcaption></figcaption></figure>

<figure><img src="/files/KoIMiWA5U8GVOIu0kNPU" alt=""><figcaption></figcaption></figure>

### Step 3: Ingest Raw Data

In this step, we verify the table creation and load the data into a Spark DataFrame for processing.

```python
# 1. Define the table name
# Note: This must match the table name created in Step 2 (usually lowercase)
table_name = "transaction_dataset"

# 2. Load data directly from the registered table
df_raw = spark.read.table(table_name)

# 3. Validation
print(f"Successfully loaded table: {table_name}")
print(f"Total records: {df_raw.count()}")

# 4. Preview the data
display(df_raw.limit(5))
```

<figure><img src="/files/78b4g2d8xZaURK28zfVe" alt=""><figcaption></figcaption></figure>

### Step 4: Transform (Build Dimension Tables and Fact Table)

In this step, we clean the raw data (trim strings, cast types, parse dates), generate **surrogate keys** for each dimension, and then construct the fact table by joining the raw transactions to the dimensions.

#### 4.1 Basic Cleaning

```python
# Import PySpark SQL functions
from pyspark.sql import functions as F

df_tx = (df_raw
  .select(
    F.trim("Transaction_ID").alias("Transaction_ID"),
    F.to_date(F.col("Date"), "M/d/yyyy").alias("Date"),
    F.trim("Customer_Name").alias("Customer_Name"),
    F.trim("Gender").alias("Gender"),
    F.trim("Customer_Type").alias("Customer_Type"),
    F.trim("Product").alias("Product"),
    F.trim("Product_Category").alias("Product_Category"),
    F.col("Unit_Price").cast("double").alias("Unit_Price"),
    F.col("Quantity").cast("int").alias("Quantity"),
    F.col("Discount").cast("double").alias("Discount"),
    F.trim("City").alias("City"),
    F.trim("State").alias("State"),
    F.trim("Country").alias("Country")
  )
)

display(df_tx.limit(5))
```

**Purpose of this step:**

* Trim Whitespace: Removes accidental spaces from strings (e.g., converts " M " to "M") to ensure data consistency.
* Type Casting: Converts numerical columns (`Unit_Price`, `Quantity`, `Discount`) from text to numbers so we can perform calculations later.
* Date Parsing: Converts the raw date string into a proper Date object for time-based analysis.

#### 4.2 Create `Dim_Customer`

We create one row per unique customer and assign a surrogate key based on **first appearance** in the raw dataset.

**Window functions** assign surrogate keys based on a defined ordering (e.g., first appearance in the data). This is a common approach to produce stable, reproducible IDs. The code structure for **4.3, 4.4, and 4.5 follows a similar pattern**: group → aggregate → window order → generate key.

```python
from pyspark.sql.window import Window

# 1) Find each customer's first appearance in the raw data
#    groupBy: group rows by customer attributes (one group per unique customer)
#    agg: aggregate function - calculate metrics for each group
#    F.min("Transaction_ID"): find the smallest Transaction_ID in each group
#    .alias("first_tx"): name this calculated column "first_tx"
cust_first = (df_tx
  .groupBy("Customer_Name", "Gender", "Customer_Type")
  .agg(F.min("Transaction_ID").alias("first_tx"))
)

# 2) Define window ordering rule (sort by first appearance)
#    Window.orderBy("first_tx"): arrange customers by their first transaction
w_cust = Window.orderBy("first_tx")

# 3) Assign a numeric surrogate key, then format as C101, C102, ...
#    withColumn: add a new column to the DataFrame
#    dense_rank().over(w_cust): assign sequential numbers (1,2,3...) based on w_cust ordering
#    F.format_string("C%03d", ...): format number as C101, C102, etc. (%03d = 3-digit zero-padded)
dim_customer = (cust_first
  .withColumn("cust_num", F.dense_rank().over(w_cust))
  .withColumn("CustomerID", F.format_string("C%03d", F.col("cust_num") + 100))
  .select(
    "CustomerID",
    F.col("Customer_Name").alias("CustomerName"),
    F.col("Gender"),
    F.col("Customer_Type").alias("CustomerType")
  )
)

display(dim_customer)
```

<figure><img src="/files/z7T2NpDSc1WZWt9UF69N" alt=""><figcaption></figcaption></figure>
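
To see what the group → window → key pattern computes, here is the same idea in plain Python, without Spark. It is only an illustration with hypothetical customer rows. It also shows one thing to watch for: `Transaction_ID` is a string after trimming, and strings sort character by character, so ordering by it matches first appearance only when the IDs have a fixed width (whether they do depends on the dataset).

```python
def assign_surrogate_keys(rows, attrs, prefix="C", start=101):
    """Give each distinct attribute combination a key, numbered by first appearance."""
    keys = {}
    for row in rows:                      # rows are assumed to be in source order
        combo = tuple(row[a] for a in attrs)
        if combo not in keys:
            keys[combo] = f"{prefix}{start + len(keys):03d}"
    return keys

# Hypothetical rows in source order.
rows = [
    {"Customer_Name": "Ann", "Gender": "F", "Customer_Type": "Regular"},
    {"Customer_Name": "Ben", "Gender": "M", "Customer_Type": "Premium"},
    {"Customer_Name": "Ann", "Gender": "F", "Customer_Type": "Regular"},
]
customer_keys = assign_surrogate_keys(rows, ["Customer_Name", "Gender", "Customer_Type"])
print(customer_keys)

# String ordering caveat: "T10" sorts before "T2", while "T002" sorts before "T010".
smallest_unpadded = min(["T2", "T10"])
smallest_padded = min(["T002", "T010"])
print(smallest_unpadded, smallest_padded)
```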

#### 4.3 Create `Dim_Date`

We create one row per unique date and assign `DateID` starting from 1. We also derive `Month` and `Year`.

```python
w_date = Window.orderBy("Date")

dim_date = (df_tx
  .select("Date")
  .dropDuplicates()
  .withColumn("DateID", F.dense_rank().over(w_date))
  .withColumn("Month", F.month("Date"))
  .withColumn("Year", F.year("Date"))
  .select(
    "DateID",
    F.date_format("Date", "M/d/yyyy").alias("Date"),
    "Month",
    "Year"
  )
)

display(dim_date)
```

<figure><img src="/files/X4uXegMIUSNxmPvt9Fzi" alt=""><figcaption></figcaption></figure>

#### 4.4 Create `Dim_Location`

We create one row per unique location (City, State, Country) and assign `LocationID` based on **first appearance** in the raw dataset.

```python
loc_first = (df_tx
  .groupBy("City", "State", "Country")
  .agg(F.min("Transaction_ID").alias("first_tx"))
)

w_loc = Window.orderBy("first_tx")

dim_location = (loc_first
  .withColumn("LocationID", F.dense_rank().over(w_loc))
  .select("LocationID", "City", "State", "Country")
)

display(dim_location)
```

<figure><img src="/files/nxR6r3bGJjxkFp6DDZov" alt=""><figcaption></figcaption></figure>

#### 4.5 Create `Dim_Product`

We create one row per unique product and assign `ProductID` based on **first appearance** in the raw dataset. (Note: the product dimension stores descriptive attributes; pricing remains in the fact table as measures.)

```python
prod_first = (df_tx
  .groupBy("Product", "Product_Category")
  .agg(F.min("Transaction_ID").alias("first_tx"))
)

w_prod = Window.orderBy("first_tx")

dim_product = (prod_first
  .withColumn("ProductID", F.dense_rank().over(w_prod))
  .select(
    "ProductID",
    F.col("Product").alias("ProductName"),
    F.col("Product_Category").alias("Category")
  )
)

display(dim_product)
```

<figure><img src="/files/4iG3j2ReqNz3Aj68JxJH" alt=""><figcaption></figcaption></figure>

#### 4.6 Create `Fact_Sales`

We join the transaction data to each dimension to obtain foreign keys. Then we compute the measures:

* `TotalPrice = Unit_Price * Quantity`
* `ActualPrice = TotalPrice * (1 - Discount/100)` (Discount is a percentage, e.g., 5 means 5%)

```python
# Join to customer
tx_keyed = (df_tx
  .join(dim_customer.select("CustomerID",
                            F.col("CustomerName").alias("Customer_Name"),
                            "Gender",
                            F.col("CustomerType").alias("Customer_Type")),
        on=["Customer_Name", "Gender", "Customer_Type"], how="left")
)

# Join to date (convert dim_date.Date back to date type for join)
dim_date_join = dim_date.select("DateID", F.to_date(F.col("Date"), "M/d/yyyy").alias("Date"))
tx_keyed = tx_keyed.join(dim_date_join, on=["Date"], how="left")

# Join to location
tx_keyed = tx_keyed.join(dim_location, on=["City", "State", "Country"], how="left")

# Join to product
tx_keyed = (tx_keyed
  .join(dim_product.select("ProductID",
                           F.col("ProductName").alias("Product"),
                           F.col("Category").alias("Product_Category")),
        on=["Product", "Product_Category"], how="left")
)

# Create SalesID (1..N)
w_sales = Window.orderBy("Transaction_ID")

fact_sales = (tx_keyed
  .withColumn("SalesID", F.dense_rank().over(w_sales))
  .withColumn("TotalPrice", F.round(F.col("Unit_Price") * F.col("Quantity"), 2))
  .withColumn("ActualPrice", F.round(F.col("TotalPrice") * (1 - F.col("Discount") / 100.0), 2))
  .select(
    "SalesID", "ProductID", "CustomerID", "DateID", "LocationID",
    "Quantity", "TotalPrice", "Discount", "ActualPrice"
  )
)

display(fact_sales)
```

<figure><img src="/files/E3yt8zlv3WbRusNxrPS0" alt=""><figcaption></figcaption></figure>
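
As a quick check of the two measures, the same arithmetic can be worked through in plain Python for a single made-up transaction (unit price 20.00, quantity 3, discount 5), treating `Discount` as a percentage as the notebook code does.

```python
def compute_measures(unit_price, quantity, discount_percent):
    """Mirror the Fact_Sales formulas for one transaction."""
    total_price = round(unit_price * quantity, 2)
    actual_price = round(total_price * (1 - discount_percent / 100.0), 2)
    return total_price, actual_price

# Hypothetical transaction: 3 units at 20.00 each with a 5% discount.
total, actual = compute_measures(unit_price=20.0, quantity=3, discount_percent=5)
print(total, actual)
```

Python's `round` and Spark's `F.round` can treat exact halfway values differently, so results may differ in the last digit for such cases.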

### Step 5: Load (Save as Delta Tables)

We store the final tables as **Delta tables**.

Delta tables are a storage format that provides:

* **ACID transactions**: Concurrent reads and writes leave the table in a consistent state.
* **Data versioning**: Track changes and roll back to previous versions if needed.
* **Performance**: Optimized for fast queries on large datasets.

Delta is the default and recommended table format in Databricks for reliable data warehousing.

```python
# 1) Get current username and format it for database naming
#    regexp_replace replaces non-alphanumeric characters with underscores
#    .first()[0] extracts the first row, first column value
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]

# 2) Create a user-specific database to avoid table name conflicts
spark.sql(f"CREATE DATABASE IF NOT EXISTS db_{username}")

# 3) Switch to this database (all subsequent tables will be saved here)
spark.sql(f"USE db_{username}")

# 4) Save each DataFrame as a Delta table
#    .format("delta") = use Delta table format 
#    .mode("overwrite") = replace table if it exists (safe for re-runs)
#    .saveAsTable() = register as a queryable table
dim_customer.write.format("delta").mode("overwrite").saveAsTable("Dim_Customer")
dim_date.write.format("delta").mode("overwrite").saveAsTable("Dim_Date")
dim_location.write.format("delta").mode("overwrite").saveAsTable("Dim_Location")
dim_product.write.format("delta").mode("overwrite").saveAsTable("Dim_Product")
fact_sales.write.format("delta").mode("overwrite").saveAsTable("Fact_Sales")

print(f"Tables saved in database: db_{username}")
```

### Step 6: Validate (Basic Checks)

#### 6.1 Row count checks

Verify that all tables were created by checking the number of rows in each table. `Fact_Sales` should contain 20 rows, one per source transaction; a different count points to rows lost or duplicated during the joins.

```python
print("Dim_Customer:", spark.table("Dim_Customer").count())
print("Dim_Date    :", spark.table("Dim_Date").count())
print("Dim_Location:", spark.table("Dim_Location").count())
print("Dim_Product :", spark.table("Dim_Product").count())
print("Fact_Sales  :", spark.table("Fact_Sales").count())
```

<figure><img src="/files/c2lKWTBRY6LCe8i7pyHt" alt=""><figcaption></figcaption></figure>

#### 6.2 Star schema join validation (SQL)

Test that the fact table can join with dimension tables correctly. This query aggregates sales revenue by product category, demonstrating that the star schema relationships work as expected.

```sql
%sql
SELECT
  p.Category,
  SUM(f.ActualPrice) AS Total_Revenue,
  SUM(f.Quantity)    AS Total_Quantity
FROM Fact_Sales f
JOIN Dim_Product p
  ON f.ProductID = p.ProductID
GROUP BY p.Category
ORDER BY Total_Revenue DESC;
```

<figure><img src="/files/ARXkzIY7TIu9RkxaDTZn" alt=""><figcaption></figcaption></figure>

