<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Detecting the User Who Manually Triggered a Fabric Pipeline or Notebook in Data Engineering</title>
    <link>https://community.fabric.microsoft.com/t5/Data-Engineering/Detecting-the-User-Who-Manually-Triggered-a-Fabric-Pipeline-or/m-p/5191802#M16495</link>
    <description>&lt;H3&gt;Update – Finally Solved for Both Pipelines and Notebooks&lt;/H3&gt;&lt;P&gt;After further investigation and testing, I finally found a reliable way to determine who manually triggered both Microsoft Fabric Pipelines and Notebooks.&lt;/P&gt;&lt;P&gt;The original approach I shared above worked in some scenarios by using:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;Job instance metadata&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Runtime context&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Microsoft Graph /me&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;However, those methods only identify the currently executing identity and can be unreliable for auditing purposes, especially when notebooks or pipelines are triggered by different users.&lt;/P&gt;&lt;H3&gt;Final Solution&lt;/H3&gt;&lt;P&gt;The most reliable approach is to use the &lt;STRONG&gt;Power BI / Fabric Admin Activity Events API&lt;/STRONG&gt;:&lt;/P&gt;&lt;PRE&gt;GET /admin/activityevents&lt;/PRE&gt;&lt;P&gt;This API exposes Fabric audit events and allows us to identify the user who actually initiated a run.&lt;/P&gt;&lt;H3&gt;Pipeline Detection Logic&lt;/H3&gt;&lt;OL&gt;&lt;LI&gt;&lt;P&gt;Get the exact pipeline run start time using:&lt;/P&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;PRE&gt;queryactivityruns&lt;/PRE&gt;&lt;OL&gt;&lt;LI&gt;&lt;P&gt;Retrieve audit events around that execution time.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Find the matching:&lt;/P&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;PRE&gt;Operation = RunArtifact
ObjectId  = Pipeline ID&lt;/PRE&gt;&lt;OL&gt;&lt;LI&gt;&lt;P&gt;Return:&lt;/P&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;PRE&gt;UserId&lt;/PRE&gt;&lt;P&gt;which contains the triggering user's email address (UPN).&lt;/P&gt;&lt;H3&gt;Notebook Detection Logic&lt;/H3&gt;&lt;P&gt;Standalone notebooks do not have a queryactivityruns endpoint, so the process is slightly different:&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;&lt;P&gt;Get the current notebook Job Instance.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Resolve the notebook start time.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Query Admin Activity Events.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Find matching notebook execution events:&lt;/P&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;PRE&gt;Operation = StartRunNotebook
or
Operation = RunArtifact&lt;/PRE&gt;&lt;OL&gt;&lt;LI&gt;&lt;P&gt;Match on Notebook ID.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Return the triggering user's email address.&lt;/P&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;H3&gt;Converting Email Address to Display Name&lt;/H3&gt;&lt;P&gt;The audit log returns:&lt;/P&gt;&lt;PRE&gt;user@company.com&lt;/PRE&gt;&lt;P&gt;To make logs more readable, I then call Microsoft Graph:&lt;/P&gt;&lt;PRE&gt;GET /users/{userPrincipalName}&lt;/PRE&gt;&lt;P&gt;and retrieve:&lt;/P&gt;&lt;PRE&gt;displayName
userPrincipalName&lt;/PRE&gt;&lt;P&gt;which allows logging values such as:&lt;/P&gt;&lt;PRE&gt;User Name (user@company.com)&lt;/PRE&gt;&lt;P&gt;instead of only the email address.&lt;/P&gt;&lt;H3&gt;Important Discovery – Audit Log Visibility Delay&lt;/H3&gt;&lt;P&gt;During testing I discovered that Fabric audit events are not always immediately available through the Admin Activity Events API.&lt;/P&gt;&lt;P&gt;For example:&lt;/P&gt;&lt;PRE&gt;Notebook Start Time : 05:21:41
Audit Event Time    : 05:21:51&lt;/PRE&gt;&lt;P&gt;The audit event itself was generated almost immediately.&lt;/P&gt;&lt;P&gt;However, the event was not visible through the API until approximately 5–7 minutes later.&lt;/P&gt;&lt;P&gt;Because of this, a simple one-time lookup can fail even though the audit event already exists internally.&lt;/P&gt;&lt;H3&gt;Final Reliability Improvement&lt;/H3&gt;&lt;P&gt;To make the solution production-ready, I implemented a retry mechanism:&lt;/P&gt;&lt;PRE&gt;retry_count=10
retry_interval_seconds=60&lt;/PRE&gt;&lt;P&gt;The lookup now retries every minute until the matching audit event becomes available.&lt;/P&gt;&lt;P&gt;This ensures that:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;Pipelines resolve the correct triggering user.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Notebooks resolve the correct triggering user.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Delayed audit ingestion does not cause false "Unknown" results.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Security and operational reporting remain accurate.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;H3&gt;Result&lt;/H3&gt;&lt;P&gt;The final implementation can now reliably populate logging tables and System Run Reports with:&lt;/P&gt;&lt;PRE&gt;Trigger Type
Trigger Name
Run Started By&lt;/PRE&gt;&lt;P&gt;for:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;Scheduled runs&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Manual runs&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Orchestrator pipeline runs&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Standalone notebook executions&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;This has significantly improved operational auditing and troubleshooting in our Fabric environment.&lt;/P&gt;&lt;P&gt;Hopefully this helps others looking for a reliable way to track who actually executed Fabric workloads.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P class=""&gt;&lt;SPAN&gt;One important thing I found is that audit events are not always visible immediately through the Admin Activity Events API. The event timestamp can be accurate, but the API visibility can be delayed by several minutes. Because of that, I added a retry loop.&lt;/SPAN&gt;&lt;/P&gt;&lt;P class=""&gt;&lt;SPAN&gt;Below is the sanitised sample code structure:&lt;/SPAN&gt;&lt;/P&gt;&lt;PRE&gt;&lt;SPAN&gt;import requests
import pandas as pd
import time

# ------------------------------------------------------------------
# Service Principal Inputs - replace with secure values
# ------------------------------------------------------------------
tenant_id = "&amp;lt;TENANT_ID&amp;gt;"
client_id = "&amp;lt;CLIENT_ID&amp;gt;"
client_secret = "&amp;lt;CLIENT_SECRET&amp;gt;"


