<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Performance issue with Spark structured streaming job in Data Engineering</title>
    <link>https://community.fabric.microsoft.com/t5/Data-Engineering/Performance-issue-with-Spark-structured-streaming-job/m-p/4067939#M3340</link>
    <description>&lt;P&gt;Hi &lt;a href="https://community.fabric.microsoft.com/t5/user/viewprofilepage/user-id/751587"&gt;@MiSchroe&lt;/a&gt;&amp;nbsp;,&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;Given that the same code runs efficiently on Databricks, the problem may be in the configuration or environment of the current setup.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;I have the following suggestions for performance tuning:&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;It is recommended to increase spark.executor.heartbeatInterval from 10 seconds to 20 seconds.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;Increase driver and executor memory.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;Increase parallelism: increase the number of partitions in Kafka.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;Correctly resize the executor.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;More details on performance tuning can be found in this article:&lt;/P&gt;
&lt;P&gt;&lt;A href="https://developer.hpe.com/blog/performance-tuning-of-an-apache-kafkaspark-streaming-system/" target="_blank"&gt;Performance Tuning of an Apache Kafka/Spark Streaming System | HPE Developer Portal&lt;/A&gt;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;Best Regards,&lt;BR /&gt;Yang&lt;BR /&gt;Community Support Team&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;If there is any post&amp;nbsp;&lt;STRONG&gt;&lt;EM&gt;helps&lt;/EM&gt;&lt;/STRONG&gt;, then please consider&amp;nbsp;&lt;STRONG&gt;&lt;EM&gt;Accept it as the solution&lt;/EM&gt;&lt;/STRONG&gt;&amp;nbsp;&amp;nbsp;to help the other members find it more quickly.&lt;BR /&gt;If I misunderstand your needs or you still have problems on it, please feel free to let us know.&amp;nbsp;&lt;STRONG&gt;&lt;EM&gt;Thanks a lot!&lt;/EM&gt;&lt;/STRONG&gt;&lt;/P&gt;</description>
    <pubDate>Tue, 30 Jul 2024 01:51:38 GMT</pubDate>
    <dc:creator>Anonymous</dc:creator>
    <dc:date>2024-07-30T01:51:38Z</dc:date>
    <item>
      <title>Performance issue with Spark structured streaming job</title>
      <link>https://community.fabric.microsoft.com/t5/Data-Engineering/Performance-issue-with-Spark-structured-streaming-job/m-p/4066907#M3329</link>
      <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;we face some performance issues when running a Spark Job Definition with a structured streaming job getting data from a Kafka. We just want to dump the Kafka data into a Lakehouse for further processing in later steps.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Our issue is, that we have an average latency of around 30 seconds between the event being visible to Kafka and the time the event is picked up by the Spark job. I seems that the overall micro-batch processing time is the main issue and I don't know why it takes so long. I have already tried with different cluster sizes but due to the fact that we're just passing through the data the cluster size doesn't really matter. At least, the number of vCores should match the number of Kafka partitions, which in our case is 6 partitions.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;I took the same source code and have run it in a Databricks notebook where I get an average latency of 0.6 seconds, so it's not an issue on the source side.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Here is our source code:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;import sys
import os
from notebookutils import mssparkutils
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
import pyspark.sql.functions as f
import pyspark.sql.types as t


