
mkj1213
Helper I

Running notebook not doing anything with no error

I am trying to execute the following code from a notebook on an F64 Trial capacity. My Stock_Price table has more than 600 billion (yes, billion, not million) rows.

 

from pyspark.sql.functions import col, date_format
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, DateType, LongType



# Load the data using DataFrame API
df = spark.read.table("LH.Stock_Price")
# Create an RDD from the DataFrame
rdd = df.select(
    col("TYPE"),
    col("Vendor"),
    col("TIMESTAMP").cast("date").alias("DATE"),
    col("ID")
).rdd
# Map to key-value pairs
mapped_rdd = rdd.map(lambda row: ((row["TYPE"], row["Vendor"], row["DATE"]), (1, {row["ID"]})))
# Use reduceByKey to aggregate counts and distinct counts
reduced_rdd = mapped_rdd.reduceByKey(lambda a, b: (a[0] + b[0], a[1].union(b[1])))
# Map the results to the desired schema
result_rdd = reduced_rdd.map(lambda x: Row(
    TYPE=x[0][0],
    Vendor=x[0][1],
    DATE=x[0][2],
    ID_COUNT=x[1][0],
    ID_DISTINCT_COUNT=len(x[1][1])
))
# Define the schema for the resulting DataFrame
schema = StructType([
    StructField("TYPE", StringType(), True),
    StructField("Vendor", StringType(), True),
    StructField("DATE", DateType(), True),
    StructField("ID_COUNT", LongType(), True),
    StructField("ID_DISTINCT_COUNT", LongType(), True)
])

# Convert the RDD back to a DataFrame
result_df = spark.createDataFrame(result_rdd, schema)

# Add a new column for the month partition
result_df = result_df.withColumn("MONTH", date_format(col("DATE"), "yyyy-MM"))

result_df.write.format("delta").mode("overwrite").partitionBy("MONTH").option("overwriteSchema", "true").save("Tables/Stocks")

 

When I run the code, it runs for around 50 seconds and then moves to a job, where the status remains "In Progress" at a certain task number for a very long time (24+ hours) without giving any error. At the same time it shows 0 rows processed, 0 B of data read, and 0 B of data written.

(screenshot: mkj1213_0-1722335781951.png)

How can I check what is going wrong? And is there anything I can do to optimize my code?
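
(For comparison, an equivalent DataFrame-only version of this aggregation, using groupBy with count and countDistinct instead of the RDD map/reduceByKey and per-key Python sets, might look like the sketch below. It is illustrative only and simply reuses the table and column names from the code above.)

from pyspark.sql.functions import col, count, countDistinct, date_format

# Same source table as above
df = spark.read.table("LH.Stock_Price")

# Aggregate entirely with the DataFrame API: total IDs and distinct IDs
# per (TYPE, Vendor, DATE), then derive the MONTH partition column
result_df = (
    df.select(
        col("TYPE"),
        col("Vendor"),
        col("TIMESTAMP").cast("date").alias("DATE"),
        col("ID")
    )
    .groupBy("TYPE", "Vendor", "DATE")
    .agg(
        count("ID").alias("ID_COUNT"),
        countDistinct("ID").alias("ID_DISTINCT_COUNT")
    )
    .withColumn("MONTH", date_format(col("DATE"), "yyyy-MM"))
)

result_df.write.format("delta").mode("overwrite").partitionBy("MONTH").option("overwriteSchema", "true").save("Tables/Stocks")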

1 ACCEPTED SOLUTION
v-shex-msft
Community Support

Hi @mkj1213,

I'd like to suggest using print and the datetime module to output how long each step takes, so you can confirm which step is lagging.

 

# import datetime to measure elapsed time
from datetime import datetime

print("step name")
start_datetime = datetime.now()

# your code

end_datetime = datetime.now()
duration = end_datetime - start_datetime
print("finish with: ", duration)

 

Sample code:

 

from pyspark.sql.functions import col, date_format
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, DateType, LongType

# import datetime to measure elapsed time
from datetime import datetime

# Load the data using DataFrame API
print("Load data")
start_datetime = datetime.now()

df = spark.read.table("LH.Stock_Price")

end_datetime = datetime.now()
duration = end_datetime - start_datetime
print("finish with: ", duration)

# Create an RDD from the DataFrame
print("Create an RDD")
start_datetime = datetime.now()

rdd = df.select(
    col("TYPE"),
    col("Vendor"),
    col("TIMESTAMP").cast("date").alias("DATE"),
    col("ID")
).rdd

end_datetime = datetime.now()
duration = end_datetime - start_datetime
print("finish with: ", duration)

 

Regards,

Xiaoxin Sheng

Community Support Team _ Xiaoxin
If this post helps, please consider accepting it as the solution to help other members find it more quickly.


