Hello,
I've been trying, without success, to integrate data from Microsoft Fabric's Lakehouse with a Neo4j graph database.
I followed the code snippets from neo4j-partners/neo4j-microsoft-fabric.
My code looks like this:
cell 1
%%pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Neo4jSparkConnector") \
    .config("spark.jars.packages", "org.neo4j:neo4j-connector-apache-spark_2.12:5.3.2_for_spark_3") \
    .getOrCreate()
cell 2
%%spark
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("Neo4j Notebook")
  .config("spark.jars.packages", "org.neo4j:neo4j-connector-apache-spark_2.12:4.0.0_for_spark_3")
  .getOrCreate()
result:
import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@7c037b94
cell 3
%%spark
// Set base path
val absfss_Base_Path = "abfss://Neo4j_Workspace1@onelake.dfs.fabric.microsoft.com/Northwind_Lakehouse.Lakehouse/Files/Northwind/"
// Import required libraries
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
// Create Spark session
val spark = SparkSession.builder().appName("Neo4j Notebook").getOrCreate()
result:
absfss_Base_Path: String = abfss://Neo4j_Workspace1@onelake.dfs.fabric.microsoft.com/Northwind_Lakehouse.Lakehouse/Files/Northwind/
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@5dfa3492
cell 4
%%spark
// Read Northwind data files
val customerDF = spark.read.option("header", true).csv(absfss_Base_Path + "customers.csv")
val supplierDF = spark.read.option("header", true).csv(absfss_Base_Path + "suppliers.csv")
val stagedOrderDF = spark.read.option("header", true).csv(absfss_Base_Path + "orders.csv")
  .withColumn("addressID", concat_ws(", ", col("shipName"), col("shipAddress"),
    col("shipCity"), col("shipRegion"), col("shipPostalCode"), col("shipCountry")))
val orderDetailDF = spark.read.option("header", true).csv(absfss_Base_Path + "order-details.csv")
val productDF = spark.read.option("header", true).csv(absfss_Base_Path + "products.csv")
val categoryDF = spark.read.option("header", true).csv(absfss_Base_Path + "categories.csv")
// Create a separate addressDF and finalize orderDF
val addressDF = stagedOrderDF
  .select($"addressID",
    $"shipName".alias("name"),
    $"shipAddress".alias("address"),
    $"shipCity".alias("city"),
    $"shipRegion".alias("region"),
    $"shipPostalCode".alias("postalCode"),
    $"shipCountry".alias("country"))
  .dropDuplicates("addressID")
val orderDF = stagedOrderDF.drop("shipName", "shipAddress", "shipCity", "shipRegion", "shipPostalCode", "shipCountry")
result:
customerDF: org.apache.spark.sql.DataFrame = [customerID: string, companyName: string ... 10 more fields]
supplierDF: org.apache.spark.sql.DataFrame = [supplierID: string, companyName: string ... 10 more fields]
stagedOrderDF: org.apache.spark.sql.DataFrame = [orderID: string, customerID: string ... 13 more fields]
orderDetailDF: org.apache.spark.sql.DataFrame = [orderID: string, productID: string ... 3 more fields]
productDF: org.apache.spark.sql.DataFrame = [productID: string, productName: string ... 8 more fields]
categoryDF: org.apache.spark.sql.DataFrame = [categoryID: string, categoryName: string ... 2 more fields]
addressDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [addressID: string, name: string ... 5 more fields]
orderDF: org.apache.spark.sql.DataFrame = [orderID: string, customerID: string ... 7 more fields]
cell 5
%%spark
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
val spark = SparkSession.builder()
  .appName("Neo4j Notebook")
  .config("spark.jars.packages", "org.neo4j:neo4j-connector-apache-spark_2.12:5.2.3_for_spark_3")
  .getOrCreate()
result:
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@5dfa3492
cell 6
%%spark
// Load JSON file for Neo4j credentials
val jsonString = spark.read.text(absfss_Base_Path + "neo4j-conn.json").as[String].collect().mkString("\n")
// Parse JSON string
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
val data = mapper.readValue[Map[String, Any]](jsonString)
// Extract Neo4j connection details
val neo4jUrl = data("NEO4J_URL").asInstanceOf[String]
val neo4jUsername = data("NEO4J_USERNAME").asInstanceOf[String]
val neo4jPassword = data("NEO4J_PASSWORD").asInstanceOf[String]
result:
jsonString: String =
"
{
"NEO4J_URL": "neo4j+s://80c19ba0.databases.neo4j.io",
"NEO4J_USERNAME": "neo4j",
"NEO4J_PASSWORD": "**MY PASSWORD**",
"AURA_INSTANCEID": "80c19ba0",
"AURA_INSTANCENAME": "Instance01"
}"
warning: one deprecation (since 2.12.1); for details, enable `:setting -deprecation' or `:replay -deprecation'
mapper: com.fasterxml.jackson.databind.ObjectMapper with com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper = $default$anon$1@d83795a
res23: com.fasterxml.jackson.databind.ObjectMapper = $default$anon$1@d83795a
data: Map[String,Any] = Map(NEO4J_PASSWORD -> **MY PASSWORD**, NEO4J_URL -> neo4j+s://80c19ba0.databases.neo4j.io, NEO4J_USERNAME -> neo4j, AURA_INSTANCEID -> 80c19ba0, AURA_INSTANCENAME -> Instance01)
neo4jUrl: String = neo4j+s://80c19ba0.databases.neo4j.io
neo4jUsername: String = neo4j
neo4jPassword: String = **MY PASSWORD**
cell 7
%%spark
import org.apache.spark.sql.{DataFrame, SaveMode}
// Write nodes to Neo4j
def writeNodesToNeo4j(dataFrame: DataFrame, label: String, nodeKey: String): Unit = {
  dataFrame.write.format("org.neo4j.spark.DataSource")
    .mode(SaveMode.Overwrite)
    .option("url", neo4jUrl)
    .option("authentication.basic.username", neo4jUsername)
    .option("authentication.basic.password", neo4jPassword)
    .option("labels", label)
    .option("node.keys", nodeKey)
    .option("schema.optimization.node.keys", "KEY") // create node key constraints under the hood
    .save()
}
writeNodesToNeo4j(customerDF, "Customer", "customerID")
writeNodesToNeo4j(supplierDF, "Supplier", "supplierID")
writeNodesToNeo4j(orderDF, "Order", "orderID")
writeNodesToNeo4j(productDF, "Product", "productID")
writeNodesToNeo4j(categoryDF, "Category", "categoryID")
writeNodesToNeo4j(addressDF, "Address", "addressID")
result:
Diagnostics: Spark_User_UserApp_ClassNotFound
org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: org.neo4j.spark.DataSource. Please find packages at `https://spark.apache.org/third-party-projects.html`.
at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:724)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:650)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:700)
at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:909)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:276)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:250)
at $defaultwriteNodesToNeo4j(<console>:62)
... 64 elided
Caused by: java.lang.ClassNotFoundException: org.neo4j.spark.DataSource.DefaultSource
at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:72)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:636)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:636)
at scala.util.Failure.orElse(Try.scala:224)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:636)
... 69 more
The problem shows up when running the 7th cell. Any idea how to solve this?
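(For reference, a minimal driver-side check, a sketch using only a standard JVM call rather than anything from the linked repo, that tells a missing connector jar apart from other failures: it throws ClassNotFoundException whenever the class behind the data-source name is not on the classpath.)
%%spark
// Sketch: succeeds only if the Neo4j connector is on the driver classpath.
Class.forName("org.neo4j.spark.DataSource")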
Hi @syl-ade,
I wanted to check if you had the opportunity to review the information provided. Please feel free to contact us if you have any further questions. If my response has addressed your query, please accept it as a solution and give a 'Kudos' so other members can easily find it.
Thank you.
Hi @syl-ade,
May I ask if you have resolved this issue? If so, please mark the helpful reply and accept it as the solution, so other community members with similar problems can find the answer faster.
Thank you.
Hi @syl-ade,
Thanks for posting your query in Microsoft community forum.
The error message suggests that the Neo4j Spark Connector is not being loaded into the session. Note that your cells currently reference three different connector versions:
Cell 1: 5.3.2_for_spark_3
Cell 2: 4.0.0_for_spark_3
Cell 5: 5.2.3_for_spark_3
To prevent conflicts, please update all of your Spark session configurations to the same, latest stable version, 5.3.2_for_spark_3, for example as sketched below.
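A minimal sketch of one way to apply that, assuming the configuration runs as the very first cell (in a Fabric notebook the Spark session already exists, so setting spark.jars.packages via SparkSession.builder in a later cell does not add the connector to the running session; whether the Maven coordinate below can be resolved from your environment is also an assumption):
%%configure -f
{
  "conf": {
    "spark.jars.packages": "org.neo4j:neo4j-connector-apache-spark_2.12:5.3.2_for_spark_3"
  }
}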
If this helps, kindly Accept it as a solution and give a "Kudos" so other members can find it more easily.
Thank you.
The solution provided did not help with the problem.
It started to create nodes with this code.
%%configure -f
{
  "conf": {
    "spark.jars": "abfss://Neo4j_Workspace1@onelake.dfs.fabric.microsoft.com/Northwind_Lakehouse.Lakehouse/Files/Northwind/neo4j-spark-connector-5.3.1-s_2.12.jar"
  }
}
import org.apache.spark.sql.{SparkSession, SaveMode}
import org.apache.spark.sql.functions._
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
// Set base path
val absfss_Base_Path = "abfss://Neo4j_Workspace1@onelake.dfs.fabric.microsoft.com/Northwind_Lakehouse.Lakehouse/Files/Northwind/"
// Read Northwind data files
val customerDF = spark.read.option("header", true).csv(s"$absfss_Base_Path/customers.csv")
val supplierDF = spark.read.option("header", true).csv(s"$absfss_Base_Path/suppliers.csv")
val stagedOrderDF = spark.read.option("header", true).csv(s"$absfss_Base_Path/orders.csv")
  .withColumn("addressID", concat_ws(", ", col("shipName"), col("shipAddress"), col("shipCity"), col("shipRegion"), col("shipPostalCode"), col("shipCountry")))
val orderDetailDF = spark.read.option("header", true).csv(s"$absfss_Base_Path/order-details.csv")
val productDF = spark.read.option("header", true).csv(s"$absfss_Base_Path/products.csv")
val categoryDF = spark.read.option("header", true).csv(s"$absfss_Base_Path/categories.csv")
// Create separate addressesDF and finalize orderDF
val addressDF = stagedOrderDF
  .select($"addressID", $"shipName".alias("name"), $"shipAddress".alias("address"), $"shipCity".alias("city"), $"shipRegion".alias("region"), $"shipPostalCode".alias("postalCode"), $"shipCountry".alias("country"))
  .dropDuplicates("addressID")
val orderDF = stagedOrderDF.drop("shipName", "shipAddress", "shipCity", "shipRegion", "shipPostalCode", "shipCountry")
// Load JSON file for Neo4j credentials
val jsonString = spark.read.text(s"$absfss_Base_Path/neo4j-conn.json").as[String].collect().mkString("\n")
// Parse JSON string
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
val data = mapper.readValue[Map[String, Any]](jsonString)
// Extract Neo4j connection details
val neo4jUrl = data("NEO4J_URL").asInstanceOf[String]
val neo4jUsername = data("NEO4J_USERNAME").asInstanceOf[String]
val neo4jPassword = data("NEO4J_PASSWORD").asInstanceOf[String]
val neo4jOptions = Map(
  "url" -> neo4jUrl,
  "authentication.basic.username" -> neo4jUsername,
  "authentication.basic.password" -> neo4jPassword
)
customerDF.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .options(neo4jOptions)
  .option("labels", "Customer")
  .option("node.keys", "customerID")
  .save()

supplierDF.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .options(neo4jOptions)
  .option("labels", "Supplier")
  .option("node.keys", "supplierID")
  .save()

stagedOrderDF.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .options(neo4jOptions)
  .option("labels", "Order")
  .option("node.keys", "orderID")
  .save()
Yet I still do not know how to run this part successfully:
// Write nodes to Neo4j
def writeNodesToNeo4j(dataFrame: DataFrame, label: String, nodeKey: String): Unit = {
  dataFrame.write.format("org.neo4j.spark")
    .mode(SaveMode.Overwrite)
    .option("url", neo4jUrl)
    .option("authentication.basic.username", neo4jUsername)
    .option("authentication.basic.password", neo4jPassword)
    .option("labels", label)
    .option("node.keys", nodeKey)
    .option("schema.optimization.node.keys", "KEY") // create node key constraints under the hood
    .save()
}
writeNodesToNeo4j(customerDF, "Customer", "customerID")
writeNodesToNeo4j(supplierDF, "Supplier", "supplierID")
writeNodesToNeo4j(orderDF, "Order", "orderID")
writeNodesToNeo4j(productDF, "Product", "productID")
writeNodesToNeo4j(categoryDF, "Category", "categoryID")
writeNodesToNeo4j(addressDF, "Address", "addressID")
Hi @syl-ade,
Thanks for your update. I'm glad you were able to create nodes successfully using your new approach. However, I see that your function writeNodesToNeo4j is still not working as expected.
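One difference worth checking, offered as a minimal sketch rather than a verified fix: the standalone writes that succeeded use format("org.neo4j.spark.DataSource"), while the function in your latest snippet uses format("org.neo4j.spark"). Keeping the format string that worked and reusing your neo4jOptions map would look roughly like this (assuming the connector jar from your %%configure cell is still on the session classpath):
%%spark
// Sketch: same writer settings as the standalone customerDF/supplierDF writes,
// wrapped in the helper function, with the "org.neo4j.spark.DataSource" format string.
def writeNodesToNeo4j(dataFrame: DataFrame, label: String, nodeKey: String): Unit = {
  dataFrame.write
    .format("org.neo4j.spark.DataSource")
    .mode(SaveMode.Overwrite)
    .options(neo4jOptions)
    .option("labels", label)
    .option("node.keys", nodeKey)
    .option("schema.optimization.node.keys", "KEY") // create node key constraints under the hood
    .save()
}
writeNodesToNeo4j(customerDF, "Customer", "customerID")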
Let me know if you still face issues! If this helps, please mark it as a solution and give a "Kudos" so other community members can find it easily.
Thank you.
Hi @syl-ade,
I hope this information is helpful. Please let me know if you have any further questions or if you'd like to discuss this further. If this answers your question, please Accept it as a solution and give it a 'Kudos' so others can find it easily.
Thank you.
Hello @syl-ade,
Why is the Spark session created three times?
I know it might not make much of a difference, but I'm still asking.
I tried executing it in different forms and just haven't cleaned it up yet. As long as it works, it doesn't bother me while testing the code.
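For what it's worth, a small sketch (not from the linked repo) of why the repeated builders are mostly harmless yet also ineffective for adding the connector: getOrCreate() returns the notebook's existing session, and spark.jars.packages is only honoured when the underlying SparkContext is first created, which is consistent with cells 3 and 5 printing the same SparkSession@5dfa3492:
%%spark
import org.apache.spark.sql.SparkSession
// getOrCreate() hands back the pre-created notebook session, so the .config(...) call
// below cannot pull the connector jar into the already-running session.
val rebuilt = SparkSession.builder()
  .config("spark.jars.packages", "org.neo4j:neo4j-connector-apache-spark_2.12:5.3.2_for_spark_3")
  .getOrCreate()
println(rebuilt eq spark) // expected: true, i.e. the same session object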