Microsoft Fabric Part 12. Taskmaster Project. Pyspark Broadcast

In Part 10 we created the reporting and Project PBIP file. In Part 11 we looked at new Task flows and added our items into a medallion architecture.

Its now time to do some small tweaks to the project so far. One thing I discovered is the Pyspark broadcast function.

This function is used to optimise the performance of spark jobs by reducing data shuffling.

Broadcast is really good for join operations. And we have a lot of joins in the the notebooks. can we make the code better?

First of all. What is data shuffling?

Data Shuffling

In Distributed data processing its about redistributing data across partitions or nodes in a cluster. It happens when the data gets re-organised.

  • Joins
  • Aggregations
  • Groupings

So, data is distributed in partitions to allow for parallel processing

When you join two partitions together you get data shuffling (During transformation)

Shuffling is expensive and creates network overhead.

These are the stages of shuffling.

  • Map Stage – When the data is processed in partitions
  • Shuffle Stage – Shuffling and exchanging
  • Reduce Stage – Further processing after shuffling

We want to try and avoid unnecessary shuffling by using broadcast. Lets go back to our notebooks

Taskmaster transformed Notebook

# Join the extra contestant information
dfcont = dfc.join(dfp, dfc["Name"] == dfp["contestant"], "left_outer").drop(dfp.contestantID)\
.drop(dfp.contestant).drop(dfp.team).drop(dfp.team_label).drop(dfp.champion)\
.drop(dfp.TMI)

# The resulting DataFrame 'joined_df' contains all rows from dftask and matching rows from dfob
display(dfcont)

This is the original Join. Clicking run, it takes 6 seconds.

If you run it again it takes two seconds. Why is it quicker?

  • File caching
  • The second run will benefit from better data distribution and partitioning
  • JVM warmup. (Java Virtual Machine) has optimized on the second run

How do you get it to have time of run 1?

spark.catalog.clearCache()

Clears the memory before trying again.

Working with Broadcast on a join

You need to Broadcast the smaller table. In the above instance we can create a quick check of sizes on the two data sets.

dfcrow_count = dfc.count()
dfprow_count = dfp.count()

print(f"Number of rows in the dfc DataFrame: {dfcrow_count}")
print(f"Number of rows in the dfp DataFrame: {dfprow_count}")

Immediately we can see which dataframe is to be broadcast. dfc

from pyspark.sql import SparkSession, functions as F

df_small = F.broadcast(dfc)

# Join the extra contestant information
dfcont = df_small.join(dfp, df_small["Name"] == dfp["contestant"], "left_outer").drop(dfp.contestantID)\
.drop(dfp.contestant).drop(dfp.team).drop(dfp.team_label).drop(dfp.champion)\
.drop(dfp.TMI)

# The resulting DataFrame 'joined_df' contains all rows from dftask and matching rows from dfob
display(dfcont)

dfcont.explain()

Explain allows us to see the plan.

This took 1 second to run as opposed to the 6 seconds previously. if you are working with a lot of data this could really change things considerably with your runs.

Lets remove the original join and lock this one in.

The Execution Plan

Gives you more detail on how the command is being executed.

Conclusion

With this in mind, we can go through all the code and update anything where we can with a broadcast which should really help with the processing.

A really great addition to the Pyspark toolbox

Design a site like this with WordPress.com
Get started