Skip to main content
cancel
Showing results for 
Search instead for 
Did you mean: 

Join us at the 2025 Microsoft Fabric Community Conference. March 31 - April 2, Las Vegas, Nevada. Use code FABINSIDER for $400 discount. Register now

Reply
nkailasamurthy
Regular Visitor

[URGENT] Py4JJavaError when calling notebook from pipeline

Hi there,

 

I have a pipeline in fabric where I'm iterating through a bunch of csv files in lakehouse and calling a notebook to load that csv data to delta tables. I have couple hundred files that I have to load and this error seems intermittent. It throws this error for a few files and when I try rerunning the pipeline for the failed csvs, it loads just fine with no changes done. The error message is 

 

Notebook execution failed at Notebook service with http status code - '200', please check the Run logs on Notebook, additional details - 'Error name - Py4JJavaError, Error value - An error occurred while calling z:com.microsoft.spark.notebook.visualization.display.getDisplayResultForIPython.
: java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:529)
at scala.None$.get(Option.scala:527)
at com.microsoft.spark.notebook.visualization.display$.exec(Display.scala:338)
at com.microsoft.spark.notebook.visualization.display$.$anonfun$getDisplayResultInternal$1(Display.scala:216)
at com.microsoft.spark.notebook.common.trident.CertifiedTelemetryUtils$.withTelemetry(CertifiedTelemetryUtils.scala:71)
at com.microsoft.spark.notebook.visualization.display$.getDisplayResultInternal(Display.scala:205)
at com.microsoft.spark.notebook.visualization.display$.getDisplayResultForIPython(Display.scala:113)
at com.microsoft.spark.notebook.visualization.display.getDisplayResultForIPython(Display.scala)
at jdk.internal.reflect.GeneratedMethodAccessor372.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
' :

 

Attaching screenshots of my pipeline and notebook below

Pipeline - Iterating through all teh csv in a folder

nkailasamurthy_0-1730001300315.png

Inside the foreach I have a notebook activity and a delete activiy to delete the processed files

nkailasamurthy_1-1730001347419.png

 

Notebook

nkailasamurthy_2-1730001492512.png

Then the main code(attached below) is basically reading the csv file, filtering out null ids, type casting, aggregate, left anti join with the target(to remove anything that already exists in the target) and append to the delta table 

 

 

from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, col, sum, avg

filepath=sourcefolder+filename

df = spark.read.format("csv").option("header","true").load(filepath)
df_target_data = spark.read.format("delta").table(tablename)


df_not_null = df.where(col("user_id").isNotNull())

df_users = df_not_null.alias('a').join(\
    df_user_list.alias('b'),\
    on="user_id",\
    how="inner"\
).select('a.*')


df_type_cast= df_users.select("date",\
    "user_id",\
    "name",\
    "email",\
    "type",\
    col("all_calls").cast('int').alias("all_calls"),\
    col("inbound_calls").cast('int').alias("inbound_calls"),\
    col("outbound_calls").cast('int'),\
    col("voicemails").cast('int'),\
    col("missed").cast('int'),\
    col("abandoned").cast('int'),\
    col("forwarded").cast('int'),\
    col("minutes").cast('decimal(10,2)'),\
    col("acd").cast('decimal(10,2)'),\
    col("inbound_minutes").cast('decimal(10,2)'),\
    col("outbound_minutes").cast('decimal(10,2)'),\
    col("internal_calls").cast('int'),\
    col("forwarding_number").cast('int'),\
    col("desktop_app").cast('int'),\
    col("mobile_voip").cast('int'),\
    col("desk_phone").cast('int'),\
    col("web_app").cast('int'),\
    col("callbacks_cancelled").cast('int'),\
    col("handled").cast('int'),\
    col("answered").cast('int'),\
    col("answered_transferred").cast('int'),\
    col("message").cast('int'),\
    col("spam").cast('int'),\
    col("in_queue_voicemail").cast('int'),\
    col("dtmf_voicemail").cast('int'),\
    col("direct_to_voicemail").cast('int'),\
    col("transfer_voicemail").cast('int'),\
    col("outbound_connected").cast('int'),\
    col("connected_transferred").cast('int'),\
    col("transferred_out").cast('int'),\
    col("transferred_in").cast('int'),\
    col("dtmf_transfer").cast('int'),\
    col("auto_transfer").cast('int'),\
    col("router_transfer").cast('int'),\
    col("forward_transfer").cast('int'),\
    col("ringing_duration").cast('decimal(10,2)'),\
    col("avg_ringing_duration").cast('decimal(10,2)'),\
    col("hold_duration").cast('decimal(10,2)'),\
    col("avg_hold_duration").cast('decimal(10,2)'),\
    col("talk_duration").cast('decimal(10,2)'),\
    col("avg_talk_duration").cast('decimal(10,2)')\
)


