日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark(十二) -- Spark On Yarn Spark as a Service Spark On Tachyon

發布時間:2023/12/19 编程问答 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark(十二) -- Spark On Yarn Spark as a Service Spark On Tachyon 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Spark On Yarn:

從0.6.0版本其,就可以在在Yarn上運行Spark
通過Yarn進行統一的資源管理和調度
進而可以實現不止Spark,多種處理框架并存工作的場景

部署Spark On Yarn的方式其實和Standalone是差不多的,區別就是需要在spark-env.sh中添加一些yarn的環境配置,在提交作業的時候會根據這些配置加載yarn的信息,然后將作業提交到yarn上進行管理

首先請確保已經部署了Yarn,相關操作請參考:

hadoop2.2.0集群安裝和配置

部署完成之后可以通過
yarn-master:8088
查看yarn的web管理界面
yarn-master為配置的yarn主機名或ip地址

Spark的一些配置如下:
修改spark-env.sh文件
必須添加的是
HADOOP_CONF_DIR 或者 YARN_CONF_DIR指向hadoop的conf配置文件目錄

其余的和Spark Standalone部署是一樣的,具體請參考:

Spark(一)– Standalone HA的部署

另外,可以通過
SPARK_YARN_USER_ENV
來配置要傳給Spark進程的環境變量,如JAVA_HOME等

通過export SPARK_JAR=hdfs://some/path
來將jar文件放在全局可讀的HDFS上,緩存在各個節點中,這樣一來,運行應用時就無需每次都分發jar文件到各個節點上

兩種方式作業提交方式:
1.yarn-cluster
在spark目錄下執行:

./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster lib/spark-examples*.jar 10

來運行SparkPi這個example

2.yarn-client

和之前的方式一模一樣,只是將yarn-cluster換成yarn-client,如下:

./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client lib/spark-examples*.jar 10

兩種方式的區別:
client方式下,Spark的Driver會在客戶端進程中,Application Master僅僅是向Yarn申請資源,同時會在客戶端(終端)上打印出具體的執行log

cluster方式下,Driver會在Application Master進程中運行,受到Yarn的管理。客戶端在應用初始化之后就可以脫離,這時候在客戶端不能收到執行的log信息,但是可以通過Yarn的WebUI來查看作業的運行情況

Spark On Yarn作業的提交方式和Standalone相比僅僅是將–master這個參數由具體的spark主節點,換成了yarn-cluster/client

Spark as a Service:

將部署好的Spark集群作為一種服務通過REST接口向外提供
這就很像云計算模型

我們將Spark集群部署好,將適用于各種場景作業的jar包分配上去,而外面的人通過REST接口來調用我們提供的各種服務,這就是Spark as a Service

其中典型的實現是JobServer

JobServer其實就是一套軟件,將其下載下來之后部署在Spark集群上

它會想外界提供REST接口,Spark上的各個資源都可以通過一個唯一的URL來訪問

構架圖如下:

特性
“Spark as a Service”: 簡單的面向job和context管理的REST接口
通過長期運行的job context支持亞秒級低延時作業(job)
可以通過結束context來停止運行的作業(job)
分割jar上傳步驟以提高job的啟動
異步和同步的job API,其中同步API對低延時作業非常有效
支持Standalone Spark和Mesos
Job和jar信息通過一個可插拔的DAO接口來持久化
命名RDD以緩存,并可以通過該名稱獲取RDD。這樣可以提高作業間RDD的共享和重用

部署JobServer需要sbt

JobServer下載地址

裝好sbt之后,將JobServer解壓,進入其根目錄
敲sbt
進入sbt命令之后(第一次啟動要下載很多jar包,可能會因為網絡的問題卡很久。。)
執行

re-start --- -Xmx4g

此時會下載spark-core,jetty和liftweb等相關模塊
完成之后可以通過訪問http://localhost:8090 可以看到Web UI

相關的API如下:

curl --data-binary @job-server-tests/target/job- server-tests-0.3.1.jar localhost:8090/jars/test //運行指定的jar包curl localhost:8090/jars/ //查看提交的jarcurl -d "input.string = hello job server" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample' //提交的appName為test,class為spark.jobserver.WordCountExamplecurl localhost:8090/jobs/34ce0666-0148-46f7-8bcf-a7a19b5608b2 curl localhost:8090/jobs/34ce0666-0148-46f7-8bcf-a7a19b5608b2/config //通過job-id查看結果和配置信息curl -d "input.string = hello job server" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample&sync=true' //sync=true會直接將執行接口返回,如果沒有設置,那么將會分配一個jobId,等作業完成后可以通過jobId在查看信息curl -d "" ‘localhost:8090/contexts/test-context? num-cpu-cores=4&mem-per-node=512m' //啟動一個contextcurl localhost:8090/contexts //查詢所有的contextcurl -d "input.string = a b c a b see" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample&context=test-context&sync=true' //在某個指定的context上執行作業

