How Not To Persist a Spark Dataset / DataFrame to Neo4j (And How To?)

Spark provides an API for graphs and graph-parallel computation (GraphX), and Neo4j is probably the most popular graph database, so you might expect some sort of “bridge” between them. That’s actually the case, and Neo4j will direct you to the Neo4j-Spark-Connector. It’s a Scala-based implementation that uses the officially supported Neo4j Java driver under the hood. It covers a series of use cases but, I argue, not the one mentioned in the title.

The scenario I’m considering could result from processing data with GraphX and then persisting the results in Neo4j, where they could then be read by other processes (including another Spark job). Probably because it is not such an unusual use case, there are people asking for details about it on Stack Overflow and in this issue.

The Neo4j-Spark-Connector allegedly covers this case, and there is also an example in Neo4jDataFrame.mergeEdgeList. If we take a look at the code though, mergeEdgeList basically maps over the DataFrame and, for each row, calls a method called execute:

def mergeEdgeList(sc: SparkContext,
                      dataFrame: DataFrame,
                      source: (String,Seq[String]),
                      relationship: (String,Seq[String]),
                      target: (String,Seq[String]),
                      renamedColumns: Map[String,String] = Map.empty): Unit = {
   val partitions = Math.max(1,(dataFrame.count() / 10000).asInstanceOf[Int])
   val config = Neo4jConfig(sc.getConf)
   dataFrame.repartition(partitions).foreachPartition( rows => {
      ...
      execute(config, mergeStatement, Map("rows" -> params).asJava, write = true)
   })
}

execute then does the actual writing to the database:

def execute(config : Neo4jConfig, query: String, parameters: java.util.Map[String, AnyRef], write: Boolean = false) : ResultSummary = {
    val driver: Driver = config.driver()
    val session = driver.session()
    try {
      val runner =  new TransactionWork[ResultSummary]() {
         override def execute(tx:Transaction) : ResultSummary =
            tx.run(query, parameters).consume()
      }
      if (write) {
        session.writeTransaction(runner)
      }
      else
        session.readTransaction(runner)
    } finally {
      if (session.isOpen) session.close()
      driver.close()
    }
  }

There are at least three major issues with the code above:

  1. In mergeEdgeList, count and foreachPartition are actions, and as such they will trigger the execution of the given DataFrame twice, without the caller being aware of it (see the sketch after this list).
  2. execute creates a Neo4j driver (through the Java driver) and, from it, a session, then closes both. And this is done for each row in the DataFrame.
  3. Because of 2., you’ll have a session per row, hence the scope of each transaction will span a single row. What the scope of a transaction should be has to be decided case by case, but you won’t have that freedom of choice here. For example, you might need a broader scope (fail or succeed to upload the full DataFrame, so there are only two possible outcomes: either the DataFrame has been written correctly to Neo4j or it hasn’t) or no transaction support at all.
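
As a minimal sketch of how point 1. could be mitigated: caching the DataFrame before the two actions makes Spark materialise it once and reuse it, instead of recomputing its lineage for both count and foreachPartition. The method and variable names below are illustrative and not part of the connector:

import org.apache.spark.sql.DataFrame

def mergeEdgeListCached(dataFrame: DataFrame): Unit = {
  val cached = dataFrame.cache()   // materialised on the first action, reused by the second
  val partitions = Math.max(1, (cached.count() / 10000).toInt)
  cached.repartition(partitions).rdd.foreachPartition { rows =>
    // ... open one driver/session here and write the whole partition ...
  }
  cached.unpersist()
}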

Point 1. might be amended (for instance by caching the DataFrame, as sketched above) and point 3. might not be a stopper, but 2. is a pretty serious issue: barely noticeable if the DataFrame consists of a few rows, but definitely a stopper in a real-case scenario with millions or billions of records. In this respect, I quote from the documentation of the Java driver the connector relies upon:

For full applications, a single Driver object should be created with application-wide scope and lifetime. This allows full utilization of the driver connection pool. The connection pool reduces network overhead added by sharing TCP connections between subsequent transactions. Network connections are acquired on demand from the pool when running Cypher queries, and returned back to connection pool after query execution finishes. As a result of this design, it is expensive to create and close a Driver object. Session objects, on the other hand, are very cheap to use.

If you consider that the connector creates a driver for each row in the DataFrame, you can get an idea of the impact. On the other hand, the driver is by its nature not serializable and, as such, cannot be created once and then passed to the Spark workers. Nor do I see any other solution working out of the box.
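
To make that concrete, here is a small sketch (assuming the 1.x Java driver, org.neo4j.driver.v1, from the Neo4j 3.5 era and a local Bolt endpoint; names and credentials are made up) of what happens if you try the obvious “create the driver once” route: the Driver instance is not serializable, so Spark rejects the closure before the job even starts.

import org.apache.spark.sql.SparkSession
import org.neo4j.driver.v1.{AuthTokens, GraphDatabase}

object NonSerializableDriver {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("driver-demo").getOrCreate()

    // Created once, on the Spark driver JVM.
    val neo4jDriver = GraphDatabase.driver("bolt://localhost:7687",
      AuthTokens.basic("neo4j", "password"))

    // Referencing neo4jDriver inside the closure forces Spark to serialise it,
    // which fails with "org.apache.spark.SparkException: Task not serializable".
    spark.range(10).rdd.foreachPartition { _ =>
      val session = neo4jDriver.session()   // never reached on the executors
      session.close()
    }
  }
}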

After some research, I reckon the best way is to persist the DataFrame as CSV and leverage Neo4j’s LOAD CSV functionality. I wrote a simple proof of concept that does exactly that, available here: https://github.com/lansaloltd/spark-neo4j
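
As a rough illustration of that route (not the actual code from the repository), the sketch below writes the DataFrame to CSV with Spark and then issues a single LOAD CSV statement over one driver and one session. The column names (src, dst, confidence), the Document label and the DUPLICATE_OF relationship type are made up for the example, the import assumes the 1.x Java driver, and the file URL assumes the database runs on the same machine with a Unix-style path:

import java.io.File
import org.apache.spark.sql.DataFrame
import org.neo4j.driver.v1.{AuthTokens, GraphDatabase}

def exportToNeo4j(edges: DataFrame, csvDir: String): Unit = {
  // 1. Persist the DataFrame as a single CSV file that Neo4j can read.
  edges.coalesce(1).write.option("header", "true").csv(csvDir)

  // Spark names the output part-*.csv, so locate the actual file for LOAD CSV.
  val csvFile = new File(csvDir).listFiles()
    .find(_.getName.endsWith(".csv"))
    .getOrElse(sys.error(s"no CSV file produced under $csvDir"))

  // 2. Bulk-load it with a single Cypher statement: one driver and one session in total.
  //    session.run uses an auto-commit transaction, which USING PERIODIC COMMIT requires.
  val driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "password"))
  val session = driver.session()
  try {
    session.run(
      s"""USING PERIODIC COMMIT 10000
         |LOAD CSV WITH HEADERS FROM 'file://${csvFile.getAbsolutePath}' AS row
         |MERGE (a:Document {id: row.src})
         |MERGE (b:Document {id: row.dst})
         |MERGE (a)-[:DUPLICATE_OF {confidence: toFloat(row.confidence)}]->(b)""".stripMargin
    ).consume()
  } finally {
    session.close()
    driver.close()
  }
}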

The example is fictional and loosely based on a de-duplication problem. The starting point is a dataset where pairs of documents were marked as duplicates by a machine learning algorithm. The data is filtered first to keep only the duplicates, and the algorithm’s confidence level becomes a property of the relationship between two documents (how likely they are to indeed be duplicates). In this scenario the output has to be a disconnected graph (as we can’t expect all documents to resemble each other), but with islands or clusters of documents linked to one another or, more precisely, connected components.
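
For context, this is roughly the kind of GraphX step that could sit upstream of the export: build a graph from the duplicate pairs and label every document with its connected component. The case class and field names are illustrative and not taken from the repository:

import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD

// Illustrative shape of a duplicate pair produced by the ML step.
case class DuplicatePair(srcDoc: Long, dstDoc: Long, confidence: Double)

def documentClusters(sc: SparkContext, pairs: RDD[DuplicatePair]): RDD[(Long, Long)] = {
  // One edge per duplicate pair, weighted by the algorithm's confidence.
  val edges: RDD[Edge[Double]] = pairs.map(p => Edge(p.srcDoc, p.dstDoc, p.confidence))
  val graph = Graph.fromEdges(edges, defaultValue = 0.0)

  // connectedComponents labels each vertex with the smallest vertex id of its
  // component, i.e. the "island" of documents it belongs to.
  graph.connectedComponents().vertices
}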

Running the proof of concept requires Neo4j Desktop to be installed (download here). Once started, you should see something similar to this:

neo4j-initial-screen

Click on “Add Graph”:
neo4j-add-graph

Then choose a name and a password for your DB (if you’d like the code example to work out of the box, choose “password” as the password and version 3.5.x):

neo4j-add-graph-2

Click on “Manage”, then on the “Settings” tab, comment out dbms.directories.import=import, and press “Apply”:

neo4j-settings

That’s because the unit test will create the CSV files to import under the target directory, which is external to Neo4j. In a production environment this has security implications you should be aware of.

Finally, click “Start” to start the database and, once it is active, run the tests in GraphAnalysisTest. If they succeed you might still see 0 nodes / 0 relationships in the DB section, but that seems to be only a refresh issue. Click on Neo4j Browser:

browser

Then on the DB icon in the top left corner:

neo4j-db-icon

And now you should see the graph being successfully imported into Neo4j:

neo4j-graph-2

There are 31 nodes (or vertices, in graph terminology) and 29 relationships (or edges).

Finally, it is worth noting that, even if the neo4j-spark-connector doesn’t address the scenario covered in this post, that’s not the connector’s only purpose and it may well find a use in other scenarios.