Tuesday, March 24, 2015

Deploy a Scala job to a Spark Cluster using SBT

Environment

  1. sbt 0.13.8
  2. Scala 2.11.6
  3. Spark 1.2.1
  4. Debian Linux (Ubuntu 14.10)



Scala Program


This simple program written in Scala will analyze a local file on my system, count the number of lines containing the letter "a" and the number containing "b", and print the totals to the console.
/*** SimpleApp.scala ***/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/home/craig/spark/README.md" // Should be some file on your system
    val sc = new SparkContext("local", "Simple App", sys.env("SPARK_HOME"), List("target/scala-2.11/simple-project_2.11-1.0.jar"))
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}
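
Note that a literal string like "$SPARK_HOME" is not expanded by Scala, which is why the constructor above reads the value from the environment with sys.env. As an alternative, you could build the context from a SparkConf and let spark-submit supply the master URL and the application jar. This is a minimal sketch of that variant, not the form used in the rest of this article:
/*** SimpleApp.scala (SparkConf variant) ***/
import org.apache.spark.{SparkConf, SparkContext}

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/home/craig/spark/README.md"
    // The master URL and jar list are omitted here; spark-submit provides them.
    val conf = new SparkConf().setAppName("Simple App")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}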



Directory Structure


I've created the following directory structure in my home directory:
mkdir -p ~/apps/simple/src/main/scala

The Scala program above will be saved as SimpleApp.scala under the "src/main/scala" directory (i.e., ~/apps/simple/src/main/scala/SimpleApp.scala).


Build Script


Create a build script called simple.sbt under the "~/apps/simple/" directory:
name := "Simple Project"

version := "1.0"

scalaVersion := "2.11.6"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.1"
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.6.0"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

Replace the Scala version ("2.11.6"), Spark version ("1.2.1"), and Hadoop version ("2.6.0") with the versions installed on your system.
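
Note the double %% on the spark-core line: sbt appends the project's Scala binary version to the artifact name, so under Scala 2.11 it resolves spark-core_2.11. The hadoop-client artifact is plain Java, which is why it uses a single %. The spark-core line is therefore equivalent to:
// With %% sbt resolves spark-core_2.11 automatically; this line is equivalent:
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.2.1"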


Building the Program


Navigate to ~/apps/simple and type "sbt" in a terminal session:
craig@spark:~/apps/simple$ sbt
[info] Set current project to Simple Project (in build file:/home/craig/apps/simple/)
> 

Type "package" at the prompt:
> package
[info] Updating {file:/home/craig/apps/simple/}simple...
[info] Resolving jline#jline;2.12.1 ...
[info] Done updating.
[info] Compiling 1 Scala source to /home/craig/apps/simple/target/scala-2.11/classes...
[info] Packaging /home/craig/apps/simple/target/scala-2.11/simple-project_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 18 s, completed Mar 24, 2015 11:15:18 AM
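
The same task can also be run non-interactively from the shell:
craig@spark:~/apps/simple$ sbt package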

My directory structure now looks like this:
craig@spark:~/apps/simple$ tree
.
├── simple.sbt
├── src
│   └── main
│       └── scala
│           └── SimpleApp.scala
└── target
    ├── resolution-cache
    │   ├── reports
    │   │   ├── ivy-report.css
    │   │   ├── ivy-report.xsl
    │   │   ├── simple-project-simple-project_2.11-compile-internal.xml
    │   │   ├── simple-project-simple-project_2.11-compile.xml
    │   │   ├── simple-project-simple-project_2.11-docs.xml
    │   │   ├── simple-project-simple-project_2.11-optional.xml
    │   │   ├── simple-project-simple-project_2.11-plugin.xml
    │   │   ├── simple-project-simple-project_2.11-pom.xml
    │   │   ├── simple-project-simple-project_2.11-provided.xml
    │   │   ├── simple-project-simple-project_2.11-runtime-internal.xml
    │   │   ├── simple-project-simple-project_2.11-runtime.xml
    │   │   ├── simple-project-simple-project_2.11-scala-tool.xml
    │   │   ├── simple-project-simple-project_2.11-sources.xml
    │   │   ├── simple-project-simple-project_2.11-test-internal.xml
    │   │   └── simple-project-simple-project_2.11-test.xml
    │   └── simple-project
    │       └── simple-project_2.11
    │           └── 1.0
    │               ├── resolved.xml.properties
    │               └── resolved.xml.xml
    ├── scala-2.11
    │   ├── classes
    │   │   ├── SimpleApp$$anonfun$1.class
    │   │   ├── SimpleApp$$anonfun$2.class
    │   │   ├── SimpleApp.class
    │   │   └── SimpleApp$.class
    │   └── simple-project_2.11-1.0.jar
    └── streams
        ├── compile
        │   ├── compile
        │   │   └── $global
        │   │       └── streams
        │   │           └── out
        │   ├── compileIncremental
        │   │   └── $global
        │   │       └── streams
        │   │           ├── export
        │   │           └── out
        │   ├── copyResources
        │   │   └── $global
        │   │       └── streams
        │   │           ├── copy-resources
        │   │           └── out
        │   ├── dependencyClasspath
        │   │   └── $global
        │   │       └── streams
        │   │           └── export
        │   ├── externalDependencyClasspath
        │   │   └── $global
        │   │       └── streams
        │   │           └── export
        │   ├── $global
        │   │   └── $global
        │   │       └── discoveredMainClasses
        │   │           └── data
        │   ├── incCompileSetup
        │   │   └── $global
        │   │       └── streams
        │   │           └── inc_compile_2.11
        │   ├── internalDependencyClasspath
        │   │   └── $global
        │   │       └── streams
        │   │           └── export
        │   ├── mainClass
        │   │   └── $global
        │   │       └── streams
        │   │           └── out
        │   ├── managedClasspath
        │   │   └── $global
        │   │       └── streams
        │   │           └── export
        │   ├── packageBin
        │   │   └── $global
        │   │       └── streams
        │   │           ├── inputs
        │   │           ├── out
        │   │           └── output
        │   ├── unmanagedClasspath
        │   │   └── $global
        │   │       └── streams
        │   │           └── export
        │   └── unmanagedJars
        │       └── $global
        │           └── streams
        │               └── export
        └── $global
            ├── dependencyPositions
            │   └── $global
            │       └── streams
            │           └── update_cache_2.11
            │               ├── input_dsp
            │               └── output_dsp
            ├── $global
            │   └── $global
            │       └── streams
            │           └── out
            ├── ivyConfiguration
            │   └── $global
            │       └── streams
            │           └── out
            ├── ivySbt
            │   └── $global
            │       └── streams
            │           └── out
            ├── projectDescriptors
            │   └── $global
            │       └── streams
            │           └── out
            └── update
                └── $global
                    └── streams
                        ├── out
                        └── update_cache_2.11
                            ├── inputs
                            └── output

73 directories, 50 files



Running the Program


Inside your working directory (~/apps/simple), execute this command:
$SPARK_HOME/bin/spark-submit --class "SimpleApp" --master local[8] target/scala-2.11/simple-project_2.11-1.0.jar
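
The --master local[8] flag runs the job locally with 8 worker threads. To deploy the same jar to a standalone Spark cluster, point --master at your cluster's master URL instead; the host and port below are placeholders for your own master:
$SPARK_HOME/bin/spark-submit --class "SimpleApp" --master spark://your-master-host:7077 target/scala-2.11/simple-project_2.11-1.0.jar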

Successful output on my workstation looks like this:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/03/24 11:20:37 INFO SecurityManager: Changing view acls to: craig
15/03/24 11:20:37 INFO SecurityManager: Changing modify acls to: craig
15/03/24 11:20:37 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(craig); users with modify permissions: Set(craig)
15/03/24 11:20:38 INFO Slf4jLogger: Slf4jLogger started
15/03/24 11:20:38 INFO Remoting: Starting remoting
15/03/24 11:20:38 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.0.4.15:49222]
15/03/24 11:20:38 INFO Utils: Successfully started service 'sparkDriver' on port 49222.
15/03/24 11:20:38 INFO SparkEnv: Registering MapOutputTracker
15/03/24 11:20:38 INFO SparkEnv: Registering BlockManagerMaster
15/03/24 11:20:38 INFO DiskBlockManager: Created local directory at /tmp/spark-91e5f424-b1e6-4d51-a010-a4e0ac788725/spark-2d07771d-f3ad-4c70-bb5b-1329be484b4f
15/03/24 11:20:38 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
15/03/24 11:20:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/24 11:20:38 INFO HttpFileServer: HTTP File server directory is /tmp/spark-cf38f85d-efd0-4941-b4f6-d245b9d8380c/spark-f903d7b0-c67c-48f0-9eb0-4a3279903d2a
15/03/24 11:20:38 INFO HttpServer: Starting HTTP Server
15/03/24 11:20:39 INFO Utils: Successfully started service 'HTTP file server' on port 50465.
15/03/24 11:20:39 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/03/24 11:20:39 INFO SparkUI: Started SparkUI at http://10.0.4.15:4040
15/03/24 11:20:39 INFO SparkContext: Added JAR target/scala-2.11/simple-project_2.11-1.0.jar at http://10.0.4.15:50465/jars/simple-project_2.11-1.0.jar with timestamp 1427221239211
15/03/24 11:20:39 INFO Executor: Starting executor ID <driver> on host localhost
15/03/24 11:20:39 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@10.0.4.15:49222/user/HeartbeatReceiver
15/03/24 11:20:39 INFO NettyBlockTransferService: Server created on 54739
15/03/24 11:20:39 INFO BlockManagerMaster: Trying to register BlockManager
15/03/24 11:20:39 INFO BlockManagerMasterActor: Registering block manager localhost:54739 with 265.1 MB RAM, BlockManagerId(<driver>, localhost, 54739)
15/03/24 11:20:39 INFO BlockManagerMaster: Registered BlockManager
15/03/24 11:20:39 INFO MemoryStore: ensureFreeSpace(180608) called with curMem=0, maxMem=278019440
15/03/24 11:20:39 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 176.4 KB, free 265.0 MB)
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(25432) called with curMem=180608, maxMem=278019440
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 24.8 KB, free 264.9 MB)
15/03/24 11:20:40 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:54739 (size: 24.8 KB, free: 265.1 MB)
15/03/24 11:20:40 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/03/24 11:20:40 INFO SparkContext: Created broadcast 0 from textFile at SimpleApp.scala:9
15/03/24 11:20:40 INFO FileInputFormat: Total input paths to process : 1
15/03/24 11:20:40 INFO SparkContext: Starting job: count at SimpleApp.scala:10
15/03/24 11:20:40 INFO DAGScheduler: Got job 0 (count at SimpleApp.scala:10) with 2 output partitions (allowLocal=false)
15/03/24 11:20:40 INFO DAGScheduler: Final stage: Stage 0(count at SimpleApp.scala:10)
15/03/24 11:20:40 INFO DAGScheduler: Parents of final stage: List()
15/03/24 11:20:40 INFO DAGScheduler: Missing parents: List()
15/03/24 11:20:40 INFO DAGScheduler: Submitting Stage 0 (FilteredRDD[2] at filter at SimpleApp.scala:10), which has no missing parents
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(2720) called with curMem=206040, maxMem=278019440
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.7 KB, free 264.9 MB)
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(1950) called with curMem=208760, maxMem=278019440
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1950.0 B, free 264.9 MB)
15/03/24 11:20:40 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:54739 (size: 1950.0 B, free: 265.1 MB)
15/03/24 11:20:40 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/03/24 11:20:40 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838
15/03/24 11:20:40 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (FilteredRDD[2] at filter at SimpleApp.scala:10)
15/03/24 11:20:40 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/03/24 11:20:40 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1361 bytes)
15/03/24 11:20:40 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/03/24 11:20:40 INFO Executor: Fetching http://10.0.4.15:50465/jars/simple-project_2.11-1.0.jar with timestamp 1427221239211
15/03/24 11:20:40 INFO Utils: Fetching http://10.0.4.15:50465/jars/simple-project_2.11-1.0.jar to /tmp/spark-4055924d-8bde-4216-9932-526612364a63/spark-ac26b090-87ab-4f55-b0b9-3c8571d43307/fetchFileTemp507742889710724975.tmp
15/03/24 11:20:40 INFO Executor: Adding file:/tmp/spark-4055924d-8bde-4216-9932-526612364a63/spark-ac26b090-87ab-4f55-b0b9-3c8571d43307/simple-project_2.11-1.0.jar to class loader
15/03/24 11:20:40 INFO CacheManager: Partition rdd_1_0 not found, computing it
15/03/24 11:20:40 INFO HadoopRDD: Input split: file:/home/craig/spark/README.md:0+1814
15/03/24 11:20:40 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/03/24 11:20:40 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/03/24 11:20:40 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/03/24 11:20:40 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/03/24 11:20:40 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(6208) called with curMem=210710, maxMem=278019440
15/03/24 11:20:40 INFO MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 6.1 KB, free 264.9 MB)
15/03/24 11:20:40 INFO BlockManagerInfo: Added rdd_1_0 in memory on localhost:54739 (size: 6.1 KB, free: 265.1 MB)
15/03/24 11:20:40 INFO BlockManagerMaster: Updated info of block rdd_1_0
15/03/24 11:20:40 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2326 bytes result sent to driver
15/03/24 11:20:40 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, PROCESS_LOCAL, 1361 bytes)
15/03/24 11:20:40 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
15/03/24 11:20:40 INFO CacheManager: Partition rdd_1_1 not found, computing it
15/03/24 11:20:40 INFO HadoopRDD: Input split: file:/home/craig/spark/README.md:1814+1815
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(5400) called with curMem=216918, maxMem=278019440
15/03/24 11:20:40 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 260 ms on localhost (1/2)
15/03/24 11:20:40 INFO MemoryStore: Block rdd_1_1 stored as values in memory (estimated size 5.3 KB, free 264.9 MB)
15/03/24 11:20:40 INFO BlockManagerInfo: Added rdd_1_1 in memory on localhost:54739 (size: 5.3 KB, free: 265.1 MB)
15/03/24 11:20:40 INFO BlockManagerMaster: Updated info of block rdd_1_1
15/03/24 11:20:40 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2326 bytes result sent to driver
15/03/24 11:20:40 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 37 ms on localhost (2/2)
15/03/24 11:20:40 INFO DAGScheduler: Stage 0 (count at SimpleApp.scala:10) finished in 0.299 s
15/03/24 11:20:40 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15/03/24 11:20:40 INFO DAGScheduler: Job 0 finished: count at SimpleApp.scala:10, took 0.448259 s
15/03/24 11:20:40 INFO SparkContext: Starting job: count at SimpleApp.scala:11
15/03/24 11:20:40 INFO DAGScheduler: Got job 1 (count at SimpleApp.scala:11) with 2 output partitions (allowLocal=false)
15/03/24 11:20:40 INFO DAGScheduler: Final stage: Stage 1(count at SimpleApp.scala:11)
15/03/24 11:20:40 INFO DAGScheduler: Parents of final stage: List()
15/03/24 11:20:40 INFO DAGScheduler: Missing parents: List()
15/03/24 11:20:40 INFO DAGScheduler: Submitting Stage 1 (FilteredRDD[3] at filter at SimpleApp.scala:11), which has no missing parents
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(2720) called with curMem=222318, maxMem=278019440
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.7 KB, free 264.9 MB)
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(1950) called with curMem=225038, maxMem=278019440
15/03/24 11:20:40 INFO BlockManager: Removing broadcast 1
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1950.0 B, free 264.9 MB)
15/03/24 11:20:40 INFO BlockManager: Removing block broadcast_1
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_1 of size 2720 dropped from memory (free 277795172)
15/03/24 11:20:40 INFO BlockManager: Removing block broadcast_1_piece0
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_1_piece0 of size 1950 dropped from memory (free 277797122)
15/03/24 11:20:40 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:54739 (size: 1950.0 B, free: 265.1 MB)
15/03/24 11:20:40 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
15/03/24 11:20:40 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:54739 in memory (size: 1950.0 B, free: 265.1 MB)
15/03/24 11:20:40 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:838
15/03/24 11:20:40 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/03/24 11:20:40 INFO DAGScheduler: Submitting 2 missing tasks from Stage 1 (FilteredRDD[3] at filter at SimpleApp.scala:11)
15/03/24 11:20:40 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
15/03/24 11:20:40 INFO ContextCleaner: Cleaned broadcast 1
15/03/24 11:20:40 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, PROCESS_LOCAL, 1361 bytes)
15/03/24 11:20:40 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
15/03/24 11:20:40 INFO BlockManager: Found block rdd_1_0 locally
15/03/24 11:20:40 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1757 bytes result sent to driver
15/03/24 11:20:40 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, PROCESS_LOCAL, 1361 bytes)
15/03/24 11:20:40 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)
15/03/24 11:20:40 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 25 ms on localhost (1/2)
15/03/24 11:20:40 INFO BlockManager: Found block rdd_1_1 locally
15/03/24 11:20:40 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 1757 bytes result sent to driver
15/03/24 11:20:40 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 15 ms on localhost (2/2)
15/03/24 11:20:40 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
15/03/24 11:20:40 INFO DAGScheduler: Stage 1 (count at SimpleApp.scala:11) finished in 0.040 s
15/03/24 11:20:40 INFO DAGScheduler: Job 1 finished: count at SimpleApp.scala:11, took 0.067147 s
Lines with a: 60, Lines with b: 29



References

  1. [Spark] Quick Start Guide
    1. Contains the SimpleApp.scala program that was modified for this article

Installing SBT (Simple Build Tool) on Ubuntu

Environment

  1. SBT 0.13.8
  2. Ubuntu 14.10

sbt is an open-source build tool for Scala and Java projects, similar to Java's Maven or Ant. Its main features include native support for compiling Scala code and integration with many Scala test frameworks.



Installing SBT


The following commands have been tested and are operational under Ubuntu 14.10:
mkdir ~/sbt
cd ~/sbt
wget https://dl.bintray.com/sbt/native-packages/sbt/0.13.8/sbt-0.13.8.tgz
tar -zxvf sbt-0.13.8.tgz

These commands create an sbt folder in the home directory, download the sbt tgz archive (the latest release at the time of this article), and unpack it.


Modifying the Path


Modify the PATH to include the new sbt installation's bin directory, and set SBT_HOME.

I like to use nano to edit my environment file:
sudo nano /etc/environment

The PATH entry ending in "/home/craig/sbt/bin" and the SBT_HOME line were added:
PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/home/craig/sbt/bin"  
...  
export SBT_HOME=/home/craig/sbt

Once the environment file is saved, reload it:
source /etc/environment
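
Note that /etc/environment is normally read by PAM at login rather than sourced by a shell, so sourcing it as above only affects the current session. A per-user alternative, assuming sbt was unpacked to ~/sbt, is to append the equivalent lines to ~/.profile:
# ~/.profile (per-user alternative; assumes sbt lives in ~/sbt)
export SBT_HOME="$HOME/sbt"
export PATH="$PATH:$SBT_HOME/bin"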



Verify the Installation


If the installation and environment editing were both successful, you should be able to check the sbt version from a terminal session:
craig@spark:~$ sbt --version
sbt launcher version 0.13.8



References

  1. [SBT] Installing SBT on Debian