配置文件:

vim spark-jobserver/config/local.conf.template master = "local[4]"//將這里改為集群的地址jobdao = spark.jobserver.io.JobFileDAOfiledao {rootdir = /tmp/spark-job-server/filedao/data} //數據對象的存儲方法和存儲路徑context-settings {num-cpu-cores = 2 memory-per-node = 512m} //context的默認設置,如果在REST接口中顯示的指明了context的配置,那么這里將會被覆蓋POST /contexts/my-new-context?num-cpu-cores=10 //在REST中設置一個新的context,以參數的形式放在url中

JobServer部署:

復制con?g/local.sh.template到env.sh ,并且設置相關參數如:指定安裝路徑,Spark Home, Spark Conf等。

DEPLOY_HOSTS="spark1spark2spark3" APP_USER=spark APP_GROUP=spark INSTALL_DIR=/home/spark/jobserver LOG_DIR=/home/spark/jobserver/log PIDFILE=spark-jobserver.pid SPARK_HOME=/home/spark/spark SPARK_CONF_HOME=/home/spark/spark/conf

修改project/Dependencies.scala。重新指定spark版本為當前的版本

lazy val sparkDeps = Seq(
“org.apache.spark” %% “spark-core” % “1.3.1” ……

運?行bin/server_deploy.sh env(或者直接將env.sh的絕對路徑寫進server_deploy.sh這樣就不用再傳參數了)
打好包后與相關配置?一起放到指定服務器的指定目錄

啟動:

需要把config下的local.conf復制到INSTALL_DIR下面,改名為local.conf,并修改其中的master以及兩個路徑。

jar-store-rootdir = /var/lib/spark/jars
rootdir = /var/lib/spark/filedao

進?入服務器指定指定目錄,運?行server_start.sh

如果啟動有問題可以試試把cfg.sh 拷貝到 spark-job-server目錄下 改名為 settings.sh

創建JobServer工程:

在idea中新建SBT工程
在.sbt文件中添加以下內容

name := "job server demo" version := "1.0" scalaVersion := "2.10.4" resolvers += "Ooyala Bintray" at "http://dl.bintray.com/ooyala/maven" libraryDependencies += "ooyala.cnd" % "job-server" % "0.3.1" % "provided" libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.0.0"

繼承SparkJob,重寫validate與runJob
validate就是一個執行一系列驗證的方法,執行的時候先看一下validate的驗證對不對
runJob執行作業的邏輯

import com.typesafe.config.{Config, ConfigFactory} import org.apache.spark._ import org.apache.spark.SparkContext._ import scala.util.Try import spark.jobserver.SparkJob import spark.jobserver.SparkJobValidation import spark.jobserver.SparkJobValid import spark.jobserver.SparkJobInvalidobject WordCount extends SparkJob{ def main(args: Array[String]) {val sc = new SparkContext("local[4]", "WordCountExample")val config = ConfigFactory.parseString("")val results = runJob(sc, config)println("Result is " + results)}override def validate(sc: SparkContext, config: Config): SparkJobValidation = {Try(config.getString("input.string")).map(x => SparkJobValid).getOrElse(SparkJobInvalid("No input.string config param"))}override def runJob(sc: SparkContext, config: Config): Any = {val dd = sc.parallelize(config.getString("input.string").split(" ").toSeq)val rsList = dd.map((_,1)).reduceByKey(_+_).map(x => (x._2,x._1)).sortByKey(false).collectrsList(0)._2} }

生成jar包并提交

curl --data-binary @/root/install-pkg/job-server-demo_2.10-1.0.jar localhost:8090/jars/example

測試

curl -i -d "input.string=a a a b b c" 'localhost:8090/jobs?appName=example&classPath=com.persia.spark.WordCount'HTTP/1.1 202 Accepted Server: spray-can/1.2.0 Date: Sat, 12 Jul 2014 09:22:26 GMT Content-Type: application/json; charset=UTF-8 Content-Length: 150{"status": "STARTED","result": {"jobId": "b5b2e80f-1992-471f-8a5d-44c08c3a9731","context": "6bd9aa29-com.persia.spark.WordCount"} }

使用命名RDD:

object MyNamedRDD extends SparkJob with NamedRDDSuport //繼承SparkJob并混入NamedRDDSuport特質之后寫自己的NamedRDDthis.namedRDDs.update("myrdd",myrdd) //以鍵值對的形式將自定義的命名RDD緩存起來val myrdd = this.namedRDDs.get[(String,String)]("myrdd").get //將緩存的RDD拿出來//命名RDD可以用于有點類似于Session的作用

Spark On Tachyon:

什么是Tachyon?

來看看傳統的Spark不同job之間,不同的框架是如何共享數據的

通過不斷的讀取HDFS來實現數據的共享,HDFS是什么?是一種分布式的文件系統啊,說到底就是硬盤。那么問題就很明顯了,頻繁的磁盤IO操作,還有cache丟失,內存使用等問題

解決方案是什么?

就是Tachyon,一種分布式的內存?文件系統,注意內存兩個字,不同任務(框架)享受可靠快速的數據共享

于是BDAS變成了下面這種構架:

之前的問題解決方案:

Tachyon部署:

在命令行中下載Tachyon

wget https://github.com/amplab/tachyon/releases/download/v0.6.4/tachyon-0.6.4-bin.tar.gztar xvfz tachyon-0.6.4-bin.tar.gzcd tachyon-0.6.4

國內的網絡訪問不了的時候可以在這里下載:

Tachyon下載地址

1.Local模式:

cp conf/tachyon-env.sh.template conf/tachyon-env.sh./bin/tachyon format./bin/tachyon-start.sh local

可以通過 http://localhost:19999 WebUI查看

測試

./bin/tachyon runTest Basic CACHE_THROUGH./bin/tachyon runTests./bin/tachyon-stop.sh

2.Cluster模式:

在配置文件目錄下修改slaves
加入各個節點的主機名

tachyon-env.sh修改如下配置:

export TACHYON_MASTER_ADDRESS=spark1
export TACHYON_WORKER_MEMORY_SIZE=4GB
export TACHYON_UNDERFS_HDFS_IMPL=org.apache.hadoop.hdfs.DistributedFileSystem
export TACHYON_UNDERFS_ADDRESS=hdfs://spark1:9000

./bin/tachyon format./bin/tachyon-start.sh

可以通過 http://tachyon.master.spark1:19999 WebUI查看

測試

./bin/tachyon runTests

3.基于zookeeper的Master HA

確保在tachyon-env.sh中設置過
export TACHYON_UNDERFS_ADDRESS=hdfs://hostname:port

在TACHYON_JAVA_OPTS中加?
-Dtachyon.master.journal.folder=hdfs://hostname:port/
tachyon/journal/
-Dtachyon.usezookeeper=true
-Dtachyon.zookeeper.address=zkserver1:2181,zkserver2:2181,zkserver3:2181

4.Spark On Tachyon:

在Spark的conf目錄下新建core-site.xml,并加入以下內容(zk模式下,如果不是zk模式,將name替換為fs.tachyon.impl即可)

<configuration> <property> <name>fs.tachyon-ft.impl</name> <value>tachyon.hadoop.TFS</value> </property> </configuration>

如果運?行的是低于Spark1.0.版本,在spark.env.sh中加?入:
export SPARK_CLASSPATH=/pathToTachyon/client/target/tachyon-client-0.5.0-jar-with-dependencies.jar:$SPARK_CLASSPATH

在zk模式下,還需要在spark-env.sh中加入
以下內容:

export SPARK_JAVA_OPTS=" -Dtachyon.usezookeeper=true -Dtachyon.zookeeper.address=zkserver1:2181,zkserver2:2181,zkserver3:2181 $SPARK_JAVA_OPTS "

要在spark程序中使用 Tachyon需指定:
1、spark.tachyonStore.url
2、spark.tachyonStore.baseDir

如果不想每次手動輸入以上配置時,可以在spark的conf目錄下編輯spark-defaults.conf文件
將上面兩個配置加入去即可

spark.master spark://spark1:7077 spark.eventLog.enabled true spark.eventLog.dir hdfs://ns1/spark_event_log spark.tachyonStore.url tachyon://spark1:19998 spark.tachyonStore.baseDir /data/tachyon_tmp

spark程序存儲數據要指定OffHeap方式,說明數據不讓spark自己管理了,而是讓Tachyon接手

轉載于:https://www.cnblogs.com/jchubby/p/5449389.html

總結

以上是生活随笔為你收集整理的Spark(十二) -- Spark On Yarn Spark as a Service Spark On Tachyon的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。