Last time I promised to blog about Azure Data Factory Data Flows, but decided to do this first. My business problem was to process files on On-Premise file share with SSIS without moving original files anywhere. Challenge was that with SSIS that is not easily done without possibility to move the original file. Maybe with log table and custom code.
I decided to use Azure Data Factory (ADF) and Azure Blob Storage to tackle this challenge. Even ADF is missing couple of critical features, but I managed to use workarounds. I wanted to write about this, because I couldn’t find any good instructions for this process in one place.
In ADF there is a nice component called “Get Metadata”. Basically you could do this also with SSIS script component. I decided to use File Creation Date and store that into BatchControl table for the next run.
First I use “Lookup” activity to get previous executions max file creation date.
Select BatchDate, DATEADD(HH, -2, BatchDate) AS BatchDate2 From BatchControl Where BatchId = (Select Max(BatchId) From BatchControl Where BatchName = 'GAVFiles' AND Completed = 1)
ADF has an interesting feature to change the timestamp (sometimes) to UTC time. That’s why I had to deduct two hours from the value to get correct comparison value.
Then I create new row into BatchControl table. I decided to create stored procedures into database to handle BatchControl. You could maybe achieve the same functionality by using “Lookup” activity, because you can write there free hand SQL.
I use as a default value for my batch date the value I just selected in my previous activity, I’ll explain later why. You can do that by using ADF expression language (“Add dynamic content”).
Next there is “Get Metadata” activity. Because there are many files in the source, I use “Child Items” metadata attribute, which produces an array of file names. Note, that in Dataset properties File value should be empty to be able to use “Child Items” metadata. Other note, this approach is slow if you have hundreds or thousands of files.
Next I have “For Each” activity.
As Items I use again expression, which read previously created file list. Exact syntax of the last part you need check from the documentation or when debugging, from the output of “Get Metadata” activity.
Into “For Each” activity you define sub activities, which is like own pipeline.
First I have again “Get Metadata” activity, but this time it processes only single file at the time and gets “Created” metadata. Next I have “If Condition” activity to decide if file should be processed or not. There I compare previous max batch date and file metadata created date. Note ADF is not able compare dates directly, so I use ticks-functions to convert dates to integer values.
Then again I have a sub pipeline where I copy file from on-premise file share to Azure Storage and update batch control table.
There was a tricky part in defining the source data set. You need to define into source properties an expression @item().name to be able to handle names, which are coming through pipeline.
Within UpdateBatch stored procedure I have a logic, which updates BatchDate value only if value is greater than previous value, because I’m not sure in which order ADF processed the files. At least it wasn’t latest as last.
Next there shall be “Execute SSIS Package” activity, which shall process the files from blob storage. After processing the files I have a “Lookup” activity, which checks the batch dates between current and previous runs to identify has there been new files.
Select A.BatchDate As BatchDate1, B.BatchDate as BatchDate0 From (Select BatchDate, 1 AS Dummy From BatchControl Where BatchId = (Select Max(BatchId) From BatchControl Where BatchName = 'GAVFiles' AND Completed = 1)) A JOIN (Select BatchDate, 1 AS Dummy From BatchControl Where BatchId = (Select Max(BatchId) From BatchControl Where BatchName = 'GAVFiles' AND Completed = 0)) B ON A.Dummy = B.Dummy
This has to be done, because “Copy” activity fails if source container is empty, which in my case is valid scenario. I think this behavior for “Copy” activity is very odd. I tried to use “Get Metadata” activity to check file existence, but it didn’t work at least with blob files and wildcards in file name. This feature is also, let’s say unexpected.
I utilized “If Condition” to tackle this problem where I used this expression @greater(activity(‘LookupNewFiles’).output.firstRow.BatchDate0, activity(‘LookupNewFiles’).output.firstRow.BatchDate1)
Next challenge was that there is no move or delete file activity in ADF. I think it is currently most requested feature for ADF and it’s already in implementation pipeline. After SSIS has processed the files, I wanted to move them to different location. This can be achieved with Azure Logic App. That I learned from here: https://kromerbigdata.com/2018/03/15/azure-data-factory-delete-from-azure-blob-storage-and-table-storage/
I have to activities within “If Condition”.
First one copies the files and next one deletes the file. Copying could be done in one batch, because I copy all the files from source container. Deletion of the files has to be one by one. It is in the separate pipeline, because “If Condition” can’t contain “For Each” activity. Separate pipeline is a workaround for that.
My Delete pipeline consist of two activities and within “ForEach” activity there is one activity.
”Webactivity” calls Logic App, which does the actual blob deletion. Logic App itself is quite simple. Details about that from link above.
The challenge for me was to solve the problem how can I send changing file names to Logic App. After some serious testing I found this code worked in Body part of the “Web” activity.
And that was it! Not any simple template, but I managed to do it. I hope this helps also somebody else to do the same, because this scenario is quite common.
However my next challenge is to get SSIS package deployed into SSIS IR Integration Catalog. The deployment fails, if I have a blob source in my package and I don’t know yet why.