Hello everyone!
We are ingesting data from an Azure storage account. The pipeline has a copy activity to the OneLake/Files storage and a small notebook to translate the data into OneLake/Tables.
The notebook does some logging, which serves us well, but we can't recognise when the data in Azure is stale.
The idea would be to somehow get the modification time of each file on the source. Is there any way to do that?
Best Regards,
Jörg
Is it possible to use the Get Metadata activity?
I never considered this! It's a good pointer - but what next? I can't put the metadata into a variable in the notebook ("Error: Unable to convert metadata to string.")
I see that Set Variable would accept arrays, but how do I access that variable from inside the Notebook?
You can pass variables from the pipeline into the Notebook:
https://learn.microsoft.com/en-us/fabric/data-factory/notebook-activity#notebook-settings
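As a rough sketch (the parameter name files_metadata is just an example, not tested): add a base parameter of type String on the Notebook activity, put a pipeline expression as its value, and declare a parameter cell with the same name in the notebook:

# Parameter cell in the notebook (toggle "parameter cell" on this cell).
# The pipeline's Notebook activity overrides this default value.
files_metadata = "[]"

# Regular cell: parse the JSON string passed in from the pipeline.
import json
files = json.loads(files_metadata)
for f in files:
    print(f)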
How do you do the copy, btw? Are you using a loop to run through the files in the Azure Storage and copy every one of them?
If so, you could perhaps use the outputs from the Get Metadata activity to select which files to copy before copying them with the Copy Activity?
EDIT: If you need to convert between array and string, or other manipulations, you could have a look and see if some functions can work for you:
https://learn.microsoft.com/en-us/fabric/data-factory/expression-language#functions
E.g. the string() and join() functions for converting between arrays and strings.
There are also control flow Activities that might help:
https://learn.microsoft.com/en-us/fabric/data-factory/activity-overview#control-flow-activities
e.g. Set Variable and ForEach.
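For instance (just a sketch, not tested): if "Child items" is selected in the Get Metadata field list, an expression like this as the Notebook base parameter value should hand the output over as a plain string:
@string(activity('Get Metadata1').output.childItems)
The notebook could then parse that string with json.loads(), as in the parameter-cell example above.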
So, the original pipeline is Copy -> Notebook1 -> Notebook2 which is basically the ELT steps. The Copy gets all files as binary from the Azure source using flatten hierarchy. Notebook1 interprets the files (which are basically CSV, but with some weird details that dataflows can't handle) and adds their content to the table, and Notebook2 does some aggregations on all of the data (fresh and old).
Splitting this up with a ForEach would create quite a bit of overhead, because the notebooks do some setup that is only done once. Also, I'm really not keen on rewriting the whole thing.
So I tried experimenting with Get Metadata -> Notebook to try interpreting the data.
Weirdly enough, it takes the combination
@join(array(activity('Get Metadata1').output), '$')
to actually work. Unfortunately, that's still not what I'm looking for. It's some telemetry of the pipeline step, not the metadata I was looking for.
I really appreciate you taking the time to walk me through this, but I would much prefer some more detailed documentation or a debugging mode.
Did you try to use the childItems from the Get Metadata activity?
Field list -> Arguments -> ChildItems
https://learn.microsoft.com/en-us/fabric/data-factory/get-metadata-activity
Do you want to
a) check the modified time of the files in the source (Azure storage), or
b) check the time when the file was copied by the pipeline?
Do you copy all files from Azure storage into Lakehouse Files folder each time the data pipeline runs?
Or do you want to select only specific files to copy from the Azure storage (based on the file's modified time)?
If you want to check the modified time in the Azure storage, I think you will need to use the Get Metadata activity. (I am hoping that the modified time is available as part of the get metadata ChildItems output. If not, perhaps you could try to connect the Notebook directly to the Azure storage and avoid using Data pipeline copy. Or perhaps we could use Dataflow Gen2 to get the metadata and pass it to the data pipeline via an intermediate table. But I would make some more attempts with Get Metadata first.)
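Regarding connecting the Notebook directly to the Azure storage: a minimal sketch, assuming the source is a blob container (the account URL, container name and credential below are placeholders, not tested). The storage SDK can list the blobs together with their last-modified timestamps:

# Sketch: list blobs and their last-modified timestamps directly from the notebook.
# The account URL, container name and credential are placeholders.
from azure.storage.blob import ContainerClient

container = ContainerClient(
    account_url="https://<storage-account>.blob.core.windows.net",
    container_name="<container>",
    credential="<account-key-or-sas-token>",
)

# BlobProperties carries name, size and last_modified for each blob.
modified_times = {
    blob.name: blob.last_modified
    for blob in container.list_blobs()
}

for name, ts in modified_times.items():
    print(name, ts.isoformat())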
If you want to find the time when the file was processed by the data pipeline, then there are other methods for doing that, let me know if this is what you're looking for.
I need a), the original modification time. The reason is this: Distributed and unreliable systems store the source data on SFTP on Azure. The pipeline works pretty well already, but we have no monitoring for the source systems.
Meanwhile, I have come one step closer to a solution. Like you, I have considered childItems - by itself, it's not useful, because it doesn't contain the modification time. But I can do:
GetMetadata1 -> ForEach(childItems): GetMetadata2: modTime
The outputs of each individual GetMetadata2 contain what I need. Unfortunately, ForEach doesn't gather this output.
A work-around would be to have a notebook activity inside the ForEach to store this information bit by bit in a table, but that's rather ugly.
Could you use an Append variable activity inside the ForEach to append to an array variable?
That way you can store the outputs of each ForEach iteration and use the data stored in the variable in other places in the pipeline.
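E.g. (all names here are just illustrative, not tested): with an array variable such as modTimes, the Append variable activity inside the ForEach could append
@activity('GetMetadata2').output.lastModified
assuming "Last modified" is in GetMetadata2's field list. Afterwards the whole array could be handed to a Notebook base parameter with something like @string(variables('modTimes')).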
Do you want to copy only the files where the modified time satisfies some condition? If so, maybe you can use an IF condition in the pipeline, possibly inside the ForEach activity.
Or do you want to copy all the files, store the modified time as well, and then have the Files -> Tables notebook implement logic to use only the files that satisfy some modified-time condition?
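If you go that route, here is a minimal sketch of the notebook side (the parameter name and format are assumptions): the pipeline passes a JSON array of file name / modified time pairs, and the notebook keeps only the files newer than a cutoff:

# Sketch: filter the copied files by their source modified time.
# Assumes the pipeline passes a JSON array like
# [{"name": "a.csv", "lastModified": "2025-01-31T10:15:00Z"}, ...]
# via a base parameter named files_metadata.
import json
from datetime import datetime, timedelta, timezone

files_metadata = "[]"  # parameter cell; overridden by the pipeline

cutoff = datetime.now(timezone.utc) - timedelta(days=1)

fresh_files = [
    f["name"]
    for f in json.loads(files_metadata)
    if datetime.fromisoformat(f["lastModified"].replace("Z", "+00:00")) >= cutoff
]

print("Files to process:", fresh_files)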