Microsoft Fabric Part 18. Taskmaster Project. Adding more meta data to check throughout the Dimension and Fact run. More changes to the Pyspark code

Now we have updated our ProcessedFiles delta parquet, we can update the dimensions accordingly.

  • It might be nice to have another delta parquet file here we can use to collect meta data for dims and facts.
  • We will also want the processedDate in each dim and fact table.

We are going to start with Delta Load 2. So. Series 1 and 2 have already been loaded. We are now loading Series 3 (This shows off some of the logic better)

Dim Contestant.

Bring back the list of Processed Files from ProcessedFiles

Immediately we have a change to the code because its delta Parquet and not Parquet

# Define the ABFS path to the Delta table
delta_table_path = "abfss://########-####-####-####-############@onelake.dfs.fabric.microsoft.com/########-####-####-####-############/Tables/processedfiles"

# Read the Delta table
dflog = spark.read.format("delta").load(delta_table_path)

# Filter the DataFrame
df_currentProcess = dflog[dflog["fullyProcessedFlag"] == 0][["Filename"]]

# Display the DataFrame
display(df_currentProcess)

New Code Block Check processed Files

In the previous code, we:

  • Create a new taskmaster schema to load data into
  • Loop through the files and add data to the above schema
  • Add the current process data into CurrentProcessedTMData.parquet.
  • Add the ProcessDate of the Taskmaster transformed data

We know at some point that we are having issues with the fact table getting duplicate series so we need to check throughout the process where this could be happening.

  • Change the processDate from being the TaskmasterTransformed process date, in case this was done on an earlier day. We want the current date here to ensure everything groups together.
  • We could also set up this data so we can store it in our new Delta parquet Log
  • Because we are also adding process date to the group. we need to be careful. minutes and seconds could create extra rows and we don’t want that
  • We also know that we are doing checks on the data as it goes in. And rechecking the numbers of historical data. so we need a flag for this
from pyspark.sql import functions as F

workspace_id = "########-####-####-####-############"
lakehouse_id = "########-####-####-####-############"

# Define the file path
file_path = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/CurrentProcessedTMData.parquet"

# Read the Parquet file
dfCheckCurrentProc = spark.read.parquet(file_path)

# Add the new column
dfCheckCurrentProc = dfCheckCurrentProc.withColumn("sequence", F.lit(1))
dfCheckCurrentProc = dfCheckCurrentProc.withColumn("parquetType", F.lit("parquet"))
dfCheckCurrentProc = dfCheckCurrentProc.withColumn("analyticsType", F.lit("Transformation"))
dfCheckCurrentProc = dfCheckCurrentProc.withColumn("name", F.lit("CurrentProcessedTMData"))
# Set the date as the current date
dfCheckCurrentProc = dfCheckCurrentProc.withColumn("processedDate", current_date())
dfCheckCurrentProc = dfCheckCurrentProc.withColumn("historicalCheckFlag", F.lit(0))
dfCheckCurrentProc = dfCheckCurrentProc.withColumn("raiseerrorFlag", F.lit(0))

# Select the required columns and count the rows
dfCheckCurrentProc = dfCheckCurrentProc.groupBy("sequence","parquetType","analyticsType","name","source_filename", "processedDate","historicalCheckFlag","raiseerrorFlag").count()

# Rename the count column to TotalRows
dfCheckCurrentProc = dfCheckCurrentProc.withColumnRenamed("count", "totalRows")

# Show the data
display(dfCheckCurrentProc)
  • current_date() gets the current date. We only have date in the grouped auditing table. date and time can go into the dimension itsself in the process date.
  • sequence allows us to see the sequence of dim and fact creation
  • parquetType is either Parquet or Delta Parquet for this project
  • analyticsType is the type of table we are dealing with
  • historicalCheckFlag. Set to 0 if we are looking at the data being loaded. 1 if the checks are against older data.
  • raiseErrorflag if there looks like we have any issues. This can be set to 1 and someone can be alerted using power BI Reporting.

It would be really good to also have a run number here because each run will consist of a number of parquet files. In this case parquet and delta parquet

Get ContestantTransformed data

A small update here. Just in case, we use distinct to make sure we only have distinct contestants in this list

# Ensure the DataFrame contains only distinct values
dfc = dfc.distinct()

Merge Contestants and Taskmaster data

This is the point where we create the Contestant Dimension. the processDate to the current date and alias to to just processedTime.

from pyspark.sql.functions import col, current_date
from pyspark.sql import SparkSession, functions as F

df_small = F.broadcast(dfc)


# Perform the left join
merged_df = dftm.join(df_small, dftm["Contestant"] == df_small["Name"], "left_outer")\
                .select(
                    dfc["Contestant ID"],
                    dftm["Contestant"].alias("Contestant Name"),
                    dftm["Team"],
                    dfc["Image"],
                    dfc["From"],
                    dfc["Area"],
                    dfc["country"].alias("Country"),
                    dfc["seat"].alias("Seat"),
                    dfc["gender"].alias("Gender"),
                    dfc["hand"].alias("Hand"),
                    dfc["age"].alias("Age"),
                    dfc["ageRange"].alias("Age Range"),
                    current_timestamp().alias("processedTime"),
                ).distinct()



# Show the resulting DataFrame
merged_df.show()

Add the Default row

This changes because we have a new column that we need to add.

from pyspark.sql import SparkSession,Row
from pyspark.conf import SparkConf
from pyspark.sql.functions import current_timestamp


workspace_id = "########-####-####-####-############"
lakehouse_id = "########-####-####-####-############"

path_to_parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/dimcontestant")

# Check if the Delta Parquet file exists
delta_file_exists = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration()).exists(spark._jvm.org.apache.hadoop.fs.Path(path_to_parquet_file))

if delta_file_exists:
    print("Delta Parquet file exists. We don't need to add the NA row")
else:

    # Add a Default row
    # Create a sample DataFrame
    data = [(-1, "Not Known","Not Known","Not Known","Not Known","Not Known","Not Known","0","NA","0","0","0")]
    columns = ["Contestant ID", "Contestant Name", "Team","Image","From","Area","Country","Seat","Gender","Hand","Age","Age Range"]
    new_row  = spark.createDataFrame(data, columns)

    # Add the current timestamp to the ProcessedTime column
    new_row = new_row.withColumn("ProcessedTime", current_timestamp())

    # Union the new row with the existing DataFrame
    dfContfinal = dfContfinal.union(new_row)

    # Show the updated DataFrame
    dfContfinal.show(1000)

    print("Delta Parquet file does not exist. Add NA Row")

The above screen show is for Series 1 and 2. Just to show you that the row is created if its the first time.

All the process dates are the same (at least the same day) ensuring everything will be grouped when we add it to the audit delta parquet table.

However, this is displayed for our 2nd load for series 3 (2nd load)

Check the new data that has been loaded into the delta parquet.

Checking the data, checks the dimension that has been created as delta parquet This is the last code section and we want to build more in after this

Get the Contestant Names and Source Files of the Current Load

Because we can process either 1 file alone, or multiple files. we need to get the source file name AND the contestant name at this point.

#Get just the contestant name and source File name from our currrent load

df_contestant_name = dftm.selectExpr(“`Contestant` as contestantNameFinal”, “`source_filename` as sourceFileNameFinal”).distinct()

df_contestant_name.show()

For the audit we need to know what our current Contestants are and who are the Contestants we have previously worked with. Also the original file they below too.

We took this information from an earlier dataframe when we loaded the source file to be used to create the dimension. This still contains the source file name. The Dimension does not contain the file name any more.

Join df_contestant_name to dfnewdimc to see what data is new and what data has been previously loaded

from pyspark.sql.functions import broadcast, when, col

df_joined = dfnewdimc.join(
    df_contestant_name,
    dfnewdimc.ContestantName == df_contestant_name.contestantNameFinal,
    how='left'
)
# Add the currentFlag column
df_joined2 = df_joined.withColumn(
    "historicalCheckFlag", 
    when((col("contestantNameFinal").isNull()) & (col("ContestantKey") != -1), 1).otherwise(0))

# Drop the ContestantNameFinal column
df_joined3 = df_joined2.drop("contestantNameFinal")

#If its an audit row set sourceFileNameFinal to Calculated Row
df_joined4 = df_joined3.withColumn(
    "sourceFileNameFinal",
    when((col("sourceFileNameFinal").isNull()) & (col("ContestantKey") == -1), "Calculated Row")
    .otherwise(col("sourceFileNameFinal"))
)

# Show the result
display(df_joined4)

This joins the name we have just created to the data we are working with so we can set the historicalCheckFlag. For the first load everything is 0 because they are current.

In this load we can see some 1’s These are records already in the system and don’t match to our current load.

This file also now contains the source_filename

If its a default row, there will be no source filename so we set to calculated Row instead

Creating Dim Audit row

from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql import DataFrame

# Add the new column

dfdimaudit = df_joined4.withColumn("sequence", F.lit(2))

dfdimaudit = dfdimaudit.withColumn("parquetType", F.lit("Delta parquet"))

dfdimaudit = dfdimaudit.withColumn("analyticsType", F.lit("Dimension"))

dfdimaudit = dfdimaudit.withColumn("name", F.lit("Dim Contestant"))

# Extract the date from source_processedTime
dfdimaudit  = dfdimaudit.withColumn("processedDate", F.to_date("processedTime"))
dfdimaudit =  dfdimaudit.withColumn("raiseerrorFlag", F.lit(0))

# Select the required columns and count the rows
dfdimauditgr = dfdimaudit.groupBy("sequence","parquetType","analyticsType","name", "sourceFileNameFinal","processedDate","historicalCheckFlag","raiseerrorFlag").count()


# Alias sourceFileNameFinal to sourceFileName
dfdimauditgr = dfdimauditgr.withColumnRenamed("sourceFileNameFinal", "source_filename")

# Alias count to totalRowsdfdimauditgr = dfdimauditgr.withColumnRenamed("count", "totalRows")

# Show the data
display(dfdimauditgr)  

The files are grouped on processed time. So we need to deal with this accordingly so set date and time to just date using to_date() (To avoid minutes and seconds causing issues)

Our default row is set against, calculated Row.

The null source file name is the total count of our historical records

We are going to append the dim audit to the initial current processed audit so they both need exactly the same columns and these are

  • sequence
  • parquetType
  • analyticsType
  • name
  • source_filename
  • processedDate
  • totalRows
  • historicalCheckFlag
  • raiseerrorFlag

and we have the correct source file name. Even if we process multiple files.

Add Historical Data Check to null source filename

Append the audit for the transformation data and Dimension data

# Append the current processed transformation file(s) audit row with the dimension audit

audit3_df = dfCheckCurrentProc.unionByName(dfdimauditgr)

display(audit3_df) 

With the S3 file loaded we can see

  • Transformation rows for series 3.
  • Dimension rows for Contestant S3
  • The historical row count of all series already in the dimension.

Create Batch numbers for the audit.

If its the first time we run this process. Set to 0. Else set to the Match Last Batch No + 1

from pyspark.sql.functions import lit

# Function to check if a table exists
def table_exists(spark, table_name):
    return spark._jsparkSession.catalog().tableExists(table_name)

# Table name
table_name = "SilverDebbiesTraininglh.processeddimfact"

if table_exists(spark, table_name):

    # If the table exists, get the max batchno
    df = spark.table(table_name)
    max_batchno = df.agg({"batchno": "max"}).collect()[0][0]
    next_batchno = max_batchno+1

    # Add the batchNo column with a value of 0
    audit3_df = audit3_df.withColumn("batchNo", lit(next_batchno))

    print(f"processedDimFact data already exists. Next batchno: {next_batchno}")
    display(audit3_df) 
else:

    # Add the batchNo column with a value of 0
    audit3_df = audit3_df.withColumn("batchNo", lit(0))


    # Reorder columns to make batchNo the first column
    columns = ['batchNo'] + [col for col in audit3_df.columns if col != 'batchNo']
    audit3_df = audit3_df.select(columns)

    print("Table does not exist. Add Batch No 0")

    display(audit3_df) 

The following gives more information about the code used.

  • def table_exists(spark, table_name): defines a function named table_exsts that takes two parameters. the spark session object and the table name
  • Next we define a function named table_exists that will look for the table name
  • spark._jsparkSession.catalog() accesses the spark session catalogue which contains metadata about the entities in the spark session.
  • Basically, this is checking if the table name exists in the spark session catalogue. It it does we get the match Batch no from the table. Add 1 and add that as the column next_batchno
  • If its false we start batchno at 0

How does the column reordering work?

  • [‘batchNo’]: This creates a list with a single element, ‘batchNo’. This will be the first column in the new order.
  • [col for col in audit3_df.columns if col != ‘batchNo’]: This is a list comprehension that iterates over all the columns in audit3_df and includes each column name in the new list, if removes ‘batchNo’ from the list.

List Comprehension: Create lists by processing existing lists (Like columns here) into a single line of code.

  • Combining the lists: The + operator concatenates the two lists. So, [‘batchNo’] is combined with the list of all other columns, resulting in a new list where ‘batchNo’ is the first element, followed by all the other columns in their original order.

And here we can see in Load 2 we hit the if section of the above code. Our batch no is 1 (Because the first load was 0)

Reset totalRows to Integer

Found that totalRows was set to long again so before adding into delta parquet. reset to int.

Bring back the processeddimFact before saving with the new rows

Before saving the new audit rows we want to know if there are any issues. This means we need the data. However we also need to deal with the fact that this may be the first process time and there won’t be a file to check.

table_path = "abfss://########-####-####-####-############@onelake.dfs.fabric.microsoft.com/########-####-####-####-############/Tables/processeddimfact"
table_exists = DeltaTable.isDeltaTable(spark, table_path)

if table_exists:
    # Load the Delta table as a DataFrame
    dfprocesseddimfact = spark.read.format("delta").table("SilverDebbiesTraininglh.processeddimfact")

    # Display the table
    dfprocesseddimfact.show()  # Use show() instead of display() in standard PySpark

    # Print the schema
    dfprocesseddimfact.printSchema()

    print("The Delta table SilverDebbiesTraininglh.processeddimfact exists. Bring back data")
else:

    # Define the schema
    schema = StructType([
        StructField("batchNo", IntegerType(), nullable=False),
        StructField("sequence", IntegerType(), nullable=False),
        StructField("parquetType", StringType(), nullable=False),
        StructField("analyticsType", StringType(), nullable=False),
        StructField("name", StringType(), nullable=False),
        StructField("source_filename", StringType(), nullable=True),
        StructField("processedDate", DateType(), nullable=True),
        StructField("historicalCheckFlag", IntegerType(), nullable=False),
        StructField("raiseerrorFlag", IntegerType(), nullable=False),
        StructField("totalRows", IntegerType(), nullable=False)
    ])

    # Create an empty DataFrame with the schema
    dfprocesseddimfact = spark.createDataFrame([], schema)

    dfprocesseddimfact.show()

    print("The Delta table SilverDebbiesTraininglh.processeddimfact does not exist. Create empty dataframe")

If there is no data we can create an empty schema.

Get the Total of Dim Contestant rows (None Historical)

from pyspark.sql.functions import col, sum

# Filter the dataframe where name is "Dim Contestant"
filtered_df = dfprocesseddimfact.filter((col("Name") == "Dim Contestant") & (col("historicalCheckFlag") == 0)& (col("source_filename") != "Calculated Row"))

display(filtered_df) 


# Calculate the sum of TotalRows
total_rows_sum = filtered_df.groupBy("Name").agg(sum("TotalRows").alias("TotalRowsSum"))

