tan_thiamhuat
Post Patron

Py4JJavaError: An error occurred while calling o84455.csv.

Py4JJavaError: An error occurred while calling o84455.csv.
: org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 661:0 was 145766442 bytes, which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3055)

 

even though I have the following at the beginning of the script:

spark = SparkSession.builder \
    .appName("Increase RPC Message Size") \
    .config("spark.rpc.message.maxSize", "2048") \
    .getOrCreate()

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
spark.conf.get("spark.rpc.message.maxSize")
tan_thiamhuat
Post Patron

this line gets the error:

 

df_clean.coalesce(1).write.mode("overwrite").option("header", "true").csv(onelake_path)
where df_clean has 349,327 rows and 24 columns
onelake_path = 'Files/Cleaned/3_BillingExport_20250504111740385_cleaned.csv'

It looks like you’re hitting the error when writing out your CSV with:

Python
 
df_clean.coalesce(1).write.mode("overwrite").option("header", "true").csv(onelake_path)

with a pretty big DataFrame (349,327 rows, 24 columns).

If you’re still facing the same Spark error about message size or task serialization, it’s likely because:

  • The data being written is too large for the default Spark RPC message size.
  • The config change (spark.rpc.message.maxSize) might not be picked up by all nodes, or it may need to be increased even more.

Things to try:

  • Double check you set the config before creating your SparkSession (as in my previous post).
  • Try bumping up spark.rpc.message.maxSize higher (4096, 8192, etc).
  • If possible, avoid coalesce(1) (writing as a single file) for huge data—it forces all data to one node, which can cause memory or serialization issues.
  • Try writing without coalesce(1) and see if it works.

If you can share the exact error message you’re getting here, it may help pinpoint the issue better!
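
For instance, a minimal sketch of the "write without coalesce(1)" option, using your same df_clean and onelake_path (nothing else changed):

Python
 
# Sketch: keep Spark's default partitioning; this writes several part-*.csv
# files into the onelake_path folder instead of forcing all rows into one task.
df_clean.write.mode("overwrite").option("header", "true").csv(onelake_path)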

    df_clean.coalesce(1).write.mode("overwrite").option("header", "true").csv(onelake_path)

    files = mssparkutils.fs.ls(onelake_path)
    csv_file = next((f.path for f in files if f.path.endswith(".csv")), None)
    print(csv_file)

    if not csv_file:
        raise Exception("CSV file not found in lakehouse path.")

    local_csv = f"/tmp/{cleaned_name}"
    print(local_csv)
    mssparkutils.fs.cp(csv_file, f"file://{local_csv}")
    sftp.put(local_csv, f"/primary/CleanedData/{cleaned_name}")
 
The reason I need coalesce(1) is that I need a single CSV file, which I then transfer from the Lakehouse back to the SFTP server.
Unless there is a way to combine the different CSV part files back into one single CSV file on the SFTP server.

Thanks for clarifying! So you’re using coalesce(1) because you need a single CSV file, then you move it to SFTP. That makes sense.

But keep in mind:

  • Using coalesce(1) on a big DataFrame forces all the data to one node/worker, which can cause memory issues or serialization errors—especially with large datasets like yours.
  • That’s usually why the Spark job fails or throws those message size/serialization errors.

Possible solutions:

  1. If you must have a single file, try to:

    • Increase spark.rpc.message.maxSize even more (if you haven’t already).
    • Make sure your cluster has enough memory/resources for one node to handle the whole DataFrame.
    • If possible, filter or reduce your data before doing coalesce(1) to make the final file smaller.
  2. Alternative approach (if you keep hitting errors):

    • Write the CSV without coalesce(1) (so you get multiple part files).
    • Combine those part files into one CSV outside of Spark (with a shell script, Python, etc.) before SFTP transfer.

Let me know if you still get errors or want help with merging the part files after export!

Combine those part files into one CSV outside of Spark (with a shell script, Python, etc.) before SFTP transfer --> how can this be achieved? with Python code? I need it to be automated too.

spark = SparkSession.builder \
    .appName("Increase RPC Message Size") \
    .config("spark.rpc.message.maxSize", "8192") \
    .getOrCreate()
 
What is the maximum size it can be increased to? Even 8192 fails.

Hi @tan_thiamhuat ,

Great follow-up and thanks for sharing your code and error details.

For spark.rpc.message.maxSize, the hard limit is 2047 MB — Spark rejects larger values — so a setting like 8192 won't be accepted, and even if a message that large could be handled, pushing it through a single node is very risky for memory/serialization.

Best practice:

  • Try to avoid coalesce(1) for very large dataframes.
  • If you must create a single CSV, write with default partitioning (you’ll get multiple part files), then merge those files outside Spark (e.g., with a shell script or Python).
  • If you need to filter, sample, or aggregate your data before exporting, that can help reduce file size and avoid hitting these limits.

If you want, I can share a sample script for merging part files after export. Let me know if you still get errors or if you want to try another approach!
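
For reference, here's a rough, untested sketch of such a merge step. It assumes the part files were written to a Lakehouse Files folder you can list with mssparkutils, that /tmp on the driver is writable, and that sftp, onelake_path, and cleaned_name are the same objects/variables as in your earlier snippet:

Python
 
# Rough sketch (adjust paths/names to your setup): merge Spark part files
# into one CSV on the driver, then push the merged file to SFTP.
output_dir = onelake_path                 # folder written by df_clean.write...csv(...)
merged_local = f"/tmp/{cleaned_name}"     # single merged file on the driver's local disk

# Collect the part files Spark produced (skip _SUCCESS and other metadata files).
part_files = sorted(
    f.path for f in mssparkutils.fs.ls(output_dir) if f.name.startswith("part-")
)

with open(merged_local, "w", encoding="utf-8") as out:
    for i, part in enumerate(part_files):
        local_part = f"/tmp/part_{i}.csv"
        mssparkutils.fs.cp(part, f"file://{local_part}")   # copy part file to local disk
        with open(local_part, "r", encoding="utf-8") as src:
            lines = src.readlines()
        # Each part file repeats the header row; keep it only from the first one.
        out.writelines(lines if i == 0 else lines[1:])

# Upload the single merged CSV to the SFTP server.
sftp.put(merged_local, f"/primary/CleanedData/{cleaned_name}")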

Hope this helps!

tan_thiamhuat
Post Patron

where do you put this?

spark-submit --conf spark.rpc.message.maxSize=4096

I am running a Notebook 

 

  1. Make sure all nodes (driver and workers) have the same configuration value for spark.rpc.message.maxSize. --> how to check or configure this?

@tan_thiamhuat ,


You’re running your Spark job from a Notebook, so you can’t use the spark-submit --conf CLI option. You also want to know how to ensure all nodes (driver and workers) have the same spark.rpc.message.maxSize configuration.


Possible Causes:

  • Setting the config only in the SparkSession may not always propagate to all nodes, especially in some environments (e.g., managed clusters or when using Notebooks).
  • If the config isn’t set everywhere, you may still hit the same error.

Solution / Steps to Try:

  1. Setting config in a Notebook (Python example):
    Make sure you set the config before creating your SparkSession. For example:

    Python
     
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
        .appName("Increase RPC Message Size") \
        .config("spark.rpc.message.maxSize", "4096") \
        .getOrCreate()

    If you already have a running SparkSession, you’ll need to stop it (spark.stop()) and recreate it with the new config.

  2. Ensuring All Nodes Get the Config:

    • In most managed environments (Databricks, EMR, etc.), setting the config as above should propagate to all nodes automatically when you create the session.
    • If you have access to cluster configuration files (like spark-defaults.conf), you can set it there for all nodes:
      Code
       
      spark.rpc.message.maxSize 4096
    • For Yarn/EMR, you can add it to cluster-wide Spark configs.
  3. Check Config Propagation:
    To verify the setting on all nodes, run the following in your Notebook:

    Python
     
    print(spark.conf.get("spark.rpc.message.maxSize"))

    This prints the value currently set in your SparkSession config.
    For advanced/standalone clusters, you may need admin access to check configs on each node.
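
One more option worth mentioning for Fabric/Synapse notebooks: because the session is usually already running by the time your cell executes, the builder config may simply be ignored, and setting the value with the %%configure cell magic at the top of the notebook (with -f to force a session restart) can be a more reliable way to apply session-level Spark conf. This is a general Livy-style pattern, so check your runtime's documentation for whether this particular key can be overridden:

Code
 
%%configure -f
{
    "conf": {
        "spark.rpc.message.maxSize": "512"
    }
}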


Extra Tips / Documentation:

  • Spark Configuration Docs
  • If you are on Databricks or another managed service, their documentation may have additional steps for global configs.

Let me know if you need more details on a specific environment!

Py4JJavaError                             Traceback (most recent call last)
Cell In[29], line 105
    102 print(onelake_path)
    103 print(df_clean.count())
--> 105 df_clean.coalesce(1).write.mode("overwrite").option("header", "true").csv(onelake_path)
    107 files = mssparkutils.fs.ls(onelake_path)
    108 csv_file = next((f.path for f in files if f.path.endswith(".csv")), None)

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py:1864, in DataFrameWriter.csv(self, path, mode, compression, sep, quote, escape, header, nullValue, escapeQuotes, quoteAll, dateFormat, timestampFormat, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, charToEscapeQuoteEscaping, encoding, emptyValue, lineSep)
   1845 self.mode(mode)
   1846 self._set_opts(
   1847     compression=compression,
   1848     sep=sep,
   (...)
   1862     lineSep=lineSep,
   1863 )
-> 1864 self._jwrite.csv(path)

File ~/cluster-env/trident_env/lib/python3.11/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py:179, in capture_sql_exception.<locals>.deco(*a, **kw)
    177 def deco(*a: Any, **kw: Any) -> Any:
    178     try:
--> 179         return f(*a, **kw)
    180     except Py4JJavaError as e:
    181         converted = convert_exception(e.java_exception)

File ~/cluster-env/trident_env/lib/python3.11/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o15420.csv.
: org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 112:0 was 145765540 bytes, which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3055) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2991) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2990) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2990) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1294) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1294) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1294) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3262) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3193) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3182) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1028) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2568) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:361) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:322) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:358) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:230) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:191) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111) at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:386) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:414) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:386) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:203) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:132) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:220) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:101) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:961) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:203) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:191) at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:33) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:191) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:175) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:169) at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:251) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:905) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:413) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:380) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:242) at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:896) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.base/java.lang.Thread.run(Thread.java:829)

burakkaragoz
Community Champion

Hi @tan_thiamhuat ,

 

You are getting this error because the size of the serialized task is much larger than the value allowed by spark.rpc.message.maxSize. Even though you set spark.rpc.message.maxSize to 2048, it might still not be enough for your data, or the configuration may not be applied correctly everywhere in your cluster.

Here’s what you can try:

  1. Make sure you are setting spark.rpc.message.maxSize in both your SparkSession and in your spark-submit command, like this:
Python
 
spark = SparkSession.builder \
    .appName("Increase RPC Message Size") \
    .config("spark.rpc.message.maxSize", "4096") \
    .getOrCreate()

And when submitting your job:

Code
 
spark-submit --conf spark.rpc.message.maxSize=4096 ...
  2. Try increasing the value even more if necessary, but keep in mind that very large values can cause instability or memory issues on your cluster. If possible, break your data into smaller partitions or avoid collecting very large objects at once.

  3. Make sure all nodes (driver and workers) have the same configuration value for spark.rpc.message.maxSize.

  4. If you are sending very large DataFrames or objects, try to process them in smaller chunks or use broadcast variables only for reasonably sized data.

  5. After updating the configs, restart your Spark session or cluster to ensure settings are applied.

If you still get this error, please share more details about when the error happens (for example, during collect(), join, or broadcast), so I can give more targeted advice. 
