Data Factory to move daily snapshot data in Azure SQL DB to files in a Gen2 Data Lake

I have the following problem to resolve (And this was my initial attempt at figuring out a solution)

I have a data source with data that contains a date. the data contains daily snapshot of a record. this means that a record will be in the data set once per day. This will amount to a lot of data and we would rather hold it as files in Azure Data Lake Gen2 Storage

The logic is to pull daily files from the source database into dated files within the Azure data Lake

Once running this will probably pick up a days data because the rest are already created. However on the initial run I want it to pick up all the days that have been loading.

At the minute I have about 6 months of data to load

Tables in Azure SQL Database (destination)

CREATE TABLE [audit].[MemberDailyMetricsDates](
     [DATEUTC] datetime2 NULL
 ) ON [PRIMARY]

This table collects all the dates from the source snapshot table

Remember, the table records the same record every day with any changes to the record.

CREATE TABLE [audit].[IdWatermarks](
     [TableName] nvarchar NOT NULL,
     [WatermarkValue] [bigint] NOT NULL,
     [WatermarkDate] [datetime] NULL,
     [WatermarkDate2] datetime2 NULL
 ) ON [PRIMARY]
 GO

this is where I add the dates from the tables to show where we are.

For example if we add the last lot of data from 02/01/2020 then this value will be stored in the watermark table. I record them in different formats just in case.

CREATE TABLE [audit].[ProcessingMetrics](
     [ID] [int] IDENTITY(1,1) NOT NULL,
     [TableName] varchar NULL,
     [DateProcessed] [datetime] NULL,
     [DateUTC] datetime2 NOT NULL,
     [Duration] [int] NOT NULL,
     [NoRows] [int] NULL,
 PRIMARY KEY CLUSTERED 
 (
     [ID] ASC
 )WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF) ON [PRIMARY]
 ) ON [PRIMARY]
 GO
 ALTER TABLE [audit].[ProcessingMetrics] ADD  DEFAULT (getdate()) FOR [DateProcessed]
 GO

We can record meta data from the data factory Pipeline as it runs

Stored Procedures in SQL Database (destination)

/*Debbie Edwards
 17/12/2019 Created initial SP
 Create a watermark table
 EXEC [audit].[audit].[USP_Watermark]    '2014-12-31 00:00:00.000'
 */

 ALTER PROCEDURE [audit].[USP_Watermark] @NullDate datetime2

 AS   

 BEGIN

 IF EXISTS (Select * FROM [audit].[IdWatermarks]  
 WHERE TableName = 'DailyMetrics')

 DELETE FROM [audit].[IdWatermarks]  
 WHERE TableName = 'DailyMetrics';

 --DECLARE @NullDate datetime2 SET @NullDate = '2014-12-31 00:00:00.000'

 WITH CTEDailyMetricsDates (DATEUTC)
 AS
 (SELECT ISNULL(MAX(DATEUTC),@NullDate) FROM [audit].[DailyMetricsDates]) 

 INSERT INTO [audit].[IdWatermarks] 
 (TableName, WatermarkValue, WatermarkDate, WatermarkDate2)
 SELECT 'DailyMetrics',CONVERT(bigint,CONVERT(datetime, MAX(DATEUTC))), MAX(DATEUTC), MAX(DATEUTC)
 FROM CTEDailyMetricsDates
 END 

This is the Stored procedure that you run to create the watermark

Currently I’m just running this for one table. You could redesign to run this SP for different tables.

Also note, If there is nothing in the table I am setting a default date to work from @NullDate

/*Debbie Edwards
 18/12/2019 Created initial SP
 Update Processing details
 EXEC [audit].[USP_UpdateMetrics]
 */
 ALTER PROCEDURE [audit].[USP_UpdateMetrics] @TableName varchar(100),   @DateUTC datetime2, 
 @Duration int,  @NoRows int

 AS   

 BEGIN

 INSERT INTO [audit].[ProcessingMetrics]
 (TableName, DateUTC, Duration, NoRows)
 VALUES

 (@TableName, @DateUTC, @Duration, @NoRows)

 END

the Pipeline will run this stored Procedure to add meta data to the ProcessingMetrics table

Data Lake Gen2 Storage Account

Along with the Destination SQL Database you need to have an Azure Data Lake

Quick Set up for the Azure Data Lake. Setting up an Azure Data Lake V2 to use with power BI dataflows in Service (As a data source) …. and Setting up a service principal will give you information on how to:

  • Set up the Storage Account
  • Ensure Hierarchical name space is enabled
  • Create a file system
  • Grant reader Role to power BI service
  • Set up the Service Principal (Set up the App Registration)

Set up Data Factory

Now we are ready to set up everything with a Data Factory

create your data Factory and then go to author and monitor

Connections

AzureSQLDatabasereportingdb is the Source Azure SQL Database

Our Destination is a Destination Azure SQL Database AzureSQLDatabase[destination]reportingdb

And we have a gen 2 Data Lake Storage Account AzureDataLakeStorageGen2 (Which will need the Service Principal account setting up in order to use) See setting up a service principal….

DataSets

