venkatakrish_01
Frequent Visitor

Power BI Subscription

Hi

I have created 50+ paginated reports and set up subscriptions for them, and users are receiving the emails as scheduled. A week ago, however, users gave me a list of specific dates: for example, one report should be generated on November 26, 2025, another on November 28, 2025, and so on. Can I specify explicit dates so that each report is emailed on its particular day? I can't use Power Automate or other external tools, so please suggest an approach that works within those limits.

1 ACCEPTED SOLUTION

Hi @venkatakrish_01, in my Python script I first build a filter string, and then I use that filter string to start an export through the Export To File API:

{PBI_API_BASE}/groups/{group_id}/reports/{report_id}/ExportTo
 
Here is more or less my full script. Some customization will be required.

My script is for audits, so a separate process identifies which users need to get the report and puts their emails into a table, which is the AUDIT_VIEW_NAME variable. The script then loops through each row and generates a report using that user's email in the filter, so each report is unique to the user.
 
import os, re, time, json, pathlib, random
import pyodbc  # not referenced directly; the mssql+pyodbc SQLAlchemy dialect requires it to be installed
import requests
import pandas as pd
from datetime import datetime, date, timedelta
from msal import ConfidentialClientApplication
from typing import Union, Sequence, Optional, Tuple
import smtplib
import mimetypes
from email.message import EmailMessage
from email.utils import formatdate, make_msgid
from concurrent.futures import ThreadPoolExecutor, as_completed
from sqlalchemy import create_engine

from dotenv import load_dotenv
load_dotenv()

# ===== Entra app / Power BI REST =====
CLIENT_ID     = os.getenv('CLIENT_ID')
CLIENT_SECRET = os.getenv('CLIENT_SECRET')
TENANT_ID     = os.getenv('TENANT_ID')
AUTHORITY     = f'https://login.microsoftonline.com/{TENANT_ID}'
SCOPE         = ['https://analysis.windows.net/powerbi/api/.default']
PBI_API_BASE  = "https://api.powerbi.com/v1.0/myorg"

# ======= BIC ==========
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_HOST = os.getenv('DB_HOST')
DB_NAME = os.getenv('DB_NAME')
DB_DRIVER = os.getenv('DB_DRIVER')
DB_URL = f'mssql+pyodbc://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}?driver={DB_DRIVER}'



AUDIT_VIEW_NAME = " "
GETTING_STARTED_VIEW_NAME = " "

# ===== Report =====
REPORT_URL = " "

# Use the EXPORT_PATH env var (falling back to ./exports, an assumed default) and append the current date
OUTDIR = pathlib.Path(os.getenv('EXPORT_PATH', 'exports')) / datetime.now().strftime("%Y%m%d")


# ===== Email (SMTP) =====
SMTP_HOST     = os.getenv("SMTP_HOST", " ")
SENDER_EMAIL  = os.getenv("SENDER_EMAIL", " ")
DEFAULT_CC    = os.getenv("DEFAULT_CC", " ")
DEFAULT_BCC   = os.getenv("DEFAULT_BCC", " ")

WORKSPACE_REQUEST_LINK = "https://albertahealthservices.sharepoint.com/:l:/s/SP10344/FF-vZ2eBLo1MuFTBoZpq9JIBcY_Yyo9Lx6ZpgtS_Gi4CDQ?nav=MzczOTc0YTktMGNmZC00ZGYxLTgyZmYtYmE3MjUxNzNiNTMx"
LEARN_LINK = "https://learn.microsoft.com/en-us/power-bi/fundamentals/service-get-started"
COP_LINK = "https://albertahealthservices.sharepoint.com/sites/SP10344/SitePages/Power-BI-Community-of-Practice_Landing.aspx"

# ==========================================
# email templates
# (PLAIN_TEXT_TEMPLATE, HTML_TEMPLATE, GETTING_STARTED_* templates)
# ==========================================

PLAIN_TEXT_TEMPLATE = ""

HTML_TEMPLATE = ""
# ------------------------------
# Token manager with refresh
# ------------------------------
class TokenManager:
    def __init__(self, client_id: str, client_secret: str, authority: str, scope: Sequence[str]):
        self.app = ConfidentialClientApplication(
            client_id, authority=authority, client_credential=client_secret
        )
        self.scope = scope
        self._token: Optional[str] = None
        self._expires_on: int = 0  # epoch seconds

    def get_token(self) -> str:
        now = int(time.time())
        # Refresh if missing or expiring within 5 minutes
        if not self._token or now >= (self._expires_on - 300):
            result = self.app.acquire_token_silent(self.scope, account=None)
            if not result:
                result = self.app.acquire_token_for_client(scopes=self.scope)
            if "access_token" not in result:
                raise RuntimeError(f"Token error: {result}")
            self._token = result["access_token"]
            # MSAL returns either expires_in (seconds) or expires_on (epoch)
            if "expires_on" in result:
                self._expires_on = int(result["expires_on"])
            else:
                self._expires_on = now + int(result.get("expires_in", 3600))
        return self._token

# ------------------------------
# Helpers
# ------------------------------
def parse_ids_from_url(report_url: str) -> Tuple[str, str]:
    m = re.search(r"/groups/([0-9a-fA-F-]+)/reports/([0-9a-fA-F-]+)", report_url)
    if not m:
        raise ValueError("Could not parse groupId/reportId from REPORT_URL")
    return m.group(1), m.group(2)

def fetch_email_date_rows(view) -> pd.DataFrame:
    
    sql = f"SELECT user_to_email, DateFilter FROM {view};"
    engine = create_engine(DB_URL)
    with engine.connect() as con:
        df = pd.read_sql(sql, con)
    if "user_to_email" not in df.columns or "DateFilter" not in df.columns:
        raise ValueError(f"View must expose columns user_to_email and DateFilter. Got: {list(df.columns)}")
    df = df.dropna(subset=["user_to_email", "DateFilter"])
    df["user_to_email"] = df["user_to_email"].astype(str).str.strip()
    return df.drop_duplicates()

def normalize_date_to_str(dval) -> str:
    if isinstance(dval, datetime):
        return dval.date().isoformat()
    if isinstance(dval, date):
        return dval.isoformat()
    try:
        return datetime.fromisoformat(str(dval)).date().isoformat()
    except Exception:
        return str(dval)[:10]

def build_export_filter(user_email: str, date_yyyy_mm_dd: str) -> str:
    safe_email = user_email.replace("'", "''")
    d = datetime.strptime(date_yyyy_mm_dd, "%Y-%m-%d").date()
    next_day = d + timedelta(days=1)
    return (
        f"DimUser-Recipient/user_email eq '{safe_email}' "
        f"and DimDate/Date ge {d.isoformat()} "
        f"and DimDate/Date lt {next_day.isoformat()}"
    )

def sanitize_filename_part(s: str) -> str:
    return re.sub(r'[^A-Za-z0-9._-]+', '_', s)

# -----------------------------------------------
#  resilient HTTP with basic retry/backoff
# -----------------------------------------------
def _http_with_retry(method: str, url: str, session: Optional[requests.Session] = None,
                     max_tries: int = 5, backoff_base: float = 0.75, **kwargs) -> requests.Response:
    sess = session or requests.Session()
    last_err: Optional[Exception] = None
    for attempt in range(1, max_tries + 1):
        try:
            resp = sess.request(method, url, timeout=60, **kwargs)
        except (requests.ConnectionError, requests.Timeout) as ex:
            # Network-level failure: worth retrying
            last_err = ex
        else:
            if resp.status_code not in (429, 500, 502, 503, 504):
                # Success, or a non-transient error (e.g. 400/401/404) that retrying will not fix
                resp.raise_for_status()
                return resp
            # Throttling or 5xx: remember the error and retry
            last_err = requests.HTTPError(f"Transient HTTP {resp.status_code}", response=resp)
        if attempt < max_tries:
            # Exponential backoff with a little jitter, capped at 8 seconds
            sleep_s = backoff_base * (2 ** (attempt - 1)) + random.uniform(0, 0.3)
            time.sleep(min(sleep_s, 8.0))
    raise last_err

# --------------------------------------------------------
#  export uses TokenManager and resilient HTTP
# --------------------------------------------------------
def export_report_pdf_with_filters(token_mgr: TokenManager, group_id: str, report_id: str,
                                   export_filter: Optional[str], outfile: pathlib.Path,
                                   include_hidden_pages: bool = False, poll_timeout_s: int = 300):
    session = requests.Session()
    headers = {"Authorization": f"Bearer {token_mgr.get_token()}", "Content-Type": "application/json"}
    powerbi_cfg = {"settings": {"includeHiddenPages": include_hidden_pages}}
    if export_filter:
        powerbi_cfg["reportLevelFilters"] = [{"filter": export_filter}]

    body = {
        "format": "PDF",
        "powerBIReportConfiguration": powerbi_cfg
    }
    # 1) start
    start = _http_with_retry(
        "POST",
        f"{PBI_API_BASE}/groups/{group_id}/reports/{report_id}/ExportTo",
        session=session,
        headers=headers, data=json.dumps(body)
    )
    export_id = start.json()["id"]

    # 2) poll
    status_url = f"{PBI_API_BASE}/groups/{group_id}/reports/{report_id}/exports/{export_id}"
    t0 = time.time()
    while True:
        # refresh token if needed during long polls
        headers["Authorization"] = f"Bearer {token_mgr.get_token()}"
        s = _http_with_retry("GET", status_url, session=session, headers=headers)
        st = s.json().get("status")
        if st == "Succeeded":
            break
        if st == "Failed":
            raise RuntimeError(f"Export failed: {s.text}")
        retry_after = s.headers.get("Retry-After")
        time.sleep(int(retry_after) if retry_after and retry_after.isdigit() else 2)
        if time.time() - t0 > poll_timeout_s:
            raise TimeoutError(f"Polling timed out after {poll_timeout_s}s; last status={st}")

    # 3) download
    headers["Authorization"] = f"Bearer {token_mgr.get_token()}"
    file_resp = _http_with_retry("GET", f"{status_url}/file", session=session, headers=headers)
    outfile.parent.mkdir(parents=True, exist_ok=True)
    outfile.write_bytes(file_resp.content)
    return outfile

# --------------------------------------------------------
#  email send with connection-per-send + retries
# --------------------------------------------------------
def send_email_with_attachments(
    sender: str,
    to_recipients: Union[str, Sequence[str]],
    subject: str,
    body_text: str,
    attachments: Optional[Union[pathlib.Path, Sequence[pathlib.Path]]] = None,
    cc_recipients: Optional[Union[str, Sequence[str]]] = None,
    bcc_recipients: Optional[Union[str, Sequence[str]]] = None,
    html_body: Optional[str] = None,
    smtp_host: str = SMTP_HOST,
    max_tries: int = 4,
) -> None:
    def _norm(v):
        if not v:
            return []
        if isinstance(v, str):
            parts = [p.strip() for p in re.split(r"[;,]", v) if p.strip()]
            return parts
        return list(v)

    to_list  = _norm(to_recipients)
    cc_list  = _norm(cc_recipients)
    bcc_list = _norm(bcc_recipients)
    if not to_list:
        raise ValueError("No 'To' recipients provided")

    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"]    = sender
    msg["To"]      = ", ".join(to_list)
    if cc_list:
        msg["Cc"]  = ", ".join(cc_list)
    msg["Date"] = formatdate(localtime=True)
    msg["Message-ID"] = make_msgid()

    msg.set_content(body_text or "")
    if html_body:
        msg.add_alternative(html_body, subtype="html")

    if attachments:
        files = attachments if isinstance(attachments, Sequence) and not isinstance(attachments, (str, bytes, pathlib.Path)) else [attachments]
        for fpath in files:
            fpath = pathlib.Path(fpath)
            if not fpath.exists():
                continue
            ctype, encoding = mimetypes.guess_type(str(fpath))
            if ctype is None or encoding is not None:
                ctype = "application/octet-stream"
            maintype, subtype = ctype.split("/", 1)
            with open(fpath, "rb") as fp:
                msg.add_attachment(fp.read(), maintype=maintype, subtype=subtype, filename=fpath.name)

    all_rcpts = to_list + cc_list + bcc_list

    last_err = None
    for attempt in range(1, max_tries + 1):
        try:
            with smtplib.SMTP(smtp_host, timeout=60) as smtp:
                # If your server requires TLS or auth, add here:
                # smtp.starttls()
                # smtp.login(USER, PASS)
                smtp.ehlo()
                smtp.send_message(msg, from_addr=sender, to_addrs=all_rcpts)
            return
        except Exception as ex:
            last_err = ex
            # basic backoff
            time.sleep(min(2 ** attempt, 8))
    raise last_err

def derive_first_name_from_email(email_addr: str) -> str:
    local = email_addr.split("@", 1)[0]
    base = re.split(r"[._-]", local)[0] if local else ""
    return base.capitalize() if base else "Colleague"

# --------------------------------------------------------
# worker to process a single row (export + email)
# --------------------------------------------------------
def process_one_entry(row_email: str, row_date, group_id: str, report_id: str,
                      token_mgr: TokenManager, is_getting_started: bool = False) -> Tuple[str, str, Optional[str]]:
    email_addr = str(row_email).strip()
    date_str = normalize_date_to_str(row_date)
    export_filter = build_export_filter(email_addr, date_str)
    outfile = OUTDIR / f"export-{sanitize_filename_part(email_addr)}-{date_str}.pdf"

    export_report_pdf_with_filters(token_mgr, group_id, report_id, export_filter, outfile)

    first_name = derive_first_name_from_email(email_addr)
    attachment_name = outfile.name

    deadline_date = (datetime.now().date() + timedelta(days=7)).isoformat()
    subject = "You're building great things in Power BI - let's make them shareable"
    plain_body = PLAIN_TEXT_TEMPLATE.format(
        first_name=first_name,
        deadline_date=deadline_date,
        workspace_request_link=WORKSPACE_REQUEST_LINK,
        date=date_str,
        attachment_name=attachment_name,
    )
    html_body = HTML_TEMPLATE.format(
        first_name=first_name,
        deadline_date=deadline_date,
        workspace_request_link=WORKSPACE_REQUEST_LINK,
        date=date_str,
        attachment_name=attachment_name,
    )

    to_list = [email_addr]

    send_email_with_attachments(
        sender=SENDER_EMAIL,
        to_recipients=to_list,
        subject=subject,
        body_text=plain_body,
        attachments=outfile,
        cc_recipients=DEFAULT_CC or None,
        bcc_recipients=DEFAULT_BCC or None,
        html_body=html_body,
    )

    return (email_addr, date_str, outfile.name)

# --------------------------------------------------------
#  concurrent bulk functions
# --------------------------------------------------------
def bulk_email(view_name: str, is_getting_started: bool, max_workers: int = 4):
    if not all([CLIENT_ID, CLIENT_SECRET, TENANT_ID]):
        raise SystemExit("Set CLIENT_ID, CLIENT_SECRET, TENANT_ID env vars first.")
    token_mgr = TokenManager(CLIENT_ID, CLIENT_SECRET, AUTHORITY, SCOPE)
    group_id, report_id = parse_ids_from_url(REPORT_URL)

    df = fetch_email_date_rows(view_name)
    if df.empty:
        print(f"[INFO] View {view_name} returned no rows.")
        return

    label = "GettingStarted" if is_getting_started else "PersonalWorkspace"
    futures = []
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        for row in df.itertuples(index=False):
            futures.append(
                ex.submit(
                    process_one_entry,
                    row.user_to_email,
                    row.DateFilter,
                    group_id,
                    report_id,
                    token_mgr,
                    is_getting_started,
                )
            )

        for fut in as_completed(futures):
            try:
                email_addr, date_str, fname = fut.result()
                print(f"[OK][{label}] Emailed {email_addr} for {date_str} ({fname})")
            except Exception as ex:
                print(f"[WARN][{label}] Failed: {ex}")

def prime_report():
    # --- Primer: one no-filter export to warm visuals, then wait 5 minutes ---
    try:
        primer_token_mgr = TokenManager(CLIENT_ID, CLIENT_SECRET, AUTHORITY, SCOPE)
        primer_group_id, primer_report_id = parse_ids_from_url(REPORT_URL)
        primer_out = OUTDIR / f"_primer-{datetime.now().strftime('%H%M%S')}.pdf"
        export_report_pdf_with_filters(
            token_mgr=primer_token_mgr,
            group_id=primer_group_id,
            report_id=primer_report_id,
            export_filter=None,              # <- NO FILTERS
            outfile=primer_out,
            include_hidden_pages=False,
            poll_timeout_s=300
        )
        print(f"[INFO] Primer export completed: {primer_out.name}")
    except Exception as ex:
        print(f"[WARN] Primer export failed (continuing anyway): {ex}")

    print("[INFO] Waiting 300 seconds (5 minutes) before main exports...")
    time.sleep(300)

if __name__ == "__main__":
    prime_report()
    bulk_email(AUDIT_VIEW_NAME, is_getting_started=False, max_workers=8)
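
Because the original question is about paginated reports, one adjustment is worth noting: as far as I know, the Export To File request for a paginated (RDL) report takes a `paginatedReportConfiguration` object with `parameterValues`, rather than `powerBIReportConfiguration` with `reportLevelFilters`. Below is a minimal sketch of building that request body. The helper name and the parameter names `ReportDate` and `UserEmail` are hypothetical; the parameter names must match the ones defined in your own report.

```python
import json
from typing import Dict, Optional


def build_paginated_export_body(
    parameters: Optional[Dict[str, str]] = None,
    file_format: str = "PDF",
) -> dict:
    """Build an ExportTo request body for a paginated (RDL) report.

    `parameters` maps report parameter names to string values. The names
    used in the example below are hypothetical placeholders.
    """
    body: dict = {"format": file_format}
    if parameters:
        body["paginatedReportConfiguration"] = {
            "parameterValues": [
                {"name": name, "value": value}
                for name, value in parameters.items()
            ]
        }
    return body


# Example: one export for a given date and recipient (hypothetical parameters)
body = build_paginated_export_body(
    {"ReportDate": "2025-11-26", "UserEmail": "someone@example.com"}
)
print(json.dumps(body, indent=2))
```

A dictionary built this way would take the place of `body` in `export_report_pdf_with_filters`; the start, poll, and download steps should stay the same.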

 

 


If you found this helpful, consider giving some Kudos. If I answered your question or solved your problem, mark this post as the solution.


5 REPLIES
tayloramy
Community Champion

Hi @venkatakrish_01

 

If you must work natively in Fabric, I don't think you can do it. 

 

Power Automate is the easiest approach, but since that is not an option for you, you could try building your own custom subscription logic using the Export To File API: Reports - Export To File - REST API (Power BI REST APIs) | Microsoft Learn

 

I have a Python script running on a server in my network that uses this API to download reports and then email them to users, which gives me my own custom subscription system. Maybe you can do something similar?
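
One way to get arbitrary send dates out of such a script, sketched here under assumptions: run it once a day from an OS scheduler (cron or Windows Task Scheduler) and let the script decide which reports are due. The report names and the `SEND_DATES` mapping are hypothetical; in practice the dates could come from a table or a file.

```python
from datetime import date
from typing import Dict, List, Optional, Set

# Hypothetical schedule: report name -> dates on which it should be sent.
SEND_DATES: Dict[str, Set[date]] = {
    "report-a": {date(2025, 11, 26)},
    "report-b": {date(2025, 11, 28), date(2025, 12, 5)},
}


def reports_due(schedule: Dict[str, Set[date]], today: Optional[date] = None) -> List[str]:
    """Return the reports whose send dates include `today`, sorted by name."""
    today = today or date.today()
    return sorted(name for name, dates in schedule.items() if today in dates)


if __name__ == "__main__":
    for name in reports_due(SEND_DATES):
        # Placeholder: call your own export-and-email routine here.
        print(f"Would export and email {name}")
```

On days with no matching dates the script simply does nothing, so the daily trigger itself never needs to change when users hand over a new list of dates.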


Hi @tayloramy, can we discuss this in more detail?


Hi @venkatakrish_01,
Thanks for reaching out to the Microsoft Fabric Community Forum.
Could you please confirm whether the issue has been resolved?

GeraldGEmerick
Memorable Member

@venkatakrish_01 My understanding is that subscriptions can be set to hourly, daily, weekly, or monthly, but not to specific, arbitrary dates; there has to be some kind of pattern, such as every Tuesday. Power Automate could get around this, but apparently you cannot use it.
