Join us at FabCon Atlanta from March 16 - 20, 2026, for the ultimate Fabric, Power BI, AI and SQL community-led event. Save $200 with code FABCOMM.
Register now! The Power BI Data Visualization World Championships is back! Get ahead of the game and start preparing now! Learn more
Hi
Currently, I have created 50+ paginated reports and set up subscriptions for them, and users are receiving those emails as per the subscriptions. However, a week ago, users gave me a list of dates — for example, one report should generate on November 26, 2025, and another on November 28, 2025, and so on. So my question is: can I explicitly specify dates so that reports are emailed on those particular days? My limitation is that I can't use Power Automate or other tools, so please suggest any ideas accordingly.
Solved! Go to Solution.
Hi @venkatakrish_01, What I am doing in my python script is first I am building a filter string and then I use that filter string to start an export using the APIs:
import email
import os, re, time, json, pathlib, pyodbc, requests, math, random
import pandas as pd
from datetime import datetime, date, timedelta
from msal import ConfidentialClientApplication
from typing import Union, Sequence, Optional, Tuple
import smtplib
import mimetypes
from email.message import EmailMessage
from email.utils import formatdate, make_msgid
from concurrent.futures import ThreadPoolExecutor, as_completed
from sqlalchemy import create_engine
from dotenv import load_dotenv
load_dotenv()  # load configuration values from a local .env file into os.environ
# ===== Entra app / Power BI REST =====
# Entra (Azure AD) app registration used for app-only (client-credential) auth.
CLIENT_ID = os.getenv('CLIENT_ID')
CLIENT_SECRET = os.getenv('CLIENT_SECRET')
TENANT_ID = os.getenv('TENANT_ID')
AUTHORITY = f'https://login.microsoftonline.com/{TENANT_ID}'
SCOPE = ['https://analysis.windows.net/powerbi/api/.default']  # app-only scope for the Power BI REST API
PBI_API_BASE = "https://api.powerbi.com/v1.0/myorg"
# ======= BIC: SQL Server connection used to read recipient/date views ==========
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_HOST = os.getenv('DB_HOST')
DB_NAME = os.getenv('DB_NAME')
DB_DRIVER = os.getenv('DB_DRIVER')
DB_URL = f'mssql+pyodbc://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}?driver={DB_DRIVER}'
AUDIT_VIEW_NAME = " "              # placeholder — set to the real view name before running
GETTING_STARTED_VIEW_NAME = " "    # placeholder — set to the real view name before running
# ===== Report =====
REPORT_URL = " "                   # placeholder — full report URL containing /groups/<id>/reports/<id>
# use EXPORT_PATH var and then append current date
# NOTE(review): if EXPORT_PATH is unset, os.getenv returns None and Path(None)
# raises TypeError at import time — confirm the variable is always provided.
OUTDIR = pathlib.Path(os.getenv('EXPORT_PATH')) / datetime.now().strftime("%Y%m%d")
# ===== Email (SMTP) =====
SMTP_HOST = os.getenv("SMTP_HOST", " ")
SENDER_EMAIL = os.getenv("SENDER_EMAIL", " ")
DEFAULT_CC = os.getenv("DEFAULT_CC", " ")
DEFAULT_BCC = os.getenv("DEFAULT_BCC", " ")
WORKSPACE_REQUEST_LINK = "https://albertahealthservices.sharepoint.com/:l:/s/SP10344/FF-vZ2eBLo1MuFTBoZpq9JIBcY_Yyo9Lx6ZpgtS_Gi4CDQ?nav=MzczOTc0YTktMGNmZC00ZGYxLTgyZmYtYmE3MjUxNzNiNTMx"
LEARN_LINK = "https://learn.microsoft.com/en-us/power-bi/fundamentals/service-get-started"
COP_LINK = "https://albertahealthservices.sharepoint.com/sites/SP10344/SitePages/Power-BI-Community-of-Practice_Landing.aspx"
# ==========================================
# email templates
# (PLAIN_TEXT_TEMPLATE, HTML_TEMPLATE, GETTING_STARTED_* templates)
# Bodies are elided in this post; .format() is called with keys:
# first_name, deadline_date, workspace_request_link, date, attachment_name.
# ==========================================
PLAIN_TEXT_TEMPLATE = ""
HTML_TEMPLATE = ""
# ------------------------------
# Token manager with refresh
# ------------------------------
class TokenManager:
    """Caches an app-only Entra access token and refreshes it shortly before expiry."""

    def __init__(self, client_id: str, client_secret: str, authority: str, scope: Sequence[str]):
        self.app = ConfidentialClientApplication(
            client_id, authority=authority, client_credential=client_secret
        )
        self.scope = scope
        self._token: Optional[str] = None
        self._expires_on: int = 0  # epoch seconds at which the cached token expires

    def get_token(self) -> str:
        """Return a bearer token, refreshing when absent or within 5 minutes of expiry."""
        now = int(time.time())
        still_fresh = bool(self._token) and now < (self._expires_on - 300)
        if still_fresh:
            return self._token
        outcome = self.app.acquire_token_silent(self.scope, account=None)
        if not outcome:
            outcome = self.app.acquire_token_for_client(scopes=self.scope)
        if "access_token" not in outcome:
            raise RuntimeError(f"Token error: {outcome}")
        self._token = outcome["access_token"]
        # MSAL may report the lifetime as an absolute epoch ("expires_on")
        # or as a duration in seconds ("expires_in").
        if "expires_on" in outcome:
            self._expires_on = int(outcome["expires_on"])
        else:
            self._expires_on = now + int(outcome.get("expires_in", 3600))
        return self._token
