Hi all,
In a Notebook in the Power BI Service, using PySpark, I wrote a function to calculate business days:
def fx_calBusinessDays(pStartDate, pEndDate):
    vStartDateStr = pStartDate.strftime("%Y-%m-%d")
    vEndDateStr = pEndDate.strftime("%Y-%m-%d")
    if pEndDate < pStartDate:
        vStartDateStr = pEndDate.strftime("%Y-%m-%d")
        vEndDateStr = pStartDate.strftime("%Y-%m-%d")
    noOfDays = abs(pEndDate - pStartDate).days
    # print(f"noOfDays = {noOfDays}")
    sqlStr = f"""select * from Calendar where Date between '{vStartDateStr}' and '{vEndDateStr}' and (IsWeekend = 1 or IsPublicHoliday = 1)"""
    df = spark.sql(sqlStr)
    # print(f"noOfDays = {df.count()}")
    return noOfDays - df.count()
And I was calling it from another function like the one below:
from datetime import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import pytz

df = spark.sql("select * from TABLE limit 5")

def fx_getOutstandingDays(DateA, DateB):
    vLocalTimeZone = pytz.timezone('blurblur')
    toDateTime = datetime.now(vLocalTimeZone)
    toDate = toDateTime.date()
    print(toDate)
    if DateA is not None:
        return 0
    else:
        kkk = fx_calBusinessDays(DateB, toDate)  # <-- error when calling the function defined outside
        return kkk

vUdfRegister = udf(fx_getOutstandingDays, IntegerType())
df_with_cat = df.withColumn("New_column_name", vUdfRegister(df["DateA"], df["DateB"]))
display(df_with_cat)
Error shows:
Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 459, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 467, in __getnewargs__
    raise PySparkRuntimeError(
pyspark.errors.exceptions.base.PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
I've been searching and I'm probably calling the function the wrong way, but I can't figure out why.
I'd appreciate any help with this.
Solved! Go to Solution.
I am quite new to PySpark and Python and have no idea how to restructure my code, so any suggestions for the code above would be appreciated.
Hi @AlvinB,
I have reviewed your scenario and, using my own sample code, was able to reproduce the issue you encountered when calculating business days with a PySpark UDF in a Power BI notebook.
As suspected, the root cause is calling spark.sql() (which depends on the SparkContext) inside a UDF. UDFs run on the worker nodes, and that is not supported in Spark's distributed execution environment.
Below are the steps I tried on my end:
Step 1:
from pyspark.sql.types import StructType, StructField, DateType, IntegerType
from datetime import datetime

calendar_data = [
    ("2025-05-19", 0, 0),
    ("2025-05-20", 0, 0),
    ("2025-05-21", 0, 1),
    ("2025-05-22", 0, 0),
    ("2025-05-23", 0, 0),
    ("2025-05-24", 1, 0),
    ("2025-05-25", 1, 0),
    ("2025-05-26", 0, 0),
]

schema = StructType([
    StructField("Date", DateType(), True),
    StructField("IsWeekend", IntegerType(), True),
    StructField("IsPublicHoliday", IntegerType(), True),
])

calendar_df = spark.createDataFrame(
    [(datetime.strptime(d[0], "%Y-%m-%d").date(), d[1], d[2]) for d in calendar_data],
    schema=schema
)
calendar_df.show()
Step 2:
data = [
    (datetime.strptime("2025-05-19", "%Y-%m-%d").date(), None),
    (None, datetime.strptime("2025-05-19", "%Y-%m-%d").date()),
    (None, datetime.strptime("2025-05-23", "%Y-%m-%d").date()),
    (datetime.strptime("2025-05-21", "%Y-%m-%d").date(), None),
]

schema2 = StructType([
    StructField("DateA", DateType(), True),
    StructField("DateB", DateType(), True),
])

df = spark.createDataFrame(data, schema=schema2)
df.show()
Step 3:
from pyspark.sql.functions import col

non_business_dates = set(
    row.Date for row in calendar_df.filter((col("IsWeekend") == 1) | (col("IsPublicHoliday") == 1)).collect()
)
broadcast_non_business = spark.sparkContext.broadcast(non_business_dates)
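As a quick sanity check on the driver, the broadcast should contain only the weekend and holiday dates from the sample calendar:

# Expected with the sample data: 2025-05-21 (holiday), 2025-05-24 and 2025-05-25 (weekend)
print(sorted(broadcast_non_business.value))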
Step 4:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from datetime import timedelta, datetime

def calc_business_days(start_date, end_date):
    if start_date is None or end_date is None:
        return None
    if start_date > end_date:
        start_date, end_date = end_date, start_date
    count = 0
    current = start_date
    while current <= end_date:
        # Count only dates that are not weekends or public holidays
        if current not in broadcast_non_business.value:
            count += 1
        current += timedelta(days=1)
    return count

def fx_getOutstandingDays(DateA, DateB):
    # "Today" is fixed here so the sample output is reproducible
    fixed_today = datetime.strptime("2025-05-26", "%Y-%m-%d").date()
    if DateA is not None:
        return 0
    else:
        return calc_business_days(DateB, fixed_today)

fx_getOutstandingDays_udf = udf(fx_getOutstandingDays, IntegerType())
Step 5:
from pyspark.sql.functions import col
df_with_business_days = df.withColumn("OutstandingDays", fx_getOutstandingDays_udf(col("DateA"), col("DateB")))
df_with_business_days.show()
To help you move forward, I restructured the logic to collect the non-business dates from the calendar on the driver, broadcast them to the worker nodes, and count the business days inside the UDF using plain Python only.
I was able to achieve the expected output as per your requirement.
Please refer to the screenshot below showing the correct business day results:
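For reference, with the sample data from Step 2 and the fixed "today" of 2025-05-26, Step 5 should produce output roughly like this (0 where DateA is already filled, otherwise the business days between DateB and today; exact null/NULL rendering depends on your Spark version):

+----------+----------+---------------+
|     DateA|     DateB|OutstandingDays|
+----------+----------+---------------+
|2025-05-19|      null|              0|
|      null|2025-05-19|              5|
|      null|2025-05-23|              2|
|2025-05-21|      null|              0|
+----------+----------+---------------+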
If the response has addressed your query, please Accept it as a solution and give a 'Kudos' so other members can easily find it.
Best Regards,
Harshitha.
Community Support Team
Thanks heaps. It works.
I merged everything into one function for an easy test, but it hits the same error:
from datetime import *
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, DateType
import pytz

# Read from the table
df = spark.sql("select * from TABLE limit 5")

def fx_getOutstandingDays(DateA, DateB):
    if DateA is not None:
        return 0
    # Get the current time in the local timezone
    localTimeZone = pytz.timezone('Somewhere_region')
    toDateTime = datetime.now(localTimeZone)
    toDate = toDateTime.date()
    startDateStr = DateB.strftime("%Y-%m-%d")
    endDateStr = toDate.strftime("%Y-%m-%d")
    if DateB > toDate:
        startDateStr = toDate.strftime("%Y-%m-%d")
        endDateStr = DateB.strftime("%Y-%m-%d")
    noOfDays = abs(toDate - DateB).days
    sqlStr = f"""select * from Calendar where Date between '{startDateStr}' and '{endDateStr}' and (IsWeekend = 1 or IsPublicHoliday = 1)"""
    df = spark.sql(sqlStr)
    return noOfDays - df.count()

udfRegister = udf(fx_getOutstandingDays, IntegerType())
df_new = df.withColumn("BusinessDays", udfRegister(df["DateA"], df["DateB"]))
display(df_new)
And the error looks like it's related to the UDF registration:
PicklingError                             Traceback (most recent call last)
Cell In[8], line 34
     31     return noOfDays - df.count()
     33 udfRegister = udf(fx_getOutstandingDays, IntegerType())
---> 34 df_new = df.withColumn("BusinessDays", udfRegister(df["DateA"], df["DateB"]))
     35 display(df_new)

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/udf.py:423, in UserDefinedFunction._wrapped.<locals>.wrapper(*args)
    421 @functools.wraps(self.func, assigned=assignments)
    422 def wrapper(*args: "ColumnOrName") -> Column:
--> 423     return self(*args)

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/udf.py:400, in UserDefinedFunction.__call__(self, *cols)
    398     sc.profiler_collector.add_profiler(id, memory_profiler)
    399 else:
--> 400     judf = self._judf
    401 jPythonUDF = judf.apply(_to_seq(sc, cols, _to_java_column))
    402 return Column(jPythonUDF)

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/udf.py:321, in UserDefinedFunction._judf(self)
    314 @property
    315 def _judf(self) -> JavaObject:
    316     # It is possible that concurrent access, to newly created UDF,
    317     # will initialize multiple UserDefinedPythonFunctions.
    318     # This is unlikely, doesn't affect correctness,
    319     # and should have a minimal performance impact.
    320     if self._judf_placeholder is None:
--> 321         self._judf_placeholder = self._create_judf(self.func)
    322     return self._judf_placeholder

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/udf.py:330, in UserDefinedFunction._create_judf(self, func)
    327 spark = SparkSession._getActiveSessionOrCreate()
    328 sc = spark.sparkContext
--> 330 wrapped_func = _wrap_function(sc, func, self.returnType)
    331 jdt = spark._jsparkSession.parseDataType(self.returnType.json())
    332 assert sc._jvm is not None

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/udf.py:59, in _wrap_function(sc, func, returnType)
     57 else:
     58     command = (func, returnType)
---> 59 pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
     60 assert sc._jvm is not None
     61 return sc._jvm.SimplePythonFunction(
     62     bytearray(pickled_command),
     63     env,
   (...)
     68     sc._javaAccumulator,
     69 )

File /opt/spark/python/lib/pyspark.zip/pyspark/rdd.py:5251, in _prepare_for_python_RDD(sc, command)
   5248 def _prepare_for_python_RDD(sc: "SparkContext", command: Any) -> Tuple[bytes, Any, Any, Any]:
   5249     # the serialized command will be compressed by broadcast
   5250     ser = CloudPickleSerializer()
-> 5251     pickled_command = ser.dumps(command)
   5252     assert sc._jvm is not None
   5253     if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc):  # Default 1M
   5254         # The broadcast will have same life cycle as created PythonRDD

File /opt/spark/python/lib/pyspark.zip/pyspark/serializers.py:469, in CloudPickleSerializer.dumps(self, obj)
    467 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
    468 print_exec(sys.stderr)
--> 469 raise pickle.PicklingError(msg)

PicklingError: Could not serialize object: PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
Any help would be appreciated.
Hi @AlvinB,
Thank you for reaching out to the Microsoft Fabric community forum.
When using Spark, remember that the SparkContext and operations such as spark.sql() or df.count() work only on the driver, not inside UDFs, which run on the worker nodes. If you call them inside a UDF, Spark raises a serialization error because it cannot ship driver-only objects to the workers. To avoid this, do all Spark operations outside the UDF: if the UDF needs reference data such as a calendar, load it first and share it through a broadcast variable, do any heavy lifting before creating the UDF, and pass only simple values into it so everything works smoothly with Spark's execution model.
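Applied to your original code, a minimal sketch of that pattern could look like the following. It assumes your lakehouse Calendar table has Date, IsWeekend and IsPublicHoliday columns and that Date is a date type; 'Somewhere_region' and TABLE are your own placeholders, and bc_non_business is just an illustrative variable name:

from datetime import datetime, timedelta
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import pytz

# Driver side: read the non-business dates from the Calendar table once and broadcast them.
# Assumes Calendar.Date is a date column; adjust the parsing if it is stored as a string.
non_business_dates = set(
    row.Date
    for row in spark.sql(
        "select Date from Calendar where IsWeekend = 1 or IsPublicHoliday = 1"
    ).collect()
)
bc_non_business = spark.sparkContext.broadcast(non_business_dates)

def fx_getOutstandingDays(DateA, DateB):
    # Runs on the workers: plain Python plus the broadcast value, no spark.sql() here
    if DateA is not None:
        return 0
    if DateB is None:
        return None
    toDate = datetime.now(pytz.timezone('Somewhere_region')).date()
    start, end = (DateB, toDate) if DateB <= toDate else (toDate, DateB)
    count = 0
    current = start
    while current <= end:
        if current not in bc_non_business.value:
            count += 1
        current += timedelta(days=1)
    # Note: this counts both endpoints, like the Step 4 example above; your original
    # abs(toDate - DateB).days logic excludes one endpoint, so adjust if needed.
    return count

udfRegister = udf(fx_getOutstandingDays, IntegerType())

df = spark.sql("select * from TABLE limit 5")
df_new = df.withColumn("BusinessDays", udfRegister(df["DateA"], df["DateB"]))
display(df_new)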
If the response has addressed your query, please Accept it as a solution and give a 'Kudos' so other members can easily find it.
Best Regards,
Harshitha.
Community Support Team