Scala Stream: There’s no reason to use it if you don’t actually need one.

The first time I read about Scala Streams in Functional Programming in Scala, I was impressed by the example's captivating simplicity. I assume a certain familiarity with the idea of a Stream, but as a quick recap in case that's not so, the original example in the book was:

List(1,2,3,4).map(_ + 10).filter(_%2==0).map(_ * 3)

Which gets evaluated, step by step, as:

List(11,12,13,14).filter(_%2==0).map(_ * 3)
List(12,14).map(_ * 3)
List(36,42)

Whereas a Stream version[1]

Stream(1,2,3,4).map(_ + 10).filter(_%2==0).map(_ * 3).toList

because of its laziness, will evaluate in the following way (the trace, as in the book, omits the final .map(_ * 3) step)[2]:

Stream(1,2,3,4).map(_ + 10).filter(_ % 2 == 0).toList
cons(11, Stream(2,3,4).map(_ + 10)).filter(_ % 2 == 0).toList
Stream(2,3,4).map(_ + 10).filter(_ % 2 == 0).toList
cons(12, Stream(3,4).map(_ + 10)).filter(_ % 2 == 0).toList
12 :: Stream(3,4).map(_ + 10).filter(_ % 2 == 0).toList
12 :: cons(13, Stream(4).map(_ + 10)).filter(_ % 2 == 0).toList
12 :: Stream(4).map(_ + 10).filter(_ % 2 == 0).toList
12 :: cons(14, Stream().map(_ + 10)).filter(_ % 2 == 0).toList
12 :: 14 :: Stream().map(_ + 10).filter(_ % 2 == 0).toList
12 :: 14 :: List()

Note that no intermediate results like List(11,12,13,14) or List(12,14) get instantiated in this case. It looks like there is simply less work to do, but this impression of efficiency might be misleading. A simple micro-benchmark reveals how the Stream performs worse than the List:

testOriginalExampleWithList    thrpt   20  4368200.644 ± 155109.000  ops/s
testOriginalExampleWithStream  thrpt   20  2881809.414 ± 122061.743  ops/s

The reason is to be tracked down in the Stream's lazy nature combined with its immutability: it belongs to the Scala immutable collections, but at the same time its tail is not "real" yet (so to speak) and will be evaluated only when required. In that respect the tail resembles a rule, or a set of instructions, to materialise its elements. The marriage between these two aspects of a Stream requires some synchronisation, which is the toll to pay (and the reason behind the different performance above)[3]:

/** A lazy cons cell, from which streams are built. */
@SerialVersionUID(-602202424901551803L)
final class Cons[+A](hd: A, tl: => Stream[A]) extends Stream[A] {
  override def isEmpty = false
  override def head = hd
  @volatile private[this] var tlVal: Stream[A] = _
  @volatile private[this] var tlGen = tl _
  def tailDefined: Boolean = tlGen eq null
  override def tail: Stream[A] = {
    if (!tailDefined)
      synchronized {
        if (!tailDefined) {
          tlVal = tlGen()
          tlGen = null
        }
      }

    tlVal
  }
}

So, why use a Stream after all? As the title (and the Scala doc) suggests, there are occasions where a lazy sequence comes in handy. One well-known example is the possibility to define "infinite" sequences, like the Fibonacci one in the documentation. Another aspect, and potentially an advantage in some situations, is the generally low memory footprint of a Stream (remember, the intermediate sequences, like List(11,12,13,14) and List(12,14) in the example, don't get instantiated).
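
For reference, the "infinite" Fibonacci sequence mentioned above looks roughly like the version in the Scaladoc: a self-referential Stream where each element past the second is the sum of the previous two, and nothing is computed until elements are demanded (I use lazy val so the self-reference also compiles in a local scope):

lazy val fibs: Stream[BigInt] =
  BigInt(0) #:: BigInt(1) #:: fibs.zip(fibs.tail).map { n => n._1 + n._2 }

fibs.take(8).toList // List(0, 1, 1, 2, 3, 5, 8, 13)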

The following terminates successfully on my machine with roughly 4GB of heap memory[4]:

Stream.fill(600)("Some random sample text.")
      .map(_ * 500000 + "test")
      .zipWithIndex.filter(_._2 % 2 == 0)
      .map(_._1.contains("test"))

Whereas its List equivalent throws a

java.lang.OutOfMemoryError: Java heap space

The above case is built purely with the intent to trigger the exception, but compare this:

final case class Wrapper(index: Int, text: String)

val listInput: List[Int] = (1 to 500).toList

listInput
      .map(Wrapper(_, "Some simple text"))
      .map(_.copy(text = "some simple text" * 500))
      .map(w => w.copy(text = w.text + "look for me!"))
      .filter(_.index % 2 == 0)
      .map(_.text.toLowerCase)
      .count(_.contains("look for me!"))

with this one:

val streamInput: Stream[Int] = (1 to 500).toStream

streamInput
      .map(Wrapper(_, "Some simple text"))
      .map(_.copy(text = "some simple text" * 500))
      .map(w => w.copy(text = w.text + "look for me!"))
      .filter(_.index % 2 == 0)
      .map(_.text.toLowerCase)
      .count(_.contains("look for me!"))

Both complete successfully, but the Stream version in this case performs better than the List one:

List    thrpt   15  76.588 ± 2.734  ops/s
Stream  thrpt   15  80.246 ± 2.260  ops/s

Flight Recorder shows what is happening under the hood: the first example, with List, generates more intermediate results, which then increase the amount of work for the garbage collector:

[image: gc-list]

Whereas in the Stream case the garbage collector is definitely less busy, and fewer CPU resources are needed in order to free the heap.

[image: gc-stream]

In this last case, the amount of extra work for the garbage collector was sufficient to tip the balance in favor of a lazy sequence. Those cases are probably the exception rather than the rule, but I guess the lesson is that wherever there are long chains of transformations on non-lazy collections, and/or the suspicion that those might involve extra work for the GC, then perhaps the idea of testing a lazy alternative should be taken into consideration.
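
Testing such an alternative can be cheap: reusing the pipeline from the original example, it is often just a matter of swapping the source collection (a sketch, not a guarantee of a speed-up):

val viaList   = (1 to 500).toList.map(_ + 10).filter(_ % 2 == 0).map(_ * 3)
val viaStream = (1 to 500).toStream.map(_ + 10).filter(_ % 2 == 0).map(_ * 3).toList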

All code and tests available here.

———————————

1. [a Stream is lazy, hence we need to turn it into a List to trigger the evaluation]
2. [Chiusano P., Bjarnason R. Functional Programming in Scala. Manning, 2014. p.72]
3. [The code comes from the definition of the Stream class.]
4. [All the numbers regarding performance, potential out-of-memory errors, and general memory usage obviously depend on the specific hardware and configuration in use (in my case an old laptop and not much heap assigned), but I think the general idea behind those tests, rather than the specific numbers, still stands.]

Microbenchmarking and Performance Tests in Scala: JMH and ScalaMeter

In many contexts, micro-benchmarking and performance are not the main issue, and other considerations should drive a developer's choices regarding implementation. That's not always the case though. This post is just to share and illustrate a sort of Scala project template that I put together and found quite useful in a few situations where micro-benchmarking and performance might be a concern (or simply a legitimate curiosity). Broadly speaking, in Scala there are two viable approaches: JMH (Java Microbenchmark Harness) and ScalaMeter and, if you are interested, a discussion about their pros and cons and their place in the Scala ecosystem is available here. Note that JMH is part of the OpenJDK project, but that doesn't mean that benchmarks have to run on OpenJDK; as a matter of fact, all the examples presented in this post were executed on an Oracle JDK. NOTE: all the code is available here.

Settings and dependencies

For JMH, we can use sbt-jmh, an sbt plugin for JMH, so I added the following line to project/plugins.sbt:

addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.3.4")

And then we set the dependencies in build.sbt:

import sbt.Keys._

lazy val root = (project in file(".")).
  enablePlugins(JmhPlugin).
  settings(
    name := "scala-jmh-performance-testing",
    organization := "LansaloLtd",
    version := "1.0",
    scalaVersion := "2.12.3",
    libraryDependencies ++= Seq(
      "org.openjdk.jmh" % "jmh-generator-annprocess" % "1.21",
      "commons-codec" % "commons-codec" % "1.9",
      "com.storm-enroute" %% "scalameter" % "0.8.2",
      "org.scalatest" %% "scalatest" % "3.0.1" % Test
    ),
    testFrameworks += new TestFramework("org.scalameter.ScalaMeterFramework"),
    parallelExecution in Test := false
  )
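
As a side note on this setup: with the ScalaMeter test framework registered in testFrameworks, a ScalaMeter benchmark kept under src/test can also be launched through sbt's ordinary test runner, along these lines (the class name here is just an example):

sbt "testOnly *PerformanceBenchmark"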

Personally, I like to use both ScalaMeter and JMH (that is why both are present as dependencies above).

Far from being an exhaustive list of pros and cons, I would like to mention at least that the first allows you to create graphs really easily and follows the same approach as ScalaCheck, with generators that can be composed via for-comprehensions; on the other hand, it is not actively maintained. The second seems to be the preferred choice for Scala, and for some measurements it is really straightforward. At the present time, the only documentation available for JMH is via examples. Note also that we have introduced a dependency on OpenJDK (org.openjdk.jmh), but that is independent from the runtime JDK.

Targets

Purely for demonstration purposes we need two candidates or targets for our benchmark. For no particular reason I chose:

// candidate 1
def mapOnFunctionsAndList(input: String, slice: Int, funcs: List[String => Int]): List[Int] = {
    funcs.map(f => slide(input, slice).map(f).min)
}

and:

// candidate 2
def foldOnFunctionsList(input: String, slice: Int, funcs: List[String => Int]): List[Int] = {
    funcs.foldLeft(List.empty[Int])((acc, func) => {
      slide(input, slice).map(str => func(str)).min :: acc
    })
}

Basically, we start from a certain number n of functions (String => Int); then we slide a given String, obtaining z substrings from it. To each function, 1 to n, we pass all the substrings, obtaining z Ints, of which we keep only the minimum. Therefore, if we start with a list of n functions, we end up with a list of n Ints. These two implementations are equivalent and return the same results given the same inputs, but are there differences when it comes to performance?
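
The slide helper itself is not shown here (it lives in the linked repo); a minimal version consistent with the description above, and only an assumption on my part, could be:

// sliding windows of length `slice` over the input, one character apart
def slide(input: String, slice: Int): List[String] =
  input.sliding(slice).toList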

JMH

With JMH we can pretty easily work out the throughput for each of our two candidates. First, we need to define the targets of our benchmark and a State for them:

package com.lansalo.jmh

import com.lansalo.Target._
import org.openjdk.jmh.annotations._

class TargetPerformance {

  import Scopes._

  def testMapOnFunctionsAndList(state: BenchmarkState): Unit = {
    mapOnFunctionsAndList(state.title, state.slice, state.funcs)
  }

  def testFoldOnFunctionsList(state: BenchmarkState): Unit = {
    foldOnFunctionsList(state.title, state.slice, state.funcs)
  }

}

object Scopes {

  import com.lansalo.HashingFunctions.hashingFunctions

  val novelTitle = "The Persecution and Assassination of Jean-Paul Marat as Performed by the Inmates of the Asylum of Charenton Under the Direction of the Marquis de Sade"

  @State(Scope.Benchmark)
  class BenchmarkState {
    val funcs: List[String => Int] = hashingFunctions(200)
    val title: String = novelTitle
    val slice: Int = 4
  }
}

And then the proper benchmark (that can also be executed within an IDE like IntelliJ):

package com.lansalo.jmh.benchmark

import java.util.concurrent.TimeUnit

import com.lansalo.jmh.{Scopes, TargetPerformance}
import org.openjdk.jmh.annotations._
import org.openjdk.jmh.results.format.ResultFormatType
import org.openjdk.jmh.runner.Runner
import org.openjdk.jmh.runner.options.{Options, OptionsBuilder}

object BenchmarkRunner_ThroughputBenchmark {
  // run sbt clean jmh:compile from terminal first.
  def main(args: Array[String]): Unit = {
    val opt: Options = new OptionsBuilder().include(classOf[ThroughputBenchmark].getSimpleName)
      .resultFormat(ResultFormatType.TEXT).build
    new Runner(opt).run
  }
}

@OutputTimeUnit(TimeUnit.SECONDS)
@Warmup(iterations = 30)
@Measurement(iterations = 30)
@State(Scope.Benchmark)
private class ThroughputBenchmark extends TargetPerformance {

  @Benchmark
  @BenchmarkMode(Array(Mode.Throughput, Mode.AverageTime))
  @Fork(value = 1)
  override def testFoldOnFunctionsList(state: Scopes.BenchmarkState): Unit = super.testFoldOnFunctionsList(state)

  @Benchmark
  @BenchmarkMode(Array(Mode.Throughput, Mode.AverageTime))
  @Fork(value = 1)
  override def testMapOnFunctionsAndList(state: Scopes.BenchmarkState): Unit = super.testMapOnFunctionsAndList(state)

}

In this example, we measure the throughput and average time:

@BenchmarkMode(Array(Mode.Throughput, Mode.AverageTime))

in a separate JVM:

@Fork(value = 1)

We also run 30 warmup iterations (ignored for the final results) and 30 proper iterations where the actual measurement is taken:

@Warmup(iterations = 30)
@Measurement(iterations = 30)

Warmup is needed mainly because of JVM features such as just-in-time compilation, but that would take us into all the background behind microbenchmarking, which would be an entire chapter on its own. I can recommend a few readings though: Nanotrusting the Nanotime, Avoiding Benchmarking Pitfalls on the JVM, Java JVM Warmup.[1]
We are almost ready: run sbt clean jmh:compile from the terminal, and then we can run ThroughputBenchmark straight from IntelliJ and obtain something like:

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on
why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

Benchmark                                       Mode  Cnt   Score    Error  Units
ThroughputBenchmark.testFoldOnFunctionsList    thrpt   30  95.529 ±  0.940  ops/s
ThroughputBenchmark.testMapOnFunctionsAndList  thrpt   30  71.549 ±  0.734  ops/s
ThroughputBenchmark.testFoldOnFunctionsList     avgt   30   0.010 ±  0.001   s/op
ThroughputBenchmark.testMapOnFunctionsAndList   avgt   30   0.011 ±  0.001   s/op
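
As an alternative to the IDE, the sbt-jmh plugin can also launch the benchmarks straight from sbt; something along these lines should work (the options mirror the annotations above, and the regex may need adjusting):

sbt "jmh:run -i 30 -wi 30 -f 1 .*ThroughputBenchmark.*"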

We are not really interested in the numbers themselves (which also depend upon the hardware) but more in the sort of intelligence we can gather this way. ScalaMeter offers different possibilities and customizations as well. I quite like the graph bits, and with the following code:

package com.lansalo.scalameter.benchmark

import com.lansalo.HashingFunctions.hashingFunctions
import com.lansalo.Target._

import java.io.File

import org.scalameter.{Bench, Gen}
import org.scalameter.persistence.SerializationPersistor

import scala.util.Random

object PerformanceBenchmark extends Bench.OfflineReport {

  override lazy val persistor = SerializationPersistor(new File("target/scalameter/performance/results"))

  val functionsGen: Gen[Int] = Gen.range("functionsNumber")(100, 1000, 100)
  val inputGen: Gen[Int] = Gen.range("inputLength")(20, 200, 20)

  val inputs: Gen[(String, List[String => Int])] = for {
    functionsNumber <- functionsGen
    inputLength <- inputGen
  } yield (Random.nextString(inputLength), hashingFunctions(functionsNumber))

  // both implementations are measured over the same generated inputs
  performance of "fold vs map" in {
    measure method "foldOnFunctionsList" in {
      using(inputs) in { param =>
        foldOnFunctionsList(param._1, 4, param._2)
      }
    }
    measure method "mapOnFunctionsAndList" in {
      using(inputs) in { param =>
        mapOnFunctionsAndList(param._1, 4, param._2)
      }
    }
  }

}

we can obtain the below graph:

[image: map-fold]

Where orange is for fold and blue for map.

We can also investigate the two implementations from a memory-usage point of view. JMH provides a series of profilers that can be quite handy in these cases (in the following example we chose a HotspotMemoryProfiler):

object BenchmarkRunner_MemoryFootprint {
  // run sbt clean jmh:compile from terminal first.
  def main(args: Array[String]): Unit = {
    val opt: Options = new OptionsBuilder().include(classOf[MemoryFootprint].getSimpleName).addProfiler(classOf[HotspotMemoryProfiler])
      .resultFormat(ResultFormatType.TEXT).build
    new Runner(opt).run
  }
}

Resulting in a long list of counters (below is a partial output):

Benchmark                                                                                             Mode  Cnt            Score   Error  Units
MemoryFootprint.testFoldOnFunctionsList                                                              thrpt                71.335          ops/s
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.0.invocations                              thrpt               108.000              ?
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.0.lastEntryTime                            thrpt       10179095151.000              ?
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.0.lastExitTime                             thrpt       10178954989.000              ?
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.0.time                                     thrpt          75456013.000              ?
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.1.invocations                              thrpt                   ≈ 0              ?
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.1.lastEntryTime                            thrpt                   ≈ 0              ?
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.1.lastExitTime                             thrpt                   ≈ 0              ?
MemoryFootprint.testFoldOnFunctionsList:·sun.gc.collector.1.time                                     thrpt                   ≈ 0              ?

You can take a look here to see how to make sense of them. The following ScalaMeter example instead provides a more basic picture of the heap usage:

object MemoryBenchmark extends Bench.OfflineReport {

  override lazy val persistor = SerializationPersistor(new File("target/scalameter/memoryusage/results"))

  override def measurer = new Measurer.MemoryFootprint

// rest of the code is the same as above

}

For the two fold/map implementations the graph wouldn’t be so interesting (as they overlap) but you can take a look at another post I wrote about value classes to get an idea.

Finally, if running the benchmark on the Oracle JDK, we can use JMH to exploit the Flight Recorder:

object BenchmarkRunner_FlightRecorder {
  // run sbt clean jmh:compile from terminal first.
  def main(args: Array[String]): Unit = {
    val opt: Options = new OptionsBuilder().include(classOf[FlightRecorderDump].getSimpleName)
      .resultFormat(ResultFormatType.TEXT).build
    new Runner(opt).run
  }
}

@Warmup(iterations = 30)
@Measurement(iterations = 30)
@State(Scope.Benchmark)
private class FlightRecorderDump extends TargetPerformance {

  @Benchmark
  @Fork(value = 1, jvmArgsAppend = Array("-XX:+UnlockCommercialFeatures",
      "-XX:+FlightRecorder", "-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints",
      "-XX:FlightRecorderOptions=defaultrecording=true,dumponexit=true,dumponexitpath=/tmp/foldOnFunctionsList.jfr"))
  override def testFoldOnFunctionsList(state: Scopes.BenchmarkState): Unit = super.testFoldOnFunctionsList(state)

  @Benchmark
  @Fork(value = 1, jvmArgsAppend = Array("-XX:+UnlockCommercialFeatures",
    "-XX:+FlightRecorder", "-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints",
    "-XX:FlightRecorderOptions=defaultrecording=true,dumponexit=true,dumponexitpath=/tmp/mapOnFunctionsAndList.jfr"))
  override def testMapOnFunctionsAndList(state: Scopes.BenchmarkState): Unit = super.testMapOnFunctionsAndList(state)

}

And this will generate two separate reports: /tmp/foldOnFunctionsList.jfr for the fold version and /tmp/mapOnFunctionsAndList.jfr for the version with map. With JMC (Java Mission Control) we can compare a rich collection of parameters and different aspects between the two implementations. Below, just as an example, is a general overview of the heap usage for the map implementation:

[image: map-perf]

—————————————-

1. [In this post a certain familiarity with basic concepts of benchmarking is somewhat taken for granted, as the main focus is to illustrate how JMH and ScalaMeter can be used to produce some results. Still, the interpretation of those results requires an understanding of the mentioned background.]

Spark: How to Add Multiple Columns in Dataframes (and How Not to)

There are generally two ways to dynamically add columns to a dataframe in Spark: a foldLeft or a map (passing a RowEncoder). The foldLeft way is quite popular (and elegant), but recently I came across an issue regarding its performance when the number of columns to add is not trivial. I think the lesson learned is worth sharing: the map solution offers substantially better performance as the number of columns to be added increases.

Let’s start with an example dataframe with four columns: “id”, “author”, “title”, “incipit” (the opening line of the novel). A simple dataframe with just one row might look something like:

+---+-----------+-------------+-----------------------------+
| id|     author|        title|                      incipit|
+---+-----------+-------------+-----------------------------+
|  6|Leo Tolstoy|Anna Karenina|Happy families are all alike |
+---+-----------+-------------+-----------------------------+

You are given an arbitrary list of words and, for each of them, you would like to add a column (named after the word) to the original dataframe, flagging with a boolean whether or not that word appears at least once in the opening line (incipit).
For a list of two words, List("families", "giraffe"), the above dataframe will be transformed into the following:

+---+-----------+-------------+-----------------------------+--------+-------+
| id|     author|        title|                      incipit|families|giraffe|
+---+-----------+-------------+-----------------------------+--------+-------+
|  6|Leo Tolstoy|Anna Karenina|Happy families are all alike |    true|  false|
+---+-----------+-------------+-----------------------------+--------+-------+

As the list of columns is arbitrary, there are two possible approaches to this problem. I wrapped both in methods to make testing easier. The first approach is the foldLeft way:

def addColumnsViaFold(df: DataFrame, columns: List[String]): DataFrame = {
  import df.sparkSession.implicits._
  columns.foldLeft(df)((acc, col) => {
    acc.withColumn(col, acc("incipit").as[String].contains(col))
  })
}

And the second one (which involves a bit more coding) is the map way:

// imports assuming Spark 2.x
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{DataTypes, StructType}

def addColumnsViaMap(df: DataFrame, words: List[String]): DataFrame = {
  val encoder = RowEncoder.apply(getSchema(df, words))
  df.map(mappingRows(df.schema)(words))(encoder)
}

private val mappingRows: StructType => List[String] => Row => Row =
  (schema) => (words) => (row) => {
    val addedCols: List[Boolean] = words.map {
      word => row.getString(schema.fieldIndex("incipit")).contains(word)
    }
    Row.merge(row, Row.fromSeq(addedCols))
  }

private def getSchema(df: DataFrame, words: List[String]): StructType = {
  var schema: StructType = df.schema
  words.foreach(word => schema = schema.add(word, DataTypes.BooleanType, false))
  schema
}
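
For completeness, a hypothetical invocation (assuming a SparkSession named spark and the one-row example dataframe from above) might look like:

import spark.implicits._

val df = Seq((6, "Leo Tolstoy", "Anna Karenina", "Happy families are all alike "))
  .toDF("id", "author", "title", "incipit")

addColumnsViaMap(df, List("families", "giraffe")).show()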

The code (including all tests) is available here.
When we run the ScalaMeter tests to get some idea of how the two approaches behave when dealing with 100 new columns, we get the following results[1]. For foldLeft (the addColumnsViaFold method):

[image: adding-column-via-fold-100]

Whereas these are the results for map (the addColumnsViaMap method):

[image: adding_columns_via_map-100]

When the number of columns increases, foldLeft takes considerably more time. If we take the number of columns to 500, the result is similar (and more dramatic). Once again, for foldLeft:

[image: adding-column-via-fold]

And for map:

[image: adding_columns_via_map-500]
With 1000 columns, the foldLeft job aborts:

[error] Error running separate JVM: java.lang.OutOfMemoryError: GC overhead limit exceeded

Probably a sign that the heap is running low and the GC can't free much memory. Still, the map-based solution seems to cope much better, even with 1000 columns:

[image: adding_columns_via_map]
When it comes to the reason behind this different behavior, my guess would be that somehow Catalyst is not able to optimize the foldLeft operation. The plans below (from explain(true)) show how the Parsed and Analyzed Logical Plans end up in a sort of nested structure.

== Parsed Logical Plan ==
Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 20 more fields]
+- AnalysisBarrier
      +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 19 more fields]
         +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 18 more fields]
            +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 17 more fields]
               +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 16 more fields]
                  +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 15 more fields]
                     +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 14 more fields]
                        +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 13 more fields]
                           +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 12 more fields]
                              +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 11 more fields]
                                 +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 10 more fields]
                                    +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 9 more fields]
                                       +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 8 more fields]
                                          +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 7 more fields]
                                             +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 6 more fields]
                                                +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 5 more fields]
                                                   +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 4 more fields]
                                                      +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 3 more fields]
                                                         +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 2 more fields]
                                                            +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, Contains(incipit#12, toing) AS toing#368]
                                                               +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, Contains(incipit#12, hence) AS hence#342]
                                                                  +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, Contains(incipit#12, four) AS four#317]
                                                                     +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, Contains(incipit#12, John) AS John#293]
                                                                        +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, Contains(incipit#12, pop) AS pop#270]
                                                                           +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, Contains(incipit#12, drunk) AS drunk#248]
                                                                              +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, Contains(incipit#12, two) AS two#227]
                                                                                 +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, Contains(incipit#12, when) AS when#207]
                                                                                    +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, Contains(incipit#12, city) AS city#188]
                                                                                       +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, Contains(incipit#12, morning) AS morning#170]
                                                                                          +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, Contains(incipit#12, way) AS way#153]
                                                                                             +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, Contains(incipit#12, pig) AS pig#137]
                                                                                                +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, Contains(incipit#12, happy) AS happy#122]
                                                                                                   +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, Contains(incipit#12, was) AS was#108]
                                                                                                      +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, Contains(incipit#12, anna) AS anna#95]
                                                                                                         +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, Contains(incipit#12, in) AS in#83]
                                                                                                            +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, Contains(incipit#12, man) AS man#72]
                                                                                                               +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, Contains(incipit#12, with) AS with#62]
                                                                                                                  +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, Contains(incipit#12, alike) AS alike#53]
                                                                                                                     +- Project [id#9, author#10, title#11, incipit#12, to#38, Contains(incipit#12, get) AS get#45]
                                                                                                                        +- Project [id#9, author#10, title#11, incipit#12, Contains(incipit#12, to) AS to#38]
                                                                                                                           +- Project [_1#4 AS id#9, _2#5 AS author#10, _3#6 AS title#11, _4#7 AS incipit#12]
                                                                                                                              +- LocalRelation [_1#4, _2#5, _3#6, _4#7]

== Analyzed Logical Plan ==
id: int, author: string, title: string, incipit: string, to: boolean, get: boolean, alike: boolean, with: boolean, man: boolean, in: boolean, anna: boolean, was: boolean, happy: boolean, pig: boolean, way: boolean, morning: boolean, city: boolean, when: boolean, two: boolean, drunk: boolean, pop: boolean, John: boolean, four: boolean, hence: boolean, ... 20 more fields
Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 20 more fields]
+- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 19 more fields]
   +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 18 more fields]
      +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 17 more fields]
         +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 16 more fields]
            +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 15 more fields]
               +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 14 more fields]
                  +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 13 more fields]
                     +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 12 more fields]
                        +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 11 more fields]
                           +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 10 more fields]
                              +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 9 more fields]
                                 +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 8 more fields]
                                    +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 7 more fields]
                                       +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 6 more fields]
                                          +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 5 more fields]
                                             +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 4 more fields]
                                                +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 3 more fields]
                                                   +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, ... 2 more fields]
                                                      +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, hence#342, Contains(incipit#12, toing) AS toing#368]
                                                         +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, four#317, Contains(incipit#12, hence) AS hence#342]
                                                            +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, John#293, Contains(incipit#12, four) AS four#317]
                                                               +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, pop#270, Contains(incipit#12, John) AS John#293]
                                                                  +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, drunk#248, Contains(incipit#12, pop) AS pop#270]
                                                                     +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, two#227, Contains(incipit#12, drunk) AS drunk#248]
                                                                        +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, when#207, Contains(incipit#12, two) AS two#227]
                                                                           +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, city#188, Contains(incipit#12, when) AS when#207]
                                                                              +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, morning#170, Contains(incipit#12, city) AS city#188]
                                                                                 +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, way#153, Contains(incipit#12, morning) AS morning#170]
                                                                                    +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, pig#137, Contains(incipit#12, way) AS way#153]
                                                                                       +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, happy#122, Contains(incipit#12, pig) AS pig#137]
                                                                                          +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, was#108, Contains(incipit#12, happy) AS happy#122]
                                                                                             +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, anna#95, Contains(incipit#12, was) AS was#108]
                                                                                                +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, in#83, Contains(incipit#12, anna) AS anna#95]
                                                                                                   +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, man#72, Contains(incipit#12, in) AS in#83]
                                                                                                      +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, with#62, Contains(incipit#12, man) AS man#72]
                                                                                                         +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, alike#53, Contains(incipit#12, with) AS with#62]
                                                                                                            +- Project [id#9, author#10, title#11, incipit#12, to#38, get#45, Contains(incipit#12, alike) AS alike#53]
                                                                                                               +- Project [id#9, author#10, title#11, incipit#12, to#38, Contains(incipit#12, get) AS get#45]
                                                                                                                  +- Project [id#9, author#10, title#11, incipit#12, Contains(incipit#12, to) AS to#38]
                                                                                                                     +- Project [_1#4 AS id#9, _2#5 AS author#10, _3#6 AS title#11, _4#7 AS incipit#12]
                                                                                                                        +- LocalRelation [_1#4, _2#5, _3#6, _4#7]

== Optimized Logical Plan ==
Project [id#9, author#10, title#11, incipit#12, Contains(incipit#12, to) AS to#38, Contains(incipit#12, get) AS get#45, Contains(incipit#12, alike) AS alike#53, Contains(incipit#12, with) AS with#62, Contains(incipit#12, man) AS man#72, Contains(incipit#12, in) AS in#83, Contains(incipit#12, anna) AS anna#95, Contains(incipit#12, was) AS was#108, Contains(incipit#12, happy) AS happy#122, Contains(incipit#12, pig) AS pig#137, Contains(incipit#12, way) AS way#153, Contains(incipit#12, morning) AS morning#170, Contains(incipit#12, city) AS city#188, Contains(incipit#12, when) AS when#207, Contains(incipit#12, two) AS two#227, Contains(incipit#12, drunk) AS drunk#248, Contains(incipit#12, pop) AS pop#270, Contains(incipit#12, John) AS John#293, Contains(incipit#12, four) AS four#317, Contains(incipit#12, hence) AS hence#342, ... 20 more fields]
+- InMemoryRelation [id#9, author#10, title#11, incipit#12], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
      +- LocalTableScan [id#9, author#10, title#11, incipit#12]

== Physical Plan ==
*(1) Project [id#9, author#10, title#11, incipit#12, Contains(incipit#12, to) AS to#38, Contains(incipit#12, get) AS get#45, Contains(incipit#12, alike) AS alike#53, Contains(incipit#12, with) AS with#62, Contains(incipit#12, man) AS man#72, Contains(incipit#12, in) AS in#83, Contains(incipit#12, anna) AS anna#95, Contains(incipit#12, was) AS was#108, Contains(incipit#12, happy) AS happy#122, Contains(incipit#12, pig) AS pig#137, Contains(incipit#12, way) AS way#153, Contains(incipit#12, morning) AS morning#170, Contains(incipit#12, city) AS city#188, Contains(incipit#12, when) AS when#207, Contains(incipit#12, two) AS two#227, Contains(incipit#12, drunk) AS drunk#248, Contains(incipit#12, pop) AS pop#270, Contains(incipit#12, John) AS John#293, Contains(incipit#12, four) AS four#317, Contains(incipit#12, hence) AS hence#342, ... 20 more fields]
+- InMemoryTableScan [author#10, id#9, incipit#12, title#11]
      +- InMemoryRelation [id#9, author#10, title#11, incipit#12], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- LocalTableScan [id#9, author#10, title#11, incipit#12]

I am not sure whether that is somehow related to Catalyst's inability to optimize the foldLeft chain, but the plan for the map version doesn't show the same nested structure:

== Parsed Logical Plan ==
'SerializeFromObject [validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, id), IntegerType) AS id#83, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, author), StringType), true, false) AS author#84, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, title), StringType), true, false) AS title#85, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, incipit), StringType), true, false) AS incipit#86, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, to), BooleanType) AS to#87, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, get), BooleanType) AS get#88, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, alike), BooleanType) AS alike#89, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, with), BooleanType) AS with#90, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, man), BooleanType) AS man#91, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, in), BooleanType) AS in#92, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, anna), BooleanType) AS anna#93, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, was), BooleanType) AS was#94, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, happy), BooleanType) AS happy#95, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, pig), BooleanType) AS pig#96, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 14, way), BooleanType) AS way#97, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 15, morning), BooleanType) AS morning#98, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 16, city), BooleanType) AS city#99, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 17, when), BooleanType) AS when#100, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 18, two), BooleanType) AS two#101, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 19, drunk), BooleanType) AS drunk#102, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 20, pop), BooleanType) AS pop#103, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 21, John), BooleanType) AS John#104, 
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 22, four), BooleanType) AS four#105, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 23, hence), BooleanType) AS hence#106, ... 20 more fields]
+- 'MapElements , interface org.apache.spark.sql.Row, [StructField(id,IntegerType,false), StructField(author,StringType,true), StructField(title,StringType,true), StructField(incipit,StringType,true)], obj#82: org.apache.spark.sql.Row
   +- 'DeserializeToObject unresolveddeserializer(createexternalrow(getcolumnbyordinal(0, IntegerType), getcolumnbyordinal(1, StringType).toString, getcolumnbyordinal(2, StringType).toString, getcolumnbyordinal(3, StringType).toString, StructField(id,IntegerType,false), StructField(author,StringType,true), StructField(title,StringType,true), StructField(incipit,StringType,true))), obj#81: org.apache.spark.sql.Row
      +- AnalysisBarrier
            +- Project [_1#4 AS id#9, _2#5 AS author#10, _3#6 AS title#11, _4#7 AS incipit#12]
               +- LocalRelation [_1#4, _2#5, _3#6, _4#7]

== Analyzed Logical Plan ==
id: int, author: string, title: string, incipit: string, to: boolean, get: boolean, alike: boolean, with: boolean, man: boolean, in: boolean, anna: boolean, was: boolean, happy: boolean, pig: boolean, way: boolean, morning: boolean, city: boolean, when: boolean, two: boolean, drunk: boolean, pop: boolean, John: boolean, four: boolean, hence: boolean, ... 20 more fields
SerializeFromObject [validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, id), IntegerType) AS id#83, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, author), StringType), true, false) AS author#84, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, title), StringType), true, false) AS title#85, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, incipit), StringType), true, false) AS incipit#86, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, to), BooleanType) AS to#87, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, get), BooleanType) AS get#88, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, alike), BooleanType) AS alike#89, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, with), BooleanType) AS with#90, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, man), BooleanType) AS man#91, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, in), BooleanType) AS in#92, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, anna), BooleanType) AS anna#93, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, was), BooleanType) AS was#94, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, happy), BooleanType) AS happy#95, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, pig), BooleanType) AS pig#96, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 14, way), BooleanType) AS way#97, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 15, morning), BooleanType) AS morning#98, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 16, city), BooleanType) AS city#99, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 17, when), BooleanType) AS when#100, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 18, two), BooleanType) AS two#101, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 19, drunk), BooleanType) AS drunk#102, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 20, pop), BooleanType) AS pop#103, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 21, John), BooleanType) AS John#104, 
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 22, four), BooleanType) AS four#105, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 23, hence), BooleanType) AS hence#106, ... 20 more fields]
+- MapElements , interface org.apache.spark.sql.Row, [StructField(id,IntegerType,false), StructField(author,StringType,true), StructField(title,StringType,true), StructField(incipit,StringType,true)], obj#82: org.apache.spark.sql.Row
   +- DeserializeToObject createexternalrow(id#9, author#10.toString, title#11.toString, incipit#12.toString, StructField(id,IntegerType,false), StructField(author,StringType,true), StructField(title,StringType,true), StructField(incipit,StringType,true)), obj#81: org.apache.spark.sql.Row
      +- Project [_1#4 AS id#9, _2#5 AS author#10, _3#6 AS title#11, _4#7 AS incipit#12]
         +- LocalRelation [_1#4, _2#5, _3#6, _4#7]

== Optimized Logical Plan ==
SerializeFromObject [validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, id), IntegerType) AS id#83, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, author), StringType), true, false) AS author#84, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, title), StringType), true, false) AS title#85, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, incipit), StringType), true, false) AS incipit#86, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, to), BooleanType) AS to#87, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, get), BooleanType) AS get#88, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, alike), BooleanType) AS alike#89, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, with), BooleanType) AS with#90, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, man), BooleanType) AS man#91, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, in), BooleanType) AS in#92, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, anna), BooleanType) AS anna#93, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, was), BooleanType) AS was#94, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, happy), BooleanType) AS happy#95, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, pig), BooleanType) AS pig#96, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 14, way), BooleanType) AS way#97, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 15, morning), BooleanType) AS morning#98, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 16, city), BooleanType) AS city#99, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 17, when), BooleanType) AS when#100, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 18, two), BooleanType) AS two#101, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 19, drunk), BooleanType) AS drunk#102, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 20, pop), BooleanType) AS pop#103, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 21, John), BooleanType) AS John#104, 
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 22, four), BooleanType) AS four#105, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 23, hence), BooleanType) AS hence#106, ... 20 more fields]
+- MapElements , interface org.apache.spark.sql.Row, [StructField(id,IntegerType,false), StructField(author,StringType,true), StructField(title,StringType,true), StructField(incipit,StringType,true)], obj#82: org.apache.spark.sql.Row
   +- DeserializeToObject createexternalrow(id#9, author#10.toString, title#11.toString, incipit#12.toString, StructField(id,IntegerType,false), StructField(author,StringType,true), StructField(title,StringType,true), StructField(incipit,StringType,true)), obj#81: org.apache.spark.sql.Row
      +- InMemoryRelation [id#9, author#10, title#11, incipit#12], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- LocalTableScan [id#9, author#10, title#11, incipit#12]

== Physical Plan ==
*(1) SerializeFromObject [validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, id), IntegerType) AS id#83, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, author), StringType), true, false) AS author#84, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, title), StringType), true, false) AS title#85, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, incipit), StringType), true, false) AS incipit#86, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, to), BooleanType) AS to#87, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, get), BooleanType) AS get#88, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, alike), BooleanType) AS alike#89, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, with), BooleanType) AS with#90, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, man), BooleanType) AS man#91, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, in), BooleanType) AS in#92, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, anna), BooleanType) AS anna#93, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, was), BooleanType) AS was#94, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, happy), BooleanType) AS happy#95, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, pig), BooleanType) AS pig#96, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 14, way), BooleanType) AS way#97, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 15, morning), BooleanType) AS morning#98, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 16, city), BooleanType) AS city#99, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 17, when), BooleanType) AS when#100, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 18, two), BooleanType) AS two#101, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 19, drunk), BooleanType) AS drunk#102, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 20, pop), BooleanType) AS pop#103, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 21, John), BooleanType) AS John#104, 
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 22, four), BooleanType) AS four#105, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 23, hence), BooleanType) AS hence#106, ... 20 more fields]
+- *(1) MapElements , obj#82: org.apache.spark.sql.Row
   +- *(1) DeserializeToObject createexternalrow(id#9, author#10.toString, title#11.toString, incipit#12.toString, StructField(id,IntegerType,false), StructField(author,StringType,true), StructField(title,StringType,true), StructField(incipit,StringType,true)), obj#81: org.apache.spark.sql.Row
      +- InMemoryTableScan [id#9, author#10, title#11, incipit#12]
            +- InMemoryRelation [id#9, author#10, title#11, incipit#12], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- LocalTableScan [id#9, author#10, title#11, incipit#12]

Whatever the root cause is, the conclusion is clear: as of Spark 2.3.0 (and probably earlier versions), dynamically adding a considerable number of columns to a DataFrame should be done via a map operation and not via foldLeft, for the reasons we've seen.
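
To make this concrete, here is a minimal sketch of the two approaches. It assumes the goal is to add one boolean column per word, flagging whether the incipit contains that word; addWordColumnsFold, addWordColumnsMap and the words parameter are illustrative names rather than code from the original benchmark:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{BooleanType, StructField, StructType}

// foldLeft variant: every withColumn call returns a new Dataset and grows
// the logical plan, so each added column pays for another analysis round.
def addWordColumnsFold(df: DataFrame, words: Seq[String]): DataFrame =
  words.foldLeft(df)((acc, word) => acc.withColumn(word, col("incipit").contains(word)))

// map variant: a single operator computes all the new columns in one pass.
def addWordColumnsMap(df: DataFrame, words: Seq[String]): DataFrame = {
  val schema = StructType(df.schema.fields ++
    words.map(word => StructField(word, BooleanType, nullable = false)))
  df.map { row =>
    val incipit = row.getAs[String]("incipit")
    Row.fromSeq(row.toSeq ++ words.map(word => incipit.contains(word)))
  }(RowEncoder(schema))
}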

—————————————-

1. [I ran the tests on a virtual machine with three cores running Spark 2.3.0. Times in milliseconds reflect the underlying infrastructure and I would expect different performance on a proper cluster. Nevertheless, that a foldLeft solution performs significantly worse than a map-based solution should be an outcome independent of the underlying hardware.]

Custom JSON serializer

In Scala there are quite a few frameworks that make the implementation of RESTful JSON services quite straightforward (Play, Scalatra and Akka HTTP are the main ones). They all follow the same general idea: a JSON payload is deserialized into a Scala class (usually a case class) and serialized back into JSON. For example, a JSON payload like:

{"name": "Paul", "age": 45, "gender": "male"}

Can be easily deserialized into something like:

final case class User(name: String, age: Int, gender: String)

This is pretty standard stuff and works almost out of the box in all the mentioned frameworks, but in some cases you might want to take advantage of the type system, for example by defining a type for "age":

// A value class representing a person's age
final class Age(val age: Int) extends AnyVal

And also constant values for “gender” rather than a generic String type:

sealed trait Gender {
  val sex: String
}

final case object Male extends Gender {
  val sex = "male"
}

final case object Female extends Gender {
  val sex = "female"
}

final case object Other extends Gender {
  val sex = "other"
}

object GenderHelper {
  private val genders: Set[Gender] = Set(Male, Female, Other)
  def toGender(sex: String): Option[Gender] = genders.find(_.sex == sex)
}
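
A quick usage check of the helper:

GenderHelper.toGender("male")    // Some(Male)
GenderHelper.toGender("unknown") // None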

We can now define User in a more robust way as:

final case class User(name: String, age: Age, gender: Gender)

This looks better but doesn't work out of the box anymore: regardless of the Scala JSON library we use, the library has no way to know how to serialize/deserialize types like Age or Gender. We need to write our own custom JSON serializers/deserializers. The following example uses Akka HTTP and Json4s (code is available here).

We first need to write two serializers/deserializers. One for Age:

import com.lansalo.model.Age
import org.json4s.CustomSerializer
import org.json4s.JsonAST.JInt

case object AgeSerializer extends CustomSerializer[Age](format => ( {
  // deserializer: a JSON integer becomes an Age
  case JInt(age) => new Age(age.intValue)
}, {
  // serializer: the value class is unboxed at runtime, so the age
  // field arrives here as a plain Int
  case age: Int => JInt(BigInt(age))
}))

And one for Gender:

import com.lansalo.model.{Gender, GenderHelper}
import org.json4s.CustomSerializer
import org.json4s.JsonAST.JString

case object GenderSerializer extends CustomSerializer[Gender](format => ( {
  // deserializer: .get throws on an unknown gender string,
  // which is acceptable for this example
  case JString(gender) => GenderHelper.toGender(gender).get
}, {
  case gender: Gender => JString(gender.sex)
}))

Then we just need to add them to the default Json4s formats (in a trait here):

import com.lansalo.json.serializer.{AgeSerializer, GenderSerializer}
import org.json4s.{DefaultFormats, Formats}

trait JsonSupport {
  // explicit type annotation on the implicit val avoids inference surprises
  implicit val formats: Formats = DefaultFormats + GenderSerializer + AgeSerializer
}
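
As a quick sanity check of the serialization side, a minimal sketch (FormatsDemo is an illustrative name; the expected output assumes the serializers behave as described above):

import com.lansalo.model.{Age, Male, User}
import org.json4s.jackson.Serialization

object FormatsDemo extends App with JsonSupport {
  // expected output: {"name":"Paul","age":45,"gender":"male"}
  println(Serialization.write(User("Paul", new Age(45), Male)))
}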

And finally bring the JSON formats into scope where we need them:

import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.directives.MethodDirectives.{get, post}
import akka.http.scaladsl.server.directives.PathDirectives.path
import akka.http.scaladsl.server.directives.RouteDirectives.complete
import akka.util.Timeout
import com.lansalo.json.JsonSupport
import com.lansalo.model.{Age, Female, User}
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jackson

import scala.concurrent.duration._

trait UserRoutes extends JsonSupport {

  implicit def system: ActorSystem

  lazy val log = Logging(system, classOf[UserRoutes])

  implicit lazy val timeout = Timeout(5.seconds)
  import Json4sSupport._

  implicit val serialization = jackson.Serialization // or native.Serialization

  lazy val userRoutes: Route = pathPrefix("users") {
    pathEnd {
      post {
        entity(as[User]) { user =>
          log.info(s"Received user: $user")
          complete((StatusCodes.Created, "OK"))
        }
      }
    } ~
      path(Segment) { name =>
        get {
          complete(User(name, new Age(22), Female))
        }
      }
  }
}
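
With these routes wired into an HTTP server (any standard Akka HTTP binding will do), both custom serializers get exercised in both directions. A hypothetical exchange, with "Jane" as a purely illustrative name:

GET /users/Jane
// expected response body: {"name":"Jane","age":22,"gender":"female"}

POST /users with body {"name":"Paul","age":45,"gender":"male"}
// expected response: 201 Created with body "OK"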

 

Scala Value Classes: Type-Safety for Free

Value classes have been available since Scala 2.10 and the initial proposal, SIP-15, included several use cases for this new mechanism, but in this post I would like to focus on one in particular: the possibility to create wrappers with no boxing overhead, and how we can leverage this to improve our type system at zero cost.
In doing that, I'll also add some numbers to show what "zero cost" exactly means. The class Wrapper below:

class Wrapper(val flag: Boolean) extends AnyVal

is a value class. Generally speaking, a value class has a single, public val parameter and extends AnyVal directly. There is a specific list of criteria that a value class must satisfy in order to be considered as such (for all the details, including universal traits, see SIP-15) but here I would like to focus on the general idea behind the mechanism and its benefits.

In the example above, Wrapper is the type at compile time, but at runtime the representation is a plain Boolean. The "free" in the title refers precisely to the fact that Wrapper never actually gets created at runtime, hence no boxing overhead and no extra memory allocation (but I'll get back to this after an example).
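
To make the "never gets created" claim concrete, here is a minimal sketch; the erased signature in the comment is an approximation of what the compiler produces:

def isSet(w: Wrapper): Boolean = w.flag
// after erasure, roughly equivalent to:
// def isSet(flag: Boolean): Boolean = flag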

So far so good… but how can we exploit this feature to our advantage? An example (perhaps a bit clumsy) should illustrate the point. Let's say we are working on an online auction website and we have an entity for a generic auction item:

final case class Item (
   itemId: Int,
   asked: Double,
   soldPrice: Double
)

The model is pretty simple: an id to uniquely identify an item, the asking price expressed as a double (although in a real-case scenario BigDecimal1 would be the right choice) and the price the item was sold for. And, for no particular reason beyond our curiosity, we also have a method to check whether, overall, the buyers paid more than the asking price (for example for all the items sold in the last month):

def totalPerformanceBase(list: List[Item]): Double = {
   list.foldLeft(0d)((acc, item) => {
      acc + (item.soldPrice - item.asked)
   })
}

The method is called totalPerformanceBase because we are going to measure the performance of this method later and use it as our base reference.

Representing a price as a Double2 is perfectly fine, but on the other hand we also have bets, shipping costs and quite likely many other domain models which include a price. We could create our own type to specifically represent an Item's price in order to make our code more robust and prevent common mistakes (passing the price of a bet where an item's price is expected, or vice versa); after all, that's the advantage of having a type system: the compiler spots the error immediately, fewer runtime bugs, an improved IDE experience and so forth. So we write a wrapper for our price:

final case class Price(price: Double)

And re-factor the Item class:

final case class Item (
   itemId: Int,
   asked: Price,
   soldPrice: Price
)
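
As a quick illustration of the extra safety (BetPrice is a hypothetical wrapper introduced here just for the example):

final case class BetPrice(price: Double)

Item(1, Price(10.0), Price(12.0))          // compiles
// Item(1, BetPrice(10.0), BetPrice(12.0)) // does not compile: type mismatch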

Also totalPerformance needs to be refactored accordingly:

def totalPerformance(list: List[Item]): Double = {
   list.foldLeft(0d)((acc, item) => {
      acc + (item.soldPrice.price - item.asked.price)
   })
}

In the graph below, the blue line represents our base implementation with a Double whereas the orange line represents the performance of the version with the wrapper Price.
[Graph: totalPerformance execution time, Double baseline (blue) vs Price wrapper (orange)]

You can find the ScalaMeter tests and all the code here, but bear in mind that results might vary based on your hardware. However, the important point to notice is that introducing Price as a wrapper for extra type safety comes with a small price in terms of performance: when totalPerformance crunches a list of half a million items we can see a difference of a couple of milliseconds3. Besides, because the wrapper Price actually gets instantiated, we can observe a similar effect on memory usage. Let's consider for example the following methods:

// For the original Item
def instantiate(list: List[Int]): List[Item] = {
  list.map(n => Item(n, n, n + 1.0))
}

// For the Item using Price as wrapper
def instantiate(list: List[Int]): List[Item] = {
   list.map(n => Item(n, Price(n), Price(n + 1.0)))
}

From the graph below we can see the different heap memory usage as the number of items progressively increases (in orange the implementation with the Price wrapper, in blue the original implementation with Double):

[Graph: heap memory usage vs number of items, Price wrapper (orange) vs Double (blue)]

Just one note: the y-axis is actually in kB (that's a known issue with ScalaMeter), so when the size is 970000, for instance, the heap usage for the Item with Double is 54222.97 kB against 93120.00 kB for the version with the Price wrapper.

Value classes come into play because they allow us to keep the type safety we gained with the wrapper without paying the extra price (both in terms of execution time and memory usage).
We can slightly change the Price definition to make it a value class:

final case class PriceVal(price: Double) extends AnyVal {
   def -(that: PriceVal) = PriceVal(this.price - that.price)
   def +(that: PriceVal) = PriceVal(this.price + that.price)
}

and change our Item definition to reflect this:

final case class Item (
   itemId: Int,
   asked: PriceVal,
   soldPrice: PriceVal
)

We also need to refactor our methods a bit to accommodate the new Item definition:

def totalPerformance(list: List[Item]): PriceVal = {
   list.foldLeft(PriceVal(0d))((acc, item) => {
     acc + (item.soldPrice - item.asked)
   })
}

def instantiate(list: List[Int]): List[Item] = {
   list.map(n => Item(n, PriceVal(n), PriceVal(n + 1.0)))
}

Now we can compare totalPerformance time usage with all the solutions we came up with:

[Graph: totalPerformance execution time, Price wrapper (orange), Double (blue), PriceVal value class (green)]

  • In orange, the implementation with the wrapper Price
  • In blue, the original implementation of Item with Double
  • In green, the one with the value class PriceVal

It is quite evident that there isn't any significant difference between the base implementation with Double and the one with value classes. On the other hand, the version with the Price wrapper doesn't perform as efficiently as the others. The outcome is analogous when memory usage is examined:

[Graph: memory usage, Double (blue) and PriceVal (green) lines perfectly overlapping]

Once again, there is no difference between the Double and the value class implementation, and that's why you see only one line: the green and blue lines perfectly overlap.
In case the two overlapping lines hide what's happening a bit too much, the graph below compares just the memory usage of the Price wrapper solution (in orange) against the version using the PriceVal value class (in green):

[Graph: memory usage, Price wrapper (orange) vs PriceVal value class (green)]

———————————-

1. [Here there is a good explanation of why double is not a wise choice when dealing with money, but for the purpose of this post it is perfectly fine and illustrates the point quite effectively]
2. [Note that in Scala, Double itself is not represented by an object in the underlying runtime system and is equivalent to a Java double primitive]
3. [Whether those few milliseconds actually make a difference largely depends on the context. For the purpose of this post, the only relevant aspect is that there is a difference]

Scala Variance Explained

There are quite a few articles and resources about variance in Scala available online. Still, I decided to add my own contribution with this post, trying to keep the whole topic as simple as I could, using familiar concepts, a few diagrams and some code you can play with. That hopefully will make the whole thing a bit easier to grasp. I also decided not to include references to category theory: not because it is of no use (on the contrary) but because I hope this rough explanation can work as a sort of encouragement and introduction to it (if you want to dig further I linked some resources at the end of the page).

Kingdom Animalia

As we need an example, actually an analogy, I chose something that lacks originality but not familiarity (hopefully). The code is available here but you won't find any surprises in it. It's just a hierarchy of classes modeling the relationship between different groups of animals:
[Diagram: the kingdom Animalia class hierarchy]

Variance

From the hierarchy above, Vertebrate is the parent class of Fish (or, Fish is a subtype of Vertebrate) and we can clearly write something like:

val vertebrate: Vertebrate = new Fish()

And that's because a fish is a vertebrate (or at least we wisely modeled it as such). But what about a parameterized type like T[A]? Let's say for example that we have a generic class Cage[A] that takes a type parameter: what can be said about the relationship between Cage[Vertebrate] and Cage[Fish]? Is Cage[Vertebrate] the parent of Cage[Fish]? Does the same relationship between a vertebrate and a fish apply to the corresponding parameterized types?

[Diagram: does the subtype arrow between Fish and Vertebrate lift to Cage[Fish] and Cage[Vertebrate]?]
Variance is mainly about how we answer this question. We have precisely three options: invariance, covariance and contravariance.

Invariance

In the invariant case the answer to the previous question is pretty simple: there is no relationship between the types and the corresponding parameterized types.

[Diagram: invariance, the subtype relationship between the type arguments does not carry over to Cage]
An invariant class Cage would be defined in the following way in Scala:

class Cage[A]

Then we can unsurprisingly write val mammalCage: Cage[Mammal] = new Cage[Mammal] but, despite Primate being a subtype of Mammal, the following will not compile:

val primateCage: Cage[Mammal] = new Cage[Primate]
// Expect an error like:
// [error] Note: kingdom.animalia.vertebrate.mammal.primate.Primate <: kingdom.animalia.vertebrate.mammal.Mammal, but class Cage is invariant in type A.
// [error] You may wish to define A as +A instead.

And for analogous reasons val vertebrateCage: Cage[Mammal] = new Cage[Vertebrate] won’t compile either. Conclusion: an invariant class Cage does not preserve the inheritance relationship between its type arguments.

Covariance

In the covariant case the answer to our initial question is more articulate and interesting: if B is a subtype of A, then F[B] is also a subtype of F[A].

[Diagram: covariance, if B is a subtype of A then Cage[B] is a subtype of Cage[A]]
Back to our example, a covariant Cage class in Scala would look like:

class Cage[+A]

And:

val mammalCage: Cage[Mammal] = new Cage[Mammal]
val primateCage: Cage[Mammal] = new Cage[Primate]
val homoCage: Cage[Mammal] = new Cage[Homo]

will all compile without errors, because the parent/child relationship between mammal/primate (or mammal/homo) is extended to the parameterized type Cage, so that Cage[Mammal] is the parent of Cage[Primate]. Of course the same won't be true of:

// expect a type mismatch error if you try to compile any of these
val reptileCage: Cage[Mammal] = new Cage[Reptile]
val vertebrateCage: Cage[Mammal] = new Cage[Vertebrate]

As there is no parent/child relationship between a mammal and a reptile, nor is Vertebrate a subtype of Mammal. In this last case, though, the opposite is actually true: all mammals are vertebrates, as we modeled Vertebrate to be the parent of Mammal, and that introduces us to the next topic.

Contravariance

As you might have guessed at this point, contravariance is the opposite of covariance:

[Diagram: contravariance, if B is a subtype of A then Cage[A] is a subtype of Cage[B]]
We still extend the types' inheritance to the corresponding parameterized type but in this case, so to speak, the arrow is inverted. More precisely, if B is a subtype of A, then F[A] is a subtype of F[B] (note the difference with the covariant case). A contravariant Cage class in Scala would look like:

class Cage[-A]

Then in such a cage for a mammal, we can keep:

val mammalCage: Cage[Mammal] = new Cage[Mammal]
val vertebrateCage: Cage[Mammal] = new Cage[Vertebrate]
val animalCage: Cage[Mammal] = new Cage[Animal]

But the following lines will not compile:

// type mismatch error
val primateCage: Cage[Mammal] = new Cage[Primate]
// type mismatch error
val molluscCage: Cage[Mammal] = new Cage[Mollusc]

Recap

  • In an invariant Cage[Mammal] I can keep only a type Mammal (not its subtypes or supertypes)
  • In a covariant Cage[Mammal] I can keep a type Mammal or one of its subtypes (but not its supertypes)
  • In a contravariant Cage[Mammal] I can keep a type Mammal or one of its supertypes (but not its subtypes)

And note that this restriction is imposed at compile time. In other words, the compiler will check for you what can and cannot be passed to a parameterized type according to the variance of its type arguments.

Covariant type A occurs in contravariant position

That was a rather general overview, but it might leave you with some questions about the use of variance from a practical perspective, including its advantages and disadvantages. In this section (and the following ones), I'll try to present some examples along with some common Scala classes that make use of variance, and I'll start with some code that does not compile: a broken cage.

class BrokenCage[+A](var guest: A)

The BrokenCage class is defined as covariant in its type parameter, but then we expose a mutable field (a var) of type A: the setter generated for guest takes an A as its argument, which is a contravariant position. The compiler will not digest that and will return an error like covariant type A occurs in contravariant position in type A of value guest.
To explain what is happening here and why the compiler is not happy, reasoning by contradiction might help. So let's assume for a moment that the code above compiles and see what happens. I first create a cage with a primate in it:

val primateCage = new BrokenCage[Primate](new Primate)

Because BrokenCage is covariant in A, BrokenCage[Primate] is a subtype of BrokenCage[Animal] and therefore we can assign a BrokenCage[Primate] to a BrokenCage[Animal]

val animalCage: BrokenCage[Animal] = primateCage

So now we have an animalCage of type BrokenCage[Animal]. Then there shouldn’t be any problem to assign an invertebrate as a guest. After all an invertebrate is an animal and BrokenCage is covariant in its type.

animalCage.guest = new Invertebrate

But our animalCage is the primateCage we defined at the beginning, so basically we managed to put an invertebrate in a cage for a primate! This issue is known as heap pollution and the Scala compiler prevents this sort of problem from happening, but it can occur, for example, with a plain Java array:

Primate[] primates = new Primate[5];
Animal[] animals =  primates;
animals[0] = new Invertebrate();

This code will compile and will result in an ArrayStoreException at runtime. One of the advantages of variance in Scala is precisely that these kinds of errors are caught at compile time rather than at runtime.
And just as a note (as it's out of scope for the present post), the BrokenCage class can be fixed either by using a val (rather than a var) or with an upper type bound, as in:

class FixedCage[+A, B <: A](var guest: B)

where we specify that the guest of type B has to be a subtype of A (there are a couple of simple tests around it in the code).
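
A minimal sketch of the fixed version in action, assuming the kingdom-animalia classes from the linked code:

val primateCage = new FixedCage[Primate, Primate](new Primate)
// covariance in A still works:
val animalCage: FixedCage[Animal, Primate] = primateCage
// guest now has type B = Primate, so this compiles:
animalCage.guest = new Primate
// while this does not, as Invertebrate is not a Primate:
// animalCage.guest = new Invertebrate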

Option

To see how variance (covariance in this specific case) is used in Scala, the familiar Option trait is a good example. A simple implementation would be (and the actual Scala implementation is not much different):

sealed trait Option[+A]

case class Some[+A]( value: A ) extends Option[A]
case object None extends Option[Nothing]

Option is covariant in its type A (and so is Some). But why it is covariant might not be immediately evident, so to understand it a bit better, let's try to make it invariant and see what sort of behavior we should expect as a result:

sealed trait Option[A]

case class Some[A]( value: A ) extends Option[A]
case object None extends Option[Nothing]

The first thing to notice is that Some too has to be invariant in its type argument A, as it extends Option and Option is now invariant in A. Besides (not surprisingly), we cannot assign an Option[Primate] to an Option[Mammal]:

var optionalMammal: Option[Mammal] = Some(new Mammal)
val optionalPrimate: Option[Primate] = Some(new Primate)
// This won't compile
// optionalMammal = optionalPrimate

Despite Primate being a subtype of Mammal, that relationship doesn't carry over to our invariant version of Option. The same applies to this invariant version of Some (for the same reason). Perhaps less obvious: we cannot assign None to an Option[Mammal]:

// This won't compile:
// val mammal: Option[Mammal] = None

Nothing is a subtype of every type in Scala, but this time Option is invariant, so we can't assign to an Option[Mammal] anything that is not an Option[Mammal]: not an Option[Nothing], and hence not None, which extends Option[Nothing]. We could get around this inconvenience by defining the Option trait with an upper type bound, as in sealed trait Option[A <: Nothing], but I'll let you imagine the usefulness of such an Option type. More generally, an invariant version of Option fails to meet the expectations an Option type is supposed to deliver (in the code you can also find a version of a covariant Option with an invariant definition of Some).
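
For contrast, with the standard covariant Option both assignments compile out of the box:

val optionalMammal: Option[Mammal] = Some(new Primate) // covariance of Some
val noMammal: Option[Mammal] = None                    // None is an Option[Nothing]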

Function

Functions are another interesting example of variance usage in the Scala language. The trait Function1 is defined as contravariant in its argument and covariant in its return type. Simplified, it looks like:

trait Function1[-X,+Y] {
  def apply(arg: X): Y
}

What we are saying with such type definition is that if A is a supertype of X and B is a subtype of Y, then Function1[A,B] is a subtype of Function1[X,Y]. In terms of the kingdom animalia analogy used so far, Function1[Vertebrate,Homo] is a subtype of Function1[Mammal,Primate] as Vertebrate is a supertype of Mammal and Homo is a subtype of Primate:
[Diagram: Function1[Vertebrate,Homo] as a subtype of Function1[Mammal,Primate]]

The reason for such a choice, though, might not be immediately evident. When we think about a class in our animal kingdom, it's quite intuitive what a parent/child relationship involves: Primate is a subtype of Mammal because all primates are mammals. Hence, if you ask me for a mammal, I can give you a primate, as a primate is a mammal. This concept has a name, the Liskov substitution principle, which states (from Wikipedia):

If S is a subtype of T, then objects of type T may be replaced with objects of type S (i.e. an object of type T may be substituted with any object of a subtype S) without altering any of the desirable properties of T

Back to our example: if we want to understand why Function1[Vertebrate,Homo] is a subtype of Function1[Mammal,Primate], or in other words why Function1 is defined as contravariant in its argument and covariant in its return type, then we need to start from those desirable properties that the present definition is not supposed to alter (and that, likely, a different definition would have altered instead). For a function (and for a category in category theory) we expect the composition operation as a primitive. Function composition can be defined more rigorously, but intuitively it says that if we have a function from A to B and a function from B to C, then we can always compose the two and obtain a function from A to C.

[Diagram: composing f: Vertebrate => Mammal with g: Mammal => Primate into Vertebrate => Primate]
Translated into Scala:

val f: Vertebrate => Mammal = ???
val g: Mammal => Primate = ???

// We can compose those to obtain a new function Vertebrate => Primate
val fromVertebrateToPrimate: Vertebrate => Primate = f andThen g // or g compose f

Given the definition of Function1 we saw, can we safely say that replacing a function with one of its subtypes doesn't alter the compose operation? Let's start by checking whether making the return type covariant is actually a problem.
Assume g stays the same but we replace f with a subtype of f. According to our definition of function, the return type is covariant, so Vertebrate => Homo is a subtype of f (as Homo is a subtype of Mammal):

[Diagram: f replaced by its subtype Vertebrate => Homo; composition with g still works]

It should be clear from the picture that replacing f with its subtype doesn't prevent us from composing the two functions: g takes a Mammal as argument and a Homo is a Mammal.
Now let's keep f as it is and replace g with a subtype of g. As a function is contravariant in its argument, a function Vertebrate => Primate is a subtype of g (as the argument Vertebrate is a supertype of Mammal):

[Diagram: g replaced by its subtype Vertebrate => Primate; composition still works]

Even in this case the composition is preserved: g now takes a vertebrate, and all mammals are vertebrates. To complete the picture, it remains to be seen how a different definition of function would impact the composition operation. What if, for example, the return type were contravariant? Then we could replace f with a function Vertebrate => Vertebrate (the return type being a supertype of Mammal, and therefore Vertebrate => Vertebrate a subtype of Vertebrate => Mammal). Would that impact our ability to compose the two functions?

[Diagram: with a contravariant return type, f could return a Fish that g cannot handle]

Yes, as the picture above shows. For example, we pass a vertebrate to f and f returns a fish (which is a vertebrate but not a mammal); g is not defined for fishes though, only for mammals.
A similar outcome follows if we define function as covariant in its argument. In the following example we keep f unchanged but replace the original g with a subtype, Primate => Primate: if we define Function1 as covariant in its argument, then I can replace Mammal with its subtype Primate, and therefore Primate => Primate would be a legitimate subtype of Mammal => Primate according to this definition:

[Diagram: with a covariant argument type, g would be defined only for primates and composition breaks]

Even in this case our ability to compose the two functions is lost, as g is not defined for all possible mammals but just for a specific subset of them: primates. If f returns a dog, g wouldn't know what to do with it.
The lesson, in the case of functions, can be expressed in a more formal way by introducing the concepts of domain and codomain of a function. The domain is the set of values for which the function is defined (its possible arguments) and the codomain is the set of all possible values that the function can return. So, for instance, in a function Mammal => Homo the set of all mammals is the domain of the function and the set of all humans is the codomain. We can now say that, given two functions A => B and C => D, they can be composed into a function A => D if and only if the codomain of the first function (B) is a subset of the domain of the second function (C). And that is what is captured in the Scala definition of a function, with its contravariant argument type and covariant return type.
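
To see the variance of Function1 in action with the types above, a short sketch (again assuming the kingdom-animalia classes are instantiable as in the earlier examples):

val f: Vertebrate => Homo = _ => new Homo
// compiles thanks to Function1[-X, +Y]:
// Vertebrate => Homo is a subtype of Mammal => Primate
val g: Mammal => Primate = f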

Code is available here: https://github.com/lansaloltd/type-variance/
And here: https://github.com/lansaloltd/kingdom-animalia