df_aggregate = df_type_cast.groupBy("date","user_id","name","email").agg(sum("all_calls").alias("all_calls"),\
    sum("inbound_calls").alias("inbound_calls"),\
    sum("outbound_calls").alias("outbound_calls"),\
    sum("voicemails").alias("voicemails"),\
    sum("missed").alias("missed"),\
    sum("abandoned").alias("abandoned"),\
    sum("forwarded").alias("forwarded"),\
    sum("minutes").alias("minutes"),\
    sum("acd").alias("acd"),\
    sum("inbound_minutes").alias("inbound_minutes"),\
    sum("outbound_minutes").alias("outbound_minutes"),\
    sum("internal_calls").alias("internal_calls"),\
    sum("forwarding_number").alias("forwarding_number"),\
    sum("desktop_app").alias("desktop_app"),\
    sum("mobile_voip").alias("mobile_voip"),\
    sum("desk_phone").alias("desk_phone"),\
    sum("web_app").alias("web_app"),\
    sum("callbacks_cancelled").alias("callbacks_cancelled"),\
    sum("handled").alias("handled"),\
    sum("answered").alias("answered"),\
    sum("answered_transferred").alias("answered_transferred"),\
    sum("message").alias("message"),\
    sum("spam").alias("spam"),\
    sum("in_queue_voicemail").alias("in_queue_voicemail"),\
    sum("dtmf_voicemail").alias("dtmf_voicemail"),\
    sum("direct_to_voicemail").alias("direct_to_voicemail"),\
    sum("transfer_voicemail").alias("transfer_voicemail"),\
    sum("outbound_connected").alias("outbound_connected"),\
    sum("connected_transferred").alias("connected_transferred"),\
    sum("transferred_out").alias("transferred_out"),\
    sum("transferred_in").alias("transferred_in"),\
    sum("dtmf_transfer").alias("dtmf_transfer"),\
    sum("auto_transfer").alias("auto_transfer"),\
    sum("router_transfer").alias("router_transfer"),\
    sum("forward_transfer").alias("forward_transfer"),\
    sum("ringing_duration").alias("ringing_duration"),\
    avg("avg_ringing_duration").alias("avg_ringing_duration"),\
    sum("hold_duration").alias("hold_duration"),\
    avg("avg_hold_duration").alias("avg_hold_duration"),\
    sum("talk_duration").alias("talk_duration"),\
    avg("avg_talk_duration").alias("avg_talk_duration")\
)


# If the call stat already exists, it will not be inserted. They are simply filtered out

df_final=df_aggregate.join(df_target_data, on=['date','user_id','name','email'], how="left_anti")
display(df_final)

df_final.write.format("delta").mode("append").saveAsTable("Stats_User_Calls")
 
Can someone please help me understand why this is happening?
1 REPLY 1
lbendlin
Super User
Super User

If you have a Pro license you can open a Pro ticket at https://admin.powerplatform.microsoft.com/newsupportticket/powerbi
Otherwise you can raise an issue at https://community.fabric.microsoft.com/t5/Issues/idb-p/Issues .

Helpful resources

Announcements
Las Vegas 2025

Join us at the Microsoft Fabric Community Conference

March 31 - April 2, 2025, in Las Vegas, Nevada. Use code MSCUST for a $150 discount!

FebFBC_Carousel

Fabric Monthly Update - February 2025

Check out the February 2025 Fabric update to learn about new features.

Feb2025 NL Carousel

Fabric Community Update - February 2025

Find out what's new and trending in the Fabric community.