Tuesday, March 24, 2015

Deploy a Scala job to a Spark Cluster using SBT

Environment

  1. sbt 0.13.8
  2. Scala 2.11.6
  3. Spark 1.2.1
  4. Ubuntu 14.10 (a Debian-based Linux)



Scala Program


This simple program written in Scala will analyze a local file on my system, count the number of lines containing the letter "a" and the number containing the letter "b", and print both totals to the console.
/*** SimpleApp.scala ***/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/home/craig/spark/README.md" // Should be some file on your system
    val sc = new SparkContext("local", "Simple App", "$SPARK_HOME", List("target/scala-2.11/simple-project_2.11-1.0.jar")) // Note: Scala does not expand environment variables inside string literals, so replace "$SPARK_HOME" with the actual path to your Spark installation
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}
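
The four-argument SparkContext constructor above follows the older Spark quick start style. Since Spark 1.0 the more common pattern is to build a SparkConf and let spark-submit supply the master URL and jar list at launch time. A minimal sketch of that variant (the source line numbers cited in the log output later in this article refer to the version above):
/*** SimpleApp.scala (SparkConf variant) ***/
import org.apache.spark.{SparkConf, SparkContext}

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/home/craig/spark/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple App") // master URL and jars are supplied by spark-submit
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}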



Directory Structure


I've created the following directory structure in my home directory:
mkdir -p ~/apps/simple/src/main/scala

The Scala program above will be saved as SimpleApp.scala under the "src/main/scala" directory.


Build Script


Create a build script called simple.sbt under the "~/apps/simple/" directory:
name := "Simple Project"

version := "1.0"

scalaVersion := "2.11.6"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.1"
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.6.0"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

Replace the Scala, Spark, and Hadoop version numbers above with the versions installed on your system.
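
One refinement worth knowing about, though not used in this walkthrough: when the jar will be submitted to a real cluster, the Spark classes are already present on every node, so spark-core is commonly marked as a "provided" dependency to keep it out of the packaged jar:
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.1" % "provided"

For the local run shown below, the plain dependency works fine.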


Building the Program


Navigate to ~/apps/simple and type "sbt" in a terminal session:
craig@spark:~/apps/simple$ sbt
[info] Set current project to Simple Project (in build file:/home/craig/apps/simple/)
> 

Type "package" at the prompt:
> package
[info] Updating {file:/home/craig/apps/simple/}simple...
[info] Resolving jline#jline;2.12.1 ...
[info] Done updating.
[info] Compiling 1 Scala source to /home/craig/apps/simple/target/scala-2.11/classes...
[info] Packaging /home/craig/apps/simple/target/scala-2.11/simple-project_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 18 s, completed Mar 24, 2015 11:15:18 AM
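
The same build can also be run non-interactively by passing the task name directly to sbt:
craig@spark:~/apps/simple$ sbt package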

My directory structure now looks like this:
craig@spark:~/apps/simple$ tree
.
├── simple.sbt
├── src
│   └── main
│       └── scala
│           └── SimpleApp.scala
└── target
    ├── resolution-cache
    │   ├── reports
    │   │   ├── ivy-report.css
    │   │   ├── ivy-report.xsl
    │   │   ├── simple-project-simple-project_2.11-compile-internal.xml
    │   │   ├── simple-project-simple-project_2.11-compile.xml
    │   │   ├── simple-project-simple-project_2.11-docs.xml
    │   │   ├── simple-project-simple-project_2.11-optional.xml
    │   │   ├── simple-project-simple-project_2.11-plugin.xml
    │   │   ├── simple-project-simple-project_2.11-pom.xml
    │   │   ├── simple-project-simple-project_2.11-provided.xml
    │   │   ├── simple-project-simple-project_2.11-runtime-internal.xml
    │   │   ├── simple-project-simple-project_2.11-runtime.xml
    │   │   ├── simple-project-simple-project_2.11-scala-tool.xml
    │   │   ├── simple-project-simple-project_2.11-sources.xml
    │   │   ├── simple-project-simple-project_2.11-test-internal.xml
    │   │   └── simple-project-simple-project_2.11-test.xml
    │   └── simple-project
    │       └── simple-project_2.11
    │           └── 1.0
    │               ├── resolved.xml.properties
    │               └── resolved.xml.xml
    ├── scala-2.11
    │   ├── classes
    │   │   ├── SimpleApp$$anonfun$1.class
    │   │   ├── SimpleApp$$anonfun$2.class
    │   │   ├── SimpleApp.class
    │   │   └── SimpleApp$.class
    │   └── simple-project_2.11-1.0.jar
    └── streams
        ├── compile
        │   ├── compile
        │   │   └── $global
        │   │       └── streams
        │   │           └── out
        │   ├── compileIncremental
        │   │   └── $global
        │   │       └── streams
        │   │           ├── export
        │   │           └── out
        │   ├── copyResources
        │   │   └── $global
        │   │       └── streams
        │   │           ├── copy-resources
        │   │           └── out
        │   ├── dependencyClasspath
        │   │   └── $global
        │   │       └── streams
        │   │           └── export
        │   ├── externalDependencyClasspath
        │   │   └── $global
        │   │       └── streams
        │   │           └── export
        │   ├── $global
        │   │   └── $global
        │   │       └── discoveredMainClasses
        │   │           └── data
        │   ├── incCompileSetup
        │   │   └── $global
        │   │       └── streams
        │   │           └── inc_compile_2.11
        │   ├── internalDependencyClasspath
        │   │   └── $global
        │   │       └── streams
        │   │           └── export
        │   ├── mainClass
        │   │   └── $global
        │   │       └── streams
        │   │           └── out
        │   ├── managedClasspath
        │   │   └── $global
        │   │       └── streams
        │   │           └── export
        │   ├── packageBin
        │   │   └── $global
        │   │       └── streams
        │   │           ├── inputs
        │   │           ├── out
        │   │           └── output
        │   ├── unmanagedClasspath
        │   │   └── $global
        │   │       └── streams
        │   │           └── export
        │   └── unmanagedJars
        │       └── $global
        │           └── streams
        │               └── export
        └── $global
            ├── dependencyPositions
            │   └── $global
            │       └── streams
            │           └── update_cache_2.11
            │               ├── input_dsp
            │               └── output_dsp
            ├── $global
            │   └── $global
            │       └── streams
            │           └── out
            ├── ivyConfiguration
            │   └── $global
            │       └── streams
            │           └── out
            ├── ivySbt
            │   └── $global
            │       └── streams
            │           └── out
            ├── projectDescriptors
            │   └── $global
            │       └── streams
            │           └── out
            └── update
                └── $global
                    └── streams
                        ├── out
                        └── update_cache_2.11
                            ├── inputs
                            └── output

73 directories, 50 files



Running the Program


Inside your working directory (~/apps/simple), execute the command below; "--master local[8]" runs the job locally using 8 worker threads:
$SPARK_HOME/bin/spark-submit --class "SimpleApp" --master local[8] target/scala-2.11/simple-project_2.11-1.0.jar

Successful output on my workstation looks like this:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/03/24 11:20:37 INFO SecurityManager: Changing view acls to: craig
15/03/24 11:20:37 INFO SecurityManager: Changing modify acls to: craig
15/03/24 11:20:37 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(craig); users with modify permissions: Set(craig)
15/03/24 11:20:38 INFO Slf4jLogger: Slf4jLogger started
15/03/24 11:20:38 INFO Remoting: Starting remoting
15/03/24 11:20:38 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.0.4.15:49222]
15/03/24 11:20:38 INFO Utils: Successfully started service 'sparkDriver' on port 49222.
15/03/24 11:20:38 INFO SparkEnv: Registering MapOutputTracker
15/03/24 11:20:38 INFO SparkEnv: Registering BlockManagerMaster
15/03/24 11:20:38 INFO DiskBlockManager: Created local directory at /tmp/spark-91e5f424-b1e6-4d51-a010-a4e0ac788725/spark-2d07771d-f3ad-4c70-bb5b-1329be484b4f
15/03/24 11:20:38 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
15/03/24 11:20:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/24 11:20:38 INFO HttpFileServer: HTTP File server directory is /tmp/spark-cf38f85d-efd0-4941-b4f6-d245b9d8380c/spark-f903d7b0-c67c-48f0-9eb0-4a3279903d2a
15/03/24 11:20:38 INFO HttpServer: Starting HTTP Server
15/03/24 11:20:39 INFO Utils: Successfully started service 'HTTP file server' on port 50465.
15/03/24 11:20:39 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/03/24 11:20:39 INFO SparkUI: Started SparkUI at http://10.0.4.15:4040
15/03/24 11:20:39 INFO SparkContext: Added JAR target/scala-2.11/simple-project_2.11-1.0.jar at http://10.0.4.15:50465/jars/simple-project_2.11-1.0.jar with timestamp 1427221239211
15/03/24 11:20:39 INFO Executor: Starting executor ID <driver> on host localhost
15/03/24 11:20:39 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@10.0.4.15:49222/user/HeartbeatReceiver
15/03/24 11:20:39 INFO NettyBlockTransferService: Server created on 54739
15/03/24 11:20:39 INFO BlockManagerMaster: Trying to register BlockManager
15/03/24 11:20:39 INFO BlockManagerMasterActor: Registering block manager localhost:54739 with 265.1 MB RAM, BlockManagerId(<driver>, localhost, 54739)
15/03/24 11:20:39 INFO BlockManagerMaster: Registered BlockManager
15/03/24 11:20:39 INFO MemoryStore: ensureFreeSpace(180608) called with curMem=0, maxMem=278019440
15/03/24 11:20:39 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 176.4 KB, free 265.0 MB)
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(25432) called with curMem=180608, maxMem=278019440
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 24.8 KB, free 264.9 MB)
15/03/24 11:20:40 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:54739 (size: 24.8 KB, free: 265.1 MB)
15/03/24 11:20:40 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/03/24 11:20:40 INFO SparkContext: Created broadcast 0 from textFile at SimpleApp.scala:9
15/03/24 11:20:40 INFO FileInputFormat: Total input paths to process : 1
15/03/24 11:20:40 INFO SparkContext: Starting job: count at SimpleApp.scala:10
15/03/24 11:20:40 INFO DAGScheduler: Got job 0 (count at SimpleApp.scala:10) with 2 output partitions (allowLocal=false)
15/03/24 11:20:40 INFO DAGScheduler: Final stage: Stage 0(count at SimpleApp.scala:10)
15/03/24 11:20:40 INFO DAGScheduler: Parents of final stage: List()
15/03/24 11:20:40 INFO DAGScheduler: Missing parents: List()
15/03/24 11:20:40 INFO DAGScheduler: Submitting Stage 0 (FilteredRDD[2] at filter at SimpleApp.scala:10), which has no missing parents
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(2720) called with curMem=206040, maxMem=278019440
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.7 KB, free 264.9 MB)
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(1950) called with curMem=208760, maxMem=278019440
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1950.0 B, free 264.9 MB)
15/03/24 11:20:40 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:54739 (size: 1950.0 B, free: 265.1 MB)
15/03/24 11:20:40 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/03/24 11:20:40 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838
15/03/24 11:20:40 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (FilteredRDD[2] at filter at SimpleApp.scala:10)
15/03/24 11:20:40 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/03/24 11:20:40 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1361 bytes)
15/03/24 11:20:40 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/03/24 11:20:40 INFO Executor: Fetching http://10.0.4.15:50465/jars/simple-project_2.11-1.0.jar with timestamp 1427221239211
15/03/24 11:20:40 INFO Utils: Fetching http://10.0.4.15:50465/jars/simple-project_2.11-1.0.jar to /tmp/spark-4055924d-8bde-4216-9932-526612364a63/spark-ac26b090-87ab-4f55-b0b9-3c8571d43307/fetchFileTemp507742889710724975.tmp
15/03/24 11:20:40 INFO Executor: Adding file:/tmp/spark-4055924d-8bde-4216-9932-526612364a63/spark-ac26b090-87ab-4f55-b0b9-3c8571d43307/simple-project_2.11-1.0.jar to class loader
15/03/24 11:20:40 INFO CacheManager: Partition rdd_1_0 not found, computing it
15/03/24 11:20:40 INFO HadoopRDD: Input split: file:/home/craig/spark/README.md:0+1814
15/03/24 11:20:40 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/03/24 11:20:40 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/03/24 11:20:40 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/03/24 11:20:40 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/03/24 11:20:40 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(6208) called with curMem=210710, maxMem=278019440
15/03/24 11:20:40 INFO MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 6.1 KB, free 264.9 MB)
15/03/24 11:20:40 INFO BlockManagerInfo: Added rdd_1_0 in memory on localhost:54739 (size: 6.1 KB, free: 265.1 MB)
15/03/24 11:20:40 INFO BlockManagerMaster: Updated info of block rdd_1_0
15/03/24 11:20:40 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2326 bytes result sent to driver
15/03/24 11:20:40 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, PROCESS_LOCAL, 1361 bytes)
15/03/24 11:20:40 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
15/03/24 11:20:40 INFO CacheManager: Partition rdd_1_1 not found, computing it
15/03/24 11:20:40 INFO HadoopRDD: Input split: file:/home/craig/spark/README.md:1814+1815
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(5400) called with curMem=216918, maxMem=278019440
15/03/24 11:20:40 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 260 ms on localhost (1/2)
15/03/24 11:20:40 INFO MemoryStore: Block rdd_1_1 stored as values in memory (estimated size 5.3 KB, free 264.9 MB)
15/03/24 11:20:40 INFO BlockManagerInfo: Added rdd_1_1 in memory on localhost:54739 (size: 5.3 KB, free: 265.1 MB)
15/03/24 11:20:40 INFO BlockManagerMaster: Updated info of block rdd_1_1
15/03/24 11:20:40 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2326 bytes result sent to driver
15/03/24 11:20:40 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 37 ms on localhost (2/2)
15/03/24 11:20:40 INFO DAGScheduler: Stage 0 (count at SimpleApp.scala:10) finished in 0.299 s
15/03/24 11:20:40 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15/03/24 11:20:40 INFO DAGScheduler: Job 0 finished: count at SimpleApp.scala:10, took 0.448259 s
15/03/24 11:20:40 INFO SparkContext: Starting job: count at SimpleApp.scala:11
15/03/24 11:20:40 INFO DAGScheduler: Got job 1 (count at SimpleApp.scala:11) with 2 output partitions (allowLocal=false)
15/03/24 11:20:40 INFO DAGScheduler: Final stage: Stage 1(count at SimpleApp.scala:11)
15/03/24 11:20:40 INFO DAGScheduler: Parents of final stage: List()
15/03/24 11:20:40 INFO DAGScheduler: Missing parents: List()
15/03/24 11:20:40 INFO DAGScheduler: Submitting Stage 1 (FilteredRDD[3] at filter at SimpleApp.scala:11), which has no missing parents
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(2720) called with curMem=222318, maxMem=278019440
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.7 KB, free 264.9 MB)
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(1950) called with curMem=225038, maxMem=278019440
15/03/24 11:20:40 INFO BlockManager: Removing broadcast 1
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1950.0 B, free 264.9 MB)
15/03/24 11:20:40 INFO BlockManager: Removing block broadcast_1
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_1 of size 2720 dropped from memory (free 277795172)
15/03/24 11:20:40 INFO BlockManager: Removing block broadcast_1_piece0
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_1_piece0 of size 1950 dropped from memory (free 277797122)
15/03/24 11:20:40 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:54739 (size: 1950.0 B, free: 265.1 MB)
15/03/24 11:20:40 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
15/03/24 11:20:40 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:54739 in memory (size: 1950.0 B, free: 265.1 MB)
15/03/24 11:20:40 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:838
15/03/24 11:20:40 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/03/24 11:20:40 INFO DAGScheduler: Submitting 2 missing tasks from Stage 1 (FilteredRDD[3] at filter at SimpleApp.scala:11)
15/03/24 11:20:40 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
15/03/24 11:20:40 INFO ContextCleaner: Cleaned broadcast 1
15/03/24 11:20:40 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, PROCESS_LOCAL, 1361 bytes)
15/03/24 11:20:40 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
15/03/24 11:20:40 INFO BlockManager: Found block rdd_1_0 locally
15/03/24 11:20:40 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1757 bytes result sent to driver
15/03/24 11:20:40 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, PROCESS_LOCAL, 1361 bytes)
15/03/24 11:20:40 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)
15/03/24 11:20:40 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 25 ms on localhost (1/2)
15/03/24 11:20:40 INFO BlockManager: Found block rdd_1_1 locally
15/03/24 11:20:40 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 1757 bytes result sent to driver
15/03/24 11:20:40 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 15 ms on localhost (2/2)
15/03/24 11:20:40 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
15/03/24 11:20:40 INFO DAGScheduler: Stage 1 (count at SimpleApp.scala:11) finished in 0.040 s
15/03/24 11:20:40 INFO DAGScheduler: Job 1 finished: count at SimpleApp.scala:11, took 0.067147 s
Lines with a: 60, Lines with b: 29
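
The same jar can be submitted to an actual Spark standalone cluster by pointing --master at the cluster's master URL instead of local[8]. A sketch, assuming a standalone master listening on its default port 7077 (replace master-host with your master's hostname):
$SPARK_HOME/bin/spark-submit --class "SimpleApp" --master spark://master-host:7077 target/scala-2.11/simple-project_2.11-1.0.jar

Keep in mind that with a cluster master, a local input path like /home/craig/spark/README.md has to exist on every worker node; in practice the input usually lives on HDFS or another filesystem shared by the cluster.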



References

  1. Spark Quick Start Guide: contains the SimpleApp.scala program that was modified for this article
