Kuladeep
Advocate I

Notebook with sempy_labs failed when executed in DataPipeline

Hello Fabric Community,

I’ve been struggling with the complex execution context in Fabric (p.s. Who's Calling?).

I created a notebook that deploys a (template) semantic model from a central workspace to a target workspace using sempy_labs (semantic-link-labs).

When I execute this notebook with my user identity, it works.

When I execute the notebook inside a pipeline with a connection (authenticated using a Service Principal), it fails with the following error, even though the SP has Contributor access on both the source and target workspaces:

FabricHTTPException
403 Forbidden for URL: https://api.fabric.microsoft.com/v1.0/myorg/groups/xxxxxxxx-yyyyy-zzzzz-000000/datasets
Headers: {'Content-Length': '0', 'Strict-Transport-Security': 'max-age=31536000; includeSubDomains', 'X-Frame-Options': 'deny', 'X-Content-Type-Options': 'nosniff', 'Access-Control-Expose-Headers': 'RequestId', 'RequestId': 'a9bd2bed-ff7b-4c1c-b28d-551727ebe685', 'Date': 'Mon, 20 Apr 2026 18:56:10 GMT'}

When I execute the notebook inside a data pipeline without a connection, it occasionally works, but mostly throws this error:

WorkspaceNotFoundException
Workspace 'xxxxxxxx-yyyyy-zzzzz-000000' not found.
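
Since the three runs behave differently, it may help to confirm which identity each run actually uses. The following is a minimal sketch, not part of the original notebook: it decodes the claims of an access token without verifying the signature, for diagnostics only. The function names are hypothetical, and the claim names (`upn` for users, `appid` or `azp` for applications) are the ones Entra ID tokens usually carry, which is an assumption about your tenant's tokens.

```python
import base64
import json


def decode_jwt_claims(token: str) -> dict:
    """Return the (unverified) claims of a JWT access token."""
    # A JWT is three base64url segments: header.payload.signature
    payload_b64 = token.split(".")[1]
    # base64url drops the padding, so restore it before decoding
    payload_b64 += "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(payload_b64))


def describe_caller(claims: dict) -> str:
    """Summarise whether the token looks like a user or an app token."""
    # Assumption: user tokens carry 'upn'; app-only tokens carry
    # 'appid' (v1 tokens) or 'azp' (v2 tokens) and no 'upn'.
    if "upn" in claims:
        return f"user: {claims['upn']}"
    app_id = claims.get("appid") or claims.get("azp")
    return f"app: {app_id}" if app_id else "unknown"


# In a Fabric notebook the token could come from notebookutils, e.g.
# (assumed call and audience key, check them in your runtime):
#   token = notebookutils.credentials.getToken("pbi")
#   print(describe_caller(decode_jwt_claims(token)))
```

Logging the result at the start of the notebook in each of the three scenarios should show whether the pipeline run is really executing as the SP or as some other identity.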

 
Below is the code:
import sempy.fabric as fabric  # needed by fabric.list_workspaces / fabric.list_items below
import sempy_labs as labs
from sempy_labs import directlake
import sempy_labs.report as rep
import time
import logging
import sys
import os
import requests

# all the variables and parameters are collected and passed correctly (ignore them)

# Helper functions

def get_workspace_id(workspace_name: str | None):
    if not workspace_name:
        return None
    df = fabric.list_workspaces()
    row = df.loc[df["Name"] == workspace_name, "Id"]
    return row.iloc[0] if not row.empty else None

def get_item_id(workspace_id, item_name, item_type):
    df = fabric.list_items(workspace=workspace_id)
    row = df.loc[
        (df["Type"] == item_type) &
        (df["Display Name"] == item_name),
        "Id"
    ]
    return row.iloc[0] if not row.empty else None

def semantic_model_exists(workspace_id, dataset_name):
    return get_item_id(workspace_id, dataset_name, "SemanticModel") is not None


def report_exists(workspace_id, report_name):
    return get_item_id(workspace_id, report_name, "Report") is not None

def stage(name):
    logger.info(f"=== {name} ===")

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)]
)

logger = logging.getLogger("deployment")

def main():

    stage("Resolve Context")

    target_workspace_id = workspaceId
    lakehouse_id = lakehouseId

    if not target_workspace_id:
        raise ValueError(f"Workspace not found: {TARGET_WORKSPACE_NAME}")

    if not lakehouse_id:
        raise ValueError(f"Lakehouse not found: {LAKEHOUSE_NAME}")

    stage("Check Existing Assets")

    sm_exists = semantic_model_exists(
        target_workspace_id,
        TARGET_SEMANTICMODEL_NAME
    )

    rep_exists = report_exists(
        target_workspace_id,
        SOURCE_REPORT_NAME
    )

    logger.info(f"Semantic model exists: {sm_exists}")
    logger.info(f"Report exists: {rep_exists}")

    # --------------------------------------------------
    # SHORT CIRCUIT
    # --------------------------------------------------

    if sm_exists and rep_exists:
        logger.info("Nothing to deploy")
    else:

        if not sm_exists:
            stage("Deploy Semantic Model")

            labs.deploy_semantic_model(
                source_dataset=SOURCE_DATASET_NAME,
                source_workspace=SOURCE_WORKSPACE_NAME,
                target_dataset=TARGET_SEMANTICMODEL_NAME,
                target_workspace=TARGET_WORKSPACE_NAME,
                refresh_target_dataset=False,
                overwrite=True
            )

            directlake.update_direct_lake_model_connection(
                dataset=TARGET_SEMANTICMODEL_NAME,
                workspace=TARGET_WORKSPACE_NAME,
                source=LAKEHOUSE_NAME,
                source_type="Lakehouse",
                source_workspace=TARGET_WORKSPACE_NAME,
                use_sql_endpoint=False
            )

        # --------------------------------------------------
        # CLONE REPORT
        # --------------------------------------------------

        if not rep_exists:
            stage("Clone Report")

            rep.clone_report(
                report=SOURCE_REPORT_NAME,
                cloned_report=SOURCE_REPORT_NAME,
                workspace=SOURCE_WORKSPACE_NAME,
                target_workspace=TARGET_WORKSPACE_NAME,
                target_dataset=TARGET_SEMANTICMODEL_NAME,
                target_dataset_workspace=TARGET_WORKSPACE_NAME
            )

            rep.report_rebind(
                report=SOURCE_REPORT_NAME,
                dataset=TARGET_SEMANTICMODEL_NAME,
                report_workspace=TARGET_WORKSPACE_NAME,
                dataset_workspace=TARGET_WORKSPACE_NAME
            )

    # --------------------------------------------------
    # FINAL REFRESH
    # --------------------------------------------------

    stage("Refresh Semantic Model")

    time.sleep(10)

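    try:
        labs.refresh_semantic_model(
            dataset=TARGET_SEMANTICMODEL_NAME,
            workspace=TARGET_WORKSPACE_NAME
        )
    except Exception as e:
        # Log the failure before re-raising so it shows up in the pipeline output
        logger.error(f"Semantic model refresh failed: {e}")
        raise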


    logger.info("Deployment completed successfully")

Can someone help me get this deployment working when it runs from the pipeline, preferably as the SP? My pipeline is triggered through the Fabric API by an external application that authenticates as the SP.

1 ACCEPTED SOLUTION

I have implemented a workaround function that replaces directlake.update_direct_lake_model_connection() and avoids the XMLA endpoint by using a REST-based approach.

Here is my function to update the Direct Lake connection:

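import base64
import re

# FABRIC_API_BASE, HEADERS, poll_operation, get_lakehouse_id and
# get_semantic_model_id are used below but are not shown in this post.

def update_direct_lake_onelake_connection(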
    dataset_name: str,
    workspace_name: str,
    lakehouse_name: str,
    lakehouse_workspace_name: str,
) -> None:
    """
    Patch the OneLake DFS URL in a Direct Lake semantic model's shared
    expressions to point to the correct target Lakehouse.

    Uses the Fabric REST API (getDefinition / updateDefinition) so that
    Service Principal authentication is honoured correctly — bypassing
    the XMLA endpoint where SP tokens are not forwarded by sempy_labs.

    Parameters
    ----------
    dataset_name             : Display name of the target semantic model.
    workspace_name           : Display name of the workspace containing the model.
    lakehouse_name           : Display name of the target lakehouse.
    lakehouse_workspace_name : Display name of the workspace containing the lakehouse.
    """
    logger.info("Resolving IDs for Direct Lake connection update...")

    workspace_id          = get_workspace_id(workspace_name)
    lakehouse_workspace_id = get_workspace_id(lakehouse_workspace_name)
    lakehouse_id          = get_lakehouse_id(lakehouse_workspace_id, lakehouse_name)
    dataset_id            = get_semantic_model_id(workspace_id, dataset_name)

    if not dataset_id:
        raise ValueError(f"Semantic model '{dataset_name}' not found in workspace '{workspace_name}'.")

    logger.info(f"  Workspace ID:           {workspace_id}")
    logger.info(f"  Lakehouse Workspace ID: {lakehouse_workspace_id}")
    logger.info(f"  Lakehouse ID:           {lakehouse_id}")
    logger.info(f"  Semantic Model ID:      {dataset_id}")

    # --------------------------------------------------
    # Get model definition (handles sync 200 and async 202)
    # --------------------------------------------------
    r = requests.post(
        f"{FABRIC_API_BASE}/workspaces/{workspace_id}/semanticModels/{dataset_id}/getDefinition",
        headers=HEADERS,
    )
    r.raise_for_status()

    if r.status_code == 202:
        poll_result = poll_operation(r.headers["Location"], "getDefinition")
        if "definition" in poll_result:
            parts = poll_result["definition"]["parts"]
        else:
            result_url = poll_result.get("resourceLocation") or r.headers["Location"] + "/result"
            result_r = requests.get(result_url, headers=HEADERS)
            result_r.raise_for_status()
            parts = result_r.json()["definition"]["parts"]
    else:
        parts = r.json()["definition"]["parts"]

    # --------------------------------------------------
    # Patch OneLake URL in all matching parts
    # --------------------------------------------------
    new_url = (
        f"https://onelake.dfs.fabric.microsoft.com/"
        f"{lakehouse_workspace_id}/{lakehouse_id}"
    )
    onelake_pattern = re.compile(
        r"https://onelake\.dfs\.fabric\.microsoft\.com/"
        r"[0-9a-fA-F\-]{36}/[0-9a-fA-F\-]{36}"
    )
    patched = False

    for part in parts:
        content = base64.b64decode(part["payload"]).decode("utf-8")

        if "onelake.dfs.fabric.microsoft.com" not in content.lower():
            continue

        updated = onelake_pattern.sub(new_url, content)

        if updated == content:
            logger.warning(
                f"Part '{part['path']}' contains OneLake URL but regex did not match — "
                f"inspect manually:\n{content}"
            )
            continue

        logger.info(f"  Patched part: {part['path']}")
        part["payload"] = base64.b64encode(updated.encode("utf-8")).decode("utf-8")
        patched = True

    if not patched:
        raise ValueError(
            f"No patchable OneLake URL found in model '{dataset_name}'. "
            "Verify this is a Direct Lake model."
        )

    # --------------------------------------------------
    # Push updated definition back (handles sync / async)
    # --------------------------------------------------
    r2 = requests.post(
        f"{FABRIC_API_BASE}/workspaces/{workspace_id}/semanticModels/{dataset_id}/updateDefinition",
        headers=HEADERS,
        json={"definition": {"parts": parts}},
    )
    r2.raise_for_status()

    if r2.status_code == 202:
        poll_operation(r2.headers["Location"], "updateDefinition")

    logger.info("Direct Lake OneLake connection updated successfully.")
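
The function above calls a `poll_operation` helper that the post does not show. Below is a minimal sketch of what such a helper could look like, not the author's implementation. It assumes the operation URL returns a JSON body with a `status` field that ends in `Succeeded` or `Failed`, and that the service may send a `Retry-After` header. The `get_json` parameter is hypothetical: it is injected so the loop can be exercised without network access, which means this sketch takes one more argument than the two-argument calls in the function above.

```python
import time
from typing import Callable


def poll_operation(
    location: str,
    label: str,
    get_json: Callable[[str], tuple[dict, dict]],
    max_attempts: int = 60,
    default_wait: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll a long-running operation URL until it reaches a terminal state.

    get_json(url) is a hypothetical callable returning (body, headers).
    """
    for _ in range(max_attempts):
        body, headers = get_json(location)
        status = body.get("status")
        if status == "Succeeded":
            return body
        if status == "Failed":
            raise RuntimeError(f"{label} failed: {body.get('error')}")
        # Wait as long as the service asks, or fall back to a fixed delay
        sleep(float(headers.get("Retry-After", default_wait)))
    raise TimeoutError(f"{label} did not finish after {max_attempts} polls")
```

In the notebook, `get_json` could wrap `requests.get(url, headers=HEADERS)` and return `(response.json(), response.headers)`. Binding it with `functools.partial` would restore the two-argument call shape used in the function above.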


4 REPLIES
Kuladeep
Advocate I

I found an article from semantic-link-labs (semantic-link-labs/notebooks/Service Principal.ipynb at main · microsoft/semantic-link-labs · GitHub) and adjusted my function accordingly.
Everything works except directlake.update_direct_lake_model_connection(), which fails with this error:

ConnectionException
The specified Power BI workspace ('6e33da9d-a067-4519-a893-b9aa0d867d95') is not found. Technical Details: RootActivityId: 4b87e95d-5bd9-4418-87e4-a320b18fda4b Workspace: '6e33da9d-a067-4519-a893-b9aa0d867d95' at Microsoft.AnalysisServices.ConnectionInfo.ResolveHTTPConnectionProperties

 

 

def main():

    stage("Resolve Context")

    targetWorkspaceId = workspaceId
    targetLakehouseId = lakehouseId

    if not targetWorkspaceId:
        raise ValueError(f"Workspace not found: {TARGET_WORKSPACE_NAME}")

    if not targetLakehouseId:
        raise ValueError(f"Lakehouse not found: {LAKEHOUSE_NAME}")

    stage("Check Existing Assets")

    smExists = semanticModelExists(
        targetWorkspaceId,
        TARGET_SEMANTICMODEL_NAME
    )

    repExists = reportExists(
        targetWorkspaceId,
        SOURCE_REPORT_NAME
    )

    logger.info(f"Semantic model exists: {smExists}")
    logger.info(f"Report exists: {repExists}")

    # --------------------------------------------------
    # AUTHENTICATION BLOCK (IMPORTANT)
    # --------------------------------------------------

    with labs.service_principal_authentication(
        key_vault_uri=key_vault_uri,
        key_vault_tenant_id=key_vault_tenant_id,
        key_vault_client_id=key_vault_client_id,
        key_vault_client_secret=key_vault_client_secret,
    ):

        # --------------------------------------------------
        # SHORT CIRCUIT
        # --------------------------------------------------

        if smExists and repExists:
            logger.info("No deployment required. Assets already exist.")

        else:

            # --------------------------------------------------
            # DEPLOY SEMANTIC MODEL
            # --------------------------------------------------

            if not smExists:
                stage("Deploy Semantic Model")

                labs.deploy_semantic_model(
                    source_dataset=SOURCE_DATASET_NAME,
                    source_workspace=SOURCE_WORKSPACE_NAME,
                    target_dataset=TARGET_SEMANTICMODEL_NAME,
                    target_workspace=TARGET_WORKSPACE_NAME,
                    refresh_target_dataset=False,
                    overwrite=True
                )
                
                directlake.update_direct_lake_model_connection(
                    dataset=TARGET_SEMANTICMODEL_NAME,
                    workspace=TARGET_WORKSPACE_NAME,
                    source=LAKEHOUSE_NAME,
                    source_type="Lakehouse",
                    source_workspace=TARGET_WORKSPACE_NAME,
                    use_sql_endpoint=False
                )

            # --------------------------------------------------
            # CLONE REPORT
            # --------------------------------------------------

            if not repExists:
                stage("Clone Report")

                rep.clone_report(
                    report=SOURCE_REPORT_NAME,
                    cloned_report=SOURCE_REPORT_NAME,
                    workspace=SOURCE_WORKSPACE_NAME,
                    target_workspace=TARGET_WORKSPACE_NAME,
                    target_dataset=TARGET_SEMANTICMODEL_NAME,
                    target_dataset_workspace=TARGET_WORKSPACE_NAME
                )

                rep.report_rebind(
                    report=SOURCE_REPORT_NAME,
                    dataset=TARGET_SEMANTICMODEL_NAME,
                    report_workspace=TARGET_WORKSPACE_NAME,
                    dataset_workspace=TARGET_WORKSPACE_NAME
                )

        # --------------------------------------------------
        # FINAL REFRESH
        # --------------------------------------------------

        stage("Refresh Semantic Model")

        time.sleep(10)

        try:
            labs.refresh_semantic_model(
                dataset=TARGET_SEMANTICMODEL_NAME,
                workspace=TARGET_WORKSPACE_NAME
            )
        except Exception as e:
            logger.error(f"Semantic model refresh failed: {str(e)}")
            raise
                
    logger.info("Deployment completed successfully")

 

 

What am I missing here? How can I update the Direct Lake semantic model connection when running as the SP?

arabalca
Impactful Individual

Hi @Kuladeep ,

 

Before confirming the exact cause, could you check a few things in your environment?

1. Capacity XMLA Endpoint

Go to Admin Portal → Capacity Settings → [your capacity] → XMLA Endpoint

Is it set to Read, Read/Write, or disabled?

update_direct_lake_model_connection uses XMLA internally — if it's not set to Read/Write, the Service Principal can't resolve the workspace and throws exactly that error.

2. Service Principal enabled in Tenant Settings

Go to Admin Portal → Tenant Settings → Developer Settings

Is "Allow service principals to use Power BI APIs" enabled? Does it apply to the whole organization or only to a specific Security Group? Is your SP included?

3. SP role in the TARGET workspace

In the destination workspace (6e33da9d...), what role does the Service Principal have assigned? It needs at least Member, ideally Admin.

 

If my comments helped solve your question, it would be great if you could like the comment and mark it as the accepted solution. It helps others with the same issue and also motivates me to keep contributing.

 

 

 

Thanks a lot, I really appreciate it.

Hi @arabalca,
Thanks for your reply.

1. Yes, the XMLA Endpoint is set to `Read Write`.

2. The service principal setting is enabled at the tenant level, and the security group that the SP belongs to has been added to that setting.

3. The SP has Admin permissions on the workspace.

Yet it still fails. Is there any other way to repoint the semantic model connection to a new lakehouse via the API or notebookutils?
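
The accepted solution in this thread answers this through the getDefinition / updateDefinition REST endpoints. Its core step, rewriting the OneLake URL inside the base64-encoded definition parts, can be tried offline first. The sketch below reuses the regex from that solution. The `patch_onelake_parts` name, the sample part, and the placeholder IDs are made up for illustration and do not come from a real model.

```python
import base64
import re

ONELAKE_PATTERN = re.compile(
    r"https://onelake\.dfs\.fabric\.microsoft\.com/"
    r"[0-9a-fA-F\-]{36}/[0-9a-fA-F\-]{36}"
)


def patch_onelake_parts(parts: list[dict], workspace_id: str, lakehouse_id: str) -> int:
    """Rewrite OneLake URLs in base64-encoded definition parts, in place.

    Returns the number of parts whose payload changed.
    """
    new_url = f"https://onelake.dfs.fabric.microsoft.com/{workspace_id}/{lakehouse_id}"
    changed = 0
    for part in parts:
        content = base64.b64decode(part["payload"]).decode("utf-8")
        updated = ONELAKE_PATTERN.sub(new_url, content)
        if updated != content:
            part["payload"] = base64.b64encode(updated.encode("utf-8")).decode("utf-8")
            changed += 1
    return changed


# Made-up part resembling an expressions file; all IDs are placeholders
old_url = "https://onelake.dfs.fabric.microsoft.com/" + "1" * 36 + "/" + "2" * 36
sample_parts = [
    {
        "path": "definition/expressions.tmdl",
        "payload": base64.b64encode(
            f'Source = AzureStorage.DataLake("{old_url}")'.encode("utf-8")
        ).decode("utf-8"),
        "payloadType": "InlineBase64",
    }
]
count = patch_onelake_parts(sample_parts, "a" * 36, "b" * 36)
print(count, base64.b64decode(sample_parts[0]["payload"]).decode("utf-8"))
```

Running this against a definition exported from the template model, before calling updateDefinition, is a cheap way to check that the regex matches how the lakehouse URL is actually written in the model.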

 

