Py4JJavaError: An error occurred while calling o84455.csv. : org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 661:0 was 145766442 bytes, which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3055)
even though I have the following at the beginning of the script:
this line gets the error:
It looks like you’re hitting the error when writing out your CSV with:
df_clean.coalesce(1).write.mode("overwrite").option("header", "true").csv(onelake_path)
with a pretty big DataFrame (349,927 rows, 24 columns).
If you’re still facing the same Spark error about message size or task serialization, it’s likely because:
Things to try:
If you can share the exact error message you’re getting here, it may help pinpoint the issue better!
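As a rough sketch (reusing the df_clean and onelake_path names from the snippet above), one common workaround is to drop coalesce(1) so Spark writes one part file per partition and no single task has to carry the whole dataset:

# Sketch only: write one part file per partition instead of forcing a single task.
print(df_clean.rdd.getNumPartitions())  # roughly how many part files to expect

df_clean.write.mode("overwrite").option("header", "true").csv(onelake_path)
# onelake_path now typically holds several part-*.csv files plus a _SUCCESS marker

The trade-off is that you get multiple output files instead of one.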
Thanks for clarifying! So you’re using coalesce(1) because you need a single CSV file, then you move it to SFTP. That makes sense.
But keep in mind:
Possible solutions:
If you must have a single file, try to:
Alternative approach (if you keep hitting errors):
Let me know if you still get errors or want help with merging the part files after export!
Combine those part files into one CSV outside of Spark (with a shell script, Python, etc.) before SFTP transfer --> how can this be achieved? with Python code? I need it to be automated too.
Hi @tan_thiamhuat ,
Great follow-up and thanks for sharing your code and error details.
For spark.rpc.message.maxSize, the value is given in MB, and in practice you cannot go above about 2047 MB; Spark rejects larger values, so setting it higher (like 8192) will simply fail. Even near that limit, handling such a large message on a single node is risky in terms of memory and serialization.
Best practice:
If you want, I can share a sample script for merging part files after export. Let me know if you still get errors or if you want to try another approach!
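In the meantime, here is a minimal sketch of what that merge could look like (not a tested solution; the folder and file names are placeholders, and it assumes the part files have already been copied to local storage and that each part file starts with the same header row):

import glob
import os

# Sketch: combine Spark part files into one CSV before the SFTP transfer.
local_dir = "/tmp/export"                      # hypothetical folder holding the part files
merged_path = "/tmp/export/final_output.csv"   # hypothetical name of the merged file

part_files = sorted(glob.glob(os.path.join(local_dir, "part-*.csv")))

with open(merged_path, "w", encoding="utf-8") as out:
    for i, part in enumerate(part_files):
        with open(part, "r", encoding="utf-8") as f:
            header = f.readline()
            if i == 0:
                out.write(header)   # keep the header from the first file only
            out.write(f.read())     # append the data rows from every file

Because it is plain Python, it can run unattended in the same notebook or pipeline step that pushes the merged file to SFTP, so the flow stays automated.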
Hope this helps!
where do you put this?
spark-submit --conf spark.rpc.message.maxSize=4096
I am running a Notebook
Make sure all nodes (driver and workers) have the same configuration value for spark.rpc.message.maxSize. --> how to check or configure this?
You’re running your Spark job from a Notebook, so you can’t use the spark-submit --conf CLI option. You also want to know how to ensure all nodes (driver and workers) have the same spark.rpc.message.maxSize configuration.
Possible Causes:
Solution / Steps to Try:
Setting config in a Notebook (Python example):
Make sure you set the config before creating your SparkSession. For example:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Increase RPC Message Size") \
    .config("spark.rpc.message.maxSize", "4096") \
    .getOrCreate()
If you already have a running SparkSession, you’ll need to stop it (spark.stop()) and recreate it with the new config.
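For illustration, a restart-and-reconfigure cell could look like this (the 2047 value is just an example near the practical cap mentioned earlier in the thread):

from pyspark.sql import SparkSession

# Sketch: stop the current session, then rebuild it with the new setting.
spark.stop()

spark = SparkSession.builder \
    .appName("Increase RPC Message Size") \
    .config("spark.rpc.message.maxSize", "2047") \
    .getOrCreate()  # value is in MiB; values of 2048 or more are typically rejected

print(spark.conf.get("spark.rpc.message.maxSize"))  # confirm the new value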
Ensuring All Nodes Get the Config:
On a self-managed cluster, add this line to conf/spark-defaults.conf on the driver and every worker:
spark.rpc.message.maxSize 4096
Check Config Propagation:
To check what your current session is using, run the following in your Notebook:
print(spark.conf.get("spark.rpc.message.maxSize"))
This prints the value currently set in your SparkSession config.
For advanced/standalone clusters, you may need admin access to check configs on each node.
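As a quick illustrative check (not an exhaustive verification), you can also compare what the session and the underlying SparkContext report:

# Sketch: read the setting from both the SparkSession and the SparkContext,
# falling back to "not set" instead of raising if the key is absent.
session_value = spark.conf.get("spark.rpc.message.maxSize", "not set")
context_value = spark.sparkContext.getConf().get("spark.rpc.message.maxSize", "not set")
print("SparkSession conf:", session_value)
print("SparkContext conf:", context_value)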
Extra Tips / Documentation:
Let me know if you need more details on a specific environment!
Py4JJavaError                             Traceback (most recent call last)
Cell In[29], line 105
    102 print(onelake_path)
    103 print(df_clean.count())
--> 105 df_clean.coalesce(1).write.mode("overwrite").option("header", "true").csv(onelake_path)
    107 files = mssparkutils.fs.ls(onelake_path)
    108 csv_file = next((f.path for f in files if f.path.endswith(".csv")), None)

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py:1864, in DataFrameWriter.csv(self, path, mode, compression, sep, quote, escape, header, nullValue, escapeQuotes, quoteAll, dateFormat, timestampFormat, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, charToEscapeQuoteEscaping, encoding, emptyValue, lineSep)
   1845 self.mode(mode)
   1846 self._set_opts(
   1847     compression=compression,
   1848     sep=sep,
   (...)
   1862     lineSep=lineSep,
   1863 )
-> 1864 self._jwrite.csv(path)

File ~/cluster-env/trident_env/lib/python3.11/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py:179, in capture_sql_exception.<locals>.deco(*a, **kw)
    177 def deco(*a: Any, **kw: Any) -> Any:
    178     try:
--> 179         return f(*a, **kw)
    180     except Py4JJavaError as e:
    181         converted = convert_exception(e.java_exception)

File ~/cluster-env/trident_env/lib/python3.11/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o15420.csv. : org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 112:0 was 145765540 bytes, which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3055)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2991)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2990)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2990)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1294)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1294)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1294)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3262)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3193)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3182)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1028)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2568)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:361)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:322)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:358)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:230)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:191)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:386)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:414)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:386)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:203)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:132)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:220)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:101)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:961)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:203)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:191)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:33)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:191)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:175)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:169)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:251)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:905)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:413)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:380)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:242)
    at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:896)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:829)
Hi @tan_thiamhuat ,
You are getting this error because the size of the serialized task is much larger than the value allowed by spark.rpc.message.maxSize. Even though you set spark.rpc.message.maxSize to 2048, it might still not be enough for your data, or the configuration may not be applied correctly everywhere in your cluster.
Here’s what you can try:
spark = SparkSession.builder \
    .appName("Increase RPC Message Size") \
    .config("spark.rpc.message.maxSize", "4096") \
    .getOrCreate()
And when submitting your job:
spark-submit --conf spark.rpc.message.maxSize=4096 ...
Try increasing the value even more if necessary, but keep in mind that very large values can cause instability or memory issues on your cluster. If possible, break your data into smaller partitions or avoid collecting very large objects at once.
Make sure all nodes (driver and workers) have the same configuration value for spark.rpc.message.maxSize.
If you are sending very large DataFrames or objects, try to process them in smaller chunks or use broadcast variables only for reasonably sized data.
After updating the configs, restart your Spark session or cluster to ensure settings are applied.
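To illustrate the partitioning and broadcast points above, here is a small sketch (reusing df_clean and onelake_path from the traceback above; the partition count and lookup data are arbitrary examples):

# Sketch: keep several partitions so no single serialized task carries all the data.
df_out = df_clean.repartition(8)   # 8 is an arbitrary example, not a recommendation
df_out.write.mode("overwrite").option("header", "true").csv(onelake_path)

# Broadcast only small reference data (e.g. a lookup table), never a large DataFrame.
small_lookup = {"SG": "Singapore", "MY": "Malaysia"}   # hypothetical example data
bc_lookup = spark.sparkContext.broadcast(small_lookup)
print(bc_lookup.value["SG"])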
If you still get this error, please share more details about when the error happens (for example, during collect(), join, or broadcast), so I can give more targeted advice.