
Incremental Processing in Data Factory using Watermark Table

I am pulling tweets into an Azure Table Storage area and then processing them into a Warehouse

The following shows the very basic Data Factory setup

Connections

I have created a Linked Service for the Azure Storage Table PowerBIMentions

And another Linked Service for my Azure SQL Server Table PowerBIMentions

Datasets

The Storage Table was set up in Logic Apps. I won't go into detail about the schema here

I have created a SQL Table to Mirror the Azure Table Storage
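As a rough sketch only, the mirror table could look something like the following. The real table mirrors the full storage schema; only the columns referenced later in this post are shown, and the column types are assumptions:

    CREATE TABLE [staging].[PowerBImentions](
        [TweetText] nvarchar(max) NULL,   -- the tweet text, used later for the duplicate check
        [CreatedAt] datetime NULL         -- used to drive the watermark
        -- ...plus the remaining columns mirrored from the Azure Table Storage schema
    )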

Watermark Table in SQL

The Watermark Table is as follows

CREATE TABLE [staging].[watermarktable](
    [TableName] varchar(100) NOT NULL,        -- varchar lengths assumed
    [WaterMarkDate] datetime NULL,
    [WaterMarkValue] varchar(100) NULL,       -- the date formatted as a string
    [WaterMarkOffSetDate] datetimeoffset NULL
) ON [PRIMARY]

For the time being, the watermark value is set to the date in the same format as it appears in Azure Table Storage.

Ignore the offset date for the time being

I have then created a stored procedure that adds the table name staging.PowerBImentions, the maximum CreatedAt date from the table, and some extra script to format that date into the same format Azure Table Storage uses.

To understand how this should be formatted you can look at the Azure Table using Microsoft Azure Storage Explorer

https://azure.microsoft.com/en-gb/features/storage-explorer/

As you can see, the format is YYYY-MM-DDTHH:MM:SS.000Z

I haven't included the full code for the stored procedure here, but it is fairly straightforward: take the max date from the table you are interested in and format that value into the string format shown above.
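As an illustration only, a minimal version of that stored procedure might look like this (the procedure name is an assumption; the table and column names are the ones used above):

    -- Sketch: store the latest CreatedAt value, formatted to match
    -- Azure Table Storage (e.g. 2019-11-01T16:00:55.000Z)
    CREATE PROCEDURE [staging].[usp_UpdateWatermarkPowerBIMentions]
    AS
    BEGIN
        DECLARE @MaxDate datetime;

        SELECT @MaxDate = MAX(CreatedAt)
        FROM [staging].[PowerBImentions];

        UPDATE [staging].[watermarktable]
        SET WaterMarkDate  = @MaxDate,
            WaterMarkValue = CONVERT(varchar(30), @MaxDate, 127)  -- ISO 8601 with a trailing Z
        WHERE TableName = 'staging.PowerBImentions';
    END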

Pipeline

I added a Pipeline called CopyPowerBIMentionsIncremental

The Source is the Storage mentions table

The Sink (destination) is the Azure SQL Table

We can then easily import the schemas and map them

Then run the very simple Pipeline, which just moves the data and then runs the Watermark stored procedure to recreate the MAX date time

Change the Pipeline to include a Lookup activity

The next time we process, we only want to load new records into the table.

The query to select the correct date from the Watermark table:

SELECT MAX(WatermarkValue) AS WatermarkValue FROM [staging].[watermarktable]
WHERE WatermarkValue IS NOT NULL
AND TableName = 'staging.PowerBImentions'

You can test this in SQL Server Management Studio. The copy activity's source query then filters on the looked-up value:

CreatedAt gt '@{activity('LookupWatermarkOld').output.firstRow.WaterMarkValue}'

CreatedAt is the date column in the source Azure Table Storage table. The filter uses the OData comparison operators:

Operator – URI expression

  • Equal – eq
  • GreaterThan – gt
  • GreaterThanOrEqual – ge
  • LessThan – lt
  • LessThanOrEqual – le
  • NotEqual – ne

The expression uses the Lookup activity's output to pick up the WaterMarkValue from the SQL table
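Put together, the relevant part of the copy activity looks roughly like this in the pipeline JSON (a sketch rather than an exact export; the copy activity name is illustrative, and LookupWatermarkOld is the lookup activity used above):

    {
        "name": "CopyNewMentions",
        "type": "Copy",
        "dependsOn": [
            { "activity": "LookupWatermarkOld", "dependencyConditions": [ "Succeeded" ] }
        ],
        "typeProperties": {
            "source": {
                "type": "AzureTableSource",
                "azureTableSourceQuery": {
                    "value": "CreatedAt gt '@{activity('LookupWatermarkOld').output.firstRow.WaterMarkValue}'",
                    "type": "Expression"
                }
            },
            "sink": {
                "type": "AzureSqlSink"
            }
        }
    }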

Test the New Watermark Logic

in SSMS

SELECT COUNT(*) FROM [staging].[PowerBImentions]

there are 16 records and the max date is 2019-11-01T16:00:55.000Z

Run the new data Factory (Debug)

The new record count is 21

The new Max Date is 2019-11-01T17:01:52.000Z

And let's do a quick check to make sure there are no duplicates

SELECT TweetText, COUNT(*) FROM [staging].[PowerBImentions] GROUP BY TweetText HAVING COUNT(*) > 1

It's all working. We have no duplicates, and this will ultimately help keep the processing time down

There is another way to do incremental processing by using change tracking. I'll be looking at that in a later post


Quick ‘Logic App’ Tip. Error on a For each loop: An action failed. No dependent actions succeeded

I have been setting up a Logic App to add tweets to Azure Table Storage. While setting up a loop to add Keywords into a table (one tweet may add multiple rows to the Keywords table) I came across this error

The tweet has generated 10 rows of data (because there are 10 keywords)

I have set up an increment counter to add to the ID so each one should get 1, 2, 3 and so on. If you toggle through the records you notice this:

  • Record 1 – Increment 1
  • Record 2 – Increment 6
  • Record 3 – Increment 3
  • Record 4 – Increment 7
  • Record 5 – Increment 2
  • Record 6 – Increment 4
  • Record 7 – Increment 10
  • Record 8 – Increment 8
  • Record 9 – Increment 9
  • Record 10 – Increment 5

The subsequent table storage insert doesn't work because of conflicts; clearly the increment isn't happening in order. There is a quick solution to this one

Go into Edit mode in the Logic App

For every For each loop that contains an increment variable, click … to get to Settings

Make sure Concurrency Control is set to On and that the degree of parallelism is 1. Setting it to 1 ensures the iterations run sequentially
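If you prefer the code view, the same setting appears on the For each action as a runtimeConfiguration block, roughly like this (the action names and the array being looped over are illustrative):

    "For_each_keyword": {
        "type": "Foreach",
        "foreach": "@variables('Keywords')",
        "runtimeConfiguration": {
            "concurrency": {
                "repetitions": 1
            }
        },
        "actions": {
            "Increment_ID": {
                "type": "IncrementVariable",
                "inputs": { "name": "ID", "value": 1 }
            }
        }
    }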

Adding a Logic App to Refresh your Analysis Services Data

You have a shiny new Analysis Services model containing all the data you need for your Power BI reports and Dashboards

It has been added into Analysis Services because (a) there is a lot of data, possibly too much for a Power BI import, (b) you want to do incremental processing without needing Power BI Premium, and (c) you want to refresh more than 8 times a day.

Everything is all set up but the final part of the puzzle is how to schedule the refresh into the Analysis Services Tabular model.

It's easy enough in Power BI; you simply set the schedule in the Service. Let's have a look at how to do this using one of the options: Logic Apps

Create a Service Principal (SPN)

First we need to create a new Azure Active Directory application and service principal to use with role-based access control. Whenever you have code that needs to access and/or modify resources (the Logic App will refresh the Analysis Services data), you need to create an identity for the app.

Sign into Azure

Go to Azure Active Directory

Next go to App Registrations and + New registration

The web URL is simply one set like the example in the documentation

Then click Register

Assign the Application to a role

Before the app can access resources, it must be assigned a role. For this example we are going to assign a role at the subscription scope for our Proof of Concept subscription

Go to All Services and then Subscriptions

I'm going to select the Proof of Concept Subscription

Next select Access Control and add a Role assignment

Note I have selected the new analysisservicesapp

Click save to add the Role Assignment

The Service Principal is set up. Now we need to get the values so we can sign into the app

Get Sign in Values

Go to Azure Active Directory

Go to App Registrations again and select the new application

Copy the Tenant ID for later use

Copy the Application ID for later use

Set up your Credentials

I was going to create a certificate, but I found it an extremely complicated process and in the end I couldn't export the created certificate in PowerShell

Because I haven’t been able to Create and Export a certificate I am going to use Client Secrets instead

Click on New Client Secret

Once added you need to copy the secret value for use later. You won't be able to access this information again so DON'T LOSE IT

Give the Service Principal (SPN) Authority to administer Analysis Services

Still in your App click on API Permissions

Next View API Permissions

+ Add a permission and find Azure Analysis Services in APIs my organisation uses

Ensure that the Permission allows you to read and write all models (Tick the box)

then Add permissions

Note that although permissions have changed we still need admin consent

Grant Admin Consent

Granting admin consent requires you to sign in as global administrator, an application administrator, or a cloud application administrator.

From Azure Active Directory go to App registrations and select the app

Within the App go to API Permissions

Grant Admin Consent

Configure Permissions in Azure Analysis Services

Next we need to ensure the new service principal we created has server administrator permissions in Analysis Services

Open SQL Server Management Studio by right-clicking it and choosing Run as Administrator, then connect to the Analysis Services server

Add in your User name (In my case the email address) but not the Password.

You add your Office 365 password when you sign into your Account (Next)

Right click on the Analysis Services name and go to Properties, Then Security

Click add, then Search for the app that has been created. This can then be added as a server administrator (Allows it to add models, amend models, refresh data etc)

Click OK

Create the Logic App

Finally, we can create the Logic App in Azure

The Logic App will be triggered by an HTTP request, which in turn will be called from Data Factory, the Azure orchestration tool

In Azure Go to Logic Apps and Add

Then New Step and search for HTTP and then HTTP

Method

POST. POST sends the data inside the body of the HTTP request

URI (Uniform Resource Identifier)

URI = https://<your server region>.asazure.windows.net/servers/<aas server name>/models/<your database name>/refreshes

Here is my example:

https://ukwest.asazure.windows.net/servers/adventureworksas/models/AdventureWorksV2/refreshes

The Server is the Analysis Services created in Visual Studio.

The Model is the model that we want to refresh. There may be multiple models on the server

Headers

As per the example, I have set the Content-Type header to application/json

Queries

Nothing set

Body

This is where you set up all the processing information

{
    "Type": "Full",
    "CommitMode": "transactional",
    "MaxParallelism": 2,
    "RetryCount": 2,
    "Objects": [
        { "table": "DimCustomer" },
        { "table": "DimDate" },
        { "table": "DimEmployee" },
        { "table": "DimProduct" },
        { "table": "DimProductCategory" },
        { "table": "DimProductSubCategory" },
        { "table": "DimSalesTerritory" },
        { "table": "FactResellerSales" }
    ]
}

Type: Full

The type of processing to perform. Refresh command types can be:

  • full – Process the Entire Model and recalculate all the dependents (Measures, columns etc)
  • clearValues – Clear values in the objects and dependents
  • calculate – recalculate your formulas
  • dataOnly – refresh the data in the objects, does not force recalculation
  • automatic – If the object needs refreshing and recalculating then do so.
  • add – Append data to the partition and recalculate dependents (measures, columns etc)
  • defragment – Defragmentation option will clean up values in dictionaries that are no longer used

CommitMode: transactional

Determines whether objects are committed in batches or when complete. Modes include:

  • default
  • transactional
  • partialBatch

It's always a good idea to set up partitions within your data, especially on the fact table, to control how the data is processed

However, the above is an example specifying exactly what to process. You can simplify the code if you wish:

{
    "CommitMode": "transactional",
    "MaxParallelism": 2,
    "RetryCount": 2,
    "Type": "Full"
}

The following is an example of adding Partitions into your code. In this example, DimCustomer is one Partition.

"table": "DimCustomer",
"partition": "DimCustomer"

The fact table can be made up of many partitions. For example, you could have one for each year. Then you can decide which partitions to process; for example, you may only want to process the current year's data.
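For example, a request body that fully processes just the current year's partition of the fact table might look like this (the partition name is an assumption; it depends on how the partitions were named in the model):

    {
        "Type": "Full",
        "CommitMode": "transactional",
        "MaxParallelism": 2,
        "RetryCount": 2,
        "Objects": [
            {
                "table": "FactResellerSales",
                "partition": "FactResellerSales2019"
            }
        ]
    }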

Authentication

Active Directory OAuth

Tenant

Use the Tenant ID that we collected previously

Audience

https://*.asazure.windows.net

Client ID

Use the Client ID that we collected Previously

Credential Type

Secret (Remember that we added the Secret to the App)

Secret

The secret value that we saved (remember, for this one you can't go back to Azure and look at the secret; it is shown only once)
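For reference, in the Logic App code view the completed authentication section of the HTTP action ends up looking roughly like this (the IDs and secret are placeholders):

    "authentication": {
        "type": "ActiveDirectoryOAuth",
        "tenant": "<tenant id>",
        "audience": "https://*.asazure.windows.net",
        "clientId": "<application (client) id>",
        "secret": "<client secret value>"
    }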

Save and test the Logic App by Clicking Run

Consume Logic App with Azure Data Factory

We now have a Logic App that will process the data. We now need to schedule it by creating a Data Factory to orchestrate the processing

First we need to copy the HTTP POST URL from the Logic App's 'When a HTTP request is received' trigger

And then create a new Data Factory in Azure

For the time being I am not enabling Git

In Author & Monitor, create a pipeline and drag a Web activity across to the design pane.
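The Web activity itself needs very little configuration; as pipeline JSON it is roughly the following (the activity name is illustrative, and the URL placeholder stands for the HTTP POST URL copied from the Logic App trigger):

    {
        "name": "RefreshAnalysisServicesModel",
        "type": "WebActivity",
        "typeProperties": {
            "url": "<HTTP POST URL copied from the Logic App trigger>",
            "method": "POST",
            "headers": {
                "Content-Type": "application/json"
            },
            "body": "{}"
        }
    }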

And then set up the schedule by adding a trigger

This should now be enough to process all your data into the Analysis Services model, which has a live connection into Power BI.

What we need to do next is check that this is doing its job. I will be looking at this in a later blog post

Power BI Streaming Data sets (And Push Data Sets) Part 2

We are going to add some new information into the data set

In Power BI Service, Click Edit against the #Taskmaster dataset

I’m actually going to add 7 more fields

  • Description, Sentiment and Name are Text
  • CreatedAt is a date
  • Count, Favorited and followersCount are Numbers

Click Done

Return to Microsoft Flow

Edit the flow and go to the 'Add rows to a dataset' action

  • Count is an Expression
  • It is simply set as 1

You can Test and Save

Any new data that comes through will contain these data items

Creating a sentiment description is slightly more involved, but let's go ahead and create it

After Detect Sentiment update your flow with the following logic

Create a Sentiment value which defaults to Neutral. If Score is over 0.7 then it is Positive; if under 0.3 then Negative
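One way to express that in Flow is a single nested if() on the score; the body('Detect_Sentiment')?['score'] reference is an assumption and depends on what the Detect Sentiment step is called in your flow:

    if(greater(body('Detect_Sentiment')?['score'], 0.7), 'Positive',
       if(less(body('Detect_Sentiment')?['score'], 0.3), 'Negative', 'Neutral'))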

We can add this to our data set


We now have lots of new information to use in our streaming reports

Back in Power BI Service

I’ve attempted to add a new Custom Streaming Tile to the dashboard based on a Line chart to look at the count of records

Unfortunately this Streaming visual doesn’t seem to work and immediately I can see a fatal flaw with using streaming data set visuals for this kind of data

These visuals are for an almost constant stream of data. They are not for feeds that at some points don't have much data coming through; you need lots of data within the time frame of the streaming dataset (for example 60 minutes).

I have the #Taskmaster Stream set up for Historical Data Analysis

When Historic data analysis is enabled, the dataset created becomes both a streaming dataset and a push dataset

A streaming dataset has no underlying database

A push dataset, by contrast, is stored in Power BI and has a few limitations on how much data can be pushed in:

  • 75 max columns
  • 75 max tables
  • 200,000 max rows stored per table in FIFO dataset
  • 5,000,000 max rows stored per table in ‘none retention policy’ dataset

The data is then stored in Power BI and we can actually access it from Desktop.

This means I can create some really nice reports against my taskmaster data set

You can't create hierarchies or calculated columns over the push dataset, only measures, so you are very limited in what you can do. The Key Influencers visual is also unable to work with push data, which is a shame because this is the perfect visual for analysing positive and negative tweets

I should have brought date across as a date in the first instance, because Month is now just a number and I can't change this in Power BI. We have a date but only against the very latest data

Time was already in the dataset, but this consists of date and time, which you can't reset to create a date hierarchy

I can't add day names to the day number (Taskmaster is on on a Wednesday so I expect the levels to go up then)

So the push dataset is fairly simple to set up, but it is incredibly limiting in what you can do

Our initial page by Month. I needed to Add Month Name into the Flow

Next I drill through on the 9th of October. The name of the day would be great because Taskmaster is on a Wednesday

Finally we drill through to the hourly tweets.

As you can see, we have a few negatives in the dataset which, on reading, aren't actually negative. The Cognitive API doesn't recognise sarcasm, for instance.

There you go: we started out with a streaming dataset and ended up with a push dataset.

The push dataset doesn't need a refresh, which is the main reason to go for it.

One last quick thing about this push dataset. When I attempted to republish the report I got the following error

To get past this error I had to delete my report in Power BI Service before trying again. I have never seen this kind of conflict before and I'm assuming it's an issue with the push dataset

Power BI Service Data Lineage View

I was logging into Power BI this morning when I saw this exciting new feature

We are always looking at new solutions to provide good data lineage, so this is well worth a look

Data lineage includes the data origin, what happens to it and where it moves over time. Data lineage gives visibility while greatly simplifying the ability to trace errors back to the root cause in a data analytics process. 

Wikipedia

I have an App workspace set up for Adventureworks so lets have a look at Lineage using this project

Column 1 is my data source. I can see I’m using a local database and I’m also using an xlsx spreadsheet to bring in data.

In most of my projects I'm working on the ETL in Data Factory, transforming data in stored procedures etc. For example, for a social media feed I have a Logic App that moves tweets to an Azure Table Storage NoSQL table, and Data Factory then transfers this data into a central Azure data warehouse. The Power BI lineage only picks up at the data warehouse stage; it won't take into account that there is a lot of work before this

Column 2 is the data set in Power BI

Column 3 provides Report information

Column 4 displays the Dashboards

You can click on a data flow node to drill down into more detail

Currently you can't go any further to look at the data items

Click on the Link icon to see the data flow for that item. In this case the Report.

This is a great start but there definitely needs to be more information here to make it something that you would want to use as a proper Data Lineage Tool

  • It would be good to see the fields in each Entity for the Data Sets
  • As an extra, it would be great to see what fields are being used in Measures and calculated Fields
  • Reports – For me, I'd like to know for every page in my report
    • What field am I using from the data source
    • What calculated columns I have created (Even better with the DAX Logic)
    • Any Name changes from Data Source to Power BI
    • What measures I have created (Even better with the DAX Logic)
  • For the Dashboard, what items I am using in the dashboards (Fields, Measures, Calculated Columns)
  • An important part of data lineage is getting an understanding of the entire process. This includes data transformations pre Power BI. If you can't do that in here, it would be great to be able to extract all the information out so you can use it in some way with your other lineage information to provide the full story. For example:

Azure Data Catalogue

Azure Data Catalog is a fully managed cloud service. Users can discover and consume data sources via the catalog, and it is a single, central place for the whole organisation to contribute to and understand all your data sources.

https://eun-su1.azuredatacatalog.com/#/home

I have already registered Our Data Catalog, and I have downloaded the desktop app

As an Example I want to connect to Azure Table Storage (Connect using Azure Account name and Access Key)

At this point I'm registering everything in the storage table. Then I can view the information in the Azure Portal.

You can add a friendly name, a description, an expert (in this case me), tags and management information

I have added Data Preview so you can view the data within the object. There is also documentation and column information to look at

In the data catalog you can manually add lots of description to your tables along with documentation.

This is great for providing lots of information about your data. You can explore databases and open the information in other formats (great if you need to supply information to another data lineage package)

I will be having a look at the Azure Data catalog in more detail later to see how it could help to provide full data lineage

Azure Data Factory

Data Factory is the Azure ETL orchestration tool. Go into Monitoring for lineage information. However, there doesn't seem to be a way to export this information for reuse, and Data Factory won't take into account the work done in, for example, a stored procedure

Again this is another area to look into more.

Stored Procedures

When you use stored procedures to transform your data, it's harder to provide automated lineage on your code. There are automated data lineage tools for SQL out there, but it would be great if there were a specific tool within Azure that creates data lineage information from your stored procedures

Azure Logic Apps

Data for my project is collected via Logic Apps before being Processed into an Azure Data Warehouse.

Essentially, we need our data lineage to capture everything in one place.

And just as important, everything should be as automated as possible. If I quickly create a measure, the data lineage should reflect this with no manual input needed (unless you want to add a description to the new measure as to why it was created)

Power BI Streaming Data sets Part 1

What you need

  • Power BI
  • Microsoft Flow  Or Logic Apps in Azure
  • Cognitive Service. Text Analytics in Azure

I love the show Taskmaster and I really want to see how much other people love taskmaster and when people are tweeting about it.

I want to see everything happening real time in my Power BI report.

For this example, I am going to use a streaming data set. The data will be pushed into the Power BI service, but Power BI will only store the data in a temporary cache which expires. We can give it a window of time to look at.

There is no underlying database with a streaming dataset, so I won't be able to use my usual report visuals. I will need to use the streaming visuals available in Power BI

There are other ways to do real time data such as Push Data sets and PubNub streaming datasets but for this example I want to concentrate on the Streaming data set

Log into the Power BI Service https://powerbi.microsoft.com/en-us/landing/signin/

I’m going to do this simply in my Workspace

Click on Create in the right hand corner of the screen

Then click on Streaming dataset

Choose API to create an API endpoint

API “a set of functions and procedures that allow the creation of applications which access the features or data of an operating system, application, or other service.”

Our Streaming Dataset will contain these values.

Extra time dimension fields have been created to use drill-down features.

The outcome of the sentiment analysis is placed in the field 'score', from 0 (negative) to 1 (positive).

Set 'Historic data analysis' to On if you want to save the data for analysis later.

Power BI creates a JSON-like format (with a REST web service in the background).

JSON – is an open-standard file format that uses human-readable text to transmit data objects consisting of attribute–value pairs and array data types

REST – ‘Representational State Transfer’. It is primarily used to build Web services that are lightweight, maintainable, and scalable. A service based on REST is called a RESTful service. REST is not dependent on any protocol, but almost every RESTful service uses HTTP as its underlying protocol.

Once you click done, you can see that we have the new data set in our datasets list.
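Behind the scenes the dataset is just a REST endpoint that accepts rows (Power BI shows you the push URL and a sample payload via the dataset's API info). Pushing a row yourself looks roughly like this; the URL parts, key and field names are placeholders/illustrative:

    POST https://api.powerbi.com/beta/<workspace id>/datasets/<dataset id>/rows?key=<api key>
    Content-Type: application/json

    [
        {
            "Tweet": "Loving the new series #Taskmaster",
            "Location": "Leeds",
            "score": 0.92,
            "Time": "2019-11-01T17:01:52.000Z"
        }
    ]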

Microsoft Flow / Azure Logic App

The next step is to create a Microsoft Flow (you can use an Azure Logic App if you prefer)

Microsoft Flow is a cloud-based software tool that allows employees to create and automate workflows across multiple applications and services without the need for developer help.

Go to https://flow.microsoft.com and sign up for a new account if you need one

Go to My Flows and Automated Flow

Click on When a new tweet is posted

And Create

When you are in the Flow you create New Steps to build up the logic required for your data. The Flow will create the Twitter feed for your Reports.

In the first step I have simply added #Taskmaster to 'When a new tweet is posted'

You will also need to log into your twitter account at this point to get access to tweets

Click on New Step and add an action

The next step is to add some sentiment Analysis. I have used Text Analytics to do this

I have searched for sentiment and can then click on Text Analytics – Detect Sentiment

I have already logged into my Azure Text Analytics

I used the Key, Name and Endpoint details from Azure

You may need to set up your own Cognitive Service in Azure before you do this section.

Next step is to search for Power BI

Select Power BI – Add Rows to a Dataset

Next, add all the information about the RealTimeData streaming dataset

Most of the details can be added from the Dynamic Content picker. As you can see, Score comes from the Detect Sentiment group. All the Twitter data comes from the very first 'When a new tweet is posted' step

The following are the expressions you need to add for the Time fields

  • Time – formatDateTime(utcNow(),'MM/dd/yyyy HH:mm:ss')
  • Month – formatDateTime(utcNow(),'MM')
  • Day – dayOfMonth(utcNow())
  • Hour – formatDateTime(utcNow(),'HH')
  • Minute – formatDateTime(utcNow(),'mm')

Save your Workflow

Testing Service in Microsoft Flow

The real time analysis will be triggered by hashtags in Twitter

This service only allows you to get current tweets. No historical tweets can be generated at this point

The visualization tiles will only be kept for an hour so this is the kind of service you need to monitor fast paced information. 

In the Workflow you can save

Click Test and Create a tweet with one of the hashtags in the workflow.

The service will run and you should get pass or fail information.

We can now use this information to create a sentiment report in PowerBI

Back to Power BI Service

We already have the dataset in the relevant workspace, e.g. My Workspace

For this example, click on Create report against the dataset and add a table tile to the report pane. (The data may be refreshing at this level)

Leave like this for the time being. Save your report and pin table visual to dashboard by clicking the pin

I created the table simply to create a report and then a dashboard. Once at dashboard level Custom Streaming data tiles can be added. I am deleting the table when the dashboard is created for simplicity

Click on +Add Tile

Then choose the Custom Streaming data

At present, there aren't many visualisations to choose from for this type of data

Click Next

The #Taskmaster dataset has been chosen

I am going to create a very simple bar chart to look at the last hour's data and see what the sentiment score is based on location

Now I can see the sentiment score by Location.

Azure has provided the Cognitive Text services to analyse the data for a score.

It would be useful to understand what tweets are driving the sentiments by location.

I would also prefer to group into Positive, Neutral and Negative scores

In my next post I am going to look at how to create a sentiment description and add Description, Name and Count into the Flow and streaming dataset

Azure Fundamentals training (Quick Notes)

I’ve been spending some time revising for the Azure Fundamentals course. Here is a quick list of some of the  more problematic test questions I’ve come across

Azure Advisor

  • Detects threats and vulnerabilities
  • Ensures Fault Tolerance
  • Helps reduce spending
  • Protects data from accidental deletions
  • Speeds up your apps

Application Gateway

Multiple instances of a web application are created across three availability zones. The company also configures a networking product to evenly distribute service requests based on 3 different URLs

Application insights

  • Feature of Azure monitor
  • Visually analyse telemetry data

ATP (Azure threat Protection)

  • Pass the ticket – Attacker stealing KERBEROS data
  • Pass the hash – Attacker stealing NTLM data
  • Suspected Brute Force attack – Multiple attempts to guess a user's password

Compliance

http://servicetrust.microsoft.com – Compliance manager URL

  • Audit Reports – Service is within the trust Portal to determine Azure Compliance with GDPR
  • Compliance manager – Determines whether or not your services meet industry standards
  • GDPR – Standards enforced by a government Agency
  • Germany – Country with a dedicated trustee for customer data.
    • Physically isolated instance of Azure
  • Azure government – Only available in the US
  • ISO – International standards from a non-regulatory agency
  • NIST – Standards from a non-regulatory agency based in the United States
    • National Institute of Standards and technology

Cloud Shell, CLI and Powershell

Azure CLI

  • Az login
  • Cross platform command based line tool

Azure Cloud Shell

  • New-AzureRmVM
  • Web based tool after you log onto the Azure portal

Azure Powershell

  • Connect-AzureRmAccount
  • Use when you need to log into Azure without opening a web browser

Azure Governance

  • Locks – Prevent users from deleting resources
  • Advisor – Uses information from the Security Center to recommend best practices
  • Initiatives – Define a set of policies

Cloud Computing terms

  • Fault Tolerance – Power Outage in a data center. Automatic Failover for continual operation
  • High Availability – Having data available when you need it

Fault tolerance and high availability are both good for the scenario where you are moving on-premises data centers to the cloud. The data is mission critical, there is a need for access to the data sources at all times, and changes are incremental and easy to predict.

  • Elasticity – Sudden spikes in traffic
  • Scalable – Increase the Number of VMs easily

Azure Locks

  • Multiple Locks applied to different scopes. The most restrictive lock is applied
  • The lock applies to all resources contained in a scope and any new resources added to the scope

Networking

  • NSG – Network Security Group. Inbound traffic for a virtual machine from specified IP addresses
  • DDoS – Distributed Denial of Service protection prevents a flood of HTTP traffic to a VM that hosts IIS
  • Firewall – Create a rule that restricts network traffic

RBAC

Limit Access to Resources at the resource groups and resource Scope

Service Health

  • Notifies if App service usage exceeds the usage quota
  • Respond to planned Service outages
  • Implement a web hook to display health incidents

Azure – Data Factory – changing Source path of a file from Full File name to Wildcard

I originally had one file to import into a SQL Database: Survey.txt

The files are placed in Azure blob storage ready to be imported

I then use Data Factory to import the file into the sink (Azure SQL Database)

However, the data actually arrives as one file per year. For the full logic I need to be able to add a file to the blob storage to get it imported, and each file name will contain the year.

This means I need to change the Source and Pipeline in Data Factory

First of all remove the file name from the file path. I used 1 file to set up the Schema. All files are the same so this should be OK.

Next I go to the Pipeline and set up the wildcard here: Survey*.txt

When the Pipeline is run, it will pick up all the files matching the wildcard, for example Survey*.txt
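For reference, in the copy activity JSON the wildcard ends up in the source's storeSettings, roughly like this (the folder name is an assumption, and the source type assumes a delimited text dataset in blob storage):

    "source": {
        "type": "DelimitedTextSource",
        "storeSettings": {
            "type": "AzureBlobStorageReadSettings",
            "recursive": true,
            "wildcardFolderPath": "surveys",
            "wildcardFileName": "Survey*.txt"
        },
        "formatSettings": {
            "type": "DelimitedTextReadSettings"
        }
    }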