syl-ade
Helper I

Integrating Data from Microsoft Fabric's Lakehouse with Neo4j Graph Database

Hello,

I've been unsuccessfully trying to integrate data from Microsoft Fabric's Lakehouse with the Neo4j Graph Database.

I have run the code snippets from neo4j-partners/neo4j-microsoft-fabric.

My code looks like this:

cell 1

%%pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Neo4jSparkConnector") \
    .config("spark.jars.packages", "org.neo4j:neo4j-connector-apache-spark_2.12:5.3.2_for_spark_3") \
    .getOrCreate()

cell 2

%%spark
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Neo4j Notebook")
  .config("spark.jars.packages", "org.neo4j:neo4j-connector-apache-spark_2.12:4.0.0_for_spark_3")
  .getOrCreate()

result:

import org.apache.spark.sql.SparkSession

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@7c037b94

cell 3

%%spark

// Set base path
val absfss_Base_Path = "abfss://Neo4j_Workspace1@onelake.dfs.fabric.microsoft.com/Northwind_Lakehouse.Lakehouse/Files/Northwind/"

// Import required libraries
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

// Create Spark session
val spark = SparkSession.builder().appName("Neo4j Notebook").getOrCreate()

result:

absfss_Base_Path: String = abfss://Neo4j_Workspace1@onelake.dfs.fabric.microsoft.com/Northwind_Lakehouse.Lakehouse/Files/Northwind/
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@5dfa3492

cell 4

%%spark
// Read Northwind data files
val customerDF = spark.read.option("header", true).csv(absfss_Base_Path + "customers.csv")
val supplierDF = spark.read.option("header", true).csv(absfss_Base_Path + "suppliers.csv")
val stagedOrderDF = spark.read.option("header", true).csv(absfss_Base_Path + "orders.csv")
  .withColumn("addressID", concat_ws(", ", col("shipName"), col("shipAddress"), 
  col("shipCity"), col("shipRegion"), col("shipPostalCode"), col("shipCountry")))
val orderDetailDF = spark.read.option("header", true).csv(absfss_Base_Path + "order-details.csv")
val productDF = spark.read.option("header", true).csv(absfss_Base_Path + "products.csv")
val categoryDF = spark.read.option("header", true).csv(absfss_Base_Path + "categories.csv")

// Create separate addressDF and finalize orderDF
val addressDF = stagedOrderDF
 .select($"addressID", 
    $"shipName".alias("name"), 
    $"shipAddress".alias("address"), 
    $"shipCity".alias("city"), 
    $"shipRegion".alias("region"), 
    $"shipPostalCode".alias("postalCode"), 
    $"shipCountry".alias("country"))
 .dropDuplicates("addressID")
val orderDF = stagedOrderDF.drop("shipName","shipAddress", "shipCity", "shipRegion", "shipPostalCode", "shipCountry")

result:

customerDF: org.apache.spark.sql.DataFrame = [customerID: string, companyName: string ... 10 more fields]
supplierDF: org.apache.spark.sql.DataFrame = [supplierID: string, companyName: string ... 10 more fields]
stagedOrderDF: org.apache.spark.sql.DataFrame = [orderID: string, customerID: string ... 13 more fields]
orderDetailDF: org.apache.spark.sql.DataFrame = [orderID: string, productID: string ... 3 more fields]
productDF: org.apache.spark.sql.DataFrame = [productID: string, productName: string ... 8 more fields]
categoryDF: org.apache.spark.sql.DataFrame = [categoryID: string, categoryName: string ... 2 more fields]
addressDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [addressID: string, name: string ... 5 more fields]
orderDF: org.apache.spark.sql.DataFrame = [orderID: string, customerID: string ... 7 more fields]

cell 5

%%spark
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("Neo4j Notebook")
  .config("spark.jars.packages", "org.neo4j:neo4j-connector-apache-spark_2.12:5.2.3_for_spark_3")
  .getOrCreate()

result:

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@5dfa3492

cell 6

%%spark
// Load JSON file for Neo4j credentials
val jsonString = spark.read.text(absfss_Base_Path + "neo4j-conn.json").as[String].collect().mkString("\n")

// Parse JSON string
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
val data = mapper.readValue[Map[String, Any]](jsonString)

// Extract Neo4j connection details
val neo4jUrl = data("NEO4J_URL").asInstanceOf[String]
val neo4jUsername = data("NEO4J_USERNAME").asInstanceOf[String]
val neo4jPassword = data("NEO4J_PASSWORD").asInstanceOf[String]

result:

jsonString: String =
"

{
    "NEO4J_URL": "neo4j+s://80c19ba0.databases.neo4j.io",
    "NEO4J_USERNAME": "neo4j",
    "NEO4J_PASSWORD": "**MY PASSWORD**",
    "AURA_INSTANCEID": "80c19ba0",
    "AURA_INSTANCENAME": "Instance01"
  }"
warning: one deprecation (since 2.12.1); for details, enable `:setting -deprecation' or `:replay -deprecation'
mapper: com.fasterxml.jackson.databind.ObjectMapper with com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper = $default$anon$1@d83795a
res23: com.fasterxml.jackson.databind.ObjectMapper = $default$anon$1@d83795a
data: Map[String,Any] = Map(NEO4J_PASSWORD -> **MY PASSWORD**, NEO4J_URL -> neo4j+s://80c19ba0.databases.neo4j.io, NEO4J_USERNAME -> neo4j, AURA_INSTANCEID -> 80c19ba0, AURA_INSTANCENAME -> Instance01)
neo4jUrl: String = neo4j+s://80c19ba0.databases.neo4j.io
neo4jUsername: String = neo4j
neo4jPassword: String = **MY PASSWORD**

cell 7

%%spark
import org.apache.spark.sql.{DataFrame, SaveMode}

// Write nodes to Neo4j
def writeNodesToNeo4j(dataFrame: DataFrame, label: String, nodeKey: String): Unit = {
  dataFrame.write.format("org.neo4j.spark.DataSource")
    .mode(SaveMode.Overwrite)
    .option("url", neo4jUrl)
    .option("authentication.basic.username", neo4jUsername)
    .option("authentication.basic.password", neo4jPassword)
    .option("labels", label)
    .option("node.keys", nodeKey)
    .option("schema.optimization.node.keys", "KEY") //create node key constraints under the hood
    .save()
}

writeNodesToNeo4j(customerDF, "Customer", "customerID")
writeNodesToNeo4j(supplierDF, "Supplier", "supplierID")
writeNodesToNeo4j(orderDF, "Order", "orderID")
writeNodesToNeo4j(productDF, "Product", "productID")
writeNodesToNeo4j(categoryDF, "Category", "categoryID")
writeNodesToNeo4j(addressDF, "Address", "addressID")

result:

Diagnostics: Spark_User_UserApp_ClassNotFound

org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: org.neo4j.spark.DataSource. Please find packages at `https://spark.apache.org/third-party-projects.html`.
  at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:724)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:650)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:700)
  at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:909)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:276)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:250)
  at $defaultwriteNodesToNeo4j(<console>:62)
  ... 64 elided
Caused by: java.lang.ClassNotFoundException: org.neo4j.spark.DataSource.DefaultSource
  at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:72)
  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
  at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:636)
  at scala.util.Try$.apply(Try.scala:213)
  at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:636)
  at scala.util.Failure.orElse(Try.scala:224)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:636)
  ... 69 more

The problem shows up when running the 7th cell. Any idea how to solve it?

 

v-ssriganesh
Community Support

Hi @syl-ade,
I wanted to check if you had the opportunity to review the information provided. Please feel free to contact us if you have any further questions. If my response has addressed your query, please accept it as a solution and give a 'Kudos' so other members can easily find it.
Thank you.

v-ssriganesh
Community Support

Hi @syl-ade,

May I ask if you have resolved this issue? If so, please mark the helpful reply and accept it as the solution. This will be helpful for other community members who have similar problems to solve it faster.

Thank you.

v-ssriganesh
Community Support

Hi @syl-ade,
Thanks for posting your query in the Microsoft community forum.

The error message suggests that the Neo4j Spark Connector is not being properly recognized. Please try the following fixes:

  • In your code, different cells are using different versions of the Neo4j Spark Connector:
        Cell 1: 5.3.2_for_spark_3
        Cell 2: 4.0.0_for_spark_3
        Cell 5: 5.2.3_for_spark_3
    To prevent conflicts, please update all your Spark session configurations to the latest stable version, 5.3.2_for_spark_3 (a consolidated sketch follows this list).
  • Kindly update your write function to utilize "org.neo4j.spark" instead of "org.neo4j.spark.DataSource". Please also ensure that all dependencies are properly loaded before executing the write function.
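
A consolidated sketch of the first suggestion above (illustrative, not code from the original reply), assuming a single session definition at the top of the notebook; note that spark.jars.packages only takes effect when the Spark application starts, so such a cell would have to run before everything else:

%%spark
import org.apache.spark.sql.SparkSession

// One session definition for the whole notebook, pinned to a single connector version
// so that 4.0.0, 5.2.3 and 5.3.2 artifacts are not mixed on the classpath.
val spark = SparkSession.builder()
  .appName("Neo4j Notebook")
  .config("spark.jars.packages", "org.neo4j:neo4j-connector-apache-spark_2.12:5.3.2_for_spark_3")
  .getOrCreate()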

If this helps, kindly Accept it as a solution and give a "Kudos" so other members can find it more easily.
Thank you.

The solution provided did not help with the problem.
It started to create nodes with this code:

%%configure -f
{
  "conf": {
    "spark.jars": "abfss://Neo4j_Workspace1@onelake.dfs.fabric.microsoft.com/Northwind_Lakehouse.Lakehouse/Files/Northwind/neo4j-spark-connector-5.3.1-s_2.12.jar"
  }
}
import org.apache.spark.sql.{SparkSession, SaveMode}
import org.apache.spark.sql.functions._
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper


// Set base path
val absfss_Base_Path = "abfss://Neo4j_Workspace1@onelake.dfs.fabric.microsoft.com/Northwind_Lakehouse.Lakehouse/Files/Northwind/"

// Read Northwind data files
val customerDF = spark.read.option("header", true).csv(s"$absfss_Base_Path/customers.csv")
val supplierDF = spark.read.option("header", true).csv(s"$absfss_Base_Path/suppliers.csv")
val stagedOrderDF = spark.read.option("header", true).csv(s"$absfss_Base_Path/orders.csv")
  .withColumn("addressID", concat_ws(", ", col("shipName"), col("shipAddress"), col("shipCity"), col("shipRegion"), col("shipPostalCode"), col("shipCountry")))
val orderDetailDF = spark.read.option("header", true).csv(s"$absfss_Base_Path/order-details.csv")
val productDF = spark.read.option("header", true).csv(s"$absfss_Base_Path/products.csv")
val categoryDF = spark.read.option("header", true).csv(s"$absfss_Base_Path/categories.csv")

// Create separate addressesDF and finalize orderDF
val addressDF = stagedOrderDF
  .select($"addressID", $"shipName".alias("name"), $"shipAddress".alias("address"), $"shipCity".alias("city"), $"shipRegion".alias("region"), $"shipPostalCode".alias("postalCode"), $"shipCountry".alias("country"))
  .dropDuplicates("addressID")

val orderDF = stagedOrderDF.drop("shipName", "shipAddress", "shipCity", "shipRegion", "shipPostalCode", "shipCountry")

// Load JSON file for Neo4j credentials
val jsonString = spark.read.text(s"$absfss_Base_Path/neo4j-conn.json").as[String].collect().mkString("\n")

// Parse JSON string
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
val data = mapper.readValue[Map[String, Any]](jsonString)

// Extract Neo4j connection details
val neo4jUrl = data("NEO4J_URL").asInstanceOf[String]
val neo4jUsername = data("NEO4J_USERNAME").asInstanceOf[String]
val neo4jPassword = data("NEO4J_PASSWORD").asInstanceOf[String]

val neo4jOptions = Map(
  "url" -> neo4jUrl,
  "authentication.basic.username" -> neo4jUsername,
  "authentication.basic.password" -> neo4jPassword
)

customerDF.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .options(neo4jOptions)
  .option("labels", "Customer")
  .option("node.keys", "customerID")
  .save()

supplierDF.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .options(neo4jOptions)
  .option("labels", "Supplier")
  .option("node.keys", "supplierID")
  .save()

stagedOrderDF.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .options(neo4jOptions)
  .option("labels", "Order")
  .option("node.keys", "orderID")
  .save()


Yet I still do not know how to run this part successfully:

// Write nodes to Neo4j
def writeNodesToNeo4j(dataFrame: DataFrame, label: String, nodeKey: String): Unit = {
  dataFrame.write.format("org.neo4j.spark")
    .mode(SaveMode.Overwrite)
    .option("url", neo4jUrl)
    .option("authentication.basic.username", neo4jUsername)
    .option("authentication.basic.password", neo4jPassword)
    .option("labels", label)
    .option("node.keys", nodeKey)
    .option("schema.optimization.node.keys", "KEY") //create node key constraints under the hood
    .save()
}

writeNodesToNeo4j(customerDF, "Customer", "customerID")
writeNodesToNeo4j(supplierDF, "Supplier", "supplierID")
writeNodesToNeo4j(orderDF, "Order", "orderID")
writeNodesToNeo4j(productDF, "Product", "productID")
writeNodesToNeo4j(categoryDF, "Category", "categoryID")
writeNodesToNeo4j(addressDF, "Address", "addressID")

Hi @syl-ade,

Thanks for your update. I'm glad you were able to create nodes successfully using your new approach. However, I see that your function writeNodesToNeo4j is still not working as expected.

  • In your function, you are using "org.neo4j.spark" as the format. However, it should be "org.neo4j.spark.DataSource", like in your working version.
  • Since your working approach included %%configure -f, ensure this configuration is present in the notebook before executing any Spark operations.
  • Run the %%configure -f block before running the function.
  • Try adding the following before calling writeNodesToNeo4j to ensure that Spark recognizes the JAR correctly:
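
The snippet that last bullet refers to did not come through in the thread. A minimal sketch of what such a check, together with the corrected helper, could look like, assuming the %%configure -f block above has already loaded the connector JAR (the verification calls are illustrative, not from the original reply):

%%spark
import org.apache.spark.sql.{DataFrame, SaveMode}

// Sanity checks: spark.jars should list the connector JAR supplied via %%configure,
// and Class.forName throws immediately if the DataSource class is not on the classpath.
println(spark.sparkContext.getConf.get("spark.jars", ""))
Class.forName("org.neo4j.spark.DataSource")

// Same helper as before, but with the full data source name instead of "org.neo4j.spark".
def writeNodesToNeo4j(dataFrame: DataFrame, label: String, nodeKey: String): Unit = {
  dataFrame.write.format("org.neo4j.spark.DataSource")
    .mode(SaveMode.Overwrite)
    .option("url", neo4jUrl)
    .option("authentication.basic.username", neo4jUsername)
    .option("authentication.basic.password", neo4jPassword)
    .option("labels", label)
    .option("node.keys", nodeKey)
    .option("schema.optimization.node.keys", "KEY") // create node key constraints under the hood
    .save()
}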

Let me know if you still face issues! If this helps, please mark it as a solution and give a "Kudos" so other community members can find it easily.
Thank you.

Hi @syl-ade,
I hope this information is helpful. Please let me know if you have any further questions or if you'd like to discuss this further. If this answers your question, please Accept it as a solution and give it a 'Kudos' so others can find it easily.
Thank you.

nilendraFabric
Super User

Hello @syl-ade,

Why is the Spark session created three times?

I know it might not make a lot of difference, but I'm still asking.

I tried executing it in different forms and just haven't cleaned it up yet. As long as it works, it doesn't bother me while testing the code.
