Hello,
I need help with an upsert operation:
1) I am loading a table, SourceTbl.
2) Select only these columns from the Source table: 'DateId', 'FiscalYear', 'FiscalMonth', 'FiscalQuarter', 'ActualValue', 'SearchId'.
3) Check whether the Target table, TargetTbl, exists.
4) If the Target table does not exist, add all rows from Source to Target.
5) If the Target table exists, check for matching rows based on the 'SearchId' column.
6) If there are matching rows in Target, overwrite those rows with the values from Source.
7) If there are no matching rows in Target, just append the rows from Source to Target.
99% of the time the Target table will already exist, and the column structure of SourceTbl and TargetTbl will be the same.
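The steps above describe a keyed upsert. As a minimal plain-Python sketch of the intended semantics (dicts standing in for the Source and Target tables, keyed on SearchId; this is an illustration, not Spark code):

```python
def upsert(target, source):
    """Upsert source rows into target, matching on the SearchId key.

    target may be None (table does not exist yet): take all source rows.
    Matched keys are overwritten from source; unmatched keys are appended.
    """
    if target is None:                  # step 4: target missing -> copy source
        return dict(source)
    merged = dict(target)
    for key, row in source.items():     # steps 5-7: overwrite or append
        merged[key] = row
    return merged

source = {1: {"SearchId": 1, "ActualValue": 10},
          2: {"SearchId": 2, "ActualValue": 20}}
target = {1: {"SearchId": 1, "ActualValue": 5}}

result = upsert(target, source)
# Row 1 is overwritten from Source, row 2 is appended.
```

In Fabric this is what a Delta Lake MERGE does, as the answers below show.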
Please Help.
Thanks,
Prabhat
Hi, I have created a similar function for this. Hope it can help you with some modifications.
Function Overview: The upsert_table function updates or inserts data into a target table based on the given DataFrame (df_new), the load type, and whether the table exists.
Full Load: If specified, or if the table doesn't exist, it overwrites the existing table or creates a new one with the new data.
Incremental Load: If the table exists and a full reload is not specified, it merges the new data into the existing table: matched rows (same SearchId) are updated, and unmatched rows are inserted.
Parameters:
df_new: your DataFrame with the new data that you want to upsert into the target table, cleaned so it has the same columns as the target table (the ones you specify).
load_type: A string indicating the type of load operation. It can be "FULL" for a complete overwrite of the existing table, or anything else for an incremental update where new rows are merged with existing data.
I have this because I do full loads periodically, so I control this parameter from pipelines. If you don't have such requirements, you can modify the code to remove it.
target_table_path_loc: The file path location where the target table is stored or should be created. It is used to construct the full path to the target table. For me it's just Tables/, as the table is in my default lakehouse.
target_table_name: The name of the target table. It is lowercased in the function to keep table names consistent.
from delta.tables import DeltaTable

def upsert_table(df_new, load_type, target_table_path_loc, target_table_name):
    target_table_name = target_table_name.lower()
    target_table_path = target_table_path_loc + target_table_name
    load_type = load_type.upper()
    numOfNewRows = df_new.count()  # I use this for some validation / logging etc.
    try:
        # Full load (or table doesn't exist yet): overwrite / create the table.
        if not DeltaTable.isDeltaTable(spark, target_table_path) or load_type == "FULL":
            df_new.write.format("delta").mode("overwrite").saveAsTable(target_table_name)
        else:
            # Incremental load: merge the new rows into the existing table on SearchId.
            current_dt = DeltaTable.forPath(spark, target_table_path)
            (current_dt.alias("target")
                .merge(df_new.alias("source"), "target.SearchId = source.SearchId")
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute())
    except Exception as e:
        # ....handle your exceptions....
        raise
Thanks @AlexanderPowBI,
It is a good one to try. I appreciate your help.
Quick question: can I find out how many rows were updated and how many were inserted in an upsert?
Also, if I have an UpdatedTime column on the target, it should be set to the system time on update, only for the rows that are actually being updated. Please help with adding this logic too.
Thanks,
Prabhat
Hi,
To set the time on updated rows, you can use ".whenMatchedUpdate" and ".whenNotMatchedInsert". I haven't done this myself, but they let you specify what to do when a record is matched and when it is not, giving you the option to set UpdatedTime when matched and leave it null when not. Here is a link with examples:
https://docs.delta.io/latest/delta-update.html
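Based on the linked docs, the merge step in the function above could be adapted roughly as follows. This is an untested sketch: the column lists are abbreviated (you would list every column of your table), and the UpdatedTime handling is an assumption about your schema.

```python
from pyspark.sql.functions import current_timestamp

# Sketch: replaces the whenMatchedUpdateAll / whenNotMatchedInsertAll pair.
# Columns not listed in `set` keep their current target values; columns not
# listed in `values` are inserted as null.
(current_dt.alias("target")
    .merge(df_new.alias("source"), "target.SearchId = source.SearchId")
    .whenMatchedUpdate(set={
        "ActualValue": "source.ActualValue",   # ...plus your other columns
        "UpdatedTime": current_timestamp()     # stamped only on updated rows
    })
    .whenNotMatchedInsert(values={
        "SearchId": "source.SearchId",
        "ActualValue": "source.ActualValue",   # ...plus your other columns
        "UpdatedTime": "NULL"                  # left null on insert
    })
    .execute())
```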
For the count of inserted rows: I would suggest taking a count of current_dt right before the upsert. After the upsert, count the table again; the difference between those counts gives you the number of rows inserted. An alternative, and maybe better, approach is to add a column called "insertedDate" that is populated only whenNotMatched with the current date. Then you can use today's date to count the number of rows inserted. It's also good to have this column for other reasons.
For the number of updated records, count how many records have today's date in your UpdatedTime column.
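The before/after count bookkeeping suggested above also gives the updated count by simple arithmetic: assuming SearchId is unique on both sides, every source row either matched (update) or did not (insert). A hypothetical helper (not from the thread) to derive both numbers:

```python
def merge_counts(target_rows_before, target_rows_after, source_rows):
    """Derive inserted/updated counts from table sizes around a merge.

    Assumes SearchId is unique in both source and target, so every
    source row is either an update (matched) or an insert (unmatched).
    """
    inserted = target_rows_after - target_rows_before
    updated = source_rows - inserted
    return inserted, updated

# Example: target had 100 rows, source brought 30, target ends at 110:
# 10 rows were inserted and the other 20 source rows updated existing rows.
inserted, updated = merge_counts(100, 110, 30)
```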
Hope this helps you out.
//Alexander
Hi @prabhatnath,
Have you looked into this documentation on upserting with merge:
https://learn.microsoft.com/en-us/azure/databricks/delta/merge
Maybe this exercise will be helpful also:
https://microsoftlearning.github.io/mslearn-fabric/Instructions/Labs/03b-medallion-lakehouse.html