How Not To Persist Spark Dataset / DataFrame to Neo4j (And How To?) [1]

Spark provides an API for graphs and graph-parallel computation (GraphX), and Neo4j is probably the most popular graph database, so you might expect some sort of “bridge” between them. That is indeed the case, and Neo4j will direct you to the Neo4j-Spark-Connector. It’s a Scala-based implementation that uses the officially supported Neo4j Java driver under the hood. It covers a range of use cases but, I argue, not the one mentioned in the title.

The scenario I’m considering could result from processing data with GraphX and then persisting the results in Neo4j, where they could be read by other processes (including another Spark job). Probably because it is not such an unusual use case, there are people asking for details about it on Stack Overflow and in this issue.

The Neo4j-Spark-Connector allegedly covers this case, and there is an example of it in Neo4jDataFrame.mergeEdgeList. If we take a look at the code, though, mergeEdgeList basically repartitions the DataFrame and, for each partition, calls a method named execute:

def mergeEdgeList(sc: SparkContext,
                  dataFrame: DataFrame,
                  source: (String, Seq[String]),
                  relationship: (String, Seq[String]),
                  target: (String, Seq[String]),
                  renamedColumns: Map[String, String] = Map.empty): Unit = {
  val partitions = Math.max(1, (dataFrame.count() / 10000).asInstanceOf[Int])
  val config = Neo4jConfig(sc.getConf)
  dataFrame.repartition(partitions).foreachPartition(rows => {
    ...
    execute(config, mergeStatement, Map("rows" -> params).asJava, write = true)
  })
}

And then execute does the actual writing to the database:

def execute(config : Neo4jConfig, query: String, parameters: java.util.Map[String, AnyRef], write: Boolean = false) : ResultSummary = {
    val driver: Driver = config.driver()
    val session = driver.session()
    try {
      val runner =  new TransactionWork[ResultSummary]() {
         override def execute(tx:Transaction) : ResultSummary =
            tx.run(query, parameters).consume()
      }
      if (write) {
        session.writeTransaction(runner)
      }
      else
        session.readTransaction(runner)
    } finally {
      if (session.isOpen) session.close()
      driver.close()
    }
  }

There are at least three issues with the code above:

  1. In mergeEdgeList, count and foreachPartition are both actions, so the given DataFrame is evaluated twice, without the caller being aware of it.
  2. execute creates a Neo4j driver (through the Java driver) and, from it, a session, then closes both. This is done for each partition of the DataFrame.
  3. Because of 2, you’ll have a session per partition, hence the scope of each transaction is a single partition. What the scope of a transaction should be has to be decided case by case, but you don’t have that freedom of choice here. For example, you might need a broader scope (the upload of the full DataFrame either succeeds or fails, so there are only two possible outcomes: the DataFrame has been written correctly to Neo4j or it hasn’t) or no transaction support at all.

Point 1 should be addressed, as it can be pretty insidious; at the very least the caller should be made aware that the DataFrame needs to be cached. Overall, the Neo4j-Spark-Connector seems to fit other scenarios (streaming, reading, updating, etc.) but probably shouldn’t be the first pick when a potentially demanding Spark job is meant to create a new Neo4j database from scratch.
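As a rough illustration of what “caching” means for point 1, here is a minimal sketch that persists the DataFrame before running the two actions and releases it afterwards. CachedWrite, countThenWrite and writePartition are hypothetical names of mine and not part of the Neo4j-Spark-Connector; the storage level is also just an assumption.

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.storage.StorageLevel

object CachedWrite {
  // Hypothetical helper: counts the rows and then visits every partition.
  // The DataFrame is persisted first, so the second action reads the cached
  // partitions instead of recomputing the whole lineage.
  def countThenWrite(df: DataFrame)(writePartition: Iterator[Row] => Unit): Long = {
    val cached = df.persist(StorageLevel.MEMORY_AND_DISK) // assumed storage level
    try {
      val rows = cached.count()               // first action: fills the cache
      cached.foreachPartition(writePartition) // second action: served from the cache
      rows
    } finally {
      cached.unpersist()                      // release the cached blocks
    }
  }
}
```

The caller passes in whatever per-partition write it needs; the point is only that both actions run against the same persisted data.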

After some research, I reckon the best way (also from a performance point of view) is to persist the DataFrame as CSV and leverage the Neo4j load CSV functionality. Although it might look like a pretty clumsy solution at first, the CSV header can be tailored to pass useful information to the Neo4j CSV loader. For example, the CSV header:

movieId:ID,title,year:int,:LABEL

This header tells the loader that movieId is the node’s id (and its uniqueness will be enforced), that year is a node property of type int, and that the value in the fourth column of the CSV is a label for the node.
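To make the header convention concrete, here is a small sketch that builds such a header from a description of the columns. The Column model and all the names in it are hypothetical and only cover the three cases shown above (id, optionally typed property, label).

```scala
object CsvHeader {
  // Hypothetical column model, covering only the cases used in the example header.
  sealed trait Column { def render: String }
  final case class Id(name: String) extends Column {
    def render: String = s"$name:ID"
  }
  final case class Property(name: String, tpe: Option[String] = None) extends Column {
    // An untyped property is rendered as the bare column name.
    def render: String = tpe.fold(name)(t => s"$name:$t")
  }
  case object Label extends Column {
    def render: String = ":LABEL"
  }

  // Joins the rendered columns into a single CSV header line.
  def header(columns: Seq[Column]): String = columns.map(_.render).mkString(",")

  def main(args: Array[String]): Unit = {
    val movieHeader = header(Seq(Id("movieId"), Property("title"), Property("year", Some("int")), Label))
    println(movieHeader) // prints movieId:ID,title,year:int,:LABEL
  }
}
```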

I wrote a simple proof of concept that illustrates the functionality of the Neo4j CSV loader: https://github.com/lansaloltd/spark-neo4j

The example is fictional and loosely based on a de-duplication problem. The starting point is a dataset in which pairs of documents were marked as duplicates by a machine learning algorithm. The data is first filtered to keep only the duplicates, and the algorithm’s confidence level (how likely the two documents are to be duplicates indeed) becomes a property of the relationship between them. In this scenario the output has to be a disconnected graph (as we can’t expect all documents to resemble each other), with islands or clusters of documents linked to one another or, more precisely, connected components.
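For readers less familiar with connected components, the idea can be sketched in plain Scala with a union-find structure. This is not how the proof of concept computes them; ScoredPair and its fields are hypothetical names, and the confidence value is only carried along as the would-be relationship property.

```scala
import scala.collection.mutable

object DuplicateClusters {
  // Hypothetical record: one pair of documents scored by the classifier.
  final case class ScoredPair(docA: String, docB: String, isDuplicate: Boolean, confidence: Double)

  // Groups documents into connected components, using only the pairs marked as duplicates.
  def clusters(pairs: Seq[ScoredPair]): Set[Set[String]] = {
    val edges = pairs.filter(_.isDuplicate) // keep only the duplicates
    val parent = mutable.Map.empty[String, String]

    // Finds the representative of a document's cluster, compressing the path on the way back.
    def find(doc: String): String = {
      val p = parent.getOrElseUpdate(doc, doc)
      if (p == doc) doc
      else {
        val root = find(p)
        parent(doc) = root
        root
      }
    }

    // Merges the clusters of the two documents of every duplicate pair.
    edges.foreach { e =>
      val (a, b) = (find(e.docA), find(e.docB))
      if (a != b) parent(a) = b
    }

    // Documents that share a representative belong to the same component.
    parent.keys.toSeq.groupBy(find).values.map(_.toSet).toSet
  }
}
```

Documents that only appear in pairs rejected by the filter never enter the graph, which matches the filtering step described above.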

The proof of concept requires Neo4j Desktop to be installed (download here). Once started, you should see something similar to this:

[Screenshot: neo4j-initial-screen]

Click on “Add Graph”:

[Screenshot: neo4j-add-graph]

Then choose a name and a password for your DB (if you would like the code example to work out of the box, choose “password” as the password and version 3.5.x):

[Screenshot: neo4j-add-graph-2]

Click on “Manage”, open the “Settings” tab, comment out dbms.directories.import=import and then press “Apply”:

[Screenshot: neo4j-settings]

That’s because the unit test creates the CSV files to import under the target directory, which is an external directory for Neo4j. In a production environment this has security implications you should be aware of.

Finally, click “Start” to start the database and, once it is active, run the tests in GraphAnalysisTest. If they succeed you might still see 0 nodes / 0 relationships in the DB section, but that seems to be only a refresh issue. Click on Neo4j Browser:

[Screenshot: browser]

Then click on the DB icon in the top left corner:

[Screenshot: neo4j-db-icon]

And now you should see the graph successfully imported into Neo4j:

[Screenshot: neo4j-graph-2]

There are 31 nodes (or vertices, in graph terminology) and 29 relationships (or edges).

———————————

1. [This is an updated version of the original post, amending a mistake in the first version. It was spotted while talking with a few colleagues, weeks after the initial publication, when I was still working on the “csv” solution for a presentation: the connection gets created at the foreachPartition level, which means one connection per partition, not per record (as erroneously stated).
I apologize for the time it took me to get back to this post and update it, but these have been quite hectic times (not just from a work perspective).]