def get_powerbi_admin_token(tenant_id, client_id, client_secret):
    token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"

    token_body = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": "https://analysis.windows.net/powerbi/api/.default"
    }

    response = requests.post(
        token_url,
        data=token_body,
        headers={"Content-Type": "application/x-www-form-urlencoded"}
    )

    response.raise_for_status()
    return response.json()&lt;SPAN class=""&gt;["access_token"]&lt;/SPAN&gt;


def get_graph_display_name_from_upn(tenant_id, client_id, client_secret, upn):
    if not upn:
        return "Unknown"

    token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"

    token_body = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": "https://graph.microsoft.com/.default"
    }

    token_response = requests.post(
        token_url,
        data=token_body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        timeout=30
    )

    if token_response.status_code != 200:
        return upn

    graph_token = token_response.json()&lt;SPAN class=""&gt;["access_token"]&lt;/SPAN&gt;

    graph_response = requests.get(
        f"https://graph.microsoft.com/v1.0/users/{upn}?$select=displayName,userPrincipalName",
        headers={"Authorization": f"Bearer {graph_token}"},
        timeout=30
    )

    if graph_response.status_code != 200:
        return upn

    user = graph_response.json()
    display_name = user.get("displayName")
    user_principal_name = user.get("userPrincipalName") or upn

    if display_name:
        return f"{display_name} ({user_principal_name})"

    return user_principal_name


def get_manual_pipeline_triggered_by(
    tenant_id,
    client_id,
    client_secret,
    workspace_id,
    pipeline_id,
    pipeline_run_id,
    window_minutes=15,
    retry_count=10,
    retry_interval_seconds=60
):
    """
    Resolve who manually triggered a Fabric pipeline run.

    Logic:
    1. Use Fabric queryactivityruns API to find the actual pipeline run start time.
    2. Use Power BI Admin Activity Events API to search audit events around that time.
    3. Find Operation = RunArtifact where ObjectId = pipeline_id.
    4. Retry because Admin Activity Events API visibility can be delayed.
    5. Return UserId from the matching audit event.
    """

    fabric_token = mssparkutils.credentials.getToken(
        "https://api.fabric.microsoft.com"
    )

    fabric_headers = {
        "Authorization": f"Bearer {fabric_token}",
        "Content-Type": "application/json"
    }

    activity_runs_url = (
        f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}"
        f"/datapipelines/pipelineruns/{pipeline_run_id}/queryactivityruns"
    )

    activity_payload = {
        "lastUpdatedAfter": "2026-01-01T00:00:00Z",
        "lastUpdatedBefore": "2026-12-31T23:59:59Z",
        "filters": [],
        "orderBy": &lt;SPAN class=""&gt;[
            {
                "orderBy": "ActivityRunStart",
                "order": "ASC"
            }
        ]&lt;/SPAN&gt;
    }

    activity_response = requests.post(
        activity_runs_url,
        headers=fabric_headers,
        json=activity_payload
    )

    activity_response.raise_for_status()

    activity_rows = activity_response.json().get("value", [])

    if not activity_rows:
        return None

    df_activities = pd.json_normalize(activity_rows)

    start_col = None

    for col in &lt;SPAN class=""&gt;["activityRunStart", "ActivityRunStart", "startTime", "StartTime"]&lt;/SPAN&gt;:
        if col in df_activities.columns:
            start_col = col
            break

    if not start_col:
        return None

    run_start_utc = pd.to_datetime(
        df_activities&lt;SPAN class=""&gt;[start_col]&lt;/SPAN&gt;,
        utc=True,
        errors="coerce"
    ).min()

    if pd.isna(run_start_utc):
        return None

    powerbi_token = get_powerbi_admin_token(
        tenant_id,
        client_id,
        client_secret
    )

    powerbi_headers = {
        "Authorization": f"Bearer {powerbi_token}",
        "Content-Type": "application/json"
    }

    for retry_attempt in range(1, retry_count + 1):

        print(f"Pipeline audit lookup retry {retry_attempt} of {retry_count}")

        search_start_dt = run_start_utc - pd.Timedelta(minutes=window_minutes)
        search_end_dt = run_start_utc + pd.Timedelta(minutes=window_minutes)

        events = query_admin_activity_events(
            powerbi_headers,
            search_start_dt,
            search_end_dt
        )

        candidates = []

        for event in events:
            operation = event.get("Operation", "")

            object_id = str(
                event.get("ObjectId")
                or event.get("ItemId")
                or event.get("ArtifactId")
                or ""
            ).lower()

            if operation == "RunArtifact" and object_id == pipeline_id.lower():
                event_creation_dt = pd.to_datetime(
                    event.get("CreationTime"),
                    utc=True,
                    errors="coerce"
                )

                if pd.notna(event_creation_dt):
                    event&lt;SPAN class=""&gt;["_time_diff_seconds"]&lt;/SPAN&gt; = abs(
                        (event_creation_dt - run_start_utc).total_seconds()
                    )

                    candidates.append(event)

        if candidates:
            best_event = sorted(
                candidates,
                key=lambda x: x&lt;SPAN class=""&gt;["_time_diff_seconds"]&lt;/SPAN&gt;
            )&lt;SPAN class=""&gt;[0]&lt;/SPAN&gt;

            return best_event.get("UserId")

        if retry_attempt &amp;lt; retry_count:
            time.sleep(retry_interval_seconds)

    return None


