Scala Type Classes and Family Resemblances

This post is not a general introduction to type classes in Scala; there are already quite a few good posts on the subject (here, here and here for example). It is about type classes, though, and the somewhat cryptic title requires a brief explanation. In Scala with Cats, the authors describe a type class as a programming pattern that “allow us to extend existing libraries with new functionality, without using traditional inheritance, and without altering the original library source code.” Although that is not the only possible definition, it’s a perfect starting point for what I would like to cover. Can type classes help us “remove” existing functionality in the same way? And how and why might we end up in such a situation? Let’s try to model a simple type for a bird to see how the issue could arise:

sealed trait Bird {
  def fly: Unit
}

In an OOP approach we could proceed by adding a few implementations, a parrot for example:

sealed trait Bird {
  def fly: Unit
}

class Parrot(name: String) extends Bird {
  override def fly: Unit = println(s"I'm a parrot and I can fly. My name is $name")
}

And perhaps a few other birds until we get to penguins. Penguins are birds, but there is a problem: penguins don’t fly.

sealed trait Bird {
  def fly: Unit
}

class Penguin(name: String) extends Bird {
  override def fly: Unit = ??? // throw new UnsupportedOperationException("Sorry, can't fly")
}

A solution might be to throw an UnsupportedOperationException, but you might argue it is not an optimal one: catching those problems at compile time would be better. We were probably a bit hasty in adding a fly method to the Bird trait. Not every bird flies, and penguins are not the only exception. But was it just an oversight, or is there more to it?
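To make the runtime nature of that failure concrete, here is a minimal, self-contained sketch. It is an illustration rather than the post's code: fly returns a String instead of printing, and the RuntimeFailure object is a name introduced only for this example. The point is that the OOP version compiles without complaint and the problem only surfaces when fly is actually called.

```scala
import scala.util.Try

// Same shape as the Bird trait above, except that fly returns a String
// (an assumption made here so the outcome can be inspected).
sealed trait Bird {
  def fly: String
}

class Penguin(name: String) extends Bird {
  // Compiles fine: nothing tells the compiler that penguins can't fly.
  override def fly: String =
    throw new UnsupportedOperationException("Sorry, can't fly")
}

// Hypothetical holder object, introduced only for this sketch.
object RuntimeFailure {
  // The exception is raised only when fly is invoked; Try captures it.
  val attempt: Try[String] = Try(new Penguin("Pingu").fly)
}
```

Here RuntimeFailure.attempt is a Failure wrapping the UnsupportedOperationException, which is exactly the kind of error we would rather have the compiler reject.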

Ludwig Wittgenstein introduced in his Philosophical Investigations the idea of family resemblance to describe certain concepts in our language for which we don’t have a clear set of traits shared across all members, but rather a general overlapping mesh of features. Think of a large family picture: we can recognize each member as part of the family, and the common traits (hair color, shape of the nose, etc.) are spread across them, yet no definite set of those traits is shared by every single member.

Back to our problem, “flying” is such a common characteristic of birds that it is almost understandable why that def fly slipped into the Bird trait. [1]
But perhaps there is something that a type class approach could do to mitigate those issues (all code available here [2]). Assume we have the same birds as above:

sealed trait Bird // no fly method now

class Parrot(val name: String) extends Bird
class Penguin(val name: String) extends Bird

We then proceed to define the various components of our type classes. First the traits/behaviors (note that there is no mention of Bird in them):

trait Fly[A] {
  def fly(a: A): Unit
}

and

// for penguins
trait Swim[A] {
  def swim(a: A): Unit
}
// for parrots
trait Talk[A] {
  def talk(a: A): Unit
}

and then three instances implementing the above traits (two for a parrot and one for a penguin):

implicit val parrotFly: Fly[Parrot] = new Fly[Parrot] {
  def fly(parrot: Parrot): Unit = {
    println(s"I'm a parrot and I can fly. My name is ${parrot.name}")
  }
}

implicit val parrotTalk: Talk[Parrot] = new Talk[Parrot] {
  def talk(parrot: Parrot): Unit = {
    println(s"I'm a parrot and I can talk. My name is ${parrot.name}")
  }
}

implicit val penguinSwim: Swim[Penguin] = new Swim[Penguin] {
  def swim(penguin: Penguin): Unit = {
    println(s"I'm a penguin and I can swim. My name is ${penguin.name}")
  }
}

And finally the API, the last bit that acts as glue and, relying on implicits, puts everything together:

implicit class FlyOps[A](val value: A) extends AnyVal {
  def fly(implicit flyInstance: Fly[A]): Unit = flyInstance.fly(value)
}

implicit class TalkOps[A](val value: A) extends AnyVal {
  def talk(implicit talkInstance: Talk[A]): Unit = talkInstance.talk(value)
}

implicit class SwimOps[A](val value: A) extends AnyVal {
  def swim(implicit swimInstance: Swim[A]): Unit = swimInstance.swim(value)
}
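As the second footnote points out, implicits are a convenience rather than a requirement. A minimal sketch of the more verbose style might look like the following; it is not taken from the sample repository, talk returns a String so the result can be inspected, and ExplicitStyle is a name made up for this example.

```scala
// Stand-alone copies of the model and the behavior (String result is an assumption).
final class Parrot(val name: String)

trait Talk[A] {
  def talk(a: A): String
}

// Hypothetical object showing the type class without any implicit machinery.
object ExplicitStyle {
  // A plain value: not marked implicit.
  val parrotTalk: Talk[Parrot] = new Talk[Parrot] {
    def talk(parrot: Parrot): String =
      s"I'm a parrot and I can talk. My name is ${parrot.name}"
  }

  // The caller has to hand over the instance explicitly.
  def talk[A](value: A, instance: Talk[A]): String = instance.talk(value)

  val said: String = talk(new Parrot("George"), parrotTalk)
}
```

The compile-time guarantee is the same (there is simply no Talk[Penguin] value to pass), but every call site has to name the instance, which is the verbosity the implicit classes above remove.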

Now that we have all the pieces in place, we can see if we achieved something good out of it. We need two birds first:

val parrot = new Parrot("George")
val penguin = new Penguin("Pingu")

The API and instances defined above need to be imported for the implicits to do their magic:

// here is where I defined the api in the sample code
import com.lansalo.family.resemblance.fp.api.ImplicitBehavioursApi._
// here the instances implementing the traits
import com.lansalo.family.resemblance.fp.Instances._ 

parrot.fly
// As expected, parrots can fly and this will output:
// "I'm a parrot and I can fly. My name is George"

parrot.talk
// Will print: "I'm a parrot and I can talk. My name is George"

parrot.swim
// compilation error because parrots can't swim

penguin.swim
// will output: "I'm a penguin and I can swim. My name is Pingu"

penguin.fly
// compilation error, because penguins don't fly and, more practically,
// because we didn't define an instance for Fly[Penguin]
// and the API will complain that one can't be found

So, both parrots and penguins are Birds, but parrots can fly (but not swim) whereas penguins can swim (but not fly, or talk for that matter), and errors are picked up at compile time. All good then! Or not? Well… it turns out there is a friendly parrot in New Zealand that can’t fly: the kakapo (apparently the only parrot that can’t). Once again we find ourselves victims of the same mistake, which goes to show how easy it is to generalize and ignore edge cases. But perhaps this time we are in a better position to face the issue without the need to throw runtime errors (i.e. UnsupportedOperationException) or to modify the data defined so far (which could be out of our control). Clearly a new member has to be added to the Bird family and, even if it doesn’t fly, it still has to be a parrot:

sealed trait Bird

class Parrot(val name: String) extends Bird
class Penguin(val name: String) extends Bird
class Kakapo(override val name: String) extends Parrot(name)

The kakapo is still a parrot:

kakapo.isInstanceOf[Parrot] // returns true

But it can’t fly (kakapo.fly will not compile), although the reason might not be immediately clear. It’s true that we have an instance of Fly[Parrot] in scope and, equally, that a Kakapo is a Parrot (as shown above), but their parent/child relationship doesn’t get propagated to the parameterized type Fly[A], as this is invariant in its type parameter A: Fly[Parrot] and Fly[Kakapo] are unrelated types.
For the same reason, the kakapo can’t talk like other parrots. I must admit that my knowledge of the whole kakapo subject is quite limited, but from the information I could gather, that seems to be the case. On the other hand, the ability to speak is widespread across the parrot family. It might well be that, with a bit of patient training, someone could teach a few words to a kakapo. In that case, we can still adapt to this newly discovered behavior (still without modifying our models) by adding an instance for it:

implicit val kakapoTalk: Talk[Kakapo] = new Talk[Kakapo] {
    def talk(kakapo: Kakapo): Unit = {
      println(s"I'm a kakapo and I can talk. My name is ${kakapo.name}")
    }
  }
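The invariance argument made earlier can be checked in isolation. The sketch below is only an illustration under a few assumptions: the behaviors return a String, FlyContra is a hypothetical contravariant twin of Fly added purely for comparison (the post does not use it), and the bird's name is arbitrary. With the invariant Fly[A] no instance is found for a Kakapo unless we upcast it; with the contravariant variant the Parrot instance is picked up automatically, which is precisely the propagation we do not want here.

```scala
sealed trait Bird
class Parrot(val name: String) extends Bird
class Kakapo(name: String) extends Parrot(name)

// Invariant, as in the post: Fly[Parrot] and Fly[Kakapo] are unrelated.
trait Fly[A] { def fly(a: A): String }

// Hypothetical contravariant variant: FlyContra[Parrot] <: FlyContra[Kakapo].
trait FlyContra[-A] { def fly(a: A): String }

object Instances {
  implicit val parrotFly: Fly[Parrot] = new Fly[Parrot] {
    def fly(p: Parrot): String = s"${p.name} flies"
  }
  implicit val parrotFlyContra: FlyContra[Parrot] = new FlyContra[Parrot] {
    def fly(p: Parrot): String = s"${p.name} flies"
  }
}

object VarianceDemo {
  import Instances._

  def flyInvariant[A](a: A)(implicit ev: Fly[A]): String = ev.fly(a)
  def flyContra[A](a: A)(implicit ev: FlyContra[A]): String = ev.fly(a)

  val kakapo = new Kakapo("Sirocco")

  // flyInvariant(kakapo)   // does not compile: no implicit Fly[Kakapo] in scope
  val viaUpcast: String = flyInvariant(kakapo: Parrot) // A is Parrot, so Fly[Parrot] is used
  val viaContra: String = flyContra(kakapo)            // Parrot instance accepted for Kakapo
}
```

Keeping the type class invariant is therefore a deliberate choice in this model: behaviors have to be granted per type, they are never inherited by accident.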

Even if there is no perfect solution in those cases, a type class approach seems to provide a better way to deal with scenarios where we are trying to model concepts that relate to the family resemblance paradigm.

———————————

1. [All that reminds me of my first experience as a developer, working for a big Italian utility company. It was a long time ago and we were trying (among other things) to model a “meter reader” attached to user accounts. We were struggling with old legacy data, and whenever we thought we had captured the essence of a meter reader (and its types and hierarchy), it was just a matter of time before a new account popped up with a generally old type of reader, breaking our assumptions. It was then a matter of finding someone with the know-how, in a position to throw some light on the weird new type. Usually someone close to retirement.]
2. [The examples will rely on implicits, although, strictly speaking, type classes can be implemented without using them. In the code there are also a few examples of the more verbose version which does not use implicits.]

 

Spark ML Model Tuning

The purpose of this post is to provide some basic guidelines and supporting code for tuning an ML pipeline based on a random forest algorithm using cross-validation, along with some final considerations about the computational cost involved and the possibility of training/tuning the model in parallel.

Tuning

A random forest algorithm takes a few hyperparameters. The most important ones are probably numTrees and maxDepth (but in the code example there is also some reference to maxBins). Increasing the number of trees in the forest (numTrees) reduces the variance in predictions: more trees generally improve accuracy, but the computational load of the training phase grows roughly in proportion. Analogously, deeper trees generally lead to better metrics but also increase the risk of overfitting and the computational cost of training. Tuning the model is the process of working out the optimal values for those hyperparameters and striking the right balance between costs and benefits.

Cross Validation

The idea is to split a given labelled dataset into chunks (folds) and then iterate through them: at each iteration, one chunk is used as the validation set and the remaining ones as the training set. For example, with folds = 3, the process splits the data into three folds (a, b, c) and then iterates three times:

1st iteration -> a + b = training, c = validation
2nd iteration -> a + c = training, b = validation
3rd iteration -> b + c = training, a = validation

Note that the whole process is indeed training and evaluating three times (as in this example folds = 3).
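The iteration scheme can be sketched in a few lines of plain Scala. This is only an illustration of the idea: KFold and splits are names invented here, rows are assigned to folds round-robin to keep the example deterministic, and it makes no attempt to reproduce how Spark itself assigns rows to folds.

```scala
// Hypothetical helper illustrating k-fold splitting on an in-memory sequence.
object KFold {
  // Returns one (training, validation) pair per fold.
  def splits[A](items: Seq[A], k: Int): Seq[(Seq[A], Seq[A])] = {
    require(k >= 2 && k <= items.size, "need at least 2 folds and no empty fold")

    // Round-robin assignment: element i goes to fold i % k.
    val folds: Seq[Seq[A]] = items.zipWithIndex
      .groupBy { case (_, index) => index % k }
      .toSeq
      .sortBy { case (foldId, _) => foldId }
      .map { case (_, members) => members.map { case (item, _) => item } }

    folds.indices.map { i =>
      val validation = folds(i)
      val training = folds.indices.filter(_ != i).flatMap(j => folds(j))
      (training, validation)
    }
  }
}
```

For example, KFold.splits(Seq(1, 2, 3, 4, 5, 6), 3) yields three pairs, and every element ends up in the validation set exactly once.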

An Estimator can then be plugged into the cross-validation process with the intention of keeping track of the relationship between certain hyperparameter values and some metrics, and of picking the best performing model.

Data and Code

Code can be found here. Inspired by the Cambridge Analytica scandal, and as a warning that great power comes with great responsibility, the example uses a completely fictional and auto generated dataset where made up citizens are listed with the party they voted for (the label) along with some features (the usual suspects: age, income, gender, etc.):

+----+------+---+----+------+---------+-------+
|id  |gender|age|area|income|education|vote   |
+----+------+---+----+------+---------+-------+
|650 |female|99 |A   |57000 |1        |tory   |
|1523|male  |24 |B   |5500  |3        |labour |
|434 |male  |82 |D   |5500  |3        |liberal|
|174 |male  |26 |C   |69000 |1        |liberal|
+----+------+---+----+------+---------+-------+

Road Map

As mentioned, the data in the example is fictional; moreover, some statistical correlation has been artificially introduced in the dataset and the feature engineering part is simplified. On the other hand, that is all that is needed to illustrate the advantages of tuning. The focus needs to be on the different outcomes of a tuned versus a non-tuned model. Quite obviously, a tuned model is expected to perform better than a non-tuned one, and the following road map should prove the point:

  1. Given a training dataset, train the model without any tuning
  2. Using the same training dataset, train the model using cross-validation and an evaluator to work out the optimal hyperparameter values
  3. Train a new model using the optimal hyperparameter values and the same training dataset as in point 1
  4. Evaluate the model from point 1 (not tuned) and the model from point 3 (tuned) against a given validation set and compare the results

Evaluation will be based on f1 metric (which takes into consideration precision and recall). Besides being the default, f1 seems the most natural choice for the example.
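As a reminder of what the metric combines, here is a small sketch of the F1 computation. The per-class formula is the standard harmonic mean of precision and recall; the weighted average by class size reflects my understanding of what Spark's multiclass "f1" reports, so treat that part as an assumption. F1 and its method names are introduced only for this example.

```scala
// Hypothetical helper, not part of the post's code.
object F1 {
  // Harmonic mean of precision and recall for a single class.
  def f1(precision: Double, recall: Double): Double =
    if (precision + recall == 0.0) 0.0
    else 2 * precision * recall / (precision + recall)

  // Average of per-class F1 scores, weighted by the number of true
  // instances of each class. Input: (precision, recall, support) per class.
  def weightedF1(perClass: Seq[(Double, Double, Long)]): Double = {
    val total = perClass.map { case (_, _, support) => support }.sum.toDouble
    perClass.map { case (p, r, support) => f1(p, r) * support / total }.sum
  }
}
```

A class predicted with precision 0.5 and recall 0.5 has an F1 of 0.5, while a class with perfect precision but zero recall scores 0, which is why F1 punishes lopsided classifiers more than plain accuracy does.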

Tuning the Model

The first point in the road map is to train a random forest model without any specific tuning (i.e. the default values for numTrees and maxDepth will be used) [1]. In the example that is done in a training method that takes a given training set as a parameter.

val vectorAssembler = new VectorAssembler().setInputCols(Array("f_gender", "age", "f_area", "income", "f_education"))
    .setOutputCol("features")
val genderIndexer = new StringIndexer().setInputCol("gender").setOutputCol("f_gender")
val areaIndexer = new StringIndexer().setInputCol("area").setOutputCol("f_area")
val educationIndexer = new StringIndexer().setInputCol("education").setOutputCol("f_education")
val voteIntentionIndexer = new StringIndexer().setInputCol("vote").setOutputCol("label")

def training(trainingSet: DataFrame): PipelineModel = {
  val transformationPipeline = new Pipeline()
  val classifier = new RandomForestClassifier().setSeed(5043) // set the seed for reproducibility
  val trainingPipeline: transformationPipeline.type = transformationPipeline.setStages(
    Array(genderIndexer, areaIndexer, educationIndexer, voteIntentionIndexer, vectorAssembler, classifier))
  trainingPipeline.fit(trainingSet)
}

Then the model can be evaluated against a given validation set and its performance expressed in terms of f1:

def validation(model: PipelineModel, validation: DataFrame) = {
    // Make predictions.
    val predictions = model.transform(validation)

    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("f1") // f1 is the default anyway

    val f1: Double = evaluator.evaluate(predictions)
    println("@@ f1 ====> " + f1)
  }

The whole flow is implemented in a test in VoteIntentionTrainingTest called train and evaluate model. Once executed it should return a value for f1 of 0.6736260921544567 and this will be the benchmark to compare the tuned version against.

Better Hyperparameters Values

Figuring out the optimal hyperparameter values should be more interesting. A ParamGridBuilder allows us to pass different candidates for each hyperparameter. Considering the Spark default values (numTrees = 20, maxDepth = 5), a good first approximation would be to try the default together with one value on either side of it. For example, the default for numTrees is 20, hence passing an Array(15, 20, 25) will give an idea about whether better metrics can be obtained with values greater or smaller than the default one.

def tuning(trainingSet: DataFrame): CrossValidatorModel = {

  val rfClassifier = new RandomForestClassifier()
  val pipeline = new Pipeline().setStages(
    Array(genderIndexer, areaIndexer, educationIndexer, voteIntentionIndexer, vectorAssembler, rfClassifier))

  val nFolds: Int = 8
  val metric: String = "f1"
  val paramGrid = new ParamGridBuilder()
    .addGrid(rfClassifier.numTrees, Array(15, 20, 25))
    .addGrid(rfClassifier.maxDepth, Array(4, 5, 6))
    .build()
  val evaluator = new MulticlassClassificationEvaluator()
    .setLabelCol("label")
    .setPredictionCol("prediction")
    .setMetricName(metric)

  val cv: CrossValidator = new CrossValidator()
    .setEstimator(pipeline)
    .setEstimatorParamMaps(paramGrid)
    .setEvaluator(evaluator)
    .setNumFolds(nFolds)
    .setParallelism(3)

  val fittedPipeline: CrossValidatorModel = cv.fit(trainingSet)
  val best: Model[_] = fittedPipeline.bestModel
  val stages: Seq[Transformer] = best.asInstanceOf[PipelineModel].stages.toList


  val paramMap = fittedPipeline.getEstimatorParamMaps
    .zip(fittedPipeline.avgMetrics)
    .maxBy(_._2)
    ._1

  println(s"@@@@ best num trees:     ${stages(5).asInstanceOf[RandomForestClassificationModel].getNumTrees}")
  println(s"@@@@ best max depth:     ${stages(5).asInstanceOf[RandomForestClassificationModel].getMaxDepth}")
  println(s"@@@@ best max bins:      ${stages(5).asInstanceOf[RandomForestClassificationModel].getMaxBins}")

  println(s"@@@@ best paramMap:      ${paramMap}")

  fittedPipeline
}

Once the pipeline is fitted we grab the best model, find out the optimal hyperparameter values (among the ones provided in ParamGridBuilder) and print them in the console. As a note, we are interested in stage 6 (so index 5 in the zero-based array of stages) because the pipeline (as defined in this example) has 6 stages, the last one being the classifier (the one we are interested in); paramMap returns the same results. The whole process is implemented in VoteIntentionTrainingTest as the test tuning a model and save it in $TunedModelPath and, if executed, should return the following optimal values:

num trees:     25
max depth:     6

We could refine those results further and, for each hyperparameter, pass new candidates in an interval around the optimal value. For example, in the numTrees case an Array(23, 24, 25, 26, 27) could be passed to the ParamGridBuilder as a new set of candidates for the optimal value. I’ll omit that here, but that should return the following values:

num trees:     24
max depth:     6

Tuned vs Not Tuned Model

The optimal values can now be used to override the default ones:

  def training(trainingSet: DataFrame): PipelineModel = {
    val transformationPipeline = new Pipeline()
    val classifier = new RandomForestClassifier()
      .setNumTrees(24) // <- Tuned value
      .setMaxDepth(6) // <- Tuned value
      .setSeed(5043) // set the seed for reproducibility
    val trainingPipeline: transformationPipeline.type = transformationPipeline.setStages(
      Array(genderIndexer, areaIndexer, educationIndexer, voteIntentionIndexer, vectorAssembler, classifier))
    trainingPipeline.fit(trainingSet)
  }

An evaluation of the new model against the same validation dataset should return an f1 of 0.6919821403245205, which is an improvement compared to the previous value of f1 (0.6736260921544567) obtained with the default hyperparameter values.

numTrees = 20, maxDepth = 5 -> f1 = 0.6736260921544567 (default values)
numTrees = 24, maxDepth = 6 -> f1 = 0.6919821403245205

Training and Parallelism

It is worth noting that cross-validation and the parameter grid substantially increase the computational cost of fitting a model. The first tuning example:

.addGrid(rfClassifier.numTrees, Array(15, 20, 25))
.addGrid(rfClassifier.maxDepth, Array(4, 5, 6))

will evaluate 9 different parameter combinations (3 * 3), and this on top of the cross-validation cost (8 folds). In this fictional example we are dealing with an intentionally small dataset, but in a real-case scenario Spark and a distributed architecture are likely to make a big difference, and we can take advantage of this via the setParallelism method.

val cv: CrossValidator = new CrossValidator()
      .setEstimator(pipeline)
      .setEstimatorParamMaps(paramGrid)
      .setEvaluator(evaluator)
      .setNumFolds(nFolds)
      .setParallelism(3)

That would set the max number of threads to use when running parallel algorithms (default is 1 for serial execution).
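To put a number on the cost discussed in this section, the arithmetic can be written down as a tiny sketch. TuningCost and its methods are hypothetical names; the extra "+ 1" assumes, as I understand the CrossValidator behavior, that the best combination is refitted once on the whole training set after cross-validation.

```scala
// Hypothetical helper: counts model fits for a grid search with k-fold cross-validation.
object TuningCost {
  // Size of the grid: the product of the number of candidates per hyperparameter.
  def combinations(candidatesPerParam: Seq[Int]): Int = candidatesPerParam.product

  // One fit per combination per fold, plus one final refit of the best
  // combination on the full training set (assumption, see above).
  def totalFits(candidatesPerParam: Seq[Int], folds: Int): Int =
    combinations(candidatesPerParam) * folds + 1

  // First tuning example: 3 values for numTrees, 3 for maxDepth, 8 folds.
  val firstExample: Int = totalFits(Seq(3, 3), folds = 8) // 9 * 8 + 1 = 73
}
```

With setParallelism(3), up to three of those fits can be evaluated at the same time, which is where the setting starts to pay off as the grid grows.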

———————————

1. [In Spark 2.4.3 the default values are: numTrees = 20, maxDepth = 5, maxBins = 32]

Scala Stream: There’s no reason to use it if you don’t actually need one.

The first time I read about Scala Streams in Functional Programming in Scala I was impressed by the example’s captivating simplicity. I assume a certain familiarity with the idea of a Stream but just as a quick recap if that is not the case, the original example in the book was:

List(1,2,3,4).map(_ + 10).filter(_%2==0).map(_ * 3)

Which gets evaluated, step by step, as:

List(11,12,13,14).filter(_%2==0).map(_ * 3)
List(12,14).map(_ * 3)
List(36,42)

Whereas a Stream version

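Stream(1,2,3,4).map(_ + 10).filter(_%2==0).map(_ * 3).toList // [1]

because of its laziness, will evaluate in the following way (the trace below, from the book, follows the map and filter steps only and leaves out the final map(_ * 3)):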

Stream(1,2,3,4).map(_ + 10).filter(_ % 2 == 0).toList
cons(11, Stream(2,3,4).map(_ + 10)).filter(_ % 2 == 0).toList
Stream(2,3,4).map(_ + 10).filter(_ % 2 == 0).toList
cons(12, Stream(3,4).map(_ + 10)).filter(_ % 2 == 0).toList
12 :: Stream(3,4).map(_ + 10).filter(_ % 2 == 0).toList
12 :: cons(13, Stream(4).map(_ + 10)).filter(_ % 2 == 0).toList
12 :: Stream(4).map(_ + 10).filter(_ % 2 == 0).toList
12 :: cons(14, Stream().map(_ + 10)).filter(_ % 2 == 0).toList
12 :: 14 :: Stream().map(_ + 10).filter(_ % 2 == 0).toList
12 :: 14 :: List() // [2]
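The difference in evaluation order can also be observed directly by logging each call. The sketch below is an illustration only: EvaluationOrder and trace are names invented here, and Stream is used to match the post even though it is deprecated in favour of LazyList from Scala 2.13 onwards.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical helper recording the order in which map and filter are applied.
object EvaluationOrder {
  // Returns the result of the pipeline together with the log of calls.
  def trace(useStream: Boolean): (List[Int], List[String]) = {
    val log = ListBuffer.empty[String]
    def add10(i: Int): Int = { log += s"map $i"; i + 10 }
    def isEven(i: Int): Boolean = { log += s"filter $i"; i % 2 == 0 }

    val result: List[Int] =
      if (useStream) Stream(1, 2, 3, 4).map(add10).filter(isEven).toList
      else List(1, 2, 3, 4).map(add10).filter(isEven)

    (result, log.toList)
  }
}
```

With the List, all four map calls are logged before the first filter call, because the intermediate list is built in full. With the Stream the log should alternate between map and filter, element by element, mirroring the trace above.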

Note that no intermediate results like List(11,12,13,14) or List(12,14) get instantiated in this case. It looks like there is simply less work to do, but this impression of efficiency might be misleading. A simple micro benchmark reveals how the Stream performs worse than the List:

testOriginalExampleWithList    thrpt   20  4368200.644 ± 155109.000  ops/s
testOriginalExampleWithStream  thrpt   20  2881809.414 ± 122061.743  ops/s

The reason lies in the lazy nature of a Stream combined with its immutability: it belongs to the Scala immutable collections but, at the same time, its tail is not “real” yet (so to speak) and will be evaluated only when required. In that respect the tail resembles a rule, or a set of instructions, to materialise its elements. The marriage between these two aspects of a Stream requires some synchronisation, which is the toll to pay (and the reason behind the different performance above):

/** A lazy cons cell, from which streams are built. */
@SerialVersionUID(-602202424901551803L)
final class Cons[+A](hd: A, tl: => Stream[A]) extends Stream[A] {
  override def isEmpty = false
  override def head = hd
  @volatile private[this] var tlVal: Stream[A] = _
  @volatile private[this] var tlGen = tl _
  def tailDefined: Boolean = tlGen eq null
  override def tail: Stream[A] = {
    if (!tailDefined)
      synchronized {
        if (!tailDefined) {
          tlVal = tlGen()
          tlGen = null
        }
      }

    tlVal
  }
} // [3]

So, why use a Stream after all? As the title (and the Scala doc) suggests, there are occasions where a lazy sequence comes in handy. One well-known example is the possibility of defining “infinite” sequences like the Fibonacci one in the documentation. Another aspect, and potentially an advantage in some situations, is the generally low memory footprint of a Stream (remember, the intermediate sequences, like List(11,12,13,14) and List(12,14) in the example, don’t get instantiated).
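For reference, the “infinite” Fibonacci sequence mentioned above can be sketched along the lines of the example in the Stream documentation. The wrapping Fib object and the firstTen value are additions made for this illustration.

```scala
// Hypothetical holder object for the sketch.
object Fib {
  // Each element past the first two is the sum of the previous two.
  // The tail of #:: is evaluated lazily, so the self-reference is safe.
  lazy val fibs: Stream[BigInt] =
    BigInt(0) #:: BigInt(1) #:: fibs.zip(fibs.tail).map { case (a, b) => a + b }

  // Only the elements actually requested are ever computed.
  val firstTen: List[BigInt] = fibs.take(10).toList
}
```

Here Fib.firstTen is 0, 1, 1, 2, 3, 5, 8, 13, 21, 34; the definition itself never terminates on its own, it is the take(10) that bounds the work.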

The following terminates successfully on my machine with roughly 4GB of heap memory [4]:

Stream.fill(600)("Some random sample text.")
      .map(_ * 500000 + "test")
      .zipWithIndex.filter(_._2 % 2 == 0)
      .map(_._1.contains("test"))

Whereas its List equivalent throws:

java.lang.OutOfMemoryError: Java heap space

The above case is built purely with the intent to trigger the exception, but compare this:

final case class Wrapper(index: Int, text: String)

val listInput: List[Int] = (1 to 500).toList

listInput
      .map(Wrapper(_, "Some simple text"))
      .map(_.copy(text = "some simple text" * 500))
      .map(w => w.copy(text = w.text + "look for me!"))
      .filter(_.index % 2 == 0)
      .map(_.text.toLowerCase)
      .count(_.contains("look for me!"))

with this one:

val streamInput: Stream[Int] = (1 to 500).toStream

streamInput
      .map(Wrapper(_, "Some simple text"))
      .map(_.copy(text = "some simple text" * 500))
      .map(w => w.copy(text = w.text + "look for me!"))
      .filter(_.index % 2 == 0)
      .map(_.text.toLowerCase)
      .count(_.contains("look for me!"))

Both complete successfully, but in this case the Stream version performs slightly better than the List one (the gap is small, and the error intervals of the two measurements overlap):

List    thrpt   15  76.588 ± 2.734  ops/s
Stream  thrpt   15  80.246 ± 2.260  ops/s

Flight Recorder shows what is happening under the hood: the first example with List generates more intermediate results that then increase the amount of work for the garbage collector:

[Figure: gc-list, Flight Recorder garbage collection activity for the List version]

Whereas in the Stream case the garbage collector is definitely less busy and less CPU resources are needed in order to free the heap.

[Figure: gc-stream, Flight Recorder garbage collection activity for the Stream version]

In this last case, the amount of extra work on the garbage collector was sufficient to tip the balance in favor of a lazy sequence. Those cases are probably the exception rather than the rule, but I guess the lesson is that wherever there are long chains of transformations on non-lazy collections, and/or the suspicion that those might involve extra work for the GC, testing a lazy alternative is worth considering.

All code and tests available here.

———————————

1. [a Stream is lazy hence we need to turn it into a List to trigger the evaluation]
2. [Chiusano P., Bjarnason R. Functional Programming in Scala. Manning, 2014. p.72]
3. [The code comes from the definition of the Stream class.]
4. [All the numbers regarding performance and potential out of memory errors or general memory usage obviously depend on the specific hardware and configuration in use (in my case an old laptop and not much heap assigned), but I think the general idea behind those tests, rather than the specific numbers, still stands.]

Microbenchmarking and Performance Tests in Scala: JMH and ScalaMeter

In many contexts, micro-benchmarking and performance are not the main issue and other considerations should drive a developer’s choices regarding implementation. That’s not always the case, though. This post is just to share and illustrate a sort of Scala project template that I put together and found quite useful in a few situations where micro-benchmarking and performance might be a concern (or simply a legitimate curiosity). Broadly speaking, in Scala there are two viable approaches: JMH (Java Microbenchmark Harness) and ScalaMeter and, if you are interested, a discussion about their pros and cons and their place in the Scala ecosystem is available here. Note that JMH is part of the OpenJDK project, but that doesn’t mean that benchmarks have to run on OpenJDK. As a matter of fact, all the examples presented in this post were executed on an Oracle JDK. NOTE: all the code is available here.

Settings and dependencies

For JMH, we can use sbt-jmh, an sbt plugin for JMH, so I added the following line to project/plugins.sbt:

addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.3.4")

And then we set the dependencies in the build.sbt

import sbt.Keys._

lazy val root = (project in file(".")).
  enablePlugins(JmhPlugin).
  settings(
    name := "scala-jmh-performance-testing",
    organization := "LansaloLtd",
    version := "1.0",
    scalaVersion := "2.12.3",
    libraryDependencies ++= Seq(
      "org.openjdk.jmh" % "jmh-generator-annprocess" % "1.21",
      "commons-codec" % "commons-codec" % "1.9",
      "com.storm-enroute" %% "scalameter" % "0.8.2",
      "org.scalatest" %% "scalatest" % "3.0.1" % Test
    ),
    testFrameworks += new TestFramework("org.scalameter.ScalaMeterFramework"),
    parallelExecution in Test := false
  )

Personally I like to use both ScalaMeter and JMH (that is why both are present as dependencies above).

This is far from an exhaustive list of pros and cons, but I would like to mention at least that ScalaMeter makes it really easy to create graphs and follows the same approach as ScalaCheck, with generators that can be composed via for-comprehension; on the other hand, it is not actively maintained. JMH seems to be the preferred choice for Scala and for some measurements is really straightforward. At the present time, the only documentation available for JMH is via examples. Note also that we have introduced a dependency on OpenJDK (org.openjdk.jmh) but that is independent from the runtime JDK.

Targets

Purely for demonstration purposes we need two candidates or targets for our benchmark. For no particular reason I chose:

// candidate 1
def mapOnFunctionsAndList(input: String, slice: Int, funcs: List[String => Int]): List[Int] = {
    funcs.map(f => slide(input, slice).map(f).min)
}

and:

// candidate 2
def foldOnFunctionsList(input: String, slice: Int, funcs: List[String => Int]): List[Int] = {
    funcs.foldLeft(List.empty[Int])((acc, func) => {
      slide(input, slice).map(str => func(str)).min :: acc
    })
}

Basically, we start from a certain number n of functions (String => Int), then we slide a given String obtaining z substrings from it. For each function, 1 to n, we pass all the substrings obtaining z Int values but we keep only the minimum. Therefore, if we start with a list of n functions, we end up with a list of n Int values. Given the same inputs, the two implementations compute the same values, although candidate 2 returns them in reverse order because it prepends each result to the accumulator. Are there differences when it comes to performance?
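The two candidates can be tried out on a small input with the sketch below. Note the assumptions: the post does not show the slide helper, so the version here (built on sliding) is a guess at its behavior, and the three sample functions and the Candidates object are made up for the illustration.

```scala
// Hypothetical stand-alone version of the two benchmark targets.
object Candidates {
  // Assumed implementation of the post's `slide` helper (not shown in the post).
  def slide(input: String, slice: Int): List[String] =
    input.sliding(slice).toList

  // candidate 1
  def mapOnFunctionsAndList(input: String, slice: Int, funcs: List[String => Int]): List[Int] =
    funcs.map(f => slide(input, slice).map(f).min)

  // candidate 2: prepends to the accumulator, so results come out reversed
  def foldOnFunctionsList(input: String, slice: Int, funcs: List[String => Int]): List[Int] =
    funcs.foldLeft(List.empty[Int])((acc, func) =>
      slide(input, slice).map(str => func(str)).min :: acc)

  // Three illustrative functions: length, code of first char, code of last char.
  val funcs: List[String => Int] = List(
    (s: String) => s.length,
    (s: String) => s.head.toInt,
    (s: String) => s.last.toInt
  )

  val viaMap: List[Int] = mapOnFunctionsAndList("benchmark", 4, funcs)   // List(4, 98, 97)
  val viaFold: List[Int] = foldOnFunctionsList("benchmark", 4, funcs)    // List(97, 98, 4)
}
```

The values match once one of the lists is reversed, so for benchmarking purposes the two candidates do the same amount of per-function work; only the way the result list is assembled differs.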

JMH

With JMH we can work out pretty easily the throughput for each of our two candidates. First we need to define the targets of our benchmark and a State for them:

package com.lansalo.jmh

import com.lansalo.Target._
import org.openjdk.jmh.annotations._

class TargetPerformance {

  import Scopes._

  def testMapOnFunctionsAndList(state: BenchmarkState): Unit = {
    mapOnFunctionsAndList(state.title, state.slice, state.funcs)
  }

  def testFoldOnFunctionsList(state: BenchmarkState): Unit = {
    foldOnFunctionsList(state.title, state.slice, state.funcs)
  }

}

object Scopes {

  import com.lansalo.HashingFunctions.hashingFunctions

  val novelTitle = "The Persecution and Assassination of Jean-Paul Marat as Performed by the Inmates of the Asylum of Charenton Under the Direction of the Marquis de Sade"

  @State(Scope.Benchmark)
  class BenchmarkState {
    val funcs: List[String => Int] = hashingFunctions(200)
    val title: String = novelTitle
    val slice: Int = 4
  }
}

And then the proper benchmark (that can also be executed within an IDE like IntelliJ):

package com.lansalo.jmh.benchmark

import java.util.concurrent.TimeUnit

import com.lansalo.jmh.{Scopes, TargetPerformance}
import org.openjdk.jmh.annotations._
import org.openjdk.jmh.results.format.ResultFormatType
import org.openjdk.jmh.runner.Runner
import org.openjdk.jmh.runner.options.{Options, OptionsBuilder}

object BenchmarkRunner_ThroughputBenchmark {
  // run sbt clean jmh:compile from terminal first.
  def main(args: Array[String]): Unit = {
    val opt: Options = new OptionsBuilder().include(classOf[ThroughputBenchmark].getSimpleName)
      .resultFormat(ResultFormatType.TEXT).build
    new Runner(opt).run
  }
}

@OutputTimeUnit(TimeUnit.SECONDS)
@Warmup(iterations = 30)
@Measurement(iterations = 30)
@State(Scope.Benchmark)
private class ThroughputBenchmark extends TargetPerformance {

  @Benchmark
  @BenchmarkMode(Array(Mode.Throughput, Mode.AverageTime))
  @Fork(value = 1)
  override def testFoldOnFunctionsList(state: Scopes.BenchmarkState): Unit = super.testFoldOnFunctionsList(state)

  @Benchmark
  @BenchmarkMode(Array(Mode.Throughput, Mode.AverageTime))
  @Fork(value = 1)
  override def testMapOnFunctionsAndList(state: Scopes.BenchmarkState): Unit = super.testMapOnFunctionsAndList(state)

}

In this example, we measure the throughput and average time:

@BenchmarkMode(Array(Mode.Throughput, Mode.AverageTime))

in a separate JVM:

@Fork(value = 1)

We also run 30 warmup iterations (ignored for the final results) and 30 proper iterations where the actual measurement is taken:

@Warmup(iterations = 30)
@Measurement(iterations = 30)

Warmup is needed mainly because of JVM features such as just-in-time compilation, but going into that would mean covering the whole background behind microbenchmarking, which would be an entire chapter on its own. I can recommend a few readings though: Nanotrusting the Nanotime, Avoiding Benchmarking Pitfalls on the JVM, Java JVM Warmup. [1]
We are almost ready: run sbt clean jmh:compile from the terminal, and then we can run ThroughputBenchmark straight from IntelliJ and obtain something like:

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on
why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

Benchmark                                       Mode  Cnt   Score    Error  Units
ThroughputBenchmark.testFoldOnFunctionsList    thrpt   30  95.529 ±  0.940  ops/s
ThroughputBenchmark.testMapOnFunctionsAndList  thrpt   30  71.549 ±  0.734  ops/s
ThroughputBenchmark.testFoldOnFunctionsList     avgt   30   0.010 ±  0.001   s/op
ThroughputBenchmark.testMapOnFunctionsAndList   avgt   30   0.011 ±  0.001   s/op

I am not really interested in the numbers themselves (which also depend on the hardware) but rather in the sort of insight we can gather this way. ScalaMeter offers different possibilities and customizations as well. I quite like its graphs; with the following code:

package com.lansalo.scalameter.benchmark

import com.lansalo.HashingFunctions.hashingFunctions
import com.lansalo.Target._

import java.io.File

import org.scalameter.{Bench, Gen}
import org.scalameter.persistence.SerializationPersistor

import scala.util.Random

object PerformanceBenchmark extends Bench.OfflineReport {

  override lazy val persistor = SerializationPersistor(new File("target/scalameter/performance/results"))

  val functionsGen: Gen[Int] = Gen.range("functionsNumber")(100, 1000, 100)
  val inputGen: Gen[Int] = Gen.range("imputLength")(20, 200, 20)

  val inputs: Gen[(String, List[String => Int])] = for {
    functionsNumber  foldOnFunctionsList(param._1, 4, param._2)
      }
    }
  }

}

we can obtain the following graph:

[Figure: map-fold]

Orange is for fold and blue for map.

We can also investigate the two implementations from a memory usage point of view. JMH provides a series of Profilers that can be quite handy in those cases (in the following example we chose a HotspotMemoryProfiler):

object BenchmarkRunner_MemoryFootprint {
  // run sbt clean jmh:compile from terminal first.
  def main(args: Array[String]): Unit = {
    val opt: Options = new OptionsBuilder().include(classOf[MemoryFootprint].getSimpleName).addProfiler(classOf[HotspotMemoryProfiler])
      .resultFormat(ResultFormatType.TEXT).build
    new Runner(opt).run
  }
}

This results in a long list of counters (partial output below):

Benchmark                                                                                             Mode  Cnt            Score   Error  Units
MemoryFootprint.testFoldOnFunctionsList                                                              thrpt                71.335          ops/s
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.0.invocations                              thrpt               108.000              ?
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.0.lastEntryTime                            thrpt       10179095151.000              ?
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.0.lastExitTime                             thrpt       10178954989.000              ?
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.0.time                                     thrpt          75456013.000              ?
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.1.invocations                              thrpt                   ≈ 0              ?
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.1.lastEntryTime                            thrpt                   ≈ 0              ?
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.1.lastExitTime                             thrpt                   ≈ 0              ?
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.1.time                                     thrpt                   ≈ 0              ?

You can take a look here to see how to make sense of them. The following ScalaMeter example instead provides a more basic picture of the heap usage:

object MemoryBenchmark extends Bench.OfflineReport {

  override lazy val persistor = SerializationPersistor(new File("target/scalameter/memoryusage/results"))

  override def measurer = new Measurer.MemoryFootprint

  // rest of the code is the same as above

}

For the two fold/map implementations the graph wouldn’t be so interesting (as they overlap) but you can take a look at another post I wrote about value classes to get an idea.

Finally, if running the benchmark on the Oracle JDK, we can use JMH to exploit the Flight Recorder:

object BenchmarkRunner_FlightRecorder {
  // run sbt clean jmh:compile from terminal first.
  def main(args: Array[String]): Unit = {
    val opt: Options = new OptionsBuilder().include(classOf[FlightRecorderDump].getSimpleName)
      .resultFormat(ResultFormatType.TEXT).build
    new Runner(opt).run
  }
}

@Warmup(iterations = 30)
@Measurement(iterations = 30)
@State(Scope.Benchmark)
private class FlightRecorderDump extends TargetPerformance {

  @Benchmark
  @Fork(value = 1, jvmArgsAppend = Array("-XX:+UnlockCommercialFeatures",
    "-XX:+FlightRecorder", "-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints",
    "-XX:FlightRecorderOptions=defaultrecording=true,dumponexit=true,dumponexitpath=/tmp/foldOnFunctionsList.jfr"))
  override def testFoldOnFunctionsList(state: Scopes.BenchmarkState): Unit = super.testFoldOnFunctionsList(state)

  @Benchmark
  @Fork(value = 1, jvmArgsAppend = Array("-XX:+UnlockCommercialFeatures",
    "-XX:+FlightRecorder", "-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints",
    "-XX:FlightRecorderOptions=defaultrecording=true,dumponexit=true,dumponexitpath=/tmp/mapOnFunctionsAndList.jfr"))
  override def testMapOnFunctionsAndList(state: Scopes.BenchmarkState): Unit = super.testMapOnFunctionsAndList(state)

}

This will generate two separate reports: /tmp/foldOnFunctionsList.jfr for the fold version and /tmp/mapOnFunctionsAndList.jfr for the map version. With JMC (Java Mission Control) we can compare a rich collection of parameters and different aspects of the two implementations. Below is a general overview of the heap usage for the map implementation, just as an example:

[Figure: map-perf]

—————————————-

1. [This post takes for granted a certain familiarity with the basic concepts of benchmarking, as its main focus is to illustrate how JMH and ScalaMeter can be used to produce some results. Interpreting those results, however, requires an understanding of the background mentioned above.]

Spark: How to Add Multiple Columns in Dataframes (and How Not to)

There are generally two ways to dynamically add columns to a dataframe in Spark: a foldLeft or a map (passing a RowEncoder). The foldLeft way is quite popular (and elegant), but recently I came across an issue with its performance when the number of columns to add is not trivial. I think the lesson learned is worth sharing: a map solution offers substantially better performance as the number of columns to be added increases.

Let’s start with an example dataframe with four columns: “id”, “author”, “title”, “incipit” (the opening line of the novel). A simple dataframe with just one row might look something like:

+---+-----------+-------------+-----------------------------+
| id|     author|        title|                      incipit|
+---+-----------+-------------+-----------------------------+
|  6|Leo Tolstoy|Anna Karenina|Happy families are all alike |
+---+-----------+-------------+-----------------------------+

You are given an arbitrary list of words and, for each of them, you would like to add a column (named after the word) to the original dataframe, flagging with a boolean whether or not that word appears at least once in the opening line (incipit).
For a list of two words, List("families", "giraffe"), the above dataframe will be transformed into the following:

+---+-----------+-------------+-----------------------------+--------+-------+
| id|     author|        title|                      incipit|families|giraffe|
+---+-----------+-------------+-----------------------------+--------+-------+
|  6|Leo Tolstoy|Anna Karenina|Happy families are all alike |    true|  false|
+---+-----------+-------------+-----------------------------+--------+-------+
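Stripped of Spark, the per-row logic is just a substring check for each word. The sketch below uses plain Scala collections; `WordFlags` and `flags` are hypothetical names introduced only for illustration and are not part of the code discussed in this post.

```scala
object WordFlags {
  // Hypothetical helper: pairs each word with a flag telling whether
  // it occurs at least once in the given opening line.
  def flags(incipit: String, words: List[String]): List[(String, Boolean)] =
    words.map(word => word -> incipit.contains(word))

  def main(args: Array[String]): Unit = {
    val incipit = "Happy families are all alike"
    println(flags(incipit, List("families", "giraffe")))
    // prints List((families,true), (giraffe,false))
  }
}
```

Note that contains is a case-sensitive substring match, so a word is also flagged when it appears as part of a longer word.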

As the list of columns is arbitrary, there are two possible approaches to this problem. I wrapped both in a method to make testing easier. The first approach is the foldLeft way:

def addColumnsViaFold(df: DataFrame, columns: List[String]): DataFrame = {
  import df.sparkSession.implicits._
  columns.foldLeft(df)((acc, col) => {
    acc.withColumn(col, acc("incipit").as[String].contains(col))
  })
}
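In the logical plans printed further down in this post, every withColumn call shows up as one more Project node wrapped around the previous plan. The toy model below mimics only that shape: `Plan`, `Relation` and `Project` are hypothetical stand-ins, not Spark's actual classes, and the sketch says nothing about how Catalyst treats the real plan.

```scala
object NestedPlanSketch {
  // Toy stand-in for a logical plan node; NOT Spark's actual classes.
  sealed trait Plan
  case object Relation extends Plan
  final case class Project(newColumn: String, child: Plan) extends Plan

  // Depth of the tree: one level per wrapped projection.
  def depth(plan: Plan): Int = plan match {
    case Relation          => 0
    case Project(_, child) => 1 + depth(child)
  }

  // Same shape as columns.foldLeft(df)(...): every step wraps the
  // accumulated plan in one more node.
  def viaFold(columns: List[String]): Plan =
    columns.foldLeft(Relation: Plan)((acc, column) => Project(column, acc))

  def main(args: Array[String]): Unit = {
    val plan = viaFold(List("to", "get", "alike"))
    println(plan)        // prints Project(alike,Project(get,Project(to,Relation)))
    println(depth(plan)) // prints 3
  }
}
```

In this model the depth of the structure grows linearly with the number of columns added.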

And the second one (which involves a bit more coding) is the map way:

def addColumnsViaMap(df: DataFrame, words: List[String]): DataFrame = {
  val encoder = RowEncoder.apply(getSchema(df, words))
  df.map(mappingRows(df.schema)(words))(encoder)
}

private val mappingRows: StructType => List[String] => Row => Row =
  (schema) => (words) => (row) => {
    val addedCols: List[Boolean] = words.map {
      word => row.getString(schema.fieldIndex("incipit")).contains(word)
    }
    Row.merge(row, Row.fromSeq(addedCols))
  }

private def getSchema(df: DataFrame, words: List[String]): StructType =
  // Append one non-nullable boolean field per word to the original schema.
  words.foldLeft(df.schema)((schema, word) => schema.add(word, DataTypes.BooleanType, false))

The code (including all tests) is available here.
When we run the ScalaMeter tests to get some idea of how the two approaches behave when dealing with 100 new columns, we get the following results [1]. For foldLeft (addColumnsViaFold method):
[Figure: adding-column-via-fold-100]
Whereas these are the results for map (addColumnsViaMap method):
[Figure: adding_columns_via_map-100]
As the number of columns increases, foldLeft takes considerably more time. If we take the number of columns to 500, the result is similar (and more dramatic).
Once again, for foldLeft:
[Figure: adding-column-via-fold]
And for map:
[Figure: adding_columns_via_map-500]
With 1000 columns, the foldLeft job aborts:

[error] Error running separate JVM: java.lang.OutOfMemoryError: GC overhead limit exceeded

This is probably a sign that the heap is running low and the GC can’t free much memory. Still, the map-based solution seems to cope much better, even with 1000 columns:
[Figure: adding_columns_via_map]
As for the reason behind this different behavior, my guess would be that somehow Catalyst is not able to optimize the foldLeft operation. The plans below (explain(true)) show how the Parsed and Analyzed Logical Plans end up in a sort of nested structure.

== Parsed Logical Plan ==
Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 20 more fields]
+- AnalysisBarrier
      +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 19 more fields]
         +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 18 more fields]
            +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 17 more fields]
               +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 16 more fields]
                  +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 15 more fields]
                     +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 14 more fields]
                        +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 13 more fields]
                           +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 12 more fields]
                              +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 11 more fields]
                                 +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 10 more fields]
                                    +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 9 more fields]
                                       +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 8 more fields]
                                          +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 7 more fields]
                                             +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 6 more fields]
                                                +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 5 more fields]
                                                   +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 4 more fields]
                                                      +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 3 more fields]
                                                         +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 2 more fields]
                                                            +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, Contains(incipit#12, toing) AS toing#368]
                                                               +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, Contains(incipit#12, hence) AS hence#342]
                                                                  +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, Contains(incipit#12, four) AS four#317]
                                                                     +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, Contains(incipit#12, John) AS John#293]
                                                                        +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, Contains(incipit#12, pop) AS pop#270]
                                                                           +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, Contains(incipit#12, drunk) AS drunk#248]
                                                                              +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, Contains(incipit#12, two) AS two#227]
                                                                                 +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, Contains(incipit#12, when) AS when#207]
                                                                                    +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, Contains(incipit#12, city) AS city#188]
                                                                                       +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, Contains(incipit#12, morning) AS morning#170]
                                                                                          +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, Contains(incipit#12, way) AS way#153]
                                                                                             +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, Contains(incipit#12, pig) AS pig#137]
                                                                                                +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, Contains(incipit#12, happy) AS happy#122]
                                                                                                   +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, Contains(incipit#12, was) AS was#108]
                                                                                                      +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, Contains(incipit#12, anna) AS anna#95]
                                                                                                         +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, Contains(incipit#12, in) AS in#83]
                                                                                                            +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, Contains(incipit#12, man) AS man#72]
                                                                                                               +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, Contains(incipit#12, with) AS with#62]
                                                                                                                  +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, Contains(incipit#12, alike) AS alike#53]
                                                                                                                     +- Project [id#9, author#10, title#11, incipit#12, to#38, Contains(incipit#12, get) AS get#45]
                                                                                                                        +- Project [id#9, author#10, title#11, incipit#12, Contains(incipit#12, to) AS to#38]
                                                                                                                           +- Project [_1#4 AS id#9, _2#5 AS author#10, _3#6 AS title#11, _4#7 AS incipit#12]
                                                                                                                              +- LocalRelation [_1#4, _2#5, _3#6, _4#7]

== Analyzed Logical Plan ==
id: int, author: string, title: string, incipit: string, to: boolean, get: boolean, alike: boolean, with: boolean, man: boolean, in: boolean, anna: boolean, was: boolean, happy: boolean, pig: boolean, way: boolean, morning: boolean, city: boolean, when: boolean, two: boolean, drunk: boolean, pop: boolean, John: boolean, four: boolean, hence: boolean, ... 20 more fields
Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 20 more fields]
+- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 19 more fields]
   +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 18 more fields]
      +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 17 more fields]
         +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 16 more fields]
            +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 15 more fields]
               +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 14 more fields]
                  +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 13 more fields]
                     +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 12 more fields]
                        +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 11 more fields]
                           +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 10 more fields]
                              +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 9 more fields]
                                 +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 8 more fields]
                                    +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 7 more fields]
                                       +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 6 more fields]
                                          +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 5 more fields]
                                             +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 4 more fields]
                                                +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 3 more fields]
                                                   +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 2 more fields]
                                                      +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, Contains(incipit#12, toing) AS toing#368]
                                                         +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, Contains(incipit#12, hence) AS hence#342]
                                                            +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, Contains(incipit#12, four) AS four#317]
                                                               +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, Contains(incipit#12, John) AS John#293]
                                                                  +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, Contains(incipit#12, pop) AS pop#270]
                                                                     +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, Contains(incipit#12, drunk) AS drunk#248]
                                                                        +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, Contains(incipit#12, two) AS two#227]
                                                                           +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, Contains(incipit#12, when) AS when#207]
                                                                              +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, Contains(incipit#12, city) AS city#188]
                                                                                 +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, Contains(incipit#12, morning) AS morning#170]
                                                                                    +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, Contains(incipit#12, way) AS way#153]
                                                                                       +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, Contains(incipit#12, pig) AS pig#137]
                                                                                          +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, Contains(incipit#12, happy) AS happy#122]
                                                                                             +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, Contains(incipit#12, was) AS was#108]
                                                                                                +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, Contains(incipit#12, anna) AS anna#95]
                                                                                                   +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, Contains(incipit#12, in) AS in#83]
                                                                                                      +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, Contains(incipit#12, man) AS man#72]
                                                                                                         +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, Contains(incipit#12, with) AS with#62]
                                                                                                            +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, Contains(incipit#12, alike) AS alike#53]
                                                                                                               +- Project [id#9, author#10, title#11, incipit#12, to#38, Contains(incipit#12, get) AS get#45]
                                                                                                                  +- Project [id#9, author#10, title#11, incipit#12, Contains(incipit#12, to) AS to#38]
                                                                                                                     +- Project [_1#4 AS id#9, _2#5 AS author#10, _3#6 AS title#11, _4#7 AS incipit#12]
                                                                                                                        +- LocalRelation [_1#4, _2#5, _3#6, _4#7]

== Optimized Logical Plan ==
Project [id#9, author#10, title#11, incipit#12, Contains(incipit#12, to) AS to#38, Contains(incipit#12, get) AS get#45, Contains(incipit#12, alike) AS alike#53, Contains(incipit#12, with) AS with#62, Contains(incipit#12, man) AS man#72, Contains(incipit#12, in) AS in#83, Contains(incipit#12, anna) AS anna#95, Contains(incipit#12, was) AS was#108, Contains(incipit#12, happy) AS happy#122, Contains(incipit#12, pig) AS pig#137, Contains(incipit#12, way) AS way#153, Contains(incipit#12, morning) AS morning#170, Contains(incipit#12, city) AS city#188, Contains(incipit#12, when) AS when#207, Contains(incipit#12, two) AS two#227, Contains(incipit#12, drunk) AS drunk#248, Contains(incipit#12, pop) AS pop#270, Contains(incipit#12, John) AS John#293, Contains(incipit#12, four) AS four#317, Contains(incipit#12, hence) AS hence#342, ... 20 more fields]
+- InMemoryRelation [id#9, author#10, title#11, incipit#12], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
      +- LocalTableScan [id#9, author#10, title#11, incipit#12]

== Physical Plan ==
*(1) Project [id#9, author#10, title#11, incipit#12, Contains(incipit#12, to) AS to#38, Contains(incipit#12, get) AS get#45, Contains(incipit#12, alike) AS alike#53, Contains(incipit#12, with) AS with#62, Contains(incipit#12, man) AS man#72, Contains(incipit#12, in) AS in#83, Contains(incipit#12, anna) AS anna#95, Contains(incipit#12, was) AS was#108, Contains(incipit#12, happy) AS happy#122, Contains(incipit#12, pig) AS pig#137, Contains(incipit#12, way) AS way#153, Contains(incipit#12, morning) AS morning#170, Contains(incipit#12, city) AS city#188, Contains(incipit#12, when) AS when#207, Contains(incipit#12, two) AS two#227, Contains(incipit#12, drunk) AS drunk#248, Contains(incipit#12, pop) AS pop#270, Contains(incipit#12, John) AS John#293, Contains(incipit#12, four) AS four#317, Contains(incipit#12, hence) AS hence#342, ... 20 more fields]
+- InMemoryTableScan [author#10, id#9, incipit#12, title#11]
      +- InMemoryRelation [id#9, author#10, title#11, incipit#12], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- LocalTableScan [id#9, author#10, title#11, incipit#12]

I am not sure whether this is related to Catalyst’s inability to optimize the foldLeft version, but the plan explained for the map version doesn’t show the same nested structure:

== Parsed Logical Plan ==
'SerializeFromObject [validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, id), IntegerType) AS id#83, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, author), StringType), true, false) AS author#84, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, title), StringType), true, false) AS title#85, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, incipit), StringType), true, false) AS incipit#86, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, to), BooleanType) AS to#87, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, get), BooleanType) AS get#88, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, alike), BooleanType) AS alike#89, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, with), BooleanType) AS with#90, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, man), BooleanType) AS man#91, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, in), BooleanType) AS in#92, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, anna), 
BooleanType) AS anna#93, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, was), BooleanType) AS was#94, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, happy), BooleanType) AS happy#95, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, pig), BooleanType) AS pig#96, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 14, way), BooleanType) AS way#97, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 15, morning), BooleanType) AS morning#98, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 16, city), BooleanType) AS city#99, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 17, when), BooleanType) AS when#100, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 18, two), BooleanType) AS two#101, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 19, drunk), BooleanType) AS drunk#102, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 20, pop), BooleanType) AS pop#103, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 21, John), BooleanType) AS John#104, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 22, four), BooleanType) AS four#105, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 23, hence), BooleanType) AS hence#106, ... 20 more fields]
+- 'MapElements , interface org.apache.spark.sql.Row, [StructField(id,IntegerType,false), StructField(author,StringType,true), StructField(title,StringType,true), StructField(incipit,StringType,true)], obj#82: org.apache.spark.sql.Row
   +- 'DeserializeToObject unresolveddeserializer(createexternalrow(getcolumnbyordinal(0, IntegerType), getcolumnbyordinal(1, StringType).toString, getcolumnbyordinal(2, StringType).toString, getcolumnbyordinal(3, StringType).toString, StructField(id,IntegerType,false), StructField(author,StringType,true), StructField(title,StringType,true), StructField(incipit,StringType,true))), obj#81: org.apache.spark.sql.Row
      +- AnalysisBarrier
            +- Project [_1#4 AS id#9, _2#5 AS author#10, _3#6 AS title#11, _4#7 AS incipit#12]
               +- LocalRelation [_1#4, _2#5, _3#6, _4#7]

== Analyzed Logical Plan ==
id: int, author: string, title: string, incipit: string, to: boolean, get: boolean, alike: boolean, with: boolean, man: boolean, in: boolean, anna: boolean, was: boolean, happy: boolean, pig: boolean, way: boolean, morning: boolean, city: boolean, when: boolean, two: boolean, drunk: boolean, pop: boolean, John: boolean, four: boolean, hence: boolean, ... 20 more fields
SerializeFromObject [validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, id), IntegerType) AS id#83, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, author), StringType), true, false) AS author#84, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, title), StringType), true, false) AS title#85, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, incipit), StringType), true, false) AS incipit#86, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, to), BooleanType) AS to#87, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, get), BooleanType) AS get#88, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, alike), BooleanType) AS alike#89, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, with), BooleanType) AS with#90, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, man), BooleanType) AS man#91, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, in), BooleanType) AS in#92, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, anna), 
BooleanType) AS anna#93, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, was), BooleanType) AS was#94, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, happy), BooleanType) AS happy#95, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, pig), BooleanType) AS pig#96, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 14, way), BooleanType) AS way#97, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 15, morning), BooleanType) AS morning#98, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 16, city), BooleanType) AS city#99, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 17, when), BooleanType) AS when#100, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 18, two), BooleanType) AS two#101, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 19, drunk), BooleanType) AS drunk#102, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 20, pop), BooleanType) AS pop#103, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 21, John), BooleanType) AS John#104, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 22, four), BooleanType) AS four#105, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 23, hence), BooleanType) AS hence#106, ... 20 more fields]
+- MapElements , interface org.apache.spark.sql.Row, [StructField(id,IntegerType,false), StructField(author,StringType,true), StructField(title,StringType,true), StructField(incipit,StringType,true)], obj#82: org.apache.spark.sql.Row
   +- DeserializeToObject createexternalrow(id#9, author#10.toString, title#11.toString, incipit#12.toString, StructField(id,IntegerType,false), StructField(author,StringType,true), StructField(title,StringType,true), StructField(incipit,StringType,true)), obj#81: org.apache.spark.sql.Row
      +- Project [_1#4 AS id#9, _2#5 AS author#10, _3#6 AS title#11, _4#7 AS incipit#12]
         +- LocalRelation [_1#4, _2#5, _3#6, _4#7]

== Optimized Logical Plan ==
SerializeFromObject [validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, id), IntegerType) AS id#83, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, author), StringType), true, false) AS author#84, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, title), StringType), true, false) AS title#85, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, incipit), StringType), true, false) AS incipit#86, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, to), BooleanType) AS to#87, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, get), BooleanType) AS get#88, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, alike), BooleanType) AS alike#89, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, with), BooleanType) AS with#90, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, man), BooleanType) AS man#91, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, in), BooleanType) AS in#92, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, anna), 
BooleanType) AS anna#93, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, was), BooleanType) AS was#94, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, happy), BooleanType) AS happy#95, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, pig), BooleanType) AS pig#96, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 14, way), BooleanType) AS way#97, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 15, morning), BooleanType) AS morning#98, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 16, city), BooleanType) AS city#99, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 17, when), BooleanType) AS when#100, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 18, two), BooleanType) AS two#101, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 19, drunk), BooleanType) AS drunk#102, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 20, pop), BooleanType) AS pop#103, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 21, John), BooleanType) AS John#104, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 22, four), BooleanType) AS four#105, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 23, hence), BooleanType) AS hence#106, ... 20 more fields]
+- MapElements , interface org.apache.spark.sql.Row, [StructField(id,IntegerType,false), StructField(author,StringType,true), StructField(title,StringType,true), StructField(incipit,StringType,true)], obj#82: org.apache.spark.sql.Row
   +- DeserializeToObject createexternalrow(id#9, author#10.toString, title#11.toString, incipit#12.toString, StructField(id,IntegerType,false), StructField(author,StringType,true), StructField(title,StringType,true), StructField(incipit,StringType,true)), obj#81: org.apache.spark.sql.Row
      +- InMemoryRelation [id#9, author#10, title#11, incipit#12], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- LocalTableScan [id#9, author#10, title#11, incipit#12]

== Physical Plan ==
*(1) SerializeFromObject [validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, id), IntegerType) AS id#83, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, author), StringType), true, false) AS author#84, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, title), StringType), true, false) AS title#85, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, incipit), StringType), true, false) AS incipit#86, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, to), BooleanType) AS to#87, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, get), BooleanType) AS get#88, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, alike), BooleanType) AS alike#89, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, with), BooleanType) AS with#90, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, man), BooleanType) AS man#91, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, in), BooleanType) AS in#92, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, anna), 
BooleanType) AS anna#93, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, was), BooleanType) AS was#94, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, happy), BooleanType) AS happy#95, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, pig), BooleanType) AS pig#96, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 14, way), BooleanType) AS way#97, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 15, morning), BooleanType) AS morning#98, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 16, city), BooleanType) AS city#99, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 17, when), BooleanType) AS when#100, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 18, two), BooleanType) AS two#101, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 19, drunk), BooleanType) AS drunk#102, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 20, pop), BooleanType) AS pop#103, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 21, John), BooleanType) AS John#104, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 22, four), BooleanType) AS four#105, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 23, hence), BooleanType) AS hence#106, ... 20 more fields]
+- *(1) MapElements , obj#82: org.apache.spark.sql.Row
   +- *(1) DeserializeToObject createexternalrow(id#9, author#10.toString, title#11.toString, incipit#12.toString, StructField(id,IntegerType,false), StructField(author,StringType,true), StructField(title,StringType,true), StructField(incipit,StringType,true)), obj#81: org.apache.spark.sql.Row
      +- InMemoryTableScan [id#9, author#10, title#11, incipit#12]
            +- InMemoryRelation [id#9, author#10, title#11, incipit#12], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- LocalTableScan [id#9, author#10, title#11, incipit#12]

Whatever the root cause is, the conclusion is clear. As of Spark 2.3.0 (and probably earlier versions), dynamically adding a large number of columns to a DataFrame should be done via a map operation rather than foldLeft, for the reasons we’ve seen.

—————————————-

1. [I ran the tests on a virtual box with three cores running Spark 2.3.0. Times in milliseconds reflect the underlying infrastructure and I would expect different performance on a proper cluster. Nevertheless, the fact that a foldLeft solution performs significantly worse than a map-based solution should be an outcome independent of the underlying hardware.]

Custom JSON serializer

In Scala there are quite a few frameworks that make the implementation of RESTful JSON services quite straightforward (Play, Scalatra and Akka HTTP are the main ones). They all follow the same general idea: a JSON payload is deserialized into a Scala class (usually a case class) and serialized back into JSON. For example a JSON payload like:

{"name": "Paul", "age": 45, "gender": "male"}

Can be easily deserialized into something like:

final case class User(name: String, age: Int, gender: String)

This is pretty standard stuff and works almost out of the box in all the mentioned frameworks. In some cases, though, you might want to take advantage of the type system, for example by defining a type for “age”:

// A value class representing a person's age
final class Age(val age: Int) extends AnyVal

And also constant values for “gender” rather than a generic String type:

sealed trait Gender {
  val sex: String
}

final case object Male extends Gender {
  val sex = "male"
}

final case object Female extends Gender {
  val sex = "female"
}

final case object Other extends Gender {
  val sex = "other"
}

object GenderHelper {
  private val genders: Set[Gender] = Set(Male, Female, Other)
  def toGender(sex: String): Option[Gender] = genders.find(_.sex == sex)
}
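
As a quick illustration of how the lookup behaves, the sketch below repeats the definitions above inside a hypothetical GenderLookupDemo object (the wrapper object and the sample inputs are my own additions, not part of the original code). It shows that an unknown or differently-cased string yields None rather than an exception:

```scala
object GenderLookupDemo {
  sealed trait Gender { val sex: String }
  case object Male extends Gender { val sex = "male" }
  case object Female extends Gender { val sex = "female" }
  case object Other extends Gender { val sex = "other" }

  object GenderHelper {
    private val genders: Set[Gender] = Set(Male, Female, Other)
    // Returns the first gender whose label matches exactly, if any
    def toGender(sex: String): Option[Gender] = genders.find(_.sex == sex)
  }

  // A known label resolves to the matching case object
  val known: Option[Gender] = GenderHelper.toGender("female")
  // An unknown label resolves to None
  val unknown: Option[Gender] = GenderHelper.toGender("robot")
  // The comparison is case-sensitive, so "Male" is not found either
  val differentCase: Option[Gender] = GenderHelper.toGender("Male")
}
```

Returning an Option leaves it to the caller to decide what an unrecognised value should mean, which matters once the lookup is used inside a deserializer.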

We can now define User in a more robust way as:

final case class User(name: String, age: Age, gender: Gender)

This looks better but doesn’t work out of the box anymore: regardless of the Scala JSON library we use, the library has no way of knowing how to serialize/deserialize types like Age or Gender. We need to write our own custom JSON serializer/deserializer. The following example uses Akka HTTP and Json4s (code is available here).

We first need to write two serializers/deserializers. One for Age:

import com.lansalo.model.Age
import org.json4s.CustomSerializer
import org.json4s.JsonAST.JInt

case object AgeSerializer extends CustomSerializer[Age](format => ( {
  case JInt(age) => new Age(age.intValue)
}, {
  case age: Int => JInt(BigInt(age))
}))

And one for Gender:

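import com.lansalo.model.{Gender, GenderHelper}
import org.json4s.{CustomSerializer, MappingException}
import org.json4s.JsonAST.JString

case object GenderSerializer extends CustomSerializer[Gender](format => ( {
  // JSON -> Gender: fail with a descriptive error instead of a bare
  // NoSuchElementException when the value is not a known gender
  case JString(gender) =>
    GenderHelper.toGender(gender)
      .getOrElse(throw new MappingException(s"Unknown gender: $gender"))
}, {
  // Gender -> JSON: write the string label
  case gender: Gender => JString(gender.sex)
}))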

Then we just need to add them to the default json4s formats (in a trait here):

import com.lansalo.json.serializer.{AgeSerializer, GenderSerializer}
import org.json4s.{DefaultFormats, Formats}

trait JsonSupport {
  // An explicit type on the implicit keeps implicit resolution predictable
  implicit val formats: Formats = DefaultFormats + GenderSerializer + AgeSerializer
}

And finally bring the JSON formats into scope where we need them:

import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.directives.MethodDirectives.{get, post}
import akka.http.scaladsl.server.directives.PathDirectives.path
import akka.http.scaladsl.server.directives.RouteDirectives.complete
import akka.util.Timeout
import com.lansalo.json.JsonSupport
import com.lansalo.model.{Age, Female, User}
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jackson

import scala.concurrent.duration._

trait UserRoutes extends JsonSupport {

  implicit def system: ActorSystem

  lazy val log = Logging(system, classOf[UserRoutes])

  implicit lazy val timeout = Timeout(5.seconds)
  import Json4sSupport._

  implicit val serialization = jackson.Serialization // or native.Serialization

  lazy val userRoutes: Route = pathPrefix("users") {
    pathEnd {
      post {
        entity(as[User]) { user =>
          log.info(s"Received user: $user")
          complete((StatusCodes.Created, "OK"))
        }
      }
    } ~
      path(Segment) { name =>
        get {
          complete(User(name, new Age(22), Female))
        }
      }
  }
}

 

Scala Value Classes: Type-Safety for Free

Value classes have been available since Scala 2.10. The initial proposal, SIP-15, included several use cases for this new mechanism, but in this post I would like to focus on one in particular: the possibility of creating wrappers with no boxing overhead, and how we can leverage this to improve our type system at zero cost.
In doing that, I’ll also add some numbers to show what “zero cost” exactly means. The class Wrapper below:

class Wrapper(val flag: Boolean) extends AnyVal

is a value class. Generally speaking, a value class has a single, public val parameter and directly extends AnyVal. There is a specific list of criteria that a value class must satisfy in order to be considered as such (for all the details, including universal traits, see SIP-15), but here I would like to focus on the general idea behind this mechanism and its benefits.

In the example above, Wrapper is the type at compile time, but at runtime the representation is a plain boolean. The “free” in the title refers precisely to the fact that a Wrapper instance never actually gets created at runtime, hence no boxing overhead and no extra memory allocation (but I’ll get back to this after an example).
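
One way to observe this is through Java reflection. The sketch below is only an illustration: the WrapperErasureDemo object and the negate method are hypothetical names I introduce here, and the expectation that the compiled method works on a primitive boolean assumes the standard Scala compiler targeting the JVM.

```scala
object WrapperErasureDemo {
  // Same definition as above, nested in an object only to keep the sketch self-contained
  class Wrapper(val flag: Boolean) extends AnyVal

  // At the source level the signature mentions Wrapper on both sides
  def negate(w: Wrapper): Wrapper = new Wrapper(!w.flag)

  // Look up the compiled method and inspect its JVM-level signature
  private val compiled = getClass.getMethods.find(_.getName == "negate").get
  val parameterType: Class[_] = compiled.getParameterTypes.head
  val returnType: Class[_] = compiled.getReturnType
}
```

Under those assumptions both parameterType and returnType should be the primitive boolean class (java.lang.Boolean.TYPE), which is what “Wrapper never gets created” means for a method like this one.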

So far so good… but how could we exploit this feature to our advantage? An example (perhaps a bit clumsy) should illustrate the point. Let’s say we are working on an online auction web site and we have an entity for a generic auction item:

final case class Item (
   itemId: Int,
   asked: Double,
   soldPrice: Double
)

The model is pretty simple: an id to uniquely identify an item, the asking price expressed as a Double (although in a real-world scenario BigDecimal¹ would be the right choice) and the price the item has been sold for. And, for no particular reason beyond our curiosity, we also have a method to check whether, overall, the buyers paid more than the asking price (for example for all the items sold in the last month):

def totalPerformanceBase(list: List[Item]): Double = {
   list.foldLeft(0d)((acc, item) => {
      acc + (item.soldPrice - item.asked)
   })
}

The method is called totalPerformanceBase because we are going to measure the performance of this method later and use it as our baseline.

Representing a price as a Double² is perfectly fine, but we also have bets, shipping costs and quite likely many other domain models that include a price. We could create our own type to represent specifically an Item’s price, making our code more robust and preventing common mistakes (passing a bet’s price to an item or vice versa). After all, that’s the advantage of having a type system: the compiler spots the error immediately, there are fewer runtime bugs, the IDE experience improves, and so forth. So we write a wrapper for our price:

final case class Price(price: Double)

And re-factor the Item class:

final case class Item (
   itemId: Int,
   asked: Price,
   soldPrice: Price
)

Also totalPerformance needs to be refactored accordingly:

def totalPerformance(list: List[Item]): Double = {
   list.foldLeft(0d)((acc, item) => {
      acc + (item.soldPrice.price - item.asked.price)
   })
}
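
To make the type-safety gain concrete, here is a minimal sketch. ShippingCost and the PriceSafetyDemo object are hypothetical additions of mine (the post only mentions shipping costs in passing); Price, Item and totalPerformance are the definitions above.

```scala
object PriceSafetyDemo {
  final case class Price(price: Double)
  // Hypothetical second wrapper, not part of the original model
  final case class ShippingCost(cost: Double)

  final case class Item(itemId: Int, asked: Price, soldPrice: Price)

  val shipping: ShippingCost = ShippingCost(4.5)

  // Rejected by the compiler (type mismatch: ShippingCost where Price is required):
  // val wrong = Item(3, shipping, Price(12.5))

  def totalPerformance(list: List[Item]): Double =
    list.foldLeft(0d)((acc, sold) => acc + (sold.soldPrice.price - sold.asked.price))

  // (12.5 - 10.0) + (19.0 - 20.0)
  val total: Double = totalPerformance(
    List(Item(1, Price(10.0), Price(12.5)), Item(2, Price(20.0), Price(19.0)))
  )
}
```

With plain Double fields the commented-out line would have compiled silently; with the wrapper the mistake never reaches runtime.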

In the graph below, the blue line represents our base implementation with a Double whereas the orange line represents the performance of the version with the wrapper Price.
Selection_01

You can find the ScalaMeter tests and all the code here, but bear in mind that results might vary based on your hardware. The important point, however, is that introducing Price as a wrapper for extra type safety comes with a small cost in terms of performance. When totalPerformance crunches a list of half a million items, we can see a difference of a couple of milliseconds³. Besides, because the wrapper Price actually gets instantiated, memory usage shows a similar pattern. Let’s consider for example the following methods:

// For the original Item
def instantiate(list: List[Int]): List[Item] = {
  list.map(n => Item(n, n, n + 1.0))
}

// For the Item using Price as wrapper
def instantiate(list: List[Int]): List[Item] = {
   list.map(n => Item(n, Price(n), Price(n + 1.0)))
}

The graph below shows the difference in heap memory usage as the number of items progressively increases (in orange the implementation with the Price wrapper, in blue the original implementation with Double).

Selection_02

Just one note: the y-axis is actually measured in Kb (that’s a known issue with ScalaMeter), so when the size is 970000, for instance, the heap usage for the Item with Double is 54222.97 Kb against 93120.00 Kb for the version with the Price wrapper.

Value classes come into play because they allow us to keep the type safety we gain with the wrapper without paying the extra price (both in terms of execution time and memory usage).
We can slightly change the Price definition to make it a value class:

// Extending AnyVal makes PriceVal a value class
final case class PriceVal(price: Double) extends AnyVal {
   def -(that: PriceVal): PriceVal = PriceVal(this.price - that.price)
   def +(that: PriceVal): PriceVal = PriceVal(this.price + that.price)
}

and change our Item definition to reflect this:

final case class Item (
   itemId: Int,
   asked: PriceVal,
   soldPrice: PriceVal
)

We also need to refactor our methods a bit to accommodate the new definition of Item:

def totalPerformance(list: List[Item]): PriceVal = {
   list.foldLeft(PriceVal(0d))((acc, item) => {
     acc + (item.soldPrice - item.asked)
   })
}

def instantiate(list: List[Int]): List[Item] = {
   list.map(n => Item(n, PriceVal(n), PriceVal(n + 1.0)))
}
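As a quick sanity check, the pieces above can be put together in a minimal sketch. The enclosing object PriceValDemo and the sample list are only there for illustration; everything else mirrors the definitions shown in this post.

```scala
object PriceValDemo {
  final case class PriceVal(price: Double) extends AnyVal {
    def -(that: PriceVal): PriceVal = PriceVal(this.price - that.price)
    def +(that: PriceVal): PriceVal = PriceVal(this.price + that.price)
  }

  final case class Item(itemId: Int, asked: PriceVal, soldPrice: PriceVal)

  // Sum of (soldPrice - asked) over all items, kept as a PriceVal
  def totalPerformance(list: List[Item]): PriceVal =
    list.foldLeft(PriceVal(0d))((acc, item) => acc + (item.soldPrice - item.asked))

  // Each item is asked at n and sold at n + 1.0
  def instantiate(list: List[Int]): List[Item] =
    list.map(n => Item(n, PriceVal(n), PriceVal(n + 1.0)))

  val items: List[Item] = instantiate(List(1, 2, 3))
  val total: PriceVal = totalPerformance(items) // every item gains 1.0, so PriceVal(3.0)

  // The type safety is still there: the line below would not compile,
  // because a plain Double is not a PriceVal.
  // val wrong: PriceVal = total + 1.0
}
```

The result stays a PriceVal from start to finish, so a price cannot be accidentally mixed with an unrelated Double.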

Now we can compare the execution time of totalPerformance across all the solutions we came up with:

[Graph Selection_03: totalPerformance execution time for Double, Price wrapper and PriceVal]

In orange: the implementation with the Price wrapper
In blue: the original implementation of Item with Double
In green: the implementation with the value class PriceVal

It is quite evident that there isn’t any significant difference between the base implementation with Double and the one with value classes. On the other hand, the version with the Price wrapper doesn’t perform as efficiently as the others. The outcome is analogous when we examine memory usage:

[Graph Selection_04-bis: heap memory usage for Double, Price wrapper and PriceVal]

Once again, there is no difference between the Double and the value class implementations, which is why you see only one line: the green and blue lines overlap perfectly.
In case the two overlapping lines hide too much of what’s happening, the graph below compares just the memory usage of the Price wrapper solution (in orange) against the version using the PriceVal value class (in green).

[Graph Selection_04: heap memory usage, Price wrapper (orange) vs. PriceVal (green)]
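One way to see why the value class version tracks the Double version so closely is to look at how the fields of Item are represented on the JVM. The following is a rough sketch using plain Java reflection. The names ErasureCheck, ItemVal, ItemWrapped and fieldType are invented for this example, and the Price wrapper is assumed to be a regular case class with a single price: Double field.

```scala
object ErasureCheck {
  final case class PriceVal(price: Double) extends AnyVal // value class
  final case class Price(price: Double)                   // assumed shape of the plain wrapper

  final case class ItemVal(itemId: Int, asked: PriceVal, soldPrice: PriceVal)
  final case class ItemWrapped(itemId: Int, asked: Price, soldPrice: Price)

  // Runtime (JVM) type of a declared field, looked up via Java reflection
  def fieldType(cls: Class[_], name: String): Class[_] =
    cls.getDeclaredField(name).getType

  // Field typed as the value class: stored as its underlying type
  val valueClassField: Class[_] = fieldType(classOf[ItemVal], "asked")

  // Field typed as the wrapper: a reference to a separately allocated object
  val wrapperField: Class[_] = fieldType(classOf[ItemWrapped], "asked")
}
```

With the Scala compiler targeting the JVM, valueClassField should come back as the primitive double, while wrapperField is the Price class itself. In other words, an Item built with PriceVal carries its prices inline, exactly like the original Item with Double, whereas every Price is a separate object on the heap.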

———————————-

1. [Here is a good explanation of why Double is not a wise choice when dealing with money, but for the purpose of this post it is perfectly fine and illustrates the point quite effectively]
2. [Note that in Scala, Double itself is not represented by an object in the underlying runtime system and is equivalent to a double Java primitive]
3. [Whether those few milliseconds actually make a difference largely depends on the context. For the purpose of this post, the only relevant aspect is that there is a difference]