theredsilva
New Member

How to configure incremental load from Business Central and Dataverse to a Lakehouse in Fabric

Hi everyone!

I'm trying to implement an incremental load in a Dataflow Gen2 in Microsoft Fabric, using Business Central and Dataverse as sources, with the final destination being a Lakehouse.

I’ve explored several options but ran into limitations:

  • The classic incremental refresh in Power BI is not supported when the destination is a Lakehouse.

  • Fabric pipelines and Data Factory pipelines are not usable, as they don't support Business Central as a data source.

  • Methods based on manual parameters or filters in Power Query are hard to scale and maintain across multiple tables with different structures.

  • Using a PySpark notebook to perform a merge/upsert-style incremental load might be a viable solution, but:

    • I don’t have a common unique key across tables (each table has different key structures).

    • I can rely on a timestamp field (like systemModifiedAt, lastModifiedDate, etc.) to simulate an append-only logic, but I don’t know how to handle updates (upsert) in this context.

Has anyone dealt with a similar scenario?
Thank you

2 ACCEPTED SOLUTIONS
jaineshp
Memorable Member

Hey @theredsilva ,

For the incremental load without common keys:

  1. Use watermark tables - Create a simple control table in your Lakehouse to track the last successful load timestamp for each source table. This becomes your checkpoint system.
  2. Leverage systemModifiedAt fields - Since both Business Central and Dataverse have these timestamp fields, use them as your incremental filter. Query only records where systemModifiedAt > last_watermark_timestamp.
  3. Implement a hybrid approach in Dataflow Gen2:
    • Use Power Query to filter source data based on watermark timestamps
    • Write filtered data to staging tables in Lakehouse
    • Use a simple PySpark notebook for the merge logic (even without common keys, you can use table-specific primary keys)
  4. Handle the upsert challenge:
    • For each table, identify its natural primary key(s) - Business Central tables usually have consistent key patterns
    • In your PySpark notebook, perform a left anti join to get new records, then union with existing data after removing old versions of updated records
    • Update your watermark table after successful processing
  5. Scaling across multiple tables:
    • Create a metadata-driven approach where you maintain a config table with source table names, key columns, and timestamp fields
    • Use this to dynamically generate your dataflows and notebook logic

Quick win: Start with one or two tables to prove the concept, then expand the metadata-driven approach.

The key is treating it as an append-then-dedupe process rather than a true upsert, which sidesteps the common key limitation while still achieving incremental loading.
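
To make the append-then-dedupe idea concrete, here is a minimal PySpark sketch of that step. Everything named here is a placeholder: "customer_staging" stands in for your staged incremental extract, "customer" for the existing Lakehouse table, "No" for that table's primary key, and the watermark value would really come from your control table.

# Minimal sketch of the append-then-dedupe pattern (assumes a Fabric notebook
# where `spark` is available; table, key and watermark values are placeholders).
from pyspark.sql.functions import col

staging_table = "customer_staging"       # hypothetical staged extract
target_table = "customer"                # hypothetical Lakehouse target
pk = "No"                                # table-specific primary key
last_watermark = "2024-01-01T00:00:00"   # would be read from the watermark/control table

# 1. Keep only rows changed since the last successful load
incoming = spark.read.table(staging_table).filter(col("systemModifiedAt") > last_watermark)

# 2. Drop old versions of updated rows from the existing data (left anti join on the key)
existing = spark.read.table(target_table)
unchanged = existing.join(incoming.select(pk), on=pk, how="left_anti")

# 3. Union the surviving rows with the incoming batch and rewrite the target
#    (Delta reads a snapshot, so overwriting the table it was read from in the
#    same job is generally safe)
unchanged.unionByName(incoming).write.format("delta").mode("overwrite").saveAsTable(target_table)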

Best Regards,
Jainesh Poojara | Power BI Developer


rohit1991
Super User

Hi @theredsilva ,

Incremental loading from Business Central/Dataverse to a Fabric Lakehouse without a common key is a real-world challenge. Here’s a pattern that works well for messy tables:

 

1. Watermark Table: Keep a simple control table in your lakehouse to store the last loaded timestamp for each source table. Every run, just grab records where systemModifiedAt (or similar) is newer.

 

2. Staging & Upsert: Filter incoming data by that timestamp and land it in a staging area. In PySpark, do a left anti-join (using whatever keys you have) to find true new/updated rows, then union them into your main table.

 

3. Metadata-Driven Logic: If you have lots of tables, store the table/key/timestamp info in a metadata/config table and let your pipeline/notebook read from it; this makes scaling and maintenance way easier.
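
A rough sketch of what that metadata-driven loop can look like is below. The config entries, table names and column names are illustrative only, and the watermark table is assumed to have table_name/last_load columns:

# Minimal sketch of a metadata-driven incremental loop (placeholder names;
# assumes a notebook session `spark` and a "watermark_table" control table
# with columns table_name STRING, last_load TIMESTAMP).
from pyspark.sql.functions import col

config = [
    {"table": "customer",   "key": "No",         "ts_col": "systemModifiedAt"},
    {"table": "salesorder", "key": "DocumentNo", "ts_col": "lastModifiedDateTime"},
]

watermarks = spark.read.table("watermark_table")

for entry in config:
    # Look up the last successful load for this table (None on the first run)
    row = watermarks.filter(col("table_name") == entry["table"]).select("last_load").head()
    last_load = row["last_load"] if row else None

    # Read the source and keep only rows newer than the watermark
    df = spark.read.table(entry["table"])
    if last_load is not None:
        df = df.filter(col(entry["ts_col"]) > last_load)

    # Hand `df` plus entry["key"] to the staging/upsert step from point 2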

 


Did it work? ✔ Give a Kudo • Mark as Solution – help others too!


4 REPLIES
theredsilva
New Member

@rohit1991 @jaineshp 
Thank you so much! It works (using the "modifiedon" column and the primary key for each table).
Just so you know, and in case someone else needs help, here is the script I used, following your suggestions:

from delta.tables import DeltaTable
from pyspark.sql.functions import col, max as spark_max, row_number
from pyspark.sql.window import Window
import datetime as dt

# Legacy datetime rebase config for Parquet
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")

# Tables to process
tables = [
    "account",
    "campaign",
    "contact",
    "lead",
    "list",
    "opportunity",
    "po_mailchimpactivity",
    "po_mailchimipcampaign",
    "incident",
    "systemuser"
]

# Primary key dictionary
primary_keys = {
    "account": "accountid",
    "campaign": "campaignid",
    "contact": "contactid",
    "lead": "leadid",
    "list": "listid",
    "opportunity": "opportunityid",
    "po_mailchimpactivity": "activityid",
    "po_mailchimipcampaign": "po_mailchimpcampaignid",
    "incident": "incidentid",
    "systemuser": "systemuserid"
}

# Watermark table name
watermark_table_name = "watermark_table"

# Initialize the watermark table if it does not exist
if watermark_table_name in [t.name for t in spark.catalog.listTables()]:
    print("Watermark table already exists.")
    watermark_df = spark.read.table(watermark_table_name)
else:
    print("🆕 Creating the watermark table.")
    watermark_df = spark.createDataFrame([], "table_name STRING, last_load TIMESTAMP")
    watermark_df.write.format("delta").mode("overwrite").saveAsTable(watermark_table_name)

# Outcome lists
success_tables = []
skipped_tables = []
error_tables = []

# Required column
required_column = "modifiedon"

for table_source in tables:
    try:
        print(f"\n--- Processing table {table_source} ---")
        table_target = table_source + "_incrementale"

        if table_source not in primary_keys:
            print(f"Primary key not defined for {table_source}, skipping table.")
            skipped_tables.append((table_source, "Missing primary key"))
            continue

        pk = primary_keys[table_source]

        last_load_row = watermark_df.filter(col("table_name") == table_source).select("last_load").head()
        last_load = last_load_row["last_load"] if last_load_row else dt.datetime(2000, 1, 1)
        print(f"Last modifiedon watermark: {last_load}")

        df_source = spark.read.table(table_source)

        if required_column not in df_source.columns:
            print(f"Column '{required_column}' missing.")
            skipped_tables.append((table_source, f"Column '{required_column}' missing"))
            continue

        if pk not in df_source.columns:
            print(f"Primary key column '{pk}' missing.")
            skipped_tables.append((table_source, f"Primary key '{pk}' missing"))
            continue

        df_incremental = df_source.filter(col("modifiedon") > last_load)
        count_incremental = df_incremental.count()

        if count_incremental == 0:
            print("No new records.")
            skipped_tables.append((table_source, "No new data"))
            continue

        print(f"Found {count_incremental} new records.")

        # CDC: deduplicate the "list" table only, keeping the latest row per key (ROW_NUMBER)
        if table_source == "list":
            window_spec = Window.partitionBy(pk).orderBy(col("modifiedon").desc())
            df_incremental = df_incremental.withColumn("row_num", row_number().over(window_spec)).filter(col("row_num") == 1).drop("row_num")

        if table_target not in [t.name for t in spark.catalog.listTables()]:
            print(f"🆕 Creating target table {table_target} with overwrite.")
            df_incremental.write.format("delta").mode("overwrite").saveAsTable(table_target)
        else:
            delta_table = DeltaTable.forName(spark, table_target)
            (delta_table.alias("target")
                .merge(
                    df_incremental.alias("source"),
                    f"target.{pk} = source.{pk}"
                )
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute()
            )
            print(f"Table {table_target} updated with a merge upsert.")

        # Update the watermark for this table
        max_modified = df_incremental.agg(spark_max("modifiedon")).collect()[0][0]
        watermark_df = watermark_df.filter(col("table_name") != table_source).union(
            spark.createDataFrame([(table_source, max_modified)], schema=watermark_df.schema)
        )
        watermark_df.write.format("delta").mode("overwrite").saveAsTable(watermark_table_name)
        print(f"📌 Watermark updated: {max_modified}")

        success_tables.append(table_source)

    except Exception as e:
        print(f"Error in table {table_source}: {e}")
        error_tables.append((table_source, str(e)))

# --- Final report ---
print("\n📋 FINAL REPORT 📋")

print("\nTables OK:")
for t in success_tables:
    print(f"  - {t}")

print("\nSkipped tables:")
for t, reason in skipped_tables:
    print(f"  - {t} ➤ {reason}")

print("\nTables with errors:")
for t, err in error_tables:
    print(f"  - {t} ➤ {err.splitlines()[0][:100]}")

v-tejrama
Community Support

Hi @theredsilva ,

 

Thank you @jaineshp  for the response provided!
Has your issue been resolved? If the response provided by the community member addressed your query, could you please confirm? It helps us ensure that the solutions provided are effective and beneficial for everyone.

Thank you for your understanding!

