
I Updated the Apache Spark Reference Applications

Published 2016-11-15.
Time to read: 3 minutes.


Overview

The Apache Spark committers just accepted my pull request that updated the official Twitter Classifier Reference Application from Spark 1.4 / Scala 2.10 to Spark 2 / Scala 2.11. This article discusses some of that work from the point of view of a Scala programmer. A primary goal was to rewrite the reference application in idiomatic, functional-style Scala. This article briefly discusses two aspects that deserve a closer look: command-line parsing and DRYing up the code by importing scopes. The pull request also improved the reference application in other ways, such as modularizing the code and providing run scripts, but those techniques are generally well understood, so this article does not cover them.

I did not upgrade the reference application to Scala 2.12, which was released a couple of weeks ago, because Spark does not yet support Scala 2.12. Josh Rosen of Databricks wrote me and said:

“Some of Spark’s dependencies do not yet support 2.12, and Scala-version-specific code changes are necessary to work around method overloads that became ambiguous in 2.12. The umbrella ticket tracking 2.12 support can be found at issues.apache.org/jira/browse/SPARK-14220. One of the hardest pieces will be issues.apache.org/jira/browse/SPARK-14643 (see the linked design document on that issue). Lack of 2.12 support for Breeze and its dependencies is likely to be another serious blocker, but that might be avoidable by only publishing a subset of the projects with 2.12 to begin with (e.g. only Spark core / SQL at first).”
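
To make the overload problem concrete, here is a minimal Scala sketch, not taken from Spark's code base, of the kind of ambiguity that 2.12's new SAM (single abstract method) conversion can introduce:

Scala
// Hypothetical illustration (not Spark source) of an overload that
// "became ambiguous in 2.12". Both traits are SAM types, so Scala 2.12
// lets a lambda satisfy either one.
trait StringHasher  { def hash(s: String): Int }
trait StringCounter { def count(s: String): Int }

object OverloadDemo {
  def process(h: StringHasher): Int  = h.hash("x")
  def process(c: StringCounter): Int = c.count("x")

  // On 2.11, callers had to pass an explicit anonymous class, so overload
  // resolution was unambiguous. On 2.12, SAM conversion makes a lambda
  // applicable to both overloads, so the next line fails to compile:
  // process((s: String) => s.length)
}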

Command Line Parsing

I modified the reference application's command-line parsing to use Commander Scala, a library that supports idiomatic Scala, instead of Apache Commons CLI, the Java library previously used. The result is simple, clean, and very terse code that is intuitive to understand and easy to maintain. Commander Scala automatically generates the help message. Take a look at the collect command's parsing; notice that it uses common code for parsing the Twitter authentication parameters. This code is much shorter than the previous version, easier to understand and modify, and more flexible.

Scala
import com.github.acrisci.commander.Program
import java.io.File

abstract sealed case class CollectOptions(
  twitterOptions: TwitterOptions,
  overWrite: Boolean = false,
  tweetDirectory: File = new File(System.getProperty("user.home"), "/sparkTwitter/tweets/"),
  numTweetsToCollect: Int = 100,
  intervalInSecs: Int = 1,
  partitionsEachInterval: Int = 1
)

object CollectOptions extends TwitterOptionParser {
  override val _program = super._program
    .option(flags="-w, --overWrite", description="Overwrite all data files from a previous run")
    .usage("Collect [options] <tweetDirectory> <numTweetsToCollect> <intervalInSeconds> <partitionsEachInterval>")

  def parse(args: Array[String]): CollectOptions = {
    val program: Program = _program.parse(args)
    // The usage string contains one token per positional argument, plus
    // "Collect" and "[options]"; show help and exit if the count does not match
    if (program.args.length != program.usage.split(" ").length - 2) program.help

    new CollectOptions(
      twitterOptions = super.apply(args),
      overWrite = program.overWrite,
      tweetDirectory = new File(program.args.head.replaceAll("^~", System.getProperty("user.home"))),
      numTweetsToCollect = program.args(1).toInt,
      intervalInSecs = program.args(2).toInt,
      partitionsEachInterval = program.args(3).toInt
    ){}
  }
}
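
The listing above relies on TwitterOptions and TwitterOptionParser, which live elsewhere in the repository. For context, here is a hedged sketch of roughly what that shared code looks like; the repository's actual definitions may differ in detail. The flags match the help output shown below.

Scala
// Sketch (assumed shape, not the repository's exact source) of the shared
// parser that CollectOptions extends; it registers the four Twitter OAuth
// flags once so every entry point can reuse them.
import com.github.acrisci.commander.Program

case class TwitterOptions(
  consumerKey: String,
  consumerSecret: String,
  accessToken: String,
  accessTokenSecret: String
)

trait TwitterOptionParser {
  // A def, so subclasses can extend it with their own flags via super._program
  def _program: Program = new Program()
    .option(flags="-c, --consumerKey [type]", description="Twitter OAuth Consumer Key")
    .option(flags="-s, --consumerSecret [type]", description="Twitter OAuth Consumer Secret")
    .option(flags="-t, --accessToken [type]", description="Twitter OAuth Access Token")
    .option(flags="-v, --accessTokenSecret [type]", description="Twitter OAuth Access Token Secret")

  /** Extract the four Twitter credentials from the parsed command line */
  def apply(args: Array[String]): TwitterOptions = {
    val program = _program.parse(args)
    TwitterOptions(
      consumerKey = program.consumerKey,
      consumerSecret = program.consumerSecret,
      accessToken = program.accessToken,
      accessTokenSecret = program.accessTokenSecret
    )
  }
}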

Here is how to sidestep the Spark help message and display the help message for the collect entry point:

Shell
$ spark-submit \
  --class com.databricks.apps.twitterClassifier.Collect \
  --jars target/scala-2.11/spark-twitter-lang-classifier-assembly-2.0.0.jar \
  -- --help
Usage: Collect [options] <tweetDirectory> <numTweetsToCollect> <intervalInSeconds> <partitionsEachInterval>
Options:
  -h, --help                      output usage information
  -V, --version                   output the version number
  -w, --overWrite                 Overwrite all data files from a previous run
  -v, --accessTokenSecret [type]  Twitter OAuth Access Token Secret
  -t, --accessToken [type]        Twitter OAuth Access Token
  -s, --consumerSecret [type]     Twitter OAuth Consumer Secret
  -c, --consumerKey [type]        Twitter OAuth Consumer Key

Importing Inner Scope Into Another Object

Apache Spark is unusual in that you cannot encapsulate a Spark streaming context in a type instance: a memory overflow occurs when you instantiate a Scala trait or class that creates a Spark context. The solution uses a distinctive Scala feature: the ability to import the inner scope of an object into another scope. This kept the code DRY (no common code was repeated) without using classes or traits.

Here is how I took advantage of this little-known Scala technique. First I defined the SparkSetup object within a package object so it is easily found:

Scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object SparkSetup {
  // Derive the Spark application name from the class name,
  // dropping the '$' suffix that Scala appends to object class names
  val spark = SparkSession
    .builder
    .appName(getClass.getSimpleName.replace("$", ""))
    .getOrCreate()

  val sqlContext = spark.sqlContext

  val sc: SparkContext = spark.sparkContext
  sc.setLogLevel("ERROR") // suppress Spark's verbose INFO logging
}
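
The listing above omits the enclosing package object. Here is a sketch of that placement, with the body and imports elided; only the nesting matters:

Scala
// SparkSetup nested inside the package object, so any code in the
// com.databricks.apps.twitterClassifier package can write `import SparkSetup._`
package com.databricks.apps

package object twitterClassifier {
  object SparkSetup {
    // ... definitions shown above ...
  }
}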

Next I imported all the variables defined in SparkSetup into the Collect object’s scope, including sc, which was used twice, like this:

Scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Collect extends App {
  val options = CollectOptions.parse(args)
  import SparkSetup._ // brings spark, sqlContext and sc into this scope
  val ssc = new StreamingContext(sc, Seconds(options.intervalInSecs))
  Collector.doIt(options, sc, ssc)
}
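
The payoff is that every other entry point can share the Spark session the same way, instead of each constructing its own. As a hypothetical illustration (this object is not part of the reference application):

Scala
import org.apache.spark.sql.DataFrame

// Hypothetical second entry point showing the DRY payoff: the shared
// SparkSetup scope is imported rather than rebuilt
object ExamineTweets extends App {
  import SparkSetup._ // the same spark, sqlContext and sc as Collect

  // Read the tweets that Collect gathered (path is illustrative)
  val tweets: DataFrame = sqlContext.read.json("/tmp/sparkTwitter/tweets/")
  tweets.printSchema()
  spark.stop()
}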

Want to learn more practical Scala techniques? Head over to ScalaCourses.com and enroll! The combination of the Introduction to Scala and Intermediate Scala courses will teach you everything you need to know to start your journey with Apache Spark.

Mike Slinn is the lead Scala instructor at ScalaCourses.com.
