Environment
- sbt 0.13.8
- Scala 2.11.6
- Spark 1.2.1
- Ubuntu Linux 14.10 (Debian-based)
Scala Program
This simple program written in Scala will analyze a local file on my system, count the number of lines that contain the letter "a" and the number that contain the letter "b", and print both totals to the console.
/*** SimpleApp.scala ***/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/home/craig/spark/README.md" // Should be some file on your system
    val sc = new SparkContext("local", "Simple App", "$SPARK_HOME",
      List("target/scala-2.11/simple-project_2.11-1.0.jar"))
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}
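As an aside, the four-argument SparkContext constructor used above is a legacy form. Spark 1.x also lets you build the context from a SparkConf, which avoids hard-coding the Spark home and jar list when the job is launched through spark-submit. Here is a minimal sketch of that variant (SimpleAppConf is a hypothetical name I'm using for illustration):

/*** SimpleAppConf.scala (hypothetical variant) ***/
import org.apache.spark.{SparkConf, SparkContext}

object SimpleAppConf {
  def main(args: Array[String]) {
    // Only the application name is set here; spark-submit supplies
    // the master URL and ships the application jar.
    val conf = new SparkConf().setAppName("Simple App")
    val sc = new SparkContext(conf)

    val logData = sc.textFile("/home/craig/spark/README.md", 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))

    sc.stop() // release the context's resources when the job is done
  }
}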
Directory Structure
I've created the following directory structure in my home directory:
mkdir -p ~/apps/simple/src/main/scala
The Scala program above will be saved as SimpleApp.scala under the "src/main/scala" directory.
Build Script
Create a build script called simple.sbt under the "~/apps/simple/" directory:
name := "Simple Project" version := "1.0" scalaVersion := "2.11.6" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.1" libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.6.0" resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
Replace the version numbers (scalaVersion, spark-core, and hadoop-client) with the specific versions installed on your system.
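For what it's worth, the %% operator tells sbt to append the project's Scala binary version to the artifact name, so the spark-core dependency above resolves to spark-core_2.11. The following line, written with the plain % operator, should be equivalent:

// Equivalent to the %% form above, with the Scala binary version spelled out
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.2.1"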
Building the Program
Navigate to ~/apps/simple and type "sbt" in a terminal session:
craig@spark:~/apps/simple$ sbt
[info] Set current project to Simple Project (in build file:/home/craig/apps/simple/)
>
Type "package" at the prompt:
> package
[info] Updating {file:/home/craig/apps/simple/}simple...
[info] Resolving jline#jline;2.12.1 ...
[info] Done updating.
[info] Compiling 1 Scala source to /home/craig/apps/simple/target/scala-2.11/classes...
[info] Packaging /home/craig/apps/simple/target/scala-2.11/simple-project_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 18 s, completed Mar 24, 2015 11:15:18 AM
My directory structure now looks like this:
craig@spark:~/apps/simple$ tree
.
├── simple.sbt
├── src
│   └── main
│       └── scala
│           └── SimpleApp.scala
└── target
    ├── resolution-cache
    │   ├── reports
    │   │   ├── ivy-report.css
    │   │   ├── ivy-report.xsl
    │   │   ├── simple-project-simple-project_2.11-compile-internal.xml
    │   │   ├── simple-project-simple-project_2.11-compile.xml
    │   │   ├── simple-project-simple-project_2.11-docs.xml
    │   │   ├── simple-project-simple-project_2.11-optional.xml
    │   │   ├── simple-project-simple-project_2.11-plugin.xml
    │   │   ├── simple-project-simple-project_2.11-pom.xml
    │   │   ├── simple-project-simple-project_2.11-provided.xml
    │   │   ├── simple-project-simple-project_2.11-runtime-internal.xml
    │   │   ├── simple-project-simple-project_2.11-runtime.xml
    │   │   ├── simple-project-simple-project_2.11-scala-tool.xml
    │   │   ├── simple-project-simple-project_2.11-sources.xml
    │   │   ├── simple-project-simple-project_2.11-test-internal.xml
    │   │   └── simple-project-simple-project_2.11-test.xml
    │   └── simple-project
    │       └── simple-project_2.11
    │           └── 1.0
    │               ├── resolved.xml.properties
    │               └── resolved.xml.xml
    ├── scala-2.11
    │   ├── classes
    │   │   ├── SimpleApp$$anonfun$1.class
    │   │   ├── SimpleApp$$anonfun$2.class
    │   │   ├── SimpleApp.class
    │   │   └── SimpleApp$.class
    │   └── simple-project_2.11-1.0.jar
    └── streams
        ├── compile
        │   ├── compile
        │   │   └── $global
        │   │       └── streams
        │   │           └── out
        │   ├── compileIncremental
        │   │   └── $global
        │   │       └── streams
        │   │           ├── export
        │   │           └── out
        │   ├── copyResources
        │   │   └── $global
        │   │       └── streams
        │   │           ├── copy-resources
        │   │           └── out
        │   ├── dependencyClasspath
        │   │   └── $global
        │   │       └── streams
        │   │           └── export
        │   ├── externalDependencyClasspath
        │   │   └── $global
        │   │       └── streams
        │   │           └── export
        │   ├── $global
        │   │   └── $global
        │   │       └── discoveredMainClasses
        │   │           └── data
        │   ├── incCompileSetup
        │   │   └── $global
        │   │       └── streams
        │   │           └── inc_compile_2.11
        │   ├── internalDependencyClasspath
        │   │   └── $global
        │   │       └── streams
        │   │           └── export
        │   ├── mainClass
        │   │   └── $global
        │   │       └── streams
        │   │           └── out
        │   ├── managedClasspath
        │   │   └── $global
        │   │       └── streams
        │   │           └── export
        │   ├── packageBin
        │   │   └── $global
        │   │       └── streams
        │   │           ├── inputs
        │   │           ├── out
        │   │           └── output
        │   ├── unmanagedClasspath
        │   │   └── $global
        │   │       └── streams
        │   │           └── export
        │   └── unmanagedJars
        │       └── $global
        │           └── streams
        │               └── export
        └── $global
            ├── dependencyPositions
            │   └── $global
            │       └── streams
            │           └── update_cache_2.11
            │               ├── input_dsp
            │               └── output_dsp
            ├── $global
            │   └── $global
            │       └── streams
            │           └── out
            ├── ivyConfiguration
            │   └── $global
            │       └── streams
            │           └── out
            ├── ivySbt
            │   └── $global
            │       └── streams
            │           └── out
            ├── projectDescriptors
            │   └── $global
            │       └── streams
            │           └── out
            └── update
                └── $global
                    └── streams
                        ├── out
                        └── update_cache_2.11
                            ├── inputs
                            └── output

73 directories, 50 files
Running the Program
Inside your working directory (~/apps/simple), execute the following command. The --master local[8] flag tells spark-submit to run the job locally with up to 8 worker threads:
$SPARK_HOME/bin/spark-submit --class "SimpleApp" --master local[8] target/scala-2.11/simple-project_2.11-1.0.jar
Successful output on my workstation looks like this:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/03/24 11:20:37 INFO SecurityManager: Changing view acls to: craig
15/03/24 11:20:37 INFO SecurityManager: Changing modify acls to: craig
15/03/24 11:20:37 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(craig); users with modify permissions: Set(craig)
15/03/24 11:20:38 INFO Slf4jLogger: Slf4jLogger started
15/03/24 11:20:38 INFO Remoting: Starting remoting
15/03/24 11:20:38 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.0.4.15:49222]
15/03/24 11:20:38 INFO Utils: Successfully started service 'sparkDriver' on port 49222.
15/03/24 11:20:38 INFO SparkEnv: Registering MapOutputTracker
15/03/24 11:20:38 INFO SparkEnv: Registering BlockManagerMaster
15/03/24 11:20:38 INFO DiskBlockManager: Created local directory at /tmp/spark-91e5f424-b1e6-4d51-a010-a4e0ac788725/spark-2d07771d-f3ad-4c70-bb5b-1329be484b4f
15/03/24 11:20:38 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
15/03/24 11:20:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/24 11:20:38 INFO HttpFileServer: HTTP File server directory is /tmp/spark-cf38f85d-efd0-4941-b4f6-d245b9d8380c/spark-f903d7b0-c67c-48f0-9eb0-4a3279903d2a
15/03/24 11:20:38 INFO HttpServer: Starting HTTP Server
15/03/24 11:20:39 INFO Utils: Successfully started service 'HTTP file server' on port 50465.
15/03/24 11:20:39 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/03/24 11:20:39 INFO SparkUI: Started SparkUI at http://10.0.4.15:4040
15/03/24 11:20:39 INFO SparkContext: Added JAR target/scala-2.11/simple-project_2.11-1.0.jar at http://10.0.4.15:50465/jars/simple-project_2.11-1.0.jar with timestamp 1427221239211
15/03/24 11:20:39 INFO Executor: Starting executor ID <driver> on host localhost
15/03/24 11:20:39 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@10.0.4.15:49222/user/HeartbeatReceiver
15/03/24 11:20:39 INFO NettyBlockTransferService: Server created on 54739
15/03/24 11:20:39 INFO BlockManagerMaster: Trying to register BlockManager
15/03/24 11:20:39 INFO BlockManagerMasterActor: Registering block manager localhost:54739 with 265.1 MB RAM, BlockManagerId(<driver>, localhost, 54739)
15/03/24 11:20:39 INFO BlockManagerMaster: Registered BlockManager
15/03/24 11:20:39 INFO MemoryStore: ensureFreeSpace(180608) called with curMem=0, maxMem=278019440
15/03/24 11:20:39 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 176.4 KB, free 265.0 MB)
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(25432) called with curMem=180608, maxMem=278019440
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 24.8 KB, free 264.9 MB)
15/03/24 11:20:40 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:54739 (size: 24.8 KB, free: 265.1 MB)
15/03/24 11:20:40 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/03/24 11:20:40 INFO SparkContext: Created broadcast 0 from textFile at SimpleApp.scala:9
15/03/24 11:20:40 INFO FileInputFormat: Total input paths to process : 1
15/03/24 11:20:40 INFO SparkContext: Starting job: count at SimpleApp.scala:10
15/03/24 11:20:40 INFO DAGScheduler: Got job 0 (count at SimpleApp.scala:10) with 2 output partitions (allowLocal=false)
15/03/24 11:20:40 INFO DAGScheduler: Final stage: Stage 0(count at SimpleApp.scala:10)
15/03/24 11:20:40 INFO DAGScheduler: Parents of final stage: List()
15/03/24 11:20:40 INFO DAGScheduler: Missing parents: List()
15/03/24 11:20:40 INFO DAGScheduler: Submitting Stage 0 (FilteredRDD[2] at filter at SimpleApp.scala:10), which has no missing parents
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(2720) called with curMem=206040, maxMem=278019440
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.7 KB, free 264.9 MB)
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(1950) called with curMem=208760, maxMem=278019440
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1950.0 B, free 264.9 MB)
15/03/24 11:20:40 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:54739 (size: 1950.0 B, free: 265.1 MB)
15/03/24 11:20:40 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/03/24 11:20:40 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838
15/03/24 11:20:40 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (FilteredRDD[2] at filter at SimpleApp.scala:10)
15/03/24 11:20:40 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/03/24 11:20:40 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1361 bytes)
15/03/24 11:20:40 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/03/24 11:20:40 INFO Executor: Fetching http://10.0.4.15:50465/jars/simple-project_2.11-1.0.jar with timestamp 1427221239211
15/03/24 11:20:40 INFO Utils: Fetching http://10.0.4.15:50465/jars/simple-project_2.11-1.0.jar to /tmp/spark-4055924d-8bde-4216-9932-526612364a63/spark-ac26b090-87ab-4f55-b0b9-3c8571d43307/fetchFileTemp507742889710724975.tmp
15/03/24 11:20:40 INFO Executor: Adding file:/tmp/spark-4055924d-8bde-4216-9932-526612364a63/spark-ac26b090-87ab-4f55-b0b9-3c8571d43307/simple-project_2.11-1.0.jar to class loader
15/03/24 11:20:40 INFO CacheManager: Partition rdd_1_0 not found, computing it
15/03/24 11:20:40 INFO HadoopRDD: Input split: file:/home/craig/spark/README.md:0+1814
15/03/24 11:20:40 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/03/24 11:20:40 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/03/24 11:20:40 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/03/24 11:20:40 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/03/24 11:20:40 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(6208) called with curMem=210710, maxMem=278019440
15/03/24 11:20:40 INFO MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 6.1 KB, free 264.9 MB)
15/03/24 11:20:40 INFO BlockManagerInfo: Added rdd_1_0 in memory on localhost:54739 (size: 6.1 KB, free: 265.1 MB)
15/03/24 11:20:40 INFO BlockManagerMaster: Updated info of block rdd_1_0
15/03/24 11:20:40 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2326 bytes result sent to driver
15/03/24 11:20:40 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, PROCESS_LOCAL, 1361 bytes)
15/03/24 11:20:40 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
15/03/24 11:20:40 INFO CacheManager: Partition rdd_1_1 not found, computing it
15/03/24 11:20:40 INFO HadoopRDD: Input split: file:/home/craig/spark/README.md:1814+1815
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(5400) called with curMem=216918, maxMem=278019440
15/03/24 11:20:40 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 260 ms on localhost (1/2)
15/03/24 11:20:40 INFO MemoryStore: Block rdd_1_1 stored as values in memory (estimated size 5.3 KB, free 264.9 MB)
15/03/24 11:20:40 INFO BlockManagerInfo: Added rdd_1_1 in memory on localhost:54739 (size: 5.3 KB, free: 265.1 MB)
15/03/24 11:20:40 INFO BlockManagerMaster: Updated info of block rdd_1_1
15/03/24 11:20:40 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2326 bytes result sent to driver
15/03/24 11:20:40 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 37 ms on localhost (2/2)
15/03/24 11:20:40 INFO DAGScheduler: Stage 0 (count at SimpleApp.scala:10) finished in 0.299 s
15/03/24 11:20:40 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/03/24 11:20:40 INFO DAGScheduler: Job 0 finished: count at SimpleApp.scala:10, took 0.448259 s
15/03/24 11:20:40 INFO SparkContext: Starting job: count at SimpleApp.scala:11
15/03/24 11:20:40 INFO DAGScheduler: Got job 1 (count at SimpleApp.scala:11) with 2 output partitions (allowLocal=false)
15/03/24 11:20:40 INFO DAGScheduler: Final stage: Stage 1(count at SimpleApp.scala:11)
15/03/24 11:20:40 INFO DAGScheduler: Parents of final stage: List()
15/03/24 11:20:40 INFO DAGScheduler: Missing parents: List()
15/03/24 11:20:40 INFO DAGScheduler: Submitting Stage 1 (FilteredRDD[3] at filter at SimpleApp.scala:11), which has no missing parents
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(2720) called with curMem=222318, maxMem=278019440
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.7 KB, free 264.9 MB)
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(1950) called with curMem=225038, maxMem=278019440
15/03/24 11:20:40 INFO BlockManager: Removing broadcast 1
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1950.0 B, free 264.9 MB)
15/03/24 11:20:40 INFO BlockManager: Removing block broadcast_1
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_1 of size 2720 dropped from memory (free 277795172)
15/03/24 11:20:40 INFO BlockManager: Removing block broadcast_1_piece0
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_1_piece0 of size 1950 dropped from memory (free 277797122)
15/03/24 11:20:40 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:54739 (size: 1950.0 B, free: 265.1 MB)
15/03/24 11:20:40 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
15/03/24 11:20:40 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:54739 in memory (size: 1950.0 B, free: 265.1 MB)
15/03/24 11:20:40 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:838
15/03/24 11:20:40 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/03/24 11:20:40 INFO DAGScheduler: Submitting 2 missing tasks from Stage 1 (FilteredRDD[3] at filter at SimpleApp.scala:11)
15/03/24 11:20:40 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
15/03/24 11:20:40 INFO ContextCleaner: Cleaned broadcast 1
15/03/24 11:20:40 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, PROCESS_LOCAL, 1361 bytes)
15/03/24 11:20:40 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
15/03/24 11:20:40 INFO BlockManager: Found block rdd_1_0 locally
15/03/24 11:20:40 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1757 bytes result sent to driver
15/03/24 11:20:40 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, PROCESS_LOCAL, 1361 bytes)
15/03/24 11:20:40 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)
15/03/24 11:20:40 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 25 ms on localhost (1/2)
15/03/24 11:20:40 INFO BlockManager: Found block rdd_1_1 locally
15/03/24 11:20:40 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 1757 bytes result sent to driver
15/03/24 11:20:40 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 15 ms on localhost (2/2)
15/03/24 11:20:40 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/03/24 11:20:40 INFO DAGScheduler: Stage 1 (count at SimpleApp.scala:11) finished in 0.040 s
15/03/24 11:20:40 INFO DAGScheduler: Job 1 finished: count at SimpleApp.scala:11, took 0.067147 s
Lines with a: 60, Lines with b: 29
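The final line is the program's own output. As a quick sanity check, the same counts can be reproduced interactively in the Spark shell, where a SparkContext named sc is already defined. A sketch of such a session (the res values shown are from my run and will differ if your README.md differs):

// Inside $SPARK_HOME/bin/spark-shell
scala> val logData = sc.textFile("/home/craig/spark/README.md")
scala> logData.filter(line => line.contains("a")).count()
res0: Long = 60
scala> logData.filter(line => line.contains("b")).count()
res1: Long = 29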
References
- [Spark] Quick Start Guide
  - Contains the SimpleApp.scala program that was modified for this article