the Linked Services are in place. Now we can add Data sets

Source Data Set – Azure SQL database

Add an Azure SQL Server Dataset. the first thing we need are two parameters for the TableName and Table Schema

Connect this up to the Source Linked Service and use the parameters to create a table with schema

The Schema will change when ever you use different table parameters so no need to set at this point

Destination Data Set – Azure SQL database

Destination Data Set – Azure data Lake Gen2

First of all, create a FileName parameter. Because we are creating multiple files, each file needs to be renamed to the UTCDate from the Foreach loop later. We need to set this up within the For each rather than here. Therefore will just set the name as a parameter at this point.

What we want to do is add the file into our file system created in the data lake. and we want to add a date to every single file because these will be the snapshots

In add dynamic content

@dataset().FileName

This is where the actual information for Filename from the data set will be added within the copy activity later.

I imported a schema from a test file I already created.

Pipeline 1 Create Member Daily Metric Dates into destination database

Our first part of the process is to create a list of dates in the source data that we dont yet have in the Data Lake. The first time we run it, thats literally every date we have so far in the source data base

Lets have a look at the Activities in the Pipeline

LookupWatermarkOld

I know that my dates are in datetime2 format so I’m using this and changing it to WatermarkValue so all the steps always use WatermarkValue no matter what the format. Here is the query in full

SELECT MAX(WatermarkDate2) AS WatermarkValue From [audit].[IdWatermarks]
WHERE WatermarkDate2 IS NOT NULL
AND TableName = 'DailyMetrics'

We record the table name of the source in the Watermark table. Basically this will tell us what the date we need to use to work from. remember, we set up a default if there is nothing in the dataset to work with which will run everything into the data lake

CopyDailyMetricsDateUTC

This is where we get to the Copy part of the activity. We are simply copying the dates into the table of the snapshots that we haven’t done yet.

Here we add the parameters of the table we want to use from Source. These parameters were set up in the data set section.

Add a Query. We want the Distinct Date from the source table Where the DATEUTC (The column in the table) is greater than WatermarkValue from the Previous Watermark Activity

SELECT DISTINCT DateUTC FROM [dbo].[DailyMetrics]
WHERE DateUTC > ‘@{activity(‘LookupWatermarkOld’).output.firstRow.WaterMarkValue}’

Now we are at the destination data set

The only item in here is DateUTC

USP.Watermark

This will trigger the Watermark Stored procedure and uses the Azure SQL database destination Linked Service

This is everything required for Section 1 that will run all the dates into our table based on snapshots we haven’t yet prepared

Pipeline 2 CopyMemberDailyMetricsDateUTC

Again, lets have a look at these Activities in more detail

LookupDates

this is the Lookup Activity that takes the full record set of Dates from the destination SQL Server

Note that we haven’t ticked First row only because the entire data set is required

CopyDailyFilesIntoDL (ForEach Activity)

We are taking the output.value from our activity LookupDates

Because it only has one column we don’t need to specify anything further. output.value means the entire record set

Sequential – The Items will be read one by one from the data set

there are 2 activities within this foreach loop. Doubly click on the activity to see what is happening in the loop

Again Lets have a look at this in more detail

CopyDailyMetrics

The source is the table in the source database because we want to store everything in daily snapshot files.

The Query is a SELECT FROM WHERE Statement. Select all the columns from the DailyMetric table where the DateUTC in the source data is Equal to DateUTC in @Item which is generated by the ForEach activity 

the sink uses the Data Lake. Remember, in the data set we set the Filename parameter and here is where is gets set. Click on the Filename to view Dynamic content

This is Concatenating a file name, the Item UTC Date and .csv


Use @item() to iterate over a single enumeration in ForEach activity . This value is generated by the ForEach activity (In the Pipeline)

We are setting the date as a string in order to use within the file name. Because the For each loop is setting the item for us and we are inside this activity we can create the filename in this copy activity rather than in the data set its self.

We can then simply Import schemas to view the mappings between the source and sink

SPUpdateDetails

Finally, we want to keep a record of everything we are going to be doing in the loop so we can run our stored Procedure to add the meta data into a table in our source database

We can use out SQL destination Linked Service for this

the Stored procedure contains 4 parameters.

We can take the Date from the Item in the foreach loop.

Duration, and NoRows can be added from metadata. See https://docs.microsoft.com/en-gb/azure/data-factory/copy-activity-overview Monitor programmatically

  • Duration @activity(‘CopyDailyMetrics’).output.copyDuration
  • No Rows @activity(‘CopyDailyMetrics’).output.rowsRead

the table name is simple DailyMetrics

USP_TruncateMemberDailyMetricsDates

Finally, now that everything is complete we can truncate the date table. Outside of the foreachloop

the max date is held in Watermark which will be used to create the new files Next time (Set in Pipeline 1)

Test your solution

Now we have everything in place we can test each pipeline. By Clicking debug

You can see the files coming in via Azure Table Storage

If you get errors, a great way of debugging is to go into the code. I had an error and after a file nam it had /n/n.

I simply removed this empty row and it worked.

Pipeline 3

