Skip to main content
cancel
Showing results for 
Search instead for 
Did you mean: 

Join us for an expert-led overview of the tools and concepts you'll need to become a Certified Power BI Data Analyst and pass exam PL-300. Register now.

Reply
AlvinB
Frequent Visitor

Spark error - It appears that you are attempting to reference SparkContext

Hi all,

In Notebook in Power BI Service by using PySpark, I made a function to calculate the business days:

def fx_calBusinessDays(pStartDate, pEndDate) :
    vStartDateStr = pStartDate.strftime("%Y-%m-%d")
    vEndDateStr = pEndDate.strftime("%Y-%m-%d")

    if pEndDate < pStartDate :
        vStartDateStr = pEndDate.strftime("%Y-%m-%d")
        vEndDateStr = pStartDate.strftime("%Y-%m-%d")
    
    noOfDays = abs(pEndDate - pStartDate).days
    # print(f"noOfDays = {noOfDays}")


    sqlStr = f"""select * from Calendar where Date between '{vStartDateStr}' and '{vEndDateStr}' and (IsWeekend = 1 or IsPublicHoliday = 1)"""
    df = spark.sql(sqlStr)
    # print(f"noOfDays = {df.count()}")

    return noOfDays - df.count()

 

And I wsas calling this from another function like below:

df = spark.sql("select * from TABLE limit 5")

def fx_getOutstandingDays(DateA, DateB):
    vLocalTimeZone = pytz.timezone('blurblur')
    toDateTime = datetime.now(vLocalTimeZone)
    toDate = toDateTime.date()
    print(toDate)

    if DateA is not None :
        return 0
    else :
        kkk = fx_calBusinessDays(DateB, toDate ) <-- Error in calling a function defined outside
        return kkk 

vUdfRegister = udf(fx_getOutstandingDays, IntegerType())
df_with_cat = df.withColumn("New_column_name", vUdfRegister(df["DateA"], df["DateB"]))
display(df_with_cat)

 

Error shows:

Traceback (most recent call last): File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 459, in dumps return cloudpickle.dumps(obj, pickle_protocol) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps cp.dump(obj) File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle/cloudpickle_fast.py", line 632, in dump return Pickler.dump(self, obj) ^^^^^^^^^^^^^^^^^^^^^^^ File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 467, in __getnewargs__ raise PySparkRuntimeError( pyspark.errors.exceptions.base.PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

 

I was searching and probably I'm calling the function wrongly but I do not know why.

I appreciate for your help on this.

1 ACCEPTED SOLUTION

Hi @AlvinB

I have reviewed your scenario and was able to reproduce the issue you encountered when trying to calculate business days using a PySpark UDF in a Power BI Notebook with my sample codes.

As suspected, the root cause is due to trying to use spark.sql() (which depends on Spark content) inside a UDF, which runs on worker nodes  this is not supported in Spark's distributed execution environment.
Below are the steps tried from my end :
Step 1:

 

from pyspark.sql.types import StructType, StructField, DateType, IntegerType
from datetime import datetime

calendar_data = [
    ("2025-05-19", 0, 0),
    ("2025-05-20", 0, 0),
    ("2025-05-21", 0, 1), 
    ("2025-05-22", 0, 0),
    ("2025-05-23", 0, 0),
    ("2025-05-24", 1, 0),  
    ("2025-05-25", 1, 0),  
    ("2025-05-26", 0, 0),
]

schema = StructType([
    StructField("Date", DateType(), True),
    StructField("IsWeekend", IntegerType(), True),
    StructField("IsPublicHoliday", IntegerType(), True),
])

calendar_df = spark.createDataFrame(
    [(datetime.strptime(d[0], "%Y-%m-%d").date(), d[1], d[2]) for d in calendar_data],
    schema=schema
)

calendar_df.show()

 

Step 2:

 

data = [
    (datetime.strptime("2025-05-19", "%Y-%m-%d").date(), None),
    (None, datetime.strptime("2025-05-19", "%Y-%m-%d").date()),
    (None, datetime.strptime("2025-05-23", "%Y-%m-%d").date()),
    (datetime.strptime("2025-05-21", "%Y-%m-%d").date(), None),
]

schema2 = StructType([
    StructField("DateA", DateType(), True),
    StructField("DateB", DateType(), True),
])

df = spark.createDataFrame(data, schema=schema2)

df.show()

 

Step 3:

 

from pyspark.sql.functions import col

non_business_dates = set(
    row.Date for row in calendar_df.filter((col("IsWeekend") == 1) | (col("IsPublicHoliday") == 1)).collect()
)

broadcast_non_business = spark.sparkContext.broadcast(non_business_dates)

 

Step 4:

 

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from datetime import timedelta, datetime

def calc_business_days(start_date, end_date):
    if start_date is None or end_date is None:
        return None

    if start_date > end_date:
        start_date, end_date = end_date, start_date

    count = 0
    current = start_date
    while current <= end_date:
        if current not in broadcast_non_business.value:
            count += 1
        current += timedelta(days=1)
    return count

