TimoRiikonen
Helper V

How to pass results one by one from Notebook to Dataflow in Data Pipeline?

Hello,
We have too much data to import it all in one go, so I am trying to import it one file at a time.

I have

- A Notebook that reads a list of filenames and outputs it

- Planning to have: a Dataflow that will take one filename and import its contents into the Lakehouse. The files are in XML format and I have already written the extraction in the dataflow, so I would rather use a dataflow than a notebook here.

 

Now I am trying to build a data pipeline with a simple ForEach loop in it, and I need help with this.

TimoRiikonen_0-1730298571659.png

 

 

These are the expressions I have tried for the ForEach items setting, and the errors I get:

@activity('Read filelist').output

The function 'length' expects its parameter to be an array or a string. The provided value is of type 'Object'.

 

@activity('Read filelist').output.value

The expression 'length(activity('Read filelist').output.value)' cannot be evaluated because property 'value' doesn't exist, available properties are 'runId, runStatus, sessionId, sparkPool, error, lastCheckedOn, metadata, highConcurrencyModeStatus, exitValue'.

 

@activity('Read filelist').output.result

The function 'length' expects its parameter to be an array or a string. The provided value is of type 'Object'.

 

@activity('Read filelist').output.result.value

The expression 'length(activity('Read filelist').output.result.value)' cannot be evaluated because property 'value' doesn't exist, available properties are 'runId, runStatus, sessionId, sparkPool, error, lastCheckedOn, metadata, highConcurrencyModeStatus, exitValue'.
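
For reference, the error message above lists the properties that do exist on the notebook activity's output; the notebook's exit value is surfaced as result.exitValue, so (as the replies below show) an items expression that parses it looks like:

@json(activity('Read filelist').output.result.exitValue)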

5 REPLIES
TimoRiikonen
Helper V

OK, the Sequential setting was right there.
Next issue: how do I pass the filename to the dataflow?
I can define parameters in the dataflow, but I believe I can't test this except by running the data pipeline.

I made two attempts at passing the parameter; does either of them work?

"activities": [
{
"name": "Dataflow1",
"type": "RefreshDataflow",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"isSequential": true,
"parameters": {
"FileName1": {
"value": "@item()",
"type": "string"
}
},
"inputs": [
{
"referenceName": "MyInputDataset",
"type": "DatasetReference",
"parameters": {
"FileName2": "@item()"
}
}
],
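
A detail worth checking in both attempts: what @item() contains depends on the shape of the notebook's exit value. If the exit value is a plain JSON array of filename strings (as in the reply further down), @item() is the filename itself and can be passed straight through. If each item is instead an object such as {"Name": "file1.xml"}, the filename would need to be pulled out with a property access along these lines (hypothetical, assuming the items are parsed as objects):

@item().Name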



TimoRiikonen
Helper V

Thank you @spencer_sa,
I managed steps 1-3 and possibly #4 as well, but the data pipeline runs these concurrently, which would be a disaster.
Proof: all of the dataflows have the same start time even though each of them runs for more than a second.

TimoRiikonen_0-1730363038714.png

These are the last lines from the JSON, so the concurrency should be one and thus I expected these to run sequentially:

        "concurrency": 1,
        "lastModifiedByObjectId": "d3444903-5e8a-433e-91e2-cdfbfb8816aa",
        "lastPublishTime": "2024-10-30T12:47:58Z"
    }
}

spencer_sa
Continued Contributor

I'd have thought that to ensure the ForEach items get processed sequentially, you'd need to make sure the 'Sequential' checkbox is ticked in the ForEach step:

spencer_sa_0-1730368315341.png

(or if you prefer your pipeline in JSON format)

spencer_sa_1-1730368366923.png

 

TimoRiikonen
Helper V

Here is the notebook code I used, for anyone who is looking for a solution:

import json

df = spark.sql("SELECT Name FROM DE_LH_100_Bronze.TicketFullBulkFilelist LIMIT 4")

# Convert the DataFrame rows to JSON strings and collect them to the driver
json_rdd = df.toJSON().collect()

# json_rdd is now a list of JSON strings, one per row, e.g. '{"Name":"file1.xml"}'
# print(json_rdd)

mssparkutils.notebook.exit(json_rdd)
spencer_sa
Continued Contributor

Step 1 - Have your Notebook create a JSON string from the list of filenames, e.g. filenamelist = '["file1","file2"]'
Step 2 - Use the following to finish your notebook script: notebookutils.notebook.exit(filenamelist)

Step 3 - On the ForEach container, parse the exitValue into an array: @json(activity('Read filelist').output.result.exitValue)
Step 4 - Inside the ForEach container, refer to each filename using @item()
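
Putting steps 1 and 2 together, a minimal notebook sketch could look like the following (assuming a Fabric notebook where spark and notebookutils are available, and reusing the table name from the code earlier in the thread; the filenamelist variable and the exact SELECT are illustrative):

import json

# Read the filenames to process (table name reused from the earlier notebook example)
df = spark.sql("SELECT Name FROM DE_LH_100_Bronze.TicketFullBulkFilelist")

# Build a plain Python list of filenames and serialise it as a single JSON array string,
# e.g. '["file1.xml", "file2.xml"]'
filenamelist = json.dumps([row["Name"] for row in df.collect()])

# Return the JSON string to the pipeline; it shows up as output.result.exitValue
notebookutils.notebook.exit(filenamelist)

With a plain array of strings as the exit value, each @item() in the ForEach is the bare filename, so it can be handed straight to the dataflow parameter.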


See here for a similar problem:  https://community.fabric.microsoft.com/t5/Data-Pipeline/Referencing-notebook-exit-value-as-a-variabl...

Incidentally, this is the solution we've used to output a list of unprocessed items, as the ADF expression language has a 'contains' collection function but not a 'does not contain' one.