def get_manual_notebook_triggered_by(
    tenant_id,
    client_id,
    client_secret,
    notebook_id,
    notebook_start_time_utc,
    window_minutes=15,
    max_allowed_time_diff_seconds=60,
    retry_count=10,
    retry_interval_seconds=60
):
    """
    Resolve who manually triggered a standalone Fabric notebook run.

    Logic:
    1. Use the notebook job start time.
    2. Use Power BI Admin Activity Events API to search audit events around that time.
    3. Match Operation = StartRunNotebook or RunArtifact.
    4. Match ObjectId = notebook_id.
    5. Retry because audit events may not be immediately visible.
    6. Reject old matches using max_allowed_time_diff_seconds.
    """

    run_start_utc = pd.to_datetime(
        notebook_start_time_utc,
        utc=True,
        errors="coerce"
    )

    if pd.isna(run_start_utc):
        return None

    powerbi_token = get_powerbi_admin_token(
        tenant_id,
        client_id,
        client_secret
    )

    powerbi_headers = {
        "Authorization": f"Bearer {powerbi_token}",
        "Content-Type": "application/json"
    }

    for retry_attempt in range(1, retry_count + 1):

        print(f"Notebook audit lookup retry {retry_attempt} of {retry_count}")

        search_start_dt = run_start_utc - pd.Timedelta(minutes=window_minutes)
        search_end_dt = run_start_utc + pd.Timedelta(minutes=window_minutes)

        events = query_admin_activity_events(
            powerbi_headers,
            search_start_dt,
            search_end_dt
        )

        candidates = []

        for event in events:
            operation = event.get("Operation", "")

            object_id = str(
                event.get("ObjectId")
                or event.get("ItemId")
                or event.get("ArtifactId")
                or ""
            ).lower()

            if (
                operation in &lt;SPAN class=""&gt;["StartRunNotebook", "RunArtifact"]&lt;/SPAN&gt;
                and object_id == notebook_id.lower()
            ):
                event_creation_dt = pd.to_datetime(
                    event.get("CreationTime"),
                    utc=True,
                    errors="coerce"
                )

                if pd.notna(event_creation_dt):
                    event&lt;SPAN class=""&gt;["_time_diff_seconds"]&lt;/SPAN&gt; = abs(
                        (event_creation_dt - run_start_utc).total_seconds()
                    )

                    event&lt;SPAN class=""&gt;["_operation_priority"]&lt;/SPAN&gt; = {
                        "StartRunNotebook": 1,
                        "RunArtifact": 2
                    }.get(operation, 99)

                    candidates.append(event)

        if candidates:
            best_event = sorted(
                candidates,
                key=lambda x: (
                    x&lt;SPAN class=""&gt;["_operation_priority"]&lt;/SPAN&gt;,
                    x&lt;SPAN class=""&gt;["_time_diff_seconds"]&lt;/SPAN&gt;
                )
            )&lt;SPAN class=""&gt;[0]&lt;/SPAN&gt;

            if best_event&lt;SPAN class=""&gt;["_time_diff_seconds"]&lt;/SPAN&gt; &amp;lt;= max_allowed_time_diff_seconds:
                return best_event.get("UserId")

        if retry_attempt &amp;lt; retry_count:
            time.sleep(retry_interval_seconds)

    return None


def query_admin_activity_events(powerbi_headers, search_start_dt, search_end_dt):
    """
    Query Power BI / Fabric Admin Activity Events API.
    Splits the search if the time window crosses a UTC date boundary.
    """

    search_windows = []

    if search_start_dt.date() == search_end_dt.date():
        search_windows.append((search_start_dt, search_end_dt))
    else:
        end_of_first_day = (
            search_start_dt.normalize()
            + pd.Timedelta(days=1)
            - pd.Timedelta(seconds=1)
        )

        start_of_second_day = search_end_dt.normalize()

        search_windows.append((search_start_dt, end_of_first_day))
        search_windows.append((start_of_second_day, search_end_dt))

    events = []

    for window_start, window_end in search_windows:

        search_start_str = window_start.strftime("%Y-%m-%dT%H:%M:%S.000Z")
        search_end_str = window_end.strftime("%Y-%m-%dT%H:%M:%S.000Z")

        activity_events_url = (
            "https://api.powerbi.com/v1.0/myorg/admin/activityevents"
            f"?startDateTime='{search_start_str}'"
            f"&amp;amp;endDateTime='{search_end_str}'"
        )

        response = requests.get(
            activity_events_url,
            headers=powerbi_headers
        )

        response.raise_for_status()

        data = response.json()
        events.extend(data.get("activityEventEntities", []))

        continuation_uri = data.get("continuationUri")

        while continuation_uri:
            continuation_response = requests.get(
                continuation_uri,
                headers=powerbi_headers
            )

            continuation_response.raise_for_status()

            continuation_data = continuation_response.json()
            events.extend(continuation_data.get("activityEventEntities", []))

            continuation_uri = continuation_data.get("continuationUri")

    return events&lt;/SPAN&gt;&lt;/PRE&gt;&lt;P class=""&gt;&lt;SPAN&gt;Required permissions:&lt;/SPAN&gt;&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;SPAN&gt;Fabric access to query pipeline activity runs&lt;/SPAN&gt;&lt;/LI&gt;&lt;LI&gt;&lt;SPAN&gt;Power BI / Fabric Admin Activity Events API access&lt;/SPAN&gt;&lt;/LI&gt;&lt;LI&gt;&lt;SPAN&gt;Microsoft Graph application permission such as &lt;/SPAN&gt;&lt;SPAN&gt;User.Read.All&lt;/SPAN&gt;&lt;SPAN&gt; with admin consent, if display name resolution is required&lt;/SPAN&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P class=""&gt;&lt;SPAN&gt;Final output example:&lt;/SPAN&gt;&lt;/P&gt;&lt;PRE&gt;&lt;SPAN&gt;Trigger Type : Manual
