dt3288
New Member

PySpark Notebook Using Structured Streaming with Delta Table Sink - Unsupported Operation Exception

I'm encountering difficulty reproducing the PySpark notebook example using a delta table as a streaming sink in the following training module: https://learn.microsoft.com/en-us/training/modules/work-delta-lake-tables-fabric/5-use-delta-lake-st....  The following error occurs when I run the notebook:

 

Py4JJavaError: An error occurred while calling o4291.load.
: java.lang.UnsupportedOperationException
    at org.apache.hadoop.fs.http.AbstractHttpFileSystem.listStatus(AbstractHttpFileSystem.java:95)
    at org.apache.hadoop.fs.http.HttpsFileSystem.listStatus(HttpsFileSystem.java:23)
    at org.apache.spark.util.HadoopFSUtils$.listLeafFiles(HadoopFSUtils.scala:225)
    at org.apache.spark.util.HadoopFSUtils$.$anonfun$parallelListLeafFilesInternal$1(HadoopFSUtils.scala:95)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFilesInternal(HadoopFSUtils.scala:85)
    at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFiles(HadoopFSUtils.scala:69)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:158)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:131)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:94)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:66)
    at org.apache.spark.sql.execution.datasources.DataSource.createInMemoryFileIndex(DataSource.scala:567)
    at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$sourceSchema$2(DataSource.scala:268)
    at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$lzycompute$1(DataSource.scala:164)
    at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$1(DataSource.scala:164)
    at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:169)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:262)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:34)
    at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:196)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:210)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)

 

I've tried using both a JSON and CSV file with the following data:

 

{
    "device":"Dev1"
    ,"status":"ok"
}
,{
    "device":"Dev2"
    ,"status":"ok"
}

device,status
device1,ok

 

 Here is the code I'm attempting to run:

 

# Welcome to your new notebook
# Type here in the cell editor to add code!
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads JSON data from a folder
inputPath = '<Full HTTPS URL from file properties in lakehouse here>/testdata.csv'
#jsonSchema = StructType([
csvSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
#stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
#stream_df = spark.readStream.schema(csvSchema).option("maxFilesPerTrigger", 1).csv(inputPath)
stream_df = spark.readStream.format("csv").schema(csvSchema).option("header",True).option("maxFilesPerTrigger",1).load(inputPath)

# Write the stream to a delta table
#table_path = 'Files/delta_tables/devicetable'
#checkpoint_path = 'Files/delta_tables/checkpoint'
#delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)

 

The error occurs on lines 13, 14 or 15 -- all three variations return the same error.  I'm not an expert in PySpark yet, but the error is not very clear.  I don't see any messages related to parsing the data, and the data schema seems simple enough.  Perhaps the issue is related to dependencies?  I'm at an impasse.
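A side note on the JSON sample in the question: two objects joined by a comma do not form a single valid JSON document, and Spark's JSON reader by default expects one complete object per line (JSON Lines) unless the multiLine option is set. This looks unrelated to the UnsupportedOperationException, since the trace fails while listing files, before any parsing happens. A minimal sketch of producing the same two records as JSON Lines, using only the standard library (the helper name to_json_lines is hypothetical):

```python
import json


def to_json_lines(records):
    """Serialize an iterable of dicts as JSON Lines: one compact object per line."""
    return "\n".join(json.dumps(r, separators=(",", ":")) for r in records) + "\n"


# The two device records from the question, as plain Python dicts.
records = [
    {"device": "Dev1", "status": "ok"},
    {"device": "Dev2", "status": "ok"},
]

# Each printed line is a complete JSON object that can be parsed on its own.
print(to_json_lines(records), end="")
```

The resulting text can be saved as a file in the input folder and read with the JSON variant of the stream reader.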

1 ACCEPTED SOLUTION
puneetvijwani
Resolver IV

@dt3288 I have used Structured Streaming for incremental loads before and presented it in one of my sessions:
https://www.youtube.com/watch?v=bNdKX-9nXTs
The reference notebooks are here:
https://github.com/puneetvijwani/fabricNotebooks

I also tested your code, and it works fine for reading testdata.csv as a stream. I uploaded the file to Files in the lakehouse and used the relative path. You can also try the ABFS path: right-click the file and choose Copy ABFS path.

from pyspark.sql.types import StructType, StructField, StringType

# Create a stream that reads CSV data from the lakehouse Files area (relative path)
inputPath = 'Files/testdata.csv'
csvSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
# header=True skips the "device,status" header row; maxFilesPerTrigger=1 processes one file per micro-batch
stream_df = spark.readStream.format("csv").schema(csvSchema).option("header",True).option("maxFilesPerTrigger",1).load(inputPath)
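Once the read succeeds, the commented-out write step from the question can be re-enabled. The following is a minimal sketch, not a definitive implementation: the function name start_device_stream is hypothetical, the schema is given as a DDL string rather than the StructType used in the post (so the fields are nullable here), and the Spark session is passed in as an argument so the block does not depend on a notebook-provided global.

```python
def start_device_stream(spark, input_path, table_path, checkpoint_path):
    """Read CSV files as a stream and append them to a Delta table.

    `spark` is an existing SparkSession (in a Fabric notebook, the
    predefined `spark` variable). Returns the started streaming query.
    """
    stream_df = (
        spark.readStream
        .format("csv")
        # DDL-string schema: an assumption made for brevity in this sketch.
        .schema("device STRING, status STRING")
        .option("header", True)
        .option("maxFilesPerTrigger", 1)
        .load(input_path)
    )
    return (
        stream_df.writeStream
        .format("delta")
        # The checkpoint folder stores stream progress between runs.
        .option("checkpointLocation", checkpoint_path)
        .start(table_path)
    )
```

In a notebook this might be called as start_device_stream(spark, inputPath, 'Files/delta_tables/devicetable', 'Files/delta_tables/checkpoint'), using the paths from the question, and the returned query can be stopped with its stop() method.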


4 REPLIES
puneetvijwani
Resolver IV

@dt3288 Glad to know it worked. Feel free to mark this as the accepted solution if you want, and if you're feeling very kind, give me a Kudos 😀

dt3288
New Member

Thanks, @puneetvijwani.  It worked when I used the ABFS path but not the relative path or full URL.
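For anyone hitting the same trace: the failing frame is HttpsFileSystem.listStatus, which suggests that Spark resolved the HTTPS URL to Hadoop's HTTP filesystem, and that filesystem does not support the file listing a streaming file source needs. The sketch below is a small guard that rejects such paths before they reach readStream. The function name and the example workspace and lakehouse names are hypothetical; in practice, copy the real ABFS path from the file's context menu as described above.

```python
from urllib.parse import urlparse

# URL schemes that map to Hadoop's HTTP filesystem, which cannot list files.
UNLISTABLE_SCHEMES = {"http", "https"}


def check_stream_input_path(path):
    """Return `path` unchanged, or raise ValueError for http(s) URLs."""
    scheme = urlparse(path).scheme.lower()
    if scheme in UNLISTABLE_SCHEMES:
        raise ValueError(
            f"'{scheme}' paths cannot be listed by Spark; "
            "use the ABFS path or a lakehouse-relative path instead"
        )
    return path


# Hypothetical ABFS path; the workspace and lakehouse names are placeholders.
example = "abfss://MyWorkspace@onelake.dfs.fabric.microsoft.com/MyLakehouse.Lakehouse/Files/testdata.csv"
print(check_stream_input_path(example))
```

A relative path such as 'Files/testdata.csv' has no scheme and passes the check, although, as noted above, it did not work in every environment.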

It is great, thanks for sharing
