Microsoft Fabric Part 3. Taskmaster Project. Adding Dimensions: Tasks, Episodes and Date to Notebooks and creating Delta PARQUET and PARQUET Files

In Parts 1 and 2, we did a lot of analysis on the data, getting to understand Contestants and the relationship between the attempts and people csv files. It's now time to create some of the other dimensions we need for the project.

  • DimTasks
  • DimEpisodes
  • DimDate

Back to our Taskmaster Notebook to see what other data is available to use. But first of all: when creating stored procedures to transform data in a SQL DB, the stored procedures are usually split into Dims and Facts. It's time to split this notebook up into the corresponding dims and facts.

Dim Contestants

A copy of the original file is saved as DimContestants. Then the new notebook is opened and everything Contestant related is kept, including the initial configuring of the Lakehouse.

The notebook ends with the creation of the overwritten files.

The code

spark.stop()

is more interesting. Do we want to stop Spark after every dimension and fact creation, or only at the end of the process?

One to think about, but for this, let's have it after every notebook has run.
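If we later decide to stop only once at the end, one option would be a small orchestrator notebook that runs each dimension notebook and stops the session once; a rough sketch (the notebook names are assumptions, and the per-notebook spark.stop() calls would need removing first):

# Run each dimension notebook in turn, then stop Spark once at the end
# (notebook names here are assumptions for illustration)
from notebookutils import mssparkutils

for nb in ["DimContestants", "DimEpisode", "DimTask", "DimDate"]:
    mssparkutils.notebook.run(nb, 600)  # 600 second timeout per notebook

spark.stop()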

DimEpisode

Another copy is made to create DimEpisode. Everything is removed apart from the code that relates to DimEpisode.

#Read the episodes file into a dataframe
dfep = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/episodes (1).csv")
# dfep now is a Spark DataFrame containing CSV data from "Files/Data/Bronze/episodes (1).csv".

display(dfep.distinct())

We have some key information like dates in here, but we will save those as keys for the fact table when we get to it.

There is also a really important fact in here: Points.

Create Series and Episodes

# Select multiple columns with aliases
dfepisode = dfep.select(
    dfep.series.alias('seriesID'),
    dfep.series_label.alias('series'),
    dfep.episode.alias('episodeID'),
    dfep.title.alias('episodeTitle')
).distinct()

# Show the resulting DataFrame
display(dfepisode)

Check that we have distinct Episodes

#Now we know what we are doing, let's immediately do the group by and count to check for duplicate episodes

#import the functions to use
from pyspark.sql.functions import col
from pyspark.sql.functions import count as _count

#Returns any episode appearing more than once (hopefully none)

dfepisode.groupBy('series', 'episodeID', 'episodeTitle' )\
       .agg(_count('episodeTitle').alias('TotalRecords'))\
       .filter(col('TotalRecords')>1).show(1000)

This looks good. We just need to add a default row and a key and we have ourselves another dimension which will eventually contain a hierarchy between series and episode.

Add default row for series and episode

We are now using the same code blocks we created for Contestants.

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a sample DataFrame
data = [(-1, "Not Known","-1","Not Known")]
columns = ["seriesID", "series", "episodeID", "episodeTitle"]
new_row  = spark.createDataFrame(data, columns)

# Union the new row with the existing DataFrame
dfepisode = dfepisode.union(new_row)

# Show the updated DataFrame
dfepisode.show(1000)
#Just in case. We distinct again to remove any extra NA Rows
dfepisode = dfepisode.distinct()
dfepisode.show(1000)

Create the Episode Key

#Create an Episode Key
# Imports Window and row_number
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Applying partitionBy() and orderBy()
window_spec = Window.partitionBy().orderBy("episodeID")

# Add a new column "row_number" using row_number() over the specified window
dfepisode = dfepisode.withColumn("EpisodeKey", row_number().over(window_spec)- 2)

# Show the result
dfepisode.show(1000)

Create Delta PARQUET

What is the difference between Delta PARQUET and PARQUET?

The basic PARQUET files are the same, but a Delta table also has a _delta_log directory containing the transaction log and checkpoint files. A Delta table is simply a folder that contains these objects alongside the PARQUET data.

from delta.tables import DeltaTable

dfepisode.write.mode("overwrite").option("overwriteSchema", "true").format("delta").saveAsTable("dimEpisode")

The code has been reworked so that the schema is also overwritten if it has changed, to avoid errors.
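To see that folder structure for yourself once the table has been written, you can list the table folder from the notebook; a quick sketch using the notebook utilities (the Tables/dimEpisode path is an assumption based on the default Tables folder):

from notebookutils import mssparkutils

# List the contents of the Delta table folder - expect PARQUET part files plus a _delta_log directory
for f in mssparkutils.fs.ls("Tables/dimEpisode"):
    print(f.name)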

Create PARQUET
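As with the other dimensions, the episode DataFrame can also be written out as a plain PARQUET file to the Silver folder; a minimal sketch, assuming the same naming pattern used for DimTask and DimContestant below:

# Write the episode dimension as a plain PARQUET file (path follows the DimTask/DimContestant pattern)
dfepisode.write.mode("overwrite").parquet('Files/Data/Silver/DimEpisode.parquet')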

#Read the tasks by objective file into a dataframe (we come back to this for DimTask below)
dftasks = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/tasks_by_objective.csv")

display(dftasks.distinct())

DimTask

Save a copy again and create DimTask

The Notebooks can be easily selected from the left hand pane and are automatically saved.

And in the workspace we can see that we have three new files to add

Our last dimension before adding a date dimension will be tasks. After loading the tasks csv, it appears to be empty, which is odd because there should be over 809 rows, so something does appear to have gone wrong with the data source.

We need to look around the files and see what we can get.

We already know that attempts contains tasks, but it doesn't have as much task detail in it.

Getting Tasks from Attempts

#Read the first file into a dataframe
dfattempts = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/attempts.csv")
# dfattempts now is a Spark DataFrame containing CSV data from "Files/Data/Bronze/attempts.csv".
display(dfattempts)

attempts is read into this notebook as a DataFrame again because there is a task column in this table.

# Select multiple columns with aliases (the label is aliased to 'task', which the later code expects)
dftaskat = dfattempts.select(
    dfattempts.task.alias('taskID'),
    dfattempts.task_label.alias('task')
).distinct()

# Show the resulting DataFrame
display(dftaskat)

Task Objectives

dftaskob = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/tasks_by_objective.csv")

display(dftaskob.distinct())

We have two task data source possibilities. Let's count the records in each.

print(dftaskat.count())
print(dftaskob.count())

It would be useful to join them and then see what we have

First of all, a quick glance at both schemas helps

Update dftaskob with different column names

dftaskob = dftaskob.withColumnRenamed("task","taskObID").withColumnRenamed("task_label","taskOb")

display(dftaskob)

This is because we are going to join, and it's easier to rename the columns that would be duplicated in the joined table before we get to the actual join.

Join dftaskob and dftaskat

# condition
condition = dftaskat["taskID"] == dftaskob["taskObID"]

# Now perform the left outer join
dftaskcheck = dftaskat.join(dftaskob, on=condition, how="full").drop(dftaskob.taskObID)

display(dftaskcheck)

It seems like good sense to create the condition(s) first to simplify the code block.

A full outer join is used, and taskObID is dropped because it's part of the join so we don't really need it.

Checking Tasks in Attempts but not in Objectives

filtered_df = dftaskcheck.filter(dftaskcheck['id'].isNull())
filtered_df.show(1000)

rows = filtered_df.count()
print(f"DataFrame Rows count: {rows}")

There are 165 rows in attempts that are not in task objectives.

filtered_df = dftaskcheck.filter(dftaskcheck['taskid'].isNull())
filtered_df.show(1000)

rows = filtered_df.count()
print(f"DataFrame Rows count: {rows}")

There are 264 tasks in Task objective that aren’t in attempts

#How many match?
filtered_df = dftaskcheck.filter((dftaskcheck['task'] == dftaskcheck['taskOb']))
filtered_df.show(1000)

rows = filtered_df.count()
print(f"DataFrame Rows count where ID and Task ID are identical: {rows}")

157 tasks are in both task objectives and attempts. Let's now try to get a full list.

COALESCE and REPLACE

# We are going to use COALESCE to get whichever of the two values isn't null into a single field
from pyspark.sql.functions import coalesce
from pyspark.sql import functions as F

dftaskcheck = dftaskcheck.withColumn("newTaskID",coalesce(dftaskcheck.taskID,dftaskcheck.id))
dftaskcheck = dftaskcheck.withColumn("newTask",coalesce(dftaskcheck.task,dftaskcheck.taskOb))
#dftaskcheck= dftaskcheck.select(col("newTaskID").alias("taskID"), col("newtask").alias("task"), 
#col("objective").alias("objectiveID"),col("objective_label").alias("objective")).distinct()

dftask = dftaskcheck.select(
    dftaskcheck.newTaskID.alias('taskID'),
    dftaskcheck.newTask.alias('task'),
    dftaskcheck.objective_label.alias('objective'),
).distinct()


# Get rid of ▲ and ▼
dftask = dftask.withColumn("objective", F.regexp_replace("objective", "▲", ""))
dftask = dftask.withColumn("objective", F.regexp_replace("objective", "▼", ""))

display(dftask)

So here coalesce has been used to return the DataFrame with a new column that gives us the non-null item.

Then, to finish, the ▲ and ▼ characters are removed.

We now just need to clean up the null objectives. Let's just assume the null objectives are points.

Remove Null values using fillna and subset

dftask = dftask.fillna("points", subset=["objective"])
dftask.show(1000)

fillna() deals with missing values. It allows you to replace or fill in null values with specified values.
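As an aside, fillna() can also take a dictionary if different columns need different defaults; a small sketch (the second default is purely illustrative):

# Fill nulls with a different default per column (illustrative defaults)
dftask_filled = dftask.fillna({"objective": "points", "task": "Not Known"})
dftask_filled.show(5)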

Create default NA Row

We have done this code multiple times now so the blog post will just show the code and not screenshots

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a sample DataFrame
data = [(-1, "Not Known","Not Known")]
columns = ["taskID", "task", "objective"]
new_row  = spark.createDataFrame(data, columns)

# Union the new row with the existing DataFrame
dftask  = dftask.union(new_row)

# Show the updated DataFrame
dftask.show(1000) 

Running a DISTINCT just in case we applied the above more than once.

#Just in case. We distinct again to remove any extra NA Rows
dftask = dftask.distinct()
dftask.show(1000)

Creating the Task Key

# Imports Window and row_number
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Applying partitionBy() and orderBy()
window_spec = Window.partitionBy().orderBy("taskID")

# Add a new column "row_number" using row_number() over the specified window
dftask = dftask.withColumn("taskKey", row_number().over(window_spec)- 2)

# Show the result
dftask.show(1000)

Create DELTA PARQUET TABLE

from delta.tables import DeltaTable

dftask.write.mode("overwrite").format("delta").saveAsTable("dimTask")

CREATE PARQUET Table in Files Folder

dftask.write.mode("overwrite").parquet('Files/Data/Silver/DimTask.parquet')

Remember, in the context of the Lakehouse, the tables created in the Files folder are external tables.

You don't see external tables in the semantic model or SQL endpoint, so when we get further with this exercise it will be interesting to see what we can do with both PARQUET files.
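The plain PARQUET file can still be read straight back into Spark though; a quick sketch using the path written above:

# Read the plain PARQUET output back into a DataFrame
dftaskparquet = spark.read.parquet('Files/Data/Silver/DimTask.parquet')
display(dftaskparquet)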

This is the end of the Task Notebook.

Dim Date

We have dates in our data, which means that for a STAR schema we need a Date dimension, and yes, we can create this in a Notebook.

Create a new notebook, Dim Date.

Start off again by configuring the Lakehouse and starting the Spark session.

There are many ways to do this. You could use SQL magic, or embed SQL into the PySpark code. You could create it via a Dataflow and send the result to the Lakehouse. This exercise is all about trying to do everything using PySpark code.
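For comparison, here is a minimal sketch of the embedded-SQL route, using Spark SQL's sequence and explode functions to generate one row per day (the date boundaries match the ones used below):

# Sketch of the embedded-SQL option: one row per day between two dates
dfdate_sql = spark.sql("""
    SELECT explode(sequence(to_date('2020-01-01'), to_date('2027-12-21'), interval 1 day)) AS calendarDate
""")
dfdate_sql.show(5)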

# Import necessary modules
import datetime
#from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Create a Spark session (not needed here - the notebook already has one running)
#spark = SparkSession.builder.appName("DateRangeGenerator").getOrCreate()

# Define your start and end dates
start_date = '2020-01-01'
end_date = '2027-12-21'

# Convert start and end dates to datetime objects
start_datetime = datetime.datetime.strptime(start_date, '%Y-%m-%d')
end_datetime = datetime.datetime.strptime(end_date, '%Y-%m-%d')

# Calculate the number of days between start and end
num_days = (end_datetime - start_datetime).days

# Generate date range
dfdate = spark.createDataFrame([(start_datetime + datetime.timedelta(days=i),) for i in range(num_days)], ['calendarDate'])

# Optional: Convert to timestamp
dfdate = dfdate.withColumn('timestamp', col('calendarDate').cast(TimestampType()))

# Show the resulting DataFrame
dfdate.show()

Lets look at this in more detail:

We import the necessary modules:

  • import datetime — the datetime module is used to work with dates in this format.
  • from pyspark.sql import SparkSession — this is the entry point for working with DataFrames. (I don't think we need this because a Spark session is already running.)
  • from pyspark.sql.functions import col, expr
    • col is a function that allows you to reference a column in a DataFrame by its name, e.g. col("Column_Name")
    • expr is a function that allows you to execute SQL-like expressions within PySpark.
  • from pyspark.sql.types import StructType, StructField, StringType, TimestampType
    • StructType is used to define a schema and represents a collection of fields.
    • StructField represents those fields in the StructType, and the fields in question are strings and timestamps.

And now into the main code block:

  • The start_date and end_date are defined and can be changed when necessary.
  • Both dates are converted into datetime format (year, month and day).
  • Next we get the number of days between start and end using .days.
  • Then the date DataFrame is generated:
    • range(num_days) generates a sequence of integers from 0. Each of these integers represents the number of days from the start date.
    • [(start_datetime + datetime.timedelta(days=i),) for i in range(num_days)] creates a list of single-item tuples (tuples are used to store multiple items in a single variable), so for a week of days we would get 7 rows of dates (see the small example below).
    • Finally we create another column which has been converted to timestamp, which we will use moving forward.
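A quick illustration of what that list comprehension produces for a three-day range:

import datetime

start = datetime.datetime(2020, 1, 1)
# One single-item tuple per day, e.g. (datetime.datetime(2020, 1, 1, 0, 0),)
rows = [(start + datetime.timedelta(days=i),) for i in range(3)]
print(rows)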

Add Columns to DimDate

We are now going to set a range of columns up for Date

#You can have all the functions separated by a comma rather than repeating
from pyspark.sql.functions import year, month, dayofmonth, quarter,concat,lit, col, when, date_format #to get month name

# Create a new DataFrame with an alias for the 'timestamp' column
dfdate2 = dfdate.select(
    dfdate.timestamp.alias('date')
).distinct()


# Add a new column 'year' by extracting the year from the 'date' column
dfdate2 = dfdate2.withColumn('year', year(dfdate2['date']))

#Add the month no
dfdate2 = dfdate2.withColumn('monthNo', month(dfdate2['date']))

#And the month Name
dfdate2 = dfdate2.withColumn('monthName', date_format(dfdate2['date'], 'MMMM'))

#Add the day of month
dfdate2 = dfdate2.withColumn('dayofmonth', dayofmonth(dfdate2['date']))

#Add Quarter as a number
dfdate2 = dfdate2.withColumn('quarterNo', quarter(dfdate2['date']))

#Add Q to the start of Quarter for Power BI
dfdate2 = dfdate2.withColumn("quarter", concat(lit("Q"),col("quarterNo")))

#Create financial year
dfdate2 = dfdate2.withColumn('financialYear',
                   when(dfdate2['monthNo'] >= 5, dfdate2['year']).otherwise(dfdate2['year'] - 1))

#Create season (the last two branches cover the autumn and winter months)
dfdate2 = dfdate2.withColumn('season',
                            when((col('monthNo') >= 3) & (col('monthNo') <= 5), 'Spring')
                            .when((col('monthNo') >= 6) & (col('monthNo') <= 8), 'Summer')
                            .when((col('monthNo') >= 9) & (col('monthNo') <= 11), 'Autumn')
                            .when((col('monthNo') == 12) | (col('monthNo') <= 2), 'Winter')
                            .otherwise('Not known'))



# Show the resulting DataFrame
dfdate2.show(1000)

Create a current Day Flag

It's always useful to have some date flags to use in analysis.

from pyspark.sql.functions import current_date, col

# Add a flag column indicating whether the date is the current day
dfdate2 = dfdate2.withColumn("currentDayFlag", col("date") == current_date())

# Show the result
dfdate2.filter(dfdate2['currentDayFlag'] == True).show()

Note that we can now quickly look at the current date (the flag is evaluated when the notebook runs, so it would need to be refreshed every day).

Create a current month Flag

from pyspark.sql.functions import month, current_date

dfdate2 = dfdate2.withColumn("currentMonthFlag", month(dfdate2['date']) == month(current_date()))

# Show the result
dfdate2.filter(dfdate2['currentMonthFlag'] == True).show()

Create a current year Flag

from pyspark.sql.functions import year, current_date

dfdate2 = dfdate2.withColumn("currentYearFlag", year(dfdate2['date']) == year(current_date()))

# Show the result
dfdate2.filter(dfdate2['currentYearFlag'] == True).show()

Create the Date Key

For this we need Year, Month and Day to create an integer key which will be recreated in the fact table

from pyspark.sql.functions import concat_ws, lpad, col

# Assuming you have a DataFrame named 'df' with columns 'year', 'month', and 'day'
dfdate2 = dfdate2.withColumn('DateKey',
                   concat_ws('', col('year'),
                             lpad(col('monthNo'), 2, '0'),
                             lpad(col('dayOfMonth'), 2, '0')).cast('int'))

dfdate2.show()

  • lpad is used to left-pad a string column
  • concat_ws is used to concatenate string columns into a single column
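For example, 5 March 2020 becomes 20200305; a quick sketch on literal values:

from pyspark.sql.functions import concat_ws, lpad, lit

# "2020" + "03" + "05" -> "20200305" -> 20200305 as an integer key
spark.range(1).select(
    concat_ws('',
              lit("2020"),
              lpad(lit("3"), 2, '0'),
              lpad(lit("5"), 2, '0')).cast('int').alias('DateKey')
).show()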

Save as Delta PARQUET and PARQUET

from delta.tables import DeltaTable

dfdate2.write.mode("overwrite").format("delta").saveAsTable("dimDate")
dfdate2.write.mode("overwrite").parquet('Files/Data/Silver/DimDate.parquet')

Update Source Control

Clicking back onto the Fabric workspace, make sure you update source control.

Conclusion

In part three, we created individual notebooks for each dimension.

We now have DimTask, DimEpisode, DimContestant and DimDate.

Next we want to start on the facts. And we want to revisit each dimension to see if we are happy with the types of each column.

Tell me what you would do. Would you have one notebook with everything in it, or split it up as I have done?

How would you deal with the importing of all the functions? Is there anything I can do better here?

Are there better ways of writing the code?

Part three and I'm starting to really enjoy working with PySpark. The takeaway is that I can use my SQL brain, so it's starting to really work for me.

So many questions though. For example, we can do full loads and appends to write the data, but what about a more incremental approach?

Microsoft Fabric Part 2. Taskmaster Project. Updating a dim Contestants table in Notebook and Delta PARQUET File

This blog is the next step for

Microsoft Fabric Part 1. Taskmaster Project. Creating a dim Contestants table in Notebook and Delta PARQUET File

In Part 1, a Contestants dimension was created from the attempts csv file. The data from this was transformed and added into a PARQUET file and a DELTA PARQUET file so we can test the differences later.

There are more csv files to look at and one specifically gives us more details for the Contestants dim

people.csv

The task here is to go back to the original Notebook and see if we can add this new information

The first bit of code to create goes after the following blocks of code.

  • Starting Spark Session
  • Reading attempts.csv into a dataframe
  • Renaming columns and creating dfattempttrans
  • creating the distinct dfcont from dfattempts (creating the Contestants Dim)

#Read the people file into a dataframe
dfpeople = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/people.csv")
# dfpeople now is a Spark DataFrame containing CSV data from "Files/Data/Bronze/people.csv".

#And rename a couple of columns
dfpeople = dfpeople.withColumnRenamed("id","contestantID").withColumnRenamed("name","contestant")
display(dfpeople)

We have used .withColumnRenamed to change a couple of columns.

ID to ContestantID and name to contestant

Before moving on, the assumption is that people is a distinct list.

Let's count the rows in each table so far.

row_countp = dfpeople.count()
row_countc = dfcont.count()
print(f"Number of rows in the dfpeople: {row_countp}")
print(f"Number of rows in the dfcont: {row_countc}")

There are a lot more people in our new DataFrame. But are they unique?

#We need to test. Do we have unique Contestants?

#We need to import a function for col
from pyspark.sql.functions import *

dfpeople.groupBy('contestant')\
       .agg(min('contestantID').alias('MinContestantID'),count('contestant').alias('TotalRecords'))\
       .filter(col('TotalRecords')>1).show(1000)

Using \ allows us to break the code onto separate lines

We have the same issue here where some of the people have more than one record, so let's filter a few.

filtered_df = dfpeople.filter(dfpeople['contestant'] == 'Liza Tarbuck')
filtered_df.show()

The original assumption in Part 1 was incorrect. We have multiple records based on the Champion of Champions series here. So the ID is not for the contestant; it's for the contestant and series, and some people have been on multiple series.

Do the IDs match between the tables?

filtered_df = dfcont.filter(dfcont['contestant'] == 'Bob Mortimer')
filtered_df.show()

Yes. So we need to rethink dim contestants. Actually, if they exist in both, we only need to use people. So we need to join them and check whether this is the case.

Checking that values in attempts exist in People – rename columns

We want to join the contestants that we obtained from the attempts table to dfpeople, simply so we can check what is missing.

We know we have more in people. But what happens if there are values in attempts that are missing from people? That would cause us more problems.

We will join on the ID and the contestant, so before the join, let's rename the dfcont columns so we don't end up with duplicate column names.

#And rename a couple of columns
dfcont = dfcont.withColumnRenamed("contestantID","cont_contestantID").withColumnRenamed("contestant","cont_contestant")

display(dfcont)

join dfCont and dfPeople

Please note that when joining tables, it's better to create a new DataFrame rather than update an original one, because if you have to run it again, the DataFrame you join from will have changed.

# Original condition
original_condition = dfpeople["contestantID"] == dfcont["cont_contestantID"]

# Adding another condition based on the 'name' column
combined_condition = original_condition & (dfpeople["contestant"] == dfcont["cont_contestant"])

# Now perform the left outer join
dfcontcheck = dfpeople.join(dfcont, on=combined_condition, how="full")

display(dfcontcheck)

So we have everything in one table. Let's do some filtering to check our findings.

Filter for Null Values

from pyspark.sql import functions as F
dfcontcheck.filter(F.isnull("cont_contestantID")).show(1000)

There are 72 records that don't exist in dfattempts, which is fine.

from pyspark.sql import functions as F
dfcontcheck.filter(F.isnull("contestantID")).show(1000)

And there are none in attempts that aren't in people.

People is therefore our master data set and the PySpark code can be changed accordingly.

At the moment there are a lot of checks happening in the PySpark code. Once it is up and running and ready to be turned into a proper transformation notebook, all these checks could be removed to simplify it.

Updating the Code with the new Logic

We are taking the code from Part 1 and completely updating based on the new findings

Start Spark session

%%configure
{
    "defaultLakehouse": {  
        "name": "DebbiesFabricLakehouse"
    }
}

Read People into dfPeople DataFrame

#Read the people file into a dataframe
dfpeople = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/people.csv")
# dfpeople now is a Spark DataFrame containing CSV data from "Files/Data/Bronze/people.csv".

#And rename a couple of columns
dfpeopletransf  = dfpeople.withColumnRenamed("id","contestantID").withColumnRenamed("name","contestant")

print(dfpeopletransf.count())
print(dfpeopletransf.distinct().count())
display(dfpeopletransf.distinct())

.withColumnRenamed brings back the dfpeople data set but renames the specified columns

Create dfCont to create our initial contestants DataFrame

Previously we had a lot of code blocks checking and creating min IDs for a distinct contestant list, but now we have more of an understanding, we can do this with less code.

#Lets immediately do the group by and min value now we know what we are doing to create a distinct list (Removing any IDs for a different series)

#import the functions to use (min and count are needed for the agg below)
from pyspark.sql.functions import col, min, count

#Creates the full distinct list. 

dfcont = dfpeopletransf.groupBy('contestant', 'dob', 'gender','hand' )\
       .agg(min('contestantID').alias('contestantID'),count('contestant').alias('TotalRecords'))


display(dfcont)

groupBy all the distinct values we want: contestant, dob, gender and hand are all unique together with the name. (Previously we just had contestant.)

Next we agg (aggregate) with min contestantID, and count the number of contestants as TotalRecords.

If you are used to SQL you would see the following

SELECT MIN(contestantID) AS contestantID, contestant, dob, gender, hand, Count(*) AS TotalRecords
FROM dfpeopletransf
GROUP BY contestant, dob, gender, hand

Create a Current Age

# Create a Spark session (not needed - one is already running)
#spark = SparkSession.builder.appName("AgeCalculation").getOrCreate()

#Import the functions used below
from pyspark.sql.functions import current_date, datediff, col

# Convert the date of birth column to a date type in dfCont
dfcont = dfcont.withColumn("dob", dfcont["dob"].cast("date"))

# add the current date and set up into a new dataframe
df = dfcont.withColumn("current_date", current_date())

#Calculate age using dob and current date 
df = df.withColumn("age_decimal", datediff(df["current_date"], df["dob"]) / 365)

#Convert from decimal to int  
df = df.withColumn("age", col("age_decimal").cast("int"))

#Convert contestant ID to int
df = df.withColumn("contestantID", col("contestantID").cast("int"))

#Update dfCont
dfcont = df.select("contestantID", "contestant", "dob","gender","hand", "age")

dfcont.show(1000)

It would be fun to filter by age and age group. Let's create the age first, based on the current date.

  • First of all we ensure dob is a date with .cast("date")
  • Then, using .withColumn, we add the current date to the DataFrame with current_date()
  • Next, we use datediff to get the age from the date of birth and the current date (a more leap-year-proof alternative is sketched below)
  • Set age to int and contestantID to int
  • Next, update dfcont with the full list of columns (excluding current_date because that was only needed to create the age)
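The division by 365 is an approximation that drifts slightly over leap years. If that matters, a hedged alternative is months_between, which handles the calendar arithmetic:

from pyspark.sql.functions import months_between, current_date, floor, col

# Whole years between dob and today (alternative to datediff/365)
dfcontalt = dfcont.withColumn("age", floor(months_between(current_date(), col("dob")) / 12).cast("int"))
dfcontalt.show(5)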

Create Age group

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

# Create a Spark session
#spark = SparkSession.builder.appName("ConditionalLogic").getOrCreate()

# Apply conditional logic
dfcont = dfcont.withColumn("ageRange", when(col("age") < 20, "0 to 20")
                               .when((col("age") >= 20) & (col("age") <= 30), "20 to 30")
                               .when((col("age") >= 30) & (col("age") <= 40), "30 to 40")
                               .when((col("age") >= 40) & (col("age") <= 50), "40 to 50")
                               .when((col("age") >= 50) & (col("age") <= 60), "50 to 60")
                               .otherwise("Over 60"))

# Show the result
dfcont.show()

We have created what would be a CASE statement in SQL.

.withColumn creates the new column, and .when and .otherwise are used to build this new data item.

Create New Row for -1 NA Default Row

We are back to almost the original code, creating our default row but this time with more columns.

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a sample DataFrame
data = [(-1, "Not Known","1900-01-01","NA","NA","0","NA")]
columns = ["contestantID", "contestant", "dob", "gender", "hand", "age", "ageRange"]
new_row  = spark.createDataFrame(data, columns)

# Union the new row with the existing DataFrame
dfcont = dfcont.union(new_row)

# Show the updated DataFrame
#dfcont.show(1000)

dfcont.show(1000)

We can now see Not known in the list

Running a DISTINCT just in case we applied the above more than once.

#Just in case. We distinct again to remove any extra NA Rows
dfcont = dfcont.distinct()
dfcont.show(1000)

Checking we have unique Contestants GROUPBY

#And lets check the new distinct list with Min IDs we have made
#We need to import a function for col
from pyspark.sql.functions import *

dfcont.groupBy('contestant', 'dob', 'age','ageRange', 'hand','gender')\
        .agg(count('contestant').alias('TotalRecords'))\
        .filter(col('TotalRecords')>1).show(1000)


#Hopefully there should be no records

We have all unique contestants, including the extra information we have added to the dimension.

Creating the Contestant Key

We can now create slightly better code than Part 1 using less code blocks

#Now finally for this we need a Key a new column
# Imports Window and row_number
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Applying partitionBy() and orderBy()
window_spec = Window.partitionBy().orderBy("ContestantID")

# Add a new column "row_number" using row_number() over the specified window
dfcont = dfcont.withColumn("ContestantKey", row_number().over(window_spec)- 2)

# Show the result
dfcont.show(1000)

Previously a partition column with the value 1 was added so we could do the following: .partitionBy("partition")

But simply using partitionBy() without a column removes the need for the extra code and column, and does the same thing.

Adding the Team

Previously the Team was added from attempts.csv, but we know that we can change this to people now and have better team information.

#Creates the full distinct list. 
dfContTeam = dfpeopletransf.select(col("contestantID").alias("teamcontestantID"), col("contestant").alias("teamContestant"), 
col("team").alias("teamID"),col("team_label").alias("TeamName")).distinct()

display(dfContTeam)

Remove Rows with Empty team information

#Now remove the rows with null teamids

# Import necessary libraries and create a Spark session
#from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()

# Drop rows with NULL values in the "id" column
dfContTeam = dfContTeam.na.drop(subset=["teamId"])

# Show the filtered DataFrame
dfContTeam.show(truncate=False)

Join team to Contestant

#Now what we want to do is add TeamName into dfcont

dfcont = dfcont.join(dfContTeam, dfcont["contestant"] == dfContTeam["teamContestant"], how="left").drop(dfContTeam.teamContestant).drop(dfContTeam.teamcontestantID)

display(dfcont)

This time we cleaned up the code by adding the two columns to drop in the above statement, so we don't end up with two name and ID columns from both data sets.

Create DELTA PARQUET TABLE

from delta.tables import DeltaTable

dfcont.write.mode("overwrite").format("delta").saveAsTable("dimContestant")

Because we are overwriting, our new data set should overwrite our old dimension. However, there will be conflict issues because the schema is so different, so for this exercise the files will be removed before being re-added.

CREATE PARQUET Table in Files Folder

dfcont.write.mode("overwrite").parquet('Files/Data/Silver/DimContestant.parquet')

We now have options of using the Delta PARQUET, or just the PARQUET File.

Conclusion

In Part 1 we used the attempts csv for contestants. For Part 2 we did lots of analysis and found that, actually, people was a better data source for contestants. So we changed the input for the initial DataFrame to people.csv.

We also used the opportunity to tidy up the code, now we are getting to grips with PySpark as an alternative to SQL.

Next on the list is to create more dimensions.

  • Dim Task
  • Dim Episode
  • Dim Date

And the Taskmaster Stats Fact Table

Then to do some analysis, using the inbuilt visuals in a Notebook, and Power BI.

Microsoft Fabric Part 1. Taskmaster Project. Creating a dim Contestants table in Notebook and Delta PARQUET File

It's so exciting at the moment with all the changes going on in the world of Microsoft, and I wanted to use the trial version of Fabric to see if I could move my 'standard' working practices over into Fabric.

The first thing was to find a practice dataset for this exercise, so of course I went for my favorite stats-based show: UK Taskmaster.

Kaggle

Kaggle delivered the goods with csv files of taskmaster information

https://www.kaggle.com/datasets/sujaykapadnis/comedians-challenged-ridiculous-taskmaster-ds

The challenge is to transform this data, creating Dims and Facts, with the ultimate goal of a star schema.

CSV – SQL DB – Power BI

Usually I use Data factory to pull the data into a SQL database. In here I create Stored procedures to transform the data. These stored procedures are then added into the pipeline so we can rerun when needed.

Finally, the dimensions and facts are imported into Power BI from the Dims and Facts in the SQL Database.

So the question is: can I transform my files using PySpark within a Notebook, and then create the Dim and Fact tables in DELTA PARQUET?

I have used SQL for over 20 years and it's my weapon of choice, so this is going to be a real puzzler of a task.

Getting Started – Create the Fabric Workspace

I have my Fabric Trial and a Workspace added to the Fabric Trial.

Azure Devops

I always set up an Azure Devops

https://dev.azure.com

Here I have just added a new project to DevOps: Debbies Training.

Visual Studio

I am going to use Visual Studio for the next task, but there are lots of other ways you can do this next step.

Connect to the repository that has been created (you need to log in to see your repos).

Click clone and you will then have a local copy of the code base. It doesn't support everything at the moment, but it does support Notebooks, which is what will be used for this exercise.

C:\Users\DebbieEdwards\source\repos\DebbiesTraining

Connect Devops to Fabric

Back in Fabric

We will go back to the repos soon to start using it.

Create a Delta lake in your Fabric Workspace in Data Engineering Fabric Experience

Clicking on Data Engineering takes you to the data engineering area of Fabric, where you can set up a Lakehouse.

I have simply called the Lakehouse DebbiesFabricLakehouse.

When the Lakehouse has been created you get the following.

And for the time being, let's just use the Lakehouse.

Add CSV Files into the Lakehouse

Notice that we have the following folders in the Lakehouse

For this project, the medallion architecture is being used.

  • Bronze is where the csv files are stored in the lakehouse: the raw data zone
  • Silver is where the transformed PARQUET data will go (which we will look at later)
  • Gold is the final transformed STAR SCHEMA layer: the curated zone

I have all the taskmaster csv files and I am going to simply upload them to the Lakehouse Files / Data / Bronze folder

Refresh the folder and you should see your files in the pane.

Create the Notebook

Now we can start to work with the files, and here is where the fun starts. Instead of a pipeline that pulls data into SQL before the transformations, let's create a Notebook.

We are ready to go. This is what I can see to the right of the code pane.

And PySpark will be the Spark language of choice. I won't use any SQL, just to see how it goes away from my beloved SQL.

I am doing very simple code at the moment and hope to make it better and better the further I get, so let's just start adding PySpark and markdown (descriptions).

This is the first lab I used to get going

https://microsoftlearning.github.io/mslearn-fabric/Instructions/Labs/02-analyze-spark.html

When you first run the notebook you have to start a Spark session against the Lakehouse. To do this I have started with the following code.

%%configure
{
    "defaultLakehouse": {  
        "name": "DebbiesFabricLakehouse"
    }
}

Read the CSV File into a DataFrame

#Read the first file into a DataFrame
dfattempts = spark.read.format("csv").option("header","true").load("Files/Data/Bronze/attempts.csv")
# dfattempts now is a Spark DataFrame containing CSV data from "Files/Data/Bronze/attempts.csv".
display(dfattempts)

Now we read our first csv into a data frame. A DataFrame is basically a table of data.

We can click on the play icon to run this part of the code. We can now start transforming.

Contestant transformation

The first thing I want to do is get out contestants: create a unique key and add the key to the Dim table and to the transformation table, which still contains all the facts.

(I have gone back and forth in the code as I have worked on it, so some code has been amended when certain issues have come to light.)

#We will use this later in the process. the Following code renames the columns from the original data frame but brings back the entire dataframe again
#We actually only need the contestant part

dfattempttrns = dfattempts.withColumnRenamed("contestant","contestantIDToRemove").withColumnRenamed("contestant_label","contestantToRemove")
dfattempttrns.printSchema()


print(dfattempttrns.count())
print(dfattempttrns.distinct().count())
display(dfattempttrns.distinct())

A transformed DataFrame is created from the main DataFrame and, initially, two items have been renamed: contestant to contestantIDToRemove and contestant_label to contestantToRemove. We will see why later.

We also print the count of contestants and we display the transformed data frame.

Create Contestants Dim

The line from pyspark.sql.functions import col imports the col function from the pyspark.sql.functions module

Here is an explanation of what it does.

Pyspark is a Python library used for working with big data and performing distributed data processing using Apache Spark.

The pyspark.sql.functions module provides a set of built in functions that can be used for data manipulation and transformation in Spark DataFrames.

The col function is one of these built in functions. It is used to:

  • Reference a column in a DataFrame
  • Select a specific column
  • Apply mathematical operations to a specific column
  • Create new columns based on existing ones

#import the function col to use.
from pyspark.sql.functions import col

#Creates the full distinct list. 
dfcont = dfattempts.select(col("contestant").alias("contestantID"), col("contestant_label").alias("contestant")).distinct()

display(dfcont)

This code gives us a distinct list of contestant IDs and names, and we have aliased the new names in this DataFrame. This uses our original first DataFrame, not the transformed one.

Again, we display the table

Order

#dfcont.orderBy(['contestantID'], ascending = [True]).show(1000)

dfcont = dfcont.orderBy(['contestantID'], ascending = [True])

dfcont.show(1000)

This code block just orders the data. The interesting part of the code here is that instead of display we use show to print out the DataFrame. If you leave it as show(), only 20 rows are returned.

Create a DataFrame of values

This is for information only. I wanted to see how a DataFrame is created from information specified in the code block.

from pyspark.sql import SparkSession

In PySpark, the SparkSession class serves as the entry point for programming with the Dataset and DataFrame API. It allows you to create DataFrames, register them as tables, execute SQL queries, cache tables, and read Parquet files.

from pyspark.sql import Row

In PySpark, the Row class is used to represent a row of data in a DataFrame. Each field in a Row object can be accessed by its name or index.

#https://saturncloud.io/blog/adding-new-rows-to-pyspark-dataframe-a-comprehensive-guide/
#Just for information. Not being used but here is how you write your own dataframe
from pyspark.sql import SparkSession
from pyspark.sql import Row

#I dont think I need this because I already have a spark session?
#spark = SparkSession.builder.appName('AddRows').getOrCreate()

data = [(70, 'Sam Campbell'),
         (71,'Julian Clary'),
         (72,'Lucy Beaumont'),
         (73,'Sue Perkins'),
         (73,'Susan Wokoma')]
columns = ["contestantID", "contestant"]
dfnarowCont = spark.createDataFrame(data, columns)

dfnarowCont.show()

Create a New Row for the default NA Row

For a dimension we always have a default row of Not Known and a key of -1. For example, if we are looking at customer orders and a customer hasn't bought anything this month:

DateKey     CustomerKey    Quantity
01012022    56             244
01012022    -1             0

Here we have -1 in the fact table so we can then bring through all the default Not Known descriptors from the customer table if we need to, when analysing in Power BI.
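Looking ahead, when the fact table is built, missing keys can be defaulted to -1 with a left join and a coalesce; a minimal sketch, assuming a fact-side DataFrame called dffact that carries a contestantID column (the names are illustrative):

from pyspark.sql.functions import coalesce, col, lit

# Left join the fact rows to the dimension, then default any missing key to -1
dffactkeys = (dffact.join(dfcont.select("contestantID", "ContestantKey"), on="contestantID", how="left")
                    .withColumn("ContestantKey", coalesce(col("ContestantKey"), lit(-1))))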

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a sample DataFrame
data = [(-1, "Not Known")]
columns = ["contestantID", "contestant"]
new_row  = spark.createDataFrame(data, columns)

# Union the new row with the existing DataFrame
dfcont = dfcont.union(new_row)

# Show the updated DataFrame
#dfcont.show(1000)

dfcont.show(1000)

Now, let's run a distinct, in case we have run the above code numerous times and have multiple default rows.

#Just in case. We distinct again to remove any extra NA Rows
dfcont = dfcont.distinct()
dfcont.show(1000) 

Testing the DataFrame for Unique values

https://www.statology.org/pyspark-count-by-group/

Our Dimension should consist of unique values. So we need to test our new dataset

from pyspark.sql.functions import *

dfcont.groupBy('contestant')\
       .agg(min('ContestantID').alias('MinContestantID'),count('contestant').alias('TotalRecords'))\
       .filter(col('TotalRecords')>1).show(1000)

Notice we don't have dfcont = because we are not loading the results into a DataFrame; here we are just looking at the data. It is basically just a SELECT statement.

agg is used to calculate more than one aggregate (multiple aggregates) at a time on a grouped DataFrame.

So to perform the agg, first you need to perform the groupBy() on the DataFrame, which groups the records based on single or multiple column values, and then do the agg() to get the aggregate for each group.

The only thing that makes these values non-distinct is that they will have different ContestantIDs. So this query is bringing back the min(ContestantID).

Here we can see that we do have some bad data that we can transform and correct, so that everything in both the dim and the transformation tables holds the min ID.

Create a DataFrame of all contestants with Min Contestant ID

Let's set up a DataFrame and use it to update. We bring back the entire contestant list with the min IDs only, creating a distinct list.

    #We need to import a function for col
    from pyspark.sql.functions import *
    
    dfminContID = dfcont.groupBy('contestant')\
                         .agg(min('ContestantID').alias('MinContestantID'),count('contestant').alias('TotalRecords'))
    
    dfminContID.show(1000)

    GROUPBY and AGG and FILTER

    • Group on Contestants
    • Aggregate gives us a count of contestants called TotalRecords
    • Then we filter to only find records where TotalRecords is greater than 1

    Lets check the new distinct list with Min IDs we have made

      #We need to import a function for col
      from pyspark.sql.functions import *
      
      dfminContID.groupBy('contestant')\
              .agg(count('contestant').alias('TotalRecords'))\
              .filter(col('TotalRecords')>1).show(1000)

      JOIN two DataFrames Where JOIN Columns are the same

      Now what we want to do is update the two DataFrames dfcont with Min ID’s.

from pyspark.sql import SparkSession
from pyspark.sql.functions import when

# Initialize Spark session - I don't think this is needed here as we already have one
#spark = SparkSession.builder.appName("MySparkApp").getOrCreate()

# Join DataFrames based on 'contestant'
joinedCont_df = dfcont.join(dfminContID, on="contestant", how="left_outer")

# Show the updated DataFrame
joinedCont_df.show(1000)

For the next step, let's re-update dfcont with our updated min value.

#We want to reset back to ContestantID and Contestant, this is a distinct list
dfcont = joinedCont_df.select(col("minContestantID").alias("contestantID"), col("contestant")).distinct()
dfcont.show(1000)
      

      We updated ContestantID with MinContestantID

      So, we have reset the ContestantID with the correct min ID

      You can’t modify the existing dataframe as it is immutable, but you can return a new dataframe with the desired modifications.

Recheck a specific contestant's data – Filter

Let's do a filter on a contestant we know had a duplicate ID.

dfcont.select(col("contestantID"), col("contestant")).filter(dfcont.contestant == "Noel Fielding").distinct().show(1000)

Update the ID of the transformed Table

We also want to update the Contestant ID of our transformed table (dfattempttrns), which we created near the beginning of the notebook. Eventually we can create a fact table from dfattempttrns with Contestant Keys, so we want to join them via the ContestantID.

This is where the data items contestantIDToRemove and contestantToRemove come in handy.

#we also want to update the ID of our transformed table we created near the beginning of the notebook
dfattempttrns = dfattempttrns.join(dfcont, dfattempttrns["contestantToRemove"] == dfcont["contestant"], how="inner")

#If the join columns had the same name we could instead write:
#dfattempttrns = dfattempttrns.join(dfcont, on="contestant", how="inner")

display(dfattempttrns)

        There are two extra columns at the end from our corrected dfcont DataFrame.

        the join was on the contestantname

        #Lets have a look at the schema

        dfattempttrns.printSchema()

        Here we have a little code to display the Schema of the updated dfattemptstrans DataFrame

        Drop Columns

        # Drop columns
        cols_to_drop = ["contestantIDToRemove", "contestantToRemove"]
        dfattempttrns = dfattempttrns.drop(*cols_to_drop)
        dfattempttrns.show()

        And just to be sure we can now check for our Contestant again

        #Lets do a filter on a contestent we know had a duplicate ID
        #dfattempttrns.filter(dfattempttrns.contestant == "Noel Fielding").show(1000)
        from pyspark.sql import SparkSession
        from pyspark.sql.functions import col
        dfattempttrns.select(col("contestantID"), col("contestant")).filter(dfattempttrns.contestant == "Noel Fielding").distinct().show(1000)
        

It's now working. There is only one record in both the transformation table and the dimension table, so the join on IDs will work.

Creating the Dim Key – Adding a column with .withColumn

For the contestants Dim we need a key, so we need a new column. Let's add a new column to create the partition on. I'm sure there are lots of ways to do this, and later in the process I will be able to really hone the code down with more PySpark knowledge.

        from pyspark.sql.functions import lit
        
        # add column for discount
        dfcont = dfcont.withColumn("Partition", lit(1))
        # display the dataframe
        dfcont.show(1000)

        Create the Contestant Key using Partition by

        #Now finally for this we need a Key a new column
        # Imports Window and row_number
        from pyspark.sql import Window
        from pyspark.sql.functions import row_number
        
        # Applying partitionBy() and orderBy()
        window_spec = Window.partitionBy("Partition").orderBy("ContestantID")
        
        # Add a new column "row_number" using row_number() over the specified window
        dfcont = dfcont.withColumn("ContestantKey", row_number().over(window_spec)- 2)
        
        # Show the result
        dfcont.show(1000)

We subtract 2, which starts the key at -1 (-1, 0, 1, …) so the default row gets the -1 key as standard.

        from pyspark.sql import Window

  • Imports the Window class from the pyspark.sql module.
  • The Window class is used to define a window specification in the DataFrame API. A window specification is defined by specifying partitioning expressions and ordering expressions.
  • The partitioning expressions are used to partition the data into groups, and the ordering expressions are used to order the data within each group.

        from pyspark.sql.functions import row_number

        • The row_number function from the pyspark.sql.functions module is used to assign a unique row number to each row in a DataFrame.
        • It is commonly used in Spark SQL to create a sequential identifier for rows based on a specified order.

Because the NA ID is -1 for the default row, it immediately gets the key of -1 because of the ordering, which is just what we wanted.

        #We want to remove the partition key from the dim
        
        dfcont = dfcont.select(col("contestantKey"), col("contestantID"), col("contestant")).distinct()
        dfcont.show(1000)

Adding Team to Dim Contestant

There is another item we want to add into the contestant dim: the Team.

First of all, let's get a distinct list of everything we need from dfattempttrns, namely the contestantID and contestant, teamID and TeamName (these last two are new; we haven't worked with them so far).

        #import the function col to use. But shouldnt it already be available?
        #from pyspark.sql.functions import col
        
        #Creates the full distinct list. 
        dfContTeam = dfattempttrns.select(col("contestantID").alias("teamcontestantID"), col("contestant").alias("teamContestant"), 
        col("team").alias("teamID"),col("team_label").alias("TeamName")).distinct()
        
        display(dfContTeam)

The original DataFrame includes the tasks, and team is only set on a team task. In Contestants we simply want to know which team the contestant is in.

We have created a new DataFrame called dfContTeam.

        Remove rows with NULL team IDs

        # Import necessary libraries and create a Spark session
        ##from pyspark.sql import SparkSession
        spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()
        
        # Drop rows with NULL values in the "id" column
        dfContTeam = dfContTeam.na.drop(subset=["teamId"])
        
        # Show the filtered DataFrame
        dfContTeam.show(truncate=False)

        Join tables to add team to contestants Dim using the contestant Name

#Now what we want to do is add TeamName into dfcont

#Because we are joining on columns with different names, we use an explicit join condition
dfcont = dfcont.join(dfContTeam, dfcont["contestant"] == dfContTeam["teamContestant"], how="inner")

display(dfcont)

        When the dfContTeam was created. Contestants was called teamContestant and this is the join you see in the code.

        Drop Columns

        # Again we need to Drop columns we don't need from the data frame
        cols_to_drop = ["teamContestant", "teamID","teamcontestantID"]
        dfcont = dfcont.drop(*cols_to_drop)
        display(dfcont)
        

        Finally. A Dim Contestant DataFrame with everything we need.

        Adding ContestantID into the transformed DataFrame

        Before creating a PARQUET File it would be good to get the Contestant Key into the transformed table by joining the tables again on contestantID

        #join contestant to Contestant to remove
        dfattempttrns = dfattempttrns.join(dfcont, 
                                            dfattempttrns["contestantID"] == dfcont["contestantID"], 
                                            how="inner").drop(dfcont.contestantID).drop(dfcont.contestant)
        
        
        display(dfattempttrns)

        We now have .drop(dfcont.contestantID).drop(dfcont.contestant) in the code which removes our duplicate columns before they are added into the transformation table

        UPDATE – We have learned how to update using different column names AND Update using the same column name, whilst removing any columns from table B that are identical to table A

        Creating the DELTA PARQUET FILE

        Finally we can add Dim Contestants to a PARQUET file. We don’t need any partitions because its just a small dim table.

        from delta.tables import DeltaTable
        
        dfcont.write.mode("overwrite").format("delta").saveAsTable("dimContestant")
        

        After a refresh on tables

This creates a Delta Lake table called dimContestant which is stored in Delta format and provides ACID transactions, plus features like time travel and versioned data.

        The Black triangle icon means that it is a DELTA PARQUET file.

        Save as PARQUET File

        However you can also do the following

        dfcont.write.mode("overwrite").parquet('Files/Data/Silver/DimContestant.parquet')
        

This saves the DataFrame as a Parquet file into a specific folder in Files.

It's columnar storage that works really well for analytics workloads, but it doesn't support the Delta Lake ACID transactional features like the time travel mentioned above.

Let's have a look at what we have created (the data is not partitioned).

        A PARQUET File is a columnar storage file. In the same way as Power BI has a columnar data store when you import the data.

        Our data is in the snappy.parquet file.

The success file is not a specific file format; it's a marker related to the job execution, so here we can see that the creation of the file was a success.

So:

  • Delta Lake PARQUET comes with advanced features and transactional capabilities. Tables have to be in the Tables folder in order to be recognized by other engines like the SQL endpoint and semantic model.
  • PARQUET is for storage and simplicity, but is it only visible to Spark?

We will look at these both in more detail in a later exercise.
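As a quick taster of the Delta-only features, the transaction log lets you query an earlier version of the table; a hedged sketch (version 0 is assumed to exist, and the Tables path is the default location):

# Time travel: read the first version of the Delta table (version and path are assumptions)
dfcontv0 = spark.read.format("delta").option("versionAsOf", 0).load("Tables/dimContestant")
display(dfcontv0)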

Stopping the Spark Job

It's always worth stopping the Spark job at the end of processing.

spark.stop()

It will stop automatically after a time, but this ensures that the session is stopped immediately at the end of the transformation process.

Using Source Control

Clicking back on the workspace: we added our workspace to DevOps, so now we can see that there has been a change.

When working with other people there will be feature branches to connect to rather than main, but because this is just a one-person exercise, committing to main is fine for the time being.

And in DevOps we can see that the code has been committed.

Conclusion

We have taken an example group of csv files and transformed them so far to create one dimension for a star schema.

This dimension data will be added into a star schema via the contestant key.

We have also set up DevOps to commit the notebook code into the GIT repository.

In later posts we will move on to other dimensions, create the fact tables and see what we can do with both the DELTA PARQUET and PARQUET files.

        Azure Synapse – Creating a Type 1 Upload with Pipelines

Working through exercises and learning paths for Microsoft Synapse is a really good way of becoming familiar with concepts and tools. And whilst these are a fantastic source of learning, the more you do, the more questions you end up asking: why use this option over that option, for example?

So, let's look at one of the initial pipeline creation exercises for Azure Synapse in more detail and try to answer the questions that arise.

https://microsoftlearning.github.io/dp-203-azure-data-engineer/Instructions/Labs/10-Synpase-pipeline.html

The end game is to take a file from the Serverless SQL Pool (Data Lake) and load it into the dedicated SQL Pool, the Synapse data warehouse.

But if the data is already there, we simply update it without adding a duplicate record.

For this you need Azure Synapse and a Data Lake.

        Analytics Pools

        In Synapse Studio, Go to Manage

        Our starting points are the Built in, Serverless SQL Pool Connected to a datalake (storage account resource in Azure)

        We also have a Dedicated SQL Pool which you can see in data

This DB can also be used as serverless, without having to unpause the dedicated capacity, by adding an external data source, external file format and external tables.

        Back to Manage. We now know we have SQL Pools. Both Serverless and Dedicated. Serverless will be our source. Dedicated is the destination.

        Linked Services

        Default Linked Services

        The following were already created. Its useful to understand what is used and not used for this dataflow

        • Name synapse*********-WorkspaceDefaultStorage
        • Type: Azure Data Lake Storage Gen 2

        This is the default linked service to the Data Lake Storage Gen 2 in Azure. You can find this in Azure as your Storage account resource.

        • Name synapse********WorkspaceDefaultSqlServer
        • Type: Azure Synapse Analytics
• Authentication Type: System Assigned Managed Identity

This one isn't used for this particular process. However, what makes it different from the other Azure Synapse Analytics linked service that has been set up (below)? We will answer that shortly.

        • Name DataWarehouse_ls
        • Type: Azure Synapse Analytics
• Authentication Type: System Assigned Managed Identity
        • Database Name: TrainingDB

        This was added for this project and is the Dedicated SQL Pool Database.

The answer seems to be that there is very little difference, apart from the original being parameterised. Here are the differences.

• Fully qualified domain name: in the one created it was synapse*****.sql.azuresynapse.net. In the automated linked service it is tcp:synapse*****.sql.azuresynapse.net,1433, which has more detail, like the transmission control protocol (tcp) prefix and the port (1433).
• DatabaseName: in the one created it was TrainingDB, after the database in the Synapse workspace. In the automated linked service it is the parameter expression @{linkedService<>.DBName}.
• Parameters: the created one doesn't have any parameters. The default linked service has one parameter, DBName, of type String.

        It will be interesting to see if changing the linked service to the default one will change things for us.

        Integrate

For this task we create a pipeline, Load Product Data Type 1, and in Move and transform we use a Data Flow.

The Copy activity is the simpler way to load data: you specify the source, sink and data mapping. A Data Flow can transform the data as well as copy it. At run time the Data Flow is executed in a Spark environment rather than the Data Factory execution runtime.

Copy activity: around £0.083 an hour; orchestration is around £1.237 per 1,000 runs.

Data flows: start at £0.0228 an hour.

        The DataFlow is called LoadProductsData

        Settings

Staging should only be configured when your data flow has Azure Synapse Analytics as a sink or source, which it does here with the data warehouse destination (sink).

        Double click on the dataflow to get into the details

        Source

        Sources are the first to be created

        Source Type – Integration dataset, Inline, Workspace DB

        There is no information in the dataflow as to what the source types are and what is best to choose. Looking at the documentation we can glean the following:

Some formats support both inline and dataset objects. Dataset objects are reusable and can be used in other data flow and copy activities; they are good for fixed schemas. Datasets are NOT based on Spark.

Inline datasets are good for flexible schemas or one-off instances. They are also good for parameterised sources. Inline datasets are based on Spark.

        Integration Dataset
        Inline data Set
        Workspace Db

The integration dataset allows you to connect straight to the linked service object, whereas inline connects to the linked service set up in Data Factory Linked Services.

Basically, the integration dataset bypasses an extra step, and Workspace DB allows you to do the same but is only available for Azure Synapse Analytics objects (like the data warehouse).

So for this project the integration dataset has been used, which is not based on Spark.

        Data set

        A Data set needs to be created.

        • Type: Azure Datalake Storage Gen2
        • Format: Delimited text (Because we are working with csv files)
        • Name: Products_Csv
        • Linked service: synapsexxxxxxx-WorkspaceDefaultStorage (Find in Manage, Linked Services. this is the Azure Data Lake Storage Gen2 Linked Service)
        • File path: files/data/Product.csv (In the data lake we have a files collection. then a data folder)
        • First row as header: Selected (there is a header in the csv files)
        • Import schema: From connection/store (Take from the actual data source)
        • Allow schema drift: Selected

In Data Factory there is a Datasets folder where you can find all your datasets and, of course, parameterise them so you can use, for example, the same csv dataset for all your csv files. However, there is no dataset folder within Integrate.

So where can you find your datasets once created in Synapse?

        Schema Drift

What is schema drift? If you have a schema which changes, for example new columns are added and old ones deleted, you would have to develop against this drift, making sure that you are capturing the right information. This could mean a lot of development work and affects your projects, from source through to your Power BI analytics models.

        It would be really useful if we could remove some development work and allow the Pipeline to deal with this.

        Schema Drift allows for schema remodelling without constant redevelopment of upstream schemas.

        Projection

Here we see the projected columns from the data source. Hopefully we could also parameterise this so one data flow with the same functionality could handle multiple loads, e.g. 5 csv files being moved to 5 tables in a database.

        Another source is also created

        Data transformation – Lookup

From the + under the first source, Lookup is selected.

        Here we take the Products in the csv file and look them up against the destination table in the dedicated SQL Pool.

        Match on

Why has Last row been chosen? If we chose Any row, we would just specify the lookup conditions, which map the csv file's ProductID to the dedicated SQL pool's ProductAltKey. So this is the only lookup required: great when you know there are no duplicates.

Because Last row has been chosen, a sort order is required on the data, which is the AltProductKey.

Match on

Note that this is all that is being used in this instance: does the business key already exist in the data warehouse?

        Alter Row

From the lookup, a new activity is selected: Alter Row.

        Now we are loading into the DW.

        The incoming stream is the previous activity. Here we add Alter Row conditions.

In this example the join between the tables finds two matches. Where it doesn't match, there is Null for both the ProductKey and ProductAltKey in the result set.

Therefore, if ProductKey in the warehouse is null in the result set, insert the record. If it's not null, then UPSERT (update and insert).

We could refine this with more logic where we check the actual data row and, if it hasn't changed, do nothing, only updating and inserting when the row has changed. This is better for larger data sets.
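The data flow expresses all of this through the UI, but the decision can be sketched in PySpark to make the logic clearer. This is only an illustration, not how the Synapse data flow is implemented: dfSource and dfWarehouse are assumed dataframes holding the csv products and the existing warehouse rows, and the column names follow the ones mentioned above.

from pyspark.sql.functions import col

# Keys currently in the dedicated SQL pool table
dfKeys = dfWarehouse.select("ProductKey", "ProductAltKey")

# Lookup: left join the source to the warehouse keys on the business key
dfLookup = dfSource.join(
    dfKeys, dfSource["ProductID"] == dfKeys["ProductAltKey"], "left"
)

# Alter row: no match means ProductKey is null, so the row is an insert;
# a match means the row is upserted (update and insert).
dfInserts = dfLookup.filter(col("ProductKey").isNull())
dfUpserts = dfLookup.filter(col("ProductKey").isNotNull())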

        Debugging

Now we have a simple data flow, it's time to turn on the Spark cluster and debug.

        Once the spark pool has started, you can click on the end sink and go to Preview

The icons to the left show New and Upserted records. There were three ProductIDs that already existed in the data warehouse. Now we can also go back to the pipeline and select Trigger now.

        So what has happened?

        In monitor you can check how the flow is doing. Once it shows as succeeded, go to the table and run a SQL statement

        The first thing to note is that we now have all the records from the csv file in the dedicated SQL pool data warehouse. But what about the staging options in the dataflow settings?

If we now go into Data – Linked and to the primary data lake store, we can see we have a folder called Stage_Products. It's empty. So what does it do?

        If you run it again and refresh, we can see a PARQUET file is created before being deleted.

        Basically the source table is loaded into the staging table. Then the transformation process can work over the staging table, rather than using the actual source file. Once completed the staging file is removed.

So throughout the creation of this basic type 1 data flow, a lot of questions have been asked and answered, along with a lot of ideas for more complex load processes: how to use parameterisation of objects where possible, and how to create reusable content.

        So lots more to come.

        Power Query Editor – Avoiding the creation of all the helper Queries and Folders in power BI

        A current project being worked on starts with a folder in Sharepoint with multiple files that need to be imported together (All with the same Spec)

        There are lots of groups of files to import into Power BI

        • We need the File Name – it will be used to create the file snapshot date
        • There is a header over the columns – this needs removing and then the headers promoting

        Get Data > Folder creates helper folders that clutter up the back end and aren’t the best solution.

        This is the code that gets created

        let

            Source = Folder.Files("C:\Users\Name\Company\Project\Data"),

            #"Filtered Rows" = Table.SelectRows(Source, each Text.Contains([Name], "FileName")),

            #"Filtered Hidden Files1" = Table.SelectRows(#"Filtered Rows", each [Attributes]?[Hidden]? <> true),

            #"Invoke Custom Function1" = Table.AddColumn(#"Filtered Hidden Files1", "Transform File (8)", each #"Transform File (8)"([Content])),

            #"Renamed Columns1" = Table.RenameColumns(#"Invoke Custom Function1", {"Name", "Source.Name"}),

            #"Removed Other Columns1" = Table.SelectColumns(#"Renamed Columns1", {"Source.Name", "Transform File (8)"}),

            #"Expanded Table Column1" = Table.ExpandTableColumn(#"Removed Other Columns1", "Transform File (8)", Table.ColumnNames(#"Transform File (8)"(#"Sample File (8)"))),

            #"Promoted Headers" = Table.PromoteHeaders(#"Expanded Table Column1", [PromoteAllScalars=true]),

You don't want lots of these files and folders cluttering up the back end of Power BI and slowing things down. Try the following.

        We are going to run through this again but change what we do

        Get Data / File

        Let’s collect all FileA files

        Select Binary and Name and Right Click – Remove Other Columns

        Now go to Add Column – Custom Column

Let's build this function up.

The custom column uses Excel.Workbook([Content]), which returns the contents of the Excel workbook. Normally, when you click the arrow button on Content, this is where Power BI creates the helper queries for you, which we don't want.

        If you click the new custom column (Just next to Table)  you can see the content

        Now we can amend this custom column even more

        = Table.AddColumn(#"Removed Other Columns", "Custom", each Table.Skip(Excel.Workbook([Content]){0}[Data]))

Table.Skip allows us to skip over the header row, while {0} picks the first object in the workbook and [Data] returns its table.

And we can even add promote headers to this one bit of code with Table.PromoteHeaders

        = Table.AddColumn(#"Removed Other Columns", "Custom", each Table.PromoteHeaders(Table.Skip(Excel.Workbook([Content]){0}[Data])))

        Now we can click the Arrow on the Custom column and bring through all the columns in the table.

        No Helper Queries are created and we have done numerous tasks within that one line of code

If you have already created everything, you can still do this: grab the new code and replace the old code at the start of each query, then remove the helper queries.

        Power BI. Using flags in a junk dimension

We currently have a project where the metrics are actually flags recording whether something is true or false, rather than business metrics like Amount, SaleAmount, etc.

        • Is something completed? 1 or 0
        • Is something in Development? 1 or 0
        • Is something Out of Scope ? 1 or 0

        Etc.

Now, if you left these in the fact table you could sum them to create a measure, but you would be left with thousands of rows of just 1s and 0s against all the keys in the fact table.

        Also, they aren’t really metrics. They are true and false flags and as a consequence should not be in a fact table.

The above solution is not what Kimball would recommend. Kimball recommends a junk dimension.

        Definition from Kimball: — A junk dimension is the combination of several row-level cardinality flags and attributes into a single dimension table rather than modeling them as a separate dimension

        It should also be noted that a junk dimension can contain flags and other attributes that don’t quite fit anywhere else.

        The star schema looks like this (This is just an example STAR)

Flags are split between two areas, D1 and D2, and the area has also been added to the junk dimension as it's basically just a title that splits the metrics into two rows in a table.

These two areas are two completely separate low-level fact tables, with different metrics and dimensions; think, for example, of 'Human Resources' and 'Marketing'.

We have them here because we have a high-level granularity star schema comparing some of the similar flags across the two departments.

This could have been added as a separate dimension, but as it's just one data item it seemed more sensible to add it here.

So instead of having the flags repeated over and over again in the fact table, we have every available combination set in the junk dimension once. So every single record where F1 is true for area D1 goes to key 0.
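As a quick illustration only, the junk dimension can be generated once from all the possible combinations. The sketch below uses the PySpark style from earlier in this document rather than anything from the actual project; F1 and the D1/D2 areas come from the example above, while F2 and the key generation are assumptions.

from pyspark.sql import Row
from pyspark.sql.functions import monotonically_increasing_id

# Build every combination of the flags and the area once,
# rather than repeating 1s and 0s against every fact row.
areas = spark.createDataFrame([Row(Area="D1"), Row(Area="D2")])
f1 = spark.createDataFrame([Row(F1=True), Row(F1=False)])
f2 = spark.createDataFrame([Row(F2=True), Row(F2=False)])

dfJunk = (
    areas.crossJoin(f1)
         .crossJoin(f2)
         .withColumn("JunkKey", monotonically_increasing_id())
)

dfJunk.show()
# The fact table then stores only JunkKey, looked up from the
# combination of flag values on each incoming record.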

        Summing the flags

The flags are Boolean true/false columns, so we can't simply sum them up. If we were to do that, we would need to convert them to int.

        Flag int = CONVERT([Flag],INTEGER)

        Once converted we could sum them.

SUM([Flag Int])

        The junk dimension is used against multiple fact tables and here is the first issue.

If we create DAX this way, there is no mention of the central fact table. Therefore, if you use it, it just counts all the true flags in the dimension, completely separately from the fact table.

        We need that join to the fact table in the DAX

So we don't need to convert the flag to an int to use it in a measure. We can do the following:

Measure is true = CALCULATE(COUNT('Fact1'[JunkKey]), 'Dim Junk'[Flag] = TRUE() || 'Dim Junk'[Area] = "D1")

        CALCULATE

The CALCULATE function evaluates the count of the junk keys (which are already integers) within the fact table. This is important because we have now created a reference to one of the fact tables.

        And we are counting where the Flag is equal to True in the Dim

||

The double pipe is the logical OR operator, which lets us add another condition. In this case it also brings in the D1 area.

        We can add these logically into the separate fact tables

Measure is true = CALCULATE(COUNT('Fact2'[JunkKey]), 'Dim Junk'[Flag] = TRUE() || 'Dim Junk'[Area] = "D1")

And now we have a fact table with measures we can use for reporting, and we have made sure our Power BI data model is as small as it can be in memory.

        Power BI Concatenating two measures together under one column header for a table display

Another requirement came through for a table that showed the current month's figures against last month's figures.

However, to save space, the two values were in the same column, with last month contained in ().

So part one was to create the date time intelligence for last month,

And then to concatenate the two figures just for the table.

        Time intelligence to see this time last month

        There are various ways of doing this

        Prev Month At Risk = CALCULATE([At Risk],PARALLELPERIOD('Dim'[Date], -1,MONTH))

        Here we use CALCULATE to evaluate the SUM of At Risk (created in a base measure already) in a modified filter context. Which is PARALLELPERIOD using the date from the Date Dim.

PARALLELPERIOD takes the current set of dates (for us it's month and year) and shifts the first and last date in the period by the specified number of intervals. -1 takes us back a month.

        This is the chosen method but you can also use

CALCULATE([At Risk],DATEADD('DimDate'[Date],-1,MONTH))

DATEADD returns a table that contains a column of dates shifted forwards or backwards in time by the specified number of intervals from the dates in the current context. Again we are using month and year in the current context.

PREVIOUSMONTH('Dim Report Snapshot Date'[Date])

PREVIOUSMONTH returns all dates from the previous month, based on the first date in your context. As we are using month, our first date in context is 01/03, so it goes back to 01/02.

CALCULATE([Risk],SAMEPERIODLASTYEAR('DimDate'[Date]))

SAMEPERIODLASTYEAR is a table expression: it returns a single column of date values from the same period as the current context, but one year earlier. It can only go back a year.

        Concatenate current month and Last Month into one single value

This is for a table display only, not to be used for proper analytics.

        So first we need to convert the number into a string.

        And then we can concatenate. The following was used

At Risk =
VAR varMeasure = CONVERT([At Risk],STRING)

VAR varPrev = CONVERT([Prev Month At Risk],STRING)

VAR varConcata = CONCATENATE(varMeasure,"(")

VAR varConcatb = CONCATENATE(varPrev,")")

RETURN

CONCATENATE(varConcata,varConcatb)

        There will be lots of other ways to do this but I decided on the following
        Creating Variables to hold:

        • The measure converted to a string
        • The previous (Month) measure converted to a string
• CONCATENATE (you can only concatenate two things using this method), so the measure was concatenated with (
• CONCATENATE again, doing the second part of the concatenation: the previous month string value with )
• Finally we return a concatenation of A and B to make a string value we can use for the table.

To CONCATENATE more than two columns you can use

Column D = [column A] & " - " & [column B] & " - " & [column C]

So where we have 5 measures and a total, to view them in the table under one column header, we have concatenated them together.

        Power BI Composite Modelling (Multiple datasets Report) – Couldn’t load the data for this visual

This blog is for anyone using the new App experience (August 22) who has created a report using multiple datasets and whose users can't see the data.

        We have

        • A workspace
        • A Dataflow
        • Multiple Datasets
        • A report using all the datasets
        • An App with testers
        • There are two testers with access to the testing report

        The app is published but the users only see visuals with no data. When they try to refresh they see this error

This seems to be an issue with the composite model. It turns out that for users of composite model reports you need to have the following turned on.

This means that the people in the testers group can view the composite report. But as a side effect they can also build reports over the datasets.

I believe Microsoft may be aware and is looking into this, but for the time being any users of composite reports need to have this permission selected.

        Power BI – Deployment Pipeline Quick Tips – Setting dataflow Environment sources and Publishing (Direct Query) Datasets containing multiple Datasets

        You need Premium or Premium Per user to work with Deployment Pipelines

        This happens right at the beginning of the Deployment Pipeline process when you have just added Dev to the Pipeline and you need to deploy Test and Prod

        Tip – Changing Data source Rules for Dataflow

You now need to deploy your dev dataflow, which is connected to the dev database, into Test. You can't change the data source rule until you have a data source to work with.

        After deploy, the test dataflow is still against the dev data source (Azure SQL database)

        Click Test Deployment Settings

        Deployment Rules – Click on your dataflow

        Data Source Rules – Change This (Your Dev Details) to this (Select and choose your Test SQL Server and Database)

        And Save

        The tip here is to then deploy your dataflow Dev to Test again. Only then will it use the new settings.

        To check go back to the workspace and go to settings for the dataflow

        Deploying Datasets that contain multiple data sets

        This is specific to setting up Power BI With the Following Option

With this option set you can create smaller datasets, probably based on a star schema. Then, if required, you can connect to another dataset, and then connect to more datasets and data. Without this option you can only connect to one dataset.

This changes the connection from a live connection (the standard way, one dataset only) to a Direct Query connection (to Analysis Services, allowing multiple datasets).

Tip – Move your hybrid data set after the original data sets

        So here, what we can do is move the dataflows, and datasets A B and C at the same time.

Once completed, move Star A B and C so it goes after the schemas it's based on.

        Then do the report.

        If you try and do them all together you will get errors.

        So these are just a couple of tips to look out for when setting up your Pipelines for the first time. And if you use the setting that allows you to connect to multiple data sets.

        Power BI – App Viewers can’t see the data in the report

        We recently had an issue where a shared dataset (pbix) had been set up over a SQL Database.

        This was then published to Power BI

        A new pbix was created.

        Power Platform – Power BI datasets was chosen and the shared dataset was selected. Then reports were created and published to Service.

        An App was set up and a user was added to view the report.

However, when they came to view the report, they would see the report but not the data. All they had were messages about not having access to the data.

        At first we struggled to understand what the problem was and then it started to add up.

        Previously we had worked on a project with dataflows and multiple datasets being used for one report. So we have the following ticked

        This worked great for this specific project. We were in Premium. There were dataflows.

        However, this project is just a test report, not set up in Premium and without dataflows.

The above setting is a blanket setting that changes every pbix you create from a live connection to Direct Query.

A live connection connects to just one dataset only, and when you publish your report over the dataset it uses that initial shared dataset and doesn't create a new dataset, because the DAX, model etc. are all set up in that specific dataset.

Direct Query is a slight change. You Direct Query the data source (the dataset) and, crucially, you can also Direct Query other datasets, and even other data sources like databases and flat files, all together. But that shared dataset is also direct querying its own data source.

Direct Query is a good option for real-time analysis from a transactional database, but many DAX expressions aren't available over Direct Query straight onto a database, for example time-based intelligence DAX. So the reports are much simpler in Power BI, and more complex to set up at the database end for the users.

        In this instance, the reason we have issues is because there is no dataflow at the start of the Power BI process. 

        If you are using Direct Query over a dataflow, the data is imported into Power BI into the dataflow. The dataset Direct Queries the Dataflow.  Your users are then added to the workspace App and they can see the data because they have access to the dataflow.

Without the dataflow, your dataset is calling data directly via Direct Query, which essentially means Power BI always calls the database and not the Power BI columnar data store.

        So the users were opening up the App, and trying to access data straight from the database because there is no dataflow holding the data. Because the user doesn’t have access to the database, there is no data to be seen.

So the issue here, I think, is that Power BI should allow us to switch this option on and off depending upon the choices we make on set up, not just have it as a blanket option over every single report like it does now.

        Without dataflows you want to Live connect to the shared dataset. Not Direct Query right down to the datasource.

        With a dataflow its fine to Direct Query because the users have access to the dataflow data in the workspace
