[Kafka与Spark集成系列二] Spark的安装及简单应用
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。
歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/kafka-and-spark-integration-2-spark-quick-start/
下載Spark安裝包是安裝的第一步,下載地址為http://spark.apache.org/downloads.html。截止撰稿之時(shí),Spark最新版本為2.3.1,如下圖所示,我們可以從官網(wǎng)中選擇spark-2.3.1-bin-hadoop2.7.tgz進(jìn)行下載。
在下載過后,筆者是先將安裝包拷貝至/opt目錄下,然后執(zhí)行相應(yīng)的解壓縮動(dòng)作,示例如下:
[root@node1 opt]# tar zxvf spark-2.3.1-bin-hadoop2.7.tgz [root@node1 opt]# mv spark-2.3.1-bin-hadoop2.7 spark [root@node1 opt]# cd spark [root@node1 spark]#在解壓縮之后可以直接運(yùn)行Spark,當(dāng)然前提是要安裝好JDK,并設(shè)置好環(huán)境變量JAVA_HOME。進(jìn)入$SPARK_HOME/sbin目錄下執(zhí)行start-all.sh腳本啟動(dòng)Spark。腳本執(zhí)行后,可以通過jps -l命令查看當(dāng)前運(yùn)行的進(jìn)程信息,示例如下:
[root@node1 spark]# jps -l 23353 org.apache.spark.deploy.master.Master 23452 org.apache.spark.deploy.worker.Worker可以看到Spark啟動(dòng)后多了Master和Worker進(jìn)程,分別代表主節(jié)點(diǎn)和工作節(jié)點(diǎn)。我們還可以通過Spark提供的Web界面來查看Spark的運(yùn)行情況,比如可以通過http://localhost:8080來查看Master的運(yùn)行情況。
Spark中帶有交互式的shell,可以用作即時(shí)數(shù)據(jù)分析。現(xiàn)在我們通過spark-shell來運(yùn)行一個(gè)簡(jiǎn)單但又非常經(jīng)典的單詞統(tǒng)計(jì)的程序,以便可以簡(jiǎn)單的了解一下Spark的使用。首先是進(jìn)入$SPARK_HOME/bin目錄下(SPARK_HOME表示Spark安裝的根目錄,即本例中的/opt/spark)執(zhí)行spark-shell命令來進(jìn)行啟動(dòng),可以通過–master參數(shù)來指定所需要連接的集群。spark-shell啟動(dòng)時(shí),你會(huì)看到一些啟動(dòng)日志,示例如下:
[root@node1 spark]# bin/spark-shell --master spark://localhost:7077 2018-08-07 11:02:04 WARN Utils:66 - Your hostname, hidden.zzh.com resolves to a loopback address: 127.0.0.1; using 10.xxx.xxx.xxx instead (on interface eth0) 2018-08-07 11:02:04 WARN Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address 2018-08-07 11:02:04 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Spark context Web UI available at http:// 10.xxx.xxx.xxx:4040 Spark context available as 'sc' (master = spark://localhost:7077, app id = app-20180807110212-0000). Spark session available as 'spark'. Welcome to____ __/ __/__ ___ _____/ /___\ \/ _ \/ _ `/ __/ '_//___/ .__/\_,_/_/ /_/\_\ version 2.3.1/_/Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102) Type in expressions to have them evaluated. Type :help for more information.scala>如此,我們便可以在“scala>”處鍵入我們想要輸入的程序。
在將要演示的示例程序中,我們就近取材的以bin/spark-shell文件中的內(nèi)容來進(jìn)行單詞統(tǒng)計(jì)。程序首先讀取這個(gè)文件的內(nèi)容,然后進(jìn)行分詞,在這里的分詞方法是使用空格進(jìn)行分割,最后統(tǒng)計(jì)單詞出現(xiàn)的次數(shù),下面就將這些步驟進(jìn)行拆分,一步一步來講解其中的細(xì)節(jié)。如無特殊說明,本章節(jié)的示例均以Scala語言進(jìn)行編寫。
首先是通過SparkContext(Spark在啟動(dòng)是已經(jīng)自動(dòng)創(chuàng)建了一個(gè)SparkContext對(duì)象,是一個(gè)叫做sc的變量)的textFile()方法讀取bin/spark-shell文件,參考如下:
scala> val rdd = sc.textFile("/opt/spark/bin/spark-shell") rdd: org.apache.spark.rdd.RDD[String] = /opt/spark/bin/spark-shell MapPartitionsRDD[3] at textFile at <console>:24然后使用split()方法按照空格進(jìn)行分詞,之后又通過flatMap()方法對(duì)處理后的單詞進(jìn)行展平,展平完畢之后使用map(x=>(x,1))對(duì)每個(gè)單詞計(jì)數(shù)1,參考如下:
scala> val wordmap = rdd.flatMap(_.split(" ")).map(x=>(x,1)) wordmap: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at map at <console>:25最后使用reduceByKey(+)根據(jù)key也就是單詞進(jìn)行計(jì)數(shù),這個(gè)過程是一個(gè)混洗(Shuffle)的過程,參考如下:
scala> val wordreduce = wordmap.reduceByKey(_+_) wordreduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:25到這里我們便完成了單詞統(tǒng)計(jì),進(jìn)一步的我們使用take(10)方法來獲取前面10個(gè)單詞統(tǒng)計(jì)的結(jié)果,參考如下:
scala> wordreduce.take(10) res3: Array[(String, Int)] = Array((scala,2), (!=,1), (Unless,1), (this,4), (starting,1), (under,4), (its,1), (reenable,2), (-Djline.terminal=unix",1), (CYGWIN*),1))我們發(fā)現(xiàn)結(jié)果并沒有按照某種順序進(jìn)行排序,如果要看到諸如單詞出現(xiàn)次數(shù)前10內(nèi)容的話,還需要對(duì)統(tǒng)計(jì)后的結(jié)果進(jìn)行排序。
scala> val wordsort = wordreduce.map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)) wordsort: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at map at <console>:25scala> wordsort.take(10) res2: Array[(String, Int)] = Array(("",91), (#,37), (the,19), (in,7), (to,7), (for,6), (if,5), (then,5), (this,4), (under,4))上面的代碼中首先使用map(x=>(x._2,x._1)對(duì)單詞統(tǒng)計(jì)結(jié)果的鍵和值進(jìn)行互換,然后通過sortByKey(false)方法對(duì)值進(jìn)行降序排序,然后再次通過map(x=>(x._2,x._1)將鍵和值進(jìn)行互換,最終的結(jié)果按照降序排序。
歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/kafka-and-spark-integration-2-spark-quick-start/
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。
總結(jié)
以上是生活随笔為你收集整理的[Kafka与Spark集成系列二] Spark的安装及简单应用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [Kafka与Spark集成系列一]
- 下一篇: [Kafka与Spark集成系列三] S