Don't miss your chance to take the Fabric Data Engineer (DP-700) exam on us!
Learn moreNext up in the FabCon + SQLCon recap series: The roadmap for Microsoft SQL and Maximizing Developer experiences in Fabric. All sessions are available on-demand after the live show. Register now
There is customer data in Azure SQL DB. Each customer stored here has a Guid value. I want to incrementally load this customer data into a Lakehouse.
If the customer data is modified, I want to keep the Guid value and only update the changed information. If new customer data is created, I want to simply insert it.
I want to configure this using all the resources of Fabric without using Query statements.
Is there a method that can be used universally in various projects?
Solved! Go to Solution.
If you are building a managed delta table in a lakehouse to bring the data from Sql, you can use Delta Table API in notebook to perform a fully qualified UPSERT using merge between source and target delta table.
from delta.tables import *
deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
If you are building a managed delta table in a lakehouse to bring the data from Sql, you can use Delta Table API in notebook to perform a fully qualified UPSERT using merge between source and target delta table.
from delta.tables import *
deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
unfortunately UPSERT option is not available in Copy / Dataflow gen2 with lakehouse as a sink.
you would have write some custom logic to handle the same , Below blog provides some details :
Experience the highlights from FabCon & SQLCon, available live and on-demand starting April 14th.
If you have recently started exploring Fabric, we'd love to hear how it's going. Your feedback can help with product improvements.
Share feedback directly with Fabric product managers, participate in targeted research studies and influence the Fabric roadmap.
| User | Count |
|---|---|
| 2 | |
| 1 | |
| 1 | |
| 1 | |
| 1 |