An example of using Spark in Scala

Let us end this round by going over a small example which should give you an idea of how to use Spark RDDs with Scala.

Writing a Spark program in Scala requires first setting up a SparkContext, which represents a connection to a Spark cluster. In our example we will not use a real cluster, but instead let Spark run on your local computer, emulating a cluster. This is useful for testing and practice.

The SparkContext object has methods for parallelizing and loading data, creating RDDs on the cluster.

Warning

Spark does not currently provide native Scala 3 libraries, so a Scala 3 project needs to rely on cross-compilation, mixing in the Scala 2.13 Spark artifacts.
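
To give an idea of what this can look like in practice, an sbt build for a Scala 3 project can pull in the Scala 2.13 build of Spark roughly as follows. This is only a sketch; the Scala and Spark version numbers below are placeholders, not recommendations:

    // build.sbt (sketch): use the Scala 2.13 build of Spark from a Scala 3 project
    scalaVersion := "3.3.1"

    libraryDependencies += ("org.apache.spark" %% "spark-core" % "3.5.0")
      .cross(CrossVersion.for3Use2_13)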

Example context

Assume that we have a (large) set of data representing multiple scores for participants in a test. The scores represent reaction times, captured with sensors. There are relatively few participants in the test, but many values are logged for each participant, in arbitrary order.

For simplicity and practice, let us assume that the data is logged to a text file. There is only one entry per line, and each line has the format <participant id> : <reaction time>. Participant IDs are alphanumeric strings and reaction times are numeric strings representing integers.

Here is an example of what the data file can look like:

7ANX6 : 26
7ANX6 : 55
KY76X : 23
DZJQR : 31
SSRU2 : 90
KY76X : 12
SSRU2 : 87

Our task is to calculate the total sum of reaction times for each participant, while ignoring outlier values. Outliers are any values outside the open range \(\left(5,95\right)\), i.e. values of 5 or less, or 95 or more.

The program

Here is a Spark program written in Scala that performs the described task and prints the result:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

@main def main() = 
    val sc = new SparkContext("local[4]", "example")
    sc.setLogLevel("ERROR")

    // Assume we have a file of data pairs, e.g. id and reaction time
    val rawData : RDD[String] = sc.textFile("data.txt")
    // Split at ':' and convert the second column to Int
    // Note: ASSUMES a well-formatted file
    val data : RDD[(String, Int)] = rawData.map(x => {
        val cols = x.split(":")
        (cols(0).trim(), cols(1).trim.toInt)
    })

    // Filter on values in a certain range, e.g. to get rid of outliers
    val filteredData : RDD[(String,Int)] = 
        data.filter((_,v) => (v > 5 && v < 95))

    // Calculate sum of values for each key value, and get results
    val keySums : Array[(String, Int)] = 
        filteredData
        .reduceByKey(_+_)
        .collect()

    keySums.foreach(println(_))

Let’s go over the code to understand what happens.

After importing the modules we need, our first task is to set up the SparkContext:

    val sc = new SparkContext("local[4]", "example")

The first argument is the connection to the Spark cluster; in our case the string "local[4]" denotes a local (on the same machine as the program runs) cluster of four nodes (threads). The second string, "example", is the name we give our Spark application. This is provided to the cluster for logging and display purposes.
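
An equivalent way to construct the context, which you may come across in other examples, is to collect these settings in a SparkConf object first. A minimal sketch:

    import org.apache.spark.{SparkConf, SparkContext}

    // Same local four-thread "cluster" and application name as above
    val conf = new SparkConf()
        .setMaster("local[4]")
        .setAppName("example")
    val sc = new SparkContext(conf)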

The line sc.setLogLevel("ERROR") tells the SparkContext to only print messages of log level ERROR or above. This is not necessary for the program to function, but as Spark otherwise produces a lot of output, it is handy.

Next, we need to load our data from the text file into an RDD on the cluster. We can do this with the SparkContext method textFile, like so:

    val rawData : RDD[String] = sc.textFile("data.txt")

This causes the contents of the file data.txt (here, assumed to exist in the directory the program is run from) to be loaded into a Resilient Distributed Dataset on the cluster. Note that the value rawData is of type RDD[String]. This object represents the data set on the cluster in our program. You could think of it as a collection where each item is a string representing an individual line in the file.
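
If you want to convince yourself of this, one way is to pull a couple of elements back to the driver with the take action and print them, roughly like so:

    // Fetch the first two elements of the RDD back to the driver program
    rawData.take(2).foreach(println)
    // With the example file above, this should print something like:
    // 7ANX6 : 26
    // 7ANX6 : 55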

Now it is time to transform the data in the RDD and calculate our answer. Let's do this in three steps: first, we parse the lines we have just read to construct (id, value) pairs. Then, we filter the data to discard the outliers. Finally, we group the data by id and calculate the sum in each group. There may be several ways to achieve these steps; below is one suggestion.

To parse the data, we can map a function over the strings in the RDD that splits them at the : character, trims the strings (to get rid of any left-over spaces), parses the value to an int (toInt), and returns the result as a pair, which is what happens on these lines:

    val data : RDD[(String, Int)] = rawData.map(x => {
        val cols = x.split(":")
        (cols(0).trim(), cols(1).trim.toInt)
    })

Warning

For demonstration purposes, our parsing function assumes that the data is well formatted; that is, that each line contains exactly one : and breaks down nicely into an id on one side and a string that can be read as an integer on the other. In real-world data this is not always the case. Think about it: how should we deal with malformed lines, or with errors when calling toInt, for example?
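
One possible way to handle this, sketched here with a hypothetical parseLine helper rather than used in the example, is to parse each line into an Option and silently drop anything that does not fit the expected format (whether dropping is the right policy depends, of course, on your data):

    import scala.util.Try

    // Parse one line into Some((id, value)), or None if the line is malformed
    def parseLine(line: String): Option[(String, Int)] =
        line.split(":") match
            case Array(id, value) => Try(value.trim.toInt).toOption.map(v => (id.trim, v))
            case _                => None

    // flatMap keeps only the successfully parsed pairs
    val parsedData : RDD[(String, Int)] = rawData.flatMap(parseLine(_).toList)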

Note that from a programming perspective this map is just the same approach we would use if the data were stored in a local collection, such as a Seq: we map an anonymous Scala function which transforms each String into a (String, Int) pair. It is, however, good to keep in mind that the function we map will actually be executed on the cluster.
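
For comparison, the very same transformation on an ordinary local collection looks essentially identical; the real difference is only where the function runs:

    // The same parsing step on a plain local Seq, executed in this process
    val localData: Seq[(String, Int)] = Seq("7ANX6 : 26", "KY76X : 23").map { x =>
        val cols = x.split(":")
        (cols(0).trim(), cols(1).trim.toInt)
    }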

This is a narrow RDD transformation, because the parsing of each line (each item in the RDD) is independent of the others.

So is the next step, filtering the outliers:

    val filteredData : RDD[(String,Int)] = 
        data.filter((_,v) => (v > 5 && v < 95))

Here we use the RDD filter method, which takes a predicate function stating whether a specific piece of data should be kept or not. In this case, the function destructures each pair, binding the value to the variable v, and then checks that it lies strictly between 5 and 95. The result is a new RDD where only the valid entries are kept.
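
An equivalent way to write the same filter, closer to classic Scala pattern matching, is with an explicit case in the function literal:

    // Equivalent filter written with an explicit pattern match on each pair
    val filteredData : RDD[(String,Int)] =
        data.filter { case (_, v) => v > 5 && v < 95 }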

Finally, we calculate the sums using reduceByKey and collect the result from the cluster:

    val keySums : Array[(String, Int)] = 
        filteredData
        .reduceByKey(_+_)
        .collect()

reduceByKey is a method that applies to RDDs of pairs, where the first element of each pair is treated as a key (here, our ID) and the second as a value. It first groups the data by key, and then uses the provided function to reduce each group of values to a single one. In our case we wish to calculate the sum (_+_). This is generally not a narrow transformation: the values for a given key may be spread over different worker nodes in the cluster, so Spark typically has to shuffle data between nodes to bring them together.
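
To see why reduceByKey is a good fit here, it can be contrasted with the more literal alternative of grouping first and summing afterwards; a sketch of that approach:

    // Alternative: group all values per key, then sum each group.
    // groupByKey shuffles every individual value across the cluster, whereas
    // reduceByKey can pre-sum values locally on each node before shuffling.
    val keySumsAlt : Array[(String, Int)] =
        filteredData
        .groupByKey()
        .mapValues(_.sum)
        .collect()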

The result of the reduction is a new RDD[(String, Int)] where each id appears exactly once and the associated value is its sum, as this is how we performed the reduction. The final step is to apply an action and collect the result from the cluster back to our main program. Note that the result keySums is of type Array, not RDD: any computation we do on this value will be local, not performed on the cluster.
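
For the small example file shown earlier, the final foreach(println) would print something along these lines (the order of the pairs is not guaranteed):

    (7ANX6,81)
    (KY76X,35)
    (DZJQR,31)
    (SSRU2,177)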