Trigger Name : User Name (user@company.com)&lt;/SPAN&gt;&lt;/PRE&gt;&lt;P&gt;&lt;SPAN&gt;This gives a much more reliable result than using runtime context or Graph &lt;/SPAN&gt;&lt;SPAN&gt;/me&lt;/SPAN&gt;&lt;SPAN&gt;, because those can return the current execution identity rather than the user who actually triggered the pipeline or notebook.&lt;/SPAN&gt;&lt;/P&gt;</description>
    <pubDate>Tue, 02 Jun 2026 07:04:44 GMT</pubDate>
    <dc:creator>Yazdan</dc:creator>
    <dc:date>2026-06-02T07:04:44Z</dc:date>
    <item>
      <title>Detecting the User Who Manually Triggered a Fabric Pipeline or Notebook</title>
      <link>https://community.fabric.microsoft.com/t5/Data-Engineering/Detecting-the-User-Who-Manually-Triggered-a-Fabric-Pipeline-or/m-p/4856380#M13058</link>
      <description>&lt;P&gt;Hi everyone,&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;I wanted to share a useful approach I’ve been using in &lt;STRONG&gt;Microsoft Fabric to detect which user has manually triggered a &lt;STRONG&gt;pipeline or notebook, as this can be very helpful for &lt;STRONG&gt;logging, &lt;STRONG&gt;auditing, and &lt;STRONG&gt;security purposes — especially when combined with a &lt;STRONG&gt;System Run Report or other operational dashboards.&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/P&gt;&lt;HR /&gt;&lt;H4&gt;&lt;STRONG&gt;&lt;STRONG&gt;&lt;STRONG&gt;&lt;STRONG&gt;Context&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/H4&gt;&lt;P&gt;While Fabric now correctly returns @pipeline().TriggerType = "1" when a pipeline is triggered by a &lt;STRONG&gt;schedule, manual runs still require additional logic if you want to capture &lt;STRONG&gt;who initiated them.&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;&lt;STRONG&gt;For example, when I run a pipeline or notebook manually, I log the triggering user using the following discovery logic.&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/P&gt;&lt;HR /&gt;&lt;H4&gt;&lt;STRONG&gt;&lt;STRONG&gt;&lt;STRONG&gt;Code Example&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/H4&gt;&lt;P&gt;Below is a Python snippet you can run inside a &lt;STRONG&gt;Fabric Notebook to detect who triggered the run:&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;&amp;nbsp;&lt;/STRONG&gt;&lt;/P&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;from datetime import datetime, timedelta, date&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;import pytz&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;import json&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;import re&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;from pyspark.sql.functions import col, udf, to_timestamp, lit, expr, regexp_extract, to_date&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;from pyspark.sql.types import StringType&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;from notebookutils import mssparkutils, fs&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;import time&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;import random&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;import hashlib&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;from collections import defaultdict&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;from pyspark.sql import SparkSession&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;from sempy import fabric&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;from sempy.fabric import FabricRestClient&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;import requests&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp;&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp;&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;# ---------------- User discovery ----------------&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;def get_current_job_instance_id():&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; ws_id = fabric.get_notebook_workspace_id()&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; nb_id = fabric.get_artifact_id()&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; client = FabricRestClient()&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; resp = client.get(f"/v1/workspaces/{ws_id}/items/{nb_id}/jobs/instances").json()&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; runs = resp.get("value", [])&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; if not runs:&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return None, None&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; active = [r for r in runs if str(r.get("status","")).lower() in {"inprogress","notstarted"}]&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; pick = max(active or runs, key=lambda r: r.get("startTimeUtc",""))&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; return pick.get("id"), pick&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;def _pluck_userish(obj):&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; """&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; Try common shapes:&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; {createdBy: {displayName, userPrincipalName, id}}&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; {requestedBy: {...}}, {submittedBy: {...}}, {initiatedBy: {...}}&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; {owner: {...}}, {principal: {...}}&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; """&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; if not isinstance(obj, dict):&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return None&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; candidates = ["createdBy", "requestedBy", "submittedBy", "initiatedBy", "owner", "principal"]&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; for k in candidates:&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if k in obj and isinstance(obj[k], dict):&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; u = obj[k]&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return {&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "displayName": u.get("displayName"),&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "userPrincipalName": u.get("userPrincipalName") or u.get("upn"),&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "id": u.get("id") or u.get("objectId")&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; return None&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;def find_user_from_job(details):&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; # check top-level common fields&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; user = _pluck_userish(details)&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; if user:&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return user&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; # scan nested dicts one level down&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; for v in details.values():&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if isinstance(v, dict):&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; user = _pluck_userish(v)&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if user:&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return user&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; return None&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;def get_current_user_via_graph():&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; """&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; Resolve the current interactive user via Microsoft Graph /me.&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; Requires that the Fabric runtime can mint a token for Graph.&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; """&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; try:&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; from notebookutils import mssparkutils &amp;nbsp;# available in Fabric runtime&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; token = mssparkutils.credentials.getToken("&lt;A href="https://graph.microsoft.com/" target="_blank"&gt;https://graph.microsoft.com/&lt;/A&gt;")&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if not token:&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return None&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; headers = {"Authorization": f"Bearer {token}"}&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; r = requests.get("&lt;A href="https://graph.microsoft.com/v1.0/me?$select=displayName,userPrincipalName,id" target="_blank"&gt;https://graph.microsoft.com/v1.0/me?$select=displayName,userPrincipalName,id&lt;/A&gt;", headers=headers, timeout=10)&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if r.status_code == 200:&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; me = r.json()&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return {&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "displayName": me.get("displayName"),&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "userPrincipalName": me.get("userPrincipalName"),&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "id": me.get("id"),&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "_source": "graph"&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; else:&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; print("Graph /me call failed:", r.status_code, r.text[:300])&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; except Exception as e:&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; print("Graph lookup failed:", e)&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; return None&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;def get_user_from_runtime_context():&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; """&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; Fallback for manual runs: read runner from Fabric runtime context.&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; Uses keys observed in your tenant: userName (display) and userId (object id).&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; """&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; try:&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; from notebookutils import mssparkutils&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ctx = mssparkutils.runtime.context or {}&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; display_name = ctx.get("userName")&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; object_id &amp;nbsp; &amp;nbsp;= ctx.get("userId")&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if display_name or object_id:&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return {"displayName": display_name, "userPrincipalName": None, "id": object_id, "_source": "runtime_context"}&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; except Exception:&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; pass&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; return None&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;# ---------------- Run &amp;amp; print ----------------&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;job_instance_id, details = get_current_job_instance_id()&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;print("Job Instance ID:", job_instance_id)&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;run_started_by_name = None &amp;nbsp;# capture for logging later&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;if details:&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; invoke_type = details.get("invokeType")&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; print("InvokeType:", invoke_type)&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; # -------- Try to resolve the user who ran it --------&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; user = find_user_from_job(details)&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; if user and (user.get("displayName") or user.get("userPrincipalName")):&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; run_started_by_name = (user.get('displayName') or user.get('userPrincipalName'))&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; print("Run started by (from job):",&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; f"{user.get('displayName') or ''}".strip(),&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; f"({user.get('userPrincipalName')})" if user.get("userPrincipalName") else "")&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; else:&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; # Fallback 1: runtime context (works in your tenant)&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; user = get_user_from_runtime_context()&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if user and (user.get("displayName") or user.get("userPrincipalName")):&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; run_started_by_name = (user.get('displayName') or user.get('userPrincipalName'))&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; print("Run started by (from context):",&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; f"{user.get('displayName') or ''}".strip(),&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; f"({user.get('userPrincipalName')})" if user.get("userPrincipalName") else "")&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; else:&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; # Fallback 2: ask Graph who is running this notebook interactively&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; user = get_current_user_via_graph()&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if user:&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; run_started_by_name = (user.get('displayName') or user.get('userPrincipalName'))&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; print("Run started by (from Graph):",&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; f"{user.get('displayName') or ''}".strip(),&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; f"({user.get('userPrincipalName')})" if user.get("userPrincipalName") else "")&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; else:&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; print("Run started by: Unknown (no user info in job metadata, runtime context, or Graph)")&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;######################################################################################&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;# ----------- Prepare Log Parameters (safe) -----------&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;if trigger_type.lower() == "pipelineactivity":&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; trigger_type = "Orchestrator Pipeline"&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; trigger_name = trigger_name&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;elif trigger_type.lower() == "1":&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; trigger_type = "Scheduled"&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; trigger_name = "Scheduling Module"&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;elif trigger_type.lower() == "manual":&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; trigger_type = "Manual"&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; trigger_name = run_started_by_name&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;else:&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; trigger_type = trigger_type&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; trigger_name = "UNKNOWN"&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;DIV class=""&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;This code safely checks:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;The &lt;STRONG&gt;job instance metadata&lt;/STRONG&gt;&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;&lt;STRONG&gt;The &lt;STRONG&gt;Fabric runtime context&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;&lt;STRONG&gt;&lt;STRONG&gt;And as a fallback, &lt;STRONG&gt;Microsoft Graph /me&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;&lt;STRONG&gt;&lt;STRONG&gt;It returns the &lt;STRONG&gt;display name or UPN of the user who manually triggered the run.&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/P&gt;&lt;HR /&gt;&lt;H4&gt;&lt;STRONG&gt;&lt;STRONG&gt;&lt;STRONG&gt;&lt;STRONG&gt;Trigger Type Mapping&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/H4&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;if trigger_type.lower() == "pipelineactivity":&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; trigger_type = "Orchestrator Pipeline"&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; trigger_name = trigger_name&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;elif trigger_type.lower() == "1":&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; trigger_type = "Scheduled"&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; trigger_name = "Scheduling Module"&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;elif trigger_type.lower() == "manual":&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; trigger_type = "Manual"&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; trigger_name = run_started_by_name&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;else:&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; trigger_type = trigger_type&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&amp;nbsp; &amp;nbsp; trigger_name = "UNKNOWN"&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;This logic helps ensure your logs clearly differentiate between &lt;STRONG&gt;manual, &lt;STRONG&gt;scheduled, and &lt;STRONG&gt;orchestrated pipeline runs — with proper user tracking for manual triggers.&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/P&gt;&lt;HR /&gt;&lt;H4&gt;&lt;STRONG&gt;&lt;STRONG&gt;&lt;STRONG&gt;&lt;STRONG&gt;Benefit&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/H4&gt;&lt;P&gt;By logging this user information into your &lt;STRONG&gt;System Run Report or operational tables, you can generate insights like:&lt;/STRONG&gt;&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;&lt;STRONG&gt;Who manually executed pipelines or notebooks&lt;/STRONG&gt;&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;&lt;STRONG&gt;When and how often manual interventions occurred&lt;/STRONG&gt;&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;&lt;STRONG&gt;Scheduled vs manual run ratios&lt;/STRONG&gt;&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;&lt;STRONG&gt;Security auditing for sensitive pipelines&lt;/STRONG&gt;&lt;/P&gt;&lt;HR /&gt;&lt;H4&gt;&lt;STRONG&gt;&lt;span class="lia-unicode-emoji" title=":white_heavy_check_mark:"&gt;✅&lt;/span&gt; &lt;STRONG&gt;Update&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/H4&gt;&lt;P&gt;Recently, I noticed that &lt;STRONG&gt;Fabric has fixed the previous bug — now, when triggered by a &lt;STRONG&gt;schedule, the value of @pipeline().TriggerType correctly returns &lt;STRONG&gt;"1" &lt;span class="lia-unicode-emoji" title=":party_popper:"&gt;🎉&lt;/span&gt;.&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;&lt;STRONG&gt;&lt;STRONG&gt;So with this fix and the manual user detection logic above, we can now fully distinguish &lt;STRONG&gt;all run types dynamically at runtime.&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/P&gt;&lt;HR /&gt;&lt;P&gt;&lt;STRONG&gt;&lt;STRONG&gt;&lt;STRONG&gt;&lt;STRONG&gt;Hope this helps others who want more robust runtime tracking and reporting in Fabric!&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;&lt;STRONG&gt;&lt;STRONG&gt;&amp;nbsp;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;&lt;STRONG&gt;&lt;STRONG&gt;&lt;STRONG&gt;Cheers,&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;&lt;STRONG&gt;&lt;STRONG&gt;&amp;nbsp;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;&lt;STRONG&gt;&lt;STRONG&gt;&amp;nbsp;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;</description>
      <pubDate>Thu, 23 Oct 2025 04:39:14 GMT</pubDate>
      <guid>https://community.fabric.microsoft.com/t5/Data-Engineering/Detecting-the-User-Who-Manually-Triggered-a-Fabric-Pipeline-or/m-p/4856380#M13058</guid>
      <dc:creator>Yazdan</dc:creator>
      <dc:date>2025-10-23T04:39:14Z</dc:date>
    </item>
    <item>
      <title>Re: Detecting the User Who Manually Triggered a Fabric Pipeline or Notebook</title>
      <link>https://community.fabric.microsoft.com/t5/Data-Engineering/Detecting-the-User-Who-Manually-Triggered-a-Fabric-Pipeline-or/m-p/4856651#M13070</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.fabric.microsoft.com/t5/user/viewprofilepage/user-id/1272584"&gt;@Yazdan&lt;/a&gt;,&lt;/P&gt;