# Rename the "Name" column to "SelectedName"
total_rows_sum = total_rows_sum.withColumnRenamed("Name", "SelectedName")


display(total_rows_sum) 

# Print the result
print(f"The sum of TotalRows where name is 'Dim Contestant' is: {total_rows_sum}")

Both Initial (None Historical Loads) equal 10

Get Historical Rows from our latest process and join to total Loads of the previous Load

#Join total_rows_sum to audit3_df
from pyspark.sql.functions import broadcast

# Broadcast the total_rows_sum dataframe
broadcast_total_rows_sum = broadcast(total_rows_sum)

# Perform the left join
df_checkolddata = audit3_df.filter(
    (audit3_df["historicalCheckFlag"] == 1) & (audit3_df["source_filename"] != "Calculated Row")
    ).join(
    broadcast_total_rows_sum,
    audit3_df["name"] == broadcast_total_rows_sum["SelectedName"],
    "left"
)

# Show the result
display(df_checkolddata)

So Series 1 and 2 have 10 contestants. After S3 load there are still 10 contestants. Everything is fine here. The historical records row allows us to ensure we haven’t caused any issues.

Set Raise Error Flag if there are issues

And now we can set the error flag if there is a problem which can raise an alert.

from pyspark.sql.functions import col

# Assuming df is your DataFrame
total_rows_sum = df_checkolddata.agg({"TotalRowsSum": "sum"}).collect()[0][0]
total_rows = df_checkolddata.agg({"totalRows": "sum"}).collect()[0][0]

# Check if TotalRowsSum equals totalRows
if total_rows_sum == total_rows:
    print("No issue with data")

else:
    df_checkolddata = df_checkolddata.withColumn("raiseerrorFlag", lit(1))

    print("Old and historical check does not match. Issue. Set raiseerrorFlag = 1")

# Show the updated DataFrame
df_checkolddata.show()

raiseerrorFlag is still 0

Remove the historical check row if there is no error

from pyspark.sql.functions import col

# Alias the DataFrames
audit3_df_alias = audit3_df.alias("a")
df_checkolddata_alias = df_checkolddata.alias("b")

# Perform left join on the name column and historicalCheckFlag
df_audit4 = audit3_df_alias.join(df_checkolddata_alias, 
                              (col("a.name") == col("b.name")) & 
                              (col("a.historicalCheckFlag") == col("b.historicalCheckFlag")), 
                              how="left")

# Select only the columns from audit3_df and TotalRowsSum from df_checkolddata, and alias historicalCheckFlag
df_audit5 = df_audit4.select(
    "a.*",
    col("b.raiseerrorFlag").alias("newraiseErrorFlag")
)


# Filter out rows where newraiseerrorflag is 0 because its fine. We don';'t need to therefore know about the historical update
df_audit6  = df_audit5.filter((col("newraiseErrorFlag") != 0) | (col("newraiseErrorFlag").isNull()))

# Remove the columns newraiseerrorflag
df_audit7 = df_audit6.drop("newraiseErrorflag")


# Show the result
display(df_audit7)

the historical row has been removed. if there isn’t a problem, there is no need to hold this record.

If the data has multiple audit rows. Flag as an issue. Else remove

# Count the number of rows where ContestantName is "Not Known"
dfcount_not_known = dfnewdimc.filter(col("ContestantName") == "Not Known").count()

# Print the result
print(f"Number of rows where ContestantName is 'Not Known': {dfcount_not_known}")

# Check if count_not_known is 1
if dfcount_not_known == 1:
    # Remove row from df_audit7 where source_filename is "CalculatedRow"
    df_audit8 = df_audit7.filter(col("source_filename") != "Calculated Row")

    print(f"1 dim audit row. We can remove the row from the audit table")

else:
    # Set source_filename to "Multiple Calculated Rows issue"
    df_audit8 = df_audit7.withColumn("source_filename", 
                                     when(col("source_filename") == "Calculated Row", 
                                          lit("Multiple Calculated Rows issue"))
                                     .otherwise(col("source_filename")))


    print(f"multiple dim audit row. We can specify this in the audit table")                                

# Show the updated df_audit7
df_audit8.show()

The Audit rows has now been removed. Unless there is an issue we don’t need it. this will be useful if we accidentally process multiple audit rows into the table.

Save as Delta Parquet

from delta.tables import DeltaTable

#You can also add the none default daya lake by clicking +Lakehouse
audit3_df.write.mode("append").option("overwriteSchema", "true").format("delta").saveAsTable("SilverDebbiesTraininglh.processedDimFact")

# Print a success message
print("Parquet file appended successfully.")

And save the data as a Delta Parquet processedDimFact in our Silver transform layer.

We can use both of these Delta Parquet Files later.

Check processdimfact

Dim Contestant is complete for now. But we will come back later to update the code.

Dim Episode

Create Episode Dim

Small change. We are adding processedTime as the current date and time.

from pyspark.sql.functions import current_timestamp

dftmEp = dftm.select("Series","Episode No" ,"Episode Name").distinct()

# Add the current timestamp as processedTime
dftmEp = dftmEp.withColumn("processedTime", current_timestamp())

dftmEp = dftmEp.orderBy("Series", "Episode No")

display(dftmEp)

Add a Default row

Just like Contestant. We need ProcessedTime adding

from pyspark.sql.functions import current_timestamp
 # Add the current timestamp to the ProcessedTime column
    new_row = new_row.withColumn("ProcessedTime", current_timestamp())

Again, Here is the screenshot taken on the first load of S1 and S2

And our S3 Load 2 hits the if block and no default row is created

Get the Series, Episode Name and Source File of the Processed data

#Get just the series, Episode Name and source_filename from our current load

df_SeEp = dftm.selectExpr("`Series` as SeriesFinal", "`Episode Name` as EpisodeFinal", "source_filename").distinct()

display(df_SeEp) 

