Hello,
I want to call a Fabric pipeline from a Notebook, so that my entire pipeline runs in one single Spark session.
Background:
I have a pipeline which creates (~100) new tables; it actually uses a Copy activity to copy tables from the data source. And I want CDF enabled on all of these tables.
It has been proved that the spark setting spark.databricks.delta.properties.defaults.enableChangeDataFeed only takes effect for tables created within the same Spark session.
Hi @cfccai,
I hope this information is helpful. Please let me know if you have any further questions or if you'd like to discuss this further. If this answers your question, kindly "Accept as Solution" and give it a 'Kudos' so others can find it easily.
Thank you,
Pavan.
Hi @cfccai,
I wanted to follow up since we haven't heard back from you regarding our last response. We hope your issue has been resolved.
If the community member's answer resolved your query, please mark it as "Accept as Solution" and select "Yes" if it was helpful.
If you need any further assistance, feel free to reach out.
Thank you,
Pavan.
Hi @cfccai
Thanks for reaching out in the Microsoft Community Forum.
You cannot directly call a Fabric pipeline within the same Spark session from a notebook, but you can trigger the pipeline externally using the Fabric REST API or PowerShell.
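For illustration, here is a minimal sketch of triggering a pipeline through the Fabric REST API's on-demand job endpoint; the workspace ID, pipeline item ID, and the way the bearer token is obtained are hypothetical placeholders, not values from this thread:

import requests

# Assumption: a bearer token acquired for the Fabric API audience (https://api.fabric.microsoft.com),
# e.g. via a service principal, similar to the token helper shown later in this thread
token = "<bearer-token-for-the-fabric-api>"
workspace_id = "your-workspace-id"          # hypothetical placeholder
pipeline_item_id = "your-pipeline-item-id"  # hypothetical placeholder

# Job Scheduler "run on demand" endpoint for a data pipeline item
url = (f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}"
       f"/items/{pipeline_item_id}/jobs/instances?jobType=Pipeline")
response = requests.post(url, headers={"Authorization": f"Bearer {token}"})
response.raise_for_status()  # 202 Accepted; the job instance URL is returned in the Location header
print(response.headers.get("Location"))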
If you found this post helpful, please consider marking it as "Accept as Solution" and select "Yes" if it was helpful, to help other members find it more easily.
Thank you,
Pavan.
In addition, for deployment purposes, we don't want to attach any lakehouse to that specific notebook which contains my target tables.
So I am searching for a way to use the abfss path as a variable, or to use the spark setting spark.databricks.delta.properties.defaults.enableChangeDataFeed.
Hi @cfccai ,
Have you tried configuring this in the Spark Environment?
Also note that CDC will soon be available in Copy Job.
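For example, the session default discussed in this thread could be set as a Spark property on a Fabric Environment item, so every notebook session that uses that environment picks it up automatically; this is only a sketch of the property/value pair, assuming it behaves the same as spark.conf.set:

spark.databricks.delta.properties.defaults.enableChangeDataFeed    true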
Regards,
Srisakthi
Hi @cfccai,
Thank you for reaching out in the Microsoft Community Forum.
1. For deployment purposes, instead of attaching lakehouses to the notebook, use ABFSS paths dynamically in your code:
abfss_path = "abfss://<workspace>@onelake.dfs.fabric.microsoft.com/..."
df = spark.read.format("delta").load(abfss_path)
2. Keep using spark.conf.set('spark.databricks.delta.properties.defaults.enableChangeDataFeed', 'true') to enable Change Data Feed (CDF) for tables created during the Spark session (see the sketch after this list).
3. If you're using the Copy activity in your pipeline, pass the ABFSS path for each table dynamically and ensure CDF is enabled as the tables are created.
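A minimal sketch that combines points 1 and 2: set the session default first, then create the table by its ABFSS path; the workspace, lakehouse, and table names in the path are hypothetical placeholders:

# Session-wide default: Delta tables created in this session are created with CDF enabled
spark.conf.set('spark.databricks.delta.properties.defaults.enableChangeDataFeed', 'true')

# Hypothetical ABFSS path of a target table (no lakehouse attached to the notebook)
abfss_path = ("abfss://MyWorkspace@onelake.dfs.fabric.microsoft.com/"
              "MyLakehouse.Lakehouse/Tables/TableA")

demo_df = spark.range(5)  # stand-in for data copied from the source
demo_df.write.format("delta").mode("overwrite").save(abfss_path)  # created with delta.enableChangeDataFeed = true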
Please continue using the Microsoft Community Forum.
If you found this post helpful, please consider marking it as "Accept as Solution" and select "Yes" if it was helpful, to help other members find it more easily.
Regards,
Pavan.
Thanks for your reply, Pavan.
Let me explain my situation. I am creating a streamlined pipeline that extracts data from the data source into a Fabric lakehouse, with incremental loads driven by CDF. That means I need to enable CDF for all the tables (we have hundreds, even thousands, of tables across many customers).
I was thinking of using the table's abfss path as a variable in the command 'ALTER TABLE TableA SET TBLPROPERTIES (delta.enableChangeDataFeed = true)',
but Spark SQL does NOT recognize it:
So I turned to the Spark setting:
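For reference, a minimal sketch of parameterizing that ALTER TABLE statement with an ABFSS path, assuming the Fabric Spark runtime accepts Delta's path-based delta.`<path>` table identifier; the paths below are hypothetical placeholders:

# Hypothetical list of target table paths in the lakehouse
table_paths = [
    "abfss://MyWorkspace@onelake.dfs.fabric.microsoft.com/MyLakehouse.Lakehouse/Tables/TableA",
    "abfss://MyWorkspace@onelake.dfs.fabric.microsoft.com/MyLakehouse.Lakehouse/Tables/TableB",
]

for path in table_paths:
    # delta.`<path>` addresses a Delta table by its location instead of a catalog name
    spark.sql(f"ALTER TABLE delta.`{path}` SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")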
Hi @cfccai,
To call and run a Fabric pipeline from a Notebook while ensuring all actions happen within a single Spark session, you can use the following approach:
Steps
1. Ensure that the Change Data Feed (CDF) setting is applied at the beginning of the session:
spark.conf.set('spark.databricks.delta.properties.defaults.enableChangeDataFeed', 'true')
2. Use the Azure Data Factory (ADF) REST API: trigger the pipeline using the ADF REST API. You can use requests in Python or other libraries to call it.
Python Code to Trigger Pipeline:
import requests

# Azure Active Directory token
def get_access_token(tenant_id, client_id, client_secret):
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
    payload = {
        'grant_type': 'client_credentials',
        'client_id': client_id,
        'client_secret': client_secret,
        'resource': 'https://management.azure.com/'
    }
    response = requests.post(url, data=payload)
    response.raise_for_status()
    return response.json().get('access_token')

# Trigger Fabric pipeline
def trigger_pipeline(subscription_id, resource_group, factory_name, pipeline_name, token, parameters={}):
    url = f"https://management.azure.com/subscriptions/{subscription_id}/resourceGroups/{resource_group}/provide..."
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }
    response = requests.post(url, headers=headers, json=parameters)
    response.raise_for_status()
    return response.json()

# Fill in your Azure credentials
tenant_id = "your-tenant-id"
client_id = "your-client-id"
client_secret = "your-client-secret"
subscription_id = "your-subscription-id"
resource_group = "your-resource-group"
factory_name = "your-factory-name"
pipeline_name = "your-pipeline-name"

# Get the access token
token = get_access_token(tenant_id, client_id, client_secret)

# Trigger the pipeline
run_response = trigger_pipeline(subscription_id, resource_group, factory_name, pipeline_name, token)
print(f"Pipeline run ID: {run_response.get('runId')}")
Monitor the Pipeline (Optional): If needed, you can monitor the pipeline execution by polling the status using the ADF REST API.
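A minimal polling sketch, assuming the standard ADF "Pipeline Runs - Get" endpoint (api-version 2018-06-01) and reusing the token and identifiers from the snippet above:

import time
import requests

def get_pipeline_run_status(subscription_id, resource_group, factory_name, run_id, token):
    # The "Pipeline Runs - Get" endpoint returns a JSON body that includes a 'status' field
    url = (f"https://management.azure.com/subscriptions/{subscription_id}"
           f"/resourceGroups/{resource_group}/providers/Microsoft.DataFactory"
           f"/factories/{factory_name}/pipelineruns/{run_id}?api-version=2018-06-01")
    response = requests.get(url, headers={'Authorization': f'Bearer {token}'})
    response.raise_for_status()
    return response.json().get('status')

run_id = run_response.get('runId')
status = get_pipeline_run_status(subscription_id, resource_group, factory_name, run_id, token)
while status in ('Queued', 'InProgress'):
    time.sleep(30)  # poll every 30 seconds until the run reaches a terminal state
    status = get_pipeline_run_status(subscription_id, resource_group, factory_name, run_id, token)
print(f"Pipeline finished with status: {status}")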
Note:
Ensure Tables Are Created: since the pipeline runs externally, you may need to verify from the Notebook that the tables were properly created and have CDF enabled, by running checks or integrating additional validation logic (see the validation sketch after this note).
Ensure proper access and authentication for the REST API call.
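A minimal validation sketch that checks whether CDF is actually enabled on each sink table after the pipeline run, using Delta's DESCRIBE DETAIL on a path-based identifier; the paths are hypothetical placeholders:

# Hypothetical list of sink table paths written by the Copy activity
table_paths = [
    "abfss://MyWorkspace@onelake.dfs.fabric.microsoft.com/MyLakehouse.Lakehouse/Tables/TableA",
    "abfss://MyWorkspace@onelake.dfs.fabric.microsoft.com/MyLakehouse.Lakehouse/Tables/TableB",
]

for path in table_paths:
    # DESCRIBE DETAIL returns a 'properties' map that includes delta.enableChangeDataFeed when set
    props = spark.sql(f"DESCRIBE DETAIL delta.`{path}`").collect()[0]['properties']
    print(path, "CDF enabled:", props.get('delta.enableChangeDataFeed') == 'true')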
If you found this post helpful, please consider marking it as "Accept as Solution" and select "Yes" if it was helpful, to help other members find it more easily.
Thank you,
Pavan.
I want all of these sink tables to have CDF enabled in the Fabric lakehouse.