# ------------------------------
# Helpers
# ------------------------------
def parse_ids_from_url(report_url: str) -> Tuple[str, str]:
    """Extract (group_id, report_id) from a Power BI report URL.

    Raises ValueError when the URL does not contain /groups/<id>/reports/<id>.
    """
    match = re.search(r"/groups/([0-9a-fA-F-]+)/reports/([0-9a-fA-F-]+)", report_url)
    if match is None:
        raise ValueError("Could not parse groupId/reportId from REPORT_URL")
    group_id, report_id = match.groups()
    return group_id, report_id
def fetch_email_date_rows(view) -> pd.DataFrame:
    """Read (user_to_email, DateFilter) pairs from the given SQL view.

    Rows with a null email or date are dropped, emails are whitespace-trimmed,
    and exact duplicates are removed.  Raises ValueError when the view does not
    expose the two expected columns.
    """
    engine = create_engine(DB_URL)
    query = f"SELECT user_to_email, DateFilter FROM {view};"
    with engine.connect() as con:
        frame = pd.read_sql(query, con)
    if not {"user_to_email", "DateFilter"}.issubset(frame.columns):
        raise ValueError(f"View must expose columns user_to_email and DateFilter. Got: {list(frame.columns)}")
    frame = frame.dropna(subset=["user_to_email", "DateFilter"])
    frame["user_to_email"] = frame["user_to_email"].astype(str).str.strip()
    return frame.drop_duplicates()
def normalize_date_to_str(dval) -> str:
    """Coerce a datetime, date, or date-like string into 'YYYY-MM-DD'.

    Unparseable strings fall back to their first 10 characters.
    """
    if isinstance(dval, datetime):
        dval = dval.date()
    if isinstance(dval, date):
        return dval.isoformat()
    text = str(dval)
    try:
        return datetime.fromisoformat(text).date().isoformat()
    except Exception:
        return text[:10]
def build_export_filter(user_email: str, date_yyyy_mm_dd: str) -> str:
    """Build the report-level filter string for one recipient and one day.

    The day is expressed as a half-open range [date, date+1) so any time
    component on DimDate/Date is covered.  Single quotes in the email are
    doubled (standard OData escaping).
    """
    escaped = user_email.replace("'", "''")
    start = datetime.strptime(date_yyyy_mm_dd, "%Y-%m-%d").date()
    end = start + timedelta(days=1)
    clauses = [
        f"DimUser-Recipient/user_email eq '{escaped}'",
        f"and DimDate/Date ge {start.isoformat()}",
        f"and DimDate/Date lt {end.isoformat()}",
    ]
    return " ".join(clauses)
_UNSAFE_RUN = re.compile(r'[^A-Za-z0-9._-]+')


def sanitize_filename_part(s: str) -> str:
    """Collapse every run of filename-unsafe characters into a single underscore."""
    return _UNSAFE_RUN.sub('_', s)
# -----------------------------------------------
# resilient HTTP with basic retry/backoff
# -----------------------------------------------
def _http_with_retry(method: str, url: str, session: Optional[requests.Session] = None,
                     max_tries: int = 5, backoff_base: float = 0.75, **kwargs) -> requests.Response:
    """Issue one HTTP request with retry and exponential backoff on transient failures.

    Connection errors, HTTP 429 (throttling) and 5xx responses are retried up
    to *max_tries* times with jittered exponential backoff capped at 8 seconds.
    Non-transient HTTP errors (other 4xx) and non-request exceptions are raised
    immediately — retrying them cannot succeed — and no backoff sleep happens
    after the final attempt.  Raises the last error when all attempts fail.
    """
    RETRYABLE = (429, 500, 502, 503, 504)
    sess = session or requests.Session()
    last_err = None
    for attempt in range(1, max_tries + 1):
        try:
            resp = sess.request(method, url, timeout=60, **kwargs)
            # Treat throttling and server errors as transient so they are retried.
            if resp.status_code in RETRYABLE:
                raise requests.HTTPError(f"Transient HTTP {resp.status_code}", response=resp)
            resp.raise_for_status()
            return resp
        except requests.RequestException as ex:
            last_err = ex
            # A definite client error (e.g. 400/401/404) is not transient: fail fast.
            status = getattr(getattr(ex, "response", None), "status_code", None)
            if status is not None and status not in RETRYABLE:
                raise
            if attempt < max_tries:  # no pointless sleep after the last attempt
                sleep_s = backoff_base * (2 ** (attempt - 1)) + random.uniform(0, 0.3)
                time.sleep(min(sleep_s, 8.0))
    raise last_err
