Microsoft Fabric Part 6. Taskmaster Project. Creating a Silver Transformed Layer and Finishing the Dimensions

In Part 4, a new data source was used at task level and the first dimension was added. Let's quickly create the rest of the dims.

Then we are ready to move on to the next steps in Fabric.

Creating a Silver transformed layer

There are a few changes that would be good to make to the notebooks. The base data needs a level of transforming before we kick off with Dims and Facts.

It would be good to save the transformed data once in the Silver layer and then use this PARQUET file to work from. This means we never have to repeat any code for dims and facts.

We want

  • Silver Transformed data as PARQUET unmanaged files
  • Gold Dims and facts in Delta PARQUET Managed tables

Taskmaster Transformed Data

Create a new Notebook

##drop all rows with null values just in case
dftm = dftm.na.drop(how='all')

display(dftm)

Just in case, get rid of any empty rows.

Clean Special Characters

Clean Special Characters in Task

The above was all from Part 5.
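As a quick reminder, here is a minimal sketch of that Part 5 cleanup, assuming it used a re.sub pass over the column names and regexp_replace on the Task values (the exact character sets kept are assumptions):

import re
from pyspark.sql.functions import col, regexp_replace

# Strip special characters from the column names, keeping letters, numbers and spaces
dftm_cleaned = dftm.select(*[col(c).alias(re.sub('[^0-9a-zA-Z ]', '', c)) for c in dftm.columns])

# Strip special characters from the Task values, keeping spaces and basic punctuation
dftm_cleaned = dftm_cleaned.withColumn("Task", regexp_replace(col("Task"), '[^0-9a-zA-Z ,.]', ''))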

Resolve Episode date issues

Currently the dates look like 28/07/2015, and when converting to a date type the values are being lost. We need to change the format to 2015-07-28.

#Update the date so we can transform it to a date column
from pyspark.sql.functions import col

# Get the Year
dftm_cleaned = dftm_cleaned.withColumn("Year", col("Episode Date").substr(-4, 4))

# Show the resulting DataFrame
dftm_cleaned.show()

Now add in Day

from pyspark.sql.functions import col, substring

# Get the day from the first two characters of 'Episode Date'
dftm_cleaned = dftm_cleaned.withColumn("day", substring(col("Episode Date"), 1, 2))

# Show the resulting DataFrame
dftm_cleaned.show()

And month

from pyspark.sql.functions import col, substring

# Get the month from characters 4-5 of 'Episode Date'
dftm_cleaned = dftm_cleaned.withColumn("month", col("Episode Date").substr(4, 2))

# Show the resulting DataFrame
dftm_cleaned.show()

Merge together and convert to date

from pyspark.sql.functions import concat_ws, col,to_date


# Rebuild 'Episode Date' as yyyy-MM-dd from the year, month and day columns
dftm_cleaned = dftm_cleaned.withColumn("Episode Date", concat_ws("-", col("year"), col("month"), col("day")))
dftm_cleaned = dftm_cleaned.withColumn("Episode Date", to_date("Episode Date", "yyyy-MM-dd"))

# Show the resulting DataFrame
display(dftm_cleaned)

Contestant transformations

we also need to bring through the contestant data which we will join later. This is just one file.

dfc = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/TaskMasterSeriesFiles/Taskmaster/Contestants.csv")

We have our old data set and, although we couldn't turn it into a star schema, there is data in there that we can use to create a more complete data set that can be joined to the data above.

People

#Read the file into a dataframe 
dfp = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/people.csv")

#And rename a couple of columns
dfp  = dfp.withColumnRenamed("id","contestantID").withColumnRenamed("name","contestant")

display(dfp)

In our original people data we have some great columns: age, hand and seat. We can use these to create some great Power BI visuals. Let's see if we can merge them into our new data.

Join Contestant to people data

# Join the extra contestant information
dfcont = dfc.join(dfp, dfc["Name"] == dfp["contestant"], "left_outer").drop(dfp.contestantID).drop(dfp.series)\
.drop(dfp.series_label).drop(dfp.contestant).drop(dfp.team).drop(dfp.team_label).drop(dfp.champion)\
.drop(dfp.TMI)

# The resulting DataFrame contains all rows from dfc and the matching rows from dfp
display(dfcont)

And let's see if there are any nulls we need to fill in.

filtered_df = dfcont.filter(dfcont['seat'].isNull()) 
display(filtered_df)

Just the one.

Name in the new data set: Asim Chaudry. Name in the old data set: Asim Chaudhry.

This is easily fixable before the join. The name is misspelled in the new data set, so let's insert a code block to correct it before the join.

from pyspark.sql.functions import when

# Correct the misspelled name in the 'Name' column of the contestants DataFrame
dfc = dfc.withColumn("Name", when(dfc["Name"] == "Asim Chaudry", "Asim Chaudhry").otherwise(dfc["Name"]))


dfc.show(1000)

Now we can redo the join and have no null values

Create Age – Create Distinct Min Episode Date

This time we want to create the age as at the start of each contestant's Taskmaster series, not their current age.

So we need to create a new dataframe consisting of just the contestant and their minimum episode date.

# Assuming you have already created the DataFrame 'dftm_cleaned'
from pyspark.sql.functions import min

# Group by "Contestant" and aggregate the minimum "Episode Date"
dfminSeriesCont = dftm_cleaned.groupBy("Contestant").agg(min("Episode Date").alias("min_episode_date"))

# Show the resulting DataFrame
dfminSeriesCont.show()

Note that we have already transformed the date and set as a date field in the dataframe.

Merge Episode Date with Contestant

Now we need to merge into our contestant file

# Join the min_episode_date into contestants
dfcont = dfcont.join(dfminSeriesCont,dfcont["Name"] == dfminSeriesCont["Contestant"], "left_outer").drop(dfminSeriesCont.Contestant)

# The resulting DataFrame contains all rows from dfcont plus the matching minimum episode date
display(dfcont)

Add Age

from pyspark.sql.functions import datediff
from pyspark.sql.functions import col

# Convert the date of birth column to a date type in dfCont
dfcont = dfcont.withColumn("dob", dfcont["dob"].cast("date"))

#Calculate age in years using dob and the minimum episode date
dfcontAge = dfcont.withColumn("age_decimal", datediff(dfcont["min_episode_date"], dfcont["dob"]) / 365)

#Convert from decimal to int  
dfcontAge = dfcontAge.withColumn("age", col("age_decimal").cast("int"))

#Convert Contestant ID to int
dfcontAge = dfcontAge.withColumn("contestant ID", col("contestant ID").cast("int"))

display(dfcontAge)

Create an Age Group

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

# Apply conditional logic
dfcontAge = dfcontAge.withColumn("ageRange", when(col("age") < 20, "0 to 20")
                               .when((col("age") >= 20) & (col("age") <= 30), "20 to 30")
                               .when((col("age") >= 30) & (col("age") <= 40), "30 to 40")
                               .when((col("age") >= 40) & (col("age") <= 50), "40 to 50")
                               .when((col("age") >= 50) & (col("age") <= 60), "50 to 60")
                               .otherwise("Over 60"))

# Show the result
dfcontAge.show()

Save as PARQUET

Save both files as unmanaged PARQUET Files

We don’t need to save as Delta PARQUET because this is the silver layer which will be used to create the Delta PARQUET dim and fact tables.

You can't directly partition a non-Delta Parquet file, but you can optimize it using V-Order for better performance. V-Order is enabled by default, but if you want to check you can always use code like the snippet below.
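A minimal check sketch, assuming the Fabric session setting is called spark.sql.parquet.vorder.enabled (worth confirming against the Fabric documentation for your runtime):

# Inspect the V-Order session setting; returns "not set" if the key isn't present
print(spark.conf.get("spark.sql.parquet.vorder.enabled", "not set"))

# It can also be switched on explicitly for the session if needed
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")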

Save your Taskmaster transformed PARQUET Files
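A sketch of the unmanaged saves; the taskmaster path matches the file read back later in this post, while the contestant file name is just a hypothetical example:

# Write the transformed taskmaster data to the Silver folder as plain parquet
dftm_cleaned.write.mode("overwrite").parquet("Files/Data/Silver/taskmasterTransformed.parquet")

# Write the transformed contestant data alongside it (file name is an assumption)
dfcontAge.write.mode("overwrite").parquet("Files/Data/Silver/contestantsTransformed.parquet")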

Dim Episode

We can now update the original notebook to use the transformed silver layer data, and remove all the cleaning that now lives in the notebook we have just prepared.

from pyspark.sql.functions import input_file_name, regexp_extract

parquet_file = "Files/Data/Silver/taskmasterTransformed.parquet"
dftm = spark.read.parquet(parquet_file)

display(dftm)

Notice the change to the code now we are reading a parquet file.

All the transformation code has been removed and we start by creating the Episode dim.
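The episode-level dataframe that the next cell works on isn't shown being built; a possible sketch, assuming the Series, Episode No and Episode Name columns used further down:

# Build the episode dimension base: one row per series / episode / episode name
dftmEp = dftm.select("Series", "Episode No", "Episode Name").distinct()

display(dftmEp)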

##drop all rows with null values
dftmEp = dftmEp.na.drop(how='all')

display(dftmEp)

Create a Default Row

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a single default row for unknown episodes
data = [(-1, -1,"Not Known")]
columns = ["series", "episode No", "Episode Name"]
new_row  = spark.createDataFrame(data, columns)

# Union the new row with the existing DataFrame
dftmEp = dftmEp.union(new_row)

# Show the updated DataFrame
dftmEp.show(1000)

Create an Episode Key

from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Create a window specification partitioned by "Series" and ordered by "Episode No"
window_spec = Window.orderBy(col("Series"), col("Episode No"))

# Add a new column "EpisodeKey" using row_number() over the specified window
dftmEpKey = dftmEp.withColumn("EpisodeKey", row_number().over(window_spec) - 2)

# Show the result
dftmEpKey.show(1000)

Clean the Column Names

import re

# Select columns with modified names (without special characters)
dftmEpKey_cleaned = dftmEpKey.select(*[col(c).alias(re.sub('[^0-9a-zA-Z]', '', c)) for c in dftmEpKey.columns])

# Show the result
dftmEpKey_cleaned.show(1000)

And save to both Delta PARQUET and plain PARQUET (just to give us different examples to play with).
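A sketch of both saves, assuming a managed table name of DimEpisode; the parquet path matches the one read back in the fact table post further down:

# Managed Delta table for the Gold layer
dftmEpKey_cleaned.write.mode("overwrite").format("delta").saveAsTable("DimEpisode")

# Unmanaged plain parquet copy
dftmEpKey_cleaned.write.mode("overwrite").parquet("Files/Data/Silver/DimEpisode.parquet")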

Dim Task

In this dimension we want the Task, Task Type and Assignment (Solo or Group)

We can hopefully create this quite quickly in a new notebook

Create a new notebook Dim Task V2

The format is completely the same every time; a condensed sketch of the whole notebook follows the steps below.

Get the Task fields

Add a default row

Create the Key

Resolve invalid characters in the column names

Save as Delta PARQUET and PARQUET
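A condensed sketch of those steps for Dim Task, assuming the Task, Task Type and Assignment column names and hypothetical output names (DimTask and Files/Data/Silver/DimTask.parquet). Here the default row is appended with its key already set, a slight variation on the episode notebook:

import re
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# One row per task / task type / assignment
dftmTask = dftm.select("Task", "Task Type", "Assignment").distinct()

# Create the surrogate key over the real rows
window_spec = Window.orderBy(col("Task"))
dftmTaskKey = dftmTask.withColumn("TaskKey", row_number().over(window_spec))

# Append a default row with key -1 for unknown tasks
default_row = spark.createDataFrame(
    [("Not Known", "Not Known", "Not Known", -1)],
    ["Task", "Task Type", "Assignment", "TaskKey"]
)
dftmTaskKey = dftmTaskKey.union(default_row)

# Remove special characters from the column names
dftmTaskKey_cleaned = dftmTaskKey.select(*[col(c).alias(re.sub('[^0-9a-zA-Z]', '', c)) for c in dftmTaskKey.columns])

# Save as a managed Delta table and an unmanaged parquet file
dftmTaskKey_cleaned.write.mode("overwrite").format("delta").saveAsTable("DimTask")
dftmTaskKey_cleaned.write.mode("overwrite").parquet("Files/Data/Silver/DimTask.parquet")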

Dim Contestant

We now get to the contestant. Create a new Notebook. Dim Contestant V2
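The new notebook needs the transformed silver data loading back in first; a sketch assuming the taskmaster path used above and the hypothetical contestant file name from the save step:

# Read the transformed silver files back into dataframes
dftm = spark.read.parquet("Files/Data/Silver/taskmasterTransformed.parquet")
dfc = spark.read.parquet("Files/Data/Silver/contestantsTransformed.parquet")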

Merge Contestant with the Taskmaster main file

from pyspark.sql.functions import col

# Perform the left join
merged_df = dftm.join(dfc, dftm["Contestant"] == dfc["Name"], "left_outer")\
                .select(
                    dfc["Contestant ID"],
                    dftm["Contestant"].alias("Contestant Name"),
                    dftm["Team"],
                    dfc["Image"],
                    dfc["From"],
                    dfc["Area"],
                    dfc["country"].alias("Country"),
                    dfc["seat"].alias("Seat"),
                    dfc["gender"].alias("Gender"),
                    dfc["hand"].alias("Hand"),
                    dfc["age"].alias("Age"),
                    dfc["ageRange"].alias("Age Range"),
                ).distinct()

# Show the resulting DataFrame
merged_df.show()

In the main file, Team is only populated when it's a group task.

Therefore we are getting two records per contestant, one with and one without a team, so we need to remove the rows without a team.

dfContfinal = merged_df.dropna(subset=["Team"])

# Show the resulting DataFrame
dfContfinal.show(1000)

Add a Default Row

Create a Contestant Key

Clean Column Names

Alias Column Names

Renaming all the columns to get past the issues with special characters in column names (a sketch follows below).
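A sketch of the explicit renames, shown here against the merged dataframe before the column name cleaning (so the names with spaces still exist); the target names match the DimContestant columns referenced in the fact table post further down:

# Rename the columns with spaces to simple camelCase names
dfContfinal = dfContfinal\
    .withColumnRenamed("Contestant ID", "contestantID")\
    .withColumnRenamed("Contestant Name", "contestant")\
    .withColumnRenamed("Team", "TeamName")\
    .withColumnRenamed("Age Range", "ageRange")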

Save as Delta PARQUET and PARQUET

Save the notebook and ensure you have committed it to source control.

At the end of this post, we have all our dimensions (Date was created in a previous blog).

The next post will cover creating the central fact table.

And better yet, we have had more of a chance to convert our years of SQL knowledge across to PySpark.

Microsoft Fabric Part 4. Taskmaster Project. Adding the fact table

In parts 1 and 2 we created and updated DimContestant

In Part 3 we created DimTask, DimEpisode and DimDate

It's time to create the fact table. The first thing we need to do is get an understanding of what facts we have and how they would join to the data we have created in the dims so far.

Let's create a notebook.

%%configure
{
    "defaultLakehouse": {  
        "name": "DebbiesFabricLakehouse"
    }
}

Adding possible csvs into DataFrames

Attempts

#Read the first file into a dataframe
dfattempts = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/attempts.csv")
# dfattempts now is a Spark DataFrame containing CSV data from "Files/Data/Bronze/attempts.csv".
display(dfattempts)

Contains points and ranks and appears to be at the episode and contestant level

episode_scores

#Read the file into a dataframe
dfepsc = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/episode_scores.csv")

display(dfepsc)

episode_scores is new to this project but seems a really good data set. Holds scores and Ranks at the episode and contestant level (but not at task level)

We actually don't need this because it's just aggregated data. We can remove it from our transformations; Power BI will aggregate for us.

episodes

#Read the file into a dataframe
dfepis = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/episodes (1).csv")

display(dfepis)

This contains dates and points. However, the scores are at episode level. We will be able to get this information from DAX by summing the individual scores, so we don't need these scores at all. All we are interested in is the air date.

Let's look at the row counts

ct = dfepsc.count()
print(f"dfepsc: {ct}")

ct = dfepis.count()
print(f"dfepis: {ct}")

ct = dfattempts.count()
print(f"dfattempts: {ct}")

The counts vary between the files. We need to make sense of what we have.

Create the data we need from attempts

# Select multiple columns with aliases
dfattemptsf = dfattempts.select(
    dfattempts.task.alias('attTaskID'), #Join to DimTask
    dfattempts.task_label.alias('attTask'),
    dfattempts.episode.alias('attEpisode'),
    dfattempts.series.alias('attSeries'),
    dfattempts.contestant_label.alias('attContestant'), #Join to DimContestant
    dfattempts.points.alias('attPoints')
).distinct()

# Show the resulting DataFrame
display(dfattemptsf)

The Points in Attempts are the best ones we have at the moment because the Task csv was empty.

We are clearly now hitting real data problems, and in a normal project we would be going back to the team creating the data and getting them to resolve these issues.

Filter attempts

Attempts is our main table, but we need its episode and series information to work with the dimensions we have, and we only have IDs for both.

Are episode and series linked with the correct IDs? Let's do some filter queries to look at series 1, episode 1.
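A sketch of one of those filter checks, using the aliased attempts columns created above:

import pyspark.sql.functions as F

# Look at series 1, episode 1 in the attempts data
display(dfattemptsf.filter((F.col("attSeries") == 1) & (F.col("attEpisode") == 1)))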

After a few filtered queries, there are clearly some data issues, or unknowns that I am not aware of, with attempts. Episode 1 and series 1 don't match the expected contestants and tasks (I know because I watched it).

For episodes (1) (above) we do have the correct information for episode and series, but we aren't using these columns to get data.

There are more problems with episode scores. The episode appears to be correct, but the series ID is completely different, so we can't match on it. But we aren't using this data any more.

We need to look at our PARQUET file DimEpisode.

The first thing we need to do is load our DimEpisode PARQUET data back into a dataframe, and to do this we have to update the code slightly, as previously we have been loading in CSV files.

# Read Parquet file using read.parquet()
dfdimep  = spark.read.parquet("Files/Data/Silver/DimEpisode.parquet")
display(dfdimep)

To get the path, click on your file in the Files folder in the Lakehouse and choose Copy ABFS path; Copy relative path for Spark gives you a more concise path.

We are now using the silver layer of the medallion architecture in the path. All our bronze landing data has been transformed.

Are our IDs correct in the Dim for Attempts, or for the Tasks by Objective csv?

Now we have a real issue. Our episodes can't join to attempts.

Join with one Condition

Can we join episode scores to episodes first via the episode name? This would be an issue if there were repeating episode names across series, but we are lucky that this is not the case.
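The join below expects an aliased episode scores dataframe (dfepscf) that isn't shown. A sketch of how it might be built: the sc-prefixed aliases come from the columns dropped later in this post, but the source CSV column names (episode, episode_label, contestant, contestant_label, score, rank) are assumptions modelled on the attempts file:

# Alias the episode scores columns with an sc prefix (source column names are assumed)
dfepscf = dfepsc.select(
    dfepsc.episode.alias('scEpisodeID'),
    dfepsc.episode_label.alias('scEpisode'),
    dfepsc.contestant.alias('scContestantID'),
    dfepsc.contestant_label.alias('sccontestant'),
    dfepsc.score.alias('scScore'),
    dfepsc.rank.alias('scRank')
).distinct()

display(dfepscf)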

# condition
original_condition = dfepis["title"] == dfepscf["scEpisode"]

# Now perform the left outer join
dfEpScorej =  dfepis.join(dfepscf, on=original_condition, how="full").drop(dfepscf.scEpisode)

display(dfEpScorej)

Filter new transformed table for episode 1 series 1

import pyspark.sql.functions as F

filtered_df = dfEpScorej.filter((F.col('episode') == 1) & (F.col('series') ==1))
display(filtered_df)

Filtering on episode 1, series 1 gives us all 5 contestants against the episode. There are lots of columns we can remove. For example, we don't need to hard-code the winner; this can be done via Power BI DAX. So, the above join will be slightly updated with .drop.

# condition
original_condition = dfepis["title"] == dfepscf["scEpisode"]

# Now perform the left outer join
dfEpScorej =  dfepis.join(dfepscf, on=original_condition, how="full").drop(dfepscf.scEpisode)\
.drop(dfepscf.scEpisodeID).drop(dfepis.winner).drop(dfepis.winner_label).drop(dfepis.studio_date).drop(dfepis.finale).drop(dfepis.TMI)

display(dfEpScorej)


A full join is being used because we want the rows from each side even when they haven't matched.

Use dfEpScorej now instead of dfepis and dfepscf

We have the attempts (dfattemptsf) left

So how can we join this final table?

  • We can’t join by Task because Episodes doesn’t include task
  • We can’t join by Series because the Series IDs don’t match and neither does Episode.
  • We can match by Contestant, but contestants can take part in multiple series so the join doesn't work

As it stands we can't actually match this table in, so we need to go with episodes and scores for the fact table.

Add the Dimension Keys into the transformed table

Join DimContestants Via Contestant Name

We can't use the ID, but we can use the name.

Also, when we join, we only want the key, so we can immediately remove everything else relating to the contestant.
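DimContestant needs loading back in the same way as DimEpisode; a sketch assuming an analogous path:

# Read the DimContestant parquet back into a dataframe (path is an assumption)
dfdimcont = spark.read.parquet("Files/Data/Silver/DimContestant.parquet")
display(dfdimcont)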

# Original condition
original_condition = dfdimcont["contestant"] == dfEpScorej["sccontestant"]

# Adding another condition based on the 'name' column
#combined_condition = original_condition & (dfdimcont["contestant"] == dfEpScorej["sccontestant"])

# Now perform the left outer join
dfEpScorej = dfEpScorej.join(dfdimcont, on=original_condition, how="left")\
.drop(dfdimcont.TeamName).drop(dfdimcont.teamID).drop(dfdimcont.age).drop(dfdimcont.gender)\
.drop(dfdimcont.hand).drop(dfdimcont.ageRange).drop(dfdimcont.dob)\
.drop(dfdimcont.contestant).drop(dfdimcont.contestantID)\
.drop(dfEpScorej.sccontestant).drop(dfEpScorej.scContestantID)

display(dfEpScorej)

If we have any null values they can be replaced with -1

But first we can check if any exist.

dfEpScorej.filter(dfEpScorej.ContestantKey.isNull()).show()

fillna to replace null with a value

Even though we have none at the moment, there could be some in the future, so fillna allows us to replace them with our default key.

dfEpScorej = dfEpScorej.fillna(-1, subset=["ContestantKey"])
dfEpScorej.show()

Add Episode Key

All is good, so we can join it into the transformed table, removing everything but the key using .drop.

The data was checked to make sure it was OK before all the .drop calls were added to the code.

# Original condition
original_condition = dfdimep["seriesID"] == dfEpScorej["series"]

# Add another condition based on the episode column
combined_condition = original_condition & (dfdimep["episodeID"] == dfEpScorej["episode"])

# Now perform the left outer join
df = dfEpScorej.join(dfdimep, on=combined_condition, how="left")\
.drop(dfdimep.episodeTitle).drop(dfdimep.episodeID).drop(dfdimep.series).drop(dfdimep.seriesID)\
.drop(dfEpScorej.series_label).drop(dfEpScorej.series).drop(dfEpScorej.episode).drop(dfEpScorej.title)

display(df)

Set Key to -1 if NULL

df = df.fillna(-1, subset=["EpisodeKey"])
df.show()

Conclusion

This specific project has been abandoned because the data doesn't give me what I need, join-wise, to create the star schema, mostly because there is clearly a main table with missing data.

But, I can’t wait to take what I have learned and try again with another data set.

Parts 1 to 4 have given me some great insight into analysing, transforming and modelling with PySpark.
