AndrewWestran
Helper I

Fabric Notebook - Message Maxsize

Hi,

I am facing the following issue when loading data into a table using a notebook (PySpark):

 

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 12182:0 was 182487782 bytes, which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values. at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2682) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2618) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2617) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2617) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1190) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1190) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1190) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2870) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2812) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2801) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)

 

Code as follows:

 

import os
import glob
import pandas as pds

directory = r"/lakehouse/default/Files/C4SQLStage/DBTZBA/*.csv"

for filename in glob.glob(directory):
    f = os.path.join(directory, filename)

    # Load each csv file into a new Pandas DataFrame
    # We use Pandas here as it supports multi-character delimiters
    df = pds.read_csv(f, delimiter='', engine='python')

    # Get column details for checking the number of columns
    col = df.columns

    # For 54 column table versions, set the column names
    if len(col) != 55:
        if df.iat[0, 1][0:2].upper() == 'DW':
            df.columns = ['ClientID','C4SqlClientDatabaseName','cMarktNr','dtDatum','siKassNr','lBonNr','lPosNr','cStornoZle','cStorZleManag','cRueckZahlArt','cGewArt','cPfandArt','cNegArt','cStornoArt','cPreisInfo','cPreisUeb','cRabatt','cWaagArt','cDatenEing','cArtInTrAkt','cZwSumme','cPresseCode','cArtSatz','cManEing','cManBedRab','cMxNAutoRab','cPersRabatt','cAutoRabatt','cBonbezug','cKombiRab','dPluNr','dMenge','dVKPreis','dUmsatz','dSTPreis','siAbteilung','siRabSatz','dRabSumme','dArtNr','sArtBez','dEKPreis','cArtBezLang','siHauptabt','lRegalnr','siFachfolnr','lAktionsnr','lWgr','siPrzMwSt','dRabWertBon','dPreisUebWert','dVirtRabWert','lRabAktNr','lBAAutorNr','siBACodeNr']
        else:
            df.columns = ['cMarktNr','dtDatum','siKassNr','lBonNr','lPosNr','cStornoZle','cStorZleManag','cRueckZahlArt','cGewArt','cPfandArt','cNegArt','cStornoArt','cPreisInfo','cPreisUeb','cRabatt','cWaagArt','cDatenEing','cArtInTrAkt','cZwSumme','cPresseCode','cArtSatz','cManEing','cManBedRab','cMxNAutoRab','cPersRabatt','cAutoRabatt','cBonbezug','cKombiRab','dPluNr','dMenge','dVKPreis','dUmsatz','dSTPreis','siAbteilung','siRabSatz','dRabSumme','dArtNr','sArtBez','dEKPreis','cArtBezLang','siHauptabt','lRegalnr','siFachfolnr','lAktionsnr','lWgr','siPrzMwSt','dRabWertBon','dPreisUebWert','dVirtRabWert','lRabAktNr','lBAAutorNr','siBACodeNr','ClientID','C4SqlClientDatabaseName']
    # For 55 column table versions, set the column names
    else:
        if df.iat[0, 1][0:2].upper() == 'DW':
            df.columns = ['ClientID','C4SqlClientDatabaseName','cMarktNr','dtDatum','siKassNr','lBonNr','lPosNr','cStornoZle','cStorZleManag','cRueckZahlArt','cGewArt','cPfandArt','cNegArt','cStornoArt','cPreisInfo','cPreisUeb','cRabatt','cWaagArt','cDatenEing','cArtInTrAkt','cZwSumme','cPresseCode','cArtSatz','cManEing','cManBedRab','cMxNAutoRab','cPersRabatt','cAutoRabatt','cBonbezug','cKombiRab','dPluNr','dMenge','dVKPreis','dUmsatz','dSTPreis','siAbteilung','siRabSatz','dRabSumme','dArtNr','sArtBez','dEKPreis','cArtBezLang','siHauptabt','lRegalnr','siFachfolnr','lAktionsnr','lWgr','siPrzMwSt','dRabWertBon','dPreisUebWert','dVirtRabWert','lRabAktNr','lBAAutorNr','siBACodeNr','C4SQL_LastUpdated']
        else:
            df.columns = ['cMarktNr','dtDatum','siKassNr','lBonNr','lPosNr','cStornoZle','cStorZleManag','cRueckZahlArt','cGewArt','cPfandArt','cNegArt','cStornoArt','cPreisInfo','cPreisUeb','cRabatt','cWaagArt','cDatenEing','cArtInTrAkt','cZwSumme','cPresseCode','cArtSatz','cManEing','cManBedRab','cMxNAutoRab','cPersRabatt','cAutoRabatt','cBonbezug','cKombiRab','dPluNr','dMenge','dVKPreis','dUmsatz','dSTPreis','siAbteilung','siRabSatz','dRabSumme','dArtNr','sArtBez','dEKPreis','cArtBezLang','siHauptabt','lRegalnr','siFachfolnr','lAktionsnr','lWgr','siPrzMwSt','dRabWertBon','dPreisUebWert','dVirtRabWert','lRabAktNr','lBAAutorNr','siBACodeNr','ClientID','C4SqlClientDatabaseName','C4SQL_LastUpdated']

    # Create a new spark DataFrame using the common subset of columns from the DBTZBA table
    # We use spark here as it supports writing to tables
    df_new = spark.createDataFrame(df.loc[:, ['ClientID','C4SqlClientDatabaseName','cMarktNr','dtDatum','siKassNr','lBonNr','lPosNr','cStornoZle','cStorZleManag','cRueckZahlArt','cGewArt','cPfandArt','cNegArt','cStornoArt','cPreisInfo','cPreisUeb','cRabatt','cWaagArt','cDatenEing','cArtInTrAkt','cZwSumme','cPresseCode','cArtSatz','cManEing','cManBedRab','cMxNAutoRab','cPersRabatt','cAutoRabatt','cBonbezug','cKombiRab','dPluNr','dMenge','dVKPreis','dUmsatz','dSTPreis','siAbteilung','siRabSatz','dRabSumme','dArtNr','sArtBez','dEKPreis','cArtBezLang','siHauptabt','lRegalnr','siFachfolnr','lAktionsnr','lWgr','siPrzMwSt','dRabWertBon','dPreisUebWert','dVirtRabWert','lRabAktNr','lBAAutorNr','siBACodeNr']])
   
    # Append the csv data to the new table
    df_new.write.mode("append").format("delta").saveAsTable("DBTZBA")
 
The error is raised on the last line of the code (the saveAsTable call).

The file being processed is the first one in the queue that is around 1 GB in size; all previous files are under 400 MB.
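The error message itself suggests increasing spark.rpc.message.maxSize. As far as I understand, that is a static setting that has to be applied when the Spark session is created, not changed with spark.conf.set at runtime, so in a Fabric notebook I assume it would go into a session-level %%configure cell at the top of the notebook, along these lines (the value is in MiB, and 512 is just an illustrative number):

%%configure
{
    "conf": {
        "spark.rpc.message.maxSize": "512"
    }
}

Is raising that limit the right approach here, or is there a better pattern for loading a file of this size?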
1 ACCEPTED SOLUTION
AndrewWestran
Helper I

Problem solved.

Assuming the issue was related to the large file size, I decided to batch the data load for any file that failed to load as a whole.

Updated Code:

 

import os
import glob
import pandas as pds
import warnings

warnings.filterwarnings("ignore", message="iteritems is deprecated")

#spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

directory = r"/lakehouse/default/Files/C4SQLStage/DBTZBA/*.csv"

for filename in glob.glob(directory):
    f = os.path.join(directory, filename)

    # Load each csv file into a new Pandas DataFrame
    # We use Pandas here as it supports multi-character delimiters
    df = pds.read_csv(f, delimiter='', engine='python')

    # Get column details for checking the number of columns
    col = df.columns

    # For 54 column table versions, set the column names
    if len(col) != 55:
        #print(f"{filename}, Columns: {len(col)}, DBName: {df.iat[0, 1]},  Date Last Updated: {df.iat[0, 53]}")
        if df.iat[0, 1][0:2].upper() == 'DW':
            df.columns = ['ClientID','C4SqlClientDatabaseName','cMarktNr','dtDatum','siKassNr','lBonNr','lPosNr','cStornoZle','cStorZleManag','cRueckZahlArt','cGewArt','cPfandArt','cNegArt','cStornoArt','cPreisInfo','cPreisUeb','cRabatt','cWaagArt','cDatenEing','cArtInTrAkt','cZwSumme','cPresseCode','cArtSatz','cManEing','cManBedRab','cMxNAutoRab','cPersRabatt','cAutoRabatt','cBonbezug','cKombiRab','dPluNr','dMenge','dVKPreis','dUmsatz','dSTPreis','siAbteilung','siRabSatz','dRabSumme','dArtNr','sArtBez','dEKPreis','cArtBezLang','siHauptabt','lRegalnr','siFachfolnr','lAktionsnr','lWgr','siPrzMwSt','dRabWertBon','dPreisUebWert','dVirtRabWert','lRabAktNr','lBAAutorNr','siBACodeNr']
        else:
            df.columns = ['cMarktNr','dtDatum','siKassNr','lBonNr','lPosNr','cStornoZle','cStorZleManag','cRueckZahlArt','cGewArt','cPfandArt','cNegArt','cStornoArt','cPreisInfo','cPreisUeb','cRabatt','cWaagArt','cDatenEing','cArtInTrAkt','cZwSumme','cPresseCode','cArtSatz','cManEing','cManBedRab','cMxNAutoRab','cPersRabatt','cAutoRabatt','cBonbezug','cKombiRab','dPluNr','dMenge','dVKPreis','dUmsatz','dSTPreis','siAbteilung','siRabSatz','dRabSumme','dArtNr','sArtBez','dEKPreis','cArtBezLang','siHauptabt','lRegalnr','siFachfolnr','lAktionsnr','lWgr','siPrzMwSt','dRabWertBon','dPreisUebWert','dVirtRabWert','lRabAktNr','lBAAutorNr','siBACodeNr','ClientID','C4SqlClientDatabaseName']
    # For 55 column table versions, set the column names
    else:
        #print(f"{filename}, Columns: {len(col)}, DBName: {df.iat[0, 1]},  Date Last Updated: {df.iat[0, 54]}")
        if df.iat[0, 1][0:2].upper() == 'DW':
            df.columns = ['ClientID','C4SqlClientDatabaseName','cMarktNr','dtDatum','siKassNr','lBonNr','lPosNr','cStornoZle','cStorZleManag','cRueckZahlArt','cGewArt','cPfandArt','cNegArt','cStornoArt','cPreisInfo','cPreisUeb','cRabatt','cWaagArt','cDatenEing','cArtInTrAkt','cZwSumme','cPresseCode','cArtSatz','cManEing','cManBedRab','cMxNAutoRab','cPersRabatt','cAutoRabatt','cBonbezug','cKombiRab','dPluNr','dMenge','dVKPreis','dUmsatz','dSTPreis','siAbteilung','siRabSatz','dRabSumme','dArtNr','sArtBez','dEKPreis','cArtBezLang','siHauptabt','lRegalnr','siFachfolnr','lAktionsnr','lWgr','siPrzMwSt','dRabWertBon','dPreisUebWert','dVirtRabWert','lRabAktNr','lBAAutorNr','siBACodeNr','C4SQL_LastUpdated']
        else:
            df.columns = ['cMarktNr','dtDatum','siKassNr','lBonNr','lPosNr','cStornoZle','cStorZleManag','cRueckZahlArt','cGewArt','cPfandArt','cNegArt','cStornoArt','cPreisInfo','cPreisUeb','cRabatt','cWaagArt','cDatenEing','cArtInTrAkt','cZwSumme','cPresseCode','cArtSatz','cManEing','cManBedRab','cMxNAutoRab','cPersRabatt','cAutoRabatt','cBonbezug','cKombiRab','dPluNr','dMenge','dVKPreis','dUmsatz','dSTPreis','siAbteilung','siRabSatz','dRabSumme','dArtNr','sArtBez','dEKPreis','cArtBezLang','siHauptabt','lRegalnr','siFachfolnr','lAktionsnr','lWgr','siPrzMwSt','dRabWertBon','dPreisUebWert','dVirtRabWert','lRabAktNr','lBAAutorNr','siBACodeNr','ClientID','C4SqlClientDatabaseName','C4SQL_LastUpdated']

    # Create a new DataFrame using the common subset of columns from the DBTZBA table
    df_common = df.loc[:, ['ClientID','C4SqlClientDatabaseName','cMarktNr','dtDatum','siKassNr','lBonNr','lPosNr','cStornoZle','cStorZleManag','cRueckZahlArt','cGewArt','cPfandArt','cNegArt','cStornoArt','cPreisInfo','cPreisUeb','cRabatt','cWaagArt','cDatenEing','cArtInTrAkt','cZwSumme','cPresseCode','cArtSatz','cManEing','cManBedRab','cMxNAutoRab','cPersRabatt','cAutoRabatt','cBonbezug','cKombiRab','dPluNr','dMenge','dVKPreis','dUmsatz','dSTPreis','siAbteilung','siRabSatz','dRabSumme','dArtNr','sArtBez','dEKPreis','cArtBezLang','siHauptabt','lRegalnr','siFachfolnr','lAktionsnr','lWgr','siPrzMwSt','dRabWertBon','dPreisUebWert','dVirtRabWert','lRabAktNr','lBAAutorNr','siBACodeNr']]
    try:
        # Set up a spark DataFrame here to handle the write (append) to the new table
        spark_df = spark.createDataFrame(df_common)
        spark_df.write.mode("append").format("delta").saveAsTable("DBTZBA")
    except Exception as e:
        print(f"Error processing file: {filename}, Trying Month level batching.")

        # Iterate through the range of months represented in dtDatum values
        for unqValue in pds.to_datetime(df_common['dtDatum']).dt.strftime('%Y-%m').unique():

            # Filter the DBTZBA DataFrame for the selected month
            spark_df = spark.createDataFrame(df_common[pds.to_datetime(df_common['dtDatum']).dt.strftime('%Y-%m') == unqValue])

            # Append the csv data to the new table
            # We use spark here as it supports writing to tables
            try:
                spark_df.write.mode("append").format("delta").saveAsTable("DBTZBA")
            except Exception as e2:
                print(f"Error processing file: {filename}, Error: {e2}")

2 REPLIES
AndrewWestran
Helper I

Full error message:

 

Error: An error occurred while calling o4931.saveAsTable. : org.apache.spark.SparkException: Job aborted. at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:651) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:283) at org.apache.spark.sql.delta.files.TransactionalWrite.$anonfun$writeFiles$1(TransactionalWrite.scala:456) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66) at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:391) at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:355) at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:103) at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:221) at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:218) at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:103) at org.apache.spark.sql.delta.commands.WriteIntoDelta.write(WriteIntoDelta.scala:337) at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:98) at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1$adapted(WriteIntoDelta.scala:91) at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:252) at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:91) at org.apache.spark.sql.delta.catalog.WriteIntoDeltaBuilder$$anon$1$$anon$2.insert(DeltaTableV2.scala:279) at org.apache.spark.sql.execution.datasources.v2.SupportsV1Write.writeWithV1(V1FallbackWriters.scala:79) at org.apache.spark.sql.execution.datasources.v2.SupportsV1Write.writeWithV1$(V1FallbackWriters.scala:78) at org.apache.spark.sql.execution.datasources.v2.AppendDataExecV1.writeWithV1(V1FallbackWriters.scala:34) at org.apache.spark.sql.execution.datasources.v2.V1FallbackWriters.run(V1FallbackWriters.scala:66) at org.apache.spark.sql.execution.datasources.v2.V1FallbackWriters.run$(V1FallbackWriters.scala:65) at org.apache.spark.sql.execution.datasources.v2.AppendDataExecV1.run(V1FallbackWriters.scala:34) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:108) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66) at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:108) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:104) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:104) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:88) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:82) at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:136) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:901) at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:661) at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:595) at sun.reflect.GeneratedMethodAccessor252.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301) at org.apache.spark.sql.execution.OptimizeWriteExchangeExec.doExecute(OptimizeWriteExchangeExec.scala:65) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:230) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:268) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:265) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:226) at org.apache.spark.sql.delta.constraints.DeltaInvariantCheckerExec.doExecute(DeltaInvariantCheckerExec.scala:72) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:230) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:268) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:265) 
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:226) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:218) ... 61 more Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 4083:0 was 182487775 bytes, which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values. at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2682) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2618) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2617) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2617) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1190) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1190) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1190) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2870) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2812) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2801) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) Error processing file: /lakehouse/default/Files/C4SQLStage/DBTZBA/KW10972.dbo.DBTZBA.csv, Error: An error occurred while calling o4958.saveAsTable. : org.apache.spark.SparkException: Job aborted. 
at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:651) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:283) at org.apache.spark.sql.delta.files.TransactionalWrite.$anonfun$writeFiles$1(TransactionalWrite.scala:456) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66) at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:391) at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:355) at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:103) at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:221) at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:218) at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:103) at org.apache.spark.sql.delta.commands.WriteIntoDelta.write(WriteIntoDelta.scala:337) at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:98) at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1$adapted(WriteIntoDelta.scala:91) at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:252) at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:91) at org.apache.spark.sql.delta.catalog.WriteIntoDeltaBuilder$$anon$1$$anon$2.insert(DeltaTableV2.scala:279) at org.apache.spark.sql.execution.datasources.v2.SupportsV1Write.writeWithV1(V1FallbackWriters.scala:79) at org.apache.spark.sql.execution.datasources.v2.SupportsV1Write.writeWithV1$(V1FallbackWriters.scala:78) at org.apache.spark.sql.execution.datasources.v2.AppendDataExecV1.writeWithV1(V1FallbackWriters.scala:34) at org.apache.spark.sql.execution.datasources.v2.V1FallbackWriters.run(V1FallbackWriters.scala:66) at org.apache.spark.sql.execution.datasources.v2.V1FallbackWriters.run$(V1FallbackWriters.scala:65) at org.apache.spark.sql.execution.datasources.v2.AppendDataExecV1.run(V1FallbackWriters.scala:34) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:108) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:108) at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:104) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:104) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:88) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:82) at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:136) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:901) at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:661) at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:595) at sun.reflect.GeneratedMethodAccessor252.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301) at org.apache.spark.sql.execution.OptimizeWriteExchangeExec.doExecute(OptimizeWriteExchangeExec.scala:65) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:230) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:268) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:265) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:226) at org.apache.spark.sql.delta.constraints.DeltaInvariantCheckerExec.doExecute(DeltaInvariantCheckerExec.scala:72) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:230) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:268) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:265) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:226) at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:218) ... 61 more Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 4155:0 was 144432865 bytes, which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values. at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2682) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2618) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2617) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2617) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1190) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1190) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1190) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2870) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2812) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2801) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
