March 31 - April 2, 2025, in Las Vegas, Nevada. Use code MSCUST for a $150 discount! Early bird discount ends December 31.
Register NowBe one of the first to start using Fabric Databases. View on-demand sessions with database experts and the Microsoft product team to learn just how easy it is to get started. Watch now
I'm encountering difficulty reproducing the PySpark notebook example using a delta table as a streaming sink in the following training module: https://learn.microsoft.com/en-us/training/modules/work-delta-lake-tables-fabric/5-use-delta-lake-st.... The following error occurs when I run the notebook:
Py4JJavaError: An error occurred while calling o4291.load. : java.lang.UnsupportedOperationException at org.apache.hadoop.fs.http.AbstractHttpFileSystem.listStatus(AbstractHttpFileSystem.java:95) at org.apache.hadoop.fs.http.HttpsFileSystem.listStatus(HttpsFileSystem.java:23) at org.apache.spark.util.HadoopFSUtils$.listLeafFiles(HadoopFSUtils.scala:225) at org.apache.spark.util.HadoopFSUtils$.$anonfun$parallelListLeafFilesInternal$1(HadoopFSUtils.scala:95) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at scala.collection.TraversableLike.map(TraversableLike.scala:286) at scala.collection.TraversableLike.map$(TraversableLike.scala:279) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFilesInternal(HadoopFSUtils.scala:85) at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFiles(HadoopFSUtils.scala:69) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:158) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:131) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:94) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:66) at org.apache.spark.sql.execution.datasources.DataSource.createInMemoryFileIndex(DataSource.scala:567) at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$sourceSchema$2(DataSource.scala:268) at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$lzycompute$1(DataSource.scala:164) at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$1(DataSource.scala:164) at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:169) at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:262) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118) at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:34) at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:196) at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:210) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:750)
I've tried using both a JSON and CSV file with the following data:
{
"device":"Dev1"
,"status":"ok"
}
,{
"device":"Dev2"
,"status":"ok"
}
device,status
device1,ok
Here is the code I'm attempting to run:
# Welcome to your new notebook
# Type here in the cell editor to add code!
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Create a stream that reads JSON data from a folder
inputPath = '<Full HTTPS URL from file properties in lakehouse here>/testdata.csv'
#jsonSchema = StructType([
csvSchema = StructType([
StructField("device", StringType(), False),
StructField("status", StringType(), False)
])
#stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
#stream_df = spark.readStream.schema(csvSchema).option("maxFilesPerTrigger", 1).csv(inputPath)
stream_df = spark.readStream.format("csv").schema(csvSchema).option("header",True).option("maxFilesPerTrigger",1).load(inputPath)
# Write the stream to a delta table
#table_path = 'Files/delta_tables/devicetable'
#checkpoint_path = 'Files/delta_tables/checkpoint'
#delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)
The error occurs on lines 13, 14 or 15 -- all three variations return the same error. I'm not an expert in PySpark yet, but the error is not very clear. I don't see any messages related to parsing the data, and the data schema seems simple enough. Perhaps the issue is related to dependencies? I'm at an impasse.
Solved! Go to Solution.
@dt3288 i have used structured steaming for incremental load before and presented on one of my sessions
https://www.youtube.com/watch?v=bNdKX-9nXTs
And reference notebooks are here
https://github.com/puneetvijwani/fabricNotebooks
Also i have tested your code it seems working fine for reading the testdata.csv as stream as i loaded in Files (lakehouse) and used relative path however you can also try copying abfss path by right clikcing the file and copy ABFS path
@dt3288 Glad to know it worked feel free to mark this as accepted "solution" if you want and if you're feeling very kind, give me a Kudos 😀
Thanks, @puneetvijwani. It worked when I used the ABFS path but not the relative path or full URL.
@dt3288 i have used structured steaming for incremental load before and presented on one of my sessions
https://www.youtube.com/watch?v=bNdKX-9nXTs
And reference notebooks are here
https://github.com/puneetvijwani/fabricNotebooks
Also i have tested your code it seems working fine for reading the testdata.csv as stream as i loaded in Files (lakehouse) and used relative path however you can also try copying abfss path by right clikcing the file and copy ABFS path
It is great, thanks for sharing
March 31 - April 2, 2025, in Las Vegas, Nevada. Use code MSCUST for a $150 discount!
Your insights matter. That’s why we created a quick survey to learn about your experience finding answers to technical questions.
Arun Ulag shares exciting details about the Microsoft Fabric Conference 2025, which will be held in Las Vegas, NV.
User | Count |
---|---|
6 | |
2 | |
1 | |
1 | |
1 |
User | Count |
---|---|
10 | |
3 | |
3 | |
2 | |
2 |