I'm simply trying to extract JSON data from a single column in an Eventstream. I define the schema and parse the column but it yields null values in the table. I have verified that the JSON is properly formatted.
# Welcome to your new notebook
# Type here in the cell editor to add code!

# Example of reading data from Kusto. Replace the placeholders with your
# own cluster URI, database, table, and query.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, DoubleType
from pyspark.sql.functions import from_json, col, trim

kustoQuery = "**** | take 10"
kustoUri = "https://*****.kusto.fabric.microsoft.com"
# The database to read the data from
database = "*****"
# The table to read the data from
table = "*****"
# The access credentials for the read
accessToken = mssparkutils.credentials.getToken(kustoUri)

kustoDf = spark.read \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("accessToken", accessToken) \
    .option("kustoCluster", kustoUri) \
    .option("kustoDatabase", database) \
    .option("kustoQuery", kustoQuery) \
    .load()

kustoDf = kustoDf.withColumn("boundQueryResult", col("boundQueryResult").cast(StringType()))
# display(kustoDf.boundQueryResult)

schema = StructType([
    StructField("unit", StringType(), True),
    StructField("point", StringType(), True),
    StructField("value", StringType(), True),
    StructField("status", StringType(), True)
])

# Parse the JSON column into a structured DataFrame using the defined schema
parsed_df = kustoDf.withColumn("parsed_json", from_json(trim(col("boundQueryResult")), schema))
display(parsed_df)

# Flatten the DataFrame by selecting all fields from the parsed JSON
# Adjust the column names to match the actual structure of your JSON data
flattened_df = parsed_df.select("parsed_json.*")
# Show the transformed DataFrame
display(flattened_df)
One way to verify that you are receiving correct JSON from Event Hub is to dump the data into a KQL table with a dynamic column that holds the whole JSON object.
Then try something like this:
TempJSONTable
| project result = extract_json("$.boundQueryResult", tostring(json), typeof(string))
Hi @BelugaPhil ,
There are a few things you might want to check:
1. Ensure that the column name "boundQueryResult" matches the column name in your Eventstream exactly, including case.
2. Verify that the JSON structure within "boundQueryResult" matches the schema you have defined. For example, if your JSON has nested objects or arrays, you need to define the schema accordingly.
3. Verify that the `boundQueryResult` column contains valid JSON strings. You can use `display(kustoDf.select("boundQueryResult"))` to inspect the data.
4. The `trim` function only removes leading and trailing spaces. Check whether it is actually necessary for your data; if not, remove it.
For more details, please refer to:
python - How to Flatten JSON file using pyspark - Stack Overflow
How to Flatten Json Files Dynamically Using Apache PySpark(Python) | by Thomas Thomas | Medium
Best Regards,
Neeko Tang
If this post helps, please consider accepting it as the solution to help other members find it more quickly.
I verified all of the above items and eliminated the trim function. When I display the boundQueryResult column I get the following (data snipped for brevity):
[{"unit":"RTU01","point":"SetptSchedule","value":"Customer","status":"{ok} @ def"},{"unit":"RTU01","point":"ZoneTemp","value":"16.2 °F","status":"{fault,down,stale} @ def"},{"unit":"RTU01","point":"SupplyTemp","value":"-58.0 °F","status":"{fault,down,stale} @ def"},{"unit":"RTU01","point":"ActiveSetpoint","value":"0.0 °F","status":"{fault,down,stale} @ def"},{"unit":"RTU01","point":"SetpointOffset","value":"0.0 Δ°F","status":"{ok} @ def"},{"unit":"RTU01","point":"ReturnTemp","value":"-58.0 °F","status":"{fault,down,stale} @ def"},{"unit":"RTU01","point":"OutdoorTemp","value":"91.4 °F","status":"{ok} @ def"},{"unit":"RTU01","point":"ZoneAirQuality","value":"0 ppm","status":"{fault,down,stale} @ def"},{"unit":"RTU01","point":"IndoorHumidity","value":"0 %RH","status":"{fault,down,stale} @ def"}]
I put this into a JSON validator and it checked out perfectly, yet I am still getting null values in my output table. When I download the JSON file from the display function above, it shows all of the escape backslashes before the double quotes. Could these special characters be interfering with the parsing?