I have the following problem to resolve (and this was my initial attempt at figuring out a solution).

I have a data source whose data contains a date. The data holds a daily snapshot of each record, which means that a record appears in the data set once per day. This will amount to a lot of data, and we would rather hold it as files in Azure Data Lake Gen2 Storage.
The logic is to pull daily data from the source database into dated files within the Azure Data Lake.
Once it is running, each run will probably pick up a single day's data because the files for the earlier days have already been created. However, on the initial run I want it to pick up all the days that have been loaded so far.
At the minute I have about 6 months of data to load.
Tables in Azure SQL Database (destination)
CREATE TABLE [audit].[MemberDailyMetricsDates](
    [DATEUTC] datetime2 NULL
) ON [PRIMARY]
This table collects all the dates from the source snapshot table
Remember, the source snapshot table stores the same record every day, along with any changes to that record.
CREATE TABLE [audit].[IdWatermarks](
    [TableName] [nvarchar](100) NOT NULL, -- length is an assumption; without one, nvarchar holds a single character
    [WatermarkValue] [bigint] NOT NULL,
    [WatermarkDate] [datetime] NULL,
    [WatermarkDate2] datetime2 NULL
) ON [PRIMARY]
GO
This is where I add the latest date from the tables to show where we are up to.
For example, if the last lot of data we added was from 02/01/2020, then this value will be stored in the watermark table. I record it in several different formats just in case.
CREATE TABLE [audit].[ProcessingMetrics](
    [ID] [int] IDENTITY(1,1) NOT NULL,
    [TableName] [varchar](100) NULL, -- length matches the @TableName parameter of USP_UpdateMetrics
    [DateProcessed] [datetime] NULL,
    [DateUTC] datetime2 NOT NULL,
    [Duration] [int] NOT NULL,
    [NoRows] [int] NULL,
    PRIMARY KEY CLUSTERED ( [ID] ASC )
        WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF) ON [PRIMARY]
) ON [PRIMARY]
GO
ALTER TABLE [audit].[ProcessingMetrics] ADD DEFAULT (getdate()) FOR [DateProcessed]
GO
This table lets us record metadata from the Data Factory pipeline as it runs.
Stored Procedures in SQL Database (destination)
/*
Debbie Edwards 17/12/2019 Created initial SP
Create a watermark table
EXEC [audit].[USP_Watermark] '2014-12-31 00:00:00.000'
*/
ALTER PROCEDURE [audit].[USP_Watermark] @NullDate datetime2
AS
BEGIN
    -- Remove the previous watermark for this table, if there is one
    IF EXISTS (SELECT * FROM [audit].[IdWatermarks] WHERE TableName = 'DailyMetrics')
        DELETE FROM [audit].[IdWatermarks] WHERE TableName = 'DailyMetrics';

    -- For testing outside the procedure:
    --DECLARE @NullDate datetime2
    --SET @NullDate = '2014-12-31 00:00:00.000'

    -- Latest collected date, or @NullDate when the dates table is empty
    WITH CTEDailyMetricsDates (DATEUTC) AS
    (
        SELECT ISNULL(MAX(DATEUTC), @NullDate)
        FROM [audit].[MemberDailyMetricsDates]
    )
    INSERT INTO [audit].[IdWatermarks] (TableName, WatermarkValue, WatermarkDate, WatermarkDate2)
    SELECT 'DailyMetrics',
           CONVERT(bigint, CONVERT(datetime, MAX(DATEUTC))),
           MAX(DATEUTC),
           MAX(DATEUTC)
    FROM CTEDailyMetricsDates
END
This is the stored procedure that you run to create the watermark.
Currently I'm just running this for one table. You could redesign it to run for different tables.
Also note that if there is nothing in the dates table, the watermark is set to a default date to work from, passed in as @NullDate.
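To make the watermark rule concrete, here is a minimal Python sketch of the same idea: take the latest collected date, and fall back to a default when there are none. It is only an illustration of the logic, not the stored procedure itself; the function name compute_watermark and the constant NULL_DATE are made up for this example.

```python
from datetime import datetime
from typing import Iterable, Optional

# Default used when no dates have been collected yet (plays the role of @NullDate).
NULL_DATE = datetime(2014, 12, 31)


def compute_watermark(
    dates: Iterable[Optional[datetime]],
    null_date: datetime = NULL_DATE,
) -> datetime:
    """Return the latest non-null date, or null_date when there is none."""
    known = [d for d in dates if d is not None]
    return max(known) if known else null_date


# First run: nothing collected yet, so the default date is used.
first_run = compute_watermark([])

# Later run: the latest collected snapshot date becomes the watermark.
later_run = compute_watermark([datetime(2020, 1, 1), datetime(2020, 1, 2)])
```

On the first run the default date is far enough back that every snapshot in the source counts as new.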
/*
Debbie Edwards 18/12/2019 Created initial SP
Update Processing details
EXEC [audit].[USP_UpdateMetrics]
*/
ALTER PROCEDURE [audit].[USP_UpdateMetrics]
    @TableName varchar(100),
    @DateUTC datetime2,
    @Duration int,
    @NoRows int
AS
BEGIN
    INSERT INTO [audit].[ProcessingMetrics] (TableName, DateUTC, Duration, NoRows)
    VALUES (@TableName, @DateUTC, @Duration, @NoRows)
END
The pipeline will run this stored procedure to add metadata to the ProcessingMetrics table.
Data Lake Gen2 Storage Account
Along with the destination SQL database, you need to have an Azure Data Lake.
For a quick set up of the Azure Data Lake, the posts "Setting up an Azure Data Lake V2 to use with Power BI dataflows in Service (As a data source) …." and "Setting up a service principal" will give you information on how to:
- Set up the Storage Account
- Ensure hierarchical namespace is enabled
- Create a file system
- Grant the Reader role to the Power BI service
- Set up the Service Principal (set up the App Registration)
Set up Data Factory
Now we are ready to set everything up in a Data Factory.
Create your Data Factory and then go to Author & Monitor.

Connections


AzureSQLDatabasereportingdb is the source Azure SQL Database.
Our destination is another Azure SQL Database, AzureSQLDatabase[destination]reportingdb.
And we have a Gen2 Data Lake Storage Account, AzureDataLakeStorageGen2 (which needs the Service Principal account setting up before it can be used). See setting up a service principal….

DataSets
The Linked Services are in place. Now we can add the Data Sets.
Source Data Set – Azure SQL database

Add an Azure SQL Database data set. The first thing we need is two parameters, one for the table name and one for the table schema.

Connect this up to the source Linked Service and use the parameters to build the table reference, including its schema.
The column schema will change whenever you pass in different table parameters, so there is no need to set it at this point.
Destination Data Set – Azure SQL database


Destination Data Set – Azure data Lake Gen2

First of all, create a FileName parameter. Because we are creating multiple files, each file needs to be named after the UTC date supplied by the ForEach loop later. That value has to be set within the ForEach rather than here, so at this point we just define the name as a parameter.

We want to add each file to the file system we created in the Data Lake, and we want a date in every file name because these files are the snapshots.
In Add dynamic content, enter:
@dataset().FileName
This is the placeholder that the copy activity will fill with the actual file name later.
I imported the schema from a test file I had already created.
Pipeline 1 Create Member Daily Metric Dates into destination database
The first part of the process is to create a list of the dates in the source data that we don't yet have in the Data Lake. The first time we run it, that's literally every date we have so far in the source database.

Let's have a look at the activities in the pipeline.
LookupWatermarkOld

I know that my dates are in datetime2 format, so I'm using the WatermarkDate2 column and aliasing it as WatermarkValue. That way all the later steps always use WatermarkValue no matter what the format is. Here is the query in full:
SELECT MAX(WatermarkDate2) AS WatermarkValue
FROM [audit].[IdWatermarks]
WHERE WatermarkDate2 IS NOT NULL AND TableName = 'DailyMetrics'
We record the table name of the source in the watermark table, so this query tells us which date we need to work from. Remember, we set up a default date for when there is nothing to work with yet, which means the first run loads everything into the Data Lake.
CopyDailyMetricsDateUTC
This is the copy part of the pipeline. We are simply copying the dates of the snapshots we haven't processed yet into the dates table.

Here we supply the parameters for the table we want to use from the source. These parameters were set up in the Data Sets section.
Add a query. We want the distinct dates from the source table where DateUTC (the column in the table) is greater than the WatermarkValue returned by the previous watermark lookup activity:
SELECT DISTINCT DateUTC FROM [dbo].[DailyMetrics]
WHERE DateUTC > '@{activity('LookupWatermarkOld').output.firstRow.WatermarkValue}'
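The effect of that query can be sketched in a few lines of Python: keep each snapshot date once, and only if it is later than the watermark. This is just an illustration of the filter, assuming the dates are already loaded in memory; new_snapshot_dates is a made-up name, and the result is sorted purely for readability (the SQL query makes no ordering promise).

```python
from datetime import datetime
from typing import Iterable, List


def new_snapshot_dates(source_dates: Iterable[datetime], watermark: datetime) -> List[datetime]:
    """Distinct dates strictly later than the watermark, in ascending order."""
    return sorted({d for d in source_dates if d > watermark})


# The snapshot table repeats each date once per record, hence the duplicates.
source_dates = [
    datetime(2020, 1, 1), datetime(2020, 1, 1),
    datetime(2020, 1, 2), datetime(2020, 1, 2),
    datetime(2020, 1, 3),
]

pending = new_snapshot_dates(source_dates, watermark=datetime(2020, 1, 1))
```

Because the comparison is strictly greater than, the day that set the watermark is not picked up a second time.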

Now we are at the destination data set

The only item in here is DateUTC
USP.Watermark

This activity triggers the watermark stored procedure and uses the destination Azure SQL Database Linked Service.
That is everything required for Pipeline 1, which loads into our dates table every snapshot date we haven't yet processed.
Pipeline 2 CopyMemberDailyMetricsDateUTC

Again, let's have a look at these activities in more detail.
LookupDates

This is the Lookup activity that takes the full record set of dates from the destination SQL database.
Note that we haven't ticked First row only, because the entire data set is required.
CopyDailyFilesIntoDL (ForEach Activity)

We are taking output.value from our LookupDates activity.
output.value means the entire record set. Because it only has one column, we don't need to specify anything further.
Sequential – the items will be read one by one from the data set.
There are 2 activities within this ForEach loop. Double-click on the activity to see what is happening in the loop.

Again, let's have a look at this in more detail.
CopyDailyMetrics

The source is the table in the source database, because we want to store everything in daily snapshot files.
The query is a SELECT ... FROM ... WHERE statement: select all the columns from the DailyMetrics table where DateUTC in the source data is equal to the DateUTC of the current @item(), which is supplied by the ForEach activity.

The sink uses the Data Lake. Remember, in the data set we created the FileName parameter, and here is where it gets set. Click on FileName to view the dynamic content.

The expression concatenates a file name, the item's UTC date and .csv.
Use @item() to refer to the current element while the ForEach activity iterates. This value is generated by the ForEach activity (in the pipeline).
We convert the date to a string so that it can be used within the file name. Because the ForEach loop sets the item for us and we are inside that loop, we can build the file name in this copy activity rather than in the data set itself.
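As a rough illustration of the file naming described above, here is the same concatenation in Python. The prefix "DailyMetrics_" and the yyyy-MM-dd date format are assumptions made for this sketch, not the values used in the actual expression, and snapshot_file_name is a made-up helper.

```python
from datetime import datetime


def snapshot_file_name(item: dict, prefix: str = "DailyMetrics_") -> str:
    """Build '<prefix><date>.csv' from one ForEach item.

    `item` mimics a single row from the lookup output, e.g.
    {"DateUTC": "2020-01-02T00:00:00"}. The prefix and the date
    format are illustrative assumptions.
    """
    date_utc = datetime.fromisoformat(item["DateUTC"])
    return f"{prefix}{date_utc:%Y-%m-%d}.csv"


file_name = snapshot_file_name({"DateUTC": "2020-01-02T00:00:00"})
```

Formatting the date explicitly keeps characters such as colons and spaces out of the file name.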
We can then simply Import schemas to view the mappings between the source and sink
SPUpdateDetails
Finally, we want to keep a record of everything we do in the loop, so we run our stored procedure to add the metadata into a table in our destination database.

We can use our SQL destination Linked Service for this.

The stored procedure takes 4 parameters.
We can take the date from the item in the ForEach loop.
Duration and NoRows can be taken from the copy activity's output. See the "Monitor programmatically" section of https://docs.microsoft.com/en-gb/azure/data-factory/copy-activity-overview
- Duration @activity('CopyDailyMetrics').output.copyDuration
- No Rows @activity('CopyDailyMetrics').output.rowsRead
The table name is simply DailyMetrics.
USP_TruncateMemberDailyMetricsDates
Finally, now that everything is complete, we can truncate the dates table. This happens outside of the ForEach loop.
The max date is held in the watermark table, and it will be used to work out which new files to create next time (set in Pipeline 1).
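Putting Pipeline 2 together, the loop can be pictured with a small in-memory Python simulation: one file per pending date, one metrics row per file, and the pending list cleared at the end. Everything here is a stand-in (a dict for the Data Lake, lists for the tables), and the MemberId column and the file name prefix are assumptions for the example.

```python
import time


def run_copy_pipeline(pending_dates, source_rows, lake, metrics):
    """Write one 'file' per pending date, log metrics, then clear the list."""
    for date_utc in list(pending_dates):  # sequential, like the ForEach
        started = time.perf_counter()

        # CopyDailyMetrics: all source rows for this snapshot date
        rows = [r for r in source_rows if r["DateUTC"] == date_utc]
        lake[f"DailyMetrics_{date_utc}.csv"] = rows

        # SPUpdateDetails: one metrics row per copied file
        metrics.append({
            "TableName": "DailyMetrics",
            "DateUTC": date_utc,
            "Duration": int(time.perf_counter() - started),
            "NoRows": len(rows),
        })

    # Stands in for truncating the dates table after the loop
    pending_dates.clear()


source_rows = [
    {"MemberId": 1, "DateUTC": "2020-01-01"},
    {"MemberId": 2, "DateUTC": "2020-01-01"},
    {"MemberId": 1, "DateUTC": "2020-01-02"},
]
pending = ["2020-01-01", "2020-01-02"]
lake, metrics = {}, []

run_copy_pipeline(pending, source_rows, lake, metrics)
```

Running it again straight away does nothing, because the pending list is empty until Pipeline 1 finds dates beyond the watermark.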
Test your solution
Now that we have everything in place, we can test each pipeline by clicking Debug.
You can see the files arriving in the Data Lake Gen2 storage account (for example, by browsing it in Azure Storage Explorer).

If you get errors, a great way of debugging is to go into the code. I had an error, and in the code the file name was followed by \n\n.
I simply removed these empty rows and it worked.
Pipeline 3
There is now a Pipeline 1 to create the dates and a Pipeline 2 to create the files. This is good because they can be tested separately.
We need a top-level pipeline to run them.

And this is the new model for the flow of this Data factory

Add a trigger
Now that we have successfully tested the pipelines, we can schedule them with a trigger on the parent pipeline, which runs all the pipelines in order.
Another post will be created on adding triggers.
Considerations
You need to make sure that the latest snapshot is complete when you run the pipeline. In our case the snapshot table is loaded at 12 AM and takes around 20 minutes, so we need to set this pipeline off at 1 AM.
Add alerts to monitor the pipeline.
Another post will be created on adding monitors.