Microsoft Fabric Part 14. Taskmaster Project. Creating a Delta Load on Transformed Notebook

Previously, Our Notebooks take every csv file in the folder to transform.

We don’t want to do this. We want to Load the Files that haven’t been processed.

And further down the line we want to be able to do every file, Or just the unprocessed ones.

We want to do a Delta Load. This means changing the pyspark code we have created.

And a big thankyou to everyone who helped on the forums. especially
frithjof_v who helped with the code.

We want to do the following

For this exercise, Taskmastertransformed.parquet has been deleted. We are going to start at the beginning. And Load Series 1,2,3. Then 4,5 and 6

Go into the DataLake in Azure and remove all the Taskmaster files apart from 1,2 and 3.

Back to Taskmaster Transformed Notebook

Using mssparkutils, Get List of Files to be processed from Bronze folder

# abfss path to the folder where the csv files are located 
files_path = 'abfss://############@onelake.dfs.fabric.microsoft.com/############/Files/Data/Bronze/TaskMasterSeriesFiles/taskmaster'

# the mssparkutils.fs.ls method lists all the files (and/or subfolders) inside the folder.
files = mssparkutils.fs.ls(files_path)

# Convert FileInfo objects to list of tuples (this creates a list of the file names in the folder.)
file_data = [(file.name,) for file in files]

# This creates a dataframe consisting of the file names in the folder
df_files_in_folder = spark.createDataFrame(file_data, ["name"])

# Show the DataFrame (this step can be removed)
display(df_files_in_folder)



To get the abfss path for this code, Right click on the folder and Copy ABFSS path.

Remember, an abfss path is an azure blob file system path.

We have the three starting files in the shortcutted Data Lake Folder.

Remove the none Taskmaster file from the list

from pyspark.sql.functions import col

# Filter rows that start with "Taskmaster"
df_files_in_folder = df_files_in_folder.filter(col("name").startswith("Taskmaster"))

# Show the resulting dataframe
df_files_in_folder.show()

The data frame has been filtered

The Pre Processed Audit File.

We want to have a file that contains a list of all the Processed files. This is created later on, but one was needed from the offset. We need an empty dataframe to check so we don’t get an error.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType,TimestampType


# Create a Spark session
spark = SparkSession.builder.appName("CreateEmptyDataFrame").getOrCreate()

# Define the schema for the DataFrame
schema = StructType([
    StructField("filename", StringType(), True),
    StructField("processedTime", TimestampType(), True),
    StructField("fullyProcessedFlag", IntegerType(), True)
])

# Create an empty DataFrame with the specified schema
df_log  = spark.createDataFrame([], schema)

# Show the schema of the empty DataFrame
df_log.printSchema()

display(df_log)

Create the Initial Parquet Processed Files File if it doesn’t already exist

We are going to read from the ProcessedFiles Parquet file so We need a file to read from. Even if its the first time running.

from pyspark.sql.utils import AnalysisException

# Define the paths
workspace_id = "#######-####-####-####-############"
lakehouse_id = "########-####-####-####-############"
file_path = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles.parquet"

# Check if the file exists
try:
    spark.read.parquet(file_path)
    print(f"File {file_path} already exists. Skipping write operation.")
except AnalysisException:
    # File does not exist, proceed with writing
    df_log.write.mode("append").parquet(file_path)
    print(f"File {file_path} does not exist. Writing data to {file_path}.")

We we use a try:

for exception handling.

E.g. try and read the file. If true. You can read it then don’t do anything

Except:

If the try was false. Move to the AnalysisException.

In this case. Simply create the file. And print that the file didn’t exist so we created it. And it will be empty because it is created from our empty schema. All ready to start from scratch.

List Files from the processed Log

# This creates a dataframe with the file names of all the files which have already been processed, using the processedFiles Parquet File

from pyspark.sql.functions import input_file_name, regexp_extract

workspace_id = “#####-####-####-####-#############”
lakehouse_id = “#####-####-####-####-#############”

parquet_file = (f”abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles.parquet”)

df_already_processed = spark.read.parquet(parquet_file)

display(df_already_processed)

Nothing has been processed. this is the first attempt so nothing has as yet been logged

Which files haven’t been processed?

# Selecting only the filename column from df_already_processed
df_already_processed_filenames = df_already_processed.select("filename")

# Selecting only the name column from df_files_in_folder
df_files_in_folder_names = df_files_in_folder.select("name")

# Performing subtract operation to find non-matching rows 
# (so only the file names of the files which have not been processed already are kept)
df_to_process = df_files_in_folder_names.subtract(df_already_processed_filenames)

