Data Factory to move daily snapshot data in Azure SQL DB to files in a Gen2 Data Lake

I have the following problem to resolve, and this was my initial attempt at figuring out a solution.

I have a data source whose data contains a date. The data holds a daily snapshot of each record, which means that a record appears in the data set once per day. This will amount to a lot of data, and we would rather hold it as files in Azure Data Lake Gen2 Storage.

The logic is to pull each day's data from the source database into dated files within the Azure Data Lake.

Once it is running, each run will probably pick up one day's data because the rest have already been created. However, on the initial run I want it to pick up all the days that have been loaded so far.

At the minute I have about 6 months of data to load

Tables in Azure SQL Database (destination)

CREATE TABLE [audit].[MemberDailyMetricsDates](
     [DATEUTC] datetime2 NULL
 ) ON [PRIMARY]

This table collects all the dates from the source snapshot table

Remember, the source table stores the same record every day, along with any changes made to it.

CREATE TABLE [audit].[IdWatermarks](
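     [TableName] nvarchar(100) NOT NULL, -- length assumed (matches @TableName varchar(100) below); a bare nvarchar column holds only 1 character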
     [WatermarkValue] [bigint] NOT NULL,
     [WatermarkDate] [datetime] NULL,
     [WatermarkDate2] datetime2 NULL
 ) ON [PRIMARY]
 GO

This is where I add the dates from the tables to show how far we have got.

For example, if the last lot of data we added was from 02/01/2020, then this value will be stored in the watermark table. I record the date in different formats just in case.

CREATE TABLE [audit].[ProcessingMetrics](
     [ID] [int] IDENTITY(1,1) NOT NULL,
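     [TableName] varchar(100) NULL, -- length assumed (matches @TableName varchar(100) below); a bare varchar column holds only 1 character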
     [DateProcessed] [datetime] NULL,
     [DateUTC] datetime2 NOT NULL,
     [Duration] [int] NOT NULL,
     [NoRows] [int] NULL,
 PRIMARY KEY CLUSTERED 
 (
     [ID] ASC
 )WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF) ON [PRIMARY]
 ) ON [PRIMARY]
 GO
 ALTER TABLE [audit].[ProcessingMetrics] ADD  DEFAULT (getdate()) FOR [DateProcessed]
 GO

We can record metadata from the Data Factory pipeline as it runs.

Stored Procedures in SQL Database (destination)

/*Debbie Edwards
 17/12/2019 Created initial SP
 Create a watermark table
 EXEC [audit].[USP_Watermark] '2014-12-31 00:00:00.000'
 */

 ALTER PROCEDURE [audit].[USP_Watermark] @NullDate datetime2

 AS   

 BEGIN

 IF EXISTS (Select * FROM [audit].[IdWatermarks]  
 WHERE TableName = 'DailyMetrics')

 DELETE FROM [audit].[IdWatermarks]  
 WHERE TableName = 'DailyMetrics';

 --DECLARE @NullDate datetime2 SET @NullDate = '2014-12-31 00:00:00.000'

 WITH CTEDailyMetricsDates (DATEUTC)
 AS
 (SELECT ISNULL(MAX(DATEUTC),@NullDate) FROM [audit].[MemberDailyMetricsDates]) -- table name as created above

 INSERT INTO [audit].[IdWatermarks] 
 (TableName, WatermarkValue, WatermarkDate, WatermarkDate2)
 SELECT 'DailyMetrics',CONVERT(bigint,CONVERT(datetime, MAX(DATEUTC))), MAX(DATEUTC), MAX(DATEUTC)
 FROM CTEDailyMetricsDates
 END 

This is the stored procedure that you run to create the watermark.

Currently I’m just running this for one table. You could redesign it to run for different tables.

Also note that if there is nothing in the dates table, I fall back to a default date to work from, passed in as @NullDate.
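The watermark rule in the procedure can also be sketched outside SQL. The Python below is only an illustration and not part of the pipeline: compute_watermark and NULL_DATE are hypothetical names, and the list of dates stands in for the DATEUTC column of the dates table.

```python
from datetime import datetime

# Hypothetical default, mirroring the @NullDate example value used above.
NULL_DATE = datetime(2014, 12, 31)


def compute_watermark(dates, null_date=NULL_DATE):
    """Return the latest date, or null_date when no dates exist yet.

    Mirrors ISNULL(MAX(DATEUTC), @NullDate) from the stored procedure.
    """
    present = [d for d in dates if d is not None]  # MAX() ignores NULLs
    return max(present) if present else null_date


# First run: the dates table is empty, so the default date is used.
first_run = compute_watermark([])

# Later run: the newest snapshot date becomes the watermark.
later_run = compute_watermark([datetime(2020, 1, 1), datetime(2020, 1, 2)])
```

On the first run this returns the default date, so every snapshot in the source counts as new. On later runs only dates after the newest stored date qualify.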

/*Debbie Edwards
 18/12/2019 Created initial SP
 Update Processing details
 EXEC [audit].[USP_UpdateMetrics]
 */
 ALTER PROCEDURE [audit].[USP_UpdateMetrics] @TableName varchar(100),   @DateUTC datetime2, 
 @Duration int,  @NoRows int

 AS   

 BEGIN

 INSERT INTO [audit].[ProcessingMetrics]
 (TableName, DateUTC, Duration, NoRows)
 VALUES

 (@TableName, @DateUTC, @Duration, @NoRows)

 END

The pipeline will run this stored procedure to add metadata to the ProcessingMetrics table.

Data Lake Gen2 Storage Account

Along with the destination SQL database, you need an Azure Data Lake.

Quick setup for the Azure Data Lake: the posts "Setting up an Azure Data Lake V2 to use with Power BI dataflows in Service (As a data source) …." and "Setting up a service principal" give information on how to:

  • Set up the Storage Account
  • Ensure hierarchical namespace is enabled
  • Create a file system
  • Grant the Reader role to the Power BI service
  • Set up the Service Principal (set up the App Registration)

Set up Data Factory

Now we are ready to set up everything with a Data Factory.

Create your Data Factory and then go to Author & Monitor.

Connections

AzureSQLDatabasereportingdb is the Source Azure SQL Database

Our destination is a second Azure SQL Database, AzureSQLDatabase[destination]reportingdb

And we have a Gen2 Data Lake Storage account, AzureDataLakeStorageGen2, which needs the Service Principal account set up before it can be used. See setting up a service principal….

DataSets

The Linked Services are in place. Now we can add datasets.

Source Data Set – Azure SQL database

Add an Azure SQL Database dataset. The first thing we need is two parameters, one for the table name and one for the table schema.

Connect this up to the source Linked Service and use the parameters to build the schema-qualified table name.

The schema will change whenever you use different table parameters, so there is no need to set it at this point.

Destination Data Set – Azure SQL database

Destination Data Set – Azure data Lake Gen2

First of all, create a FileName parameter. Because we are creating multiple files, each file needs to be named after the UTC date supplied by the ForEach loop later. That has to be set up within the ForEach rather than here, so for now we just define the name as a parameter.

What we want to do is add the file to the file system we created in the Data Lake, and we want a date in every file name because these will be the snapshots.

In Add dynamic content:

@dataset().FileName

This is where the actual value for FileName will be passed in from the copy activity later.

I imported a schema from a test file I already created.

Pipeline 1 Create Member Daily Metric Dates into destination database

The first part of the process is to create a list of the dates in the source data that we don't yet have in the Data Lake. The first time we run it, that's literally every date we have so far in the source database.

Let's have a look at the activities in the pipeline.

LookupWatermarkOld

I know that my dates are in datetime2 format, so I’m using WatermarkDate2 and aliasing it as WatermarkValue so that all the steps always use WatermarkValue no matter what the format. Here is the query in full:

SELECT MAX(WatermarkDate2) AS WatermarkValue From [audit].[IdWatermarks]
WHERE WatermarkDate2 IS NOT NULL
AND TableName = 'DailyMetrics'

We record the table name of the source in the watermark table. Basically, this tells us the date we need to work from. Remember, we set up a default for when there is nothing in the dates table yet, which means everything is loaded into the Data Lake.

CopyDailyMetricsDateUTC

This is where we get to the copy part of the pipeline. We simply copy the dates of the snapshots we haven’t processed yet into the dates table.

Here we add the parameters of the table we want to use from Source. These parameters were set up in the data set section.

Add a query. We want the distinct dates from the source table where DateUTC (the column in the table) is greater than the WatermarkValue from the previous watermark activity:

SELECT DISTINCT DateUTC FROM [dbo].[DailyMetrics]
WHERE DateUTC > '@{activity('LookupWatermarkOld').output.firstRow.WatermarkValue}'
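To make the effect of this query concrete, here is a small Python sketch of the same filter. It is only an illustration under assumed data: new_snapshot_dates is a hypothetical helper, and the sample dates are made up.

```python
from datetime import datetime


def new_snapshot_dates(source_dates, watermark):
    """Distinct dates strictly greater than the watermark.

    The set removes duplicates (SELECT DISTINCT) and the comparison
    mirrors WHERE DateUTC > WatermarkValue. Sorting is added here only
    to make the result deterministic; the SQL query has no ORDER BY.
    """
    return sorted({d for d in source_dates if d > watermark})


# Each snapshot date appears once per record, so the source has repeats.
source = [
    datetime(2020, 1, 1), datetime(2020, 1, 1),
    datetime(2020, 1, 2),
    datetime(2020, 1, 3), datetime(2020, 1, 3),
]

# With a watermark of 1 January, only 2 and 3 January are still pending.
pending = new_snapshot_dates(source, datetime(2020, 1, 1))
```

Because the comparison is strictly greater than, the date already stored as the watermark is never picked up a second time.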

Now we are at the destination data set

The only item in here is DateUTC

USP.Watermark

This triggers the watermark stored procedure and uses the Azure SQL Database destination Linked Service.

This is everything required for Pipeline 1, which loads into our dates table all the snapshot dates we haven’t yet processed.

Pipeline 2 CopyMemberDailyMetricsDateUTC

Again, let's have a look at these activities in more detail.

LookupDates

This is the Lookup activity that takes the full record set of dates from the destination SQL database.

Note that we haven’t ticked First row only, because the entire data set is required.

CopyDailyFilesIntoDL (ForEach Activity)

We are taking the output.value from our activity LookupDates

Because it only has one column we don’t need to specify anything further. output.value means the entire record set

Sequential – The Items will be read one by one from the data set
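As a rough picture of what the ForEach receives, the Python sketch below walks a stand-in for the Lookup output one item at a time. The dictionary shape (a count plus a value list of row objects) and the date strings are assumptions for illustration, not output captured from this pipeline.

```python
# Assumed shape of the Lookup output when "First row only" is unticked:
# a row count plus a list with one object per row.
lookup_output = {
    "count": 2,
    "value": [
        {"DateUTC": "2020-01-01T00:00:00Z"},
        {"DateUTC": "2020-01-02T00:00:00Z"},
    ],
}

processed = []

# Sequential: items are handled one after another, in list order.
for item in lookup_output["value"]:
    # Inside the loop, each row is what the pipeline sees as the current item.
    processed.append(item["DateUTC"])
```

Each pass through the loop corresponds to one daily file being written.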

There are 2 activities within this ForEach loop. Double-click the activity to see what is happening in the loop.

Again, let's have a look at this in more detail.

CopyDailyMetrics

The source is the table in the source database because we want to store everything in daily snapshot files.

The query is a SELECT FROM WHERE statement: select all the columns from the DailyMetrics table where the DateUTC in the source data is equal to the DateUTC in @item(), which is generated by the ForEach activity.

The sink uses the Data Lake. Remember, in the dataset we defined the FileName parameter, and this is where it gets set. Click on FileName to view the dynamic content.

This concatenates a file name, the item's UTC date and .csv.

Use @item() to refer to the current element of the collection that the ForEach activity (in the pipeline) is iterating over.

We are converting the date to a string in order to use it within the file name. Because the ForEach loop sets the item for us and we are inside this activity, we can create the file name in this copy activity rather than in the dataset itself.
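The file naming can be sketched in Python as follows. This is an illustration only: snapshot_file_name and the DailyMetrics_ prefix are hypothetical, and the incoming date string format is an assumption about how the item's date arrives.

```python
from datetime import datetime


def snapshot_file_name(item, prefix="DailyMetrics_"):
    """Build a dated file name: prefix + item date + .csv.

    The prefix is a made-up example. The date is reduced to yyyy-MM-dd
    so the name contains no time part or colons.
    """
    # Assumed incoming format, e.g. "2020-01-02T00:00:00Z".
    date_utc = datetime.strptime(item["DateUTC"], "%Y-%m-%dT%H:%M:%SZ")
    return f"{prefix}{date_utc:%Y-%m-%d}.csv"


name = snapshot_file_name({"DateUTC": "2020-01-02T00:00:00Z"})
```

With the sample item above, name is DailyMetrics_2020-01-02.csv, so each snapshot day ends up in its own file.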

We can then simply Import schemas to view the mappings between the source and sink

SPUpdateDetails

Finally, we want to keep a record of everything we do in the loop, so we run our stored procedure to add the metadata to the ProcessingMetrics table in our destination database.

We can use our SQL destination Linked Service for this.

The stored procedure takes 4 parameters.

We can take the Date from the Item in the foreach loop.

Duration and NoRows can be taken from the copy activity output. See the "Monitor programmatically" section of https://docs.microsoft.com/en-gb/azure/data-factory/copy-activity-overview

  • Duration @activity('CopyDailyMetrics').output.copyDuration
  • No Rows @activity('CopyDailyMetrics').output.rowsRead

The table name is simply DailyMetrics.
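Put together, the four parameter values can be pictured with this Python sketch. update_metrics_params is a hypothetical helper and the sample copy output values are invented. Only the copyDuration and rowsRead property names come from the expressions above.

```python
def update_metrics_params(copy_output, item, table_name="DailyMetrics"):
    """Map the copy activity output and the loop item to the four
    parameters of USP_UpdateMetrics."""
    return {
        "TableName": table_name,
        "DateUTC": item["DateUTC"],                    # from the ForEach item
        "Duration": int(copy_output["copyDuration"]),  # @Duration int
        "NoRows": int(copy_output["rowsRead"]),        # @NoRows int
    }


# Invented sample values, for illustration only.
params = update_metrics_params(
    {"copyDuration": 12, "rowsRead": 3500},
    {"DateUTC": "2020-01-02T00:00:00Z"},
)
```

One such set of values is written to ProcessingMetrics for every daily file, which makes it easy to spot a day that copied no rows.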

USP_TruncateMemberDailyMetricsDates

Finally, now that everything is complete, we can truncate the dates table. This happens outside of the ForEach loop.

The max date is held in the watermark table and will be used to create the new files next time (set in Pipeline 1).

Test your solution

Now that we have everything in place, we can test each pipeline by clicking Debug.

You can see the files arriving in the Data Lake file system, for example in Azure Storage Explorer.

If you get errors, a great way of debugging is to go into the code. I had an error because a file name was followed by \n\n (two newline characters).

I simply removed this empty row and it worked.

Pipeline 3

There is now a Pipeline 1 to create the dates and a Pipeline 2 to create the files. This is good because they can be tested separately.

We need a top-level pipeline to run them.

And this is the new model for the flow of this Data factory

Add a trigger

Now that we have successfully tested the pipelines, we can schedule them with a trigger by adding a parent pipeline that runs all the pipelines in order.

Another Post will be created on Adding Triggers

Considerations

You need to make sure that the last snapshot is complete when you run the pipeline. In our case the snapshot table is loaded at 12 AM and takes around 20 minutes, so we need to set this pipeline off at 1 AM.
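The timing can be checked with a few lines of Python. The times are the ones quoted above. The calendar date is arbitrary and the variable names are only for illustration.

```python
from datetime import datetime, timedelta

snapshot_start = datetime(2020, 1, 2, 0, 0)   # snapshot load starts at 12 AM
snapshot_duration = timedelta(minutes=20)     # and takes around 20 minutes
trigger_time = datetime(2020, 1, 2, 1, 0)     # pipeline trigger at 1 AM

snapshot_end = snapshot_start + snapshot_duration

# Time left between the snapshot finishing and the pipeline starting.
safety_margin = trigger_time - snapshot_end
```

This leaves a 40 minute margin, which gives the snapshot load room to overrun before the pipeline reads from the table.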

Add Alerts to Monitor the Pipeline

https://azure.microsoft.com/en-gb/blog/create-alerts-to-proactively-monitor-your-data-factory-pipelines/

Another Post will be created on Adding Monitors