The Microsoft Data Journey so far. From SQL Server and Azure to Fabric

Having lived and breathed Microsoft for over 20 years, it is worth stopping occasionally to reflect on all the changes to the analytics space over those years, and the growth and learning gained from each one.

I started working with on-premises Microsoft products. We had a large room full of SQL Server 2000 servers and a long journey to finally upgrade to 2008 R2.

Integration Services was the orchestration tool of choice, and Reporting Services (SSRS) was the server-based reporting tool.

We were starting to move from basic management reporting into business intelligence, especially with SQL Server Analysis Services, which became part of the environment when we finally pushed to 2008 R2.

We have come a long way since those days.

On Premises Analytics

One of the biggest issues with on premises was upgrading to new servers. We had a lot of servers, and applications tied to those servers. Upgrading to the latest release was never easy, and sometimes couldn't be done because of the systems a server supported.

This led to a real disparity of servers. Some still at 2000. Some 2008 R2. A few lucky ones moving to later versions.

Another big issue, especially for the analytics team, was the use of the servers. Spinning up a new database needed a lot of work to make sure it wouldn't run out of space or memory, because there was only a fixed amount of these resources shared across all services.

There were examples of simply not being able to work on a project because of these restrictions.

There is nothing more frustrating as a developer than knowing there are later releases out there while you are stuck on an old version, or knowing that you could do so much more with a bit more compute power or space allocation. There was no room to grow. You had to understand your full limit and work from there.

Reporting Services (SSRS)

It's interesting to look back on SSRS, Microsoft's original paginated reporting solution, after using Power BI for so long now.

Yes, it delivered fairly basic paginated reporting, but it didn't quite deliver the experience we really wanted for our new Business Intelligence vision.

On Premises to Azure

A career move presented me with the opportunity to start using Azure and Power BI.

Finally, the floodgates opened and the new possibilities seemed endless. Here are just a few examples of the changes happening at this point:

  • Azure allows us to always be on the latest version. No more wishing that you could use SQL Server 2014 whilst stuck on 2008 R2.
  • Power BI, interactive data visualisation. The complete game changer. We will look at that in more detail later.
  • Azure SQL Database. Here we can spin up small, cheap solutions for development work, scaling up as we go and never paying for more than we use. We even have the advantage of upping compute during peak loading times, always being on the latest version, with so many possibilities to choose from.
  • Serverless SQL DB, for example. Great for Dev and UAT, only unpausing compute resources when you need them.
    • We can still work with our SQL skills, building stored procedures to transform data.
  • Azure Data Lake. Secure cloud storage for structured and unstructured data. A landing area for our data that also creates opportunities for our Data Science experts.
  • Azure SQL Data Warehouse (which upgraded to Synapse in 2019) was the offering that allows for MPP (massively parallel processing) at big data scale, along with the Synapse serverless SQL pools, finally giving us the chance to do analysis and transformations on the data before the SQL Database load.
  • Data Factory. The Azure data orchestration tool. Another big game changer, offering so much more flexibility than Integration Services: a solution that can access both cloud and on-premises resources. So much connectivity.

Power BI

Power BI is Microsoft’s modern analytics platform that gives the user the opportunity to shape their own data experience.

  • To drill through to new detail.
  • Drill down into hierarchies.
  • Filter data.
  • Use AI visuals to gain more insight.
  • Better visuals

And at the heart of everything sits the Power BI tabular storage model: the VertiPaq engine, creating reporting that can serve multiple users all interacting with report pages, each user sending queries to the engine at speed.

I have worked with Analysis Services in the past, along with SSRS. Creating Star Schemas sat in columnar storage without needing to set up Analysis Services was a huge win for me as it was a much easier process.

Of course, you can't talk about Power BI without understanding how different each license experience is, from the Power BI Pro self-service environment through to the Power BI Premium enterprise-level license.

There have been a lot of changes, and Premium continues to add fantastic additional functionality. Premium sits on top of the Gen 2 capacity, offering larger model sizes, more compute and so on.

As a takeaway: when working with Pro, you should always work with additional Azure resources, like Azure SQL DB, Integration Services etc., to get the best end product.

With Azure and Power BI we have worked with the recommended architectures and produced quality analytics services time and time again. But, there were still some issues and pain points along the way.

And this is where Fabric comes in.

Fabric

Fabric is a Software as a Service (SaaS) solution, pulling together all the resources needed for analytics, data science and real-time reporting. Fabric concentrates on these key areas to provide an all-in-one solution.

On the whole, working with customers, our (basic) architecture for analytics projects was as follows:

  • Extract data into a Data Lake using Integration Services on a Schedule
  • Load the data into SQL Server Database
  • Transform the data into STAR schema (Facts and Dimensions) for Power BI analytics
  • Load the data into Power BI (Import mode wherever possible, but obviously there are opportunities for Direct Query and composite modelling for larger data sets)

We can see here that the data is held in multiple areas.

Synapse starts to address this with serverless SQL pools and Spark notebooks. We can now transform our data on the files themselves, rather than in the SQL Database on the fully structured data.

Fabric has completely changed the game. Let's look into how in a little more detail.

Medallion architecture

First of all, we need to understand the architectures we are working with. The medallion architecture gives us specific layers:

  • Bronze – Our landing area. The data is added to the lake as is. No processing.
  • Silver – The data is transformed and processed.
  • Gold – The data is structured in a way that can be used for analytics. The star schema for Power BI.

Fabric allows us to work with the medallion architecture seamlessly. And, as announced at Microsoft Build in May of this year, we now have Task Flows to organise and relate resources. The medallion architecture is one of the flows that you can immediately spin up to use.

Delta Lake

Delta Lake enhances the data lake by providing ACID transactional guarantees.

A – Atomicity, transactions either succeed or fail completely.

C – Consistency, Ensuring that data remains valid during reads and writes

I – Isolation, running transactions don’t interfere with each other

D – Durability, committed changes are permanent. Uses cloud storage for files and transaction logs

Delta Lake is the perfect storage for our Parquet files.

Notebooks

Notebooks are used to develop Apache Spark jobs, so we can now use code such as PySpark to transform the data before writing it into a new file ready to load.
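As a very rough sketch of the kind of cell a notebook holds (the path and column names here are purely illustrative, not from a real project):

# Read a raw CSV from the Bronze area (illustrative path and column names)
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("Files/Data/Bronze/sales.csv")

# A simple PySpark transformation: drop fully empty rows and tidy a column name
df = df.dropna(how="all").withColumnRenamed("Cust Name", "CustomerName")

# Write the result to the Silver area as Delta, ready to load
df.write.format("delta").mode("overwrite").save("Files/Data/Silver/sales")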

Delta Parquet

Here is where it gets really interesting. In the past our data was held as CSVs, txt files etc. Now we can add Parquet files into our architecture.

Parquet is an open source, columnar storage file format.

The Power BI data model is also a columnar data store. This creates really exciting opportunities to work with larger models and have the full suite of Power BI DAX and functionality available to us.

But Fabric also allows us to create our Parquet Files as Delta Parquet, adhering to the ACID guarantees.

Delta is an additional layer over Parquet that allows us to do such things as time travel via the transaction log. We can hold versions of the data and run VACUUM to remove old historical files that are no longer required.
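As a small sketch of both of these, assuming a Delta table already exists at an illustrative path:

from delta.tables import DeltaTable

delta_path = "Files/Data/Silver/mytable"  # illustrative path to an existing Delta table

# Time travel: read the table as it was at an earlier version, using the transaction log
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)

# Inspect the version history held in the transaction log
dt = DeltaTable.forPath(spark, delta_path)
dt.history().show()

# Remove old historical files no longer required (retaining the last 168 hours here)
dt.vacuum(168)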

Direct Lake Mode

Along with Parquet we get a new Power BI storage mode to work with. Direct Lake allows us to connect directly to Delta Parquet files and use that columnar data store instead of the Power BI Import mode columnar model.

This gives us a few advantages:

  1. Removes an extra layer of data.
  2. Our data can be partitioned into multiple files, and Power BI can use certain partitions, meaning we can have a much bigger model.
  3. Direct Query, running on top of a SQL DB, is only as quick as the SQL DB, and you can't use some of the best Power BI capabilities, like DAX time intelligence. With Direct Lake you get all the functionality of an Import model.

SQL Analytics Endpoints

If you are a SQL obsessive like myself, you can analyse the data using the SQL analytics endpoint directly over the files. No need to process them into a structured SQL Database.

Data Warehouse

Another one for SQL obsessives and for Big Data reporting needs. There will be times when you still want to serve via a structured Data Warehouse.

Conclusion

Obviously this is just a very brief introduction to Fabric, and there is so much more to understand and work with. However, using the medallion architecture we can see a really substantial reduction in the number of data layers we have to work with.

And the fewer data copies we have, the better our architecture will be. There are still a lot of uses for the Data Warehouse, but for many smaller projects this offers us so much more.

It's been a long journey, and knowing Microsoft there will be plenty more fantastic updates coming. Along the way, I would say that these three 'jumps' were the biggest game changers for me, and I can't wait to see what Fabric can offer.

And remember, always use a STAR schema.

*first published on TPXImpact Website

Microsoft Fabric Part 14. Taskmaster Project. Creating a Delta Load on Transformed Notebook

Previously, our notebooks took every CSV file in the folder to transform.

We don't want to do this. We want to load only the files that haven't been processed.

And further down the line we want to be able to process every file, or just the unprocessed ones.

We want to do a delta load. This means changing the PySpark code we have created.

And a big thank you to everyone who helped on the forums, especially frithjof_v, who helped with the code.

We want to do the following: for this exercise, Taskmastertransformed.parquet has been deleted. We are going to start at the beginning and load Series 1, 2 and 3, then 4, 5 and 6.

Go into the Data Lake in Azure and remove all the Taskmaster files apart from Series 1, 2 and 3.

Back to Taskmaster Transformed Notebook

Using mssparkutils, get the list of files to be processed from the Bronze folder:

# abfss path to the folder where the csv files are located 
files_path = 'abfss://############@onelake.dfs.fabric.microsoft.com/############/Files/Data/Bronze/TaskMasterSeriesFiles/taskmaster'

# the mssparkutils.fs.ls method lists all the files (and/or subfolders) inside the folder.
files = mssparkutils.fs.ls(files_path)

# Convert FileInfo objects to list of tuples (this creates a list of the file names in the folder.)
file_data = [(file.name,) for file in files]

# This creates a dataframe consisting of the file names in the folder
df_files_in_folder = spark.createDataFrame(file_data, ["name"])

# Show the DataFrame (this step can be removed)
display(df_files_in_folder)



To get the abfss path for this code, right-click on the folder and select Copy ABFS path.

Remember, an abfss path is an Azure Blob File System (Secure) path.

We have the three starting files in the shortcutted Data Lake Folder.

Remove the non-Taskmaster file from the list:

from pyspark.sql.functions import col

# Filter rows that start with "Taskmaster"
df_files_in_folder = df_files_in_folder.filter(col("name").startswith("Taskmaster"))

# Show the resulting dataframe
df_files_in_folder.show()

The data frame has been filtered

The Pre Processed Audit File.

We want a file that contains a list of all the processed files. It is written later on, but one is needed from the outset, and we need an empty dataframe with the right schema to check against so we don't get an error.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType,TimestampType


# Create a Spark session
spark = SparkSession.builder.appName("CreateEmptyDataFrame").getOrCreate()

# Define the schema for the DataFrame
schema = StructType([
    StructField("filename", StringType(), True),
    StructField("processedTime", TimestampType(), True),
    StructField("fullyProcessedFlag", IntegerType(), True)
])

# Create an empty DataFrame with the specified schema
df_log  = spark.createDataFrame([], schema)

# Show the schema of the empty DataFrame
df_log.printSchema()

display(df_log)

Create the Initial Parquet Processed Files File if it doesn’t already exist

We are going to read from the ProcessedFiles Parquet file, so we need a file to read from, even if it's the first time running.

from pyspark.sql.utils import AnalysisException

# Define the paths
workspace_id = "#######-####-####-####-############"
lakehouse_id = "########-####-####-####-############"
file_path = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles.parquet"

# Check if the file exists
try:
    spark.read.parquet(file_path)
    print(f"File {file_path} already exists. Skipping write operation.")
except AnalysisException:
    # File does not exist, proceed with writing
    df_log.write.mode("append").parquet(file_path)
    print(f"File {file_path} does not exist. Writing data to {file_path}.")

We use a try/except block for exception handling. We try to read the file; if the read succeeds, we don't do anything. If it fails, we move to the AnalysisException handler, where we simply create the file and print that it didn't exist so we created it. The file will be empty because it is created from our empty schema, all ready to start from scratch.

List Files from the processed Log

# This creates a dataframe with the file names of all the files which have already been processed, using the processedFiles Parquet File

from pyspark.sql.functions import input_file_name, regexp_extract

workspace_id = "#####-####-####-####-#############"
lakehouse_id = "#####-####-####-####-#############"

parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles.parquet")

df_already_processed = spark.read.parquet(parquet_file)

display(df_already_processed)

Nothing has been processed. This is the first attempt, so nothing has yet been logged.

Which files haven’t been processed?

# Selecting only the filename column from df_already_processed
df_already_processed_filenames = df_already_processed.select("filename")

# Selecting only the name column from df_files_in_folder
df_files_in_folder_names = df_files_in_folder.select("name")

# Performing subtract operation to find non-matching rows 
# (so only the file names of the files which have not been processed already are kept)
df_to_process = df_files_in_folder_names.subtract(df_already_processed_filenames)

# Showing the resulting DataFrame of files which have not yet been processed (this step can be removed)
display(df_to_process)

Note that here we take just the filename and name columns, then remove the already processed files from the folder list using subtract.

At this point there could be no files, one file or multiple files to process. We can see our initial 3 files to process.

What we need to do is create a for loop to get all the unprocessed files into a dataframe. But first we need to create an empty dataframe to load them into.

Create an Empty Dataframe

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType,TimestampType


# Create a Spark session
spark = SparkSession.builder.appName("CreateEmptyDataFrame").getOrCreate()

# Define the schema for the DataFrame
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Series", StringType(), True),
    StructField("Episode No", IntegerType(), True),
    StructField("Episode Name", StringType(), True),
    StructField("Episode Date", DateType(), True),
    StructField("Task Order", IntegerType(), True),
    StructField("Task", StringType(), True),
    StructField("Task Type", StringType(), True),
    StructField("Assignment", StringType(), True),
    StructField("Contestant", StringType(), True),
    StructField("Team", StringType(), True),
    StructField("Points", IntegerType(), True),
    StructField("Winner", IntegerType(), True),
    StructField("source_filename", StringType(), True),
    StructField("source_processedTime", TimestampType(), True)
])

# Create an empty DataFrame with the specified schema
dftm  = spark.createDataFrame([], schema)

# Show the schema of the empty DataFrame
dftm.printSchema()

source_filename and source_processedTime are for the metadata we are going to create in the next step.

Create the For Loop

from pyspark.sql.functions import current_timestamp, lit, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from datetime import datetime

# Loop through the dataframe which consist of the filenames
for row in df_to_process.rdd.collect(): # Collecting to driver (local) as a list of Rows 
    # Extract filename from the current row
    filename = row["name"]

    # Read the current csv file into a dataframe
    dfnf = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(files_path +"/" + filename)
    
    # Add filename column to the dataframe
    dfnf = dfnf.withColumn("source_filename", lit(filename))
    tidsstempel = current_timestamp()
    
    # Add current timestamp column ("source_processedTime") to the dataframe
    dfnf = dfnf.withColumn("source_processedTime", tidsstempel)
    
    # Append the dataframe to the table in Silver lakehouse
    #df.write.format("delta").mode("append").save('<Insert the abfss path to your silver layer Lakehouse table here>')

    dftm = dftm.union(dfnf)
    
    # Create a single-row DataFrame with the filename (this will be appended to the log table)
    single_row = [[filename]]
    single_row_schema = StructType([
        StructField("filename", StringType(), False)
    ])
   
    display(dftm)
    
  • Here, we are creating a for loop, collecting the list of every file name in df_to_process.
  • Then we get the name from the current row.
  • Read the CSV file of that name into a dataframe.
  • Add the file name, and a date and time stamp.
  • Then append to the dataframe created (dftm), so we could have one or many files here. Usually it will be just one.
  • Repeat until all the unprocessed files are through into dftm.

The original write to the Silver Lakehouse table has been commented out.

Check dftm dataframe.

from pyspark.sql.functions import count

result_df = dftm.groupBy("source_filename").agg(count("*").alias("Total"))
   
display(result_df)

Here we can see the total number of rows against each source file. Make sure you don't accidentally process the same files into the dataframe again.

Create the df_log

# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, current_timestamp, col, lit

#Get the distinct list of file names we are going to process
filenamesdf = dftm.withColumn("filename",col("source_filename")).select("filename").distinct()


# Remove everything after the "?" character (there used to be more in here, but we don't have to worry about this since a code change)
filenamesdf= filenamesdf.withColumn("filename", regexp_replace("filename", "\\?.*", ""))

#Add a process Date
filenamesdatedf = filenamesdf.withColumn("processedTime",current_timestamp())

#Add a flag
df_log = filenamesdatedf.withColumn("fullyProcessedFlag",lit(0))

display(df_log)

This takes the filename from our data and adds it into a dataframe, with the date and time of the process and 0 for our not-fully-processed flag.

The data is only fully processed when it has been added to the dimension and fact tables.

After bringing in the data, we can now run all the transformations as we did in previous posts:

  • Drop rows with all NULL values
  • Remove Special characters
  • Update the Date
  • Add day and add month and create new episode Date column
  • Bring in the Contestant Data
  • Bring in People data (Originally from Kaggle so may need manual updates eventually)
  • Join Contestants to the people data set (and check that everything matches)
  • Create age as at the time of the show. First get the minimum Episode Date
  • Join the Episode Date into the People dataframe
  • And calculate the age

Calculate the Age issue

If we run the process and there are no files to process in the delta load, the code can't calculate using the min series date.

Obviously this should rarely happen, but the use case is there, so we need to handle it better.

We actually need to go back one step, to joining the min_episode_date into dfcont above. We only care about those contestants with an episode, so we need to change this:

# Join the min_episode_date into contestants
dfcont = dfcont.join(dfminSeriesCont,dfcont["Name"] == dfminSeriesCont["Contestant"], "left_outer").drop(dfminSeriesCont.Contestant)

# The resulting DataFrame contains all rows from dfcont and matching rows from dfminSeriesCont
display(dfcont)

Simply changing this to inner only brings back rows that exist on both sides.
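So the join becomes:

# Join the min_episode_date into contestants, keeping only contestants that have an episode
dfcont = dfcont.join(dfminSeriesCont,dfcont["Name"] == dfminSeriesCont["Contestant"], "inner").drop(dfminSeriesCont.Contestant)

display(dfcont)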

Now we get to the code to calculate the age. This is the amended code (just in case we have nulls):

from pyspark.sql.functions import datediff, col, when

# Convert the date of birth column to a date type in dfCont
dfcont = dfcont.withColumn("dob", dfcont["dob"].cast("date"))

#Calculate age using dob and current date 
dfcontAge = dfcont.withColumn("age_decimal", when(dfcont["min_episode_date"].isNotNull(), datediff(dfcont["min_episode_date"], dfcont["dob"]) / 365)
.otherwise(None))

#Convert from decimal to int  
dfcontAge = dfcontAge.withColumn("age", col("age_decimal").cast("int"))

#Convert Contestant ID to int
dfcontAge = dfcontAge.withColumn("contestant ID", col("contestant ID").cast("int"))

display(dfcontAge)

Here we add a when function: when min_episode_date isn't null we do the calculation, otherwise nothing happens. None is a special value denoting the absence of a value.

Also, as a new code block, we want to make sure everyone has an age:

# Select distinct Name and age, order by age in descending order
result_df = dfcontAge.select("Name", "age").distinct().orderBy(col("age").desc())

# Show the result
result_df.show()

Another possible one to watch out for in the automated process: we don't want anything to come through without a set age.
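One possible guard for the automated process (a sketch, not from the original notebook) is to fail fast if any contestant is missing an age:

from pyspark.sql.functions import col

# Count contestants with no age and stop the run if any are found
missing_age = dfcontAge.filter(col("age").isNull()).count()
if missing_age > 0:
    raise ValueError(f"{missing_age} contestant rows have no age - check dob and min_episode_date")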

From here we can continue with the final previous blog post logic

  • Create an age group

Changes Creating the Parquet file

There is something we need to understand at this point: you can't pull data from a Parquet file based on a filter, you can only filter the dataframe after reading it. We want to delta load facts and dimensions, so we should partition the taskmastertransformed file to avoid loading in all the data.
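For example, once the file is partitioned by source_filename, a single partition folder can be read on its own rather than the whole dataset (the partition value below is hypothetical):

# Read just one partition of the partitioned Parquet file (the partition value here is hypothetical)
partition_path = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/taskmasterTransformed.parquet/source_filename=TaskmasterSeries1.csv"
df_one_file = spark.read.parquet(partition_path)

display(df_one_file)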

We create 3 Parquet Files here.

  • TaskmasterTransformed.
  • contestantTransformed (from data that we manually update in one file)
  • ProcessedFiles.parquet which feeds our for loop with the processed file names.

This is the Create Parquet file step for TaskmasterTransformed.

Note: the mode was overwrite. We now want to append the data instead:

workspace_id = "########-####-####-############"

lakehouse_id = "########-####-####-####-############"

dftm_cleaned.write.partitionBy("source_filename").mode("append").parquet(f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/taskmasterTransformed.parquet")

  • partitionBy has been added to create filename partitions
  • append has replaced overwrite for the mode

Check the Parquet File for taskmasterTransformed

And we can have a look at the Parquet file created in the Silver Lakehouse

# This reads the transformed Taskmaster Parquet file back into a dataframe so we can check it

from pyspark.sql.functions import input_file_name, regexp_extract

workspace_id = "########-####-####-############"

lakehouse_id = "########-####-####-####-############"

parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/taskmasterTransformed.parquet")

df_already_processed = spark.read.parquet(parquet_file)

display(df_already_processed)

contestantTransformed

At this point, we also save the overwritten ContestantTransformed information.

dfcontAge.write.mode("overwrite").parquet(f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/contestantTransformed.parquet")

But we have changed the logic:

  1. There may be no data in here, if all the files have already been processed
  2. There should only be data in here that hasn't previously been processed

Which again means that we need to change overwrite to append.
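So the contestant write becomes:

dfcontAge.write.mode("append").parquet(f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/contestantTransformed.parquet")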

Processed Files Parquet

workspace_id = "########-####-####-############"

lakehouse_id = "########-####-####-####-############"

df_log.write.mode("append").parquet(f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles.parquet")

Check the Log Parquet File

# This creates a dataframe with the file names of all the files which have already been processed

from pyspark.sql.functions import input_file_name, regexp_extract

workspace_id = "########-####-####-############"

lakehouse_id = "########-####-####-####-############"

parquet_file = (f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles.parquet")

df_plog = spark.read.parquet(parquet_file)

display(df_plog)

Conclusion

We now have a transformed dataset with 3 series, partitioned by source file (one file per series).

We also have a ProcessedFiles Parquet file with a flag to show that the process is not yet complete.

  • If you start from scratch, an empty Parquet file is created and the first files processed are appended to it
  • Otherwise, new files are appended to the existing file

In the next posts we will go on to complete the dims and facts, and then set the 0 to 1 in ProcessedFiles to show everything is completed.

Then we can run again, adding the next series into the shortcutted Data Lake. And we can refine and make the process better as we go.

Microsoft Fabric Part 13. Changing Structure of Lakehouses

Before continuing with this project, let's look at an amendment to the lakehouse structure, mostly because we decided we wanted to keep the structure of the Task Flow. Instead of having one lakehouse for Gold, Silver and Bronze, we want three lakehouses: one each for Bronze, Silver and Gold.

3 new lakehouses are created

And now, the Notebooks need updating

Taskmaster Transformed

In the notebook.

Bronze is the main lakehouse that we are pulling the data from. But you can also add another Lakehouse.

And use the arrows to switch between the two.

The only code that needs to be changed is where we create the Silver Parquet file (it's not Delta Parquet at this point).


To get the ABFS path to move from the default lakehouse to another lakehouse, right-click on the destination lakehouse and Copy ABFS path.

dftm_cleaned.write.mode("overwrite").parquet('abfss://986472b4-7316-485c-aa22-128e1cb29544@onelake.dfs.fabric.microsoft.com/ee31b1a4-16bf-4aae-aaab-f130bd4d53c6/Files/Data/Silver/taskmasterTransformed.parquet')

And we can go further, by parameterising the workspace ID and the lakehouse ID

f has been added to allow us to add parameters into the location string.
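As a sketch, based on the hard-coded version above:

# The IDs from the hard-coded abfss path above, held as parameters
workspace_id = "986472b4-7316-485c-aa22-128e1cb29544"
lakehouse_id = "ee31b1a4-16bf-4aae-aaab-f130bd4d53c6"

# The f prefix substitutes the parameters into the location string
dftm_cleaned.write.mode("overwrite").parquet(f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/taskmasterTransformed.parquet")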

So we can now use this when we want to create a Parquet file in a different lakehouse to the default one.

And we have introduced parameters.

Now we want to know how to do this with a Delta Parquet file, moving it into the Gold Lakehouse.

Silver to Gold lakehouse Delta Parquet


from delta.tables import DeltaTable


# You can also add the non-default lakehouse by clicking +Lakehouse
aliased_df.write.mode("overwrite").option("overwriteSchema", "true").format("delta").saveAsTable("GoldDebbiesTraininglh.dimContestant")

And again, we clicked + and added the Gold Lakehouse as the non-default.

How can you tell which is the default?

Hover over the Lakehouse Name to get the list.

Conclusion

We have now transformed the architecture to have three lakehouses, Gold, Silver and Bronze, instead of one lakehouse with three folders for Gold, Silver and Bronze.

This has allowed us to see how the code changes when creating files in non-default lakehouses, and has allowed us to set up our first parameters. It also means we can use the medallion task flow as is, without having to make any amendments.

It also feels right to have more separation of the three areas.

Microsoft Fabric Part 11. Taskmaster Project. New Fabric Feature, Task Flows

Previously, we created Power BI reporting and changed the usual PBIX to a PBIP project file.

Before moving on, let's have a quick look at a new feature in Fabric: Task Flows.

Back in the Fabric Workspace

We can now build a task flow. But what is it?

It's a new workspace feature to visualise the collection of processes for an end-to-end solution.

There are specific types of tasks that we will look at later

There are two ways of going about this.

  1. You have started a brand new project and you have your initial task flow set up to start adding in tasks. This helps you work with your architecture.
  2. You have already created tasks (Like this project) and want to assign them to a task flow.

Let's begin.

There are 8 flows available at the moment. Throughout the Fabric journey, the medallion architecture has been championed, so let's go with this.

Colouring

  • Green Items are Get data
  • Blue Items are Store Data
  • Purple items are Prepare data
  • Yellow items are Visualise data
  • Red Items are Analyse and train Data

Creating new Tasks

If you haven't yet started, when you set up your board you need to assign items to each task.

There are no Data Engineering steps in this development project for getting data, so let's start with the green tasks.

Click New Item

And we get options of what to create for low volume data. Great. We have recommended items (which we can change to all). So if you wanted to go low code, a Data Factory Dataflow Gen2 might be the way to go.

Let's have a look at high volume data.

Here, a Data Pipeline would probably be used for this project.

Data Factory was previously the go-to for copying files across into the data lake, so as the data engineer I would possibly pick pipelines for both types of data processing.

Already this is a great way of working. We know we need to do all these processes, but the number of possibilities can be bewildering. So let Fabric give you recommendations every step of the way.

Assigning Items to a Task

Bronze Data

If we quickly click new item to check the recommendations

We have already set up a Lakehouse for Taskmaster.

Click the little paper clip icon to attach a task

And Select the Lakehouse

Now we have 3 items at Bronze Data Level

  • DebbiesFabricLakehouse Top Level
    • The SQL Analytics Endpoint
    • The semantic model (default). This wasn't used because there were issues with errors occurring whilst developing.

Selecting the Bronze data Task shows you the objects in that task.

Let's move on to

Initial process

We have a Data Engineering notebook set up that creates our initial transformed Silver layer. This can be added here.

One current real issue for me is the vertical bar on Fabric and Power BI

You have to go to the end to scroll down, and you can't see your items. Extremely annoying when working on smaller screens. This is actually a large screen, but you can't make the box any bigger. You have to manually make the columns smaller each time.

I hope they sort this out soon

What happens when you accidentally assign an item to the wrong task?

I have accidentally added the lakehouse to the Initial Process task.

Hover over the tasks and select the ones you want to remove

And click Unassign from all Tasks, which appears.

Silver Data

In this case the lakehouse contains the bronze and silver layers, so this is for both.

What happens when we assign the lakehouse to silver?

Well, you can’t do this. An item can only be for a single task. So how do we relate this to our medallion architecture above?

For the time being, go to Bronze Data and edit

Assign the Lakehouse

Click on Silver Data

Delete. We don’t need this

This leaves a gap in our flow

Connecting Tasks

Click the edge of the starting task and join to the end task

Further transformation

We have notebooks that prep the dims and facts. Let's add them.

Golden Data

Another data store, and again our golden data is in Delta Parquet in the lakehouse. It looks like we can edit the initial Store Data task again.

Mini gripe: you can't see all of the information, so you can't tell it's bronze, silver and gold.

Delete Golden Data

Personally I would prefer to have all three on the pane and share the Data lake between all three tasks.

But where do we add the semantic model?

The Semantic model comes before visualising the data. But we have nowhere for this.

Creating paginated reports and semantic models from a task isn’t supported.

https://learn.microsoft.com/en-us/fabric/get-started/task-flow-overview

Well, this is a real shame. It feels like this needs to be added before it's a really usable solution.

Data Visualise

Our report is in a PBIX File.

We don’t have any Dashboards

Let's publish the content from the PBIX to the Fabric workspace.

Then go back to the workspace

Interestingly these are now uncommitted items and show up in source control.

In Part 10 we did a lot of work getting the PBIX file into source control. What will this do? We will look at this later.

Back to Data Visualize

We add the reporting

The semantic model has also been added here. Hopefully a new task will be added soon specifically for the semantic model; there are currently no item types for semantic modelling.

There is no ML Service so we could get rid of this task.

Old Notebooks not being used (Filters).

There are a few Notebooks from the old version not part of this, can we see them?

Another issue: selecting 'Not specified' for Task doesn't filter to the items without a task specified. Something else that needs to be ironed out.

Conclusion.

There are currently a fair few issues with this but it is new and hopefully they will get ironed out.

I still don’t like scrolling in any of the Power BI / Fabric Service solutions because it doesn’t automatically resize to page size. This has been an annoyance for a long time now. I really wish it would get sorted.

Semantic modelling needs to be added. Filtering needs to be much better.

And I genuinely wish you could have an item in multiple tasks, especially Store Data. But overall I really love this feature and can't wait to see it get some more development.

Microsoft Fabric Part 8. Taskmaster Project. Data checks using the SQL analytics Endpoint

SQL Analytics Endpoint

Let's switch to the SQL analytics endpoint.

Only the Delta tables are available with the SQL endpoint.

Let's try some options.

New SQL Query

We can use SQL to check that we are happy with what has been created

SELECT e.Series, e.EpisodeName,t.Task, t.TaskType, c.ContestantName, f.Points 
FROM [DebbiesFabricLakehouse].[dbo].[facttaskmaster] f
inner join [DebbiesFabricLakehouse].[dbo].[dimepisode] e on f.EpisodeKey = e.EpisodeKey
inner join [DebbiesFabricLakehouse].[dbo].[dimtask] t on t.TaskKey = f.TaskKey
inner join [DebbiesFabricLakehouse].[dbo].[dimcontestant] c ON c.ContestantKey = f.ContestantKey
Where e.EpisodeName = 'The poet and the egg'
Order by t.Task

Immediately we can spot some problems.

Task 1 has 10 records and not 5; there should always be 5.
We can write a query to see how many issues there are:

With CTEtm (Series, EpisodeName,Task, TaskType, ContestantName, Points)

AS 
(SELECT e.Series, e.EpisodeName,t.Task, t.TaskType, c.ContestantName, f.Points 
FROM [DebbiesFabricLakehouse].[dbo].[facttaskmaster] f
inner join [DebbiesFabricLakehouse].[dbo].[dimepisode] e on f.EpisodeKey = e.EpisodeKey
inner join [DebbiesFabricLakehouse].[dbo].[dimtask] t on t.TaskKey = f.TaskKey
inner join [DebbiesFabricLakehouse].[dbo].[dimcontestant] c ON c.ContestantKey = f.ContestantKey)

SELECT Series, EpisodeName, Task, count(*) AS TotalRows
FROM CTEtm 
GROUP BY Series, EpisodeName, Task
Having COUNT(*)>5

There are 146 issues.

You cant manipulate the data with UPDATE’s inserts etc.

New Visual Query

Let's try to get an issue example using a visual query.

Before merging with the contestant, the contestant needed to be dragged into the analytics pane.

This is very similar to the Power Query Editor and could be of use to people who aren't as proficient in SQL and prefer using visual tools.

Personally I prefer SQL. So I will stick to this.

Back in SQL Query

SELECT e.Series, e.EpisodeName,t.Task, t.TaskType, c.ContestantName, f.Points 
FROM [DebbiesFabricLakehouse].[dbo].[facttaskmaster] f
inner join [DebbiesFabricLakehouse].[dbo].[dimepisode] e on f.EpisodeKey = e.EpisodeKey
inner join [DebbiesFabricLakehouse].[dbo].[dimtask] t on t.TaskKey = f.TaskKey
inner join [DebbiesFabricLakehouse].[dbo].[dimcontestant] c ON c.ContestantKey = f.ContestantKey
Where e.Series = 'S1'
AND t.Task = 'Buy a gift for the Taskmaster'
Order by c.ContestantName

Clearly we have an issue with dim contestant: we have two of every contestant. This has been super useful. We can't resolve this in SQL, so it's time to go back to the contestant notebook.

The SQL code is going to be kept here in Queries

You can also move queries to Shared Queries so other developers can access them.

Back in the Contestants V2 Notebook

We have an issue in the Contestants Dim.

After we drop the records where Team is null, we now need to add additional PySpark to check for duplicates:

from pyspark.sql.functions import col

# Group by "Contestant Name" and count the rows for each contestant
dfContfinalGrp = dfContfinal.groupBy("Contestant Name").count()

# Keep only the contestants that appear more than once
dfContfinalGrp = dfContfinalGrp.filter(col("count") > 1)

# Show the resulting DataFrame
dfContfinalGrp.show()

These will cause issues. Why is it happening?

filter_column = dfContfinal.filter(col("Contestant Name") == "Noel Fielding")

# Show the resulting DataFrame
filter_column.show()

It's the seat causing the issues, because these contestants have been on Taskmaster more than once.

This is fine, because the granularity is the contestant and seat, but it means we need to update the fact table key accordingly. And it goes even further than this: what we really need to do is go back to the original transformation notebook to ensure we can join on seat.

Back to the Transformation Notebook

There is a query that merges Contestants and people together.

Instead of losing this information, we need to load it into Parquet for use later, and we need to keep series in this dataframe.

One tweak is to keep the series in the transformed contestant Parquet file and then make sure it matches S1, S2 etc. in the main file.

# Join the extra contestant information
dfcont = dfc.join(dfp, dfc["Name"] == dfp["contestant"], "left_outer").drop(dfp.contestantID)\
.drop(dfp.contestant).drop(dfp.team).drop(dfp.team_label).drop(dfp.champion)\
.drop(dfp.TMI)

# The resulting DataFrame contains all rows from dfc and matching rows from dfp
display(dfcont)

series has been removed from .drop()

Create S1 instead of Series 1 etc in the transformed contestant file.

from pyspark.sql.functions import regexp_replace

# Assuming you have a PySpark DataFrame named dfcont
dfcont = dfcont.withColumn("series_label", regexp_replace("series_label", "Series ", "S"))

# Show the updated DataFrame
dfcont.show()

Back to the Fact Notebook

change to the code when adding in the contestant key

Before we continue, we want to add the seat into the main tm dataframe.

#Join tm to contestants to get the seat
dftm = dftm.join(drctrans, (dftm["Contestant"] == drctrans["Name"])& (dftm["Series"] == drctrans["series_label"]), "left_outer")\
.drop(drctrans.Name).drop(drctrans.Image).drop(drctrans.From).drop(drctrans.Area).drop(drctrans.Country).drop(drctrans.series).drop(drctrans.series_label)\
.drop(drctrans.dob).drop(drctrans.gender)\
.drop(drctrans.hand).drop(drctrans.age).drop(drctrans.age_decimal).drop(drctrans.ageRange)

# The resulting DataFrame contains all rows from dftm and matching rows from drctrans
display(dftm)

Here we add in seat from the transformed contestant data

#We want the seat in the main table
dftm = dftm.join(dfc, (dftm["Contestant"] == dfc["ContestantName"])& (dftm["Seat"] ==dfc["Seat"]), "left_outer")\
.drop(dfc.ContestantID).drop(dfc.ContestantName).drop(dfc.Team).drop(dfc.Image).drop(dfc.From).drop(dfc.Area).drop(dfc.Country).drop(dfc.Seat).drop(dfc.Gender)\
.drop(dfc.Hand).drop(dfc.Age).drop(dfc.AgeRange)

# The resulting DataFrame contains all rows from dftm and matching rows from dfc
display(dftm)

And we have updated the above code to also join on seat, now that we have seat in both the main table and the dim table, to get the correct key.

Back in the SQL endpoint

SELECT e.Series, e.EpisodeName,t.Task, t.TaskType, c.ContestantName, f.Points 
FROM [DebbiesFabricLakehouse].[dbo].[facttaskmaster] f
inner join [DebbiesFabricLakehouse].[dbo].[dimepisode] e on f.EpisodeKey = e.EpisodeKey
inner join [DebbiesFabricLakehouse].[dbo].[dimtask] t on t.TaskKey = f.TaskKey
inner join [DebbiesFabricLakehouse].[dbo].[dimcontestant] c ON c.ContestantKey = f.ContestantKey
Where e.Series = 'S1'
AND t.Task = 'Buy a gift for the Taskmaster'
Order by c.ContestantName

The fixes have removed a lot of issues, but we are still left with 6 episodes causing issues, like the one above.

Looking at the keys, it's clearly the task key.

SELECT * FROM  [DebbiesFabricLakehouse].[dbo].[dimtask] Where TaskKey IN (64,100)

Back to the Episodes Notebook Attempt 2

The really good thing about the SQL endpoint is that I can quickly check the work that has been done for issues like this before moving on to the semantic model.

Now, this issue most likely arises because some tasks across series have the same task name, and it's the order that gives a task its uniqueness in the dimension.

Again, we go back to the fact dataframe to add Task Order into the join.
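A sketch of the amended join (the dimension dataframe name and exact columns here are assumptions based on the earlier posts):

# Join the task dimension on both Task and Task Order to pick up the correct TaskKey
dftm = dftm.join(dftask, (dftm["Task"] == dftask["Task"]) & (dftm["Task Order"] == dftask["Task Order"]), "left_outer")\
    .drop(dftask["Task"]).drop(dftask["Task Order"])

display(dftm)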

Back to the Episodes Notebook Attempt 3

Let's see how this fix has helped.

Conclusion

The SQL endpoint has helped us fix

  • Contestants where a contestant has been on more than one series
  • Tasks, when a task has the same name

As a developer with a lot of experience in SQL, this is a really great way of quickly creating code to check for errors. And you have the graphical functionality if you aren't a SQL person.

It's a real win.

Next: we have our star schema and it's been tested. Time to create the semantic model.