&lt;P&gt;Thanks for sharing your insights on detecting which user manually triggered a pipeline.&amp;nbsp;&lt;SPAN&gt;Definitely consider turning this into a blog post so others can benefit from your experience more easily&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;A href="https://community.fabric.microsoft.com/t5/Power-BI-Community-Blog/bg-p/community_blog" target="_blank"&gt;Power BI Community Blog - Microsoft Fabric Community&lt;/A&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;We also appreciate you sharing this with the community&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;Thank you.&lt;/P&gt;</description>
      <pubDate>Thu, 23 Oct 2025 09:17:32 GMT</pubDate>
      <guid>https://community.fabric.microsoft.com/t5/Data-Engineering/Detecting-the-User-Who-Manually-Triggered-a-Fabric-Pipeline-or/m-p/4856651#M13070</guid>
      <dc:creator>v-saisrao-msft</dc:creator>
      <dc:date>2025-10-23T09:17:32Z</dc:date>
    </item>
    <item>
      <title>Re: Detecting the User Who Manually Triggered a Fabric Pipeline or Notebook</title>
      <link>https://community.fabric.microsoft.com/t5/Data-Engineering/Detecting-the-User-Who-Manually-Triggered-a-Fabric-Pipeline-or/m-p/5191802#M16495</link>
      <description>&lt;H3&gt;Update – Finally Solved for Both Pipelines and Notebooks&lt;/H3&gt;&lt;P&gt;After further investigation and testing, I finally found a reliable way to determine who manually triggered both Microsoft Fabric Pipelines and Notebooks.&lt;/P&gt;&lt;P&gt;The original approach I shared above worked in some scenarios by using:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;Job instance metadata&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Runtime context&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Microsoft Graph /me&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;However, those methods only identify the currently executing identity and can be unreliable for auditing purposes, especially when notebooks or pipelines are triggered by different users.&lt;/P&gt;&lt;H3&gt;Final Solution&lt;/H3&gt;&lt;P&gt;The most reliable approach is to use the &lt;STRONG&gt;Power BI / Fabric Admin Activity Events API&lt;/STRONG&gt;:&lt;/P&gt;&lt;PRE&gt;GET /admin/activityevents&lt;/PRE&gt;&lt;P&gt;This API exposes Fabric audit events and allows us to identify the user who actually initiated a run.&lt;/P&gt;&lt;H3&gt;Pipeline Detection Logic&lt;/H3&gt;&lt;OL&gt;&lt;LI&gt;&lt;P&gt;Get the exact pipeline run start time using:&lt;/P&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;PRE&gt;queryactivityruns&lt;/PRE&gt;&lt;OL&gt;&lt;LI&gt;&lt;P&gt;Retrieve audit events around that execution time.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Find the matching:&lt;/P&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;PRE&gt;Operation = RunArtifact