There is now a Pipeline 1 to create the dates. And pipeline 2 to create the files. This is good because they can be tested separately.

We need a top level Pipeline to run them

And this is the new model for the flow of this Data factory

Add a trigger

now we have successfully tested the Pipelines we can set them up in a trigger by adding a parent pipeline that runs all the Pipelines in order

Another Post will be created on Adding Triggers

Considerations

You need to make sure that the last file (When you run it is complete) In our case the snapshot table is run at 12 AM and takes around 20 minutes so we need to set this pipeline off at 1 AM

Add Alerts to Monitor the Pipeline

https://azure.microsoft.com/en-gb/blog/create-alerts-to-proactively-monitor-your-data-factory-pipelines/

Another Post will be created on Adding Monitors

Power BI Service Data Lineage View

I was logging into Power BI this morning when I saw this exciting new feature

we are always looking at new solutions to provide good data lineage so this is well worth a look

Data lineage includes the data origin, what happens to it and where it moves over time. Data lineage gives visibility while greatly simplifying the ability to trace errors back to the root cause in a data analytics process. 

Wikipedia

I have an App workspace set up for Adventureworks so lets have a look at Lineage using this project

Column 1 is my data source. I can see I’m using a local database and I’m also using an xlsx spreadsheet to bring in data.

In most of my projects I’m working on the ETL in Data factory, transforming data in Stored Procedures etc. for example, for a social media feed, I have a logic app that moves tweets to an Azure Data Storage NOSQL table. Data Factory then transfers this data across into a central Azure Data Warehouse. The Power BI Lineage would pick up at the data Warehouse stage. It wont take into account that there is a lot of work previous to this

Column 2 is the data set in Power BI

Column 3 provides Report information

Column 4 displays the Dashboards

You can click on a data flow node to drill down into more detail

Currently you cant go any further to look at the data items

Click on the Link icon to see the data flow for that item. In this case the Report.

This is a great start but there definitely needs to be more information here to make it something that you would want to use as a proper Data Lineage Tool

  • It would be good to see the fields in each Entity for the Data Sets
  • As an extra, it would be great to see what fields are being used in Measures and calculated Fields
  • Reports – For me, Id like to know for every page in my report
    • What field am I using from the data source
    • What calculated columns I have created (Even better with the DAX Logic)
    • Any Name changes from Data Source to Power BI
    • What measures I have created (Even better with the DAX Logic)
  • For the Dashboard, What items I am using in the dashboards (Fields, Measures, Calculated Columns
  • An Important part of data lineage is getting and understanding of the entire process. This includes data transformations pre Power BI. If you cant do that in here, it would be great to be able to extract all the information out so you can use it in some way with your other Linage information to provide the full story. for example:

Azure Data Catalogue

Azure Data Catalog is a fully managed cloud service. Users can discover and consume data sources via the catalog and is a single , central place for all the organisation to contribute and understand all your data sources.

https://eun-su1.azuredatacatalog.com/#/home

I have already registered Our Data Catalog, and I have downloaded the desktop app

As an Example I want to connect to Azure Table Storage (Connect using Azure Account name and Access Key)

At this point I’m registering everything in the storage table. then I can view the information in the Azure Portal.

You can add a friendly Name, description, Add in expert (in this case me). Tags and management information

I have added Data Preview so you can view the data within the object. there is also documentation and Column information to look at

In the data catalog you can manually add lots of description to your tables along with documentation.

This is great for providing lots of information about your data . You can explore databases and open the information in other formats (Great if you need to supply information to another Data lineage package

I will be having a look at the Azure Data catalog in more detail later to see how it could help to provide full data lineage

Azure Data Factory

Data factory is the Azure ETL Orchestration tool. Go into Monitoring for Lineage Information. However, there doesn’t seem to be a way to export this information to use. Data Factory wont take into account the work done in, for example a stored Procedure

Again this is another area to look into more.

Stored Procedures

When you use Stored Procedures to transform you data, its harder to provide automated Linage on your code. There are automated data lineage tool for SQL out there, but it would be great if there was a specific Tool within Azure that creates Data Lineage information from your Stored Procedures

Azure Logic Apps

Data for my project is collected via Logic Apps before being Processed into an Azure Data Warehouse.

Essentially, we need out data lineage to capture everything all in one place.

And just as important. everything should be as automated as possible. If I quickly create a measure, the data lineage should reflect this with no manual input needed (Unless you want to add some description to the new measure as to why it was created)

Azure – Data Factory – changing Source path of a file from Full File name to Wildcard

I originally had one file to import into a SQL Database  Survey.txt

The files are placed in Azure blob storage ready to be imported

I then use Data Factory to import the file into the sink (Azure SQL Database)

However, the data is actually in one worksheet a year. For full logic I need to be able to add a worksheet to the blob storage to get it imported and each worksheet will contain the year.

This means I need to change the Source and Pipeline in Data Factory

First of all remove the file name from the file path. I used 1 file to set up the Schema. All files are the same so this should be OK.

Next I go to the Pipeline and set up the Wildcard in here Survey*.txt

When the Pipeline is run, it will take all worksheets against for example Survey

Create your website with WordPress.com
Get started