BilalBobat
New Member

Spark Job Structured Streaming Lakehouse Files

I am attempting a variation of this post:
Solved: PySpark Notebook Using Structured Streaming with D... - Microsoft Fabric Community

 

I am trying to use a Spark Job definition in Microsoft Fabric with Structured Streaming to load Lakehouse Files into a Lakehouse table.

 

Running the script as a standalone notebook works fine; however, as a Spark Job, I don't get data populated. The Delta table and the checkpoint location get created, but no data is being populated.

Any suggestions would be much appreciated.


Here is the working notebook example script:

from pyspark.sql.types import *
userSchema = StructType().add("name", "string").add("sales", "integer")

csvDF = spark \
    .readStream \
    .schema(userSchema) \
    .option("maxFilesPerTrigger", 1) \
    .csv("Files/streamingdata/streamingfiles") \
    .writeStream \
    .format("delta")\
    .outputMode("append") \
    .option("checkpointLocation", "Files/Notebook/_checkpoint/Struc_streaming_csv_data") \
    .toTable("Notebook_Struc_streaming_csv_data")

 

and here is the non-populating Spark Job definition:

from pyspark.sql import SparkSession
from pyspark.sql.types import *

if __name__ == "__main__":
       
    spark = SparkSession.builder.appName("MyApp").getOrCreate()
    spark.sparkContext.setLogLevel("DEBUG")

    userSchema = StructType().add("name", "string").add("sales", "integer")
   
    csvDF = spark \
    .readStream \
    .schema(userSchema) \
    .option("maxFilesPerTrigger", 1) \
    .csv("Files/streamingdata/streamingfiles") \
    .writeStream \
    .format("delta")\
    .outputMode("append") \
    .option("checkpointLocation", "Files/_checkpoint/Struc_streaming_csv_data") \
    .toTable("Struc_streaming_csv_data")



5 REPLIES
puneetvijwani
Resolver IV

Note: you need to fix the indentation of the code I shared, as it is indentation-sensitive.
When I paste it here it loses the indentation.

Or you can download the testjob.py from here
https://github.com/puneetvijwani/fabricNotebooks

Perfect, that works. Thanks again for your extensive, speedy help on this, much appreciated.  🙏
Have a super day. 

puneetvijwani
Resolver IV

@BilalBobat let's try this simpler version. This works just fine for me.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

spark = SparkSession.builder \
    .appName("Stream CSV to Delta Table") \
    .getOrCreate()

userSchema = StructType().add("name", "string").add("sales", "integer")

streamingDF = spark.readStream \
    .schema(userSchema) \
    .option("maxFilesPerTrigger", 1) \
    .csv("Files/Streaming/")  # Replace with the actual path to your streaming CSV files

query = streamingDF.writeStream \
    .trigger(processingTime='5 seconds') \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "Tables/Streaming_Table_test/_checkpoint") \
    .start("Tables/Streaming_Table_test")  # Replace with the path where you want to save the Delta table

query.awaitTermination()
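
The main differences from your original job are that the query handle is kept and query.awaitTermination() blocks at the end. In a Spark Job Definition the main script otherwise returns straight away and the session shuts down before the first micro-batch runs, which is the likely reason the notebook populated data but the job didn't (a notebook session stays alive in the background). Writing with .start("Tables/Streaming_Table_test") also lands the Delta files directly under the Tables folder. To check that rows are actually arriving, a quick sketch (assuming the same Tables/Streaming_Table_test path) you could run from a notebook:

# One-off batch read of the streaming sink to confirm rows are landing
df = spark.read.format("delta").load("Tables/Streaming_Table_test")
df.show()
print(df.count())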

 

BilalBobat
New Member

Thanks for your response, not late at all. I appreciate your help on this, extremely helpful.
Tried your code and no, unfortunately no data is being populated.

Here is a sample of the CSV file I am loading from Files, if that helps:

alpha,100
beta,200
charlie,300

puneetvijwani
Resolver IV

@BilalBobat 
Sorry for the late response. Can you try this code and let me know if it's working for you:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

if __name__ == "__main__":
    try:
        spark = SparkSession.builder.appName("MyApp").getOrCreate()
        spark.sparkContext.setLogLevel("DEBUG")

        # Define the schema
        userSchema = StructType().add("name", "string").add("sales", "integer")

        # Read the csv files into a DataFrame
        query = (spark.readStream
                 .schema(userSchema)
                 .option("maxFilesPerTrigger", 1)
                 .csv("Files/streamingdata/streamingfiles")
                 .writeStream
                 .trigger(processingTime='5 seconds')  # Added a time-based trigger
                 .format("delta")
                 .outputMode("append")
                 .option("checkpointLocation", "Files/_checkpoint/Struc_streaming_csv_data")
                 .toTable("Struc_streaming_csv_data"))

        query.awaitTermination()

    except Exception as e:
        print(f"An error occurred: {e}")

 
