日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

A Complete Example

發布時間:2024/1/17 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 A Complete Example 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

A Complete Example

這個例子將關于人員的記錄流作為輸入,并將其過濾為只包含成人。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.api.common.functions.FilterFunction;public class Example {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Person> flintstones = env.fromElements(new Person("Fred", 35),new Person("Wilma", 35),new Person("Pebbles", 2));DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {@Overridepublic boolean filter(Person person) throws Exception {return person.age >= 18;}});adults.print();env.execute();}public static class Person {public String name;public Integer age;public Person() {};public Person(String name, Integer age) {this.name = name;this.age = age;};public String toString() {return this.name.toString() + ": age" + this.age.toSrting();};} }

流執行環境

每個Flink應用程序都需要一個執行環境,在這個例子中是?env。流式應用需要使用?StreamExecutionEnvironment。

在你的應用程序中DataStream API的調用會建立一個關聯到StreamExecutionEnvironment的作業圖。當env.execute()被調用這個作業圖就會被打包并發送給 Job Manager(作業管理器),作業管理器將作業并行化并將其片段分發給Task Manager(任務管理器)用于執行。每個作業的并行切片將會在task slot(任務槽)中執行。

需要注意的是,如果你不調用 execute()你的應用就不會跑。

此分布式運行時取決于您的應用程序是否可序列化。它還要求集群中的每個節點都可以使用所有依賴項。

基本流源

在上面的例子中我們通過env.fromElements(...)構建了一個DataStream<Person>。這是將簡單流集合在一起以便在原型或測試中使用的便捷方式。在StreamExecutionEnvironment上同樣有一個方法fromCollection(Collection)。我們可以這樣實現:

List<Person> people = new ArrayList<Person>();people.add(new Person("Fred", 35)); people.add(new Person("Wilma", 35)); people.add(new Person("Pebbles", 2));DataStream<Person> flintstones = env.fromCollection(people);

在原型設計時將一些數據放入流中的另一種簡單方式是使用套接字

DataStream<String> lines = env.socketTextStream("localhost", 9999)

或文件

DataStream<String> lines = env.readTextFile("file:///path")

在實際應用中,最常用的數據源是那些支持低延遲,高吞吐量并行度去以及倒帶和重放的數據源 - 高性能和容錯的先決條件 - 例如Apache Kafka, Kinesis 以及各種文件系統。REST APIs和數據庫也經常用于豐富流。

基本流下沉

上例使用adults.print()來顯示結果到任務管理器的日志中(如果運行在IDE上則會出現在IDE的控制臺中)。這個方法會為流中的每個元素調用toString()。

輸出看上去是這樣的:

1> Fred: age 35 2> Wilma: age 35

1> 和 2> 指出了產生輸出的子任務

你也可以寫到文本文件

stream.writeAsText("/path/to/file")

或者CSV文件

stream.writeAsCsv("/path/to/file")

或者套接字

stream.writeToSocket(host, port, SerializationSchema)

在生產中,常用的接收器包括Kafka以及各種數據庫和文件系統。

調試

在生產中,你將向應用程序運行的遠程集群提交應用程序JAR文件。如果失敗,遠程也會失敗。作業管理器和任務管理器日志在調試此類故障時非常游泳,但在IDE內部進行本地調試要容易的多,這是Flink支持的。你也可以設置斷點,檢查局部變量,并逐步執行代碼。你也可以進入Flink代碼,如果你想了解Flink的工作原理,這可能是了解更多內部信息的好方法。

創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的A Complete Example的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。