《Pyflink》Flink集群安装,Python+Flink调研
Flink集群安裝,Python+Flink調(diào)研
Flink集群部署
下載對應(yīng)版本安裝包:https://flink.apache.org/downloads.html
實驗環(huán)境為hadoop2.7, scala2.11 所以下載flink-1.7.1-bin-hadoop27-scala_2.11.tgz
配置conf/flink-conf.yaml
jobmanager.rpc.address : master 節(jié)點 jobmanager.heap.mb : JobManager可用的內(nèi)存數(shù)量 taskmanager.heap.mb : 每個TaskManager可以用內(nèi)存數(shù)量 taskmanager.numberOfTaskSlots : 每個機器可用的CPU數(shù)量 parallelism.default : 集群中總的CPU數(shù)量 taskmanager.tmp.dirs : 臨時目錄配置conf/slaves
slave1 slave2點擊查看更多配置項
把在master上配置好的,文件夾發(fā)送到各個worker節(jié)點上
scp -r flink-1.7.1 hadoop@slavle1:~ scp -r flink-1.7.1 hadoop@slavle2:~啟動/終止 Flink
# 啟動一個JobManager,并通過SSH連接列在slaves文件中的所有節(jié)點以便在每個節(jié)點上啟動TaskManager flink-1.7.1/bin/start-cluster.sh # 停止flink集群,直接在master節(jié)點運行bin/stop-cluster.sh flink-1.7.1/bin/stop-cluster.sh啟動后在web界面輸入:master:8081 查看Web-UI
運行Python腳本
-
以官網(wǎng)的一個示例進行測試,可以復(fù)制粘貼這些代碼存儲為wordcount.py并在本地運行。
-
wordcount.py
from flink.plan.Environment import get_environment from flink.functions.GroupReduceFunction import GroupReduceFunctionclass Adder(GroupReduceFunction):def reduce(self, iterator, collector):count, word = iterator.next()count += sum([x[0] for x in iterator])collector.collect((count, word)) # 1. 獲取一個運行環(huán)境 env = get_environment() # 2. 加載/創(chuàng)建初始數(shù)據(jù) data = env.from_elements("Who's there?","I think I hear them. Stand, ho! Who's there?")# 3. 指定對這些數(shù)據(jù)的操作 data \.flat_map(lambda x, c: [(1, word) for word in x.lower().split()]) \.group_by(1) \.reduce_group(Adder(), combinable=True) \.output()# 4. 運行程序 env.execute(local=True) # 設(shè)置execute(local=True)強制程序在本機運行 -
執(zhí)行方法:
為了在Flink中運行計劃任務(wù),到Flink目錄下,運行/bin文件夾下的pyflink.sh腳本。對于python2.7版本,運行pyflink2.sh;對于python3.4版本,運行pyflink3.sh。包含計劃任務(wù)的腳本應(yīng)當作為第一個輸入?yún)?shù),其后可添加一些另外的python包,最后,在“-”之后,輸入其他附加參數(shù)。
./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]] -
針對上面的示例,在master-shell輸入:
flink-1.7.1/bin/pyflink.sh ./wordcount.py -
vi flink-1.7.1/log/flink-hadoop-taskexecutor-0-slave1.out查看輸出:
任務(wù)詳解
-
從示例程序可以看出,Flink程序看起來就像普通的python程序一樣。每個程序都包含相同的基本組成部分:
- 獲取一個運行環(huán)境
- 加載/創(chuàng)建初始數(shù)據(jù)
- 指定對這些數(shù)據(jù)的操作
- 指定計算結(jié)果的存放位置
- 運行程序
-
Environment(運行環(huán)境)是所有Flink程序的基礎(chǔ)。通過調(diào)用Environment類中的一些靜態(tài)方法來建立一個環(huán)境:
get_environment() -
運行環(huán)境可通過多種讀文件的方式來指定數(shù)據(jù)源。如果是簡單的按行讀取文本文件:
env = get_environment() text = env.read_text("file:///path/to/file")這樣,你就獲得了可以進行操作(apply transformations)的數(shù)據(jù)集。關(guān)于數(shù)據(jù)源和輸入格式的更多信息,請參考Data Sources
一旦你獲得了一個數(shù)據(jù)集DataSet,你就可以通過transformations來創(chuàng)建一個新的數(shù)據(jù)集,并把它寫入到文件,再次transform,或者與其他數(shù)據(jù)集相結(jié)合。你可以通過對數(shù)據(jù)集調(diào)用自己個性化定制的函數(shù)來進行數(shù)據(jù)操作。例如,一個類似這樣的數(shù)據(jù)映射操作:
data.map(lambda x: x*2)這將會創(chuàng)建一個新的數(shù)據(jù)集,其中的每個數(shù)據(jù)都是原來數(shù)據(jù)集中的2倍。若要獲取關(guān)于所有transformations的更多信息,及所有數(shù)據(jù)操作的列表,請參考Transformations。
-
當需要將所獲得的數(shù)據(jù)集寫入到磁盤時,調(diào)用下面三種函數(shù)的其中一個即可。
data.write_text("<file-path>", WriteMode=Constants.NO_OVERWRITE)write_csv("<file-path>", line_delimiter='\n', field_delimiter=',', write_mode=Constants.NO_OVERWRITE)output()其中,最后一種方法僅適用于在本機上進行開發(fā)/調(diào)試,它會將數(shù)據(jù)集的內(nèi)容輸出到標準輸出。(請注意,當函數(shù)在集群上運行時,結(jié)果將會輸出到整個集群節(jié)點的標準輸出流,即輸出到workers的.out文件。)前兩種方法,能夠?qū)?shù)據(jù)集寫入到對應(yīng)的文件中。關(guān)于寫入到文件的更多信息,請參考Data Sinks。
當設(shè)計好了程序之后,你需要在環(huán)境中執(zhí)行execute命令來運行程序。可以選擇在本機運行,也可以提交到集群運行,這取決于Flink的創(chuàng)建方式。你可以通過設(shè)置execute(local=True)強制程序在本機運行。
創(chuàng)建項目
-
除了搭建好Flink運行環(huán)境,就無需進行其他準備工作了。Python包可以從你的Flink版本對應(yīng)的/resource文件夾找到。在執(zhí)行工作任務(wù)時,Flink 包,plan包和optional包均可以通過HDFS自動分發(fā)。
Python API官方已經(jīng)在安裝了Python2.7或3.4的Linux/Windows系統(tǒng)上測試過。本次我是在安裝了Python3.6的Linux環(huán)境進行測試。
默認情況下,Flink通過調(diào)用”python”或”python3″來啟動python進程,這取決于使用了哪種啟動腳本。通過在 flink-conf.yaml 中設(shè)置 “python.binary.python[2/3]”對應(yīng)的值,來設(shè)定你所需要的啟動方式。
惰性評價
-
所有的Flink程序都是延遲執(zhí)行的。當程序的主函數(shù)執(zhí)行時,數(shù)據(jù)的載入和操作并沒有在當時發(fā)生。與此相反,每一個被創(chuàng)建出來的操作都被加入到程序的計劃中。當程序環(huán)境中的某個對象調(diào)用了execute()函數(shù)時,這些操作才會被真正的執(zhí)行。不論該程序是在本地運行還是集群上運行。
延遲求值能夠讓你建立復(fù)雜的程序,并在Flink上以一個整體的計劃單元來運行。
數(shù)據(jù)變換
- 數(shù)據(jù)變換(Data transformations)可以將一個或多個數(shù)據(jù)集映射為一個新的數(shù)據(jù)集。程序能夠?qū)⒍喾N變換結(jié)合到一起來進行復(fù)雜的整合變換。
該小節(jié)將概述各種可以實現(xiàn)的數(shù)據(jù)變換。transformations documentation數(shù)據(jù)變換文檔中,有關(guān)于所有數(shù)據(jù)變換和示例的全面介紹。
Map:輸入一個元素,輸出一個元素
data.map(lambda x: x * 2)FlatMap:輸入一個元素,輸出0,1,或多個元素
data.flat_map( lambda x,c: [(1,word) for word in line.lower().split() for line in x])MapPartition:通過一次函數(shù)調(diào)用實現(xiàn)并行的分割操作。該函數(shù)將分割變換作為一個”迭代器”,并且能夠產(chǎn)生任意數(shù)量的輸出值。每次分割變換的元素數(shù)量取決于變換的并行性和之前的操作結(jié)果。
data.map_partition(lambda x,c: [value * 2 for value in x])Filter:對每一個元素,計算一個布爾表達式的值,保留函數(shù)計算結(jié)果為true的元素。
data.filter(lambda x: x > 1000)Reduce:通過不斷的將兩個元素組合為一個,來將一組元素結(jié)合為一個單一的元素。這種縮減變換可以應(yīng)用于整個數(shù)據(jù)集,也可以應(yīng)用于已分組的數(shù)據(jù)集。
data.reduce(lambda x,y : x + y)ReduceGroup:將一組元素縮減為1個或多個元素。縮減分組變換可以被應(yīng)用于一個完整的數(shù)據(jù)集,或者一個分組數(shù)據(jù)集。
lass Adder(GroupReduceFunction): def reduce(self, iterator, collector):count, word = iterator.next()count += sum([x[0] for x in iterator) collector.collect((count, word))data.reduce_group(Adder())Aggregate:對一個數(shù)據(jù)集包含所有元組的一個域,或者數(shù)據(jù)集的每個數(shù)據(jù)組,執(zhí)行某項built-in操作(求和,求最小值,求最大值)。聚集變換可以被應(yīng)用于一個完整的數(shù)據(jù)集,或者一個分組數(shù)據(jù)集。
# This code finds the sum of all of the values in the first field and the maximum of all of the values in the second field data.aggregate(Aggregation.Sum, 0).and_agg(Aggregation.Max, 1)# min(), max(), and sum() syntactic sugar functions are also available data.sum(0).and_agg(Aggregation.Max, 1)Join:對兩個數(shù)據(jù)集進行聯(lián)合變換,將得到一個新的數(shù)據(jù)集,其中包含在兩個數(shù)據(jù)集中擁有相等關(guān)鍵字的所有元素對。也可通過JoinFunction來把成對的元素變?yōu)閱为毜脑亍jP(guān)于join keys的更多信息請查看 keys
# In this case tuple fields are used as keys. # "0" is the join field on the first tuple # "1" is the join field on the second tuple. result = input1.join(input2).where(0).equal_to(1)CoGroup:是Reduce變換在二維空間的一個變體。將來自一個或多個域的數(shù)據(jù)加入數(shù)據(jù)組。變換函數(shù)transformation function將被每一對數(shù)據(jù)組調(diào)用。關(guān)于定義coGroup keys的更多信息,請查看 keys 。
data1.co_group(data2).where(0).equal_to(1)Cross:計算兩個輸入數(shù)據(jù)集的笛卡爾乘積(向量叉乘),得到所有元素對。也可通過CrossFunction實現(xiàn)將一對元素轉(zhuǎn)變?yōu)橐粋€單獨的元素。
result = data1.cross(data2)Union:將兩個數(shù)據(jù)集進行合并。
data.union(data2)ZipWithIndex:為數(shù)據(jù)組中的元素逐個分配連續(xù)的索引。了解更多信息,請參考 【Zip Elements Guide】(zip_elements_guide.html#zip-with-a-dense-index).
data.zip_with_index()指定keys
-
一些變換(例如Join和CoGroup),需要在進行變換前,為作為輸入?yún)?shù)的數(shù)據(jù)集指定一個關(guān)鍵字,而另一些變換(例如Reduce和GroupReduce),則允許在變換操作之前,對數(shù)據(jù)集根據(jù)某個關(guān)鍵字進行分組。
數(shù)據(jù)集可通過如下方式分組
reduced = data \ .group_by(<define key here>) \ .reduce_group(<do something>)Flink中的數(shù)據(jù)模型并不是基于鍵-值對。你無需將數(shù)據(jù)集整理為keys和values的形式。鍵是”虛擬的”:它們被定義為在真實數(shù)據(jù)之上,引導(dǎo)分組操作的函數(shù)。
為元組定義keys
-
最簡單的情形是對一個數(shù)據(jù)集中的元組按照一個或多個域進行分組:
grouped = data \ .group_by(0) \ .reduce(/*do something*/)數(shù)據(jù)集中的元組被按照第一個域分組。對于接下來的group-reduce函數(shù),輸入的數(shù)據(jù)組中,每個元組的第一個域都有相同的值。
grouped = data \ .group_by(0,1) \ .reduce(/*do something*/)在上面的例子中,數(shù)據(jù)集的分組基于第一個和第二個域形成的復(fù)合關(guān)鍵字,因此,reduce函數(shù)輸入數(shù)據(jù)組中,每個元組兩個域的值均相同。
關(guān)于嵌套元組需要注意:如果你有一個使用了嵌套元組的數(shù)據(jù)集,指定group_by()操作,系統(tǒng)將把整個元組作為關(guān)鍵字使用。
向Flink傳遞函數(shù)
-
一些特定的操作需要采用用戶自定義的函數(shù),因此它們都接受lambda表達式和rich functions作為輸入?yún)?shù)。
data.filter(lambda x: x > 5)class Filter(FilterFunction):def filter(self, value):return value > 5data.filter(Filter())
Rich functions可以將函數(shù)作為輸入?yún)?shù),允許使用broadcast-variables(廣播變量),能夠由init()函數(shù)參數(shù)化,是復(fù)雜函數(shù)的一個可考慮的實現(xiàn)方式。它們也是在reduce操作中,定義一個可選的combine function的唯一方式。
Lambda表達式可以讓函數(shù)在一行代碼上實現(xiàn),非常便捷。需要注意的是,如果某個操作會返回多個數(shù)值,則其使用的lambda表達式應(yīng)當返回一個迭代器。(所有函數(shù)將接收一個collector輸入 參數(shù))。
數(shù)據(jù)類型
-
Flink的Python API目前僅支持python中的基本數(shù)據(jù)類型(int,float,bool,string)以及byte arrays。
class MyObj(object):def __init__(self, i):self.value = iclass MySerializer(object):def serialize(self, value):return struct.pack(">i", value.value)class MyDeserializer(object):def _deserialize(self, read):i = struct.unpack(">i", read(4))[0]return MyObj(i)env.register_custom_type(MyObj, MySerializer(), MyDeserializer())
運行環(huán)境對數(shù)據(jù)類型的支持,包括序列化器serializer,反序列化器deserializer,以及自定義類型的類。Tuples/Lists
可以使用元組(或列表)來表示復(fù)雜類型。Python中的元組可以轉(zhuǎn)換為Flink中的Tuple類型,它們包含數(shù)量固定的不同類型的域(最多25個)。每個域的元組可以是基本數(shù)據(jù)類型,也可以是其他的元組類型,從而形成嵌套元組類型。
word_counts = env.from_elements(("hello", 1), ("world",2))counts = word_counts.map(lambda x: x[1])當進行一些要求指定關(guān)鍵字的操作時,例如對數(shù)據(jù)記錄進行分組或配對。通過設(shè)定關(guān)鍵字,可以非常便捷地指定元組中各個域的位置。你可以指定多個位置,從而實現(xiàn)復(fù)合關(guān)鍵字(更多信息,查閱Section Data Transformations)。
wordCounts \ .group_by(0) \ .reduce(MyReduceFunction())
數(shù)據(jù)源
-
數(shù)據(jù)源創(chuàng)建了初始的數(shù)據(jù)集,包括來自文件,以及來自數(shù)據(jù)接口/集合兩種方式。
-
基于文件的:
read_text(path) – 按行讀取文件,并將每一行以String形式返回。
read_csv(path,type) – 解析以逗號(或其他字符)劃分數(shù)據(jù)域的文件。
返回一個包含若干元組的數(shù)據(jù)集。支持基本的java數(shù)據(jù)類型作為字段類型。 -
基于數(shù)據(jù)集合的:
from_elements(*args) – 基于一系列數(shù)據(jù)創(chuàng)建一個數(shù)據(jù)集,包含所有元素。
generate_sequence(from, to) – 按照指定的間隔,生成一系列數(shù)據(jù)。 -
Examples
env = get_environment\# read text file from local files system localLiens = env.read_text("file:#/path/to/my/textfile")\# read text file from a HDFS running at nnHost:nnPort hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")\# read a CSV file with three fields, schema defined using constants defined in flink.plan.Constants csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))\# create a set from some given elements values = env.from_elements("Foo", "bar", "foobar", "fubar")\# generate a number sequence numbers = env.generate_sequence(1, 10000000)
數(shù)據(jù)接收器
-
數(shù)據(jù)接收器可以接受DataSet,并用來存儲和返回它們:
-
write_text() –按行以String形式寫入數(shù)據(jù)。可通過對每個數(shù)據(jù)項調(diào)用str()函數(shù)獲取String。
-
write_csv(…) – 將元組寫入逗號分隔數(shù)值文件。行數(shù)和數(shù)據(jù)字段均可配置。每個字段的值可通過對數(shù)據(jù)項調(diào)用str()方法得到。
-
output() – 在標準輸出上打印每個數(shù)據(jù)項的str()字符串。
一個數(shù)據(jù)集可以同時作為多個操作的輸入數(shù)據(jù)。程序可以在寫入或打印一個數(shù)據(jù)集的同時,對其進行其他的變換操作。 -
標準數(shù)據(jù)池相關(guān)方法示例如下:
write DataSet to a file on the local file system textData.write_text("file:///my/result/on/localFS")write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort textData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS")write DataSet to a file and overwrite the file if it exists textData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)tuples as lines with pipe as the separator "a|b|c" values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_delimiter="|")this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines values.write_text("file:///path/to/the/result/file")
廣播變量
-
使用廣播變量,能夠在使用普通輸入?yún)?shù)的基礎(chǔ)上,使得一個數(shù)據(jù)集同時被多個并行的操作所使用。這對于實現(xiàn)輔助數(shù)據(jù)集,或者是基于數(shù)據(jù)的參數(shù)化法非常有用。這樣,數(shù)據(jù)集就可以以集合的形式被訪問。
-
注冊廣播變量:廣播數(shù)據(jù)集可通過調(diào)用with_broadcast_set(DataSet,String)函數(shù),按照名字注冊廣播變量。
-
訪問廣播變量:通過對調(diào)用self.context.get_broadcast_variable(String)可獲取廣播變量。
class MapperBcv(MapFunction): def map(self, value):factor = self.context.get_broadcast_variable("bcv")[0][0]return value * factor# 1. The DataSet to be broadcasted toBroadcast = env.from_elements(1, 2, 3) data = env.from_elements("a", "b")# 2. Broadcast the DataSet data.map(MapperBcv()).with_broadcast_set("bcv", toBroadcast) -
確保在進行廣播變量的注冊和訪問時,應(yīng)當采用相同的名字(示例中的”bcv”)。
注意:由于廣播變量的內(nèi)容被保存在每個節(jié)點的內(nèi)部存儲中,不適合包含過多內(nèi)容。一些簡單的參數(shù),例如標量值,可簡單地通過參數(shù)化rich function來實現(xiàn)。
并行執(zhí)行
- 該章節(jié)將描述如何在Flink中配置程序的并行執(zhí)行。一個Flink程序可以包含多個任務(wù)(操作,數(shù)據(jù)源和數(shù)據(jù)池)。一個任務(wù)可以被劃分為多個可并行運行的部分,每個部分處理輸入數(shù)據(jù)的一個子集。并行運行的實例數(shù)量被稱作它的并行性或并行度degree of parallelism (DOP)。
在Flink中可以為任務(wù)指定不同等級的并行度。
運行環(huán)境級
-
Flink程序可在一個運行環(huán)境execution environment的上下文中運行。一個運行環(huán)境為其中運行的所有操作,數(shù)據(jù)源和數(shù)據(jù)池定義了一個默認的并行度。運行環(huán)境的并行度可通過對某個操作的并行度進行配置來修改。
一個運行環(huán)境的并行度可通過調(diào)用set_parallelism()方法來指定。例如,為了將WordCount示例程序中的所有操作,數(shù)據(jù)源和數(shù)據(jù)池的并行度設(shè)置為3,可以通過如下方式設(shè)置運行環(huán)境的默認并行度。
env = get_environment() env.set_parallelism(3)text.flat_map(lambda x,c: x.lower().split()) \.group_by(1) \.reduce_group(Adder(), combinable=True) \.output()env.execute()
系統(tǒng)級
- 通過設(shè)置位于./conf/flink-conf.yaml.文件的parallelism.default屬性,改變系統(tǒng)級的默認并行度,可設(shè)置所有運行環(huán)境的默認并行度。具體細節(jié)可查閱Configuration文檔。
執(zhí)行方法
-
為了在Flink中運行計劃任務(wù),到Flink目錄下,運行/bin文件夾下的pyflink.sh腳本。對于python2.7版本,運行pyflink2.sh;對于python3.4版本,運行pyflink3.sh。包含計劃任務(wù)的腳本應(yīng)當作為第一個輸入?yún)?shù),其后可添加一些另外的python包,最后,在“-”之后,輸入其他附加參數(shù)。
./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]]
總結(jié)
以上是生活随笔為你收集整理的《Pyflink》Flink集群安装,Python+Flink调研的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Boosting集合算法详解(一)
- 下一篇: 6段Python代码刻画深度学习历史:从