Don't miss out! 2025 Microsoft Fabric Community Conference, March 31 - April 2, Las Vegas, Nevada. Use code MSCUST for a $150 discount. Prices go up February 11th. Register now.

cmilanes932211
Frequent Visitor

Parallelization on Fabric using TensorFlowOnSpark

Hi, I'm trying to print a "Hello, World!" message from each worker using TensorFlowOnSpark with this code:

import os
import datetime

import numpy as np
import pandas as pd

# PySpark / Spark
from pyspark.sql import SparkSession

# TensorFlowOnSpark
from tensorflowonspark import TFCluster, TFNode

spark = SparkSession.builder \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", "1") \
    .config("spark.dynamicAllocation.enabled", "false") \
    .config("spark.shuffle.service.enabled", "false") \
    .getOrCreate()

def map_fun(tf_args, ctx):
    cluster, server = TFNode.start_cluster_server(ctx)
    print('ctx')
    if ctx.job_name == "ps":
        server.join()
    else:
        print("Hello from worker", ctx.task_index)

# Consistent configuration
cluster = TFCluster.run(
    sc=spark.sparkContext,
    map_fun=map_fun,
    tf_args={},
    num_executors=2,  # Must match spark.executor.instances
    num_ps=0,         # Number of parameter servers
    input_mode=TFCluster.InputMode.SPARK
)

# Empty RDD for training
rdd = spark.sparkContext.parallelize([])
cluster.train(rdd, 1)
#cluster.shutdown()

The idea is that, once this works, I will use the same configuration to run an LSTM.
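One thing worth checking before calling `TFCluster.run`: in a Fabric notebook the Spark session usually already exists when the cell runs, so `SparkSession.builder...getOrCreate()` returns that session, and static settings such as `spark.executor.instances` generally cannot change once the SparkContext is running. The sketch below compares the effective configuration with the arguments passed to `TFCluster.run`. The function name `check_tfos_sizing` is hypothetical, and the rules it checks are only the ones implied by the comments in the code above, not an official TensorFlowOnSpark validator.

```python
def check_tfos_sizing(conf, num_executors, num_ps=0):
    """Return a list of human-readable problems (empty if none were found).

    `conf` is a plain dict of Spark settings, for example built with
    dict(spark.sparkContext.getConf().getAll()).
    """
    problems = []
    instances = conf.get("spark.executor.instances")
    if instances is None:
        problems.append("spark.executor.instances is not set")
    elif int(instances) != num_executors:
        problems.append(
            f"spark.executor.instances={instances} but num_executors={num_executors}"
        )
    # The original config disables dynamic allocation so the executor set stays fixed.
    if conf.get("spark.dynamicAllocation.enabled", "false").lower() == "true":
        problems.append("dynamic allocation is enabled; executors may come and go")
    # At least one executor must remain for a worker after the parameter servers.
    if num_ps >= num_executors:
        problems.append("num_ps must leave at least one executor for a worker")
    return problems


# In the notebook (assumes an active `spark` session):
# print(check_tfos_sizing(dict(spark.sparkContext.getConf().getAll()), num_executors=2))
```

If the list is not empty, the session-level configuration in Fabric (rather than the builder call) is the place to change the executor settings.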
cmilanes932211
Frequent Visitor

Thanks a lot, but there is an issue. Sorry for the misunderstanding.

Your code primarily disables GPU usage and lists the available devices for TensorFlow to confirm it's using the CPU. While it serves as a basic example to ensure TensorFlow is configured correctly on a local machine, it doesn't align with what I need for my use case.

My Objective

I am working on running multiple neural network models in parallel, specifically leveraging a Spark cluster in Azure Fabric to maximize cluster utilization. The goal is to:

  1. Execute independent TensorFlow models on Spark executors in parallel.
  2. Perform a proof-of-concept using a sample use case and extend it to more workloads.
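For objective 1, one common alternative to TensorFlowOnSpark (which is mainly aimed at distributing a single training job across executors) is to treat each model as an independent Spark task: put one configuration per partition and let each task train its own Keras model. The sketch below assumes TensorFlow and NumPy are installed on the executors; `run_independent_models` and `train_lstm` are hypothetical names, and the random toy data in `train_lstm` only stands in for a real dataset.

```python
def run_independent_models(configs, train_fn, sc=None):
    """Train one model per config; one Spark task per config when `sc` is given."""
    if sc is None:
        # Local fallback: runs sequentially on the driver, handy for testing train_fn.
        return [train_fn(c) for c in configs]
    # numSlices=len(configs) puts each config in its own partition, so Spark can
    # schedule the models on different executor cores at the same time.
    rdd = sc.parallelize(configs, numSlices=max(1, len(configs)))
    return rdd.map(train_fn).collect()


def train_lstm(config):
    """Hypothetical per-task training function; runs entirely on one executor."""
    # Imports live inside the function so they happen on the executor.
    import numpy as np
    import tensorflow as tf

    rng = np.random.default_rng(config["seed"])
    # Toy data (assumption): 256 sequences; the target is the sum of each sequence.
    x = rng.random((256, config["timesteps"], 1)).astype("float32")
    y = x.sum(axis=1)  # shape (256, 1)
    model = tf.keras.Sequential([
        tf.keras.Input(shape=(config["timesteps"], 1)),
        tf.keras.layers.LSTM(config["units"]),
        tf.keras.layers.Dense(1),
    ])
    model.compile(optimizer="adam", loss="mse")
    history = model.fit(x, y, epochs=config["epochs"], verbose=0)
    # Return small, picklable results; save large artifacts (models) to storage instead.
    return {"name": config["name"], "final_loss": float(history.history["loss"][-1])}


# In the notebook (assumes an active `spark` session):
# configs = [{"name": f"lstm_{u}", "units": u, "timesteps": 10, "epochs": 5, "seed": i}
#            for i, u in enumerate([16, 32, 64, 128])]
# results = run_independent_models(configs, train_lstm, sc=spark.sparkContext)
```

Because each task builds and trains its own model, no parameter servers or cluster servers are involved, which avoids the deprecated `TFNode.start_cluster_server` path entirely.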

Why the Code Falls Short

  1. No Parallel Execution:

    • The provided code runs a simple main() function that prints "Hello, World!" and lists TensorFlow devices. It does not demonstrate any parallel execution or utilization of multiple models or Spark resources.
  2. Local Execution Only:

    • The code is designed to run locally on a single machine and does not integrate with Spark or distribute tasks across a cluster.
  3. No Spark Integration:

    • There is no use of Spark for distributing workloads or managing parallel execution, which is critical for maximizing cluster resource utilization.

What I’m Looking For

To align with my goal, I need:

  1. Code that integrates TensorFlow with Spark to distribute the execution of multiple neural network models across the cluster.
  2. A framework or approach that maximizes the use of Spark executors and ensures TensorFlow tasks efficiently utilize the allocated resources (e.g., CPU or GPU).

If you have suggestions or examples related to running TensorFlow models in parallel on a Spark cluster, I’d greatly appreciate your input!
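For objective 2, the number of models that can train at the same time is bounded by Spark's task slots: each executor runs `spark.executor.cores // spark.task.cpus` tasks concurrently (`spark.task.cpus` defaults to 1). The sketch below makes that arithmetic explicit and, as an assumption about how you might size TensorFlow inside each task, caps TensorFlow's intra-op threads at the task's CPU share. Both function names are hypothetical.

```python
def concurrent_task_slots(num_executors, executor_cores, task_cpus=1):
    """Number of Spark tasks (here: independent models) that can run at once."""
    if task_cpus < 1 or executor_cores < task_cpus:
        raise ValueError("each executor needs at least task_cpus cores")
    return num_executors * (executor_cores // task_cpus)


def limit_tf_threads(task_cpus):
    """Call at the start of a training task so TensorFlow uses only its share of cores."""
    import tensorflow as tf

    try:
        tf.config.threading.set_intra_op_parallelism_threads(task_cpus)
    except RuntimeError:
        # TensorFlow was already initialized in this (reused) Python worker,
        # so its thread pools can no longer be changed; keep the existing ones.
        pass


# With the original notebook config (2 executors, 1 core each, 1 CPU per task),
# concurrent_task_slots(2, 1) is 2, so only two models would train at a time.
```

Raising `spark.executor.cores` (or the number of executors) increases the slots; raising `spark.task.cpus` gives each model more cores but fewer models in parallel.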

cmilanes932211
Frequent Visitor

Hi @v-veshwara-msft, this is the error:

Feeding partition <itertools.chain object at 0x79effedf9150> into input queue <multiprocessing.queues.JoinableQueue object at 0x79efffb109d0>
2025-01-20 15:12:43.268392: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2025-01-20 15:12:43.268770: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2025-01-20 15:12:43,271 ERROR TaskResources [Executor task launch worker for task 7.0 in stage 14.0 (TID 19)]: Task 19 failed by error: 
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1248, in main
    process()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1238, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 5434, in pipeline_func
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 5434, in pipeline_func
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 5434, in pipeline_func
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 840, in func
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1795, in func
  File "/home/trusted-service-user/cluster-env/clonedenv/lib/python3.11/site-packages/tensorflowonspark/TFSparkNode.py", line 511, in _train
    raise Exception("Exception in worker:\n" + e_str)
Exception: Exception in worker:
Traceback (most recent call last):
  File "/home/trusted-service-user/cluster-env/clonedenv/lib/python3.11/site-packages/tensorflowonspark/TFSparkNode.py", line 427, in wrapper_fn_background
    wrapper_fn(args, context)
  File "/home/trusted-service-user/cluster-env/clonedenv/lib/python3.11/site-packages/tensorflowonspark/TFSparkNode.py", line 421, in wrapper_fn
    fn(args, context)
  File "/tmp/ipykernel_17400/3281000864.py", line 23, in map_fun
  File "/home/trusted-service-user/cluster-env/trident_env/lib/python3.11/site-packages/tensorflowonspark/TFNode.py", line 91, in start_cluster_server
    raise Exception("DEPRECATED: Use higher-level APIs like `tf.keras` or `tf.estimator`")
Exception: DEPRECATED: Use higher-level APIs like `tf.keras` or `tf.estimator`


The problem is that it does not print the "Hello, World!" message.
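Separately from the exception, note that `print` calls inside `map_fun` run on the executors, so even when they succeed their output typically lands in the executor's stdout log (visible through the Spark UI), not in the notebook cell. A minimal sketch, assuming plain PySpark with no TensorFlowOnSpark, that returns the greetings to the driver instead of printing them remotely; `hello_from_partition` and `collect_greetings` are hypothetical names.

```python
import socket


def hello_from_partition(index, iterator):
    """Runs on an executor: emit one greeting per partition instead of printing it."""
    yield f"Hello from partition {index} on host {socket.gethostname()}"


def collect_greetings(sc, num_tasks):
    """Run one task per partition and bring the greetings back to the driver."""
    rdd = sc.parallelize(range(num_tasks), numSlices=num_tasks)
    return rdd.mapPartitionsWithIndex(hello_from_partition).collect()


# In the notebook (assumes an active `spark` session):
# for line in collect_greetings(spark.sparkContext, 2):
#     print(line)
```

Seeing different host names in the output is a quick way to confirm the tasks really ran on separate executors.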

v-veshwara-msft
Community Support

Hi @cmilanes932211,

Thank you for posting in the Microsoft Fabric Community.

The code you provided looks good. Have you tried running it, and did you encounter any errors? If so, please share the error details so we can assist you in resolving them.

Best Regards,
Vinay.

LogLastModifiedTime:Wed Jan 22 20:46:14 +0000 2025
LogLength:508
LogContents:
WARN StatusConsoleListener The use of package scanning to locate plugins is deprecated and will be removed in a future release
WARN StatusConsoleListener The use of package scanning to locate plugins is deprecated and will be removed in a future release
WARN StatusConsoleListener The use of package scanning to locate plugins is deprecated and will be removed in a future release
WARN StatusConsoleListener The use of package scanning to locate plugins is deprecated and will be removed in a future release
End of LogType:stdout.This log file belongs to a running container (container_1737578301594_0001_01_000003) and so may not be complete.

2025-01-22 20:47:34,935 ERROR TaskResources [Executor task launch worker for task 7.0 in stage 14.0 (TID 19)]: Task 19 failed by error: 
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1248, in main
    process()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1238, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 5434, in pipeline_func
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 5434, in pipeline_func
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 5434, in pipeline_func
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 840, in func
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1795, in func
  File "/home/trusted-service-user/cluster-env/clonedenv/lib/python3.11/site-packages/tensorflowonspark/TFSparkNode.py", line 511, in _train
    raise Exception("Exception in worker:\n" + e_str)
Exception: Exception in worker:
Traceback (most recent call last):
  File "/home/trusted-service-user/cluster-env/clonedenv/lib/python3.11/site-packages/tensorflowonspark/TFSparkNode.py", line 427, in wrapper_fn_background
    wrapper_fn(args, context)
  File "/home/trusted-service-user/cluster-env/clonedenv/lib/python3.11/site-packages/tensorflowonspark/TFSparkNode.py", line 421, in wrapper_fn
    fn(args, context)
  File "/tmp/ipykernel_13752/3281000864.py", line 23, in map_fun
  File "/home/trusted-service-user/cluster-env/trident_env/lib/python3.11/site-packages/tensorflowonspark/TFNode.py", line 91, in start_cluster_server
    raise Exception("DEPRECATED: Use higher-level APIs like `tf.keras` or `tf.estimator`")
Exception: DEPRECATED: Use higher-level APIs like `tf.keras` or `tf.estimator`


	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1056)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2603)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
2025-01-22 20:47:34,944 ERROR Executor [Executor task launch worker for task 7.0 in stage 14.0 (TID 19)]: Exception in task 7.0 in stage 14.0 (TID 19)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1248, in main
    process()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1238, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 5434, in pipeline_func
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 5434, in pipeline_func
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 5434, in pipeline_func
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 840, in func
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1795, in func
  File "/home/trusted-service-user/cluster-env/clonedenv/lib/python3.11/site-packages/tensorflowonspark/TFSparkNode.py", line 511, in _train
    raise Exception("Exception in worker:\n" + e_str)
Exception: Exception in worker:
Traceback (most recent call last):
  File "/home/trusted-service-user/cluster-env/clonedenv/lib/python3.11/site-packages/tensorflowonspark/TFSparkNode.py", line 427, in wrapper_fn_background
    wrapper_fn(args, context)
  File "/home/trusted-service-user/cluster-env/clonedenv/lib/python3.11/site-packages/tensorflowonspark/TFSparkNode.py", line 421, in wrapper_fn
    fn(args, context)
  File "/tmp/ipykernel_13752/3281000864.py", line 23, in map_fun
  File "/home/trusted-service-user/cluster-env/trident_env/lib/python3.11/site-packages/tensorflowonspark/TFNode.py", line 91, in start_cluster_server
    raise Exception("DEPRECATED: Use higher-level APIs like `tf.keras` or `tf.estimator`")
Exception: DEPRECATED: Use higher-level APIs like `tf.keras` or `tf.estimator`


Hi @cmilanes932211,
Thank you for sharing the details and logs. After analyzing the errors, here’s a summary of the issues and the resolution:
1. Deprecated APIs:
--In the installed TensorFlowOnSpark version, TFNode.start_cluster_server is deprecated: as the traceback shows, calling it raises "Exception: DEPRECATED: Use higher-level APIs like `tf.keras` or `tf.estimator`".
--Modern TensorFlow applications avoid these lower-level APIs in favor of built-in, high-level abstractions like tf.keras.
2. CUDA and GPU Messages:
--The log line "Could not find cuda drivers on your machine, GPU will not be used." is informational: the environment has no GPU drivers, so TensorFlow falls back to the CPU.
--Messages about registering factories (e.g., cuFFT, cuDNN) come from redundant initializations of CUDA plugins and are typically harmless; neither these nor the CUDA message is what makes the task fail.
3. Missing "Hello, World!" Output:
--The print statement is never reached: the exception is raised by start_cluster_server, the first call in map_fun. Even when it does run, a print inside map_fun executes on the executors, so its output appears in the executor's stdout log rather than in the notebook.
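For reference, in the TensorFlowOnSpark 2.x examples `map_fun` does not start a cluster server itself; TensorFlowOnSpark sets up the cluster before calling it, and any distributed training inside it goes through `tf.distribute` strategies. The sketch below is a hello-world along those lines. It assumes TensorFlowOnSpark 2.x and `InputMode.TENSORFLOW` (so no RDD has to be fed), uses the hypothetical helper name `launch`, and has not been verified on Fabric.

```python
def map_fun(tf_args, ctx):
    """Runs once per executor; note there is no call to TFNode.start_cluster_server."""
    message = f"Hello from {ctx.job_name} {ctx.task_index}"
    # This print goes to the executor's stdout log, not to the notebook cell.
    print(message)
    return message


def launch(spark, num_executors=2):
    """Driver side: start the TensorFlowOnSpark cluster and wait for it to finish."""
    from tensorflowonspark import TFCluster

    cluster = TFCluster.run(
        spark.sparkContext,
        map_fun,
        tf_args={},
        num_executors=num_executors,  # should match spark.executor.instances
        num_ps=0,                      # no parameter servers
        input_mode=TFCluster.InputMode.TENSORFLOW,  # map_fun reads no RDD
    )
    cluster.shutdown()


# In the notebook (assumes an active `spark` session):
# launch(spark, num_executors=2)
```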

To address these issues, here is simplified code. It ensures compatibility with your environment by explicitly disabling GPU usage (since the CUDA drivers are missing).

import os

# Disable GPU usage; set this before TensorFlow is imported so it reliably takes effect
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

import tensorflow as tf


def main():
    print("Hello, World!")

    # Verify TensorFlow is using the CPU
    print("Devices available:")
    for device in tf.config.list_physical_devices():
        print(f" - {device.device_type}: {device.name}")


if __name__ == "__main__":
    main()


This eliminates the dependency on older or deprecated components like TensorFlowOnSpark.
It uses a direct Python print statement to achieve the desired "Hello, World!" output while ensuring TensorFlow initializes properly, confirming compatibility.

Please try this solution and let us know if further assistance is required.

If this post helps, please consider accepting it as the solution to help other members find it more quickly; a kudos would also be appreciated.

Best Regards.

 

