Microsoft Fabric Part 18. Taskmaster Project. Adding more meta data to check throughout the Dimension and Fact run. More changes to the Pyspark code

Now we have updated our ProcessedFiles delta parquet, we can update the dimensions accordingly.

  • It might be nice to have another delta parquet file here we can use to collect meta data for dims and facts.
  • We will also want the processedDate in each dim and fact table.

We are going to start with Delta Load 2, so Series 1 and 2 have already been loaded and we are now loading Series 3 (this shows off some of the logic better).

Dim Contestant.

Bring back the list of Processed Files from ProcessedFiles

Immediately we have a change to the code because it's delta parquet and not parquet.

# Define the ABFS path to the Delta table
delta_table_path = "abfss://########-####-####-####-############@onelake.dfs.fabric.microsoft.com/########-####-####-####-############/Tables/processedfiles"

# Read the Delta table
dflog = spark.read.format("delta").load(delta_table_path)

# Filter the DataFrame
df_currentProcess = dflog[dflog["fullyProcessedFlag"] == 0][["Filename"]]

# Display the DataFrame
display(df_currentProcess)

New Code Block: Check Processed Files

In the previous code, we:

  • Create a new taskmaster schema to load data into
  • Loop through the files and add data to the above schema
  • Add the current process data into CurrentProcessedTMData.parquet.
  • Add the ProcessDate of the Taskmaster transformed data

We know at some point that we are having issues with the fact table getting duplicate series so we need to check throughout the process where this could be happening.

  • Change the processDate from being the TaskmasterTransformed process date, in case this was done on an earlier day. We want the current date here to ensure everything groups together.
  • We could also set up this data so we can store it in our new Delta parquet Log
  • Because we are also adding the process date to the grouping, we need to be careful: minutes and seconds could create extra rows and we don’t want that
  • We also know that we are doing checks on the data as it goes in, and rechecking the numbers of historical data, so we need a flag for this
from pyspark.sql import functions as F

workspace_id = "########-####-####-####-############"
lakehouse_id = "########-####-####-####-############"

# Define the file path
file_path = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/CurrentProcessedTMData.parquet"

# Read the Parquet file
dfCheckCurrentProc = spark.read.parquet(file_path)

# Add the new column
dfCheckCurrentProc = dfCheckCurrentProc.withColumn("sequence", F.lit(1))
dfCheckCurrentProc = dfCheckCurrentProc.withColumn("parquetType", F.lit("parquet"))
dfCheckCurrentProc = dfCheckCurrentProc.withColumn("analyticsType", F.lit("Transformation"))
dfCheckCurrentProc = dfCheckCurrentProc.withColumn("name", F.lit("CurrentProcessedTMData"))
# Set the date as the current date
dfCheckCurrentProc = dfCheckCurrentProc.withColumn("processedDate", current_date())
dfCheckCurrentProc = dfCheckCurrentProc.withColumn("historicalCheckFlag", F.lit(0))
dfCheckCurrentProc = dfCheckCurrentProc.withColumn("raiseerrorFlag", F.lit(0))

# Select the required columns and count the rows
dfCheckCurrentProc = dfCheckCurrentProc.groupBy("sequence","parquetType","analyticsType","name","source_filename", "processedDate","historicalCheckFlag","raiseerrorFlag").count()

# Rename the count column to TotalRows
dfCheckCurrentProc = dfCheckCurrentProc.withColumnRenamed("count", "totalRows")

# Show the data
display(dfCheckCurrentProc)
  • current_date() gets the current date. We only have the date in the grouped auditing table; date and time can go into the dimension itself as the process date.
  • sequence allows us to see the sequence of dim and fact creation
  • parquetType is either Parquet or Delta Parquet for this project
  • analyticsType is the type of table we are dealing with
  • historicalCheckFlag. Set to 0 if we are looking at the data being loaded. 1 if the checks are against older data.
  • raiseErrorFlag. If it looks like we have any issues this can be set to 1 and someone can be alerted using Power BI reporting.

It would be really good to also have a run number here, because each run will consist of a number of parquet files (in this case parquet and delta parquet).

Get ContestantTransformed data

A small update here. Just in case, we use distinct to make sure we only have distinct contestants in this list

# Ensure the DataFrame contains only distinct values
dfc = dfc.distinct()

Merge Contestants and Taskmaster data

This is the point where we create the Contestant Dimension. We set the process date to the current timestamp and alias it to just processedTime.

from pyspark.sql.functions import col, current_date, current_timestamp
from pyspark.sql import SparkSession, functions as F

df_small = F.broadcast(dfc)


# Perform the left join
merged_df = dftm.join(df_small, dftm["Contestant"] == df_small["Name"], "left_outer")\
                .select(
                    dfc["Contestant ID"],
                    dftm["Contestant"].alias("Contestant Name"),
                    dftm["Team"],
                    dfc["Image"],
                    dfc["From"],
                    dfc["Area"],
                    dfc["country"].alias("Country"),
                    dfc["seat"].alias("Seat"),
                    dfc["gender"].alias("Gender"),
                    dfc["hand"].alias("Hand"),
                    dfc["age"].alias("Age"),
                    dfc["ageRange"].alias("Age Range"),
                    current_timestamp().alias("processedTime"),
                ).distinct()



# Show the resulting DataFrame
merged_df.show()

Add the Default row

This changes because we have a new column that we need to add.

from pyspark.sql import SparkSession,Row
from pyspark.conf import SparkConf
from pyspark.sql.functions import current_timestamp


workspace_id = "########-####-####-####-############"
lakehouse_id = "########-####-####-####-############"

path_to_parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/dimcontestant")

# Check if the Delta Parquet file exists
delta_file_exists = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration()).exists(spark._jvm.org.apache.hadoop.fs.Path(path_to_parquet_file))

if delta_file_exists:
    print("Delta Parquet file exists. We don't need to add the NA row")
else:

    # Add a Default row
    # Create a sample DataFrame
    data = [(-1, "Not Known","Not Known","Not Known","Not Known","Not Known","Not Known","0","NA","0","0","0")]
    columns = ["Contestant ID", "Contestant Name", "Team","Image","From","Area","Country","Seat","Gender","Hand","Age","Age Range"]
    new_row  = spark.createDataFrame(data, columns)

    # Add the current timestamp to the ProcessedTime column
    new_row = new_row.withColumn("ProcessedTime", current_timestamp())

    # Union the new row with the existing DataFrame
    dfContfinal = dfContfinal.union(new_row)

    # Show the updated DataFrame
    dfContfinal.show(1000)

    print("Delta Parquet file does not exist. Add NA Row")

The above screenshot is for Series 1 and 2, just to show you that the row is created if it's the first time.

All the process dates are the same (at least the same day) ensuring everything will be grouped when we add it to the audit delta parquet table.

However, this is what is displayed for our 2nd load, for Series 3.

Check the new data that has been loaded into the delta parquet.

Checking the data checks the dimension that has been created as delta parquet. This is the last code section and we want to build more in after this.

Get the Contestant Names and Source Files of the Current Load

Because we can process either one file alone or multiple files, we need to get the source file name AND the contestant name at this point.

#Get just the contestant name and source file name from our current load

df_contestant_name = dftm.selectExpr("`Contestant` as contestantNameFinal", "`source_filename` as sourceFileNameFinal").distinct()

df_contestant_name.show()

For the audit we need to know what our current Contestants are and who are the Contestants we have previously worked with, plus the original file they belong to.

We took this information from an earlier dataframe when we loaded the source file to be used to create the dimension. This still contains the source file name. The Dimension does not contain the file name any more.

Join df_contestant_name to dfnewdimc to see what data is new and what data has been previously loaded

from pyspark.sql.functions import broadcast, when, col

df_joined = dfnewdimc.join(
    df_contestant_name,
    dfnewdimc.ContestantName == df_contestant_name.contestantNameFinal,
    how='left'
)
# Add the historicalCheckFlag column
df_joined2 = df_joined.withColumn(
    "historicalCheckFlag", 
    when((col("contestantNameFinal").isNull()) & (col("ContestantKey") != -1), 1).otherwise(0))

# Drop the ContestantNameFinal column
df_joined3 = df_joined2.drop("contestantNameFinal")

#If its an audit row set sourceFileNameFinal to Calculated Row
df_joined4 = df_joined3.withColumn(
    "sourceFileNameFinal",
    when((col("sourceFileNameFinal").isNull()) & (col("ContestantKey") == -1), "Calculated Row")
    .otherwise(col("sourceFileNameFinal"))
)

# Show the result
display(df_joined4)

This joins the name we have just created to the data we are working with so we can set the historicalCheckFlag. For the first load everything is 0 because they are current.

In this load we can see some 1’s. These are records already in the system that don’t match our current load.

This file also now contains the source_filename

If it's a default row, there will be no source filename so we set it to Calculated Row instead.

Creating Dim Audit row

from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql import DataFrame

# Add the new column

dfdimaudit = df_joined4.withColumn("sequence", F.lit(2))

dfdimaudit = dfdimaudit.withColumn("parquetType", F.lit("Delta parquet"))

dfdimaudit = dfdimaudit.withColumn("analyticsType", F.lit("Dimension"))

dfdimaudit = dfdimaudit.withColumn("name", F.lit("Dim Contestant"))

# Extract the date from source_processedTime
dfdimaudit  = dfdimaudit.withColumn("processedDate", F.to_date("processedTime"))
dfdimaudit =  dfdimaudit.withColumn("raiseerrorFlag", F.lit(0))

# Select the required columns and count the rows
dfdimauditgr = dfdimaudit.groupBy("sequence","parquetType","analyticsType","name", "sourceFileNameFinal","processedDate","historicalCheckFlag","raiseerrorFlag").count()


# Alias sourceFileNameFinal to source_filename
dfdimauditgr = dfdimauditgr.withColumnRenamed("sourceFileNameFinal", "source_filename")

# Alias count to totalRows
dfdimauditgr = dfdimauditgr.withColumnRenamed("count", "totalRows")

# Show the data
display(dfdimauditgr)  

The files are grouped on processed time, so we need to reduce the date and time to just the date using to_date() (to avoid minutes and seconds causing issues).

Our default row is set against Calculated Row.

The null source file name is the total count of our historical records

We are going to append the dim audit to the initial current processed audit so they both need exactly the same columns and these are

  • sequence
  • parquetType
  • analyticsType
  • name
  • source_filename
  • processedDate
  • totalRows
  • historicalCheckFlag
  • raiseerrorFlag

and we have the correct source file name. Even if we process multiple files.

Add Historical Data Check to null source filename
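The code for this step isn't shown in full; a minimal sketch, assuming the grouped audit dataframe is still dfdimauditgr and the label used is "Historical Data Check":

from pyspark.sql import functions as F

# Label the historical total row, which has a null source_filename,
# so it reads clearly in the audit table
dfdimauditgr = dfdimauditgr.withColumn(
    "source_filename",
    F.when(F.col("source_filename").isNull(), F.lit("Historical Data Check"))
     .otherwise(F.col("source_filename"))
)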

Append the audit for the transformation data and Dimension data

# Append the current processed transformation file(s) audit row with the dimension audit

audit3_df = dfCheckCurrentProc.unionByName(dfdimauditgr)

display(audit3_df) 

With the S3 file loaded we can see

  • Transformation rows for series 3.
  • Dimension rows for Contestant S3
  • The historical row count of all series already in the dimension.

Create Batch numbers for the audit.

If it's the first time we run this process, set it to 0. Else set it to the max last batch no + 1.

from pyspark.sql.functions import lit

# Function to check if a table exists
def table_exists(spark, table_name):
    return spark._jsparkSession.catalog().tableExists(table_name)

# Table name
table_name = "SilverDebbiesTraininglh.processeddimfact"

if table_exists(spark, table_name):

    # If the table exists, get the max batchno
    df = spark.table(table_name)
    max_batchno = df.agg({"batchno": "max"}).collect()[0][0]
    next_batchno = max_batchno+1

    # Add the batchNo column with the next batch number
    audit3_df = audit3_df.withColumn("batchNo", lit(next_batchno))

    print(f"processedDimFact data already exists. Next batchno: {next_batchno}")
    display(audit3_df) 
else:

    # Add the batchNo column with a value of 0
    audit3_df = audit3_df.withColumn("batchNo", lit(0))


    # Reorder columns to make batchNo the first column
    columns = ['batchNo'] + [col for col in audit3_df.columns if col != 'batchNo']
    audit3_df = audit3_df.select(columns)

    print("Table does not exist. Add Batch No 0")

    display(audit3_df) 

The following gives more information about the code used.

  • def table_exists(spark, table_name): defines a function named table_exists that takes two parameters: the Spark session object and the table name
  • spark._jsparkSession.catalog() accesses the Spark session catalogue, which contains metadata about the entities in the Spark session.
  • Basically, this checks whether the table name exists in the Spark session catalogue. If it does, we get the max batch no from the table, add 1 and add that as the batchNo column.
  • If it's false we start batchno at 0

How does the column reordering work?

  • ['batchNo']: This creates a list with a single element, 'batchNo'. This will be the first column in the new order.
  • [col for col in audit3_df.columns if col != 'batchNo']: This is a list comprehension that iterates over all the columns in audit3_df and includes each column name in the new list, excluding 'batchNo'.

List Comprehension: Create lists by processing existing lists (Like columns here) into a single line of code.

  • Combining the lists: The + operator concatenates the two lists. So, ['batchNo'] is combined with the list of all other columns, resulting in a new list where 'batchNo' is the first element, followed by all the other columns in their original order.

And here we can see in Load 2 we hit the if section of the above code. Our batch no is 1 (Because the first load was 0)

Reset totalRows to Integer

Found that totalRows was set to long again, so before adding into the delta parquet we reset it to int.
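The cast itself isn't shown here; a minimal sketch, assuming the dataframe at this point is audit3_df:

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Cast totalRows back from long to int before writing to the Delta table
audit3_df = audit3_df.withColumn("totalRows", col("totalRows").cast(IntegerType()))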

Bring back the processeddimFact before saving with the new rows

Before saving the new audit rows we want to know if there are any issues. This means we need the data. However we also need to deal with the fact that this may be the first process time and there won’t be a file to check.

table_path = "abfss://########-####-####-####-############@onelake.dfs.fabric.microsoft.com/########-####-####-####-############/Tables/processeddimfact"
table_exists = DeltaTable.isDeltaTable(spark, table_path)

if table_exists:
    # Load the Delta table as a DataFrame
    dfprocesseddimfact = spark.read.format("delta").table("SilverDebbiesTraininglh.processeddimfact")

    # Display the table
    dfprocesseddimfact.show()  # Use show() instead of display() in standard PySpark

    # Print the schema
    dfprocesseddimfact.printSchema()

    print("The Delta table SilverDebbiesTraininglh.processeddimfact exists. Bring back data")
else:

    # Define the schema
    schema = StructType([
        StructField("batchNo", IntegerType(), nullable=False),
        StructField("sequence", IntegerType(), nullable=False),
        StructField("parquetType", StringType(), nullable=False),
        StructField("analyticsType", StringType(), nullable=False),
        StructField("name", StringType(), nullable=False),
        StructField("source_filename", StringType(), nullable=True),
        StructField("processedDate", DateType(), nullable=True),
        StructField("historicalCheckFlag", IntegerType(), nullable=False),
        StructField("raiseerrorFlag", IntegerType(), nullable=False),
        StructField("totalRows", IntegerType(), nullable=False)
    ])

    # Create an empty DataFrame with the schema
    dfprocesseddimfact = spark.createDataFrame([], schema)

    dfprocesseddimfact.show()

    print("The Delta table SilverDebbiesTraininglh.processeddimfact does not exist. Create empty dataframe")

If there is no data we can create an empty schema.

Get the Total of Dim Contestant rows (Non-Historical)

from pyspark.sql.functions import col, sum

# Filter the dataframe where name is "Dim Contestant"
filtered_df = dfprocesseddimfact.filter((col("Name") == "Dim Contestant") & (col("historicalCheckFlag") == 0)& (col("source_filename") != "Calculated Row"))

display(filtered_df) 


# Calculate the sum of TotalRows
total_rows_sum = filtered_df.groupBy("Name").agg(sum("TotalRows").alias("TotalRowsSum"))

# Rename the "Name" column to "SelectedName"
total_rows_sum = total_rows_sum.withColumnRenamed("Name", "SelectedName")


display(total_rows_sum) 

# Print the result
print(f"The sum of TotalRows where name is 'Dim Contestant' is: {total_rows_sum}")

Both initial (non-historical) loads equal 10.

Get Historical Rows from our latest process and join to total Loads of the previous Load

#Join total_rows_sum to audit3_df
from pyspark.sql.functions import broadcast

# Broadcast the total_rows_sum dataframe
broadcast_total_rows_sum = broadcast(total_rows_sum)

# Perform the left join
df_checkolddata = audit3_df.filter(
    (audit3_df["historicalCheckFlag"] == 1) & (audit3_df["source_filename"] != "Calculated Row")
    ).join(
    broadcast_total_rows_sum,
    audit3_df["name"] == broadcast_total_rows_sum["SelectedName"],
    "left"
)

# Show the result
display(df_checkolddata)

So Series 1 and 2 have 10 contestants. After S3 load there are still 10 contestants. Everything is fine here. The historical records row allows us to ensure we haven’t caused any issues.

Set Raise Error Flag if there are issues

And now we can set the error flag if there is a problem which can raise an alert.

from pyspark.sql.functions import col, lit

# Assuming df is your DataFrame
total_rows_sum = df_checkolddata.agg({"TotalRowsSum": "sum"}).collect()[0][0]
total_rows = df_checkolddata.agg({"totalRows": "sum"}).collect()[0][0]

# Check if TotalRowsSum equals totalRows
if total_rows_sum == total_rows:
    print("No issue with data")

else:
    df_checkolddata = df_checkolddata.withColumn("raiseerrorFlag", lit(1))

    print("Old and historical check does not match. Issue. Set raiseerrorFlag = 1")

# Show the updated DataFrame
df_checkolddata.show()

raiseerrorFlag is still 0

Remove the historical check row if there is no error

from pyspark.sql.functions import col

# Alias the DataFrames
audit3_df_alias = audit3_df.alias("a")
df_checkolddata_alias = df_checkolddata.alias("b")

# Perform left join on the name column and historicalCheckFlag
df_audit4 = audit3_df_alias.join(df_checkolddata_alias, 
                              (col("a.name") == col("b.name")) & 
                              (col("a.historicalCheckFlag") == col("b.historicalCheckFlag")), 
                              how="left")

# Select only the columns from audit3_df and TotalRowsSum from df_checkolddata, and alias historicalCheckFlag
df_audit5 = df_audit4.select(
    "a.*",
    col("b.raiseerrorFlag").alias("newraiseErrorFlag")
)


# Filter out rows where newraiseErrorFlag is 0 because it's fine. We therefore don't need to know about the historical update
df_audit6  = df_audit5.filter((col("newraiseErrorFlag") != 0) | (col("newraiseErrorFlag").isNull()))

# Remove the columns newraiseerrorflag
df_audit7 = df_audit6.drop("newraiseErrorflag")


# Show the result
display(df_audit7)

the historical row has been removed. if there isn’t a problem, there is no need to hold this record.

If the data has multiple audit rows. Flag as an issue. Else remove

# Count the number of rows where ContestantName is "Not Known"
dfcount_not_known = dfnewdimc.filter(col("ContestantName") == "Not Known").count()

# Print the result
print(f"Number of rows where ContestantName is 'Not Known': {dfcount_not_known}")

# Check if count_not_known is 1
if dfcount_not_known == 1:
    # Remove the row from df_audit7 where source_filename is "Calculated Row"
    df_audit8 = df_audit7.filter(col("source_filename") != "Calculated Row")

    print(f"1 dim audit row. We can remove the row from the audit table")

else:
    # Set source_filename to "Multiple Calculated Rows issue"
    df_audit8 = df_audit7.withColumn("source_filename", 
                                     when(col("source_filename") == "Calculated Row", 
                                          lit("Multiple Calculated Rows issue"))
                                     .otherwise(col("source_filename")))


    print(f"multiple dim audit row. We can specify this in the audit table")                                

# Show the updated df_audit8
df_audit8.show()

The audit row has now been removed. Unless there is an issue we don’t need it. This will be useful if we accidentally process multiple audit rows into the table.

Save as Delta Parquet

from delta.tables import DeltaTable

# You can also add a non-default lakehouse by clicking +Lakehouse
audit3_df.write.mode("append").option("overwriteSchema", "true").format("delta").saveAsTable("SilverDebbiesTraininglh.processedDimFact")

# Print a success message
print("Parquet file appended successfully.")

And save the data as a Delta Parquet processedDimFact in our Silver transform layer.

We can use both of these Delta Parquet Files later.

Check processdimfact
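The check is just a read back of the table we have written to; something like this sketch will do it (dfcheck is a throwaway name):

# Read the audit table back and confirm the new batch has been appended
dfcheck = spark.read.format("delta").table("SilverDebbiesTraininglh.processedDimFact")
display(dfcheck.orderBy("batchNo", "sequence"))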

Dim Contestant is complete for now. But we will come back later to update the code.

Dim Episode

Create Episode Dim

Small change. We are adding processedTime as the current date and time.

from pyspark.sql.functions import current_timestamp

dftmEp = dftm.select("Series","Episode No" ,"Episode Name").distinct()

# Add the current timestamp as processedTime
dftmEp = dftmEp.withColumn("processedTime", current_timestamp())

dftmEp = dftmEp.orderBy("Series", "Episode No")

display(dftmEp)

Add a Default row

Just like Contestant. We need ProcessedTime adding

from pyspark.sql.functions import current_timestamp

# Add the current timestamp to the ProcessedTime column
new_row = new_row.withColumn("ProcessedTime", current_timestamp())

Again, Here is the screenshot taken on the first load of S1 and S2

And our S3 Load 2 hits the if block and no default row is created

Get the Series, Episode Name and Source File of the Processed data

#Get just the series, Episode Name and source_filename from our current load

df_SeEp = dftm.selectExpr("`Series` as SeriesFinal", "`Episode Name` as EpisodeFinal", "source_filename").distinct()

display(df_SeEp) 

Backticks (`) are used in PySpark to handle column names that contain special characters, spaces, or reserved keywords.

Join df_SeEp to dfnewdimt

Join the list of currently processed series and episodes to everything in the dimension so far (the data frame we created when we checked the dim).


from pyspark.sql.functions import broadcast, when, col

# Broadcast df_SeEp and perform the left join
df_joined = dfnewdimt.join(
    broadcast(df_SeEp),
    (dfnewdimt.Series == df_SeEp.SeriesFinal) & (dfnewdimt.EpisodeName == df_SeEp.EpisodeFinal),
    "left"
)
# Add the historicalCheckFlag column
df_joined2 = df_joined.withColumn("historicalCheckFlag", when(col("SeriesFinal").isNull(), 1).otherwise(0))

# Drop the SeriesFinal and EpisodeFinal columns
df_joined3 = df_joined2.drop("SeriesFinal").drop("EpisodeFinal")

#If its an audit row set sourceFileNameFinal to Calculated Row
df_joined4 = df_joined3.withColumn(
    "source_filename",
    when((col("source_filename").isNull()) & (col("EpisodeKey") == -1), "Calculated Row")
    .otherwise(col("source_filename"))
)

# Show the result
display(df_joined4)

All historicalCheckFlags are set to 0 for the current load (S3) and 1 when there is no match and it's historical data. The source_filename from df_SeEp has been left in the result, which can be used later.

Again we have set the Episode Key -1 row's source_filename to "Calculated Row" because there is no source to join this one record on by series and episode name.

Add audit row

At the end of the process. We will add an audit row

from pyspark.sql import functions as F

# Add the new column
dfdimaudit = df_joined4.withColumn("sequence", F.lit(3))

dfdimaudit = dfdimaudit.withColumn("parquetType", F.lit("Delta parquet"))

dfdimaudit = dfdimaudit.withColumn("analyticsType", F.lit("Dimension"))

dfdimaudit = dfdimaudit.withColumn("name", F.lit("Dim Episode"))

# Extract the date from source_processedTime
dfdimaudit  = dfdimaudit.withColumn("processedDate", F.to_date("processedTime"))

dfdimaudit =  dfdimaudit.withColumn("raiseerrorFlag", F.lit(0))

# Select the required columns and count the rows
dfdimauditgr = dfdimaudit.groupBy("sequence","parquetType","analyticsType","name", "source_filename","processedDate","historicalCheckFlag","raiseerrorFlag").count()

# Alias count to totalRows
dfdimauditgr = dfdimauditgr.withColumnRenamed("count", "totalRows")

# Show the data
display(dfdimauditgr) 

For Load 2 we have 5 records in the current load with the source filename set, the historical load of 11, and the calculated audit -1 row.

Bring back our processeddimfact data

from delta.tables import DeltaTable
from pyspark.sql.functions import col, max

# Load the Delta table as a DataFrame
dfprocesseddimfact = spark.read.format("delta").table("SilverDebbiesTraininglh.processeddimfact")

# Find the maximum BatchNo
max_batch_no = dfprocesseddimfact.agg(max("BatchNo")).collect()[0][0]

# Filter the DataFrame to include only rows with the maximum BatchNo
dfprocesseddimfact = dfprocesseddimfact.filter(col("BatchNo") == max_batch_no)

# Display the table
display(dfprocesseddimfact)
dfprocesseddimfact.printSchema()

Here we find the max batch number and then filter our data frame to only give us the max batch number which is the one we are working on. (Because at this point, we are in the middle of a batch of dims and facts)

Create a distinct batch No

distinct_batch_no = dfprocesseddimfact.select("batchNo").distinct()

display(distinct_batch_no)

We just want that distinct Max Batch number for the next process

Join to our Dim Episode Audit row to bring in the batch number via a cross join

# Alias the dataframes
dfdimauditgr_alias = dfdimauditgr.alias("dfdimauditgr")
distinct_batch_no_alias = distinct_batch_no.alias("dfprocesseddimfact")

# Perform cross join
audit2_df = dfdimauditgr_alias.crossJoin(distinct_batch_no_alias)

# Select specific columns
audit2_df = audit2_df.select(
    distinct_batch_no_alias.batchNo,
    dfdimauditgr_alias.sequence,
    dfdimauditgr_alias.parquetType,
    dfdimauditgr_alias.analyticsType,
    dfdimauditgr_alias.name,
    dfdimauditgr_alias.source_filename,
    dfdimauditgr_alias.totalRows,
    dfdimauditgr_alias.processedDate,
    dfdimauditgr_alias.historicalCheckFlag,
    dfdimauditgr_alias.raiseerrorFlag
)

audit2_df = audit2_df.distinct()

# Show result
display(audit2_df)

The cross join (Cartesian Join) is because there is only 1 batch number set in distinct_batch_no (the Max Batch number is the one we are working on) So no need to add join criteria.

Set totalRows to Int

Once again TotalRows needs setting to Int
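As with Dim Contestant, a minimal sketch of the cast, assuming the dataframe here is audit2_df:

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Same cast as the Contestant audit, this time on the episode audit dataframe
audit2_df = audit2_df.withColumn("totalRows", col("totalRows").cast(IntegerType()))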

Get the processeddimfact data

# Load the Delta table as a DataFrame
dfprocesseddimfact = spark.read.format("delta").table("SilverDebbiesTraininglh.processeddimfact")

# Display the table
dfprocesseddimfact.show()  # Use show() instead of display() in standard PySpark

# Print the schema
dfprocesseddimfact.printSchema()

We will need this to check our historical checked data against the original update

Get the total rows of our non-historical rows that have been previously loaded

Same logic as Contestant

Join Historical records with new audit to see if the numbers have changed

No changes to the old numbers.

Set raiseerrorFlag if there are issues

There are no issues

If no issues. Record is removed

As there are no issues the record is removed

If Audit row -1 is only in once. No need to hold the audit row

Append into processeddimfact

And check the data looks good

We head off into Dim Task now and here is where we hit some issues. We need to add more code into the process.

Dim Task

checking the count of rows between the original load, and the historical check

When we count the rows of processed data they come to 64, but when we run the check again it comes to 65, which would trigger the raise error flag. And this is why:

dfnewdimt_filtered = dfnewdimt.filter(dfnewdimt["Task"] == "Buy a gift for the Taskmaster")
display(dfnewdimt_filtered)

I settled on Task and TaskOrder to create the unique row. However this isn’t enough: we have two series where our Task and Task Order are identical.

What we really want is one record in the dimension applied to the correct seasons in the fact table. Not a duplicate like we have above.

The code is adding Series 3 to both "Buy a gift for the Taskmaster" rows and is consequently undercounting.

Series 3 is a good opportunity to update the logic, as we know there is a record already added from Series 2.

Bring back old dimTask

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Initialize Spark session
#spark = SparkSession.builder.appName("DeltaTableCheck").getOrCreate()

# Path to the Delta table
table_path = "abfss://########-####-####-####-############@onelake.dfs.fabric.microsoft.com/########-####-####-####-############/Tables/dimtask"

# Check if the Delta table exists
if DeltaTable.isDeltaTable(spark, table_path):
    # Load the Delta table as a DataFrame
    dfprevdimt = spark.read.format("delta").load(table_path)
    
else:
    # Define the schema for the empty DataFrame
    schema = StructType([
        StructField("Task", StringType(), True),
        StructField("TaskOrder", IntegerType(), True),
        StructField("TaskType", StringType(), True),
        StructField("Assignment", StringType(), True),
        StructField("ProcessedTime", TimestampType(), True)
    ])
    
    # Create an empty DataFrame with the specified schema
    dfprevdimt = spark.createDataFrame([], schema)

    print("no dim. Create empty dataframe")

# Display the DataFrame
dfprevdimt.show()

If it's the first time, create an empty dataset; else bring back the existing dataset.

Join new Tasks to all the Previously added tasks

The decision has been made to use all the columns in the join, Just in case a Task has a different assignment or Type.

The task that we know is identical for Series 2 and 3 appears. We use the dftm dataframe, which contains the data before we create the distinct dimension, as it still contains source_filename. An inner join only brings back the matching records, as sketched below.
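A sketch of what that duplicate check could look like (the dataframe name dfTaskDups is hypothetical; the join columns are the same ones used in the anti join further down):

from pyspark.sql.functions import col

# Inner join the incoming load (dftm, which still holds source_filename)
# to the existing dimension on every task column, so only tasks that
# already exist come back
dfTaskDups = dftm.alias("new").join(
    dfprevdimt.alias("old"),
    (col("new.Task") == col("old.Task")) &
    (col("new.Task Order") == col("old.TaskOrder")) &
    (col("new.Task Type") == col("old.TaskType")) &
    (col("new.Assignment") == col("old.Assignment")),
    how="inner"
).select(
    col("new.Task"), col("new.Task Order"), col("new.Task Type"),
    col("new.Assignment"), col("new.source_filename")
).distinct()

display(dfTaskDups)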

Add this to a Delta Parquet File

We now have a static item we can use later, and we can report on these items.
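A minimal sketch of the save, assuming the dataframe from the sketch above and the dimremovenoneuniquetaskaudit table named in the conclusion:

from pyspark.sql.functions import current_timestamp

# Append the duplicate tasks to their own audit Delta table so we can report on them later
dfTaskDups.withColumn("processedTime", current_timestamp()) \
    .write.mode("append").format("delta") \
    .saveAsTable("SilverDebbiesTraininglh.dimremovenoneuniquetaskaudit")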

If task already exists. Remove from the new load

from pyspark.sql.functions import col,current_timestamp

# Alias the DataFrames
dfprevdimt_df_alias = dfprevdimt.alias("old")
dftm_alias = dftm.alias("new")

# Left anti join on all of the task columns to remove any task that already exists
dftmdupRemoved = dftm_alias.join(dfprevdimt_df_alias, 
                              (col("new.Task") == col("old.Task")) & 
                              (col("new.Task Order") == col("old.TaskOrder"))& 
                              (col("new.Task Type") == col("old.TaskType"))& 
                              (col("new.Assignment") == col("old.Assignment")), 
                              how="left_anti")



#And reset the dimension
dfTasksdupRemoved = dftmdupRemoved.select("Task","Task Order" ,"Task Type", "Assignment").distinct()

# Add the current timestamp as processedTime
dfTasksdupRemoved = dfTasksdupRemoved.withColumn("processedTime", current_timestamp())

# Show the result
display(dfTasksdupRemoved)

The left anti join brings back records that exist in the new data frame and removes anything that also exists in the pre-existing data.

dftm is again used, so we have to reset the dimension.

dfTasksdupRemoved now becomes our main dataframe, so we need to ensure it's used across the notebook where necessary.

We then go on to create the default row (If its never been run before) and keys

Add a Default Row

Code changed to use dfTasksdupRemoved

Create the Task Key

Again, we have had a change to code to use the updated dataframe

Get Task details from current load

Previously, This was just Task and Order but we know that we need more in the join.

Create a historical Record check

"Buy a gift for the Taskmaster" is flagged as historical (it's in Series 2) and we have removed it from Series 3. And we have added this information to a delta parquet.

We have also added more join criteria.

Create the Audit Rows

Add a value into source filename if null, because it's a historical record

Bring back the ProcessedDimfact

Get the Current batch

Join the batch number into the dataframe

Load the processed Dim Fact

Count the rows

Total Rows is correct at 65

Add the Rows of the historical check against rows that were originally processed

These now match

If Issue. Set RaiseErrorFlag = 1

No issues

Remove historical check if there are no issues

Check that there aren’t multiple rows in the data for the default -1 row. If all ok. remove default row from audit

And append the data into processeddimfact.

There is other logic to consider here. Imagine if we loaded Series 1, 2 and 3 together. Would the logic still remove the duplicate task? This will need running again to test.

Taskmaster Fact

The fact table doesn’t have default records so we need to deal with this slightly differently. Let's run through the code again.

The new fact data has been processed at the point we continue with the logic

Get Data of current process we can join on

Join current and already processed Fact data

  • df_currentfact is the current S3 data
  • dfnewfact is the newly loaded fact table that contains everything. S1,2 and 3.

dfnewfact is on the left of the join, so logically, if current and new both exist we get the match. A sketch of this kind of join is below.
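The actual join columns aren't shown in this post, so this is only a sketch with hypothetical keys and a hypothetical result name df_factcheck:

from pyspark.sql.functions import col, lit, when

# Hypothetical join columns; the real notebook uses the fact's own key columns
join_cols = ["Series", "Episode Name", "Task", "Contestant"]

# Mark every row of the current load, then left join onto the full fact data;
# rows with no match are historical
df_current_marked = df_currentfact.select(*join_cols).distinct().withColumn("inCurrentLoad", lit(1))

df_factcheck = dfnewfact.join(df_current_marked, on=join_cols, how="left") \
    .withColumn("historicalCheckFlag", when(col("inCurrentLoad").isNull(), 1).otherwise(0)) \
    .drop("inCurrentLoad")

display(df_factcheck)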

Create the Audit Row

If source Filename is null. Add Historical Data Check

Bring back the latest batch from processeddimFact

Get Current Batch No

Add the batch number to the audit

Load all of the processed dim fact

Total up the non-historical rows (totals created at the time)

Add the total rows (as at the time they were created) to the total historical rows

We can already see they match. 307 rows for S1 and S2

Reset the Raise Error flag to 1 if there is a problem

Remove the historical check row if there are no issues

And now you can append into the Delta parquet table and check the processed dim and fact.

And then we are back to setting the fullyProcessedFlag to 1 at the end of the fact table run.
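A minimal sketch of that flag update, assuming the delta_table_path to the processedfiles table defined at the start of the post:

from delta.tables import DeltaTable

# Mark every file in the current run as fully processed
delta_table = DeltaTable.forPath(spark, delta_table_path)
delta_table.update(
    condition="fullyProcessedFlag = 0",
    set={"fullyProcessedFlag": "1"}
)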

Conclusion

Now we have an audit of our Loads (2 Loads so far)

We have done a lot of work creating auditing. We can now add files and hopefully run the pipeline to automate the run. then we can create Power BI reporting.

We now have three audit Delta parquet tables

  1. processedFiles – This goes through each Taskmaster Series File and allows you to see what you are working on. the fullyProcessedFlag is set to 0 when the fact table is complete
  2. processeddimFact – (Shown above) The detailed audit for the transformation, dim and fact tables. Containing, number of records. The date time processed. And a flag for issues
  3. dimremovenoneuniquetaskaudit – Tasks can be duplicated. For example, the same task (and all of its components, like task order) is identical between Series 2 and 3. We only need the one row, so the duplicate is removed

There is more that we could do with this, of course there always is. However the big one is to make sure that we don’t get duplicated task information if Series 2 and 3 are processed together.

We could also add more detail into the audit. For example any issues with the data, such as: Column A must always be populated with either A, B, C or D; if null then flag as error; if anything other than the specified codes then error.

We will come back to this but in the next blog I want to try something a bit different and set up the Data Warehouse. How would this project look if using the warehouse. And why would you choose one over the other.

So in part 19 we will look at setting the warehouse up and how we deal with the source data in the data lake

Microsoft Fabric Part 12. Taskmaster Project. Pyspark Broadcast

In Part 10 we created the reporting and Project PBIP file. In Part 11 we looked at new Task flows and added our items into a medallion architecture.

It's now time to do some small tweaks to the project so far. One thing I discovered is the Pyspark broadcast function.

This function is used to optimise the performance of spark jobs by reducing data shuffling.

Broadcast is really good for join operations, and we have a lot of joins in the notebooks. Can we make the code better?

First of all. What is data shuffling?

Data Shuffling

In distributed data processing, shuffling is about redistributing data across partitions or nodes in a cluster. It happens when the data gets re-organised, for example during:

  • Joins
  • Aggregations
  • Groupings

So, data is distributed in partitions to allow for parallel processing

When you join two partitions together you get data shuffling (During transformation)

Shuffling is expensive and creates network overhead.

These are the stages of shuffling.

  • Map Stage – When the data is processed in partitions
  • Shuffle Stage – Shuffling and exchanging
  • Reduce Stage – Further processing after shuffling

We want to try and avoid unnecessary shuffling by using broadcast. Let's go back to our notebooks.

Taskmaster transformed Notebook

# Join the extra contestant information
dfcont = dfc.join(dfp, dfc["Name"] == dfp["contestant"], "left_outer").drop(dfp.contestantID)\
.drop(dfp.contestant).drop(dfp.team).drop(dfp.team_label).drop(dfp.champion)\
.drop(dfp.TMI)

# The resulting DataFrame 'joined_df' contains all rows from dftask and matching rows from dfob
display(dfcont)

This is the original Join. Clicking run, it takes 6 seconds.

If you run it again it takes two seconds. Why is it quicker?

  • File caching
  • The second run will benefit from better data distribution and partitioning
  • JVM warmup. (Java Virtual Machine) has optimized on the second run

How do you get back to the timing of run 1?

spark.catalog.clearCache()

Clears the memory before trying again.

Working with Broadcast on a join

You need to Broadcast the smaller table. In the above instance we can create a quick check of sizes on the two data sets.

dfcrow_count = dfc.count()
dfprow_count = dfp.count()

print(f"Number of rows in the dfc DataFrame: {dfcrow_count}")
print(f"Number of rows in the dfp DataFrame: {dfprow_count}")

Immediately we can see which dataframe is to be broadcast: dfc.

from pyspark.sql import SparkSession, functions as F

df_small = F.broadcast(dfc)

# Join the extra contestant information
dfcont = df_small.join(dfp, df_small["Name"] == dfp["contestant"], "left_outer").drop(dfp.contestantID)\
.drop(dfp.contestant).drop(dfp.team).drop(dfp.team_label).drop(dfp.champion)\
.drop(dfp.TMI)

# The resulting DataFrame 'joined_df' contains all rows from dftask and matching rows from dfob
display(dfcont)

dfcont.explain()

Explain allows us to see the plan.

This took 1 second to run as opposed to the 6 seconds previously. If you are working with a lot of data this could change your runs considerably.

Let's remove the original join and lock this one in.

The Execution Plan

Gives you more detail on how the command is being executed.

Conclusion

With this in mind, we can go through all the code and update anything where we can with a broadcast which should really help with the processing.

A really great addition to the Pyspark toolbox

Microsoft Fabric Part 5. Taskmaster Project. Creating and using Shortcuts from an Azure Data Lake and creating the first dimension

Parts 1, 2, 3 and 4 were attempting to transform data into a star schema for Power BI using notebooks in Fabric.

However there was missing data which meant we couldn’t go to the level of detail we wanted for the star (Episode level)

Now we have a new data set and the csv files have been transferred into a data lake manually.

We can attempt to set up a pipeline to import the data in another blog.

In Microsoft Fabric we go to Data Engineering

And open up the Fabric Workspace.

We already have a Lakehouse and notebooks containing the work from the last few blog posts

Lets open the Lakehouse

Although we are using the medallion architecture, you may still wish to call your folders, for example, raw and staging.

Create a Shortcut

For this exercise we are going to do something different and have the raw data in a datalake in Azure that we add a shortcut to.

The files have been placed in a data lake and we want to shortcut to these files. Imagine that the process was already in place pre-Fabric and we have decided to stick with this process.

This will be added to every time a series finishes.

We also have a Lookup contestants file in this folder that will be updated by having data appended to it

This new exercise will work with partitioning by series for the fact table. Back in Fabric.

Lets add a Shortcut to our new Taskmaster folder in the Bronze raw data area.

https://learn.microsoft.com/en-us/fabric/onelake/onelake-shortcuts

We need the connection settings. Get this from the Data Lake: Endpoints, Data Lake Storage, Primary endpoint data lake storage, and sign in.

Tip: Make sure you have the Storage Blob Data Contributor role.

You want the URL with dfs (Distributed File System), not blob.

However, note that the Data Lake is in North Europe.

Creating a shortcut to a data lake in a different geographical area can result in Egress charges

So we need to check that our Data Lake and Lakehouse are in the same area.

Go back to the Workspace and Workspace Settings

Thankfully both are in North Europe so this should be fine.

Note. To keep down Egress charges you can set up caching which keeps the files for 24 hours without needing to access the data in the shortcut. but this is only available for

  • GCS (Google Cloud Storage)
  • S3 (Amazon S3 Simple Storage Service)
  • S3 compatible shortcuts (Any Services using Amazon Simple Storage Service)

Back to our Shortcut. Click Next.

We want to create a Shortcut to the Taskmaster Folder in the raw Folder. Click Next

Click Create

We now have a shortcut set up to the Azure Data Lake. Later on we can add another file and see what happens.

Create a Notebook

Lets Create a New Notebook

%%configure
{
    "defaultLakehouse": {  
        "name": "DebbiesFabricLakehouse"
    }
}

First of all I’m going to configure to the current Lakehouse

Load in all the files in the Shortcut Folder

from pyspark.sql.functions import input_file_name, regexp_extract

dftm = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/TaskMasterSeriesFiles/Taskmaster/Taskmaster_*.csv")

dftm = dftm.withColumn("filename", regexp_extract(input_file_name(), r"([^/]+)$", 1))

display(dftm)
  • The * wildcard has been used to get all series files and Series End Dates, and only csv files.
  • input_file_name() brings us back the file name
  • regexp_extract(input_file_name(), r"([^/]+)$", 1) allows us to remove the URL and just keep the filename.

The File name may be important later.

Remove empty records

We have some null rows come through that we want to remove from the data set

how='all'

This parameter is used with dropna() to drop rows where ALL values are NULL
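For example, applied to the dataframe above it is just a one-liner:

# Drop rows where every column is null; rows with at least one value are kept
dftm = dftm.dropna(how='all')

display(dftm)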

Removing Special characters

There are issues. We have some strange characters coming through in the data set.

We can use a filter function with LIKE on the Episode Name column and bring through a distinct list.
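That check isn't shown as code in the post; a sketch of it could be:

from pyspark.sql.functions import col

# List the distinct episode names that contain the replacement character
display(dftm.filter(col("Episode Name").like("%�%")).select("Episode Name").distinct())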

from pyspark.sql.functions import regexp_replace

# Replace '�' with 'eclair' in the 'text' column
dftm_cleaned = dftm.withColumn("Episode Name", regexp_replace("Episode Name", "A pistachio �clair", "A pistachio eclair"))

# Replace any remaining '�' with an empty string
dftm_cleaned = dftm_cleaned.withColumn("Episode Name", regexp_replace("Episode Name", "�", "'"))

# Show the resulting DataFrame
display(dftm_cleaned)

We have two.

é in eclair and any item with ‘

So we actually need to update based on Logic:

from pyspark.sql.functions import regexp_replace

# Replace '�' with 'eclair' in the 'text' column
dftm_cleaned = dftm.withColumn("Episode Name", regexp_replace("Episode Name", "�", "eclair"))

# Replace any remaining '�' with an empty string
dftm_cleaned = dftm_cleaned.withColumn("Episode Name", regexp_replace("Episode Name", "�", ""))

# Show the resulting DataFrame
dftm_cleaned.show(1000)

regexp_replace

This function replaces all substrings of a string that matches a specified pattern with a replacement string. Good for cleaning and transformation.

It might be best to check the Task column too, by repeating the above filter and changing Episode Name to Task.

We have "Greg says…" and the rest are apostrophes (').

We can also deal with this:

from pyspark.sql.functions import regexp_replace

# Replace '�' with 'eclair' in the 'text' column
dftm_cleaned = dftm_cleaned.withColumn("Task", regexp_replace("Task", "Greg says�", "Greg says..."))

# Replace any remaining '�' with an empty string
dftm_cleaned = dftm_cleaned.withColumn("Task", regexp_replace("Task", "�", "'"))

# Show the resulting DataFrame
display(dftm_cleaned)

Create Dim episode Dimension

Lets have a look at just S1. Does it have all the Episodes?

Yes it looks good. So the episode Dimension consists of Series and Episode.

from pyspark.sql.functions import col, to_date

dftmEp = dftm_cleaned.select("Series","Episode No" ,"Episode Name").distinct()

dftmEpOrdered = dftmEp.orderBy(col("Series"),col("Episode No"))

display(dftmEpOrdered)

This is a Distinct List

And we have created an OrderBy Data Frame to display.

There is an empty row we need to remove

##drop all rows with null values

dftmEp = dftmEp.na.drop(how='all')

display(dftmEp)

We have used this code block before to remove the fully null rows.

Add A Default Row

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a sample DataFrame
data = [(-1, -1,"Not Known")]
columns = ["series", "episode No", "Episode Name"]
new_row  = spark.createDataFrame(data, columns)

# Union the new row with the existing DataFrame
dftmEp = dftmEp.union(new_row)

# Show the updated DataFrame
dftmEp.show(1000)

Now we have the recommended Default row. It’s time to add a key

from pyspark.sql.functions import col, row_number
from pyspark.sql import Window

# Create a window specification partitioned by "Series" and ordered by "Episode No"
window_spec = Window.orderBy(col("Series"), col("Episode No"))

# Add a new column "EpisodeKey" using row_number() over the specified window
dftmEpKey = dftmEp.withColumn("EpisodeKey", row_number().over(window_spec) - 2)

# Show the result
dftmEpKey.show(1000)

Window.OrderBy

This function is used to define the ordering within a window specification. It allows you to specify the order in which rows are processed within a partition

row_number().over

The ROW_NUMBER() function, when used with the OVER() clause, assigns a sequential integer number to each row in the result set of an SQL query.

Create Delta PARQUET File

We have our first dimension. Let's add it to the Silver folder in Files as unmanaged, and to Tables as a managed Delta PARQUET table. That way we can see what we can do with both.

We don’t want to partition the dimension. The Fact table will be partitioned

from delta.tables import DeltaTable

dftmEpKey.write.mode("overwrite").option("overwriteSchema", "true").format("delta").saveAsTable("dimEpisode")

However, There is an error coming up

AnalysisException: Found invalid character(s) among ‘ ,;{}()\n\t=’ in the column names of your schema. Please upgrade your Delta table to reader version 2 and writer version 5 and change the column mapping mode to ‘name’ mapping. You can use the following command:

Instead of upgrading. We want to remove the special characters. The columns are

|Series|Episode No| Episode Name|EpisodeKey|

Cleaning up Columns in a dataframe

import re

# Select columns with modified names (without special characters)
dftmEpKey_cleaned = dftmEpKey.select(*[col(c).alias(re.sub('[^0-9a-zA-Z]', '', c)) for c in dftmEpKey.columns])

# Show the result
dftmEpKey_cleaned.show(1000)

It seems it may have been the spaces.

re.sub

re.sub() (from Python's re module) is used to replace substrings that match a specified regular expression pattern with a string of your choice:

re.sub('[^0-9a-zA-Z]', '', c) removes any characters that are not alphanumeric (letters or digits) from the column name.

The expression [col(c).alias(re.sub('[^0-9a-zA-Z]', '', c)) for c in dftmEpKey.columns] is a list comprehension that performs the following steps for each column name (c):

  • col(c).alias(...) creates a new column with the modified name (without special characters) using the alias() method.
from delta.tables import DeltaTable

dftmEpKey_cleaned.write.mode("overwrite").option("overwriteSchema", "true").format("delta").saveAsTable("dimEpisode")

And we can save to delta PARQUET

So, we now have an episode dimension. Let's save the notebook as Dim Taskmaster Episode V2 (we already have the original one saved).

Remember to commit in Source Control.

We want to create Task and Contestant dimensions. We already have a date dimension to work with.

In the next post, these extra dimensions and the fact table will be created, and then we can see how they can be used.

Microsoft Fabric Part 4. Taskmaster Project. Adding the fact table

In parts 1 and 2 we created and updated DimContestant

In Part 3 we created DimTask, DimEpisode and DimDate

It's time to create the fact table. The first thing we need to do is get an understanding of what facts we have and how they would join to the data we have created in the dims so far.

Let's create a notebook.

%%configure
{
    "defaultLakehouse": {  
        "name": "DebbiesFabricLakehouse"
    }
}

Adding possible csvs into DataFrames

Attempts

#Read the first file into a dataframe
dfattempts = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/attempts.csv")
# dfattempts now is a Spark DataFrame containing CSV data from "Files/Data/Bronze/attempts.csv".
display(dfattempts)

Contains points and ranks and appears to be at the episode and contestant level

episode_scores

#Read the file into a dataframe
dfepsc = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/episode_scores.csv")

display(dfepsc)

episode_scores is new to this project but seems a really good data set. Holds scores and Ranks at the episode and contestant level (but not at task level)

We actually don’t need this because it's just aggregated data. We can remove it from our transformations; Power BI will aggregate for us.

episodes

#Read the file into a dataframe
dfepis = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/episodes (1).csv")

display(dfepis)

This contains Dates and points. However the Scores are at Episode Level. We will be able to get this information from DAX, summing the individual Scores so we don’t need the scores at all. All we are interested in is the air date.

Let's look at the row counts

ct = dfepsc.count()
print(f"dfepsc: {ct}")

ct = dfepis.count()
print(f"dfepis: {ct}")

ct = dfattempts.count()
print(f"dfattempts: {ct}")

The numbers vary. We need to make sense of what we have.

Create the data we need from attempts

# Select multiple columns with aliases
dfdfattemptsf = dfattempts.select(
    dfattempts.task.alias('attTaskID'), #Join to DimTask
    dfattempts.task_label.alias('attTask'),
    dfattempts.episode.alias('attEpisode'),
    dfattempts.series.alias('attSeries'),
    dfattempts.contestant_label.alias('attContestant'), #Join to DimContestant
    dfattempts.points.alias('attPoints')
).distinct()

# Show the resulting DataFrame
display(dfdfattemptsf)

The Points in Attempts are the best ones we have at the moment because the Task csv was empty.

We are clearly now hitting real data problems, and in a normal project we would be going back to the team creating the data to get them to resolve them.

Filter attempts

Attempts is our main table, but we need the episode and series information to work with the dimensions we have, and we only have IDs for both.

Are episode and series related with the correct IDs? Lets do some filter queries to look at series 1, episode 1.

After a few filtered queries, there are clearly some data issues, or unknowns that I am not aware of, with attempts. Episode 1 and series 1 don’t match the contestants and tasks (I know because I watched it).

For episodes (1) (Above) we do have the correct information for episode and series. But we aren’t using these to get data.

More problems with Episode Scores: Episode appears to be correct, but the series ID is completely different so we can't match on this. But we aren’t using this data any more.

We need to look at our PARQUET file DimEpisode

The first thing we need to do is load our dimension PARQUET data back into a dataframe, and to do this we have to update the code slightly, as previously we have loaded in csv files.

# Read Parquet file using read.parquet()
dfdimep  = spark.read.parquet("Files/Data/Silver/DimEpisode.parquet")
display(dfdimep)

To get the path, click on your file in the Files folder in the Lakehouse and Copy ABFS path; Copy Relative path for Spark gives you a more concise path.

We are now using the silver layer of the medallion architecture in the path. All our bronze landing data has been transformed.

Are our IDs correct in the Dim for Attempts, or for the Tasks by Objective csv?

Now we have a real issue. Our Episodes can't join to attempts.

Join with one Condition

Can we join episode scores to episodes via the episode name? This would be an issue if you had repeating episode names across series, but we are lucky that this is not the case.

# condition
original_condition = dfepis["title"] == dfepscf["scEpisode"]

# Now perform the left outer join
dfEpScorej =  dfepis.join(dfepscf, on=original_condition, how="full").drop(dfepscf.scEpisode)

display(dfEpScorej)

Filter new transformed table for episode 1 series 1

import pyspark.sql.functions as F

filtered_df = dfEpScorej.filter((F.col('episode') == 1) & (F.col('series') ==1))
display(filtered_df)

Filtering on Task 1 Season 1 gives us all 5 contestants against the episode. There are lots of columns we can remove. For example, we don’t need to hard code the winner. This can be done via Power BI DAX. So, the above join will be slightly updated with .drop.

# condition
original_condition = dfepis["title"] == dfepscf["scEpisode"]

# Now perform the left outer join
dfEpScorej =  dfepis.join(dfepscf, on=original_condition, how="full").drop(dfepscf.scEpisode)\
.drop(dfepscf.scEpisodeID).drop(dfepis.winner).drop(dfepis.winner_label).drop(dfepis.studio_date).drop(dfepis.winner).drop(dfepis.finale).drop(dfepis.TMI)

display(dfEpScorej)


Full is being used as the join because we want each side even if they haven't been joined. 

Use dfEpScorej now instead of dfepis and dfepscf

We have the attempts (dfattemptsf) left

So how can we join this final table?

  • We can’t join by Task because Episodes doesn’t include task
  • We can’t join by Series because the Series IDs don’t match and neither does Episode.
  • We can match by Contestant but contestants can take part in multiple series so the join doesn't work

As it stands we can't actually match this table, so we need to go with Episodes and Scores for the fact table.

Add the Dimension Keys into the transformed table

Join DimContestants Via Contestant Name

We can't use the ID, but we can use the name.

Also, when we join we only want the key, so we can immediately remove everything relating to the contestant.

# Original condition
original_condition = dfdimcont["contestant"] == dfEpScorej["sccontestant"]

# Now perform the left outer join
dfEpScorej = dfEpScorej.join(dfdimcont, on=original_condition, how="left")\
.drop(dfdimcont.TeamName).drop(dfdimcont.teamID).drop(dfdimcont.age).drop(dfdimcont.gender)\
.drop(dfdimcont.hand).drop(dfdimcont.ageRange).drop(dfdimcont.dob)\
.drop(dfdimcont.contestant).drop(dfdimcont.contestantID)\
.drop(dfEpScorej.sccontestant).drop(dfEpScorej.scContestantID)

display(dfEpScorej)

If we have any null values they can be replaced with -1

But first we can check if any exist.

dfEpScorej.filter(dfEpScorej.ContestantKey.isNull()).show()

fillna to replace null with a value

Even though we have none at the moment, there could be some in the future so fillna allows us to replace with our Default keys

dfEpScorej = dfEpScorej.fillna(-1, subset=["ContestantKey"])
dfEpScorej.show()

Add Episode Key

All is good, so we can join it into the transformed table, removing everything but the key using .drop.

The data was checked to make sure it was OK before all of the .drop calls were added to the code.

# Original condition
original_condition = dfdimep["seriesID"] == dfEpScorej["series"]

# Adding another condition based on the episode ID
combined_condition = original_condition & (dfdimep["episodeID"] == dfEpScorej["episode"])

# Now perform the left outer join
df = dfEpScorej.join(dfdimep, on=combined_condition, how="left")\
.drop(dfdimep.episodeTitle).drop(dfdimep.episodeID).drop(dfdimep.series).drop(dfdimep.seriesID)\
.drop(dfEpScorej.series_label).drop(dfEpScorej.series).drop(dfEpScorej.episode).drop(dfEpScorej.title)

display(df)

Set Key to -1 if NULL

# Set the episode key to the default -1 where the join found no match
# (the key column name follows the pattern used for the other dimension keys)
df = df.fillna(-1, subset=["EpisodeKey"])
df.show()

Conclusion

This specific project has been abandoned because the data doesn't give me what I need, join-wise, to create the star schema, mostly because there is clearly a main table with missing data.

But, I can’t wait to take what I have learned and try again with another data set.

Parts 1 to 4 have given me some great insight into analysing, transforming and modelling with PySpark.

Microsoft Fabric Part 2. Taskmaster Project. Updating a dim Contestants table in Notebook and Delta PARQUET File

This blog is the next step for

Microsoft Fabric Part 1. Taskmaster Project. Creating a dim Contestants table in Notebook and Delta PARQUET File

In Part 1, a contestants dimension was created from the attempts.csv file. The data from this was transformed and added into a PARQUET file and a DELTA PARQUET file so we can test the differences later.

There are more csv files to look at, and one specifically gives us more details for the contestants dim:

people.csv

The task here is to go back to the original Notebook and see if we can add this new information

The first bit of code to create goes after the following blocks of code.

  • Starting Spark Session
  • Reading attempts.csv into a dataframe
  • Renaming columns and creating dfattempttrans
  • creating the distinct dfcont from dtattempts (Creating the Contestants Dim)
#Read the people file into a dataframe
dfpeople = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/people.csv")
# dfpeople now is a Spark DataFrame containing CSV data from "Files/Data/Bronze/people.csv".

#And rename a couple of columns
dfpeople = dfpeople.withColumnRenamed("id","contestantID").withColumnRenamed("name","contestant")
display(dfpeople)

We have used .withColumnRenamed to change a couple of columns.

id to contestantID and name to contestant.

Before moving on, the assumption is that people is a distinct list.

Let's count the rows in each table so far.

row_countp = dfpeople.count()
row_countc = dfcont.count()
print(f"Number of rows in the dfpeople: {row_countp}")
print(f"Number of rows in the dfcont: {row_countc}")

There are a lot more people in our new DataFrame. But are they unique?

#We need to test. Do we have unique Contestants?

#We need to import a function for col
from pyspark.sql.functions import *

dfpeople.groupBy('contestant')\
       .agg(min('contestantID').alias('MinContestantID'),count('contestant').alias('TotalRecords'))\
       .filter(col('TotalRecords')>1).show(1000)

Using \ allows us to break the code onto separate lines
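As an aside, wrapping the chain in parentheses is another way to split it over lines; a quick sketch using the same grouping:

from pyspark.sql.functions import min, count, col

# Parentheses let us break the chain without trailing backslashes
(
    dfpeople.groupBy('contestant')
    .agg(min('contestantID').alias('MinContestantID'),
         count('contestant').alias('TotalRecords'))
    .filter(col('TotalRecords') > 1)
    .show(1000)
)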

We have the same issue here, where some of the people have more than one record, so let's filter a few.

filtered_df = dfpeople.filter(dfpeople['contestant'] == 'Liza Tarbuck')
filtered_df.show()

The original assumption in Part 1 was incorrect. We have multiple records here because of the Champion of Champions series. So the ID is not for the contestant; it's for the contestant and series, and some people have been on multiple series.

Do the IDs match between the tables?

filtered_df = dfcont.filter(dfcont['contestant'] == 'Bob Mortimer')
filtered_df.show()

Yes. So we need to rethink dim contestants. Actually, if they all exist in both, we only need to use people. So we need to join them and check whether this is the case.

Checking the values we can see in attempts exist in people: rename columns

We want to join the contestants that we obtained from the attempts table to dfpeople, simply so we can check what is missing.

We know we have more in people. But what happens if there are values in attempts that are missing from people? That would cause us more problems.
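For what it's worth, a left anti join is a more direct way of asking that question. A minimal sketch (run before the renames below), where an empty result means nothing in attempts is missing from people:

# left_anti keeps only the dfcont rows that have no match in dfpeople
missing_from_people = dfcont.join(
    dfpeople,
    on=dfcont["contestant"] == dfpeople["contestant"],
    how="left_anti"
)
missing_from_people.show(1000)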

We will join on the ID and the contestant, so before the join, let's rename the dfcont columns so we don't end up with duplicate column names.

#And rename a couple of columns
dfcont = dfcont.withColumnRenamed("contestantID","cont_contestantID").withColumnRenamed("contestant","cont_contestant")

display(dfcont)

join dfCont and dfPeople

Please note that when joining tables, it's better to create a new DataFrame rather than update an original one, because if you have to run the cell again, the DataFrame you join from will have changed.

# Original condition
original_condition = dfpeople["contestantID"] == dfcont["cont_contestantID"]

# Adding another condition based on the 'name' column
combined_condition = original_condition & (dfpeople["contestant"] == dfcont["cont_contestant"])

# Now perform the full outer join
dfcontcheck = dfpeople.join(dfcont, on=combined_condition, how="full")

display(dfcontcheck)

So we have everything in one table. Let's do some filtering to check our findings.

Filter for Null Values

from pyspark.sql import functions as F
dfcontcheck.filter(F.isnull("cont_contestantID")).show(1000)

There are 72 records that don't exist in attempts, which is fine.

from pyspark.sql import functions as F
dfcontcheck.filter(F.isnull("contestantID")).show(1000)

And there are none in attempts that are missing from people.

People is therefore our master data set and the PySpark code can be changed accordingly.

At the moment there are a lot of checks happening in the PySpark code. Once everything is up and running and ready to be turned into a proper transformation notebook, all these checks could be removed if required, to simplify.
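Alternatively, rather than removing a check, it could be turned into a guard that fails the run. A small sketch based on the dfcontcheck DataFrame above:

from pyspark.sql import functions as F

# Fail the notebook run if any attempts contestant is missing from people
missing_count = dfcontcheck.filter(F.isnull("contestantID")).count()
if missing_count > 0:
    raise ValueError(f"{missing_count} contestants in attempts are missing from people")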

Updating the Code with the new Logic

We are taking the code from Part 1 and completely updating it based on the new findings.

Start Spark session

%%configure
{
    "defaultLakehouse": {  
        "name": "DebbiesFabricLakehouse"
    }
}

Read People into dfPeople DataFrame

#Read the people file into a dataframe
dfpeople = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/people.csv")
# dfpeople now is a Spark DataFrame containing CSV data from "Files/Data/Bronze/people.csv".

#And rename a couple of columns
dfpeopletransf  = dfpeople.withColumnRenamed("id","contestantID").withColumnRenamed("name","contestant")

print(dfpeopletransf.count())
print(dfpeopletransf.distinct().count())
display(dfpeopletransf.distinct())

.withColumnRenamed brings back the dfpeople data set but renames the specified columns

Create dfCont to create our initial contestants DataFrame

Previously we had a lot of code blocks checking and creating min IDs for a distinct contestant list, but now we have more of an understanding we can do this with less code.

#Lets immediately do the group by and min value now we know what we are doing to create a distinct list (Removing any IDs for a different series)

#import the functions we need
from pyspark.sql.functions import col, min, count

#Creates the full distinct list. 

dfcont = dfpeopletransf.groupBy('contestant', 'dob', 'gender','hand' )\
       .agg(min('contestantID').alias('contestantID'),count('contestant').alias('TotalRecords'))


display(dfcont)

groupBy all the distinct values we want: contestant, dob, gender and hand are all unique to the name. (Previously we just had contestant.)

Next we agg (aggregate) with min contestantID, and count the number of contestants as TotalRecords.

If you are used to SQL you would see the following

SELECT MIN(contestantID) AS contestantID, contestant, dob, gender, hand, Count(*) AS TotalRecords
FROM dfpeopletransf
GROUP BY contestant, dob, gender, hand
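If you prefer, the same aggregation can actually be run as SQL from the notebook by registering a temporary view first (the view name people_v is just for illustration):

# Register a temporary view so the DataFrame can be queried with SQL
dfpeopletransf.createOrReplaceTempView("people_v")

dfcont_sql = spark.sql("""
    SELECT MIN(contestantID) AS contestantID,
           contestant, dob, gender, hand,
           COUNT(*) AS TotalRecords
    FROM people_v
    GROUP BY contestant, dob, gender, hand
""")
display(dfcont_sql)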

Create a Current Age

# A Spark session already exists in the Fabric notebook; we just need these functions
from pyspark.sql.functions import current_date, datediff, col

# Convert the date of birth column to a date type in dfCont
dfcont = dfcont.withColumn("dob", dfcont["dob"].cast("date"))

# add the current date and set up into a new dataframe
df = dfcont.withColumn("current_date", current_date())

#Calculate age using dob and current date 
df = df.withColumn("age_decimal", datediff(df["current_date"], df["dob"]) / 365)

#Convert from decimal to int  
df = df.withColumn("age", col("age_decimal").cast("int"))

#Convert contestant ID to int
df = df.withColumn("contestantID", col("contestantID").cast("int"))

#Update dfCont
dfcont = df.select("contestantID", "contestant", "dob","gender","hand", "age")

dfcont.show(1000)

It would be fun to filter by age and age group. Let's create the age first, based on the current date.

  • First of all we ensure dob is a date with .cast("date")
  • Then, using .withColumn, we add the current date to the DataFrame with current_date()
  • Next, we use datediff to get the age from the date of birth and the current date (a slightly more precise alternative is sketched after this list)
  • Set age to int and contestantID to int
  • Next, update dfcont with the full list of columns (excluding current_date, because that was only needed to create the age)
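As mentioned in the list above, dividing datediff by 365 ignores leap years, so the age can be out by a day or so around birthdays. A slightly more precise sketch, assuming the same dob column, uses months_between:

from pyspark.sql.functions import months_between, current_date, floor, col

# months_between accounts for month and leap-year lengths, so dividing by 12
# and flooring gives whole years without the /365 approximation
df_age_alt = dfcont.withColumn(
    "age", floor(months_between(current_date(), col("dob")) / 12).cast("int")
)
df_age_alt.show(1000)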

Create Age group

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

# Create a Spark session
#spark = SparkSession.builder.appName("ConditionalLogic").getOrCreate()

# Apply conditional logic
dfcont = dfcont.withColumn("ageRange", when(col("age") < 20, "0 to 20")
                               .when((col("age") >= 20) & (col("age") <= 30), "20 to 30")
                               .when((col("age") >= 30) & (col("age") <= 40), "30 to 40")
                               .when((col("age") >= 40) & (col("age") <= 50), "40 to 50")
                               .when((col("age") >= 50) & (col("age") <= 60), "50 to 60")
                               .otherwise("Over 60"))

# Show the result
dfcont.show()

We have created what would be a CASE in SQL

using .withColumn to create a new column; .when and .otherwise are used to create this new data item.
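The same banding can also be written as a SQL CASE inside expr(), which is sometimes easier to read than a long when/otherwise chain. A sketch that produces the same result:

from pyspark.sql.functions import expr

# A SQL CASE expression wrapped in expr() gives the same ageRange values
ageRange_case = expr("""
    CASE WHEN age < 20 THEN '0 to 20'
         WHEN age >= 20 AND age <= 30 THEN '20 to 30'
         WHEN age >= 30 AND age <= 40 THEN '30 to 40'
         WHEN age >= 40 AND age <= 50 THEN '40 to 50'
         WHEN age >= 50 AND age <= 60 THEN '50 to 60'
         ELSE 'Over 60'
    END
""")

dfcont.withColumn("ageRange", ageRange_case).show()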

Create New Row for -1 NA Default Row

We are back to almost the original code, creating our default row, but this time with more columns.

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create the default row (age as an int to match the existing age column type)
data = [(-1, "Not Known","1900-01-01","NA","NA",0,"NA")]
columns = ["contestantID", "contestant", "dob", "gender", "hand", "age", "ageRange"]
new_row  = spark.createDataFrame(data, columns)

# Union the new row with the existing DataFrame
dfcont = dfcont.union(new_row)

# Show the updated DataFrame
dfcont.show(1000)

We can now see Not Known in the list.

Running a DISTINCT just in case we applied the above more than once.

#Just in case. We distinct again to remove any extra NA Rows
dfcont = dfcont.distinct()
dfcont.show(1000)

Checking we have unique Contestants GROUPBY

#And lets check the new distinct list with Min IDs we have made
#We need to import a function for col
from pyspark.sql.functions import *

dfcont.groupBy('contestant', 'dob', 'age','ageRange', 'hand','gender')\
        .agg(count('contestant').alias('TotalRecords'))\
        .filter(col('TotalRecords')>1).show(1000)


#Hopefully there should be no records

We have all unique contestants, including the extra information we have added to the dimension.

Creating the Contestant Key

We can now create slightly better code than in Part 1, using fewer code blocks.

#Now finally for this we need a Key a new column
# Imports Window and row_number
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Applying partitionBy() and orderBy()
window_spec = Window.partitionBy().orderBy("ContestantID")

# Add a new column "row_number" using row_number() over the specified window
dfcont = dfcont.withColumn("ContestantKey", row_number().over(window_spec)- 2)

# Show the result
dfcont.show(1000)

Previously a partition column with the value 1 was added so we could do the following: .partitionBy("partition")

But simply using partitionBy() and not including a column removes the need for the extra code and column and does the same thing.
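For comparison, a sketch of the two approaches side by side (the Part 1 style as described above versus the simpler version used here):

from pyspark.sql import Window
from pyspark.sql.functions import lit, row_number

# Part 1 style: add a constant column just so there is something to partition by
old_window = Window.partitionBy("partition").orderBy("ContestantID")
dfcont_old = dfcont.withColumn("partition", lit(1))\
                   .withColumn("ContestantKey", row_number().over(old_window) - 2)\
                   .drop("partition")

# Simpler version: an empty partitionBy() treats the whole DataFrame as one partition
new_window = Window.partitionBy().orderBy("ContestantID")
dfcont_new = dfcont.withColumn("ContestantKey", row_number().over(new_window) - 2)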

Adding the Team

Previously the Team was added from attempts.csv but we know that we can change this to people now and have better team information.

#Creates the full distinct list. 
dfContTeam = dfpeopletransf.select(col("contestantID").alias("teamcontestantID"), col("contestant").alias("teamContestant"), 
col("team").alias("teamID"),col("team_label").alias("TeamName")).distinct()

display(dfContTeam)

Remove Rows with Empty team information

#Now remove the rows with null teamids

# The Fabric notebook already has a Spark session, so there is no need to create another one

# Drop rows with NULL values in the "teamID" column
dfContTeam = dfContTeam.na.drop(subset=["teamID"])

# Show the filtered DataFrame
dfContTeam.show(truncate=False)

Join team to Contestant

#Now what we want to do is add TeamName into dfcont

dfcont = dfcont.join(dfContTeam, dfcont["contestant"] == dfContTeam["teamContestant"], how="left").drop(dfContTeam.teamContestant).drop(dfContTeam.teamcontestantID)

display(dfcont)

This time we cleaned up the code by adding the two columns to drop in the above statement, so we don't end up with the name and ID from both data sets.

Create DELTA PARQUET TABLE

from delta.tables import DeltaTable

dfcont.write.mode("overwrite").format("delta").saveAsTable("dimContestant")

Because we are overwriting, our new data set should overwrite our old dimension. However, there will be conflict issues because the schema is so different, so for this exercise the files will be removed before being re-added.
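As an alternative to deleting the files by hand, Delta's overwriteSchema write option can let the overwrite replace the old schema as well. A sketch of what that might look like (not the approach taken here):

# overwriteSchema allows the overwrite to also replace the existing table schema
dfcont.write.mode("overwrite")\
    .format("delta")\
    .option("overwriteSchema", "true")\
    .saveAsTable("dimContestant")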

CREATE PARQUET Table in Files Folder

dfcont.write.mode("overwrite").parquet('Files/Data/Silver/DimContestant.parquet')

We now have options of using the Delta PARQUET, or just the PARQUET File.

Conclusion

In Part 1 we used attempts.csv for contestants. For Part 2 we did lots of analysis and found that, actually, people was a better data source for contestants. So we changed the input for the initial DataFrame to people.csv.

We also used the opportunity to tidy up the code, now that we are getting to grips with PySpark as an alternative to SQL.

Next on the list is to create more dimensions.

  • Dim Task
  • Dim Episode
  • Dim Date

And the Taskmaster Stats Fact Table

Then to do some analysis, using the inbuilt visuals in a Notebook, and Power BI.
