<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Performance issue with PySpark code in Data Engineering</title>
    <link>https://community.fabric.microsoft.com/t5/Data-Engineering/Performance-issue-with-PySpark-code/m-p/4618510#M8102</link>
    <description>&lt;P&gt;Hi,&lt;/P&gt;
&lt;P&gt;I've added more dummy data to the df_a example to make the problem easier to understand.&lt;/P&gt;
&lt;P&gt;I need to build the full flow combining all the orders: starting from df, I look up &lt;EM&gt;next_order&lt;/EM&gt; in df_a, then set that &lt;EM&gt;next_order&lt;/EM&gt; as &lt;EM&gt;order&lt;/EM&gt; to look up its &lt;EM&gt;next_order&lt;/EM&gt; again.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;For example, the provided table:&lt;/P&gt;
&lt;TABLE border="1" width="100%"&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;&lt;STRONG&gt;order&lt;/STRONG&gt;&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;&lt;STRONG&gt;next_order&lt;/STRONG&gt;&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;&lt;STRONG&gt;next_value&lt;/STRONG&gt;&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;AAA&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;CCC&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;6878&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;BBB&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;DDD&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;5676&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD height="30px"&gt;DDD&lt;/TD&gt;
&lt;TD height="30px"&gt;EEE&lt;/TD&gt;
&lt;TD height="30px"&gt;5653&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD height="30px"&gt;CCC&lt;/TD&gt;
&lt;TD height="30px"&gt;FFF&lt;/TD&gt;
&lt;TD height="30px"&gt;5666&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD height="30px"&gt;EEE&lt;/TD&gt;
&lt;TD height="30px"&gt;GGG&lt;/TD&gt;
&lt;TD height="30px"&gt;54353&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD height="28px"&gt;FFF&lt;/TD&gt;
&lt;TD height="28px"&gt;HHH&lt;/TD&gt;
&lt;TD height="28px"&gt;8878&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
&lt;P&gt;I need to get 2 flows: AAA-&amp;gt;CCC-&amp;gt;FFF-&amp;gt;HHH and BBB-&amp;gt;DDD-&amp;gt;EEE-&amp;gt;GGG (I get these flows by joining df with df_a and setting df to the result of the join, iteratively).&lt;/P&gt;
&lt;P&gt;It's like a child-parent relationship within the same table: I need to get the full flow starting from the values in the df dataframe. The child-parent relationship is in df_a. I need to loop until a child has no parent, meaning the flow has ended.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
    <pubDate>Thu, 20 Mar 2025 15:58:25 GMT</pubDate>
    <dc:creator>amaaiia</dc:creator>
    <dc:date>2025-03-20T15:58:25Z</dc:date>
    <item>
      <title>Performance issue with PySpark code</title>
      <link>https://community.fabric.microsoft.com/t5/Data-Engineering/Performance-issue-with-PySpark-code/m-p/4618167#M8094</link>
      <description>&lt;P&gt;I have a performance problem with a pyspark code. Does anyone know hoy to optimize it? The code I have is quite robust, but here is a basic example of what I do.&lt;/P&gt;
&lt;P&gt;- I have a df table:&lt;/P&gt;
&lt;TABLE border="1" width="100%"&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD width="50%"&gt;&lt;STRONG&gt;order&lt;/STRONG&gt;&lt;/TD&gt;
&lt;TD width="50%"&gt;&lt;STRONG&gt;value&lt;/STRONG&gt;&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD width="50%"&gt;AAA&lt;/TD&gt;
&lt;TD width="50%"&gt;23656&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD width="50%"&gt;BBB&lt;/TD&gt;
&lt;TD width="50%"&gt;4647&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
&lt;P&gt;&lt;BR /&gt;- I have another table df_a:&lt;/P&gt;
&lt;TABLE border="1" width="100%"&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;&lt;STRONG&gt;order&lt;/STRONG&gt;&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;&lt;STRONG&gt;next_order&lt;/STRONG&gt;&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;&lt;STRONG&gt;next_value&lt;/STRONG&gt;&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;AAA&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;CCC&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;6878&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;BBB&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;DDD&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;5676&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD height="30px"&gt;DDD&lt;/TD&gt;
&lt;TD height="30px"&gt;EEE&lt;/TD&gt;
&lt;TD height="30px"&gt;5653&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD height="30px"&gt;CCC&lt;/TD&gt;
&lt;TD height="30px"&gt;FFF&lt;/TD&gt;
&lt;TD height="30px"&gt;5666&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD height="30px"&gt;EEE&lt;/TD&gt;
&lt;TD height="30px"&gt;GGG&lt;/TD&gt;
&lt;TD height="30px"&gt;54353&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD height="28px"&gt;FFF&lt;/TD&gt;
&lt;TD height="28px"&gt;HHH&lt;/TD&gt;
&lt;TD height="28px"&gt;8878&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;In my code, my goal is to do join of df with df_a until no record from df matches with df_a as follows:&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;while df.filter(col('order').isNotNull()).count()&amp;gt;0:
   df_orig=df.join(df_a, [df['order']==df_a['order']]) \
        .select(df_a['next_order'].alias('order')
                df_a['next_value'].alias('value'))
   df = df_orig&lt;/LI-CODE&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;I get an error with this code because I'm trying to join the same dataframe with itself several times. However, I can't use an alias because it would be the same alias in every loop.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;I've tried replacing &lt;EM&gt;df=df_orig&lt;/EM&gt; with &lt;EM&gt;df=df_orig.rdd.toDF(df_orig.schema)&lt;/EM&gt;, simulating a new dataframe, and it works, but after about 10 loops I get an error, I think for memory reasons due to the repeated RDD generation:&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="amaaiia_0-1742473458461.png" style="width: 400px;"&gt;&lt;img src="https://community.fabric.microsoft.com/t5/image/serverpage/image-id/1252369i39037BABECA98A9A/image-size/medium?v=v2&amp;amp;px=400" role="button" title="amaaiia_0-1742473458461.png" alt="amaaiia_0-1742473458461.png" /&gt;&lt;/span&gt;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;My real use case is more complex than this dummy example (more data, more join conditions...) but I think the main idea is reflected.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Thu, 20 Mar 2025 15:29:21 GMT</pubDate>
      <guid>https://community.fabric.microsoft.com/t5/Data-Engineering/Performance-issue-with-PySpark-code/m-p/4618167#M8094</guid>
      <dc:creator>amaaiia</dc:creator>
      <dc:date>2025-03-20T15:29:21Z</dc:date>
    </item>
    <item>
      <title>Re: Performance issue with PySpark code</title>
      <link>https://community.fabric.microsoft.com/t5/Data-Engineering/Performance-issue-with-PySpark-code/m-p/4618480#M8101</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.fabric.microsoft.com/t5/user/viewprofilepage/user-id/686932"&gt;@amaaiia&lt;/a&gt;&amp;nbsp;,&lt;/P&gt;&lt;P&gt;I unfortunately did not understand why you need to execute this self-join multiple times, but looking at your code snippet, there are some things to consider:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;while loop: you are using .count() as your condition. Be aware that count() is considered an&amp;nbsp;&lt;STRONG&gt;action&lt;/STRONG&gt; in PySpark, while joins and other data operations are &lt;STRONG&gt;transformations&lt;/STRONG&gt;. Transformations are lazily evaluated and optimized upon computation, but each action triggers the computation of&amp;nbsp;&lt;STRONG&gt;all&lt;/STRONG&gt; previously defined transformations. In your case, each iteration forces the Spark application to recalculate everything that has been done so far. This will lead to performance issues, especially when you are running a lot of iterations. Ideally, the only action in a Spark application is the eventual write operation at the end of the script. If you can determine the number of loops before starting, you could use a fixed iteration limit instead of a while loop.&amp;nbsp;&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;If you can elaborate on the necessity of multiple loops and self-joins, we can look into additional ways to tackle your performance issues.&amp;nbsp;&lt;/P&gt;&lt;P&gt;Kind regards,&lt;/P&gt;&lt;P&gt;Niels&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Thu, 20 Mar 2025 15:12:13 GMT</pubDate>
      <guid>https://community.fabric.microsoft.com/t5/Data-Engineering/Performance-issue-with-PySpark-code/m-p/4618480#M8101</guid>
      <dc:creator>ObungiNiels</dc:creator>
      <dc:date>2025-03-20T15:12:13Z</dc:date>
    </item>
    <item>
      <title>Re: Performance issue with PySpark code</title>
      <link>https://community.fabric.microsoft.com/t5/Data-Engineering/Performance-issue-with-PySpark-code/m-p/4618510#M8102</link>
      <description>&lt;P&gt;Hi,&lt;/P&gt;
&lt;P&gt;I've added more dummy data to the df_a example to make the problem easier to understand.&lt;/P&gt;
&lt;P&gt;I need to build the full flow combining all the orders: starting from df, I look up &lt;EM&gt;next_order&lt;/EM&gt; in df_a, then set that &lt;EM&gt;next_order&lt;/EM&gt; as &lt;EM&gt;order&lt;/EM&gt; to look up its &lt;EM&gt;next_order&lt;/EM&gt; again.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;For example, the provided table:&lt;/P&gt;
&lt;TABLE border="1" width="100%"&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;&lt;STRONG&gt;order&lt;/STRONG&gt;&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;&lt;STRONG&gt;next_order&lt;/STRONG&gt;&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;&lt;STRONG&gt;next_value&lt;/STRONG&gt;&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;AAA&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;CCC&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;6878&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;BBB&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;DDD&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;5676&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD height="30px"&gt;DDD&lt;/TD&gt;
&lt;TD height="30px"&gt;EEE&lt;/TD&gt;
&lt;TD height="30px"&gt;5653&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD height="30px"&gt;CCC&lt;/TD&gt;
&lt;TD height="30px"&gt;FFF&lt;/TD&gt;
&lt;TD height="30px"&gt;5666&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD height="30px"&gt;EEE&lt;/TD&gt;
&lt;TD height="30px"&gt;GGG&lt;/TD&gt;
&lt;TD height="30px"&gt;54353&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD height="28px"&gt;FFF&lt;/TD&gt;
&lt;TD height="28px"&gt;HHH&lt;/TD&gt;
&lt;TD height="28px"&gt;8878&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
&lt;P&gt;I need to get 2 flows: AAA-&amp;gt;CCC-&amp;gt;FFF-&amp;gt;HHH and BBB-&amp;gt;DDD-&amp;gt;EEE-&amp;gt;GGG (I get these flows by joining df with df_a and setting df to the result of the join, iteratively).&lt;/P&gt;
&lt;P&gt;It's like a child-parent relationship within the same table: I need to get the full flow starting from the values in the df dataframe. The child-parent relationship is in df_a. I need to loop until a child has no parent, meaning the flow has ended.&lt;/P&gt;
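&lt;P&gt;In plain Python terms the chain walk looks like this (a dict lookup stands in for the df_a join; the flow function is hypothetical, for illustration only):&lt;/P&gt;

```python
# Child-parent chain walk over the sample df_a data, in plain Python.
# Each key is an order; the value is its (next_order, next_value) pair.
df_a = {
    "AAA": ("CCC", 6878), "BBB": ("DDD", 5676), "DDD": ("EEE", 5653),
    "CCC": ("FFF", 5666), "EEE": ("GGG", 54353), "FFF": ("HHH", 8878),
}

def flow(start):
    """Follow next_order links until an order has no successor."""
    chain = [start]
    while chain[-1] in df_a:
        chain.append(df_a[chain[-1]][0])
    return chain

print(flow("AAA"))  # ['AAA', 'CCC', 'FFF', 'HHH']
print(flow("BBB"))  # ['BBB', 'DDD', 'EEE', 'GGG']
```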
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Thu, 20 Mar 2025 15:58:25 GMT</pubDate>
      <guid>https://community.fabric.microsoft.com/t5/Data-Engineering/Performance-issue-with-PySpark-code/m-p/4618510#M8102</guid>
      <dc:creator>amaaiia</dc:creator>
      <dc:date>2025-03-20T15:58:25Z</dc:date>
    </item>
    <item>
      <title>Re: Performance issue with PySpark code</title>
      <link>https://community.fabric.microsoft.com/t5/Data-Engineering/Performance-issue-with-PySpark-code/m-p/4625168#M8212</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.fabric.microsoft.com/t5/user/viewprofilepage/user-id/686932"&gt;@amaaiia&lt;/a&gt;&amp;nbsp;,&lt;BR /&gt;Thanks for using Microsoft Fabric Community,&lt;/P&gt;
&lt;P class="" data-start="48" data-end="321"&gt;As mentioned by &lt;a href="https://community.fabric.microsoft.com/t5/user/viewprofilepage/user-id/729364"&gt;@ObungiNiels&lt;/a&gt;, the performance challenges in your code arise from excessive iterative self-joins, which lead to high shuffle costs and memory exhaustion. The error message indicates that some executors are failing due to memory constraints or task failures.&lt;/P&gt;
&lt;P class="" data-start="48" data-end="321"&gt;&amp;nbsp;&lt;/P&gt;
&lt;P class="" data-start="323" data-end="610"&gt;The key factors affecting performance include the use of count(), which forces Spark to recompute in each iteration, overwriting df, which leads to unnecessary recalculations, and repeated self-joins, which cause excessive data shuffling, increasing execution time and memory usage.&lt;/P&gt;
&lt;P class="" data-start="323" data-end="610"&gt;&amp;nbsp;&lt;/P&gt;
&lt;P class="" data-start="323" data-end="610"&gt;Here's an optimized version of code:&lt;BR /&gt;Instead of overwriting df, you can accumulate results using unionByName(). Also, avoid&amp;nbsp;count() by tracking new records using exceptAll().&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;from pyspark.sql.functions import col
# Sample Data
df = spark.createDataFrame([("AAA", 23656), ("BBB", 4647)], ["order", "value"])
df_a = spark.createDataFrame([
    ("AAA", "CCC", 6878),
    ("BBB", "DDD", 5676),
    ("DDD", "EEE", 5653),
    ("CCC", "FFF", 5666),
    ("EEE", "GGG", 54353),
    ("FFF", "HHH", 8878),
], ["order", "next_order", "next_value"])

# Initialize result DataFrame with the starting values
result_df = df
new_records = df
while new_records.count() &amp;gt; 0:  # Ensure loop only runs when new matches exist
    # Join only on new records to get next level data
    new_df = new_records.alias("r").join(
        df_a.alias("a"), col("r.order") == col("a.order"), "inner"
    ).select(
        col("a.next_order").alias("order"),
        col("a.next_value").alias("value")
    )

    # Remove already processed records to avoid re-processing
    new_records = new_df.exceptAll(result_df)

    # Accumulate new results without overwriting
    result_df = result_df.unionByName(new_records)

# Show final result
result_df.show()
&lt;/LI-CODE&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;This approach should help optimize performance and prevent memory issues. Let us know if you need further assistance.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;If this post helps, please consider accepting it as the solution so other members can find it more quickly; kudos would also be appreciated.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;Regards,&lt;BR /&gt;Vinay.&lt;/P&gt;</description>
      <pubDate>Wed, 26 Mar 2025 05:07:15 GMT</pubDate>
      <guid>https://community.fabric.microsoft.com/t5/Data-Engineering/Performance-issue-with-PySpark-code/m-p/4625168#M8212</guid>
      <dc:creator>v-veshwara-msft</dc:creator>
      <dc:date>2025-03-26T05:07:15Z</dc:date>
    </item>
  </channel>
</rss>

