A Complete Example
A Complete Example
這個例子將關于人員的記錄流作為輸入,并將其過濾為只包含成人。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.api.common.functions.FilterFunction;public class Example {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Person> flintstones = env.fromElements(new Person("Fred", 35),new Person("Wilma", 35),new Person("Pebbles", 2));DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {@Overridepublic boolean filter(Person person) throws Exception {return person.age >= 18;}});adults.print();env.execute();}public static class Person {public String name;public Integer age;public Person() {};public Person(String name, Integer age) {this.name = name;this.age = age;};public String toString() {return this.name.toString() + ": age" + this.age.toSrting();};} }流執行環境
每個Flink應用程序都需要一個執行環境,在這個例子中是?env。流式應用需要使用?StreamExecutionEnvironment。
在你的應用程序中DataStream API的調用會建立一個關聯到StreamExecutionEnvironment的作業圖。當env.execute()被調用這個作業圖就會被打包并發送給 Job Manager(作業管理器),作業管理器將作業并行化并將其片段分發給Task Manager(任務管理器)用于執行。每個作業的并行切片將會在task slot(任務槽)中執行。
需要注意的是,如果你不調用 execute()你的應用就不會跑。
此分布式運行時取決于您的應用程序是否可序列化。它還要求集群中的每個節點都可以使用所有依賴項。
基本流源
在上面的例子中我們通過env.fromElements(...)構建了一個DataStream<Person>。這是將簡單流集合在一起以便在原型或測試中使用的便捷方式。在StreamExecutionEnvironment上同樣有一個方法fromCollection(Collection)。我們可以這樣實現:
List<Person> people = new ArrayList<Person>();people.add(new Person("Fred", 35)); people.add(new Person("Wilma", 35)); people.add(new Person("Pebbles", 2));DataStream<Person> flintstones = env.fromCollection(people);在原型設計時將一些數據放入流中的另一種簡單方式是使用套接字
DataStream<String> lines = env.socketTextStream("localhost", 9999)或文件
DataStream<String> lines = env.readTextFile("file:///path")在實際應用中,最常用的數據源是那些支持低延遲,高吞吐量并行度去以及倒帶和重放的數據源 - 高性能和容錯的先決條件 - 例如Apache Kafka, Kinesis 以及各種文件系統。REST APIs和數據庫也經常用于豐富流。
基本流下沉
上例使用adults.print()來顯示結果到任務管理器的日志中(如果運行在IDE上則會出現在IDE的控制臺中)。這個方法會為流中的每個元素調用toString()。
輸出看上去是這樣的:
1> Fred: age 35 2> Wilma: age 351> 和 2> 指出了產生輸出的子任務
你也可以寫到文本文件
stream.writeAsText("/path/to/file")或者CSV文件
stream.writeAsCsv("/path/to/file")或者套接字
stream.writeToSocket(host, port, SerializationSchema)在生產中,常用的接收器包括Kafka以及各種數據庫和文件系統。
調試
在生產中,你將向應用程序運行的遠程集群提交應用程序JAR文件。如果失敗,遠程也會失敗。作業管理器和任務管理器日志在調試此類故障時非常游泳,但在IDE內部進行本地調試要容易的多,這是Flink支持的。你也可以設置斷點,檢查局部變量,并逐步執行代碼。你也可以進入Flink代碼,如果你想了解Flink的工作原理,這可能是了解更多內部信息的好方法。
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的A Complete Example的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 变频电源的日常检查目录
- 下一篇: 你知道css单位fr吗?