《Pyflink》Flink集群安装,Python+Flink调研
Flink集群安裝,Python+Flink調(diào)研
Flink集群部署
下載對(duì)應(yīng)版本安裝包:https://flink.apache.org/downloads.html
實(shí)驗(yàn)環(huán)境為hadoop2.7, scala2.11 所以下載flink-1.7.1-bin-hadoop27-scala_2.11.tgz
配置conf/flink-conf.yaml
jobmanager.rpc.address : master 節(jié)點(diǎn) jobmanager.heap.mb : JobManager可用的內(nèi)存數(shù)量 taskmanager.heap.mb : 每個(gè)TaskManager可以用內(nèi)存數(shù)量 taskmanager.numberOfTaskSlots : 每個(gè)機(jī)器可用的CPU數(shù)量 parallelism.default : 集群中總的CPU數(shù)量 taskmanager.tmp.dirs : 臨時(shí)目錄配置conf/slaves
slave1 slave2點(diǎn)擊查看更多配置項(xiàng)
把在master上配置好的,文件夾發(fā)送到各個(gè)worker節(jié)點(diǎn)上
scp -r flink-1.7.1 hadoop@slavle1:~ scp -r flink-1.7.1 hadoop@slavle2:~啟動(dòng)/終止 Flink
# 啟動(dòng)一個(gè)JobManager,并通過SSH連接列在slaves文件中的所有節(jié)點(diǎn)以便在每個(gè)節(jié)點(diǎn)上啟動(dòng)TaskManager flink-1.7.1/bin/start-cluster.sh # 停止flink集群,直接在master節(jié)點(diǎn)運(yùn)行bin/stop-cluster.sh flink-1.7.1/bin/stop-cluster.sh啟動(dòng)后在web界面輸入:master:8081 查看Web-UI
運(yùn)行Python腳本
-
以官網(wǎng)的一個(gè)示例進(jìn)行測(cè)試,可以復(fù)制粘貼這些代碼存儲(chǔ)為wordcount.py并在本地運(yùn)行。
-
wordcount.py
from flink.plan.Environment import get_environment from flink.functions.GroupReduceFunction import GroupReduceFunctionclass Adder(GroupReduceFunction):def reduce(self, iterator, collector):count, word = iterator.next()count += sum([x[0] for x in iterator])collector.collect((count, word)) # 1. 獲取一個(gè)運(yùn)行環(huán)境 env = get_environment() # 2. 加載/創(chuàng)建初始數(shù)據(jù) data = env.from_elements("Who's there?","I think I hear them. Stand, ho! Who's there?")# 3. 指定對(duì)這些數(shù)據(jù)的操作 data \.flat_map(lambda x, c: [(1, word) for word in x.lower().split()]) \.group_by(1) \.reduce_group(Adder(), combinable=True) \.output()# 4. 運(yùn)行程序 env.execute(local=True) # 設(shè)置execute(local=True)強(qiáng)制程序在本機(jī)運(yùn)行 -
執(zhí)行方法:
為了在Flink中運(yùn)行計(jì)劃任務(wù),到Flink目錄下,運(yùn)行/bin文件夾下的pyflink.sh腳本。對(duì)于python2.7版本,運(yùn)行pyflink2.sh;對(duì)于python3.4版本,運(yùn)行pyflink3.sh。包含計(jì)劃任務(wù)的腳本應(yīng)當(dāng)作為第一個(gè)輸入?yún)?shù),其后可添加一些另外的python包,最后,在“-”之后,輸入其他附加參數(shù)。
./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]] -
針對(duì)上面的示例,在master-shell輸入:
flink-1.7.1/bin/pyflink.sh ./wordcount.py -
vi flink-1.7.1/log/flink-hadoop-taskexecutor-0-slave1.out查看輸出:
任務(wù)詳解
-
從示例程序可以看出,Flink程序看起來就像普通的python程序一樣。每個(gè)程序都包含相同的基本組成部分:
- 獲取一個(gè)運(yùn)行環(huán)境
- 加載/創(chuàng)建初始數(shù)據(jù)
- 指定對(duì)這些數(shù)據(jù)的操作
- 指定計(jì)算結(jié)果的存放位置
- 運(yùn)行程序
-
Environment(運(yùn)行環(huán)境)是所有Flink程序的基礎(chǔ)。通過調(diào)用Environment類中的一些靜態(tài)方法來建立一個(gè)環(huán)境:
get_environment() -
運(yùn)行環(huán)境可通過多種讀文件的方式來指定數(shù)據(jù)源。如果是簡(jiǎn)單的按行讀取文本文件:
env = get_environment() text = env.read_text("file:///path/to/file")這樣,你就獲得了可以進(jìn)行操作(apply transformations)的數(shù)據(jù)集。關(guān)于數(shù)據(jù)源和輸入格式的更多信息,請(qǐng)參考Data Sources
一旦你獲得了一個(gè)數(shù)據(jù)集DataSet,你就可以通過transformations來創(chuàng)建一個(gè)新的數(shù)據(jù)集,并把它寫入到文件,再次transform,或者與其他數(shù)據(jù)集相結(jié)合。你可以通過對(duì)數(shù)據(jù)集調(diào)用自己個(gè)性化定制的函數(shù)來進(jìn)行數(shù)據(jù)操作。例如,一個(gè)類似這樣的數(shù)據(jù)映射操作:
data.map(lambda x: x*2)這將會(huì)創(chuàng)建一個(gè)新的數(shù)據(jù)集,其中的每個(gè)數(shù)據(jù)都是原來數(shù)據(jù)集中的2倍。若要獲取關(guān)于所有transformations的更多信息,及所有數(shù)據(jù)操作的列表,請(qǐng)參考Transformations。
-
當(dāng)需要將所獲得的數(shù)據(jù)集寫入到磁盤時(shí),調(diào)用下面三種函數(shù)的其中一個(gè)即可。
data.write_text("<file-path>", WriteMode=Constants.NO_OVERWRITE)write_csv("<file-path>", line_delimiter='\n', field_delimiter=',', write_mode=Constants.NO_OVERWRITE)output()其中,最后一種方法僅適用于在本機(jī)上進(jìn)行開發(fā)/調(diào)試,它會(huì)將數(shù)據(jù)集的內(nèi)容輸出到標(biāo)準(zhǔn)輸出。(請(qǐng)注意,當(dāng)函數(shù)在集群上運(yùn)行時(shí),結(jié)果將會(huì)輸出到整個(gè)集群節(jié)點(diǎn)的標(biāo)準(zhǔn)輸出流,即輸出到workers的.out文件。)前兩種方法,能夠?qū)?shù)據(jù)集寫入到對(duì)應(yīng)的文件中。關(guān)于寫入到文件的更多信息,請(qǐng)參考Data Sinks。
當(dāng)設(shè)計(jì)好了程序之后,你需要在環(huán)境中執(zhí)行execute命令來運(yùn)行程序。可以選擇在本機(jī)運(yùn)行,也可以提交到集群運(yùn)行,這取決于Flink的創(chuàng)建方式。你可以通過設(shè)置execute(local=True)強(qiáng)制程序在本機(jī)運(yùn)行。
創(chuàng)建項(xiàng)目
-
除了搭建好Flink運(yùn)行環(huán)境,就無需進(jìn)行其他準(zhǔn)備工作了。Python包可以從你的Flink版本對(duì)應(yīng)的/resource文件夾找到。在執(zhí)行工作任務(wù)時(shí),Flink 包,plan包和optional包均可以通過HDFS自動(dòng)分發(fā)。
Python API官方已經(jīng)在安裝了Python2.7或3.4的Linux/Windows系統(tǒng)上測(cè)試過。本次我是在安裝了Python3.6的Linux環(huán)境進(jìn)行測(cè)試。
默認(rèn)情況下,Flink通過調(diào)用”python”或”python3″來啟動(dòng)python進(jìn)程,這取決于使用了哪種啟動(dòng)腳本。通過在 flink-conf.yaml 中設(shè)置 “python.binary.python[2/3]”對(duì)應(yīng)的值,來設(shè)定你所需要的啟動(dòng)方式。
惰性評(píng)價(jià)
-
所有的Flink程序都是延遲執(zhí)行的。當(dāng)程序的主函數(shù)執(zhí)行時(shí),數(shù)據(jù)的載入和操作并沒有在當(dāng)時(shí)發(fā)生。與此相反,每一個(gè)被創(chuàng)建出來的操作都被加入到程序的計(jì)劃中。當(dāng)程序環(huán)境中的某個(gè)對(duì)象調(diào)用了execute()函數(shù)時(shí),這些操作才會(huì)被真正的執(zhí)行。不論該程序是在本地運(yùn)行還是集群上運(yùn)行。
延遲求值能夠讓你建立復(fù)雜的程序,并在Flink上以一個(gè)整體的計(jì)劃單元來運(yùn)行。
數(shù)據(jù)變換
- 數(shù)據(jù)變換(Data transformations)可以將一個(gè)或多個(gè)數(shù)據(jù)集映射為一個(gè)新的數(shù)據(jù)集。程序能夠?qū)⒍喾N變換結(jié)合到一起來進(jìn)行復(fù)雜的整合變換。
該小節(jié)將概述各種可以實(shí)現(xiàn)的數(shù)據(jù)變換。transformations documentation數(shù)據(jù)變換文檔中,有關(guān)于所有數(shù)據(jù)變換和示例的全面介紹。
Map:輸入一個(gè)元素,輸出一個(gè)元素
data.map(lambda x: x * 2)FlatMap:輸入一個(gè)元素,輸出0,1,或多個(gè)元素
data.flat_map( lambda x,c: [(1,word) for word in line.lower().split() for line in x])MapPartition:通過一次函數(shù)調(diào)用實(shí)現(xiàn)并行的分割操作。該函數(shù)將分割變換作為一個(gè)”迭代器”,并且能夠產(chǎn)生任意數(shù)量的輸出值。每次分割變換的元素?cái)?shù)量取決于變換的并行性和之前的操作結(jié)果。
data.map_partition(lambda x,c: [value * 2 for value in x])Filter:對(duì)每一個(gè)元素,計(jì)算一個(gè)布爾表達(dá)式的值,保留函數(shù)計(jì)算結(jié)果為true的元素。
data.filter(lambda x: x > 1000)Reduce:通過不斷的將兩個(gè)元素組合為一個(gè),來將一組元素結(jié)合為一個(gè)單一的元素。這種縮減變換可以應(yīng)用于整個(gè)數(shù)據(jù)集,也可以應(yīng)用于已分組的數(shù)據(jù)集。
data.reduce(lambda x,y : x + y)ReduceGroup:將一組元素縮減為1個(gè)或多個(gè)元素。縮減分組變換可以被應(yīng)用于一個(gè)完整的數(shù)據(jù)集,或者一個(gè)分組數(shù)據(jù)集。
lass Adder(GroupReduceFunction): def reduce(self, iterator, collector):count, word = iterator.next()count += sum([x[0] for x in iterator) collector.collect((count, word))data.reduce_group(Adder())Aggregate:對(duì)一個(gè)數(shù)據(jù)集包含所有元組的一個(gè)域,或者數(shù)據(jù)集的每個(gè)數(shù)據(jù)組,執(zhí)行某項(xiàng)built-in操作(求和,求最小值,求最大值)。聚集變換可以被應(yīng)用于一個(gè)完整的數(shù)據(jù)集,或者一個(gè)分組數(shù)據(jù)集。
# This code finds the sum of all of the values in the first field and the maximum of all of the values in the second field data.aggregate(Aggregation.Sum, 0).and_agg(Aggregation.Max, 1)# min(), max(), and sum() syntactic sugar functions are also available data.sum(0).and_agg(Aggregation.Max, 1)Join:對(duì)兩個(gè)數(shù)據(jù)集進(jìn)行聯(lián)合變換,將得到一個(gè)新的數(shù)據(jù)集,其中包含在兩個(gè)數(shù)據(jù)集中擁有相等關(guān)鍵字的所有元素對(duì)。也可通過JoinFunction來把成對(duì)的元素變?yōu)閱为?dú)的元素。關(guān)于join keys的更多信息請(qǐng)查看 keys
# In this case tuple fields are used as keys. # "0" is the join field on the first tuple # "1" is the join field on the second tuple. result = input1.join(input2).where(0).equal_to(1)CoGroup:是Reduce變換在二維空間的一個(gè)變體。將來自一個(gè)或多個(gè)域的數(shù)據(jù)加入數(shù)據(jù)組。變換函數(shù)transformation function將被每一對(duì)數(shù)據(jù)組調(diào)用。關(guān)于定義coGroup keys的更多信息,請(qǐng)查看 keys 。
data1.co_group(data2).where(0).equal_to(1)Cross:計(jì)算兩個(gè)輸入數(shù)據(jù)集的笛卡爾乘積(向量叉乘),得到所有元素對(duì)。也可通過CrossFunction實(shí)現(xiàn)將一對(duì)元素轉(zhuǎn)變?yōu)橐粋€(gè)單獨(dú)的元素。
result = data1.cross(data2)Union:將兩個(gè)數(shù)據(jù)集進(jìn)行合并。
data.union(data2)ZipWithIndex:為數(shù)據(jù)組中的元素逐個(gè)分配連續(xù)的索引。了解更多信息,請(qǐng)參考 【Zip Elements Guide】(zip_elements_guide.html#zip-with-a-dense-index).
data.zip_with_index()指定keys
-
一些變換(例如Join和CoGroup),需要在進(jìn)行變換前,為作為輸入?yún)?shù)的數(shù)據(jù)集指定一個(gè)關(guān)鍵字,而另一些變換(例如Reduce和GroupReduce),則允許在變換操作之前,對(duì)數(shù)據(jù)集根據(jù)某個(gè)關(guān)鍵字進(jìn)行分組。
數(shù)據(jù)集可通過如下方式分組
reduced = data \ .group_by(<define key here>) \ .reduce_group(<do something>)Flink中的數(shù)據(jù)模型并不是基于鍵-值對(duì)。你無需將數(shù)據(jù)集整理為keys和values的形式。鍵是”虛擬的”:它們被定義為在真實(shí)數(shù)據(jù)之上,引導(dǎo)分組操作的函數(shù)。
為元組定義keys
-
最簡(jiǎn)單的情形是對(duì)一個(gè)數(shù)據(jù)集中的元組按照一個(gè)或多個(gè)域進(jìn)行分組:
grouped = data \ .group_by(0) \ .reduce(/*do something*/)數(shù)據(jù)集中的元組被按照第一個(gè)域分組。對(duì)于接下來的group-reduce函數(shù),輸入的數(shù)據(jù)組中,每個(gè)元組的第一個(gè)域都有相同的值。
grouped = data \ .group_by(0,1) \ .reduce(/*do something*/)在上面的例子中,數(shù)據(jù)集的分組基于第一個(gè)和第二個(gè)域形成的復(fù)合關(guān)鍵字,因此,reduce函數(shù)輸入數(shù)據(jù)組中,每個(gè)元組兩個(gè)域的值均相同。
關(guān)于嵌套元組需要注意:如果你有一個(gè)使用了嵌套元組的數(shù)據(jù)集,指定group_by()操作,系統(tǒng)將把整個(gè)元組作為關(guān)鍵字使用。
向Flink傳遞函數(shù)
-
一些特定的操作需要采用用戶自定義的函數(shù),因此它們都接受lambda表達(dá)式和rich functions作為輸入?yún)?shù)。
data.filter(lambda x: x > 5)class Filter(FilterFunction):def filter(self, value):return value > 5data.filter(Filter())
Rich functions可以將函數(shù)作為輸入?yún)?shù),允許使用broadcast-variables(廣播變量),能夠由init()函數(shù)參數(shù)化,是復(fù)雜函數(shù)的一個(gè)可考慮的實(shí)現(xiàn)方式。它們也是在reduce操作中,定義一個(gè)可選的combine function的唯一方式。
Lambda表達(dá)式可以讓函數(shù)在一行代碼上實(shí)現(xiàn),非常便捷。需要注意的是,如果某個(gè)操作會(huì)返回多個(gè)數(shù)值,則其使用的lambda表達(dá)式應(yīng)當(dāng)返回一個(gè)迭代器。(所有函數(shù)將接收一個(gè)collector輸入 參數(shù))。
數(shù)據(jù)類型
-
Flink的Python API目前僅支持python中的基本數(shù)據(jù)類型(int,float,bool,string)以及byte arrays。
class MyObj(object):def __init__(self, i):self.value = iclass MySerializer(object):def serialize(self, value):return struct.pack(">i", value.value)class MyDeserializer(object):def _deserialize(self, read):i = struct.unpack(">i", read(4))[0]return MyObj(i)env.register_custom_type(MyObj, MySerializer(), MyDeserializer())
運(yùn)行環(huán)境對(duì)數(shù)據(jù)類型的支持,包括序列化器serializer,反序列化器deserializer,以及自定義類型的類。Tuples/Lists
可以使用元組(或列表)來表示復(fù)雜類型。Python中的元組可以轉(zhuǎn)換為Flink中的Tuple類型,它們包含數(shù)量固定的不同類型的域(最多25個(gè))。每個(gè)域的元組可以是基本數(shù)據(jù)類型,也可以是其他的元組類型,從而形成嵌套元組類型。
word_counts = env.from_elements(("hello", 1), ("world",2))counts = word_counts.map(lambda x: x[1])當(dāng)進(jìn)行一些要求指定關(guān)鍵字的操作時(shí),例如對(duì)數(shù)據(jù)記錄進(jìn)行分組或配對(duì)。通過設(shè)定關(guān)鍵字,可以非常便捷地指定元組中各個(gè)域的位置。你可以指定多個(gè)位置,從而實(shí)現(xiàn)復(fù)合關(guān)鍵字(更多信息,查閱Section Data Transformations)。
wordCounts \ .group_by(0) \ .reduce(MyReduceFunction())
數(shù)據(jù)源
-
數(shù)據(jù)源創(chuàng)建了初始的數(shù)據(jù)集,包括來自文件,以及來自數(shù)據(jù)接口/集合兩種方式。
-
基于文件的:
read_text(path) – 按行讀取文件,并將每一行以String形式返回。
read_csv(path,type) – 解析以逗號(hào)(或其他字符)劃分?jǐn)?shù)據(jù)域的文件。
返回一個(gè)包含若干元組的數(shù)據(jù)集。支持基本的java數(shù)據(jù)類型作為字段類型。 -
基于數(shù)據(jù)集合的:
from_elements(*args) – 基于一系列數(shù)據(jù)創(chuàng)建一個(gè)數(shù)據(jù)集,包含所有元素。
generate_sequence(from, to) – 按照指定的間隔,生成一系列數(shù)據(jù)。 -
Examples
env = get_environment\# read text file from local files system localLiens = env.read_text("file:#/path/to/my/textfile")\# read text file from a HDFS running at nnHost:nnPort hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")\# read a CSV file with three fields, schema defined using constants defined in flink.plan.Constants csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))\# create a set from some given elements values = env.from_elements("Foo", "bar", "foobar", "fubar")\# generate a number sequence numbers = env.generate_sequence(1, 10000000)
數(shù)據(jù)接收器
-
數(shù)據(jù)接收器可以接受DataSet,并用來存儲(chǔ)和返回它們:
-
write_text() –按行以String形式寫入數(shù)據(jù)。可通過對(duì)每個(gè)數(shù)據(jù)項(xiàng)調(diào)用str()函數(shù)獲取String。
-
write_csv(…) – 將元組寫入逗號(hào)分隔數(shù)值文件。行數(shù)和數(shù)據(jù)字段均可配置。每個(gè)字段的值可通過對(duì)數(shù)據(jù)項(xiàng)調(diào)用str()方法得到。
-
output() – 在標(biāo)準(zhǔn)輸出上打印每個(gè)數(shù)據(jù)項(xiàng)的str()字符串。
一個(gè)數(shù)據(jù)集可以同時(shí)作為多個(gè)操作的輸入數(shù)據(jù)。程序可以在寫入或打印一個(gè)數(shù)據(jù)集的同時(shí),對(duì)其進(jìn)行其他的變換操作。 -
標(biāo)準(zhǔn)數(shù)據(jù)池相關(guān)方法示例如下:
write DataSet to a file on the local file system textData.write_text("file:///my/result/on/localFS")write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort textData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS")write DataSet to a file and overwrite the file if it exists textData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)tuples as lines with pipe as the separator "a|b|c" values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_delimiter="|")this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines values.write_text("file:///path/to/the/result/file")
廣播變量
-
使用廣播變量,能夠在使用普通輸入?yún)?shù)的基礎(chǔ)上,使得一個(gè)數(shù)據(jù)集同時(shí)被多個(gè)并行的操作所使用。這對(duì)于實(shí)現(xiàn)輔助數(shù)據(jù)集,或者是基于數(shù)據(jù)的參數(shù)化法非常有用。這樣,數(shù)據(jù)集就可以以集合的形式被訪問。
-
注冊(cè)廣播變量:廣播數(shù)據(jù)集可通過調(diào)用with_broadcast_set(DataSet,String)函數(shù),按照名字注冊(cè)廣播變量。
-
訪問廣播變量:通過對(duì)調(diào)用self.context.get_broadcast_variable(String)可獲取廣播變量。
class MapperBcv(MapFunction): def map(self, value):factor = self.context.get_broadcast_variable("bcv")[0][0]return value * factor# 1. The DataSet to be broadcasted toBroadcast = env.from_elements(1, 2, 3) data = env.from_elements("a", "b")# 2. Broadcast the DataSet data.map(MapperBcv()).with_broadcast_set("bcv", toBroadcast) -
確保在進(jìn)行廣播變量的注冊(cè)和訪問時(shí),應(yīng)當(dāng)采用相同的名字(示例中的”bcv”)。
注意:由于廣播變量的內(nèi)容被保存在每個(gè)節(jié)點(diǎn)的內(nèi)部存儲(chǔ)中,不適合包含過多內(nèi)容。一些簡(jiǎn)單的參數(shù),例如標(biāo)量值,可簡(jiǎn)單地通過參數(shù)化rich function來實(shí)現(xiàn)。
并行執(zhí)行
- 該章節(jié)將描述如何在Flink中配置程序的并行執(zhí)行。一個(gè)Flink程序可以包含多個(gè)任務(wù)(操作,數(shù)據(jù)源和數(shù)據(jù)池)。一個(gè)任務(wù)可以被劃分為多個(gè)可并行運(yùn)行的部分,每個(gè)部分處理輸入數(shù)據(jù)的一個(gè)子集。并行運(yùn)行的實(shí)例數(shù)量被稱作它的并行性或并行度degree of parallelism (DOP)。
在Flink中可以為任務(wù)指定不同等級(jí)的并行度。
運(yùn)行環(huán)境級(jí)
-
Flink程序可在一個(gè)運(yùn)行環(huán)境execution environment的上下文中運(yùn)行。一個(gè)運(yùn)行環(huán)境為其中運(yùn)行的所有操作,數(shù)據(jù)源和數(shù)據(jù)池定義了一個(gè)默認(rèn)的并行度。運(yùn)行環(huán)境的并行度可通過對(duì)某個(gè)操作的并行度進(jìn)行配置來修改。
一個(gè)運(yùn)行環(huán)境的并行度可通過調(diào)用set_parallelism()方法來指定。例如,為了將WordCount示例程序中的所有操作,數(shù)據(jù)源和數(shù)據(jù)池的并行度設(shè)置為3,可以通過如下方式設(shè)置運(yùn)行環(huán)境的默認(rèn)并行度。
env = get_environment() env.set_parallelism(3)text.flat_map(lambda x,c: x.lower().split()) \.group_by(1) \.reduce_group(Adder(), combinable=True) \.output()env.execute()
系統(tǒng)級(jí)
- 通過設(shè)置位于./conf/flink-conf.yaml.文件的parallelism.default屬性,改變系統(tǒng)級(jí)的默認(rèn)并行度,可設(shè)置所有運(yùn)行環(huán)境的默認(rèn)并行度。具體細(xì)節(jié)可查閱Configuration文檔。
執(zhí)行方法
-
為了在Flink中運(yùn)行計(jì)劃任務(wù),到Flink目錄下,運(yùn)行/bin文件夾下的pyflink.sh腳本。對(duì)于python2.7版本,運(yùn)行pyflink2.sh;對(duì)于python3.4版本,運(yùn)行pyflink3.sh。包含計(jì)劃任務(wù)的腳本應(yīng)當(dāng)作為第一個(gè)輸入?yún)?shù),其后可添加一些另外的python包,最后,在“-”之后,輸入其他附加參數(shù)。
./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]]
總結(jié)
以上是生活随笔為你收集整理的《Pyflink》Flink集群安装,Python+Flink调研的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Boosting集合算法详解(一)
- 下一篇: 6段Python代码刻画深度学习历史:从