
anawast
Microsoft Employee

Discrepancy in spark.read.schema between Databricks and Microsoft Fabric

spark.read.json offers two capabilities:

1. You can impose a schema with spark.read.schema(schema).json(ds). This should ensure that only records that fit the schema are produced as rows in the resulting DataFrame.

2. You can route the records that don't fit the schema to a side location with spark.read.option("badRecordsPath", path).schema(schema).json(ds).

 

While this works as described above in Databricks, the same does not apply to Fabric.

 

As you can see in the code below, I have created a DataFrame of JSON records on which I want to impose a schema. Note that the second record has a string where the value should be of IntegerType.

 

1. In Databricks I get only two records in validRecordsTemp, as expected, and the bad record lands in the defined path.

2. In Fabric, however, I get all three records, with NULL as the value of col1 in record 2.

Databricks result: (screenshot anawast_0-1725294033835.png)

Fabric result: (screenshot anawast_1-1725294057167.png)

 

 



Example Code: 

 

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val basePath = "DqSchemacheck"
val uuid = java.util.UUID.randomUUID.toString
val baseDqFolder = "/tmp/" + basePath + "/" + uuid

// Three JSON records; the second has a string where col1 should be an integer.
val df = Seq(
  """{"col1":2,"col2":3}""",
  """{"col1":"failure","col2":3}""",
  """{"col1":2,"col2":3}"""
).toDF("body")

// Convert to a Dataset[String] for spark.read.json.
// (Note: .map(_.toString()) on a DataFrame wraps each value in Row brackets;
// .as[String] keeps the raw JSON string.)
val dfStringDS = df.select(col("body")).as[String]

val schema = StructType(Seq(StructField("col1", IntegerType), StructField("col2", IntegerType)))

val validRecordsTemp = spark.read
  .option("badRecordsPath", baseDqFolder)
  .schema(schema)
  .json(dfStringDS)

display(validRecordsTemp)
1 REPLY
v-shex-msft
Community Support
Community Support

Hi @anawast,

I suspect this is related to internal processing: the engine recognizes invalid values and converts them to default (NULL) values. If you do not want these records in the result, you may need to apply a filter operation before the data is loaded into the DataFrame.
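The filtering approach suggested above can be sketched in open-source Spark terms. A minimal sketch, assuming Fabric follows open-source Spark's JSON parse modes (badRecordsPath is, to my knowledge, a Databricks-specific option): use PERMISSIVE mode with a declared corrupt-record column, then split valid and malformed rows. The object and function names here are illustrative, not part of any API.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Illustrative sketch: portable alternative to the Databricks-only
// badRecordsPath option. Malformed rows are captured in a corrupt-record
// column, and the result is split into valid and bad subsets.
object CorruptRecordSplit {
  def splitValidAndBad(spark: SparkSession, records: Seq[String]): (Long, Long) = {
    import spark.implicits._

    // The corrupt-record column must be declared in the schema for Spark
    // to populate it.
    val schema = StructType(Seq(
      StructField("col1", IntegerType),
      StructField("col2", IntegerType),
      StructField("_corrupt_record", StringType)
    ))

    val parsed = spark.read
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "_corrupt_record")
      .schema(schema)
      .json(records.toDS())
      .cache() // Spark disallows queries that reference only the corrupt column on uncached data

    val valid = parsed.filter($"_corrupt_record".isNull).drop("_corrupt_record")
    val bad   = parsed.filter($"_corrupt_record".isNotNull)
    (valid.count(), bad.count())
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("corrupt-split").getOrCreate()
    val (valid, bad) = splitValidAndBad(spark, Seq(
      """{"col1":2,"col2":3}""",
      """{"col1":"failure","col2":3}""",
      """{"col1":2,"col2":3}"""
    ))
    println(s"valid=$valid bad=$bad")
    spark.stop()
  }
}
```

Unlike badRecordsPath, this pattern relies only on open-source Spark options, so it should behave the same on both runtimes; the malformed rows can then be written wherever needed instead of being silently nulled.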

Regards,

Xiaoxin Sheng

Community Support Team _ Xiaoxin
If this post helps, please consider accepting it as the solution to help other members find it more quickly.
