dt3288
New Member

PySpark Notebook Using Structured Streaming with Delta Table Sink - Unsupported Operation Exception

I'm having trouble reproducing the PySpark notebook example that uses a Delta table as a streaming sink, from the following training module: https://learn.microsoft.com/en-us/training/modules/work-delta-lake-tables-fabric/5-use-delta-lake-st.... The following error occurs when I run the notebook:

 

Py4JJavaError: An error occurred while calling o4291.load.
: java.lang.UnsupportedOperationException
    at org.apache.hadoop.fs.http.AbstractHttpFileSystem.listStatus(AbstractHttpFileSystem.java:95)
    at org.apache.hadoop.fs.http.HttpsFileSystem.listStatus(HttpsFileSystem.java:23)
    at org.apache.spark.util.HadoopFSUtils$.listLeafFiles(HadoopFSUtils.scala:225)
    at org.apache.spark.util.HadoopFSUtils$.$anonfun$parallelListLeafFilesInternal$1(HadoopFSUtils.scala:95)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFilesInternal(HadoopFSUtils.scala:85)
    at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFiles(HadoopFSUtils.scala:69)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:158)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:131)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:94)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:66)
    at org.apache.spark.sql.execution.datasources.DataSource.createInMemoryFileIndex(DataSource.scala:567)
    at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$sourceSchema$2(DataSource.scala:268)
    at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$lzycompute$1(DataSource.scala:164)
    at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$1(DataSource.scala:164)
    at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:169)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:262)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:34)
    at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:196)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:210)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)

 

I've tried using both a JSON file and a CSV file with the following data.

JSON:

{
    "device":"Dev1"
    ,"status":"ok"
}
,{
    "device":"Dev2"
    ,"status":"ok"
}

CSV:

device,status
device1,ok

 

 Here is the code I'm attempting to run:

 

# Welcome to your new notebook
# Type here in the cell editor to add code!
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads JSON data from a folder
inputPath = '<Full HTTPS URL from file properties in lakehouse here>/testdata.csv'
#jsonSchema = StructType([
csvSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
#stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
#stream_df = spark.readStream.schema(csvSchema).option("maxFilesPerTrigger", 1).csv(inputPath)
stream_df = spark.readStream.format("csv").schema(csvSchema).option("header",True).option("maxFilesPerTrigger",1).load(inputPath)

# Write the stream to a delta table
#table_path = 'Files/delta_tables/devicetable'
#checkpoint_path = 'Files/delta_tables/checkpoint'
#delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)

 

The error occurs on line 13, 14, or 15, whichever of the three readStream variants is uncommented; all three return the same error. I'm not an expert in PySpark yet, and the error message is not very clear. I don't see any messages related to parsing the data, and the data schema seems simple enough. Perhaps the issue is related to dependencies? I'm at an impasse.
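
A side note on the JSON sample above, separate from the exception itself: Spark's JSON source reads line-delimited JSON by default (one object per line) unless the multiLine option is set, and the sample as posted is neither that nor a single valid JSON document. Below is a minimal sketch, using only the Python standard library, of producing the two sample records in line-delimited form. The helper name `to_json_lines` is hypothetical.

```python
import json

# The two sample records from the question.
records = [
    {"device": "Dev1", "status": "ok"},
    {"device": "Dev2", "status": "ok"},
]

def to_json_lines(rows):
    """Serialize each record as one JSON object per line (JSON Lines)."""
    return "".join(json.dumps(row) + "\n" for row in rows)

json_lines = to_json_lines(records)
print(json_lines, end="")
```

The resulting text can be saved as the JSON test file and uploaded to the lakehouse in place of the multi-line version.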

1 ACCEPTED SOLUTION
puneetvijwani
Resolver IV

@dt3288 I have used Structured Streaming for incremental loads before and presented it in one of my sessions:
https://www.youtube.com/watch?v=bNdKX-9nXTs
The reference notebooks are here:
https://github.com/puneetvijwani/fabricNotebooks

I also tested your code, and it works fine for reading testdata.csv as a stream. I loaded the file into Files in the lakehouse and used a relative path. You can also try the ABFS path: right-click the file and choose Copy ABFS path.

from pyspark.sql.types import StructType, StructField, StringType

# Create a stream that reads CSV data from the lakehouse Files area
inputPath = 'Files/testdata.csv'
csvSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
# Equivalent shorthand using the csv() reader:
#stream_df = spark.readStream.schema(csvSchema).option("maxFilesPerTrigger", 1).csv(inputPath)
stream_df = spark.readStream.format("csv").schema(csvSchema).option("header",True).option("maxFilesPerTrigger",1).load(inputPath)
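
To finish the example from the question, the stream can then be written to a Delta table. This is a minimal sketch, assuming it runs in a Fabric notebook where `spark` is already defined. `start_device_stream` is a hypothetical helper name, the table and checkpoint paths in the usage note are the ones commented out in the question, and the schema is passed as a DDL string with the same two columns instead of a StructType.

```python
def start_device_stream(spark, input_path, table_path, checkpoint_path):
    """Read CSV files as a stream and write them to a Delta table.

    `spark` is the notebook's SparkSession; the paths are supplied by
    the caller (relative lakehouse paths or abfss:// URIs).
    """
    stream_df = (
        spark.readStream
        .format("csv")
        .schema("device STRING, status STRING")  # DDL form of the schema
        .option("header", True)
        .option("maxFilesPerTrigger", 1)  # pick up one file per micro-batch
        .load(input_path)
    )
    # The checkpoint location stores progress so the query can resume.
    return (
        stream_df.writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_path)
        .start(table_path)
    )
```

In a notebook this would be called as `query = start_device_stream(spark, 'Files/testdata.csv', 'Files/delta_tables/devicetable', 'Files/delta_tables/checkpoint')`, and `query.stop()` ends the stream.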


4 REPLIES 4
puneetvijwani
Resolver IV

@dt3288 Glad to know it worked. Feel free to mark this as the accepted solution, and if you're feeling very kind, give me a Kudos 😀

dt3288
New Member

Thanks, @puneetvijwani.  It worked when I used the ABFS path but not the relative path or full URL.
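
This is consistent with the stack trace in the question: the exception is raised from `HttpsFileSystem.listStatus`, meaning Spark tried to list files through Hadoop's HTTPS file system, which does not support listing. An `abfss://` path goes through a different file system. Below is a minimal sketch of turning a OneLake HTTPS URL into its ABFS form, assuming the layouts `https://onelake.dfs.fabric.microsoft.com/<workspace>/<item>/<path>` and `abfss://<workspace>@onelake.dfs.fabric.microsoft.com/<item>/<path>`. `https_to_abfss` is a hypothetical helper, and the workspace and lakehouse names are placeholders.

```python
from urllib.parse import urlsplit

def https_to_abfss(url):
    """Convert a OneLake HTTPS URL to an abfss:// URI (assumed layout)."""
    parts = urlsplit(url)
    if parts.scheme == "abfss":
        return url  # already in ABFS form
    if parts.scheme != "https" or not parts.netloc:
        raise ValueError(f"not an HTTPS URL: {url!r}")
    # First path segment is the workspace; the rest is item + file path.
    workspace, _, rest = parts.path.lstrip("/").partition("/")
    if not workspace or not rest:
        raise ValueError(f"expected /<workspace>/<item>/<path> in {url!r}")
    return f"abfss://{workspace}@{parts.netloc}/{rest}"

# Placeholder workspace and lakehouse names, for illustration only.
example = (
    "https://onelake.dfs.fabric.microsoft.com/"
    "MyWorkspace/MyLakehouse.Lakehouse/Files/testdata.csv"
)
print(https_to_abfss(example))
```

Copying the ABFS path from the file's context menu, as suggested in the accepted solution, avoids relying on the assumed layout.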

It is great, thanks for sharing
