Skip to main content
cancel
Showing results for 
Search instead for 
Did you mean: 

Enhance your career with this limited time 50% discount on Fabric and Power BI exams. Ends August 31st. Request your voucher.

Reply
mkj1213
Helper I
Helper I

Running notebook not doing anything with no error

I am trying to execute the following code from a notebook on an F64 Trial capacity. My Stock_Price table is more than 600 Billion, (yes billion not million) rows .... 

 

from pyspark.sql.functions import col, date_format
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, DateType, LongType



# Load the data using DataFrame API
df = spark.read.table("LH.Stock_Price")
# Create an RDD from the DataFrame
rdd = df.select(
    col("TYPE"),
    col("Vendor"),
    col("TIMESTAMP").cast("date").alias("DATE"),
    col("ID")
).rdd
# Map to key-value pairs
mapped_rdd = rdd.map(lambda row: ((row["TYPE"], row["Vendor"], row["DATE"]), (1, {row["ID"]})))
# Use reduceByKey to aggregate counts and distinct counts
reduced_rdd = mapped_rdd.reduceByKey(lambda a, b: (a[0] + b[0], a[1].union(b[1])))
# Map the results to the desired schema
result_rdd = reduced_rdd.map(lambda x: Row(
    TYPE=x[0][0],
    Vendor=x[0][1],
    DATE=x[0][2],
    ID_COUNT=x[1][0],
    ID_DISTINCT_COUNT=len(x[1][1])
))
# Define the schema for the resulting DataFrame
schema = StructType([
    StructField("TYPE", StringType(), True),
    StructField("Vendor", StringType(), True),
    StructField("DATE", DateType(), True),
    StructField("ID_COUNT", LongType(), True),
    StructField("ID_DISTINCT_COUNT", LongType(), True)
])

# Convert the RDD back to a DataFrame
result_df = spark.createDataFrame(result_rdd, schema)

# Add a new column for the month partition
result_df = result_df.withColumn("MONTH", date_format(col("DATE"), "yyyy-MM"))

result_df.write.format("delta").mode("overwrite").partitionBy("MONTH").option("overwriteSchema", "true").save("Tables/Stocks")

 

when running the code it run for around 50 seconds then it go to job, where the status remain in progress at a certain task number for long time (24 hours) without giving any errorr, and at the same time it shows that the processed is 0 rows, data read is 0B and Data write is also 0 B

mkj1213_0-1722335781951.png

how can i check what is going wrong? and is there anything i can do to optimize my code?

1 ACCEPTED SOLUTION
Anonymous
Not applicable

HI @mkj1213,

I'd like to suggest you use print and datetime function to output the message about each steps spend time to confirm which step lagged.

 

#import datetime to calculate
from datetime import datetime

print("step name")
start_datetime=datetime.now()

#your code

end_datetime= datetime.now()
duration = end_datetime - start_datetime
print("finish with: ",duration)

 

Sample code:

 

from pyspark.sql.functions import col, date_format
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, DateType, LongType

#import datetime to calculate
from datetime import datetime

# Load the data using DataFrame API
print("Load data")
start_datetime=datetime.now()

df = spark.read.table("LH.Stock_Price")

end_datetime= datetime.now()
duration = end_datetime - start_datetime
print("finish with: ",duration)

# Create an RDD from the DataFrame
print("Create an RDD")
start_datetime=datetime.now()

rdd = df.select(
    col("TYPE"),
    col("Vendor"),
    col("TIMESTAMP").cast("date").alias("DATE"),
    col("ID")
).rdd

end_datetime= datetime.now()
duration = end_datetime - start_datetime
print("finish with: ",duration)

 

Regards,

Xiaoxin Sheng

View solution in original post

1 REPLY 1
Anonymous
Not applicable

HI @mkj1213,

I'd like to suggest you use print and datetime function to output the message about each steps spend time to confirm which step lagged.

 

#import datetime to calculate
from datetime import datetime

print("step name")
start_datetime=datetime.now()

#your code

end_datetime= datetime.now()
duration = end_datetime - start_datetime
print("finish with: ",duration)

 

Sample code:

 

from pyspark.sql.functions import col, date_format
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, DateType, LongType

#import datetime to calculate
from datetime import datetime

# Load the data using DataFrame API
print("Load data")
start_datetime=datetime.now()

df = spark.read.table("LH.Stock_Price")

end_datetime= datetime.now()
duration = end_datetime - start_datetime
print("finish with: ",duration)

# Create an RDD from the DataFrame
print("Create an RDD")
start_datetime=datetime.now()

rdd = df.select(
    col("TYPE"),
    col("Vendor"),
    col("TIMESTAMP").cast("date").alias("DATE"),
    col("ID")
).rdd

end_datetime= datetime.now()
duration = end_datetime - start_datetime
print("finish with: ",duration)

 

Regards,

Xiaoxin Sheng

Helpful resources

Announcements
July 2025 community update carousel

Fabric Community Update - July 2025

Find out what's new and trending in the Fabric community.

June FBC25 Carousel

Fabric Monthly Update - June 2025

Check out the June 2025 Fabric update to learn about new features.