Zeppelin-在Flink和Spark集群的安装
2019獨角獸企業重金招聘Python工程師標準>>>
該教程主要面向Zeppelin的入門者。不需要太多的關于 Linux, git, 或其它工具的基礎知識。如果你按照這里的方法逐項執行,就可以將 Zeppelin 正常運行起來。
安裝Zeppelin為Flink/Spark集群模式
本教程假定用戶有一個新的機器環境 (物理機或?virtual?均可, 最小安裝?Ubuntu 14.04.3 Server)。
注意:?虛擬機的大小至少16GB,以免出現磁盤空間不夠導致安裝失敗。
?
軟件要求
采用最小安裝, 下面幾個程序需要在安裝Zeppelin、Flink 和 Spark之前安裝:
- git
- openssh-server
- OpenJDK 7
- Maven 3.1+
安裝 git, openssh-server和 OpenJDK 7 可以使用apt 包管理器來完成。
git
命令行鍵入:
sudo apt-get install gitopenssh-server
sudo apt-get install openssh-serverOpenJDK 7
sudo apt-get install openjdk-7-jdk openjdk-7-jre-lib使用Ubuntu 16.04: 安裝?openjdk-7?必須加上 repository(?Source),如下:
sudo add-apt-repository ppa:openjdk-r/ppa sudo apt-get update sudo apt-get install openjdk-7-jdk openjdk-7-jre-libMaven 3.1+
Zeppelin 要求 maven 版本 3.x以上。該版本在系統庫中為 2.x, 因此 maven 需要手動安裝。
首先,清除現存的 maven各個版本:
sudo apt-get purge maven maven2下載 maven 3.3.9 二進制軟件:
wget "http://www.us.apache.org/dist/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz"解壓縮并放到?/usr/local?目錄:
tar -zxvf apache-maven-3.3.9-bin.tar.gz sudo mv ./apache-maven-3.3.9 /usr/local創建一個符號鏈接,在?/usr/bin?中:
sudo ln -s /usr/local/apache-maven-3.3.9/bin/mvn /usr/bin/mvn?
安裝 Zeppelin
這里提供一個Zeppelin的源碼安裝快速步驟,詳細步驟請閱讀?Zeppelin Installation Guide。
命令行,Clone Zeppelin 源代碼:
git clone https://github.com/apache/zeppelin.git進入 Zeppelin 根目錄:
cd zeppelin打包 Zeppelin:
mvn clean package -DskipTests -Pspark-1.6 -Dflink.version=1.1.3 -Pscala-2.10-DskipTests?跳過 build tests。
-Pspark-1.6?指定maven采用 Spark 1.6進行構建。因為 Zeppelin有自己的Spark interpreter,版本必須與連接的Spark服務保持一致。
-Dflink.version=1.1.3?指定 maven 采用Flink 版本 1.1.3進行構建。
--Pscala-2.10?指定 maven 使用 Scala v2.10進行構建。
注意:?你可以包含額外的build flags,如:?-Ppyspark?或?-Psparkr。詳細的參考:?the build section of github for more details.
注意:?你可以構建任何在Zeppelin Profile中可用的Spark版本,關鍵是要選擇一致的版本進行構建。
注意:?關于build失敗. 安裝過Zeppe超過30次,我可以告訴你,有時候構建失敗找不出原因。在沒有編輯任何代碼的情況下,可能因為某些原因build就失敗了。很多時候,maven試圖下載時失敗。
如果構建失敗,下面是一些解決方法的提示:
- 查看 logs.
- 重試 (再次運行?mvn clean package -DskipTests -Pspark-1.6?)
- 如果下載失敗,等待一些時間后再下載。有時如果 server 不可用,就只能等待。
- 確認你的步驟都是正確的。
- 向社區請求幫助。到?here?并且加入用戶郵件列表。確保將build output (everything that happened in the console) 的輸出包含在你的消息中。
啟動Zeppelin服務
bin/zeppelin-daemon.sh start使用?ifconfig?來確認 host machine's IP 地址。如果不熟悉, 可以參考?here。
打開瀏覽器,本機輸入地址 http://127.0.0.1:8080, 如果不在本機訪問(同一個網段)可以通過命令?ifconfig獲得服務器的IP地址。
查看?Zeppelin tutorial?獲取Zeppelin的基本用法。建議你花一些時間查看一下 Zeppelin 安裝時自帶的notebook例子,可以快速熟悉基本的notebook功能。
Flink 測試
創建一個新的 notebook ,名稱為 "Flink Test",復制下面的代碼到里面:
%flink // let Zeppelin know what interpreter to use.val text = benv.fromElements("In the time of chimpanzees, I was a monkey", // some lines of text to analyze "Butane in my veins and I'm out to cut the junkie", "With the plastic eyeballs, spray paint the vegetables", "Dog food stalls with the beefcake pantyhose", "Kill the headlights and put it in neutral", "Stock car flamin' with a loser in the cruise control", "Baby's in Reno with the Vitamin D", "Got a couple of couches, sleep on the love seat", "Someone came in sayin' I'm insane to complain", "About a shotgun wedding and a stain on my shirt", "Don't believe everything that you breathe", "You get a parking violation and a maggot on your sleeve", "So shave your face with some mace in the dark", "Savin' all your food stamps and burnin' down the trailer park", "Yo, cut it")/* The meat and potatoes:this tells Flink to iterate through the elements, in this case strings,transform the string to lower case and split the string at white space into individual wordsthen finally aggregate the occurrence of each word.This creates the count variable which is a list of tuples of the form (word, occurances)counts.collect().foreach(println(_)) // execute the script and print each element in the counts list*/ val counts = text.flatMap{ _.toLowerCase.split("\\W+") }.map { (_,1) }.groupBy(0).sum(1)counts.collect().foreach(println(_)) // execute the script and print each element in the counts list按Enter+Shift運行,確保 Zeppelin Flink interpreter 工作正確,如果有問題到菜單的interpreter進行設置。
Spark 測試
創建一個notebook,名稱為 "Spark Test" ,復制下面的代碼進去:
%spark // let Zeppelin know what interpreter to use.val text = sc.parallelize(List("In the time of chimpanzees, I was a monkey", // some lines of text to analyze "Butane in my veins and I'm out to cut the junkie", "With the plastic eyeballs, spray paint the vegetables", "Dog food stalls with the beefcake pantyhose", "Kill the headlights and put it in neutral", "Stock car flamin' with a loser in the cruise control", "Baby's in Reno with the Vitamin D", "Got a couple of couches, sleep on the love seat", "Someone came in sayin' I'm insane to complain", "About a shotgun wedding and a stain on my shirt", "Don't believe everything that you breathe", "You get a parking violation and a maggot on your sleeve", "So shave your face with some mace in the dark", "Savin' all your food stamps and burnin' down the trailer park", "Yo, cut it"))/* The meat and potatoes:this tells spark to iterate through the elements, in this case strings,transform the string to lower case and split the string at white space into individual wordsthen finally aggregate the occurrence of each word.This creates the count variable which is a list of tuples of the form (word, occurances) */ val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_,1) }.reduceByKey(_ + _)counts.collect().foreach(println(_)) // execute the script and print each element in the counts list按Enter+Shift運行,確保 Zeppelin Flink interpreter 工作正確,如果有問題到菜單的interpreter進行設置。
最后, 停止Zeppelin daemon服務。從系統的命令窗口輸入并回車執行:
bin/zeppelin-daemon.sh stop安裝集群
Flink 集群
現在預編譯代碼
如果可能,建議您從源碼進行構建,不僅可以獲得最新的功能,還能了解項目的最新進展和代碼的結構,定制自己特定環境的版本。為了便于演示,這里直接下載編譯好的版本。
下載使用?wget
wget "http://mirror.cogentco.com/pub/apache/flink/flink-1.1.3/flink-1.1.3-bin-hadoop24-scala_2.10.tgz" tar -xzvf flink-1.1.3-bin-hadoop24-scala_2.10.tgz將下載 Flink 1.1.3, 與 Hadoop 2.4兼容。這個版本不需要安裝 Hadoop ,但如果使用 Hadoop, 將上面的?24?改為對應的版本。
啟動 Flink 集群:
flink-1.1.3/bin/start-cluster.sh從源碼構建
如果希望從源碼編譯構建Flink, 下面是快捷指南。 改變構建工具和版本可能帶來不穩定性。例如, Java8 和 Maven 3.0.3 建議用于編譯 Flink, 但是目前不適合用于 Zeppelin 的構建(版本在快速更新中,以后可能就適合了). 查看?Flink Installation guide?獲得更多的細節指南。
返回到目錄, 這里假設是?$HOME. 復制 Flink 項目源碼, 檢出版本 release-1.1.3-rc2, 然后編譯。
cd $HOME git clone https://github.com/apache/flink.git cd flink git checkout release-1.1.3-rc2 mvn clean install -DskipTests啟動? Flink 集群,使用 stand-alone 模式:
build-target/bin/start-cluster.sh確保集群成功啟動。
在瀏覽器中, 輸入URL地址 http://127.0.0.1:8082 ,可以看到Flink 的Web-UI。在左側導航欄點擊 'Task Managers' 。確保至少有一個Task Manager打開。
如果task managers沒有出現, 重新啟動一下 Flink 集群,方法如下:
(if binaries)?flink-1.1.3/bin/stop-cluster.sh flink-1.1.3/bin/start-cluster.sh
(if built from source)?build-target/bin/stop-cluster.sh build-target/bin/start-cluster.sh
?
Spark 1.6 集群
下載預編譯軟件包
如果可能,建議從源碼編譯。這里為了便于演示,采用直接下載編譯好的軟件包。
下載使用?wget
wget "http://d3kbcqa49mib13.cloudfront.net/spark-1.6.3-bin-hadoop2.6.tgz" tar -xzvf spark-1.6.3-bin-hadoop2.6.tgz mv spark-1.6.3-bin-hadoop2.6 spark上面的命令會下載Spark 1.6.3, 與Hadoop 2.6兼容。 本安裝包工作時不需要安裝Hadoop,但如果使用 Hadoop, 需要將版本號?2.6?改變為你的對應版本。
從源碼編譯
Spark 是一個比較大的項目, 將耗費較長的時間下載和編譯,中間可能會遇到像Flink編譯時同樣的問題而失敗。參考?Spark Installation?獲得更多的細節的指南。
返回到下載目錄,這里假設是 $HOME. 復制 Spark源代碼, 檢出分支 branch-1.6, 然后進行build。
注意:?這里檢出 1.6 只是因為這是本文寫作時的 Zeppelin profile 支持的版本。你需要構建對應于Spark的相應版本。如果使用 Spark 2.0, 下面的例子 word count 需要修改為Spark 2.0 兼容。
cd $HOMEClone, check out, 以及 build Spark 1.6.x,腳本命令如下:
git clone https://github.com/apache/spark.git cd spark git checkout branch-1.6 mvn clean package -DskipTests啟動 Spark集群
返回到?$HOME?目錄.
cd $HOME啟動Spark 集群,使用stand-alone 模式。如果不使用默認端口8080,通過 webui-port 參數制定服務端口 (Zeppelin的webui-port服務端口)。
spark/sbin/start-master.sh --webui-port 8082注意:?為什么使用?--webui-port 8082? 這個是題外話,在后面再去解釋。
打開瀏覽器,導航到 http://yourip:8082 確保 Spark master 已經運行,顯示信息如下。
?
?
頁面上方顯示?URL地址: spark://yourhost:7077, 這是Spark Master訪問的URI, 在后續的操作中將會用到。
使用這個URI啟動一個Spark的slave節點:
spark/sbin/start-slave.sh spark://yourhostname:7077返回 Zeppelin daemon啟動的主目錄:
cd $HOMEzeppelin/bin/zeppelin-daemon.sh start?
配置 Interpreters
打開瀏覽器,導航到 Zeppelin 的web-ui,地址為:http://yourip:8080.
回到 Zeppelin web-ui ( http://yourip:8080),點擊右上方的?anonymous?將打開下拉菜單, 選擇?Interpreters?進入解釋器的配置頁面。
在 Spark 一節, 右上方點擊 edit 按鈕(鉛筆圖標)。 然后,編輯 Spark 的 master 域。 從?local[*]?改為上面的URI,上面的是?spark://ubuntu:7077。
點擊?Save?(保存)更新參數, 然后在詢問是否需要重啟interpreter時點擊?OK。
現在滾動頁面到 Flink 一節。點擊edit按鈕,將?host?的值從?local?改為?localhost. 點擊?Save?保存。
重新打開 examples ,然后重新運行。 (屏幕上方點擊 play 按鈕,或者在每一paragraph點擊play按鈕來運行,或者按Enter+Shift組合鍵)。
你可以去檢查 Flink 和 Spark 的webui界面 (譬如上面的 http://yourip:8081, http://yourip:8082, http://yourip:8083),可以看到任務在集群上運行。
題外話-關于服務的端口
為什么要用 'something like', 而不是精確的 web-ui 端口呢?因為這依賴于你啟動時的設置。Flink 和 Spark 將缺省啟動web-ui 在端口8080, 如果被占用就尋找下一個可用的端口。
因為 Zeppelin 第一個啟動,缺省將占用端口 8080。當 Flink 啟動時, 將試圖使用端口 8080, 如果不可用,則使用下一個,如 8081。Spark 的 webui界面分為 master 和 slave, 啟動時將試圖綁定端口 8080,但該端口已經被Zeppelin占用), 然后將使用8081 (但已被 Flink的 webui占用), 然后使用 8082。
如果一切完全如上述運行, webui的端口將會是 8081 和 8082。但是,如果運行了其他程序或者啟動過程由其它的集群管理程序控制,情況可能就與預期的不同,尤其是在啟動大量節點的情況下。
可以通過啟動參數來指定webui服務綁定的端口 (在啟動 Flink 和 Spark時,在命令行加上參數?--webui-port <port>?,這里?<port>?為webui使用的端口。 也可以在配置文件中指定端口,具體方法參考官方網站文檔,這里不再贅述。
Zeppelin 0.6.2使用Spark的yarn-client模式
Zeppelin版本0.6.2
1. Export SPARK_HOME
In?conf/zeppelin-env.sh, export?SPARK_HOME?environment variable with your Spark installation path.
You can optionally export HADOOP_CONF_DIR and SPARK_SUBMIT_OPTIONS
export SPARK_HOME=/usr/crh/4.9.2.5-1051/spark export HADOOP_CONF_DIR=/etc/hadoop/conf export JAVA_HOME=/opt/jdk1.7.0_79這兒雖然添加了SPARK_HOME但是后面使用的時候還是找不到包。
2. Set master in Interpreter menu
After start Zeppelin, go to?Interpreter?menu and edit?master?property in your Spark interpreter setting. The value may vary depending on your Spark cluster deployment type.
spark解釋器設置為yarn-client模式
?
FAQ
1.
ERROR [2016-07-26 16:46:15,999] ({pool-2-thread-2} Job.java[run]:189) - Job failed java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror;at org.apache.spark.repl.SparkILoop.<init>(SparkILoop.scala:936)at org.apache.spark.repl.SparkILoop.<init>(SparkILoop.scala:70)at org.apache.zeppelin.spark.SparkInterpreter.open(SparkInterpreter.java:765)at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:69)at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:341)at org.apache.zeppelin.scheduler.Job.run(Job.java:176)at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:139)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)at java.util.concurrent.FutureTask.run(FutureTask.java:262)at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)at java.lang.Thread.run(Thread.java:745)Solution
把SPARK_HOME/lib目錄下的所有jar包都拷到zeppelin的lib下。
2.
%spark.sql
show tables
Solution
hadoop fs -chown root:hdfs /user/root3.
import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SQLContext} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.ml.feature.RFormula import org.apache.spark.ml.regression.LinearRegression conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@6a79f5df sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@59b2aabc spark: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@129d0b9b org.apache.spark.sql.AnalysisException: Specifying database name or other qualifiers are not allowed for temporary tables. If the table name has dots (.) in it, please quote the table name with backticks (`).;at org.apache.spark.sql.catalyst.analysis.Catalog$class.checkTableIdentifier(Catalog.scala:97)at org.apache.spark.sql.catalyst.analysis.SimpleCatalog.checkTableIdentifier(Catalog.scala:104)at org.apache.spark.sql.catalyst.analysis.SimpleCatalog.lookupRelation(Catalog.scala:134)at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:257)at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:268)at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:264)at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249) val dataset = spark.sql("select knife_dish_power,penetration,knife_dish_torque,total_propulsion,knife_dish_speed_readings,propulsion_speed1 from `tbm.tbm_test` where knife_dish_power!=0 and penetration!=0")如上sql中給表名和庫名添加``。
然后又報如下錯:
import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SQLContext} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.ml.feature.RFormula import org.apache.spark.ml.regression.LinearRegression conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@4dd69db0 sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@4072dd9 spark: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@238ac654 java.lang.RuntimeException: Table Not Found: tbm.tbm_testat scala.sys.package$.error(package.scala:27)at org.apache.spark.sql.catalyst.analysis.SimpleCatalog.lookupRelation(Catalog.scala:139)at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:257)at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:268)原因:我用的是org.apache.spark.sql.SQLContext對象spark查詢hive中的數據,查詢hive的數據需要org.apache.spark.sql.hive.HiveContext對象sqlContext或sqlc。
實例:
?
順便記錄一下spark-shell使用HiveContext:
集群環境是HDP2.3.4.0
spark版本是1.5.2
spark-shell scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) scala> hiveContext.sql("show tables").collect().foreach(println) [gps_p1,false] scala> hiveContext.sql("select * from g").collect().foreach(println) [1,li] [1,li] [1,li] [1,li] [1,li]4.
import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SQLContext} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.ml.feature.RFormula import org.apache.spark.ml.regression.LinearRegression conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@4d66e4f8 org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at: org.apache.spark.SparkContext.<init>(SparkContext.scala:82) $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46) $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51) $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53) $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55) $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57) $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59) $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:61) $iwC$$iwC$$iwC$$iwC.<init>(<console>:63) $iwC$$iwC$$iwC.<init>(<console>:65) $iwC$$iwC.<init>(<console>:67) $iwC.<init>(<console>:69) <init>(<console>:71) .<init>(<console>:75) .<clinit>(<console>) .<init>(<console>:7) .<clinit>(<console>) $print(<console>)Solution:
val conf = new SparkConf().setAppName("test").set("spark.driver.allowMultipleContexts", "true")val sc = new SparkContext(conf)val spark = new SQLContext(sc)在上面添加set("spark.driver.allowMultipleContexts", "true")。
轉載于:https://my.oschina.net/hblt147/blog/2907137
總結
以上是生活随笔為你收集整理的Zeppelin-在Flink和Spark集群的安装的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 萌新必看——10种客户端存储哪家强,一文
- 下一篇: 如何在APP中集成Google账户登录