I am pulling tweets into Azure Table Storage and then processing them into a warehouse.
The following shows a very basic Data Factory setup.
Connections
I have created a Linked Service for the Azure Storage Table PowerBIMentions.
And another Linked Service for my Azure SQL Server table PowerBIMentions.
Datasets

The Storage Table was set up in Logic Apps. I won't go into detail about the schema here.
I have created a SQL table to mirror the Azure Storage Table.
Watermark Table in SQL
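The Watermark Table is as follows:
CREATE TABLE [staging].[watermarktable](
[TableName] varchar(100) NOT NULL, -- length is an assumption; with no length, varchar holds a single character
[WaterMarkDate] [datetime] NULL,
[WaterMarkValue] varchar(50) NULL, -- length is an assumption
[WaterMarkOffSetDate] datetimeoffset NULL
) ON [PRIMARY]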
For the time being, the watermark value holds the date in the same format that is used in Azure Table Storage.
Ignore the offset date for the time being.
I have then created a stored procedure that adds the table name staging.PowerBImentions and the maximum created date from that table, with some extra script to convert the date into the format used in Azure Table Storage.
To understand how this should be formatted you can look at the Azure Table using Microsoft Azure Storage Explorer
https://azure.microsoft.com/en-gb/features/storage-explorer/

As you can see, the format is YYYY-MM-DDTHH:MM:SS.000Z
I haven't included the code for the SP here, but it is fairly straightforward. You take the max date from the table you are interested in and include code to format the value correctly.
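To illustrate the formatting step only, here is a minimal Python sketch of the same logic: take the maximum created date and render it as YYYY-MM-DDTHH:MM:SS.000Z. This is not the stored procedure from this post (that one is T-SQL and is not shown); the function names are hypothetical, and treating the dates as UTC is an assumption.

```python
from datetime import datetime


def to_table_storage_format(dt: datetime) -> str:
    """Render a datetime as YYYY-MM-DDTHH:MM:SS.mmmZ.

    Assumption: the value is already in UTC, so a literal 'Z' is appended.
    """
    millis = dt.microsecond // 1000  # keep millisecond precision only
    return dt.strftime("%Y-%m-%dT%H:%M:%S") + f".{millis:03d}Z"


def build_watermark_value(created_dates) -> str:
    """Take the maximum created date and format it as the watermark value."""
    return to_table_storage_format(max(created_dates))


if __name__ == "__main__":
    dates = [datetime(2019, 11, 1, 15, 30, 0), datetime(2019, 11, 1, 16, 0, 55)]
    print(build_watermark_value(dates))
```

Because every part of this format has a fixed width, the resulting strings sort in the same order as the dates they represent, which is what makes a plain "greater than" comparison on the stored value workable.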
Pipeline
Added a Pipeline called CopyPowerBIMentionsIncremental

The Source is the Storage mentions table

The Sink (destination) is the Azure SQL table.

And then we can easily import the schemas and map
Then run the very simple pipeline, which just moves the data and then runs the watermark stored procedure to recalculate the max date time.

Change Pipeline to include a lookup activity
The next time we run the pipeline, we only want to process new records into the table.
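The idea behind the watermark can be sketched in a few lines of Python. This is only a simulation of the logic, not what Data Factory runs: the function name and the sample TweetText values are hypothetical, and it assumes CreatedAt is stored as a fixed-width UTC string so that string comparison matches date order.

```python
def incremental_load(source_rows, watermark):
    """Return the rows newer than the watermark, plus the updated watermark.

    Assumption: CreatedAt values are fixed-width strings such as
    2019-11-01T16:00:55.000Z, so comparing strings compares dates.
    """
    new_rows = [row for row in source_rows if row["CreatedAt"] > watermark]
    # If nothing new arrived, the watermark stays where it was.
    new_watermark = max((row["CreatedAt"] for row in new_rows), default=watermark)
    return new_rows, new_watermark


if __name__ == "__main__":
    rows = [
        {"TweetText": "already loaded", "CreatedAt": "2019-11-01T16:00:55.000Z"},
        {"TweetText": "new tweet", "CreatedAt": "2019-11-01T17:01:52.000Z"},
    ]
    loaded, watermark = incremental_load(rows, "2019-11-01T16:00:55.000Z")
    print(len(loaded), watermark)
```

Using a strict "greater than" means the row that set the previous watermark is not copied a second time.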

The query to select the correct date from the watermark table:
SELECT MAX(WatermarkValue) AS WatermarkValue From [staging].[watermarktable]
WHERE WatermarkValue IS NOT NULL
AND TableName = 'staging.PowerBImentions'
You can test this in SQL Server Management Studio.

CreatedAt gt '@{activity('LookupWatermarkOld').output.firstRow.WaterMarkValue}'
CreatedAt is the column in the source Azure Storage Table, which holds dates in the format described earlier.

Operator | URI expression |
---|---|
Equal | eq |
GreaterThan | gt |
GreaterThanOrEqual | ge |
LessThan | lt |
LessThanOrEqual | le |
NotEqual | ne |
The filter then uses the output of the Lookup activity, which is the WaterMarkValue from the SQL table.
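To make the shape of the resulting filter concrete, here is a small Python sketch that assembles the same kind of string from a column, one of the operators in the table above, and a watermark value. The helper name is hypothetical, and in the pipeline itself Data Factory performs this substitution through the expression, not through code like this.

```python
# Comparison operators from the table above.
ALLOWED_OPERATORS = {"eq", "gt", "ge", "lt", "le", "ne"}


def build_table_filter(column: str, operator: str, value: str) -> str:
    """Build a filter such as: CreatedAt gt '2019-11-01T16:00:55.000Z'."""
    if operator not in ALLOWED_OPERATORS:
        raise ValueError(f"Unsupported operator: {operator}")
    # A single quote inside a string literal is escaped by doubling it.
    escaped = value.replace("'", "''")
    return f"{column} {operator} '{escaped}'"


if __name__ == "__main__":
    print(build_table_filter("CreatedAt", "gt", "2019-11-01T16:00:55.000Z"))
```

The value is quoted as a string because CreatedAt is compared here as text in the fixed-width date format, not as a native date type.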
Test the New Watermark Logic
In SSMS:
SELECT COUNT(*) FROM [staging].[PowerBImentions]
There are 16 records and the max date is 2019-11-01T16:00:55.000Z
Run the new Data Factory pipeline (Debug)
The new record count is 21
The new Max Date is 2019-11-01T17:01:52.000Z
And let's do a quick check to make sure there are no duplicates:
SELECT TweetText, COUNT(*) FROM [staging].[PowerBImentions] GROUP BY TweetText HAVING COUNT(*) > 1
It's all working. We have no duplicates, and this will ultimately help keep the processing down.
There is another way to do incremental processing, by using change tracking. I'll be looking at that in a later post.