Hi everyone!
I'm trying to implement an incremental load in a Dataflow Gen2 in Microsoft Fabric, using Business Central and Dataverse as sources, with the final destination being a Lakehouse.
I’ve explored several options but ran into limitations:
The classic incremental refresh in Power BI is not supported when the destination is a Lakehouse.
Fabric pipelines and Data Factory pipelines are not usable, as they don't support Business Central as a data source.
Methods based on manual parameters or filters in Power Query are hard to scale and maintain across multiple tables with different structures.
Using a PySpark notebook to perform a merge/upsert-style incremental load might be a viable solution, but:
I don’t have a common unique key across tables (each table has different key structures).
I can rely on a timestamp field (like systemModifiedAt, lastModifiedDate, etc.) to simulate an append-only logic, but I don’t know how to handle updates (upsert) in this context.
Has anyone dealt with a similar scenario?
Thank you
Solved! Go to Solution.
Hey @theredsilva ,
For the incremental load without common keys:
Quick win: Start with one or two tables to prove the concept, then expand the metadata-driven approach.
The key is treating it as an append-then-dedupe process rather than a true upsert, which sidesteps the common key limitation while still achieving incremental loading.
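To illustrate the append-then-dedupe idea, here is a minimal sketch assuming a single source table with its own key and a modifiedon timestamp; the names account_raw, account_latest, accountid, and the hard-coded watermark are placeholders only, not part of the suggested solution as written:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

last_watermark = "2024-01-01T00:00:00"  # placeholder; in practice read this from a control table

# 1) Append only the rows changed since the last load
new_rows = spark.read.table("account").filter(F.col("modifiedon") > F.lit(last_watermark))
new_rows.write.format("delta").mode("append").saveAsTable("account_raw")

# 2) Dedupe: keep only the latest version of each row, using this table's own key
w = Window.partitionBy("accountid").orderBy(F.col("modifiedon").desc())
latest = (spark.read.table("account_raw")
          .withColumn("rn", F.row_number().over(w))
          .filter("rn = 1")
          .drop("rn"))
latest.write.format("delta").mode("overwrite").saveAsTable("account_latest")

Because the dedupe step only needs each table's own key plus a timestamp, nothing has to be common across tables.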
Best Regards,
Jainesh Poojara | Power BI Developer
Hi @theredsilva ,
Incremental loading from Business Central/Dataverse to a Fabric Lakehouse without a common key is a real-world challenge. Here’s a pattern that works well for messy tables:
1. Watermark Table: Keep a simple control table in your lakehouse to store the last loaded timestamp for each source table. Every run, just grab records where systemModifiedAt (or similar) is newer.
2. Staging & Upsert: Filter incoming data by that timestamp and land it in a staging area. In PySpark, do a left anti-join (using whatever keys you have) to find true new/updated rows, then union them into your main table.
3. Metadata-Driven Logic: If you have lots of tables, store the table/key/timestamp info in a metadata/config table and let your pipeline or notebook read from it; this makes scaling and maintenance much easier. A minimal sketch of the whole pattern follows below.
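Here is a minimal sketch of steps 1-3 under a few assumptions: the config entries, target-table naming, and column names are placeholders, and the watermark is derived from the target table itself just to keep the sketch short (a dedicated control table, as in step 1, works the same way):

from pyspark.sql import functions as F

# Hypothetical metadata/config: one entry per source table (all names are placeholders)
config = [
    {"table": "account", "key": "accountid", "ts": "modifiedon"},
    {"table": "contact", "key": "contactid", "ts": "modifiedon"},
]

for c in config:
    target = c["table"] + "_target"
    source_df = spark.read.table(c["table"])

    if target not in [t.name for t in spark.catalog.listTables()]:
        # First run: full load
        source_df.write.format("delta").saveAsTable(target)
        continue

    # 1) Watermark: highest timestamp already landed in the target
    last_load = spark.read.table(target).agg(F.max(c["ts"])).collect()[0][0]

    # 2) Staging: only rows changed since the watermark
    staged = source_df.filter(F.col(c["ts"]) > F.lit(last_load))

    # 3) Left anti-join drops rows already present (same key and timestamp), then append the rest;
    #    pair this with a dedupe-on-read (latest row per key) to expose one version per key
    existing = spark.read.table(target).select(c["key"], c["ts"])
    new_or_updated = staged.join(existing, on=[c["key"], c["ts"]], how="left_anti")
    new_or_updated.write.format("delta").mode("append").saveAsTable(target)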
@rohit1991 @jaineshp
Thank you so much! It works (using the "modifiedon" column and the primary key for each table).
Just so you know, and in case someone else needs help, here is the script I used, following your suggestions:
from delta.tables import DeltaTable
from pyspark.sql.functions import col, max as spark_max, row_number
from pyspark.sql.window import Window
import datetime as dt
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")
tables = [
"account",
"campaign",
"contact",
"lead",
"list",
"opportunity",
"po_mailchimpactivity",
"po_mailchimipcampaign",
"incident",
"systemuser"
]
primary_keys = {
"account": "accountid",
"campaign": "campaignid",
"contact": "contactid",
"lead": "leadid",
"list": "listid",
"opportunity": "opportunityid",
"po_mailchimpactivity": "activityid",
"po_mailchimipcampaign": "po_mailchimpcampaignid",
"incident": "incidentid",
"systemuser": "systemuserid"
}
watermark_table_name = "watermark_table"
# Create the watermark control table on the first run, otherwise reuse the existing one
if watermark_table_name in [t.name for t in spark.catalog.listTables()]:
    print("✅ Watermark table already exists.")
    watermark_df = spark.read.table(watermark_table_name)
else:
    print("🆕 Creating the watermark table.")
    watermark_df = spark.createDataFrame([], "table_name STRING, last_load TIMESTAMP")
    watermark_df.write.format("delta").mode("overwrite").saveAsTable(watermark_table_name)
success_tables = []
skipped_tables = []
error_tables = []
required_column = "modifiedon"
for table_source in tables:
    try:
        print(f"\n--- Processing table {table_source} ---")
        table_target = table_source + "_incrementale"

        if table_source not in primary_keys:
            print(f"❌ No primary key defined for {table_source}, skipping table.")
            skipped_tables.append((table_source, "Missing primary key"))
            continue
        pk = primary_keys[table_source]

        # Last watermark for this table (defaults far in the past on the first run)
        last_load_row = watermark_df.filter(col("table_name") == table_source).select("last_load").head()
        last_load = last_load_row["last_load"] if last_load_row else dt.datetime(2000, 1, 1)
        print(f"Last modifiedon watermark: {last_load}")

        df_source = spark.read.table(table_source)
        if required_column not in df_source.columns:
            print(f"⏭️ Column '{required_column}' missing.")
            skipped_tables.append((table_source, f"Column '{required_column}' missing"))
            continue
        if pk not in df_source.columns:
            print(f"⏭️ Primary key column '{pk}' missing.")
            skipped_tables.append((table_source, f"Primary key '{pk}' missing"))
            continue

        # Keep only rows modified after the last watermark
        df_incremental = df_source.filter(col("modifiedon") > last_load)
        count_incremental = df_incremental.count()
        if count_incremental == 0:
            print("⏭️ No new records.")
            skipped_tables.append((table_source, "No new data"))
            continue
        print(f"Found {count_incremental} new records.")

        # The 'list' table can contain duplicate keys in a batch: keep only the latest version per key
        if table_source == "list":
            window_spec = Window.partitionBy(pk).orderBy(col("modifiedon").desc())
            df_incremental = df_incremental.withColumn("row_num", row_number().over(window_spec)).filter(col("row_num") == 1).drop("row_num")

        if table_target not in [t.name for t in spark.catalog.listTables()]:
            print(f"🆕 Creating target table {table_target} with overwrite.")
            df_incremental.write.format("delta").mode("overwrite").saveAsTable(table_target)
        else:
            # Upsert: update matching keys, insert new ones
            delta_table = DeltaTable.forName(spark, table_target)
            (delta_table.alias("target")
                .merge(
                    df_incremental.alias("source"),
                    f"target.{pk} = source.{pk}"
                )
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute()
            )
            print(f"✅ Table {table_target} updated with merge upsert.")

        # Advance the watermark to the highest modifiedon loaded in this run
        max_modified = df_incremental.agg(spark_max("modifiedon")).collect()[0][0]
        watermark_df = watermark_df.filter(col("table_name") != table_source).union(
            spark.createDataFrame([(table_source, max_modified)], schema=watermark_df.schema)
        )
        watermark_df.write.format("delta").mode("overwrite").saveAsTable(watermark_table_name)
        print(f"📌 Watermark updated: {max_modified}")

        success_tables.append(table_source)

    except Exception as e:
        print(f"❌ Error on table {table_source}: {e}")
        error_tables.append((table_source, str(e)))

print("\n📋 FINAL REPORT 📋")
print("\n✅ Successful tables:")
for t in success_tables:
    print(f" - {t}")
print("\n⏭️ Skipped tables:")
for t, reason in skipped_tables:
    print(f" - {t} ➤ {reason}")
print("\n❌ Tables with errors:")
for t, err in error_tables:
    print(f" - {t} ➤ {err.splitlines()[0][:100]}")
Hi @theredsilva ,
Thank you @jaineshp for the response provided!
Has your issue been resolved? If the response provided by the community member addressed your query, could you please confirm? It helps us ensure that the solutions provided are effective and beneficial for everyone.
Thank you for your understanding!