Spark Shell
Spark Shell是一個互動介面,提供使用者一個簡單的方式學習Spark API,可以使用Scala或是Python。 要如何運作呢?
首先到官方網站下載Spark,作者這裡使用的Spark 2.1.2 Pre-build for Hadoop 2.7,表示支援Hadoop 2.7或以上到版本。
接著執行下列的語法:
#若環境變數沒有設定SPARK_HOME,請自行移動至Spark 目錄下。
cd $SPARK_HOME
./bin/spark-shell
可以看到下面的歡迎畫面:
Using Sparks default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/12/21 14:58:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable
Spark context Web UI available at http://10.1.1.123:4040
Spark context available as "sc" (master = local[*], app id = local-1513839499523).
Spark session available as "spark".
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ |/ __/ |_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.2
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
注意一下啟動時的資訊:
Spark context Web UI available at http://10.1.1.123:4040
Spark context available as 'sc' (master = local[*], app id = local-1513839499523).
Spark session available as 'spark'.
表示已經幫使用者啟動一個 local 的 Spark Web UI,也初始化了Spark context,表示隨時都可以寫一段程式讓 Spark 在 local 進行運作。接著讓我們來寫一段簡單的word count程式碼吧!
基礎用法
#讀取 $SPARK_HOME 根目錄下的 README.md 檔案。
scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24
#計算有多少文字
scala> textFile.count()
res0: Long = 104
Spark Application 的運作過程可以在Spark context Web UI可以看到剛剛wordcount運作的資訊。
進階使用
也可以在 Spark-Shell 內 import 某些 Scala 或 Java 的 Library。
scala> import java.lang.Math
import java.lang.Math
接著來計算README.md內以行(row)為比較單位,每行最多有幾個字:
scala> val maptextFileRDD = textFile.map(line => line.split(" ").size)
maptextFileRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:27
scala> val reduceMaptextFileRDD= maptextFileRDD.reduce((a, b) => Math.max(a, b))
reduceMaptextFileRDD: Int = 22
由此可了解單行最多的字有22個。
如果要列印出RDD內容,可以使用下列語法:
scala> maptextFileRDD.collect().foreach(println)
實作Hadoop MapReduce
Big Data 的運算框架 MapReduce 一樣可以使用Spark來實作:
scala> val testFlatmap = textFile.flatMap(line => line.split(" ")).map(word => (word, 1))
testFlatmap: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at map at <console>:27
scala> val reduceByKeyFlatmap = testFlatmap.reduceByKey((a, b) => a + b)
reduceByKeyFlatmap: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[11] at reduceByKey at <console>:29
//查看結果
scala> reduceByKeyFlatmap.collect()
res5: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (page](http://spark.apache.org/documentation.html).,1), (cluster.,1), (its,1), ([run,1), (general,3), (have,1), (pre-built,1), (YARN,,1), ([http://spark.apache.org/developer-tools.html](the,1), (changed,1), (locally,2), (sc.parallelize(1,1), (only,1), (locally.,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (first,1), (graph,1), (Hive,2), (info,1), (["Specifying,1), ("yarn",1), ([params]`.,1), ([project,1), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (["Parallel,1), (are...
離開Spark Shell
使用下面指令就可以離開Spark Shell的介面:
scala> :q
最後
Spark Shell 可以讓使用者快速了解如何撰寫RDD並得到結果,不適用於大量運算。如果需要計算大量資料,建議撰寫 Scala 或是 Java 程式,編譯後打包並使用spark-submit 指令送至資源管理系統進行分散式計算,效能會比較好。
簡單介紹完Spark Shell後,接下來要來介紹如何使用Spark api撰寫第一個Hello World application。