Microsoft Fabric Part 18. Taskmaster Project. Adding more metadata to check throughout the Dimension and Fact run. More changes to the PySpark code

Now that we have updated our ProcessedFiles Delta Parquet table, we can update the dimensions accordingly.

  • It might be nice to have another Delta Parquet table here that we can use to collect metadata for the dims and facts.
  • We will also want the processedDate in each dim and fact table.

We are going to start with Delta Load 2, so Series 1 and 2 have already been loaded and we are now loading Series 3 (this shows off some of the logic better).

Dim Contestant.

Bring back the list of Processed Files from ProcessedFiles

Immediately we have a change to the code, because it's now a Delta Parquet table and not a Parquet file.

# Define the ABFS path to the Delta table
delta_table_path = "abfss://########-####-####-####-############@onelake.dfs.fabric.microsoft.com/########-####-####-####-############/Tables/processedfiles"

# Read the Delta table
dflog = spark.read.format("delta").load(delta_table_path)

# Filter to the files that haven't been fully processed yet
df_currentProcess = dflog.filter(dflog["fullyProcessedFlag"] == 0).select("Filename")

# Display the DataFrame
display(df_currentProcess)

New Code Block: Check Processed Files

In the previous code, we:

  • Create a new taskmaster schema to load data into
  • Loop through the files and add data to the above schema
  • Add the current process data into CurrentProcessedTMData.parquet.
  • Add the ProcessDate of the Taskmaster transformed data

We know that at some point the fact table has been getting duplicate series, so we need to check throughout the process where this could be happening.

  • Change the processDate so it is no longer the TaskmasterTransformed process date, in case that was done on an earlier day. We want the current date here to ensure everything groups together.
  • We could also set this data up so we can store it in our new Delta Parquet log.
  • Because we are also adding the process date to the grouping, we need to be careful: minutes and seconds could create extra rows and we don't want that.
  • We also know that we are doing checks on the data as it goes in, and rechecking the numbers of historical data, so we need a flag for this.

from pyspark.sql import functions as F

workspace_id = "########-####-####-####-############"
lakehouse_id = "########-####-####-####-############"

# Define the file path
file_path = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/CurrentProcessedTMData.parquet"

# Read the Parquet file
dfCheckCurrentProc = spark.read.parquet(file_path)

# Add the new metadata columns
dfCheckCurrentProc = dfCheckCurrentProc.withColumn("sequence", F.lit(1))
dfCheckCurrentProc = dfCheckCurrentProc.withColumn("parquetType", F.lit("parquet"))
dfCheckCurrentProc = dfCheckCurrentProc.withColumn("analyticsType", F.lit("Transformation"))
dfCheckCurrentProc = dfCheckCurrentProc.withColumn("name", F.lit("CurrentProcessedTMData"))
# Set the date as the current date (not the TaskmasterTransformed processed date)
dfCheckCurrentProc = dfCheckCurrentProc.withColumn("processedDate", F.current_date())
dfCheckCurrentProc = dfCheckCurrentProc.withColumn("historicalCheckFlag", F.lit(0))
dfCheckCurrentProc = dfCheckCurrentProc.withColumn("raiseerrorFlag", F.lit(0))

# Select the required columns and count the rows
dfCheckCurrentProc = dfCheckCurrentProc.groupBy("sequence","parquetType","analyticsType","name","source_filename", "processedDate","historicalCheckFlag","raiseerrorFlag").count()

# Rename the count column to TotalRows
dfCheckCurrentProc = dfCheckCurrentProc.withColumnRenamed("count", "totalRows")

# Show the data
display(dfCheckCurrentProc)

  • current_date() gets the current date. We only have the date in the grouped auditing table; the date and time can go into the dimension itself as the processed date.
  • sequence allows us to see the sequence of dim and fact creation.
  • parquetType is either Parquet or Delta Parquet for this project.
  • analyticsType is the type of table we are dealing with.
  • historicalCheckFlag is set to 0 if we are looking at the data being loaded, and 1 if the checks are against older data.
  • raiseerrorFlag is set to 1 if it looks like we have any issues, so that someone can be alerted using Power BI reporting.

It would also be really good to have a run number here, because each run will consist of a number of parquet files (in this case Parquet and Delta Parquet).

Get ContestantTransformed data

A small update here: just in case, we use distinct to make sure we only have distinct contestants in this list.

# Ensure the DataFrame contains only distinct values
dfc = dfc.distinct()

Merge Contestants and Taskmaster data

This is the point where we create the Contestant Dimension. We set the processed date to the current timestamp and alias it to just processedTime.

from pyspark.sql.functions import col, current_timestamp
from pyspark.sql import SparkSession, functions as F

df_small = F.broadcast(dfc)


# Perform the left join
merged_df = dftm.join(df_small, dftm["Contestant"] == df_small["Name"], "left_outer")\
                .select(
                    dfc["Contestant ID"],
                    dftm["Contestant"].alias("Contestant Name"),
                    dftm["Team"],
                    dfc["Image"],
                    dfc["From"],
                    dfc["Area"],
                    dfc["country"].alias("Country"),
                    dfc["seat"].alias("Seat"),
                    dfc["gender"].alias("Gender"),
                    dfc["hand"].alias("Hand"),
                    dfc["age"].alias("Age"),
                    dfc["ageRange"].alias("Age Range"),
                    current_timestamp().alias("processedTime"),
                ).distinct()



# Show the resulting DataFrame
merged_df.show()

Add the Default row

This changes because we have a new column that we need to add.

from pyspark.sql import SparkSession,Row
from pyspark.conf import SparkConf
from pyspark.sql.functions import current_timestamp


workspace_id = "########-####-####-####-############"
lakehouse_id = "########-####-####-####-############"

path_to_parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/dimcontestant")

# Check if the Delta Parquet file exists
delta_file_exists = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration()).exists(spark._jvm.org.apache.hadoop.fs.Path(path_to_parquet_file))

if delta_file_exists:
    print("Delta Parquet file exists. We don't need to add the NA row")
else:

    # Add a Default row
    # Create a sample DataFrame
    data = [(-1, "Not Known","Not Known","Not Known","Not Known","Not Known","Not Known","0","NA","0","0","0")]
    columns = ["Contestant ID", "Contestant Name", "Team","Image","From","Area","Country","Seat","Gender","Hand","Age","Age Range"]
    new_row  = spark.createDataFrame(data, columns)

    # Add the current timestamp to the ProcessedTime column
    new_row = new_row.withColumn("ProcessedTime", current_timestamp())

    # Union the new row with the existing DataFrame
    dfContfinal = dfContfinal.union(new_row)

    # Show the updated DataFrame
    dfContfinal.show(1000)

    print("Delta Parquet file does not exist. Add NA Row")

The above screenshot is for Series 1 and 2, just to show you that the row is created if it's the first time.

All the process dates are the same (at least the same day), ensuring everything will be grouped when we add it to the audit Delta Parquet table.

However, this is what is displayed for our second load, Series 3.

Check the new data that has been loaded into the delta parquet.

Checking the data checks the dimension that has been created as Delta Parquet. This is the last code section, and we want to build more in after this.

Get the Contestant Names and Source Files of the Current Load

Because we can process either one file alone or multiple files, we need to get the source file name AND the contestant name at this point.

# Get just the contestant name and source file name from our current load

df_contestant_name = dftm.selectExpr("`Contestant` as contestantNameFinal", "`source_filename` as sourceFileNameFinal").distinct()

df_contestant_name.show()

For the audit we need to know who our current contestants are and who the contestants are that we have previously worked with, along with the original file they belong to.

We took this information from an earlier dataframe, when we loaded the source file used to create the dimension. This still contains the source file name; the dimension does not contain the file name any more.

Join df_contestant_name to dfnewdimc to see what data is new and what data has been previously loaded

from pyspark.sql.functions import broadcast, when, col

df_joined = dfnewdimc.join(
    df_contestant_name,
    dfnewdimc.ContestantName == df_contestant_name.contestantNameFinal,
    how='left'
)
# Add the historicalCheckFlag column
df_joined2 = df_joined.withColumn(
    "historicalCheckFlag", 
    when((col("contestantNameFinal").isNull()) & (col("ContestantKey") != -1), 1).otherwise(0))

# Drop the contestantNameFinal column
df_joined3 = df_joined2.drop("contestantNameFinal")

# If it's the default row, set sourceFileNameFinal to Calculated Row
df_joined4 = df_joined3.withColumn(
    "sourceFileNameFinal",
    when((col("sourceFileNameFinal").isNull()) & (col("ContestantKey") == -1), "Calculated Row")
    .otherwise(col("sourceFileNameFinal"))
)

# Show the result
display(df_joined4)

This joins the names we have just selected to the data we are working with, so we can set the historicalCheckFlag. For the first load everything is 0 because it is all current.

In this load we can see some 1s. These are records already in the system that don't match our current load.

This file also now contains the source_filename

If it's a default row there will be no source filename, so we set it to Calculated Row instead.

Creating Dim Audit row

from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql import DataFrame

# Add the new column

dfdimaudit = df_joined4.withColumn("sequence", F.lit(2))

dfdimaudit = dfdimaudit.withColumn("parquetType", F.lit("Delta parquet"))

dfdimaudit = dfdimaudit.withColumn("analyticsType", F.lit("Dimension"))

dfdimaudit = dfdimaudit.withColumn("name", F.lit("Dim Contestant"))

# Extract the date from processedTime
dfdimaudit  = dfdimaudit.withColumn("processedDate", F.to_date("processedTime"))
dfdimaudit =  dfdimaudit.withColumn("raiseerrorFlag", F.lit(0))

# Select the required columns and count the rows
dfdimauditgr = dfdimaudit.groupBy("sequence","parquetType","analyticsType","name", "sourceFileNameFinal","processedDate","historicalCheckFlag","raiseerrorFlag").count()


# Rename sourceFileNameFinal to source_filename
dfdimauditgr = dfdimauditgr.withColumnRenamed("sourceFileNameFinal", "source_filename")

# Rename the count column to totalRows
dfdimauditgr = dfdimauditgr.withColumnRenamed("count", "totalRows")

# Show the data
display(dfdimauditgr)  

The files are grouped on processed time, so we need to deal with this accordingly and set the date and time to just a date using to_date() (to avoid minutes and seconds causing issues).

Our default row is set against Calculated Row.

The null source file name row is the total count of our historical records.

We are going to append the dim audit to the initial current processed audit, so they both need exactly the same columns, and these are:

  • sequence
  • parquetType
  • analyticsType
  • name
  • source_filename
  • processedDate
  • totalRows
  • historicalCheckFlag
  • raiseerrorFlag

and we have the correct source file name. Even if we process multiple files.

Add Historical Data Check to null source filename
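The code for this step isn't shown in the post. A minimal sketch of what is described, assuming the label "Historical Data Check" is simply applied to the grouped audit rows (dfdimauditgr) where source_filename is null:

from pyspark.sql.functions import when, col, lit

# Sketch only: label the historical rows, whose source_filename is null after the grouping
dfdimauditgr = dfdimauditgr.withColumn(
    "source_filename",
    when(col("source_filename").isNull(), lit("Historical Data Check"))
    .otherwise(col("source_filename"))
)

display(dfdimauditgr)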

Append the audit for the transformation data and Dimension data

# Append the current processed transformation file(s) audit row with the dimension audit

audit3_df = dfCheckCurrentProc.unionByName(dfdimauditgr)

display(audit3_df) 

With the S3 file loaded we can see

  • Transformation rows for series 3.
  • Dimension rows for Contestant S3
  • The historical row count of all series already in the dimension.

Create Batch numbers for the audit.

If it's the first time we run this process, set the batch number to 0. Else set it to the max last batch number + 1.

from pyspark.sql.functions import lit

# Function to check if a table exists
def table_exists(spark, table_name):
    return spark._jsparkSession.catalog().tableExists(table_name)

# Table name
table_name = "SilverDebbiesTraininglh.processeddimfact"

if table_exists(spark, table_name):

    # If the table exists, get the max batchno
    df = spark.table(table_name)
    max_batchno = df.agg({"batchno": "max"}).collect()[0][0]
    next_batchno = max_batchno+1

    # Add the batchNo column with the next batch number
    audit3_df = audit3_df.withColumn("batchNo", lit(next_batchno))

    print(f"processedDimFact data already exists. Next batchno: {next_batchno}")
    display(audit3_df) 
else:

    # Add the batchNo column with a value of 0
    audit3_df = audit3_df.withColumn("batchNo", lit(0))


    # Reorder columns to make batchNo the first column
    columns = ['batchNo'] + [col for col in audit3_df.columns if col != 'batchNo']
    audit3_df = audit3_df.select(columns)

    print("Table does not exist. Add Batch No 0")

    display(audit3_df) 

The following gives more information about the code used.

  • def table_exists(spark, table_name): defines a function named table_exists that takes two parameters: the Spark session object and the table name.
  • spark._jsparkSession.catalog() accesses the Spark session catalogue, which contains metadata about the entities in the Spark session.
  • Basically, this is checking if the table name exists in the Spark session catalogue. If it does, we get the max batch number from the table, add 1 and use that as the next batch number.
  • If it's false, we start batchno at 0.

How does the column reordering work?

  • ['batchNo']: This creates a list with a single element, 'batchNo'. This will be the first column in the new order.
  • [col for col in audit3_df.columns if col != 'batchNo']: This is a list comprehension that iterates over all the columns in audit3_df and includes each column name in the new list, excluding 'batchNo'.

List comprehension: create lists by processing existing lists (like the columns here) in a single line of code.

  • Combining the lists: The + operator concatenates the two lists. So ['batchNo'] is combined with the list of all other columns, resulting in a new list where 'batchNo' is the first element, followed by all the other columns in their original order.

And here we can see that in Load 2 we hit the if section of the above code. Our batch number is 1 (because the first load was 0).

Reset totalRows to Integer

We found that totalRows was set to long again, so before adding it into the Delta Parquet table we reset it to int.
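The cast itself isn't shown. A minimal sketch, assuming it is applied to audit3_df before the write:

from pyspark.sql.functions import col

# Sketch: cast totalRows from long back to integer before appending to the Delta table
audit3_df = audit3_df.withColumn("totalRows", col("totalRows").cast("integer"))

# Verify the schema
audit3_df.printSchema()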

Bring back the processeddimFact before saving with the new rows

Before saving the new audit rows we want to know if there are any issues. This means we need the data. However, we also need to deal with the fact that this may be the first process time and there won't be a table to check.

from delta.tables import DeltaTable
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

table_path = "abfss://########-####-####-####-############@onelake.dfs.fabric.microsoft.com/########-####-####-####-############/Tables/processeddimfact"
table_exists = DeltaTable.isDeltaTable(spark, table_path)

if table_exists:
    # Load the Delta table as a DataFrame
    dfprocesseddimfact = spark.read.format("delta").table("SilverDebbiesTraininglh.processeddimfact")

    # Display the table
    dfprocesseddimfact.show()  # Use show() instead of display() in standard PySpark

    # Print the schema
    dfprocesseddimfact.printSchema()

    print("The Delta table SilverDebbiesTraininglh.processeddimfact exists. Bring back data")
else:

    # Define the schema
    schema = StructType([
        StructField("batchNo", IntegerType(), nullable=False),
        StructField("sequence", IntegerType(), nullable=False),
        StructField("parquetType", StringType(), nullable=False),
        StructField("analyticsType", StringType(), nullable=False),
        StructField("name", StringType(), nullable=False),
        StructField("source_filename", StringType(), nullable=True),
        StructField("processedDate", DateType(), nullable=True),
        StructField("historicalCheckFlag", IntegerType(), nullable=False),
        StructField("raiseerrorFlag", IntegerType(), nullable=False),
        StructField("totalRows", IntegerType(), nullable=False)
    ])

    # Create an empty DataFrame with the schema
    dfprocesseddimfact = spark.createDataFrame([], schema)

    dfprocesseddimfact.show()

    print("The Delta table SilverDebbiesTraininglh.processeddimfact does not exist. Create empty dataframe")

If there is no data we can create an empty schema.

Get the Total of Dim Contestant rows (Non-Historical)

from pyspark.sql.functions import col, sum

# Filter the dataframe where name is "Dim Contestant"
filtered_df = dfprocesseddimfact.filter((col("Name") == "Dim Contestant") & (col("historicalCheckFlag") == 0)& (col("source_filename") != "Calculated Row"))

display(filtered_df) 


# Calculate the sum of TotalRows
total_rows_sum = filtered_df.groupBy("Name").agg(sum("TotalRows").alias("TotalRowsSum"))

# Rename the "Name" column to "SelectedName"
total_rows_sum = total_rows_sum.withColumnRenamed("Name", "SelectedName")


display(total_rows_sum) 

# Print the result
total_rows_value = total_rows_sum.collect()[0]["TotalRowsSum"]
print(f"The sum of TotalRows where name is 'Dim Contestant' is: {total_rows_value}")

Both initial (non-historical) loads equal 10.

Get the historical rows from our latest process and join them to the totals of the previous loads

#Join total_rows_sum to audit3_df
from pyspark.sql.functions import broadcast

# Broadcast the total_rows_sum dataframe
broadcast_total_rows_sum = broadcast(total_rows_sum)

# Perform the left join
df_checkolddata = audit3_df.filter(
    (audit3_df["historicalCheckFlag"] == 1) & (audit3_df["source_filename"] != "Calculated Row")
    ).join(
    broadcast_total_rows_sum,
    audit3_df["name"] == broadcast_total_rows_sum["SelectedName"],
    "left"
)

# Show the result
display(df_checkolddata)

So Series 1 and 2 have 10 contestants. After the S3 load there are still 10 contestants, so everything is fine here. The historical records row allows us to ensure we haven't caused any issues.

Set Raise Error Flag if there are issues

And now we can set the error flag if there is a problem which can raise an alert.

from pyspark.sql.functions import col, lit

# Assuming df is your DataFrame
total_rows_sum = df_checkolddata.agg({"TotalRowsSum": "sum"}).collect()[0][0]
total_rows = df_checkolddata.agg({"totalRows": "sum"}).collect()[0][0]

# Check if TotalRowsSum equals totalRows
if total_rows_sum == total_rows:
    print("No issue with data")

else:
    df_checkolddata = df_checkolddata.withColumn("raiseerrorFlag", lit(1))

    print("Old and historical check does not match. Issue. Set raiseerrorFlag = 1")

# Show the updated DataFrame
df_checkolddata.show()

raiseerrorFlag is still 0

Remove the historical check row if there is no error

from pyspark.sql.functions import col

# Alias the DataFrames
audit3_df_alias = audit3_df.alias("a")
df_checkolddata_alias = df_checkolddata.alias("b")

# Perform left join on the name column and historicalCheckFlag
df_audit4 = audit3_df_alias.join(df_checkolddata_alias, 
                              (col("a.name") == col("b.name")) & 
                              (col("a.historicalCheckFlag") == col("b.historicalCheckFlag")), 
                              how="left")

# Select only the columns from audit3_df, plus raiseerrorFlag from df_checkolddata aliased as newraiseErrorFlag
df_audit5 = df_audit4.select(
    "a.*",
    col("b.raiseerrorFlag").alias("newraiseErrorFlag")
)


# Filter out rows where newraiseErrorFlag is 0 because everything is fine. We don't need to keep the historical check in that case
df_audit6  = df_audit5.filter((col("newraiseErrorFlag") != 0) | (col("newraiseErrorFlag").isNull()))

# Remove the newraiseErrorFlag column
df_audit7 = df_audit6.drop("newraiseErrorFlag")


# Show the result
display(df_audit7)

The historical row has been removed. If there isn't a problem, there is no need to hold this record.

If the data has multiple default audit rows, flag as an issue. Else remove

# Count the number of rows where ContestantName is "Not Known"
dfcount_not_known = dfnewdimc.filter(col("ContestantName") == "Not Known").count()

# Print the result
print(f"Number of rows where ContestantName is 'Not Known': {dfcount_not_known}")

# Check if count_not_known is 1
if dfcount_not_known == 1:
    # Remove the row from df_audit7 where source_filename is "Calculated Row"
    df_audit8 = df_audit7.filter(col("source_filename") != "Calculated Row")

    print("1 dim audit row. We can remove the row from the audit table")

else:
    # Set source_filename to "Multiple Calculated Rows issue"
    df_audit8 = df_audit7.withColumn("source_filename", 
                                     when(col("source_filename") == "Calculated Row", 
                                          lit("Multiple Calculated Rows issue"))
                                     .otherwise(col("source_filename")))


    print("Multiple dim audit rows. We can specify this in the audit table")                                

# Show the updated df_audit8
df_audit8.show()

The audit row has now been removed; unless there is an issue we don't need it. This will be useful if we accidentally process multiple default rows into the table.

Save as Delta Parquet

from delta.tables import DeltaTable

# You can also add the non-default lakehouse by clicking +Lakehouse
df_audit8.write.mode("append").option("overwriteSchema", "true").format("delta").saveAsTable("SilverDebbiesTraininglh.processedDimFact")

# Print a success message
print("Parquet file appended successfully.")

And save the data as a Delta Parquet processedDimFact in our Silver transform layer.

We can use both of these Delta Parquet Files later.

Check processdimfact

Dim Contestant is complete for now. But we will come back later to update the code.

Dim Episode

Create Episode Dim

Small change. We are adding processedTime as the current date and time.

from pyspark.sql.functions import current_timestamp

dftmEp = dftm.select("Series","Episode No" ,"Episode Name").distinct()

# Add the current timestamp as processedTime
dftmEp = dftmEp.withColumn("processedTime", current_timestamp())

dftmEp = dftmEp.orderBy("Series", "Episode No")

display(dftmEp)

Add a Default row

Just like Contestant. We need ProcessedTime adding

from pyspark.sql.functions import current_timestamp

# Add the current timestamp to the ProcessedTime column
new_row = new_row.withColumn("ProcessedTime", current_timestamp())

Again, here is the screenshot taken on the first load of S1 and S2.

And our S3 load (Load 2) hits the if block, so no default row is created.

Get the Series, Episode Name and Source File of the Processed data

#Get just the series, Episode Name and source_filename from our current load

df_SeEp = dftm.selectExpr("`Series` as SeriesFinal", "`Episode Name` as EpisodeFinal", "source_filename").distinct()

display(df_SeEp) 

Backticks (`) are used in PySpark to handle column names that contain special characters, spaces, or reserved keywords.

Join df_SeEp to dfnewdimt

Join the list of currently processed series and episodes to everything in the dimension so far (the dataframe we created when we checked the dim).


from pyspark.sql.functions import broadcast, when, col

# Broadcast df_SeEp and perform the left join
df_joined = dfnewdimt.join(
    broadcast(df_SeEp),
    (dfnewdimt.Series == df_SeEp.SeriesFinal) & (dfnewdimt.EpisodeName == df_SeEp.EpisodeFinal),
    "left"
)
# Add the historicalCheckFlag column
df_joined2 = df_joined.withColumn("historicalCheckFlag", when(col("SeriesFinal").isNull(), 1).otherwise(0))

# Drop the SeriesFinal and EpisodeFinal columns
df_joined3 = df_joined2.drop("SeriesFinal").drop("EpisodeFinal")

# If it's the default row, set source_filename to Calculated Row
df_joined4 = df_joined3.withColumn(
    "source_filename",
    when((col("source_filename").isNull()) & (col("EpisodeKey") == -1), "Calculated Row")
    .otherwise(col("source_filename"))
)

# Show the result
display(df_joined4)

All historicalCheckFlags are set to 0 for the current load (S3), and 1 when there is no match and it's historical data. The source_filename from df_SeEp has been left in the result, which can be used later.

Again we have set the Episode Key -1 row's source_filename to "Calculated Row", because there is no source to join this one record on by series and episode name.

Add audit row

At the end of the process. We will add an audit row

from pyspark.sql import functions as F

# Add the new column
dfdimaudit = df_joined4.withColumn("sequence", F.lit(3))

dfdimaudit = dfdimaudit.withColumn("parquetType", F.lit("Delta parquet"))

dfdimaudit = dfdimaudit.withColumn("analyticsType", F.lit("Dimension"))

dfdimaudit = dfdimaudit.withColumn("name", F.lit("Dim Episode"))

# Extract the date from processedTime
dfdimaudit  = dfdimaudit.withColumn("processedDate", F.to_date("processedTime"))

dfdimaudit =  dfdimaudit.withColumn("raiseerrorFlag", F.lit(0))

# Select the required columns and count the rows
dfdimauditgr = dfdimaudit.groupBy("sequence","parquetType","analyticsType","name", "source_filename","processedDate","historicalCheckFlag","raiseerrorFlag").count()

# Alias count to totalRows
dfdimauditgr = dfdimauditgr.withColumnRenamed("count", "totalRows")

# Show the data
display(dfdimauditgr) 

For Load 2 we have 5 records in the current load with the source filename set, the historical load of 11, and the calculated audit -1 row.

Bring back our processeddimfact data

from delta.tables import DeltaTable
from pyspark.sql.functions import col, max

# Load the Delta table as a DataFrame
dfprocesseddimfact = spark.read.format("delta").table("SilverDebbiesTraininglh.processeddimfact")

# Find the maximum BatchNo
max_batch_no = dfprocesseddimfact.agg(max("BatchNo")).collect()[0][0]

# Filter the DataFrame to include only rows with the maximum BatchNo
dfprocesseddimfact = dfprocesseddimfact.filter(col("BatchNo") == max_batch_no)

# Display the table
display(dfprocesseddimfact)
dfprocesseddimfact.printSchema()

Here we find the max batch number and then filter our dataframe to only give us rows with that max batch number, which is the batch we are working on (because at this point we are in the middle of a batch of dims and facts).

Create a distinct batch No

distinct_batch_no = dfprocesseddimfact.select("batchNo").distinct()

display(distinct_batch_no)

We just want that distinct Max Batch number for the next process

Join to our Dim Episode Audit row to bring in the batch number via a cross join

# Alias the dataframes
dfdimauditgr_alias = dfdimauditgr.alias("dfdimauditgr")
distinct_batch_no_alias = distinct_batch_no.alias("dfprocesseddimfact")

# Perform a cross join to attach the batch number
audit2_df = dfdimauditgr_alias.crossJoin(distinct_batch_no_alias)

# Select specific columns
audit2_df = audit2_df.select(
    distinct_batch_no_alias.batchNo,
    dfdimauditgr_alias.sequence,
    dfdimauditgr_alias.parquetType,
    dfdimauditgr_alias.analyticsType,
    dfdimauditgr_alias.name,
    dfdimauditgr_alias.source_filename,
    dfdimauditgr_alias.totalRows,
    dfdimauditgr_alias.processedDate,
    dfdimauditgr_alias.historicalCheckFlag,
    dfdimauditgr_alias.raiseerrorFlag
)

audit2_df = audit2_df.distinct()

# Show result
display(audit2_df)

The cross join (Cartesian join) is used because there is only one batch number in distinct_batch_no (the max batch number is the one we are working on), so there is no need for join criteria.

Set totalRows to Int

Once again totalRows needs setting to int, as in the sketch below.
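As with Dim Contestant, the cast isn't shown. A one-line sketch, assuming it applies to audit2_df:

from pyspark.sql.functions import col

# Sketch: cast totalRows back to integer for the Dim Episode audit rows
audit2_df = audit2_df.withColumn("totalRows", col("totalRows").cast("integer"))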

Get the processeddimfact data

# Load the Delta table as a DataFrame
dfprocesseddimfact = spark.read.format("delta").table("SilverDebbiesTraininglh.processeddimfact")

# Display the table
dfprocesseddimfact.show()  # Use show() instead of display() in standard PySpark

# Print the schema
dfprocesseddimfact.printSchema()

We will need this to check our historical checked data against the original update

Get the total rows of our non-historical rows that have been previously loaded

Same logic as Contestant

Join Historical records with new audit to see if the numbers have changed

No changes to the old numbers.

Set raiseerrorFlag if there are issues

There are no issues

If no issues. Record is removed

As there are no issues the record is removed

If Audit row -1 is only in once. No need to hold the audit row

Append into processeddimfact

And check the data looks good

We head off into Dim Task now, and here is where we hit some issues. We need to add more code into the process.

Dim Task

Checking the count of rows between the original load and the historical check

When we count the rows of processed data they come to 64, but when we run the check again it comes to 65, which would trigger the raise error flag. This is why:

dfnewdimt_filtered = dfnewdimt.filter(dfnewdimt["Task"] == "Buy a gift for the Taskmaster")
display(dfnewdimt_filtered)

I settled on Task and TaskOrder to create the unique row. However, this isn't enough: we have two series where the Task and the Task Order are identical.

What we really want is one record in the dimension applied to the correct series in the fact table, not a duplicate like we have above.

The code is adding Series 3 to both "Buy a gift for the Taskmaster" rows and is consequently undercounting.

Series 3 is a good opportunity to update the logic, as we know there is a record already added from Series 2.

Bring back old dimTask

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Initialize Spark session
#spark = SparkSession.builder.appName("DeltaTableCheck").getOrCreate()

# Path to the Delta table
table_path = "abfss://########-####-####-####-############@onelake.dfs.fabric.microsoft.com/########-####-####-####-############/Tables/dimtask"

# Check if the Delta table exists
if DeltaTable.isDeltaTable(spark, table_path):
    # Load the Delta table as a DataFrame
    dfprevdimt = spark.read.format("delta").load(table_path)
    
else:
    # Define the schema for the empty DataFrame
    schema = StructType([
        StructField("Task", StringType(), True),
        StructField("TaskOrder", IntegerType(), True),
        StructField("TaskType", StringType(), True),
        StructField("Assignment", StringType(), True),
        StructField("ProcessedTime", TimestampType(), True)
    ])
    
    # Create an empty DataFrame with the specified schema
    dfprevdimt = spark.createDataFrame([], schema)

    print("no dim. Create empty dataframe")

# Display the DataFrame
dfprevdimt.show()

If it's the first time, create an empty dataframe; else bring back the existing dimension.

Join new Tasks to all the Previously added tasks

The decision has been made to use all the columns in the join, Just in case a Task has a different assignment or Type.

Our task that we know is identical for Series 2 and 3 appears. We use the dftm dataframe (the data before we create the distinct dimension), as this contains source_filename. The inner join only brings back the matching records, as shown in the sketch below.
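The join itself isn't reproduced in the post. A sketch of the idea (the dataframe name dfTaskDuplicates is assumed): inner join the current load (dftm, which still carries source_filename) to the previously loaded dimension (dfprevdimt) on all four task columns, so only tasks that already exist come back.

from pyspark.sql.functions import col

# Sketch: the inner join only returns tasks in the current load that already
# exist in the previously loaded dimension, i.e. the duplicates we want to record
dfTaskDuplicates = dftm.alias("new").join(
    dfprevdimt.alias("old"),
    (col("new.Task") == col("old.Task")) &
    (col("new.Task Order") == col("old.TaskOrder")) &
    (col("new.Task Type") == col("old.TaskType")) &
    (col("new.Assignment") == col("old.Assignment")),
    "inner"
).select(
    col("new.Task"),
    col("new.Task Order"),
    col("new.Task Type"),
    col("new.Assignment"),
    col("new.source_filename")
).distinct()

display(dfTaskDuplicates)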

Add this to a Delta Parquet File

We now have a static record of the removed duplicates that we can use later, and we can report on these items.
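A sketch of the save, using the dfTaskDuplicates dataframe from the sketch above and the audit table name mentioned in the conclusion:

# Sketch: keep the duplicate tasks in their own Delta table so they can be reported on later
dfTaskDuplicates.write.mode("append").format("delta").saveAsTable("SilverDebbiesTraininglh.dimremovenoneuniquetaskaudit")

print("Duplicate task audit rows appended.")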

If task already exists. Remove from the new load

from pyspark.sql.functions import col,current_timestamp

# Alias the DataFrames
dfprevdimt_df_alias = dfprevdimt.alias("old")
dftm_alias = dftm.alias("new")

# Perform a left anti join on Task, Task Order, Task Type and Assignment
dftmdupRemoved = dftm_alias.join(dfprevdimt_df_alias, 
                              (col("new.Task") == col("old.Task")) & 
                              (col("new.Task Order") == col("old.TaskOrder"))& 
                              (col("new.Task Type") == col("old.TaskType"))& 
                              (col("new.Assignment") == col("old.Assignment")), 
                              how="left_anti")



#And reset the dimension
dfTasksdupRemoved = dftmdupRemoved.select("Task","Task Order" ,"Task Type", "Assignment").distinct()

# Add the current timestamp as processedTime
dfTasksdupRemoved = dfTasksdupRemoved.withColumn("processedTime", current_timestamp())

# Show the result
display(dfTasksdupRemoved)

The left anti join brings back records that exist in the new dataframe and removes anything that also exists in the pre-existing data.

dftm is again used, so we have to recreate the dimension.

dfTasksdupRemoved now becomes our main dataframe, so we need to ensure it's used across the Notebook where necessary.

We then go on to create the default row (if it has never been run before) and the keys.

Add a Default Row

Code changed to use dfTasksdupRemoved

Create the Task Key

Again, we have had a change to code to use the updated dataframe

Get Task details from current load

Previously this was just Task and Task Order, but we know that we need more columns in the join.

Create a historical Record check

"Buy a gift for the Taskmaster" is flagged as historical (it's in Series 2) and we have removed it from Series 3. We have also added this information to a Delta Parquet table.

We have also added more join criteria.

Create the Audit Rows

Add a value into the source filename if it is null, because it's a historical record

Bring back the ProcessedDimfact

Get the Current batch

Join the batch number into the dataframe

Load the processed Dim Fact

Count the rows

Total Rows is correct at 65

Add the Rows of the historical check against rows that were originally processed

These now match

If Issue. Set RaiseErrorFlag = 1

No issues

Remove historical check if there are no issues

Check that there aren't multiple rows in the data for the default -1 row. If all OK, remove the default row from the audit

And append the data into processeddimfact.

There is other logic to consider here. Imagine if we loaded Series 1, 2 and 3 together. Would the logic still remove the duplicate task? This will need running again to test.

Taskmaster Fact

The fact table doesn't have default records, so we need to deal with this slightly differently. Let's run through the code again.

The new fact data has been processed at the point we continue with the logic

Get Data of current process we can join on

Join current and already processed Fact data

  • df_currentfact is the current S3 data
  • dfnewfact is the newly loaded fact table that contains everything: S1, S2 and S3.

dfnewfact is on the left of the join, so logically, if a row exists in both the current and the new data we get a match. A sketch of the pattern is below.
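The fact join code isn't shown and the exact key columns aren't stated in the post, so this sketch only illustrates the pattern described above; the join columns (Series and Episode No) and the df_factjoined name are assumptions.

from pyspark.sql.functions import when, col

# Sketch only -- the real join keys aren't given in the post. dfnewfact is on the
# left, so rows with no match in the current load are flagged as historical.
df_factjoined = dfnewfact.join(
    df_currentfact,
    (dfnewfact["Series"] == df_currentfact["SeriesFinal"]) &
    (dfnewfact["Episode No"] == df_currentfact["EpisodeNoFinal"]),
    "left"
)

df_factjoined = df_factjoined.withColumn(
    "historicalCheckFlag",
    when(col("SeriesFinal").isNull(), 1).otherwise(0)
)

display(df_factjoined)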

Create the Audit Row

If source Filename is null. Add Historical Data Check

Bring back the latest batch from processeddimFact

Get Current Batch No

Add the batch number to the audit

Load all of the processed dim fact

Total up the non-historical rows (totals created at the time)

Add the total rows (as they were at the time they were created) to the total historical rows

We can already see they match. 307 rows for S1 and S2

Reset the Raise Error flag to 1 if there is a problem

Remove the historical check row if there are no issues

And now you can append into the Delta parquet table and check the processed dim and fact.

And then we are back to setting the fullyProcessedFlag to 1 at the end of the fact table.

Conclusion

Now we have an audit of our loads (2 loads so far).

We have done a lot of work creating the auditing. We can now add files and hopefully run the pipeline to automate the run; then we can create Power BI reporting.

We now have three audit Delta Parquet tables:

  1. processedFiles – This goes through each Taskmaster series file and allows you to see what you are working on. The fullyProcessedFlag is set to 1 when the fact table is complete.
  2. processeddimFact – (Shown above) The detailed audit for the transformation, dim and fact tables, containing the number of records, the date processed and a flag for issues.
  3. dimremovenoneuniquetaskaudit – Tasks can be duplicated: we have the same task (and all of its components, like task order) identical between Series 2 and 3. We only need the one row, so the duplicate is removed and recorded here.

There is more that we could do with this; of course there always is. However, the big one is to make sure that we get non-duplicated task information if Series 2 and 3 are processed together.

We could also add more detail into the audit, for example any issues with the data, such as: Column A must always be populated with either A, B, C or D; if null then flag as an error; if anything other than the specified codes then error.

We will come back to this, but in the next blog I want to try something a bit different and set up the Data Warehouse. How would this project look if using the warehouse, and why would you choose one over the other?

So in Part 19 we will look at setting the warehouse up and how we deal with the source data in the data lake.

Microsoft Fabric Part 17. Taskmaster Project. Adding more PySpark code and metadata to the Transformed Data file

In Part 16 we created a pipeline to run through our 5 Notebooks.

We also ran some SQL to check the data and found that Series 1, 2 and 3 had been added 4 times into the Delta Parquet files.

We want to add more information through the run so we can error check the loading. So in this blog we are going back to the parquet files to make some changes.

This post will specifically be for the Transformed parquet file in the silver layer.

We are again deleting all the files created and starting again. However, instead of starting with S1, S2 and S3 in the data lake, we are going to start with S1 only and build from there.

These amendments are made before we get started

Only the changes will be mentioned. We won’t go through all the code blocks again if there is no change to them.

Taskmaster Transformed

Update the Log Schema

We are changing our log file to include NumberOfRows and ContestantTransformedNumberOfRows (so we can check that we aren't adding duplicates here).

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType,TimestampType


# Create a Spark session
spark = SparkSession.builder.appName("CreateEmptyDataFrame").getOrCreate()

# Define the schema for the DataFrame
schema = StructType([
    StructField("filename", StringType(), True),
    StructField("processedTime", TimestampType(), True),
    StructField("NumberOfRows", IntegerType(), True),
    StructField("ContestantTransformedNumberOfRows", IntegerType(), True),
    StructField("fullyProcessedFlag", IntegerType(), True)
])


# Create an empty DataFrame with the specified schema
df_log  = spark.createDataFrame([], schema)

# Show the schema of the empty DataFrame
df_log.printSchema()

display(df_log)

Check ProcessedFiles exist

The next step is to check if ProcessedFiles exists. However, we want to change this file from Parquet (file) to Delta Parquet (table), so this code block needs to change.

from pyspark.sql.utils import AnalysisException

# Define the table name
table_name = "SilverDebbiesTraininglh.ProcessedFiles"

# Check if the table exists
try:
    spark.read.format("delta").table(table_name)

    print(f"delta parquet {table_name} already exists. Skipping write operation.")

except Exception as e:
    if "TABLE_OR_VIEW_NOT_FOUND" in str(e):
        # File does not exist, proceed with writing
        df_log.write.format("delta").mode("append").saveAsTable(table_name)
        print(f"File {table_name} does not exist. Writing data to {table_name}.")

    else:
        # Re-raise the exception if it's not the one we're expecting
        raise e

The variable e holds the exception object

The processedFiles file is now a Delta Parquet managed table. We will need to address this throughout the processing of the data.


List all the files that have already been processed

Another code block to change because we are now looking at tables, not files

# This creates a dataframe with the file names of all the files which have already been processed

df_already_processed = spark.sql("SELECT * FROM SilverDebbiesTraininglh.processedfiles")


display(df_already_processed)

At the moment, there is no data available.

Check the number of records again when null rows have been deleted

Part of the transform code. There is a possibility that null rows come through from the file.

We need to run the following code again after we remove the null rows

from pyspark.sql.functions import count

notmrows_df = dftm.groupBy("source_filename").agg(count("*").alias("NumberOfRows"))
   
display(notmrows_df)

We don't want to log the number of rows and then find that lots of null rows have been dropped without being reflected in the count.
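The null-row removal itself happens earlier in the transform code and isn't repeated here. A minimal sketch of the kind of filter described, assuming rows where every column is null are dropped:

# Sketch: drop rows where every column is null before recounting
dftm = dftm.dropna(how="all")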

Join Contestants to Taskmaster data, only return contestants in the current set

We found an issue here: Josh is also in Champion of Champions and so is in here twice. We only want S1, therefore we need to change this join to reflect this.

We need to go back up one code block.

This is what we join to: a dataframe that gives us the min episode date, used to work out age as at that point in time. Previously we only joined on Contestant; we need to also join on Series to avoid this issue.

Updated to include Series, as sketched below.
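The updated dataframe isn't shown in the post. A sketch, assuming dfminSeriesCont holds the minimum episode date per contestant and is now also grouped by Series (the MinEpisodeDate alias is illustrative):

from pyspark.sql.functions import min as spark_min

# Sketch: minimum episode date per contestant and series, used to work out age as at that point in time
dfminSeriesCont = dftm.groupBy("Contestant", "Series").agg(spark_min("Episode Date").alias("MinEpisodeDate"))

display(dfminSeriesCont)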

Now back to the join.

# Join the min_episode_date into contestants
dfcont = dfcont.join(dfminSeriesCont,
            (dfcont["Name"] == dfminSeriesCont["Contestant"])&
            (dfcont["series_label"] == dfminSeriesCont["Series"]), "inner")\
.drop(dfminSeriesCont.Contestant)\
.drop(dfminSeriesCont.Series)

# The resulting DataFrame contains all rows from dfcont and the matching rows from dfminSeriesCont
display(dfcont)

Series has been added to the join. And we remove Series and Contestant using drop, so we only get the data from the Contestant side.

Now we only have 5 records. 5 contestants to a series

Get the number of rows from the contestants file

We also want to record how many rows we have in the contestants file so we can check if anything has errored here.

from pyspark.sql.functions import count

cresult_df = dfcontAge.groupBy("series_label").agg(count("*").alias("ContestantTransformedNumberOfRows"))

display(cresult_df)

Add number of Rows to the log for both the taskmaster file and the contestant file

# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, current_timestamp, col, regexp_extract, lit

#Get the distinct list of file names we are going to process
filenamesdf = dftm.withColumn("filename",col("source_filename")).select("filename").distinct()

# Remove everything after the "?" character. There used to be more in here, but we don't have to worry about this since a code change
filenamesdf= filenamesdf.withColumn("filename", regexp_replace("filename", "\\?.*", ""))

#Add a process Date
filenamesdatedf = filenamesdf.withColumn("processedTime",current_timestamp())

# Join df_log with result_df on the filename column
filenamesRowsdf = filenamesdatedf.join(notmrows_df.select("source_filename", "NumberOfRows"), filenamesdatedf.filename ==notmrows_df.source_filename, "left").drop("source_filename")

display(filenamesRowsdf)

# add in the Contestant Transformed number of rows
filenamescnorowsdf = filenamesRowsdf.join(
    cresult_df.select("series_label", "ContestantTransformedNumberOfRows"),
    regexp_extract(filenamesRowsdf["filename"], r"Taskmaster_(S\d+)_", 1) == cresult_df["series_label"],
    "left"
    ).drop("series_label")

display(filenamescnorowsdf)


#Add a flag
df_log = filenamescnorowsdf.withColumn("fullyProcessedFlag",lit(0))

display(df_log)

Adding NumberOfRows

The extra code selects source_filename and NumberOfRows from notmrows_df and left joins them to our filenamesdatedf.

Adding Contestant Number of Rows

Same again but we have to take the S1 from the filename using regexp_extract

Finally we add the flag.

This matches our schema.

Change ContestantTransformedNumberOfRows and NumberOfRows: from long to int

We have another issue in that an int has been reset to long. We need to deal with this before adding the data into the Delta Parquet table.

from pyspark.sql.functions import col

# Change the data type of ContestantTransformedNumberOfRows and NumberOfRows from long to integer
df_log = df_log.withColumn("ContestantTransformedNumberOfRows", col("ContestantTransformedNumberOfRows").cast("integer"))
df_log = df_log.withColumn("NumberOfRows", col("NumberOfRows").cast("integer"))

# Display the schema to verify the change
df_log.printSchema()

And change the last section when we update processedFiles to Delta Parquet.

# Define the tablename
table_name = "SilverDebbiesTraininglh.ProcessedFiles"


df_log.write.format("delta").mode("append").saveAsTable(table_name)

print(f"File {table_name} does not exist. Writing data to {table_name}.")     

Check the processedFiles Delta Parquet table

Finally, change this code to check the Delta Parquet table.

# This creates a dataframe with the file names of all the files which have already been processed

df_already_processed = spark.sql("SELECT * FROM SilverDebbiesTraininglh.processedfiles")


display(df_already_processed)

Conclusion

We have done a few updates here.

First of all, the ProcessedFiles Parquet file has been changed to Delta Parquet for a few reasons. One is that we can use the SQL Analytics Endpoint to check the table. Another is that we can connect to the Delta Parquet with Power BI and create audit reporting.

Another change is that we have introduced new fields to the ProcessedFiles Delta Parquet.

  • We have the NumberOfRows from the Taskmaster file. Always split by series.
  • And ContestantTransformedNumberOfRows to check that we just have 5 contestants for every series in the ContestantsTransformed File.

We can now move on and update the dimensions.

Microsoft Fabric Part 16. Taskmaster Project. Update to Taskmaster transformed Notebook for Contestants

In Part 15 we created a delta load of transformed dims and facts in the gold layer of our Fabric Architecture.

The Taskmaster file contains source_filename. e.g. Taskmaster_S1_01092015.csv Which we have used to partition the data.

We also have a transformed Contestants file. Each contestant belongs to a series. the file isn’t big enough to Partition. The Contestants file consists of data that we have prepared and extra data from Kaggle.

However, we have only prepped the data for the first run. We need to add more code for new files, and there are still amendments we can make to make this better.

It's also useful to see all the issues along the way and how they are resolved. Knowing what can hit you can be really handy when working on a project, because you have already seen and dealt with the issues.

Load a new file into Azure Data Lake

  • Taskmaster season 4 has been loaded
  • Contestants already has season 4 in the data

Taskmaster Transformed Pyspark Notebook

Let's look at the way the new file will be dealt with.

The data lake is short-cut to our bronze delta lake. And here we use mssparkutils to list the files. S4 is new.

  • The next code block removes Contestants from the list
  • And the next creates our empty dataframe consisting of filename, processedTime and fullyprocessedFlag
  • Check ProcessedFiles exists; if not, create a file with the empty dataframe (image below)

It's worth looking at this code block in more detail, as it was recently updated. If the ProcessedFiles file already exists we don't have to do anything; if it doesn't exist we create the file using the empty template we created. A sketch of this check is shown below.
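For reference, a sketch of the check as it stood at this point (the parquet path is an assumption based on the other log files in this project; in Part 17 this becomes the Delta table check shown earlier):

# Sketch only: write the empty template if ProcessedFiles.parquet doesn't exist yet
processed_path = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles.parquet"

try:
    spark.read.parquet(processed_path)
    print("ProcessedFiles already exists. Skipping write operation.")
except Exception:
    df_log.write.mode("overwrite").parquet(processed_path)
    print("ProcessedFiles does not exist. Writing empty template.")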

The file is already created and we know it contains Series 1, 2 and 3 that we have already added.

  • List files already processed (Image below)

Here we can see that S1 S2 and S3 have been fully processed

  • Return only the non-processed files (image below)

And we find anything in the folder that isn't in our ProcessedFiles Parquet file. In this case S4, as sketched below.
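A sketch of that comparison (df_files, the listed lake files with a filename column, is an assumed name):

# Sketch: a left anti join keeps only the files that are not already in the processed log
df_to_process = df_files.join(
    df_already_processed,
    df_files["filename"] == df_already_processed["filename"],
    "left_anti"
)

display(df_to_process)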

  • Next we create an empty data frame for the taskmaster data
  • And for every file we process the data into the data frame

Checking the total rows in the S4 file. There are 240 rows (At this point there are more episodes to a series)

  • Next the df_log dataframe is created with the Season 4 file name, our processed time and 0 as the fullyProcessedFlag
  • Transform code. This is detailed in previous posts. Includes adding the Contestants.csv data. Adding in the people data from Kaggle to join with contestants.
  • The data is added to taskmastertransformed files so we only deal with the current set

Error Message when checking the TaskmasterTransformed file

We have hit our first issue

Caused by: org.apache.spark.SparkException: Parquet column cannot be converted in file abfss://986472b4-7316-485c-aa22-128e1cb29544@onelake.dfs.fabric.microsoft.com/ee31b1a4-16bf-4aae-aaab-f130bd4d53c6/Files/Data/Silver/taskmasterTransformed.parquet/source_filename=Taskmaster_S4_13062017.csv/part-00008-e963cfd6-7007-439b-9e0d-63b33eddabb3.c000.snappy.parquet. Column: [Points], Expected: double, Found: INT32.

There is an issue with Points. Expected double. Found INT32. Looking at the data,

This is the only point that isn't a standard 0 to 5 number, and this should be fine as an integer. Here is where the issue is: it's actually in S1, which we have already processed.

Josh did his own solo task and, as we can see here, the points are decimal and not an integer. This doesn't happen often at all; it's usually 1 through to 5, so this is the first time we have had issues. To resolve this I am going to delete all the files again, remove S4 from the data lake and start again, ensuring Points is double not integer.

The change is here when we create the empty dataframe to append into

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType,TimestampType, DoubleType


# Create a Spark session
spark = SparkSession.builder.appName("CreateEmptyDataFrame").getOrCreate()

# Define the schema for the DataFrame
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Series", StringType(), True),
    StructField("Episode No", IntegerType(), True),
    StructField("Episode Name", StringType(), True),
    StructField("Episode Date", DateType(), True),
    StructField("Task Order", IntegerType(), True),
    StructField("Task", StringType(), True),
    StructField("Task Type", StringType(), True),
    StructField("Assignment", StringType(), True),
    StructField("Contestant", StringType(), True),
    StructField("Team", StringType(), True),
    StructField("Points", DoubleType(), True),
    StructField("Winner", IntegerType(), True),
    StructField("source_filename", StringType(), True),
    StructField("source_processedTime", TimestampType(), True)
])

# Create an empty DataFrame with the specified schema
dftm  = spark.createDataFrame([], schema)

# Show the schema of the empty DataFrame
dftm.printSchema()

display(dftm)

Changing to DoubleType is the only change made so far. The data is loaded through to dims and facts, the S4 file is added into the data lake and we go again.

This time when we get to the code that checks TaskmasterTransformed it works

from pyspark.sql.functions import input_file_name, regexp_extract

workspace_id = "########-####-####-####-############"
lakehouse_id = "########-####-####-####-############"

parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/taskmasterTransformed.parquet")

df_already_processed = spark.read.parquet(parquet_file)

display(df_already_processed)

  • Add the ContestantTransformed to a file and check
  • Add the log to a file and check

Note that we now have S4 logged with the process flag of 0. We continue on to the dims

Updating Dimension Dim Contestant

We bring back the list of unprocessed files

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType,TimestampType, DoubleType


# Create a Spark session
spark = SparkSession.builder.appName("CreateEmptyDataFrame").getOrCreate()

# Define the schema for the DataFrame
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Series", StringType(), True),
    StructField("Episode No", IntegerType(), True),
    StructField("Episode Name", StringType(), True),
    StructField("Episode Date", DateType(), True),
    StructField("Task Order", IntegerType(), True),
    StructField("Task", StringType(), True),
    StructField("Task Type", StringType(), True),
    StructField("Assignment", StringType(), True),
    StructField("Contestant", StringType(), True),
    StructField("Team", StringType(), True),
    StructField("Points", DoubleType(), True),
    StructField("Winner", IntegerType(), True),
    StructField("source_processedTime", TimestampType(), True),
    StructField("Year", StringType(), True),
    StructField("day", StringType(), True),
    StructField("month", StringType(), True),
    StructField("source_filename", StringType(), True)
    
])

# Create an empty DataFrame with the specified schema
dftm  = spark.createDataFrame([], schema)

# Show the schema of the empty DataFrame
dftm.printSchema()

display(dftm)

Again, at this point we create an empty data frame to load into, and Points has been changed to DoubleType

  • Loop through the files that need processing. Adding just the partition from parquet file into a data frame.
  • Check the number of rows
  • Add to CurrentProcessedTMData.parquet so all the other dims and the fact can use this.
  • Get Contestant data and get the correct series from this data to match to taskmaster partition
  • Now filter to only the contestants we are working with
  • Merge Contestants and Taskmaster data
  • If the Delta Parquet file doesn't exist, add the default row of -1; else do nothing. I had made an error in Part 15: it was pointing towards the Silver lakehouse files when it should have been pointing to the Gold tables (see the corrected code change below)

If New Create -1 Row. Else do nothing

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import Row


workspace_id = "########-####-####-####-############"
lakehouse_id = "########-####-####-####-############"

path_to_parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/dimcontestant")

# Check if the Delta Parquet file exists
delta_file_exists = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration()).exists(spark._jvm.org.apache.hadoop.fs.Path(path_to_parquet_file))

if delta_file_exists:
    print("Delta Parquet file exists. We don't need to add the NA row")
else:

    # Add a Default row
    # Create a sample DataFrame
    data = [(-1, "Not Known","Not Known","Not Known","Not Known","Not Known","Not Known","0","NA","0","0","0")]
    columns = ["Contestant ID", "Contestant Name", "Team","Image","From","Area","Country","Seat","Gender","Hand","Age","Age Range"]
    new_row  = spark.createDataFrame(data, columns)

    # Union the new row with the existing DataFrame
    dfContfinal = dfContfinal.union(new_row)

    # Show the updated DataFrame
    dfContfinal.show(1000)

    print("Delta Parquet file does not exist. Add NA Row")

The only bit of code changed here is the path_to_parquet_file.

New Code. If Row -1 doesn’t exist, get the max Key +1

We created the following code in the last post

from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number


# Process the DataFrame based on ContestantID
if dfContfinal.filter(dfContfinal["Contestant ID"] == -1).count() > 0:

    # Create a window specification ordered by "Contestant ID"
    window_spec = Window.orderBy(col("Contestant ID"))

    # Add a new column "ContestantKey" using row_number() over the specified window
    dfContfinalKey = dfContfinal.withColumn("ContestantKey", row_number().over(window_spec) - 2)

    # Show the result
    dfContfinalKey.show(1000)

    print("Data Contains Default row. No data Previously processed")
    
else:
    
    print("Data Contains No Default row. Data Previously processed")

So, if this is a new run we will create the default -1 row. If we are adding to data already there, we will already have a default row in the data and won't have created the -1 row this time, so we need to get the last key and add 1.

We already have the code in the if clause that creates the key starting from -1 when the data is new. We now need the else.

else:

    # Load data from the Delta table
    dfdimc = spark.read.format("delta").load("abfss://########-####-####-####-############@onelake.dfs.fabric.microsoft.com/########-####-####-####-############/Tables/dimcontestant")
     
    # Get the maximum value of ContestantKey
    max_contestant_key = dfdimc.agg({"ContestantKey": "max"}).collect()[0][0]

    # Take this number and start the new keys at the last key + 1
    start_value = max_contestant_key + 1

    # Create a window specification ordered by "Contestant ID"
    window_spec = Window.orderBy(col("Contestant ID"))

    # Add a new column "ContestantKey" using row_number() over the specified window
    dfContfinalKey = dfContfinal.withColumn("ContestantKey", row_number().over(window_spec) + start_value - 1)
    
    # Show the DataFrame
    dfContfinalKey.show()
    
    print("Data Contains No Default row. Data Previously processed")
  • So here we bring back the data from Dim Contestant.
  • We get the max key.
  • Create the start value by adding 1.
  • Then create the window, ordering by Contestant ID.
  • Add in the key using window_spec and start_value so the keys increment from the last item in the dimension. In this case we are starting from ID 15.
  • Append to the Delta parquet file.

Error when appending into the Delta table

When running this code

from delta.tables import DeltaTable

# You can also add a non-default lakehouse by clicking +Lakehouse
aliased_df.write.mode("append").option("overwriteSchema", "true").format("delta").saveAsTable("GoldDebbiesTraininglh.dimContestant")

AnalysisException: Failed to merge fields ‘ContestantID’ and ‘ContestantID’. Failed to merge incompatible data types LongType and IntegerType

We can test this by looking at the schemas.
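As a quick sketch, reusing aliased_df and path_to_parquet_file from the code above, we can print both schemas side by side:

# Schema of the data we are trying to append
aliased_df.printSchema()

# Schema of the data already in the Delta table
dfExisting = spark.read.format("delta").load(path_to_parquet_file)
dfExisting.printSchema()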

This is correct. What about our original data?

So our original data was added to the delta file as long. Integers go up to 2,147,483,647, so there seems to be no reason to store these IDs as long.

Because this is just a test, the decision is made to once again go back to the start and ensure that the data is integer from the first instance.

Remove all the files and tables from Gold and Silver, and Season 4 from the data lake, so we can go through the process again.
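We did this through the Lakehouse explorer, but if you would rather script the clean-up, something along these lines should work. Treat it as a sketch: the fact table name and the Silver folder path are illustrative, the Gold schema name is the one used in the saveAsTable calls, and mssparkutils is the Fabric notebook utility.

from notebookutils import mssparkutils

workspace_id = "########-####-####-####-############"
lakehouse_id = "########-####-####-####-############"

# Drop the Gold tables (factTaskmaster is an illustrative name for the fact table)
for table in ["dimContestant", "dimEpisode", "dimTask", "factTaskmaster"]:
    spark.sql(f"DROP TABLE IF EXISTS GoldDebbiesTraininglh.{table}")

# Remove the Silver files folder (path is illustrative)
mssparkutils.fs.rm(
    f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver",
    True  # recursive
)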

And whilst updating, we can also sort out the issues above.

Taskmaster Transformed. No Updates.

Dim Contestant V2

Where is our integer becoming long, and can we catch it at that point?

merged_df has an integer but it becomes long here when we get rid of any null team records.

# Cast ContestantID to IntegerType
dfContfinal = dfContfinal.withColumn("Contestant ID", dfContfinal["Contestant ID"].cast("int"))

# Verify the schema to ensure the change
dfContfinal.printSchema()

However, it also happens again at dfContfinal, so we can move the cast above to after that point (and then keep checking the schema).

This appears to be the last time that the ID gets changed to Long.

This shows the importance of checking your schema, especially when appending data into files.
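If you would rather catch this automatically than by eye, a small check along these lines can run just before the append. Again this is only a sketch, reusing aliased_df and path_to_parquet_file from above.

# Compare the incoming DataFrame's column types with the target Delta table
dfTarget = spark.read.format("delta").load(path_to_parquet_file)
target_types = dict(dfTarget.dtypes)

mismatches = [
    (name, dtype, target_types[name])
    for name, dtype in aliased_df.dtypes
    if name in target_types and dtype != target_types[name]
]

if mismatches:
    raise ValueError(f"Schema mismatch before append: {mismatches}")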

Dim Episode V2

The only change here so far is pointing to the correct Gold tables.

We also checked the schema to ensure nothing was long.

The same thing is happening here, so we need some code before we create the delta parquet to sort this out.
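As a rough sketch, assuming the episode DataFrame at this point is called dftmEp (as in the key-creation code further down), any long columns could be cast back in one pass:

from pyspark.sql.functions import col
from pyspark.sql.types import LongType

# Cast any LongType columns back to int before writing the delta table,
# so later appends don't fail on LongType vs IntegerType
for field in dftmEp.schema.fields:
    if isinstance(field.dataType, LongType):
        dftmEp = dftmEp.withColumn(field.name, col(field.name).cast("int"))

dftmEp.printSchema()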

Dim Task V2

The same change again: point to the Gold Delta table and not the Silver files.

And reset long to int

Taskmaster Fact V2

No changes

We are now at the point where we can add in Series 4, run the Taskmaster Transformed notebook and go back to Dim Contestant V2.

Unfortunately, after the update, another error has occurred in Dim Contestant.

Age Error. Age has changed to String

AnalysisException: Failed to merge fields ‘Age’ and ‘Age’. Failed to merge incompatible data types StringType and IntegerType

So we repeat the process again and add more code to change Age back to int.
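A minimal sketch of that extra cast, assuming the column is still called Age on dfContfinal as in the default-row code earlier:

from pyspark.sql.functions import col

# Cast Age back to an integer so it matches the existing Delta table schema
dfContfinal = dfContfinal.withColumn("Age", col("Age").cast("int"))
dfContfinal.printSchema()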

We basically need to repeat the process until every error in Contestant is resolved. Thankfully, this is the last problem with Dim Contestant.

Dim Episode V2

Create Episode Key

from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Process the DataFrame based on Episode No
if dftmEp.filter(dftmEp["Episode No"] == -1).count() > 0:

    # Create a window specification ordered by "Series" and "Episode No"
    window_spec = Window.orderBy(col("Series"), col("Episode No"))

    # Add a new column "EpisodeKey" using row_number() over the specified window
    dftmEpKey = dftmEp.withColumn("EpisodeKey", row_number().over(window_spec) - 2)

    # Show the result
    dftmEpKey.show(1000)

    print("Data Contains Default row. No data Previously processed. Add Key from -1")
    
else:

    # Load data from the Delta table
    dfdime = spark.read.format("delta").load("abfss://986472b4-7316-485c-aa22-128e1cb29544@onelake.dfs.fabric.microsoft.com/ce55b91c-ed51-4885-9440-378a3c18450b/Tables/dimepisode")

    # Get the maximum value of EpisodeKey
    max_episode_key = dfdime.agg({"EpisodeKey": "max"}).collect()[0][0]

    # Now take this number and start the new keys at the last key + 1
    start_value = max_episode_key + 1

    # Create a window specification ordered by "Series" and "Episode No"
    window_spec = Window.orderBy(col("Series"), col("Episode No"))

    # Add a new column "EpisodeKey" using row_number() over the specified window
    dftmEpKey = dftmEp.withColumn("EpisodeKey", row_number().over(window_spec) + start_value - 1)
    
    # Show the DataFrame
    dftmEpKey.show()
    
    print("Data Contains No Default row. Data Previously processed. Add Key to continue sequence")  

Added the else to our Key creation code block.

Dim Task V2

Create Key

from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Process the DataFrame based on Task Order
if dftmTsk.filter(dftmTsk["Task Order"] == -1).count() > 0:

    # Create a window specification ordered by "Task Order" and "Task"
    window_spec = Window.orderBy(col("Task Order"), col("Task"))

    # Add a new column "TaskKey" using row_number() over the specified window
    dftmTskKey = dftmTsk.withColumn("TaskKey", row_number().over(window_spec) - 2)

    # Show the result
    dftmTskKey.show(1000)

    print("Data Contains Default row. No data Previously processed")
    
else:
    # Load data from the Delta table
    dfdimt = spark.read.format("delta").load("abfss://986472b4-7316-485c-aa22-128e1cb29544@onelake.dfs.fabric.microsoft.com/ce55b91c-ed51-4885-9440-378a3c18450b/Tables/dimtask")

    # Get the maximum value of TaskKey
    max_task_key = dfdimt.agg({"TaskKey": "max"}).collect()[0][0]

    # Now take this number and start the new keys at the last key + 1
    start_value = max_task_key + 1

    # Create a window specification ordered by "Task Order" and "Task"
    window_spec = Window.orderBy(col("Task Order"), col("Task"))

    # Add a new column "TaskKey" using row_number() over the specified window
    dftmTskKey = dftmTsk.withColumn("TaskKey", row_number().over(window_spec) + start_value - 1)
    
    # Show the DataFrame
    dftmTskKey.show()
    
    print("Data Contains No Default row. Data Previously processed. Add Key to continue sequence")      

The else has been added to this code block.

Conclusion

After running the fact table again, everything is successful.

Season 4 is set as fully processed.
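The update itself can be as simple as the sketch below, assuming delta_table_path points at the ProcessedFiles delta table and that flipping fullyProcessedFlag to 1 for the rows from this run is all that is needed:

from delta.tables import DeltaTable
from pyspark.sql import functions as F

# Mark the files from this run as fully processed in ProcessedFiles
processed_files = DeltaTable.forPath(spark, delta_table_path)

processed_files.update(
    condition=F.col("fullyProcessedFlag") == 0,
    set={"fullyProcessedFlag": F.lit(1)}
)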

We should now be able to repeat the process across all the other taskmaster series until completion.

We are now ready to add the notebooks into a Pipeline in the next blog post, so we can set up automated processing. And then move back to Power BI reporting.

The only step not yet set up is the data engineering side, where we would usually automate the process of pulling through the data. Later on, we might have a think about how we can set something up to mimic a proper business scenario where the data is dropped for us to process.

Microsoft Fabric Part 13. Changing Structure of Lakehouses

Before continuing on with this project, let's look at an amendment to the lakehouse structure, mostly because we decided we wanted to keep the structure of the Task Flow. Instead of having one lakehouse for Gold, Silver and Bronze, we want three lakehouses: one each for Bronze, Silver and Gold.

3 new lakehouses are created

And now, the Notebooks need updating

Taskmaster Transformed

In the notebook.

Bronze is the main lakehouse that we are pulling the data from. But you can also add another Lakehouse.

And use the arrows to switch between the two.

The only code that needs to be changed is where we create the Silver parquet file (it's not Delta parquet at this point).

From

To

To get the ABFS path so we can write from the default lakehouse to another lakehouse, right-click on the destination lakehouse and select Copy ABFS path.

dftm_cleaned.write.mode("overwrite").parquet('abfss://986472b4-7316-485c-aa22-128e1cb29544@onelake.dfs.fabric.microsoft.com/ee31b1a4-16bf-4aae-aaab-f130bd4d53c6/Files/Data/Silver/taskmasterTransformed.parquet')

And we can go further, by parameterising the workspace ID and the lakehouse ID

An f prefix has been added to the string so we can drop parameters into the location path.
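The parameterised version isn't shown above, but it would look something like this (the same Silver path as the hard-coded write, with the IDs masked):

workspace_id = "########-####-####-####-############"
lakehouse_id = "########-####-####-####-############"

# The f prefix lets the workspace and lakehouse parameters drop into the path
file_path = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/taskmasterTransformed.parquet"

dftm_cleaned.write.mode("overwrite").parquet(file_path)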

So we can now use this whenever we want to create a parquet file in a different lakehouse to the default one.

And we have introduced parameters.

Now we want to know how to do this with a Delta parquet file, moving it into the Gold Lakehouse.

Silver to Gold lakehouse Delta Parquet

To

from delta.tables import DeltaTable


# You can also add a non-default lakehouse by clicking +Lakehouse
aliased_df.write.mode("overwrite").option("overwriteSchema", "true").format("delta").saveAsTable("GoldDebbiesTraininglh.dimContestant")

And again, we clicked + and added the Gold Lakehouse as the non-default.

How can you tell which is the default?

Hover over the Lakehouse Name to get the list.

Conclusion

We have now transformed the architecture to have three lakehouses, Gold, Silver and Bronze, instead of one lakehouse with three folders for Gold, Silver and Bronze.

This has allowed us to see how the code changes when creating files in non-default lakehouses, and it has allowed us to set up our first parameters. It also means we can use the medallion task flow as is, without having to do any amendments.

It also feels right to have more separation of the three areas.
