Microsoft Fabric Part 17. Taskmaster Project. The Pipeline. Run multiple notebooks from one Notebook activity using the foreach activity

In Part 16 we processed Series 4 of Taskmaster, and we had a lot of issues to resolve along the way.

Now we can create a pipeline to process all of the Notebooks we have created. We will start as simply as possible and try to build in better processes as we go along.

New Data Pipeline

Back in the Fabric workspace, create the first Data Pipeline.

Add activity: Notebook 1 Transformed

Click on Notebook to create the first Notebook

It's very simple at the moment. There are no parameters and nothing else to set.

Now for the next Notebook

Create an on Success connection between Transformed Silver and Dim Contestant.

Now we can set up the rest of the pipeline

This is as basic as it gets and, if it all goes well, this will be fine. However, there are some things we haven't dealt with:

  • We have successes, but what happens if anything fails?
  • Each one of these is a Notebook activity. Imagine having lots of these here. Is there a way of having one Notebook activity that processes each notebook iteratively? This would clean up the pipeline.
  • It would be super useful to record metrics: what has been processed, how many rows, etc. Can we get metrics from the run?

Iterate through Each Notebook

Let's create our first pipeline that processes each notebook using just one Notebook activity.

In the first instance, before we add Series 5 into the Data Lake, each Notebook should run and do nothing, because there is no data to add.

Create a csv file

Also get the IDs of the workspace and the notebooks. These were taken from the Fabric URLs, e.g.

And add this file into the bronze delta lake
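For illustration, the file could look something along these lines (the column names are my assumption based on how they are used later in the pipeline, and the IDs are placeholders for the GUIDs taken from the URLs):

notebookName,workspaceID,notebookID
Taskmaster Transformed,<workspace guid>,<notebook guid>
Dim Contestant,<workspace guid>,<notebook guid>
Dim Episode,<workspace guid>,<notebook guid>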

Now we should be able to use this information in the pipeline

Create a lookup

In the pipeline we need a Lookup activity to get the information from the file we just created.

And we can preview the data

Add a ForEach Activity

Drag and drop a ForEach activity onto your pipeline canvas and create an On Success Relationship between this and the Lookup.

Sequential is ticked because there are multiple rows, one for each notebook, and we want to move through them sequentially.

Set the Items in Settings by clicking to get to the pipeline expression builder

We are using the output.value of our lookup activity.

@activity('GetNotebookNames').output.value

  • @activity refers to the actual activity we are using. GetNotebookNames
  • .output accesses the output of the activity
  • .value gets the specific value of the output.
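For illustration, the Lookup output (with First row only unchecked) comes back as JSON along these lines, which is why .output.value gives us the array of rows to loop over (the field values below are placeholders):

{
  "count": 2,
  "value": [
    { "notebookName": "Taskmaster Transformed", "workspaceID": "<workspace guid>", "notebookID": "<notebook guid>" },
    { "notebookName": "Dim Contestant", "workspaceID": "<workspace guid>", "notebookID": "<notebook guid>" }
  ]
}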

Configure the Notebook Activity Inside ForEach

Inside the ForEach, add a Notebook activity.

Again, click through to the dynamic content expression builder to build the expressions.

Workspace ID: @item().workspaceID

Notebook ID: @item().notebookID

Note: when you first set this up you see Workspace and Notebook name. It changes to ID, I think because we are using item(), but this can be confusing.

This is the reason ID has been added into the csv file. But we still wanted the names in the file in order to better understand what is going on.

  • @item(): This refers to the current item in the iteration which is a row. When you’re looping through a collection of items, @item() represents each individual item as the loop progresses.
  • .notebookID: This accesses the notebookID property of the current item. And the notebookID is a column in the csv file

Running the pipeline

You can check the inputs and outputs of each activity.

If it fails you can also open the failure information.

Monitor

Click Monitor to get more information

Here you can see a run before certain issues were resolved.

Click on the failed text bubble to get to the details. I had added the wrong column name.

Go back to the Pipeline.

We ran with no new file. It would be useful at this point to create some SQL to quickly check the data manually. We also need to add this new Pipeline into our medallion architecture task flow.

Medallion Architecture task Flow

In the Fabric Workspace.

Check using SQL endpoint on the data

Technically, there should still only be data up to Series 4. It would be nice to have some SQL that would quickly tell us what has happened to the data.

You can only use SQL on Delta Parquet tables, so we can't look at our ProcessedFiles parquet file. This may be a reason to change it to a Delta Parquet table in future, so we can use SQL to check information easily.
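As a rough sketch of that possible change (the table name is hypothetical, and this assumes the silver lakehouse is attached as the notebook's default lakehouse):

# Read the existing plain parquet log and re-save it as a Delta table so it can be
# queried from the SQL analytics endpoint. The table name "processedfiles" is an assumption.
df_plog = spark.read.parquet("Files/Data/Silver/Log/ProcessedFiles.parquet")
df_plog.write.mode("overwrite").format("delta").saveAsTable("processedfiles")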

From our Workspace, go into our gold lake.

We now want to work with the SQL analytics endpoint

Choose New SQL Query and name it Check Dims and Facts.

These are the queries that have been created to check our data:

--CONTESTANTS
--Make sure there are no duplicate contestants
SELECT ContestantID, ContestantName, COUNT(*)  As total FROM [GoldDebbiesTraininglh].[dbo].[dimcontestant]
Group BY ContestantID, ContestantName
Order by ContestantID
--Get the total of rows (minus the default row) and divide by 5, which should tell you how many series of contestants we have
SELECT (Count(*)-1)/5 AS totalSeries FROM [GoldDebbiesTraininglh].[dbo].[dimcontestant]

4 series. This is correct.

--EPISODES
--Again, make sure we don't have duplicate records
Select Series, EpisodeName, Count(*) as Total 
from [GoldDebbiesTraininglh].[dbo].[dimepisode]
GROUP BY Series, EpisodeName
Order by Count(*) desc
--And get a distinct list of series
SELECT Distinct Series FROM [GoldDebbiesTraininglh].[dbo].[dimepisode]
Where Series <> '-1'
Order by Series
--TASKS
--Check we have no duplicates
SELECT Task, TaskOrder, COUNT(*) as Total FROM [GoldDebbiesTraininglh].[dbo].[dimtask]
GROUP BY Task, TaskOrder
Order by Count(*) Desc
--Check the number of rows per series via the fact table
SELECT e.Series, COUNT(*)AS Total FROM [GoldDebbiesTraininglh].[dbo].[facttaskmaster] f
INNER JOIN [GoldDebbiesTraininglh].[dbo].[dimepisode] e on f.EpisodeKey = e.EpisodeKey
GROUP BY e.Series
Order by COUNT(*) Desc

Now we can see a problem. Series 1, 2 and 3 are quite small, but here they have way more rows than S4. Something has gone wrong.

We need to add more to the scripts so that we can identify specific issues, like what the original number of rows was. It is only because we remember that 240 was the original row count for S4 that we know S4 is correct. We need to be able to identify this quickly and specifically.

Let's do more checking.

--And check the  duplicates with fact as the central table
SELECT d.date As SeriesStartDate, e.Series,e.EpisodeName, c.ContestantName, t.Task, t.TaskOrder, t.TaskType,  f.Points, COUNT(*) AS Total
FROM [GoldDebbiesTraininglh].[dbo].[facttaskmaster] f 
INNER JOIN [GoldDebbiesTraininglh].[dbo].[dimcontestant] c on f.ContestantKey = c.ContestantKey
INNER JOIN [GoldDebbiesTraininglh].[dbo].[dimepisode] e on f.EpisodeKey = e.EpisodeKey
INNER JOIN [GoldDebbiesTraininglh].[dbo].[dimdate] d on d.DateKey = f.SeriesStartDateKey
INNER JOIN [GoldDebbiesTraininglh].[dbo].[dimtask] t on t.TaskKey = f.TaskKey
Where  e.Series = 'S1'
GROUP BY d.date, e.Series,e.EpisodeName, c.ContestantName, t.Task, t.TaskOrder, t.TaskType,  f.Points
Order by COUNT(*) DESC

There should only be one row per contestant, but each has 4: the data has been added 4 times.

Changing the Where criteria to S2, S3 and S4: S4 has 1 row for each, while S2 and S3 have 4 rows. We know they were added along with S1, so now we have to understand why this is.

To do this, it would help to have some metadata in the Delta Parquet tables recording the date and time the information was added.
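As a minimal sketch of what that could look like (the column name and the dataframe here are assumptions, not the project's actual code), each Notebook could stamp the rows just before writing:

from pyspark.sql.functions import current_timestamp

# Hypothetical example: add a loadedDateTime column before appending to the Delta table.
# df stands for whichever dataframe is about to be written.
df = df.withColumn("loadedDateTime", current_timestamp())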

Conclusion

We now have a pipeline that iterates through our Notebooks and we have discovered some issues along the way.

In the next blog post we are going to make some updates to the Notebooks and check why Series 1 has been added 4 times. Then we can move on to our next Pipeline updates: recording more metadata about the runs.

Microsoft Fabric Part 16. Taskmaster Project. Update to Taskmaster transformed Notebook for Contestants

In Part 15 we created a delta load of transformed dims and facts in the gold layer of our Fabric Architecture.

The Taskmaster file contains source_filename, e.g. Taskmaster_S1_01092015.csv, which we have used to partition the data.

We also have a transformed Contestants file. Each contestant belongs to a series, and the file isn't big enough to partition. The Contestants file consists of data that we have prepared and extra data from Kaggle.

However, we have only prepped the data for the first run. We need to add more code for new files, and there are still amendments we can make to make this better.

It's also useful to see all the issues along the way and how they are resolved. Knowing what can hit you can be really handy when working on a project, because you have already seen and dealt with the issues.

Load a new file into Azure Data Lake

  • Taskmaster season 4 has been loaded
  • Contestants already has season 4 in the data

Taskmaster Transformed Pyspark Notebook

Let's look at the way the new file will be dealt with.

The data lake is shortcutted to our bronze delta lake, and here we use mssparkutils to list the files. S4 is new.

  • The next code block removes Contestants from the list
  • And the next creates our empty dataframe consisting of filename, processedTime and fullyprocessedFlag
  • Check ProcessedFiles exists; if not, create a file with the empty dataframe (image below)

It’s worth just looking at this code block in more detail as it was recently updated. If the ProcessedFiles file already exists we don’t have to do anything. If it doesn’t exist we create the file using our empty template we created.

The file is already created and we know it contains Series 1, 2 and 3 that we have already added.

  • List files already processed (Image below)

Here we can see that S1, S2 and S3 have been fully processed.

  • Return only the non-processed files (image below)

And we find anything in the folder that isn't in our ProcessedFiles Parquet file. In this case, S4.

  • Next we create an empty data frame for the taskmaster data
  • And for every file we process the data into the data frame

Checking the total rows in the S4 file: there are 240 rows (at this point there are more episodes in a series).

  • Next the df_log dataframe is created with the Season 4 file name, the processed time and 0 as the fullyprocessedFlag
  • Transform code. This is detailed in previous posts. Includes adding the Contestants.csv data. Adding in the people data from Kaggle to join with contestants.
  • The data is added to taskmastertransformed files so we only deal with the current set

Error Message when checking the TaskmasterTransformed file

We have hit our first issue

Caused by: org.apache.spark.SparkException: Parquet column cannot be converted in file abfss://986472b4-7316-485c-aa22-128e1cb29544@onelake.dfs.fabric.microsoft.com/ee31b1a4-16bf-4aae-aaab-f130bd4d53c6/Files/Data/Silver/taskmasterTransformed.parquet/source_filename=Taskmaster_S4_13062017.csv/part-00008-e963cfd6-7007-439b-9e0d-63b33eddabb3.c000.snappy.parquet. Column: [Points], Expected: double, Found: INT32.

There is an issue with Points. Expected double. Found INT32. Looking at the data,

This is the only Points value that isn't a standard 0 to 5 number, and that should be fine as an integer. Here is where the issue actually is: it's in S1, which we have already processed.

Josh did his own solo task and, as we can see here, the Points are decimal and not integer. This doesn't happen often at all; it's usually 1 through to 5. So this is the first time we have had this issue. To resolve it, I am going to delete all the files again, remove S4 from the data lake and start again, ensuring Points is double not integer.

The change is here when we create the empty dataframe to append into

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType,TimestampType, DoubleType


# Create a Spark session
spark = SparkSession.builder.appName("CreateEmptyDataFrame").getOrCreate()

# Define the schema for the DataFrame
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Series", StringType(), True),
    StructField("Episode No", IntegerType(), True),
    StructField("Episode Name", StringType(), True),
    StructField("Episode Date", DateType(), True),
    StructField("Task Order", IntegerType(), True),
    StructField("Task", StringType(), True),
    StructField("Task Type", StringType(), True),
    StructField("Assignment", StringType(), True),
    StructField("Contestant", StringType(), True),
    StructField("Team", StringType(), True),
    StructField("Points", DoubleType(), True),
    StructField("Winner", IntegerType(), True),
    StructField("source_filename", StringType(), True),
    StructField("source_processedTime", TimestampType(), True)
])

# Create an empty DataFrame with the specified schema
dftm  = spark.createDataFrame([], schema)

# Show the schema of the empty DataFrame
dftm.printSchema()

display(dftm)

Changing to DoubleType is the only change made so far. The data is loaded through to dims and facts, the S4 file is added into the data lake and we go again.

This time when we get to the code that checks TaskmasterTransformed it works

from pyspark.sql.functions import input_file_name, regexp_extract

workspace_id = "########-####-####-####-############"
lakehouse_id = "########-####-####-####-############"

parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/taskmasterTransformed.parquet")

df_already_processed = spark.read.parquet(parquet_file)

display(df_already_processed)

  • Add the ContestantTransformed to a file and check
  • Add the log to a file and check

Note that we now have S4 logged with the process flag of 0. We continue on to the dims

Updating Dimension Dim Contestant

We bring back the list of unprocessed files

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, TimestampType, DoubleType


# Create a Spark session
spark = SparkSession.builder.appName("CreateEmptyDataFrame").getOrCreate()

# Define the schema for the DataFrame
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Series", StringType(), True),
    StructField("Episode No", IntegerType(), True),
    StructField("Episode Name", StringType(), True),
    StructField("Episode Date", DateType(), True),
    StructField("Task Order", IntegerType(), True),
    StructField("Task", StringType(), True),
    StructField("Task Type", StringType(), True),
    StructField("Assignment", StringType(), True),
    StructField("Contestant", StringType(), True),
    StructField("Team", StringType(), True),
    StructField("Points", DoubleType(), True),
    StructField("Winner", IntegerType(), True),
    StructField("source_processedTime", TimestampType(), True),
    StructField("Year", StringType(), True),
    StructField("day", StringType(), True),
    StructField("month", StringType(), True),
    StructField("source_filename", StringType(), True)
    
])

# Create an empty DataFrame with the specified schema
dftm  = spark.createDataFrame([], schema)

# Show the schema of the empty DataFrame
dftm.printSchema()

display(dftm)

Again, at this point we create an empty data frame to load into, and Points has been changed to DoubleType

  • Loop through the files that need processing. Adding just the partition from parquet file into a data frame.
  • Check the number of rows
  • Add to CurrentProcessedTMData.parquet so all the other dims and the fact can use this.
  • Get Contestant data and get the correct series from this data to match to taskmaster partition
  • Now filter to only the contestants we are working with
  • Merge Contestants and Taskmaster data
  • If the delta parquet file doesn't exist, add the default row of -1; else do nothing. I had made an error in Part 15: it was pointing towards the silver lakehouse files when it should have been pointing to the gold tables. (See the corrected code below.)

If New Create -1 Row. Else do nothing

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import Row


workspace_id = "########-####-####-####-############"
lakehouse_id = "########-####-####-####-############"

path_to_parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/dimcontestant")

# Check if the Delta Parquet file exists
delta_file_exists = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration()).exists(spark._jvm.org.apache.hadoop.fs.Path(path_to_parquet_file))

if delta_file_exists:
    print("Delta Parquet file exists. We don't need to add the NA row")
else:

    # Add a Default row
    # Create a sample DataFrame
    data = [(-1, "Not Known","Not Known","Not Known","Not Known","Not Known","Not Known","0","NA","0","0","0")]
    columns = ["Contestant ID", "Contestant Name", "Team","Image","From","Area","Country","Seat","Gender","Hand","Age","Age Range"]
    new_row  = spark.createDataFrame(data, columns)

    # Union the new row with the existing DataFrame
    dfContfinal = dfContfinal.union(new_row)

    # Show the updated DataFrame
    dfContfinal.show(1000)

    print("Delta Parquet file does not exist. Add NA Row")

The only bit of code changed here is path_to_parquet_file.

New Code. If Row -1 doesn’t exist, get the max Key +1

We created the following code in the last post

from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number


# Process the DataFrame based on ContestantID
if dfContfinal.filter(dfContfinal["Contestant ID"] == -1).count() > 0:

    # Create a window specification ordered by "Contestant ID"
    window_spec = Window.orderBy(col("Contestant ID"))

    # Add a new column "ContestantKey" using row_number() over the specified window
    dfContfinalKey = dfContfinal.withColumn("ContestantKey", row_number().over(window_spec) - 2)

    # Show the result
    dfContfinalKey.show(1000)

    print("Data Contains Default row. No data Previously processed")
    
else:
    
    print("Data Contains No Default row. Data Previously processed")

So, if this is a new run, we will create the default -1 row. If we are adding to data that is already there, we will already have a default row and we won't have created the -1 row this time; instead we need to get the last Key and add 1.

We already have the code in the if clause that creates a Key starting from -1 when it's new data. We now need the else.

else:

    # Load data from the Delta table
    dfdimc = spark.read.format("delta").load("abfss://########-####-####-####-############@onelake.dfs.fabric.microsoft.com/########-####-####-####-############/Tables/dimcontestant")
     
   # Get the maximum value of ContestantKey
    max_contestant_key = dfdimc.agg({"ContestantKey": "max"}).collect()[0][0] 

    # Now take this number and create the new keys starting after the last key + 1
    start_value = max_contestant_key+1

    # Create a window specification ordered by "Contestant ID"
    window_spec = Window.orderBy(col("Contestant ID"))

    # Add a new column "ContestantKey" using row_number() over the specified window
    dfContfinalKey = dfContfinal.withColumn("ContestantKey", row_number().over(window_spec) + start_value - 1)
    
    # Show the DataFrame
    dfContfinalKey.show()
    
    print("Data Contains No Default row. Data Previously processed")

  • So here we bring back the data from Dim Contestant
  • We get the max Key
  • Create the start value by adding 1
  • Then create the window, ordering by Contestant ID
  • Add in the Key using the window_spec and our start_value to increment from the last item in the dimension. In this case we are starting from ID 15
  • Append to the Delta parquet file

Error when appending into Parquet File

When running this code

from delta.tables import DeltaTable

#You can also add the none default data lake by clicking +Lakehouse
aliased_df.write.mode("append").option("overwriteSchema", "true").format("delta").saveAsTable("GoldDebbiesTraininglh.dimContestant")

AnalysisException: Failed to merge fields ‘ContestantID’ and ‘ContestantID’. Failed to merge incompatible data types LongType and IntegerType

We can test this by looking at the schemas.

This is correct. What about our original data?

So our original data was added to the delta file as long. Integers go up to 2,147,483,647, so there seems to be no reason to store these IDs as long.

Because this is just a test, the decision is made to once again go back to the start and ensure that the data is integer from the first instance.

Remove all the files and tables from Gold and Silver, and Season 4 from the data lake, to go through the process again.

And whilst updating, we can also sort out the issues above.

Taskmaster Transformed. No Updates.

Dim Contestant V2

Where is our integer becoming long and can we hit it at that point?

merged_df has an integer but it becomes long here when we get rid of any null team records.

# Cast ContestantID to IntegerType
dfContfinal = dfContfinal.withColumn("Contestant ID", dfContfinal["Contestant ID"].cast("int"))

# Verify the schema to ensure the change
dfContfinal.printSchema()

However, it also happens again at dfContfinal, so we can actually move the above code block to after this point (and then keep checking the schema).

This appears to be the last time that the ID gets changed to Long.

This shows the importance of checking your schema, especially when appending data into files.
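As a minimal sketch of such a check (assuming dfContfinalKey is the dataframe about to be appended and path_to_parquet_file points at the existing dimcontestant Delta table, both named earlier in the notebook):

# Compare the dataframe schema to the existing Delta table schema before appending
existing_fields = {f.name: f.dataType for f in spark.read.format("delta").load(path_to_parquet_file).schema}
new_fields = {f.name: f.dataType for f in dfContfinalKey.schema}

for name, dtype in new_fields.items():
    if name in existing_fields and existing_fields[name] != dtype:
        print(f"Type mismatch on {name}: table has {existing_fields[name]}, dataframe has {dtype}")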

Dim Episode V2

The only change here so far is to point to the correct gold tables.

We also checked the schema to ensure nothing was long.

The same thing is happening here, so we need some code before we create the delta parquet to sort this out.

Dim Task V2

Same change again to point to the Gold Delta Table and not silver files.

And reset long to int
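A minimal sketch of that reset (the dataframe and column names are assumptions; apply it to whichever columns have drifted to long):

from pyspark.sql.functions import col

# Cast the key back to int so the append matches the existing Delta table schema
dftmTskKey = dftmTskKey.withColumn("TaskKey", col("TaskKey").cast("int"))
dftmTskKey.printSchema()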

Taskmaster Fact V2

No changes

We now are at the point where we can add in Series 4, run the taskmaster transformed notebook and go back to Dim Contestant V2

Unfortunately, after the update, another error has occurred in Dim Contestant.

Age Error. Age has changed to String

AnalysisException: Failed to merge fields ‘Age’ and ‘Age’. Failed to merge incompatible data types StringType and IntegerType

So we repeat the process again, and add more code to change Age back to int.

We basically need to repeat this until every error in Dim Contestant is resolved. Thankfully, this is the last problem with Dim Contestant.
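A minimal sketch of the Age fix (assuming dfContfinal is the contestant dataframe at this point):

from pyspark.sql.functions import col

# Cast Age back to an integer so it matches the existing Delta table schema
dfContfinal = dfContfinal.withColumn("Age", col("Age").cast("int"))
dfContfinal.printSchema()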

Dim Episode V2

Create Episode Key

from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Process the DataFrame based on Episode No
if dftmEp.filter(dftmEp["Episode No"] == -1).count() > 0:

    # Create a window specification ordered by "Series" and "Episode No"
    window_spec = Window.orderBy(col("Series"), col("Episode No"))

    # Add a new column "EpisodeKey" using row_number() over the specified window
    dftmEpKey = dftmEp.withColumn("EpisodeKey", row_number().over(window_spec) - 2)

    # Show the result
    dftmEpKey.show(1000)

    print("Data Contains Default row. No data Previously processed. Add Key from -1")
    
else:

     # Load data from the Delta table
    dfdime = spark.read.format("delta").load("abfss://986472b4-7316-485c-aa22-128e1cb29544@onelake.dfs.fabric.microsoft.com/ce55b91c-ed51-4885-9440-378a3c18450b/Tables/dimepisode")
     
    # Get the maximum value of EpisodeKey
    max_episode_key = dfdime.agg({"EpisodeKey": "max"}).collect()[0][0] 

    # Now take this number and create the new keys starting after the last key + 1
    start_value = max_episode_key+1

    # Create a window specification ordered by "Series" and "Episode No"
    window_spec = Window.orderBy(col("Series"), col("Episode No"))

    # Add a new column "EpisodeKey" using row_number() over the specified window
    dftmEpKey = dftmEp.withColumn("EpisodeKey", row_number().over(window_spec) + start_value - 1)
    
    # Show the DataFrame
    dftmEpKey.show()
    
    print("Data Contains No Default row. Data Previously processed. Add Key to continue sequence")  

Added the else to our Key creation code block.

Dim Task V2

Create Key

from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Process the DataFrame based on Task Order
if dftmTsk.filter(dftmTsk["Task Order"] == -1).count() > 0:

    # Create a window specification ordered by "Task Order" and "Task"
    window_spec = Window.orderBy(col("Task Order"), col("Task"))

    # Add a new column "TaskKey" using row_number() over the specified window
    dftmTskKey = dftmTsk.withColumn("TaskKey", row_number().over(window_spec) - 2)

    # Show the result
    dftmTskKey.show(1000)

    print("Data Contains Default row. No data Previously processed")
    
else:
     # Load data from the Delta table
    dfdimt = spark.read.format("delta").load("abfss://986472b4-7316-485c-aa22-128e1cb29544@onelake.dfs.fabric.microsoft.com/ce55b91c-ed51-4885-9440-378a3c18450b/Tables/dimtask")
     
    # Get the maximum value of TaskKey
    max_task_key = dfdimt.agg({"TaskKey": "max"}).collect()[0][0] 

    # Now take this number and create the new keys starting after the last key + 1
    start_value = max_task_key+1

    # Create a window specification ordered by "Task Order" and "Task"
    window_spec = Window.orderBy(col("Task Order"), col("Task"))

    # Add a new column "TaskKey" using row_number() over the specified window
    dftmTskKey = dftmTsk.withColumn("TaskKey", row_number().over(window_spec) + start_value - 1)
    
    # Show the DataFrame
    dftmTskKey.show()
    
    print("Data Contains No Default row. Data Previously processed. Add Key to continue sequence")      

The else has been added to this code block.

Conclusion

After running the fact table again, everything is successful.

Season 4 is set as fully processed.

We should now be able to repeat the process across all the other taskmaster series until completion.

We are now ready to add the notebooks into a Pipeline in the next blog post, so we can set up automated processing. And then move back to Power BI reporting.

The only step not yet set up is the Data Engineering side, where we would usually automate the process of pulling through the data. Later on, we might have a think about how we can set something up to mimic a proper business scenario where the data is dropped for us to process.

Microsoft Fabric Part 15. Taskmaster Project. Creating a Delta Load on Dims and Facts

In Part 14 we created a delta load of transformed data in the Silver layer of our Fabric architecture.

We now want to try and delta load the dimensions and the fact table

Our data is always new. It comes in and then doesn’t change so we don’t need to worry about updates. Just new data.

Before continuing, delete all the dims and facts from the gold lakehouse created in previous blogs. We are going to recreate them.

Dimensions: Dim Contestant V2

We are going to process the dimension in a delta load. Currently this is at series level.

Questions.

  • Is there anything in the data that needs to be dealt with as a whole, for example updating age across the entire dataset against the current date? No, there is nothing that needs to be fully updated.
  • How do we deal with data Warehouse Keys? We will look at this later.

Bring back the list of Processed files, that we are currently working on

parquet_file = "Files/Data/Silver/Log/ProcessedFiles.parquet"
dflog = spark.read.parquet(parquet_file)

df_currentProcess = dflog[dflog["fullyProcessedFlag"] == 0][["Filename"]]


display(df_currentProcess)

We only need the name, because we are going to feed the name into a loop.

Once everything is done, we will set fullyProcessedFlag = 1. This way we will always know what we are currently working on. Now we want to bring back the partition(s) that match.

Create an empty data frame to process into.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType,TimestampType


# Create a Spark session
spark = SparkSession.builder.appName("CreateEmptyDataFrame").getOrCreate()

# Define the schema for the DataFrame
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Series", StringType(), True),
    StructField("Episode No", IntegerType(), True),
    StructField("Episode Name", StringType(), True),
    StructField("Episode Date", DateType(), True),
    StructField("Task Order", IntegerType(), True),
    StructField("Task", StringType(), True),
    StructField("Task Type", StringType(), True),
    StructField("Assignment", StringType(), True),
    StructField("Contestant", StringType(), True),
    StructField("Team", StringType(), True),
    StructField("Points", IntegerType(), True),
    StructField("Winner", IntegerType(), True),
    StructField("source_processedTime", TimestampType(), True),
    StructField("Year", StringType(), True),
    StructField("day", StringType(), True),
    StructField("month", StringType(), True),
    StructField("source_filename", StringType(), True)
    
])

# Create an empty DataFrame with the specified schema
dftm  = spark.createDataFrame([], schema)

# Show the schema of the empty DataFrame
dftm.printSchema()

display(dftm)

Loop through the files that need to be processed

parquet_file_path = "Files/Data/Silver/taskmasterTransformed.parquet"
partition_column = "source_filename"

from pyspark.sql.functions import col

# Loop through the dataframe which consist of the filenames
for row in df_currentProcess.rdd.collect(): # Collecting to driver (local) as a list of Rows 

    # Extract filename from the current row
    filename = row["Filename"]

    # Read the current parquet file partition into a dataframe
    dfnf = spark.read.parquet(parquet_file_path).filter(col(partition_column)==filename)
    
    dftm = dftm.union(dfnf) 

    display(dftm)  

Here, because we have the parquet file partitioned we can use .filter(col(partition_column)==filename) on the parquet file to only bring through what we are going to process.

The parquet_file_path and partition_column variables are created; both refer to the file we are going to bring through into the data frame.

We then loop with 'for row in' over df_currentProcess (the dataframe containing the ready-to-process file names). For every one of these filenames we add the matching partition of taskmaster data to the dataframe.

So, next time we process, there will only be one new series of data to add into the dims and fact tables.

Save to Parquet File

We also know that we are going to use this transformed data set for every single dim and fact table. So for every Dim Notebook we will have to repeat this process.

We know that repeating ourselves is a no-no in development, so we can go one further: add this DataFrame to a Parquet file that we can reuse for every other item in this process. It can be a simple Parquet file and will be overwritten every time.

workspace_id = "986472b4-7316-485c-aa22-128e1cb29544"
lakehouse_id = "ee31b1a4-16bf-4aae-aaab-f130bd4d53c6"

dftm.write.mode("overwrite").parquet(f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/CurrentProcessedTMData.parquet")

# Print a success message
print("Parquet file overwritten successfully.")

Update ContestantTransformed to work with the current set

The Contestant file isn't big enough to partition; we can always bring it all through without any issues. We may want to change this in future, but for now we can bring the entire data set through into the dataframe, as before.

To make the change we can then filter down to the correct contestant set.

We have a series_label column, e.g. S1, C2, CoC, and we can get this information out of the df_currentProcess Filename column, e.g. Taskmaster_S1_01092015.csv.
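The join below uses a dfserieslabel dataframe. A possible way to build it (the exact pattern is an assumption) is to pull the label out of the Filename column with regexp_extract:

from pyspark.sql.functions import regexp_extract

# Derive the series label (e.g. S1) from a filename like Taskmaster_S1_01092015.csv
dfserieslabel = df_currentProcess.withColumn(
    "series_label", regexp_extract("Filename", r"Taskmaster_([^_]+)_", 1)
)
display(dfserieslabel)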

from pyspark.sql import SparkSession, functions as F

df_small = F.broadcast(dfserieslabel)

# Join the extra contestant information
dfc = df_small.join(dfc, df_small["series_label"] == dfc["series_label"], "inner").drop(df_small.Filename)\
.drop(df_small.series_label)

# The resulting DataFrame 'dfc' now only contains contestants for the series we are currently processing
display(dfc)

Test the updated filtered Contestants dataframe

from pyspark.sql.functions import count

result_df = dfc.groupBy("series_label").agg(count("*").alias("Total"))
   
display(result_df)

Correct. There are 5 contestants to a series.

Continue with the transformations from Previous blog Posts

  • Merge Contestants and Taskmaster data to only bring back Contestant information using SELECT
  • Drop any without a team; these are the hosts
  • Check for duplicates (now we only do the series we are working on, so these shouldn't be an issue)

Add a Default Row

  • Add a Default row. However, we should already have a default row if the processing has already started, so this will need the additional code below

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import Row


workspace_id = "986472b4-7316-485c-aa22-128e1cb29544"
lakehouse_id = "ce55b91c-ed51-4885-9440-378a3c18450b"
path_to_parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/dimContestant.parquet")

# Check if the Delta Parquet file exists
delta_file_exists = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration()).exists(spark._jvm.org.apache.hadoop.fs.Path(path_to_parquet_file))

if delta_file_exists:
    print("Delta Parquet file exists.")
else:

    # Add a Default row
    # Create a sample DataFrame
    data = [(-1, "Not Known","Not Known","Not Known","Not Known","Not Known","Not Known","0","NA","0","0","0")]
    columns = ["Contestant ID", "Contestant Name", "Team","Image","From","Area","Country","Seat","Gender","Hand","Age","Age Range"]
    new_row  = spark.createDataFrame(data, columns)

    # Union the new row with the existing DataFrame
    dfContfinal = dfContfinal.union(new_row)

    # Show the updated DataFrame
    dfContfinal.show(1000)

    print("Delta Parquet file does not exist.")

So, instead of always creating the default row, we only do it if the dimContestant Parquet file doesn't exist. We will deal with the 'already exists' logic next time around with new data; at that point we will want to get the max Key to work with later.

Please note, this code is incorrect. We are looking at Files within the silver lakehouse, but our dims are Tables (Delta Parquet) in the gold lakehouse. We update this in Part 16.

Create a Contestant Key

Previously we created a contestant key so that the default row gets -1 and the real rows start from 0. However, we now have two possibilities:

  • This is the first-time run. There is no data and the above situation still applies.
  • OR it's not the first time it has been run, and we need to start from the Key above what we already have.

If it's the first-time run, we will now have a Contestant ID of -1 in the data because we have a default row.

from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number


# Process the DataFrame based on ContestantID
if dfContfinal.filter(dfContfinal["Contestant ID"] == -1).count() > 0:

    # Create a window specification ordered by "Contestant ID"
    window_spec = Window.orderBy(col("Contestant ID"))

    # Add a new column "ContestantKey" using row_number() over the specified window
    dfContfinalKey = dfContfinal.withColumn("ContestantKey", row_number().over(window_spec) - 2)

    # Show the result
    dfContfinalKey.show(1000)

    print("Data Contains Default row. No data Previously processed")
    
else:
    
    print("Data Contains No Default row. Data Previously processed")

Here, an if and else has been added. If there is a Contestant ID of -1 then we can add our key as normal. We will update the else block next time around.

Add to Delta Parquet

Our new DimContestant file can now be appended to the Delta Parquet table so we change the code from overwrite to append
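As a minimal sketch (the variable name for the final dataframe at this point is an assumption), the write becomes:

# Append rather than overwrite into the gold lakehouse table
dfContfinalKey.write.mode("append").format("delta").saveAsTable("GoldDebbiesTraininglh.dimContestant")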

And we can now reuse the code across the other dims. Using the Parquet File we have created.

It's worth noting here that when you set up a pipeline to run the notebooks, you will need to think about the order; in this case because Dim Contestant is the dim where we create the filtered-down data.

Dim Episode V2

Here, instead of getting the data from Taskmaster Transformed, we can get the already selected data.

from pyspark.sql.functions import input_file_name, regexp_extract

#parquet_file = "Files/Data/Silver/taskmasterTransformed.parquet"
parquet_file = "Files/Data/Silver/Log/CurrentProcessedTMData.parquet"
dftm = spark.read.parquet(parquet_file)

display(dftm)

Run as normal until you get to Add Default row and we can reuse the code from DimContestant

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import Row

workspace_id = "986472b4-7316-485c-aa22-128e1cb29544"
lakehouse_id = "ce55b91c-ed51-4885-9440-378a3c18450b"
path_to_parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/dimEpisode.parquet")

# Check if the Delta Parquet file exists
delta_file_exists = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration()).exists(spark._jvm.org.apache.hadoop.fs.Path(path_to_parquet_file))

if delta_file_exists:
    print("Delta Parquet file exists.")
else:

    # Add a Default row
    # Create a sample DataFrame
    data = [(-1, -1,"Not Known")]
    columns = ["series", "episode No", "Episode Name"]
    new_row  = spark.createDataFrame(data, columns)

    # Union the new row with the existing DataFrame
    dftmEp = dftmEp.union(new_row)

    # Show the updated DataFrame
    dftmEp.show(1000)


    print("Delta Parquet file does not exist.")

Create Episode Key

Again we can reuse our code from Dim Contestant here

from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Process the DataFrame based on Episode No
if dftmEp.filter(dftmEp["Episode No"] == -1).count() > 0:

    # Create a window specification ordered by "Series" and "Episode No"
    window_spec = Window.orderBy(col("Series"), col("Episode No"))

    # Add a new column "EpisodeKey" using row_number() over the specified window
    dftmEpKey = dftmEp.withColumn("EpisodeKey", row_number().over(window_spec) - 2)

    # Show the result
    dftmEpKey.show(1000)

    print("Data Contains Default row. No data Previously processed")

else:

    print("Data Contains No Default row. Data Previously processed")

Save to Delta Parquet

The last change is to change overwrite to append

from delta.tables import DeltaTable

dftmEpKey_cleaned.write.mode("append").option("overwriteSchema", "true").format("delta").saveAsTable("GoldDebbiesTraininglh.dimEpisode")

And these are the only changes needed. We will be coming back to add more to the else logic. Repeat for Dim Task, and all our dims are now Delta.

Taskmaster Fact V2

Now all our Dims can be loaded as Delta. So it should be one series at a time.

(We could go one further later on and actually do it at episode level, so our Taskmaster reports are fully enabled along with the show. We can look at that later.)

It's time to sort out the fact table. We can do this as a delta load, and unlike the dims, the fact is also partitioned by series.

Again, we can change our initial data load to go to the CurrentProcessedTMData.parquet file

And then the transformations happen as is from previous blog posts

  • Create dateKey
  • Add the Episode Key from Dim Episode by joining on Episode Name (see the sketch after this list)
  • Add the Task Key from Dim Task by joining on Task and Task Order
  • Bring through the contestantTransformed lookup (all data at the moment)
  • Join Taskmaster to Contestants because we need the seat information
  • Bring in the Contestant Key from Dim Contestant on Contestant Name and Seat (because some contestants are in Champion of Champions series)
  • Create a Series Start Date Key and merge it into the main df
  • Create the fact table of Keys and metrics
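As a sketch of one of the key lookups (dataframe and column names are assumptions, not the exact notebook code), the Episode Key could be brought in like this:

# dftm holds the current transformed taskmaster data; dfdime is dimepisode loaded from the gold Delta table
dfdime_small = dfdime.select("EpisodeKey", "EpisodeName")

dftm = dftm.join(
    dfdime_small,
    dftm["Episode Name"] == dfdime_small["EpisodeName"],
    "left"
).drop("EpisodeName")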

Add to Partitioned Delta Parquet Table

from delta.tables import DeltaTable

dftmfact.write.mode("append").option("overwriteSchema", "true")\
.partitionBy("SeriesStartDateKey").format("delta").saveAsTable("GoldDebbiesTraininglh.factTaskmaster")

Finish the Process ProcessedFiles.parquet

Our dimensions and facts are complete. We can now finalise by updating our parquet data flags to 1, so we know that it's all done and we are ready to add another series.

from pyspark.sql.functions import input_file_name, regexp_extract
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

parquet_file = "Files/Data/Silver/Log/ProcessedFiles.parquet"

schema = StructType([
    StructField("filename", StringType(), True),
    StructField("processedTime", TimestampType(), True),
    StructField("fullyProcessedFlag", IntegerType(), True)
])


#dflog = spark.read.parquet(parquet_file)
dflog = spark.read.schema(schema).parquet(parquet_file)

display(dflog)

An addition here: we have specified the schema explicitly rather than inferring it. This is good because, if there is no data, the read would error without a schema. Defining the schema and using it means that if there is an issue and you don't have any data, the code will still run.

Here we bring through the unfiltered table; we want everything because we are going to overwrite the parquet file.

There may be files that already have 1 as their fullyProcessedFlag. We will overwrite them all with 1, even if they are already 1.

Change all flags to 1

Here I have decided to use some SQL in the code. But there are lots of ways to do this

from pyspark.sql.functions import expr

# Select the required columns
dflogselected = dflog.select("filename", "processedTime", "fullyProcessedFlag")

# Update fullyProcessedFlag where it is 0
parquet_file = dflogselected.withColumn(
    "fullyProcessedFlag",
    expr("CASE WHEN fullyProcessedFlag = 0 THEN 1 ELSE fullyProcessedFlag END")
)

# Show the updated DataFrame
parquet_file.show()

We add the SQL as an expression, setting the flag to 1 when it is 0.

Now, what we want to do is overwrite the Parquet with new Data. All saying we have finished processing.

Blocker – Operation Failed 404, head

When overwriting the file using the following code

workspace_id = "############-####-####-####-##########"
lakehouse_id = "############-####-####-####-##########"

df_Processed.write.mode("overwrite").option("overwriteSchema", "true")\
.parquet(f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles.parquet")

# Print a success message
print("Parquet file overwritten successfully.")

there is an error:

Caused by: org.apache.spark.SparkFileNotFoundException: Operation failed: “Not Found”, 404, HEAD, It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running ‘REFRESH TABLE tableName’ command in SQL or by recreating the Dataset/DataFrame involved.

We end up with ProcessedFiles.Parquet but the following data and files disappear

Having checked on the forums, other people are having the same issue. If you append it's fine; overwriting causes the error. It's possibly because of a slight change in the schema (nullable = false where it was true), but I think this may not be the case. This is proving incredibly problematic; every single workaround seems to cause the same issue.

This is the only workaround that I have managed to run successfully.

Save to ProcessedFiles2.parquet

workspace_id = "############-####-####-####-##########"
lakehouse_id = "############-####-####-####-##########"

parquet_file.write.mode("overwrite").option("overwriteSchema", "true")\
.parquet(f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles2.parquet")

# Print a success message
print("Parquet file overwritten successfully.")

A brand new file is created called ProcessedFiles2

Get data from ProcessedFiles2.parquet into a data frame

from pyspark.sql.functions import input_file_name, regexp_extract
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

parquet_file = "Files/Data/Silver/Log/ProcessedFiles2.parquet"

schema = StructType([
    StructField("filename", StringType(), True),
    StructField("processedTime", TimestampType(), True),
    StructField("fullyProcessedFlag", IntegerType(), True)
])


dflog2 = spark.read.schema(schema).parquet(parquet_file)

display(dflog2)

Delete the original File.

workspace_id = "############-####-####-####-##########"
lakehouse_id = "############-####-####-####-##########"

file_Path = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles.parquet"

# Use Spark (Hadoop FileSystem) to delete the original ProcessedFiles.parquet
spark._jvm.org.apache.hadoop.fs.FileSystem.get(
    spark._jsc.hadoopConfiguration()
).delete(spark._jvm.org.apache.hadoop.fs.Path(file_Path), True)

# Print a success message
print(f"{file_Path} has been deleted.")

Take the new data frame and write it back to ProcessedFiles.parquet.

This now seems to work:

workspace_id = "############-####-####-####-##########"
lakehouse_id = "############-####-####-####-##########"

dflog2.write.mode("overwrite").option("overwriteSchema", "true")\
.parquet(f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles.parquet")

# Print a success message
print("Parquet file overwritten successfully.")

Delete ProcessedFiles2.parquet file

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType
from pyspark.sql import Row

workspace_id = "986472b4-7316-485c-aa22-128e1cb29544"
lakehouse_id = "ee31b1a4-16bf-4aae-aaab-f130bd4d53c6"
file_Path = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles2.parquet"

# Use Spark to delete the file
spark._jvm.org.apache.hadoop.fs.FileSystem.get(
    spark._jsc.hadoopConfiguration()
).delete(spark._jvm.org.apache.hadoop.fs.Path(file_Path), True)

# Print a success message
print(f"{file_Path} has been deleted.")

And finally, delete the ProcessedFiles2 file. We don't need it any more.

It's more of a workaround than I would like, but it can be used until a better solution is found.

Conclusion

Now we have our dims and facts changed to Delta Parquet and we have done the initial load. In the next blog we will finish by adding new data and updating the logic for it.

We will also look at other issues. For example, what if you accidentally append the same data into the Parquet file etc.

Microsoft Fabric Part 14. Taskmaster Project. Creating a Delta Load on Transformed Notebook

Previously, our Notebooks took every csv file in the folder to transform.

We don't want to do this. We want to load only the files that haven't been processed.

And further down the line we want to be able to do every file, or just the unprocessed ones.

We want to do a delta load. This means changing the PySpark code we have created.

And a big thank you to everyone who helped on the forums, especially frithjof_v who helped with the code.

We want to do the following

For this exercise, taskmasterTransformed.parquet has been deleted. We are going to start at the beginning and load Series 1, 2 and 3, then 4, 5 and 6.

Go into the Data Lake in Azure and remove all the Taskmaster files apart from 1, 2 and 3.

Back to Taskmaster Transformed Notebook

Using mssparkutils, Get List of Files to be processed from Bronze folder

# abfss path to the folder where the csv files are located 
files_path = 'abfss://############@onelake.dfs.fabric.microsoft.com/############/Files/Data/Bronze/TaskMasterSeriesFiles/taskmaster'

# the mssparkutils.fs.ls method lists all the files (and/or subfolders) inside the folder.
files = mssparkutils.fs.ls(files_path)

# Convert FileInfo objects to list of tuples (this creates a list of the file names in the folder.)
file_data = [(file.name,) for file in files]

# This creates a dataframe consisting of the file names in the folder
df_files_in_folder = spark.createDataFrame(file_data, ["name"])

# Show the DataFrame (this step can be removed)
display(df_files_in_folder)



To get the abfss path for this code, right-click on the folder and choose Copy ABFSS path.

Remember, an abfss path is an Azure Blob File System path.

We have the three starting files in the shortcutted Data Lake Folder.

Remove the non-Taskmaster file from the list

from pyspark.sql.functions import col

# Filter rows that start with "Taskmaster"
df_files_in_folder = df_files_in_folder.filter(col("name").startswith("Taskmaster"))

# Show the resulting dataframe
df_files_in_folder.show()

The data frame has been filtered

The Pre Processed Audit File.

We want a file that contains a list of all the processed files. This is created later on, but one is needed from the outset. We need an empty dataframe to check against so we don't get an error.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType,TimestampType


# Create a Spark session
spark = SparkSession.builder.appName("CreateEmptyDataFrame").getOrCreate()

# Define the schema for the DataFrame
schema = StructType([
    StructField("filename", StringType(), True),
    StructField("processedTime", TimestampType(), True),
    StructField("fullyProcessedFlag", IntegerType(), True)
])

# Create an empty DataFrame with the specified schema
df_log  = spark.createDataFrame([], schema)

# Show the schema of the empty DataFrame
df_log.printSchema()

display(df_log)

Create the Initial Parquet Processed Files File if it doesn’t already exist

We are going to read from the ProcessedFiles Parquet file, so we need a file to read from, even if it's the first time running.

from pyspark.sql.utils import AnalysisException

# Define the paths
workspace_id = "#######-####-####-####-############"
lakehouse_id = "########-####-####-####-############"
file_path = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles.parquet"

# Check if the file exists
try:
    spark.read.parquet(file_path)
    print(f"File {file_path} already exists. Skipping write operation.")
except AnalysisException:
    # File does not exist, proceed with writing
    df_log.write.mode("append").parquet(file_path)
    print(f"File {file_path} does not exist. Writing data to {file_path}.")

We use a try/except block for exception handling.

E.g. try to read the file. If that succeeds, the file exists, so don't do anything.

If the try fails, move to the AnalysisException handler. In this case, simply create the file and print that the file didn't exist so we created it. It will be empty because it is created from our empty schema, all ready to start from scratch.

List Files from the processed Log

# This creates a dataframe with the file names of all the files which have already been processed, using the processedFiles Parquet File

from pyspark.sql.functions import input_file_name, regexp_extract

workspace_id = "#####-####-####-####-#############"
lakehouse_id = "#####-####-####-####-#############"

parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles.parquet")

df_already_processed = spark.read.parquet(parquet_file)

display(df_already_processed)

Nothing has been processed; this is the first attempt, so nothing has yet been logged.

Which files haven’t been processed?

# Selecting only the filename column from df_already_processed
df_already_processed_filenames = df_already_processed.select("filename")

# Selecting only the name column from df_files_in_folder
df_files_in_folder_names = df_files_in_folder.select("name")

# Performing subtract operation to find non-matching rows 
# (so only the file names of the files which have not been processed already are kept)
df_to_process = df_files_in_folder_names.subtract(df_already_processed_filenames)

# Showing the resulting DataFrame of files which have not yet been processed (this step can be removed)
display(df_to_process)

Note here we select just the filename and name columns, then remove the already-processed files from the folder list using subtract.

At this point there could be no files, 1 file or multiple files to process. We can see our initial 3 files to process.

What we need to do is create a for loop to get all the non-processed files into a dataframe. But first we need to create an empty dataframe to load them into.

Create an Empty Dataframe

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType,TimestampType


# Create a Spark session
spark = SparkSession.builder.appName("CreateEmptyDataFrame").getOrCreate()

# Define the schema for the DataFrame
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Series", StringType(), True),
    StructField("Episode No", IntegerType(), True),
    StructField("Episode Name", StringType(), True),
    StructField("Episode Date", DateType(), True),
    StructField("Task Order", IntegerType(), True),
    StructField("Task", StringType(), True),
    StructField("Task Type", StringType(), True),
    StructField("Assignment", StringType(), True),
    StructField("Contestant", StringType(), True),
    StructField("Team", StringType(), True),
    StructField("Points", IntegerType(), True),
    StructField("Winner", IntegerType(), True),
    StructField("source_filename", StringType(), True),
    StructField("source_processedTime", TimestampType(), True)
])

# Create an empty DataFrame with the specified schema
dftm  = spark.createDataFrame([], schema)

# Show the schema of the empty DataFrame
dftm.printSchema()

source_filename and source_processedTime are for the metadata we are going to create in the next step.

Create the For Loop

from pyspark.sql.functions import current_timestamp, lit, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from datetime import datetime

# Loop through the dataframe which consist of the filenames
for row in df_to_process.rdd.collect(): # Collecting to driver (local) as a list of Rows 
    # Extract filename from the current row
    filename = row["name"]

    # Read the current csv file into a dataframe
    dfnf = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(files_path +"/" + filename)
    
    # Add filename column to the dataframe
    dfnf = dfnf.withColumn("source_filename", lit(filename))
    tidsstempel = current_timestamp()
    
    # Add current timestamp column ("source_processedTime") to the dataframe
    dfnf = dfnf.withColumn("source_processedTime", tidsstempel)
    
    # Append the dataframe to the table in Silver lakehouse
    #df.write.format("delta").mode("append").save('<Insert the abfss path to your silver layer Lakehouse table here>')

    dftm = dftm.union(dfnf)
    
    # Create a single-row DataFrame with the filename (this will be appended to the log table)
    single_row = [[filename]]
    single_row_schema = StructType([
        StructField("filename", StringType(), False)
    ])
   
    display(dftm)
    
  • Here, we are creating a for loop, collecting the list of every file name in df_to_process
  • Then we get the name from the current row
  • Read the csv file of that name into a dataframe
  • Add the file name, and a date and time stamp
  • Then append to the dataframe created (dftm), so we could have 1 or many files here; usually it will be just one
  • Repeat until all the unprocessed files are through into dftm

The original write of the dataframe straight to the Silver lakehouse table has been commented out.

Check dftm dataframe.

from pyspark.sql.functions import count

result_df = dftm.groupBy("source_filename").agg(count("*").alias("Total"))
   
display(result_df)

Here we can see the total of rows against each source file. Make sure you don’t accidentally process again into the dataframe.

Create the df_log

# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, current_date, current_timestamp, col, lit

#Get the distinct list of file names we are going to process
filenamesdf = dftm.withColumn("filename",col("source_filename")).select("filename").distinct()


# Remove everything after the "?" character; there used to be more in here but we don't have to worry about this since a code change
filenamesdf= filenamesdf.withColumn("filename", regexp_replace("filename", "\\?.*", ""))

#Add a process Date
filenamesdatedf = filenamesdf.withColumn("processedTime",current_timestamp())

#Add a flag
df_log = filenamesdatedf.withColumn("fullyProcessedFlag",lit(0))

display(df_log)

The code above takes the filename from our data and adds it into a data frame, with the date and time of the process and 0 for our not-fully-processed flag.

The data is only fully processed when it has been added to the dimension and fact tables.

We can now run all the transformations as we did in previous posts, after bringing in the data:

  • Drop rows with all NULL values
  • Remove Special characters
  • Update the Date
  • Add day and add month and create new episode Date column
  • Bring in the Contestant Data
  • Bring in People data (Originally from Kaggle so may need manual updates eventually)
  • Join Contestants to the people data set (and check that everything matches)
  • Create age as at the time of the show; first get the minimum Episode Date
  • Join the Episode Date into the People dataframe
  • And calculate the age

Calculate the Age issue

If we run the process and there are no files to process in the delta load, the code can't calculate using the min series date.

Obviously this should rarely happen, but the use case is there so we need to handle it better.

We actually need to go back one step, to joining the min_episode_date into dfcont above. We only care about those contestants with an episode, so we need to change:

# Join the min_episode_date into contestants
dfcont = dfcont.join(dfminSeriesCont,dfcont["Name"] == dfminSeriesCont["Contestant"], "left_outer").drop(dfminSeriesCont.Contestant)

# The resulting DataFrame 'dfcont' now includes the min_episode_date for each contestant
display(dfcont)

Simply changing this to inner only brings back rows that exist on both sides.

Now, we get to the Code to calculate the age. This is the amended code (Just in case we have Nulls)

from pyspark.sql.functions import datediff, col, when

# Convert the date of birth column to a date type in dfCont
dfcont = dfcont.withColumn("dob", dfcont["dob"].cast("date"))

#Calculate age using dob and current date 
dfcontAge = dfcont.withColumn("age_decimal", when(dfcont["min_episode_date"].isNotNull(), datediff(dfcont["min_episode_date"], dfcont["dob"]) / 365)
.otherwise(None))

#Convert from decimal to int  
dfcontAge = dfcontAge.withColumn("age", col("age_decimal").cast("int"))

#Convert Contestant ID to int
dfcontAge = dfcontAge.withColumn("contestant ID", col("contestant ID").cast("int"))

display(dfcontAge)

Here we add a when function: when min_episode_date isn't null we do the calculation; otherwise nothing happens. None is a special value denoting the absence of a value.

Also, as a new code block, we want to make sure everyone has an age.

# Select distinct Name and age, order by age in descending order
result_df = dfcontAge.select("Name", "age").distinct().orderBy(col("age").desc())

# Show the result
result_df.show()

Another possible one to watch out for in the automated process. We don’t want anything to come through without a set age.

From here we can continue with the final previous blog post logic

  • Create an age group (a rough sketch follows below)
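A rough sketch of the age group step (the bucket boundaries and labels here are assumptions; the project's actual Age Range values may differ):

from pyspark.sql.functions import col, when

# Bucket the calculated age into an Age Range column
dfcontAge = dfcontAge.withColumn(
    "Age Range",
    when(col("age") < 30, "Under 30")
    .when(col("age") < 40, "30-39")
    .when(col("age") < 50, "40-49")
    .otherwise("50 and over")
)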

Changes Creating the Parquet file

There is something we need to understand at this point. Without partitioning, Spark has to read the whole parquet file and then filter the dataframe. We want to delta load facts and dimensions, therefore we should partition the taskmastertransformed file by source_filename, so only the relevant partition needs to be read instead of all the data.

We create 3 Parquet Files here.

  • TaskmasterTransformed.
  • contestantTransformed (from data that we manually update in one file)
  • ProcessedFiles.parquet which feeds our for loop with the processed file names.

This is the original Create Parquet file for TaskmasterTransformed

Note, the mode is overwrite. We now want to append the data

workspace_id = "########-####-####-############"

lakehouse_id = "########-####-####-####-############"

dftm_cleaned.write.partitionBy("source_filename").mode("append").parquet(f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/taskmasterTransformed.parquet")

  • partitionBy has been added to create filename partitions
  • append has replaced overwrite for the mode

Check the Parquet File for taskmasterTransformed

And we can have a look at the Parquet file created in the Silver Lakehouse

# This creates a dataframe with the file names of all the files which have already been processed

from pyspark.sql.functions import input_file_name, regexp_extract

workspace_id = "########-####-####-############"

lakehouse_id = "########-####-####-####-############"

parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/taskmasterTransformed.parquet")

df_already_processed = spark.read.parquet(parquet_file)

display(df_already_processed)

contestantTransformed

At this point, we also save the overwritten ContestantTransformed information.

dfcontAge.write.mode("overwrite").parquet(f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/contestantTransformed.parquet")

But we have changed the logic.

  1. There may be no data in here, if you run the process when all the files have already been processed
  2. There should only be data in here that hasn't previously been processed

Which again means that we need to change Overwrite to append.

Processed Files Parquet

workspace_id = "########-####-####-############"

lakehouse_id = "########-####-####-####-############"

df_log.write.mode("append").parquet(f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles.parquet")

Check the Log Parquet File

# This creates a dataframe with the file names of all the files which have already been processed

from pyspark.sql.functions import input_file_name, regexp_extract

workspace_id = "########-####-####-############"

lakehouse_id = "########-####-####-####-############"

parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles.parquet")

df_plog = spark.read.parquet(parquet_file)

display(df_plog)

Conclusion

We now have a transformed dataset with 3 series, that are partitioned by series.

We also have a ProcessedFiles Parquet file with a flag to show that the process is not yet complete.

  • If you start from scratch. You create an empty Parquet file and the first files processed are appended at the end
  • Otherwise, new files are appended to the file

We will go on to complete the dims and facts and then set the 0 to 1 in ProcessedFiles to show everything is completed, in the next posts.

Then we can run again, adding the next series into the shortcutted Data Lake. And we can refine and make the process better as we go.
