mkj1213
Helper I

Running notebook does nothing and shows no error

I am trying to execute the following code from a notebook on an F64 trial capacity. My Stock_Price table has more than 600 billion rows (yes, billion, not million).

 

from pyspark.sql.functions import col, date_format
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, DateType, LongType



# Load the data using DataFrame API
df = spark.read.table("LH.Stock_Price")
# Create an RDD from the DataFrame
rdd = df.select(
    col("TYPE"),
    col("Vendor"),
    col("TIMESTAMP").cast("date").alias("DATE"),
    col("ID")
).rdd
# Map to key-value pairs
mapped_rdd = rdd.map(lambda row: ((row["TYPE"], row["Vendor"], row["DATE"]), (1, {row["ID"]})))
# Use reduceByKey to aggregate counts and distinct counts
reduced_rdd = mapped_rdd.reduceByKey(lambda a, b: (a[0] + b[0], a[1].union(b[1])))
# Map the results to the desired schema
result_rdd = reduced_rdd.map(lambda x: Row(
    TYPE=x[0][0],
    Vendor=x[0][1],
    DATE=x[0][2],
    ID_COUNT=x[1][0],
    ID_DISTINCT_COUNT=len(x[1][1])
))
# Define the schema for the resulting DataFrame
schema = StructType([
    StructField("TYPE", StringType(), True),
    StructField("Vendor", StringType(), True),
    StructField("DATE", DateType(), True),
    StructField("ID_COUNT", LongType(), True),
    StructField("ID_DISTINCT_COUNT", LongType(), True)
])

# Convert the RDD back to a DataFrame
result_df = spark.createDataFrame(result_rdd, schema)

# Add a new column for the month partition
result_df = result_df.withColumn("MONTH", date_format(col("DATE"), "yyyy-MM"))

result_df.write.format("delta").mode("overwrite").partitionBy("MONTH").option("overwriteSchema", "true").save("Tables/Stocks")

 

When I run the code, it runs for around 50 seconds and then hands off to a Spark job, where the status remains "In Progress" at a certain task number for a very long time (24 hours) without giving any error. At the same time the monitor shows 0 rows processed, 0 B data read, and 0 B data written.

(screenshot of the job monitor showing the stalled task attached)

How can I check what is going wrong? And is there anything I can do to optimize my code?

1 ACCEPTED SOLUTION
v-shex-msft
Community Support

Hi @mkj1213,

I'd like to suggest using print and the datetime function to output how long each step takes, so you can confirm which step is lagging.

 

#import datetime to calculate
from datetime import datetime

print("step name")
start_datetime=datetime.now()

#your code

end_datetime= datetime.now()
duration = end_datetime - start_datetime
print("finish with: ",duration)

 

Sample code:

 

from pyspark.sql.functions import col, date_format
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, DateType, LongType

#import datetime to calculate
from datetime import datetime

# Load the data using DataFrame API
print("Load data")
start_datetime=datetime.now()

df = spark.read.table("LH.Stock_Price")

end_datetime= datetime.now()
duration = end_datetime - start_datetime
print("finish with: ",duration)

# Create an RDD from the DataFrame
print("Create an RDD")
start_datetime=datetime.now()

rdd = df.select(
    col("TYPE"),
    col("Vendor"),
    col("TIMESTAMP").cast("date").alias("DATE"),
    col("ID")
).rdd

end_datetime= datetime.now()
duration = end_datetime - start_datetime
print("finish with: ",duration)

 

Regards,

Xiaoxin Sheng

Community Support Team _ Xiaoxin
If this post helps, please consider accepting it as the solution to help other members find it more quickly.
