Get started with Apache Spark

Reap the performance and developer productivity advantages of Spark for batch processing, streaming analysis, machine learning, and structured queries

Apache Spark is an open source cluster computing framework for batch and stream processing. The framework originated at UC Berkeley's AMPLab in 2009, became an Apache project in 2013, and was promoted to one of the foundation's top-level projects in 2014. It is currently supported by Databricks, a company founded by many of the original creators of Spark.

At the heart of Spark is the concept of the Resilient Distributed Dataset (RDD), a programming abstraction that represents an immutable collection of objects that can be partitioned across a computing cluster. Operations on RDDs can likewise be split across the cluster, leading to highly parallelizable processing. RDDs can be created from simple text files, SQL databases, NoSQL stores (such as Cassandra and Riak), Hadoop InputFormats, or even programmatically. Much of the Spark Core API is built on the RDD concept, enabling traditional map and reduce functionality, but also providing built-in support for joining data sets, filtering, sampling, and aggregation.
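For example, assuming a SparkContext named sc (as provided by the Spark shell), an RDD can be created programmatically and transformed in parallel; the data here is purely illustrative:

// Create an RDD from a local collection (illustrative data).
val numbers = sc.parallelize(1 to 100000)

// Transformations run in parallel across the RDD's partitions.
val evens = numbers.filter(_ % 2 == 0)
val squares = evens.map(n => n.toLong * n)

// reduce() is an action: it triggers the computation and returns a result.
val total = squares.reduce(_ + _)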

In addition, Spark comes with support for many different big data applications, all built around the RDD model:

Spark Streaming: Although Spark is at heart a batch-mode processing framework, it offers a Spark Streaming mode that continuously collects data in “microbatches,” effectively providing streaming support for applications that do not require low-latency responses. The Spark distribution ships with support for streaming data from Kafka, Flume, and Kinesis.

MLlib: This library brings several machine learning algorithm implementations to Spark for off-the-shelf use by data scientists, including naive and multinomial Bayes models, clustering, collaborative filtering, and dimensionality reduction.

GraphX: This provides graph algorithm support for Spark, including a parallelized version of PageRank, triangle counts, and the ability to discover connected components.

Spark SQL (formerly known as Shark): Spark SQL provides common and uniform access to various different structured data sources (Hive, Avro, Parquet, ORC, JSON, JDBC/ODBC), allowing the user to write SQL queries that run across the Spark cluster, and to mix these data sources without the need for complicated ETL pipelines.
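As a brief illustration of the Spark SQL piece, the following sketch uses the 1.4-era SQLContext API; the JSON path and field names are hypothetical:

// SQLContext wraps an existing SparkContext (sc) for structured queries.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Load a structured source; the path and schema here are hypothetical.
val events = sqlContext.read.json("hdfs:///demo/events.json")

// Register the DataFrame as a temporary table and query it with SQL.
events.registerTempTable("events")
val errorsPerHost = sqlContext.sql(
  "SELECT host, COUNT(*) AS errors FROM events WHERE level = 'ERROR' GROUP BY host")
errorsPerHost.show()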

Why Spark?

Spark is often considered the next step for big data processing beyond Hadoop. While Hadoop's MapReduce writes intermediate results back to disk between stages, Spark uses in-memory techniques to deliver huge improvements in processing times, in some cases up to 100 times faster than the equivalent task in Hadoop. In a test performed by Reynold Xin, Parviz Deyhim, Xiangrui Meng, Ali Ghodsi, and Matei Zaharia of Databricks in October 2014, Spark completed the Daytona GraySort benchmark, sorting a 100TB data set in 23 minutes on 206 nodes. By comparison, the previous MapReduce record for sorting a similar-sized set was 72 minutes on 2,100 nodes. Being able to work faster with an order of magnitude fewer resources definitely adds to Spark's appeal.

While the benchmarks are undoubtedly notable, that is not the entire reason why Spark is currently riding high in the big data world. As well as being fast, it provides developer-friendly features such as the Spark shell for interactive development, plus native bindings for Java, Scala, Python, and R. In particular, support for Python and R opens up the developer pool to data scientists who previously would have had to navigate the more unwieldy landscape of MapReduce in Java.

Also, because Spark supports batch and streaming modes within the same architecture, code can be reused between applications running in different modes, and both modes can take advantage of additional libraries such as MLlib, GraphX, and Spark SQL. For example, joining streaming data against data from Cassandra in Spark is as simple as joining data from two text files.

Developing with Spark

One of the goals of Spark is to make developing big data applications more approachable for both traditional developers and data scientists. To this end, there are language bindings for Scala, Java, Python, and, as of Spark 1.4, R. Scala, Python, and R also offer interactive shells for prototyping and data exploration without a compile cycle, while still harnessing the full power of the cluster. No matter which language you choose, you will use a SparkContext object to create RDDs and perform operations on them.
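In the Spark shell the SparkContext is created for you (as sc); in a standalone application you construct it yourself. A minimal sketch, with an illustrative application name and the master URL left to be supplied by spark-submit:

import org.apache.spark.{SparkConf, SparkContext}

object SimpleApp {
  def main(args: Array[String]) {
    // The application name is illustrative; the master URL is normally
    // supplied at submission time via spark-submit.
    val conf = new SparkConf().setAppName("SimpleApp")
    val sc = new SparkContext(conf)

    // Create an RDD and perform an action on it.
    val lines = sc.textFile("hdfs:///demo/testdata.txt")
    println(s"Line count: ${lines.count()}")

    sc.stop()
  }
}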

Batch analysis of logs with Spark

In this example we will examine a log file in the context of the Spark shell. To open a new Spark shell, simply issue this at your command line:

$> SPARK_HOME/bin/spark-shell

SPARK_HOME is where you have installed Spark. You should then see a set of logging messages as the shell initializes, along with this:

Welcome to
Spark version 1.4.1
...

15/08/04 14:29:10 INFO SparkILoop: Created spark context...
Spark context available as sc.
...
scala> 

The Spark shell is now active, and Scala code can be entered directly into the shell. First, we'll want to access our data: in this case a log file stored in HDFS. Spark has a textFile() method that allows us to grab text files from the local file system, HDFS, or S3:


scala> val log = sc.textFile("hdfs:///demo/testdata.txt")
log: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21
 

We now have a string-based RDD, with each logical unit in the RDD a line from the log. As we are only interested in errors and the IP addresses associated with them, we need to filter the RDD so that we are only processing lines containing the ERROR string: 

scala> val errors = log.filter( _.split("\\s+")(1) == "ERROR" )

Here we are supplying an inline function to the RDD’s filter() method, which splits the line on whitespace and does a comparison of the second element of the returned array with the string ERROR. This could also be written as the following:

scala> def errorFilter(line: String): Boolean = { line.split("\\s+")(1) == "ERROR" }
scala> val errors = log.filter(errorFilter(_))

In this example, it is more readable to have the function inline, but as processing becomes more complex, and to facilitate code reuse between batch and streaming operations, your applications will likely use defined functions and pass them into transformations and actions.

Also note that because filter() is a transformation, rather than an action, no processing is performed at this point. Spark is building up the graph of operations that need to run on the cluster, but it will not execute them until an action is added to the graph.
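For instance, if you wanted to force evaluation at this point, you could call count(), which is an action:

scala> errors.count() // an action: forces the file read and the filter to actually execute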

scala> val errorsByIP = errors.map( x => (x.split(" ")(0), 1L))

This is another transformation, and a familiar concept from MapReduce. On the filtered RDD we map over the contents, creating a new RDD of key/value pairs, where the key is the IP address (the first element of each line) and the value is 1, since we are going to count the number of times each IP address appears. Next we will reduce these mapped values to add up the occurrences of each IP address:

scala> val errorCounts = errorsByIP.reduceByKey( (x, y) => x + y ).cache()

The reduceByKey() method, as you would expect, performs a reducing aggregation on a per-key basis. In this example it simply adds together all of the values found for a key, so all of the 1s from the previous step are summed (you may also see this written in examples as reduceByKey(_ + _)). The cache() call marks the resulting RDD to be persisted in memory so that it doesn't need to be recomputed. Note that reduceByKey() is itself a transformation and cache() only marks the RDD for persistence, so no computation has happened yet; that occurs once we call an action:

scala> errorCounts.saveAsTextFile("errors")

Finally, saveAsTextFile() is an action: entering this line triggers execution of the whole graph of transformations and writes the resulting RDD, a list of IP addresses and their error counts, out to a file.
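To sanity-check the result from the shell, you could read the saved output back in as a new RDD; for example:

scala> sc.textFile("errors").take(5).foreach(println)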

Streaming log analysis

Now we'll take the concepts from the previous example and extend them from batch operation to streaming. Unfortunately, streaming applications cannot be run in the shell. The Scala code for the streaming version is below and can also be found on GitHub. It needs to be compiled and submitted to the Spark cluster.

package com.mammothdata.demos

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._

object StreamingLoggingDemo extends java.io.Serializable {
  def main(args: Array[String]) {
    // One-second batch interval.
    val ssc = new StreamingContext(
      new SparkConf().setAppName("StreamingLoggingDemo"), Seconds(1))

    // Text stream read from a socket on localhost:8888.
    val streamingLog = ssc.socketTextStream("localhost", 8888)

    // Keep only lines whose second whitespace-separated field is ERROR.
    val errors = streamingLog.filter( x => x.split("\\s+")(1) == "ERROR" )

    // Key each error line by its IP address (the first field).
    val errorsByIPMap = errors.map( x => (x.split("\\s+")(0), 1L) )

    // Count errors per IP over a 30-second window, sliding every 5 seconds.
    val errorsByIPAndWindow = errorsByIPMap.reduceByKeyAndWindow(
      { (x: Long, y: Long) => x + y }, Seconds(30), Seconds(5))

    // A streaming job needs at least one output operation.
    errorsByIPAndWindow.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Most of this should be familiar from the previous example, but instead of a SparkContext we require a StreamingContext. Its constructor takes an additional argument that sets the granularity of our batches; Seconds(1) means we will get a new batch of data every second.

Next, we need a streaming source for our data. The socketTextStream used here is a text-based socket stream that we can feed data into via telnet or netcat (Spark also ships with built-in support for Kafka, Flume, and Kinesis for enterprise-grade streaming). The reduceByKey method has been replaced by reduceByKeyAndWindow, a powerful operation available in streaming that makes windowed computations easy. Here it builds a window of 30 seconds, updating the values for each key every five seconds within that window. The print() call is an output operation, writing the first few windowed counts of each batch to the driver's console; a streaming job needs at least one output operation for the pipeline to execute. Finally, we start() the pipeline on the cluster and awaitTermination() to clean up once the pipeline is terminated.
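For a quick test, one option is to run netcat listening on the port the application connects to (8888 in the code above) and paste log lines into it:

$> nc -lk 8888

With the application running, each line typed or piped into netcat enters the stream and flows through the filter, map, and windowed reduce.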

To run this on your Spark cluster you’ll need to submit the jar using spark-submit:

$> spark-submit --master yarn-cluster --class com.mammothdata.demos.StreamingLoggingDemo MammothSparkWhitePaper.jar

(Above is specifically for YARN. For a stand-alone cluster, you’ll need to pass in the URI of the Spark Master.)
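For example, against a hypothetical stand-alone cluster whose master runs on a host named spark-master on the default port, the submission would look something like this:

$> spark-submit --master spark://spark-master:7077 --class com.mammothdata.demos.StreamingLoggingDemo MammothSparkWhitePaper.jar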
