
kustortininja

Handling Multiple Schemas in Eventstream for Azure Event Hub and Azure IoT Hub data sources

One of the challenges with streaming data is that it arrives in a variety of schemas, which can be dynamic and are not always predictable. As applications and data structures change, schemas can differ wildly across devices or event inputs. Recently a customer reached out with several questions on this common problem, and with the capabilities in Fabric Real-Time Intelligence we can ingest this data flexibly. Enough of the background; let’s get to the problem!

 

In this scenario, let’s imagine I have one central IoT Hub in my organization that all of my devices send their data to. Different devices send their sensor readings at regular intervals, but they have different schemas. One sends readings for temperature, humidity, and pressure; the other comes from a totally different segment of my business and reports engine_speed, fuel_level, status, latitude, longitude, and more. Don’t judge; this is purely for demonstration. 😊
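To make that concrete, here is a rough sketch of the two payload shapes used in this walkthrough. The field names follow the rest of this post, but the values are made up purely for illustration:

// Hypothetical examples of the two schemas arriving at the same IoT Hub
print
    EnvironmentalReading = dynamic({"temperature": 21.4, "humidity": 48.2, "pressure": 1012.6}),
    VehicleReading = dynamic({"vehicleId": "truck-042", "timestamp": "2025-07-11T12:00:00Z", "speed_kph": 71.5, "engine_rpm": 2100, "fuel_level_pct": 63.0, "status": "moving", "latitude": 47.61, "longitude": -122.33})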

kustortininja_0-1752236259467.png

 

 

At any rate, I’ve provisioned this IoT Hub and connected my eventstream to it in Fabric. If I preview the data in the derived stream, I notice my problem immediately:

 

 

The schemas are different! If I were to carry this mapping through directly, I would wind up with a lot of junk data that would not be very useful. The trick is to let Eventhouse pull the data from the upstream source directly. Create an Eventhouse destination and make sure “Direct Ingestion” is selected. Send the data to a KQL database that you have pre-created (pro tip: always create the eventhouse before you create the eventstream), and then publish the stream. Note that it may take a few minutes before the stream starts connecting to the eventhouse.

 

 

kustortininja_2-1752236259473.png

After the destination appears, select it to configure the table you want the records written to. In the wizard, give your new table a name and, after you click Next, turn the Kusto mapping off. The default setting will carry the data through like this:

kustortininja_0-1752236343404.png

You don’t want this. Essentially, we want to tell the Kusto ingestion process to collapse the entire JSON record into a single value, and we’ll use the dynamic data type in Kusto to break it out later. In the screenshot above, you’ll notice at the top it says “Nested Levels.” Change this from 1 to 0. KQL is very good at breaking apart JSON (you may have seen some of my other posts about its superpowers here), but in this case we don’t want it to be so eager. Once you change it to 0, you should see something much more like what we want:

kustortininja_1-1752236363119.png
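Conceptually, this is the same idea as creating a table with a single dynamic column and a JSON ingestion mapping that routes the entire document ($) into it. If you were wiring it up yourself in KQL rather than through the wizard, a minimal sketch might look like this (the table and column names match the rest of this post; the mapping name is hypothetical):

// A single dynamic column to hold the whole event payload
.create table iothubmultischematest (Data: dynamic)

// Route the entire JSON document ($) into that one column
.create table iothubmultischematest ingestion json mapping "RawPayloadMapping" '[{"column": "Data", "Properties": {"Path": "$"}}]'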

One big dynamic JSON array. 😊 Write it to the raw table and let your events start to flow, which we can also see in our table:

kustortininja_2-1752236379978.png
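If you want to confirm that events are arriving, a quick look at the raw table (named iothubmultischematest in this example) is enough:

// Peek at a few raw events and confirm the payload parses cleanly as JSON
iothubmultischematest
| take 10
| extend ParsedJson = parse_json(Data)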

 

 

Now what we’d like to do is tell Kusto to break out the payload per our instructions. For the sake of simplicity I’m only going to handle one of the two schemas, but the process is the same for both. We’re going to use update policies in the Kusto engine to automatically write these records to the correct table. Here is my complete script. Notice that I keep the full payload in a dynamic column and use bag_remove_keys() to strip out only the fields I’ve already extracted, so unused fields are carried through. This also helps me code defensively: if additional fields are added to the source payload, they are picked up and loaded to the destination table automatically, without incurring any transformation or load failures.

 

// Start from the raw table and parse the payload in the Data column into JSON
iothubmultischematest
| extend ParsedJson = parse_json(Data)
// Keep only the events that match the vehicle telemetry schema
| where isnotempty(ParsedJson.vehicleId)
// Keep a copy of the full parsed payload; the fields extracted below are removed from it later
| extend AdditionalData = ParsedJson
// Extract specific fields from the parsed JSON and assign them to new columns
| extend 
    vehicleId = ParsedJson.vehicleId,
    timestamp = ParsedJson.timestamp,
    speedKph = ParsedJson.speed_kph,
    engine_rpm = ParsedJson.engine_rpm,
    fuel_level_pct = ParsedJson.fuel_level_pct,
    IngestionTime = ingestion_time(), // Capture the ingestion time of the event
    EventEnqueuedTime = ParsedJson.EventEnqueuedUtcTime,
    EventProcessedTime = ParsedJson.EventProcessedUtcTime,
    IOTHubEnqueuedTime = ParsedJson.IoTHub.EnqueuedTime
// Remove the original 'Data' and 'ParsedJson' columns to clean up the output
| project-away Data, ParsedJson
// Remove the extracted fields from 'AdditionalData' so it only contains the remaining dynamic content
| extend AdditionalData = bag_remove_keys(
    AdditionalData,
    dynamic([
        "vehicleId",
        "speedKph",
        "timestamp",
        "engine_rpm",
        "fuel_level_pct",
        "IngestionTime",
        "EventEnqueuedUtcTime",
        "EventProcessedUtcTime",
        "IOTHub.EnqueuedTime"
    ])
)
// Reorder the columns for better readability and structure
| project-reorder 
    vehicleId, 
    timestamp, 
    speedKph, 
    engine_rpm, 
    fuel_level_pct, 
    IngestionTime, 
    EventEnqueuedTime, 
    EventProcessedTime, 
    IOTHubEnqueuedTime, 
    AdditionalData

 

Carrying this through, I’ll create a function and the destination table, then attach the function to an update policy:

//encapsulate your query in an update function
.create-or-alter function with (docstring="My simple function that breaks my JSON out for vehicles into their appropriate table structure", folder="MyFunctions") fnMyVehicleTelemetry() {
    iothubmultischematest
| extend ParsedJson = parse_json(Data)
| where isnotempty(ParsedJson.vehicleId)
| extend vehicleId = tostring(ParsedJson.vehicleId), timestamp = todatetime(ParsedJson.timestamp), speedKph = toreal(ParsedJson.speed_kph), engine_rpm = toreal(ParsedJson.engine_rpm), fuel_level_pct = toreal(ParsedJson.fuel_level_pct), IngestionTime = ingestion_time(), EventEnqueuedTime = todatetime(ParsedJson.EventEnqueuedUtcTime), EventProcessedTime = todatetime(ParsedJson.EventProcessedUtcTime), IOTHubEnqueuedTime = todatetime(ParsedJson.IoTHub.EnqueuedTime)
// Carry the fields we did not extract through in a single dynamic column, so the output matches the VehicleTelemetry schema below
| extend AdditionalData = bag_remove_keys(ParsedJson, dynamic(["vehicleId", "timestamp", "speed_kph", "engine_rpm", "fuel_level_pct", "EventEnqueuedUtcTime", "EventProcessedUtcTime"]))
| project-away Data, ParsedJson
}

//create your destination table
.create table VehicleTelemetry (vehicleId: string, timestamp: datetime, speedKph: real, engine_rpm: real, fuel_level_pct: real, IngestionTime: datetime, EventEnqueuedTime: datetime, EventProcessedTime: datetime, IOTHubEnqueuedTime: datetime, AdditionalData: dynamic)

//now attach my function to the update policy to automatically move data into its new home. Note that IsTransactional must be set to False in this scenario.
.alter table VehicleTelemetry policy update
```[{
    "IsEnabled": true,
    "Source": "iothubmultischematest",
    "Query": "fnMyVehicleTelemetry()",
    "IsTransactional": false,
    "PropagateIngestionProperties": true
}]```
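Once events are flowing through the policy, anything the function didn’t explicitly extract is still there in AdditionalData. For example, the vehicle payload’s status field, which we never promoted to its own column, can be pulled out on the fly:

// Read a field we chose not to extract straight out of the dynamic AdditionalData column
VehicleTelemetry
| where isnotempty(AdditionalData.status)
| extend status = tostring(AdditionalData.status)
| take 10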

 

I'm only handling one of the two schemas above, but the process is the same for each. For every schema that is being passed into the eventhouse, create a new update policy with its own function and its own destination table. If you'd like, shorten the retention policy on the raw source table from the default to something like one day, since you are only landing data there so it can be broken apart.
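For the retention change, a minimal sketch assuming the raw table name used throughout this post:

// Keep raw landed events for only one day; the update policy has already copied them onward
.alter-merge table iothubmultischematest policy retention softdelete = 1d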

 

Here, we are using the combined power of Eventstream and Eventhouse to create truly dynamic, flexible ingestion processes. Is this useful? What do you think? Let me know in the comments below!
