Coauthor: Ramachandran G, RTI Engineering Manager, Microsoft
In today’s data-driven world, organizations rely heavily on real-time insights to monitor systems, detect anomalies, and make informed decisions. One of the key challenges in achieving this is efficiently ingesting and transforming log data from diverse sources into a format that can be analysed instantly.
Real-time Intelligence is an end-to-end solution for event-driven scenarios, streaming data, and data logs. It enables the extraction of insights, visualization, and action on data in motion by handling data ingestion, transformation, storage, analytics, visualization, tracking, AI, and real-time actions.
The Eventstream feature in the Microsoft Fabric Real-Time Intelligence experience lets you bring real-time events into Fabric, transform them, and then route them to various destinations without writing any code (no-code). Additionally, with Apache Kafka endpoints available on the Eventstream item, you can send or consume real-time events using the Kafka protocol.
Eventhouses provide a solution for handling and analyzing large volumes of data, particularly in scenarios requiring real-time analytics and exploration. They're designed to handle real-time data streams efficiently, which lets organizations ingest, process, and analyze data in near real-time. These aspects make Eventhouses useful for scenarios where timely insights are crucial.
Logstash is an open-source data processing tool that enables the collection, transformation, and forwarding of data from a wide variety of sources. It acts as a data pipeline engine, helping organizations manage and streamline the flow of structured and unstructured data across systems.
Whether you're managing infrastructure logs, application events, or telemetry data, this guide will walk you through setting up a seamless pipeline that bridges raw log data with real-time analytics in Fabric.
Ex: For Windows, the C:\ drive
Ingest_Logs_using_Logstash_into_Real-Time_Intelligence
In this post, we use a bash script to generate a continuous stream of sample web logs in JSON format. Save the script below in a shell file, e.g. GenerateJsonSample.sh.
#!/usr/bin/env bash
# Requires jq for JSON construction
while true
do
  # Generate a random IPv4 address from four random bytes
  # (strip leading spaces, then turn each run of spaces into a dot)
  random_ip=$(dd if=/dev/urandom bs=4 count=1 2>/dev/null | od -An -tu1 | sed -E -e 's/^ +//' -e 's/ +/./g')
  # Generate random response size and HTTP status
  random_size=$(( (RANDOM % 65535) + 1 ))
  status_codes=(200 201 400 404 500)
  random_status=${status_codes[$RANDOM % ${#status_codes[@]}]}
  # Generate current UTC timestamp in ISO 8601
  timestamp=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
  # Pick a random endpoint
  endpoints=("/api/data" "/user/login" "/metrics" "/products/123" "/health")
  random_endpoint=${endpoints[$RANDOM % ${#endpoints[@]}]}
  # Construct a compact one-line JSON log; status and size are emitted as numbers
  json_log=$(jq -c -n --arg ip "$random_ip" --arg ts "$timestamp" --arg endpoint "$random_endpoint" --arg status "$random_status" --arg size "$random_size" '{ip: $ip, timestamp: $ts, endpoint: $endpoint, status: ($status|tonumber), size: ($size|tonumber)}')
  # Print the log line and append it to the file that Logstash reads
  echo "$json_log" | tee -a '/tmp/jsonlogs.txt'
  sleep 0.1
done
./GenerateJsonSample.sh
{"ip":"191.137.181.189","timestamp":"2025-07-08T04:58:04Z","endpoint":"/metrics","status":400,"size":760}
{"ip":"63.27.40.41","timestamp":"2025-07-08T04:58:04Z","endpoint":"/user/login","status":404,"size":10908}
{"ip":"28.39.101.198","timestamp":"2025-07-08T04:58:04Z","endpoint":"/health","status":500,"size":6086}
{"ip":"233.220.66.250","timestamp":"2025-07-08T04:58:04Z","endpoint":"/api/data","status":200,"size":5718}
{"ip":"186.6.135.228","timestamp":"2025-07-08T04:58:04Z","endpoint":"/metrics","status":400,"size":4729}
{"ip":"116.63.93.212","timestamp":"2025-07-08T04:58:04Z","endpoint":"/user/login","status":404,"size":24704}
{"ip":"206.89.144.138","timestamp":"2025-07-08T04:58:04Z","endpoint":"/products/123","status":404,"size":31}
{"ip":"151.218.40.55","timestamp":"2025-07-08T04:58:04Z","endpoint":"/user/login","status":404,"size":24437}
{"ip":"20.9.227.147","timestamp":"2025-07-08T04:58:04Z","endpoint":"/user/login","status":400,"size":10039}
{"ip":"131.148.221.81","timestamp":"2025-07-08T04:58:05Z","endpoint":"/products/123","status":404,"size":6435}
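Before wiring up Logstash, it can help to confirm that the generated file looks as expected. The following is a minimal sketch, not part of the original walkthrough: the function name count_by_status is hypothetical, and it assumes the compact one-line JSON format shown above, with a numeric "status" field and no space after the colon.

```shell
#!/usr/bin/env bash
# Tally log lines per HTTP status code in a newline-delimited JSON log file.
# Assumption: each line contains "status":<number> exactly as in the sample
# output above (compact JSON, no whitespace around the colon).
count_by_status() {
  local log_file="$1"
  grep -o '"status":[0-9]*' "$log_file" \
    | cut -d: -f2 \
    | sort -n \
    | uniq -c \
    | awk '{ print $2, $1 }'   # print "<status> <count>" per line
}

# Example usage (the file must already exist):
#   count_by_status /tmp/jsonlogs.txt
```

Each output line is a status code followed by the number of log lines carrying it. A roughly even spread across the five codes suggests the generator is behaving as intended.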
Note: For next steps,
// create table to store logs
.create-merge table LogstashWeblogsJson (
status: int,
size: int,
ip: string,
timestamp: datetime,
endpoint: string,
host: dynamic,
log: dynamic,
event: dynamic
)
// create json mapping to map incoming data to columns in above table
.create table LogstashWeblogsJson ingestion json mapping "LogstashRawMapping" '[{ "column": "status", "path": "$.status" },{ "column": "size", "path": "$.size" }, { "column": "ip", "path": "$.ip" },{ "column": "timestamp", "path": "$.timestamp" },{ "column": "endpoint", "path": "$.endpoint" }, { "column": "host", "path": "$.host" },{ "column": "log", "path": "$.log" },{ "column": "event", "path": "$.event" }]'
From the Overview section on the Eventhouse homepage, copy the ingestion URL.
The Logstash config file is in the config folder of the Logstash installation. Open it in a text editor and add the input and output blocks.
Note: Logstash can authenticate to Eventhouse with a Service Principal, the CLI, or a Managed Identity. The following example uses CLI authentication with my personal login credentials.
Note: The input should read the logs generated by GenerateJsonSample.sh, so make sure the path in the input block matches the path in the shell script.
input {
  file {
    path => "/tmp/jsonlogs.txt"
    start_position => "beginning"
    codec => json
  }
}
output {
  kusto {
    path => "C:\Logstash\tmp\kusto\%{+YYYY-MM-dd-HH-mm-ss}.txt"
    ingest_url => "<ingestion url copied in Step 6>"
    cli_auth => true
    database => "Logs_EH"
    table => "LogstashWeblogsJson"
    mapping => "LogstashRawMapping"
  }
}
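The note about matching paths can be checked mechanically before starting Logstash. This is a rough sketch under stated assumptions: the three helper names are hypothetical, the generator script is assumed to use the single-quoted `tee -a '<path>'` form, and the first `path => "..."` entry in the config is assumed to belong to the input block (it does when the input block comes before the output block, as in this post).

```shell
#!/usr/bin/env bash
# Compare the file the generator script writes with the file Logstash reads.
# All function names here are illustrative, not part of Logstash.

# Print the path passed to `tee -a '<path>'` in the generator script.
generator_log_path() {
  sed -n "s/.*tee -a '\([^']*\)'.*/\1/p" "$1" | head -n 1
}

# Print the first `path => "<path>"` value found in the Logstash config.
# Assumption: the input block precedes the output block.
logstash_input_path() {
  sed -n 's/^[[:space:]]*path[[:space:]]*=>[[:space:]]*"\([^"]*\)".*/\1/p' "$1" | head -n 1
}

# Return 0 when both paths are non-empty and identical, 1 otherwise.
check_paths_match() {
  local script_path conf_path
  script_path=$(generator_log_path "$1")
  conf_path=$(logstash_input_path "$2")
  if [ -n "$script_path" ] && [ "$script_path" = "$conf_path" ]; then
    echo "OK: both use $script_path"
  else
    echo "MISMATCH: script writes '$script_path', Logstash reads '$conf_path'" >&2
    return 1
  fi
}

# Example usage (file names are placeholders for your own files):
#   check_paths_match ./GenerateJsonSample.sh ./logstash-sample.conf
```

A mismatch here is a common reason for Logstash starting cleanly but never emitting events, since the file input simply waits on a file that is never written.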
.\logstash -f "C:\logstash\logstash-9.0.3\config\logstash-sample.conf"
Query the table in Eventhouse to see real-time log data flowing.
LogstashWeblogsJson
| take 10
Create a 'custom endpoint' source in Eventstream. Once created, click on the source node, select 'Kafka', and collect the following details: Bootstrap server, Security protocol, Topic name, SASL mechanism, and SASL JAAS config.
This is similar to step 7, but the output points to the Kafka endpoint of the Eventstream created above.
input {
  file {
    path => "/tmp/jsonlogs.txt"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => json
  }
}
output {
  kafka {
    codec => json
    topic_id => "<Kafka topic name>"
    bootstrap_servers => "<Kafka bootstrap server>"
    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    jaas_path => "<path to>/jaas.conf"
  }
}
Note: The jaas.conf file should be present on a local path and contain the JAAS config copied in step 10. More details here - Kafka output plugin | Logstash Plugins
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="$ConnectionString"
password="<SASL JAAS config>";
};
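Since jaas.conf holds a credential, one option is to generate it from an environment variable rather than pasting the secret into the file by hand. The sketch below is only an illustration: write_jaas_conf and EVENTSTREAM_SASL_PASSWORD are hypothetical names, and the variable is assumed to hold the value copied from the Kafka details of the Eventstream custom endpoint.

```shell
#!/usr/bin/env bash
# Write a jaas.conf in the layout shown above, taking the password from an
# environment variable. EVENTSTREAM_SASL_PASSWORD is a hypothetical name.
write_jaas_conf() {
  local out_file="$1"
  # Abort with a message if the variable is unset or empty.
  local secret="${EVENTSTREAM_SASL_PASSWORD:?set EVENTSTREAM_SASL_PASSWORD first}"
  (
    # New files created in this subshell are readable by the owner only.
    umask 077
    # Single-quoted lines keep $ConnectionString literal; only the
    # password line is expanded by the shell.
    printf '%s\n' \
      'KafkaClient {' \
      '  org.apache.kafka.common.security.plain.PlainLoginModule required' \
      '  username="$ConnectionString"' \
      "  password=\"${secret}\";" \
      '};' > "$out_file"
  )
}

# Example usage (the output path is a placeholder):
#   EVENTSTREAM_SASL_PASSWORD='<value copied from Eventstream>' write_jaas_conf ./jaas.conf
```

The username line must stay the literal text $ConnectionString, which is why it is written in single quotes. This sketch does not escape double quotes inside the secret, so it assumes the copied value contains none.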
.\logstash -f "C:\logstash\logstash-9.0.3\config\logstash-kafka.conf"
Logstash output plugins are available for Real-Time Intelligence to ingest logs, sensor data, or telemetry data and perform large-scale analytics. Logs can be streamed to either Eventstream or Eventhouse, which gives users flexibility.
Note: Although the documentation points to ADX, the plugin is fully compatible with Eventhouse.
Reach out to us on RTI Forum: Get Help with Real-Time Intelligence.
Request or upvote a suggestion on Fabric Ideas RTI: Fabric Ideas.