Now, let's get hands-on with Spark so that we can go deeper into the core APIs and libraries. Throughout this book, I will be referring to the 2.2.1 release of Spark; however, several of the examples presented here should also work with the 2.0 release or later. I will add a note whenever an example is specific to 2.2+ releases only.
You need to have JDK 1.8+ and, only if you plan to develop in Python, Python 2.7+ or 3.4+. Spark 2.2.1 supports Scala 2.11. The JDK needs to be present on your system PATH variable; alternatively, you can have your JAVA_HOME environment variable point to a JDK installation.
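On Linux or macOS, for example, you could add something like the following to your shell profile (the JDK path shown here is just an example; point it to your actual installation):
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export PATH=$JAVA_HOME/bin:$PATH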
Extract the content of the downloaded archive to any local directory. Move to the $SPARK_HOME/bin directory. There, among the other executables, you will find the interactive Spark shells for Scala and Python. They are the best way to get familiar with this framework. In this chapter, I am going to present examples that you can run through these shells.
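Returning to the extraction step, a possible sequence of commands, assuming the pre-built package for Hadoop 2.7 and the /usr/spark-2.2.1 destination used in the later examples, is the following:
tar -xzf spark-2.2.1-bin-hadoop2.7.tgz
sudo mv spark-2.2.1-bin-hadoop2.7 /usr/spark-2.2.1
export SPARK_HOME=/usr/spark-2.2.1
cd $SPARK_HOME/bin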
You can run a Scala shell using the following command:
$SPARK_HOME/bin/spark-shell
If you don't specify an argument, Spark assumes that you're running in local mode. Here's the expected console output:
Spark context Web UI available at http://10.72.0.2:4040
Spark context available as 'sc' (master = local[*], app id = local-1518131682342).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
The web UI is available at the following URL: http://<host>:4040.
Opening it in a web browser, you will see something like the following:
Figure 1.5: The Spark web UI
From there, you can check the status of your jobs and executors.
Here's an example of how to read and manipulate a text file, loading it into a Dataset, using the Spark shell (the file used here is part of the example resources bundled with the Spark distribution):
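A minimal sketch of that command, assuming the distribution was extracted to /usr/spark-2.2.1 (adjust the path to your installation) and that this is the fifth expression evaluated in the shell (hence the res5 name used in the following snippets), is:
scala> spark.read.textFile("/usr/spark-2.2.1/examples/src/main/resources/people.txt")
res5: org.apache.spark.sql.Dataset[String] = [value: string]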
The result is a Dataset instance containing the lines of the file. You can then perform several operations on this Dataset, such as counting the number of lines:
scala> res5.count()
res6: Long = 3
You can also get the first line of the Dataset:
scala> res5.first()
res7: String = Michael, 29
In this example, we used a path on the local filesystem. In these cases, the file should be accessible from the same path by all of the workers, so you will need to copy the file across all workers or use a network-mounted shared filesystem.
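For example, if your cluster has access to HDFS, you could read from there instead (the namenode host, port, and path below are purely illustrative):
scala> spark.read.textFile("hdfs://namenode-host:8020/data/people.txt")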
To close a shell, you can type the following:
:quit
To see the list of all of the available shell commands, type the following:
scala> :help
All commands can be abbreviated, for example, :he instead of :help.
The following is the list of commands:
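For reference, these are some of the commands that :help prints in the Scala 2.11 REPL underlying the Spark shell (the full list and wording may vary slightly between versions):
:help [command]      print this summary, or command-specific help
:history [num]       show the history
:imports             show the import history
:load <path>         interpret lines in a file
:paste               enter paste mode
:quit                exit the interpreter
:replay              reset the REPL and replay all previous commands
:reset               reset the REPL to its initial state
:type <expr>         display the type of an expression without evaluating it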
As with Scala, an interactive shell is available for Python. You can run it using the following command:
$SPARK_HOME/bin/pyspark
A built-in variable named spark representing the SparkSession is available. You can do the same things as for the Scala shell:
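For example, here is a minimal sketch of the same file read shown earlier for the Scala shell (the path assumes the distribution lives under /usr/spark-2.2.1, the variable name logData is arbitrary, and the exact Row representation may differ slightly between Python versions):
>>> logData = spark.read.text("/usr/spark-2.2.1/examples/src/main/resources/people.txt")
>>> logData.count()
3
>>> logData.first()
Row(value='Michael, 29')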
Unlike Java and Scala, Python is dynamically typed. Therefore, in Python a Dataset is always a Dataset[Row] (that is, a Dataset of generic Row objects), which is referred to as a DataFrame, consistent with the DataFrame concept of the pandas framework (https://pandas.pydata.org/).
To close a Python shell, you can type the following:
quit()
Interactive shells aren't the only choice for running code in Spark. It is also possible to implement self-contained applications. Here's an example of reading and manipulating a file in Scala:
import org.apache.spark.sql.SparkSession
object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/usr/spark-2.2.1/examples/src/main/resources/people.txt"
    val spark = SparkSession.builder.master("local").appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}
Applications should define a main() method instead of extending scala.App. Note the code to create SparkSession:
val spark = SparkSession.builder.master("local").appName("Simple Application").getOrCreate()
It follows the builder factory design pattern.
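If you need additional settings, you can keep chaining calls on the builder before getOrCreate(); for example (the configuration key below is just a placeholder, not a real Spark property):
val spark = SparkSession.builder
  .master("local[*]")
  .appName("Simple Application")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()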
Always explicitly close the session before ending the program execution:
spark.stop()
To build the application, you can use a build tool of your choice (Maven, sbt, or Gradle), adding the Spark 2.2.1 dependencies for Scala 2.11. Once a JAR file has been generated, you can execute it with the $SPARK_HOME/bin/spark-submit command, specifying the JAR filename, the Spark master URL, and a list of optional parameters, including the job name, the main class, the maximum memory to be used by each executor, and many others.
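As a sketch, with sbt the core dependency could be declared as follows, and the resulting JAR could then be submitted like this (the JAR path, provided scope, and memory setting are only examples):
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.1" % "provided"

$SPARK_HOME/bin/spark-submit \
  --class SimpleApp \
  --master local[*] \
  --executor-memory 1g \
  target/scala-2.11/simple-app_2.11-1.0.jar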
The same self-contained application could have been implemented in Python as well:
from pyspark.sql import SparkSession
logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
spark = SparkSession.builder.appName("Simple Application").master("local").getOrCreate()
logData = spark.read.text(logFile).cache()