Published 2016-11-15.
Time to read: 3 minutes.
Overview
The Apache Spark committers just accepted my pull request that updated the official Twitter Classifier Reference Application from Spark 1.4 / Scala 2.10 to Spark 2 / Scala 2.11. This article discusses some of the work in that pull request from the point of view of a Scala programmer. A primary goal was to rewrite the reference application in idiomatic, functional-style Scala. Two aspects are briefly covered here: command-line parsing and DRYing up the code by importing scopes. I also improved the reference application in other ways, such as modularizing the code and providing run scripts, but those techniques are generally well understood, so this article does not address them.
I did not upgrade the reference application to Scala 2.12, which was released a couple of weeks ago, because Spark does not yet support Scala 2.12. Josh Rosen of Databricks wrote me, pointing me to the umbrella issue for Scala 2.12 support, issues.apache.org/jira/browse/SPARK-14220, and said:
“One of the hardest pieces will be issues.apache.org/jira/browse/SPARK-14643 (see the linked design document on that issue). Lack of 2.12 support for Breeze and its dependencies is likely to be another serious blocker, but that might be avoidable by only publishing a subset of the projects with 2.12 to begin with (e.g. only Spark core / SQL at first).”
Command Line Parsing
I modified the reference application’s command-line parsing to use Commander Scala, a library that supports idiomatic Scala, instead of Apache Commons CLI, the Java library that was previously used. The result is simple, clean, and very terse code that is intuitive to understand and easy to maintain. Commander Scala automatically generates the help message.
Take a look at the collect command’s parsing below. Notice that it uses some common code for parsing the Twitter authentication parameters. This code is much shorter than the previous code, easier to understand and modify, and more flexible.
import com.github.acrisci.commander.Program
import java.io.File

abstract sealed case class CollectOptions(
  twitterOptions: TwitterOptions,
  overWrite: Boolean = false,
  tweetDirectory: File = new File(System.getProperty("user.home"), "/sparkTwitter/tweets/"),
  numTweetsToCollect: Int = 100,
  intervalInSecs: Int = 1,
  partitionsEachInterval: Int = 1
)

object CollectOptions extends TwitterOptionParser {
  override val _program = super._program
    .option(flags="-w, --overWrite", description="Overwrite all data files from a previous run")
    .usage("Collect [options] <tweetDirectory> <numTweetsToCollect> <intervalInSeconds> <partitionsEachInterval>")

  def parse(args: Array[String]): CollectOptions = {
    val program: Program = _program.parse(args)
    if (program.args.length != program.usage.split(" ").length - 2) program.help
    new CollectOptions(
      twitterOptions = super.apply(args),
      overWrite = program.overWrite,
      tweetDirectory = new File(program.args.head.replaceAll("^~", System.getProperty("user.home"))),
      numTweetsToCollect = program.args(1).toInt,
      intervalInSecs = program.args(2).toInt,
      partitionsEachInterval = program.args(3).toInt
    ){}
  }
}
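The common Twitter-authentication parsing lives in TwitterOptionParser, which is not shown in this article. The following is only a rough sketch of what such shared code could look like, patterned on the CollectOptions code above and on the help output below; the trait shape, the field names of TwitterOptions, and the dynamic accessor style are assumptions, not the project’s actual source:

import com.github.acrisci.commander.Program

// Assumed shape of the shared options; the real project defines its own TwitterOptions.
case class TwitterOptions(
  consumerKey: String,
  consumerSecret: String,
  accessToken: String,
  accessTokenSecret: String
)

trait TwitterOptionParser {
  // Each entry point appends its own options and usage string to this Program
  def _program: Program = new Program()
    .option(flags="-c, --consumerKey [type]",       description="Twitter OAuth Consumer Key")
    .option(flags="-s, --consumerSecret [type]",    description="Twitter OAuth Consumer Secret")
    .option(flags="-t, --accessToken [type]",       description="Twitter OAuth Access Token")
    .option(flags="-v, --accessTokenSecret [type]", description="Twitter OAuth Access Token Secret")

  // Called as super.apply(args) from the entry-point option parsers
  def apply(args: Array[String]): TwitterOptions = {
    val program: Program = _program.parse(args)
    TwitterOptions(
      consumerKey       = program.consumerKey,
      consumerSecret    = program.consumerSecret,
      accessToken       = program.accessToken,
      accessTokenSecret = program.accessTokenSecret
    )
  }
}

Because _program is a parameterless def here, CollectOptions can override it with a val that appends the -w option and the usage string, which is exactly the pattern shown above.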
Here is how to sidestep the Spark help message and display the help message for the collect entry point:
$ spark-shell \
  --class com.databricks.apps.twitterClassifier.Collect \
  --jars target/scala-2.11/spark-twitter-lang-classifier-assembly-2.0.0.jar \
  -- --help

Usage: Collect [options] <tweetDirectory> <numTweetsToCollect> <intervalInSeconds> <partitionsEachInterval>

Options:

  -h, --help                      output usage information
  -V, --version                   output the version number
  -w, --overWrite                 Overwrite all data files from a previous run
  -v, --accessTokenSecret [type]  Twitter OAuth Access Token Secret
  -t, --accessToken [type]        Twitter OAuth Access Token
  -s, --consumerSecret [type]     Twitter OAuth Consumer Secret
  -c, --consumerKey [type]        Twitter OAuth Consumer Key
Importing Inner Scope Into Another Object
Apache Spark is unusual in that you cannot encapsulate a Spark streaming context in a type instance; a memory overflow occurs when you try to instantiate a Scala trait or class that creates a Spark context. The solution uses a little-known Scala feature: the ability to import the inner scope of an object into another scope. This kept the code DRY (common code is not repeated) without resorting to classes or traits.
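Before looking at the Spark code, here is a minimal, generic sketch of the language feature itself; the names are invented purely for illustration. Importing the members of an object brings its vals and defs directly into the importing scope, much as if they had been mixed in from a trait:

object CommonConfig {
  val verbose = true
  val retries = 3
}

object Worker {
  import CommonConfig._  // verbose and retries are now in scope

  def run(): Unit =
    if (verbose) println(s"Running with $retries retries")
}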
Here is how I took advantage of this technique: first I defined the SparkSetup object within a package object so it is easy to find:
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object SparkSetup {
  val spark = SparkSession
    .builder
    .appName(getClass.getSimpleName.replace("$", ""))
    .getOrCreate()

  val sqlContext = spark.sqlContext

  val sc: SparkContext = spark.sparkContext
  sc.setLogLevel("ERROR")
}
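For completeness, this is roughly what the placement described above looks like; the package path is inferred from the com.databricks.apps.twitterClassifier class names used elsewhere in this article, so treat it as an assumption rather than the project’s exact layout:

package com.databricks.apps

package object twitterClassifier {
  // Declaring SparkSetup inside the package object means every object in the
  // com.databricks.apps.twitterClassifier package can refer to it directly.
  object SparkSetup {
    // ... body as shown above ...
  }
}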
Next I imported all the variables defined in SparkSetup into the Collect object’s scope, including sc, which was used twice, like this:
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Collect extends App {
  val options = CollectOptions.parse(args)
  import SparkSetup._
  val ssc = new StreamingContext(sc, Seconds(options.intervalInSecs))
  Collector.doIt(options, sc, ssc)
}
Want to learn more practical Scala techniques? Head over to ScalaCourses.com and enroll!
The combination of the Introduction to Scala and Intermediate Scala courses will teach you everything you need to know to start your journey with Apache Spark.
Mike Slinn is the lead Scala instructor at ScalaCourses.com.