Incremental Processing in Data Factory Using a Watermark Table

I am pulling tweets into Azure Table Storage and then processing them into a warehouse.

The following shows the very basic Data Factory setup.

Connections

I have created a Linked Service for the Azure Storage table PowerBIMentions.

I have also created a Linked Service for my Azure SQL Server table PowerBIMentions.

Datasets

The Storage table was set up in Logic Apps. I won't go into detail about the schema here.

I have created a SQL table to mirror the Azure Table Storage table.

Watermark Table in SQL

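The watermark table is as follows:

CREATE TABLE [staging].[watermarktable](
[TableName] [varchar](100) NOT NULL, -- length assumed; a bare varchar column defaults to varchar(1)
[WaterMarkDate] [datetime] NULL,
[WaterMarkValue] [varchar](50) NULL, -- length assumed; holds the date formatted as in Azure Table Storage
[WaterMarkOffSetDate] [datetimeoffset] NULL
) ON [PRIMARY]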

For the time being, WaterMarkValue holds the date as a string in the same format used in Azure Table Storage.

Ignore the offset date for now.

I then created a stored procedure that adds a row containing the table name staging.PowerBImentions, the maximum created date from the table, and that date formatted the way it appears in Azure Table Storage.

To see how the value should be formatted, you can look at the Azure table using Microsoft Azure Storage Explorer:

https://azure.microsoft.com/en-gb/features/storage-explorer/

In Storage Explorer the format is YYYY-MM-DDTHH:MM:SS.000Z (for example, 2019-11-01T16:00:55.000Z).

I haven't included the code for the SP here, but it is fairly straightforward: take the max date from the table you are interested in, and format that value into the string format above.
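The SP's logic can be sketched in Python to show what the formatting step has to produce. This is a minimal sketch, not the author's procedure: it assumes naive datetimes are already UTC, and the names format_table_storage_timestamp and build_watermark_row are hypothetical.

```python
from datetime import datetime, timezone
from typing import Dict, Iterable


def format_table_storage_timestamp(dt: datetime) -> str:
    """Render dt as YYYY-MM-DDTHH:MM:SS.mmmZ, the format seen in Storage Explorer.

    Assumption: naive datetimes are already in UTC.
    """
    if dt.tzinfo is not None:
        # Convert aware values to UTC, then drop tzinfo so isoformat() adds no offset.
        dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
    # timespec="milliseconds" always emits three fractional digits, even ".000".
    return dt.isoformat(timespec="milliseconds") + "Z"


def build_watermark_row(table_name: str, created_dates: Iterable[datetime]) -> Dict[str, object]:
    """Hypothetical stand-in for the SP: max created date, stored raw and formatted."""
    latest = max(created_dates)
    return {
        "TableName": table_name,
        "WaterMarkDate": latest,
        "WaterMarkValue": format_table_storage_timestamp(latest),
    }


row = build_watermark_row(
    "staging.PowerBImentions",
    [datetime(2019, 11, 1, 15, 30, 0), datetime(2019, 11, 1, 16, 0, 55)],
)
print(row["WaterMarkValue"])  # 2019-11-01T16:00:55.000Z
```

Keeping the milliseconds fixed at three digits matters because the watermark is later compared as a string, and fixed-width ISO strings sort in date order.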

Pipeline

I added a pipeline called CopyPowerBIMentionsIncremental.

The source is the Storage mentions table.

The sink (destination) is the Azure SQL table.

We can then import the schemas and map the columns.

Then run the very simple pipeline, which just moves the data and then runs the watermark stored procedure to record the new max date time.
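To make the order of the two activities concrete, here is a minimal in-memory Python simulation of this first run. It is not Data Factory code: lists of dicts stand in for the Storage table, the SQL table and the watermark table, and the function names and sample tweets are hypothetical.

```python
from typing import Dict, List

Row = Dict[str, str]


def copy_activity(source: List[Row], sink: List[Row]) -> int:
    """Stand-in for the Copy activity on the first run: copy every source row."""
    sink.extend(source)
    return len(source)


def watermark_procedure(sink: List[Row], watermarks: List[Row], table_name: str) -> None:
    """Stand-in for the watermark SP: record the max CreatedAt now in the sink.

    CreatedAt values are fixed-width ISO strings, so max() on strings is chronological.
    """
    latest = max(row["CreatedAt"] for row in sink)
    watermarks.append({"TableName": table_name, "WaterMarkValue": latest})


def run_initial_pipeline(source: List[Row], sink: List[Row],
                         watermarks: List[Row], table_name: str) -> int:
    copied = copy_activity(source, sink)               # activity 1: move the data
    watermark_procedure(sink, watermarks, table_name)  # activity 2: runs after the copy
    return copied


source = [
    {"TweetText": "tweet A", "CreatedAt": "2019-11-01T15:30:00.000Z"},
    {"TweetText": "tweet B", "CreatedAt": "2019-11-01T16:00:55.000Z"},
]
sink: List[Row] = []
watermarks: List[Row] = []
print(run_initial_pipeline(source, sink, watermarks, "staging.PowerBImentions"))  # 2
print(watermarks[-1]["WaterMarkValue"])  # 2019-11-01T16:00:55.000Z
```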

Change Pipeline to include a lookup activity

Now, the next time we process, we only want to load new records into the table.

The query to select the correct value from the watermark table is:

SELECT MAX(WaterMarkValue) AS WaterMarkValue FROM [staging].[watermarktable]
WHERE WaterMarkValue IS NOT NULL
AND TableName = 'staging.PowerBImentions'
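To sanity-check the query logic outside SSMS, the sketch below runs the same SELECT against an in-memory SQLite database using Python's built-in sqlite3 module. Assumptions: SQLite has no [staging] schema here, so the table is created without one and with simplified column types, and the rows (including staging.SomeOtherTable) are made up. Because WaterMarkValue is a fixed-width ISO-style string, MAX on the text returns the latest date.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified copy of the watermark table (no schema, TEXT columns).
conn.execute(
    """CREATE TABLE watermarktable (
           TableName TEXT NOT NULL,
           WaterMarkDate TEXT,
           WaterMarkValue TEXT,
           WaterMarkOffSetDate TEXT)"""
)
conn.executemany(
    "INSERT INTO watermarktable (TableName, WaterMarkValue) VALUES (?, ?)",
    [
        ("staging.PowerBImentions", "2019-11-01T15:30:00.000Z"),
        ("staging.PowerBImentions", "2019-11-01T16:00:55.000Z"),
        ("staging.PowerBImentions", None),                    # excluded by IS NOT NULL
        ("staging.SomeOtherTable", "2019-12-25T00:00:00.000Z"),  # excluded by TableName
    ],
)
result = conn.execute(
    """SELECT MAX(WaterMarkValue) AS WaterMarkValue
       FROM watermarktable
       WHERE WaterMarkValue IS NOT NULL
         AND TableName = 'staging.PowerBImentions'"""
).fetchone()
print(result[0])  # 2019-11-01T16:00:55.000Z
```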

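You can test this in SQL Server Management Studio.

The Copy activity's source query (an Azure Table filter) then compares CreatedAt with the value returned by the Lookup activity:

CreatedAt gt '@{activity('LookupWatermarkOld').output.firstRow.WaterMarkValue}'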
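At run time Data Factory substitutes the Lookup result into that string. The Python below only imitates the substitution to show the final filter text; the function name is hypothetical, and the dictionary mimics the shape of a Lookup activity's output when "First row only" is ticked. The quoted literal assumes CreatedAt is stored as a string property in the Storage table, as the original filter implies; a property typed as DateTime would need a datetime'...' literal instead.

```python
from typing import Any, Dict


def build_source_filter(lookup_output: Dict[str, Any], column: str = "CreatedAt") -> str:
    """Reproduce the string the @{...} interpolation yields for the Copy source query."""
    # With "First row only" enabled, the Lookup output exposes the row under firstRow.
    watermark = lookup_output["firstRow"]["WaterMarkValue"]
    # The surrounding single quotes make the value an OData string literal.
    return f"{column} gt '{watermark}'"


lookup_output = {"firstRow": {"WaterMarkValue": "2019-11-01T16:00:55.000Z"}}
print(build_source_filter(lookup_output))  # CreatedAt gt '2019-11-01T16:00:55.000Z'
```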

CreatedAt is the column in the source Azure Storage table, stored in the same YYYY-MM-DDTHH:MM:SS.000Z format as the watermark value. The filter can use the following comparison operators:

Operator             URI expression
Equal                eq
GreaterThan          gt
GreaterThanOrEqual   ge
LessThan             lt
LessThanOrEqual      le
NotEqual             ne
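The operators in the table above can be mapped onto Python comparisons to show what the filter keeps. This sketch compares strings lexicographically, which matches date order only because the timestamps are fixed-width; the function name and sample rows are hypothetical.

```python
import operator
from typing import Callable, Dict, List

# OData comparison operators mapped to their Python equivalents.
ODATA_OPERATORS: Dict[str, Callable[[str, str], bool]] = {
    "eq": operator.eq,
    "gt": operator.gt,
    "ge": operator.ge,
    "lt": operator.lt,
    "le": operator.le,
    "ne": operator.ne,
}


def apply_filter(rows: List[Dict[str, str]], column: str, op: str, value: str) -> List[Dict[str, str]]:
    """Keep rows where row[column] <op> value, comparing strings lexicographically."""
    compare = ODATA_OPERATORS[op]
    return [row for row in rows if compare(row[column], value)]


rows = [
    {"TweetText": "tweet B", "CreatedAt": "2019-11-01T16:00:55.000Z"},
    {"TweetText": "tweet C", "CreatedAt": "2019-11-01T17:01:52.000Z"},
]
watermark = "2019-11-01T16:00:55.000Z"
print([r["TweetText"] for r in apply_filter(rows, "CreatedAt", "gt", watermark)])  # ['tweet C']
print([r["TweetText"] for r in apply_filter(rows, "CreatedAt", "ge", watermark)])  # ['tweet B', 'tweet C']
```

Using gt rather than ge is what avoids reloading the row that set the watermark. The trade-off is that a new row with exactly the same CreatedAt as the watermark would be skipped.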

The Copy activity then only picks up rows whose CreatedAt is greater than the WaterMarkValue returned by the Lookup activity from the SQL watermark table.
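Putting the three steps together, here is a minimal in-memory simulation of one incremental run: Lookup, filtered Copy, then the watermark SP. It is a sketch under stated assumptions, not Data Factory code; lists of dicts stand in for the tables, the function names and tweets are hypothetical, and the empty-string fallback for a missing watermark is a choice made only for this sketch.

```python
from typing import Dict, List, Optional

Row = Dict[str, Optional[str]]


def lookup_watermark(watermarks: List[Row], table_name: str) -> str:
    """Lookup activity: MAX(WaterMarkValue) for this table, skipping NULLs."""
    values = [w["WaterMarkValue"] for w in watermarks
              if w["TableName"] == table_name and w["WaterMarkValue"] is not None]
    # Sketch-only fallback: "" sorts before every timestamp, so an empty table copies all rows.
    return max(values, default="")


def run_incremental_pipeline(source: List[Row], sink: List[Row],
                             watermarks: List[Row], table_name: str) -> int:
    old_mark = lookup_watermark(watermarks, table_name)          # 1. Lookup
    new_rows = [r for r in source if r["CreatedAt"] > old_mark]  # 2. Copy with CreatedAt gt filter
    sink.extend(new_rows)
    if sink:                                                     # 3. Watermark SP
        latest = max(r["CreatedAt"] for r in sink)
        watermarks.append({"TableName": table_name, "WaterMarkValue": latest})
    return len(new_rows)


sink: List[Row] = [
    {"TweetText": "tweet A", "CreatedAt": "2019-11-01T15:30:00.000Z"},
    {"TweetText": "tweet B", "CreatedAt": "2019-11-01T16:00:55.000Z"},
]
watermarks: List[Row] = [{"TableName": "staging.PowerBImentions", "WaterMarkValue": "2019-11-01T16:00:55.000Z"}]
source = sink + [{"TweetText": "tweet C", "CreatedAt": "2019-11-01T17:01:52.000Z"}]
print(run_incremental_pipeline(source, sink, watermarks, "staging.PowerBImentions"))  # 1
print(len(sink), watermarks[-1]["WaterMarkValue"])  # 3 2019-11-01T17:01:52.000Z
```

Running it a second time with no new source rows copies nothing, which is the behaviour the SSMS checks below confirm for the real pipeline.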

Test the New Watermark Logic

In SSMS:

SELECT COUNT(*) FROM [staging].[PowerBImentions]

There are 16 records and the max date is 2019-11-01T16:00:55.000Z.

Run the new Data Factory pipeline (Debug).

The new record count is 21.

The new max date is 2019-11-01T17:01:52.000Z.

And let's do a quick check to make sure there are no duplicates:

SELECT TweetText, COUNT(*) FROM [staging].[PowerBImentions] GROUP BY TweetText HAVING COUNT(*) > 1
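If the rows are already in Python (for example, exported for a quick check), the same duplicate test can be expressed with collections.Counter. This is an illustrative equivalent of the GROUP BY ... HAVING query with made-up sample rows, not part of the pipeline.

```python
from collections import Counter
from typing import Dict, List


def find_duplicates(rows: List[Dict[str, str]], column: str = "TweetText") -> Dict[str, int]:
    """Equivalent of: SELECT column, COUNT(*) ... GROUP BY column HAVING COUNT(*) > 1."""
    counts = Counter(row[column] for row in rows)
    return {value: n for value, n in counts.items() if n > 1}


rows = [{"TweetText": "tweet A"}, {"TweetText": "tweet B"}, {"TweetText": "tweet A"}]
print(find_duplicates(rows))  # {'tweet A': 2}
```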

It's all working. We have no duplicates, and because each run only copies new records, this will ultimately help keep the processing down.

There is another way to do incremental processing, using change tracking. I'll be looking at that in a later post.
