amaaiia
Super User

Performance issue with PySpark code

I have a performance problem with some PySpark code. Does anyone know how to optimize it? My real code is fairly involved, but here is a basic example of what I do.

- I have a df table:

order | value
AAA   | 23656
BBB   | 4647


- I have another table df_a:

order | next_order | next_value
AAA   | CCC        | 6878
BBB   | DDD        | 5676
DDD   | EEE        | 5653
CCC   | FFF        | 5666
EEE   | GGG        | 54353
FFF   | HHH        | 8878

 

In my code, my goal is to repeatedly join df with df_a until no record from df matches df_a, as follows:

from pyspark.sql.functions import col

while df.filter(col('order').isNotNull()).count() > 0:
    df_orig = df.join(df_a, [df['order'] == df_a['order']]) \
        .select(df_a['next_order'].alias('order'),
                df_a['next_value'].alias('value'))
    df = df_orig

 

I get an error with this code because I'm trying to join the same DataFrame with itself several times. However, I can't use an alias, because it would be the same alias in every loop.

 

I've tried replacing df = df_orig with df = df_orig.rdd.toDF(df_orig.schema) to make Spark treat it as a new DataFrame, and it works, but after about 10 loops I get an error, I think for memory reasons due to the RDD regeneration:
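For reference, the loop body with that workaround looks roughly like this (just a sketch of what I described above):

df_orig = df.join(df_a, [df['order'] == df_a['order']]) \
    .select(df_a['next_order'].alias('order'),
            df_a['next_value'].alias('value'))
# rebuild the DataFrame from its RDD so Spark treats it as a brand-new one
df = df_orig.rdd.toDF(df_orig.schema)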

 

(error screenshot attached: amaaiia_0-1742473458461.png)

 

My real use case is more complex than this dummy example (more data, more join conditions...), but I think it reflects the main idea.

 

1 ACCEPTED SOLUTION
v-veshwara-msft
Community Support

Hi @amaaiia ,
Thanks for using Microsoft Fabric Community,

As mentioned by @ObungiNiels, the performance challenges in your code arise from excessive iterative self-joins, which lead to high shuffle costs and memory exhaustion. The error message indicates that some executors are failing due to memory constraints or task failures.

 

The key factors affecting performance are:

  • count() in the loop condition, which forces Spark to recompute the full lineage in each iteration.
  • Overwriting df, which leads to unnecessary recalculations.
  • Repeated self-joins, which cause excessive data shuffling, increasing execution time and memory usage.

 

Here's an optimized version of the code:
Instead of overwriting df, you can accumulate results using unionByName(), and use exceptAll() to keep only newly discovered records, so each iteration joins and counts a much smaller DataFrame.

from pyspark.sql.functions import col
# Sample Data
df = spark.createDataFrame([("AAA", 23656), ("BBB", 4647)], ["order", "value"])
df_a = spark.createDataFrame([
    ("AAA", "CCC", 6878),
    ("BBB", "DDD", 5676),
    ("DDD", "EEE", 5653),
    ("CCC", "FFF", 5666),
    ("EEE", "GGG", 54353),
    ("FFF", "HHH", 8878),
], ["order", "next_order", "next_value"])

# Initialize result DataFrame with the starting values
result_df = df
new_records = df
while new_records.count() > 0:  # Ensure loop only runs when new matches exist
    # Join only on new records to get next level data
    new_df = new_records.alias("r").join(
        df_a.alias("a"), col("r.order") == col("a.order"), "inner"
    ).select(
        col("a.next_order").alias("order"),
        col("a.next_value").alias("value")
    )

    # Remove already processed records to avoid re-processing
    new_records = new_df.exceptAll(result_df)

    # Accumulate new results without overwriting
    result_df = result_df.unionByName(new_records)

# Show final result
result_df.show()
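For the sample data above, the accumulated result should end up containing every order reached along both chains (row order may vary):

order | value
AAA   | 23656
BBB   | 4647
CCC   | 6878
DDD   | 5676
EEE   | 5653
FFF   | 5666
GGG   | 54353
HHH   | 8878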

 

This approach should help optimize performance and prevent memory issues. Let us know if you need further assistance.

 

If this post helps, please consider accepting it as the solution to help other members find it more quickly; kudos would also be appreciated.

 

Regards,
Vinay.


3 REPLIES

ObungiNiels
Resolver III

Hi @amaaiia ,

Unfortunately, I did not understand why you need to execute this self-join multiple times, but looking at your code snippet, there are some things to consider:

  • while loop: you are using .count() as your condition. Be aware that count() is considered an action in PySpark, while joins and other data operations are transformations. Transformations are lazily evaluated and optimized when a computation is triggered, but each action triggers the computation of all previously defined transformations. In your case, each iteration forces the Spark application to recalculate everything that has been done so far, which leads to performance issues, especially when you are running a lot of iterations. Ideally, the only action in a Spark application is the eventual write operation at the end of the script. If you can determine the number of loops before starting, you could use a fixed iteration limit instead of a while loop, as sketched below.
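For illustration, a fixed-iteration variant could look roughly like this (a minimal sketch; max_depth is a made-up name for an assumed upper bound on the chain length, not something from the original code):

from pyspark.sql.functions import col

max_depth = 10  # assumed upper bound on the number of hops
current = df
for _ in range(max_depth):
    # one hop along the chain; no action is triggered inside the loop,
    # so Spark only builds up the plan here
    current = current.alias("c").join(
        df_a.alias("a"), col("c.order") == col("a.order"), "inner"
    ).select(
        col("a.next_order").alias("order"),
        col("a.next_value").alias("value")
    )

# a single action at the end materializes the whole plan once
current.show()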

If you can elaborate on why the multiple loops and self-joins are necessary, we can look into additional ways to tackle your performance issues.

Kind regards,

Niels 

Hi,

I've added more dummy data to the df_a example to ease the understanding of the problem.

I need to get the full flow combining all the orders: starting from df, I look up next_order in df_a, then set that next_order as the new order and look up its next_order again, and so on.

 

For example, the provided table:

order | next_order | next_value
AAA   | CCC        | 6878
BBB   | DDD        | 5676
DDD   | EEE        | 5653
CCC   | FFF        | 5666
EEE   | GGG        | 54353
FFF   | HHH        | 8878

I need to get 2 flows: AAA->CCC->FFF->HHH and BBB->DDD->EEE->GGG (I get these flows by joining df with df_a and setting df to the result of the join, iteratively).

It's like a child-parent relationship in the same table: the child-parent relationship is stored in df_a, and I need to get the full flow starting from the values in the df DataFrame. I need to loop until the child has no parent, meaning that the flow has ended.
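For clarity, something like this is what I'm after (a rough sketch only; the path column built with concat_ws and the final left_anti step are just my illustration, not code from my real case):

from pyspark.sql.functions import col, concat_ws

# start each flow with its own order as the initial path
paths = df.select(col('order'), col('order').alias('path'))
frontier = paths
while frontier.count() > 0:
    # follow one more child-parent hop and extend the path string
    frontier = frontier.alias('p').join(
        df_a.alias('a'), col('p.order') == col('a.order'), 'inner'
    ).select(
        col('a.next_order').alias('order'),
        concat_ws('->', col('p.path'), col('a.next_order')).alias('path')
    )
    paths = paths.unionByName(frontier)

# keep only completed flows: orders with no further next_order in df_a
full_flows = paths.alias('p').join(
    df_a.alias('a'), col('p.order') == col('a.order'), 'left_anti'
)
full_flows.show(truncate=False)
# expected for the dummy data: AAA->CCC->FFF->HHH and BBB->DDD->EEE->GGG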

 

 
