PySpark Notebook Using Structured Streaming with Delta Table Sink - Unsupported Operation Exception

dt3288
New Member

I'm encountering difficulty reproducing the PySpark notebook example using a delta table as a streaming sink in the following training module: https://learn.microsoft.com/en-us/training/modules/work-delta-lake-tables-fabric/5-use-delta-lake-st....  The following error occurs when I run the notebook:

 

Py4JJavaError: An error occurred while calling o4291.load.
: java.lang.UnsupportedOperationException
    at org.apache.hadoop.fs.http.AbstractHttpFileSystem.listStatus(AbstractHttpFileSystem.java:95)
    at org.apache.hadoop.fs.http.HttpsFileSystem.listStatus(HttpsFileSystem.java:23)
    at org.apache.spark.util.HadoopFSUtils$.listLeafFiles(HadoopFSUtils.scala:225)
    at org.apache.spark.util.HadoopFSUtils$.$anonfun$parallelListLeafFilesInternal$1(HadoopFSUtils.scala:95)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFilesInternal(HadoopFSUtils.scala:85)
    at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFiles(HadoopFSUtils.scala:69)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:158)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:131)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:94)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:66)
    at org.apache.spark.sql.execution.datasources.DataSource.createInMemoryFileIndex(DataSource.scala:567)
    at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$sourceSchema$2(DataSource.scala:268)
    at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$lzycompute$1(DataSource.scala:164)
    at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$1(DataSource.scala:164)
    at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:169)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:262)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:34)
    at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:196)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:210)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)

 

I've tried using both a JSON file and a CSV file with the following data:

JSON file:

{
    "device":"Dev1"
    ,"status":"ok"
}
,{
    "device":"Dev2"
    ,"status":"ok"
}

CSV file:

device,status
device1,ok
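
As an aside (general Spark behaviour, not something established in this thread): Spark's JSON source expects JSON Lines by default, i.e. one complete object per line, so the commented-out .json reader would need the records in roughly the form sketched below. The file name and path in this sketch are illustrative assumptions.

from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical JSON Lines version of the same two records (one object per line),
# which Spark's default JSON reader expects; multi-line JSON would instead need
# .option("multiLine", True):
#   {"device": "Dev1", "status": "ok"}
#   {"device": "Dev2", "status": "ok"}

jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])

# Illustrative path; the folder and file name are assumptions, not from the thread
json_stream_df = (spark.readStream
    .schema(jsonSchema)
    .option("maxFilesPerTrigger", 1)
    .json("Files/testdata.json"))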

 

 Here is the code I'm attempting to run:

 

# Welcome to your new notebook
# Type here in the cell editor to add code!
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads data from a file (JSON reader commented out, CSV reader active)
inputPath = '<Full HTTPS URL from file properties in lakehouse here>/testdata.csv'
#jsonSchema = StructType([
csvSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
#stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
#stream_df = spark.readStream.schema(csvSchema).option("maxFilesPerTrigger", 1).csv(inputPath)
stream_df = spark.readStream.format("csv").schema(csvSchema).option("header",True).option("maxFilesPerTrigger",1).load(inputPath)

# Write the stream to a delta table
#table_path = 'Files/delta_tables/devicetable'
#checkpoint_path = 'Files/delta_tables/checkpoint'
#delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)

 

The error occurs on each of the three readStream variants (the commented-out .json and .csv readers and the active .load call) -- all three return the same error. I'm not an expert in PySpark yet, but the error is not very clear to me. I don't see any messages related to parsing the data, and the data schema seems simple enough. Perhaps the issue is related to dependencies? I'm at an impasse.
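
For reference, the stack trace shows the failure inside AbstractHttpFileSystem.listStatus during load(), i.e. before any data is parsed. Once the reader works, the commented-out sink portion above would look roughly like this minimal sketch; the table and checkpoint paths come from those commented lines, and stopping the query afterwards is an assumption for interactive notebook use, not something from the training module.

# Minimal sketch of the commented-out Delta sink (paths taken from the comments above)
table_path = 'Files/delta_tables/devicetable'
checkpoint_path = 'Files/delta_tables/checkpoint'

delta_stream = (stream_df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .start(table_path))

# ...let a micro-batch or two run, then stop the query when done experimenting:
# delta_stream.stop()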

1 ACCEPTED SOLUTION
puneetvijwani
Resolver IV

@dt3288 I have used structured streaming for incremental loads before and presented it in one of my sessions:
https://www.youtube.com/watch?v=bNdKX-9nXTs
The reference notebooks are here:
https://github.com/puneetvijwani/fabricNotebooks

I have also tested your code, and it works fine for reading testdata.csv as a stream when the file is loaded into the lakehouse Files area and a relative path is used. You can also try the ABFSS path by right-clicking the file and choosing Copy ABFS path.

# Welcome to your new notebook
# Type here in the cell editor to add code!
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads the CSV data from a file in the lakehouse Files area (JSON reader left commented out)
inputPath = 'Files/testdata.csv'
#jsonSchema = StructType([
csvSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
#stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
#stream_df = spark.readStream.schema(csvSchema).option("maxFilesPerTrigger", 1).csv(inputPath)
stream_df = spark.readStream.format("csv").schema(csvSchema).option("header",True).option("maxFilesPerTrigger",1).load(inputPath)
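
If the relative path does not resolve (as the original poster reports further down), the ABFS path copied from the lakehouse works as a drop-in replacement for inputPath. The exact value comes from right-clicking the file and choosing Copy ABFS path; the URI below is only an illustration of the general OneLake shape, with placeholder workspace and lakehouse names.

# Illustrative only -- copy the real value from the lakehouse UI (right-click the
# file > Copy ABFS path); the workspace and lakehouse names here are placeholders.
inputPath = 'abfss://<workspace>@onelake.dfs.fabric.microsoft.com/<lakehouse>.Lakehouse/Files/testdata.csv'

stream_df = (spark.readStream
    .format("csv")
    .schema(csvSchema)
    .option("header", True)
    .option("maxFilesPerTrigger", 1)
    .load(inputPath))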


4 REPLIES
puneetvijwani
Resolver IV

@dt3288 Glad to know it worked. Feel free to mark this as the accepted solution if you want, and if you're feeling very kind, give me a Kudos 😀

dt3288
New Member

Thanks, @puneetvijwani.  It worked when I used the ABFS path but not the relative path or full URL.
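
Once the stream is running against the ABFS path, one quick way to confirm rows are landing in the Delta sink from the training module is a plain batch read; this is a general Spark pattern rather than something shown in the thread, and the path below is the one from the commented-out sink code earlier.

# Sanity check (general Spark pattern, not from the thread): read the Delta sink
# back as a batch DataFrame and display its contents
result_df = spark.read.format("delta").load("Files/delta_tables/devicetable")
result_df.show()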

It is great, thanks for sharing
