This time we’re going bigger than ever. Fabric, Power BI, SQL, AI and more. We're covering it all. You won't want to miss it.
Learn moreGet Fabric Certified for FREE during AI Skills Fest. This week only. Secure your voucher now.
Hello Fabric Community,
I’ve been struggling with the complex execution context in Fabric (p.s. Who's Calling?).
I created a notebook to deploy a (template) Semantic Model from a central workspace to a target workspace using sempy_labs.labs.
When I execute this notebook using a user identity, it works ✅
When I execute the notebook inside a pipeline with a connection (authenticated using a Service Principal), it fails ❌ with the following error (though the SP has contributor access on both source and target WS):
FabricHTTPException
403 Forbidden for URL: https://api.fabric.microsoft.com/v1.0/myorg/groups/xxxxxxxx-yyyyy-zzzzz-000000/datasets
Headers: {'Content-Length': '0', 'Strict-Transport-Security': 'max-age=31536000; includeSubDomains', 'X-Frame-Options': 'deny', 'X-Content-Type-Options': 'nosniff', 'Access-Control-Expose-Headers': 'RequestId', 'RequestId': 'a9bd2bed-ff7b-4c1c-b28d-551727ebe685', 'Date': 'Mon, 20 Apr 2026 18:56:10 GMT'}
When I execute the notebook inside a data pipeline without a connection, it occasionally works, but mostly throws this error:
WorkspaceNotFoundException
Workspace 'xxxxxxxx-yyyyy-zzzzz-000000' not found.
import sempy_labs as labs
from sempy_labs import directlake
import sempy_labs.report as rep
import time
import logging
import sys
import os
import requests
# all the variables and parameters are collected and passed correctly (ignore them)
# Helper functions
def get_workspace_id(workspace_name: str | None):
if not workspace_name:
return None
df = fabric.list_workspaces()
row = df.loc[df["Name"] == workspace_name, "Id"]
return row.iloc[0] if not row.empty else None
def get_item_id(workspace_id, item_name, item_type):
df = fabric.list_items(workspace=workspace_id)
row = df.loc[
(df["Type"] == item_type) &
(df["Display Name"] == item_name),
"Id"
]
return row.iloc[0] if not row.empty else None
def semantic_model_exists(workspace_id, dataset_name):
return get_item_id(workspace_id, dataset_name, "SemanticModel") is not None
def report_exists(workspace_id, report_name):
return get_item_id(workspace_id, report_name, "Report") is not None
def stage(name):
logger.info(f"=== {name} ===")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(message)s",
handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger("deployment")
def main():
stage("Resolve Context")
target_workspace_id = workspaceId
lakehouse_id = lakehouseId
if not target_workspace_id:
raise ValueError(f"Workspace not found: {TARGET_WORKSPACE_NAME}")
if not lakehouse_id:
raise ValueError(f"Lakehouse not found: {LAKEHOUSE_NAME}")
stage("Check Existing Assets")
sm_exists = semantic_model_exists(
target_workspace_id,
TARGET_SEMANTICMODEL_NAME
)
rep_exists = report_exists(
target_workspace_id,
SOURCE_REPORT_NAME
)
logger.info(f"Semantic model exists: {sm_exists}")
logger.info(f"Report exists: {rep_exists}")
# --------------------------------------------------
# SHORT CIRCUIT
# --------------------------------------------------
if sm_exists and rep_exists:
logger.info("Nothing to deploy")
else:
if not sm_exists:
stage("Deploy Semantic Model")
labs.deploy_semantic_model(
source_dataset=SOURCE_DATASET_NAME,
source_workspace=SOURCE_WORKSPACE_NAME,
target_dataset=TARGET_SEMANTICMODEL_NAME,
target_workspace=TARGET_WORKSPACE_NAME,
refresh_target_dataset=False,
overwrite=True
)
directlake.update_direct_lake_model_connection(
dataset=TARGET_SEMANTICMODEL_NAME,
workspace=TARGET_WORKSPACE_NAME,
source=LAKEHOUSE_NAME,
source_type="Lakehouse",
source_workspace=TARGET_WORKSPACE_NAME,
use_sql_endpoint=False
)
# --------------------------------------------------
# CLONE REPORT
# --------------------------------------------------
if not rep_exists:
stage("Clone Report")
rep.clone_report(
report=SOURCE_REPORT_NAME,
cloned_report=SOURCE_REPORT_NAME,
workspace=SOURCE_WORKSPACE_NAME,
target_workspace=TARGET_WORKSPACE_NAME,
target_dataset=TARGET_SEMANTICMODEL_NAME,
target_dataset_workspace=TARGET_WORKSPACE_NAME
)
rep.report_rebind(
report=SOURCE_REPORT_NAME,
dataset=TARGET_SEMANTICMODEL_NAME,
report_workspace=TARGET_WORKSPACE_NAME,
dataset_workspace=TARGET_WORKSPACE_NAME
)
# --------------------------------------------------
# FINAL REFRESH
# --------------------------------------------------
stage("Refresh Semantic Model")
time.sleep(10)
try:
labs.refresh_semantic_model(
dataset=TARGET_SEMANTICMODEL_NAME,
workspace=TARGET_WORKSPACE_NAME
)
except Exception as e:
raise
logger.info("Deployment completed successfully")
Can someone help me how can I achieve the deployment by the pipeline execution, preferably, using SP. Because my pipeline triggers via Fabric API from an external application using SP.
Solved! Go to Solution.
I have implemented a workaround function to bypass editing the XMLA endpoint (directlake.update_direct_lake_model_connection()) using a REST-based approach.
Here is my function to update the direct lake connection
def update_direct_lake_onelake_connection(
dataset_name: str,
workspace_name: str,
lakehouse_name: str,
lakehouse_workspace_name: str,
) -> None:
"""
Patch the OneLake DFS URL in a Direct Lake semantic model's shared
expressions to point to the correct target Lakehouse.
Uses the Fabric REST API (getDefinition / updateDefinition) so that
Service Principal authentication is honoured correctly — bypassing
the XMLA endpoint where SP tokens are not forwarded by sempy_labs.
Parameters
----------
dataset_name : Display name of the target semantic model.
workspace_name : Display name of the workspace containing the model.
lakehouse_name : Display name of the target lakehouse.
lakehouse_workspace_name : Display name of the workspace containing the lakehouse.
"""
logger.info("Resolving IDs for Direct Lake connection update...")
workspace_id = get_workspace_id(workspace_name)
lakehouse_workspace_id = get_workspace_id(lakehouse_workspace_name)
lakehouse_id = get_lakehouse_id(lakehouse_workspace_id, lakehouse_name)
dataset_id = get_semantic_model_id(workspace_id, dataset_name)
if not dataset_id:
raise ValueError(f"Semantic model '{dataset_name}' not found in workspace '{workspace_name}'.")
logger.info(f" Workspace ID: {workspace_id}")
logger.info(f" Lakehouse Workspace ID: {lakehouse_workspace_id}")
logger.info(f" Lakehouse ID: {lakehouse_id}")
logger.info(f" Semantic Model ID: {dataset_id}")
# --------------------------------------------------
# Get model definition (handles sync 200 and async 202)
# --------------------------------------------------
r = requests.post(
f"{FABRIC_API_BASE}/workspaces/{workspace_id}/semanticModels/{dataset_id}/getDefinition",
headers=HEADERS,
)
r.raise_for_status()
if r.status_code == 202:
poll_result = poll_operation(r.headers["Location"], "getDefinition")
if "definition" in poll_result:
parts = poll_result["definition"]["parts"]
else:
result_url = poll_result.get("resourceLocation") or r.headers["Location"] + "/result"
result_r = requests.get(result_url, headers=HEADERS)
result_r.raise_for_status()
parts = result_r.json()["definition"]["parts"]
else:
parts = r.json()["definition"]["parts"]
# --------------------------------------------------
# Patch OneLake URL in all matching parts
# --------------------------------------------------
new_url = (
f"https://onelake.dfs.fabric.microsoft.com/"
f"{lakehouse_workspace_id}/{lakehouse_id}"
)
onelake_pattern = re.compile(
r"https://onelake\.dfs\.fabric\.microsoft\.com/"
r"[0-9a-fA-F\-]{36}/[0-9a-fA-F\-]{36}"
)
patched = False
for part in parts:
content = base64.b64decode(part["payload"]).decode("utf-8")
if "onelake.dfs.fabric.microsoft.com" not in content.lower():
continue
updated = onelake_pattern.sub(new_url, content)
if updated == content:
logger.warning(
f"Part '{part['path']}' contains OneLake URL but regex did not match — "
f"inspect manually:\n{content}"
)
continue
logger.info(f" Patched part: {part['path']}")
part["payload"] = base64.b64encode(updated.encode("utf-8")).decode("utf-8")
patched = True
if not patched:
raise ValueError(
f"No patchable OneLake URL found in model '{dataset_name}'. "
"Verify this is a Direct Lake model."
)
# --------------------------------------------------
# Push updated definition back (handles sync / async)
# --------------------------------------------------
r2 = requests.post(
f"{FABRIC_API_BASE}/workspaces/{workspace_id}/semanticModels/{dataset_id}/updateDefinition",
headers=HEADERS,
json={"definition": {"parts": parts}},
)
r2.raise_for_status()
if r2.status_code == 202:
poll_operation(r2.headers["Location"], "updateDefinition")
logger.info("Direct Lake OneLake connection updated successfully.")
I found an article from semantic-link-labs (semantic-link-labs/notebooks/Service Principal.ipynb at main · microsoft/semantic-link-labs · GitHub) and adjusted my function accordingly.
Everything works except the directlake.update_direct_lake_model_connection() with error:
def main():
stage("Resolve Context")
targetWorkspaceId = workspaceId
targetLakehouseId = lakehouseId
if not targetWorkspaceId:
raise ValueError(f"Workspace not found: {TARGET_WORKSPACE_NAME}")
if not targetLakehouseId:
raise ValueError(f"Lakehouse not found: {LAKEHOUSE_NAME}")
stage("Check Existing Assets")
smExists = semanticModelExists(
targetWorkspaceId,
TARGET_SEMANTICMODEL_NAME
)
repExists = reportExists(
targetWorkspaceId,
SOURCE_REPORT_NAME
)
logger.info(f"Semantic model exists: {smExists}")
logger.info(f"Report exists: {repExists}")
# --------------------------------------------------
# AUTHENTICATION BLOCK (IMPORTANT)
# --------------------------------------------------
with labs.service_principal_authentication(
key_vault_uri=key_vault_uri,
key_vault_tenant_id=key_vault_tenant_id,
key_vault_client_id=key_vault_client_id,
key_vault_client_secret=key_vault_client_secret):
# --------------------------------------------------
# SHORT CIRCUIT
# --------------------------------------------------
if smExists and repExists:
logger.info("No deployment required. Assets already exist.")
else:
# --------------------------------------------------
# DEPLOY SEMANTIC MODEL
# --------------------------------------------------
if not smExists:
stage("Deploy Semantic Model")
labs.deploy_semantic_model(
source_dataset=SOURCE_DATASET_NAME,
source_workspace=SOURCE_WORKSPACE_NAME,
target_dataset=TARGET_SEMANTICMODEL_NAME,
target_workspace=TARGET_WORKSPACE_NAME,
refresh_target_dataset=False,
overwrite=True
)
directlake.update_direct_lake_model_connection(
dataset=TARGET_SEMANTICMODEL_NAME,
workspace=TARGET_WORKSPACE_NAME,
source=LAKEHOUSE_NAME,
source_type="Lakehouse",
source_workspace=TARGET_WORKSPACE_NAME,
use_sql_endpoint=False
)
# --------------------------------------------------
# CLONE REPORT
# --------------------------------------------------
if not repExists:
stage("Clone Report")
rep.clone_report(
report=SOURCE_REPORT_NAME,
cloned_report=SOURCE_REPORT_NAME,
workspace=SOURCE_WORKSPACE_NAME,
target_workspace=TARGET_WORKSPACE_NAME,
target_dataset=TARGET_SEMANTICMODEL_NAME,
target_dataset_workspace=TARGET_WORKSPACE_NAME
)
rep.report_rebind(
report=SOURCE_REPORT_NAME,
dataset=TARGET_SEMANTICMODEL_NAME,
report_workspace=TARGET_WORKSPACE_NAME,
dataset_workspace=TARGET_WORKSPACE_NAME
)
# --------------------------------------------------
# FINAL REFRESH
# --------------------------------------------------
stage("Refresh Semantic Model")
time.sleep(10)
try:
labs.refresh_semantic_model(
dataset=TARGET_SEMANTICMODEL_NAME,
workspace=TARGET_WORKSPACE_NAME
)
except Exception as e:
logger.error(f"Semantic model refresh failed: {str(e)}")
raise
logger.info("Deployment completed successfully")
What am I missing here? how to update Direct Lake SM connection?
Hi @Kuladeep ,
Before confirming the exact cause, could you check a few things in your environment?
1. Capacity XMLA Endpoint
Go to Admin Portal → Capacity Settings → [your capacity] → XMLA Endpoint
Is it set to Read, Read/Write, or disabled?
update_direct_lake_model_connection uses XMLA internally — if it's not set to Read/Write, the Service Principal can't resolve the workspace and throws exactly that error.
2. Service Principal enabled in Tenant Settings
Go to Admin Portal → Tenant Settings → Developer Settings
Is "Allow service principals to use Power BI APIs" enabled? Does it apply to the whole organization or only to a specific Security Group? Is your SP included?
3. SP role in the TARGET workspace
In the destination workspace (6e33da9d...), what role does the Service Principal have assigned? It needs at least Member, ideally Admin.
If my comments helped solve your question, it would be great if you could like all comment and mark it as the accepted solution. It helps others with the same issue and also motivates me to keep contributing.
Thanks a lot, I really appreciate it.
Hi @arabalca,
Thanks for your reply.
1. Yes, XMLA Endpoint is set to `Read Write`
2. The Service principal settings enabled on tenant and the security group, which the SP is part of, is also added to the settings
3. The SP has Admin level permissions on the WS
Yet it fail.. Is there any other way to repoint S`M connection to new LH via API or notebookutils?
I have implemented a workaround function to bypass editing the XMLA endpoint (directlake.update_direct_lake_model_connection()) using a REST-based approach.
Here is my function to update the direct lake connection
def update_direct_lake_onelake_connection(
dataset_name: str,
workspace_name: str,
lakehouse_name: str,
lakehouse_workspace_name: str,
) -> None:
"""
Patch the OneLake DFS URL in a Direct Lake semantic model's shared
expressions to point to the correct target Lakehouse.
Uses the Fabric REST API (getDefinition / updateDefinition) so that
Service Principal authentication is honoured correctly — bypassing
the XMLA endpoint where SP tokens are not forwarded by sempy_labs.
Parameters
----------
dataset_name : Display name of the target semantic model.
workspace_name : Display name of the workspace containing the model.
lakehouse_name : Display name of the target lakehouse.
lakehouse_workspace_name : Display name of the workspace containing the lakehouse.
"""
logger.info("Resolving IDs for Direct Lake connection update...")
workspace_id = get_workspace_id(workspace_name)
lakehouse_workspace_id = get_workspace_id(lakehouse_workspace_name)
lakehouse_id = get_lakehouse_id(lakehouse_workspace_id, lakehouse_name)
dataset_id = get_semantic_model_id(workspace_id, dataset_name)
if not dataset_id:
raise ValueError(f"Semantic model '{dataset_name}' not found in workspace '{workspace_name}'.")
logger.info(f" Workspace ID: {workspace_id}")
logger.info(f" Lakehouse Workspace ID: {lakehouse_workspace_id}")
logger.info(f" Lakehouse ID: {lakehouse_id}")
logger.info(f" Semantic Model ID: {dataset_id}")
# --------------------------------------------------
# Get model definition (handles sync 200 and async 202)
# --------------------------------------------------
r = requests.post(
f"{FABRIC_API_BASE}/workspaces/{workspace_id}/semanticModels/{dataset_id}/getDefinition",
headers=HEADERS,
)
r.raise_for_status()
if r.status_code == 202:
poll_result = poll_operation(r.headers["Location"], "getDefinition")
if "definition" in poll_result:
parts = poll_result["definition"]["parts"]
else:
result_url = poll_result.get("resourceLocation") or r.headers["Location"] + "/result"
result_r = requests.get(result_url, headers=HEADERS)
result_r.raise_for_status()
parts = result_r.json()["definition"]["parts"]
else:
parts = r.json()["definition"]["parts"]
# --------------------------------------------------
# Patch OneLake URL in all matching parts
# --------------------------------------------------
new_url = (
f"https://onelake.dfs.fabric.microsoft.com/"
f"{lakehouse_workspace_id}/{lakehouse_id}"
)
onelake_pattern = re.compile(
r"https://onelake\.dfs\.fabric\.microsoft\.com/"
r"[0-9a-fA-F\-]{36}/[0-9a-fA-F\-]{36}"
)
patched = False
for part in parts:
content = base64.b64decode(part["payload"]).decode("utf-8")
if "onelake.dfs.fabric.microsoft.com" not in content.lower():
continue
updated = onelake_pattern.sub(new_url, content)
if updated == content:
logger.warning(
f"Part '{part['path']}' contains OneLake URL but regex did not match — "
f"inspect manually:\n{content}"
)
continue
logger.info(f" Patched part: {part['path']}")
part["payload"] = base64.b64encode(updated.encode("utf-8")).decode("utf-8")
patched = True
if not patched:
raise ValueError(
f"No patchable OneLake URL found in model '{dataset_name}'. "
"Verify this is a Direct Lake model."
)
# --------------------------------------------------
# Push updated definition back (handles sync / async)
# --------------------------------------------------
r2 = requests.post(
f"{FABRIC_API_BASE}/workspaces/{workspace_id}/semanticModels/{dataset_id}/updateDefinition",
headers=HEADERS,
json={"definition": {"parts": parts}},
)
r2.raise_for_status()
if r2.status_code == 202:
poll_operation(r2.headers["Location"], "updateDefinition")
logger.info("Direct Lake OneLake connection updated successfully.")
Check out the June 2026 Fabric update to learn about new features.
Sign up to receive a private message when registration opens and key events begin.