nkailasamurthy
Regular Visitor

[URGENT] Py4JJavaError when calling notebook from pipeline

Hi there,

 

I have a pipeline in Fabric that iterates through a set of CSV files in a lakehouse and calls a notebook to load each file's data into Delta tables. I have a couple hundred files to load, and the error is intermittent: it is thrown for a few files, and when I rerun the pipeline for the failed CSVs, they load fine with no changes. The error message is:

 

Notebook execution failed at Notebook service with http status code - '200', please check the Run logs on Notebook, additional details - 'Error name - Py4JJavaError, Error value - An error occurred while calling z:com.microsoft.spark.notebook.visualization.display.getDisplayResultForIPython.
: java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:529)
at scala.None$.get(Option.scala:527)
at com.microsoft.spark.notebook.visualization.display$.exec(Display.scala:338)
at com.microsoft.spark.notebook.visualization.display$.$anonfun$getDisplayResultInternal$1(Display.scala:216)
at com.microsoft.spark.notebook.common.trident.CertifiedTelemetryUtils$.withTelemetry(CertifiedTelemetryUtils.scala:71)
at com.microsoft.spark.notebook.visualization.display$.getDisplayResultInternal(Display.scala:205)
at com.microsoft.spark.notebook.visualization.display$.getDisplayResultForIPython(Display.scala:113)
at com.microsoft.spark.notebook.visualization.display.getDisplayResultForIPython(Display.scala)
at jdk.internal.reflect.GeneratedMethodAccessor372.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
' :

 

Attaching screenshots of my pipeline and notebook below.

Pipeline - iterating through all the CSV files in a folder

nkailasamurthy_0-1730001300315.png

Inside the ForEach I have a notebook activity and a delete activity to delete the processed files

nkailasamurthy_1-1730001347419.png

 

Notebook

nkailasamurthy_2-1730001492512.png

The main code (attached below) reads the CSV file, filters out null IDs, casts types, aggregates, does a left anti join with the target (to drop anything that already exists there), and appends to the Delta table.

 

 

from pyspark.sql.functions import col, sum, avg  # note: this sum shadows Python's built-in sum

# sourcefolder, filename and tablename are assumed to be notebook parameters set by the pipeline
filepath=sourcefolder+filename

df = spark.read.format("csv").option("header","true").load(filepath)
df_target_data = spark.read.format("delta").table(tablename)


df_not_null = df.where(col("user_id").isNotNull())

# df_user_list is not defined in this snippet; it is assumed to come from an earlier cell
df_users = df_not_null.alias('a').join(
    df_user_list.alias('b'),
    on="user_id",
    how="inner"
).select('a.*')


df_type_cast= df_users.select("date",\
    "user_id",\
    "name",\
    "email",\
    "type",\
    col("all_calls").cast('int').alias("all_calls"),\
    col("inbound_calls").cast('int').alias("inbound_calls"),\
    col("outbound_calls").cast('int'),\
    col("voicemails").cast('int'),\
    col("missed").cast('int'),\
    col("abandoned").cast('int'),\
    col("forwarded").cast('int'),\
    col("minutes").cast('decimal(10,2)'),\
    col("acd").cast('decimal(10,2)'),\
    col("inbound_minutes").cast('decimal(10,2)'),\
    col("outbound_minutes").cast('decimal(10,2)'),\
    col("internal_calls").cast('int'),\
    col("forwarding_number").cast('int'),\
    col("desktop_app").cast('int'),\
    col("mobile_voip").cast('int'),\
    col("desk_phone").cast('int'),\
    col("web_app").cast('int'),\
    col("callbacks_cancelled").cast('int'),\
    col("handled").cast('int'),\
    col("answered").cast('int'),\
    col("answered_transferred").cast('int'),\
    col("message").cast('int'),\
    col("spam").cast('int'),\
    col("in_queue_voicemail").cast('int'),\
    col("dtmf_voicemail").cast('int'),\
    col("direct_to_voicemail").cast('int'),\
    col("transfer_voicemail").cast('int'),\
    col("outbound_connected").cast('int'),\
    col("connected_transferred").cast('int'),\
    col("transferred_out").cast('int'),\
    col("transferred_in").cast('int'),\
    col("dtmf_transfer").cast('int'),\
    col("auto_transfer").cast('int'),\
    col("router_transfer").cast('int'),\
    col("forward_transfer").cast('int'),\
    col("ringing_duration").cast('decimal(10,2)'),\
    col("avg_ringing_duration").cast('decimal(10,2)'),\
    col("hold_duration").cast('decimal(10,2)'),\
    col("avg_hold_duration").cast('decimal(10,2)'),\
    col("talk_duration").cast('decimal(10,2)'),\
    col("avg_talk_duration").cast('decimal(10,2)')\
)


df_aggregate = df_type_cast.groupBy("date","user_id","name","email").agg(sum("all_calls").alias("all_calls"),\
    sum("inbound_calls").alias("inbound_calls"),\
    sum("outbound_calls").alias("outbound_calls"),\
    sum("voicemails").alias("voicemails"),\
    sum("missed").alias("missed"),\
    sum("abandoned").alias("abandoned"),\
    sum("forwarded").alias("forwarded"),\
    sum("minutes").alias("minutes"),\
    sum("acd").alias("acd"),\
    sum("inbound_minutes").alias("inbound_minutes"),\
    sum("outbound_minutes").alias("outbound_minutes"),\
    sum("internal_calls").alias("internal_calls"),\
    sum("forwarding_number").alias("forwarding_number"),\
    sum("desktop_app").alias("desktop_app"),\
    sum("mobile_voip").alias("mobile_voip"),\
    sum("desk_phone").alias("desk_phone"),\
    sum("web_app").alias("web_app"),\
    sum("callbacks_cancelled").alias("callbacks_cancelled"),\
    sum("handled").alias("handled"),\
    sum("answered").alias("answered"),\
    sum("answered_transferred").alias("answered_transferred"),\
    sum("message").alias("message"),\
    sum("spam").alias("spam"),\
    sum("in_queue_voicemail").alias("in_queue_voicemail"),\
    sum("dtmf_voicemail").alias("dtmf_voicemail"),\
    sum("direct_to_voicemail").alias("direct_to_voicemail"),\
    sum("transfer_voicemail").alias("transfer_voicemail"),\
    sum("outbound_connected").alias("outbound_connected"),\
    sum("connected_transferred").alias("connected_transferred"),\
    sum("transferred_out").alias("transferred_out"),\
    sum("transferred_in").alias("transferred_in"),\
    sum("dtmf_transfer").alias("dtmf_transfer"),\
    sum("auto_transfer").alias("auto_transfer"),\
    sum("router_transfer").alias("router_transfer"),\
    sum("forward_transfer").alias("forward_transfer"),\
    sum("ringing_duration").alias("ringing_duration"),\
    avg("avg_ringing_duration").alias("avg_ringing_duration"),\
    sum("hold_duration").alias("hold_duration"),\
    avg("avg_hold_duration").alias("avg_hold_duration"),\
    sum("talk_duration").alias("talk_duration"),\
    avg("avg_talk_duration").alias("avg_talk_duration")\
)


# Rows whose call stats already exist in the target are filtered out, so they are not inserted again

df_final=df_aggregate.join(df_target_data, on=['date','user_id','name','email'], how="left_anti")
display(df_final)

df_final.write.format("delta").mode("append").saveAsTable("Stats_User_Calls")
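
For readers following the last step: a left anti join keeps only the rows of the new batch whose key does not already exist in the target, which is why loading the same file twice appends nothing the second time. A minimal plain-Python sketch of that behavior, using made-up rows and a hypothetical helper named `left_anti` (not a Spark API):

```python
KEYS = ("date", "user_id", "name", "email")  # join keys used in the post


def left_anti(new_rows, target_rows, keys=KEYS):
    """Return the rows of new_rows whose key tuple is absent from target_rows."""
    existing = {tuple(row[k] for k in keys) for row in target_rows}
    return [row for row in new_rows if tuple(row[k] for k in keys) not in existing]


# Made-up sample data, for illustration only.
target = [
    {"date": "2024-10-01", "user_id": "u1", "name": "Ann",
     "email": "ann@example.com", "all_calls": 5},
]
batch = [
    # Same key as the existing row: dropped, even though all_calls differs.
    {"date": "2024-10-01", "user_id": "u1", "name": "Ann",
     "email": "ann@example.com", "all_calls": 7},
    # New key: kept and appended.
    {"date": "2024-10-01", "user_id": "u2", "name": "Bob",
     "email": "bob@example.com", "all_calls": 3},
]

to_append = left_anti(batch, target)
target_after = target + to_append

# Loading the same batch again finds every key already present.
rerun = left_anti(batch, target_after)
```

Two caveats about the sketch: an existing row is skipped rather than updated, and unlike Python's `None == None`, Spark does not treat null join keys as equal, so a row with a null in any key column is never filtered out by the anti join.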
 
Can someone please help me understand why this is happening?
1 REPLY
lbendlin
Super User

If you have a Pro license you can open a Pro ticket at https://admin.powerplatform.microsoft.com/newsupportticket/powerbi
Otherwise you can raise an issue at https://community.fabric.microsoft.com/t5/Issues/idb-p/Issues .
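
A note on the trace in the question: every frame above the py4j layer is in `com.microsoft.spark.notebook.visualization.display`, and the notebook calls `display(df_final)` just before the write, so a failure there would abort the run before the append happens. That would be consistent with reruns succeeding, though it does not explain why the call fails. If the preview is not needed in pipeline runs, one option is to skip it or make it non-fatal. A minimal sketch, where `safe_display` and its `enabled` flag are hypothetical names, and the display function is passed in so the snippet also runs outside a notebook:

```python
def safe_display(df, display_fn, enabled=True):
    """Call display_fn(df), but never let a rendering error abort the load.

    Returns True if the preview was rendered, False if it was skipped or failed.
    """
    if not enabled:
        return False
    try:
        display_fn(df)
        return True
    except Exception as exc:  # Py4JJavaError derives from Exception
        print(f"display skipped: {type(exc).__name__}: {exc}")
        return False


# Stand-ins for the notebook's display(), for illustration only.
def broken_display(_df):
    raise RuntimeError("None.get")


shown = []
ok = safe_display("rows", shown.append)                      # preview succeeds
failed = safe_display("rows", broken_display)                # error is swallowed
skipped = safe_display("rows", shown.append, enabled=False)  # preview disabled
```

In the notebook this would mean replacing `display(df_final)` with something like `safe_display(df_final, display, enabled=False)` for pipeline runs, with `enabled` driven by a notebook parameter if the preview is still wanted interactively.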