# --------------------------------------------------------
# export uses TokenManager and resilient HTTP
# --------------------------------------------------------
def export_report_pdf_with_filters(token_mgr: TokenManager, group_id: str, report_id: str,
                                   export_filter: Optional[str], outfile: pathlib.Path,
                                   include_hidden_pages: bool = False, poll_timeout_s: int = 300):
    """Export a report to PDF via the Power BI ExportTo API and save it to *outfile*.

    Starts an asynchronous export job (optionally with a report-level filter),
    polls the job status until it succeeds or fails, then downloads the file.
    Returns *outfile*.  Raises RuntimeError when the job reports "Failed" and
    TimeoutError when polling exceeds *poll_timeout_s* seconds.
    """
    session = requests.Session()
    headers = {"Authorization": f"Bearer {token_mgr.get_token()}", "Content-Type": "application/json"}
    powerbi_cfg = {"settings": {"includeHiddenPages": include_hidden_pages}}
    if export_filter:
        # Report-level filter applied to every page of the export.
        powerbi_cfg["reportLevelFilters"] = [{"filter": export_filter}]
    body = {
        "format": "PDF",
        "powerBIReportConfiguration": powerbi_cfg
    }
    # 1) start the asynchronous export job
    start = _http_with_retry(
        "POST",
        f"{PBI_API_BASE}/groups/{group_id}/reports/{report_id}/ExportTo",
        session=session,
        headers=headers, data=json.dumps(body)
    )
    export_id = start.json()["id"]
    # 2) poll until the job succeeds, fails, or the timeout elapses
    status_url = f"{PBI_API_BASE}/groups/{group_id}/reports/{report_id}/exports/{export_id}"
    t0 = time.time()
    while True:
        # refresh token if needed during long polls
        headers["Authorization"] = f"Bearer {token_mgr.get_token()}"
        s = _http_with_retry("GET", status_url, session=session, headers=headers)
        st = s.json().get("status")
        if st == "Succeeded":
            break
        if st == "Failed":
            raise RuntimeError(f"Export failed: {s.text}")
        # Honour the service's Retry-After hint when present; otherwise poll every 2s.
        retry_after = s.headers.get("Retry-After")
        time.sleep(int(retry_after) if retry_after and retry_after.isdigit() else 2)
        if time.time() - t0 > poll_timeout_s:
            raise TimeoutError(f"Polling timed out after {poll_timeout_s}s; last status={st}")
    # 3) download the finished PDF
    headers["Authorization"] = f"Bearer {token_mgr.get_token()}"
    file_resp = _http_with_retry("GET", f"{status_url}/file", session=session, headers=headers)
    outfile.parent.mkdir(parents=True, exist_ok=True)
    outfile.write_bytes(file_resp.content)
    return outfile
# --------------------------------------------------------
# email send with connection-per-send + retries
# --------------------------------------------------------
def send_email_with_attachments(
    sender: str,
    to_recipients: Union[str, Sequence[str]],
    subject: str,
    body_text: str,
    attachments: Optional[Union[pathlib.Path, Sequence[pathlib.Path]]] = None,
    cc_recipients: Optional[Union[str, Sequence[str]]] = None,
    bcc_recipients: Optional[Union[str, Sequence[str]]] = None,
    html_body: Optional[str] = None,
    smtp_host: str = SMTP_HOST,
    max_tries: int = 4,
) -> None:
    """Send one email (plain text, optional HTML alternative, attachments) over SMTP.

    Recipient arguments accept either a single comma/semicolon-separated string
    or a sequence of addresses.  Bcc recipients go only to the SMTP envelope,
    never into a header.  The send is retried up to *max_tries* times with
    exponential backoff.  Raises ValueError when no 'To' recipient is given,
    or the last SMTP/OS error when every attempt fails.
    """
    def _norm(v):
        # Normalize None / "" / "a@x; b@y" / ["a@x", "b@y"] into a clean list.
        if not v:
            return []
        if isinstance(v, str):
            return [p.strip() for p in re.split(r"[;,]", v) if p.strip()]
        return list(v)

    to_list = _norm(to_recipients)
    cc_list = _norm(cc_recipients)
    bcc_list = _norm(bcc_recipients)
    if not to_list:
        raise ValueError("No 'To' recipients provided")

    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = sender
    msg["To"] = ", ".join(to_list)
    if cc_list:
        msg["Cc"] = ", ".join(cc_list)
    msg["Date"] = formatdate(localtime=True)
    msg["Message-ID"] = make_msgid()
    msg.set_content(body_text or "")
    if html_body:
        # Must be added before attachments so the message becomes
        # multipart/alternative inside multipart/mixed.
        msg.add_alternative(html_body, subtype="html")

    if attachments:
        # A scalar path is wrapped in a list; str/bytes/Path are scalars, not sequences.
        files = attachments if isinstance(attachments, Sequence) and not isinstance(attachments, (str, bytes, pathlib.Path)) else [attachments]
        for fpath in files:
            fpath = pathlib.Path(fpath)
            if not fpath.exists():
                continue  # best-effort: silently skip missing attachments
            ctype, encoding = mimetypes.guess_type(str(fpath))
            if ctype is None or encoding is not None:
                ctype = "application/octet-stream"
            maintype, subtype = ctype.split("/", 1)
            with open(fpath, "rb") as fp:
                msg.add_attachment(fp.read(), maintype=maintype, subtype=subtype, filename=fpath.name)

    all_rcpts = to_list + cc_list + bcc_list
    last_err = None
    for attempt in range(1, max_tries + 1):
        try:
            with smtplib.SMTP(smtp_host, timeout=60) as smtp:
                # If your server requires TLS or auth, add here:
                # smtp.starttls()
                # smtp.login(USER, PASS)
                smtp.ehlo()
                smtp.send_message(msg, from_addr=sender, to_addrs=all_rcpts)
                return
        except (smtplib.SMTPException, OSError) as ex:
            # Only transport-level failures are worth retrying; programming
            # errors propagate immediately instead of being swallowed 4 times.
            last_err = ex
            if attempt < max_tries:  # no pointless sleep after the last attempt
                time.sleep(min(2 ** attempt, 8))
    raise last_err