def fx_getOutstandingDays(DateA, DateB):
    fixed_today = datetime.strptime("2025-05-26", "%Y-%m-%d").date()

    if DateA is not None:
        return 0
    else:
        return calc_business_days(DateB, fixed_today)

fx_getOutstandingDays_udf = udf(fx_getOutstandingDays, IntegerType())

 

Step 5:

 

from pyspark.sql.functions import col

df_with_business_days = df.withColumn("OutstandingDays", fx_getOutstandingDays_udf(col("DateA"), col("DateB")))

df_with_business_days.show()

 

To help you move forward, I restructured the logic to:

  • Load the calendar table outside the UDF.
  • Broadcast the non-business days (weekends & holidays).
  • Calculate business days using pure Python logic inside the UDF.

I was able to achieve the expected output as per your requirement.
Please refer to the screenshot below showing the correct business day results:

vhjannapu_1-1748269506229.png

If the response has addressed your query, please Accept it as a solution and give a 'Kudos' so other members can easily find it.

Best Regards,
Harshitha.
Community Support Team

View solution in original post

5 REPLIES 5
AlvinB
Frequent Visitor

I am quite new to PySpark & Python and I got no idea how to transform my code. Any code suggestion for above code would be appreciated.

Hi @AlvinB

I have reviewed your scenario and was able to reproduce the issue you encountered when trying to calculate business days using a PySpark UDF in a Power BI Notebook with my sample codes.

As suspected, the root cause is due to trying to use spark.sql() (which depends on Spark content) inside a UDF, which runs on worker nodes  this is not supported in Spark's distributed execution environment.
Below are the steps tried from my end :
Step 1:

 

from pyspark.sql.types import StructType, StructField, DateType, IntegerType
from datetime import datetime

calendar_data = [
    ("2025-05-19", 0, 0),
    ("2025-05-20", 0, 0),
    ("2025-05-21", 0, 1), 
    ("2025-05-22", 0, 0),
    ("2025-05-23", 0, 0),
    ("2025-05-24", 1, 0),  
    ("2025-05-25", 1, 0),  
    ("2025-05-26", 0, 0),
]

schema = StructType([
    StructField("Date", DateType(), True),
    StructField("IsWeekend", IntegerType(), True),
    StructField("IsPublicHoliday", IntegerType(), True),
])

calendar_df = spark.createDataFrame(
    [(datetime.strptime(d[0], "%Y-%m-%d").date(), d[1], d[2]) for d in calendar_data],
    schema=schema
)

calendar_df.show()

 

Step 2:

 

data = [
    (datetime.strptime("2025-05-19", "%Y-%m-%d").date(), None),
    (None, datetime.strptime("2025-05-19", "%Y-%m-%d").date()),
    (None, datetime.strptime("2025-05-23", "%Y-%m-%d").date()),
    (datetime.strptime("2025-05-21", "%Y-%m-%d").date(), None),
]

schema2 = StructType([
    StructField("DateA", DateType(), True),
    StructField("DateB", DateType(), True),
])

df = spark.createDataFrame(data, schema=schema2)

df.show()

 

Step 3:

 

from pyspark.sql.functions import col

non_business_dates = set(
    row.Date for row in calendar_df.filter((col("IsWeekend") == 1) | (col("IsPublicHoliday") == 1)).collect()
)

broadcast_non_business = spark.sparkContext.broadcast(non_business_dates)

 

Step 4:

 

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from datetime import timedelta, datetime

def calc_business_days(start_date, end_date):
    if start_date is None or end_date is None:
        return None

    if start_date > end_date:
        start_date, end_date = end_date, start_date

    count = 0
    current = start_date
    while current <= end_date:
        if current not in broadcast_non_business.value:
            count += 1
        current += timedelta(days=1)
    return count

def fx_getOutstandingDays(DateA, DateB):
    fixed_today = datetime.strptime("2025-05-26", "%Y-%m-%d").date()

    if DateA is not None:
        return 0
    else:
        return calc_business_days(DateB, fixed_today)

fx_getOutstandingDays_udf = udf(fx_getOutstandingDays, IntegerType())

 

Step 5:

 

from pyspark.sql.functions import col

df_with_business_days = df.withColumn("OutstandingDays", fx_getOutstandingDays_udf(col("DateA"), col("DateB")))

df_with_business_days.show()

 

To help you move forward, I restructured the logic to:

  • Load the calendar table outside the UDF.
  • Broadcast the non-business days (weekends & holidays).
  • Calculate business days using pure Python logic inside the UDF.

I was able to achieve the expected output as per your requirement.
Please refer to the screenshot below showing the correct business day results:

vhjannapu_1-1748269506229.png

If the response has addressed your query, please Accept it as a solution and give a 'Kudos' so other members can easily find it.

Best Regards,
Harshitha.
Community Support Team

Thanks heaps. It works.

AlvinB
Frequent Visitor

I merged all into one function for a easy test but it has the same error:

from    datetime import *
from    pyspark.sql.functions import udf
from    pyspark.sql.types import IntegerType, DateType
import  pytz

# Read from the table
df = spark.sql("select * from TABLE limit 5")

