March 31 - April 2, 2025, in Las Vegas, Nevada. Use code MSCUST for a $150 discount! Early bird discount ends December 31.
I have a CurrentProcessed.parquet file with the following schema:
filename: string (nullable = true)
processedTime: timestamp (nullable = true)
fullyProcessedFlag: integer (nullable = true)
Throughout the process the flag is 0. At the end I create a dataframe from the three rows in the parquet file and set the flag to 1 (processed).
from pyspark.sql.functions import lit
# Update the fullyProcessedFlag column to 1 for all rows
df_Processed = dflog.select("filename","processedTime").withColumn("fullyProcessedFlag",lit(1))
display(df_Processed)
There is a slight change to the schema:
fullyProcessedFlag: integer (nullable = false)
I then overwrite the parquet file with the new data, with all rows marked as processed.
workspace_id = "########-####-####-####-############"
lakehouse_id = "########-####-####-####-############"
df_Processed.write.mode("overwrite").option("overwriteSchema", "true")\
.parquet(f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles.parquet")
# Print a success message
print("Parquet file overwritten successfully.")
This fails. It removes the files from the parquet folder, leaves the folder empty, and raises this error:
Caused by: org.apache.spark.SparkFileNotFoundException: Operation failed: "Not Found", 404, HEAD, It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
I am at a complete loss with this and it's now a blocker. I can't get past it, so I can't close my run off by resetting the flag in the parquet file.
Can anyone see what I am doing wrong in the code? Could the slight schema change to nullable = false be causing the issue? (I can't sort this out either and change it back to true.)
Hi @DebbieE
I was interested in your question and reproduced your error. I am not entirely sure, but I assume the behaviour has to do with how Spark tracks the schema of each dataframe and where its data comes from. In your case, I would guess there is some kind of circular dependency between the schema you are overwriting and the original schema of the file, and that this is why Spark runs into the error. As I said, though, this is only an assumption.
Nevertheless, I was able to come up with a workaround:
Because the file exists in different schema versions, it seems natural to use Delta instead of plain parquet. Spark can then keep track of the originally read schema while creating a newer version of the file, since the older version is still kept in the lakehouse.
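As a rough illustration of that assumption (not a confirmed diagnosis): Spark evaluates dataframes lazily, so if dflog was read from the same path that is later overwritten, the overwrite can clear the target before the source rows have actually been read. The plain-Python analogy below imitates this with a generator. The names lazy_read, overwrite and demo are hypothetical and only stand in for the read, the overwrite and the notebook run.

```python
import os
import tempfile


def lazy_read(path):
    """Return a generator of rows; the file is only opened when rows are first requested."""
    def rows():
        with open(path) as fh:
            for line in fh:
                yield line.rstrip("\n")
    return rows()


def overwrite(path, rows):
    """Mimic an overwrite: clear the target first, then compute and write the rows."""
    os.remove(path)        # the target is cleared before anything is read
    data = list(rows)      # a lazy read only runs here, after the delete
    with open(path, "w") as fh:
        fh.write("\n".join(data) + "\n")


def demo(materialise_first):
    """Run read -> set flag to 1 -> overwrite on a throwaway file."""
    path = os.path.join(tempfile.mkdtemp(), "log.txt")
    with open(path, "w") as fh:
        fh.write("a.csv,0\nb.csv,0\n")
    rows = (r.replace(",0", ",1") for r in lazy_read(path))
    if materialise_first:
        rows = list(rows)  # read everything before the target is touched
    try:
        overwrite(path, rows)
        return "ok", os.path.exists(path)
    except FileNotFoundError:
        return "source missing", os.path.exists(path)


print(demo(materialise_first=False))  # ('source missing', False)
print(demo(materialise_first=True))   # ('ok', True)
```

In the lazy case the target ends up deleted and the read fails, which resembles the empty folder and the 404 in the question. Whether this is exactly what happens inside Spark here remains an assumption.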
from pyspark.sql.functions import lit
# delta_table_name is assumed to hold the name of an existing Delta table in the lakehouse
dflog_delta = spark.read.format('delta').load("Tables/" + delta_table_name)
# Set fullyProcessedFlag to 1 for all rows
df_Processed_delta = dflog_delta.select("filename","processedTime").withColumn("fullyProcessedFlag",lit(1))
# Overwrite the Delta table with the updated rows
df_Processed_delta.write.mode("overwrite").option("overwriteSchema", "true")\
.format('delta').saveAsTable(delta_table_name)
print("Delta file overwritten successfully.")
Hope this solution works for you. 🙂
BR
Martin
I don't want to do that. I want to have everything in the one file, because otherwise it will get really confusing.
I still want to see the old data in the latest file with 0 as processed in the latest.
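For anyone who wants to stay with plain parquet, one possible workaround is to stage the result in a separate folder first, so that the final overwrite no longer reads from the folder it is replacing. This is only a sketch, assuming the error comes from reading and overwriting the same path in one job. The function names and the "_staging" suffix are hypothetical, and spark and df are expected to be the notebook's Spark session and the dataframe to save.

```python
def staging_path(target_path: str) -> str:
    """Hypothetical helper: a sibling folder used as a temporary copy."""
    return target_path.rstrip("/") + "_staging"


def overwrite_via_staging(spark, df, target_path: str) -> None:
    """Overwrite target_path with df even if df was originally read from target_path."""
    tmp = staging_path(target_path)

    # 1) Materialise the result elsewhere. This job still reads the original
    #    files, which have not been touched yet.
    df.write.mode("overwrite").parquet(tmp)

    # 2) Re-read from the staging copy, so the new dataframe no longer
    #    depends on the files under target_path.
    staged = spark.read.parquet(tmp)

    # 3) Overwrite the real target from the staged copy.
    staged.write.mode("overwrite").parquet(target_path)
```

In the notebook this would be called as overwrite_via_staging(spark, df_Processed, path), with path being the same abfss URL used in the original write. The staging folder is left behind and can be deleted afterwards.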
I think you could still use a delta table.
If you don't want to overwrite the information in the table, perhaps you could update or upsert instead?
https://docs.delta.io/latest/delta-update.html#update-a-table
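A minimal sketch of what such an in-place update could look like, assuming the log is stored as a Delta table with the fullyProcessedFlag column from the question. The helper names are hypothetical, and the delta package is imported lazily because it needs a Delta-enabled Spark runtime such as a Fabric notebook.

```python
def flag_update_args(new_value: int = 1) -> dict:
    """Hypothetical helper: arguments for DeltaTable.update that set the flag on unprocessed rows."""
    return {
        "condition": "fullyProcessedFlag = 0",            # only rows still marked as unprocessed
        "set": {"fullyProcessedFlag": str(new_value)},    # values are SQL expression strings
    }


def mark_all_processed(spark, table_name: str) -> None:
    """Update the flag in place instead of rewriting the whole table."""
    from delta.tables import DeltaTable  # lazy import; requires delta-spark

    DeltaTable.forName(spark, table_name).update(**flag_update_args(1))
```

Because the update runs against the table itself, no dataframe is read from and written back to the same location by hand.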
I'm not sure I fully understand the functionality you are wanting to achieve. Could you show some pictures of what you're wanting to achieve?
Please remember not to show any sensitive or internal information.
Here is a Reddit thread which discusses a similar topic:
Hi @DebbieE
I reproduced the same error when trying to update a parquet file with your method, but I haven't figured out a solution yet. I still need some time to work on this, as I'm new to Spark and not very experienced yet.
Best Regards,
Jing