def derive_first_name_from_email(email_addr: str) -> str:
    """Guess a capitalized first name from the local part of an email address.

    Falls back to "Colleague" when no usable token can be found.
    """
    local_part, _, _ = email_addr.partition("@")
    if not local_part:
        return "Colleague"
    first_token = re.split(r"[._-]", local_part)[0]
    return first_token.capitalize() if first_token else "Colleague"
# --------------------------------------------------------
# worker to process a single row (export + email)
# --------------------------------------------------------
def process_one_entry(row_email: str, row_date, group_id: str, report_id: str,
                      token_mgr: TokenManager, is_getting_started: bool = False) -> Tuple[str, str, Optional[str]]:
    """Export one filtered PDF for a single (email, date) row and email it to that user.

    Returns (email, date_str, attachment_filename) on success; any exception
    propagates to the caller (bulk_email logs it as a warning).

    NOTE(review): *is_getting_started* is accepted but never used here —
    presumably it should select the GETTING_STARTED_* templates instead of the
    default ones.  TODO confirm and wire it up.
    """
    email_addr = str(row_email).strip()
    date_str = normalize_date_to_str(row_date)
    export_filter = build_export_filter(email_addr, date_str)
    outfile = OUTDIR / f"export-{sanitize_filename_part(email_addr)}-{date_str}.pdf"
    export_report_pdf_with_filters(token_mgr, group_id, report_id, export_filter, outfile)

    first_name = derive_first_name_from_email(email_addr)
    attachment_name = outfile.name
    deadline_date = (datetime.now().date() + timedelta(days=7)).isoformat()
    # Plain literal: the original used an f-string with no placeholders (ruff F541).
    subject = "You're building great things in Power BI - let's make them shareable"
    # Both templates take the same substitution values.
    template_args = dict(
        first_name=first_name,
        deadline_date=deadline_date,
        workspace_request_link=WORKSPACE_REQUEST_LINK,
        date=date_str,
        attachment_name=attachment_name,
    )
    plain_body = PLAIN_TEXT_TEMPLATE.format(**template_args)
    html_body = HTML_TEMPLATE.format(**template_args)
    send_email_with_attachments(
        sender=SENDER_EMAIL,
        to_recipients=[email_addr],
        subject=subject,
        body_text=plain_body,
        attachments=outfile,
        cc_recipients=DEFAULT_CC or None,
        bcc_recipients=DEFAULT_BCC or None,
        html_body=html_body,
    )
    return (email_addr, date_str, outfile.name)
# --------------------------------------------------------
# concurrent bulk functions
# --------------------------------------------------------
def bulk_email(view_name: str, is_getting_started: bool, max_workers: int = 4):
    """Export and email one filtered PDF per row of *view_name*, concurrently.

    Per-row failures are logged and do not stop the remaining rows.
    """
    if not all([CLIENT_ID, CLIENT_SECRET, TENANT_ID]):
        raise SystemExit("Set CLIENT_ID, CLIENT_SECRET, TENANT_ID env vars first.")
    token_mgr = TokenManager(CLIENT_ID, CLIENT_SECRET, AUTHORITY, SCOPE)
    group_id, report_id = parse_ids_from_url(REPORT_URL)

    df = fetch_email_date_rows(view_name)
    if df.empty:
        print(f"[INFO] View {view_name} returned no rows.")
        return

    label = "GettingStarted" if is_getting_started else "PersonalWorkspace"
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [
            pool.submit(
                process_one_entry,
                row.user_to_email,
                row.DateFilter,
                group_id,
                report_id,
                token_mgr,
                is_getting_started,
            )
            for row in df.itertuples(index=False)
        ]
        for fut in as_completed(futures):
            try:
                email_addr, date_str, fname = fut.result()
                print(f"[OK][{label}] Emailed {email_addr} for {date_str} ({fname})")
            except Exception as ex:
                print(f"[WARN][{label}] Failed: {ex}")
def prime_report():
    """Run one unfiltered export to warm the report's visuals, then pause 5 minutes.

    Failures are logged and ignored; the warm-up is best-effort.
    """
    try:
        warm_token_mgr = TokenManager(CLIENT_ID, CLIENT_SECRET, AUTHORITY, SCOPE)
        warm_group_id, warm_report_id = parse_ids_from_url(REPORT_URL)
        warm_out = OUTDIR / f"_primer-{datetime.now().strftime('%H%M%S')}.pdf"
        export_report_pdf_with_filters(
            token_mgr=warm_token_mgr,
            group_id=warm_group_id,
            report_id=warm_report_id,
            export_filter=None,  # deliberately unfiltered
            outfile=warm_out,
            include_hidden_pages=False,
            poll_timeout_s=300,
        )
        print(f"[INFO] Primer export completed: {warm_out.name}")
    except Exception as ex:
        print(f"[WARN] Primer export failed (continuing anyway): {ex}")
    print("[INFO] Waiting 300 seconds (5 minutes) before main exports...")
    time.sleep(300)
# Entry point: warm the report once, then process every row of the audit view.
if __name__ == "__main__":
    prime_report()
    bulk_email(AUDIT_VIEW_NAME, is_getting_started=False, max_workers=8)
