I have a performance problem with some PySpark code. Does anyone know how to optimize it? My real code is quite large, but here is a basic example of what I do.
- I have a table df:
order | value
AAA   | 23656
BBB   | 4647
- I have another table df_a:
order | next_order | next_value
AAA   | CCC        | 6878
BBB   | DDD        | 5676
DDD   | EEE        | 5653
CCC   | FFF        | 5666
EEE   | GGG        | 54353
FFF   | HHH        | 8878
In my code, the goal is to join df with df_a repeatedly until no record from df matches df_a, as follows:
while df.filter(col('order').isNotNull()).count() > 0:
    df_orig = df.join(df_a, [df['order'] == df_a['order']]) \
        .select(df_a['next_order'].alias('order'),
                df_a['next_value'].alias('value'))
    df = df_orig
I get an error with this code because I'm trying to join the same DataFrame with itself several times. However, I can't use an alias because it would be the same alias on every loop.
I've tried replacing df = df_orig with df = df_orig.rdd.toDF(df_orig.schema) to make Spark treat it as a new DataFrame, and it works, but after about 10 loops I get an error, I think for memory reasons due to the repeated RDD generation.
My real use case is more complex than this dummy example (more data, more join conditions...), but I think it reflects the main idea.
Hi @amaaiia ,
Thanks for using the Microsoft Fabric Community.
As mentioned by @ObungiNiels, the performance challenges in your code arise from excessive iterative self-joins, which lead to high shuffle costs and memory exhaustion. The error message indicates that some executors are failing due to memory constraints or task failures.
The key factors affecting performance are:
- the use of count(), which forces Spark to recompute the DataFrame on every iteration,
- overwriting df, which leads to unnecessary recalculations, and
- the repeated self-joins, which cause excessive data shuffling and increase execution time and memory usage.
Here's an optimized version of the code: instead of overwriting df, accumulate results with unionByName(), and track only the newly discovered records with exceptAll() so the loop's count() check runs on the small set of new rows rather than on the whole result.
from pyspark.sql.functions import col
# Sample Data
df = spark.createDataFrame([("AAA", 23656), ("BBB", 4647)], ["order", "value"])
df_a = spark.createDataFrame([
    ("AAA", "CCC", 6878),
    ("BBB", "DDD", 5676),
    ("DDD", "EEE", 5653),
    ("CCC", "FFF", 5666),
    ("EEE", "GGG", 54353),
    ("FFF", "HHH", 8878),
], ["order", "next_order", "next_value"])
# Initialize result DataFrame with the starting values
result_df = df
new_records = df
while new_records.count() > 0:  # Ensure loop only runs when new matches exist
    # Join only on new records to get next level data
    new_df = new_records.alias("r").join(
        df_a.alias("a"), col("r.order") == col("a.order"), "inner"
    ).select(
        col("a.next_order").alias("order"),
        col("a.next_value").alias("value")
    )
    # Remove already processed records to avoid re-processing
    new_records = new_df.exceptAll(result_df)
    # Accumulate new results without overwriting
    result_df = result_df.unionByName(new_records)

# Show final result
result_df.show()
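If the job still slows down or runs out of memory after many iterations, the growing query lineage is often the cause (the same issue your rdd.toDF workaround was fighting). One option, sketched below as a variant of the loop above on the same variables, is to truncate the lineage each iteration with localCheckpoint(); whether the extra materialization pays off depends on your data volumes, so treat it as something to test rather than a guaranteed fix.
# Variant of the loop above that also truncates the query lineage each iteration.
# localCheckpoint() materializes the current result and drops the accumulated plan,
# which keeps deeply iterative joins from producing ever-larger execution plans.
while new_records.count() > 0:
    new_df = new_records.alias("r").join(
        df_a.alias("a"), col("r.order") == col("a.order"), "inner"
    ).select(
        col("a.next_order").alias("order"),
        col("a.next_value").alias("value")
    )
    new_records = new_df.exceptAll(result_df).localCheckpoint()
    result_df = result_df.unionByName(new_records).localCheckpoint()

result_df.show()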
This approach should help optimize performance and prevent memory issues. Let us know if you need further assistance.
If this post helps, please consider accepting it as the solution to help other members find it more quickly; kudos would also be appreciated.
Regards,
Vinay.
Hi @amaaiia ,
I unfortunately did not understand why you need to execute this self-join multiple times, but looking at your code snippet, there are some things to consider: above all, the repeated iterative self-joins cause heavy data shuffling and can exhaust executor memory.
If you can elaborate on why the multiple loops and self-joins are necessary, we can look into additional ways to tackle your performance issues.
Kind regards,
Niels
Hi,
I've added more dummy data to the df_a example to make the problem easier to understand.
I need to build the full flow combining all the orders: starting from df, I look up next_order in df_a, then treat that next_order as the new order and look up its next_order again, and so on.
For example, given the provided table:
order | next_order | next_value
AAA   | CCC        | 6878
BBB   | DDD        | 5676
DDD   | EEE        | 5653
CCC   | FFF        | 5666
EEE   | GGG        | 54353
FFF   | HHH        | 8878
I need to get two flows: AAA->CCC->FFF->HHH and BBB->DDD->EEE->GGG (I get these flows by joining df with df_a and setting df to the result of that join, iteratively).
It's like a child-parent relationship in the same table: I need to get the full flow starting from the values in the df DataFrame, and the child-parent relationship is in df_a. I need to loop until a child has no parent, meaning the flow has ended.
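To illustrate, here is a simplified sketch of the traversal I mean, building each flow as a path string just for readability. This is not my real code, only the idea; it assumes, as in the dummy data, that all flows have the same number of hops (in my real data I would also have to keep the flows that end earlier), and it assumes an existing SparkSession named spark as in the examples above.
from pyspark.sql.functions import col, concat_ws

df = spark.createDataFrame([("AAA", 23656), ("BBB", 4647)], ["order", "value"])
df_a = spark.createDataFrame([
    ("AAA", "CCC", 6878), ("BBB", "DDD", 5676), ("DDD", "EEE", 5653),
    ("CCC", "FFF", 5666), ("EEE", "GGG", 54353), ("FFF", "HHH", 8878),
], ["order", "next_order", "next_value"])

# One row per flow: 'order' is the last node reached so far, 'path' the flow so far
current = df.select(col("order"), col("order").alias("path"))
while True:
    step = current.alias("c").join(
        df_a.alias("a"), col("c.order") == col("a.order"), "inner"
    ).select(
        col("a.next_order").alias("order"),
        concat_ws("->", col("c.path"), col("a.next_order")).alias("path")
    )
    if not step.take(1):  # no child matched: every flow has ended
        break
    current = step

current.show(truncate=False)
# Expected flows: AAA->CCC->FFF->HHH and BBB->DDD->EEE->GGG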