` are used in PySpark to handle column names that contain special characters, spaces, or reserved keywords.

Join df_SeEp to dfnewdimt

Join the list pf currently processed series and episodes to everything in the dimension so far (the data frame we created when we checked the dim


from pyspark.sql.functions import broadcast, when, col

# Broadcast df_contestant_name and perform the left join
df_joined = dfnewdimt.join(
    broadcast(df_SeEp),
    (dfnewdimt.Series == df_SeEp.SeriesFinal) & (dfnewdimt.EpisodeName == df_SeEp.EpisodeFinal),
    "left"
)
# Add the currentFlag column
df_joined2 = df_joined.withColumn("historicalCheckFlag", when(col("SeriesFinal").isNull(), 1).otherwise(0))

# Drop the ContestantNameFinal column
df_joined3 = df_joined2.drop("SeriesFinal").drop("EpisodeFinal")

#If its an audit row set sourceFileNameFinal to Calculated Row
df_joined4 = df_joined3.withColumn(
    "source_filename",
    when((col("source_filename").isNull()) & (col("EpisodeKey") == -1), "Calculated Row")
    .otherwise(col("source_filename"))
)

# Show the result
display(df_joined4)

All historicalCheckFlags are set to 0 if its the current Load (s3). 1 when there is no match and its historical data. the source_filename from df.SeEp has been left in the result which can be used later.

Again we have set the Episode Keys source_filename to “calculated Row” because there is no source to join this one record on by the series and episode name.

Add audit row

At the end of the process. We will add an audit row

from pyspark.sql import functions as F

# Add the new column
dfdimaudit = df_joined4.withColumn("sequence", F.lit(3))

dfdimaudit = dfdimaudit.withColumn("parquetType", F.lit("Delta parquet"))

dfdimaudit = dfdimaudit.withColumn("analyticsType", F.lit("Dimension"))

dfdimaudit = dfdimaudit.withColumn("name", F.lit("Dim Episode"))

# Extract the date from source_processedTime
dfdimaudit  = dfdimaudit.withColumn("processedDate", F.to_date("processedTime"))

dfdimaudit =  dfdimaudit.withColumn("raiseerrorFlag", F.lit(0))

# Select the required columns and count the rows
dfdimauditgr = dfdimaudit.groupBy("sequence","parquetType","analyticsType","name", "source_filename","processedDate","historicalCheckFlag","raiseerrorFlag").count()

# Alias count to totalRows
dfdimauditgr = dfdimauditgr.withColumnRenamed("count", "totalRows")

# Show the data
display(dfdimauditgr) 

for Load 2 we have 5 records in the current load with the source filename set. The historical load of 11 and the calculated Audit -1 row.

Bring back our processeddimfact data

from delta.tables import DeltaTable
from pyspark.sql.functions import col, max

# Load the Delta table as a DataFrame
dfprocesseddimfact = spark.read.format("delta").table("SilverDebbiesTraininglh.processeddimfact")

# Find the maximum BatchNo
max_batch_no = dfprocesseddimfact.agg(max("BatchNo")).collect()[0][0]

# Filter the DataFrame to include only rows with the maximum BatchNo
dfprocesseddimfact = dfprocesseddimfact.filter(col("BatchNo") == max_batch_no)

# Display the table
display(dfprocesseddimfact)
dfprocesseddimfact.printSchema()

Here we find the max batch number and then filter our data frame to only give us the max batch number which is the one we are working on. (Because at this point, we are in the middle of a batch of dims and facts)

Create a distinct batch No

distinct_batch_no = dfprocesseddimfact.select("batchNo").distinct()

display(distinct_batch_no)

We just want that distinct Max Batch number for the next process

Join to our Dim Episode Audit row to bring in the batch number via Left outer join

# Alias the dataframes
dfdimauditgr_alias = dfdimauditgr.alias("dfdimauditgr")
distinct_batch_no_alias = distinct_batch_no.alias("dfprocesseddimfact")

# Perform inner join
audit2_df = dfdimauditgr_alias.crossJoin(distinct_batch_no_alias)

# Select specific columns
audit2_df = audit2_df.select(
    distinct_batch_no_alias.batchNo,
    dfdimauditgr_alias.sequence,
    dfdimauditgr_alias.parquetType,
    dfdimauditgr_alias.analyticsType,
    dfdimauditgr_alias.name,
    dfdimauditgr_alias.source_filename,
    dfdimauditgr_alias.totalRows,
    dfdimauditgr_alias.processedDate,
    dfdimauditgr_alias.historicalCheckFlag,
    dfdimauditgr_alias.raiseerrorFlag
)

audit2_df = audit2_df.distinct()

# Show result
display(audit2_df)

The cross join (Cartesian Join) is because there is only 1 batch number set in distinct_batch_no (the Max Batch number is the one we are working on) So no need to add join criteria.

Set totalRows to Int

Once again TotalRows needs setting to Int

Get the processeddimfact data

# Load the Delta table as a DataFrame
    dfprocesseddimfact = spark.read.format("delta").table("SilverDebbiesTraininglh.processeddimfact")

    # Display the table
    dfprocesseddimfact.show()  # Use show() instead of display() in standard PySpark

    # Print the schema
    dfprocesseddimfact.printSchema()
    # Load the Delta table as a DataFrame
    dfprocesseddimfact = spark.read.format("delta").table("SilverDebbiesTraininglh.processeddimfact")

    # Display the table
    dfprocesseddimfact.show()  # Use show() instead of display() in standard PySpark

    # Print the schema
    dfprocesseddimfact.printSchema()

We will need this to check our historical checked data against the original update

Get the total rows of our None historical rows that have been previously loaded

Same logic as Contestant

Join Historical records with new audit to see if the numbers have changed

No changes to the old numbers.

Set raiseerrorFlag if there are issues

There are no issues

If no issues. Record is removed

As there are no issues the record is removed

If Audit row -1 is only in once. No need to hold the audit row

Append into processeddimfact

And check the data looks good

We head off into Dim Task now and here is where we hit some issues. We need to add more code into the process.

Dim Task

checking the count of rows between the original load, and the historical check

When we count the rows of processed data they come to 64. But when we run the check again it comes to 65. Which would trigger the raise error flag. And this is why

dfnewdimt_filtered = dfnewdimt.filter(dfnewdimt["Task"] == "Buy a gift for the Taskmaster")
display(dfnewdimt_filtered)

I settled on Task and TaskOrder to create the unique row. However this isn’t the case. we have two series where our Task and Task Order are identical.

What we really want is one record in the dimension applied to the correct seasons in the fact table. Not a duplicate like we have above.

The following code is adding Series 3 to both “Buy a gift for the taskmaster” and is consequently undercounting.

Series three is a good opportunity to update the logic as we know there is a record already added from season 2

Bring back old dimTask

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Initialize Spark session
#spark = SparkSession.builder.appName("DeltaTableCheck").getOrCreate()

# Path to the Delta table
table_path = "abfss://########-####-####-####-############@onelake.dfs.fabric.microsoft.com/########-####-####-####-############/Tables/dimtask"

# Check if the Delta table exists
if DeltaTable.isDeltaTable(spark, table_path):
    # Load the Delta table as a DataFrame
    dfprevdimt = spark.read.format("delta").load(table_path)
    
else:
    # Define the schema for the empty DataFrame
    schema = StructType([
        StructField("Task", StringType(), True),
        StructField("TaskOrder", IntegerType(), True),
        StructField("TaskType", StringType(), True),
        StructField("Assignment", StringType(), True),
        StructField("ProcessedTime", TimestampType(), True)
    ])
    
    # Create an empty DataFrame with the specified schema
    dfprevdimt = spark.createDataFrame([], schema)

    print("no dim. Create empty dataframe")

# Display the DataFrame
dfprevdimt.show()

If its the first time, create an empty dataset. else bring back the dataset

Join new Tasks to all the Previously added tasks

The decision has been made to use all the columns in the join, Just in case a Task has a different assignment or Type.

Our Task that we know is identical for Series 2 and 3 appears. We use the dftm dataframe. This contains the data before we create the distinct dimension, as this contains source_filename. Inner Join only brings back the matching records.

Add this to a Delta Parquet File

We can now have a static item we can use later. and we can report on these items

If task already exists. Remove from the new load

from pyspark.sql.functions import col,current_timestamp

# Alias the DataFrames
dfprevdimt_df_alias = dfprevdimt.alias("old")
dftm_alias = dftm.alias("new")

# Perform left join on the name column and historicalCheckFlag
dftmdupRemoved = dftm_alias.join(dfprevdimt_df_alias, 
                              (col("new.Task") == col("old.Task")) & 
                              (col("new.Task Order") == col("old.TaskOrder"))& 
                              (col("new.Task Type") == col("old.TaskType"))& 
                              (col("new.Assignment") == col("old.Assignment")), 
                              how="Left_anti")



#And reset the dimension
dfTasksdupRemoved = dftmdupRemoved.select("Task","Task Order" ,"Task Type", "Assignment").distinct()

# Add the current timestamp as processedTime
dfTasksdupRemoved = dfTasksdupRemoved.withColumn("processedTime", current_timestamp())

# Show the result
display(dfTasksdupRemoved)

The left anti join bring back records that exist in the new data frame and removes anything that also exists in the pre existing data

dtfm is again used so we have to reset the dimension.

dfTasksdupRemoved now becomes our main dataframe so we need to ensure its used across the Notebook where neccessary

We then go on to create the default row (If its never been run before) and keys

Add a Default Row

Code changed to use dfTasksdupRemoved

Create the Task Key

Again, we have had a change to code to use the updated dataframe

Get Task details from current load

Previously, This was just Task and Order but we know that we need more in the join.

Create a historical Record check

Buy a gift for the Taskmaster is flagged as historical (Its in Series 2) and we have removed it from series 3. And we have added this information to a Delta Parquet.

We have also added more join criteria.

Create the Audit Rows

Add a value into source filename if full because its a historical record

Bring back the ProcessedDimfact

Get the Current batch

Join the batch number into the dataframe

Load the processed Dim Fact

Count the rows

Total Rows is correct at 65

Add the Rows of the historical check against rows that were originally processed

These now match

If Issue. Set RaiseErrorFlag = 1

No issues

Remove historical check if there are no issues

Check that there aren’t multiple rows in the data for the default -1 row. If all ok. remove default row from audit

And append the data into processeddimfact.

There is other logic to consider here. Imagine if we loaded in Series 1,2 and 3 together. Would the logic still remove the duplicate task. This will need running again to test

Taskmaster Fact

The fact table doesn’t have default records so we need to deal with this slightly differently. Lets run through the code again

The new fact data has been processed at the point we continue with the logic

Get Data of current process we can join on

Join current and already processed Fact data

  • df_currentfact is the current S3 data
  • dfnewfact is the newly loaded fact table that contains everything. S1,2 and 3.

dfnewfact is on the left of the join. So logically. If current and new both exist we get the match.

Create the Audit Row

If source Filename is null. Add Historical Data Check

Bring back the latest batch from processeddimFact

Get Current Batch No

Add the batch number to the audit

Load all of the processed dim fact

Total Up the none historical rows (totals created at the time)

Add the total rows (As at the time they were created to the total historical rows)

We can already see they match. 307 rows for S1 and S2

Reset the Raise Error flag to 1 if there is a problem

Remove the historical check row if there are no issues

And now you can append into the Delta parquet table and check the processed dim and fact.

And then we are back to setting the processedflag to 1 at the end of the fact table.

Conclusion

Now we have an audit of our Loads (2 Loads so far)

We have done a lot of work creating auditing. We can now add files and hopefully run the pipeline to automate the run. then we can create Power BI reporting.

We now have three audit Delta parquet tables

  1. processedFiles – This goes through each Taskmaster Series File and allows you to see what you are working on. the fullyProcessedFlag is set to 0 when the fact table is complete
  2. processeddimFact – (Shown above) The detailed audit for the transformation, dim and fact tables. Containing, number of records. The date time processed. And a flag for issues
  3. dimremovenoneuniquetaskaudit. Tasks are possible to duplicate. For example, we have the same task (And all of its components like sequence) are identical between series 2 and 3. We only need the one row so the duplicate is removed

There is more that we could do with this. of course there always is. However the big one is to make sure that we get none duplicated task information if series 2 and 3 are processed together.

We could also add more detail into the audit. For example any issues with the data such as Column A must always be populated with either A B C or D. If Null then flag as error. If anything other that specified codes then error.

We will come back to this but in the next blog I want to try something a bit different and set up the Data Warehouse. How would this project look if using the warehouse. And why would you choose one over the other.

So in part 19 we will look at setting the warehouse up and how we deal with the source data in the data lake

Microsoft Fabric Part 17. Taskmaster Project. Adding more Pyspark code and meta data to the Transformed Data file

In Part 16 we created a pipeline to run through our 5 Notebooks.

We also ran some sql to check the data and found that Series 1 2 and 3 had been added 4 times into the delta parquet files.

We want to add more information through the run so we can error check the loading. So in this blog we are going back to the parquet files to make some changes.

This post will specifically be for the Transformed parquet file in the silver layer.

We are again, deleting al the files created and starting again. However, instead of starting with S1 S2 and S3 in the data lake we are going to start with S1 only. And build from there.

These amendments are made before we get started

Only the changes will be mentioned. We won’t go through all the code blocks again if there is no change to them.

Taskmaster Transformed

Update the Log Schema

We are changing our log file to include NoRows and contestanttransformednoRows (So we can check that we aren’t adding duplicates here)

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType,TimestampType


# Create a Spark session
spark = SparkSession.builder.appName("CreateEmptyDataFrame").getOrCreate()

# Define the schema for the DataFrame
schema = StructType([
    StructField("filename", StringType(), True),
    StructField("processedTime", TimestampType(), True),
    StructField("NumberOfRows", IntegerType(), True),
    StructField("ContestantTransformedNumberOfRows", IntegerType(), True),
    StructField("fullyProcessedFlag", IntegerType(), True)
])


# Create an empty DataFrame with the specified schema
df_log  = spark.createDataFrame([], schema)

# Show the schema of the empty DataFrame
df_log.printSchema()

display(df_log)

Check ProcessedFiles exist

The next step is to check if the ProcessedFile exist. However, we want to change this file from parquet (file) to Delta parquet (Table) so this codeblock needs to change

from pyspark.sql.utils import AnalysisException

# Define the tablename
table_name = “SilverDebbiesTraininglh.ProcessedFiles”

# Check if the file exists
try:
spark.read.format(“delta”).table(table_name)

print(f”delta parquet {table_name} already exists. Skipping write operation.”)

except Exception as e:
if “TABLE_OR_VIEW_NOT_FOUND” in str(e):
# File does not exist, proceed with writing
df_log.write.format(“delta”).mode(“append”).saveAsTable(table_name)
print(f”File {table_name} does not exist. Writing data to {table_name}.”)

else:
# Re-raise the exception if it’s not the one we’re expecting
raise e

The variable e holds the exception object

The processedFiles file is now a Delta Parquet managed table. We will need to address this throughout the processing of the data.


List all the files that have already been processed

Another code block to change because we are now looking at tables, not files

# This creates a dataframe with the file names of all the files which have already been processed

df_already_processed = spark.sql("SELECT * FROM SilverDebbiesTraininglh.processedfiles")


display(df_already_processed)

At the moment, there is no data available.

Check the number of records again when null rows have been deleted

Part of the transform code. There is a possibility that there are null rows come through from the file.

We need to run the following code again after we remove the null rows

from pyspark.sql.functions import count

notmrows_df = dftm.groupBy("source_filename").agg(count("*").alias("NumberOfRows"))
   
display(notmrows_df)

We don’t want to log the number of rows and then find out that we have lots of null rows that have been lost and not recorded.

Join Contestants to Taskmaster data, only return contestants in the current set

Found an issue here. Josh is also in Champion of Champions and is in here twice. We only want S1. Therefore we need to change this join to reflect this.

We need to go back up one block to

This is what we join to. We create a dataframe that gives us the min episode date for age as at that point in time. We only join to Contestant. We need to also join on Series to avoid this issue.

Updated to include Series

Now back to the join.

# Join the min_episode_date into contestants
dfcont = dfcont.join(dfminSeriesCont,
            (dfcont["Name"] == dfminSeriesCont["Contestant"])&
            (dfcont["series_label"] == dfminSeriesCont["Series"]), "inner")\
.drop(dfminSeriesCont.Contestant)\
.drop(dfminSeriesCont.Series)

# The resulting DataFrame 'joined_df' contains all rows from dftask and matching rows from dfob
display(dfcont)

Series has been added to the join. And we remove Series and Contestant using drop, so we only get the data from the Contestant side.

Now we only have 5 records. 5 contestants to a series

Get the number of rows from the contestants file

We also want to record how many rows we have in the contestants file so we can check if anything has errored here.

from pyspark.sql.functions import count

cresult_df = dfcontAge.groupBy("series_label").agg(count("*").alias("ContestantTransformedNumberOfRows"))

display(cresult_df)

Add number of Rows to the log for both the taskmaster file and the contestant file

# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, current_date, col,regexp_extract

#Get the distinct list of file names we are going to process
filenamesdf = dftm.withColumn("filename",col("source_filename")).select("filename").distinct()

# Remove everything after the "?" character, there used to be more in here but we dont have to worry about this since a code change
filenamesdf= filenamesdf.withColumn("filename", regexp_replace("filename", "\\?.*", ""))

#Add a process Date
filenamesdatedf = filenamesdf.withColumn("processedTime",current_timestamp())

# Join df_log with result_df on the filename column
filenamesRowsdf = filenamesdatedf.join(notmrows_df.select("source_filename", "NumberOfRows"), filenamesdatedf.filename ==notmrows_df.source_filename, "left").drop("source_filename")

display(filenamesRowsdf)

# add in the Contestant Transformed number of rows
filenamescnorowsdf = filenamesRowsdf.join(
    cresult_df.select("series_label", "ContestantTransformedNumberOfRows"),
    regexp_extract(filenamesRowsdf["filename"], r"Taskmaster_(S\d+)_", 1) == cresult_df["series_label"],
    "left"
    ).drop("series_label")

display(filenamescnorowsdf)


#Add a flag
df_log = filenamescnorowsdf.withColumn("fullyProcessedFlag",lit(0))

display(df_log)

Adding NumberOfRows

Adding the extra code has selected source_filename and NumberOfRows from notmrows_df. Left joined to our filenamesdatedf

Adding Contestant Number of Rows

Same again but we have to take the S1 from the filename using regexp_extract

Finally we add the flag.

This matches our schema.

Change ContestantTransformedNumberOfRows and NumberOfRows: from long to int

We have another issue in that an int has been reset to long. We need to deal with this before adding into the delta parquet.

from pyspark.sql.functions import col

# Change the data type of ContestantTransformedNumberOfRows and NumberOfRows from long to integer
df_log = df_log.withColumn("ContestantTransformedNumberOfRows", col("ContestantTransformedNumberOfRows").cast("integer"))
df_log = df_log.withColumn("NumberOfRows", col("NumberOfRows").cast("integer"))

# Display the schema to verify the change
df_log.printSchema()

And change the last section when we update processedFiles to Delta Parquet.

# Define the tablename
table_name = "SilverDebbiesTraininglh.ProcessedFiles"


df_log.write.format("delta").mode("append").saveAsTable(table_name)

print(f"File {table_name} does not exist. Writing data to {table_name}.")     

Check the processedFiles Parquet File

Finally, change this code to check the Delta Parquet file.

# This creates a dataframe with the file names of all the files which have already been processed

df_already_processed = spark.sql("SELECT * FROM SilverDebbiesTraininglh.processedfiles")


display(df_already_processed)

Conclusion

We have done a few updates here.

First of all. The ProcessedFiles Parquet has been changed to Delta Parquet for a few reasons. one is that we can use the SQL Analytic Endpoint to check the file. Another is that we can connect to the Delta Parquet with Power BI and create Audit Reporting.

Another change is that we have introduced new fields to the ProcessFiles Delta Parquet.

  • We have the NumberOfRows from the Taskmaster file. Always split by series.
  • And ContestantTransformedNumberOfRows to check that we just have 5 contestants for every series in the ContestantsTransformed File.

We can now move on an update the dimensions.

Microsoft Fabric Part 17. Taskmaster Project. The Pipeline. Run multiple notebooks from one Notebook activity using the foreach activity

In Part 16 we processed Series 4 of Taskmaster. we had a lot of issues along the way to resolve.

Now we can create a pipeline to process all of the Notebooks we have created. We will start as simple as possible and try and build in better processes as we go along.

New Data Pipeline

Back in the Fabric workspace. Create the first Data Pipeline

Add activity: Notebook 1 Transformed

Click on Notebook to create the first Notebook

Its very simple at the moment. There are no parameters and nothing else to set.

Now for the next Notebook

Create an on Success connection between Transformed Silver and Dim Contestant.

Now we can set up the rest of the pipeline

This is as basic as it gets and it all goes well, this will be fine. However there are some things we haven’t dealt with

  • We have successes. But what happens if anything fails?
  • Each one of these is a Notebook. Imagine having lots of these here. Is there a way of having one Notebook that processes each notebook iteratively? This would clean up the pipeline
  • It would be super useful to record the metrics. What has been processed. How many rows etc. Can we get metrics from the run?

Iterate through Each Notebook

Lets create our first Pipeline post to process each notebook using just one Notebook activity.

In the first instance. We are going to add series 5 into the Data Lake. Technically, each Notebook should run and do nothing because there is no data to add

Create a csv file

Also get the IDs of the workspace and the notebooks. These were taken from the Fabric url’s. e.g.

And add this file into the bronze delta lake

Now we should be able to use this information in the pipeline

Create a lookup

In the pipeline we need a Lookup activity to get the information from the JSON file

And we can preview the data

Add a ForEach Activity

Drag and drop a ForEach activity onto your pipeline canvas and create an On Success Relationship between this and the Lookup.

Sequential is ticked because there are multiple rows for each notebook and we want to move through them sequentially.

Set the Items in Settings by clicking to get to the pipeline expression builder

We are using the output.value of our lookup activity.

@activity(‘GetNotebookNames’).output.value

  • @activity refers to the actual activity we are using. GetNotebookNames
  • .output accesses the output of the activity
  • .value gets the specific value of the output.

Configure the Notebook Activity Inside ForEach

Inside the Foreach. Add a Notebook

Again, click to use the Dynamic content expression builder to build

Workspace ID: @item().workspaceID

Notebook ID: @item().notebookID

Note, when you first set this up you see Workspace and Notebook Name. It changes to ID, I think because we are using the item() but this can be confusing.

This is the reason ID has been added into the csv file. But we still wanted the names in the file in order to better understand what is going on.

  • @item(): This refers to the current item in the iteration which is a row. When you’re looping through a collection of items, @item() represents each individual item as the loop progresses.
  • .notebookID: This accesses the notebookID property of the current item. And the notebookID is a column in the csv file

Running the pipeline

You can check the inputs and outputs of each activity.

If it fails you can also open the failure information.

Monitor

Click Monitor to get more information

Here you can see a run before certain issues were resolved.

Click on the failed text bubble to get to the details. I had added the wrong column name.

Go back to the Pipeline.

We ran with no new file. It would be useful at this point to create some SQL to quickly check the data manually. We also need to add this new Pipeline activity into our medallion architecture

Medallion Architecture task Flow

In the Fabric Workspace.

Check using SQL endpoint on the data

Technically, there should be only up to season 4 still. it would be nice to have some SQL that would quickly tell us what has happened to the data.

You can only use SQL on Delta Parquet files so we can’t look at our parquet Processedfile. this may be a reason to change this to a Delta parquet file in future so we can use SQL to check information easily.

From our Workspace, go into our gold lake.

We now want to work with the SQL analytics endpoint

Choose New SQL Query and name Check Dim and Facts.

these are the queries that have been created to check our data

--CONTESTANTS
--Make sure there are no duplicate contestants
SELECT ContestantID, ContestantName, COUNT(*)  As total FROM [GoldDebbiesTraininglh].[dbo].[dimcontestant]
Group BY ContestantID, ContestantName
Order by ContestantID
--Get the total of rows (Minus the default row) and divide by 5, which should tell you how manyseries of contestants we have
SELECT (Count(*)-1)/5 AS totalSeries FROM [GoldDebbiesTraininglh].[dbo].[dimcontestant]

4 Series. this is correct

--EPISODES
--Again, make sure we don't have duplicate records
Select Series, EpisodeName, Count(*) as Total 
from [GoldDebbiesTraininglh].[dbo].[dimepisode]
GROUP BY Series, EpisodeName
Order by Count(*) desc
--And get a distinct list of series
SELECT Distinct Series FROM [GoldDebbiesTraininglh].[dbo].[dimepisode]
Where Series <> '-1'
Order by Series
--TASKS
--Check we have no duplicates
SELECT Task, TaskOrder, COUNT(*) as Total FROM [GoldDebbiesTraininglh].[dbo].[dimtask]
GROUP BY Task, TaskOrder
Order by Count(*) Desc
--Check the number of rows per series via the fact table
SELECT e.Series, COUNT(*)AS Total FROM [GoldDebbiesTraininglh].[dbo].[facttaskmaster] f
INNER JOIN [GoldDebbiesTraininglh].[dbo].[dimepisode] e on f.EpisodeKey = e.EpisodeKey
GROUP BY e.Series
Order by COUNT(*) Desc

Now we can see a problem. Series one 2 and 3 are quite small. But here they have way more rows than S4. Something has gone wrong.

We need to add more into the scripts to ensure that we can identify specific issues. Like what were the original number of rows etc. It is only because we can remember that 240 was the original row number for S4 that we know S4 is correct. We need to identify this quickly and specifically.

Lets do more checking.

--And check the  duplicates with fact as the central table
SELECT d.date As SeriesStartDate, e.Series,e.EpisodeName, c.ContestantName, t.Task, t.TaskOrder, t.TaskType,  f.Points, COUNT(*) AS Total
FROM [GoldDebbiesTraininglh].[dbo].[facttaskmaster] f 
INNER JOIN [GoldDebbiesTraininglh].[dbo].[dimcontestant] c on f.ContestantKey = c.ContestantKey
INNER JOIN [GoldDebbiesTraininglh].[dbo].[dimepisode] e on f.EpisodeKey = e.EpisodeKey
INNER JOIN [GoldDebbiesTraininglh].[dbo].[dimdate] d on d.DateKey = f.SeriesStartDateKey
INNER JOIN [GoldDebbiesTraininglh].[dbo].[dimtask] t on t.TaskKey = f.TaskKey
Where  e.Series = 'S1'
GROUP BY d.date, e.Series,e.EpisodeName, c.ContestantName, t.Task, t.TaskOrder, t.TaskType,  f.Points
Order by COUNT(*) DESC

There should only be one row per Contestant,

But each has 4. the data has been added 4 times

Changing the Where criteria to S2, S3 and S4. There is 1 row for each for S4. S2 S3 have 4 rows. we know they were added along with S1. So now we have to understand why this is.

To do this we could do with some meta data in the Delta Parquet tables of the date and time the information was added.

Conclusion

We now have a pipeline that iterates through our Notebooks and we have discovered some issues along the way.

In the next blog post we are going to do some updates to the Notebooks and check why Season 1 has been added 4 times. then we can move on to our next Pipeline updates. recording more meta data about the runs

Microsoft Fabric Part 16. Taskmaster Project. Update to Taskmaster transformed Notebook for Contestants

In Part 15 we created a delta load of transformed dims and facts in the gold layer of our Fabric Architecture.

The Taskmaster file contains source_filename. e.g. Taskmaster_S1_01092015.csv Which we have used to partition the data.

We also have a transformed Contestants file. Each contestant belongs to a series. the file isn’t big enough to Partition. The Contestants file consists of data that we have prepared and extra data from Kaggle.

However, we have only prepped the data for the first run. We need to add more code for new files, and there are still amendments we can make to make this better.

its also useful to see all the issues along the way and how they are resolved. knowing what can hit you can be really handy when working on a project because you have already seen and dealt with issues

Load a new file into Azure Data Lake

  • Taskmaster season 4 has been loaded
  • Contestants already has season 4 in the data

Taskmaster Transformed Pyspark Notebook

Lets look at the way the new file will be dealt with

The data lake is short-cut to our bronze delta lake. And here we use mssparkutils to list the files. S4 is new.

  • The next code block removes Contestants from the list
  • And the next creates our empty dataframe consisting of filename, processedTime and fullyprocessedFlag
  • Check ProcessedFiles exists. if not create a file with the empty dataframe (Image below)

It’s worth just looking at this code block in more detail as it was recently updated. If the ProcessedFiles file already exists we don’t have to do anything. If it doesn’t exist we create the file using our empty template we created.

The file is already created and we know it contains Series 1, 2 and 3 that we have already added.

  • List files already processed (Image below)

Here we can see that S1 S2 and S3 have been fully processed

  • return only the none processed file (Image below)

And we find anything in the folder that isn’t in our processed Files Parquet file. In this case S4

  • Next we create an empty data frame for the taskmaster data
  • And for every file we process the data into the data frame

Checking the total rows in the S4 file. There are 240 rows (At this point there are more episodes to a series)

  • next the df_log dataframe is created with the Season 4 file name, Our processed file and 0 as the fullyprocessedFlag
  • Transform code. This is detailed in previous posts. Includes adding the Contestants.csv data. Adding in the people data from Kaggle to join with contestants.
  • The data is added to taskmastertransformed files so we only deal with the current set

Error Message when checking the TaskmasterTransformed file

We have hit our first issue

Caused by: org.apache.spark.SparkException: Parquet column cannot be converted in file abfss://986472b4-7316-485c-aa22-128e1cb29544@onelake.dfs.fabric.microsoft.com/ee31b1a4-16bf-4aae-aaab-f130bd4d53c6/Files/Data/Silver/taskmasterTransformed.parquet/source_filename=Taskmaster_S4_13062017.csv/part-00008-e963cfd6-7007-439b-9e0d-63b33eddabb3.c000.snappy.parquet. Column: [Points], Expected: double, Found: INT32.

There is an issue with Points. Expected double. Found INT32. Looking at the data,

This is the only point that isn’t a standard 0 to 5 number. And this should be fine as an integer. Here is where the issue is. Its actually in S1 which we have already processed

Josh did his own solo task and as we can see here, Points are decimal and not an integer. This doesn’t happen often at all. Its usually 1 through to 5. So this is the first time we have issues. To resolve this issue I am going to delete all the files again. remove S4 from the data lake and start again. Ensuring points is double not integer.

The change is here when we create the empty dataframe to append into

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType,TimestampType, DoubleType


# Create a Spark session
spark = SparkSession.builder.appName("CreateEmptyDataFrame").getOrCreate()

# Define the schema for the DataFrame
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Series", StringType(), True),
    StructField("Episode No", IntegerType(), True),
    StructField("Episode Name", StringType(), True),
    StructField("Episode Date", DateType(), True),
    StructField("Task Order", IntegerType(), True),
    StructField("Task", StringType(), True),
    StructField("Task Type", StringType(), True),
    StructField("Assignment", StringType(), True),
    StructField("Contestant", StringType(), True),
    StructField("Team", StringType(), True),
    StructField("Points", DoubleType(), True),
    StructField("Winner", IntegerType(), True),
    StructField("source_filename", StringType(), True),
    StructField("source_processedTime", TimestampType(), True)
])

# Create an empty DataFrame with the specified schema
dftm  = spark.createDataFrame([], schema)

# Show the schema of the empty DataFrame
dftm.printSchema()

display(dftm)

Changing to Doubletype is the only change made so far. The data is loaded through to dims and facts. S4 file is added into the datalake and we go again

This time when we get to the code that checks TaskmasterTransformed it works

from pyspark.sql.functions import input_file_name, regexp_extract

workspace_id = "########-####-####-####-############"
lakehouse_id = "########-####-####-####-############"

parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/taskmasterTransformed.parquet")

df_already_processed = spark.read.parquet(parquet_file)

display(df_already_processed)
  • Add the ContestantTransformed to a file and check
  • Add the log to a file and check

Note that we now have S4 logged with the process flag of 0. We continue on to the dims

Updating Dimension Dim Contestant

We bring back the list of unprocessed files

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType,TimestampType, DoubleType()


# Create a Spark session
spark = SparkSession.builder.appName("CreateEmptyDataFrame").getOrCreate()

# Define the schema for the DataFrame
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Series", StringType(), True),
    StructField("Episode No", IntegerType(), True),
    StructField("Episode Name", StringType(), True),
    StructField("Episode Date", DateType(), True),
    StructField("Task Order", IntegerType(), True),
    StructField("Task", StringType(), True),
    StructField("Task Type", StringType(), True),
    StructField("Assignment", StringType(), True),
    StructField("Contestant", StringType(), True),
    StructField("Team", StringType(), True),
    StructField("Points", DoubleType(), True),
    StructField("Winner", IntegerType(), True),
    StructField("source_processedTime", TimestampType(), True),
    StructField("Year", StringType(), True),
    StructField("day", StringType(), True),
    StructField("month", StringType(), True),
    StructField("source_filename", StringType(), True)
    
])

# Create an empty DataFrame with the specified schema
dftm  = spark.createDataFrame([], schema)

# Show the schema of the empty DataFrame
dftm.printSchema()

display(dftm)

Again, at this point we create an empty data frame to load into, and Points has been changed to DoubleType

  • Loop through the files that need processing. Adding just the partition from parquet file into a data frame.
  • Check the number of rows
  • Add to CurrentProcessedTMData.parquet so all the other dims and the fact can use this.
  • Get Contestant data and get the correct series from this data to match to taskmaster partition
  • Now filter to only the contestants we are working with
  • Merge Contestants and Taskmaster data
  • If the delta parquet file doesn’t exist. Add the default row of -1. Else do nothing. I had made an error in Part 15 . It was pointing towards the silver lakehouse files when it should have been pointing to gold tables. (See correct code Change below)

If New Create -1 Row. Else do nothing

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import Row


workspace_id = "########-####-####-####-############"
lakehouse_id = "########-####-####-####-############"

path_to_parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/dimcontestant")

# Check if the Delta Parquet file exists
delta_file_exists = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration()).exists(spark._jvm.org.apache.hadoop.fs.Path(path_to_parquet_file))

if delta_file_exists:
    print("Delta Parquet file exists. We don't need to add the NA row")
else:

    # Add a Default row
    # Create a sample DataFrame
    data = [(-1, "Not Known","Not Known","Not Known","Not Known","Not Known","Not Known","0","NA","0","0","0")]
    columns = ["Contestant ID", "Contestant Name", "Team","Image","From","Area","Country","Seat","Gender","Hand","Age","Age Range"]
    new_row  = spark.createDataFrame(data, columns)

    # Union the new row with the existing DataFrame
    dfContfinal = dfContfinal.union(new_row)

    # Show the updated DataFrame
    dfContfinal.show(1000)

    print("Delta Parquet file does not exist. Add NA Row")

The only bit of code changed here is the path_to_parquet File.

New Code. If Row -1 doesn’t exist, get the max Key +1

We created the following code in the last post

from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number


# Process the DataFrame based on ContestantID
if dfContfinal.filter(dfContfinal["Contestant ID"] == -1).count() > 0:

    # Create a window specification partitioned by "Series" and ordered by "Episode No"
    window_spec = Window.orderBy(col("Contestant ID"))

    # Add a new column "EpisodeKey" using row_number() over the specified window
    dfContfinalKey = dfContfinal.withColumn("ContestantKey", row_number().over(window_spec) - 2)

    # Show the result
    dfContfinalKey.show(1000)

    print("Data Contains Default row. No data Previously processed")
    
else:
    
    print("Data Contains No Default row. Data Previously processed")

So, if this is a new run. we will create the default -1 row, If we are adding to the data already there, we will already have a default row in the data. We won’t have created the -1 row. this time. We need to get the last Key and + 1.

We already have the code that created a Key from -1 if its new data in the if clause. We now need the else.

else:

    # Load data from the Delta table
    dfdimc = spark.read.format("delta").load("abfss://########-####-####-####-############@onelake.dfs.fabric.microsoft.com/########-####-####-####-############/Tables/dimcontestant")
     
   # Get the maximum value of ContestantKey
    max_contestant_key = dfdimc.agg({"ContestantKey": "max"}).collect()[0][0] 

    #Now, Take his number and create my Keys after the last key +1
    start_value = max_contestant_key+1

    # Create a window specification partitioned by "Series" and ordered by "Episode No"
    window_spec = Window.orderBy(col("Contestant ID"))

    # Add a new column "EpisodeKey" using row_number() over the specified window
    dfContfinalKey = dfContfinal.withColumn("ContestantKey", row_number().over(window_spec) + start_value - 1)
    
    # Show the DataFrame
    dfContfinalKey.show()
    
    print("Data Contains No Default row. Data Previously processed")
  • So here he bring back the data from Dim Contestant.
  • We get the max key
  • create the start value by adding +1
  • Then create the window ordering by Contestant ID
  • Add in the Key using the Window_spec and using our Start_Value to increment from the last item in the dimension. In this case we are starting from ID 15
  • Append to the Delta parquet file

Error when appending into Parquet File

When running this code

from delta.tables import DeltaTable

#You can also add the none default data lake by clicking +Lakehouse
aliased_df.write.mode("append").option("overwriteSchema", "true").format("delta").saveAsTable("GoldDebbiesTraininglh.dimContestant")

AnalysisException: Failed to merge fields ‘ContestantID’ and ‘ContestantID’. Failed to merge incompatible data types LongType and IntegerType

We can test this by looking at the schemas.

This is correct. What about our original data?

So our original data was added to delta file as long. integers go up to 2,147,483,647, there seems to be no reason to store as long.

Because this is just a test, the decision is made to once again go back to the start and ensure that the data is integer from the first instance.

Remove all the files and tables from Gold and Silver. And Season 4 from the data lake. to go through the process again.

And whilst updating. we can also sort out the issues above

Taskmaster Transformed. No Updates.

Dim Contestant V2

Where is our integer becoming long and can we hit it at that point?

merged_df has an integer but it becomes long here when we get rid of any null team records.

# Cast ContestantID to IntegerType
dfContfinal = dfContfinal.withColumn("Contestant ID", dfContfinal["Contestant ID"].cast("int"))

# Verify the schema to ensure the change
dfContfinal.printSchema()

However, it also happens again at dfContFinal so we can actually move the above code block to after here (And then keep checking the schema)

This appears to be the last time that the ID gets changed to Long.

This shows the importance of checking your schema, especially when appending data into files.

Dim Episode V2

The current only change here to point to the correct gold tables

Also checked the schema to ensure nothing was long

The same thing is happening here so we need some code before we create the delta parquet to sort this out.

Dim Task V2

Same change again to point to the Gold Delta Table and not silver files.

And reset long to int

Taskmaster Fact V2

No changes

We now are at the point where we can add in Series 4, run the taskmaster transformed notebook and go back to Dim Contestant V2

Unfortunately, After the update, Another error has occurred in Dim Contestant

Age Error. Age has changed to String

AnalysisException: Failed to merge fields ‘Age’ and ‘Age’. Failed to merge incompatible data types StringType and IntegerType

So we repeat the process, again. and add more code to change Age back to Int.

we basically need to repeat the process until every error in contestant is resolved. Thankfully, this is the last problem with Dim contestant

Dim Episode V2

Create Episode Key

from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Process the DataFrame based on ContestantID
if dftmEp.filter(dftmEp["Episode No"] == -1).count() > 0:

    # Create a window specification partitioned by "Series" and ordered by "Episode No"
    window_spec = Window.orderBy(col("Series"), col("Episode No"))

    # Add a new column "EpisodeKey" using row_number() over the specified window
    dftmEpKey = dftmEp.withColumn("EpisodeKey", row_number().over(window_spec) - 2)

    # Show the result
    dftmEpKey.show(1000)

    print("Data Contains Default row. No data Previously processed. Add Key from -1")
    
else:

     # Load data from the Delta table
    dfdime = spark.read.format("delta").load("abfss://986472b4-7316-485c-aa22-128e1cb29544@onelake.dfs.fabric.microsoft.com/ce55b91c-ed51-4885-9440-378a3c18450b/Tables/dimepisode")
     
    # Get the maximum value of ContestantKey
    max_episode_key = dfdime.agg({"EpisodeKey": "max"}).collect()[0][0] 

    #Now, Take his number and create my Keys after the last key +1
    start_value = max_episode_key+1

    # Create a window specification partitioned by "Series" and ordered by "Episode No"
    window_spec = Window.orderBy(col("Series"), col("Episode No"))

    # Add a new column "EpisodeKey" using row_number() over the specified window
    dftmEpKey = dftmEp.withColumn("EpisodeKey", row_number().over(window_spec) + start_value - 1)
    
    # Show the DataFrame
    dftmEpKey.show()
    
    print("Data Contains No Default row. Data Previously processed. Add Key to continue sequence")  

Added the else to our Key creation code block.

Dim Task V2

Create Key

from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Process the DataFrame based on ContestantID
if dftmTsk.filter(dftmTsk["Task Order"] == -1).count() > 0:

    # Create a window specification partitioned by "Series" and ordered by "Episode No"
    window_spec = Window.orderBy(col("Task Order"), col("Task"))

    # Add a new column "TaskKey" using row_number() over the specified window
    dftmTskKey = dftmTsk.withColumn("TaskKey", row_number().over(window_spec) - 2)

    # Show the result
    dftmTskKey.show(1000)

    print("Data Contains Default row. No data Previously processed")
    
else:
     # Load data from the Delta table
    dfdimt = spark.read.format("delta").load("abfss://986472b4-7316-485c-aa22-128e1cb29544@onelake.dfs.fabric.microsoft.com/ce55b91c-ed51-4885-9440-378a3c18450b/Tables/dimtask")
     
    # Get the maximum value of ContestantKey
    max_task_key = dfdimt.agg({"TaskKey": "max"}).collect()[0][0] 

    #Now, Take his number and create my Keys after the last key +1
    start_value = max_task_key+1

    # Create a window specification partitioned by "Series" and ordered by "Episode No"
    window_spec = Window.orderBy(col("Task Order"), col("Task"))

    # Add a new column "EpisodeKey" using row_number() over the specified window
    dftmTskKey = dftmTsk.withColumn("TaskKey", row_number().over(window_spec) + start_value - 1)
    
    # Show the DataFrame
    dftmTskKey.show()
    
    print("Data Contains No Default row. Data Previously processed. Add Key to continue sequence")      

The else has been added to this code block.

Conclusion

After running the fact table again, everything is successful.

Season 4 is set as fully processed.

We should now be able to repeat the process across all the other taskmaster series until completion.

We are now ready to add the notebooks into a Pipeline in the next blog post, so we can set up automated processing. And then move back to Power BI reporting.

The only step not yet set up is the Data Engineering side where we would usually automate the process of pulling though the data. Later on, we might have a think about how we can set something up to mimic a proper business scenario where the data is dropped for us to process.

Microsoft Fabric Part 8. Taskmaster Project. Data checks using the SQL analytics Endpoint

SQL Analytics Endpoint

Lets switch to the SQL analytics endpoint

Only the Delta table are available with the SQL endpoint.

Lets try some options

New SQL Query

We can use SQL to check that we are happy with what has been created

SELECT e.Series, e.EpisodeName,t.Task, t.TaskType, c.ContestantName, f.Points 
FROM [DebbiesFabricLakehouse].[dbo].[facttaskmaster] f
inner join [DebbiesFabricLakehouse].[dbo].[dimepisode] e on f.EpisodeKey = e.EpisodeKey
inner join [DebbiesFabricLakehouse].[dbo].[dimtask] t on t.TaskKey = f.TaskKey
inner join [DebbiesFabricLakehouse].[dbo].[dimcontestant] c ON c.ContestantKey = f.ContestantKey
Where e.EpisodeName = 'The poet and the egg'
Order by t.Task

Immediately we can spot some problems.

Task 1 has 10 records and not 5. there should always be 5. 
We can write a query to see how many issues there are 

With CTEtm (Series, EpisodeName,Task, TaskType, ContestantName, Points)

AS 
(SELECT e.Series, e.EpisodeName,t.Task, t.TaskType, c.ContestantName, f.Points 
FROM [DebbiesFabricLakehouse].[dbo].[facttaskmaster] f
inner join [DebbiesFabricLakehouse].[dbo].[dimepisode] e on f.EpisodeKey = e.EpisodeKey
inner join [DebbiesFabricLakehouse].[dbo].[dimtask] t on t.TaskKey = f.TaskKey
inner join [DebbiesFabricLakehouse].[dbo].[dimcontestant] c ON c.ContestantKey = f.ContestantKey)

SELECT Series, EpisodeName, Task, count(*) AS TotalRows
FROM CTEtm 
GROUP BY Series, EpisodeName, Task
Having COUNT(*)>5

There are 146 issues.

You cant manipulate the data with UPDATE’s inserts etc.

New Visual Query

lets try and get an issue example using a visual query

Before merging with the contestant, the contestant needed to be dragged into the analytics pane.

This is very similar to the power Query Editor and could be of use to people who aren’t as proficient in SQL and prefer using visual tools

Personally I prefer SQL. So I will stick to this.

Back in SQL Query

SELECT e.Series, e.EpisodeName,t.Task, t.TaskType, c.ContestantName, f.Points 
FROM [DebbiesFabricLakehouse].[dbo].[facttaskmaster] f
inner join [DebbiesFabricLakehouse].[dbo].[dimepisode] e on f.EpisodeKey = e.EpisodeKey
inner join [DebbiesFabricLakehouse].[dbo].[dimtask] t on t.TaskKey = f.TaskKey
inner join [DebbiesFabricLakehouse].[dbo].[dimcontestant] c ON c.ContestantKey = f.ContestantKey
Where e.Series = 'S1'
AND t.Task = 'Buy a gift for the Taskmaster'
Order by c.ContestantName

Clearly we have an issue with Dim Contestant. We have two of every contestant. This has been super useful. We cant resolve in SQL so its time to go back to the contestant notebook.

The SQL code is going to be kept here in Queries

You can also moved to Shared Queries to other developers can access your queries.

Back in the Contestants V2 Notebook

We have an issue in the Contestants Dim.

After we drop the records where Team is null we now need to add additional Pyspark to check for Duplicates

from pyspark.sql.functions import min, substring

# Group by "Contestant" and aggregate the minimum "Episode Date"
dfContfinalGrp = dfContfinal.groupBy("Contestant Name").count()
dfContfinalGrp = dfContfinalGrp.filter(col("count") > 1)

# Show the resulting DataFrame
dfContfinalGrp.show()

These will cause issues. Why is it happening?

filter_column = dfContfinal.filter(col("Contestant Name") == "Noel Fielding")

# Show the resulting DataFrame
filter_column.show()

Its seat causing the issues because these contestants have been on Taskmaster more than once.

This causes us an issue. This is fine, Because the granularity is the contestant and seat. We need to update the Fact table key accordingly. And it goes even further than this. What we really need to do is go back to the original transformation notebook to ensure we can join on Seat

Back to the Transformation Notebook

There is a query that merges Contestants and people together.

Instead of loosing this information. We need to load it into PARQUET for use later and we need to keep series in this dataframe

One tweak is to keep the series in the transformed contestant PARQUET file and then make sure it matches S1 S2 etc in the main file

# Join the extra contestant information
dfcont = dfc.join(dfp, dfc[“Name”] == dfp[“contestant”], “left_outer”).drop(dfp.contestantID)\
.drop(dfp.contestant).drop(dfp.team).drop(dfp.team_label).drop(dfp.champion)\
.drop(dfp.TMI)

# The resulting DataFrame ‘joined_df’ contains all rows from dftask and matching rows from dfob
display(dfcont)

series has been removed from .drop()

Create S1 instead of Series 1 etc in the transformed contestant file.

from pyspark.sql.functions import regexp_replace

# Assuming you have a PySpark DataFrame named dfcont
dfcont = dfcont.withColumn("series_label", regexp_replace("series_label", "Series ", "S"))

# Show the updated DataFrame
dfcont.show()

Back to the Fact Notebook

change to the code when adding in the contestant key

Before we continue. We want to add the seat into the main tm dataframe

#Join tm to contestants to get the seat
dftm = dftm.join(drctrans, (dftm["Contestant"] == drctrans["Name"])& (dftm["Series"] == drctrans["series_label"]), "left_outer")\
.drop(drctrans.Name).drop(drctrans.Image).drop(drctrans.From).drop(drctrans.Area).drop(drctrans.Country).drop(drctrans.series).drop(drctrans.series_label)\
.drop(drctrans.dob).drop(drctrans.gender)\
.drop(drctrans.hand).drop(drctrans.age).drop(drctrans.age_decimal).drop(drctrans.ageRange)

# The resulting DataFrame 'joined_df' contains all rows from dftask and matching rows from dfob
display(dftm)

Here we add in seat from the transformed contestant data

#We want the seat in the main table
dftm = dftm.join(dfc, (dftm["Contestant"] == dfc["ContestantName"])& (dftm["Seat"] ==dfc["Seat"]), "left_outer")\
.drop(dfc.ContestantID).drop(dfc.ContestantName).drop(dfc.Team).drop(dfc.Image).drop(dfc.From).drop(dfc.Area).drop(dfc.Country).drop(dfc.Seat).drop(dfc.Gender)\
.drop(dfc.Hand).drop(dfc.Age).drop(dfc.AgeRange)

# The resulting DataFrame 'joined_df' contains all rows from dftask and matching rows from dfob
display(dftm)

And updated the above code to also join on Seat now we have seat in both the main table and the dim table to get the correct Key.

Back in the SQL endpoint

SELECT e.Series, e.EpisodeName,t.Task, t.TaskType, c.ContestantName, f.Points 
FROM [DebbiesFabricLakehouse].[dbo].[facttaskmaster] f
inner join [DebbiesFabricLakehouse].[dbo].[dimepisode] e on f.EpisodeKey = e.EpisodeKey
inner join [DebbiesFabricLakehouse].[dbo].[dimtask] t on t.TaskKey = f.TaskKey
inner join [DebbiesFabricLakehouse].[dbo].[dimcontestant] c ON c.ContestantKey = f.ContestantKey
Where e.Series = 'S1'
AND t.Task = 'Buy a gift for the Taskmaster'
Order by c.ContestantName

The fixes have removed a lot of issues but we are still left with 6 episodes causing issues. like the one above.

looking at the keys. its clearly the task key.

SELECT * FROM  [DebbiesFabricLakehouse].[dbo].[dimtask] Where TaskKey IN (64,100)

Back to the Episodes Notebook Attempt 2

The really good thing about the SQL Endpoint is that I can quickly check the work that has been done for issues like this before moving onto the semantic model

Now this issue is highly probable because some tasks across series may have the same Task Name. And its the order that gives it its uniqueness in the dimension

Again, we go back to the fact dataframe to add TaskOrder into the join

Back to the Episodes Notebook Attempt 3

Lets see how this fix has helped

Conclusion

The SQL endpoint has helped us fix

  • Contestants where a contestant has been on more than one series
  • Tasks, when a task has the same name

as a developer with a lot of experience in SQL this is a really great way of quickly creating code to check for errors. And you have the graphical functionality if you aren’t a SQL person.

I’ts a real win.

Next. We have our star schema and its been tested. Time to create the Semantic Model.

Microsoft Fabric Part 7. Taskmaster Project. Creating the fact table

For our Taskmaster Posts 1 2 3 and 4 we had a really good look at Pyspark using an online dataset. but abandoned it when we realised that the data wasn’t quite as we wanted it and some data sets were missing.

Parts 5 and 6 we got a new data set and created a transform layer and dimensions. Now its time to finally create the fact table

Lets create a new Notebook – Taskmaster Fact V2

Date Keys

Our Date Keys will be integer. if we take a quick look at our Delta PARQUET file

df = spark.sql("SELECT * FROM DebbiesFabricLakehouse.dimdate LIMIT 1000")
display(df)

Out date key is in the following Int Format 20240416
We want the Episode date Key to follow this trend.

At transformation level we created Year month and Day. We just need to merge these to create the date

Merge Date Key

from pyspark.sql.functions import col, concat_ws, expr

dftm = dftm.withColumn("DateKey", concat_ws("", col("year"), col("month"), col("day")).cast("int"))
display(dftm)

Bring through the Episode Key

Now we want to add the Delta PARQUET table to a dataframe. So we can add the key to the dataframe. Then we can create another dataframe with all the keys and the points metric to make the fact table.

dfep = spark.sql("SELECT * FROM DebbiesFabricLakehouse.dimepisode")
display(dfep)
# Add Episode Key to the df
dftm = dftm.join(dfep, (dftm["Series"] == dfep["Series"]) & (dftm["Episode Name"] == dfep["EpisodeName"]), "left_outer")\
.drop(dfep.Series).drop(dfep.EpisodeNo).drop(dfep.EpisodeName)

# The resulting DataFrame 'joined_df' contains all rows from dftask and matching rows from dfob
display(dftm)

And set any null values to -1. Our default not known

dftm = dftm.fillna(-1, subset=[‘EpisodeKey’])

Bring through the Task Key

We can repeat the above to bring in the Task Key

dft = spark.sql("SELECT * FROM DebbiesFabricLakehouse.dimtask")
display(dft)
# Add Task Key to the df
dftm = dftm.join(dft, (dftm["Task"] == dft["Task"]), "left_outer")\
.drop(dft.Task).drop(dft.TaskType).drop(dft.Assignment).drop(dft.TaskOrder)

# The resulting DataFrame 'joined_df' contains all rows from dftask and matching rows from dfob
display(dftm)
dftm = dftm.fillna(-1, subset=['TaskKey'])

Bring in Contestant Key

dfc = spark.sql("SELECT * FROM DebbiesFabricLakehouse.dimtask")
display(dfc)
# Add Contestant Key to the df
dftm = dftm.join(dfc, (dftm["Contestant"] == dfc["ContestantName"]), "left_outer")\
.drop(dfc.ContestantID).drop(dfc.ContestantName).drop(dfc.Team).drop(dfc.Image).drop(dfc.From).drop(dfc.Area).drop(dfc.Country).drop(dfc.Seat).drop(dfc.Gender)\
.drop(dfc.Hand).drop(dfc.Age).drop(dfc.AgeRange)

# The resulting DataFrame 'joined_df' contains all rows from dftask and matching rows from dfob
display(dftm)
dftm = dftm.fillna(-1, subset=['ContestantKey'])

And now we have all of our keys

Partitioning

We want to Partition the fact table by Series date, However, We only have the individual task information.

Lets see if we can add another date Key for Series.

MIN and Group Episode date by Series

from pyspark.sql.functions import min, substring

# Group by "Contestant" and aggregate the minimum "Episode Date"
dfminSeries= dftm.groupBy("Series").agg(min("Episode Date").alias("min_episode_date"))

#And create year month and day and set as a Key
dfminSeries = dfminSeries.withColumn("Year", col("min_episode_date").substr(1, 4))
dfminSeries = dfminSeries.withColumn("month", col("min_episode_date").substr(6, 2))
dfminSeries = dfminSeries.withColumn("day", substring(col("min_episode_date"), 9, 2))

dfminSeries = dfminSeries.withColumn("SeriesStartDateKey", concat_ws("", col("year"), col("month"), col("day")).cast("int"))

# Show the resulting DataFrame
dfminSeries.show()

Merge the New Key into the main dataframe

# Add SeriesStartDateKey Key to the df
dftm = dftm.join(dfminSeries, (dftm["Series"] == dfminSeries["Series"]), "left_outer")\
.drop(dfminSeries.Series).drop(dfminSeries.min_episode_date).drop(dfminSeries.Year).drop(dfminSeries.month).drop(dfminSeries.day)

# The resulting DataFrame 'joined_df' contains all rows from dftask and matching rows from dfob
display(dftm)

Create the Fact table

dftmfact = dftm.select(
    col("EpisodeKey"),
    col("TaskKey"),
    col("ContestantKey"),
    col("SeriesStartDateKey"),
    col("DateKey").alias("EpisodeDateKey"),
    col("Points").cast("integer"),
    col("Winner").cast("integer")
)

dftmfact.show()

This can now be saved to our Delta PARQUET and PARQUET so we have our full set of data to create the star schema.

Delta PARQUET Partitioned.

We now want to Partition our fact table by the SeriesStartDateKey

from delta.tables import DeltaTable

dftmfact.write.mode("overwrite").option("overwriteSchema", "true")\
.partitionBy("SeriesStartDateKey").format("delta").saveAsTable("factTaskmaster")

So Why partition?

Partitioning the Parquet table gives you specific benefits

If we were to just look at one series. The execution engine can identify the partition and only read that partition. it significantly reduces the data scanned.

Faster query performance.

Delta Lake will automatically create the partitions for you when you append data, simplifying data management.

Partitioning is really useful for large datasets. Allowing you to skip partitions.

Lets see what this actually looks like once run.

At this level it looks no different to the unpartitioned Delta PARQUET File.

If we go to the Workspace

Click on the Semantic Model

Then the Lakehouse

You can right click and View the underlying files.

Lets have a look at another way of doing this.

One Lake File Explorer

Its time to download the One Lake File Explorer which is a new app available with Fabric.

https://www.microsoft.com/en-us/download/details.aspx?id=105222

We can now see the One Lake in our File Explorer just like you can in One Drive, And you also get a local copy.

Lets have a look at the Taskmaster Partitioned Delta Table against a None Partitioned Table

None Partitioned

Partitioned

We have 3 parts at 4 kb each for this partition. What is the recommended size?

64 mb to 1 gb is around the file size we want achieve. Our file sizes are small because there isn’t much data at the moment.

So we have 3 change files in the delta log which correspond to the 3 PARQUET Files. the Delta log lets Fabric know which file to go with when we are looking at our data.

So what do we do when we want to clean up old files?

Maintenance – Optimize and VACUUM

We can optimize our file sizes and also Vacuum old data outside of our retention threshold.

Creating the PARQUET Table.

Now its time to create the PARQUET table that is not delta. This is only happening to test functionality between the two.

In a previous post we learned that you couldn’t partition a PARQUET table. You need to update to Delta to do this,

dftmfact.write.mode("overwrite").parquet('Files/Data/Silver/factTaskmaster.parquet')

Conclusion

Its important to note that Delta PARQUET and PARQET have the same PARQUET files.

Delta just creates the extra delta log tables to hold the changes. The PARQUET is a columnar storage solution, in the same way as the power BI Columnar data store.

So we now have our gold layer of Facts and Dimensions. Both as PARQUET( unmanaged) and Delta PARQUET (Managed)

In the next post we will see what we can do with these files.

Microsoft Fabric Part 6. Taskmaster Project. Creating a Silver transformed layer and Finishing of the Dimensions

In part 4. A new Data source was used at task level and the first dimension was added. Lets quickly create the rest of the dims.

Then we are ready to move on to the next steps in Fabric.

Creating a Silver transformed layer

There are a few changes that would be good to make to the notebooks. The base data needs a level of transforming before we kick off with Dims and Facts.

It would be good to save the transformed data once in the Silver layer and then use this PARQUET file to work from. This means we never have to repeat any code for dims and facts.

We want

  • Silver Transformed data as PARQUET unmanaged files
  • Gold Dims and facts in Delta PARQUET Managed tables

Taskmaster Transformed Data

Create a new Notebook

##drop all rows with null values just in case
dftm = dftm.na.drop(how='all')

display(dftm)

Just in case. Get rid of Empty Rows

Clean Special Characters

Clean Special Characters in Task

The above was all from Part 5.

Resolve Episode date issues

Currently we have 28/07/2015 and when transforming to date the values are being lost. We need to change the date to 2015-07-28

#Update the date so we can transform it to a date column
from pyspark.sql.functions import col

# Get the Year
dftm_cleaned = dftm_cleaned.withColumn("Year", col("Episode Date").substr(-4, 4))

# Show the resulting DataFrame
dftm_cleaned.show()

Now add in Day

from pyspark.sql.functions import col, substring

# Assuming you have a DataFrame called 'df' with a column named 'Episode Date'
dftm_cleaned = dftm_cleaned.withColumn("day", substring(col("Episode Date"), 1, 2))

# Show the resulting DataFrame
dftm_cleaned.show()

And month

from pyspark.sql.functions import col, substring

# Assuming you have a DataFrame called 'df' with a column named 'Episode Date'
dftm_cleaned = dftm_cleaned.withColumn("month", col("Episode Date").substr(4, 2))

# Show the resulting DataFrame
dftm_cleaned.show()

Merge together and convert to date

from pyspark.sql.functions import concat_ws, col,to_date


# Assuming you have a DataFrame called 'df' with columns 'year', 'month', and 'day'
dftm_cleaned = dftm_cleaned.withColumn("Episode Date", concat_ws("-", col("year"), col("month"), col("day")))
dftm_cleaned = dftm_cleaned.withColumn("Episode Date", to_date("Episode Date", "yyyy-MM-dd"))

# Show the resulting DataFrame
display(dftm_cleaned)

Contestant transformations

we also need to bring through the contestant data which we will join later. This is just one file.

dfc = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/TaskMasterSeriesFiles/Taskmaster/Contestants.csv")

We have our old data set and although we couldn’t turn it into a star schema. there is data that we could use in there to create a more complete data set that can be joined to this data above.

People

#Read the file into a dataframe 
dfp = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/people.csv")

#And rename a couple of columns
dfp  = dfp.withColumnRenamed("id","contestantID").withColumnRenamed("name","contestant")

display(dfp)

In our original people data We have some great stuff here, Age Hand, Seat. We can use these to create some great Power BI visuals. Lets see if we can merge it into our new data

Join Contestant to people data

# Join the extra contestant information
dfcont = dfc.join(dfp, dfc["Name"] == dfp["contestant"], "left_outer").drop(dfp.contestantID).drop(dfp.series)\
.drop(dfp.series_label).drop(dfp.contestant).drop(dfp.team).drop(dfp.team_label).drop(dfp.champion)\
.drop(dfp.TMI)

# The resulting DataFrame 'joined_df' contains all rows from dftask and matching rows from dfob
display(dfcont )

And lets see if we have any nulls we need to add in.

filtered_df = dfcont.filter(dfcont['seat'].isNull()) 
display(filtered_df)

Just the one.

Name from new Dataset – Asim Chaudry name from old Data set – Asim Chaudhry

This is completely sortable before the join. The name is incorrect in the new data set so lets insert a code block to update before the join

# Assuming you have already created the DataFrame 'df'
from pyspark.sql.functions import when

# Replace "Asim" with "Ashim" in the 'name' column
dfc = dfc.withColumn("Name", when(dfc["Name"] == "Asim Chaudry", "Asim Chaudhry").otherwise(dfc["Name"]))


dfc.show(1000)

Now we can redo the join and have no null values

Create Age – Create Distinct Min Episode Date

Now this time we want to create the age as at the start of their task master series. Not current age.

So we need to create a new data frame consisting of just the contestant and min Series date

# Assuming you have already created the DataFrame 'dftm_cleaned'
from pyspark.sql.functions import min

# Group by "Contestant" and aggregate the minimum "Episode Date"
dfminSeriesCont = dftm_cleaned.groupBy("Contestant").agg(min("Episode Date").alias("min_episode_date"))

# Show the resulting DataFrame
dfminSeriesCont.show()

Note that we have already transformed the date and set as a date field in the dataframe.

Merge Episode Date with Contestant

Now we need to merge into our contestant file

# Join the min_episode_date into contestants
dfcont = dfcont.join(dfminSeriesCont,dfcont["Name"] == dfminSeriesCont["Contestant"], "left_outer").drop(dfminSeriesCont.Contestant)

# The resulting DataFrame 'joined_df' contains all rows from dftask and matching rows from dfob
display(dfcont)

Add Age

from pyspark.sql.functions import datediff
from pyspark.sql.functions import col

# Convert the date of birth column to a date type in dfCont
dfcont = dfcont.withColumn("dob", dfcont["dob"].cast("date"))

#Calculate age using dob and current date 
dfcontAge = dfcont.withColumn("age_decimal", datediff(dfcont["min_episode_date"], dfcont["dob"]) / 365)

#Convert from decimal to int  
dfcontAge = dfcontAge.withColumn("age", col("age_decimal").cast("int"))

#Convert Contestant ID to int
dfcontAge = dfcontAge.withColumn("contestant ID", col("contestant ID").cast("int"))

display(dfcontAge)

Create an Age Group

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

# Apply conditional logic
dfcontAge = dfcontAge.withColumn("ageRange", when(col("age") < 20, "0 to 20")
                               .when((col("age") >= 20) & (col("age") <= 30), "20 to 30")
                               .when((col("age") >= 30) & (col("age") <= 40), "30 to 40")
                               .when((col("age") >= 40) & (col("age") <= 50), "40 to 50")
                               .when((col("age") >= 50) & (col("age") <= 60), "50 to 60")
                               .otherwise("Over 60"))

# Show the result
dfcontAge.show()

Save as PARQUET

Save both files as unmanaged PARQUET Files

We don’t need to save as Delta PARQUET because this is the silver layer which will be used to create the Delta PARQUET dim and fact tables.

You can’t directly partition a non-Delta Parquet file, but you can optimize it using V-Order for better performance. V-Ordering is true as default but if you want to check you can always use this code

Save your Taskmaster transformed PARQUET Files

Dim Episode

We can now update the original Notebook to use the transformed silver layer data, and remove all the cleaning that’s now in the notebook we have prepared

from pyspark.sql.functions import input_file_name, regexp_extract

parquet_file = "Files/Data/Silver/taskmasterTransformed.parquet"
dftm = spark.read.parquet(parquet_file)

display(dftm)

Notice the change to the code now we are reading a parquet file.

All the transformation code has been removed and we start at creating the episode Dim

##drop all rows with null values
dftmEp = dftmEp.na.drop(how='all')

display(dftmEp)

Create a Default Row

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a sample DataFrame
data = [(-1, -1,"Not Known")]
columns = ["series", "episode No", "Episode Name"]
new_row  = spark.createDataFrame(data, columns)

# Union the new row with the existing DataFrame
dftmEp = dftmEp.union(new_row)

# Show the updated DataFrame
dftmEp.show(1000)

Create an Episode Key

from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Create a window specification partitioned by "Series" and ordered by "Episode No"
window_spec = Window.orderBy(col("Series"), col("Episode No"))

# Add a new column "EpisodeKey" using row_number() over the specified window
dftmEpKey = dftmEp.withColumn("EpisodeKey", row_number().over(window_spec) - 2)

# Show the result
dftmEpKey.show(1000)

Clean the Column Names

import re

# Select columns with modified names (without special characters)
dftmEpKey_cleaned = dftmEpKey.select(*[col(c).alias(re.sub('[^0-9a-zA-Z]', '', c)) for c in dftmEpKey.columns])

# Show the result
dftmEpKey_cleaned.show(1000)

And save to Delta PARQUET and PARQUET (Just to give us the different examples to play with

Dim Task

In this dimension we want the Task, Task Type and Assignment (Solo or Group)

We can hopefully create this quite quickly in a new notebook

Create a new notebook Dim Task V2

The format is completely the same every time

Get the Task fields

Add a default row

Create the Key

Resolve invalid characters in the column names

Save as Delta PARQUET and PARQUET

Dim Contestant

We now get to the contestant. Create a new Notebook. Dim Contestant V2

Merge Contestant with the Taskmaster main file

from pyspark.sql.functions import col

# Perform the left join
merged_df = dftm.join(dfc, dftm["Contestant"] == dfc["Name"], "left_outer")\
                .select(
                    dfc["Contestant ID"],
                    dftm["Contestant"].alias("Contestant Name"),
                    dftm["Team"],
                    dfc["Image"],
                    dfc["From"],
                    dfc["Area"],
                    dfc["country"].alias("Country"),
                    dfc["seat"].alias("Seat"),
                    dfc["gender"].alias("Gender"),
                    dfc["hand"].alias("Hand"),
                    dfc["age"].alias("Age"),
                    dfc["ageRange"].alias("Age Range"),
                ).distinct()

# Show the resulting DataFrame
merged_df.show()

In the main file. Teams in only populated when its a group task.

Therefore we are getting two records. one with and one without a team so we need to remove rows

dfContfinal = merged_df.dropna(subset=["Team"])

# Show the resulting DataFrame
dfContfinal.show(1000)

Add a Default Row

Create a Contestant Key

Clean Column Names

Alias Column Names

renaming all columns to get past the issues with special characters in columns

Save as Delta PARQUET and PARQUET

Save the File and ensure you have committed source control

At the end of this post, we have all our dimensions (Date was created in a previous blog).

Next post will be creating the central fact table

And better yet, we have had more of a chance to convert all our year of SQL knowledge across to pyspark.

Microsoft Fabric Part 5. Taskmaster Project. Creating and using Shortcuts from an Azure Data Lake and creating the first dimension

Parts 1 2 3 and 4 were attempting to transform data into a star schema for power BI using notebooks in fabric.

However there was missing data which meant we couldn’t go to the level of detail we wanted for the star (Episode level)

Now we have a new data set and the csv files have been transferred into a data lake manually.

We can attempt to set up a pipeline to import the data in another blog.

in Microsoft Fabric we go to data Engineering

And open up the Fabric Workspace.

We already have a Lakehouse and notebooks containing the work from the last few blog posts

Lets open the Lakehouse

Although we are using the medallion architecture, you may still wish to call your folders, for example raw and staging .

Create a Shortcut

For this exercise we are going to do something different and have the raw data in a datalake in Azure that we add a shortcut to.

the Files have been placed in a datalake and we want to shortcut to these files. Imagine that the process is already in place pre Fabric and we have decided to stick with this process.

This will be added to every time a series finishes.

We also have a Lookup contestants file in this folder that will be updated by having data appended to it

This new exercise will work with partitioning by series for the fact table. Back in Fabric.

Lets add a Shortcut to our new Taskmaster folder in the Bronze raw data area.

https://learn.microsoft.com/en-us/fabric/onelake/onelake-shortcuts

We need the connection settings. get this from the Data Lake. Endpoints Data lake Storage, Primary endpoint data lake storage and sign in.

Tip Make sure you have Storage Blog Data Contributor Role

You want the URL with dfs (Distributed File System), not blob.

However, note that the Data Lake is in North Europe.

Creating a shortcut to a data lake in a different geographical area can result in Egress charges

So we need to check that our Data Lake and Lakehouse are in the same area.

Go back to the Workspace and Workspace Settings

Thankfully both are in North Europe so this should be fine.

Note. To keep down Egress charges you can set up caching which keeps the files for 24 hours without needing to access the data in the shortcut. but this is only available for

  • GCS (Google Cloud Storage)
  • S3 (Amazon S3 Simple Storage Service)
  • S3 compatible shortcuts (Any Services using Amazon Simple Storage Service)

Back to our Shortcut. Click Next.

We want to create a Shortcut to the Taskmaster Folder in the raw Folder. Click Next

Click Create

We now have a shortcut set up to the Azure Data Lake. Later on we can add another file and see what happens.

Create a Notebook

Lets Create a New Notebook

%%configure
{
    "defaultLakehouse": {  
        "name": "DebbiesFabricLakehouse"
    }
}

First of all I’m going to configure to the current Lakehouse

Load in all the files in the Shortcut Folder

from pyspark.sql.functions import input_file_name, regexp_extract

dftm = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/TaskMasterSeriesFiles/Taskmaster/Taskmaster_*.csv")

dftm = dftm.withColumn("filename", regexp_extract(input_file_name(), r"([^/]+)$", 1))

display(dftm)
  • * alias has been used to get all series files and Series End Dates, and Only csv files.
  • input_file_name() brings us back the file name
  • regexp_extract(, r”([^/]+)$”, 1)) allows us to remove the URL and just keep the filename.

The File name may be important later.

Remove empty records

We have some null rows come through that we want to remove from the data set

how=’all’

This parameter is used with dropna() to drop rows where ALL values are NULL

Removing Special characters

There are issues. We have some strange characters coming through in the data set.

We can use a filter function on the col Column episode name LIKE and bring through a distinct list.

from pyspark.sql.functions import regexp_replace

# Replace '�' with 'eclair' in the 'text' column
dftm_cleaned = dftm.withColumn("Episode Name", regexp_replace("Episode Name", "A pistachio �clair", "A pistachio eclair"))

# Replace any remaining '�' with an empty string
dftm_cleaned = dftm_cleaned.withColumn("Episode Name", regexp_replace("Episode Name", "�", "'"))

# Show the resulting DataFrame
display(dftm_cleaned)

We have two.

é in eclair and any item with ‘

So we actually need to update based on Logic:

from pyspark.sql.functions import regexp_replace

# Replace '�' with 'eclair' in the 'text' column
dftm_cleaned = dftm.withColumn("Episode Name", regexp_replace("Episode Name", "�", "eclair"))

# Replace any remaining '�' with an empty string
dftm_cleaned = dftm_cleaned.withColumn("Episode Name", regexp_replace("Episode Name", "�", ""))

# Show the resulting DataFrame
dftm_cleaned.show(1000)

regexp_replace

This function replaces all substrings of a string that matches a specified pattern with a replacement string. Good for cleaning and transformation.

it might be best to check task name too by repeating the above filter and changing Episode name to Task.

we have Greg says…

and the rest are ‘

We can also deal with this

from pyspark.sql.functions import regexp_replace

# Replace '�' with 'eclair' in the 'text' column
dftm_cleaned = dftm_cleaned.withColumn("Task", regexp_replace("Task", "Greg says�", "Greg says..."))

# Replace any remaining '�' with an empty string
dftm_cleaned = dftm_cleaned.withColumn("Task", regexp_replace("Task", "�", "'"))

# Show the resulting DataFrame
display(dftm_cleaned)

Create Dim episode Dimension

Lets have a look at just S1. Does it have all the Episodes?

Yes it looks good. So the episode Dimension consists of Series and Episode.

from pyspark.sql.functions import to_date

dftmEp = dftm_cleaned.select("Series","Episode No" ,"Episode Name").distinct()

dftmEpOrdered = dftmEp.orderBy(col("Series"),col("Episode No"))

display(dftmEpOrdered)

This is a Distinct List

And we have created an OrderBy Data Frame to display.

There is an empty row we need to remove

##drop all rows with null values

dftmEp = dftmEp.na.drop(how='all')

display(dftmEp)

We have used this code block before to remove the fully null rows.

Add A Default Row

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a sample DataFrame
data = [(-1, -1,"Not Known")]
columns = ["series", "episode No", "Episode Name"]
new_row  = spark.createDataFrame(data, columns)

# Union the new row with the existing DataFrame
dftmEp = dftmEp.union(new_row)

# Show the updated DataFrame
dftmEp.show(1000)

Now we have the recommended Default row. It’s time to add a key

from pyspark.sql.functions import col

# Create a window specification partitioned by "Series" and ordered by "Episode No"
window_spec = Window.orderBy(col("Series"), col("Episode No"))

# Add a new column "EpisodeKey" using row_number() over the specified window
dftmEpKey = dftmEp.withColumn("EpisodeKey", row_number().over(window_spec) - 2)

# Show the result
dftmEpKey.show(1000)

Window.OrderBy

This function is used to define the ordering within a window specification. It allows you to specify the order in which rows are processed within a partition

row_number().over

The ROW_NUMBER() function, when used with the OVER() clause, assigns a sequential integer number to each row in the result set of an SQL query.

Create Delta PARQUET File

We have our first dimension. Lets add it to Silver folder in files as unmanaged and Tables as a managed Delta PARQUET table. that way we can see what we can do with both

We don’t want to partition the dimension. The Fact table will be partitioned

from delta.tables import DeltaTable

dftmEpKey.write.mode("overwrite").option("overwriteSchema", "true").format("delta").saveAsTable("dimEpisode")

However, There is an error coming up

AnalysisException: Found invalid character(s) among ‘ ,;{}()\n\t=’ in the column names of your schema. Please upgrade your Delta table to reader version 2 and writer version 5 and change the column mapping mode to ‘name’ mapping. You can use the following command:

Instead of upgrading. We want to remove the special characters. The columns are

|Series|Episode No| Episode Name|EpisodeKey|

Cleaning up Columns in a dataframe

import re

# Select columns with modified names (without special characters)
dftmEpKey_cleaned = dftmEpKey.select(*[col(c).alias(re.sub('[^0-9a-zA-Z]', '', c)) for c in dftmEpKey.columns])

# Show the result
dftmEpKey_cleaned.show(1000)

It seems it may have been the spaces.

re.sub

In PySpark, the re.sub() function is used to replace substrings that match a specified regular expression pattern with a string of your choice.:

re.sub('[^0-9a-zA-Z]', '', c) removes any characters that are not alphanumeric (letters or digits) from the column name.

The expression [col(c).alias(re.sub('[^0-9a-zA-Z]', '', c)) for c in dftmEpKey.columns] is a list comprehension that performs the following steps for each column name (c):

  • col(c).alias(...) creates a new column with the modified name (without special characters) using the alias() method.
from delta.tables import DeltaTable

dftmEpKey_cleaned.write.mode("overwrite").option("overwriteSchema", "true").format("delta").saveAsTable("dimEpisode")

And we can save to delta PARQUET

from delta.tables import DeltaTable

So, we now have an episode dimension. lets save the Notebook as Dim Taskmaster Episode V2 (We already have the original one saved)

Remember to commit in Source Control.

We want to create Task and Contestant dimensions. We already have a date dimension to work with.

In the next post, these extra dimensions and fact table will be created. and then we can see how they can be used

Microsoft Fabric Part 4. Taskmaster Project. Adding the fact table

In parts 1 and 2 we created and updated DimContestant

In Part 3 we created DimTask, DimEpisode and DimDate

Its time to create the fact table. the first thing we need to do is to get an understanding of what Facts we have and how they would join to the data we have created in the dims so far.

Lets create a notebook.

%%configure
{
    "defaultLakehouse": {  
        "name": "DebbiesFabricLakehouse"
    }
}

Adding possible csvs into DataFrames

Attempts

#Read the first file into a dataframe
dfattempts = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/attempts.csv")
# dfattempts now is a Spark DataFrame containing CSV data from "Files/Data/Bronze/attempts.csv".
display(dfattempts)

Contains points and ranks and appears to be at the episode and contestant level

episode_scores

#Read the file into a dataframe
dfepsc = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/episode_scores.csv")

display(dfepsc)

episode_scores is new to this project but seems a really good data set. Holds scores and Ranks at the episode and contestant level (but not at task level)

we actually don’t need this because its just aggregated data. We can remove this from our transformations. Power BI will aggregate for us

episodes

#Read the file into a dataframe
dfepis = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/episodes (1).csv")

display(dfepis)

This contains Dates and points. However the Scores are at Episode Level. We will be able to get this information from DAX, summing the individual Scores so we don’t need the scores at all. All we are interested in is the air date.

Lets look at the row counts

ct = dfepsc.count()
print(f"dfepsc: {ct}")

ct = dfepis.count()
print(f"dfepis: {ct}")

ct = dfattempts.count()
print(f"dfattempts: {ct}")

the number of columns vary. We need to make sense of what we have

Create the data we need from attempts

# Select multiple columns with aliases
dfdfattemptsf = dfattempts.select(
    dfattempts.task.alias('attTaskID'), #Join to DimTask
    dfattempts.task_label.alias('attTask'),
    dfattempts.episode.alias('attEpisode'),
    dfattempts.series.alias('attSeries'),
    dfattempts.contestant_label.alias('attContestant'), #Join to DimContestant
    dfattempts.points.alias('attPoints')
).distinct()

# Show the resulting DataFrame
display(dfdfattemptsf)

The Points in Attempts are the best ones we have at the moment because the Task csv was empty.

we are clearly now hitting real data problems and in a normal project we would be going back to the team creating the data to get them to resolve the problems.

Filter attempts

the attempts is our main table but we could do with the episode and Series information working with our dimensions we have and we only have Ids for both

Are episode and series related with the correct IDs? Lets do some filter queries to look at series 1, episode 1.

After a few filtered queries, there is clearly a some data issues, or unknowns that I am not aware of. With attempts. The episode 1 and series 1 doesn’t match the contestants and tasks (I know because I watched it).

For episodes (1) (Above) we do have the correct information for episode and series. But we aren’t using these to get data.

More problems with Episode Scores. Episode appears to be correct. But the series ID is completely different so we cant match on this. But we aren’t using this data any more.

We need to look at our PARQUET file DImEpisode

The first thing we need to do is Load our DimContestant PARQUET data back into a dataframe and to to this, we have to update the code slightly as previously we have loaded in csv files.

# Read Parquet file using read.parquet()
dfdimep  = spark.read.parquet("Files/Data/Silver/DimEpisode.parquet")
display(dfdimep)

To Get the path, Click on you File in the Files Folder in the Lakehouse and Copy ABFS Path, Copy Relative path for spark gives you a more concise path

We are now using the silver layer of the medallion architecture in the path. All our bronze landing data has been transformed.

Are our IDs correct in the Dim for Attempts, or for the Tasks by Objective csv?

Now we have a real issue. Our Episodes cant join to attempts.

Join with one Condition

We can join episode Scores to Episode first via the episode name? This would be an issue if you have repeating episode names in Series, but we are lucky that this is not the case.

# condition
original_condition = dfepis["title"] == dfepscf["scEpisode"]

# Now perform the left outer join
dfEpScorej =  dfepis.join(dfepscf, on=original_condition, how="full").drop(dfepscf.scEpisode)

display(dfEpScorej)

Filter new transformed table for episode 1 series 1

import pyspark.sql.functions as F

filtered_df = dfEpScorej.filter((F.col('episode') == 1) & (F.col('series') ==1))
display(filtered_df)

Filtering on Task 1 Season 1 gives us all 5 contestants against the episode. There are lots of columns we can remove. For example, we don’t need to hard code the winner. This can be done via Power BI DAX. So, the above join will be slightly updated with .drop.

# condition
original_condition = dfepis["title"] == dfepscf["scEpisode"]

# Now perform the left outer join
dfEpScorej =  dfepis.join(dfepscf, on=original_condition, how="full").drop(dfepscf.scEpisode)\
.drop(dfepscf.scEpisodeID).drop(dfepis.winner).drop(dfepis.winner_label).drop(dfepis.studio_date).drop(dfepis.winner).drop(dfepis.finale).drop(dfepis.TMI)

display(dfEpScorej)


Full is being used as the join because we want each side even if they haven't been joined. 

Use dfEpScorej now instead of dfepis and dfepscf

We have the attempts (dfattemptsf) left

So how can we join this final table?

  • We can’t join by Task because Episodes doesn’t include task
  • We can’t join by Series because the Series IDs don’t match and neither does Episode.
  • We can match by Contestant but Contestants can take part in multiple series so the join doesnt work

As it stands we cant actually match in this table so we need to go with Episodes and Scores for the fact table.

Add the Dimension Keys into the transformed table

Join DimContestants Via Contestant Name

We Can’t use ID but we can use name.

Also when we join we only want the Key and we can immediately remove everything relating to Contestant.

# Original condition
original_condition = dfdimcont["contestant"] == dfEpScorej["sccontestant"]

# Adding another condition based on the 'name' column
#combined_condition = original_condition & (dfdimcont["contestant"] == dfEpScorej["sccontestant"])

# Now perform the left outer join
dfEpScorej = dfEpScorej.join(dfdimcont, on=original_condition, how="left")\
.drop(dfdimcont.TeamName).drop(dfdimcont.teamID).drop(dfdimcont.age).drop(dfdimcont.gender)\
.drop(dfdimcont.hand).drop(dfdimcont.ageRange).drop(dfdimcont.dob)\
.drop(dfdimcont.contestant).drop(dfdimcont.contestantID)\
.drop(dfEpScorej.sccontestant).drop(dfEpScorej.scContestantID)

display(dfEpScorej)

If we have any null values they can be replaced with -1

But first we can check if any exists

dfEpScorej.filter(dfEpScorej.ContestantKey.isNull()).show()

fillna to replace null with a value

Even though we have none at the moment, there could be some in the future so fillna allows us to replace with our Default keys

dfEpScorej = dfEpScorej.fillna(-1, subset=[“ContestantKey”])
dfEpScorej.show()

Add Episode Key

All is good. we can join it into the transformed table. Removing everything but the key using .drop

the data was checked that it was ok before adding all the .drop’s into the code,

# Original condition
original_condition = dfdimep["seriesID"] == dfEpScorej["series"]

# Adding another condition based on the 'name' column
combined_condition = original_condition & (dfdimep["episodeID"] == dfEpScorej["episode"])

# Now perform the left outer join
df = dfEpScorej.join(dfdimep, on=combined_condition, how="left")\
.drop(dfdimep.episodeTitle).drop(dfdimep.episodeID).drop(dfdimep.series).drop(dfdimep.seriesID)\
.drop(dfEpScorej.series_label).drop(dfEpScorej.series).drop(dfEpScorej.episode).drop(dfEpScorej.title)

display(df)

Set Key to -1 if NULL

dfdimtask = dfdimtask.fillna(-1, subset=["taskKey"])
dfdimtask.show()

Conclusion

This specific project has been abandoned because the data doesn’t give me what I need join wise to create the star schema. Mostly because there is clearly a main table with missing data.

But, I can’t wait to take what I have learned and try again with another data set.

Parts 1 to 4 have given me some great insight into analysis transforming and modelling with Pyspark.

Microsoft Fabric Part 3. Taskmaster Project. Adding Dimensions: Tasks, Episodes and Date to Notebooks and creating Delta PARQUET and PARQUET Files

In Parts 1 and 2, we did a lot of analysis on the data. Understood Contestants and the data between the csv files attempts and people. Its now time to create some of the other dimensions we need for the project.

  • DimTasks
  • DimEpisodes
  • DimDate

Back to our Taskmaster Notebook to see what other data is available to use. But first of all. When creating stored procedures to transform data on SQL DB, The stored procedures are usually split into Dims and Facts. Its time to split this notebook up into the corresponding dims and facts.

Dim Contestants

A copy of the original file is saved as DimContestants. Then the new notebook is opened and everything Customer related is saved, including the initial configuring of the Lakehouse.

The notebook ends with the creation of the overwritten files.

The Code

spark.stop()

Is more interesting. Do we want to stop Spark after every dimension and fact creation. Or only at the end of the process?

One to think about but for this, lets have this after every notebook has run.

DimEpisode

Another copy is made to create DimEpisode. everything is removed apart from code that can relate to DimEpisode.

#Read the first file into a dataframe Teams
dfep = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/episodes (1).csv")
# dfattempts now is a Spark DataFrame containing CSV data from "Files/Data/Bronze/episodes (1).csv".

#And rename a couple of columns
#dfpeopletransf  = dfpeople.withColumnRenamed("id","contestantID").withColumnRenamed("name","contestant")

display(dfep.distinct())

We have some key information like dates in here but we will save these as keys for the fact table when we get to the fact table.

There is also a really important fact in here. Points.

Create Series and Episodes

# Select multiple columns with aliases
dfepisode = dfep.select(
    dfep.series.alias('seriesID'),
    dfep.series_label.alias('series'),
        dfep.episode.alias('episodeID'),
    dfep.title.alias('episodeTitle')
).distinct()

# Show the resulting DataFrame
display(dfepisode)

Check that we have distinct Episodes

#Lets immediately do the group by and min value now we know what we are doing to create a distinct list (Removing any IDs for a different series)

#import the function col to use. 
from pyspark.sql.functions import col
from pyspark.sql.functions import count as _count

#Creates the full distinct list. 

dfepisode.groupBy('Series', 'episodeID', 'episodeTitle' )\
       .agg(_count('episodeTitle').alias('TotalRecords'))\
       .filter(col('TotalRecords')>1).show(1000)

This looks good. We just need to add a default row and a key and we have ourselves another dimension which will eventually contain a hierarchy between series and episode.

Add default row for series and episode

we are now doing the same code blocks we created for Contestants.

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a sample DataFrame
data = [(-1, "Not Known","-1","Not Known")]
columns = ["seriesID", "series", "episodeID", "episodeTitle"]
new_row  = spark.createDataFrame(data, columns)

# Union the new row with the existing DataFrame
dfepisode = dfepisode.union(new_row)

# Show the updated DataFrame
dfepisode.show(1000)
#Just in case. We distinct again to remove any extra NA Rows
dfepisode = dfepisode.distinct()
dfepisode.show(1000)

Create the Episode Key

#Create an Episode Key
# Imports Window and row_number
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Applying partitionBy() and orderBy()
window_spec = Window.partitionBy().orderBy("episodeID")

# Add a new column "row_number" using row_number() over the specified window
dfepisode = dfepisode.withColumn("EpisodeKey", row_number().over(window_spec)- 2)

# Show the result
dfepisode.show(1000)

Create Delta PARQUET

What is the difference between Delta PARQUET and PARQUET

The Basic PARQUET File is the same, but we also have a _delta_log directory and checkpoint files. The Delta is simply a folder that contains these objects.

from delta.tables import DeltaTable

dfepisode.write.mode("overwrite").option("overwriteSchema", "true").format("delta").saveAsTable("dimEpisode")

We have re worked the code so we overwrite the schema also if it has changed to avoid errors

Create PARQUET

#Read the first file into a dataframe Teams
dftasks = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/tasks_by_objective.csv")

display(dftasks.distinct())

DimTask

Save a copy again and create DimTask

The Notebooks can be easily selected from the left hand pane and are automatically saved.

And in the workspace we can see that we have three new files to add

Our Last dimension before adding a dim date table will be tasks. After loading the tasks csv, it appears to be empty which is odd because it says there should be over 809 rows so something does appear to have gone wrong with the data source.

We need to look around the files and see what we can get.

We already know that the attempts contains tasks but doesn’t have as much data in it.

Getting Tasks from Attempts

#Read the first file into a dataframe
dfattempts = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/attempts.csv")
# dfattempts now is a Spark DataFrame containing CSV data from "Files/Data/Bronze/attempts.csv".
display(dfattempts)

attempts is read into this notebook as a DataFrame again because there is a task column in this table.

# Select multiple columns with aliases
dftaskat = dfattempts.select(
    dfattempts.task.alias('taskID'),
    dfattempts.task_label.alias('task_label')
).distinct()

# Show the resulting DataFrame
display(dftaskat)

Task Objectives

dftaskob = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/tasks_by_objective.csv")

display(dftaskob.distinct())

We have two task data source possibilities. Lets count the records

print(dftaskat.count())
print(dftaskob.count())

It would be useful to join them and then see what we have

First of all, a quick glance at both schemas helps

Update dftaskob with different column names

dftaskob = dftaskob.withColumnRenamed(“task”,”taskObID”).withColumnRenamed(“task_label”,”taskOb”)

display(dftaskob)

This is because we are going to join and its easier to rename the columns that would duplicate in the joined table before we get to the actual join.

Join dftaskob and dftaskat

# condition
condition = dftaskat["taskID"] == dftaskob["taskObID"]

# Now perform the left outer join
dftaskcheck = dftaskat.join(dftaskob, on=condition, how="full").drop(dftaskob.taskObID)

display(dftaskcheck)

It seems like good sense to create the condition(s) first to simplify the code block.

A Full outer join is used and taskObID is dropped because its part of the join so we don’t really need it.

Checking Tasks in Attempts but not in Objectives

filtered_df = dftaskcheck.filter(dftaskcheck['id'].isNull())
filtered_df.show(1000)

rows = filtered_df.count()
print(f"DataFrame Rows count: {rows}")

There are 165 rows in attempt, not in task objective

filtered_df = dftaskcheck.filter(dftaskcheck['taskid'].isNull())
filtered_df.show(1000)

rows = filtered_df.count()
print(f"DataFrame Rows count: {rows}")

There are 264 tasks in Task objective that aren’t in attempts

#How many match?
filtered_df = dftaskcheck.filter((dftaskcheck['task'] == dftaskcheck['taskOb']))
filtered_df.show(1000)

rows = filtered_df.count()
print(f"DataFrame Rows count where ID and Task ID are identical: {rows}")

157 tasks are in both task objective and attempts. Lets now try to get a full list.

COALESCE and REPLACE

# We are going to COALESCE to get the item that isnt null into a null field
from pyspark.sql.functions import coalesce
from pyspark.sql import functions as F

dftaskcheck = dftaskcheck.withColumn("newTaskID",coalesce(dftaskcheck.taskID,dftaskcheck.id))
dftaskcheck = dftaskcheck.withColumn("newTask",coalesce(dftaskcheck.task,dftaskcheck.taskOb))
#dftaskcheck= dftaskcheck.select(col("newTaskID").alias("taskID"), col("newtask").alias("task"), 
#col("objective").alias("objectiveID"),col("objective_label").alias("objective")).distinct()

dftask = dftaskcheck.select(
    dftaskcheck.newTaskID.alias('taskID'),
    dftaskcheck.newTask.alias('task'),
    dftaskcheck.objective_label.alias('objective'),
).distinct()


# Get Rid of â–² and â–¼
dftask = dftask.withColumn("objective", F.regexp_replace("objective", "â–²", ""))
dftask = dftask.withColumn("objective", F.regexp_replace("objective", "â–¼", ""))

display(dftask)

So here coalesce has been used to return the DataFrame with a new column that gives us the none null item.

Then to finish â–² and â–¼ are removed.

We now just need to clean up the null objectives. Lets just assume the null objectives are points.

Remove Null values using fillna and subset

dftask = dftask.fillna("points", subset=["objective"])
dftask.show(1000)

fillna() deals with missing values. It allows you to replace or fill in null values with specified values

Create default NA Row

We have done this code multiple times now so the blog post will just show the code and not screenshots

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a sample DataFrame
data = [(-1, "Not Known","Not Known")]
columns = ["taskID", "task", "objective"]
new_row  = spark.createDataFrame(data, columns)

# Union the new row with the existing DataFrame
dftask  = dftask.union(new_row)

# Show the updated DataFrame
dftask.show(1000) 

Running a DISTINCT just in case we applied the above more than once.

#Just in case. We distinct again to remove any extra NA Rows
dftask = dftask.distinct()
dftask.show(1000)

Creating the Contestant Key

# Imports Window and row_number
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Applying partitionBy() and orderBy()
window_spec = Window.partitionBy().orderBy("taskID")

# Add a new column "row_number" using row_number() over the specified window
dftask = dftask.withColumn("taskKey", row_number().over(window_spec)- 2)

# Show the result
dftask.show(1000)

Create DELTA PARQUET TABLE

from delta.tables import DeltaTable

dftask.write.mode("overwrite").format("delta").saveAsTable("dimTask")

CREATE PARQUET Table in Files Folder

dftask.write.mode("overwrite").parquet('Files/Data/Silver/DimTask.parquet')

Remember, In the context of the Lakehouse the tables created in the Files folder are external tables.

You don’t see external table in semantic model or SQL endpoint so when we get further with this exercise it will be interesting to see what we can do with both PARQUET files.

This is the end of the Task Notebook.

Dim Date

We have dates in our data which means, for a STAR schema we need a Date dimension and yes, We can create this in a Notebook.

Create a new notebook Dim Date

Start of again by configuring the Lakehouse and starting the spark session

There are many ways to do this. You could use SQL Magic, Or embed SQL into the Pyspark code. You could create it via a dataflow and send the result to the Lakehouse. This exercise is all about trying to do everything using PySpark Code.

# Import necessary modules
import datetime
#from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Create a Spark session
spark = SparkSession.builder.appName("DateRangeGenerator").getOrCreate()

# Define your start and end dates
start_date = '2020-01-01'
end_date = '2027-12-21'

# Convert start and end dates to datetime objects
start_datetime = datetime.datetime.strptime(start_date, '%Y-%m-%d')
end_datetime = datetime.datetime.strptime(end_date, '%Y-%m-%d')

# Calculate the number of days between start and end
num_days = (end_datetime - start_datetime).days

# Generate date range
dfdate = spark.createDataFrame([(start_datetime + datetime.timedelta(days=i),) for i in range(num_days)], ['calendarDate'])

# Optional: Convert to timestamp
dfdate = dfdate.withColumn('timestamp', col('calendarDate').cast(TimestampType()))

# Show the resulting DataFrame
dfdate.show()

Lets look at this in more detail:

We import the necessary modules:

  • import datetime datetime function is used to work with this format.
  • from pyspark.sql import SparkSession These are the entry points for working with Data Frames. ( I don’t think we need this because a Spark session is already running).
  • from pyspark.sql.functions import col, expr.
    • col is a function and allows you to reference a column in a DataFrame by its name. E.g. col(“Column_Name”)
    • expr is a function that allows you to execute SQL Like expressions with Pyspark.
  • from pyspark.sql.types import StructType, StructField, StringType, TimestampType
    • StructType is used to define a schema and represents a collection of fields.
    • StructField are these fields in the StructType. And the fields in question are strings and timestamps

And now into the main Code block

  • The Start_date and End_dates are defined and can be changed when necessary
  • Both dates are converted into datetime format Year Month and Day
  • Next we get the number of dates between start and end in .days
  • Then the date_df is generated.
    • range(num_days)
    • Generates a sequence of integers from 0. Each of these integers represents the number of dates from the start date.
    • [(start_datetime + timedelta(days=i),) for i in range(num_days)]:
    • This creates a list of tuples (Tuples are used to store multiple items in a single variable) so if there is a week of days, i would be 7 and we would have 7 rows of dates.
    • Finally we create another column which has been converted to timestamp. Which we will use moving forward.

Add Columns to DimDate

We are now going to set a range of columns up for Date

#You can have all the functions separated by a comma rather than repeating
from pyspark.sql.functions import year, month, dayofmonth, quarter,concat,lit, col, when, date_format #to get month name

# Create a new DataFrame with an alias for the 'timestamp' column
dfdate2 = dfdate.select(
    dfdate.timestamp.alias('date')
).distinct()


# Add a new column 'year' by extracting the year from the 'date' column
dfdate2 = dfdate2.withColumn('year', year(dfdate2['date']))

#Add the month no
dfdate2 = dfdate2.withColumn('monthNo', month(dfdate2['date']))

#And the month Name
dfdate2 = dfdate2.withColumn('monthName', date_format(dfdate2['date'], 'MMMM'))

#Add the day of month
dfdate2 = dfdate2.withColumn('dayofmonth', dayofmonth(dfdate2['date']))

#Add Quarter as a number
dfdate2 = dfdate2.withColumn('quarterNo', quarter(dfdate2['date']))

#Add Q to the start of Quarter for Power BI
dfdate2 = dfdate2.withColumn("quarter", concat(lit("Q"),col("quarterNo")))

#Create financial year
dfdate2 = dfdate2.withColumn('financialYear',
                   when(dfdate2['monthNo'] >= 5, dfdate2['year']).otherwise(dfdate2['year'] - 1))

#Create season 
dfdate2 = dfdate2.withColumn('season',
                            when((col('monthNo') >= 3) & (col('monthNo') <= 5), 'Spring')
                            .when((col('monthNo') >= 6) & (col('monthNo') <= 8), 'Summer')
                            .when((col('monthNo') >= 9) | (col('monthNo') <= 2), 'Summer')
                            .otherwise('Not known'))



# Show the resulting DataFrame
dfdate2.show(1000)

Create a current Day Flag

its always useful to have some date flags to use in Analysis

from pyspark.sql.functions import current_date, col

# Add a flag column indicating whether the date is the current day
dfdate2 = dfdate2.withColumn("currentDayFlag", col("date") == current_date())

# Show the result
dfdate2.filter(dfdate2['currentDayFlag'] == True).show()

Note that we can now quickly look at the current date (this would need to be updated every day)

Create a current month Flag

from pyspark.sql.functions import month, current_date

dfdate2 = dfdate2.withColumn("currentMonthFlag", month(dfdate2['date']) == month(current_date()))

# Show the result
dfdate2.filter(dfdate2['currentMonthFlag'] == True).show()

Create a current year Flag

from pyspark.sql.functions import year, current_date

dfdate2 = dfdate2.withColumn("currentYearFlag", year(dfdate2['date']) == year(current_date()))

# Show the result
dfdate2.filter(dfdate2['currentYearFlag'] == True).show()

Create the Date Key

For this we need Year, Month and Day to create an integer key which will be recreated in the fact table

from pyspark.sql.functions import concat_ws, lpad, col

# Assuming you have a DataFrame named 'df' with columns 'year', 'month', and 'day'
dfdate2 = dfdate2.withColumn('DateKey',
                   concat_ws('', col('year'),
                             lpad(col('monthNo'), 2, '0'),
                             lpad(col('dayOfMonth'), 2, '0')).cast('int'))

dfdate2.show()
  • lpad is used to left pad a string column
  • concat_ws is used to concatename string columns into a single column

Save as Delta PARQUET and PARQUET

from delta.tables import DeltaTable

dfdate2.write.mode("overwrite").format("delta").saveAsTable("dimDate")
dfdate2.write.mode("overwrite").parquet('Files/Data/Silver/DimDate.parquet')

Update Source Control

Clicking back onto the Fabric Workspace. Make sure you update Source Control

Conclusion

In part three, we created individual notebooks for each dimensions.

We now have DimTask, DimEpisode, DimContestant and Dimdate

Next we want to start on the facts. And we want to revisit each dimension to see if we are happy with the types of each column.

Tell me what you would do? Would you have one notebook with everything in one? Or split up as I have done?

How would you deal with the importing of all the functions? Is there anything I can do better here?

Are there better ways of writing the code?

Part three and Im starting to really enjoy working with Pyspark. the takeaway is that I can use my SQL brain so its starting to really work for me.

So many questions though. for example, we can do full and appends to write the data but what about a more incremental approach?

Design a site like this with WordPress.com
Get started