If you found this helpful, consider giving some Kudos. If I answered your question or solved your problem, mark this post as the solution.
Hi @venkatakrish_01,
If you must work natively in Fabric, I don't think you can do it.
Power Automate is the easiest approach, but that is not an option for you, so what you could also try is building your own custom subscription logic using the export to file API: Reports - Export To File - REST API (Power BI Power BI REST APIs) | Microsoft Learn
I have a python script running on a server in my network that uses this API to download reports, and then email them to users creating my own custom subscription system. Maybe you can do something similar?
If you found this helpful, consider giving some Kudos. If I answered your question or solved your problem, mark this post as the solution.
Hi @venkatakrish_01, What I am doing in my python script is first I am building a filter string and then I use that filter string to start an export using the APIs:
import email
import os, re, time, json, pathlib, pyodbc, requests, math, random
import pandas as pd
from datetime import datetime, date, timedelta
from msal import ConfidentialClientApplication
from typing import Union, Sequence, Optional, Tuple
import smtplib
import mimetypes
from email.message import EmailMessage
from email.utils import formatdate, make_msgid
from concurrent.futures import ThreadPoolExecutor, as_completed
from sqlalchemy import create_engine
from dotenv import load_dotenv
load_dotenv()  # load configuration values from a local .env file into os.environ
# ===== Entra app / Power BI REST =====
# Entra (Azure AD) app registration used for app-only (client-credential) auth.
CLIENT_ID = os.getenv('CLIENT_ID')
CLIENT_SECRET = os.getenv('CLIENT_SECRET')
TENANT_ID = os.getenv('TENANT_ID')
AUTHORITY = f'https://login.microsoftonline.com/{TENANT_ID}'
SCOPE = ['https://analysis.windows.net/powerbi/api/.default']  # app-only scope for the Power BI REST API
PBI_API_BASE = "https://api.powerbi.com/v1.0/myorg"
# ======= BIC: SQL Server connection used to read recipient/date views ==========
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_HOST = os.getenv('DB_HOST')
DB_NAME = os.getenv('DB_NAME')
DB_DRIVER = os.getenv('DB_DRIVER')
DB_URL = f'mssql+pyodbc://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}?driver={DB_DRIVER}'
AUDIT_VIEW_NAME = " "              # placeholder — set to the real view name before running
GETTING_STARTED_VIEW_NAME = " "    # placeholder — set to the real view name before running
# ===== Report =====
REPORT_URL = " "                   # placeholder — full report URL containing /groups/<id>/reports/<id>
# use EXPORT_PATH var and then append current date
# NOTE(review): if EXPORT_PATH is unset, os.getenv returns None and Path(None)
# raises TypeError at import time — confirm the variable is always provided.
OUTDIR = pathlib.Path(os.getenv('EXPORT_PATH')) / datetime.now().strftime("%Y%m%d")
# ===== Email (SMTP) =====
SMTP_HOST = os.getenv("SMTP_HOST", " ")
SENDER_EMAIL = os.getenv("SENDER_EMAIL", " ")
DEFAULT_CC = os.getenv("DEFAULT_CC", " ")
DEFAULT_BCC = os.getenv("DEFAULT_BCC", " ")
WORKSPACE_REQUEST_LINK = "https://albertahealthservices.sharepoint.com/:l:/s/SP10344/FF-vZ2eBLo1MuFTBoZpq9JIBcY_Yyo9Lx6ZpgtS_Gi4CDQ?nav=MzczOTc0YTktMGNmZC00ZGYxLTgyZmYtYmE3MjUxNzNiNTMx"
LEARN_LINK = "https://learn.microsoft.com/en-us/power-bi/fundamentals/service-get-started"
COP_LINK = "https://albertahealthservices.sharepoint.com/sites/SP10344/SitePages/Power-BI-Community-of-Practice_Landing.aspx"
# ==========================================
# email templates
# (PLAIN_TEXT_TEMPLATE, HTML_TEMPLATE, GETTING_STARTED_* templates)
# Bodies are elided in this post; .format() is called with keys:
# first_name, deadline_date, workspace_request_link, date, attachment_name.
# ==========================================
PLAIN_TEXT_TEMPLATE = ""
HTML_TEMPLATE = ""
# ------------------------------
# Token manager with refresh
# ------------------------------
class TokenManager:
    """Caches an app-only Entra access token and refreshes it shortly before expiry."""

    def __init__(self, client_id: str, client_secret: str, authority: str, scope: Sequence[str]):
        self.app = ConfidentialClientApplication(
            client_id, authority=authority, client_credential=client_secret
        )
        self.scope = scope
        self._token: Optional[str] = None
        self._expires_on: int = 0  # epoch seconds at which the cached token expires

    def get_token(self) -> str:
        """Return a bearer token, refreshing when absent or within 5 minutes of expiry."""
        now = int(time.time())
        still_fresh = bool(self._token) and now < (self._expires_on - 300)
        if still_fresh:
            return self._token
        outcome = self.app.acquire_token_silent(self.scope, account=None)
        if not outcome:
            outcome = self.app.acquire_token_for_client(scopes=self.scope)
        if "access_token" not in outcome:
            raise RuntimeError(f"Token error: {outcome}")
        self._token = outcome["access_token"]
        # MSAL may report the lifetime as an absolute epoch ("expires_on")
        # or as a duration in seconds ("expires_in").
        if "expires_on" in outcome:
            self._expires_on = int(outcome["expires_on"])
        else:
            self._expires_on = now + int(outcome.get("expires_in", 3600))
        return self._token
