prabhatnath
Advocate III

Performing an upsert in a PySpark notebook (Python)

Hello,

 

I need help with an upsert operation.

 

1) Load a source table, SourceTbl.

2) Select only these columns from the source table: 'DateId', 'FiscalYear', 'FiscalMonth', 'FiscalQuarter', 'ActualValue', 'SearchId'.

3) Check whether the target table TargetTbl exists.

4) If the target table does not exist, add all rows from Source to Target.

5) If the target table exists, check for matching rows based on the 'SearchId' column.

6) If there are matching rows in the target, overwrite those rows with the corresponding rows from Source.

7) If there are no matching rows in the target, just append the rows from Source to the target.

 

99% of the time the target table will already exist, and the column structure of SourceTbl and TargetTbl will be the same.

 

Please Help.

Thanks,

Prabhat

1 ACCEPTED SOLUTION
AlexanderPowBI
Advocate I

Hi, I have created a similar function for this. Hope it can help you with some modifications. 

 

Function Overview: The upsert_table function updates or inserts data into a target table based on the given DataFrame (df_new), the load type, and whether the table already exists.
Full Load: If specified or if the table doesn't exist, it overwrites the existing table or creates a new one with the new data.
Incremental Load: If the table exists and a full reload is not specified, it merges the new data into the existing table. This involves:

  • Updating existing records that match between the new data and the existing table.
  • Inserting new records that don't find a match in the existing table.


Parameters:
df_new: your DataFrame with the new data you want to upsert into the target table, cleaned so it has the same columns as the target table (the ones you specify).
load_type: A string indicating the type of load operation. It can either be "FULL" for a complete overwrite of the existing table, or any other value for an incremental update where new rows are merged with existing data.
I have this as I do full loads periodically, so I control this parameter from pipelines. If you don't have such requirements you can modify the code to get rid of it. 
target_table_path_loc: The file path location where the target table is stored or should be created. This path is used to construct the full path to the target table. For me it's just Tables/, as the table is in my default lakehouse.
target_table_name: The name of the target table. This name is standardized to lowercase in the function to ensure consistency in how table names are handled.

 

from delta.tables import DeltaTable

def upsert_table(df_new, load_type, target_table_path_loc, target_table_name):
    target_table_name = target_table_name.lower()
    target_table_path = target_table_path_loc + target_table_name
    load_type = load_type.upper()
    numOfNewRows = df_new.count()  # I use this for some validation / logging etc.
    try:
        if not DeltaTable.isDeltaTable(spark, target_table_path) or load_type == "FULL":
            # Full load, or the target does not exist yet: overwrite / create the table
            df_new.write.format("delta").mode("overwrite").saveAsTable(target_table_name)
        else:
            # Incremental load: merge the new data into the existing table on SearchId
            current_dt = DeltaTable.forPath(spark, target_table_path)
            (current_dt.alias("target")
                .merge(df_new.alias("source"), "target.SearchId = source.SearchId")
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute())
    except Exception as e:
        # handle / log your exceptions here
        raise
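For reference, calling the function from a notebook might look roughly like this; the table names, the "INCREMENTAL" string, and the Tables/ path are just placeholder examples for the scenario described in the question:

# Source data limited to the columns the target should contain
df_source = spark.read.table("SourceTbl").select(
    "DateId", "FiscalYear", "FiscalMonth", "FiscalQuarter", "ActualValue", "SearchId"
)

# Anything other than "FULL" takes the incremental (merge) path
upsert_table(df_source, "INCREMENTAL", "Tables/", "TargetTbl")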


4 REPLIES

Thanks @AlexanderPowBI 
It is a good one to try. I appreciate your help.
Quick question: can I know how many rows were updated and how many were inserted in the case of an upsert?

Also, if I have an UpdatedTime column on the target, it should be set to the system time on update, but only for the rows that are actually being updated. Please help with adding this logic too.

 

Thanks,

Prabhat

Hi,

To get a timestamp onto the updated rows, you can use ".whenMatchedUpdate" and ".whenNotMatchedInsert". I haven't done this myself, but they let you specify what to do when a record is matched and when it is not, giving you the option to set UpdatedTime when matched and leave it null when not matched. Here is a link with examples:

https://docs.delta.io/latest/delta-update.html
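As a rough sketch of that pattern (not tested here; the column names such as SearchId and UpdatedTime and the Tables/targettbl path are assumptions taken from this thread, so adjust them to your schema):

from delta.tables import DeltaTable

target_dt = DeltaTable.forPath(spark, "Tables/targettbl")

(target_dt.alias("target")
    .merge(df_new.alias("source"), "target.SearchId = source.SearchId")
    .whenMatchedUpdate(set={
        "ActualValue": "source.ActualValue",   # repeat for the other data columns
        "UpdatedTime": "current_timestamp()"   # stamped only on rows that get updated
    })
    .whenNotMatchedInsert(values={
        "DateId": "source.DateId",
        "SearchId": "source.SearchId",
        "ActualValue": "source.ActualValue",   # repeat for the other data columns
        "UpdatedTime": "null"                  # left empty for newly inserted rows
    })
    .execute())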

 

For the count of inserted rows: I would suggest taking a count of current_dt right before "upserting" it. After the upsert, take a count of the table again; the difference between those counts gives you the number of rows inserted. An alternative, and maybe better, approach would be to add a column called "insertedDate" that is populated only whenNotMatched, with the current date. Then you can use today's date to count the number of rows inserted. It's also good to have this column for other reasons.

 

For the number of updated records, count how many records have "today's" date in your UpdatedTime column.
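As a rough sketch of both counting ideas (this assumes the insertedDate and UpdatedTime columns described above exist on the target, and that the table lives under the Tables/targettbl path used earlier):

from pyspark.sql import functions as F

target_df = spark.read.format("delta").load("Tables/targettbl")

# Rows inserted by this run: insertedDate is only populated on the whenNotMatched branch
inserted_count = target_df.filter(F.col("insertedDate") == F.current_date()).count()

# Rows updated by this run: UpdatedTime is only stamped on the whenMatched branch
updated_count = target_df.filter(F.to_date(F.col("UpdatedTime")) == F.current_date()).count()

print(f"Inserted: {inserted_count}, Updated: {updated_count}")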

 

Hope this helps you out.

//Alexander

frithjof_v
Continued Contributor

Hi @prabhatnath,

 

Have you looked into this documentation for doing an upsert with merge:
https://learn.microsoft.com/en-us/azure/databricks/delta/merge

Maybe this exercise will be helpful also:

https://microsoftlearning.github.io/mslearn-fabric/Instructions/Labs/03b-medallion-lakehouse.html

 
