Microsoft Fabric Part 16. Taskmaster Project. Update to Taskmaster transformed Notebook for Contestants

In Part 15 we created a delta load of transformed dims and facts in the gold layer of our Fabric Architecture.

The Taskmaster file contains a source_filename, e.g. Taskmaster_S1_01092015.csv, which we have used to partition the data.

We also have a transformed Contestants file. Each contestant belongs to a series, and the file isn't big enough to partition. The Contestants file consists of data that we have prepared plus extra data from Kaggle.

However, we have only prepped the data for the first run. We need to add more code to handle new files, and there are still amendments we can make to improve things.

It's also useful to see all the issues along the way and how they are resolved. Knowing what can hit you can be really handy when working on a project, because you have already seen and dealt with the issues.

Load a new file into Azure Data Lake

  • Taskmaster season 4 has been loaded
  • Contestants already has season 4 in the data

Taskmaster Transformed PySpark Notebook

Let's look at the way the new file will be dealt with.

The data lake is shortcut to our bronze delta lake, and here we use mssparkutils to list the files. S4 is new.
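
As a rough illustration of the listing step (the folder path here is an assumption, not the exact path in the project):

# List everything in the bronze shortcut folder (path is illustrative)
files = mssparkutils.fs.ls("Files/Data/Bronze")

# Keep just the csv file names, e.g. Taskmaster_S4_13062017.csv
csv_files = [f.name for f in files if f.name.endswith(".csv")]
print(csv_files)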

  • The next code block removes Contestants from the list
  • And the next creates our empty dataframe consisting of filename, processedTime and fullyprocessedFlag
  • Check whether ProcessedFiles exists. If not, create a file with the empty dataframe (Image below)

It's worth looking at this code block in more detail as it was recently updated. If the ProcessedFiles file already exists we don't have to do anything. If it doesn't exist, we create the file from the empty template we created above.
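
As a minimal sketch of that check, assuming the empty log template is called df_log_template and reusing the Hadoop FileSystem existence check that appears later in this notebook (the path is illustrative):

# Path to the ProcessedFiles parquet (illustrative)
processed_files_path = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/ProcessedFiles.parquet"

# Check whether the file already exists
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
file_exists = fs.exists(spark._jvm.org.apache.hadoop.fs.Path(processed_files_path))

if file_exists:
    print("ProcessedFiles already exists. Nothing to do")
else:
    # Write the empty template of filename, processedTime and fullyprocessedFlag
    df_log_template.write.mode("overwrite").parquet(processed_files_path)
    print("ProcessedFiles created from the empty template")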

In our case the file is already created, and we know it contains Series 1, 2 and 3, which we have already added.

  • List files already processed (Image below)

Here we can see that S1, S2 and S3 have been fully processed.

  • Return only the non-processed files (Image below)

And we find anything in the folder that isn't in our ProcessedFiles parquet file, in this case S4.
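
Something along these lines, assuming files holds the mssparkutils listing and df_already_processed holds the ProcessedFiles parquet (the variable names are assumptions):

# File names that have already been processed
processed_names = [row["filename"] for row in df_already_processed.collect()]

# Anything in the folder that isn't in the processed list still needs loading
files_to_process = [f.name for f in files
                    if f.name.endswith(".csv") and f.name not in processed_names]

print(files_to_process)  # in this run, just the S4 file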

  • Next we create an empty data frame for the taskmaster data
  • And for every file we process the data into the data frame (a sketch of this loop follows)
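
A hedged sketch of that loop, assuming bronze_path and files_to_process from the earlier steps; this only shows the read, the audit columns and the union, while the real transform (dates, Kaggle joins and so on) is covered in the previous posts:

from pyspark.sql.functions import col, lit, current_timestamp

for file_name in files_to_process:
    # Read the raw csv for this series
    df_raw = (spark.read.option("header", "true")
                   .csv(f"{bronze_path}/{file_name}")
                   .withColumn("source_filename", lit(file_name))
                   .withColumn("source_processedTime", current_timestamp()))

    # Cast each column to the type defined in the empty template so the union is clean
    df_raw = df_raw.select([col(f.name).cast(f.dataType) for f in dftm.schema.fields])

    # Append into the running taskmaster dataframe
    dftm = dftm.unionByName(df_raw)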

Checking the total rows in the S4 file, there are 240 rows (at this point there are more episodes in a series).

  • Next the df_log dataframe is created with the Season 4 file name, our processed time and 0 as the fullyprocessedFlag (a sketch follows this list)
  • Transform code. This is detailed in previous posts. It includes adding the Contestants.csv data, and adding in the people data from Kaggle to join with contestants.
  • The data is added to the taskmastertransformed files so we only deal with the current set
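
A minimal sketch of that log entry, assuming files_to_process from earlier and the column names from the empty log template:

from pyspark.sql.functions import current_timestamp

# One log row per file in this run, flagged as not yet fully processed
df_log = spark.createDataFrame(
    [(file_name, 0) for file_name in files_to_process],
    ["filename", "fullyprocessedFlag"]
).withColumn("processedTime", current_timestamp())

display(df_log)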

Error Message when checking the TaskmasterTransformed file

We have hit our first issue

Caused by: org.apache.spark.SparkException: Parquet column cannot be converted in file abfss://986472b4-7316-485c-aa22-128e1cb29544@onelake.dfs.fabric.microsoft.com/ee31b1a4-16bf-4aae-aaab-f130bd4d53c6/Files/Data/Silver/taskmasterTransformed.parquet/source_filename=Taskmaster_S4_13062017.csv/part-00008-e963cfd6-7007-439b-9e0d-63b33eddabb3.c000.snappy.parquet. Column: [Points], Expected: double, Found: INT32.

There is an issue with Points: expected double, found INT32. Looking at the data:

This is the only point that isn't a standard 0 to 5 number, and this should be fine as an integer. Here is where the issue is: it's actually in S1, which we have already processed.

Josh did his own solo task and, as we can see here, the Points are decimal and not an integer. This doesn't happen often at all; it's usually 1 through to 5, so this is the first time we have hit the issue. To resolve it I am going to delete all the files again, remove S4 from the data lake and start again, ensuring Points is double, not integer.

The change is here, when we create the empty dataframe to append into:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType,TimestampType, DoubleType


# Create a Spark session
spark = SparkSession.builder.appName("CreateEmptyDataFrame").getOrCreate()

# Define the schema for the DataFrame
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Series", StringType(), True),
    StructField("Episode No", IntegerType(), True),
    StructField("Episode Name", StringType(), True),
    StructField("Episode Date", DateType(), True),
    StructField("Task Order", IntegerType(), True),
    StructField("Task", StringType(), True),
    StructField("Task Type", StringType(), True),
    StructField("Assignment", StringType(), True),
    StructField("Contestant", StringType(), True),
    StructField("Team", StringType(), True),
    StructField("Points", DoubleType(), True),
    StructField("Winner", IntegerType(), True),
    StructField("source_filename", StringType(), True),
    StructField("source_processedTime", TimestampType(), True)
])

# Create an empty DataFrame with the specified schema
dftm  = spark.createDataFrame([], schema)

# Show the schema of the empty DataFrame
dftm.printSchema()

display(dftm)

Changing Points to DoubleType is the only change made so far. The data is loaded through to the dims and facts, the S4 file is added into the data lake and we go again.

This time, when we get to the code that checks TaskmasterTransformed, it works.

from pyspark.sql.functions import input_file_name, regexp_extract

workspace_id = "########-####-####-####-############"
lakehouse_id = "########-####-####-####-############"

parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/taskmasterTransformed.parquet")

df_already_processed = spark.read.parquet(parquet_file)

display(df_already_processed)

  • Add the ContestantTransformed to a file and check
  • Add the log to a file and check (a sketch follows below)
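
For the log step, a sketch along these lines, reusing the processed_files_path and df_log names assumed in the earlier sketches:

# Append this run's log rows to the ProcessedFiles parquet and re-read to check
df_log.write.mode("append").parquet(processed_files_path)

display(spark.read.parquet(processed_files_path))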

Note that we now have S4 logged with the process flag of 0. We continue on to the dims

Updating Dimension Dim Contestant

We bring back the list of unprocessed files

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, TimestampType, DoubleType


# Create a Spark session
spark = SparkSession.builder.appName("CreateEmptyDataFrame").getOrCreate()

# Define the schema for the DataFrame
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Series", StringType(), True),
    StructField("Episode No", IntegerType(), True),
    StructField("Episode Name", StringType(), True),
    StructField("Episode Date", DateType(), True),
    StructField("Task Order", IntegerType(), True),
    StructField("Task", StringType(), True),
    StructField("Task Type", StringType(), True),
    StructField("Assignment", StringType(), True),
    StructField("Contestant", StringType(), True),
    StructField("Team", StringType(), True),
    StructField("Points", DoubleType(), True),
    StructField("Winner", IntegerType(), True),
    StructField("source_processedTime", TimestampType(), True),
    StructField("Year", StringType(), True),
    StructField("day", StringType(), True),
    StructField("month", StringType(), True),
    StructField("source_filename", StringType(), True)
    
])

# Create an empty DataFrame with the specified schema
dftm  = spark.createDataFrame([], schema)

# Show the schema of the empty DataFrame
dftm.printSchema()

display(dftm)

Again, at this point we create an empty data frame to load into, and Points has been changed to DoubleType

  • Loop through the files that need processing, adding just that partition from the parquet file into a data frame
  • Check the number of rows
  • Add to CurrentProcessedTMData.parquet so all the other dims and the fact can use this.
  • Get Contestant data and get the correct series from this data to match to taskmaster partition
  • Now filter to only the contestants we are working with
  • Merge Contestants and Taskmaster data
  • If the delta parquet file doesn't exist, add the default row of -1; else do nothing. I had made an error in Part 15: it was pointing towards the silver lakehouse files when it should have been pointing to the gold tables (see the corrected code below)

If new, create the -1 row. Else do nothing

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import Row


workspace_id = "########-####-####-####-############"
lakehouse_id = "########-####-####-####-############"

path_to_parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/dimcontestant")

# Check if the Delta Parquet file exists
delta_file_exists = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration()).exists(spark._jvm.org.apache.hadoop.fs.Path(path_to_parquet_file))

if delta_file_exists:
    print("Delta Parquet file exists. We don't need to add the NA row")
else:

    # Add a Default row
    # Create a sample DataFrame
    data = [(-1, "Not Known","Not Known","Not Known","Not Known","Not Known","Not Known","0","NA","0","0","0")]
    columns = ["Contestant ID", "Contestant Name", "Team","Image","From","Area","Country","Seat","Gender","Hand","Age","Age Range"]
    new_row  = spark.createDataFrame(data, columns)

    # Union the new row with the existing DataFrame
    dfContfinal = dfContfinal.union(new_row)

    # Show the updated DataFrame
    dfContfinal.show(1000)

    print("Delta Parquet file does not exist. Add NA Row")

The only bit of code changed here is path_to_parquet_file.

New Code. If the -1 row doesn't exist, get the max Key + 1

We created the following code in the last post

from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number


# Check whether the default -1 row is present, based on Contestant ID
if dfContfinal.filter(dfContfinal["Contestant ID"] == -1).count() > 0:

    # Create a window specification ordered by "Contestant ID"
    window_spec = Window.orderBy(col("Contestant ID"))

    # Add a new column "ContestantKey" using row_number() over the specified window
    dfContfinalKey = dfContfinal.withColumn("ContestantKey", row_number().over(window_spec) - 2)

    # Show the result
    dfContfinalKey.show(1000)

    print("Data Contains Default row. No data Previously processed")
    
else:
    
    print("Data Contains No Default row. Data Previously processed")

So, if this is a new run we will create the default -1 row. If we are adding to data already there, we will already have a default row in the data and won't have created the -1 row this time, so we need to get the last Key and add 1.

We already have the code in the if clause that creates a Key from -1 if it's new data. We now need the else.

else:

    # Load data from the Delta table
    dfdimc = spark.read.format("delta").load("abfss://########-####-####-####-############@onelake.dfs.fabric.microsoft.com/########-####-####-####-############/Tables/dimcontestant")
     
    # Get the maximum value of ContestantKey
    max_contestant_key = dfdimc.agg({"ContestantKey": "max"}).collect()[0][0]

    # Now take this number and start our new keys after the last key + 1
    start_value = max_contestant_key + 1

    # Create a window specification ordered by "Contestant ID"
    window_spec = Window.orderBy(col("Contestant ID"))

    # Add a new column "ContestantKey" using row_number() over the specified window,
    # continuing the sequence from start_value
    dfContfinalKey = dfContfinal.withColumn("ContestantKey", row_number().over(window_spec) + start_value - 1)
    
    # Show the DataFrame
    dfContfinalKey.show()
    
    print("Data Contains No Default row. Data Previously processed")

  • So here we bring back the data from Dim Contestant.
  • We get the max key
  • Create the start value by adding 1
  • Then create the window, ordering by Contestant ID
  • Add in the Key using the window_spec, using our start_value to increment from the last item in the dimension. In this case we are starting from ID 15
  • Append to the Delta parquet file

Error when appending into Parquet File

When running this code

from delta.tables import DeltaTable

# You can also add a non-default lakehouse by clicking +Lakehouse
aliased_df.write.mode("append").option("overwriteSchema", "true").format("delta").saveAsTable("GoldDebbiesTraininglh.dimContestant")

AnalysisException: Failed to merge fields ‘ContestantID’ and ‘ContestantID’. Failed to merge incompatible data types LongType and IntegerType

We can test this by looking at the schemas.
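
For example, as a quick check (reusing path_to_parquet_file from the existence check above):

# Schema of the dataframe we are trying to append
aliased_df.printSchema()

# Schema of what is already in the gold delta table
spark.read.format("delta").load(path_to_parquet_file).printSchema()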

This is correct. What about our original data?

So our original data was added to the delta file as long. Integers go up to 2,147,483,647, so there seems to be no reason to store this as long.

Because this is just a test, the decision is made to once again go back to the start and ensure that the data is integer from the first instance.

Remove all the files and tables from Gold and Silver, and Season 4 from the data lake, to go through the process again.

And whilst updating, we can also sort out the issues above.

Taskmaster Transformed. No Updates.

Dim Contestant V2

Where is our integer becoming long, and can we fix it at that point?

merged_df has an integer but it becomes long here when we get rid of any null team records.

# Cast ContestantID to IntegerType
dfContfinal = dfContfinal.withColumn("Contestant ID", dfContfinal["Contestant ID"].cast("int"))

# Verify the schema to ensure the change
dfContfinal.printSchema()

However, it also happens again at dfContfinal, so we can actually move the above code block to after that point (and then keep checking the schema).

This appears to be the last time that the ID gets changed to Long.

This shows the importance of checking your schema, especially when appending data into files.

Dim Episode V2

The only change here currently is to point to the correct gold tables.

We also checked the schema to ensure nothing was long.

The same thing is happening here, so we need some code before we create the delta parquet to sort this out.
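
One way to handle it, as a hedged sketch that walks the schema just before the delta write and casts anything that has drifted to long back to int (the dataframe name dftmEpKey is an assumption here):

from pyspark.sql.types import LongType, IntegerType

# Cast any long columns back to int before writing the delta table
for field in dftmEpKey.schema.fields:
    if isinstance(field.dataType, LongType):
        dftmEpKey = dftmEpKey.withColumn(field.name, dftmEpKey[field.name].cast(IntegerType()))

dftmEpKey.printSchema()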

Dim Task V2

The same change again, to point to the Gold Delta Table and not the silver files.

And reset long to int (the same approach as sketched above for Dim Episode).

Taskmaster Fact V2

No changes

We are now at the point where we can add in Series 4, run the Taskmaster Transformed notebook and go back to Dim Contestant V2.

Unfortunately, after the update, another error has occurred in Dim Contestant.

Age Error. Age has changed to String

AnalysisException: Failed to merge fields ‘Age’ and ‘Age’. Failed to merge incompatible data types StringType and IntegerType

So we repeat the process again, and add more code to change Age back to int.

We basically need to repeat the process until every error in Contestant is resolved. Thankfully, this is the last problem with Dim Contestant.

Dim Episode V2

Create Episode Key

from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Check whether the default -1 row is present, based on Episode No
if dftmEp.filter(dftmEp["Episode No"] == -1).count() > 0:

    # Create a window specification ordered by "Series" and "Episode No"
    window_spec = Window.orderBy(col("Series"), col("Episode No"))

    # Add a new column "EpisodeKey" using row_number() over the specified window
    dftmEpKey = dftmEp.withColumn("EpisodeKey", row_number().over(window_spec) - 2)

    # Show the result
    dftmEpKey.show(1000)

    print("Data Contains Default row. No data Previously processed. Add Key from -1")
    
else:

    # Load data from the Delta table
    dfdime = spark.read.format("delta").load("abfss://986472b4-7316-485c-aa22-128e1cb29544@onelake.dfs.fabric.microsoft.com/ce55b91c-ed51-4885-9440-378a3c18450b/Tables/dimepisode")

    # Get the maximum value of EpisodeKey
    max_episode_key = dfdime.agg({"EpisodeKey": "max"}).collect()[0][0]

    # Now take this number and start our new keys after the last key + 1
    start_value = max_episode_key + 1

    # Create a window specification ordered by "Series" and "Episode No"
    window_spec = Window.orderBy(col("Series"), col("Episode No"))

    # Add a new column "EpisodeKey" using row_number() over the specified window
    dftmEpKey = dftmEp.withColumn("EpisodeKey", row_number().over(window_spec) + start_value - 1)
    
    # Show the DataFrame
    dftmEpKey.show()
    
    print("Data Contains No Default row. Data Previously processed. Add Key to continue sequence")  

Added the else to our Key creation code block.

Dim Task V2

Create Key

from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Check whether the default -1 row is present, based on Task Order
if dftmTsk.filter(dftmTsk["Task Order"] == -1).count() > 0:

    # Create a window specification ordered by "Task Order" and "Task"
    window_spec = Window.orderBy(col("Task Order"), col("Task"))

    # Add a new column "TaskKey" using row_number() over the specified window
    dftmTskKey = dftmTsk.withColumn("TaskKey", row_number().over(window_spec) - 2)

    # Show the result
    dftmTskKey.show(1000)

    print("Data Contains Default row. No data Previously processed")
    
else:
    # Load data from the Delta table
    dfdimt = spark.read.format("delta").load("abfss://986472b4-7316-485c-aa22-128e1cb29544@onelake.dfs.fabric.microsoft.com/ce55b91c-ed51-4885-9440-378a3c18450b/Tables/dimtask")

    # Get the maximum value of TaskKey
    max_task_key = dfdimt.agg({"TaskKey": "max"}).collect()[0][0]

    # Now take this number and start our new keys after the last key + 1
    start_value = max_task_key + 1

    # Create a window specification ordered by "Task Order" and "Task"
    window_spec = Window.orderBy(col("Task Order"), col("Task"))

    # Add a new column "TaskKey" using row_number() over the specified window
    dftmTskKey = dftmTsk.withColumn("TaskKey", row_number().over(window_spec) + start_value - 1)
    
    # Show the DataFrame
    dftmTskKey.show()
    
    print("Data Contains No Default row. Data Previously processed. Add Key to continue sequence")      

The else has been added to this code block.

Conclusion

After running the fact table again, everything is successful.

Season 4 is set as fully processed.

We should now be able to repeat the process across all the other taskmaster series until completion.

We are now ready to add the notebooks into a Pipeline in the next blog post, so we can set up automated processing. And then move back to Power BI reporting.

The only step not yet set up is the Data Engineering side, where we would usually automate the process of pulling through the data. Later on, we might have a think about how we can set something up to mimic a proper business scenario where the data is dropped for us to process.