# ------------------------------
# Helpers
# ------------------------------
def parse_ids_from_url(report_url: str) -> Tuple[str, str]:
    """Extract (group_id, report_id) from a Power BI report URL.

    Raises ValueError when the URL does not contain /groups/<id>/reports/<id>.
    """
    match = re.search(r"/groups/([0-9a-fA-F-]+)/reports/([0-9a-fA-F-]+)", report_url)
    if match is None:
        raise ValueError("Could not parse groupId/reportId from REPORT_URL")
    group_id, report_id = match.groups()
    return group_id, report_id
def fetch_email_date_rows(view) -> pd.DataFrame:
    """Read (user_to_email, DateFilter) pairs from the given SQL view.

    Rows with a null email or date are dropped, emails are whitespace-trimmed,
    and exact duplicates are removed.  Raises ValueError when the view does not
    expose the two expected columns.
    """
    engine = create_engine(DB_URL)
    query = f"SELECT user_to_email, DateFilter FROM {view};"
    with engine.connect() as con:
        frame = pd.read_sql(query, con)
    if not {"user_to_email", "DateFilter"}.issubset(frame.columns):
        raise ValueError(f"View must expose columns user_to_email and DateFilter. Got: {list(frame.columns)}")
    frame = frame.dropna(subset=["user_to_email", "DateFilter"])
    frame["user_to_email"] = frame["user_to_email"].astype(str).str.strip()
    return frame.drop_duplicates()
def normalize_date_to_str(dval) -> str:
    """Coerce a datetime, date, or date-like string into 'YYYY-MM-DD'.

    Unparseable strings fall back to their first 10 characters.
    """
    if isinstance(dval, datetime):
        dval = dval.date()
    if isinstance(dval, date):
        return dval.isoformat()
    text = str(dval)
    try:
        return datetime.fromisoformat(text).date().isoformat()
    except Exception:
        return text[:10]
def build_export_filter(user_email: str, date_yyyy_mm_dd: str) -> str:
    """Build the report-level filter string for one recipient and one day.

    The day is expressed as a half-open range [date, date+1) so any time
    component on DimDate/Date is covered.  Single quotes in the email are
    doubled (standard OData escaping).
    """
    escaped = user_email.replace("'", "''")
    start = datetime.strptime(date_yyyy_mm_dd, "%Y-%m-%d").date()
    end = start + timedelta(days=1)
    clauses = [
        f"DimUser-Recipient/user_email eq '{escaped}'",
        f"and DimDate/Date ge {start.isoformat()}",
        f"and DimDate/Date lt {end.isoformat()}",
    ]
    return " ".join(clauses)
_UNSAFE_RUN = re.compile(r'[^A-Za-z0-9._-]+')


def sanitize_filename_part(s: str) -> str:
    """Collapse every run of filename-unsafe characters into a single underscore."""
    return _UNSAFE_RUN.sub('_', s)
# -----------------------------------------------
# resilient HTTP with basic retry/backoff
# -----------------------------------------------
def _http_with_retry(method: str, url: str, session: Optional[requests.Session] = None,
                     max_tries: int = 5, backoff_base: float = 0.75, **kwargs) -> requests.Response:
    """Issue one HTTP request with retry and exponential backoff on transient failures.

    Connection errors, HTTP 429 (throttling) and 5xx responses are retried up
    to *max_tries* times with jittered exponential backoff capped at 8 seconds.
    Non-transient HTTP errors (other 4xx) and non-request exceptions are raised
    immediately — retrying them cannot succeed — and no backoff sleep happens
    after the final attempt.  Raises the last error when all attempts fail.
    """
    RETRYABLE = (429, 500, 502, 503, 504)
    sess = session or requests.Session()
    last_err = None
    for attempt in range(1, max_tries + 1):
        try:
            resp = sess.request(method, url, timeout=60, **kwargs)
            # Treat throttling and server errors as transient so they are retried.
            if resp.status_code in RETRYABLE:
                raise requests.HTTPError(f"Transient HTTP {resp.status_code}", response=resp)
            resp.raise_for_status()
            return resp
        except requests.RequestException as ex:
            last_err = ex
            # A definite client error (e.g. 400/401/404) is not transient: fail fast.
            status = getattr(getattr(ex, "response", None), "status_code", None)
            if status is not None and status not in RETRYABLE:
                raise
            if attempt < max_tries:  # no pointless sleep after the last attempt
                sleep_s = backoff_base * (2 ** (attempt - 1)) + random.uniform(0, 0.3)
                time.sleep(min(sleep_s, 8.0))
    raise last_err
