
BilalBobat
New Member

Spark Job Structured Streaming Lakehouse Files

I am attempting a variation of this post:
Solved: PySpark Notebook Using Structured Streaming with D... - Microsoft Fabric Community

 

I am trying to use a Spark Job definition in Microsoft Fabric to run Structured Streaming from Lakehouse Files into a Lakehouse table.

 

Running the script as a standalone notebook works fine; however, as a Spark Job, no data gets populated. The Delta table gets created, and so does the checkpoint location, but no data is written.

Any suggestions would be much appreciated.


Here is the working notebook example script:

from pyspark.sql.types import StructType

userSchema = StructType().add("name", "string").add("sales", "integer")

# toTable() starts the streaming query; in a notebook the session stays
# alive, so the stream keeps running in the background.
csvDF = spark \
    .readStream \
    .schema(userSchema) \
    .option("maxFilesPerTrigger", 1) \
    .csv("Files/streamingdata/streamingfiles") \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "Files/Notebook/_checkpoint/Struc_streaming_csv_data") \
    .toTable("Notebook_Struc_streaming_csv_data")

 

And here is the Spark Job definition that does not populate data:

from pyspark.sql import SparkSession
from pyspark.sql.types import *

if __name__ == "__main__":
       
    spark = SparkSession.builder.appName("MyApp").getOrCreate()
    spark.sparkContext.setLogLevel("DEBUG")

    userSchema = StructType().add("name", "string").add("sales", "integer")
   
    csvDF = spark \
    .readStream \
    .schema(userSchema) \
    .option("maxFilesPerTrigger", 1) \
    .csv("Files/streamingdata/streamingfiles") \
    .writeStream \
    .format("delta")\
    .outputMode("append") \
    .option("checkpointLocation", "Files/_checkpoint/Struc_streaming_csv_data") \
    .toTable("Struc_streaming_csv_data")



1 ACCEPTED SOLUTION
puneetvijwani
Resolver IV

@BilalBobat Let's try this simpler version. It works fine for me:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

spark = SparkSession.builder \
    .appName("Stream CSV to Delta Table") \
    .getOrCreate()

userSchema = StructType().add("name", "string").add("sales", "integer")

streamingDF = spark.readStream \
    .schema(userSchema) \
    .option("maxFilesPerTrigger", 1) \
    .csv("Files/Streaming/")  # Replace with the actual path to your streaming CSV files

query = streamingDF.writeStream \
    .trigger(processingTime='5 seconds') \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "Tables/Streaming_Table_test/_checkpoint") \
    .start("Tables/Streaming_Table_test")  # Replace with the path where you want to save the Delta table

# Block the driver until the stream is stopped; without this, a Spark Job
# exits as soon as the script ends.
query.awaitTermination()
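The decisive difference from the original job script is `query.awaitTermination()`: a notebook keeps the driver session alive for you, but a standalone Spark Job exits as soon as the script finishes, stopping the stream before any micro-batch is committed. A minimal, Spark-free sketch of the same failure mode (the thread and queue here are stand-ins for the streaming query, not Fabric or Spark APIs):

```python
import queue
import threading
import time

results = queue.Queue()

def micro_batch_writer(rows):
    """Stand-in for a streaming sink: writes one row per 'trigger'."""
    for row in rows:
        time.sleep(0.01)
        results.put(row)

rows = [("alpha", 100), ("beta", 200), ("charlie", 300)]
worker = threading.Thread(target=micro_batch_writer, args=(rows,), daemon=True)
worker.start()

# Without this join() -- the analogue of query.awaitTermination() --
# the main thread would exit at once and the daemon worker would be
# killed before writing anything: the "empty table" symptom.
worker.join()
print(results.qsize())  # -> 3
```

Comment out the `join()` and the process usually ends with nothing written, which is exactly why the same streaming code behaves differently as a notebook cell versus a batch-submitted job.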

 


5 REPLIES
puneetvijwani
Resolver IV

Note: you need to fix the indentation of the code I shared, as Python is sensitive to indentation. When I paste it here, it loses the indentation.

Or you can download the testjob.py from here
https://github.com/puneetvijwani/fabricNotebooks

Perfect, that works. Thanks again for your extensive and speedy help on this, much appreciated. 🙏
Have a super day.


 

BilalBobat
New Member

Thanks for your response, not late at all; I appreciate your help on this, extremely helpful.
Tried your code, and unfortunately no data is being populated.

Here is a sample of the CSV file I am loading from Files, in case that helps:

alpha,100
beta,200
charlie,300
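One quick sanity check on this sample against the declared schema (name as string, sales as integer) is to confirm locally that every row has exactly two fields and the second one casts cleanly to an integer; Spark's CSV reader in its default PERMISSIVE mode tends to load rows that don't fit the schema as nulls rather than fail loudly, which can look like missing data. A small stand-alone sketch with the posted sample inlined (plain Python, no Spark required):

```python
import csv
import io

# The three sample rows from the post, inlined for a local check.
sample = "alpha,100\nbeta,200\ncharlie,300\n"

# Mirror userSchema = name:string, sales:integer by hand: each row must
# have two fields, and the second must parse as an int.
rows = [(name, int(sales)) for name, sales in csv.reader(io.StringIO(sample))]
print(rows)  # -> [('alpha', 100), ('beta', 200), ('charlie', 300)]
```

Since the sample parses cleanly, the schema itself is unlikely to be the problem here; the check is mainly useful for ruling out malformed input files.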

puneetvijwani
Resolver IV

@BilalBobat
Sorry for the late response. Can you try this code and let me know if it works for you?

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

if __name__ == "__main__":
    try:
        spark = SparkSession.builder.appName("MyApp").getOrCreate()
        spark.sparkContext.setLogLevel("DEBUG")

        # Define the schema
        userSchema = StructType().add("name", "string").add("sales", "integer")

        # Read the CSV files as a stream and write them to a Delta table
        query = (spark.readStream
                 .schema(userSchema)
                 .option("maxFilesPerTrigger", 1)
                 .csv("Files/streamingdata/streamingfiles")
                 .writeStream
                 .trigger(processingTime='5 seconds')  # Added a time-based trigger
                 .format("delta")
                 .outputMode("append")
                 .option("checkpointLocation", "Files/_checkpoint/Struc_streaming_csv_data")
                 .toTable("Struc_streaming_csv_data"))

        # Keep the driver alive while the stream runs
        query.awaitTermination()

    except Exception as e:
        print(f"An error occurred: {e}")
 
