I am trying to execute the following code from a notebook on an F64 Trial capacity. My Stock_Price table has more than 600 billion (yes, billion, not million) rows.
from pyspark.sql.functions import col, date_format
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, DateType, LongType
# Load the data using DataFrame API
df = spark.read.table("LH.Stock_Price")
# Create an RDD from the DataFrame
rdd = df.select(
    col("TYPE"),
    col("Vendor"),
    col("TIMESTAMP").cast("date").alias("DATE"),
    col("ID")
).rdd
# Map to key-value pairs
mapped_rdd = rdd.map(lambda row: ((row["TYPE"], row["Vendor"], row["DATE"]), (1, {row["ID"]})))
# Use reduceByKey to aggregate counts and distinct counts
reduced_rdd = mapped_rdd.reduceByKey(lambda a, b: (a[0] + b[0], a[1].union(b[1])))
# Map the results to the desired schema
result_rdd = reduced_rdd.map(lambda x: Row(
    TYPE=x[0][0],
    Vendor=x[0][1],
    DATE=x[0][2],
    ID_COUNT=x[1][0],
    ID_DISTINCT_COUNT=len(x[1][1])
))
# Define the schema for the resulting DataFrame
schema = StructType([
    StructField("TYPE", StringType(), True),
    StructField("Vendor", StringType(), True),
    StructField("DATE", DateType(), True),
    StructField("ID_COUNT", LongType(), True),
    StructField("ID_DISTINCT_COUNT", LongType(), True)
])
# Convert the RDD back to a DataFrame
result_df = spark.createDataFrame(result_rdd, schema)
# Add a new column for the month partition
result_df = result_df.withColumn("MONTH", date_format(col("DATE"), "yyyy-MM"))
result_df.write.format("delta").mode("overwrite").partitionBy("MONTH").option("overwriteSchema", "true").save("Tables/Stocks")
When I run the code, it runs for around 50 seconds and then moves to a job where the status stays "in progress" at a certain task number for a very long time (24 hours) without giving any error. At the same time it shows 0 rows processed, 0 B of data read, and 0 B of data written.
How can I check what is going wrong? And is there anything I can do to optimize my code?
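As an aside on the optimization question: the RDD map/reduceByKey path builds a Python set of IDs per key in executor memory, which is very expensive at this row count. An equivalent aggregation can stay entirely in the DataFrame API, which avoids shipping every row through Python. A minimal sketch, assuming the same LH.Stock_Price columns (countDistinct could be swapped for approx_count_distinct if an approximate distinct count is acceptable at this scale):
from pyspark.sql.functions import col, count, countDistinct, date_format, lit
# Project only the needed columns and derive the DATE key
df = spark.read.table("LH.Stock_Price").select(
    col("TYPE"),
    col("Vendor"),
    col("TIMESTAMP").cast("date").alias("DATE"),
    col("ID")
)
# Aggregate entirely inside Spark SQL (no per-row Python processing)
result_df = (
    df.groupBy("TYPE", "Vendor", "DATE")
      .agg(
          count(lit(1)).alias("ID_COUNT"),              # row count per group
          countDistinct("ID").alias("ID_DISTINCT_COUNT")
      )
      .withColumn("MONTH", date_format(col("DATE"), "yyyy-MM"))
)
result_df.write.format("delta").mode("overwrite") \
    .partitionBy("MONTH").option("overwriteSchema", "true") \
    .save("Tables/Stocks")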
Solved! Go to Solution.
Hi @mkj1213,
I'd suggest using print and the datetime module to log how long each step takes, so you can confirm which step is lagging.
# import datetime to measure elapsed time
from datetime import datetime
print("step name")
start_datetime = datetime.now()
# your code
end_datetime = datetime.now()
duration = end_datetime - start_datetime
print("finished in:", duration)
Sample code:
from pyspark.sql.functions import col, date_format
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, DateType, LongType
# import datetime to measure elapsed time
from datetime import datetime

# Load the data using the DataFrame API
print("Load data")
start_datetime = datetime.now()
df = spark.read.table("LH.Stock_Price")
end_datetime = datetime.now()
duration = end_datetime - start_datetime
print("finished in:", duration)

# Create an RDD from the DataFrame
print("Create an RDD")
start_datetime = datetime.now()
rdd = df.select(
    col("TYPE"),
    col("Vendor"),
    col("TIMESTAMP").cast("date").alias("DATE"),
    col("ID")
).rdd
end_datetime = datetime.now()
duration = end_datetime - start_datetime
print("finished in:", duration)
Regards,
Xiaoxin Sheng
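One caveat on the timing approach above: Spark transformations such as select, map, and reduceByKey are lazy, so those steps will usually report near-zero durations; the real work only starts at an action such as the final write or a count. If it helps, the same print/datetime pattern can be wrapped in a small context manager. A minimal sketch (the timed helper below is just an illustrative name, not an existing API):
from contextlib import contextmanager
from datetime import datetime

@contextmanager
def timed(step_name):
    # Print the step name, run the wrapped block, then print the elapsed time
    print(step_name)
    start = datetime.now()
    try:
        yield
    finally:
        print("finished in:", datetime.now() - start)

# Only actions trigger execution, so time those; for example the final write:
with timed("Write result table"):
    result_df.write.format("delta").mode("overwrite") \
        .partitionBy("MONTH").option("overwriteSchema", "true") \
        .save("Tables/Stocks")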