I have created a notebook that loads data from one lakehouse to another. If a table does not exist in the destination, it creates a new table; if it exists, it performs a merge operation. To do that, I am following the major steps below.
Step 1: Receive JSON-formatted parameter values from the data pipeline (watermark_id, source_table_name, destination_table_name, and key columns). See the parameter sample below:
watermark_array = """ [
{
"watermark_id": 7,
"source_database_name": "lh_bronze",
"source_table_name": "table_1",
"source_key_column_name": "KeyColumn",
"destination_database_name": "lh_silver",
"destination_table_name": "table_1",
"destination_key_column_name": "KeyColumn"
},
{
"watermark_id": 8,
"source_database_name": "lh_bronze",
"source_table_name": "table_2",
"source_key_column_name": "KeyColumn",
"destination_database_name": "lh_silver",
"destination_table_name": "table_2",
"destination_key_column_name": "KeyColumn"
}]"""
Step 2: Convert the parameter into a dataframe and loop through each table to perform the merge.
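For reference, a minimal sketch of that conversion, assuming watermark_array is the JSON string shown above (the Row-based construction is one common approach, not necessarily the exact one used here):

import json
from pyspark.sql import Row

# Parse the pipeline parameter string into Python dicts, one per table,
# then build a Spark DataFrame so each table's metadata becomes a row.
watermark_rows = json.loads(watermark_array)
df = spark.createDataFrame([Row(**r) for r in watermark_rows])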
Step 3: Execute a function that calls another function to check the existence of the table in the source and destination:
for row in df.collect():
    watermark_id = row['watermark_id']
    source_database_name = row['source_database_name']
    source_table_name = row['source_table_name']
    source_key_column_name = row['source_key_column_name']
    destination_database_name = row['destination_database_name']
    destination_table_name = row['destination_table_name']
    destination_key_column_name = row['destination_key_column_name']

    print(watermark_id, source_database_name, source_table_name,
          source_key_column_name, destination_database_name,
          destination_table_name, destination_key_column_name)

    run_data_load(source_lakehouse=source_database_name,
                  source_tablename=source_table_name,
                  source_table_key_columns=source_key_column_name,
                  destination_lakehouse=destination_database_name,
                  destination_tablename=destination_table_name,
                  destination_key_columns=destination_key_column_name)
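The body of run_data_load itself is not posted in the thread. As a hedged sketch of the create-or-merge pattern it is described as implementing (upsert_table and its parameters are hypothetical names, it relies on the table_exists helper from Step 4 below, and the Delta Lake Python merge API is assumed to be available in the runtime):

from delta.tables import DeltaTable

def upsert_table(source_df, destination_lakehouse, destination_table, key_column):
    # Fully qualified destination name, e.g. lh_silver.table_1.
    target_name = f"{destination_lakehouse}.{destination_table}"
    if not table_exists(destination_lakehouse, destination_table):
        # First load: create the destination table from the source data.
        source_df.write.format("delta").saveAsTable(target_name)
    else:
        # Later loads: merge on the key column, updating matched rows
        # and inserting rows not yet present in the destination.
        target = DeltaTable.forName(spark, target_name)
        (target.alias("t")
               .merge(source_df.alias("s"), f"t.{key_column} = s.{key_column}")
               .whenMatchedUpdateAll()
               .whenNotMatchedInsertAll()
               .execute())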
Step 4: In run_data_load, we call the below function:
def table_exists(lakehouse_name, table_name):
    # List the tables in the given lakehouse and check for a name match.
    tables = spark.catalog.listTables(lakehouse_name)
    return any(table.name == table_name for table in tables)
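As an aside, on Spark 3.3 and later the same check can be expressed without listing every table: spark.catalog.tableExists takes the table and database names directly. A sketch, assuming the two-argument PySpark 3.3 signature:

def table_exists(lakehouse_name, table_name):
    # Ask the catalog about this one table instead of materializing
    # the full table list that listTables() returns.
    return spark.catalog.tableExists(table_name, lakehouse_name)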
Step 5: When it hits the line spark.catalog.listTables(lakehouse_name), it fails with the below error while processing the second table. However, if I re-run it, it fails on the first table itself; if I restart the session, it runs for the first table and fails for the next one.
[PARSE_SYNTAX_ERROR] Syntax error at or near end of input.(line 1, pos 0)
== SQL ==
^^^
---------------------------------------------------------------------------
ParseException Traceback (most recent call last)
Cell In[53], line 18
8 destination_key_column_name = [row][0]['destination_key_column_name']
10 print(watermark_id,
11 source_database_name,
12 source_table_name,
(...)
15 destination_table_name,
16 destination_key_column_name )
---> 18 run_data_load(source_lakehouse = source_database_name,
19 source_tablename = source_table_name,
20 source_table_key_columns = source_key_column_name,
21 destination_lakehouse = destination_database_name,
22 destination_tablename = destination_table_name,
23 destination_key_columns = destination_key_column_name)
26 # print(watermark_id,source_database_name,source_table_name, source_key_column_name, destination_database_name , destination_table_name, destination_key_column_name )
Cell In[47], line 4, in run_data_load(source_lakehouse, source_tablename, source_table_key_columns, destination_lakehouse, destination_tablename, destination_key_columns)
2 print(1)
3 source_normalized_data = normaloize_json_master(source_lakehouse, source_tablename)
----> 4 destination_column_list = get_column_list(destination_lakehouse,destination_tablename)
5 try:
6 if source_normalized_data != None:
Cell In[44], line 3, in get_column_list(lakehouse_name, table_name)
1 def get_column_list(lakehouse_name, table_name):
----> 3 if table_exists(lakehouse_name, table_name):
4 print("dd", lakehouse_name, table_name)
5 spark.catalog.setCurrentDatabase(lakehouse_name)
Cell In[38], line 3, in table_exists(lakehouse_name, table_name)
1 def table_exists(lakehouse_name, table_name):
2 print("te1")
----> 3 tables = spark.catalog.listTables(lakehouse_name)
4 print("te2")
5 table_exists = any(table.name == table_name for table in tables)
File /opt/spark/python/lib/pyspark.zip/pyspark/sql/catalog.py:309, in Catalog.listTables(self, dbName)
307 if dbName is None:
308 dbName = self.currentDatabase()
--> 309 iter = self._jcatalog.listTables(dbName).toLocalIterator()
310 tables = []
311 while iter.hasNext():
File ~/cluster-env/trident_env/lib/python3.10/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File /opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py:175, in capture_sql_exception.<locals>.deco(*a, **kw)
171 converted = convert_exception(e.java_exception)
172 if not isinstance(converted, UnknownException):
173 # Hide where the exception came from that shows a non-Pythonic
174 # JVM exception message.
--> 175 raise converted from None
176 else:
177 raise
ParseException:
[PARSE_SYNTAX_ERROR] Syntax error at or near end of input.(line 1, pos 0)
== SQL ==
^^^
I am not sure what the issue is. Is there a best practice I can use to avoid it?
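One possible fallback, sketched below, is to do the lookup through SQL rather than the catalog API. SHOW TABLES IN <db> is standard Spark SQL, though whether it sidesteps this particular Fabric failure is only an assumption:

def table_exists_sql(lakehouse_name, table_name):
    # Query the metastore via SQL and filter on the exact table name.
    tables = spark.sql(f"SHOW TABLES IN {lakehouse_name}")
    return tables.filter(tables.tableName == table_name).count() > 0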
Hello @Anonymous , thanks for your continuous follow-up. Indeed I have a solution. As you know, the default environment for Fabric has Spark 3.4 and Delta 2.4, which is unstable at the moment. Therefore, we created a new environment with Spark 3.3 and Delta 2.2, and it works like a charm.
Hi @mkumar_upstech ,
Thanks for using Fabric Community.
As a part of debugging, can you please try the below code?
Present code:
def get_column_list(lakehouse_name, table_name):
    if table_exists(lakehouse_name, table_name):
Please update it to this and have a try:
def get_column_list(lakehouse_name, table_name):
    if any(table.name == table_name for table in spark.catalog.listTables(lakehouse_name)):
Let me know if you have further queries.
Hi @mkumar_upstech ,
We haven't heard from you since the last response and were just checking back to see if you have a resolution yet.
If you have a resolution, please share it with the community, as it can be helpful to others.
Otherwise, we will respond back with more details and try to help.
Hello @mkumar_upstech ,
We haven't heard from you since the last response and were just checking back to see if you have a resolution yet.
If you have a resolution, please share it with the community, as it can be helpful to others.
If you have any question relating to the current thread, please let us know and we will try our best to help you.
If you have a question on a different issue, we request that you open a new thread.