# --------------------------------------------------------
# export uses TokenManager and resilient HTTP
# --------------------------------------------------------
def export_report_pdf_with_filters(token_mgr: TokenManager, group_id: str, report_id: str,
                                   export_filter: Optional[str], outfile: pathlib.Path,
                                   include_hidden_pages: bool = False, poll_timeout_s: int = 300):
    """Export a report to PDF via the Power BI ExportTo API and save it to *outfile*.

    Starts an asynchronous export job (optionally with a report-level filter),
    polls the job status until it succeeds or fails, then downloads the file.
    Returns *outfile*.  Raises RuntimeError when the job reports "Failed" and
    TimeoutError when polling exceeds *poll_timeout_s* seconds.
    """
    session = requests.Session()
    headers = {"Authorization": f"Bearer {token_mgr.get_token()}", "Content-Type": "application/json"}
    powerbi_cfg = {"settings": {"includeHiddenPages": include_hidden_pages}}
    if export_filter:
        # Report-level filter applied to every page of the export.
        powerbi_cfg["reportLevelFilters"] = [{"filter": export_filter}]
    body = {
        "format": "PDF",
        "powerBIReportConfiguration": powerbi_cfg
    }
    # 1) start the asynchronous export job
    start = _http_with_retry(
        "POST",
        f"{PBI_API_BASE}/groups/{group_id}/reports/{report_id}/ExportTo",
        session=session,
        headers=headers, data=json.dumps(body)
    )
    export_id = start.json()["id"]
    # 2) poll until the job succeeds, fails, or the timeout elapses
    status_url = f"{PBI_API_BASE}/groups/{group_id}/reports/{report_id}/exports/{export_id}"
    t0 = time.time()
    while True:
        # refresh token if needed during long polls
        headers["Authorization"] = f"Bearer {token_mgr.get_token()}"
        s = _http_with_retry("GET", status_url, session=session, headers=headers)
        st = s.json().get("status")
        if st == "Succeeded":
            break
        if st == "Failed":
            raise RuntimeError(f"Export failed: {s.text}")
        # Honour the service's Retry-After hint when present; otherwise poll every 2s.
        retry_after = s.headers.get("Retry-After")
        time.sleep(int(retry_after) if retry_after and retry_after.isdigit() else 2)
        if time.time() - t0 > poll_timeout_s:
            raise TimeoutError(f"Polling timed out after {poll_timeout_s}s; last status={st}")
    # 3) download the finished PDF
    headers["Authorization"] = f"Bearer {token_mgr.get_token()}"
    file_resp = _http_with_retry("GET", f"{status_url}/file", session=session, headers=headers)
    outfile.parent.mkdir(parents=True, exist_ok=True)
    outfile.write_bytes(file_resp.content)
    return outfile
# --------------------------------------------------------
# email send with connection-per-send + retries
# --------------------------------------------------------
def send_email_with_attachments(
    sender: str,
    to_recipients: Union[str, Sequence[str]],
    subject: str,
    body_text: str,
    attachments: Optional[Union[pathlib.Path, Sequence[pathlib.Path]]] = None,
    cc_recipients: Optional[Union[str, Sequence[str]]] = None,
    bcc_recipients: Optional[Union[str, Sequence[str]]] = None,
    html_body: Optional[str] = None,
    smtp_host: str = SMTP_HOST,
    max_tries: int = 4,
) -> None:
    """Send one email (plain text, optional HTML alternative, attachments) over SMTP.

    Recipient arguments accept either a single comma/semicolon-separated string
    or a sequence of addresses.  Bcc recipients go only to the SMTP envelope,
    never into a header.  The send is retried up to *max_tries* times with
    exponential backoff.  Raises ValueError when no 'To' recipient is given,
    or the last SMTP/OS error when every attempt fails.
    """
    def _norm(v):
        # Normalize None / "" / "a@x; b@y" / ["a@x", "b@y"] into a clean list.
        if not v:
            return []
        if isinstance(v, str):
            return [p.strip() for p in re.split(r"[;,]", v) if p.strip()]
        return list(v)

    to_list = _norm(to_recipients)
    cc_list = _norm(cc_recipients)
    bcc_list = _norm(bcc_recipients)
    if not to_list:
        raise ValueError("No 'To' recipients provided")

    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = sender
    msg["To"] = ", ".join(to_list)
    if cc_list:
        msg["Cc"] = ", ".join(cc_list)
    msg["Date"] = formatdate(localtime=True)
    msg["Message-ID"] = make_msgid()
    msg.set_content(body_text or "")
    if html_body:
        # Must be added before attachments so the message becomes
        # multipart/alternative inside multipart/mixed.
        msg.add_alternative(html_body, subtype="html")

    if attachments:
        # A scalar path is wrapped in a list; str/bytes/Path are scalars, not sequences.
        files = attachments if isinstance(attachments, Sequence) and not isinstance(attachments, (str, bytes, pathlib.Path)) else [attachments]
        for fpath in files:
            fpath = pathlib.Path(fpath)
            if not fpath.exists():
                continue  # best-effort: silently skip missing attachments
            ctype, encoding = mimetypes.guess_type(str(fpath))
            if ctype is None or encoding is not None:
                ctype = "application/octet-stream"
            maintype, subtype = ctype.split("/", 1)
            with open(fpath, "rb") as fp:
                msg.add_attachment(fp.read(), maintype=maintype, subtype=subtype, filename=fpath.name)

    all_rcpts = to_list + cc_list + bcc_list
    last_err = None
    for attempt in range(1, max_tries + 1):
        try:
            with smtplib.SMTP(smtp_host, timeout=60) as smtp:
                # If your server requires TLS or auth, add here:
                # smtp.starttls()
                # smtp.login(USER, PASS)
                smtp.ehlo()
                smtp.send_message(msg, from_addr=sender, to_addrs=all_rcpts)
                return
        except (smtplib.SMTPException, OSError) as ex:
            # Only transport-level failures are worth retrying; programming
            # errors propagate immediately instead of being swallowed 4 times.
            last_err = ex
            if attempt < max_tries:  # no pointless sleep after the last attempt
                time.sleep(min(2 ** attempt, 8))
    raise last_err
def derive_first_name_from_email(email_addr: str) -> str:
    """Guess a capitalized first name from the local part of an email address.

    Falls back to "Colleague" when no usable token can be found.
    """
    local_part, _, _ = email_addr.partition("@")
    if not local_part:
        return "Colleague"
    first_token = re.split(r"[._-]", local_part)[0]
    return first_token.capitalize() if first_token else "Colleague"
