Yazdan
Advocate II

Detecting the User Who Manually Triggered a Fabric Pipeline or Notebook

Hi everyone,

 

I wanted to share a useful approach I’ve been using in Microsoft Fabric to detect which user has manually triggered a pipeline or notebook, as this can be very helpful for logging, auditing, and security purposes — especially when combined with a System Run Report or other operational dashboards.


Context

While Fabric now correctly returns @pipeline().TriggerType = "1" when a pipeline is triggered by a schedule, manual runs still require additional logic if you want to capture who initiated them.

For example, when I run a pipeline or notebook manually, I log the triggering user using the following discovery logic.


Code Example

Below is a Python snippet you can run inside a Fabric Notebook to detect who triggered the run:

 

import requests
from notebookutils import mssparkutils
from sempy import fabric
from sempy.fabric import FabricRestClient
 
 
# ---------------- User discovery ----------------
def get_current_job_instance_id():
    ws_id = fabric.get_notebook_workspace_id()
    nb_id = fabric.get_artifact_id()
    client = FabricRestClient()
    resp = client.get(f"/v1/workspaces/{ws_id}/items/{nb_id}/jobs/instances").json()
    runs = resp.get("value", [])
    if not runs:
        return None, None
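    active = [r for r in runs if str(r.get("status", "")).lower() in {"inprogress", "notstarted"}]
    # startTimeUtc may be missing or null (for example on runs that have not started),
    # so fall back to "" to keep the comparison between strings.
    pick = max(active or runs, key=lambda r: r.get("startTimeUtc") or "")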
    return pick.get("id"), pick

def _pluck_userish(obj):
    """
    Try common shapes:
      {createdBy: {displayName, userPrincipalName, id}}
      {requestedBy: {...}}, {submittedBy: {...}}, {initiatedBy: {...}}
      {owner: {...}}, {principal: {...}}
    """
    if not isinstance(obj, dict):
        return None
    candidates = ["createdBy", "requestedBy", "submittedBy", "initiatedBy", "owner", "principal"]
    for k in candidates:
        if k in obj and isinstance(obj[k], dict):
            u = obj[k]
            return {
                "displayName": u.get("displayName"),
                "userPrincipalName": u.get("userPrincipalName") or u.get("upn"),
                "id": u.get("id") or u.get("objectId")
            }
    return None

def find_user_from_job(details):
    # check top-level common fields
    user = _pluck_userish(details)
    if user:
        return user
    # scan nested dicts one level down
    for v in details.values():
        if isinstance(v, dict):
            user = _pluck_userish(v)
            if user:
                return user
    return None

def get_current_user_via_graph():
    """
    Resolve the current interactive user via Microsoft Graph /me.
    Requires that the Fabric runtime can mint a token for Graph.
    """
    try:
        from notebookutils import mssparkutils  # available in Fabric runtime
        token = mssparkutils.credentials.getToken("https://graph.microsoft.com/")
        if not token:
            return None
        headers = {"Authorization": f"Bearer {token}"}
        r = requests.get("https://graph.microsoft.com/v1.0/me?$select=displayName,userPrincipalName,id", headers=headers, timeout=10)
        if r.status_code == 200:
            me = r.json()
            return {
                "displayName": me.get("displayName"),
                "userPrincipalName": me.get("userPrincipalName"),
                "id": me.get("id"),
                "_source": "graph"
            }
        else:
            print("Graph /me call failed:", r.status_code, r.text[:300])
    except Exception as e:
        print("Graph lookup failed:", e)
    return None

def get_user_from_runtime_context():
    """
    Fallback for manual runs: read runner from Fabric runtime context.
    Uses keys observed in my tenant: userName (display name) and userId (object ID).
    Other tenants or runtime versions may expose different keys.
    """
    try:
        from notebookutils import mssparkutils
        ctx = mssparkutils.runtime.context or {}
        display_name = ctx.get("userName")
        object_id    = ctx.get("userId")
        if display_name or object_id:
            return {"displayName": display_name, "userPrincipalName": None, "id": object_id, "_source": "runtime_context"}
    except Exception:
        pass
    return None

# ---------------- Run & print ----------------
job_instance_id, details = get_current_job_instance_id()
print("Job Instance ID:", job_instance_id)

run_started_by_name = None  # capture for logging later
if details:
    invoke_type = details.get("invokeType")
    print("InvokeType:", invoke_type)

    # -------- Try to resolve the user who ran it --------
    user = find_user_from_job(details)
    if user and (user.get("displayName") or user.get("userPrincipalName")):
        run_started_by_name = (user.get('displayName') or user.get('userPrincipalName'))
        print("Run started by (from job):",
              f"{user.get('displayName') or ''}".strip(),
              f"({user.get('userPrincipalName')})" if user.get("userPrincipalName") else "")
    else:
        # Fallback 1: runtime context (worked in my tenant)
        user = get_user_from_runtime_context()
        if user and (user.get("displayName") or user.get("userPrincipalName")):
            run_started_by_name = (user.get('displayName') or user.get('userPrincipalName'))
            print("Run started by (from context):",
                  f"{user.get('displayName') or ''}".strip(),
                  f"({user.get('userPrincipalName')})" if user.get("userPrincipalName") else "")
        else:
            # Fallback 2: ask Graph who is running this notebook interactively
            user = get_current_user_via_graph()
            if user:
                run_started_by_name = (user.get('displayName') or user.get('userPrincipalName'))
                print("Run started by (from Graph):",
                      f"{user.get('displayName') or ''}".strip(),
                      f"({user.get('userPrincipalName')})" if user.get("userPrincipalName") else "")
            else:
                print("Run started by: Unknown (no user info in job metadata, runtime context, or Graph)")

# ----------- Prepare log parameters -----------
# trigger_type and trigger_name are assumed to be notebook parameters
# supplied by the calling pipeline; they are not defined in this snippet.
trigger_key = str(trigger_type).lower()
if trigger_key == "pipelineactivity":
    trigger_type = "Orchestrator Pipeline"  # trigger_name keeps the value passed in
elif trigger_key == "1":
    trigger_type = "Scheduled"
    trigger_name = "Scheduling Module"
elif trigger_key == "manual":
    trigger_type = "Manual"
    trigger_name = run_started_by_name
else:
    trigger_name = "UNKNOWN"  # unrecognised trigger types are logged as received

 

This code safely checks:

  • The job instance metadata

  • The Fabric runtime context

  • And as a fallback, Microsoft Graph /me

It returns the display name or UPN of the user who manually triggered the run.
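
The same fallback order can also be written as a small chain of resolver functions, which avoids the nested if/else blocks. This is only a sketch: first_resolved_user and the three stand-in resolvers are hypothetical names with made-up data so the example runs outside Fabric. In a notebook you would pass the real lookup functions instead.

```python
from typing import Callable, Iterable, Optional


def first_resolved_user(resolvers: Iterable[Callable[[], Optional[dict]]]) -> Optional[dict]:
    """Return the first user dict that carries a display name or UPN."""
    for resolve in resolvers:
        try:
            user = resolve()
        except Exception as exc:  # one failing source must not stop the chain
            print(f"{getattr(resolve, '__name__', resolve)} failed: {exc}")
            continue
        if user and (user.get("displayName") or user.get("userPrincipalName")):
            return user
    return None


# Stand-in resolvers with hypothetical data.
def from_job_metadata():
    return None  # job metadata carried no user fields


def from_runtime_context():
    return {"displayName": "User Name", "userPrincipalName": None}


def from_graph():
    raise RuntimeError("Graph token not available")


resolved = first_resolved_user([from_job_metadata, from_runtime_context, from_graph])
print(resolved)
```

Because the chain stops at the first usable answer, the Graph call is only attempted when the cheaper sources return nothing.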


Trigger Type Mapping

# trigger_type and trigger_name are assumed to be notebook parameters
# supplied by the calling pipeline.
trigger_key = str(trigger_type).lower()
if trigger_key == "pipelineactivity":
    trigger_type = "Orchestrator Pipeline"  # trigger_name keeps the value passed in
elif trigger_key == "1":
    trigger_type = "Scheduled"
    trigger_name = "Scheduling Module"
elif trigger_key == "manual":
    trigger_type = "Manual"
    trigger_name = run_started_by_name
else:
    trigger_name = "UNKNOWN"  # unrecognised trigger types are logged as received

This logic helps ensure your logs clearly differentiate between manual, scheduled, and orchestrated pipeline runs, with proper user tracking for manual triggers.
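
If you prefer something you can unit-test, the mapping can be wrapped in a function that returns both values instead of reassigning variables in place. A minimal sketch follows. map_trigger is a hypothetical helper name, and falling back to "UNKNOWN" when no user was resolved is my assumption rather than part of the original logic.

```python
def map_trigger(trigger_type, trigger_name=None, run_started_by=None):
    """Return (trigger label, trigger name) for logging."""
    key = str(trigger_type or "").strip().lower()
    if key == "pipelineactivity":
        return "Orchestrator Pipeline", trigger_name
    if key == "1":
        return "Scheduled", "Scheduling Module"
    if key == "manual":
        # Assumption: log "UNKNOWN" when no user could be resolved.
        return "Manual", run_started_by or "UNKNOWN"
    return trigger_type, "UNKNOWN"  # unrecognised types pass through unchanged


print(map_trigger("Manual", run_started_by="User Name"))
print(map_trigger("1"))
print(map_trigger("PipelineActivity", trigger_name="Nightly Orchestrator"))
```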


Benefit

By logging this user information into your System Run Report or operational tables, you can generate insights like:

  • Who manually executed pipelines or notebooks

  • When and how often manual interventions occurred

  • Scheduled vs manual run ratios

  • Security auditing for sensitive pipelines
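
As a rough illustration of those insights, here is how a few logged rows could be summarised. The rows, pipeline names, and column names are invented for the example. In practice they would come from whatever run-log table you write to.

```python
from collections import Counter

# Hypothetical log rows; replace with rows read from your own run-log table.
run_log = [
    {"pipeline": "LoadSales", "trigger_type": "Scheduled", "trigger_name": "Scheduling Module"},
    {"pipeline": "LoadSales", "trigger_type": "Manual", "trigger_name": "User A (a@company.com)"},
    {"pipeline": "LoadStock", "trigger_type": "Scheduled", "trigger_name": "Scheduling Module"},
    {"pipeline": "LoadStock", "trigger_type": "Manual", "trigger_name": "User B (b@company.com)"},
    {"pipeline": "LoadSales", "trigger_type": "Scheduled", "trigger_name": "Scheduling Module"},
]

# Run counts per trigger type, and manual runs per user.
runs_by_type = Counter(row["trigger_type"] for row in run_log)
manual_runs_by_user = Counter(
    row["trigger_name"] for row in run_log if row["trigger_type"] == "Manual"
)
manual_share = runs_by_type["Manual"] / len(run_log)

print(dict(runs_by_type))
print(dict(manual_runs_by_user))
print(f"Manual share: {manual_share:.0%}")
```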


Update

Recently, I noticed that Fabric has fixed the previous bug: when a pipeline is triggered by a schedule, @pipeline().TriggerType now correctly returns "1" 🎉.

So with this fix and the manual user detection logic above, we can now fully distinguish all run types dynamically at runtime.


Hope this helps others who want more robust runtime tracking and reporting in Fabric!

Cheers,

1 ACCEPTED SOLUTION
Yazdan
Advocate II

Update – Finally Solved for Both Pipelines and Notebooks

After further investigation and testing, I finally found a reliable way to determine who manually triggered both Microsoft Fabric Pipelines and Notebooks.

The original approach I shared above worked in some scenarios by using:

  • Job instance metadata

  • Runtime context

  • Microsoft Graph /me

However, those methods only identify the currently executing identity and can be unreliable for auditing purposes, especially when notebooks or pipelines are triggered by different users.

Final Solution

The most reliable approach is to use the Power BI / Fabric Admin Activity Events API:

GET /admin/activityevents

This API exposes Fabric audit events and allows us to identify the user who actually initiated a run.

Pipeline Detection Logic

  1. Get the exact pipeline run start time using the queryactivityruns endpoint.

  2. Retrieve audit events around that execution time.

  3. Find the event matching:

Operation = RunArtifact
ObjectId  = Pipeline ID

  4. Return the UserId field, which contains the triggering user's email address (UPN).
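
The matching step (3 and 4) can be tried offline against a handful of events before wiring it to the API. The sketch below assumes the event fields named above (Operation, ObjectId, CreationTime, UserId). The sample events, IDs, and the helper names parse_utc and closest_run_event are made up for illustration.

```python
from datetime import datetime, timezone


def parse_utc(value):
    """Parse an ISO-8601 timestamp, treating naive values as UTC."""
    dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
    return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)


def closest_run_event(events, item_id, run_start, operations=("RunArtifact",)):
    """Return the matching audit event closest in time to run_start, or None."""
    matches = [
        e for e in events
        if e.get("Operation") in operations
        and str(e.get("ObjectId", "")).lower() == item_id.lower()
        and e.get("CreationTime")
    ]
    if not matches:
        return None
    return min(
        matches,
        key=lambda e: abs((parse_utc(e["CreationTime"]) - run_start).total_seconds()),
    )


# Hypothetical audit events for one pipeline ID.
events = [
    {"Operation": "RunArtifact", "ObjectId": "AAAA-1111",
     "CreationTime": "2025-01-01T05:10:00Z", "UserId": "earlier@company.com"},
    {"Operation": "RunArtifact", "ObjectId": "aaaa-1111",
     "CreationTime": "2025-01-01T05:21:51Z", "UserId": "user@company.com"},
    {"Operation": "ViewReport", "ObjectId": "aaaa-1111",
     "CreationTime": "2025-01-01T05:21:45Z", "UserId": "viewer@company.com"},
]

run_start = parse_utc("2025-01-01T05:21:41Z")
best = closest_run_event(events, "aaaa-1111", run_start)
print(best["UserId"] if best else None)
```

Picking the closest event rather than the first one matters when the same pipeline was run more than once inside the search window.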

Notebook Detection Logic

Standalone notebooks do not have a queryactivityruns endpoint, so the process is slightly different:

  1. Get the current notebook Job Instance.

  2. Resolve the notebook start time.

  3. Query Admin Activity Events.

  4. Find matching notebook execution events:

Operation = StartRunNotebook
or
Operation = RunArtifact

  5. Match on Notebook ID.

  6. Return the triggering user's email address.
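
Steps 1 and 2 are not shown in the sample code further down, so here is one possible way to derive the start time from a list of job instances. It assumes each instance carries status and startTimeUtc, as in my first post, and that startTimeUtc is a UTC timestamp without an offset. The sample rows and the helper names are hypothetical.

```python
from datetime import datetime, timezone

ACTIVE_STATUSES = {"inprogress", "notstarted"}


def parse_start_time(value):
    """Parse a UTC timestamp, trimming fractions longer than microseconds."""
    text = value.rstrip("Z")
    if "." in text:
        head, frac = text.split(".", 1)
        text = f"{head}.{frac[:6].ljust(6, '0')}"
    return datetime.fromisoformat(text).replace(tzinfo=timezone.utc)


def current_run_start(job_instances):
    """Return the start time of the most recent active run, or None."""
    active = [
        r for r in job_instances
        if str(r.get("status", "")).lower() in ACTIVE_STATUSES
    ]
    # Prefer active runs; ignore instances that have no start time yet.
    candidates = [r for r in (active or job_instances) if r.get("startTimeUtc")]
    if not candidates:
        return None
    latest = max(candidates, key=lambda r: r["startTimeUtc"])
    return parse_start_time(latest["startTimeUtc"])


# Hypothetical job instances for one notebook.
job_instances = [
    {"id": "run-1", "status": "Completed", "startTimeUtc": "2025-01-01T04:00:00.0000000"},
    {"id": "run-2", "status": "InProgress", "startTimeUtc": "2025-01-01T05:21:41.7530951"},
    {"id": "run-3", "status": "NotStarted", "startTimeUtc": None},
]

start = current_run_start(job_instances)
print(start)
```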

Converting Email Address to Display Name

The audit log returns:

user@company.com

To make logs more readable, I then call Microsoft Graph:

GET /users/{userPrincipalName}

and retrieve:

displayName
userPrincipalName

which allows logging values such as:

User Name (user@company.com)

instead of only the email address.
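
One detail worth handling when building the Graph URL: guest UPNs contain "#" (the #EXT# marker), which has to be percent-encoded or the rest of the path is treated as a URL fragment. A small sketch of the URL building and log formatting follows. graph_user_url and format_user are hypothetical helper names and the guest UPN is an invented example.

```python
from urllib.parse import quote

GRAPH_USERS_URL = "https://graph.microsoft.com/v1.0/users"


def graph_user_url(upn):
    """Build the Graph user lookup URL with the UPN percent-encoded."""
    return f"{GRAPH_USERS_URL}/{quote(upn, safe='@')}?$select=displayName,userPrincipalName"


def format_user(display_name, upn):
    """Return 'Display Name (upn)', or just the UPN when no name is known."""
    return f"{display_name} ({upn})" if display_name else upn


print(graph_user_url("user@company.com"))
print(graph_user_url("guest_partner.com#EXT#@company.onmicrosoft.com"))
print(format_user("User Name", "user@company.com"))
```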

Important Discovery – Audit Log Visibility Delay

During testing I discovered that Fabric audit events are not always immediately available through the Admin Activity Events API.

For example:

Notebook Start Time : 05:21:41
Audit Event Time    : 05:21:51

The audit event itself was generated almost immediately.

However, the event was not visible through the API until approximately 5–7 minutes later.

Because of this, a simple one-time lookup can fail even though the audit event already exists internally.

Final Reliability Improvement

To make the solution production-ready, I implemented a retry mechanism:

retry_count=10
retry_interval_seconds=60

The lookup now retries every minute until the matching audit event becomes available.

This ensures that:

  • Pipelines resolve the correct triggering user.

  • Notebooks resolve the correct triggering user.

  • Delayed audit ingestion does not cause false "Unknown" results.

  • Security and operational reporting remain accurate.
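
The retry idea itself is independent of the audit API, so it can be sketched and tested on its own. In this version the sleep function is injectable so the example runs instantly. retry_until_found is a hypothetical helper name, and the simulated responses stand in for an audit lookup that only succeeds on the third attempt.

```python
import time


def retry_until_found(lookup, retry_count=10, retry_interval_seconds=60, sleep=time.sleep):
    """Call lookup() until it returns a value other than None or attempts run out."""
    for attempt in range(1, retry_count + 1):
        result = lookup()
        if result is not None:
            return result
        if attempt < retry_count:  # no need to wait after the final attempt
            sleep(retry_interval_seconds)
    return None


# Simulate an audit event that becomes visible on the third attempt.
responses = iter([None, None, "user@company.com"])
waits = []  # records the requested waits instead of actually sleeping

found = retry_until_found(
    lambda: next(responses),
    retry_count=5,
    retry_interval_seconds=60,
    sleep=waits.append,
)
print(found, waits)
```

With the defaults above (10 attempts, 60 seconds apart) a notebook can spend up to nine minutes waiting, so it is worth running the lookup near the end of the run or in a separate logging step.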

Result

The final implementation can now reliably populate logging tables and System Run Reports with:

Trigger Type
Trigger Name
Run Started By

for:

  • Scheduled runs

  • Manual runs

  • Orchestrator pipeline runs

  • Standalone notebook executions

This has significantly improved operational auditing and troubleshooting in our Fabric environment.

Hopefully this helps others looking for a reliable way to track who actually executed Fabric workloads.

 

One important thing I found is that audit events are not always visible immediately through the Admin Activity Events API. The event timestamp can be accurate, but the API visibility can be delayed by several minutes. Because of that, I added a retry loop.

Below is the sanitised sample code structure:

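import time

import pandas as pd
import requests
from notebookutils import mssparkutils  # Fabric notebook runtime; used below for the Fabric API token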

# ------------------------------------------------------------------
# Service Principal Inputs - replace with secure values
# ------------------------------------------------------------------
tenant_id = "<TENANT_ID>"
client_id = "<CLIENT_ID>"
client_secret = "<CLIENT_SECRET>"


def get_powerbi_admin_token(tenant_id, client_id, client_secret):
    token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"

    token_body = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": "https://analysis.windows.net/powerbi/api/.default"
    }

    response = requests.post(
        token_url,
        data=token_body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        timeout=30  # avoid hanging the notebook if the token endpoint stalls
    )

    response.raise_for_status()
    return response.json()["access_token"]


def get_graph_display_name_from_upn(tenant_id, client_id, client_secret, upn):
    if not upn:
        return "Unknown"

    token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"

    token_body = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": "https://graph.microsoft.com/.default"
    }

    token_response = requests.post(
        token_url,
        data=token_body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        timeout=30
    )

    if token_response.status_code != 200:
        return upn

    graph_token = token_response.json()["access_token"]

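    # Percent-encode the UPN for use in the URL path: guest UPNs contain '#'
    # (the #EXT# marker), which would otherwise be read as a URL fragment.
    encoded_upn = requests.utils.quote(upn, safe="@")

    graph_response = requests.get(
        f"https://graph.microsoft.com/v1.0/users/{encoded_upn}?$select=displayName,userPrincipalName",
        headers={"Authorization": f"Bearer {graph_token}"},
        timeout=30
    )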

    if graph_response.status_code != 200:
        return upn

    user = graph_response.json()
    display_name = user.get("displayName")
    user_principal_name = user.get("userPrincipalName") or upn

    if display_name:
        return f"{display_name} ({user_principal_name})"

    return user_principal_name


def get_manual_pipeline_triggered_by(
    tenant_id,
    client_id,
    client_secret,
    workspace_id,
    pipeline_id,
    pipeline_run_id,
    window_minutes=15,
    retry_count=10,
    retry_interval_seconds=60
):
    """
    Resolve who manually triggered a Fabric pipeline run.

    Logic:
    1. Use Fabric queryactivityruns API to find the actual pipeline run start time.
    2. Use Power BI Admin Activity Events API to search audit events around that time.
    3. Find Operation = RunArtifact where ObjectId = pipeline_id.
    4. Retry because Admin Activity Events API visibility can be delayed.
    5. Return UserId from the matching audit event.
    """

    fabric_token = mssparkutils.credentials.getToken(
        "https://api.fabric.microsoft.com"
    )

    fabric_headers = {
        "Authorization": f"Bearer {fabric_token}",
        "Content-Type": "application/json"
    }

    activity_runs_url = (
        f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}"
        f"/datapipelines/pipelineruns/{pipeline_run_id}/queryactivityruns"
    )

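    # Use a rolling window instead of hard-coded calendar dates so the lookup
    # keeps working across year boundaries. The 30-day look-back is an
    # assumption; adjust it to how long after a run you expect to query it.
    now_utc = pd.Timestamp.now(tz="UTC")

    activity_payload = {
        "lastUpdatedAfter": (now_utc - pd.Timedelta(days=30)).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "lastUpdatedBefore": (now_utc + pd.Timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "filters": [],
        "orderBy": [
            {
                "orderBy": "ActivityRunStart",
                "order": "ASC"
            }
        ]
    }

    activity_response = requests.post(
        activity_runs_url,
        headers=fabric_headers,
        json=activity_payload,
        timeout=60
    )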

    activity_response.raise_for_status()

    activity_rows = activity_response.json().get("value", [])

    if not activity_rows:
        return None

    df_activities = pd.json_normalize(activity_rows)

    start_col = None

    for col in ["activityRunStart", "ActivityRunStart", "startTime", "StartTime"]:
        if col in df_activities.columns:
            start_col = col
            break

    if not start_col:
        return None

    run_start_utc = pd.to_datetime(
        df_activities[start_col],
        utc=True,
        errors="coerce"
    ).min()

    if pd.isna(run_start_utc):
        return None

    powerbi_token = get_powerbi_admin_token(
        tenant_id,
        client_id,
        client_secret
    )

    powerbi_headers = {
        "Authorization": f"Bearer {powerbi_token}",
        "Content-Type": "application/json"
    }

    for retry_attempt in range(1, retry_count + 1):

        print(f"Pipeline audit lookup attempt {retry_attempt} of {retry_count}")

        search_start_dt = run_start_utc - pd.Timedelta(minutes=window_minutes)
        search_end_dt = run_start_utc + pd.Timedelta(minutes=window_minutes)

        events = query_admin_activity_events(
            powerbi_headers,
            search_start_dt,
            search_end_dt
        )

        candidates = []

        for event in events:
            operation = event.get("Operation", "")

            object_id = str(
                event.get("ObjectId")
                or event.get("ItemId")
                or event.get("ArtifactId")
                or ""
            ).lower()

            if operation == "RunArtifact" and object_id == pipeline_id.lower():
                event_creation_dt = pd.to_datetime(
                    event.get("CreationTime"),
                    utc=True,
                    errors="coerce"
                )

                if pd.notna(event_creation_dt):
                    event["_time_diff_seconds"] = abs(
                        (event_creation_dt - run_start_utc).total_seconds()
                    )

                    candidates.append(event)

        if candidates:
            best_event = sorted(
                candidates,
                key=lambda x: x["_time_diff_seconds"]
            )[0]

            return best_event.get("UserId")

        if retry_attempt < retry_count:
            time.sleep(retry_interval_seconds)

    return None


def get_manual_notebook_triggered_by(
    tenant_id,
    client_id,
    client_secret,
    notebook_id,
    notebook_start_time_utc,
    window_minutes=15,
    max_allowed_time_diff_seconds=60,
    retry_count=10,
    retry_interval_seconds=60
):
    """
    Resolve who manually triggered a standalone Fabric notebook run.

    Logic:
    1. Use the notebook job start time.
    2. Use Power BI Admin Activity Events API to search audit events around that time.
    3. Match Operation = StartRunNotebook or RunArtifact.
    4. Match ObjectId = notebook_id.
    5. Retry because audit events may not be immediately visible.
    6. Reject old matches using max_allowed_time_diff_seconds.
    """

    run_start_utc = pd.to_datetime(
        notebook_start_time_utc,
        utc=True,
        errors="coerce"
    )

    if pd.isna(run_start_utc):
        return None

    powerbi_token = get_powerbi_admin_token(
        tenant_id,
        client_id,
        client_secret
    )

    powerbi_headers = {
        "Authorization": f"Bearer {powerbi_token}",
        "Content-Type": "application/json"
    }

    for retry_attempt in range(1, retry_count + 1):

        print(f"Notebook audit lookup attempt {retry_attempt} of {retry_count}")

        search_start_dt = run_start_utc - pd.Timedelta(minutes=window_minutes)
        search_end_dt = run_start_utc + pd.Timedelta(minutes=window_minutes)

        events = query_admin_activity_events(
            powerbi_headers,
            search_start_dt,
            search_end_dt
        )

        candidates = []

        for event in events:
            operation = event.get("Operation", "")

            object_id = str(
                event.get("ObjectId")
                or event.get("ItemId")
                or event.get("ArtifactId")
                or ""
            ).lower()

            if (
                operation in ["StartRunNotebook", "RunArtifact"]
                and object_id == notebook_id.lower()
            ):
                event_creation_dt = pd.to_datetime(
                    event.get("CreationTime"),
                    utc=True,
                    errors="coerce"
                )

                if pd.notna(event_creation_dt):
                    event["_time_diff_seconds"] = abs(
                        (event_creation_dt - run_start_utc).total_seconds()
                    )

                    event["_operation_priority"] = {
                        "StartRunNotebook": 1,
                        "RunArtifact": 2
                    }.get(operation, 99)

                    candidates.append(event)

        if candidates:
            best_event = sorted(
                candidates,
                key=lambda x: (
                    x["_operation_priority"],
                    x["_time_diff_seconds"]
                )
            )[0]

            if best_event["_time_diff_seconds"] <= max_allowed_time_diff_seconds:
                return best_event.get("UserId")

        if retry_attempt < retry_count:
            time.sleep(retry_interval_seconds)

    return None


def query_admin_activity_events(powerbi_headers, search_start_dt, search_end_dt):
    """
    Query Power BI / Fabric Admin Activity Events API.
    Splits the search if the time window crosses a UTC date boundary.
    """

    search_windows = []

    if search_start_dt.date() == search_end_dt.date():
        search_windows.append((search_start_dt, search_end_dt))
    else:
        end_of_first_day = (
            search_start_dt.normalize()
            + pd.Timedelta(days=1)
            - pd.Timedelta(seconds=1)
        )

        start_of_second_day = search_end_dt.normalize()

        search_windows.append((search_start_dt, end_of_first_day))
        search_windows.append((start_of_second_day, search_end_dt))

    events = []

    for window_start, window_end in search_windows:

        search_start_str = window_start.strftime("%Y-%m-%dT%H:%M:%S.000Z")
        search_end_str = window_end.strftime("%Y-%m-%dT%H:%M:%S.000Z")

        activity_events_url = (
            "https://api.powerbi.com/v1.0/myorg/admin/activityevents"
            f"?startDateTime='{search_start_str}'"
            f"&endDateTime='{search_end_str}'"
        )

        response = requests.get(
            activity_events_url,
            headers=powerbi_headers,
            timeout=60
        )

        response.raise_for_status()

        data = response.json()
        events.extend(data.get("activityEventEntities", []))

        continuation_uri = data.get("continuationUri")

        # Follow continuation links until the API reports no further pages.
        while continuation_uri:
            continuation_response = requests.get(
                continuation_uri,
                headers=powerbi_headers,
                timeout=60
            )

            continuation_response.raise_for_status()

            continuation_data = continuation_response.json()
            events.extend(continuation_data.get("activityEventEntities", []))

            continuation_uri = continuation_data.get("continuationUri")

    return events
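
The date-boundary split matters because the Activity Events API expects startDateTime and endDateTime to fall on the same UTC day. The splitting logic can be checked in isolation with plain datetime objects. The sketch below is a generalised variant that also handles windows spanning more than two days. split_by_utc_day is a hypothetical helper name and it assumes both inputs are timezone-aware UTC values.

```python
from datetime import datetime, timedelta, timezone


def split_by_utc_day(start, end):
    """Split [start, end] into windows that each stay within one UTC day."""
    windows = []
    cursor = start
    while cursor.date() < end.date():
        next_midnight = datetime(
            cursor.year, cursor.month, cursor.day, tzinfo=timezone.utc
        ) + timedelta(days=1)
        # Close the window one second before midnight, as in the code above.
        windows.append((cursor, next_midnight - timedelta(seconds=1)))
        cursor = next_midnight
    windows.append((cursor, end))
    return windows


start = datetime(2025, 1, 1, 23, 50, tzinfo=timezone.utc)
end = datetime(2025, 1, 2, 0, 20, tzinfo=timezone.utc)

for window_start, window_end in split_by_utc_day(start, end):
    print(window_start.isoformat(), "->", window_end.isoformat())
```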

Required permissions:

  • Fabric access to query pipeline activity runs
  • Power BI / Fabric Admin Activity Events API access
  • Microsoft Graph application permission such as User.Read.All with admin consent, if display name resolution is required

Final output example:

Trigger Type : Manual
Trigger Name : User Name (user@company.com)

This gives a much more reliable result than using runtime context or Graph /me, because those can return the current execution identity rather than the user who actually triggered the pipeline or notebook.

View solution in original post

2 REPLIES 2
Yazdan
Advocate II
Advocate II

Update – Finally Solved for Both Pipelines and Notebooks

After further investigation and testing, I finally found a reliable way to determine who manually triggered both Microsoft Fabric Pipelines and Notebooks.

The original approach I shared above worked in some scenarios by using:

  • Job instance metadata

  • Runtime context

  • Microsoft Graph /me

However, those methods only identify the currently executing identity and can be unreliable for auditing purposes, especially when notebooks or pipelines are triggered by different users.

Final Solution

The most reliable approach is to use the Power BI / Fabric Admin Activity Events API:

GET /admin/activityevents

This API exposes Fabric audit events and allows us to identify the user who actually initiated a run.

Pipeline Detection Logic

  1. Get the exact pipeline run start time using:

queryactivityruns
  1. Retrieve audit events around that execution time.

  2. Find the matching:

Operation = RunArtifact
ObjectId  = Pipeline ID
  1. Return:

UserId

which contains the triggering user's email address (UPN).

Notebook Detection Logic

Standalone notebooks do not have a queryactivityruns endpoint, so the process is slightly different:

  1. Get the current notebook Job Instance.

  2. Resolve the notebook start time.

  3. Query Admin Activity Events.

  4. Find matching notebook execution events:

Operation = StartRunNotebook
or
Operation = RunArtifact
  1. Match on Notebook ID.

  2. Return the triggering user's email address.

Converting Email Address to Display Name

The audit log returns:

user@company.com

To make logs more readable, I then call Microsoft Graph:

GET /users/{userPrincipalName}

and retrieve:

displayName
userPrincipalName

which allows logging values such as:

User Name (user@company.com)

instead of only the email address.

Important Discovery – Audit Log Visibility Delay

During testing I discovered that Fabric audit events are not always immediately available through the Admin Activity Events API.

For example:

Notebook Start Time : 05:21:41
Audit Event Time    : 05:21:51

The audit event itself was generated almost immediately.

However, the event was not visible through the API until approximately 5–7 minutes later.

Because of this, a simple one-time lookup can fail even though the audit event already exists internally.

Final Reliability Improvement

To make the solution production-ready, I implemented a retry mechanism:

retry_count=10
retry_interval_seconds=60

The lookup now retries every minute until the matching audit event becomes available.

This ensures that:

  • Pipelines resolve the correct triggering user.

  • Notebooks resolve the correct triggering user.

  • Delayed audit ingestion does not cause false "Unknown" results.

  • Security and operational reporting remain accurate.

Result

The final implementation can now reliably populate logging tables and System Run Reports with:

Trigger Type
Trigger Name
Run Started By

for:

  • Scheduled runs

  • Manual runs

  • Orchestrator pipeline runs

  • Standalone notebook executions

This has significantly improved operational auditing and troubleshooting in our Fabric environment.

Hopefully this helps others looking for a reliable way to track who actually executed Fabric workloads.

 

One important thing I found is that audit events are not always visible immediately through the Admin Activity Events API. The event timestamp can be accurate, but the API visibility can be delayed by several minutes. Because of that, I added a retry loop.

Below is the sanitised sample code structure:

import requests
import pandas as pd
import time

# ------------------------------------------------------------------
# Service Principal Inputs - replace with secure values
# ------------------------------------------------------------------
tenant_id = "<TENANT_ID>"
client_id = "<CLIENT_ID>"
client_secret = "<CLIENT_SECRET>"


def get_powerbi_admin_token(tenant_id, client_id, client_secret):
    token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"

    token_body = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": "https://analysis.windows.net/powerbi/api/.default"
    }

    response = requests.post(
        token_url,
        data=token_body,
        headers={"Content-Type": "application/x-www-form-urlencoded"}
    )

    response.raise_for_status()
    return response.json()["access_token"]


def get_graph_display_name_from_upn(tenant_id, client_id, client_secret, upn):
    if not upn:
        return "Unknown"

    token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"

    token_body = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": "https://graph.microsoft.com/.default"
    }

    token_response = requests.post(
        token_url,
        data=token_body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        timeout=30
    )

    if token_response.status_code != 200:
        return upn

    graph_token = token_response.json()["access_token"]

    graph_response = requests.get(
        f"https://graph.microsoft.com/v1.0/users/{upn}?$select=displayName,userPrincipalName",
        headers={"Authorization": f"Bearer {graph_token}"},
        timeout=30
    )

    if graph_response.status_code != 200:
        return upn

    user = graph_response.json()
    display_name = user.get("displayName")
    user_principal_name = user.get("userPrincipalName") or upn

    if display_name:
        return f"{display_name} ({user_principal_name})"

    return user_principal_name


def get_manual_pipeline_triggered_by(
    tenant_id,
    client_id,
    client_secret,
    workspace_id,
    pipeline_id,
    pipeline_run_id,
    window_minutes=15,
    retry_count=10,
    retry_interval_seconds=60
):
    """
    Resolve who manually triggered a Fabric pipeline run.

    Logic:
    1. Use Fabric queryactivityruns API to find the actual pipeline run start time.
    2. Use Power BI Admin Activity Events API to search audit events around that time.
    3. Find Operation = RunArtifact where ObjectId = pipeline_id.
    4. Retry because Admin Activity Events API visibility can be delayed.
    5. Return UserId from the matching audit event.
    """

    fabric_token = mssparkutils.credentials.getToken(
        "https://api.fabric.microsoft.com"
    )

    fabric_headers = {
        "Authorization": f"Bearer {fabric_token}",
        "Content-Type": "application/json"
    }

    activity_runs_url = (
        f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}"
        f"/datapipelines/pipelineruns/{pipeline_run_id}/queryactivityruns"
    )

    activity_payload = {
        "lastUpdatedAfter": "2026-01-01T00:00:00Z",
        "lastUpdatedBefore": "2026-12-31T23:59:59Z",
        "filters": [],
        "orderBy": [
            {
                "orderBy": "ActivityRunStart",
                "order": "ASC"
            }
        ]
    }

    activity_response = requests.post(
        activity_runs_url,
        headers=fabric_headers,
        json=activity_payload
    )

    activity_response.raise_for_status()

    activity_rows = activity_response.json().get("value", [])

    if not activity_rows:
        return None

    df_activities = pd.json_normalize(activity_rows)

    start_col = None

    for col in ["activityRunStart", "ActivityRunStart", "startTime", "StartTime"]:
        if col in df_activities.columns:
            start_col = col
            break

    if not start_col:
        return None

    run_start_utc = pd.to_datetime(
        df_activities[start_col],
        utc=True,
        errors="coerce"
    ).min()

    if pd.isna(run_start_utc):
        return None

    powerbi_token = get_powerbi_admin_token(
        tenant_id,
        client_id,
        client_secret
    )

    powerbi_headers = {
        "Authorization": f"Bearer {powerbi_token}",
        "Content-Type": "application/json"
    }

    for retry_attempt in range(1, retry_count + 1):

        print(f"Pipeline audit lookup attempt {retry_attempt} of {retry_count}")

        search_start_dt = run_start_utc - pd.Timedelta(minutes=window_minutes)
        search_end_dt = run_start_utc + pd.Timedelta(minutes=window_minutes)

        events = query_admin_activity_events(
            powerbi_headers,
            search_start_dt,
            search_end_dt
        )

        candidates = []

        for event in events:
            operation = event.get("Operation", "")

            object_id = str(
                event.get("ObjectId")
                or event.get("ItemId")
                or event.get("ArtifactId")
                or ""
            ).lower()

            if operation == "RunArtifact" and object_id == pipeline_id.lower():
                event_creation_dt = pd.to_datetime(
                    event.get("CreationTime"),
                    utc=True,
                    errors="coerce"
                )

                if pd.notna(event_creation_dt):
                    event["_time_diff_seconds"] = abs(
                        (event_creation_dt - run_start_utc).total_seconds()
                    )

                    candidates.append(event)

        if candidates:
            best_event = sorted(
                candidates,
                key=lambda x: x["_time_diff_seconds"]
            )[0]

            return best_event.get("UserId")

        if retry_attempt < retry_count:
            time.sleep(retry_interval_seconds)

    return None


def get_manual_notebook_triggered_by(
    tenant_id,
    client_id,
    client_secret,
    notebook_id,
    notebook_start_time_utc,
    window_minutes=15,
    max_allowed_time_diff_seconds=60,
    retry_count=10,
    retry_interval_seconds=60
):
    """
    Resolve who manually triggered a standalone Fabric notebook run.

    Logic:
    1. Use the notebook job start time.
    2. Use Power BI Admin Activity Events API to search audit events around that time.
    3. Match Operation = StartRunNotebook or RunArtifact.
    4. Match ObjectId = notebook_id.
    5. Retry because audit events may not be immediately visible.
    6. Reject old matches using max_allowed_time_diff_seconds.
    """

    run_start_utc = pd.to_datetime(
        notebook_start_time_utc,
        utc=True,
        errors="coerce"
    )

    if pd.isna(run_start_utc):
        return None

    powerbi_token = get_powerbi_admin_token(
        tenant_id,
        client_id,
        client_secret
    )

    powerbi_headers = {
        "Authorization": f"Bearer {powerbi_token}",
        "Content-Type": "application/json"
    }

    for retry_attempt in range(1, retry_count + 1):

        print(f"Notebook audit lookup attempt {retry_attempt} of {retry_count}")

        search_start_dt = run_start_utc - pd.Timedelta(minutes=window_minutes)
        search_end_dt = run_start_utc + pd.Timedelta(minutes=window_minutes)

        events = query_admin_activity_events(
            powerbi_headers,
            search_start_dt,
            search_end_dt
        )

        candidates = []

        for event in events:
            operation = event.get("Operation", "")

            object_id = str(
                event.get("ObjectId")
                or event.get("ItemId")
                or event.get("ArtifactId")
                or ""
            ).lower()

            if (
                operation in ["StartRunNotebook", "RunArtifact"]
                and object_id == notebook_id.lower()
            ):
                event_creation_dt = pd.to_datetime(
                    event.get("CreationTime"),
                    utc=True,
                    errors="coerce"
                )

                if pd.notna(event_creation_dt):
                    event["_time_diff_seconds"] = abs(
                        (event_creation_dt - run_start_utc).total_seconds()
                    )

                    event["_operation_priority"] = {
                        "StartRunNotebook": 1,
                        "RunArtifact": 2
                    }.get(operation, 99)

                    candidates.append(event)

        if candidates:
            best_event = sorted(
                candidates,
                key=lambda x: (
                    x["_operation_priority"],
                    x["_time_diff_seconds"]
                )
            )[0]

            if best_event["_time_diff_seconds"] <= max_allowed_time_diff_seconds:
                return best_event.get("UserId")

        if retry_attempt < retry_count:
            time.sleep(retry_interval_seconds)

    return None


def query_admin_activity_events(powerbi_headers, search_start_dt, search_end_dt):
    """
    Query Power BI / Fabric Admin Activity Events API.
    Splits the search if the time window crosses a UTC date boundary.
    """

    search_windows = []

    if search_start_dt.date() == search_end_dt.date():
        search_windows.append((search_start_dt, search_end_dt))
    else:
        end_of_first_day = (
            search_start_dt.normalize()
            + pd.Timedelta(days=1)
            - pd.Timedelta(seconds=1)
        )

        start_of_second_day = search_end_dt.normalize()

        search_windows.append((search_start_dt, end_of_first_day))
        search_windows.append((start_of_second_day, search_end_dt))

    events = []

    for window_start, window_end in search_windows:

        search_start_str = window_start.strftime("%Y-%m-%dT%H:%M:%S.000Z")
        search_end_str = window_end.strftime("%Y-%m-%dT%H:%M:%S.000Z")

        activity_events_url = (
            "https://api.powerbi.com/v1.0/myorg/admin/activityevents"
            f"?startDateTime='{search_start_str}'"
            f"&endDateTime='{search_end_str}'"
        )

        response = requests.get(
            activity_events_url,
            headers=powerbi_headers
        )

        response.raise_for_status()

        data = response.json()
        events.extend(data.get("activityEventEntities", []))

        continuation_uri = data.get("continuationUri")

        while continuation_uri:
            continuation_response = requests.get(
                continuation_uri,
                headers=powerbi_headers
            )

            continuation_response.raise_for_status()

            continuation_data = continuation_response.json()
            events.extend(continuation_data.get("activityEventEntities", []))

            continuation_uri = continuation_data.get("continuationUri")

    return events
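
The date-boundary handling in query_admin_activity_events can be checked offline. The sketch below reimplements only the window-splitting step with the standard library instead of pandas. The name split_at_utc_midnight is hypothetical and not part of the code above, and like the original it assumes timezone-aware UTC values and a window that touches at most two calendar dates.

```python
from datetime import datetime, timedelta, timezone


def split_at_utc_midnight(start, end):
    """Split [start, end] into per-day windows (hypothetical helper).

    Mirrors the splitting step above: expects timezone-aware UTC datetimes
    and a window that touches at most two calendar dates.
    """
    if start.date() == end.date():
        return [(start, end)]

    # Midnight at the start of the second day.
    second_day_start = end.replace(hour=0, minute=0, second=0, microsecond=0)
    # Last whole second of the first day.
    first_day_end = second_day_start - timedelta(seconds=1)
    return [(start, first_day_end), (second_day_start, end)]


# A run that started five minutes after midnight UTC, with a 15-minute window.
run_start = datetime(2026, 3, 1, 0, 5, tzinfo=timezone.utc)
windows = split_at_utc_midnight(
    run_start - timedelta(minutes=15),
    run_start + timedelta(minutes=15),
)
for window_start, window_end in windows:
    print(window_start.isoformat(), "->", window_end.isoformat())
```

With the default window_minutes=15 a search can cross at most one midnight, so two windows are enough. A much larger window (more than 24 hours) would need a loop over every day in the range, which neither this sketch nor the function above handles.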

Required permissions:

  • Fabric access to query pipeline activity runs
  • Power BI / Fabric Admin Activity Events API access
  • Microsoft Graph application permission such as User.Read.All with admin consent, if display name resolution is required

Final output example:

Trigger Type : Manual
Trigger Name : User Name (user@company.com)

This gives a much more reliable result than using runtime context or Graph /me, because those can return the current execution identity rather than the user who actually triggered the pipeline or notebook.
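
To see how the audit match is chosen without calling any API, here is a minimal sketch of the selection step from the notebook function. The helper name pick_closest_event and the sample events are made up for illustration. Real events come from the Admin Activity Events API and may carry the item id under ItemId or ArtifactId as well, which this sketch leaves out.

```python
from datetime import datetime, timezone


def pick_closest_event(events, item_id, run_start, max_diff_seconds=60):
    """Return the UserId of the best matching audit event, or None.

    Hypothetical helper: same ordering rules as the notebook lookup
    (operation priority first, then time distance), with no API calls.
    """
    priority = {"StartRunNotebook": 1, "RunArtifact": 2}
    candidates = []

    for event in events:
        operation = event.get("Operation", "")
        object_id = str(event.get("ObjectId") or "").lower()
        if operation not in priority or object_id != item_id.lower():
            continue

        # Drop a trailing "Z" so fromisoformat works on older Python versions,
        # then treat the value as UTC.
        created_text = str(event["CreationTime"]).rstrip("Z")
        created = datetime.fromisoformat(created_text).replace(tzinfo=timezone.utc)
        diff = abs((created - run_start).total_seconds())
        candidates.append((priority[operation], diff, event.get("UserId")))

    if not candidates:
        return None

    _, diff, user_id = min(candidates, key=lambda c: (c[0], c[1]))
    return user_id if diff <= max_diff_seconds else None


# Illustrative events only, not real audit data.
sample_events = [
    {"Operation": "RunArtifact", "ObjectId": "ABC-123",
     "CreationTime": "2026-03-01T10:00:02Z", "UserId": "first@example.com"},
    {"Operation": "StartRunNotebook", "ObjectId": "abc-123",
     "CreationTime": "2026-03-01T10:00:05Z", "UserId": "second@example.com"},
    {"Operation": "ViewReport", "ObjectId": "abc-123",
     "CreationTime": "2026-03-01T10:00:00Z", "UserId": "third@example.com"},
]
sample_run_start = datetime(2026, 3, 1, 10, 0, 0, tzinfo=timezone.utc)
print(pick_closest_event(sample_events, "abc-123", sample_run_start))
```

One consequence of sorting by operation priority first: a StartRunNotebook event always beats a RunArtifact event, even when the RunArtifact event is closer in time. If the chosen event then falls outside max_diff_seconds, the lookup returns no user rather than falling back to the other candidate.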

v-saisrao-msft
Community Support

Hi @Yazdan,

Thanks for sharing your insights on detecting which user manually triggered a pipeline. Definitely consider turning this into a blog post so others can benefit from your experience more easily.

Power BI Community Blog - Microsoft Fabric Community

We also appreciate you sharing this with the community.

Thank you.
