TimoRiikonen
Helper V

How to pass results one by one from Notebook to Dataflow in Data Pipeline?

Hello,
We have too much data to import it all in one go, so I am trying to import it one file at a time.

I have:

- A Notebook that reads a list of filenames and outputs it

- Planned: a Dataflow that will take one filename and import its contents into the Lakehouse. The files are in XML format and I have already written the extraction in the dataflow, so I would rather use a dataflow than a notebook here.

 

And now I am trying to build a data pipeline with a simple ForEach loop in it, and I need help with this.

(screenshot: the data pipeline with the ForEach loop)

Here are the expressions I have tried for the ForEach items setting, and the error each one gives:
@activity('Read filelist').output

The function 'length' expects its parameter to be an array or a string. The provided value is of type 'Object'.

 

@activity('Read filelist').output.value

The expression 'length(activity('Read filelist').output.value)' cannot be evaluated because property 'value' doesn't exist, available properties are 'runId, runStatus, sessionId, sparkPool, error, lastCheckedOn, metadata, highConcurrencyModeStatus, exitValue'.

 

@activity('Read filelist').output.result

The function 'length' expects its parameter to be an array or a string. The provided value is of type 'Object'.

 

@activity('Read filelist').output.result.value

The expression 'length(activity('Read filelist').output.result.value)' cannot be evaluated because property 'value' doesn't exist, available properties are 'runId, runStatus, sessionId, sparkPool, error, lastCheckedOn, metadata, highConcurrencyModeStatus, exitValue'.
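
Edit: the last error message lists the available properties, exitValue among them; as the reply below shows, the expression that works parses that property into an array:

@json(activity('Read filelist').output.result.exitValue)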

5 REPLIES
TimoRiikonen
Helper V

OK, the sequential setting was right there.
Next issue: how do I pass the filename to the dataflow?
I can define parameters in the dataflow, but I believe I can't test this except by running the data pipeline.

I made two attempts at passing the parameter; does either of them work?

"activities": [
{
"name": "Dataflow1",
"type": "RefreshDataflow",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"isSequential": true,
"parameters": {
"FileName1": {
"value": "@item()",
"type": "string"
}
},
"inputs": [
{
"referenceName": "MyInputDataset",
"type": "DatasetReference",
"parameters": {
"FileName2": "@item()"
}
}
],
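
Note that what @item() contains depends on what the notebook returns. If the notebook emits df.toJSON() rows (as in the code posted later in this thread), each item is a JSON object string, so extracting the bare filename would need something like:

@json(item()).Name

whereas if the notebook returns a plain list of names such as ["file1","file2"], @item() is already the filename.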



TimoRiikonen
Helper V

Thank you @spencer_sa,
I managed steps 1 - 3 and possibly #4 as well, but the data pipeline runs these concurrently, which would be a disaster.
Proof: all of the dataflows have the same starting time, even though each of them runs for more than a second.

(screenshot: run history showing all dataflow activities with the same start time)

These are the last lines of the pipeline JSON, so concurrency should be one, and thus I expected these to run sequentially:

        "concurrency": 1,
        "lastModifiedByObjectId": "d3444903-5e8a-433e-91e2-cdfbfb8816aa",
        "lastPublishTime": "2024-10-30T12:47:58Z"
    }
}

spencer_sa
Solution Supplier

I'd have thought that to ensure the ForEach items get processed sequentially, you'd need to make sure the 'Sequential' checkbox is ticked in the ForEach step:

(screenshot: the ForEach activity settings with the 'Sequential' checkbox ticked)

(or if you prefer your pipeline in JSON format)

(screenshot: the same pipeline in JSON, showing the Sequential setting)
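
In the JSON, the setting sits on the ForEach activity itself, roughly like this (the activity name is illustrative; the items expression is the one from step 3 below):

"name": "ForEach1",
"type": "ForEach",
"typeProperties": {
    "isSequential": true,
    "items": {
        "value": "@json(activity('Read filelist').output.result.exitValue)",
        "type": "Expression"
    },
    "activities": [ ... ]
}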

 

TimoRiikonen
Helper V

Here is the Notebook code I used, for anyone who is looking for a solution:

import json

df = spark.sql("SELECT Name FROM DE_LH_100_Bronze.TicketFullBulkFilelist LIMIT 4")

# Convert each row to a JSON string and collect the results;
# json_rows is a Python list of JSON strings, one per row
json_rows = df.toJSON().collect()

# exit() expects a string, so serialize the list into a single JSON array string
mssparkutils.notebook.exit(json.dumps(json_rows))
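
With this, the exit value is a JSON array string along the lines of

["{\"Name\":\"file1.xml\"}", "{\"Name\":\"file2.xml\"}"]

so @json(activity('Read filelist').output.result.exitValue) yields one row string per iteration, and inside the loop the filename would be @json(item()).Name rather than plain @item().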
spencer_sa
Solution Supplier

Step 1 - Have your Notebook create a JSON string from the list of filenames, e.g. filenamelist = '["file1","file2"]'
Step 2 - Use the following to finish your notebook script: notebookutils.notebook.exit(filenamelist) (see the sketch after these steps)
Step 3 - On the ForEach container, parse the exitValue into an array: @json(activity('Read filelist').output.result.exitValue)
Step 4 - Inside the ForEach container, refer to each filename using @item()
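
A minimal notebook sketch of steps 1 and 2 (the table and column names are illustrative):

import json

# Step 1: build a JSON array string from the list of filenames
df = spark.sql("SELECT Name FROM MyLakehouse.MyFileList")
filenamelist = json.dumps([row["Name"] for row in df.collect()])

# Step 2: return the list as the notebook's exit value
notebookutils.notebook.exit(filenamelist)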


See here for a similar problem:  https://community.fabric.microsoft.com/t5/Data-Pipeline/Referencing-notebook-exit-value-as-a-variabl...

Incidentally, this is the solution we've used to output a list of unprocessed items, as the ADF expression language has a 'contains' collection function but not a 'does not contain'.
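
For that use case the pattern is the same, with the set difference computed in the notebook (a sketch; the lists are illustrative):

import json

all_files = ["file1.xml", "file2.xml", "file3.xml"]   # e.g. listed from the Lakehouse
processed = {"file1.xml"}                             # e.g. read from a tracking table

# Emit only the files that have not been processed yet
unprocessed = [f for f in all_files if f not in processed]
notebookutils.notebook.exit(json.dumps(unprocessed))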