def fx_getOutstandingDays(DateA, DateB):

    if DateA is not None:
        return 0

    # Get the current time in the local timezone
    localTimeZone = pytz.timezone('Somewhere_region')
    toDateTime = datetime.now(localTimeZone)
    toDate = toDateTime.date()

    startDateStr = DateB.strftime("%Y-%m-%d")
    endDateStr = toDate.strftime("%Y-%m-%d")

    if DateB > toDate :
        startDateStr = toDate.strftime("%Y-%m-%d")
        endDateStr = DateB.strftime("%Y-%m-%d")
    
    noOfDays = abs(toDate - DateB).days

    sqlStr = f"""select * from Calendar where Date between '{startDateStr}' and '{endDateStr}' and (IsWeekend = 1 or IsPublicHoliday = 1)"""
    df = spark.sql(sqlStr)

    return noOfDays - df.count()

udfRegister = udf(fx_getOutstandingDays, IntegerType())
df_new = df.withColumn("BusinessDays", udfRegister(df["DateA"], df["DateB"]))
display(df_new)

 

And the error looks related with the UDF function...:

PicklingError                             Traceback (most recent call last)
Cell In[8], line 34
     31     return noOfDays - df.count()
     33 udfRegister = udf(fx_getOutstandingDays, IntegerType())
---> 34 df_new = df.withColumn("BusinessDays", udfRegister(df["DateA"], df["DateB"]))
     35 display(df_new)

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/udf.py:423, in UserDefinedFunction._wrapped.<locals>.wrapper(*args)
    421 @functools.wraps(self.func, assigned=assignments)
    422 def wrapper(*args: "ColumnOrName") -> Column:
--> 423     return self(*args)

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/udf.py:400, in UserDefinedFunction.__call__(self, *cols)
    398         sc.profiler_collector.add_profiler(id, memory_profiler)
    399 else:
--> 400     judf = self._judf
    401     jPythonUDF = judf.apply(_to_seq(sc, cols, _to_java_column))
    402 return Column(jPythonUDF)

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/udf.py:321, in UserDefinedFunction._judf(self)
    314 @property
    315 def _judf(self) -> JavaObject:
    316     # It is possible that concurrent access, to newly created UDF,
    317     # will initialize multiple UserDefinedPythonFunctions.
    318     # This is unlikely, doesn't affect correctness,
    319     # and should have a minimal performance impact.
    320     if self._judf_placeholder is None:
--> 321         self._judf_placeholder = self._create_judf(self.func)
    322     return self._judf_placeholder

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/udf.py:330, in UserDefinedFunction._create_judf(self, func)
    327 spark = SparkSession._getActiveSessionOrCreate()
    328 sc = spark.sparkContext
--> 330 wrapped_func = _wrap_function(sc, func, self.returnType)
    331 jdt = spark._jsparkSession.parseDataType(self.returnType.json())
    332 assert sc._jvm is not None

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/udf.py:59, in _wrap_function(sc, func, returnType)
     57 else:
     58     command = (func, returnType)
---> 59 pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
     60 assert sc._jvm is not None
     61 return sc._jvm.SimplePythonFunction(
     62     bytearray(pickled_command),
     63     env,
   (...)
     68     sc._javaAccumulator,
     69 )

File /opt/spark/python/lib/pyspark.zip/pyspark/rdd.py:5251, in _prepare_for_python_RDD(sc, command)
   5248 def _prepare_for_python_RDD(sc: "SparkContext", command: Any) -> Tuple[bytes, Any, Any, Any]:
   5249     # the serialized command will be compressed by broadcast
   5250     ser = CloudPickleSerializer()
-> 5251     pickled_command = ser.dumps(command)
   5252     assert sc._jvm is not None
   5253     if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc):  # Default 1M
   5254         # The broadcast will have same life cycle as created PythonRDD

File /opt/spark/python/lib/pyspark.zip/pyspark/serializers.py:469, in CloudPickleSerializer.dumps(self, obj)
    467     msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
    468 print_exec(sys.stderr)
--> 469 raise pickle.PicklingError(msg)

PicklingError: Could not serialize object: PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

 

Any help would be appreciated.

Hi @AlvinB,
Thank you for reaching out to the Microsoft fabric community forum.

When using Spark, remember that SparkContext and functions like spark.sql() or df.count() only work on the driver, not inside UDFs, which run on worker nodes. If you use these inside a UDF, Spark will give a serialization error because it can't send driver-only code to the workers. To avoid this, do all Spark operations outside the UDF. If the UDF needs data like a calendar, load it first and share it using a broadcast variable. Do any complex work before creating the UDF, and only pass simple values into it so everything works smoothly with Spark’s setup.


If the response has addressed your query, please Accept it as a solution and give a 'Kudos' so other members can easily find it

Best Regards,
Harshitha.
Community Support Team

Helpful resources

Announcements
Join our Fabric User Panel

Join our Fabric User Panel

This is your chance to engage directly with the engineering team behind Fabric and Power BI. Share your experiences and shape the future.

June 2025 Power BI Update Carousel

Power BI Monthly Update - June 2025

Check out the June 2025 Power BI update to learn about new features.

June 2025 community update carousel

Fabric Community Update - June 2025

Find out what's new and trending in the Fabric community.