ObjectId  = Pipeline ID&lt;/PRE&gt;&lt;OL&gt;&lt;LI&gt;&lt;P&gt;Return:&lt;/P&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;PRE&gt;UserId&lt;/PRE&gt;&lt;P&gt;which contains the triggering user's email address (UPN).&lt;/P&gt;&lt;H3&gt;Notebook Detection Logic&lt;/H3&gt;&lt;P&gt;Standalone notebooks do not have a queryactivityruns endpoint, so the process is slightly different:&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;&lt;P&gt;Get the current notebook Job Instance.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Resolve the notebook start time.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Query Admin Activity Events.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Find matching notebook execution events:&lt;/P&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;PRE&gt;Operation = StartRunNotebook
or
Operation = RunArtifact&lt;/PRE&gt;&lt;OL&gt;&lt;LI&gt;&lt;P&gt;Match on Notebook ID.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Return the triggering user's email address.&lt;/P&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;H3&gt;Converting Email Address to Display Name&lt;/H3&gt;&lt;P&gt;The audit log returns:&lt;/P&gt;&lt;PRE&gt;user@company.com&lt;/PRE&gt;&lt;P&gt;To make logs more readable, I then call Microsoft Graph:&lt;/P&gt;&lt;PRE&gt;GET /users/{userPrincipalName}&lt;/PRE&gt;&lt;P&gt;and retrieve:&lt;/P&gt;&lt;PRE&gt;displayName
userPrincipalName&lt;/PRE&gt;&lt;P&gt;which allows logging values such as:&lt;/P&gt;&lt;PRE&gt;User Name (user@company.com)&lt;/PRE&gt;&lt;P&gt;instead of only the email address.&lt;/P&gt;&lt;H3&gt;Important Discovery – Audit Log Visibility Delay&lt;/H3&gt;&lt;P&gt;During testing I discovered that Fabric audit events are not always immediately available through the Admin Activity Events API.&lt;/P&gt;&lt;P&gt;For example:&lt;/P&gt;&lt;PRE&gt;Notebook Start Time : 05:21:41
Audit Event Time    : 05:21:51&lt;/PRE&gt;&lt;P&gt;The audit event itself was generated almost immediately.&lt;/P&gt;&lt;P&gt;However, the event was not visible through the API until approximately 5–7 minutes later.&lt;/P&gt;&lt;P&gt;Because of this, a simple one-time lookup can fail even though the audit event already exists internally.&lt;/P&gt;&lt;H3&gt;Final Reliability Improvement&lt;/H3&gt;&lt;P&gt;To make the solution production-ready, I implemented a retry mechanism:&lt;/P&gt;&lt;PRE&gt;retry_count=10
retry_interval_seconds=60&lt;/PRE&gt;&lt;P&gt;The lookup now retries every minute until the matching audit event becomes available.&lt;/P&gt;&lt;P&gt;This ensures that:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;Pipelines resolve the correct triggering user.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Notebooks resolve the correct triggering user.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Delayed audit ingestion does not cause false "Unknown" results.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Security and operational reporting remain accurate.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;H3&gt;Result&lt;/H3&gt;&lt;P&gt;The final implementation can now reliably populate logging tables and System Run Reports with:&lt;/P&gt;&lt;PRE&gt;Trigger Type
Trigger Name
Run Started By&lt;/PRE&gt;&lt;P&gt;for:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;Scheduled runs&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Manual runs&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Orchestrator pipeline runs&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Standalone notebook executions&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;This has significantly improved operational auditing and troubleshooting in our Fabric environment.&lt;/P&gt;&lt;P&gt;Hopefully this helps others looking for a reliable way to track who actually executed Fabric workloads.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P class=""&gt;&lt;SPAN&gt;One important thing I found is that audit events are not always visible immediately through the Admin Activity Events API. The event timestamp can be accurate, but the API visibility can be delayed by several minutes. Because of that, I added a retry loop.&lt;/SPAN&gt;&lt;/P&gt;&lt;P class=""&gt;&lt;SPAN&gt;Below is the sanitised sample code structure:&lt;/SPAN&gt;&lt;/P&gt;&lt;PRE&gt;&lt;SPAN&gt;import requests
import pandas as pd
import time