# Showing the resulting DataFrame of files which have not yet been processed (this step can be removed)
display(df_to_process)

Note here we get just the filename and name columns. Then remove already processed from Processed using subtract

At this point there could be No files. 1 file or Multiple files to process. We can see our initial 3 files to process

What we need to do is create for loop to get all the none processed files into a dataframe. But first we need to create an empty dataframe to load them into

Create an Empty Dataframe

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType,TimestampType


# Create a Spark session
spark = SparkSession.builder.appName("CreateEmptyDataFrame").getOrCreate()

# Define the schema for the DataFrame
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Series", StringType(), True),
    StructField("Episode No", IntegerType(), True),
    StructField("Episode Name", StringType(), True),
    StructField("Episode Date", DateType(), True),
    StructField("Task Order", IntegerType(), True),
    StructField("Task", StringType(), True),
    StructField("Task Type", StringType(), True),
    StructField("Assignment", StringType(), True),
    StructField("Contestant", StringType(), True),
    StructField("Team", StringType(), True),
    StructField("Points", IntegerType(), True),
    StructField("Winner", IntegerType(), True),
    StructField("source_filename", StringType(), True),
    StructField("source_processedTime", TimestampType(), True)
])

# Create an empty DataFrame with the specified schema
dftm  = spark.createDataFrame([], schema)

# Show the schema of the empty DataFrame
dftm.printSchema()

source_filename and source_processedTime is for our meta data we are going to create, in the next step

Create the For Loop

from pyspark.sql.functions import current_timestamp, lit, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from datetime import datetime

# Loop through the dataframe which consist of the filenames
for row in df_to_process.rdd.collect(): # Collecting to driver (local) as a list of Rows 
    # Extract filename from the current row
    filename = row["name"]

    # Read the current csv file into a dataframe
    dfnf = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(files_path +"/" + filename)
    
    # Add filename column to the dataframe
    dfnf = dfnf.withColumn("source_filename", lit(filename))
    tidsstempel = current_timestamp()
    
    # Add current timestamp column ("source_processedTime") to the dataframe
    dfnf = dfnf.withColumn("source_processedTime", tidsstempel)
    
    # Append the dataframe to the table in Silver lakehouse
    #df.write.format("delta").mode("append").save('<Insert the abfss path to your silver layer Lakehouse table here>')

    dftm = dftm.union(dfnf)
    
    # Create a single-row DataFrame with the filename (this will be appended to the log table)
    single_row = [[filename]]
    single_row_schema = StructType([
        StructField("filename", StringType(), False)
    ])
   
    display(dftm)
    
  • Here, we are creating a for Loop, Collecting the list of every file name in df_to_process.
  • Then we get Name from the current row
  • Read the csv file of that name into a dataframe.
  • Add the file name, and a Date and time stamp.
  • Then Append to the dataframe created (dftm) so we could have 1 or many files here. usually it will be just one.
  • Repeat until all the unprocessed files are through into dftm

The Original Creation of the dataframe has been commented out.

Check dftm dataframe.

from pyspark.sql.functions import count

result_df = dftm.groupBy("source_filename").agg(count("*").alias("Total"))
   
display(result_df)

Here we can see the total of rows against each source file. Make sure you don’t accidentally process again into the dataframe.

Create the df_log

# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, current_date, col

#Get the distinct list of file names we are going to process
filenamesdf = dftm.withColumn("filename",col("source_filename")).select("filename").distinct()


# Remove everything after the "?" character, there used to be more in here but we dont have to worry about this since a code change
filenamesdf= filenamesdf.withColumn("filename", regexp_replace("filename", "\\?.*", ""))

#Add a process Date
filenamesdatedf = filenamesdf.withColumn("processedTime",current_timestamp())

#Add a flag
df_log = filenamesdatedf.withColumn("fullyProcessedFlag",lit(0))

display(df_log)

The following took the filename from our data, and added it into a data frame with the date and time of the process and 0 for our not fully processed flag.

The data is only fully processed when it has been added to the dimension and fact tables.

We can now run all the transformations as we did in previous posts. after bringing in the data.

  • Drop rows with all NULL values
  • Remove Special characters
  • Update the Date
  • Add day and add month and create new episode Date column
  • Bring in the Contestant Data
  • Bring in People data (Originally from Kaggle so may need manual updates eventually)
  • Join Contestants to the people data set (And check that everything maches)
  • Create age as at the time of the show. first get the minimum Episode Date
  • Join the Episode Date into the People dataframe
  • And calculate the age

Calculate the Age issue

IF we run the process and there are No Files to Process in the delta load table. the code can’t calculate using the min Series date.

