Microsoft Fabric Part 17. Taskmaster Project. Adding more PySpark code and metadata to the Transformed Data file

In Part 16 we created a pipeline to run through our 5 Notebooks.

We also ran some SQL to check the data and found that Series 1, 2 and 3 had been added four times into the Delta Parquet files.

We want to add more information during the run so we can error-check the loading. So in this blog we are going back to the Parquet files to make some changes.

This post deals specifically with the Transformed Parquet file in the silver layer.

Once again, we are deleting all the files created and starting over. However, instead of starting with S1, S2 and S3 in the data lake, we are going to start with S1 only and build from there.

These amendments are made before we get started.

Only the changes will be mentioned. We won’t go through all the code blocks again if there is no change to them.

Taskmaster Transformed

Update the Log Schema

We are changing our log file to include NumberOfRows and ContestantTransformedNumberOfRows (so we can check that we aren’t adding duplicates here).

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType,TimestampType


# Create a Spark session
spark = SparkSession.builder.appName("CreateEmptyDataFrame").getOrCreate()

# Define the schema for the DataFrame
schema = StructType([
    StructField("filename", StringType(), True),
    StructField("processedTime", TimestampType(), True),
    StructField("NumberOfRows", IntegerType(), True),
    StructField("ContestantTransformedNumberOfRows", IntegerType(), True),
    StructField("fullyProcessedFlag", IntegerType(), True)
])


# Create an empty DataFrame with the specified schema
df_log  = spark.createDataFrame([], schema)

# Show the schema of the empty DataFrame
df_log.printSchema()

display(df_log)

Check ProcessedFiles exists

The next step is to check whether ProcessedFiles exists. However, we want to change this from a Parquet file to a Delta Parquet table, so this code block needs to change.

from pyspark.sql.utils import AnalysisException

# Define the table name
table_name = "SilverDebbiesTraininglh.ProcessedFiles"

# Check if the table exists
try:
    spark.read.format("delta").table(table_name)

    print(f"Delta table {table_name} already exists. Skipping write operation.")

except Exception as e:
    if "TABLE_OR_VIEW_NOT_FOUND" in str(e):
        # Table does not exist, proceed with writing
        df_log.write.format("delta").mode("append").saveAsTable(table_name)
        print(f"Table {table_name} does not exist. Writing data to {table_name}.")

    else:
        # Re-raise the exception if it's not the one we're expecting
        raise e

The variable e holds the exception object

ProcessedFiles is now a Delta Parquet managed table. We will need to account for this throughout the processing of the data.
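As a side note, recent Spark runtimes also expose spark.catalog.tableExists, which can replace the try/except pattern above. This is just a minimal sketch using the same table name, not the approach we take in the notebook:

# Alternative existence check (recent Spark versions); a sketch, not the method used above
table_name = "SilverDebbiesTraininglh.ProcessedFiles"

if spark.catalog.tableExists(table_name):
    print(f"Delta table {table_name} already exists. Skipping write operation.")
else:
    df_log.write.format("delta").mode("append").saveAsTable(table_name)
    print(f"Table {table_name} did not exist. Writing data to {table_name}.")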


List all the files that have already been processed

Another code block to change, because we are now looking at tables rather than files.

# This creates a dataframe with the file names of all the files which have already been processed

df_already_processed = spark.sql("SELECT * FROM SilverDebbiesTraininglh.processedfiles")


display(df_already_processed)

At the moment, there is no data available.
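Later in the notebook this dataframe is used to skip files that have already been loaded. As a rough sketch of that filter (an assumption about how the check is wired up, and assuming the logged filename matches source_filename), an anti-join keeps only the unprocessed rows:

# Sketch only: keep rows from dftm whose filename is not yet flagged as fully processed
# (assumes the logged filename matches source_filename once the "?" suffix is removed)
processed = df_already_processed.filter("fullyProcessedFlag = 1").select("filename")

dftm_new = dftm.join(
    processed,
    dftm["source_filename"] == processed["filename"],
    "left_anti"
)

display(dftm_new)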

Check the number of records again after null rows have been deleted

This is part of the transform code. There is a possibility that null rows come through from the file.

We need to run the following code again after we remove the null rows

from pyspark.sql.functions import count

notmrows_df = dftm.groupBy("source_filename").agg(count("*").alias("NumberOfRows"))
   
display(notmrows_df)

We don’t want to log the number of rows and then find out that we have lots of null rows that have been lost and not recorded.
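The null-row removal itself is unchanged, but as a minimal sketch of what sits just before this count (assuming a row where every column is null should simply be discarded from dftm):

# Sketch only: drop rows where every column is null, then the count above is re-run
rows_before = dftm.count()
dftm = dftm.na.drop(how="all")
print(f"Removed {rows_before - dftm.count()} fully null rows")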

Join Contestants to Taskmaster data, only return contestants in the current set

We found an issue here. Josh is also in Champion of Champions, so he appears here twice. We only want S1, so we need to change this join to reflect that.

We need to go back up one block to the dataframe we join to. It gives us the minimum episode date per contestant, so we can work out their age as at that point in time. Previously we only joined on Contestant; we also need to join on Series to avoid this issue.

Updated to include Series
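The blog shows the updated dataframe as a screenshot. As a rough sketch of what it now does (the column names in dftm, such as the episode date column, are assumptions here), the grouping is now by both contestant and series:

from pyspark.sql.functions import min as spark_min

# Sketch only: minimum episode date per contestant AND per series,
# so Josh's Champion of Champions appearance no longer matches his S1 row
dfminSeriesCont = (
    dftm.groupBy("Contestant", "Series")
        .agg(spark_min("EpisodeDate").alias("MinEpisodeDate"))
)

display(dfminSeriesCont)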

Now back to the join.

# Join the min_episode_date into contestants
dfcont = dfcont.join(dfminSeriesCont,
            (dfcont["Name"] == dfminSeriesCont["Contestant"])&
            (dfcont["series_label"] == dfminSeriesCont["Series"]), "inner")\
.drop(dfminSeriesCont.Contestant)\
.drop(dfminSeriesCont.Series)

# The resulting DataFrame contains the contestants matched on both Name and Series
display(dfcont)

Series has been added to the join, and we remove Series and Contestant using drop, so we only keep the columns from the Contestant side.

Now we only have 5 records: 5 contestants per series.
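As a quick sanity check (using the dfcont and series_label names from the join above), a count per series should now return exactly 5:

# Quick check: each series should have exactly 5 contestants after the join
display(dfcont.groupBy("series_label").count())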

Get the number of rows from the contestants file

We also want to record how many rows we have in the contestants file so we can check if anything has errored here.

from pyspark.sql.functions import count

cresult_df = dfcontAge.groupBy("series_label").agg(count("*").alias("ContestantTransformedNumberOfRows"))

display(cresult_df)

Add the number of rows to the log for both the Taskmaster file and the contestant file

# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, current_timestamp, col, regexp_extract, lit

#Get the distinct list of file names we are going to process
filenamesdf = dftm.withColumn("filename",col("source_filename")).select("filename").distinct()

# Remove everything after the "?" character. There used to be more in here, but we don't have to worry about this since a code change
filenamesdf= filenamesdf.withColumn("filename", regexp_replace("filename", "\\?.*", ""))

#Add a process Date
filenamesdatedf = filenamesdf.withColumn("processedTime",current_timestamp())

# Join the filenames dataframe with notmrows_df on the filename column
filenamesRowsdf = filenamesdatedf.join(notmrows_df.select("source_filename", "NumberOfRows"), filenamesdatedf.filename ==notmrows_df.source_filename, "left").drop("source_filename")

display(filenamesRowsdf)

# add in the Contestant Transformed number of rows
filenamescnorowsdf = filenamesRowsdf.join(
    cresult_df.select("series_label", "ContestantTransformedNumberOfRows"),
    regexp_extract(filenamesRowsdf["filename"], r"Taskmaster_(S\d+)_", 1) == cresult_df["series_label"],
    "left"
    ).drop("series_label")

display(filenamescnorowsdf)


#Add a flag
df_log = filenamescnorowsdf.withColumn("fullyProcessedFlag",lit(0))

display(df_log)

Adding NumberOfRows

The extra code selects source_filename and NumberOfRows from notmrows_df and left joins them to our filenamesdatedf.

Adding Contestant Number of Rows

Same again, but this time we have to extract the series label (for example S1) from the filename using regexp_extract.
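To illustrate what regexp_extract is doing here, the snippet below runs the same pattern against a made-up filename (the filename is just an example of the expected Taskmaster_S<number>_ pattern):

from pyspark.sql import Row
from pyspark.sql.functions import regexp_extract

# Example only: pull the series label (S1) out of a hypothetical filename
sample = spark.createDataFrame([Row(filename="Taskmaster_S1_Episodes.csv")])

display(sample.withColumn("series_label",
        regexp_extract("filename", r"Taskmaster_(S\d+)_", 1)))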

Finally we add the flag.

This matches our schema.

Change ContestantTransformedNumberOfRows and NumberOfRows from long to int

We have another issue: the counts have come through as long rather than int. We need to deal with this before appending to the Delta Parquet table.

from pyspark.sql.functions import col

# Change the data type of ContestantTransformedNumberOfRows and NumberOfRows from long to integer
df_log = df_log.withColumn("ContestantTransformedNumberOfRows", col("ContestantTransformedNumberOfRows").cast("integer"))
df_log = df_log.withColumn("NumberOfRows", col("NumberOfRows").cast("integer"))

# Display the schema to verify the change
df_log.printSchema()

And we change the last section, where we append to the ProcessedFiles Delta Parquet table.

# Define the tablename
table_name = "SilverDebbiesTraininglh.ProcessedFiles"


df_log.write.format("delta").mode("append").saveAsTable(table_name)

print(f"File {table_name} does not exist. Writing data to {table_name}.")     

Check the ProcessedFiles Delta Parquet table

Finally, change this code to check the Delta Parquet table.

# This creates a dataframe with the file names of all the files which have already been processed

df_already_processed = spark.sql("SELECT * FROM SilverDebbiesTraininglh.processedfiles")


display(df_already_processed)

Conclusion

We have done a few updates here.

First of all, the ProcessedFiles Parquet file has been changed to a Delta Parquet table, for a few reasons. One is that we can use the SQL analytics endpoint to check the data. Another is that we can connect to the Delta Parquet table with Power BI and create audit reporting.

Another change is that we have introduced new fields to the ProcessedFiles Delta Parquet table.

  • We have NumberOfRows from the Taskmaster file, always split by series.
  • And ContestantTransformedNumberOfRows, to check that we have just 5 contestants for every series in the ContestantsTransformed file (an example audit query is sketched below).
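For example, a quick duplicate check against the log, either through the SQL analytics endpoint or via spark.sql, might look something like this (a sketch, assuming the table and column names used above):

# Sketch only: flag any file that has been logged more than once
spark.sql("""
    SELECT filename, COUNT(*) AS timesLoaded, SUM(NumberOfRows) AS totalRows
    FROM SilverDebbiesTraininglh.ProcessedFiles
    GROUP BY filename
    HAVING COUNT(*) > 1
""").show()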

We can now move on and update the dimensions.