# ------------------------------------------------------------------
# Service Principal Inputs - replace with secure values
# ------------------------------------------------------------------
tenant_id = "&amp;lt;TENANT_ID&amp;gt;"
client_id = "&amp;lt;CLIENT_ID&amp;gt;"
client_secret = "&amp;lt;CLIENT_SECRET&amp;gt;"


def get_powerbi_admin_token(tenant_id, client_id, client_secret):
    token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"

    token_body = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": "https://analysis.windows.net/powerbi/api/.default"
    }

    response = requests.post(
        token_url,
        data=token_body,
        headers={"Content-Type": "application/x-www-form-urlencoded"}
    )

    response.raise_for_status()
    return response.json()&lt;SPAN class=""&gt;["access_token"]&lt;/SPAN&gt;


def get_graph_display_name_from_upn(tenant_id, client_id, client_secret, upn):
    if not upn:
        return "Unknown"

    token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"

    token_body = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": "https://graph.microsoft.com/.default"
    }

    token_response = requests.post(
        token_url,
        data=token_body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        timeout=30
    )

    if token_response.status_code != 200:
        return upn

    graph_token = token_response.json()&lt;SPAN class=""&gt;["access_token"]&lt;/SPAN&gt;

    graph_response = requests.get(
        f"https://graph.microsoft.com/v1.0/users/{upn}?$select=displayName,userPrincipalName",
        headers={"Authorization": f"Bearer {graph_token}"},
        timeout=30
    )

    if graph_response.status_code != 200:
        return upn

    user = graph_response.json()
    display_name = user.get("displayName")
    user_principal_name = user.get("userPrincipalName") or upn

    if display_name:
        return f"{display_name} ({user_principal_name})"

    return user_principal_name


def get_manual_pipeline_triggered_by(
    tenant_id,
    client_id,
    client_secret,
    workspace_id,
    pipeline_id,
    pipeline_run_id,
    window_minutes=15,
    retry_count=10,
    retry_interval_seconds=60
):
    """
    Resolve who manually triggered a Fabric pipeline run.

    Logic:
    1. Use Fabric queryactivityruns API to find the actual pipeline run start time.
    2. Use Power BI Admin Activity Events API to search audit events around that time.
    3. Find Operation = RunArtifact where ObjectId = pipeline_id.
    4. Retry because Admin Activity Events API visibility can be delayed.
    5. Return UserId from the matching audit event.
    """

    fabric_token = mssparkutils.credentials.getToken(
        "https://api.fabric.microsoft.com"
    )

    fabric_headers = {
        "Authorization": f"Bearer {fabric_token}",
        "Content-Type": "application/json"
    }

    activity_runs_url = (
        f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}"
        f"/datapipelines/pipelineruns/{pipeline_run_id}/queryactivityruns"
    )

    activity_payload = {
        "lastUpdatedAfter": "2026-01-01T00:00:00Z",
        "lastUpdatedBefore": "2026-12-31T23:59:59Z",
        "filters": [],
        "orderBy": &lt;SPAN class=""&gt;[
            {
                "orderBy": "ActivityRunStart",
                "order": "ASC"
            }
        ]&lt;/SPAN&gt;
    }

    activity_response = requests.post(
        activity_runs_url,
        headers=fabric_headers,
        json=activity_payload
    )

    activity_response.raise_for_status()

    activity_rows = activity_response.json().get("value", [])

    if not activity_rows:
        return None

    df_activities = pd.json_normalize(activity_rows)

    start_col = None

    for col in &lt;SPAN class=""&gt;["activityRunStart", "ActivityRunStart", "startTime", "StartTime"]&lt;/SPAN&gt;:
        if col in df_activities.columns:
            start_col = col
            break

    if not start_col:
        return None

    run_start_utc = pd.to_datetime(
        df_activities&lt;SPAN class=""&gt;[start_col]&lt;/SPAN&gt;,
        utc=True,
        errors="coerce"
    ).min()

    if pd.isna(run_start_utc):
        return None

    powerbi_token = get_powerbi_admin_token(
        tenant_id,
        client_id,
        client_secret
    )

    powerbi_headers = {
        "Authorization": f"Bearer {powerbi_token}",
        "Content-Type": "application/json"
    }

    for retry_attempt in range(1, retry_count + 1):

        print(f"Pipeline audit lookup retry {retry_attempt} of {retry_count}")

        search_start_dt = run_start_utc - pd.Timedelta(minutes=window_minutes)
        search_end_dt = run_start_utc + pd.Timedelta(minutes=window_minutes)

        events = query_admin_activity_events(
            powerbi_headers,
            search_start_dt,
            search_end_dt
        )

        candidates = []

        for event in events:
            operation = event.get("Operation", "")

            object_id = str(
                event.get("ObjectId")
                or event.get("ItemId")
                or event.get("ArtifactId")
                or ""
            ).lower()

            if operation == "RunArtifact" and object_id == pipeline_id.lower():
                event_creation_dt = pd.to_datetime(
                    event.get("CreationTime"),
                    utc=True,
                    errors="coerce"
                )

                if pd.notna(event_creation_dt):
                    event&lt;SPAN class=""&gt;["_time_diff_seconds"]&lt;/SPAN&gt; = abs(
                        (event_creation_dt - run_start_utc).total_seconds()
                    )

                    candidates.append(event)

        if candidates:
            best_event = sorted(
                candidates,
                key=lambda x: x&lt;SPAN class=""&gt;["_time_diff_seconds"]&lt;/SPAN&gt;
            )&lt;SPAN class=""&gt;[0]&lt;/SPAN&gt;

            return best_event.get("UserId")

        if retry_attempt &amp;lt; retry_count:
            time.sleep(retry_interval_seconds)

    return None


