TimoRiikonen
Advocate V

How to pass results one by one from Notebook to Dataflow in Data Pipeline?

Hello,
We have too much data to import all of it in one go, so I am trying to import it one file at a time.

I have:

- a Notebook that reads a list of filenames and outputs it

- (planned) a Dataflow that will take one filename and import its contents to the Lakehouse. The files are in XML format and I have already written the extraction in the dataflow, so I would rather use a dataflow than a notebook here.

 

Now I am trying to build a data pipeline with a simple ForEach loop in it, and I need help with that.

[Screenshot: the data pipeline, with the 'Read filelist' Notebook activity feeding a ForEach loop]

Here are the expressions I tried for the ForEach items, and the error each one produced:
Attempt 1: @activity('Read filelist').output
Error: The function 'length' expects its parameter to be an array or a string. The provided value is of type 'Object'.

Attempt 2: @activity('Read filelist').output.value
Error: The expression 'length(activity('Read filelist').output.value)' cannot be evaluated because property 'value' doesn't exist, available properties are 'runId, runStatus, sessionId, sparkPool, error, lastCheckedOn, metadata, highConcurrencyModeStatus, exitValue'.

Attempt 3: @activity('Read filelist').output.result
Error: The function 'length' expects its parameter to be an array or a string. The provided value is of type 'Object'.

Attempt 4: @activity('Read filelist').output.result.value
Error: The expression 'length(activity('Read filelist').output.result.value)' cannot be evaluated because property 'value' doesn't exist, available properties are 'runId, runStatus, sessionId, sparkPool, error, lastCheckedOn, metadata, highConcurrencyModeStatus, exitValue'.
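Note that the second and fourth error messages list the properties that do exist, and exitValue is among them. A notebook's exit value comes back as a single string, so the items expression has to parse it rather than index into it; the form the replies below converge on is:

@json(activity('Read filelist').output.result.exitValue)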

5 REPLIES
TimoRiikonen
Advocate V

OK, the sequential setting was right there.
Next issue: how do I pass the filename to the dataflow?
I can define parameters in the dataflow, but I believe I can't test this except by running the data pipeline.

I made two attempts at passing parameters; does either of them work?

"activities": [
{
"name": "Dataflow1",
"type": "RefreshDataflow",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"isSequential": true,
"parameters": {
"FileName1": {
"value": "@item()",
"type": "string"
}
},
"inputs": [
{
"referenceName": "MyInputDataset",
"type": "DatasetReference",
"parameters": {
"FileName2": "@item()"
}
}
],
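A caveat with both attempts, given the notebook code later in this thread: if each ForEach item is a row serialized as a JSON string, @item() is the whole row rather than the bare filename, so the parameter value would likely need to drill in, along the lines of

@json(item()).Name

(assuming the {"Name": "..."} row shape that the SELECT Name query produces). If the notebook instead exits with a flat list of filename strings, @item() alone is right.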



TimoRiikonen
Advocate V

Thank you @spencer_sa,
I managed steps 1 - 3, and possibly #4 as well, but the data pipeline runs these concurrently, which would be a disaster.
Proof: all of the dataflows have the same start time, even though each of them runs for more than a second.

[Screenshot: activity run list showing every Dataflow1 run with the same start time]

These are the last lines of the pipeline JSON, so the concurrency should be one, and thus I expected these to run sequentially:

        "concurrency": 1,
        "lastModifiedByObjectId": "d3444903-5e8a-433e-91e2-cdfbfb8816aa",
        "lastPublishTime": "2024-10-30T12:47:58Z"
    }
}

spencer_sa
Impactful Individual

I'd have thought that to ensure the ForEach items get processed sequentially, you need to make sure the 'Sequential' checkbox is ticked in the ForEach step:

[Screenshot: the ForEach activity's settings pane with the 'Sequential' checkbox ticked]

(or if you prefer your pipeline in JSON format)

[Screenshot: the same setting in the pipeline JSON, i.e. "isSequential": true on the ForEach activity]
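For reference, a minimal sketch of the relevant ForEach JSON, assuming the standard ForEach schema (the activity name here is hypothetical; the items expression uses the 'Read filelist' notebook from this thread):

{
    "name": "ForEach1",
    "type": "ForEach",
    "typeProperties": {
        "isSequential": true,
        "items": {
            "value": "@json(activity('Read filelist').output.result.exitValue)",
            "type": "Expression"
        },
        "activities": [ ... ]
    }
}

isSequential defaults to false, which is what produces the identical start times seen above.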

 

TimoRiikonen
Advocate V

Here is the Notebook code I used, for anyone who is looking for a solution:

import json

# Read the filenames to process (LIMIT 4 while testing)
df = spark.sql("SELECT Name FROM DE_LH_100_Bronze.TicketFullBulkFilelist LIMIT 4")

# Convert each row to a JSON string, e.g. '{"Name":"file1.xml"}', and collect
json_rdd = df.toJSON().collect()

# Exit values must be strings, so serialize the list with json.dumps;
# str() on a list would emit single quotes, which json() in the pipeline can't parse
mssparkutils.notebook.exit(json.dumps(json_rdd))
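For reference, the string the pipeline then receives in result.exitValue is a JSON array of row strings, roughly like this (filenames hypothetical):

["{\"Name\": \"file1.xml\"}", "{\"Name\": \"file2.xml\"}"]

which is why the ForEach items expression runs it through json() first, and why @json(item()).Name is needed to get the bare filename out of each item.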
spencer_sa
Impactful Individual

Step 1 - Have your notebook create a JSON string from the list of filenames, e.g. filenamelist = '["file1","file2"]'
Step 2 - Use the following to finish your notebook script: notebookutils.notebook.exit(filenamelist)

Step 3 - On the ForEach container, parse the exitValue into an array: @json(activity('Read filelist').output.result.exitValue)
Step 4 - Inside the ForEach container, refer to each filename using @item()
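A minimal notebook-side sketch of steps 1 and 2, assuming a flat list of filenames (the names here are placeholders):

import json

# Step 1: build a JSON string from the list of filenames
filenames = ["file1.xml", "file2.xml"]  # in practice, read these from your filelist table
filenamelist = json.dumps(filenames)    # -> '["file1.xml", "file2.xml"]'

# Step 2: finish the notebook by returning that string as its exit value
notebookutils.notebook.exit(filenamelist)

With a flat list like this, @item() in step 4 is already the bare filename, with no further parsing needed.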


See here for a similar problem:  https://community.fabric.microsoft.com/t5/Data-Pipeline/Referencing-notebook-exit-value-as-a-variable-in-a-data-pipeline/m-p/3507053

Incidentally, this is the solution we've used to output a list of unprocessed items, as the ADF expression language has a 'contains' collection function but not a 'does not contain' one.
