Morris98
Regular Visitor

Unexplainable behavior in Notebook when using pyspark

 

 

 

from pyspark.sql.functions import col, date_format
from pyspark.sql.types import IntegerType, StringType

print("Debug: ORIGINAL_FROM_SCD")
#debug:
debug_info = final_dfs_versioned["issues"].filter(col('vorgangs_ID') == 1000)
debug_info.show()


# Creating a deep copy of each DataFrame in the dictionary
dfs_to_load_into_db = {}
for table_name, df in final_dfs_versioned.items():
    # Deep copy by selecting all columns and caching
    copied_df = df.select("*").cache()
    copied_df.count()  # Trigger an action to cache the DataFrame
    dfs_to_load_into_db[table_name] = copied_df

print("Debug: COPY!")
#debug:
debug_info = dfs_to_load_into_db["issues"].filter(col('vorgangs_ID') == 1000)
debug_info.show()

# Debug: empty all tables before import!
confirmation = 1
if confirmation == 1:
    print("Everything will be deleted")
    for table_name, table_config in table_mappings.items():
        # Empty the table
        spark.sql(f"DELETE FROM SAMPLE_LH.{table_name}")


#DATAFRAMES HAVE CHANGED FROM HERE ON! (the debug output)

print("Debug: ORIGINAL_FROM_SCD")
#debug:
debug_info = final_dfs_versioned["issues"].filter(col('vorgangs_ID') == 1000)
debug_info.show()

print("Debug: COPY!")
#debug:
debug_info = dfs_to_load_into_db["issues"].filter(col('vorgangs_ID') == 1000)
debug_info.show()

 

 

 

Hi there,

I have a question about the behavior of Notebooks in Fabric.
Context: I am loading new data and using SCD2 to compare it with the old data in the lakehouse. The result is a new df that I want to load into the lakehouse, replacing all the old data there.

While deleting the old data I stumbled upon some strange behavior:

What I do not understand is that deleting data in the lakehouse somehow affects my PySpark DataFrames: rows disappear from them or show up in a strange way (see the comment "DATAFRAMES HAVE CHANGED FROM HERE ON!").
Even if I copy one of them into a new DataFrame and test with the copy, it shows the same behavior, which I did not expect.

I should add that the table names inside the lakehouse are the same as the names of my PySpark DataFrames (which are stored in a dictionary), but the behavior is still very strange to me.


Do you have any idea what is causing this, and how I could fix it so that I can manipulate the data in the lakehouse independently? Probably I need to decouple it somehow? (Is it because of the same table names?)

Thank you in advance and have a nice day.

Morris

1 ACCEPTED SOLUTION
v-gchenna-msft
Community Support

Hi @Morris98 ,

Thanks for using Fabric Community.

As I understand it, your DataFrames change unexpectedly after you delete data from the lakehouse. Let's break down what's happening and how to fix it.

The Issue:

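When you create copies of your DataFrames using select("*").cache(), the copies still reference the same data in the Delta Lake tables. This is because a DataFrame is a lazily evaluated query plan over its source, not a snapshot: it does not hold the data itself. select("*") only creates a new plan over the same source, and cache() is an optimization that Spark may invalidate or recompute, not a guarantee of an independent copy.

So, when you delete data from the lakehouse tables using spark.sql(), both the original and copied DataFrames are affected because they are re-evaluated against the same underlying data source.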
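
The effect can be illustrated without Spark. The following is only a plain-Python analogy, not PySpark code: the list `source_table` stands in for the lakehouse table, and the hypothetical function `lazy_view` stands in for a DataFrame that is re-evaluated each time an action such as show() runs.

```python
# Plain-Python analogy (not Spark): a "lazy view" stores how to compute a
# result from a source, so every evaluation sees the source's current state.
source_table = [{"vorgangs_ID": 1000}, {"vorgangs_ID": 1001}]


def lazy_view():
    # Evaluated on every call, similar to running an action on a DataFrame.
    return [row for row in source_table if row["vorgangs_ID"] == 1000]


# A real snapshot copies the rows out of the source exactly once.
snapshot = list(lazy_view())

before_delete = lazy_view()
source_table.clear()  # stands in for DELETE FROM on the lakehouse table
after_delete = lazy_view()

print(len(before_delete), len(after_delete), len(snapshot))  # prints: 1 0 1
```

The view comes back empty after the "delete", while the snapshot keeps its row because it was materialized before the source changed.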

The Fix:

There are a few ways to achieve what you want:

 

  1. True Deep Copy:
    Instead of creating a "copy" that references the same data, you can create a truly independent DataFrame. Here's how:

    from pyspark.sql import Row
    copied_df = df.rdd.map(lambda row: Row(**row.asDict())).toDF()


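    This code goes through each row in your original DataFrame, creates a new row object with the same data, and builds a new DataFrame from those new rows. Note that the result is still evaluated lazily, so it should be materialized (for example, written out to a separate location) before the source table is changed; otherwise it can still be recomputed from the modified table.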

  2. Read After Deletion:
    Alternatively, you can simply read the data again after deleting it from the lakehouse:

    # Delete data
    spark.sql(f"DELETE FROM SAMPLE_LH.{table_name}")

    # Read the table again after the deletion
    debug_info = spark.read.table(f"SAMPLE_LH.{table_name}")


    This ensures your DataFrame reflects the latest state of the data in the lakehouse after the deletion.

  3. Versioning (Optional):
    If you need to work with historical data, Delta Lake's versioning feature can be helpful. You can read a specific version of the data before the deletion for analysis.
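
As a rough sketch of the versioning option, Delta Lake's time travel can be used through the `versionAsOf` read option. The helper name `read_table_version` and the path in the usage comment are hypothetical, and older versions are only readable while the table history has not been cleaned up (for example by VACUUM).

```python
def read_table_version(spark, table_path, version):
    """Read one historical version of a Delta table via time travel.

    `spark` is the notebook's SparkSession; `table_path` and `version`
    are supplied by the caller (both are assumptions in this sketch).
    """
    return (
        spark.read.format("delta")
        .option("versionAsOf", version)  # Delta Lake time-travel option
        .load(table_path)
    )


# Hypothetical usage in a notebook; the path and version are placeholders:
# old_issues = read_table_version(spark, "Tables/issues", 0)
```

The available version numbers can be inspected with `DESCRIBE HISTORY` on the table before choosing one.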

By implementing one of these solutions, you can ensure your DataFrames are not influenced by deletions in the lakehouse and manipulate data independently.
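
Another way to get the same decoupling, which is not one of the options listed above, is to write each DataFrame to a separate staging location and read it back before deleting anything. The sketch below assumes a staging folder that is distinct from the target tables; the function name `snapshot_dataframes` and the `staging_root` path are hypothetical, while `final_dfs_versioned` is the dictionary from the question.

```python
def snapshot_dataframes(spark, dfs, staging_root):
    """Materialize each DataFrame in a staging location and re-read it.

    The returned DataFrames depend only on the staged files, so deleting
    from the original lakehouse tables afterwards does not change them.
    """
    snapshots = {}
    for table_name, df in dfs.items():
        staging_path = f"{staging_root}/{table_name}"
        # Writing is an action: it forces the full plan to be computed now.
        df.write.format("delta").mode("overwrite").save(staging_path)
        # The new DataFrame's plan points at the staged copy only.
        snapshots[table_name] = spark.read.format("delta").load(staging_path)
    return snapshots


# Hypothetical usage; "Files/staging" is a placeholder path:
# dfs_to_load_into_db = snapshot_dataframes(spark, final_dfs_versioned, "Files/staging")
```

This costs one extra write per table, but the staged copy survives cache eviction and any later changes to the target tables.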

I hope this helps. Let me know if you have any further questions.


3 REPLIES
Morris98
Regular Visitor

Thank you so much for your detailed answer, v-gchenna-msft.

Your explanation of the underlying issue was very helpful for a newbie like me.

I will definitely implement one of your solutions as soon as I get back to my laptop. The first option in particular seems to fit well with the rest of my code and I will give it a try.


Thank you again and have a nice day! 

Best wishes,
Morris

Hi @Morris98 ,

Glad to know that you got some insights. Please continue using Fabric Community for any further queries.
