priyankabis
Helper I

Unable to use a hardcoded date column in the Delta table merge API

Hi

I am trying to insert a hardcoded date value in a Delta table merge operation, both during creation and update. However, the Delta table merge API throws the error below.

 

My code to create the update columns:

 update_columns = {"effective_end_date": to_date(lit(feed_date_fmt),'MM/dd/yyyy'), "latest": lit(0),"processed_dttm": to_timestamp(lit(processed_dttm),date_time_format) }
 
Delta merge
accounts_delta_table.alias("target") \
    .merge(source=final_df.alias("source"),
           condition="target.entity_id = source.merge_key AND target.latest = 1") \
    .whenMatchedUpdate(set=update_columns) \
    .whenNotMatchedInsert(values=create_columns) \
    .execute()
Error:
Py4JJavaError: An error occurred while calling o93436.execute.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:322)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$2(MergeIntoCommand.scala:368)
    ...
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.sql.execution.OptimizeWriteExchangeExec.doExecute(OptimizeWriteExchangeExec.scala:74)
    ...
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1557.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1557.0 (TID 19002) (vm-48472153 executor 3): org.apache.spark.SparkException: [INTERNAL_ERROR] Cannot evaluate expression: to_date(06/26/2024, Some(MM/dd/yyyy), Some(UTC))
    at org.apache.spark.SparkException$.internalError(SparkException.scala:77)
    at org.apache.spark.sql.catalyst.expressions.RuntimeReplaceable.eval(Expression.scala:408)
    at org.apache.spark.sql.catalyst.expressions.ParseToDate.eval(datetimeExpressions.scala:2044)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand$JoinedRowProcessor.$anonfun$processPartition$12(MergeIntoCommand.scala:1404)
    ...

2 REPLIES
Anonymous
Not applicable

Hi @priyankabis,

It looks like you're trying to perform a merge operation on a Delta table with hardcoded values but are encountering errors. Here are a few things to check and consider:

 

First, ensure that the data types and formats of the columns in update_columns and create_columns match the Delta table schema. For example, the format strings passed to to_date and to_timestamp must match the literal values being parsed.
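For instance, a quick way to compare the two sides (a minimal sketch, reusing the accounts_delta_table and final_df names from the snippet above; toDF converts the DeltaTable into a DataFrame):

# Minimal sketch: compare the target and source schemas side by side
accounts_delta_table.toDF().printSchema()   # target Delta table schema
final_df.printSchema()                      # merge source schema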

 

The condition passed to merge must be a valid SQL expression. Ensure that target.entity_id = source.merge_key AND target.latest = 1 correctly references the columns on each side.
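One way to verify this independently of the merge is to run the same predicate as an ordinary join; it should execute without errors and return exactly the rows the merge would match (a minimal sketch, assuming the same aliases as the merge call):

from pyspark.sql.functions import expr

# Minimal sketch: the merge predicate reused as a join condition
matches = final_df.alias("source").join(
    accounts_delta_table.toDF().alias("target"),
    expr("target.entity_id = source.merge_key AND target.latest = 1"),
    "inner",
)
print(matches.count())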


Finally, make sure that the column names in update_columns and create_columns are exactly the same as those in the Delta table.
Best Regards

Yilong Zhou

If this post helps, then please consider accepting it as the solution to help other members find it more quickly.

Hi

I can confirm that the merge condition is right. When I remove the date columns from update_columns or create_columns, the merge succeeds.

Also, when I use the same date or timestamp expression in withColumn, I see no issues.
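This points to a workaround rather than a change to the merge call itself: evaluate the date and timestamp in Python first (or add the columns to final_df with withColumn before merging), so the merge clauses receive plain lit() values instead of unevaluated to_date/to_timestamp expressions. A minimal sketch, assuming feed_date_fmt and processed_dttm are strings as in the original snippet; the "%Y-%m-%d %H:%M:%S" pattern is an assumption, so adjust the strptime patterns to your actual formats:

from datetime import datetime
from pyspark.sql.functions import lit

# Parse the literals in Python so the merge clause receives concrete values.
# The "%Y-%m-%d %H:%M:%S" pattern is an assumption; match it to date_time_format.
effective_end_date = datetime.strptime(feed_date_fmt, "%m/%d/%Y").date()
processed_ts = datetime.strptime(processed_dttm, "%Y-%m-%d %H:%M:%S")

update_columns = {
    "effective_end_date": lit(effective_end_date),  # DateType literal
    "latest": lit(0),
    "processed_dttm": lit(processed_ts),            # TimestampType literal
}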
