DebbieE
Community Champion

PySpark: Create a ProcessedFiles log. Update the flag but I then can't resave OR then view the original P

I have a CurrentProcessed.parquet file containing

filename: string (nullable = true)
processedTime: timestamp (nullable = true)
fullyProcessedFlag: integer (nullable = true)

Throughout the process the flag is 0. At the end I create a dataframe of the three rows in the parquet file and then set the flag to 1 (processed).
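dflog comes from reading that same parquet file earlier in the notebook, roughly like this (a sketch; the exact read isn't shown here):

# Sketch only: dflog is read from the same file that gets overwritten later
dflog = spark.read.parquet(
    f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles.parquet"
)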

from pyspark.sql.functions import lit
# Update the fullyProcessedFlag column to 1 for all rows
df_Processed = dflog.select("filename","processedTime").withColumn("fullyProcessedFlag",lit(1))

display(df_Processed)

There is a slight change to the schema (lit(1) produces a non-nullable column):

 

fullyProcessedFlag: integer (nullable = false)

 

I then overwrite the parquet file with the new data, all flagged as processed.

 

workspace_id = "########-####-####-####-############"
lakehouse_id = "########-####-####-####-############"

# Note: "overwriteSchema" is a Delta Lake option; the plain parquet writer ignores it
df_Processed.write.mode("overwrite").option("overwriteSchema", "true")\
.parquet(f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log/ProcessedFiles.parquet")

# Print a success message
print("Parquet file overwritten successfully.")

And it errors. It removes the files from the parquet folder and leaves the folder empty. 

 

With the error

 

Caused by: org.apache.spark.SparkFileNotFoundException: Operation failed: "Not Found", 404, HEAD, It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

 

I am at a complete loss with this and it's now a blocker. I can't get past it, so I can't close my run off by resetting the flag in the parquet file.

 

Can anyone see what I am doing wrong in the code? Could the slight schema change to nullable = false be causing the issue? (I can't sort this out either and change it back to true.)

5 REPLIES
Hofpower
Frequent Visitor

Hi @DebbieE 

 

I was interested in your question and reproduced your error. I am not entirely sure, but I assume the behaviour has something to do with the way Spark processes data lazily and keeps track of each dataframe's schema and where it comes from. In your case there seems to be a circular dependency: the dataframe you are overwriting with still points back to the original file, and that is why Spark runs into that error. But as I said, I'm pretty unsure about that; it's really just an assumption.
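If you want to stay on plain parquet, one way around it might be to stage the updated rows in a temporary folder first, so the original files are no longer needed by the time the overwrite deletes them. A rough sketch (paths assumed from your post, untested):

base = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/Data/Silver/Log"

# Write the updated rows to a staging folder first, breaking the
# dependency on the files that the final overwrite will delete
df_Processed.write.mode("overwrite").parquet(f"{base}/ProcessedFiles_tmp.parquet")

# Re-read from staging and overwrite the original location
spark.read.parquet(f"{base}/ProcessedFiles_tmp.parquet") \
    .write.mode("overwrite").parquet(f"{base}/ProcessedFiles.parquet")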

 

The workaround I actually got working, though, uses delta instead of parquet. Precisely because different versions of the file (and its schema) are involved, delta is the natural fit: Spark can keep track of the originally read schema while creating a newer version of the file, since the "older" version is still kept in the lakehouse.

from pyspark.sql.functions import lit

# Table name assumed for illustration
delta_table_name = "ProcessedFiles"

# Read the current version of the Delta table
dflog_delta = spark.read.format('delta').load("Tables/" + delta_table_name)

# Set the flag to 1 for all rows
df_Processed_delta = dflog_delta.select("filename","processedTime").withColumn("fullyProcessedFlag",lit(1))

# Overwrite the table; Delta keeps the previous version, so reading and
# rewriting the same table is not a problem
df_Processed_delta.write.mode("overwrite").option("overwriteSchema", "true")\
    .format('delta').saveAsTable(delta_table_name)

print("Delta table overwritten successfully.")

 

Hope this solution works for you. 🙂

 

BR

Martin

DebbieE
Community Champion

I don't want to do that. I want to have everything in the one file, because otherwise it will get really confusing.

 

I still want to be able to see the old data, with 0 as the processed flag, in the latest file.

I think you could still use a delta table.

 

If you don't want to overwrite the information in the table, perhaps you could update or upsert instead?

 

https://docs.delta.io/latest/delta-update.html#update-a-table
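
For example, something along these lines (a sketch, assuming the log has been saved as a Delta table named ProcessedFiles):

from delta.tables import DeltaTable

# Table name assumed for illustration
dt = DeltaTable.forName(spark, "ProcessedFiles")

# Flip the flag in place; all rows stay in the one table, and earlier
# versions remain readable through Delta time travel
dt.update(
    condition="fullyProcessedFlag = 0",
    set={"fullyProcessedFlag": "1"}
)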

 

I'm not sure I fully understand the functionality you want to achieve. Could you show some pictures of what you're aiming for?

 

Please remember not to show any sensitive or internal information.

Here is a Reddit thread which discusses a similar topic:

 

https://www.reddit.com/r/MicrosoftFabric/s/zm1hF16CPz

v-jingzhan-msft
Community Support

Hi @DebbieE 

 

I reproduced the same error when trying to update a parquet file with your method. However, I haven't figured out a solution yet. I still need some time to work on this, as I'm new to Spark and not very experienced yet.

(screenshot of the reproduced error)

 

Best Regards,
Jing
