spark杂记2
#######################################################
Ctrl+Alt+T:生成try catch
Ctrl+N:查找類(enter class name);Ctrl+shift+N:查找文件(enter file name);Ctrl+shift+alt+N:查找文件(enter file name);
Ctrl + F:當前文件查找特定文字、代碼等內容
Ctrl + shift + F:當前項目中查找特定的文字、代碼等內容。(edit—find—find in path)
#######################################################
spark[源碼]-sparkContext詳解[一]
https://www.cnblogs.com/chushiyaoyue/p/7468952.html
Spark :Master、Worker、Driver、Executor工作流程詳解
https://blog.csdn.net/weixin_38750084/article/details/83025172
大量數據去重:Bitmap和布隆過濾器(Bloom Filter)
https://blog.csdn.net/zdxiq000/article/details/57626464
BloomFilter(大數據去重)+Redis(持久化)策略
https://blog.csdn.net/qq_18495465/article/details/78500472
Spark核心技術原理透視一(Spark運行原理)
https://www.jianshu.com/p/1b5f97d5a22a
Spark 核心篇-SparkContext
https://www.cnblogs.com/xia520pi/p/8609602.html
大數據技術,Spark核心技術之運行原理
https://blog.51cto.com/13854477/2347535?source=dra
Spark基本架構及運行原理
https://blog.csdn.net/zxc123e/article/details/79912343
Spark運行原理【史上最詳細】
https://blog.csdn.net/lovechendongxing/article/details/81746988
Spark Mllib
https://www.cnblogs.com/dadadechengzi/p/6993757.html
Spark MLlib機器學習算法、源碼及實戰詳解 書籍及代碼.zip
https://download.csdn.net/download/zjm362/10205368
基于spark和sparkstreaming的word2vec
https://www.cnblogs.com/ulysses-you/p/6863585.html
基于spark word2vec實踐
https://blog.csdn.net/hjj974834257/article/details/79089686
word2vec學習 spark版
https://www.cnblogs.com/aezero/p/4586605.html
spark Word2Vec+LSH相似文本推薦(scala)
https://blog.csdn.net/u013090676/article/details/82716911
Spark MLlib 機器學習
https://www.cnblogs.com/swordfall/p/9456222.html
http://www.r66r.net/(hadoop筆記本【真心贊】)
http://leezk.com/ (有推薦書籍和一些不錯的文章)
#######################################################
RDD(Resilient Distributed Datasets)彈性分布式數據集:
分區,只讀,不可變,并行
RDD[T]
RDD[(t,s)]
有向無環圖(DAG)
窄依賴:一個父分區只有一個子分區(可以多個父對應一個子)
寬依賴:一個父分區有多個子分區
Shuffle:含義就是洗牌,將數據打散,父RDD一個分區中的數據如果給了子RDD的多個分區(只要存在這種可能),就是shuffle。Shuffle會有網絡傳輸數據,但是有網絡傳輸,并不意味著就是shuffle。
join可能是寬依賴也可能是窄依賴
#######################################################
sparkContext構建的頂級三大核心:
①DAGScheduler:面向Job的Stage的高層調度器
②TaskScheduler:一個接口,是低層調度器,根據具體的ClusterManager的不同會有不同的實現。Standalone模式下具體實現的是TaskSchedulerlmpl
③SchedulerBackend:一個接口,根據具體的ClusterManger的不同會有不同的實現,Standalone模式下具體的實現是SparkDeloySchedulerBackend
#######################################################
transformation/轉換
action/行動
#######################################################
################面試題總結##########################
1.rdd有幾種操作類型?
①transformation 轉換操作 (rdd →rdd)
②action 行動操作 (rdd →結果集)
③controller 控制算子 (對性能效率和容錯方面的支持:persist,cache,checkpoint)
2.寬窄依賴的區別?
寬依賴:多個子RDD的Partition會依賴同一個父RDD的Partition
窄依賴:每一個父RDD的Partition最多被子RDD的一個Partition使用
3.cache和persist的區別?
cache:緩存數據,默認是緩存在內存中,本質是調用persist
persist:緩存數據,可以指定緩存策略(MEMORY_ONLY,MEMORY_AND_DISK,等等)
4.spark的有幾種部署模式?
①本地模式
②standalone模式
③spark on yarn模式:
分布式部署集群,資源和任務監控交給yarn管理,cluster和client
④mesos模式:
粗粒度模式(Coarse-grained Mode)
細粒度模式(Fine-grained Mode)
5.Spark為什么比mapreduce快?
①基于內存計算,減少低效的磁盤交互;
②高效的調度算法,基于DAG;
③容錯機制Linage;
6.spark有哪些組件?
①master:管理集群和節點,不參與計算
②worker:計算節點,進程本身不參與計算,和master匯報
③Driver:運行程序的main方法,創建sparkContext對象
④sparkContext: 控制整個Application的生命周期,包括dagscheduler和taskscheduler
⑤client:客戶端,用戶提交程序的入口
⑥Executor:執行器,在worker上執行任務的組件,用于啟動線程池運行任務
⑦RDD: spark基本計算單元
⑧DAG Scheduler: 根據job構建基于stage的DAG,并提交stage給TaskScheduler
⑨TaskScheduler: 將任務(task)分發給Executor
⑩SparkEnv: 線程級別的上下文,存儲運行時的重要組件的引用
(11)BlockManager:負責存儲管理,創建和查找塊
(12)SparkConf:負責存儲配置信息
################面試題總結##########################
#######################################################
#######################################################
################hbase##########################
hbase(main):005:0> scan 'label_event_index_table',{LIMIT=>1}
ROW COLUMN+CELL
10H20190701101233N0 column=6:1092078ccf0604e3b49abfdfb2b5b46d681, timestamp=1561947156974, value=0.027:1:1561614000000:1092078ccf
0604e3b49abfdfb2b5b46d681:6:{"sourceIDName":"109_|_|YesxE5xA8xB1xE4xB9x90xE7x8ExB0xE5x9CxBA","org
Categ":"xE5xA8xB1xE4xB9x90"}
10H20190701101233N0 column=6:1098aca82c9389f39b8afe70099b18107cd, timestamp=1561947156974, value=0.027:3:1561633525000:1098aca82c
9389f39b8afe70099b18107cd:6:{"sourceIDName":"109_|_|xE5xA2xA8xE5x85xAExE7x9Ax84xE7x88xB1xE6x84
x8F","orgCateg":"xE5xA8xB1xE4xB9x90"}
hbase(main):006:0> get 'label_event_index_table','10H20190701101233N0'
COLUMN CELL
6:1092078ccf0604e3b49abfdfb2b5b46d681 timestamp=1561947156974, value=0.027:1:1561614000000:1092078ccf0604e3b49abfdfb2b5b46d681:6:{"sourceIDName":"1
09_|_|YesxE5xA8xB1xE4xB9x90xE7x8ExB0xE5x9CxBA","orgCateg":"xE5xA8xB1xE4xB9x90"}
6:1098aca82c9389f39b8afe70099b18107cd timestamp=1561947156974, value=0.027:3:1561633525000:1098aca82c9389f39b8afe70099b18107cd:6:{"sourceIDName":"1
09_|_|xE5xA2xA8xE5x85xAExE7x9Ax84xE7x88xB1xE6x84x8F","orgCateg":"xE5xA8xB1xE4xB9x90"}
hbase存儲結構:
RowKey:是Byte array,是表中每條記錄的“主鍵”,方便快速查找,Rowkey的設計非常重要;
Column Family:列族,擁有一個名稱(string),包含一個或者多個相關列;
Column:屬于某一個columnfamily,familyName:columnName,每條記錄可動態添加;
Version Number:類型為Long,默認值是系統時間戳,可由用戶自定義;
Value(Cell):Byte array。
################hbase##########################
#######################################################
#######################################################
################redis##########################
NoSQL(Not Only SQL):泛指非關系型數據庫
CAP定理:
Consistency(一致性), 數據一致更新,所有數據變動都是同步的;
Availability(可用性), 好的響應性能;
Partition tolerance(分區容錯性) 可靠性;
定理:任何分布式系統只可同時滿足二點,沒法三者兼顧
CA:傳統Oracle數據庫
AP:大多數網站架構的選擇
CP:Redis、Mongodb
DCS,即一種分布式緩存數據庫服務,將現在很火的幾類內存數據庫Redis、Memcached和內存數據網格進行包裝,提供即開即用、安全可靠、彈性擴容、便捷管理的在線分布式緩存能力
1.redis:
redis是一個開源的、使用C語言編寫的、支持網絡交互的、可基于內存也可持久化的Key-Value數據庫(非關系性數據庫)
2.優點:
①速度快,因為數據存在內存中,類似于HashMap,HashMap的優勢就是查找和操作的時間復雜度都是O(1)
②支持豐富數據類型,支持string,list,set,sorted set,hash
③支持事務,操作都是原子性,所謂的原子性就是對數據的更改要么全部執行,要么全部不執行
④豐富的特性:可用于緩存,消息,按key設置過期時間,過期后將會自動刪除
3.redis數據類型:
①字符串(string)
set key value
get key
exists key //key是否存在
②哈希(hash)
hset hashKey key1 value1 key2 value2
hget hashkey key1
③集合(set)
sadd setKey value
scard setKey //返回集合中元素數量
sismember setKey value //查看value是否在集合setKey中
srem setKey value //從集合setKey中刪除value
④列表(list)
lpush list value
rpop list
llen list
⑤有序集合(sort set)
zadd zset1 key1 value1
zcard zset1 //統計zset1下key的個數
zrank zset1 value2 //查看value2在zset1中排名位置
zrange zset1 0 2 withscores //查看0到2的所有值和分數按照排名
################redis##########################
#######################################################
#####################################################################
Spark運行模式:
1. local: 本地線程方式,主要用于開發調試
Hadoop YARN: 集群運行在Yarn資源管理器上,資源管理交給Yarn,spark只負責進行任務調度和計算
2. 各Spark應用程序以相互獨立的進程集合運行于集群之上,由SparkContext對象進行協調,SparkContext對象可以視為Spark應用程序的入口,被稱為driver program,SparkContext可以與不同種類的集群資源管理器(Cluster Manager),例如Hadoop Yarn、Mesos等 進行通信,從而分配到程序運行所需的資源,獲取到集群運行所需的資源后,SparkContext將得到集群中其它工作節點(Worker Node) 上對應的Executors (不同的Spark應用程序有不同的Executor,它們之間也是獨立的進程,Executor為應用程序提供分布式計算及數據存儲功能),之后SparkContext將應用程序代碼分發到各Executors,最后將任務(Task)分配給executors執行。
3. RDD操作包含 Transformations和Action;所有的transformation都是lazy的
#######################################################
application
job
stage
task
master
worker
executor
driver
碰到第一個action算子的時候,相當于觸發了sc.runjob方法執行
DAGScheduler工作:從action算子開始,到把stage變成TaskSet提交到taskScheduler中去執行結束
###########################################################
distinct,groupByKey,reduceByKey,aggregateByKey,join,cogroup,repartition
數據傾斜的解決方案:
①使用Hive ETL預處理數據
從根源上提前處理好hive表中的數據;
(缺點:治標不治本,spark程序沒有解決數據傾斜的能力)
②過濾少數導致傾斜的key
③提高shuffle的并行度
實現簡單,增加shuffle read task數量
(缺點:緩解傾斜)
④兩階段聚合(局部聚合+全局聚合)
第一階段隨機打亂,比如key可以加上前綴,將其分開;第二步再將前綴去掉
(對于聚合類的shuffle操作,最好的解決方案)
⑤將reduce join轉為 map join
//reduce join 通用的join實現;但容易產生數據傾斜
rdd1.join(rdd2)
//map join 完美的避開了shuffle階段,所以沒有數據傾斜;但適用場景有限,只適合大小表做關聯
val bc=sc.broadCast(rdd1.toList)
rdd1.foreachPartition(data =>{
val data2=bc.value
val data1=data
data1.join(data2)
})
⑥采樣傾斜key并分拆join操作
將出現數據傾斜的key的所有數據,形成單獨的數據集
⑦使用隨機前綴和擴容的RDD進行join
⑧多種方案組合使用
#########################################################################
sc.read.format("json").load("/test/")
sc.read.json("test/")
df.write.json("test/")
df.write.format("json").save("/test/")
sc.read.load("test/xxx.parquet") //默認parquet格式,等價下一
sc.read.format("parquet").load("test/")
#######################################################################
①rdd.toDF
②sqlContext.createDataFrame(rdd)
③
#####################################################################
Block
輸入可能以多個文件的形式存儲在HDFS上,每個File都包含了很多塊,稱為Block。
InputSplit
當Spark讀取這些文件作為輸入時,會根據具體數據格式對應的InputFormat進行解析,一般是將若干個Block合并成一個輸入分片,稱為InputSplit,注意InputSplit不能跨越文件。
隨后將為這些輸入分片生成具體的Task。InputSplit與Task是一一對應的關系。
job
在spark rdd中,有action、transform操作,當真正觸發action時,才真正執行計算,此時產生一個job任務
stage
以shuffle為界,當在一個job任務中涉及shuffle操作時,會進行stage劃分,產生一個或多個stage。
Stage概念是spark中獨有的。一般而言一個Job會切換成一定數量的stage。各個stage之間按照順序執行。至于stage是怎么切分的,首選得知道spark論文中提到的narrow dependency(窄依賴)和wide dependency( 寬依賴)的概念。其實很好區分,看一下父RDD中的數據是否進入不同的子RDD,如果只進入到一個子RDD則是窄依賴,否則就是寬依賴。寬依賴和窄依賴的邊界就是stage的劃分點
task
一個stage可能包含一個或者多個task任務,task任務與partition、executor息息相關,即并行度。
Task是Spark中最新的執行單元。RDD一般是帶有partitions的,每個partition的在一個executor上的執行可以任務是一個Task。
partition
partition個數即rdd的分區數,不同的數據源讀進來的數據分區數默認不同,可以通過repartition進行重分區操作。
executor
executor運行在work上,一個work可以運行一個或多個executor,一個executor可以運行一個或者多個task(取決于executor的core個數,默認是一個task占用一個core,即有多少個core就可以啟動多少個task任務)
Application
application(應用)其實就是用spark-submit提交的程序。比方說spark examples中的計算pi的SparkPi。一個application通常包含三部分:從數據源(比方說HDFS)取數據形成RDD,通過RDD的transformation和action進行計算,將結果輸出到console或者外部存儲(比方說collect收集輸出到console)。
Driver
Spark中的driver感覺其實和yarn中Application Master的功能相類似。主要完成任務的調度以及和executor和cluster manager進行協調。有client和cluster聯眾模式。client模式driver在任務提交的機器上運行,而cluster模式會隨機選擇機器中的一臺機器啟動driver。從spark官網截圖的一張圖可以大致了解driver的功能。
######################################################################
schema: 表的字段的定義(名稱,類型);當前這個表的數據存儲目錄
data:真實數據
離線計算
批任務
實時處理
流式處理
################################
storm
嚴格的一條數據計算一次,流式處理,但不一定是實時處理
實時要求可以很高,吞吐量低
sparkStreaming
一批數據計算一次(每個批次的時間間隔用戶自由設置)
實時要求低一點,吞吐量高一點(折中一點)
流式計算是離線計算的一個特例
flink
################################
mapreduce: 每隔一段時間,執行一次任務,對兩次任務之間的數據進行累積
storm: 一條一條的執行計算,(流式處理的核心思路);延遲低,亞秒級;數據消費至少一次(trident API 有且僅一次)
sparkStreaming: 很小的一批批的取執行計算(常見在1s到5min);偽實時;基于sparkCore,基于離線處理;有且僅一次
flink: 把離線處理看做是流式處理的一個特例,離散的批處理,基于流式處理;有且僅一次
#################################################
節點 進程 線程
spark worker executor task
storm supervisor worker executor
###################################################
一站式通用解決方案
DStream:
離散的RDD組成
UpdateStateByKey
輸入數據流 → (接收器) →結果輸出
有狀態計算
無狀態計算
#####################################################
窗口長度
滑動周期
二者都要是時間片的整數倍
每隔多長時間(滑動周期)計算過去多長時間(窗口長度)的數據
###################################################
package com.huawei.rcm.newsfeed.test
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
object test38 {
case class Fruit(name: String,age:Int,address:String)
def main(args: Array[String]): Unit = {
val ss = SparkSession.builder()
.config("Spark SQL basic example","some-value")
.master("local[2]")
.appName("hello")
//.enableHiveSupport()
.getOrCreate()
//enableHiveSupport,必須要有配置文件hive-site.xml
val sc = ss.sparkContext
val sqlContext =new SQLContext(sc)
ss.sparkContext.setLogLevel("ERROR")
val rdd=sc.parallelize(List(("apple",1,"China"),("strawberry",2,"China"),("banana",3,"America"),("orange",4,"France")))
val fruitRDD: RDD[Fruit] = rdd.map(x=>Fruit(x._1,x._2,x._3))
import ss.implicits._
val fruitDF=fruitRDD.toDF("name","age","Address")
fruitDF.show()
//sql風格
fruitDF.registerTempTable("fruitTable")
val resutDF: DataFrame = sqlContext.sql("select * from fruitTable where age >2")
resutDF.show()
val resultDF2: DataFrame = ss.sql("select * from fruitTable where age >=2")
resultDF2.show()
sc.stop()
}
}
###################################################
mapreduce 典型的離線處理計算引擎
storm 流失處理計算引擎
spark 離線處理+流式處理;基于離線處理的執行引擎來設計的;把流式處理看作是離線處理的特例
flink 流式處理+批處理;基于流式處理的執行引擎來進行設計的; 把離線處理看作是流式處理的特例
#############################################
storm核心概念:
topology 一個storm的應用程序
spout 數據源
bolt 數據處理組件
tuple 一個消息一條記錄
StreamingGrouping 分組規則
##############################################
實時流式處理的大致架構:
tomcat → log4j → logFile(實時監聽) → flume(實時收集) → kafka(緩沖作用,上游收集和下游消費的速度調節) →計算引擎(實時計算/流式計算) → redis/mysql/hbase
離線:
flume → hdfs → mapreduce/hive → hbase/mysql/hdfs/redis
流式:
flume → kafka → storm/sparkStreaming → redis/mysql/hbase
flume既能做到監控文件的變化(tail -F exec),也能由事件進行驅動收集(spooldir)
##############################################
connection 不能序列化
所以connection在driver端,而寫入數據在executor端,
連接池:第一次訪問的時候,需要建立連接。 但是之后的訪問,均會復用之前創建的連接
##########################################
checkpoint
1.checkpoint的數據類型:
元數據
RDD數據
2.合適啟用checkpoint
有狀態計算:updateStateByKey,window
如果有需要對diver進行HA
3.如何配置checkpoint
streamingContext.checkpoint(hdfspath)
def functionToCreateStreamingContext():StreamingContext
StreamingContext.getOrCreate(chkDir,functionToCreateStreamingContext)
###############################################
對數據的消費有三種語義:
at most once 最多一次,有可能會漏消費
at least once 最少一次,有可能重復消費
exactly once 有且僅有一次,效率低
#################################################
每一個stage中的task數量,都是有這個stage中最后一個RDD的分區數巨鼎
#####################################################
Resilient Distributed Dataset
彈性(可在內存/磁盤,分區可變)分布式數據集
transformation/轉換
延遲計算,rdd → rdd
action/行動
觸發sparkcontext提交job作業,rdd → 輸出結果
寬依賴:指的是多個子RDD的Partition會依賴同一個父RDD的Partition
窄依賴:指的是每一個父RDD的Partition最多被子RDD的一個Partition使用
#############################################
在基于standalone的Spark集群,Cluster Manger就是Master。Master負責分配資源,在集群啟動時,Driver向Master申請資源,Worker負責監控自己節點的內存和CPU等狀況,并向Master匯報
每個worker可以起一個或多個Executor
每個Executor由若干core組成,每個Executor的每個core一次只能執行一個Task
一個application通過action劃分不同job,在job中最后一個算子往前推按寬依賴劃分不同stage
通過DAGScheduler劃分階段,形成一系列的TaskSet,然后傳給TaskScheduler,把具體的Task交給Worker節點上的Executor的線程池處理。線程池中的線程工作,通過BlockManager來讀寫數據。
######################################################
密集向量dense
稀疏向量sparse
############################################
需求:有兩份數據集,一份是邊,一份是點。求點的PageRank
格式:
邊:sourceID destID
點:name sourceID
總結
- 上一篇: 全连接层有何作用?
- 下一篇: 家庭网络-软路由搭建方案