Hello! I am trying to pass information from a pipeline run to a notebook and I am unable to do so.
I have a forEach loop that runs 8 pipelines to copy data using Invoke Pipeline (preview) and waits for them to complete before running a notebook. I want to pass the names of only the pipeline runs that wrote files to the notebook. However, the Invoke Pipeline activity only shows Pipeline Run ID and pipeline name as outputs.
Is there a way I can get the same pipeline output information that I would get from the individual pipeline run to pass through the Invoke Pipeline activity? I think I could make an API call to the Fabric API using the pipeline run ID, but I can't figure out how to call it successfully from the pipeline. All help is appreciated!
Hi @esjetmore,
The new Invoke Pipeline (Preview) activity currently has issues with Pipeline return value. Once that feature is available, you can use it to return values from the invoked pipelines.
Invoke pipeline activity - Microsoft Fabric | Microsoft Learn
Before Pipeline return value and the Fail activity existed in ADF, we used to copy an empty file with a specific name from one location to another when a particular pipeline finished. We then checked whether that file existed to know which pipelines had completed successfully before starting another process. You can try the same approach with a Lakehouse.
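As a rough illustration of the marker-file check on the notebook side, here is a minimal sketch. The function name, the `.done` suffix, and the directory layout are hypothetical, not something Fabric defines; it only assumes that each pipeline drops an empty file named after itself into a folder the notebook can read.

```python
from pathlib import Path


def pipelines_with_markers(marker_dir, pipeline_names, suffix=".done"):
    """Return the pipeline names that have a marker file in marker_dir.

    The "<name><suffix>" naming scheme is an assumption made for this sketch.
    """
    root = Path(marker_dir)
    # Keep only the names whose marker file actually exists, preserving order.
    return [name for name in pipeline_names if (root / f"{name}{suffix}").is_file()]
```

In a notebook with a default Lakehouse attached, `marker_dir` would typically be a folder under the mounted Files area (for example something like `/lakehouse/default/Files/markers`, which is an assumed path here). Remember to clear old markers before each run so stale files are not picked up.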
Alternatively, the pipeline REST API is simple to use: pass all the pipeline run IDs to the notebook as an array (using Set Variable), and in the notebook call the Fabric REST API with the code below.
# Helper function to call the Fabric REST API
from notebookutils import mssparkutils as msu
import requests

def call_fabric_api(method, uri, payload=None):
    endpoint = "https://api.fabric.microsoft.com/v1"
    # Get a PBI access token and put it in the Authorization header
    headers = {
        "Authorization": "Bearer " + msu.credentials.getToken("pbi"),
        "Content-Type": "application/json"
    }
    # Create a new session and try sending a request to the Fabric REST API
    session = requests.Session()
    try:
        url = f"{endpoint}/{uri}"
        response = session.request(method, url, headers=headers, json=payload, timeout=120)
        data = response.json()
        print(data)
        return data
    except requests.RequestException as ex:
        # The request failed (for example a connection error or timeout)
        print(ex)
        return None
And use it like this:
method = "get"
base_uri = "workspaces/<your WS Id>/items/<pipeline id>/jobs/instances/<job ID>"
data = call_fabric_api(method, base_uri)
# call_fabric_api returns None when the request fails, so guard before reading the status
if data and data.get("status") == "Completed":
    # Add your logic here
    pass
https://learn.microsoft.com/en-us/fabric/data-factory/pipeline-rest-api#get-item-job-instance
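To tie this back to passing an array of run IDs into the notebook, the loop could look roughly like the sketch below. It assumes the pipeline hands the run IDs over as a JSON array string; `completed_run_ids` and its `get_job_instance` argument are hypothetical names introduced here, with the lookup passed in as a function so the filtering logic can be tried without calling the API.

```python
import json


def completed_run_ids(run_ids_json, get_job_instance):
    """Return the run IDs whose job instance reports status "Completed".

    run_ids_json: JSON array string of run IDs (assumed parameter format).
    get_job_instance: callable taking a run ID and returning the job
        instance as a dict, or None if the lookup failed.
    """
    completed = []
    for run_id in json.loads(run_ids_json):
        data = get_job_instance(run_id)
        # Skip failed lookups and runs in any other state.
        if data and data.get("status") == "Completed":
            completed.append(run_id)
    return completed
```

In the notebook, `get_job_instance` could be something like `lambda rid: call_fabric_api("get", f"workspaces/<your WS Id>/items/<pipeline id>/jobs/instances/{rid}")`. Note that a "Completed" status only tells you the run finished; whether it actually wrote files would still need a separate check, such as the marker-file approach.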