Skip to main content
cancel
Showing results for 
Search instead for 
Did you mean: 

Get Fabric certified for FREE! Don't miss your chance! Learn more

Reply
dt3288
New Member

PySpark Notebook Using Structured Streaming with Delta Table Sink - Unsupported Operation Exception

I'm encountering difficulty reproducing the PySpark notebook example using a delta table as a streaming sink in the following training module: https://learn.microsoft.com/en-us/training/modules/work-delta-lake-tables-fabric/5-use-delta-lake-st....  The following error occurs when I run the notebook:

 

Py4JJavaError: An error occurred while calling o4291.load. : java.lang.UnsupportedOperationException at org.apache.hadoop.fs.http.AbstractHttpFileSystem.listStatus(AbstractHttpFileSystem.java:95) at org.apache.hadoop.fs.http.HttpsFileSystem.listStatus(HttpsFileSystem.java:23) at org.apache.spark.util.HadoopFSUtils$.listLeafFiles(HadoopFSUtils.scala:225) at org.apache.spark.util.HadoopFSUtils$.$anonfun$parallelListLeafFilesInternal$1(HadoopFSUtils.scala:95) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at scala.collection.TraversableLike.map(TraversableLike.scala:286) at scala.collection.TraversableLike.map$(TraversableLike.scala:279) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFilesInternal(HadoopFSUtils.scala:85) at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFiles(HadoopFSUtils.scala:69) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:158) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:131) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:94) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:66) at org.apache.spark.sql.execution.datasources.DataSource.createInMemoryFileIndex(DataSource.scala:567) at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$sourceSchema$2(DataSource.scala:268) at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$lzycompute$1(DataSource.scala:164) at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$1(DataSource.scala:164) at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:169) at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:262) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118) at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:34) at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:196) at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:210) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:750)

 

I've tried using both a JSON and CSV file with the following data:

 

{
    "device":"Dev1"
    ,"status":"ok"
}
,{
    "device":"Dev2"
    ,"status":"ok"
}
device,status
device1,ok

 

 Here is the code I'm attempting to run:

 

# Welcome to your new notebook
# Type here in the cell editor to add code!
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads JSON data from a folder
inputPath = '<Full HTTPS URL from file properties in lakehouse here>/testdata.csv'
#jsonSchema = StructType([
csvSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
#stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
#stream_df = spark.readStream.schema(csvSchema).option("maxFilesPerTrigger", 1).csv(inputPath)
stream_df = spark.readStream.format("csv").schema(csvSchema).option("header",True).option("maxFilesPerTrigger",1).load(inputPath)

# Write the stream to a delta table
#table_path = 'Files/delta_tables/devicetable'
#checkpoint_path = 'Files/delta_tables/checkpoint'
#delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)

 

The error occurs on lines 13, 14 or 15 -- all three variations return the same error.  I'm not an expert in PySpark yet, but the error is not very clear.  I don't see any messages related to parsing the data, and the data schema seems simple enough.  Perhaps the issue is related to dependencies?  I'm at an impasse.

1 ACCEPTED SOLUTION
puneetvijwani
Resolver IV
Resolver IV

@dt3288 i have used structured steaming for incremental load before and presented on one of my sessions 
https://www.youtube.com/watch?v=bNdKX-9nXTs
And reference notebooks are here 
https://github.com/puneetvijwani/fabricNotebooks

Also i have tested your code it seems working fine for reading the testdata.csv as stream as i loaded in Files (lakehouse) and used relative path however you can also try copying abfss path by right clikcing the file and copy ABFS path

# Welcome to your new notebook
# Type here in the cell editor to add code!
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads JSON data from a folder
inputPath = 'Files/testdata.csv'
#jsonSchema = StructType([
csvSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
#stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
#stream_df = spark.readStream.schema(csvSchema).option("maxFilesPerTrigger", 1).csv(inputPath)
stream_df = spark.readStream.format("csv").schema(csvSchema).option("header",True).option("maxFilesPerTrigger",1).load(inputPath)

View solution in original post

4 REPLIES 4
puneetvijwani
Resolver IV
Resolver IV

@dt3288  Glad to know it worked feel free to mark this as accepted "solution" if you want  and if you're feeling very kind, give me a Kudos 😀

dt3288
New Member

Thanks, @puneetvijwani.  It worked when I used the ABFS path but not the relative path or full URL.

puneetvijwani
Resolver IV
Resolver IV

@dt3288 i have used structured steaming for incremental load before and presented on one of my sessions 
https://www.youtube.com/watch?v=bNdKX-9nXTs
And reference notebooks are here 
https://github.com/puneetvijwani/fabricNotebooks

Also i have tested your code it seems working fine for reading the testdata.csv as stream as i loaded in Files (lakehouse) and used relative path however you can also try copying abfss path by right clikcing the file and copy ABFS path

# Welcome to your new notebook
# Type here in the cell editor to add code!
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads JSON data from a folder
inputPath = 'Files/testdata.csv'
#jsonSchema = StructType([
csvSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
#stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
#stream_df = spark.readStream.schema(csvSchema).option("maxFilesPerTrigger", 1).csv(inputPath)
stream_df = spark.readStream.format("csv").schema(csvSchema).option("header",True).option("maxFilesPerTrigger",1).load(inputPath)

It is great, thanks for sharing

Helpful resources

Announcements
Sticker Challenge 2026 Carousel

Join our Community Sticker Challenge 2026

If you love stickers, then you will definitely want to check out our Community Sticker Challenge!

Free Fabric Certifications

Free Fabric Certifications

Get Fabric certified for free! Don't miss your chance.

January Fabric Update Carousel

Fabric Monthly Update - January 2026

Check out the January 2026 Fabric update to learn about new features.

FabCon Atlanta 2026 carousel

FabCon Atlanta 2026

Join us at FabCon Atlanta, March 16-20, for the ultimate Fabric, Power BI, AI and SQL community-led event. Save $200 with code FABCOMM.