In Part 14 we created a delta load of transformed data in the Silver layer of our Fabric architecture.
We now want to delta load the dimensions and the fact table.
Our data is always new. It comes in and then doesn’t change, so we don’t need to worry about updates, just new data.
Before continuing, delete all the dims and facts from the gold lakehouse created in previous blogs. We are going to recreate them.


Dimensions
Dim Contestant V2
We are going to process the dimension as a delta load. Currently this is at series level.
Questions:
- Is there anything in the data that needs to be dealt with all in one go? For example, updating age across the entire dataset against the current date? No, there is nothing that needs to be fully updated.
- How do we deal with data warehouse keys? We will look at this later.
Bring back the list of processed files that we are currently working on
parquet_file = "Files/Data/Silver/Log/ProcessedFiles.parquet"
dflog = spark.read.parquet(parquet_file)
df_currentProcess = dflog[dflog["fullyProcessedFlag"] == 0][["Filename"]]
display(df_currentProcess)

We only need the name, because we are going to feed the name into a loop.
Once everything is done, we will set fullyProcessedFlag = 1. This way we will always know what we are currently working on. Now we want to bring back the partition(s) that match.
Create an empty data frame to process into.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType,TimestampType
# Create a Spark session
spark = SparkSession.builder.appName("CreateEmptyDataFrame").getOrCreate()
# Define the schema for the DataFrame
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Series", StringType(), True),
    StructField("Episode No", IntegerType(), True),
    StructField("Episode Name", StringType(), True),
    StructField("Episode Date", DateType(), True),
    StructField("Task Order", IntegerType(), True),
    StructField("Task", StringType(), True),
    StructField("Task Type", StringType(), True),
    StructField("Assignment", StringType(), True),
    StructField("Contestant", StringType(), True),
    StructField("Team", StringType(), True),
    StructField("Points", IntegerType(), True),
    StructField("Winner", IntegerType(), True),
    StructField("source_processedTime", TimestampType(), True),
    StructField("Year", StringType(), True),
    StructField("day", StringType(), True),
    StructField("month", StringType(), True),
    StructField("source_filename", StringType(), True)
])
# Create an empty DataFrame with the specified schema
dftm = spark.createDataFrame([], schema)
# Show the schema of the empty DataFrame
dftm.printSchema()
display(dftm)
Loop through the files that need to be processed
parquet_file_path = "Files/Data/Silver/taskmasterTransformed.parquet"
partition_column = "source_filename"
from pyspark.sql.functions import col
# Loop through the dataframe which consists of the filenames
for row in df_currentProcess.collect():  # Collecting to driver (local) as a list of Rows
    # Extract filename from the current row
    filename = row["Filename"]
    # Read only the partition of the parquet file that matches this filename
    dfnf = spark.read.parquet(parquet_file_path).filter(col(partition_column) == filename)
    # Append this partition to the dataframe we are building up
    dftm = dftm.union(dfnf)
display(dftm)

Here, because we have the parquet file partitioned, we can use .filter(col(partition_column)==filename) on the parquet file to only bring through what we are going to process.
The parquet_file_path and partition_column variables are created first. Both refer to the file we are going to bring through into the data frame.
We then use ‘for row in’ over df_currentProcess (the dataframe containing the file names that are ready to process). For every one of these filenames we add the matching partition of taskmaster data to the dataframe.
So, next time we process, there will only be one year of data to add into the dims and fact tables.
Save to Parquet File
We also know that we are going to use this transformed data set for every single dim and fact table. So for every dim notebook we would have to repeat this process.
We know that repeating code is a no-no in development, so we can go one further and add this DataFrame to a Parquet file that we can reuse for every other item in this process. It can be a simple Parquet file and will be overwritten every time.
workspace_id = "986472b4-7316-485c-aa22-128e1cb29544"
lakehouse_id = "ee31b1a4-16bf-4aae-aaab-f130bd4d53c6"
dftm.write.mode("overwrite").parquet(f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/CurrentProcessedTMData.parquet")
# Print a success message
print("Parquet file overwritten successfully.")
Update ContestantTransformed to work with the current set
The contestant file isn’t big enough to partition, so we can always bring it all through without any issues. We may want to change this in future, but for now we can bring the entire data set into the dataframe, as in the previous project.
To make the change, we can then filter down to the correct contestant set.
We have a series_label column, e.g. S1, C2, CoC, and we can get this information out of the df_currentProcess Filename column, e.g. Taskmaster_S1_01092015.csv.
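The series label can be pulled out of each filename with a regular expression. The following is a minimal sketch in plain Python, assuming filenames always follow the Taskmaster_<label>_<date>.csv shape of the example above. The function name and the pattern are illustrative assumptions, not code from the notebook.

```python
import re

# Assumed pattern: Taskmaster_<series label>_<8-digit date>.csv
FILENAME_PATTERN = re.compile(r"^Taskmaster_([A-Za-z0-9]+)_\d{8}\.csv$")


def series_label_from_filename(filename):
    """Return the series label (e.g. 'S1'), or None if the name does not match."""
    match = FILENAME_PATTERN.match(filename)
    return match.group(1) if match else None


print(series_label_from_filename("Taskmaster_S1_01092015.csv"))  # S1
```

In the notebook, the same pattern could probably be applied to the Filename column with regexp_extract to build the dataframe of series labels used in the join.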
from pyspark.sql import SparkSession, functions as F
# dfserieslabel holds the series_label taken from each Filename being processed; broadcast it as it is small
df_small = F.broadcast(dfserieslabel)
# Inner join so only contestants whose series_label is in the current set are kept
dfc = df_small.join(dfc, df_small["series_label"] == dfc["series_label"], "inner").drop(df_small.Filename)\
    .drop(df_small.series_label)
# The resulting DataFrame 'dfc' contains only the contestants for the series being processed
display(dfc)

Test the updated filtered Contestants dataframe
from pyspark.sql.functions import count
result_df = dfc.groupBy("series_label").agg(count("*").alias("Total"))
display(result_df)

Correct. There are 5 contestants to a series.
Continue with the transformations from Previous blog Posts
- Merge contestants and taskmaster data to only bring back contestant information using SELECT
- Drop any without a team. These are the hosts.
- Check for duplicates (now that we only process the series we are working on, these shouldn’t be an issue)
Add a Default Row
- Add a default row. However, we should already have a default row if the processing has already started, so this will need additional code
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import Row
workspace_id = "986472b4-7316-485c-aa22-128e1cb29544"
lakehouse_id = "ce55b91c-ed51-4885-9440-378a3c18450b"
path_to_parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/dimContestant.parquet")
# Check if the Delta Parquet file exists
delta_file_exists = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration()).exists(spark._jvm.org.apache.hadoop.fs.Path(path_to_parquet_file))
if delta_file_exists:
    print("Delta Parquet file exists.")
else:
    # Add a Default row
    # Create a single-row DataFrame holding the default values
    data = [(-1, "Not Known","Not Known","Not Known","Not Known","Not Known","Not Known","0","NA","0","0","0")]
    columns = ["Contestant ID", "Contestant Name", "Team","Image","From","Area","Country","Seat","Gender","Hand","Age","Age Range"]
    new_row = spark.createDataFrame(data, columns)
    # Union the new row with the existing DataFrame
    dfContfinal = dfContfinal.union(new_row)
    # Show the updated DataFrame
    dfContfinal.show(1000)
    print("Delta Parquet file does not exist.")


So, instead of always creating the default row, we only create it if the dimContestant Parquet file does not exist. We will do the ‘if it exists’ logic next time around with new data. However, in that branch we will want to get the max key to work with later.
Please note, this code is incorrect. We are looking in files within the silver delta lake, but our dims are tables (Delta Parquet) in the gold delta lake. We update this in Part 16.
Create a Contestant Key
Previously we created a contestant key starting from 0, so you would have -1 as the default and then 0. However, we now have two possibilities:
- This is the first run. There is no data and the above situation still applies.
- Or it’s not the first run, and we need to start from the key above what we already have.
If it’s the first run, we will now have a Contestant ID of -1 in the data because we have a default row.
from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
# Process the DataFrame based on Contestant ID
if dfContfinal.filter(dfContfinal["Contestant ID"] == -1).count() > 0:
    # Create a window specification ordered by "Contestant ID" (the default row, -1, sorts first)
    window_spec = Window.orderBy(col("Contestant ID"))
    # Add a new column "ContestantKey": row_number() - 2 gives the default row -1 and the first contestant 0
    dfContfinalKey = dfContfinal.withColumn("ContestantKey", row_number().over(window_spec) - 2)
    # Show the result
    dfContfinalKey.show(1000)
    print("Data Contains Default row. No data Previously processed")
else:
    print("Data Contains No Default row. Data Previously processed")


Here, an if and else has been added. If there is a Contestant ID of -1, then we can add our key as normal. We will update the else block next time around.
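The two cases can be sketched in plain Python to show how the numbering behaves. This is only an illustration under assumptions: assign_keys, max_existing_key and the contestant IDs are hypothetical, and it is not the else logic the series adds later.

```python
def assign_keys(contestant_ids, max_existing_key=None):
    """Map each contestant ID to a surrogate key.

    First run (max_existing_key is None): assumes the default ID -1 is present,
    so after sorting it receives key -1 and the real rows receive 0, 1, 2...
    Later runs: keys continue from max_existing_key + 1.
    """
    ordered = sorted(contestant_ids)
    start = -1 if max_existing_key is None else max_existing_key + 1
    return {cid: start + offset for offset, cid in enumerate(ordered)}


# Hypothetical contestant IDs, for illustration only
first_run = assign_keys([-1, 101, 102])
later_run = assign_keys([201, 202], max_existing_key=first_run[102])
print(first_run)  # {-1: -1, 101: 0, 102: 1}
print(later_run)  # {201: 2, 202: 3}
```

The first-run branch gives the same numbering as row_number() - 2 over a window ordered by Contestant ID.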
Add to Delta Parquet
Our new DimContestant data can now be appended to the Delta Parquet table, so we change the code from overwrite to append.
We can now reuse the code across the other dims, using the Parquet file we have created.
It’s worth noting here that when you set up a pipeline to run the notebooks, you will need to think about the order, because DimContestant is the dim where we create the filtered-down data.
Dim Episode V2
Here, instead of getting the data from taskmasterTransformed, we can get the already selected data.
from pyspark.sql.functions import input_file_name, regexp_extract
#parquet_file = "Files/Data/Silver/taskmasterTransformed.parquet"
parquet_file = "Files/Data/Silver/Log/CurrentProcessedTMData.parquet"
dftm = spark.read.parquet(parquet_file)
display(dftm)
Run as normal until you get to Add Default Row, where we can reuse the code from DimContestant
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import Row
workspace_id = "986472b4-7316-485c-aa22-128e1cb29544"
lakehouse_id = "ce55b91c-ed51-4885-9440-378a3c18450b"
path_to_parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/dimEpisode.parquet")
# Check if the Delta Parquet file exists
delta_file_exists = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration()).exists(spark._jvm.org.apache.hadoop.fs.Path(path_to_parquet_file))
if delta_file_exists:
    print("Delta Parquet file exists.")
else:
    # Add a Default row
    # Create a single-row DataFrame holding the default values
    data = [(-1, -1,"Not Known")]
    columns = ["series", "episode No", "Episode Name"]
    new_row = spark.createDataFrame(data, columns)
    # Union the new row with the existing DataFrame
    dftmEp = dftmEp.union(new_row)
    # Show the updated DataFrame
    dftmEp.show(1000)
    print("Delta Parquet file does not exist.")

Create Episode Key
Again we can reuse our code from Dim Contestant here
from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
# Process the DataFrame based on Episode No
if dftmEp.filter(dftmEp["Episode No"] == -1).count() > 0:
    # Create a window specification ordered by "Series" and "Episode No" (the default row sorts first)
    window_spec = Window.orderBy(col("Series"), col("Episode No"))
    # Add a new column "EpisodeKey": row_number() - 2 gives the default row -1 and the first episode 0
    dftmEpKey = dftmEp.withColumn("EpisodeKey", row_number().over(window_spec) - 2)
    # Show the result
    dftmEpKey.show(1000)
    print("Data Contains Default row. No data Previously processed")
else:
    print("Data Contains No Default row. Data Previously processed")

Save to Delta Parquet
The last change is to change overwrite to append
from delta.tables import DeltaTable
dftmEpKey_cleaned.write.mode("append").option("overwriteSchema", "true").format("delta").saveAsTable("GoldDebbiesTraininglh.dimEpisode")
And these are the only changes needed. We will be coming back to add more to the else logic. Repeat for Dim Task and all our dims are now delta.
Taskmaster Fact V2
Now all our dims can be loaded as delta, one series at a time.
(We could go one further later on and do it at episode level, so our Taskmaster reports are kept up to date along with the show. We can look at that later.)
It’s time to sort out the fact table. We can do this as a delta load and, unlike the dims, the fact table is also partitioned by series.
Again, we can change our initial data load to go to the CurrentProcessedTMData.parquet file.
And then the transformations happen as is from previous blog posts:
- Create dateKey
- Add the episode Key from DimEpisode by joining Episode Name
- Add the task Key from DimTask by joining to Task and Task Order
- Bring through the contestantTransformed lookup (all data at the moment)
- Join taskmaster to contestants because we need seat information
- Bring in the contestant key from Dim Contestant on Contestant Name and Seat (because some contestants are also in the Champion of Champions series)
- Create a series Start date Key and merge into the main df
- Create the fact table of Keys and metrics
Add to Partitioned Delta Parquet Table
from delta.tables import DeltaTable
dftmfact.write.mode("append").option("overwriteSchema", "true")\
    .partitionBy("SeriesStartDateKey").format("delta").saveAsTable("GoldDebbiesTraininglh.factTaskmaster")
Finish the Process ProcessedFiles.parquet
Our dimensions and facts are complete. We can now finalise by updating our parquet data flags to 1, so we know that it’s all done and we are ready to add another series.
from pyspark.sql.functions import input_file_name, regexp_extract
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
parquet_file = "Files/Data/Silver/Log/ProcessedFiles.parquet"
schema = StructType([
StructField("filename", StringType(), True),
StructField("processedTime", TimestampType(), True),
StructField("fullyProcessedFlag", IntegerType(), True)
])
#dflog = spark.read.parquet(parquet_file)
dflog = spark.read.schema(schema).parquet(parquet_file)
display(dflog)


An addition here: we have explicitly defined the schema instead of letting Spark infer it. This is good because if there is no data, without the schema, this code will error. Adding the schema and using it means that if there is an issue and you don’t have any data, the code will still run.
Here we bring through the unfiltered table. We want everything because we are going to overwrite the parquet file.
There may be files that already have 1 as their fullyProcessedFlag. They will all be written back as 1, even if they were already 1.
Change all flags to 1
Here I have decided to use some SQL in the code. But there are lots of ways to do this
from pyspark.sql.functions import expr
# Select the required columns
dflogselected = dflog.select("filename", "processedTime", "fullyProcessedFlag")
# Update fullyProcessedFlag where it is 0 (note: parquet_file now holds a DataFrame, not a path)
parquet_file = dflogselected.withColumn(
    "fullyProcessedFlag",
    expr("CASE WHEN fullyProcessedFlag = 0 THEN 1 ELSE fullyProcessedFlag END")
)
# Show the updated DataFrame
parquet_file.show()

We add the SQL as an expression: set the flag to 1 if it is 0.
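The rule in the CASE expression can be checked on plain Python values. This is a small sketch, not notebook code: mark_processed and the second filename are hypothetical, and the records stand in for rows of the log.

```python
def mark_processed(records):
    """Return copies of the log records with every 0 flag set to 1."""
    return [
        {**record, "fullyProcessedFlag": 1 if record["fullyProcessedFlag"] == 0 else record["fullyProcessedFlag"]}
        for record in records
    ]


log = [
    {"filename": "Taskmaster_S1_01092015.csv", "fullyProcessedFlag": 1},
    {"filename": "Taskmaster_S2_example.csv", "fullyProcessedFlag": 0},  # hypothetical file
]
updated = mark_processed(log)
print([record["fullyProcessedFlag"] for record in updated])  # [1, 1]
```

Running the update a second time changes nothing, which is why it is safe to apply it to rows that are already flagged as 1.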
Now, what we want to do is overwrite the Parquet file with the new data, all saying we have finished processing.
Blocker – Operation Failed 404, head
When overwriting the file using the following code
workspace_id = "############-####-####-####-##########"
lakehouse_id = "############-####-####-####-##########"
df_Processed.write.mode("overwrite").option("overwriteSchema", "true")\
.parquet(f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles.parquet")
# Print a success message
print("Parquet file overwritten successfully.")
there is an error:
Caused by: org.apache.spark.SparkFileNotFoundException: Operation failed: “Not Found”, 404, HEAD, It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running ‘REFRESH TABLE tableName’ command in SQL or by recreating the Dataset/DataFrame involved.
We end up with ProcessedFiles.parquet, but the data and files inside it disappear.

Having checked on the forums, other people are having the same issue. If you append, it’s fine. Overwriting causes the error. It’s possibly because of the slight change in the schema (nullable = false where it was true), but I think this may not be the case. A likely factor is Spark’s lazy evaluation: the dataframe being written is still read from ProcessedFiles.parquet, and overwrite mode removes those files before they have been read. This is proving incredibly problematic. Every single workaround seems to cause the same issue.
This is the only workaround that I have managed to run successfully.
Save to ProcessedFiles2.parquet
workspace_id = "############-####-####-####-##########"
lakehouse_id = "############-####-####-####-##########"
parquet_file.write.mode("overwrite").option("overwriteSchema", "true")\
.parquet(f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles2.parquet")
# Print a success message
print("Parquet file overwritten successfully.")
A brand new file is created called ProcessedFiles2
Get data from ProcessedFiles2.parquet into a data frame
from pyspark.sql.functions import input_file_name, regexp_extract
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
parquet_file = "Files/Data/Silver/Log/ProcessedFiles2.parquet"
schema = StructType([
StructField("filename", StringType(), True),
StructField("processedTime", TimestampType(), True),
StructField("fullyProcessedFlag", IntegerType(), True)
])
dflog2 = spark.read.schema(schema).parquet(parquet_file)
display(dflog2)
Delete the original File.
workspace_id = "############-####-####-####-##########"
lakehouse_id = "############-####-####-####-##########"
file_Path = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles.parquet"
# Use Spark to delete the original file (recursive, because it is a folder of part files)
spark._jvm.org.apache.hadoop.fs.FileSystem.get(
    spark._jsc.hadoopConfiguration()
).delete(spark._jvm.org.apache.hadoop.fs.Path(file_Path), True)
# Print a success message
print(f"{file_Path} has been deleted.")
Take the new data frame and write it to ProcessedFiles.parquet.
This now seems to work.
workspace_id = "############-####-####-####-##########"
lakehouse_id = "############-####-####-####-##########"
dflog2.write.mode("overwrite").option("overwriteSchema", "true")\
.parquet(f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles.parquet")
# Print a success message
print("Parquet file overwritten successfully.")
Delete ProcessedFiles2.parquet file
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType
from pyspark.sql import Row
workspace_id = "986472b4-7316-485c-aa22-128e1cb29544"
lakehouse_id = "ee31b1a4-16bf-4aae-aaab-f130bd4d53c6"
file_Path = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles2.parquet"
# Use Spark to delete the file
spark._jvm.org.apache.hadoop.fs.FileSystem.get(
spark._jsc.hadoopConfiguration()
).delete(spark._jvm.org.apache.hadoop.fs.Path(file_Path), True)
# Print a success message
print(f"{file_Path} has been deleted.")
And finally, delete the ProcessedFiles2 file. We don’t need this any more.
It’s more of a workaround than I would like, but this can be implemented until a better solution is found.
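The shape of this workaround (read everything, write to a staging file, then swap it into place) can be illustrated on a local filesystem. This is only a sketch of the pattern in plain Python, with a JSON file standing in for the parquet log. The rewrite_log function and the .tmp suffix are hypothetical, and it does not use the OneLake or Spark APIs.

```python
import json
import os


def rewrite_log(path, update):
    """Apply `update` to every record in the log at `path` without reading and writing the same file at once."""
    # Read the whole log into memory first, so nothing still depends on the original file
    with open(path) as source:
        records = json.load(source)
    updated = [update(record) for record in records]
    # Write to a staging file next to the original
    staging_path = path + ".tmp"
    with open(staging_path, "w") as staging:
        json.dump(updated, staging)
    # Swap the staging file into place; this also removes the staging name
    os.replace(staging_path, path)
    return updated
```

The important part is the order: the data is fully read before the original is touched, which is what the ProcessedFiles2 round trip achieves in the notebook.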
Conclusion
Now we have our dims and facts changed to Delta Parquet. We have done the initial load. In the next blog we will finish by adding new data and updating the logic for this.

We will also look at other issues, for example, what if you accidentally append the same data into the Parquet file?
