<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Spark Job Structured Streaming Lakehouse Files in Data Engineering</title>
    <link>https://community.fabric.microsoft.com/t5/Data-Engineering/Spark-Job-Structured-Streaming-Lakehouse-Files/m-p/3406757#M883</link>
    <description>&lt;P&gt;Note: you need to fix the indentation of the code I shared, as Python is sensitive to indentation. &lt;SPAN&gt;When I paste it here, it loses the indentation.&lt;BR /&gt;&lt;BR /&gt;Alternatively, you can download testjob.py from here:&lt;BR /&gt;&lt;A href="https://github.com/puneetvijwani/fabricNotebooks" target="_blank"&gt;https://github.com/puneetvijwani/fabricNotebooks&lt;/A&gt;&lt;BR /&gt;&lt;/SPAN&gt;&lt;/P&gt;</description>
    <pubDate>Wed, 30 Aug 2023 13:52:00 GMT</pubDate>
    <dc:creator>puneetvijwani</dc:creator>
    <dc:date>2023-08-30T13:52:00Z</dc:date>
    <item>
      <title>Spark Job Structured Streaming Lakehouse Files</title>
      <link>https://community.fabric.microsoft.com/t5/Data-Engineering/Spark-Job-Structured-Streaming-Lakehouse-Files/m-p/3406008#M879</link>
      <description>&lt;P&gt;I am attempting a variation on this post:&lt;BR /&gt;&lt;A href="https://community.fabric.microsoft.com/t5/General-Discussion/PySpark-Notebook-Using-Structured-Streaming-with-Delta-Table/m-p/3379079" target="_blank" rel="noopener"&gt;Solved: PySpark Notebook Using Structured Streaming with D... - Microsoft Fabric Community&lt;/A&gt;&lt;/P&gt;&lt;P&gt;I am trying to use a Spark Job Definition in Microsoft Fabric to run Structured Streaming from Lakehouse Files into a Lakehouse table.&lt;/P&gt;&lt;P&gt;Running the script as a standalone notebook works fine; however, as a Spark Job, no data is populated. The Delta table and the checkpoint location get created, but no data arrives in the table.&lt;BR /&gt;&lt;BR /&gt;Any suggestions would be much appreciated.&lt;/P&gt;&lt;P&gt;Here is the working notebook example script:&lt;/P&gt;&lt;PRE&gt;from pyspark.sql.types import *

userSchema = StructType().add("name", "string").add("sales", "integer")

csvDF = spark \
    .readStream \
    .schema(userSchema) \
    .option("maxFilesPerTrigger", 1) \
    .csv("Files/streamingdata/streamingfiles") \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "Files/Notebook/_checkpoint/Struc_streaming_csv_data") \
    .toTable("Notebook_Struc_streaming_csv_data")&lt;/PRE&gt;&lt;P&gt;And here is the Spark Job definition script that does not populate any data:&lt;/P&gt;&lt;PRE&gt;from pyspark.sql import SparkSession
from pyspark.sql.types import *

if __name__ == "__main__":

    spark = SparkSession.builder.appName("MyApp").getOrCreate()
    spark.sparkContext.setLogLevel("DEBUG")

    userSchema = StructType().add("name", "string").add("sales", "integer")

    csvDF = spark \
        .readStream \
        .schema(userSchema) \
        .option("maxFilesPerTrigger", 1) \
        .csv("Files/streamingdata/streamingfiles") \
        .writeStream \
        .format("delta") \
        .outputMode("append") \
        .option("checkpointLocation", "Files/_checkpoint/Struc_streaming_csv_data") \
        .toTable("Struc_streaming_csv_data")&lt;/PRE&gt;</description>
      <pubDate>Wed, 30 Aug 2023 07:25:31 GMT</pubDate>
      <guid>https://community.fabric.microsoft.com/t5/Data-Engineering/Spark-Job-Structured-Streaming-Lakehouse-Files/m-p/3406008#M879</guid>
      <dc:creator>BilalBobat</dc:creator>
      <dc:date>2023-08-30T07:25:31Z</dc:date>
    </item>
    <item>
      <title>Re: Spark Job Structured Streaming Lakehouse Files</title>
      <link>https://community.fabric.microsoft.com/t5/Data-Engineering/Spark-Job-Structured-Streaming-Lakehouse-Files/m-p/3406046#M880</link>
      <description>&lt;P&gt;&lt;a href="https://community.fabric.microsoft.com/t5/user/viewprofilepage/user-id/567344"&gt;@BilalBobat&lt;/a&gt;&lt;BR /&gt;Sorry for the late response. Can you try this code and let me know if it works for you?&lt;/P&gt;&lt;PRE&gt;from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