# --------------------------------------------------------
# worker to process a single row (export + email)
# --------------------------------------------------------
def process_one_entry(row_email: str, row_date, group_id: str, report_id: str,
                      token_mgr: TokenManager, is_getting_started: bool = False) -> Tuple[str, str, Optional[str]]:
    """Export one filtered PDF for a single (email, date) row and email it to that user.

    Returns (email, date_str, attachment_filename) on success; any exception
    propagates to the caller (bulk_email logs it as a warning).

    NOTE(review): *is_getting_started* is accepted but never used here —
    presumably it should select the GETTING_STARTED_* templates instead of the
    default ones.  TODO confirm and wire it up.
    """
    email_addr = str(row_email).strip()
    date_str = normalize_date_to_str(row_date)
    export_filter = build_export_filter(email_addr, date_str)
    outfile = OUTDIR / f"export-{sanitize_filename_part(email_addr)}-{date_str}.pdf"
    export_report_pdf_with_filters(token_mgr, group_id, report_id, export_filter, outfile)

    first_name = derive_first_name_from_email(email_addr)
    attachment_name = outfile.name
    deadline_date = (datetime.now().date() + timedelta(days=7)).isoformat()
    # Plain literal: the original used an f-string with no placeholders (ruff F541).
    subject = "You're building great things in Power BI - let's make them shareable"
    # Both templates take the same substitution values.
    template_args = dict(
        first_name=first_name,
        deadline_date=deadline_date,
        workspace_request_link=WORKSPACE_REQUEST_LINK,
        date=date_str,
        attachment_name=attachment_name,
    )
    plain_body = PLAIN_TEXT_TEMPLATE.format(**template_args)
    html_body = HTML_TEMPLATE.format(**template_args)
    send_email_with_attachments(
        sender=SENDER_EMAIL,
        to_recipients=[email_addr],
        subject=subject,
        body_text=plain_body,
        attachments=outfile,
        cc_recipients=DEFAULT_CC or None,
        bcc_recipients=DEFAULT_BCC or None,
        html_body=html_body,
    )
    return (email_addr, date_str, outfile.name)
# --------------------------------------------------------
# concurrent bulk functions
# --------------------------------------------------------
def bulk_email(view_name: str, is_getting_started: bool, max_workers: int = 4):
    """Export and email one filtered PDF per row of *view_name*, concurrently.

    Per-row failures are logged and do not stop the remaining rows.
    """
    if not all([CLIENT_ID, CLIENT_SECRET, TENANT_ID]):
        raise SystemExit("Set CLIENT_ID, CLIENT_SECRET, TENANT_ID env vars first.")
    token_mgr = TokenManager(CLIENT_ID, CLIENT_SECRET, AUTHORITY, SCOPE)
    group_id, report_id = parse_ids_from_url(REPORT_URL)

    df = fetch_email_date_rows(view_name)
    if df.empty:
        print(f"[INFO] View {view_name} returned no rows.")
        return

    label = "GettingStarted" if is_getting_started else "PersonalWorkspace"
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [
            pool.submit(
                process_one_entry,
                row.user_to_email,
                row.DateFilter,
                group_id,
                report_id,
                token_mgr,
                is_getting_started,
            )
            for row in df.itertuples(index=False)
        ]
        for fut in as_completed(futures):
            try:
                email_addr, date_str, fname = fut.result()
                print(f"[OK][{label}] Emailed {email_addr} for {date_str} ({fname})")
            except Exception as ex:
                print(f"[WARN][{label}] Failed: {ex}")
def prime_report():
    """Run one unfiltered export to warm the report's visuals, then pause 5 minutes.

    Failures are logged and ignored; the warm-up is best-effort.
    """
    try:
        warm_token_mgr = TokenManager(CLIENT_ID, CLIENT_SECRET, AUTHORITY, SCOPE)
        warm_group_id, warm_report_id = parse_ids_from_url(REPORT_URL)
        warm_out = OUTDIR / f"_primer-{datetime.now().strftime('%H%M%S')}.pdf"
        export_report_pdf_with_filters(
            token_mgr=warm_token_mgr,
            group_id=warm_group_id,
            report_id=warm_report_id,
            export_filter=None,  # deliberately unfiltered
            outfile=warm_out,
            include_hidden_pages=False,
            poll_timeout_s=300,
        )
        print(f"[INFO] Primer export completed: {warm_out.name}")
    except Exception as ex:
        print(f"[WARN] Primer export failed (continuing anyway): {ex}")
    print("[INFO] Waiting 300 seconds (5 minutes) before main exports...")
    time.sleep(300)
# Entry point: warm the report once, then process every row of the audit view.
if __name__ == "__main__":
    prime_report()
    bulk_email(AUDIT_VIEW_NAME, is_getting_started=False, max_workers=8)
If you found this helpful, consider giving some Kudos. If I answered your question or solved your problem, mark this post as the solution.
Hi @venkatakrish_01 ,
Thanks for reaching out to the Microsoft fabric community forum.
Could you please confirm whether the issue has been resolved?
@venkatakrish_01 My understanding is that you can set subscriptions to hourly, daily, weekly or monthly but you can't set specific, random dates, there has to be some kind of pattern like every Tuesday, etc. Power Automate could get around this issue but, apparently, you cannot use it.
The Power BI Data Visualization World Championships is back! Get ahead of the game and start preparing now!
| User | Count |
|---|---|
| 48 | |
| 46 | |
| 44 | |
| 16 | |
| 15 |