Microsoft Fabric Part 3. Taskmaster Project. Adding Dimensions: Tasks, Episodes and Date to Notebooks and creating Delta PARQUET and PARQUET Files

In Parts 1 and 2, we did a lot of analysis on the data and got to understand Contestants and how the attempts and people csv files relate to each other. It's now time to create some of the other dimensions we need for the project.

  • DimTasks
  • DimEpisodes
  • DimDate

Back to our Taskmaster Notebook to see what other data is available to use. But first of all: when creating stored procedures to transform data in a SQL database, the stored procedures are usually split into Dims and Facts. It's time to split this notebook up into the corresponding dims and facts.

Dim Contestants

A copy of the original file is saved as DimContestants. Then the new notebook is opened and everything Contestant related is kept, including the initial configuring of the Lakehouse.

The notebook ends with the creation of the overwritten files.

The code

spark.stop()

is more interesting. Do we want to stop Spark after every dimension and fact creation, or only at the end of the process?

One to think about, but for now, let's have this at the end of every notebook.

DimEpisode

Another copy is made to create DimEpisode. Everything is removed apart from code that relates to DimEpisode.

# Read the episodes file into a DataFrame
dfep = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/episodes (1).csv")
# dfep is now a Spark DataFrame containing CSV data from "Files/Data/Bronze/episodes (1).csv".

display(dfep.distinct())

We have some key information like dates in here, but we will save these as keys for the fact table when we get to it.

There is also a really important fact in here: Points.

Create Series and Episodes

# Select multiple columns with aliases
dfepisode = dfep.select(
    dfep.series.alias('seriesID'),
    dfep.series_label.alias('series'),
    dfep.episode.alias('episodeID'),
    dfep.title.alias('episodeTitle')
).distinct()

# Show the resulting DataFrame
display(dfepisode)

Check that we have distinct Episodes

# Group by the episode columns and count the records to check that the list is distinct.
# Any row returned by this check appears more than once.

# Import col and count (count is aliased as _count)
from pyspark.sql.functions import col
from pyspark.sql.functions import count as _count

dfepisode.groupBy('Series', 'episodeID', 'episodeTitle' )\
       .agg(_count('episodeTitle').alias('TotalRecords'))\
       .filter(col('TotalRecords')>1).show(1000)

This looks good. We just need to add a default row and a key and we have ourselves another dimension which will eventually contain a hierarchy between series and episode.

Add default row for series and episode

We are now reusing the same code blocks we created for Contestants.

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a one-row DataFrame holding the default "Not Known" row
data = [(-1, "Not Known","-1","Not Known")]
columns = ["seriesID", "series", "episodeID", "episodeTitle"]
new_row  = spark.createDataFrame(data, columns)

# Union the new row with the existing DataFrame
dfepisode = dfepisode.union(new_row)

# Show the updated DataFrame
dfepisode.show(1000)
#Just in case. We distinct again to remove any extra NA Rows
dfepisode = dfepisode.distinct()
dfepisode.show(1000)

Create the Episode Key

#Create an Episode Key
# Imports Window and row_number
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Applying partitionBy() and orderBy()
window_spec = Window.partitionBy().orderBy("episodeID")

# Add a new column "EpisodeKey" using row_number() over the specified window.
# row_number() starts at 1, so subtracting 2 gives the first row a key of -1.
dfepisode = dfepisode.withColumn("EpisodeKey", row_number().over(window_spec)- 2)

# Show the result
dfepisode.show(1000)
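
As a rough illustration of what row_number() - 2 produces, here is a plain-Python sketch (not the notebook code). It assumes the IDs are strings, which is what you get when a CSV is read with only the header option, and the sample values are made up. Sorting strings is lexicographic, so "10" lands before "2"; converting to int first gives numeric order.

```python
# Plain-Python sketch of the surrogate key logic: sort, number the rows from 1, subtract 2.
# The sample IDs are hypothetical and only there to show the ordering.

def assign_keys(ids, numeric=False):
    """Return (id, key) pairs where the first row in sort order gets key -1."""
    ordered = sorted(ids, key=int) if numeric else sorted(ids)
    # enumerate(..., start=1) plays the role of row_number()
    return [(value, row_number - 2) for row_number, value in enumerate(ordered, start=1)]

sample_ids = ["-1", "1", "2", "10"]

string_order = assign_keys(sample_ids)                 # IDs compared as text
numeric_order = assign_keys(sample_ids, numeric=True)  # IDs compared as numbers

print(string_order)
print(numeric_order)
```

In both cases the "-1" default row sorts first and so receives the key -1. If numeric order matters for the remaining keys, casting the ID column to int before the orderBy would be one option.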

Create Delta PARQUET

What is the difference between Delta PARQUET and PARQUET?

The basic PARQUET file is the same, but with Delta we also have a _delta_log directory and checkpoint files. The Delta table is simply a folder that contains these objects.

from delta.tables import DeltaTable

dfepisode.write.mode("overwrite").option("overwriteSchema", "true").format("delta").saveAsTable("dimEpisode")

We have reworked the code so that the schema is also overwritten if it has changed, to avoid errors.
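
To make the folder difference concrete, here is a small sketch that checks whether a folder looks like a Delta table by testing for the _delta_log directory. It runs against throwaway local folders with hypothetical names, purely to illustrate the layout; it is not how the Lakehouse itself decides anything.

```python
import os
import tempfile

def is_delta_folder(path):
    """Treat a folder as a Delta table if it contains a _delta_log directory."""
    return os.path.isdir(os.path.join(path, "_delta_log"))

# Build two throwaway folders to mimic the two layouts (names are hypothetical)
root = tempfile.mkdtemp()
delta_dir = os.path.join(root, "dimEpisode")
plain_dir = os.path.join(root, "DimEpisode.parquet")
os.makedirs(os.path.join(delta_dir, "_delta_log"))
os.makedirs(plain_dir)

print(is_delta_folder(delta_dir))  # folder with a _delta_log directory
print(is_delta_folder(plain_dir))  # plain parquet folder
```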

Create PARQUET

# Write the episode dimension as a plain PARQUET file
# (the path is assumed here, following the Silver folder convention used for DimTask and DimDate)
dfepisode.write.mode("overwrite").parquet('Files/Data/Silver/DimEpisode.parquet')

DimTask

Save a copy again and create DimTask

The Notebooks can be easily selected from the left hand pane and are automatically saved.

And in the workspace we can see that we have three new files to add

Our last dimension before adding a date dimension will be tasks. After loading the tasks csv, it appears to be empty, which is odd because it says there should be over 809 rows, so something does appear to have gone wrong with the data source.

We need to look around the files and see what we can get.

We already know that attempts contains tasks, but it doesn't have as much data in it.

Getting Tasks from Attempts

#Read the first file into a dataframe
dfattempts = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/attempts.csv")
# dfattempts now is a Spark DataFrame containing CSV data from "Files/Data/Bronze/attempts.csv".
display(dfattempts)

attempts is read into this notebook as a DataFrame again because there is a task column in this table.

# Select multiple columns with aliases
dftaskat = dfattempts.select(
    dfattempts.task.alias('taskID'),
    dfattempts.task_label.alias('task')
).distinct()

# Show the resulting DataFrame
display(dftaskat)

Task Objectives

dftaskob = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/tasks_by_objective.csv")

display(dftaskob.distinct())

We have two possible task data sources. Let's count the records.

print(dftaskat.count())
print(dftaskob.count())

It would be useful to join them and then see what we have

First of all, a quick glance at both schemas helps

Update dftaskob with different column names

dftaskob = dftaskob.withColumnRenamed("task","taskObID").withColumnRenamed("task_label","taskOb")

display(dftaskob)

This is because we are going to join, and it's easier to rename the columns that would be duplicated in the joined table before we get to the actual join.

Join dftaskob and dftaskat

# Join condition
condition = dftaskat["taskID"] == dftaskob["taskObID"]

# Now perform the full outer join
dftaskcheck = dftaskat.join(dftaskob, on=condition, how="full").drop(dftaskob.taskObID)

display(dftaskcheck)

It seems like good sense to create the condition(s) first to simplify the code block.

A full outer join is used, and taskObID is dropped because it's part of the join, so we don't really need it.

Checking Tasks in Attempts but not in Objectives

filtered_df = dftaskcheck.filter(dftaskcheck['id'].isNull())
filtered_df.show(1000)

rows = filtered_df.count()
print(f"DataFrame Rows count: {rows}")

There are 165 tasks in attempts that are not in task objectives.

filtered_df = dftaskcheck.filter(dftaskcheck['taskid'].isNull())
filtered_df.show(1000)

rows = filtered_df.count()
print(f"DataFrame Rows count: {rows}")

There are 264 tasks in task objectives that aren't in attempts.

#How many match?
filtered_df = dftaskcheck.filter((dftaskcheck['task'] == dftaskcheck['taskOb']))
filtered_df.show(1000)

rows = filtered_df.count()
print(f"DataFrame Rows count where the task names match: {rows}")

157 tasks are in both task objectives and attempts. Let's now try to get a full list.
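
The three checks above (only in attempts, only in objectives, in both) are the three regions of a full outer join. A minimal plain-Python sketch of the same idea using sets, with made-up IDs rather than the real task data:

```python
# Sketch of the full outer join check using sets. The IDs are hypothetical sample values.

def compare_ids(attempt_ids, objective_ids):
    """Split two ID lists into the three regions of a full outer join."""
    attempts, objectives = set(attempt_ids), set(objective_ids)
    return {
        "both": sorted(attempts & objectives),             # matched on both sides
        "attempts_only": sorted(attempts - objectives),    # objective side would be null
        "objectives_only": sorted(objectives - attempts),  # attempts side would be null
        "full_list": sorted(attempts | objectives),        # what coalesce will rebuild
    }

result = compare_ids(["1", "2", "3"], ["2", "3", "4", "5"])
for region, ids in result.items():
    print(region, ids)
```

The three regions always add up to the full list, which is a handy sanity check on the counts.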

COALESCE and REPLACE

# Use COALESCE to take the first non-null value from the two sources
from pyspark.sql.functions import coalesce
from pyspark.sql import functions as F

dftaskcheck = dftaskcheck.withColumn("newTaskID",coalesce(dftaskcheck.taskID,dftaskcheck.id))
dftaskcheck = dftaskcheck.withColumn("newTask",coalesce(dftaskcheck.task,dftaskcheck.taskOb))
#dftaskcheck= dftaskcheck.select(col("newTaskID").alias("taskID"), col("newtask").alias("task"), 
#col("objective").alias("objectiveID"),col("objective_label").alias("objective")).distinct()

dftask = dftaskcheck.select(
    dftaskcheck.newTaskID.alias('taskID'),
    dftaskcheck.newTask.alias('task'),
    dftaskcheck.objective_label.alias('objective'),
).distinct()


# Get Rid of â–² and â–¼
dftask = dftask.withColumn("objective", F.regexp_replace("objective", "â–²", ""))
dftask = dftask.withColumn("objective", F.regexp_replace("objective", "â–¼", ""))

display(dftask)

So here coalesce has been used to add a new column to the DataFrame that gives us the non-null item.

Then, to finish, â–² and â–¼ are removed. These are mis-encoded ▲ and ▼ arrows (UTF-8 bytes read as Windows-1252).
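
For anyone who wants to see the two ideas outside Spark, here is a plain-Python sketch. The coalesce helper mimics the SQL behaviour of returning the first non-null value, and the two constants show how UTF-8 arrow characters turn into the odd sequences when their bytes are decoded as Windows-1252. The function names and the sample text are my own, not part of the notebook.

```python
def coalesce(*values):
    """Return the first value that is not None, like SQL COALESCE."""
    for value in values:
        if value is not None:
            return value
    return None

# What the UTF-8 arrows look like when their bytes are decoded as Windows-1252
UP_GARBLED = "▲".encode("utf-8").decode("cp1252")
DOWN_GARBLED = "▼".encode("utf-8").decode("cp1252")

def clean_objective(text):
    """Remove the garbled arrow sequences from an objective label."""
    for garbled in (UP_GARBLED, DOWN_GARBLED):
        text = text.replace(garbled, "")
    return text

print(coalesce(None, "from objectives"))
print(repr(UP_GARBLED), repr(DOWN_GARBLED))
print(clean_objective(UP_GARBLED + "points"))
```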

We now just need to clean up the null objectives. Let's just assume the null objectives are points.

Remove Null values using fillna and subset

dftask = dftask.fillna("points", subset=["objective"])
dftask.show(1000)

fillna() deals with missing values. It allows you to replace or fill in null values with specified values

Create default NA Row

We have done this code multiple times now so the blog post will just show the code and not screenshots

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a one-row DataFrame holding the default "Not Known" row
data = [(-1, "Not Known","Not Known")]
columns = ["taskID", "task", "objective"]
new_row  = spark.createDataFrame(data, columns)

# Union the new row with the existing DataFrame
dftask  = dftask.union(new_row)

# Show the updated DataFrame
dftask.show(1000) 

Running a DISTINCT just in case we applied the above more than once.

#Just in case. We distinct again to remove any extra NA Rows
dftask = dftask.distinct()
dftask.show(1000)

Creating the Task Key

# Imports Window and row_number
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Applying partitionBy() and orderBy()
window_spec = Window.partitionBy().orderBy("taskID")

# Add a new column "taskKey" using row_number() over the specified window
dftask = dftask.withColumn("taskKey", row_number().over(window_spec)- 2)

# Show the result
dftask.show(1000)

Create DELTA PARQUET TABLE

from delta.tables import DeltaTable

dftask.write.mode("overwrite").format("delta").saveAsTable("dimTask")

CREATE PARQUET Table in Files Folder

dftask.write.mode("overwrite").parquet('Files/Data/Silver/DimTask.parquet')

Remember, in the context of the Lakehouse, the tables created in the Files folder are external tables.

You don't see external tables in the semantic model or SQL endpoint, so when we get further with this exercise it will be interesting to see what we can do with both PARQUET files.

This is the end of the Task Notebook.

Dim Date

We have dates in our data, which means that for a star schema we need a Date dimension, and yes, we can create this in a Notebook.

Create a new notebook Dim Date

Start off again by configuring the Lakehouse and starting the Spark session.

There are many ways to do this. You could use SQL magic, or embed SQL into the PySpark code. You could create it via a dataflow and send the result to the Lakehouse. This exercise is all about trying to do everything using PySpark code.

# Import necessary modules
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Get the Spark session (getOrCreate() returns the session that is already running)
spark = SparkSession.builder.appName("DateRangeGenerator").getOrCreate()

# Define your start and end dates
start_date = '2020-01-01'
end_date = '2027-12-21'

# Convert start and end dates to datetime objects
start_datetime = datetime.datetime.strptime(start_date, '%Y-%m-%d')
end_datetime = datetime.datetime.strptime(end_date, '%Y-%m-%d')

# Calculate the number of days between start and end
num_days = (end_datetime - start_datetime).days

# Generate date range
dfdate = spark.createDataFrame([(start_datetime + datetime.timedelta(days=i),) for i in range(num_days)], ['calendarDate'])

# Optional: Convert to timestamp
dfdate = dfdate.withColumn('timestamp', col('calendarDate').cast(TimestampType()))

# Show the resulting DataFrame
dfdate.show()

Let's look at this in more detail.

We import the necessary modules:

  • import datetime: the datetime module is used to work with dates and times.
  • from pyspark.sql import SparkSession: SparkSession is the entry point for working with DataFrames. (A Spark session is already running in the notebook, so this import is only needed for the SparkSession.builder line.)
  • from pyspark.sql.functions import col, expr
    • col is a function that allows you to reference a column in a DataFrame by its name, e.g. col("Column_Name")
    • expr is a function that allows you to execute SQL-like expressions within PySpark.
  • from pyspark.sql.types import StructType, StructField, StringType, TimestampType
    • StructType is used to define a schema and represents a collection of fields.
    • StructField defines each field in the StructType. The field types imported here are strings and timestamps.

And now into the main code block:

  • The start_date and end_date are defined and can be changed when necessary.
  • Both dates are converted from strings into datetime objects using the year-month-day format.
  • Next we get the number of days between start and end using .days.
  • Then dfdate is generated.
    • range(num_days)
    • Generates a sequence of integers from 0 to num_days - 1. Each of these integers represents the number of days from the start date, so the end date itself is not included.
    • [(start_datetime + datetime.timedelta(days=i),) for i in range(num_days)]
    • This creates a list of tuples (tuples are used to store multiple items in a single variable). So if there is a week of days, num_days would be 7, i would run from 0 to 6, and we would have 7 rows of dates.
    • Finally we create another column which has been converted to timestamp, which we will use moving forward.
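
The date range logic can be tried on its own without Spark. This is a minimal sketch using only the datetime module; the include_end parameter is my own addition (not in the notebook) to show how the end date could be included if that is what you want.

```python
import datetime

def date_range(start_date, end_date, include_end=False):
    """Return one datetime per day from start_date up to (optionally including) end_date."""
    start = datetime.datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.datetime.strptime(end_date, "%Y-%m-%d")
    num_days = (end - start).days + (1 if include_end else 0)
    # i runs from 0 to num_days - 1, so the first row is the start date itself
    return [start + datetime.timedelta(days=i) for i in range(num_days)]

week = date_range("2020-01-01", "2020-01-08")
print(len(week), week[0].date(), week[-1].date())

week_with_end = date_range("2020-01-01", "2020-01-08", include_end=True)
print(len(week_with_end), week_with_end[-1].date())
```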

Add Columns to DimDate

We are now going to set a range of columns up for Date

#You can have all the functions separated by a comma rather than repeating
from pyspark.sql.functions import year, month, dayofmonth, quarter,concat,lit, col, when, date_format #to get month name

# Create a new DataFrame with an alias for the 'timestamp' column
dfdate2 = dfdate.select(
    dfdate.timestamp.alias('date')
).distinct()


# Add a new column 'year' by extracting the year from the 'date' column
dfdate2 = dfdate2.withColumn('year', year(dfdate2['date']))

#Add the month no
dfdate2 = dfdate2.withColumn('monthNo', month(dfdate2['date']))

#And the month Name
dfdate2 = dfdate2.withColumn('monthName', date_format(dfdate2['date'], 'MMMM'))

#Add the day of month
dfdate2 = dfdate2.withColumn('dayofmonth', dayofmonth(dfdate2['date']))

#Add Quarter as a number
dfdate2 = dfdate2.withColumn('quarterNo', quarter(dfdate2['date']))

#Add Q to the start of Quarter for Power BI
dfdate2 = dfdate2.withColumn("quarter", concat(lit("Q"),col("quarterNo")))

#Create financial year
dfdate2 = dfdate2.withColumn('financialYear',
                   when(dfdate2['monthNo'] >= 5, dfdate2['year']).otherwise(dfdate2['year'] - 1))

#Create season 
dfdate2 = dfdate2.withColumn('season',
                            when((col('monthNo') >= 3) & (col('monthNo') <= 5), 'Spring')
                            .when((col('monthNo') >= 6) & (col('monthNo') <= 8), 'Summer')
                            .when((col('monthNo') >= 9) & (col('monthNo') <= 11), 'Autumn')
                            .when((col('monthNo') == 12) | (col('monthNo') <= 2), 'Winter')
                            .otherwise('Not known'))



# Show the resulting DataFrame
dfdate2.show(1000)
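
The season and financial year rules are easy to test in isolation. This is a plain-Python sketch, assuming northern-hemisphere meteorological seasons (March to May is spring, and so on) and the same month 5 cut-off for the financial year as the notebook code. The start_month parameter is hypothetical, added so the cut-off can be changed.

```python
def season(month_no):
    """Map a month number (1 to 12) to a season; anything else is 'Not known'."""
    if 3 <= month_no <= 5:
        return "Spring"
    if 6 <= month_no <= 8:
        return "Summer"
    if 9 <= month_no <= 11:
        return "Autumn"
    if month_no == 12 or 1 <= month_no <= 2:
        return "Winter"
    return "Not known"

def financial_year(year, month_no, start_month=5):
    """Months before start_month belong to the financial year that began the year before."""
    return year if month_no >= start_month else year - 1

for m in (1, 4, 7, 10, 12):
    print(m, season(m))
print(financial_year(2024, 4), financial_year(2024, 5))
```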

Create a current Day Flag

It's always useful to have some date flags to use in analysis.

from pyspark.sql.functions import current_date, col

# Add a flag column indicating whether the date is the current day
dfdate2 = dfdate2.withColumn("currentDayFlag", col("date") == current_date())

# Show the result
dfdate2.filter(dfdate2['currentDayFlag'] == True).show()

Note that we can now quickly look at the current date. The flag is calculated when the notebook runs, so this would need to be refreshed every day.

Create a current month Flag

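from pyspark.sql.functions import month, year, current_date

# Flag dates that fall in the current month of the current year
# (comparing the month alone would also flag the same month in every other year)
dfdate2 = dfdate2.withColumn("currentMonthFlag",
                   (month(dfdate2['date']) == month(current_date())) &
                   (year(dfdate2['date']) == year(current_date())))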

# Show the result
dfdate2.filter(dfdate2['currentMonthFlag'] == True).show()

Create a current year Flag

from pyspark.sql.functions import year, current_date

dfdate2 = dfdate2.withColumn("currentYearFlag", year(dfdate2['date']) == year(current_date()))

# Show the result
dfdate2.filter(dfdate2['currentYearFlag'] == True).show()
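
Here is a plain-Python sketch of the three flags. It takes "today" as an argument rather than reading the clock, so the result is repeatable, and in this sketch the month flag compares both year and month so that only the current month of the current year is flagged. The function name and sample dates are my own.

```python
import datetime

def date_flags(day, today):
    """Return the three current-period flags for one calendar date."""
    return {
        "currentDayFlag": day == today,
        "currentMonthFlag": (day.year, day.month) == (today.year, today.month),
        "currentYearFlag": day.year == today.year,
    }

today = datetime.date(2024, 6, 15)
same_day = date_flags(datetime.date(2024, 6, 15), today)
same_month = date_flags(datetime.date(2024, 6, 1), today)
last_year = date_flags(datetime.date(2023, 6, 15), today)

print(same_day)
print(same_month)
print(last_year)
```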

Create the Date Key

For this we need Year, Month and Day to create an integer key, which will be recreated in the fact table.

from pyspark.sql.functions import concat_ws, lpad, col

# Build an integer key in yyyyMMdd form from the 'year', 'monthNo' and 'dayofmonth' columns
dfdate2 = dfdate2.withColumn('DateKey',
                   concat_ws('', col('year'),
                             lpad(col('monthNo'), 2, '0'),
                             lpad(col('dayOfMonth'), 2, '0')).cast('int'))

dfdate2.show()

  • lpad is used to left pad a string column
  • concat_ws is used to concatenate string columns into a single column
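
The same yyyyMMdd key can be built two ways, shown in this plain-Python sketch: by padding and joining strings (which is what lpad and concat_ws do) or with simple arithmetic. Both function names are my own.

```python
import datetime

def date_key_from_strings(day):
    """Pad month and day to two digits, join the parts, then convert to int."""
    return int(f"{day.year}{day.month:02d}{day.day:02d}")

def date_key_from_numbers(day):
    """Arithmetic version: year * 10000 + month * 100 + day."""
    return day.year * 10000 + day.month * 100 + day.day

sample = datetime.date(2020, 1, 5)
print(date_key_from_strings(sample), date_key_from_numbers(sample))
```

Because the fact table has to rebuild the same key from its own dates, whichever approach is used needs to be applied identically on both sides.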

Save as Delta PARQUET and PARQUET

from delta.tables import DeltaTable

dfdate2.write.mode("overwrite").format("delta").saveAsTable("dimDate")
dfdate2.write.mode("overwrite").parquet('Files/Data/Silver/DimDate.parquet')

Update Source Control

Clicking back onto the Fabric Workspace, make sure you update Source Control.

Conclusion

In part three, we created individual notebooks for each dimension.

We now have DimTask, DimEpisode, DimContestant and DimDate.

Next we want to start on the facts. And we want to revisit each dimension to see if we are happy with the types of each column.

Tell me what you would do. Would you have one notebook with everything in one? Or split up as I have done?

How would you deal with the importing of all the functions? Is there anything I can do better here?

Are there better ways of writing the code?

Part three, and I'm starting to really enjoy working with PySpark. The takeaway is that I can use my SQL brain, so it's starting to really work for me.

So many questions though. For example, we can do full overwrites and appends to write the data, but what about a more incremental approach?

Microsoft Fabric Part 2. Taskmaster Project. Updating a dim Contestants table in Notebook and Delta PARQUET File

This blog is the next step for

Microsoft Fabric Part 1. Taskmaster Project. Creating a dim Contestants table in Notebook and Delta PARQUET File

In Part 1, a Contestants dimension was created from the csv file attempts.csv. The data from this was transformed and added into a PARQUET file and a Delta PARQUET file so we can test the differences later.

There are more csv files to look at and one specifically gives us more details for the Contestants dim

people.csv

The task here is to go back to the original Notebook and see if we can add this new information

The first bit of code to create goes after the following blocks of code.

  • Starting Spark Session
  • Reading attempts.csv into a dataframe
  • Renaming columns and creating dfattempttrans
  • Creating the distinct dfcont from dfattempts (Creating the Contestants Dim)

#Read the people file into a dataframe
dfpeople = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/people.csv")
# dfpeople is now a Spark DataFrame containing CSV data from "Files/Data/Bronze/people.csv".

#And rename a couple of columns
dfpeople = dfpeople.withColumnRenamed("id","contestantID").withColumnRenamed("name","contestant")
display(dfpeople)

We have used .withColumnRenamed to change a couple of columns:

id to contestantID and name to contestant.

Before moving on, the assumption is that People is a distinct list.

Let's count the rows in each table so far.

row_countp = dfpeople.count()
row_countc = dfcont.count()
print(f"Number of rows in the dfpeople: {row_countp}")
print(f"Number of rows in the dfcont: {row_countc}")

There are a lot more people in our new DataFrame. But are they unique?

#We need to test. Do we have unique Contestants?

#Import the PySpark functions (col, min, count). Note that the wildcard import replaces Python's built-in min with the Spark version
from pyspark.sql.functions import *

dfpeople.groupBy('contestant')\
       .agg(min('contestantID').alias('MinContestantID'),count('contestant').alias('TotalRecords'))\
       .filter(col('TotalRecords')>1).show(1000)

Using \ allows us to break the code onto separate lines

We have the same issue here where some of the people have more than one record, so let's filter a few.

filtered_df = dfpeople.filter(dfpeople['contestant'] == 'Liza Tarbuck')
filtered_df.show()

The original assumption in Part 1 was incorrect. We have multiple records because of the Champion of Champions series. So the ID is not for the contestant; it's for the contestant and series, and some people have been on multiple series.

Do the IDs match between the tables?

filtered_df = dfcont.filter(dfcont['contestant'] == 'Bob Mortimer')
filtered_df.show()

Yes. So we need to rethink dim contestants. Actually, if they exist in both, we only need to use people. So we need to join them and check whether this is the case.

Checking that values we can see in attempts exist in People: rename columns

We want to join the contestants that we obtained from the attempts table to dfpeople, simply so we can check what is missing.

We know we have more in people. But what happens if there are values in attempts that are missing from people? This will cause us more problems.

We will join on the ID and the contestant, so before the join, let's rename the dfcont columns so we don't end up with duplicate column names.

#And rename a couple of columns
dfcont = dfcont.withColumnRenamed("contestantID","cont_contestantID").withColumnRenamed("contestant","cont_contestant")

display(dfcont)

Join dfCont and dfPeople

Please note that when joining tables, it's better to create a new DataFrame rather than update an original one, because if you have to run the cell again, the DataFrame you join from will have changed.

# Original condition
original_condition = dfpeople["contestantID"] == dfcont["cont_contestantID"]

# Adding another condition based on the 'name' column
combined_condition = original_condition & (dfpeople["contestant"] == dfcont["cont_contestant"])

# Now perform the full outer join
dfcontcheck = dfpeople.join(dfcont, on=combined_condition, how="full")

display(dfcontcheck)

So we have everything in one table. Let's do some filtering to check our findings.

Filter for Null Values

from pyspark.sql import functions as F
dfcontcheck.filter(F.isnull("cont_contestantID")).show(1000)

72 records don't exist in attempts, which is fine.

from pyspark.sql import functions as F
dfcontcheck.filter(F.isnull("contestantID")).show(1000)

And there are none in attempts that are missing from people.

People is therefore our master data set and the PySpark code can be changed accordingly.

At the moment there are a lot of checks happening in the PySpark code. Once it is up and running and ready to be turned into a proper transformation notebook, all these checks could be removed if required to simplify.

Updating the Code with the new Logic

We are taking the code from Part 1 and completely updating based on the new findings

Start Spark session

%%configure
{
    "defaultLakehouse": {  
        "name": "DebbiesFabricLakehouse"
    }
}

Read People into dfPeople DataFrame

#Read the people file into a dataframe
dfpeople = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/people.csv")
# dfpeople is now a Spark DataFrame containing CSV data from "Files/Data/Bronze/people.csv".

#And rename a couple of columns
dfpeopletransf  = dfpeople.withColumnRenamed("id","contestantID").withColumnRenamed("name","contestant")

print(dfpeopletransf.count())
print(dfpeopletransf.distinct().count())
display(dfpeopletransf.distinct())

.withColumnRenamed brings back the dfpeople data set but renames the specified columns

Create dfCont to create our initial contestants DataFrame

Previously we had a lot of code blocks checking and creating Min IDs for a distinct contestant list, but now we have more of an understanding we can do this with less code.

#Let's immediately do the group by and min value, now we know what we are doing, to create a distinct list (removing any IDs for a different series)

#Import the functions we need: col, min and count (Spark's min, not Python's built-in)
from pyspark.sql.functions import col, min, count

#Creates the full distinct list. 

dfcont = dfpeopletransf.groupBy('contestant', 'dob', 'gender','hand' )\
       .agg(min('contestantID').alias('contestantID'),count('contestant').alias('TotalRecords'))


display(dfcont)

groupBy takes all the distinct values we want: contestant, dob, gender and hand are all unique with the name. (Previously we just had contestant.)

Next we agg (aggregate) with the min contestantID and count the number of contestants as total records.

If you are used to SQL you would see the following

SELECT MIN(contestantID) AS contestantID, contestant, dob, gender, hand, Count(*) AS TotalRecords
FROM dfpeopletransf
GROUP BY contestant, dob, gender, hand
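
For comparison, here is the same group-by-and-min idea as a plain-Python sketch. The rows are made-up sample data (not real contestants), and the IDs are ints here for simplicity.

```python
from collections import defaultdict

def distinct_contestants(rows):
    """Group rows by (contestant, dob, gender, hand) and keep the minimum ID per group."""
    groups = defaultdict(list)
    for contestant_id, contestant, dob, gender, hand in rows:
        groups[(contestant, dob, gender, hand)].append(contestant_id)
    return [
        {"contestantID": min(ids), "contestant": key[0], "TotalRecords": len(ids)}
        for key, ids in groups.items()
    ]

# Hypothetical rows: Contestant A appears twice because they were in two series
rows = [
    (1, "Contestant A", "1970-01-01", "F", "right"),
    (2, "Contestant B", "1980-02-02", "M", "left"),
    (70, "Contestant A", "1970-01-01", "F", "right"),
]

result = distinct_contestants(rows)
for row in result:
    print(row)
```

One thing to keep in mind: a CSV read with only the header option gives string columns, so a min over the ID column compares text (where "100" sorts before "20") unless the column is cast to int first.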

Create a Current Age

# The Spark session is already running in the notebook, so we only need to import the functions used below
from pyspark.sql.functions import col, current_date, datediff

# Convert the date of birth column to a date type in dfCont
dfcont = dfcont.withColumn("dob", dfcont["dob"].cast("date"))

# add the current date and set up into a new dataframe
df = dfcont.withColumn("current_date", current_date())

#Calculate age using dob and current date 
df = df.withColumn("age_decimal", datediff(df["current_date"], df["dob"]) / 365)

#Convert from decimal to int  
df = df.withColumn("age", col("age_decimal").cast("int"))

#Convert Contestant ID to int
df = df.withColumn("contestantID", col("contestantID").cast("int"))

#Update dfCont
dfcont = df.select("contestantID", "contestant", "dob","gender","hand", "age")

dfcont.show(1000)

It would be fun to filter by Age and Age Group. Let's create the age first, based on the current date.

  • First of all we ensure dob is a date with .cast("date")
  • Then using .withColumn we add the current date to the DataFrame with current_date()
  • Next, we use datediff to get the age from the date of birth and the current date
  • Set age to int and contestantID to int
  • Next update dfcont with the full list of columns (excluding current_date because that was only needed to create the age).
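
Dividing the day difference by 365 is a reasonable approximation, but because it ignores leap days it can tip over to the next age slightly before the birthday. This plain-Python sketch compares it with a birthday-based calculation; both function names and the sample dates are my own.

```python
import datetime

def age_by_days(dob, today):
    """Same approach as the notebook: days between the dates divided by 365, truncated."""
    return int((today - dob).days / 365)

def age_by_birthday(dob, today):
    """Count full years, subtracting one if this year's birthday hasn't happened yet."""
    had_birthday = (today.month, today.day) >= (dob.month, dob.day)
    return today.year - dob.year - (0 if had_birthday else 1)

dob = datetime.date(2000, 3, 1)
today = datetime.date(2024, 2, 29)  # the day before the 24th birthday

print(age_by_days(dob, today), age_by_birthday(dob, today))
```

For a fun age filter the difference hardly matters, but it is worth knowing about if the age ever needs to be exact.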

Create Age group

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

# Create a Spark session
#spark = SparkSession.builder.appName("ConditionalLogic").getOrCreate()

# Apply conditional logic
dfcont = dfcont.withColumn("ageRange", when(col("age") < 20, "0 to 20")
                               .when((col("age") >= 20) & (col("age") <= 30), "20 to 30")
                               .when((col("age") >= 30) & (col("age") <= 40), "30 to 40")
                               .when((col("age") >= 40) & (col("age") <= 50), "40 to 50")
                               .when((col("age") >= 50) & (col("age") <= 60), "50 to 60")
                               .otherwise("Over 60"))

# Show the result
dfcont.show()

We have created what would be a CASE in SQL,

using .withColumn to create a new column. .when and .otherwise are used to create this new data item.
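
Like a SQL CASE, a chain of when conditions returns the first branch that matches, which matters here because the bands share their boundary values (30 satisfies both "20 to 30" and "30 to 40"). A plain-Python sketch of the same bands, written in the same order, shows where the boundary ages end up:

```python
def age_range(age):
    """First matching band wins, mirroring the order of the when() chain."""
    if age < 20:
        return "0 to 20"
    if 20 <= age <= 30:
        return "20 to 30"
    if 30 <= age <= 40:
        return "30 to 40"
    if 40 <= age <= 50:
        return "40 to 50"
    if 50 <= age <= 60:
        return "50 to 60"
    return "Over 60"

for age in (19, 20, 30, 31, 60, 61):
    print(age, age_range(age))
```

So someone aged exactly 30 lands in "20 to 30". If you would rather they land in "30 to 40", changing the upper bounds to strict less-than would be one way to do it.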

Create New Row for -1 NA Default Row

We are back to almost the original code. Creating our Default row but this time with more columns

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a sample DataFrame
data = [(-1, "Not Known","1900-01-01","NA","NA","0","NA")]
columns = ["contestantID", "contestant", "dob", "gender", "hand", "age", "ageRange"]
new_row  = spark.createDataFrame(data, columns)

# Union the new row with the existing DataFrame
dfcont = dfcont.union(new_row)

# Show the updated DataFrame
#dfcont.show(1000)

dfcont.show(1000)

We can now see Not known in the list

Running a DISTINCT just in case we applied the above more than once.

#Just in case. We distinct again to remove any extra NA Rows
dfcont = dfcont.distinct()
dfcont.show(1000)

Checking we have unique Contestants GROUPBY

#And lets check the new distinct list with Min IDs we have made
#We need to import a function for col
from pyspark.sql.functions import *

dfcont.groupBy('contestant', 'dob', 'age','ageRange', 'hand','gender')\
        .agg(count('contestant').alias('TotalRecords'))\
        .filter(col('TotalRecords')>1).show(1000)


#Hopefully there should be no records

We have all unique contestants, including the extra information we have added to the dimension.

Creating the Contestant Key

We can now create slightly better code than Part 1, using fewer code blocks.

#Now finally we need a key as a new column
# Imports Window and row_number
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Applying partitionBy() and orderBy()
window_spec = Window.partitionBy().orderBy("ContestantID")

# Add a new column "ContestantKey" using row_number() over the specified window
dfcont = dfcont.withColumn("ContestantKey", row_number().over(window_spec)- 2)

# Show the result
dfcont.show(1000)

Previously a partition column with the value 1 was added so we could do the following: .partitionBy("partition")

But simply using partitionBy() without a column removes the need for the extra code and column, and does the same thing.

Adding the Team

Previously the Team was added from attempts.csv but we know that we can change this to people now and have better team information.

#Creates the full distinct list. 
dfContTeam = dfpeopletransf.select(col("contestantID").alias("teamcontestantID"), col("contestant").alias("teamContestant"), 
col("team").alias("teamID"),col("team_label").alias("TeamName")).distinct()

display(dfContTeam)

Remove Rows with Empty team information

#Now remove the rows with null teamids

# A Spark session is already running in the notebook, so there is no need to create one
#from pyspark.sql import SparkSession
#spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()

# Drop rows with NULL values in the "teamID" column
dfContTeam = dfContTeam.na.drop(subset=["teamId"])

# Show the filtered DataFrame
dfContTeam.show(truncate=False)

Join team to Contestant

#Now what we want to do is add TeamName into dfcont

dfcont = dfcont.join(dfContTeam, dfcont["contestant"] == dfContTeam["teamContestant"], how="left").drop(dfContTeam.teamContestant).drop(dfContTeam.teamcontestantID)

display(dfcont)

This time we cleaned up the code by adding the two columns to drop in the above statement so we don’t have two names and IDs from both data sets.

Create DELTA PARQUET TABLE

from delta.tables import DeltaTable

dfcont.write.mode("overwrite").format("delta").saveAsTable("dimContestant")

Because we are overwriting, our new data set should overwrite our old dimension. However, there will be conflict issues because the schema is so different, so for this exercise the files will be removed before being re-added.

CREATE PARQUET Table in Files Folder

dfcont.write.mode("overwrite").parquet('Files/Data/Silver/DimContestant.parquet')

We now have options of using the Delta PARQUET, or just the PARQUET File.

Conclusion

In Part 1 we used attempts.csv for contestants. For Part 2 we did lots of analysis and found that, actually, people was a better data source for contestants. So we changed the input for the initial DataFrame to people.csv.

We also used the opportunity to tidy up the code, now that we are getting to grips with PySpark as an alternative to SQL.

Next on the list is to create more dimensions.

  • Dim Task
  • Dim Episode
  • Dim Date

And the Taskmaster Stats Fact Table

Then to do some analysis, using the inbuilt visuals in a Notebook, and Power BI.

Microsoft Fabric Part 1. Taskmaster Project. Creating a dim Contestants table in Notebook and Delta PARQUET File

It's so exciting at the moment with all the changes going on in the world of Microsoft, and I wanted to use the trial version of Fabric to see if I could move my 'standard' working practices over into Fabric.

The first thing was to find a practice dataset for this exercise, so of course I went for my favorite stats based show: UK Taskmaster.

Kaggle

Kaggle delivered the goods with csv files of taskmaster information

https://www.kaggle.com/datasets/sujaykapadnis/comedians-challenged-ridiculous-taskmaster-ds

The challenge is to deliver a star schema from this data. Transforming it and creating Dims and Facts for the ultimate goal of a star schema.

CSV – SQL DB – Power BI

Usually I use Data factory to pull the data into a SQL database. In here I create Stored procedures to transform the data. These stored procedures are then added into the pipeline so we can rerun when needed.

Finally, the dimensions and facts are imported into Power BI from the Dims and Facts in the SQL Database.

So the question is: can I transform my files using Pyspark within a Notebook, and then create the Dim and Fact tables in DELTA PARQUET?

I have used SQL for over 20 years and it’s my weapon of choice, so this is going to be a real puzzler of a task.

Getting Started – Create the Fabric Workspace

I have my Fabric Trial and a Workspace added to the Fabric Trial.

Azure Devops

I always set up an Azure Devops project

https://dev.azure.com

Here I have just added a new project to Devops: Debbies Training

Visual Studio

I am going to use Visual Studio for the next task, but there are lots of other ways you can do this next step

And connect to the repository that has been created (You need to log in to see your repos)

Click clone and you will then have a local copy of the code base. It doesn’t support everything at the moment, but it does support Notebooks, which is what will be used for this exercise.

C:\Users\DebbieEdwards\source\repos\DebbiesTraining

Connect Devops to Fabric

Back in Fabric

We will go back to the repo soon to start using it.

Create a Delta lake in your Fabric Workspace in Data Engineering Fabric Experience

Clicking on Data Engineering takes you to the Data Engineering area of Fabric, where you can set up a Lakehouse.

I have simply called the Lakehouse DebbiesFabricLakehouse

When the Data Lakehouse is completed you get the following

And for the time being, let’s just use Lakehouse.

Add CSV Files into the Lakehouse

Notice that we have the following folders in the Lakehouse

For this project, the medallion architecture is being used.

  • Bronze is where the csv files are stored in the lakehouse: the raw data zone.
  • Silver is where the transformed PARQUET data will go (which we will look at later).
  • Gold is the final transformed STAR SCHEMA layer: the curated zone.

I have all the taskmaster csv files and I am going to simply upload them to the Lakehouse Files / Data / Bronze folder

Refresh the folder and you should see your files in the pane.

Create the Notebook

Now we can start to work with the files, and here is where the fun starts. Instead of a pipeline that pulls data into SQL before the transformations, let’s create a Notebook.

We are ready to go. This is what I can see to the right of the code pane.

And Pyspark will be the Spark language of choice. I won’t use any SQL, just to see how it goes away from my beloved SQL.

I am doing very simple code at the moment and hope to make it better and better the further I get, so let’s just start adding Pyspark and markdown (descriptions).

This is the first lab I used to get going

https://microsoftlearning.github.io/mslearn-fabric/Instructions/Labs/02-analyze-spark.html

When you first run the notebook you have to start a Spark session against the delta lake. To do this I have started with the following code:

%%configure
{
    "defaultLakehouse": {  
        "name": "DebbiesFabricLakehouse"
    }
}

Read the CSV File into a DataFrame

#Read the first file into a DataFrame
dfattempts = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/attempts.csv")
# dfattempts now is a Spark DataFrame containing CSV data from "Files/Data/Bronze/attempts.csv".
display(dfattempts)

Now we read our first csv into a data frame. A DataFrame is basically a table of data.

We can click on the play icon to run this part of the code. We can now start transforming.

Contestant transformation

The first thing I want to do is get out the contestants: create a unique key and add the key to the Dim table and to the transformation table, which still contains all the facts.

(I have gone back and forth in the code as I have worked on it so some code has been amended when certain issues have come to light)

#We will use this later in the process. The following code renames the columns from the original DataFrame but brings back the entire DataFrame again
#We actually only need the contestant part

dfattempttrns = dfattempts.withColumnRenamed("contestant","contestantIDToRemove").withColumnRenamed("contestant_label","contestantToRemove")
dfattempttrns.printSchema()


print(dfattempttrns.count())
print(dfattempttrns.distinct().count())
display(dfattempttrns.distinct())

A transformed DataFrame is created from the main DataFrame and, initially, two columns are renamed: contestant to contestantIDToRemove and contestant_label to contestantToRemove. We will see why later.

We also print the row count and the distinct row count, and we display the transformed DataFrame.

Create Contestants Dim

The line from pyspark.sql.functions import col imports the col function from the pyspark.sql.functions module

Here is an explanation of what it does.

Pyspark is a Python library used for working with big data and performing distributed data processing using Apache Spark.

The pyspark.sql.functions module provides a set of built in functions that can be used for data manipulation and transformation in Spark DataFrames.

The col function is one of these built in functions. It is used to reference a column in a DataFrame. You can use it to:

  • Select a specific column.
  • Apply mathematical operations to a specific column.
  • Create new columns based on existing ones.

#import the function col to use. 
from pyspark.sql.functions import col

#Creates the full distinct list. 
dfcont = dfattempts.select(col("contestant").alias("contestantID"), col("contestant_label").alias("contestant")).distinct()

display(dfcont)

This code gives us a distinct list of contestant IDs and names, and we have aliased the new names in this DataFrame. This uses our original first DataFrame, not the transformed one.

Again, we display the table

Order

#dfcont.orderBy(['contestantID'], ascending=[True]).show(1000)

dfcont = dfcont.orderBy(['contestantID'], ascending=[True])

dfcont.show(1000)

This code block just orders the data. The interesting part here is that instead of display we use show to print out the DataFrame. If you leave it as show(), with no argument, only the first 20 rows are printed.

Create a DataFrame of values

This is for information only. I wanted to see how a DataFrame is created from values specified in the code block.

from pyspark.sql import SparkSession

In PySpark, the SparkSession class serves as the entry point for programming with the Dataset and DataFrame API. It allows you to create DataFrames, register them as tables, execute SQL queries, cache tables, and read Parquet files.

from pyspark.sql import Row

In PySpark, the Row class is used to represent a row of data in a DataFrame. Each field in a Row object can be accessed by its name or index.

#https://saturncloud.io/blog/adding-new-rows-to-pyspark-dataframe-a-comprehensive-guide/
#Just for information. Not being used but here is how you write your own dataframe
from pyspark.sql import SparkSession
from pyspark.sql import Row

#I dont think I need this because I already have a spark session?
#spark = SparkSession.builder.appName('AddRows').getOrCreate()

data = [(70, 'Sam Campbell'),
         (71,'Julian Clary'),
         (72,'Lucy Beaumont'),
         (73,'Sue Perkins'),
         (73,'Susan Wokoma')]
columns = ["contestantID", "contestant"]
dfnarowCont = spark.createDataFrame(data, columns)

dfnarowCont.show()

Create a New Row for the default NA Row

For a dimension we always have a default row of Not Known with a key of -1. For example, if we are looking at customer orders and a customer hasn’t bought anything this month:

DateKey     CustomerKey    Quantity
01012022    56             244
01012022    -1             0

Here we have -1 in the fact table, so we can then bring through all the default Not Known descriptors from the customer dimension if we need to when analysing in Power BI.
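As a rough illustration of how the default row is used, here is a plain-Python sketch (no Spark needed) that looks up a dimension key for each fact row and falls back to -1 when there is no matching customer. The names customer_keys, orders and lookup_key are hypothetical, and the customer IDs are made up for this example.

```python
# Hypothetical dimension lookup: business ID -> surrogate key
customer_keys = {"C001": 56, "C002": 57}

# The default "Not Known" member always has the key -1
DEFAULT_KEY = -1


def lookup_key(customer_id):
    """Return the dimension key, or -1 when the customer is missing or unknown."""
    return customer_keys.get(customer_id, DEFAULT_KEY)


# Illustrative source rows: (date key, customer ID, quantity)
orders = [
    ("01012022", "C001", 244),
    ("01012022", None, 0),
]

# Build fact rows that hold the dimension key instead of the business ID
fact_rows = [(date_key, lookup_key(cust), qty) for date_key, cust, qty in orders]
print(fact_rows)
```

Every fact row ends up with a valid key, so a join to the dimension never loses rows.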

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a one-row DataFrame holding the default Not Known row
data = [(-1, "Not Known")]
columns = ["contestantID", "contestant"]
new_row  = spark.createDataFrame(data, columns)

# Union the new row with the existing DataFrame
dfcont = dfcont.union(new_row)

# Show the updated DataFrame
#dfcont.show(1000)

dfcont.show(1000)

Now, let’s run a distinct, in case we have run the above code numerous times and have multiple default rows.

#Just in case. We distinct again to remove any extra NA Rows
dfcont = dfcont.distinct()
dfcont.show(1000) 
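The reason the distinct makes the cell safe to re-run can be sketched in plain Python (no Spark needed). This is only an analogy for union followed by distinct, using invented rows; unlike this sketch, Spark does not promise any particular row order after distinct.

```python
# Illustrative dimension rows as (contestantID, contestant) tuples
dim_rows = [(1, "Alice"), (2, "Bob")]
default_row = (-1, "Not Known")

# Simulate running the union cell three times by accident
for _ in range(3):
    dim_rows = dim_rows + [default_row]

# distinct: remove exact duplicate rows (dict.fromkeys keeps first-seen order)
dim_rows = list(dict.fromkeys(dim_rows))
print(dim_rows)
```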

Testing the DataFrame for Unique values

https://www.statology.org/pyspark-count-by-group/

Our Dimension should consist of unique values. So we need to test our new dataset

from pyspark.sql.functions import *

dfcont.groupBy('contestant')\
       .agg(min('ContestantID').alias('MinContestantID'),count('contestant').alias('TotalRecords'))\
       .filter(col('TotalRecords')>1).show(1000)

Notice we don’t have dfcont = at the start, because that would load the results into a DataFrame. Here we are just looking at the data. It is basically just a SELECT statement.

agg is used to calculate more than one aggregate at a time on a grouped DataFrame.

So to perform the agg, you first need to perform the groupBy() on the DataFrame, which groups the records based on single or multiple column values, and then do the agg() to get the aggregates for each group.

The only thing that makes these values non-distinct is that they have different ContestantIDs, so this query brings back the min(ContestantID) for each contestant.
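For anyone coming from SQL, the groupBy, agg and filter steps can be sketched in plain Python (no Spark needed) to show what each one contributes. The rows below are invented for illustration, and the dictionary keys simply reuse the alias names from the query above.

```python
from collections import defaultdict

# Illustrative (contestantID, contestant) rows; "Alice" appears under two IDs
rows = [(1, "Alice"), (7, "Alice"), (2, "Bob"), (-1, "Not Known")]

# groupBy('contestant'): collect the IDs seen for each name
ids_by_name = defaultdict(list)
for contestant_id, name in rows:
    ids_by_name[name].append(contestant_id)

# agg(min(...), count(...)): one summary row per name
summary = {
    name: {"MinContestantID": min(ids), "TotalRecords": len(ids)}
    for name, ids in ids_by_name.items()
}

# filter(TotalRecords > 1): keep only names that appear more than once
duplicates = {name: s for name, s in summary.items() if s["TotalRecords"] > 1}
print(duplicates)
```

An empty result from the filter step would mean every contestant name maps to exactly one ID.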

    Here we can see that we do have some bad data, which we can transform and correct so that everything in both the dim and the transformation tables holds the min ID.

    Create a DataFrame of all contestants with Min Contestant ID

    Let’s create a DataFrame and use it for the update. We bring back the entire contestant list with the min IDs only, creating a distinct list.

    #We need to import a function for col
    from pyspark.sql.functions import *
    
    dfminContID = dfcont.groupBy('contestant')\
                         .agg(min('ContestantID').alias('MinContestantID'),count('contestant').alias('TotalRecords'))
    
    dfminContID.show(1000)

    GROUPBY and AGG and FILTER

    • Group on Contestants
    • Aggregate gives us a count of contestants called TotalRecords
    • Then we filter to only find records where TotalRecords is greater than 1

    Let’s check the new distinct list with min IDs that we have made

      #We need to import a function for col
      from pyspark.sql.functions import *
      
      dfminContID.groupBy('contestant')\
              .agg(count('contestant').alias('TotalRecords'))\
              .filter(col('TotalRecords')>1).show(1000)

      JOIN two DataFrames Where JOIN Columns are the same

      Now what we want to do is join the two DataFrames to update dfcont with the min IDs.

      from pyspark.sql import SparkSession
      from pyspark.sql.functions import when

      # Initialize Spark session: I don't think this is needed here as we already have one
      #spark = SparkSession.builder.appName("MySparkApp").getOrCreate()

      # Join DataFrames based on the contestant name
      joinedCont_df = dfcont.join(dfminContID, on="contestant", how="left_outer")

      # Show the updated DataFrame
      joinedCont_df.show(1000)

      For the next step, let’s update dfcont again with our min value

      #We want to reset back to ContestantID and Contestant, this is a distinct List
      
      dfcont = joinedCont_df.select(col("minContestantID").alias("contestantID"), col("contestant")).distinct()
      dfcont.show(1000)
      

      We updated ContestantID with MinContestantID

      So, we have reset the ContestantID with the correct min ID

      You can’t modify the existing dataframe as it is immutable, but you can return a new dataframe with the desired modifications.

        Recheck a specific contestants data – Filter

        Let’s do a filter on a contestant we know had a duplicate ID

        
        dfcont.select(col("contestantID"), col("contestant")).filter(dfcont.contestant == "Noel Fielding").distinct().show(1000)

        Update the ID of the transformed Table

        We also want to update the Contestant ID of our transformed table (dfattempttrns) we created near the beginning of the notebook. Eventually we can create a fact table from dfattempttrns with Contestant Keys, so we want to join them via the ContestantID.

        This is where the columns contestantIDToRemove and contestantToRemove come in handy.

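        #We also want to update the ID of our transformed table we created near the beginning of the notebook
        
        #The join columns have different names, so we join on an explicit condition
        dfattempttrns = dfattempttrns.join(dfcont, dfattempttrns["contestantToRemove"] == dfcont["contestant"], how="inner")
        
        #If the join column had the same name in both DataFrames we could use the following instead
        #dfattempttrns = dfattempttrns.join(dfcont, on="contestant", how="inner")
        
        display(dfattempttrns)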

        There are two extra columns at the end from our corrected dfcont DataFrame.

        The join was on the contestant name.

        #Lets have a look at the schema

        dfattempttrns.printSchema()

        Here we have a little code to display the schema of the updated dfattempttrns DataFrame

        Drop Columns

        # Drop columns
        cols_to_drop = ["contestantIDToRemove", "contestantToRemove"]
        dfattempttrns = dfattempttrns.drop(*cols_to_drop)
        dfattempttrns.show()

        And just to be sure we can now check for our Contestant again

        #Let's do a filter on a contestant we know had a duplicate ID
        #dfattempttrns.filter(dfattempttrns.contestant == "Noel Fielding").show(1000)
        from pyspark.sql import SparkSession
        from pyspark.sql.functions import col
        dfattempttrns.select(col("contestantID"), col("contestant")).filter(dfattempttrns.contestant == "Noel Fielding").distinct().show(1000)
        

        It’s now working. There is only one record in both the transformation table and the dimension table, so the join on IDs will work.

        Creating the Dim Key – Adding a column .WithColumn

        For the contestants Dim we need a key, so we need a new column. First, let’s add a constant column to partition on. I’m sure there are lots of ways to do this, and later in the process I will be able to really hone the code down with more Pyspark knowledge.

        from pyspark.sql.functions import lit
        
        # add a constant column to partition on
        dfcont = dfcont.withColumn("Partition", lit(1))
        # display the dataframe
        dfcont.show(1000)

        Create the Contestant Key using Partition by

        #Now finally for this we need a Key a new column
        # Imports Window and row_number
        from pyspark.sql import Window
        from pyspark.sql.functions import row_number
        
        # Applying partitionBy() and orderBy()
        window_spec = Window.partitionBy("Partition").orderBy("ContestantID")
        
        # Add a new column "ContestantKey" using row_number() over the specified window, minus 2
        dfcont = dfcont.withColumn("ContestantKey", row_number().over(window_spec)- 2)
        
        # Show the result
        dfcont.show(1000)

        We subtract 2 so that the key starts at -1 (-1, 0, 1 and so on), and the default row can have the -1 key as standard.

        from pyspark.sql import Window

        • Imports the Window class from the pyspark.sql module.
        • The Window class is used to define a window specification in the DataFrame API. A window specification is defined by specifying partitioning expressions and ordering expressions.
        • The partitioning expressions are used to partition the data into groups, and the ordering expressions are used to order the data within each group.

        from pyspark.sql.functions import row_number

        • The row_number function from the pyspark.sql.functions module is used to assign a unique row number to each row in a DataFrame.
        • It is commonly used in Spark SQL to create a sequential identifier for rows based on a specified order.

        Because the ID of the default NA row is -1, it sorts first and immediately gets the key of -1, which is just what we wanted.
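The arithmetic behind the key can be checked with a small plain-Python sketch (no Spark needed). It mimics row_number() over a single partition ordered by ContestantID, using invented IDs; it is an illustration of the numbering, not the notebook code itself.

```python
# Illustrative contestant IDs, including the default row's ID of -1
contestant_ids = [3, -1, 1, 2]

# Equivalent of row_number() ordered by ContestantID, minus 2:
# row_number() starts at 1, so the first (lowest) ID gets 1 - 2 = -1
keys = {
    contestant_id: row_number - 2
    for row_number, contestant_id in enumerate(sorted(contestant_ids), start=1)
}
print(keys)
```

The default row only gets the -1 key because -1 is the lowest ID in the list, so this approach relies on no real contestant ID being lower than -1.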

        #We want to remove the partition key from the dim
        
        dfcont = dfcont.select(col("contestantKey"), col("contestantID"), col("contestant")).distinct()
        dfcont.show(1000)

        Adding Team to Dim Contestant

        There is another item we want to add into the contestant dim: the Team.

        First of all, let’s get a distinct list of everything we need from dfattempttrns, namely the contestantID and contestant, plus teamID and TeamName (these two are new, we haven’t worked with them so far).

        #import the function col to use. But shouldnt it already be available?
        #from pyspark.sql.functions import col
        
        #Creates the full distinct list. 
        dfContTeam = dfattempttrns.select(col("contestantID").alias("teamcontestantID"), col("contestant").alias("teamContestant"), 
        col("team").alias("teamID"),col("team_label").alias("TeamName")).distinct()
        
        display(dfContTeam)

        The original DataFrame includes the tasks and team is only set on a team task. In Contestants we simply want to know which team the contestant is in.

        We have created a new DataFrame called dfContTeam

        Remove rows with NULL team IDs

        # A Spark session already exists in the notebook, so we don't need to create one here
        #from pyspark.sql import SparkSession
        #spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()
        
        # Drop rows with NULL values in the "teamID" column
        dfContTeam = dfContTeam.na.drop(subset=["teamID"])
        
        # Show the filtered DataFrame
        dfContTeam.show(truncate=False)

        Join tables to add team to contestants Dim using the contestant Name

        #Now what we want to do is add TeamName into dfcont
        
        #Because the join columns have different names, we join on an explicit condition
        dfcont = dfcont.join(dfContTeam, dfcont["contestant"] == dfContTeam["teamContestant"], how="inner")
        
        #If the join column had the same name in both DataFrames we could use on= instead, for example:
        #dfattempttrns = dfattempttrns.join(dfcont, on="contestant", how="inner")
        display(dfcont)

        When dfContTeam was created, the contestant column was called teamContestant, and this is the join you see in the code.

        Drop Columns

        # Again we need to Drop columns we don't need from the data frame
        cols_to_drop = ["teamContestant", "teamID","teamcontestantID"]
        dfcont = dfcont.drop(*cols_to_drop)
        display(dfcont)
        

        Finally, a Dim Contestant DataFrame with everything we need.

        Adding ContestantKey into the transformed DataFrame

        Before creating a PARQUET file it would be good to get the Contestant Key into the transformed table by joining the tables again on contestantID.

        #Join dfcont onto the transformed table on contestantID to bring in the ContestantKey
        dfattempttrns = dfattempttrns.join(dfcont, 
                                            dfattempttrns["contestantID"] == dfcont["contestantID"], 
                                            how="inner").drop(dfcont.contestantID).drop(dfcont.contestant)
        
        
        display(dfattempttrns)

        We now have .drop(dfcont.contestantID).drop(dfcont.contestant) in the code which removes our duplicate columns before they are added into the transformation table

        UPDATE – We have learned how to update using different column names AND Update using the same column name, whilst removing any columns from table B that are identical to table A

        Creating the DELTA PARQUET FILE

        Finally we can add Dim Contestants to a DELTA PARQUET file. We don’t need any partitions because it’s just a small dim table.

        from delta.tables import DeltaTable
        
        dfcont.write.mode("overwrite").format("delta").saveAsTable("dimContestant")
        

        After a refresh on tables

        This creates a Delta Lake table called dimContestant, which is stored in Delta format and provides ACID transactions along with features such as time travel and versioned data.

        The Black triangle icon means that it is a DELTA PARQUET file.

        Save as PARQUET File

        However you can also do the following

        dfcont.write.mode("overwrite").parquet('Files/Data/Silver/DimContestant.parquet')
        

        This saves the DataFrame as a Parquet file into a specific folder in Files.

        It’s columnar storage that works really well for analytics workloads, but it doesn’t have the Delta Lake ACID transactional support or features like the time travel mentioned above.

        Let’s have a look at what we have created (the data is non-partitioned)

        A PARQUET file is a columnar storage file, in the same way as Power BI has a columnar data store when you import the data.

        Our data is in the snappy.parquet file.

        The _SUCCESS file is not a data file. It’s a marker related to the execution of the write, so here we can see that the creation of the file was a success.

        So:

        • Delta Lake PARQUET comes with advanced features and transactional capabilities. Tables have to be in the Tables folder in order to be recognized by other engines like the SQL endpoint and the semantic model.
        • PARQUET is for storage and simplicity. But are these files only visible to Spark?

        We will look at these both in more detail in a later exercise.

        Stopping the Spark Job

        It’s always worth stopping the Spark job at the end of processing

        spark.stop()

        It will stop automatically after a time, but this ensures that the session is stopped as soon as the transformation process ends.

        Using Source Control

        Clicking back on the workspace. We added our workspace to Devops. So now we can see that there has been a change.

        When working with other people there will be feature branches to connect to rather than main, but because this is just a one person exercise, committing to main is fine for the time being.

        And in Devops we can see that the code has been committed.

        Conclusion

        We have taken an example group of csv files and transformed them so far to create one dimension for a star schema.

        This dimension data will be added into a star schema via the contestant key.

        We have also set up Devops to commit the notebook code into the GIT repository.

        In later posts we will move on to other dimensions, create the fact tables and see what we can do with both the DELTA PARQUET and PARQUET files.