if __name__ == "__main__":
    try:
        spark = SparkSession.builder.appName("MyApp").getOrCreate()
        spark.sparkContext.setLogLevel("DEBUG")

        # Define the schema
        userSchema = StructType().add("name", "string").add("sales", "integer")

        # Read the CSV files into a DataFrame and stream them to a Delta table
        query = (spark.readStream
                 .schema(userSchema)
                 .option("maxFilesPerTrigger", 1)
                 .csv("Files/streamingdata/streamingfiles")
                 .writeStream
                 .trigger(processingTime='5 seconds')  # Added a time-based trigger
                 .format("delta")
                 .outputMode("append")
                 .option("checkpointLocation", "Files/_checkpoint/Struc_streaming_csv_data")
                 .toTable("Struc_streaming_csv_data"))

        query.awaitTermination()
    except Exception as e:
        print(f"An error occurred: {e}")&lt;/PRE&gt;</description>
      <pubDate>Wed, 30 Aug 2023 07:45:28 GMT</pubDate>
      <guid>https://community.fabric.microsoft.com/t5/Data-Engineering/Spark-Job-Structured-Streaming-Lakehouse-Files/m-p/3406046#M880</guid>
      <dc:creator>puneetvijwani</dc:creator>
      <dc:date>2023-08-30T07:45:28Z</dc:date>
    </item>
    <item>
      <title>Re: Spark Job Structured Streaming Lakehouse Files</title>
      <link>https://community.fabric.microsoft.com/t5/Data-Engineering/Spark-Job-Structured-Streaming-Lakehouse-Files/m-p/3406102#M881</link>
      <description>&lt;P&gt;Thanks for your response, not late at all; I appreciate your help on this, extremely helpful.&lt;BR /&gt;I tried your code and, unfortunately, still no data is being populated.&lt;BR /&gt;&lt;BR /&gt;Here is a sample of the CSV file I am loading from Files, in case that helps:&lt;BR /&gt;&lt;BR /&gt;alpha,100&lt;BR /&gt;beta,200&lt;BR /&gt;charlie,300&lt;/P&gt;</description>
      <pubDate>Wed, 30 Aug 2023 08:10:06 GMT</pubDate>
      <guid>https://community.fabric.microsoft.com/t5/Data-Engineering/Spark-Job-Structured-Streaming-Lakehouse-Files/m-p/3406102#M881</guid>
      <dc:creator>BilalBobat</dc:creator>
      <dc:date>2023-08-30T08:10:06Z</dc:date>
    </item>
    <item>
      <title>Re: Spark Job Structured Streaming Lakehouse Files</title>
      <link>https://community.fabric.microsoft.com/t5/Data-Engineering/Spark-Job-Structured-Streaming-Lakehouse-Files/m-p/3406726#M882</link>
      <description>&lt;P&gt;&lt;a href="https://community.fabric.microsoft.com/t5/user/viewprofilepage/user-id/567344"&gt;@BilalBobat&lt;/a&gt; Let's try this simpler version. This works just fine for me.&lt;/P&gt;&lt;PRE&gt;from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

spark = SparkSession.builder \
    .appName("Stream CSV to Delta Table") \
    .getOrCreate()

userSchema = StructType().add("name", "string").add("sales", "integer")

# Replace with the actual path to your streaming CSV files
streamingDF = spark.readStream \
    .schema(userSchema) \
    .option("maxFilesPerTrigger", 1) \
    .csv("Files/Streaming/")

# Replace with the path where you want to save the Delta table
query = streamingDF.writeStream \
    .trigger(processingTime='5 seconds') \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "Tables/Streaming_Table_test/_checkpoint") \
    .start("Tables/Streaming_Table_test")

query.awaitTermination()&lt;/PRE&gt;</description>
      <pubDate>Wed, 30 Aug 2023 13:42:37 GMT</pubDate>
      <guid>https://community.fabric.microsoft.com/t5/Data-Engineering/Spark-Job-Structured-Streaming-Lakehouse-Files/m-p/3406726#M882</guid>
      <dc:creator>puneetvijwani</dc:creator>
      <dc:date>2023-08-30T13:42:37Z</dc:date>
    </item>
    <item>
      <title>Re: Spark Job Structured Streaming Lakehouse Files</title>
      <link>https://community.fabric.microsoft.com/t5/Data-Engineering/Spark-Job-Structured-Streaming-Lakehouse-Files/m-p/3406757#M883</link>
      <description>&lt;P&gt;Note: you need to fix the indentation of the code I shared, as Python is sensitive to indentation. &lt;SPAN&gt;When I paste it here, it loses the indentation.&lt;BR /&gt;&lt;BR /&gt;Alternatively, you can download testjob.py from here:&lt;BR /&gt;&lt;A href="https://github.com/puneetvijwani/fabricNotebooks" target="_blank"&gt;https://github.com/puneetvijwani/fabricNotebooks&lt;/A&gt;&lt;BR /&gt;&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 30 Aug 2023 13:52:00 GMT</pubDate>
      <guid>https://community.fabric.microsoft.com/t5/Data-Engineering/Spark-Job-Structured-Streaming-Lakehouse-Files/m-p/3406757#M883</guid>
      <dc:creator>puneetvijwani</dc:creator>
      <dc:date>2023-08-30T13:52:00Z</dc:date>
    </item>
    <item>
      <title>Re: Spark Job Structured Streaming Lakehouse Files</title>
      <link>https://community.fabric.microsoft.com/t5/Data-Engineering/Spark-Job-Structured-Streaming-Lakehouse-Files/m-p/3406963#M884</link>
      <description>&lt;P&gt;Perfect, that works. Thanks again for your extensive, speedy help on this, much appreciated.&amp;nbsp;&lt;span class="lia-unicode-emoji" title=":folded_hands:"&gt;🙏&lt;/span&gt;&lt;BR /&gt;Have a super day.&lt;/P&gt;</description>
      <pubDate>Wed, 30 Aug 2023 15:06:01 GMT</pubDate>
      <guid>https://community.fabric.microsoft.com/t5/Data-Engineering/Spark-Job-Structured-Streaming-Lakehouse-Files/m-p/3406963#M884</guid>
      <dc:creator>BilalBobat</dc:creator>
      <dc:date>2023-08-30T15:06:01Z</dc:date>
    </item>
  </channel>
</rss>