How Not To Persist Spark Dataset / DataFrame to Neo4j (And How To?) 1

Spark provides an API for graphs and graph-parallel computation (GraphX) and Neo4j is probably the most popular graph database, so you might expect some sort of “bridge” between them. That’s actually the case, and Neo4j will direct you to the Neo4j-Spark-Connector. It’s a Scala-based implementation that uses the officially supported Neo4j Java driver under the hood. It covers a series of use cases but, I argue, not the one mentioned in the title.

The scenario I’m considering could result from processing data with GraphX and then persisting the results in Neo4j, where they could then be read by other processes (including another Spark job). Probably because it is not such an unusual use case, there are people asking about it on Stack Overflow and in this issue.

The Neo4j-Spark-Connector allegedly covers this case and there is also an example: Neo4jDataFrame.mergeEdgeList. If we take a look at the code though, mergeEdgeList basically iterates over the DataFrame’s partitions and, for each partition, calls a method called execute:

def mergeEdgeList(sc: SparkContext,
                      dataFrame: DataFrame,
                      source: (String,Seq[String]),
                      relationship: (String,Seq[String]),
                      target: (String,Seq[String]),
                      renamedColumns: Map[String,String] = Map.empty): Unit = {
   val partitions = Math.max(1,(dataFrame.count() / 10000).asInstanceOf[Int])
   val config = Neo4jConfig(sc.getConf)
   dataFrame.repartition(partitions).foreachPartition( rows => {
      ...
      execute(config, mergeStatement, Map("rows" -> params).asJava, write = true)
   })
}

And then execute does the actual writing to the db:

def execute(config : Neo4jConfig, query: String, parameters: java.util.Map[String, AnyRef], write: Boolean = false) : ResultSummary = {
    val driver: Driver = config.driver()
    val session = driver.session()
    try {
      val runner =  new TransactionWork[ResultSummary]() {
         override def execute(tx:Transaction) : ResultSummary =
            tx.run(query, parameters).consume()
      }
      if (write) {
        session.writeTransaction(runner)
      }
      else
        session.readTransaction(runner)
    } finally {
      if (session.isOpen) session.close()
      driver.close()
    }
  }

There are at least three issues with the code above. Let’s start with the first one:

  1. In mergeEdgeList, count and foreachPartition are both actions and as such will trigger the execution of the given dataframe twice, without the caller being aware of it.
  2. execute creates a Neo4j driver (through the Java driver) and, from it, a session, then it closes both. And this is done for each partition in the dataframe.
  3. Because of 2, you’ll have a session per partition, hence the scope of a transaction will span a single partition. What the scope of a transaction should be has to be decided case by case, but you won’t have that freedom of choice here. For example, you might need a broader scope (fail or succeed to upload the full dataframe, so that there are only two possible outcomes: either the dataframe has been written correctly to Neo4j or it hasn’t) or no transaction support at all.

Point 1 should be addressed as it could be pretty insidious, or at least the caller should be made aware that the dataframe needs to be cached. Overall the Neo4j-Spark-Connector seems to fit other scenarios (streaming, reading, updating, etc.) but probably shouldn’t be the first pick when a potentially demanding Spark job is meant to create a new Neo4j db from scratch.
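If you do want to stick with mergeEdgeList, a minimal mitigation (my own sketch, not part of the connector) would be to cache the dataframe before handing it over, so that the two actions reuse the same data instead of recomputing the full lineage twice; source, relationship and target below are the same arguments you would pass anyway:

import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel

// Sketch: cache the dataframe so that count and foreachPartition inside
// mergeEdgeList don't trigger the full computation twice.
val cached: DataFrame = dataFrame.persist(StorageLevel.MEMORY_AND_DISK)
Neo4jDataFrame.mergeEdgeList(sc, cached, source, relationship, target)
cached.unpersist()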

After some research, I reckon the best way (also from a performance point of view) is to persist the dataframe as CSV and leverage the Neo4j CSV loading functionality. Although it might look like a pretty clumsy solution at first, the CSV header can be tailored to pass some useful information to the Neo4j CSV loader. For example, the CSV header:

movieId:ID,title,year:int,:LABEL

tells the loader that movieId is the node’s id (and its uniqueness will be enforced), year is a node property of type int and the value in the fourth column, :LABEL, is a label for the node.
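As a sketch of how such a file could be produced directly from Spark (assuming a hypothetical nodesDf with columns movieId, title, year and label), renaming the columns is enough to obtain the header above:

// Hypothetical nodes DataFrame with columns: movieId, title, year, label.
nodesDf
  .withColumnRenamed("movieId", "movieId:ID")
  .withColumnRenamed("year", "year:int")
  .withColumnRenamed("label", ":LABEL")
  .coalesce(1) // a single csv file is easier to hand over to the loader
  .write
  .option("header", "true")
  .mode("overwrite")
  .csv("target/import/movies")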

I wrote a simple proof of concept that illustrates the Neo4j CSV loader functionality: https://github.com/lansaloltd/spark-neo4j

The example is fictional and loosely based on a de-duplication problem. The starting point is a dataset where pairs of documents were marked as duplicates by a machine learning algorithm. The data is filtered first to keep only duplicates, and the algorithm’s confidence level becomes a property of the relationship between two documents (how likely they are to be indeed duplicates). In this scenario the output has to be a disconnected graph (as we can’t expect all documents to resemble each other), but with islands or clusters of documents linked to one another or, more precisely, connected components.

It requires Neo4j Desktop installed (download here). Once started you should see something similar to this:

neo4j-initial-screen

Click on “Add Graph”:
neo4j-add-graph

Then choose a name and a password for your DB (if you’d like the code example to work out of the box, choose “password” as the password and version 3.5.x):

neo4j-add-graph-2

Click on “Manage” and then the “Settings” tab, comment out dbms.directories.import=import and press “Apply”:

neo4j-settings

That’s because the unit test will create the CSV files to import under the target directory, which is an external directory for Neo4j. In a production environment that has security implications you should be aware of.

Finally, click “Start” to start the database and, once it is active, you can run the tests in GraphAnalysisTest. If successful, you might still see 0 nodes / 0 relationships in the DB section, but that seems to be only a refresh issue. Click on Neo4j Browser:

browser

Then click on the DB icon in the top left corner:

neo4j-db-icon

And now you should see the graph being successfully imported into Neo4j:

neo4j-graph-2

There are 31 nodes (or vertices, in graph terminology) and 29 relationships (or edges).

———————————

1. [This is an updated version of the original post, amending a mistake in the first version. It was spotted while talking with a few colleagues, weeks after the initial publication, when still working on the “csv” solution for a presentation: the connection gets created at the foreachPartition level, which means one connection per partition, not per record (as erroneously stated).
I apologize for how long it took me to get back to this post and find the time to update it, but these have been quite hectic times (not just from a work perspective).]

Scala Type Classes and Family Resemblances

This post is not a general introduction to type classes in Scala. There are quite a few good posts already on the subject (here, here and here for example), but it is about type classes, and the somewhat-cryptic title requires a brief explanation. In Scala with Cats, the authors describe a type class as a programming pattern that “allow us to extend existing libraries with new functionality, without using traditional inheritance, and without altering the original library source code.” Although that is not the only possible definition, it’s a perfect starting point for what I would like to cover. Can type classes help us “remove” old functionality in the same way? And how and why might we end up in such a situation? Let’s try to model a simple type for a bird to see how the issue could arise:

sealed trait Bird {
  def fly: Unit
}

In an OOP approach we could proceed by adding a few implementations, a parrot for example:

sealed trait Bird {
  def fly: Unit
}

class Parrot(name: String) extends Bird {
  override def fly: Unit = println(s"I'm a parrot and I can fly. My name is $name")
}

And perhaps a few other birds until we get to penguins. Penguins are birds, but there is a problem: penguins don’t fly.

sealed trait Bird {
  def fly: Unit
}

class Penguin(name: String) extends Bird {
  override def fly: Unit = ??? // throw new UnsupportedOperationException("Sorry, can't fly")
}

A solution might be to throw an UnsupportedOperationException, but not an optimal one, you might argue: catching those problems at compile time would be better. We were probably a bit rushed in adding a fly method to the Bird trait. Not every bird flies, and penguins are not the only exception. But was it just an oversight or is there more to it?

Ludwig Wittgenstein introduced, in his Philosophical Investigations, the idea of family resemblance to describe certain concepts in our language for which we don’t have a clear set of traits shared across all members, but rather a general overlapping mesh of features: like in a large family picture, we can recognize each single member as part of the family and see the common traits (hair color, shape of the nose, etc.) spread across them, but no definite set of those traits is shared by every single member.

Back to our problem, “flying” is such a common characteristic of birds that it is almost understandable why that def fly slipped into the Bird trait.1
But perhaps there is something a type class approach can do to mitigate those issues (all code available here2). Assume we have the same birds as above:

sealed trait Bird // no fly method now

class Parrot(val name: String) extends Bird
class Penguin(val name: String) extends Bird

And proceed by defining the various components of our type classes. First the traits/behaviours (note that there is no mention of Bird in them):

trait Fly[A] {
  def fly(a: A): Unit
}

and

// for penguins
trait Swim[A] {
  def swim(a: A): Unit
}
// for parrots
trait Talk[A] {
  def talk(a: A): Unit
}

and then a few instances implementing the above traits (two for the parrot and one for the penguin):

implicit val parrotFly: Fly[Parrot] = new Fly[Parrot] {
  def fly(parrot: Parrot): Unit = {
    println(s"I'm a parrot and I can fly. My name is ${parrot.name}")
  }
}

implicit val parrotTalk: Talk[Parrot] = new Talk[Parrot] {
  def talk(parrot: Parrot): Unit = {
    println(s"I'm a parrot and I can talk. My name is ${parrot.name}")
  }
}

implicit val penguinSwim: Swim[Penguin] = new Swim[Penguin] {
  def swim(penguin: Penguin): Unit = {
    println(s"I'm a penguin and I can swim. My name is ${penguin.name}")
  }
}

And finally the api, the last bit that acts as glue and, relying on implicits, puts everything together:

implicit class FlyOps[A](val value: A) extends AnyVal {
  def fly(implicit flyInstance: Fly[A]): Unit = flyInstance.fly(value)
}

implicit class TalkOps[A](val value: A) extends AnyVal {
  def talk(implicit talkInstance: Talk[A]): Unit = talkInstance.talk(value)
}

implicit class SwimOps[A](val value: A) extends AnyVal {
  def swim(implicit swimInstance: Swim[A]): Unit = swimInstance.swim(value)
}

Now that we have all the pieces in place, we can see if we achieved something good out of it. We need two birds first:

val parrot = new Parrot("George")
val penguin = new Penguin("Pingu")

The api and instances defined above need to be imported for the implicits to do their magic:

// here is where I defined the api in the sample code
import com.lansalo.family.resemblance.fp.api.ImplicitBehavioursApi._
// here the instances implementing the traits
import com.lansalo.family.resemblance.fp.Instances._ 

parrot.fly
// As expected, parrots can fly and this will output:
// "I'm a parrot and I can fly. My name is George"

parrot.talk
// Will print: "I'm a parrot and I can talk. My name is George"

parrot.swim
// compilation error because parrots can't swim

penguin.swim
// will output: "I'm a penguin and I can swim. My name is Pingu"

penguin.fly
// compilation error, because penguins don't fly and, more practically,
// because we didn't define an instance for Fly[Penguin] 
// and the api will complain that one can't be found

So, both parrots and penguins are Birds, but parrots can fly (but not swim) whereas penguins can swim (but not fly, or talk for that matter), and errors are picked up at compile time. All good then! Or not? Well… it turns out there is a friendly parrot in New Zealand that can’t fly: the kakapo (and apparently it is the only one). Once again we find ourselves victims of the same mistake, which reiterates how easy it is to generalize and ignore edge cases. But perhaps this time we are in a better position to face the issue without the need to throw runtime errors (i.e. UnsupportedOperationException) or to modify the model defined so far (which could be out of our control). Clearly a new member has to be added to the Bird family and, even if it doesn’t fly, it still has to be a parrot:

sealed trait Bird

class Parrot(val name: String) extends Bird
class Penguin(val name: String) extends Bird
class Kakapo(override val name: String) extends Parrot(name)

the Kakapo is still a parrot:

kakapo.isInstanceOf[Parrot] // returns true

But it can’t fly (kakapo.fly will not compile), although the reason might not be immediately clear. It’s true that we have an instance of Fly[Parrot] in scope and, equally, a Kakapo is a parrot (as shown above), but their parent/child relationship doesn’t get propagated to Fly[A], which is invariant in its type parameter A.
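Just to make the role of variance explicit, here is a sketch (not the code used in this post) of what a contravariant Fly would look like:

// Sketch only: with a contravariant type parameter, a Fly[Parrot] instance
// would also satisfy a request for Fly[Kakapo] (since Kakapo <: Parrot),
// so kakapo.fly would compile again and print the parrot message.
trait Fly[-A] {
  def fly(a: A): Unit
}

Of course, that would bring the original problem back (a flightless kakapo happily “flying”), so the invariance is actually working in our favour here.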
For the same reason, the kakapo can’t talk like other parrots. I must admit that my knowledge of the whole kakapo subject is quite limited, but from the information I gathered, that seems to be the case. On the other hand, the ability to speak is widespread across the parrot family. It might well be that with a bit of patient training someone could teach a few words to a kakapo. In that case, we can still adapt to this newly discovered behavior (still without modifying our models) by adding an instance for it:

implicit val kakapoTalk: Talk[Kakapo] = new Talk[Kakapo] {
    def talk(kakapo: Kakapo): Unit = {
      println(s"I'm a kakapo and I can talk. My name is ${kakapo.name}")
    }
  }
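A quick usage sketch (the kakapo's name here is made up):

val kakapo = new Kakapo("Sirocco") // hypothetical name

kakapo.talk
// prints: "I'm a kakapo and I can talk. My name is Sirocco"

// kakapo.fly   // still doesn't compile: no Fly[Kakapo] instance in scope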

Even if there is no perfect solution in those cases, a type class approach seems to provide a better way to deal with those scenarios where we are trying to model concepts that follow the family resemblance paradigm.

———————————

1. [All that reminds me of my first experience as a developer, working for a big Italian utility company. It was a long time ago and we were trying (among other things) to model a “meter reader” attached to user accounts. We were struggling with old legacy data and whenever we thought we had captured the essence of a meter reader (and its types and hierarchy) it was just a matter of time before a new account popped up with some generally old type of reader, breaking our assumptions. It was then a matter of finding someone with the know-how, in a position to throw some light on the weird new type. Usually someone close to retirement.]
2. [The examples rely on implicits although, strictly speaking, type classes can be implemented without using them. In the code there are also a few examples of the more verbose version which does not use implicits.]

 

Spark ML Model Tuning

The purpose of this post is to provide some basic guidelines and supporting code for the tuning of an ML pipeline based on a random forest algorithm, using cross-validation, along with some final considerations about the computational cost involved and the possibility to train/tune the model in parallel.

Tuning

A random forest algorithm takes a few hyperparameters. The most important ones are probably numTrees and maxDepth (but in the code example there is also some reference to maxBins). The number of trees in the forest (numTrees) reduces the variance in predictions: more trees generally improve the accuracy, but the computational load of the training phase also increases proportionally. Analogously, a deeper tree generally leads to better metrics but also increases the risk of overfitting and the training computational cost. Tuning the model is the process of working out the optimal values for those hyperparameters, striking the right balance between costs and benefits.
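For reference, those hyperparameters are plain setters on the Spark RandomForestClassifier; the values below are just the defaults mentioned in the footnote, shown only to illustrate the API:

// Defaults in Spark 2.4.3 (see footnote 1).
val rf = new RandomForestClassifier()
  .setNumTrees(20)
  .setMaxDepth(5)
  .setMaxBins(32)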

Cross Validation

The idea is to split a given labelled dataset into chunks (folds), then iterate through them: at each iteration, one chunk is used as the validation set and the remaining ones as the training set. For example, with folds = 3, the process splits the data into three folds, a, b, c, and then iterates three times:

1st iteration -> a + b = training, c = validation
2nd iteration -> a + c = training, b = validation
3rd iteration -> b + c = training, a = validation

Note that the whole process is indeed training and evaluating three times (as in this example folds = 3).

An Estimator (here, the pipeline) can then be plugged into the cross-validation process together with an Evaluator, with the intention of keeping track of the relationship between certain hyperparameter values and a given metric and picking the best performing model.

Data and Code

The code can be found here. Inspired by the Cambridge Analytica scandal, and as a warning that great power comes with great responsibility, the example uses a completely fictional, auto-generated dataset where made-up citizens are listed with the party they voted for (the label) along with some features (the usual suspects: age, income, gender, etc.):

+----+------+---+----+------+---------+-------+
|id  |gender|age|area|income|education|vote   |
+----+------+---+----+------+---------+-------+
|650 |female|99 |A   |57000 |1        |tory   |
|1523|male  |24 |B   |5500  |3        |labour |
|434 |male  |82 |D   |5500  |3        |liberal|
|174 |male  |26 |C   |69000 |1        |liberal|
+----+------+---+----+------+---------+-------+

Road Map

As mentioned, the data in the example is not only fictional, but some statistical correlation has been artificially introduced into the dataset and the feature engineering part is simplified; on the other hand, that is all that is needed to illustrate the advantages of tuning. The focus needs to be on the different outcomes of a tuned versus an untuned model. Quite obviously, a tuned model is expected to perform better than an untuned one, and the following road map should prove the point:

  1. Given a training dataset, train the model without any tuning
  2. Using the same training dataset, train the model using cross validation and an evaluator to work out the optimal hyperparameters values
  3. Train a new model using the optimal hyperparameters values and same training dataset at point 1
  4. Evaluate the model at point 1 (not tuned) and model at point 3 (tuned) against a given validation set and compare the results

Evaluation will be based on the f1 metric (which takes into consideration both precision and recall). Besides being the default, f1 seems the most natural choice for the example.
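As a reminder, f1 is the harmonic mean of the two:

f1 = 2 * (precision * recall) / (precision + recall)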

Tuning the Model

The first point in the road map is to train a random forest model without any specific tuning (i.e. the default values for numTrees and maxDepth will be used)1. In the example that is done in a training method that takes a given training set as a parameter.

val vectorAssembler = new VectorAssembler().setInputCols(Array("f_gender", "age", "f_area", "income", "f_education"))
    .setOutputCol("features")
val genderIndexer = new StringIndexer().setInputCol("gender").setOutputCol("f_gender")
val areaIndexer = new StringIndexer().setInputCol("area").setOutputCol("f_area")
val educationIndexer = new StringIndexer().setInputCol("education").setOutputCol("f_education")
val voteIntentionIndexer = new StringIndexer().setInputCol("vote").setOutputCol("label")

def training(trainingSet: DataFrame): PipelineModel = {
  val transformationPipeline = new Pipeline()
  val classifier = new RandomForestClassifier().setSeed(5043) // set the seed for reproducibility
  val trainingPipeline: transformationPipeline.type = transformationPipeline.setStages(
    Array(genderIndexer, areaIndexer, educationIndexer, voteIntentionIndexer, vectorAssembler, classifier))
  trainingPipeline.fit(trainingSet)
}

Then the model can be evaluated against a given validation set and its performance expressed in terms of f1:

def validation(model: PipelineModel, validation: DataFrame) = {
    // Make predictions.
    val predictions = model.transform(validation)

    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("f1") // f1 is the default anyway

    val f1: Double = evaluator.evaluate(predictions)
    println("@@ f1 ====> " + f1)
  }

The whole flow is implemented in a test in VoteIntentionTrainingTest called train and evaluate model. Once executed, it should return an f1 value of 0.6736260921544567 and this will be the benchmark to compare the tuned version against.

Better Hyperparameters Values

Figuring out the optimal hyperparameter values should be more interesting. A ParamGridBuilder allows us to pass different candidates for each hyperparameter. Considering the Spark default values (numTrees = 20, maxDepth = 5), a good first approximation is a couple of values surrounding each default. For example, numTrees defaults to 20, hence passing Array(15, 20, 25) will give an idea about whether better metrics can be obtained with values greater or smaller than the default one.

def tuning(trainingSet: DataFrame): CrossValidatorModel = {

  val rfClassifier = new RandomForestClassifier()
  val pipeline= new Pipeline().setStages(
    Array(genderIndexer, areaIndexer, educationIndexer, voteIntentionIndexer, vectorAssembler, rfClassifier))

  val nFolds: Int = 8
  val metric: String = "f1"
  val paramGrid = new ParamGridBuilder()
    .addGrid(rfClassifier.numTrees, Array(15, 20, 25))
    .addGrid(rfClassifier.maxDepth, Array(4, 5, 6))
    .build()
  val evaluator = new MulticlassClassificationEvaluator()
    .setLabelCol("label")
    .setPredictionCol("prediction")
    .setMetricName(metric)

  val cv: CrossValidator = new CrossValidator()
    .setEstimator(pipeline)
    .setEstimatorParamMaps(paramGrid)
    .setEvaluator(evaluator)
    .setNumFolds(nFolds)
    .setParallelism(3)

  val fittedPipeline: CrossValidatorModel = cv.fit(trainingSet)
  val best: Model[_] = fittedPipeline.bestModel
  val stages: Seq[Transformer] = best.asInstanceOf[PipelineModel].stages.toList


  val paramMap = fittedPipeline.getEstimatorParamMaps
    .zip(fittedPipeline.avgMetrics)
    .maxBy(_._2)
    ._1

  println(s"@@@@ best num trees:     ${stages(5).asInstanceOf[RandomForestClassificationModel].getNumTrees}")
  println(s"@@@@ best max depth:     ${stages(5).asInstanceOf[RandomForestClassificationModel].getMaxDepth}")
  println(s"@@@@ best max bins:      ${stages(5).asInstanceOf[RandomForestClassificationModel].getMaxBins}")

  println(s"@@@@ best paramMap:      ${paramMap}")

  fittedPipeline
}

Once the pipeline is fitted, we grab the best model, find out the optimal hyperparameter values (among the ones provided in the ParamGridBuilder) and print them to the console. As a note, we are interested in stages(5) (the element at index 5, i.e. the 6th stage) because the pipeline (as defined in this example) has 6 stages, the last one being the classifier (the one we are interested in); paramMap returns the same results. The whole process is implemented in VoteIntentionTrainingTest as the test “tuning a model and save it in $TunedModelPath” and, if executed, it should return the following optimal values:

num trees:     25
max depth:     6

We could refine those results further and, for each hyperparameter, pass new candidates in an interval around the optimal value. For example, in the numTrees case an Array(23, 24, 25, 26, 27) could be passed to the ParamGridBuilder, so that a new set of candidates competes for the role of optimal value. I’ll omit the run here, but it should return the following values:

num trees:     24
max depth:     6
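For reference, the refined grid would be something along these lines (the maxDepth candidates here are just a plausible choice, not taken from the repo):

val refinedParamGrid = new ParamGridBuilder()
  .addGrid(rfClassifier.numTrees, Array(23, 24, 25, 26, 27))
  .addGrid(rfClassifier.maxDepth, Array(5, 6, 7)) // assumption: probe around the previous optimum
  .build()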

Tuned vs Not Tuned Model

The optimal values can now be used to override the default ones:

  def training(trainingSet: DataFrame): PipelineModel = {
    val transformationPipeline = new Pipeline()
    val classifier = new RandomForestClassifier()
      .setNumTrees(24) // <- Tuned value
      .setMaxDepth(6) // <- Tuned value
      .setSeed(5043) // set the seed for reproducibility
    val trainingPipeline: transformationPipeline.type = transformationPipeline.setStages(
      Array(genderIndexer, areaIndexer, educationIndexer, voteIntentionIndexer, vectorAssembler, classifier))
    trainingPipeline.fit(trainingSet)
  }

An evaluation of the new model against the same validation dataset should return an f1 of 0.6919821403245205, which is an improvement compared to the previous f1 value (0.6736260921544567) obtained with the default hyperparameter values.

numTrees = 20, maxDepth = 5 -> f1 = 0.6736260921544567 (default values)
numTrees = 24, maxDepth = 6 -> f1 = 0.6919821403245205

Training and Parallelism

It is worth noting that cross-validation and the parameter grid substantially increase the computational cost of fitting a model. The first tuning example:

.addGrid(rfClassifier.numTrees, Array(15, 20, 25))
.addGrid(rfClassifier.maxDepth, Array(4, 5, 6))

will evaluate 9 different parameter combinations (3 * 3) on top of the cross-validation cost (8 folds), i.e. 9 * 8 = 72 model fits before the best configuration is refitted on the whole training set. In this fictional example we are dealing with an intentionally small dataset, but in a real-world scenario Spark and a distributed architecture are likely to make a big difference, and we can take advantage of this via the setParallelism method.

val cv: CrossValidator = new CrossValidator()
      .setEstimator(pipeline)
      .setEstimatorParamMaps(paramGrid)
      .setEvaluator(evaluator)
      .setNumFolds(nFolds)
      .setParallelism(3)

That would set the max number of threads to use when running parallel algorithms (default is 1 for serial execution).

———————————

1. [In Spark 2.4.3 the default values are: numTrees = 20, maxDepth = 5, maxBins = 32]

Scala Stream: There’s no reason to use it if you don’t actually need one.

The first time I read about Scala Streams in Functional Programming in Scala I was impressed by the example’s captivating simplicity. I assume a certain familiarity with the idea of a Stream but just as a quick recap if that is not the case, the original example in the book was:

List(1,2,3,4).map(_ + 10).filter(_%2==0).map(_ * 3)

Which gets evaluated, step by step, as:

List(11,12,13,14).filter(_%2==0).map(_ * 3)
List(12,14).map(_ * 3)
List(36,42)

Whereas a Stream version

Stream(1,2,3,4).map(_ + 10).filter(_%2==0).map(_ * 3).toList1

because of its laziness, will evaluate in the following way (note that the trace below, which follows the book, does not include the final map(_ * 3)):

Stream(1,2,3,4).map(_ + 10).filter(_ % 2 == 0).toList
cons(11, Stream(2,3,4).map(_ + 10)).filter(_ % 2 == 0).toList
Stream(2,3,4).map(_ + 10).filter(_ % 2 == 0).toList
cons(12, Stream(3,4).map(_ + 10)).filter(_ % 2 == 0).toList
12 :: Stream(3,4).map(_ + 10).filter(_ % 2 == 0).toList
12 :: cons(13, Stream(4).map(_ + 10)).filter(_ % 2 == 0).toList
12 :: Stream(4).map(_ + 10).filter(_ % 2 == 0).toList
12 :: cons(14, Stream().map(_ + 10)).filter(_ % 2 == 0).toList
12 :: 14 :: Stream().map(_ + 10).filter(_ % 2 == 0).toList
12 :: 14 :: List()2

Note that no intermediate results like List(11,12,13,14) or List(12,14) get instantiated in this case. It looks like there is simply less work to do, but this impression of efficiency might be misleading. A simple micro benchmark reveals how the Stream performs worse than the List:

testOriginalExampleWithList    thrpt   20  4368200.644 ± 155109.000  ops/s
testOriginalExampleWithStream  thrpt   20  2881809.414 ± 122061.743  ops/s

The reason has to be traced back to the Stream’s lazy nature together with its immutability: it belongs to the Scala immutable collections, but at the same time its tail is not “real” yet (so to speak) and will be evaluated only when required. In that respect the tail resembles a rule, or a set of instructions, to materialise its elements. The marriage between these two aspects of a Stream requires some synchronisation, which is the toll to pay (and the reason behind the different performance above):

/** A lazy cons cell, from which streams are built. */
@SerialVersionUID(-602202424901551803L)
final class Cons[+A](hd: A, tl: => Stream[A]) extends Stream[A] {
  override def isEmpty = false
  override def head = hd
  @volatile private[this] var tlVal: Stream[A] = _
  @volatile private[this] var tlGen = tl _
  def tailDefined: Boolean = tlGen eq null
  override def tail: Stream[A] = {
    if (!tailDefined)
      synchronized {
        if (!tailDefined) {
          tlVal = tlGen()
          tlGen = null
        }
      }

    tlVal
  }
}3

So, why use a Stream after all? As the title (and the Scala doc) suggests, there are occasions where a lazy sequence comes in handy. One well-known example is the possibility to define “infinite” sequences like the Fibonacci one in the documentation. Another aspect, and potentially an advantage in some situations, is the generally low memory footprint of a Stream (remember, the intermediate sequences, like List(11,12,13,14) and List(12,14) in the example, don’t get instantiated).
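For instance, the lazy Fibonacci sequence can be written along the lines of the one in the Scala documentation:

// The tail of the Stream is lazy, so the self-reference is only evaluated on demand.
lazy val fibs: Stream[BigInt] =
  BigInt(0) #:: BigInt(1) #:: fibs.zip(fibs.tail).map { case (a, b) => a + b }

fibs.take(10).toList // List(0, 1, 1, 2, 3, 5, 8, 13, 21, 34)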

The following terminates successfully on my machine with roughly 4GB of heap memory4:

Stream.fill(600)("Some random sample text.")
      .map(_ * 500000 + "test")
      .zipWithIndex.filter(_._2 % 2 == 0)
      .map(_._1.contains("test"))

Whereas its List equivalent throws a

java.lang.OutOfMemoryError: Java heap space

The above case is built purely with the intent of triggering the exception, but compare this:

final case class Wrapper(index: Int, text: String)

val listInput: List[Int] = (1 to 500).toList

listInput
      .map(Wrapper(_, "Some simple text"))
      .map(_.copy(text = "some simple text" * 500))
      .map(w => w.copy(text = w.text + "look for me!"))
      .filter(_.index % 2 == 0)
      .map(_.text.toLowerCase)
      .count(_.contains("look for me!"))

with this one:

val streamInput: Stream[Int] = (1 to 500).toStream

streamInput
      .map(Wrapper(_, "Some simple text"))
      .map(_.copy(text = "some simple text" * 500))
      .map(w => w.copy(text = w.text + "look for me!"))
      .filter(_.index % 2 == 0)
      .map(_.text.toLowerCase)
      .count(_.contains("look for me!"))

Both complete successfully, but the Stream version in this case performs better than the List one:

List    thrpt   15  76.588 ± 2.734  ops/s
Stream  thrpt   15  80.246 ± 2.260  ops/s

Flight Recorder shows what is happening under the hood: the first example, with List, generates more intermediate results, which then increase the amount of work for the garbage collector:

gc-list

Whereas in the Stream case the garbage collector is definitely less busy and fewer CPU resources are needed in order to free the heap.

gc-stream

In this last case, the amount of extra work for the garbage collector was sufficient to tip the balance in favor of the lazy sequence. Those cases are probably the exception rather than the rule, but I guess the lesson is that wherever there are long chains of transformations on non-lazy collections, and/or the suspicion that those might involve extra work for the GC, then perhaps testing a lazy alternative should be taken into consideration.

All code and tests available here.

———————————

1. [a Stream is lazy hence we need to turn it into a List to trigger the evaluation]
2. [Chiusano P., Bjarnason R. Functional Programming in Scala. Manning, 2014. p.72]
3. [The code comes from the definition of the Stream class.]
4. [All the numbers regarding performance, potential out of memory errors and general memory usage obviously depend on the specific hardware and configuration in use (in my case an old laptop and not much heap assigned), but I think the general idea behind those tests, rather than the specific numbers, still stands.]

Microbenchmarking and Performance Tests in Scala: JMH and ScalaMeter

In many contexts, micro-benchmarking and performance are not the main issue and other considerations should drive a developer’s choices regarding implementation. That’s not always the case though. This post is just to share and illustrate a sort of Scala project template that I put together and found quite useful in a few situations where micro-benchmarking and performance might be a concern (or simply a legitimate curiosity). Broadly speaking, in Scala there are two viable approaches: JMH (Java Microbenchmark Harness) and ScalaMeter and, if you are interested, a discussion about their pros and cons and their place in the Scala ecosystem is available here. Note that JMH is part of the OpenJDK project, but that doesn’t mean that benchmarks have to run on OpenJDK. As a matter of fact, all the examples presented in this post were executed on an Oracle JDK. NOTE: all the code is available here.

Settings and dependencies

For JMH, we can use sbt-jmh, an sbt plugin for JMH, so I added the following line to project/plugins.sbt:

addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.3.4")

And then we set the dependencies in build.sbt:

import sbt.Keys._

lazy val root = (project in file(".")).
  enablePlugins(JmhPlugin).
  settings(
    name := "scala-jmh-performance-testing",
    organization := "LansaloLtd",
    version := "1.0",
    scalaVersion := "2.12.3",
    libraryDependencies ++= Seq(
      "org.openjdk.jmh" % "jmh-generator-annprocess" % "1.21",
      "commons-codec" % "commons-codec" % "1.9",
      "com.storm-enroute" %% "scalameter" % "0.8.2",
      "org.scalatest" %% "scalatest" % "3.0.1" % Test
    ),
    testFrameworks += new TestFramework("org.scalameter.ScalaMeterFramework"),
    parallelExecution in Test := false
  )

Personally I like to use both ScalaMeter and JMH (that is why both are present as dependencies above).

Far from being an exhaustive list of pros and cons of the two, I would like to mention at least that the first allows you to create graphs really easily and follows the same approach as ScalaCheck, with generators that can be composed via for-comprehension; on the other hand, it is not actively maintained. The second seems to be the preferred choice for Scala and for some measurements is really straightforward. At the present time, the only documentation available for JMH is via examples. Note also that we have introduced a dependency on OpenJDK (org.openjdk.jmh), but that is independent of the runtime JDK.

Targets

Purely for demonstration purposes, we need two candidates, or targets, for our benchmark. For no particular reason I chose:

// candidate 1
def mapOnFunctionsAndList(input: String, slice: Int, funcs: List[String => Int]): List[Int] = {
    funcs.map(f => slide(input, slice).map(f).min)
}

and:

// candidate 2
def foldOnFunctionsList(input: String, slice: Int, funcs: List[String => Int]): List[Int] = {
    funcs.foldLeft(List.empty[Int])((acc, func) => {
      slide(input, slice).map(str => func(str)).min :: acc
    })
}

Basically, we start from a certain number n of functions (String => Int), then we slide a given String obtaining z substrings from it. For each function, 1 to n, we pass all the substrings, obtaining z Int, but we keep only the minimum. Therefore, if we start with a list of n functions, we end up with a list of n Int. These two implementations are equivalent and return the same results given the same inputs, but are there differences when it comes to performance?
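For completeness, slide is just a small helper that produces the substrings mentioned above; the actual implementation is in the linked repo, but a minimal version could look like this:

// Sketch of the slide helper: all substrings of length `slice`
// obtained by sliding a window of that size over the input string.
def slide(input: String, slice: Int): List[String] =
  input.sliding(slice).toList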

JMH

With JMH we can work out pretty easily the throughput for each of our two candidates. First we need to define the targets of our benchmark and a State for them:

package com.lansalo.jmh

import com.lansalo.Target._
import org.openjdk.jmh.annotations._

class TargetPerformance {

  import Scopes._

  def testMapOnFunctionsAndList(state: BenchmarkState): Unit = {
    mapOnFunctionsAndList(state.title, state.slice, state.funcs)
  }

  def testFoldOnFunctionsList(state: BenchmarkState): Unit = {
    foldOnFunctionsList(state.title, state.slice, state.funcs)
  }

}

object Scopes {

  import com.lansalo.HashingFunctions.hashingFunctions

  val novelTitle = "The Persecution and Assassination of Jean-Paul Marat as Performed by the Inmates of the Asylum of Charenton Under the Direction of the Marquis de Sade"

  @State(Scope.Benchmark)
  class BenchmarkState {
    val funcs: List[String => Int] = hashingFunctions(200)
    val title: String = novelTitle
    val slice: Int = 4
  }
}

And then the proper benchmark (that can also be executed within an IDE like IntelliJ):

package com.lansalo.jmh.benchmark

import java.util.concurrent.TimeUnit

import com.lansalo.jmh.{Scopes, TargetPerformance}
import org.openjdk.jmh.annotations._
import org.openjdk.jmh.results.format.ResultFormatType
import org.openjdk.jmh.runner.Runner
import org.openjdk.jmh.runner.options.{Options, OptionsBuilder}

object BenchmarkRunner_ThroughputBenchmark {
  // run sbt clean jmh:compile from terminal first.
  def main(args: Array[String]): Unit = {
    val opt: Options = new OptionsBuilder().include(classOf[ThroughputBenchmark].getSimpleName)
      .resultFormat(ResultFormatType.TEXT).build
    new Runner(opt).run
  }
}

@OutputTimeUnit(TimeUnit.SECONDS)
@Warmup(iterations = 30)
@Measurement(iterations = 30)
@State(Scope.Benchmark)
private class ThroughputBenchmark extends TargetPerformance {

  @Benchmark
  @BenchmarkMode(Array(Mode.Throughput, Mode.AverageTime))
  @Fork(value = 1)
  override def testFoldOnFunctionsList(state: Scopes.BenchmarkState): Unit = super.testFoldOnFunctionsList(state)

  @Benchmark
  @BenchmarkMode(Array(Mode.Throughput, Mode.AverageTime))
  @Fork(value = 1)
  override def testMapOnFunctionsAndList(state: Scopes.BenchmarkState): Unit = super.testMapOnFunctionsAndList(state)

}

In this example, we measure the throughput and average time:

@BenchmarkMode(Array(Mode.Throughput, Mode.AverageTime))

in a separate JVM:

@Fork(value = 1)

We also run 30 warmup iterations (ignored for the final results) and 30 proper iterations where the actual measurement is taken:

@Warmup(iterations = 30)
@Measurement(iterations = 30)

Warmup is needed mainly because of JVM features such as just-in-time compilation, but that would take us into all the background behind microbenchmarking, which would be an entire chapter on its own. I can recommend a few readings though: Nanotrusting the Nanotime, Avoiding Benchmarking Pitfalls on the JVM, Java JVM Warmup.1
We are almost ready: run sbt clean jmh:compile from the terminal, and then we can run ThroughputBenchmark straight from IntelliJ and obtain something like:

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on
why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

Benchmark                                       Mode  Cnt   Score    Error  Units
ThroughputBenchmark.testFoldOnFunctionsList    thrpt   30  95.529 ±  0.940  ops/s
ThroughputBenchmark.testMapOnFunctionsAndList  thrpt   30  71.549 ±  0.734  ops/s
ThroughputBenchmark.testFoldOnFunctionsList     avgt   30   0.010 ±  0.001   s/op
ThroughputBenchmark.testMapOnFunctionsAndList   avgt   30   0.011 ±  0.001   s/op

We are not really interested in the numbers themselves (which also depend upon the hardware) but more in the sort of insight we can gather this way. ScalaMeter offers different possibilities and customizations as well. I quite like the graph bits and, with the following code:

package com.lansalo.scalameter.benchmark

import com.lansalo.HashingFunctions.hashingFunctions
import com.lansalo.Target._

import java.io.File

import org.scalameter.{Bench, Gen}
import org.scalameter.persistence.SerializationPersistor

import scala.util.Random

object PerformanceBenchmark extends Bench.OfflineReport {

  override lazy val persistor = SerializationPersistor(new File("target/scalameter/performance/results"))

  val functionsGen: Gen[Int] = Gen.range("functionsNumber")(100, 1000, 100)
  val inputGen: Gen[Int] = Gen.range("imputLength")(20, 200, 20)

  val inputs: Gen[(String, List[String => Int])] = for {
    functionsNumber <- functionsGen
    inputLength <- inputGen
  } yield (Random.nextString(inputLength), hashingFunctions(functionsNumber))

  performance of "map vs fold over a list of functions" in {
    measure method "mapOnFunctionsAndList" in {
      using(inputs) in { param =>
        mapOnFunctionsAndList(param._1, 4, param._2)
      }
    }
    measure method "foldOnFunctionsList" in {
      using(inputs) in { param =>
        foldOnFunctionsList(param._1, 4, param._2)
      }
    }
  }

}

we can obtain the below graph:

map-fold

Where orange is for fold and blue for map.

We can also investigate the two implementations from a memory usage point of view. JMH provides a series of Profilers that can be quite handy in those cases (in the following example we chose a HotspotMemoryProfiler):

object BenchmarkRunner_MemoryFootprint {
  // run sbt clean jmh:compile from terminal first.
  def main(args: Array[String]): Unit = {
    val opt: Options = new OptionsBuilder().include(classOf[MemoryFootprint].getSimpleName).addProfiler(classOf[HotspotMemoryProfiler])
      .resultFormat(ResultFormatType.TEXT).build
    new Runner(opt).run
  }
}

Resulting in a long list of counters (below a partial output):

Benchmark                                                                                             Mode  Cnt            Score   Error  Units
MemoryFootprint.testFoldOnFunctionsList                                                              thrpt                71.335          ops/s
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.0.invocations                              thrpt               108.000              ?
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.0.lastEntryTime                            thrpt       10179095151.000              ?
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.0.lastExitTime                             thrpt       10178954989.000              ?
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.0.time                                     thrpt          75456013.000              ?
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.1.invocations                              thrpt                   ≈ 0              ?
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.1.lastEntryTime                            thrpt                   ≈ 0              ?
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.1.lastExitTime                             thrpt                   ≈ 0              ?
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.1.time                                     thrpt                   ≈ 0              ?

You can take a look here to see how to make sense of them. The following ScalaMeter example instead provides a more basic picture of the heap usage:

object MemoryBenchmark extends Bench.OfflineReport {

  override lazy val persistor = SerializationPersistor(new File("target/scalameter/memoryusage/results"))

  override def measurer = new Measurer.MemoryFootprint

// rest of the code is the same as above

}

For the two fold/map implementations the graph wouldn’t be so interesting (as they overlap) but you can take a look at another post I wrote about value classes to get an idea.

Finally, if running the benchmark on the Oracle JDK, we can use JMH to exploit the Flight Recorder:

object BenchmarkRunner_FlightRecorder {
  // run sbt clean jmh:compile from terminal first.
  def main(args: Array[String]): Unit = {
    val opt: Options = new OptionsBuilder().include(classOf[FlightRecorderDump].getSimpleName)
      .resultFormat(ResultFormatType.TEXT).build
    new Runner(opt).run
  }
}

@Warmup(iterations = 30)
@Measurement(iterations = 30)
@State(Scope.Benchmark)
private class FlightRecorderDump extends TargetPerformance {

  @Benchmark
  @Fork(value = 1, jvmArgsAppend = Array("-XX:+UnlockCommercialFeatures",
      "-XX:+FlightRecorder", "-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints",
      "-XX:FlightRecorderOptions=defaultrecording=true,dumponexit=true,dumponexitpath=/tmp/foldOnFunctionsList.jfr"))
  override def testFoldOnFunctionsList(state: Scopes.BenchmarkState): Unit = super.testFoldOnFunctionsList(state)

  @Benchmark
  @Fork(value = 1, jvmArgsAppend = Array("-XX:+UnlockCommercialFeatures",
    "-XX:+FlightRecorder", "-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints",
    "-XX:FlightRecorderOptions=defaultrecording=true,dumponexit=true,dumponexitpath=/tmp/mapOnFunctionsAndList.jfr"))
  override def testMapOnFunctionsAndList(state: Scopes.BenchmarkState): Unit = super.testMapOnFunctionsAndList(state)

}

And this will generate two separate reports, /tmp/foldOnFunctionsList.jfr for the fold version and /tmp/mapOnFunctionsAndList.jfr for the version with map. With JMC (Java Mission Control) we can compare a rich collection of parameters and different aspects of the two implementations. Below is a general overview of the heap usage for the map implementation, just as an example:

map-perf

———————————

1. [In this post a certain familiarity with the basic concepts of benchmarking is somewhat taken for granted, as the main focus is to illustrate how JMH and ScalaMeter can be used to produce some results. Still, the interpretation of those results requires an understanding of the mentioned background.]

Spark: How to Add Multiple Columns in Dataframes (and How Not to)

There are generally two ways to dynamically add columns to a dataframe in Spark: a foldLeft or a map (passing a RowEncoder). The foldLeft way is quite popular (and elegant), but recently I came across an issue regarding its performance when the number of columns to add is not trivial. I think it’s worth sharing the lesson learned: a map solution offers substantially better performance as the number of columns to be added increases.

Let’s start with an example dataframe with four columns: “id”, “author”, “title”, “incipit” (the opening line of the novel). A simple dataframe with just one row might look something like:

+---+-----------+-------------+-----------------------------+
| id|     author|        title|                      incipit|
+---+-----------+-------------+-----------------------------+
|  6|Leo Tolstoy|Anna Karenina|Happy families are all alike |
+---+-----------+-------------+-----------------------------+

You are given an arbitrary list of words and, for each of them, you would like to add a column (named after the word) to the original dataframe and flag with a boolean whether or not that word appears at least once in the opening line (incipit).
For a list of two words, List("families", "giraffe"), the above dataframe will be transformed into the following:

+---+-----------+-------------+-----------------------------+--------+-------+
| id|     author|        title|                      incipit|families|giraffe|
+---+-----------+-------------+-----------------------------+--------+-------+
|  6|Leo Tolstoy|Anna Karenina|Happy families are all alike |    true|  false|
+---+-----------+-------------+-----------------------------+--------+-------+

As the list of columns is arbitrary, there are two possible approaches to this problem. I wrapped both in methods to make testing easier. The first approach would be the foldLeft way:

def addColumnsViaFold(df: DataFrame, columns: List[String]): DataFrame = {
  import df.sparkSession.implicits._
  columns.foldLeft(df)((acc, col) => {
    acc.withColumn(col, acc("incipit").as[String].contains(col))
  })
}

And the second one (which involves a bit more coding) is the map way:

def addColumnsViaMap(df: DataFrame, words: List[String]): DataFrame = {
   val encoder = RowEncoder.apply(getSchema(df, words))
   df.map(mappingRows(df.schema)(words))(encoder)
}

private val mappingRows: StructType => List[String] => Row => Row =
  (schema) => (words) => (row) => {
    val addedCols: List[Boolean] = words.map {
      word => row.getString(schema.fieldIndex("incipit")).contains(word)
    }
    Row.merge(row, Row.fromSeq(addedCols))
  }

private def getSchema(df: DataFrame, words: List[String]): StructType = {
  var schema: StructType = df.schema
  words.foreach(word => schema = schema.add(word, DataTypes.BooleanType, false))
  schema
}
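A minimal usage sketch (assuming a SparkSession in scope named spark):

import spark.implicits._

val df = Seq((6, "Leo Tolstoy", "Anna Karenina", "Happy families are all alike"))
  .toDF("id", "author", "title", "incipit")
val words = List("families", "giraffe")

addColumnsViaFold(df, words).show(false)
addColumnsViaMap(df, words).show(false)
// both return the dataframe with the extra boolean columns shown above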

The code (including all tests) is available here.
When we run the ScalaMeter tests to get some idea of how the two approaches behave when dealing with 100 new columns, we get the following results1. For foldLeft (the addColumnsViaFold method):
adding-column-via-fold-100
Whereas those one are the results for map (addColumnsViaMap method):
adding_columns_via_map-100
When the number of columns increases, foldLeft takes considerably more time. If we take the number of columns to 500, the result is similar (and more dramatic).
Once again, for foldLeft:
adding-column-via-fold
And for map:
adding_columns_via_map-500
With 1000 columns, the foldLeft job aborts:

[error] Error running separate JVM: java.lang.OutOfMemoryError: GC overhead limit exceeded

Probably a sign that the heap is running low and the GC can’t free much memory. Still, the map-based solution seems to cope much better, even with 1000 columns:
adding_columns_via_map
When it comes to the reason behind this different behavior, my guess would be that somehow Catalyst is not able to optimize the foldLeft operation. The plans below (explain(true)) show how the Parsed and Analyzed Logical Plans end up in a sort of nested structure.

== Parsed Logical Plan ==
Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 20 more fields]
+- AnalysisBarrier
      +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 19 more fields]
         +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 18 more fields]
            +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 17 more fields]
               +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 16 more fields]
                  +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 15 more fields]
                     +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 14 more fields]
                        +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 13 more fields]
                           +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 12 more fields]
                              +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 11 more fields]
                                 +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 10 more fields]
                                    +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 9 more fields]
                                       +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 8 more fields]
                                          +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 7 more fields]
                                             +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 6 more fields]
                                                +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 5 more fields]
                                                   +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 4 more fields]
                                                      +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 3 more fields]
                                                         +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 2 more fields]
                                                            +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, Contains(incipit#12, toing) AS toing#368]
                                                               +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, Contains(incipit#12, hence) AS hence#342]
                                                                  +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, Contains(incipit#12, four) AS four#317]
                                                                     +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, Contains(incipit#12, John) AS John#293]
                                                                        +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, Contains(incipit#12, pop) AS pop#270]
                                                                           +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, Contains(incipit#12, drunk) AS drunk#248]
                                                                              +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, Contains(incipit#12, two) AS two#227]
                                                                                 +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, Contains(incipit#12, when) AS when#207]
                                                                                    +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, Contains(incipit#12, city) AS city#188]
                                                                                       +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, Contains(incipit#12, morning) AS morning#170]
                                                                                          +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, Contains(incipit#12, way) AS way#153]
                                                                                             +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, Contains(incipit#12, pig) AS pig#137]
                                                                                                +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, Contains(incipit#12, happy) AS happy#122]
                                                                                                   +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, Contains(incipit#12, was) AS was#108]
                                                                                                      +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, Contains(incipit#12, anna) AS anna#95]
                                                                                                         +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, Contains(incipit#12, in) AS in#83]
                                                                                                            +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, Contains(incipit#12, man) AS man#72]
                                                                                                               +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, Contains(incipit#12, with) AS with#62]
                                                                                                                  +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, Contains(incipit#12, alike) AS alike#53]
                                                                                                                     +- Project [id#9, author#10, title#11, incipit#12, to#38, Contains(incipit#12, get) AS get#45]
                                                                                                                        +- Project [id#9, author#10, title#11, incipit#12, Contains(incipit#12, to) AS to#38]
                                                                                                                           +- Project [_1#4 AS id#9, _2#5 AS author#10, _3#6 AS title#11, _4#7 AS incipit#12]
                                                                                                                              +- LocalRelation [_1#4, _2#5, _3#6, _4#7]

== Analyzed Logical Plan ==
id: int, author: string, title: string, incipit: string, to: boolean, get: boolean, alike: boolean, with: boolean, man: boolean, in: boolean, anna: boolean, was: boolean, happy: boolean, pig: boolean, way: boolean, morning: boolean, city: boolean, when: boolean, two: boolean, drunk: boolean, pop: boolean, John: boolean, four: boolean, hence: boolean, ... 20 more fields
Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 20 more fields]
+- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 19 more fields]
   +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 18 more fields]
      +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 17 more fields]
         +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 16 more fields]
            +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 15 more fields]
               +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 14 more fields]
                  +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 13 more fields]
                     +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 12 more fields]
                        +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 11 more fields]
                           +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 10 more fields]
                              +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 9 more fields]
                                 +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 8 more fields]
                                    +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 7 more fields]
                                       +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 6 more fields]
                                          +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 5 more fields]
                                             +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 4 more fields]
                                                +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 3 more fields]
                                                   +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 2 more fields]
                                                      +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, Contains(incipit#12, toing) AS toing#368]
                                                         +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, Contains(incipit#12, hence) AS hence#342]
                                                            +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, Contains(incipit#12, four) AS four#317]
                                                               +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, Contains(incipit#12, John) AS John#293]
                                                                  +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, Contains(incipit#12, pop) AS pop#270]
                                                                     +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, Contains(incipit#12, drunk) AS drunk#248]
                                                                        +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, Contains(incipit#12, two) AS two#227]
                                                                           +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, Contains(incipit#12, when) AS when#207]
                                                                              +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, Contains(incipit#12, city) AS city#188]
                                                                                 +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, Contains(incipit#12, morning) AS morning#170]
                                                                                    +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, Contains(incipit#12, way) AS way#153]
                                                                                       +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, Contains(incipit#12, pig) AS pig#137]
                                                                                          +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, Contains(incipit#12, happy) AS happy#122]
                                                                                             +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, Contains(incipit#12, was) AS was#108]
                                                                                                +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, Contains(incipit#12, anna) AS anna#95]
                                                                                                   +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, Contains(incipit#12, in) AS in#83]
                                                                                                      +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, Contains(incipit#12, man) AS man#72]
                                                                                                         +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, Contains(incipit#12, with) AS with#62]
                                                                                                            +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, Contains(incipit#12, alike) AS alike#53]
                                                                                                               +- Project [id#9, author#10, title#11, incipit#12, to#38, Contains(incipit#12, get) AS get#45]
                                                                                                                  +- Project [id#9, author#10, title#11, incipit#12, Contains(incipit#12, to) AS to#38]
                                                                                                                     +- Project [_1#4 AS id#9, _2#5 AS author#10, _3#6 AS title#11, _4#7 AS incipit#12]
                                                                                                                        +- LocalRelation [_1#4, _2#5, _3#6, _4#7]

== Optimized Logical Plan ==
Project [id#9, author#10, title#11, incipit#12, Contains(incipit#12, to) AS to#38, Contains(incipit#12, get) AS get#45, Contains(incipit#12, alike) AS alike#53, Contains(incipit#12, with) AS with#62, Contains(incipit#12, man) AS man#72, Contains(incipit#12, in) AS in#83, Contains(incipit#12, anna) AS anna#95, Contains(incipit#12, was) AS was#108, Contains(incipit#12, happy) AS happy#122, Contains(incipit#12, pig) AS pig#137, Contains(incipit#12, way) AS way#153, Contains(incipit#12, morning) AS morning#170, Contains(incipit#12, city) AS city#188, Contains(incipit#12, when) AS when#207, Contains(incipit#12, two) AS two#227, Contains(incipit#12, drunk) AS drunk#248, Contains(incipit#12, pop) AS pop#270, Contains(incipit#12, John) AS John#293, Contains(incipit#12, four) AS four#317, Contains(incipit#12, hence) AS hence#342, ... 20 more fields]
+- InMemoryRelation [id#9, author#10, title#11, incipit#12], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
      +- LocalTableScan [id#9, author#10, title#11, incipit#12]

== Physical Plan ==
*(1) Project [id#9, author#10, title#11, incipit#12, Contains(incipit#12, to) AS to#38, Contains(incipit#12, get) AS get#45, Contains(incipit#12, alike) AS alike#53, Contains(incipit#12, with) AS with#62, Contains(incipit#12, man) AS man#72, Contains(incipit#12, in) AS in#83, Contains(incipit#12, anna) AS anna#95, Contains(incipit#12, was) AS was#108, Contains(incipit#12, happy) AS happy#122, Contains(incipit#12, pig) AS pig#137, Contains(incipit#12, way) AS way#153, Contains(incipit#12, morning) AS morning#170, Contains(incipit#12, city) AS city#188, Contains(incipit#12, when) AS when#207, Contains(incipit#12, two) AS two#227, Contains(incipit#12, drunk) AS drunk#248, Contains(incipit#12, pop) AS pop#270, Contains(incipit#12, John) AS John#293, Contains(incipit#12, four) AS four#317, Contains(incipit#12, hence) AS hence#342, ... 20 more fields]
+- InMemoryTableScan [author#10, id#9, incipit#12, title#11]
      +- InMemoryRelation [id#9, author#10, title#11, incipit#12], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- LocalTableScan [id#9, author#10, title#11, incipit#12]

I'm not sure whether this is somehow related to Catalyst's inability to optimize foldLeft, but the explained plan for the map-based version doesn't show the same nested structure:

== Parsed Logical Plan ==
'SerializeFromObject [validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, id), IntegerType) AS id#83, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, author), StringType), true, false) AS author#84, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, title), StringType), true, false) AS title#85, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, incipit), StringType), true, false) AS incipit#86, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, to), BooleanType) AS to#87, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, get), BooleanType) AS get#88, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, alike), BooleanType) AS alike#89, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, with), BooleanType) AS with#90, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, man), BooleanType) AS man#91, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, in), BooleanType) AS in#92, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, anna), BooleanType) AS anna#93, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, was), BooleanType) AS was#94, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, happy), BooleanType) AS happy#95, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, pig), BooleanType) AS pig#96, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 14, way), BooleanType) AS way#97, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 15, morning), BooleanType) AS morning#98, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 16, city), BooleanType) AS city#99, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 17, when), BooleanType) AS when#100, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 18, two), BooleanType) AS two#101, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 19, drunk), BooleanType) AS drunk#102, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 20, pop), BooleanType) AS pop#103, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 21, John), BooleanType) AS John#104, 
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 22, four), BooleanType) AS four#105, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 23, hence), BooleanType) AS hence#106, ... 20 more fields]
+- 'MapElements , interface org.apache.spark.sql.Row, [StructField(id,IntegerType,false), StructField(author,StringType,true), StructField(title,StringType,true), StructField(incipit,StringType,true)], obj#82: org.apache.spark.sql.Row
   +- 'DeserializeToObject unresolveddeserializer(createexternalrow(getcolumnbyordinal(0, IntegerType), getcolumnbyordinal(1, StringType).toString, getcolumnbyordinal(2, StringType).toString, getcolumnbyordinal(3, StringType).toString, StructField(id,IntegerType,false), StructField(author,StringType,true), StructField(title,StringType,true), StructField(incipit,StringType,true))), obj#81: org.apache.spark.sql.Row
      +- AnalysisBarrier
            +- Project [_1#4 AS id#9, _2#5 AS author#10, _3#6 AS title#11, _4#7 AS incipit#12]
               +- LocalRelation [_1#4, _2#5, _3#6, _4#7]

== Analyzed Logical Plan ==
id: int, author: string, title: string, incipit: string, to: boolean, get: boolean, alike: boolean, with: boolean, man: boolean, in: boolean, anna: boolean, was: boolean, happy: boolean, pig: boolean, way: boolean, morning: boolean, city: boolean, when: boolean, two: boolean, drunk: boolean, pop: boolean, John: boolean, four: boolean, hence: boolean, ... 20 more fields
SerializeFromObject [validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, id), IntegerType) AS id#83, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, author), StringType), true, false) AS author#84, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, title), StringType), true, false) AS title#85, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, incipit), StringType), true, false) AS incipit#86, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, to), BooleanType) AS to#87, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, get), BooleanType) AS get#88, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, alike), BooleanType) AS alike#89, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, with), BooleanType) AS with#90, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, man), BooleanType) AS man#91, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, in), BooleanType) AS in#92, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, anna), BooleanType) AS anna#93, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, was), BooleanType) AS was#94, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, happy), BooleanType) AS happy#95, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, pig), BooleanType) AS pig#96, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 14, way), BooleanType) AS way#97, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 15, morning), BooleanType) AS morning#98, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 16, city), BooleanType) AS city#99, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 17, when), BooleanType) AS when#100, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 18, two), BooleanType) AS two#101, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 19, drunk), BooleanType) AS drunk#102, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 20, pop), BooleanType) AS pop#103, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 21, John), BooleanType) AS John#104, 
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 22, four), BooleanType) AS four#105, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 23, hence), BooleanType) AS hence#106, ... 20 more fields]
+- MapElements , interface org.apache.spark.sql.Row, [StructField(id,IntegerType,false), StructField(author,StringType,true), StructField(title,StringType,true), StructField(incipit,StringType,true)], obj#82: org.apache.spark.sql.Row
   +- DeserializeToObject createexternalrow(id#9, author#10.toString, title#11.toString, incipit#12.toString, StructField(id,IntegerType,false), StructField(author,StringType,true), StructField(title,StringType,true), StructField(incipit,StringType,true)), obj#81: org.apache.spark.sql.Row
      +- Project [_1#4 AS id#9, _2#5 AS author#10, _3#6 AS title#11, _4#7 AS incipit#12]
         +- LocalRelation [_1#4, _2#5, _3#6, _4#7]

== Optimized Logical Plan ==
SerializeFromObject [validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, id), IntegerType) AS id#83, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, author), StringType), true, false) AS author#84, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, title), StringType), true, false) AS title#85, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, incipit), StringType), true, false) AS incipit#86, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, to), BooleanType) AS to#87, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, get), BooleanType) AS get#88, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, alike), BooleanType) AS alike#89, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, with), BooleanType) AS with#90, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, man), BooleanType) AS man#91, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, in), BooleanType) AS in#92, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, anna), BooleanType) AS anna#93, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, was), BooleanType) AS was#94, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, happy), BooleanType) AS happy#95, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, pig), BooleanType) AS pig#96, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 14, way), BooleanType) AS way#97, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 15, morning), BooleanType) AS morning#98, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 16, city), BooleanType) AS city#99, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 17, when), BooleanType) AS when#100, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 18, two), BooleanType) AS two#101, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 19, drunk), BooleanType) AS drunk#102, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 20, pop), BooleanType) AS pop#103, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 21, John), BooleanType) AS John#104, 
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 22, four), BooleanType) AS four#105, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 23, hence), BooleanType) AS hence#106, ... 20 more fields]
+- MapElements , interface org.apache.spark.sql.Row, [StructField(id,IntegerType,false), StructField(author,StringType,true), StructField(title,StringType,true), StructField(incipit,StringType,true)], obj#82: org.apache.spark.sql.Row
   +- DeserializeToObject createexternalrow(id#9, author#10.toString, title#11.toString, incipit#12.toString, StructField(id,IntegerType,false), StructField(author,StringType,true), StructField(title,StringType,true), StructField(incipit,StringType,true)), obj#81: org.apache.spark.sql.Row
      +- InMemoryRelation [id#9, author#10, title#11, incipit#12], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- LocalTableScan [id#9, author#10, title#11, incipit#12]

== Physical Plan ==
*(1) SerializeFromObject [validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, id), IntegerType) AS id#83, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, author), StringType), true, false) AS author#84, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, title), StringType), true, false) AS title#85, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, incipit), StringType), true, false) AS incipit#86, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, to), BooleanType) AS to#87, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, get), BooleanType) AS get#88, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, alike), BooleanType) AS alike#89, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, with), BooleanType) AS with#90, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, man), BooleanType) AS man#91, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, in), BooleanType) AS in#92, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, anna), BooleanType) AS anna#93, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, was), BooleanType) AS was#94, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, happy), BooleanType) AS happy#95, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, pig), BooleanType) AS pig#96, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 14, way), BooleanType) AS way#97, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 15, morning), BooleanType) AS morning#98, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 16, city), BooleanType) AS city#99, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 17, when), BooleanType) AS when#100, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 18, two), BooleanType) AS two#101, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 19, drunk), BooleanType) AS drunk#102, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 20, pop), BooleanType) AS pop#103, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 21, John), BooleanType) AS John#104, 
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 22, four), BooleanType) AS four#105, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 23, hence), BooleanType) AS hence#106, ... 20 more fields]
+- *(1) MapElements , obj#82: org.apache.spark.sql.Row
   +- *(1) DeserializeToObject createexternalrow(id#9, author#10.toString, title#11.toString, incipit#12.toString, StructField(id,IntegerType,false), StructField(author,StringType,true), StructField(title,StringType,true), StructField(incipit,StringType,true)), obj#81: org.apache.spark.sql.Row
      +- InMemoryTableScan [id#9, author#10, title#11, incipit#12]
            +- InMemoryRelation [id#9, author#10, title#11, incipit#12], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- LocalTableScan [id#9, author#10, title#11, incipit#12]

Whatever the root cause is, the conclusion is clear: as of Spark 2.3.0 (and probably earlier versions), dynamically adding a large number of columns to a DataFrame should be done via a map operation rather than foldLeft, for the reasons we've seen.
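
To make the difference concrete, here is a rough sketch of mine (df and words are hypothetical names, and the select variant is an alternative to the Dataset.map version whose plan is shown above): the foldLeft version stacks one withColumn, and therefore one Project node, per word, while building all the columns in a single select yields one flat projection.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical: df is the cached books dataframe, words the list of words to flag.
// One withColumn per word means one nested Project per word in the plan:
def addColumnsWithFoldLeft(df: DataFrame, words: Seq[String]): DataFrame =
  words.foldLeft(df)((acc, word) => acc.withColumn(word, col("incipit").contains(word)))

// A single select produces a single, flat Project with all the new columns:
def addColumnsWithSelect(df: DataFrame, words: Seq[String]): DataFrame =
  df.select(df.columns.map(col) ++ words.map(w => col("incipit").contains(w).as(w)): _*)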

—————————————-

1. [I ran the tests on a virtual machine with three cores running Spark 2.3.0. Times in milliseconds reflect the underlying infrastructure and I would expect different performance on a proper cluster. Nevertheless, the fact that a foldLeft solution performs significantly worse than a map-based solution should be an outcome independent of the underlying hardware.]

Custom JSON serializer

In Scala there are quite a few frameworks that make the implementation of RESTful JSON services quite straightforward (Play, Scalatra and Akka HTTP are the main ones). They all follow the same general idea: a JSON payload is deserialized into a Scala class (usually a case class) and serialized back into JSON. For example, a JSON payload like:

{"name": "Paul", "age": 45, "gender": "male"}

Can be easily deserialized into something like:

final case class User(name: String, age: Int, gender: String)

This is pretty standard stuff and works almost out of the box in all the frameworks mentioned, but in some cases you might want to take advantage of the type system, for example by defining a dedicated type for “age”:

// A value class representing a person's age
final class Age(val age: Int) extends AnyVal

And also constant values for “gender” rather than a generic String type:

sealed trait Gender {
  val sex: String
}

final case object Male extends Gender {
  val sex = "male"
}

final case object Female extends Gender {
  val sex = "female"
}

final case object Other extends Gender {
  val sex = "other"
}

object GenderHelper {
  private val genders: Set[Gender] = Set(Male, Female, Other)
  def toGender(sex: String): Option[Gender] = genders.find(_.sex == sex)
}
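
For instance (a quick check of mine, not part of the original code), GenderHelper resolves raw strings safely:

GenderHelper.toGender("male")    // Some(Male)
GenderHelper.toGender("unknown") // None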

We can now define User in a more robust way as:

final case class User(name: String, age: Age, gender: Gender)

This looks better but doesn't work out of the box anymore: regardless of the Scala JSON library we use, there is no way for it to know how to serialize/deserialize types like Age or Gender. We need to write our own custom JSON serializer/deserializer. The following example uses Akka HTTP and Json4s (the code is available here).

We first need to write two serializers/deserializers. One for Age:

import com.lansalo.model.Age
import org.json4s.CustomSerializer
import org.json4s.JsonAST.JInt

case object AgeSerializer extends CustomSerializer[Age](format => ( {
  case JInt(age) => new Age(age.intValue)
}, {
  case age: Int => JInt(BigInt(age))
}))

And one for Gender:

import com.lansalo.model.{Gender, GenderHelper}
import org.json4s.CustomSerializer
import org.json4s.JsonAST.JString

case object GenderSerializer extends CustomSerializer[Gender](format => ( {
  case JString(gender) => GenderHelper.toGender(gender).get
}, {
  case gender: Gender => JString(gender.sex)
}))

Then we just need to add them to the default json4s formats (in a trait here):

import com.lansalo.json.serializer.{AgeSerializer, GenderSerializer}
import org.json4s.DefaultFormats

trait JsonSupport {
  implicit val formats = DefaultFormats + GenderSerializer + AgeSerializer
}

And finally bring the JSON formats into scope where we need them:

import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.directives.MethodDirectives.{get, post}
import akka.http.scaladsl.server.directives.PathDirectives.path
import akka.http.scaladsl.server.directives.RouteDirectives.complete
import akka.util.Timeout
import com.lansalo.json.JsonSupport
import com.lansalo.model.{Age, Female, User}
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jackson

import scala.concurrent.duration._

trait UserRoutes extends JsonSupport {

  implicit def system: ActorSystem

  lazy val log = Logging(system, classOf[UserRoutes])

  implicit lazy val timeout = Timeout(5.seconds)
  import Json4sSupport._

  implicit val serialization = jackson.Serialization // or native.Serialization

  lazy val userRoutes: Route = pathPrefix("users") {
    pathEnd {
      post {
        entity(as[User]) { user =>
          log.info(s"Received user: $user")
          complete((StatusCodes.Created, "OK"))
        }
      }
    } ~
      path(Segment) { name =>
        get {
          complete(User(name, new Age(22), Female))
        }
      }
  }
}
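
As a quick sanity check outside Akka HTTP, the custom formats can also be exercised directly with json4s. The snippet below is an illustrative sketch of mine (JsonRoundTrip is not part of the original project) and assumes the model and the JsonSupport trait shown above are on the classpath:

import com.lansalo.json.JsonSupport
import com.lansalo.model.{Age, Male, User}
import org.json4s.jackson.Serialization.{read, write}

object JsonRoundTrip extends App with JsonSupport {
  // Serialize a User through the custom Age and Gender serializers
  println(write(User("Paul", new Age(45), Male)))
  // Deserialize a raw JSON payload back into a User
  println(read[User]("""{"name":"Jane","age":30,"gender":"female"}"""))
}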


Scala Value Classes: Type-Safety for Free

Value classes have been available since Scala 2.10, and the initial proposal (SIP-15) included several use cases for this mechanism, but in this post I would like to focus on one in particular: the possibility to create wrappers with no boxing overhead, and how we can leverage this to improve our type system at zero cost.
In doing that, I'll also add some numbers to show what “zero cost” actually means. The class Wrapper below:

class Wrapper(val flag: Boolean) extends AnyVal

is a value class. Generally speaking, a value class has a single, public val parameter and directly extends AnyVal. There is a specific list of criteria that a class must satisfy in order to qualify as a value class (for all the details, including universal traits, see SIP-15) but here I would like to focus on the general idea behind the mechanism and its benefits.

In the example above, Wrapper is the type at compile time, but at runtime the representation is a plain Boolean. The “free” in the title refers precisely to the fact that a Wrapper instance never actually gets created at runtime, hence no boxing overhead and no extra memory allocation (but I'll get back to this after an example).
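
As a tiny illustration of mine (not from the post's benchmark code): the static type is Wrapper, but the compiled code works directly on the underlying Boolean.

// At compile time negate deals with Wrapper, but the generated bytecode
// effectively passes plain booleans around: no Wrapper instance is allocated.
def negate(w: Wrapper): Wrapper = new Wrapper(!w.flag)

val w = new Wrapper(true) // no object allocation here either
negate(w).flag            // false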

So far so good… but how can we exploit this feature to our advantage? An example (perhaps a bit clumsy) should illustrate the point. Let's say we are working on an online auction website and we have an entity for a generic auction item:

final case class Item (
   itemId: Int,
   asked: Double,
   soldPrice: Double
)

The model is pretty simple: an id to uniquely identify an item, the asking price expressed as a double (although in a real-world scenario BigDecimal1 would be the right choice) and the price the item was sold for. And, for no particular reason beyond our curiosity, we also have a method to check whether, overall, the buyers paid more than the asking price (for example across all the items sold in the last month):

def totalPerformanceBase(list: List[Item]): Double = {
   list.foldLeft(0d)((acc, item) => {
      acc + (item.soldPrice - item.asked)
   })
}

The method is called totalPerformanceBase because we are going to measure its performance later and use it as our baseline.
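
The actual ScalaMeter tests are linked further down; purely as an illustration (and assuming a recent ScalaMeter, with Item and totalPerformanceBase in scope), such a time measurement over lists of increasing size could be sketched like this:

import org.scalameter.api._

object ItemBenchmark extends Bench.LocalTime {

  // Lists of Items of increasing size (sizes picked arbitrarily for the sketch)
  val items: Gen[List[Item]] =
    Gen.range("size")(100000, 500000, 100000)
      .map(n => (1 to n).toList.map(i => Item(i, i, i + 1.0)))

  performance of "totalPerformance" in {
    measure method "base" in {
      using(items) in { list => totalPerformanceBase(list) }
    }
  }
}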

Representing a price as a Double2 is perfectly fine, but we also have bids, shipping costs and quite likely many other domain models which include a price. We could create our own type to specifically represent an Item's price in order to make our code more robust and prevent common mistakes (passing the amount of a bid where an item's price is expected, or vice versa); after all, that's the advantage of having a type system: the compiler spots the error immediately, we get fewer runtime bugs, a better IDE experience and so forth. So we write a wrapper for our price:

final case class Price(price: Double)

And re-factor the Item class:

final case class Item (
   itemId: Int,
   asked: Price,
   soldPrice: Price
)

Also totalPerformance needs to be refactored accordingly:

def totalPerformance(list: List[Item]): Double = {
   list.foldLeft(0d)((acc, item) => {
      acc + (item.soldPrice.price - item.asked.price)
   })
}

In the graph below, the blue line represents our base implementation with a Double whereas the orange line represents the performance of the version with the wrapper Price.
[Figure: execution time of totalPerformance, Double baseline (blue) vs Price wrapper (orange)]

You can find the ScalaMeter tests and all the code here, but bear in mind that results might vary based on your hardware. The important point to notice is that introducing Price as a wrapper for extra type safety comes with a small cost in terms of performance: when totalPerformance crunches a list of half a million items we can see a difference of a couple of milliseconds3. Moreover, because the Price wrapper actually gets instantiated, we observe a similar behavior from a memory usage point of view. Let's consider for example the following methods:

// For the original Item
def instantiate(list: List[Int]): List[Item] = {
  list.map(n => Item(n, n, n + 1.0))
}

// For the Item using Price as wrapper
def instantiate(list: List[Int]): List[Item] = {
   list.map(n => Item(n, Price(n), Price(n + 1.0)))
}

From the graph below we can see the different heap memory usage as the number of items progressively increases (in orange the implementation with the Price wrapper, in blue the original implementation with Double).

[Figure: heap memory usage for increasing numbers of items, Price wrapper (orange) vs Double (blue)]

Just one note: the y-axis is actually in Kb (that's a known issue with ScalaMeter), so when the size is 970000, for instance, the heap usage for the Item with Double is 54222.97 Kb against 93120.00 Kb for the version with the Price wrapper.

Value classes come into play because they allow us to keep the type safety we gained with the wrapper without paying the extra price (both in terms of execution time and memory usage).
We can slightly change the Price definition to make it a value class:

final case class PriceVal(price: Double) extends AnyVal {
   def -(that: PriceVal) = PriceVal(this.price - that.price)
   def +(that: PriceVal) = PriceVal(this.price + that.price)
}

and change our Item definition to reflect this:

final case class Item (
   itemId: Int,
   asked: PriceVal,
   soldPrice: PriceVal
)

We also need to refactor a bit our methods to accommodate the new Item‘s definition:

def totalPerformance(list: List[Item]): PriceVal = {
   list.foldLeft(PriceVal(0d))((acc, item) => {
     acc + (item.soldPrice - item.asked)
   })
}

def instantiate(list: List[Int]): List[Item] = {
   list.map(n => Item(n, PriceVal(n), PriceVal(n + 1.0)))
}

Now we can compare the execution time of totalPerformance across all the solutions we came up with:

[Figure: execution time comparison across the three implementations]

  • In orange, the implementation with the Price wrapper
  • In blue, the original implementation of Item with Double
  • In green, the one with the PriceVal value class

It is quite evident that there isn't any significant difference between the base implementation with Double and the one with the value class. On the other hand, the version with the Price wrapper doesn't perform as efficiently as the others. The outcome is analogous when memory usage is examined:

[Figure: memory usage comparison across the three implementations]

Once again, there is no difference between the Double and the value class implementation, and that's why you see only one line: the green and blue lines perfectly overlap.
In case the two overlapping lines hide what's happening a bit too much, the graph below instead compares just the memory usage of the Price wrapper solution (in orange) against the version using the PriceVal value class (in green).

[Figure: memory usage, Price wrapper (orange) vs PriceVal value class (green)]

———————————-

1. [Here there is a good explanation of why double is not a wise choice when dealing with money, but for the purpose of this post it is perfectly fine and illustrates the point quite effectively]
2. [Note that in Scala, Double itself is not represented by an object in the underlying runtime system and is equivalent to a Java double primitive]
3. [Whether those few milliseconds actually make a difference largely depends on the context. For the purpose of this post, the only relevant aspect is that there is a difference]

Scala Variance Explained

There are quite a few articles and resources about variance in Scala available online. Still, I decided to add my own contribution with this post, trying to keep the whole topic as simple as I could, using familiar concepts, a few diagrams and some code you can play with. Hopefully that will make the whole thing a bit easier to grasp. I also decided not to include references to category theory: not because it is of no use (on the contrary) but because I hope this rough explanation can work as a sort of encouragement and introduction to it (if you want to dig further, I linked some resources at the end of the page).

Kingdom Animalia

As we need an example, or rather an analogy, I chose something that lacks originality but (hopefully) not familiarity. The code is available here but you won't find any surprises in it. It's just a hierarchy of classes modeling the relationships between different groups of animals:
[Diagram: the Kingdom Animalia class hierarchy]

Variance

In the hierarchy above, Vertebrate is the parent class of Fish (or, equivalently, Fish is a subtype of Vertebrate) and we can clearly write something like:

val vertebrate: Vertebrate = new Fish()

And that's because a fish is a vertebrate (or at least we modeled it wisely as such). But what about a parameterized type like T[A]? Let's say, for example, that we have a generic class Cage[A] that takes a type parameter: what can be said about the relationship between Cage[Vertebrate] and Cage[Fish]? Is Cage[Vertebrate] the parent of Cage[Fish]? Does the relationship between a vertebrate and a fish carry over to the corresponding parameterized types?

[Diagram: mapping types to their Cage counterparts]
Variance is mainly about how we answer this question. We have precisely three options: invariance, covariance and contravariance.

Invariance

In the invariant case the answer to the previous question is pretty simple: there is no relationship between the types and the corresponding parameterized types.

[Diagram: invariance]
An invariant class Cage would be defined in the following way in Scala:

class Cage[A]

Then we can unsurprisingly write val mammalCage: Cage[Mammal] = new Cage[Mammal] but, despite Primate being a subtype of Mammal, the following will not compile:

val primateCage: Cage[Mammal] = new Cage[Primate]
// Expect an error like:
// [error] Note: kingdom.animalia.vertebrate.mammal.primate.Primate <: kingdom.animalia.vertebrate.mammal.Mammal, but class Cage is invariant in type A.
// [error] You may wish to define A as +A instead.

And for analogous reasons val vertebrateCage: Cage[Mammal] = new Cage[Vertebrate] won’t compile either. Conclusion: an invariant class Cage does not preserve the inheritance relationship between its type arguments.

Covariance

In the covariant case the answer to our initial question is more nuanced and interesting: if B is a subtype of A, then F[B] is also a subtype of F[A].

[Diagram: covariance]
Back to our example, a covariant Cage class in Scala would look like:

class Cage[+A]

And:

val mammalCage: Cage[Mammal] = new Cage[Mammal]
val primateCage: Cage[Mammal] = new Cage[Primate]
val homoCage: Cage[Mammal] = new Cage[Homo]

These will all compile without errors, because the parent/child relationship between Mammal/Primate (or Mammal/Homo) extends to the parameterized type Cage, so that Cage[Mammal] is the parent of Cage[Primate]. Of course, the same won't be true of:

// expect a type mismatch error if you try to compile any of these
val reptileCage: Cage[Mammal] = new Cage[Reptile]
val vertebrateCage: Cage[Mammal] = new Cage[Vertebrate]

That's because there is no parent/child relationship between a mammal and a reptile, nor between a mammal and a vertebrate. In this last case, though, the opposite relationship actually holds: all mammals are vertebrates, and we modeled Vertebrate as the parent of Mammal. That introduces us to the next topic.

Contravariance

As you might have guessed at this point, contravariance is the opposite of covariance:

[Diagram: contravariance]
We still extend the types' inheritance relationship to the corresponding parameterized type but in this case, so to speak, the arrow is inverted. More precisely, if B is a subtype of A, then F[A] is a subtype of F[B] (note the difference with the covariant case). A contravariant Cage class in Scala would look like:

class Cage[-A]

Then in such a cage for a mammal, we can keep:

val mammalCage: Cage[Mammal] = new Cage[Mammal]
val vertebrateCage: Cage[Mammal] = new Cage[Vertebrate]
val animalCage: Cage[Mammal] = new Cage[Animal]

But the following lines will not compile:

// type mismatch error
val primateCage: Cage[Mammal] = new Cage[Primate]
// type mismatch error
val molluscCage: Cage[Mammal] = new Cage[Mollusc]
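
Where is contravariance useful, then? Typically for consumers of a type. As a small sketch of mine (Vet is not part of the post's code): something able to treat any vertebrate can safely be used where a treater of mammals is required.

trait Vet[-A] {
  def treat(patient: A): Unit
}

val vertebrateVet: Vet[Vertebrate] = new Vet[Vertebrate] {
  def treat(patient: Vertebrate): Unit = println(s"treating $patient")
}

// Compiles thanks to contravariance: Vet[Vertebrate] is a subtype of Vet[Mammal]
val mammalVet: Vet[Mammal] = vertebrateVet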

Recap

  • In an invariant Cage[Mammal] I can keep only a type Mammal (not its subtypes or supertypes)
  • In a covariant Cage[Mammal] I can keep a type Mammal or one of its subtypes (but not its supertypes)
  • In a contravariant Cage[Mammal] I can keep a type Mammal or one of its supertypes (but not its subtypes)

Note that these restrictions are imposed at compile time. In other words, the compiler checks for you what can and cannot be assigned to a parameterized type, according to the variance of its type parameters.
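
For example, the standard library's immutable List is declared covariant (List[+A]), which is why a list of primates can be used wherever a list of mammals is expected:

val primates: List[Primate] = List(new Primate)
val mammals: List[Mammal] = primates // fine: List is covariant in its element type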

Covariant type A occurs in contravariant position

That was a rather general overview, but it might leave you with some questions about the use of variance from a practical perspective, including its advantages and disadvantages. In this section (and the following ones), I'll try to present some examples, along with some common Scala classes that make use of variance, and I'll start with some code that does not compile: a broken cage.

class BrokenCage[+A](var guest: A)

The BrokenCage class is declared covariant in its type parameter A, but it exposes guest as a mutable var of type A. The compiler will not digest that and will return an error like covariant type A occurs in contravariant position in type A of value guest.
To explain what is happening here and why the compiler is not happy, reasoning by contradiction might help. Let's assume for a moment that the code above compiles and see what happens. I first create a cage with a primate in it:

val primateCage = new BrokenCage[Primate](new Primate)

Because BrokenCage is covariant in A, BrokenCage[Primate] is a subtype of BrokenCage[Animal] and therefore we can assign a BrokenCage[Primate] to a BrokenCage[Animal]

val animalCage: BrokenCage[Animal] = primateCage

So now we have animalCage of type BrokenCage[Animal], and there shouldn't be any problem in assigning an invertebrate as its guest: after all, an invertebrate is an animal and BrokenCage is covariant in its type parameter.

animalCage.guest = new Invertebrate

But our animalCage is the primateCage we defined at the beginning, so we have effectively managed to put an invertebrate in a cage for a primate! This issue is known as heap pollution, and the Scala compiler prevents this sort of problem from happening; it can occur, however, in a plain Java array:

Primate[] primates = new Primate[5];
Animal[] animals =  primates;
animals[0] = new Invertebrate();

This code compiles and results in an ArrayStoreException at runtime. One of the advantages of variance in Scala is precisely that this kind of error is caught at compile time rather than at runtime.
And just as a note (as it's out of scope for the present post), the BrokenCage class can be fixed either by using a val (rather than a var) or with an upper type bound, as in:

class FixedCage[+A, B <: A](var guest: B)

Where we specify that the guest of type B has to be a subtype of A (there are a couple of simple tests around this in the code).
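
A minimal sketch of the two fixes (ImmutableCage is a name of mine, FixedCage is the class defined just above):

// Fix 1: make guest a val, so A only occurs in covariant (return) position
class ImmutableCage[+A](val guest: A)
val animalCage: ImmutableCage[Animal] = new ImmutableCage[Primate](new Primate)

// Fix 2: keep the var but bound it with a second type parameter
val primateCage = new FixedCage[Animal, Primate](new Primate)
// primateCage.guest = new Invertebrate // does not compile: Invertebrate is not a Primate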

Option

To see how variance (covariance in this specific case) is used in Scala, the familiar Option trait is a good example. A simple implementation would be (and the actual Scala implementation is not much different):

sealed trait Option[+A]

case class Some[+A]( value: A ) extends Option[A]
case object None extends Option[Nothing]

Option is covariant in its type parameter A (and so is Some). Why it is covariant might not be immediately evident, so to understand a bit more about it, let's try to make it invariant and see what sort of behavior we should expect as a result:

sealed trait Option[A]

case class Some[A]( value: A ) extends Option[A]
case object None extends Option[Nothing]

The first thing to notice is that Some also has to be invariant in its type parameter A, as it extends Option and Option is now invariant in A. Besides (not surprisingly), we cannot assign an Option[Primate] to an Option[Mammal].

var optionalMammal: Option[Mammal] = Some(new Mammal)
val optionalPrimate: Option[Primate] = Some(new Primate)
// This won't compile
// optionalMammal = optionalPrimate

Despite Primate being a subtype of Mammal, that relationship doesn't carry over to our invariant version of Option. The same applies to this invariant version of Some (for the same reason). Perhaps less obviously, we also cannot assign None to an Option[Mammal]:

// This won't compile:
// val mammal: Option[Mammal] = None

Nothing is a subtype of every type in Scala, but this time Option is invariant, so we can’t assign anything that is not an Option[Mammal] to an Option[Mammal]: not an Option[Nothing], and therefore not None, which extends Option[Nothing]. We could get around this inconvenience with an upper type bound, defining the Option trait as sealed trait Option[A <: Nothing], but I’ll let you imagine how useful such an Option type would be. More generally, an invariant version of Option fails to meet the expectations an Option type is supposed to deliver (in the code you can also find a version of a covariant Option with an invariant definition of Some).
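For contrast, with the covariant definition above both assignments work as you would expect (the standard library Option behaves the same way). A minimal sketch, with hypothetical Mammal and Primate classes standing in for the ones in the repository:

class Mammal
class Primate extends Mammal

val maybeMammal: Option[Mammal] = Some(new Primate) // Option[Primate] <: Option[Mammal]
val noMammal: Option[Mammal] = None                 // Option[Nothing] <: Option[Mammal]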

Function

Functions are another interesting example of variance usage in the Scala language. The trait Function1 is defined as contravariant in its argument type and covariant in its return type. Simplified, it looks like:

trait Function1[-X,+Y] {
  def apply(arg: X): Y
}

What we are saying with such a type definition is that if A is a supertype of X and B is a subtype of Y, then Function1[A,B] is a subtype of Function1[X,Y]. In terms of the kingdom animalia analogy used so far, Function1[Vertebrate,Homo] is a subtype of Function1[Mammal,Primate], as Vertebrate is a supertype of Mammal and Homo is a subtype of Primate:
[Figure: Function1[Vertebrate,Homo] as a subtype of Function1[Mammal,Primate]]

The reason for such a choice, though, might not be immediately evident. When we think about a class in our animal kingdom, it’s quite intuitive what a parent/child relationship involves. Primate is a subtype of Mammal because all primates are mammals. Hence, if you ask me for a mammal, I can give you a primate, as a primate is a mammal. This concept has a name: the Liskov substitution principle. It states (from Wikipedia):

If S is a subtype of T, then objects of type T may be replaced with objects of type S (i.e. an object of type T may be substituted with any object of a subtype S) without altering any of the desirable properties of T

Back to our example: if we want to understand why Function1[Vertebrate,Homo] is a subtype of Function1[Mammal,Primate], or in other words why Function1 is defined as contravariant in its argument and covariant in its return type, then we need to start from those desirable properties that the present definition is not supposed to alter (and that, most likely, a different definition would have altered). For a function (as for a category in category theory), we expect the composition operation as a primitive. Function composition can be defined more rigorously, but intuitively it says that if we have a function from A to B and a function from B to C, then we can always compose the two and obtain a function from A to C.

[Figure: composing a function from Vertebrate to Mammal with a function from Mammal to Primate]
Translated into Scala:

val f: Vertebrate => Mammal = ???
val g: Mammal => Primate = ???

// We can compose those to obtain a new function Vertebrate => Primate
val fromVertebrateToPrimate: Vertebrate => Primate = f andThen g // or g compose f

Given the definition of Function1 we saw, can we safely say that replacing a function with one of its subtypes doesn’t break the compose operation? Let’s start by checking whether making the return type covariant is actually a problem.
Assume g stays the same but we replace f with a subtype of f. According to our definition of function, the return type is covariant, so Vertebrate => Homo is a subtype of f (as Homo is a subtype of Mammal):

[Figure: composing Vertebrate => Homo with g: Mammal => Primate]

It should be clear from the picture that replacing f with its subtype doesn’t prevent us from composing the two functions: g takes a Mammal as an argument, and a Homo is a Mammal.
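The same can be checked directly in code. A minimal sketch, assuming a hypothetical hierarchy Homo <: Primate <: Mammal <: Vertebrate like the one in the repository:

class Vertebrate
class Mammal extends Vertebrate
class Primate extends Mammal
class Homo extends Primate

val g: Mammal => Primate = _ => new Primate

// Covariant return type: Vertebrate => Homo is a subtype of Vertebrate => Mammal
val fNarrowerResult: Vertebrate => Homo = _ => new Homo
val asSupertype: Vertebrate => Mammal = fNarrowerResult

// Composition still works: g happily accepts the Homo returned by fNarrowerResult
val composed: Vertebrate => Primate = fNarrowerResult andThen g
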
Now let’s keep f as it is and replace g with a subtype of g. As a function is contravariant in its argument, a function Vertebrate => Primate is a subtype of g (as the argument Vertebrate is a supertype of Mammal):

[Figure: composing f: Vertebrate => Mammal with Vertebrate => Primate]

Even in this case the composition is preserved: g now takes a vertebrate, and all mammals are vertebrates. To complete the picture, it remains to be seen how a different definition of function would impact the composition operation. What if, for example, the return type were contravariant? Then we could replace f with a function Vertebrate => Vertebrate (the return type being a supertype of Mammal, and therefore Vertebrate => Vertebrate a subtype of Vertebrate => Mammal under this definition). Would that affect our ability to compose the two functions?

[Figure: composing Vertebrate => Vertebrate with g: Mammal => Primate breaks down]

Yes, as the picture above shows. For example, we might pass a vertebrate to f and f might return a fish (which is a vertebrate but not a mammal). g, though, is not defined for fishes, only for mammals.
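In code, the incompatibility shows up as a composition that simply doesn’t type-check. A minimal sketch (same hypothetical hierarchy as above, plus a Fish):

class Vertebrate
class Mammal extends Vertebrate
class Fish extends Vertebrate
class Primate extends Mammal

val g: Mammal => Primate = _ => new Primate
val fBroaderResult: Vertebrate => Vertebrate = _ => new Fish // may return a Fish

// g only accepts Mammals, but fBroaderResult can return any Vertebrate,
// so the composition is rejected:
// val broken = fBroaderResult andThen g   // does not compile
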
We get a similar outcome if we define function as covariant in its argument. In the following example we keep f unchanged but replace the original g with Primate => Primate. If Function were covariant in its argument, we could replace Mammal with its subtype Primate, and therefore Primate => Primate would be a legitimate subtype of Mammal => Primate according to that definition:

[Figure: composing f: Vertebrate => Mammal with Primate => Primate breaks down]

Even in this case our ability to compose the two functions is lost, as g is no longer defined for all possible mammals but only for a specific subset of them: primates. If f returns a dog, g wouldn’t know what to do with it.
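Again, the composition simply doesn’t type-check. A minimal sketch (same hypothetical hierarchy, plus a Dog):

class Vertebrate
class Mammal extends Vertebrate
class Dog extends Mammal
class Primate extends Mammal

val f: Vertebrate => Mammal = _ => new Dog // may return a Dog
val gNarrowerArg: Primate => Primate = identity

// gNarrowerArg only accepts Primates, while f can return any Mammal,
// so the composition is rejected:
// val broken = f andThen gNarrowerArg   // does not compile
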
The lesson, in the case of functions, can be expressed in a more formal way by introducing the concepts of domain and codomain of a function. The domain is the set of values for which the function is defined (its possible arguments) and the codomain is the set of all possible values that the function can return. So, for instance, for a function Mammal => Homo the set of all mammals is the domain and the set of all humans is the codomain. We can now say that, given two functions A => B and C => D, they can be composed into a function A => D if and only if the codomain of the first function (B) is a subset of the domain of the second function (C). And that is exactly what the Scala definition of Function1 captures with its contravariant argument type and covariant return type.
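To tie the formal rule back to Scala, here is one last minimal sketch (same hypothetical hierarchy) showing both the composition rule and the resulting subtype relationship:

class Vertebrate
class Mammal extends Vertebrate
class Primate extends Mammal
class Homo extends Primate

val first: Vertebrate => Homo = _ => new Homo    // codomain: Homo
val second: Mammal => Primate = _ => new Primate // domain: Mammal

// Homo is a subset (subtype) of Mammal, so the two functions compose
val both: Vertebrate => Primate = first andThen second

// Equivalently, thanks to the contravariant argument and covariant return type,
// Vertebrate => Homo is a subtype of Mammal => Primate
val widened: Mammal => Primate = first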

Code is available here: https://github.com/lansaloltd/type-variance/
And here: https://github.com/lansaloltd/kingdom-animalia