Obviously this should rarely happen. But the use case for this is there so we need to handle it better.

We actually need to go back 1 step to joining the min_episode_date into df_Cont above. We only care about those contestants with an episode. So we need to change

# Join the min_episode_date into contestants
dfcont = dfcont.join(dfminSeriesCont,dfcont["Name"] == dfminSeriesCont["Contestant"], "left_outer").drop(dfminSeriesCont.Contestant)

# The resulting DataFrame 'joined_df' contains all rows from dftask and matching rows from dfob
display(dfcont)

Simply changing this to inner only brings back rows that exist on both sides.

Now, we get to the Code to calculate the age. This is the amended code (Just in case we have Nulls)

from pyspark.sql.functions import datediff, col, when

# Convert the date of birth column to a date type in dfCont
dfcont = dfcont.withColumn("dob", dfcont["dob"].cast("date"))

#Calculate age using dob and current date 
dfcontAge = dfcont.withColumn("age_decimal", when(dfcont["min_episode_date"].isNotNull(), datediff(dfcont["min_episode_date"], dfcont["dob"]) / 365)
.otherwise(None))

#Convert from decimal to int  
dfcontAge = dfcontAge.withColumn("age", col("age_decimal").cast("int"))

#Convert Contestant ID to int
dfcontAge = dfcontAge.withColumn("contestant ID", col("contestant ID").cast("int"))

display(dfcontAge)

Here we add a when function. so when min episode date isn’t null we do the calculation. otherwise nothing happens. None is a special value to denote the absence of a value.

Also as a new code block. We want to make sure everyone has an age.

# Select distinct Name and age, order by age in descending order
result_df = dfcontAge.select("Name", "age").distinct().orderBy(col("age").desc())

# Show the result
result_df.show()

Another possible one to watch out for in the automated process. We don’t want anything to come through without a set age.

From here we can continue with the final previous blog post logic

  • Create an age group

Changes Creating the Parquet file

There is something we need to understand at this point, You can’t pull data from a parquet file based on a filter. You can only filter the dataframe. We want to delta load facts and dimensions. Therefore we should partition the taskmastertransformed file, to avoid loading in all the data.

We create 3 Parquet Files here.

  • TaskmasterTransformed.
  • contestantTransformed (From data that we manually update in on file)
  • ProcessedFiles.parquet which feeds our for loop with the processed file names.

This is the original Create Parquet file for TaskmasterTransformed

Note, the mode is overwrite. We now want to append the data

workspace_id = “########-####-####-############”

lakehouse_id = “########-####-####-####-############”

dftm_cleaned.write.partitionBy(“source_filename”).mode(“append”).parquet(f”abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/taskmasterTransformed.parquet”)

  • PartitonBy has been added to create filename partitions
  • append has replaced Overwrite for the mode

Check the Parquet File for taskmasterTransformed

And we can have a look at the Parquet file created in the Silver Lakehouse

# This creates a dataframe with the file names of all the files which have already been processed

from pyspark.sql.functions import input_file_name, regexp_extract

workspace_id = "########-####-####-############"

lakehouse_id = "########-####-####-####-############"

parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/taskmasterTransformed.parquet")

df_already_processed = spark.read.parquet(parquet_file)

display(df_already_processed)

contestantTransformed

At this point, we also save the overwritten ContestantTransformed information.

dfcontAge.write.mode("overwrite").parquet(f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/contestantTransformed.parquet")

But we have changed the logic.

  1. There may be no data in here, if you are processing and the files have been processed
  2. There should only be data that hasn’t previously been processed in here

Which again means that we need to change Overwrite to append.

Processed Files Parquet

workspace_id = "########-####-####-############"

lakehouse_id = "########-####-####-####-############"

df_log.write.mode("append").parquet(f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles.parquet")

Check the Log Parquet File

# This creates a dataframe with the file names of all the files which have already been processed

from pyspark.sql.functions import input_file_name, regexp_extract

workspace_id = "########-####-####-############"

lakehouse_id = "########-####-####-####-############"

parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles.parquet")

df_plog = spark.read.parquet(parquet_file)

display(df_plog)

Conclusion

We now have a transformed dataset with 3 series, that are partitioned by series.

We also have a ProcessedFiles Parquet file with a flag to show that the process is not yet complete.

  • If you start from scratch. You create an empty Parquet file and the first files processed are appended at the end
  • Otherwise, new files are appended to the file

We will go on to complete the the dims and facts and then set the 0 to 1 in ProcessedFiles to show everything is completed in the next posts .

Then we can run again, adding the next series into the shortcutted Data Lake. And we can refine and make the process better as we go.

Design a site like this with WordPress.com
Get started