if __name__ == "__main__":

    #Spark session builder
    #Disable delta.optimizeWrite to ensure data is written as fast as possible
    spark = (SparkSession
          .builder
          .appName("test")
          .config("spark.microsoft.delta.optimizeWrite.enabled", "false")
          .config("spark.sql.parquet.vorder.enabled", "false")
          .config("spark.ms.autotune.enabled", "true")
          .getOrCreate())
    
    spark_context = spark.sparkContext
    spark_context.setLogLevel("DEBUG")



    print("spark.synapse.pool.name : " + spark.conf.get("spark.synapse.pool.name")) 
    print() 
    print("spark.driver.cores : " + spark.conf.get("spark.driver.cores")) 
    print("spark.driver.memory : " + spark.conf.get("spark.driver.memory")) 
    print("spark.executor.cores : " + spark.conf.get("spark.executor.cores")) 
    print("spark.executor.memory : " + spark.conf.get("spark.executor.memory")) 
    print("spark.executor.instances: " + spark.conf.get("spark.executor.instances")) 
    print() 
    print("spark.dynamicAllocation.enabled : " + spark.conf.get("spark.dynamicAllocation.enabled")) 
    print("spark.dynamicAllocation.maxExecutors : " + spark.conf.get("spark.dynamicAllocation.maxExecutors")) 
    print("spark.dynamicAllocation.minExecutors : " + spark.conf.get("spark.dynamicAllocation.minExecutors")) 

    checkpoint_path = "Files/checkpoint/path/to/checkpoint/location"

    # Works with API-Key und Secret
    df = (
        spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "&amp;lt;my kafka server&amp;gt;:9092")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format("&amp;lt;my kafka api key&amp;gt;", "&amp;lt;my kafka api secret&amp;gt;"))
        .option("kafka.ssl.endpoint.identification.algorithm", "https")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("subscribe", "&amp;lt;my kafka topic&amp;gt;")
        .option("startingOffsets", "earliest")
        .option("failOnDataLoss", "false")
        .load()
    )

    # Add metadata to the dataframe, e.g. current timestamp and convert key and value to string
    df = df.withColumns({"key": f.col("key").cast("string"), "value": f.col("value").cast("string"), "timestamp_datalake": f.current_timestamp()})

    df_stream = (
        df.writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_path)
        .option("mergeSchema", "true")
        .outputMode("append")
        .toTable("my_test_table")
    )
    
    df_stream.awaitTermination()&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;This is a sample of the Spark UI:&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="MiSchroe_0-1722251930376.png" style="width: 400px;"&gt;&lt;img src="https://community.fabric.microsoft.com/t5/image/serverpage/image-id/1140861i016027D9E0242885/image-size/medium?v=v2&amp;amp;px=400" role="button" title="MiSchroe_0-1722251930376.png" alt="MiSchroe_0-1722251930376.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P&gt;BTW: Importing the same data with an Event Streams from Kafka to a KQL database takes even longer ~40 seconds avg latency.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Any help is much appreciated.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Michael&lt;/P&gt;</description>
      <pubDate>Mon, 29 Jul 2024 11:35:37 GMT</pubDate>
      <guid>https://community.fabric.microsoft.com/t5/Data-Engineering/Performance-issue-with-Spark-structured-streaming-job/m-p/4066907#M3329</guid>
      <dc:creator>MiSchroe</dc:creator>
      <dc:date>2024-07-29T11:35:37Z</dc:date>
    </item>
    <item>
      <title>Re: Performance issue with Spark structured streaming job</title>
      <link>https://community.fabric.microsoft.com/t5/Data-Engineering/Performance-issue-with-Spark-structured-streaming-job/m-p/4067939#M3340</link>
      <description>&lt;P&gt;Hi &lt;a href="https://community.fabric.microsoft.com/t5/user/viewprofilepage/user-id/751587"&gt;@MiSchroe&lt;/a&gt;&amp;nbsp;,&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;Given that the same code runs efficiently on Databricks, the problem may be in the configuration or environment of the current setup.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;I have the following suggestions for performance tuning:&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;It is recommended to increase spark.executor.heartbeatInterval from 10 seconds to 20 seconds.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;Increase driver and executor memory.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;Increase parallelism: increase the number of partitions in Kafka.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;Correctly resize the executor.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;More details on performance tuning can be found in this article:&lt;/P&gt;
&lt;P&gt;&lt;A href="https://developer.hpe.com/blog/performance-tuning-of-an-apache-kafkaspark-streaming-system/" target="_blank"&gt;Performance Tuning of an Apache Kafka/Spark Streaming System | HPE Developer Portal&lt;/A&gt;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;Best Regards,&lt;BR /&gt;Yang&lt;BR /&gt;Community Support Team&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;If there is any post&amp;nbsp;&lt;STRONG&gt;&lt;EM&gt;helps&lt;/EM&gt;&lt;/STRONG&gt;, then please consider&amp;nbsp;&lt;STRONG&gt;&lt;EM&gt;Accept it as the solution&lt;/EM&gt;&lt;/STRONG&gt;&amp;nbsp;&amp;nbsp;to help the other members find it more quickly.&lt;BR /&gt;If I misunderstand your needs or you still have problems on it, please feel free to let us know.&amp;nbsp;&lt;STRONG&gt;&lt;EM&gt;Thanks a lot!&lt;/EM&gt;&lt;/STRONG&gt;&lt;/P&gt;</description>
      <pubDate>Tue, 30 Jul 2024 01:51:38 GMT</pubDate>
      <guid>https://community.fabric.microsoft.com/t5/Data-Engineering/Performance-issue-with-Spark-structured-streaming-job/m-p/4067939#M3340</guid>
      <dc:creator>Anonymous</dc:creator>
      <dc:date>2024-07-30T01:51:38Z</dc:date>
    </item>
    <item>
      <title>Re: Performance issue with Spark structured streaming job</title>
      <link>https://community.fabric.microsoft.com/t5/Data-Engineering/Performance-issue-with-Spark-structured-streaming-job/m-p/4069576#M3354</link>
      <description>&lt;P&gt;Hi&amp;nbsp;@Anonymous&lt;/a&gt;,&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;thanks for your answer.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;I have implemented your suggestions and here are the results:&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;Spark executor heartbeat -&amp;gt; no change.&lt;/LI&gt;&lt;LI&gt;Increased driver and 3 executors to X-large nodes with each 32 cores and 224GB memory. -&amp;gt; no change.&lt;/LI&gt;&lt;LI&gt;Increase the number of Kafka partitions: As it is an external system I can't change that one. There are 6 partitions on that Kafka topic. (And it runs perfectly with this setup in Databricks, so the number of partitions is not an issue here.)&lt;/LI&gt;&lt;LI&gt;Resize the executor: see #2 -&amp;gt; no change.&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;The topic produces messages at a rate of ~10 messages per second with each meassage ~3kB in size. Compared to what is mentioned in the linked blog entry this is a drop in the ocean.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Regards,&lt;/P&gt;&lt;P&gt;Michael&lt;/P&gt;</description>
      <pubDate>Tue, 30 Jul 2024 16:29:08 GMT</pubDate>
      <guid>https://community.fabric.microsoft.com/t5/Data-Engineering/Performance-issue-with-Spark-structured-streaming-job/m-p/4069576#M3354</guid>
      <dc:creator>MiSchroe</dc:creator>
      <dc:date>2024-07-30T16:29:08Z</dc:date>
    </item>
  </channel>
</rss>