def get_manual_notebook_triggered_by(
    tenant_id,
    client_id,
    client_secret,
    notebook_id,
    notebook_start_time_utc,
    window_minutes=15,
    max_allowed_time_diff_seconds=60,
    retry_count=10,
    retry_interval_seconds=60
):
    """
    Resolve who manually triggered a standalone Fabric notebook run.

    Logic:
    1. Use the notebook job start time.
    2. Use Power BI Admin Activity Events API to search audit events around that time.
    3. Match Operation = StartRunNotebook or RunArtifact.
    4. Match ObjectId = notebook_id.
    5. Retry because audit events may not be immediately visible.
    6. Reject old matches using max_allowed_time_diff_seconds.
    """

    run_start_utc = pd.to_datetime(
        notebook_start_time_utc,
        utc=True,
        errors="coerce"
    )

    if pd.isna(run_start_utc):
        return None

    powerbi_token = get_powerbi_admin_token(
        tenant_id,
        client_id,
        client_secret
    )

    powerbi_headers = {
        "Authorization": f"Bearer {powerbi_token}",
        "Content-Type": "application/json"
    }

    for retry_attempt in range(1, retry_count + 1):

        print(f"Notebook audit lookup retry {retry_attempt} of {retry_count}")

        search_start_dt = run_start_utc - pd.Timedelta(minutes=window_minutes)
        search_end_dt = run_start_utc + pd.Timedelta(minutes=window_minutes)

        events = query_admin_activity_events(
            powerbi_headers,
            search_start_dt,
            search_end_dt
        )

        candidates = []

        for event in events:
            operation = event.get("Operation", "")

            object_id = str(
                event.get("ObjectId")
                or event.get("ItemId")
                or event.get("ArtifactId")
                or ""
            ).lower()

            if (
                operation in &lt;SPAN class=""&gt;["StartRunNotebook", "RunArtifact"]&lt;/SPAN&gt;
                and object_id == notebook_id.lower()
            ):
                event_creation_dt = pd.to_datetime(
                    event.get("CreationTime"),
                    utc=True,
                    errors="coerce"
                )

                if pd.notna(event_creation_dt):
                    event&lt;SPAN class=""&gt;["_time_diff_seconds"]&lt;/SPAN&gt; = abs(
                        (event_creation_dt - run_start_utc).total_seconds()
                    )

                    event&lt;SPAN class=""&gt;["_operation_priority"]&lt;/SPAN&gt; = {
                        "StartRunNotebook": 1,
                        "RunArtifact": 2
                    }.get(operation, 99)

                    candidates.append(event)

        if candidates:
            best_event = sorted(
                candidates,
                key=lambda x: (
                    x&lt;SPAN class=""&gt;["_operation_priority"]&lt;/SPAN&gt;,
                    x&lt;SPAN class=""&gt;["_time_diff_seconds"]&lt;/SPAN&gt;
                )
            )&lt;SPAN class=""&gt;[0]&lt;/SPAN&gt;

            if best_event&lt;SPAN class=""&gt;["_time_diff_seconds"]&lt;/SPAN&gt; &amp;lt;= max_allowed_time_diff_seconds:
                return best_event.get("UserId")

        if retry_attempt &amp;lt; retry_count:
            time.sleep(retry_interval_seconds)

    return None


def query_admin_activity_events(powerbi_headers, search_start_dt, search_end_dt):
    """
    Query Power BI / Fabric Admin Activity Events API.
    Splits the search if the time window crosses a UTC date boundary.
    """

    search_windows = []

    if search_start_dt.date() == search_end_dt.date():
        search_windows.append((search_start_dt, search_end_dt))
    else:
        end_of_first_day = (
            search_start_dt.normalize()
            + pd.Timedelta(days=1)
            - pd.Timedelta(seconds=1)
        )

        start_of_second_day = search_end_dt.normalize()

        search_windows.append((search_start_dt, end_of_first_day))
        search_windows.append((start_of_second_day, search_end_dt))

    events = []

    for window_start, window_end in search_windows:

        search_start_str = window_start.strftime("%Y-%m-%dT%H:%M:%S.000Z")
        search_end_str = window_end.strftime("%Y-%m-%dT%H:%M:%S.000Z")

        activity_events_url = (
            "https://api.powerbi.com/v1.0/myorg/admin/activityevents"
            f"?startDateTime='{search_start_str}'"
            f"&amp;amp;endDateTime='{search_end_str}'"
        )

        response = requests.get(
            activity_events_url,
            headers=powerbi_headers
        )

        response.raise_for_status()

        data = response.json()
        events.extend(data.get("activityEventEntities", []))

        continuation_uri = data.get("continuationUri")

        while continuation_uri:
            continuation_response = requests.get(
                continuation_uri,
                headers=powerbi_headers
            )

            continuation_response.raise_for_status()

            continuation_data = continuation_response.json()
            events.extend(continuation_data.get("activityEventEntities", []))

            continuation_uri = continuation_data.get("continuationUri")

    return events&lt;/SPAN&gt;&lt;/PRE&gt;&lt;P class=""&gt;&lt;SPAN&gt;Required permissions:&lt;/SPAN&gt;&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;SPAN&gt;Fabric access to query pipeline activity runs&lt;/SPAN&gt;&lt;/LI&gt;&lt;LI&gt;&lt;SPAN&gt;Power BI / Fabric Admin Activity Events API access&lt;/SPAN&gt;&lt;/LI&gt;&lt;LI&gt;&lt;SPAN&gt;Microsoft Graph application permission such as &lt;/SPAN&gt;&lt;SPAN&gt;User.Read.All&lt;/SPAN&gt;&lt;SPAN&gt; with admin consent, if display name resolution is required&lt;/SPAN&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P class=""&gt;&lt;SPAN&gt;Final output example:&lt;/SPAN&gt;&lt;/P&gt;&lt;PRE&gt;&lt;SPAN&gt;Trigger Type : Manual
Trigger Name : User Name (user@company.com)&lt;/SPAN&gt;&lt;/PRE&gt;&lt;P&gt;&lt;SPAN&gt;This gives a much more reliable result than using runtime context or Graph &lt;/SPAN&gt;&lt;SPAN&gt;/me&lt;/SPAN&gt;&lt;SPAN&gt;, because those can return the current execution identity rather than the user who actually triggered the pipeline or notebook.&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Tue, 02 Jun 2026 07:04:44 GMT</pubDate>
      <guid>https://community.fabric.microsoft.com/t5/Data-Engineering/Detecting-the-User-Who-Manually-Triggered-a-Fabric-Pipeline-or/m-p/5191802#M16495</guid>
      <dc:creator>Yazdan</dc:creator>
      <dc:date>2026-06-02T07:04:44Z</dc:date>
    </item>
  </channel>
</rss>

