Microsoft Fabric Part 2. Taskmaster Project. Updating a dim Contestants table in Notebook and Delta PARQUET File

This blog is the next step for

Microsoft Fabric Part 1. Taskmaster Project. Creating a dim Contestants table in Notebook and Delta PARQUET File

In Part 1, a Contestants dimension was created from the csv file attempts.csv. The data from this was transformed and written to both a PARQUET file and a DELTA PARQUET file so we can test the differences later.

There are more csv files to look at, and one in particular gives us more details for the Contestants dim:

people.csv

The task here is to go back to the original Notebook and see if we can add in this new information.

The first bit of code to create goes after the following blocks of code.

  • Starting Spark Session
  • Reading attempts.csv into a dataframe
  • Renaming columns and creating dfattempttrans
  • Creating the distinct dfcont from dfattempts (creating the Contestants dim)
#Read the people.csv file into a DataFrame
dfpeople = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/people.csv")
# dfpeople is now a Spark DataFrame containing the CSV data from "Files/Data/Bronze/people.csv".

#And rename a couple of columns
dfpeople = dfpeople.withColumnRenamed("id","contestantID").withColumnRenamed("name","contestant")
display(dfpeople)

We have used .withColumnRenamed to change a couple of columns:

id to contestantID and name to contestant.

Before moving on, the assumption is that people is a distinct list.

Let's count the rows in each DataFrame so far.

row_countp = dfpeople.count()
row_countc = dfcont.count()
print(f"Number of rows in the dfpeople: {row_countp}")
print(f"Number of rows in the dfcont: {row_countc}")

There are a lot more people in our new DataFrame. But are they unique?

#We need to test. Do we have unique Contestants?

#We need to import a function for col
from pyspark.sql.functions import *

dfpeople.groupBy('contestant')\
       .agg(min('contestantID').alias('MinContestantID'),count('contestant').alias('TotalRecords'))\
       .filter(col('TotalRecords')>1).show(1000)

Using \ allows us to break the code onto separate lines.

We have the same issue here where some of the people have more than one record, so let's filter on one of them (remembering that name has been renamed to contestant).

filtered_df = dfpeople.filter(dfpeople['contestant'] == 'Liza Tarbuck')
filtered_df.show()

The original assumption in Part 1 was incorrect. We have multiple records here because of the Champion of Champions series. So the ID is not per contestant; it's per contestant and series, and some people have been on multiple series.

Do the IDs match between the tables?

filtered_df = dfcont.filter(dfcont['contestant'] == 'Bob Mortimer')
filtered_df.show()

Yes. So we need to rethink dim Contestants. If every contestant in attempts also exists in people, we only need to use people, so let's join them and check whether that is the case.

Checking that the values we see in attempts exist in People, and renaming columns

We want to join the contestants that we obtained from the attempts table to dfpeople, simply so we can check what is missing.

We know we have more rows in people. But what happens if there are values in attempts that are missing from people? That would cause us more problems.

We will join on the ID and the contestant, so before the join let's rename the dfcont columns so we don't end up with duplicate column names.

#And rename a couple of columns
dfcont = dfcont.withColumnRenamed("contestantID","cont_contestantID").withColumnRenamed("contestant","cont_contestant")

display(dfcont)

Join dfcont and dfpeople

Please note that when joining tables, it's better to create a new DataFrame rather than overwrite an original one, because if you have to run the cell again, the DataFrame you join from will have changed.

# Original condition
original_condition = dfpeople["contestantID"] == dfcont["cont_contestantID"]

# Adding another condition based on the 'name' column
combined_condition = original_condition & (dfpeople["contestant"] == dfcont["cont_contestant"])

# Now perform the full outer join
dfcontcheck = dfpeople.join(dfcont, on=combined_condition, how="full")

display(dfcontcheck)

So we have everything in one DataFrame. Let's do some filtering to check our findings.

Filter for Null Values

from pyspark.sql import functions as F
dfcontcheck.filter(F.isnull("cont_contestantID")).show(1000)

72 records don't exist in the attempts data, which is fine.

from pyspark.sql import functions as F
dfcontcheck.filter(F.isnull("contestantID")).show(1000)

And there are no records in attempts that are missing from people.

People is therefore our master data set and the PySpark code can be changed accordingly.

At the moment there are a lot of checks happening in the PySpark code. Once it is up and running and ready to be turned into a proper transformation notebook, all these checks could be removed to simplify it.

Updating the Code with the new Logic

We are taking the code from Part 1 and completely updating it based on the new findings.

Start Spark session

%%configure
{
    "defaultLakehouse": {  
        "name": "DebbiesFabricLakehouse"
    }
}

Read People into dfPeople DataFrame

#Read the people.csv file into a DataFrame
dfpeople = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/people.csv")
# dfpeople is now a Spark DataFrame containing the CSV data from "Files/Data/Bronze/people.csv".

#And rename a couple of columns
dfpeopletransf  = dfpeople.withColumnRenamed("id","contestantID").withColumnRenamed("name","contestant")

print(dfpeopletransf.count())
print(dfpeopletransf.distinct().count())
display(dfpeopletransf.distinct())

.withColumnRenamed brings back the dfpeople data set but renames the specified columns.

Create dfCont to create our initial contestants DataFrame

Previously we had a lot of code blocks checking and creating Min IDs for a distinct contestant list, but now we have more of an understanding we can do this with less code.

#Lets immediately do the group by and min value now we know what we are doing to create a distinct list (Removing any IDs for a different series)

#import the functions col, min and count to use.
from pyspark.sql.functions import col, min, count

#Creates the full distinct list. 

dfcont = dfpeopletransf.groupBy('contestant', 'dob', 'gender','hand' )\
       .agg(min('contestantID').alias('contestantID'),count('contestant').alias('TotalRecords'))


display(dfcont)

We groupBy all the distinct values we want: contestant, dob, gender and hand are all unique per name. (Previously we just had contestant.)

Next we agg (aggregate) with the min contestantID and count the number of contestant records as TotalRecords.

If you are used to SQL, this is the equivalent:

SELECT MIN(contestantID) AS contestantID, contestant, dob, gender, hand, Count(*) AS TotalRecords
FROM dfpeopletransf
GROUP BY contestant, dob, gender, hand

Create a Current Age

# The Spark session already exists in the Fabric notebook
# Import the functions needed for the date calculations
from pyspark.sql.functions import current_date, datediff, col

# Convert the date of birth column to a date type in dfCont
dfcont = dfcont.withColumn("dob", dfcont["dob"].cast("date"))

# add the current date and set up into a new dataframe
df = dfcont.withColumn("current_date", current_date())

#Calculate age using dob and current date 
df = df.withColumn("age_decimal", datediff(df["current_date"], df["dob"]) / 365)

#Convert from decimal to int  
df = df.withColumn("age", col("age_decimal").cast("int"))

#Convert contestantID to int
df = df.withColumn("contestantID", col("contestantID").cast("int"))

#Update dfCont
dfcont = df.select("contestantID", "contestant", "dob","gender","hand", "age")

dfcont.show(1000)

It would be fun to filter by Age and Age Group, so let's create the age first based on the current date.

  • First of all we ensure dob is a date with .cast("date")
  • Then, using .withColumn, we add the current date to the DataFrame with current_date()
  • Next we use datediff to get the days between the date of birth and the current date, and divide by 365 for an approximate age (a slightly more precise variant is sketched just after this list)
  • Cast age to int and contestantID to int
  • Finally, update dfcont with the full list of columns (excluding current_date, because that was only needed to create the age)
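
As an aside, dividing by 365 gives an approximate age that can be a year out around birthdays and leap years. A minimal alternative sketch, using only the dfcont columns above (df_age is just an illustrative name), swaps datediff for months_between and floor:

from pyspark.sql.functions import months_between, floor, current_date, col

# Whole years between dob and today; months_between respects month and day boundaries
df_age = dfcont.withColumn("age", floor(months_between(current_date(), col("dob")) / 12).cast("int"))
df_age.show(10)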

Create Age group

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

# Create a Spark session
#spark = SparkSession.builder.appName("ConditionalLogic").getOrCreate()

# Apply conditional logic
dfcont = dfcont.withColumn("ageRange", when(col("age") < 20, "0 to 20")
                               .when((col("age") >= 20) & (col("age") <= 30), "20 to 30")
                               .when((col("age") >= 30) & (col("age") <= 40), "30 to 40")
                               .when((col("age") >= 40) & (col("age") <= 50), "40 to 50")
                               .when((col("age") >= 50) & (col("age") <= 60), "50 to 60")
                               .otherwise("Over 60"))

# Show the result
dfcont.show()

We have created what would be a CASE expression in SQL, using .withColumn to create a new column; .when and .otherwise are used to build the new data item.

Create New Row for -1 NA Default Row

We are back to almost the original code, creating our default row but this time with more columns.

from datetime import date

# Create a one row DataFrame for the default record, with types that match dfcont
data = [(-1, "Not Known", date(1900, 1, 1), "NA", "NA", 0, "NA")]
columns = ["contestantID", "contestant", "dob", "gender", "hand", "age", "ageRange"]
new_row = spark.createDataFrame(data, columns)

# Union the new row with the existing DataFrame
dfcont = dfcont.union(new_row)

# Show the updated DataFrame
dfcont.show(1000)

We can now see Not Known in the list.

Running a DISTINCT just in case we applied the above more than once.

#Just in case. We distinct again to remove any extra NA Rows
dfcont = dfcont.distinct()
dfcont.show(1000)

Checking we have unique Contestants with groupBy

#And lets check the new distinct list with Min IDs we have made
#We need to import a function for col
from pyspark.sql.functions import *

dfcont.groupBy('contestant', 'dob', 'age','ageRange', 'hand','gender')\
        .agg(count('contestant').alias('TotalRecords'))\
        .filter(col('TotalRecords')>1).show(1000)


#Hopefully there should be no records

We have all unique contestants, including the extra information we have added to the dimension.

Creating the Contestant Key

We can now create slightly better code than in Part 1, using fewer code blocks.

#Now finally for this we need a Key a new column
# Imports Window and row_number
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Applying partitionBy() and orderBy()
window_spec = Window.partitionBy().orderBy("ContestantID")

# Add a new column "row_number" using row_number() over the specified window
dfcont = dfcont.withColumn("ContestantKey", row_number().over(window_spec)- 2)

# Show the result
dfcont.show(1000)

Previously a partition column with the value 1 was added so we could do the following: .partitionBy("partition")

But simply using partitionBy() and not including a column removes the need for the extra code and column and does the same thing.

Adding the Team

Previously the team was added from attempts.csv, but we know that we can now change this to people and get better team information.

#Creates the full distinct list. 
dfContTeam = dfpeopletransf.select(col("contestantID").alias("teamcontestantID"), col("contestant").alias("teamContestant"), 
col("team").alias("teamID"),col("team_label").alias("TeamName")).distinct()

display(dfContTeam)

Remove Rows with Empty team information

#Now remove the rows with null teamIDs

# The Spark session already exists in the Fabric notebook, so there is no need to create another one here

# Drop rows with NULL values in the "teamID" column
dfContTeam = dfContTeam.na.drop(subset=["teamID"])

# Show the filtered DataFrame
dfContTeam.show(truncate=False)

Join team to Contestant

#Now what we want to do is add TeamName into dfcont

dfcont = dfcont.join(dfContTeam, dfcont["contestant"] == dfContTeam["teamContestant"], how="left").drop(dfContTeam.teamContestant).drop(dfContTeam.teamcontestantID)

display(dfcont)

This time we cleaned up the code by adding the two columns to drop to the above statement, so we don't end up with a name and ID column from both data sets.

Create DELTA PARQUET TABLE

from delta.tables import DeltaTable

dfcont.write.mode("overwrite").format("delta").saveAsTable("dimContestant")

Because we are overwriting, our new data set should replace our old dimension. However, there will be conflict issues because the schema is so different, so for this exercise the files will be removed before being re-added.
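
As an aside, rather than removing the files by hand, Delta also supports an overwriteSchema option on an overwrite write, which replaces the old schema along with the data. A small sketch using the same table name as above (not what was done for this exercise):

# Overwrite the Delta table and allow the schema to change at the same time
dfcont.write.mode("overwrite").option("overwriteSchema", "true").format("delta").saveAsTable("dimContestant")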

CREATE PARQUET Table in Files Folder

dfcont.write.mode("overwrite").parquet('Files/Data/Silver/DimContestant.parquet')

We now have the option of using the DELTA PARQUET table, or just the PARQUET file.
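
As a quick sanity check, both outputs can be read back into DataFrames and compared. A small sketch assuming the table and file names used above (dfdelta and dfparquet are just illustrative names):

# Read the Delta table back using its Lakehouse table name
dfdelta = spark.table("dimContestant")

# Read the plain PARQUET file back from the Files folder
dfparquet = spark.read.parquet("Files/Data/Silver/DimContestant.parquet")

# Both should return the same number of contestants
print(dfdelta.count(), dfparquet.count())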

Conclusion

In Part 1 we used attempts.csv for contestants. For Part 2 we did lots of analysis and found that people was actually a better data source for contestants, so we changed the input for the initial DataFrame to people.csv.

We also used the opportunity to tidy up the code, now that we are getting to grips with PySpark as an alternative to SQL.

Next on the list is to create more dimensions.

  • Dim Task
  • Dim Episode
  • Dim Date

And the Taskmaster Stats Fact Table

Then to do some analysis, using the inbuilt visuals in a Notebook, and Power BI.

The Dos and Don'ts of Power BI Relationships and Modelling

Bad Modelling 1: Single flat file table e.g. Salesextract

When people first start out using Power BI as their analytics platform, there is a tendency to say, let's import all the data as one big flat file, like an Excel worksheet.

This way of working is just not well organised and doesn’t give you a friendly analytics structure.

Avoid Wide Tables

Narrow tables are much better to work with in Power BI. As the data volume grows, a wide table will affect performance, bloat your model and become inefficient. Then, when you create measures, things will get even more complex in that one long, wide table.

Not to mention the point when you have to add another table and create joins; you may be faced with a many-to-many join because of your wide table.

A STAR SCHEMA is the recommended approach to modelling in Power BI.

Stars with a few Snowflaked dimensions are also ok.

If you have a wide flat file table, it's always important to convert it to a data model like the one above, with narrow dimension tables and a fact table in the middle with all your measures.

Remember, Chaos is a flat file.

Model Relationships propagate filters to other tables.

In this example the ProductID filter propagates down to the Sales table: one product can be sold many times (1 to many).

With a snowflake you can add another level:

CategoryA propagates down to the Sales fact table.

Deliver the right number of tables with the right relationships in place.

Power BI was designed for people who never had to think about the design of data warehouses. Originally, this self-service tool would allow anyone with little or no knowledge of best practice to import data from their own sources, Excel spreadsheets, databases etc., without any knowledge of how they were set up.

This becomes an issue when the recommended Power BI model is the fact and dimension schemas as above.

Understanding OLAP models goes a long way towards helping you set up Power BI:

  • Dimensions Filter and group
  • Facts Summarise measures

Bad Modelling 2: Direct Query your Transactional Database

When you connect up to an OLTP database and drag in all your tables (there may be hundreds of them) using Direct Query, there are lots of things to consider.

The overall performance depends on the underlying data source.

When you have lots of users opening shared reports, lots of visuals are refreshed and queries are sent to the underlying source. This means that the source MUST handle these query loads for all your users AND maintain reasonable performance for those using the OLTP as they enter data.

You are not the most important person in this scenario. The person(s) using the database to add data is the most important person

OLTP is designed for speedy data input. OLAP is designed for speedy retrieval of data for analytics. These are two very different things.

With OLTP, you have row-store indexes (clustered and non-clustered) and these are slow for data analysis, although they are perfect for OLTP-style workloads. Data warehouse queries consume a huge amount of data, which is another reason why using OLTP as your Direct Query data source isn't the best approach.

Also, Direct Query means you lose a fair amount of DAX functionality: time-based DAX calculations, What If parameters, etc.

I was chatting to someone about this on the forums and they gave me a fantastic analogy.

When you connect into a transactional database with Direct Query, it's like being in a busy restaurant and getting all the customers to go and get their own food from the kitchen.

It slows down the customers because of the layout of the kitchen. They don’t know where anything is, and other customers are also milling around trying to find where their starter is.

The kitchen staff, who are now trying to prepare the food, are having to fight for physical space. Look at the pastry chef, trying to work around 10 customers asking where their various desserts are.

So you set up a reporting area. This is where the food gets placed, someone shouts service and a waiter will go and speedily deliver the course to the correct table.

No one needs to go into the kitchen unless they are in food prep. Everything works in the most efficient way.

Model relationships Dos

  • Only one ID to one ID. If you have composite keys, they need to be merged into a single key column.
  • No recursive relationships (relationships that go back to the same table; the example always used for this is the ManagerID in the Employee table).
  • The cardinality is one to many, one to one or many to one. (Many to many needs a specific approach in Power BI.)
  • Cardinality determines whether a table has filter/group behaviour or summarise behaviour.
  • There can only be one active path (relationship) between two tables. All your other paths will be inactive (but you can set up DAX to use them).

In this example OrderDateKey has the active relationship to DateKey, because we use it the most.

ShipdateKey and DueDateKey also join to DateKey in the date table and are inactive.

DAX Functions for Relationships to help with modelling decisions

RELATED

When creating calculated columns you can only include fields from the same table, unless you use RELATED.

For example, adding the column Colour into the SalesOrderDetail table, which has a many-to-one join to Products:

Colour = RELATED(Products[Colour])

RELATED allows you to use data from the one side in the many side of the join

RELATEDTABLE

RELATEDTABLE uses data from the many side of the join.

TotalSales = SUMX(RELATEDTABLE(SalesOrderDetail),SalesOrderDetail[LineTotal])

USERELATIONSHIP

Forces the use of a specified (usually inactive) relationship instead of the active relationship.

=CALCULATE(SUM(InternetSales[SalesAmount]), USERELATIONSHIP(InternetSales[DueDate], DateTime[Date]))

CROSSFILTER

Modifies the filter direction or disables propagation. You can do this in the model by changing the cross-filter direction to both directions instead of single, or you can do it for a specific DAX calculation using CROSSFILTER, as in the sketch below.
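
A minimal sketch of CROSSFILTER as a filter modifier inside CALCULATE; the InternetSales and Products table and column names are only illustrative, not taken from a specific model:

Sales Both Directions =
CALCULATE(
    SUM(InternetSales[SalesAmount]),
    CROSSFILTER(InternetSales[ProductKey], Products[ProductKey], Both)
)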

TREATAS

Create virtual relationships between tables

CALCULATE([Total Budget],
TREATAS(VALUES(Dates[Year]), 'Unconnected Budget Data'[Year]))

Our Unconnected Budget Data is at Year level only and it's not joined to our main model.

Here we connect it up to Year in the Date dimension. Then we can create a visual with Date from the Date dimension, Total Sales from our connected data (which is at daily level) and Total Budget from our unconnected budget data at a different level of granularity.

PATH

Naturalise a recursive relationship with the PATH function, typically in a calculated column like the sketch below.
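
A minimal sketch of PATH in a calculated column, assuming a hypothetical Employee table with EmployeeID and ManagerID columns (the manager example mentioned earlier):

EmployeePath = PATH(Employee[EmployeeID], Employee[ManagerID])

PATHITEM can then pull individual levels out of the resulting pipe-delimited string.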

Getting your model right and understanding your data sources is the most important thing to get right with Power BI. Make sure you don't end up with lots of headaches six months into your project; it's better to spend the time now than to have to start again later.
