GlassShark1
Helper III

Can't ingest via API/Eventstream - Notebook hanging

Starting from scratch with RTA and struggling.

I've created an Eventhouse, a Notebook (that calls an API and returns JSON) and an Eventstream.

The notebook is scheduled every 5 minutes. The Eventstream has a custom endpoint source on the left (which provides the keys I need for the notebook on publish), and on the right it points to a table I have set up manually in the KQL Database in the Eventhouse through the UI (New table, adding column names and mappings).

The issue is with the notebook: whenever it runs it seems to work, and the data is ingested within a minute, but the notebook hangs and subsequent scheduled runs fail.

My code is below.

# Step 1: Install what we need
%pip install azure-eventhub requests

# Step 2: Import libraries
import json
import requests
import pandas as pd
from datetime import datetime
from azure.eventhub import EventHubProducerClient, EventData
import time

# Step 3: Your connection details
eventhub_namespace = "esehlnrljzmeus6ltvsfen.servicebus.windows.net"
eventhub_name = "xxx"
sas_key_name = "xxx"
sas_key_value = "xxx"

connection_str = (
    f"Endpoint=sb://{eventhub_namespace}/;"
    f"SharedAccessKeyName={sas_key_name};"
    f"SharedAccessKey={sas_key_value};"
    f"EntityPath={eventhub_name}"
)

api_url = "xxx"  # API endpoint URL (redacted); referenced in Step 4 below
api_params = {
    "resource_id": "292f788f-4339-455b-8cc0-153e14509d4d",
    "filters": json.dumps({"Region": "South West"})
}
api_headers = {
    "Authorization": (
        "xxx"
        "xxx"
        "xxx"
    )
}

print(f"Setup complete at {datetime.now()}")

# Step 4: Get data from National Grid API with proper error handling
print(f"Fetching data at {datetime.now()}")

records = []
api_success = False

try:
    # Add timeout and proper error handling
    response = requests.get(api_url, params=api_params, headers=api_headers, timeout=30)
    response.raise_for_status()  # Raises exception for bad status codes
   
    data = response.json()
    records = data.get('result', {}).get('records', [])
   
    print(f"Found {len(records)} records")
   
    if not records:
        print("No records found - notebook will complete without processing")
    else:
        api_success = True
       
except requests.exceptions.Timeout:
    print("API request timed out - notebook will complete without processing")
except requests.exceptions.RequestException as e:
    print(f"API request failed: {e} - notebook will complete without processing")
except json.JSONDecodeError as e:
    print(f"JSON decode error: {e} - notebook will complete without processing")
except Exception as e:
    print(f"Unexpected error fetching data: {e} - notebook will complete without processing")

def clean_and_stringify_record(record):
    """
    Cleans the record by:
    - Removing spaces from keys
    - Converting all values to strings (or empty string if None)
    - Adding notebook_processed_time timestamp
    """
    try:
        cleaned = {k.replace(" ", ""): str(v) if v is not None else "" for k, v in record.items()}
        # Add timestamp when record was processed by this notebook
        cleaned['notebook_processed_time'] = datetime.now().isoformat()
        return cleaned
    except Exception as e:
        print(f"Error cleaning record: {e}")
        return {}

# Step 5: Send to Event Hub with proper error handling and batching
def send_to_eventhub_in_batches(records, batch_size=100):
    """Send records to Event Hub in batches with proper error handling"""
   
    try:
        producer = EventHubProducerClient.from_connection_string(connection_str)
        total_sent = 0
       
        with producer:
            for i in range(0, len(records), batch_size):
                batch_records = records[i:i + batch_size]
               
                try:
                    event_batch = producer.create_batch()
                   
                    for record in batch_records:
                        cleaned_record = clean_and_stringify_record(record)
                       
                        if cleaned_record:  # Only add non-empty records
                            try:
                                event_batch.add(EventData(json.dumps(cleaned_record)))
                            except Exception as e:
                                print(f"Failed to add record to batch: {e}")
                                continue
                   
                    if len(event_batch) > 0:
                        producer.send_batch(event_batch)
                        batch_num = i // batch_size + 1
                        print(f"Sent batch {batch_num}: {len(batch_records)} records")
                        total_sent += len(batch_records)
                    else:
                        print(f"Batch {i // batch_size + 1} was empty - skipping")
                       
                except Exception as e:
                    print(f"Error processing batch {i // batch_size + 1}: {e}")
                    continue
                   
                # Small delay between batches to prevent overwhelming the service
                time.sleep(0.1)
       
        print(f"Successfully sent {total_sent} records to Event Hub")
        return total_sent
       
    except Exception as e:
        print(f"Event Hub connection error: {e}")
        return 0

# Execute the sending process
if records and api_success:
    print(f"Starting to send {len(records)} records to Event Hub")
    sent_count = send_to_eventhub_in_batches(records)
   
    if sent_count > 0:
        print(f"Successfully processed {sent_count} records")
        execution_status = "SUCCESS"
    else:
        print("Failed to send any records")
        execution_status = "FAILED"
else:
    print("No records to process or API call failed")
    execution_status = "SKIPPED"

# Final completion message
print(f"Notebook execution completed with status: {execution_status} at {datetime.now()}")
print("All done! Event Hub ingestion process complete.")

# Force completion by ensuring all variables are properly cleaned up
del records, api_success, execution_status
print("Notebook execution finished.")
1 ACCEPTED SOLUTION
GlassShark1
Helper III

I eventually took a different approach to solve this.

I tried a longer refresh schedule and the jobs still ended up hanging. I used the Azure Service Bus client instead of the Event Hub producer (which I think was having trouble closing), and put the whole script in a continuous loop with a timer to space out the polling, rather than managing a refresh schedule on the notebook itself.
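For reference, here is a minimal sketch of that loop-with-a-timer pattern, assuming "azure bus" refers to the azure-servicebus package and a Service Bus queue. The connection string, entity name, API URL and polling interval are placeholders, not the actual configuration from this thread.

# Minimal sketch of the continuous-loop approach described above (not the poster's exact code).
# The connection string, entity name, API URL and polling interval are placeholder assumptions.
import json
import time
import requests
from datetime import datetime
from azure.servicebus import ServiceBusClient, ServiceBusMessage

CONN_STR = "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
ENTITY_NAME = "<queue-name>"   # the entity the downstream consumer reads from
API_URL = "<api-url>"
POLL_SECONDS = 300             # the timer spacing the polling, instead of a notebook schedule

def fetch_records():
    """Call the API and return a list of records (empty list on any failure)."""
    try:
        resp = requests.get(API_URL, timeout=30)
        resp.raise_for_status()
        return resp.json().get("result", {}).get("records", [])
    except Exception as exc:
        print(f"{datetime.now()} fetch failed: {exc}")
        return []

with ServiceBusClient.from_connection_string(CONN_STR) as client:
    with client.get_queue_sender(queue_name=ENTITY_NAME) as sender:
        while True:  # continuous loop; stop the notebook/job to end it
            records = fetch_records()
            if records:
                # one message per record, serialized as JSON
                sender.send_messages([ServiceBusMessage(json.dumps(r)) for r in records])
                print(f"{datetime.now()} sent {len(records)} records")
            time.sleep(POLL_SECONDS)

Because everything runs inside one long-lived session, there is no repeated session spin-up and teardown between polls, which also sidesteps the scheduling overhead mentioned in the reply from lbendlin below.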


3 REPLIES

v-menakakota
Community Support

Hi @GlassShark1,
Thanks for reaching out to the Microsoft Fabric community forum.

Please go through the solved issue and documentation links below, which may help you resolve the issue.
Solved: Ingest Data from Streaming API - Microsoft Fabric Community
Microsoft Fabric event streams overview - Microsoft Fabric | Microsoft Learn
Microsoft Fabric event streams capacity consumption - Microsoft Fabric | Microsoft Learn


If I have misunderstood your needs or you still have problems, please feel free to let us know.

Best Regards,  
Community Support Team 

lbendlin
Super User

The notebook is scheduled every 5 minutes. 

Five minutes seems to be the sweet/sour spot where it is not yet economical to keep the Spark session running, and where the spin-up delay of a new session has a considerable impact. Most likely your environment is busy spinning up and tearing down, and not doing much else.
