What's the best practice when I want to both update AND insert a record into a target table in one operation, ensuring atomicity?
newRecord:
ID, Product, Color
123, A, Blue

myTable:
ID, Product, Color, is_current
123, A, Yellow, TRUE
I basically want to do an atomic operation that gives the following result in myTable
ID, Product, Color, is_current
123, A, Yellow, FALSE
123, A, Blue, TRUE
I basically want to do a .whenMatchedUpdate AND a .whenMatchedInsert in a MERGE statement, but I know that's not supported.
Hello @AndersASorensen
Give this a try:
from pyspark.sql.functions import *
from delta.tables import *

# Assuming 'target_table' is your existing Delta table
target_table = DeltaTable.forName(spark, "target_table")

# 'source_df' is your new data
merge_condition = "target.natural_key = source.natural_key"

(target_table.alias("target")
    .merge(source_df.alias("source"), merge_condition)
    .whenMatchedUpdate(set={
        "attribute1": "source.attribute1",
        "attribute2": "source.attribute2",
        "is_current": lit(True),
        "update_date": current_date()
    })
    .whenNotMatchedInsert(values={
        "natural_key": "source.natural_key",
        "attribute1": "source.attribute1",
        "attribute2": "source.attribute2",
        "is_current": lit(True),
        "update_date": current_date()
    })
    .execute())
Hope this helps.
Please accept the answer if this works.
Thanks
Hello @AndersASorensen
The `MERGE` statement is not natively supported in Microsoft Fabric's Data Warehouse as of now, so users need to rely on PySpark or SQL workarounds for similar functionality.
from delta.tables import DeltaTable

# Define source and target
target_table = DeltaTable.forName(spark, "Lakehouse.TargetTable")
source_df = spark.read.table("Lakehouse.SourceTable")

# Perform the merge
(target_table.alias("target")
    .merge(source=source_df.alias("source"), condition="target.key = source.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())
Currently, the `MERGE` statement is not supported in Fabric’s Data Warehouses. As a workaround:
• Use a combination of `ROW_NUMBER()` for deduplication and `INSERT`/`UPDATE` statements to achieve similar functionality:
WITH DeduplicatedSource AS (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY ID ORDER BY LOAD_DATE DESC) AS row_num
    FROM SourceTable
)
-- Merge logic (row_num can only be filtered outside the SELECT that defines it,
-- so the filter moves into the USING clause)
MERGE INTO TargetTable AS target
USING (SELECT * FROM DeduplicatedSource WHERE row_num = 1) AS source
ON target.ID = source.ID
WHEN MATCHED THEN UPDATE SET target.Column = source.Column
WHEN NOT MATCHED THEN INSERT (Column) VALUES (source.Column);
Hope this helps
Thanks
I am happy to use PySpark, and I want to do it on a Lakehouse, not a DW.
However, your .whenMatchedUpdateAll().whenNotMatchedInsertAll().execute() suggestion will not work, as it will only update the matched row.
Yes, I would like it to update the is_current column when matched, but it will not also insert the record as a new row.
Hi again 🙂
The code provided won't work, but the article you linked to had a working solution 🙂
Thanks!
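For anyone landing here later: the pattern that solves this (I believe it is the SCD Type 2 example from the Delta Lake MERGE documentation, which the linked article appears to follow) is to stage a union of the update set with itself. One copy keeps its key, so it matches the current target row and drives a whenMatchedUpdate that flips is_current to FALSE; the other copy carries a NULL merge key, so it can never match and drives a whenNotMatchedInsert of the new version. A single MERGE over that staged set is atomic. Below is a plain-Python simulation of that staging logic, using the ID/Product/Color columns from the question (the function name and row representation are just illustrative, not any library's API):

```python
def scd2_merge(target, source):
    """Simulate the SCD Type 2 staged-union MERGE.

    target: list of dicts with keys ID, Product, Color, is_current
    source: list of dicts with keys ID, Product, Color
    """
    # MERGE matches against the pre-merge state of the target,
    # so snapshot which keys are "current" before touching anything.
    current_ids = {r["ID"] for r in target if r["is_current"]}

    # Stage: every source row keeps its key (drives the UPDATE when it
    # matches); rows that match a current target row get a second copy
    # with merge_key=None, which can never match (drives the INSERT).
    staged = []
    for row in source:
        staged.append({**row, "merge_key": row["ID"]})
        if row["ID"] in current_ids:
            staged.append({**row, "merge_key": None})

    result = [dict(r) for r in target]
    for row in staged:
        if row["merge_key"] in current_ids:
            # WHEN MATCHED THEN UPDATE: expire the old current version
            for t in result:
                if t["ID"] == row["merge_key"] and t["is_current"]:
                    t["is_current"] = False
        else:
            # WHEN NOT MATCHED THEN INSERT: add the new current version
            result.append({"ID": row["ID"], "Product": row["Product"],
                           "Color": row["Color"], "is_current": True})
    return result
```

In PySpark the staging step looks something like `updates.withColumn("mergeKey", col("ID")).unionByName(updates.withColumn("mergeKey", lit(None)))`, merged on `target.ID = source.mergeKey`, with whenMatchedUpdate setting is_current to FALSE and whenNotMatchedInsert writing the new row with is_current TRUE.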