Apache Flink from Scratch (Part 15): Flink DataStream Programming (How to Define a Custom DataSource)

Published: 2024/9/16 · Programming Q&A · 豆豆
This article, collected by 生活随笔, covers Apache Flink from Scratch (Part 15): Flink DataStream Programming (How to Define a Custom DataSource). It is shared here for reference.

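A data source can be created with StreamExecutionEnvironment.addSource(sourceFunction). Flink also ships some built-in sources for convenience, such as readTextFile(path) and readFile(). You can also write your own source: either implement the SourceFunction interface (such a source cannot run in parallel), or, for a source that can run in parallel, implement the ParallelSourceFunction interface or extend RichParallelSourceFunction.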

Getting started

Let's start with a simple example: create a DataStreamSourceApp.

Scala

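// Brings in the Scala StreamExecutionEnvironment and the implicit TypeInformation
import org.apache.flink.streaming.api.scala._

object DataStreamSourceApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    socketFunction(env)
    env.execute("DataStreamSourceApp")
  }

  // Reads lines of text from a socket and prints each line
  def socketFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.socketTextStream("192.168.152.45", 9999)
    data.print()
  }
}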

This method reads data from a socket, so first start a listener on port 9999 of the host 192.168.152.45:

nc -lk 9999

Then run DataStreamSourceApp and type a few words into the nc session on the server:

iie4bu@swarm-manager:~$ nc -lk 9999
apache
flink
spark

The console prints:

3> apache
4> flink
1> spark

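The leading 3, 4, and 1 are not the parallelism itself: each is the 1-based index of the print subtask that wrote the line. The print sink runs with the job's default parallelism (when running locally this is typically the number of CPU cores). You can set the print sink's parallelism with setParallelism; with a parallelism of 1 the prefix disappears: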

data.print().setParallelism(1)
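The labeling rule can be modeled in a few lines. As far as I know, Flink's print sink prefixes each line with "(subtask index + 1)> " only when the sink runs with a parallelism greater than 1. The standalone Java sketch below mirrors that rule; PrintPrefixSketch and formatLine are hypothetical names, not Flink API.

```java
// Simplified model (not Flink code) of how print() labels its output:
// the subtask with 0-based index i writes "(i + 1)> " before each record,
// but only when the sink's parallelism is greater than 1.
public class PrintPrefixSketch {

    static String formatLine(int subtaskIndex, int parallelism, String record) {
        String prefix = parallelism > 1 ? (subtaskIndex + 1) + "> " : "";
        return prefix + record;
    }

    public static void main(String[] args) {
        // Parallelism 4: the subtask with index 2 labels its lines "3>"
        System.out.println(formatLine(2, 4, "apache")); // 3> apache
        // Parallelism 1: no label at all
        System.out.println(formatLine(0, 1, "apache")); // apache
    }
}
```

This also explains why the Java examples later in the article, which call print().setParallelism(1), print bare values.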

Java

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JavaDataStreamSourceApp {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        socketFunction(environment);
        environment.execute("JavaDataStreamSourceApp");
    }

    // Reads lines of text from a socket and prints them from a single subtask
    public static void socketFunction(StreamExecutionEnvironment executionEnvironment) {
        DataStreamSource<String> data = executionEnvironment.socketTextStream("192.168.152.45", 9999);
        data.print().setParallelism(1);
    }
}

Adding a custom data source

Scala

Implementing the SourceFunction interface

A source built this way cannot run in parallel.

Create a custom data source:

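import org.apache.flink.streaming.api.functions.source.SourceFunction

class CustomNonParallelSourceFunction extends SourceFunction[Long] {

  var count = 1L
  // @volatile because cancel() is called from a different thread than run()
  @volatile var isRunning = true

  // Emits 1, 2, 3, ... one value per second until cancelled
  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}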

The class starts with count = 1L. The run method emits count, increments it, and sleeps for one second, looping until cancel() sets isRunning to false. Call it like this:
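The run()/cancel() contract can be exercised without a Flink cluster. The following is a standalone Java model, not Flink code: CounterSource, runFor, and the Consumer standing in for SourceContext.collect() are hypothetical names. It starts run() on its own thread, lets it emit for a while, then calls cancel() from the main thread, roughly what the runtime does when a job is cancelled. Because the two methods run on different threads, the flag is volatile so the loop is guaranteed to observe the change; the sleep is shortened to 10 ms so the demo ends quickly.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Standalone model of the SourceFunction run()/cancel() contract (not Flink code).
// The Consumer passed to run() plays the role of SourceContext.collect().
public class CancelContractSketch {

    static class CounterSource {
        // volatile: cancel() is called from a different thread than run()
        private volatile boolean isRunning = true;
        private long count = 1;

        void run(Consumer<Long> collector) {
            while (isRunning) {
                collector.accept(count);
                count += 1;
                try {
                    Thread.sleep(10); // the article sleeps 1000 ms; shortened for the demo
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        void cancel() {
            isRunning = false;
        }
    }

    // Runs the source on its own thread for roughly `millis` ms, then cancels it
    static List<Long> runFor(long millis) {
        List<Long> collected = new CopyOnWriteArrayList<>();
        CounterSource source = new CounterSource();
        Thread sourceThread = new Thread(() -> source.run(collected::add));
        sourceThread.start();
        try {
            Thread.sleep(millis);  // let the source emit for a while
            source.cancel();       // stop request from another thread
            sourceThread.join();   // run() returns once it observes isRunning == false
        } catch (InterruptedException e) {
            source.cancel();
            Thread.currentThread().interrupt();
        }
        return collected;
    }

    public static void main(String[] args) {
        List<Long> values = runFor(100);
        System.out.println("emitted " + values.size() + " values, starting at " + values.get(0));
    }
}
```

How many values are emitted depends on timing, but they always form the sequence 1, 2, 3, ... and run() returns promptly after cancel().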

def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // socketFunction(env)
  nonParallelSourceFunction(env)
  env.execute("DataStreamSourceApp")
}

def nonParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
  val data = env.addSource(new CustomNonParallelSourceFunction())
  data.print()
}

The console then keeps printing the increasing count, one value per second.

The parallelism of this source cannot be set to anything other than 1. For example:

val data=env.addSource(new CustomNonParallelSourceFunction()).setParallelism(3)

fails with:

Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
    at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:55)
    at com.vincent.course05.DataStreamSourceApp$.nonParallelSourceFunction(DataStreamSourceApp.scala:16)
    at com.vincent.course05.DataStreamSourceApp$.main(DataStreamSourceApp.scala:11)
    at com.vincent.course05.DataStreamSourceApp.main(DataStreamSourceApp.scala)
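As far as I can tell from the Flink 1.x sources this article uses, the message comes from a simple check: addSource() marks a source as parallel only when the function is an instance of ParallelSourceFunction, and DataStreamSource.setParallelism() rejects any value other than 1 for a source without that flag (newer Flink releases word the error differently). The sketch below is a simplified standalone model of that rule; SourceHandle is a hypothetical stand-in for DataStreamSource, not Flink code.

```java
// Simplified, standalone model of the check behind "Source: 1 is not a parallel source".
// SourceHandle is a hypothetical stand-in for DataStreamSource.
public class ParallelismCheckSketch {

    static class SourceHandle {
        final int id;
        final boolean isParallel; // in Flink: function instanceof ParallelSourceFunction
        int parallelism = 1;

        SourceHandle(int id, boolean isParallel) {
            this.id = id;
            this.isParallel = isParallel;
        }

        SourceHandle setParallelism(int parallelism) {
            // 1 is always allowed; any other value requires a parallel source
            if (parallelism != 1 && !isParallel) {
                throw new IllegalArgumentException("Source: " + id + " is not a parallel source");
            }
            this.parallelism = parallelism;
            return this;
        }
    }

    public static void main(String[] args) {
        System.out.println(new SourceHandle(1, true).setParallelism(3).parallelism);  // 3
        System.out.println(new SourceHandle(1, false).setParallelism(1).parallelism); // 1
        try {
            new SourceHandle(1, false).setParallelism(3);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Source: 1 is not a parallel source
        }
    }
}
```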

Extending ParallelSourceFunction

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

class CustomParallelSourceFunction extends ParallelSourceFunction[Long] {
  // @volatile because cancel() is called from a different thread than run()
  @volatile var isRunning = true
  var count = 1L

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

The logic is the same as before. The main method:

def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // socketFunction(env)
  // nonParallelSourceFunction(env)
  parallelSourceFunction(env)
  env.execute("DataStreamSourceApp")
}

def parallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
  val data = env.addSource(new CustomParallelSourceFunction()).setParallelism(3)
  data.print()
}

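This time the parallelism can be set to 3. Each of the three parallel instances keeps its own counter, so every value appears three times: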

2> 1
1> 1
2> 1
2> 2
3> 2
3> 2
3> 3
4> 3
4> 3
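The repetition follows from how parallel sources are instantiated: with parallelism N, the runtime creates N independent copies of the source function, and each copy runs its own run() loop with its own count field. The standalone Java model below (no Flink; threads stand in for subtasks, and CounterInstance and countOccurrences are hypothetical names) bounds the counter so the demo terminates, then counts how often each value was emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Standalone model of a parallel source: N independent copies, each with its own counter.
public class ParallelInstancesSketch {

    static class CounterInstance implements Runnable {
        private final Queue<Long> out;
        private final long limit;
        private long count = 1; // per-instance state, not shared between copies

        CounterInstance(Queue<Long> out, long limit) {
            this.out = out;
            this.limit = limit;
        }

        @Override
        public void run() {
            while (count <= limit) {
                out.add(count);
                count += 1;
            }
        }
    }

    // Runs `parallelism` copies and counts how often each value was emitted
    static Map<Long, Integer> countOccurrences(int parallelism, long limit) {
        Queue<Long> out = new ConcurrentLinkedQueue<>();
        List<Thread> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            Thread t = new Thread(new CounterInstance(out, limit));
            subtasks.add(t);
            t.start();
        }
        for (Thread t : subtasks) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        Map<Long, Integer> occurrences = new TreeMap<>();
        for (Long v : out) {
            occurrences.merge(v, 1, Integer::sum);
        }
        return occurrences;
    }

    public static void main(String[] args) {
        // Parallelism 3, as in the Scala example: every value shows up three times
        System.out.println(countOccurrences(3, 3)); // {1=3, 2=3, 3=3}
    }
}
```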

Extending RichParallelSourceFunction

import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

class CustomRichParallelSourceFunction extends RichParallelSourceFunction[Long] {
  // @volatile because cancel() is called from a different thread than run()
  @volatile var isRunning = true
  var count = 1L

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

// In DataStreamSourceApp:
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // socketFunction(env)
  // nonParallelSourceFunction(env)
  // parallelSourceFunction(env)
  richParallelSourceFunction(env)
  env.execute("DataStreamSourceApp")
}

def richParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
  val data = env.addSource(new CustomRichParallelSourceFunction()).setParallelism(3)
  data.print()
}

Java

Implementing the SourceFunction interface

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class JavaCustomNonParallelSourceFunction implements SourceFunction<Long> {

    // volatile because cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;
    private long count = 1;

    // Emits 1, 2, 3, ... one value per second until cancelled
    @Override
    public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count += 1;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

// In JavaDataStreamSourceApp:
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    // socketFunction(environment);
    nonParallelSourceFunction(environment);
    environment.execute("JavaDataStreamSourceApp");
}

public static void nonParallelSourceFunction(StreamExecutionEnvironment executionEnvironment) {
    DataStreamSource<Long> data = executionEnvironment.addSource(new JavaCustomNonParallelSourceFunction());
    data.print().setParallelism(1);
}

When the parallelism is set explicitly:

DataStreamSource<Long> data = executionEnvironment.addSource(new JavaCustomNonParallelSourceFunction()).setParallelism(2);

the following exception is thrown:

Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
    at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:55)
    at com.vincent.course05.JavaDataStreamSourceApp.nonParallelSourceFunction(JavaDataStreamSourceApp.java:16)
    at com.vincent.course05.JavaDataStreamSourceApp.main(JavaDataStreamSourceApp.java:10)

Implementing the ParallelSourceFunction interface

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

public class JavaCustomParallelSourceFunction implements ParallelSourceFunction<Long> {

    // volatile because cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;
    private long count = 1;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count += 1;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

// In JavaDataStreamSourceApp:
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    // socketFunction(environment);
    // nonParallelSourceFunction(environment);
    parallelSourceFunction(environment);
    environment.execute("JavaDataStreamSourceApp");
}

public static void parallelSourceFunction(StreamExecutionEnvironment executionEnvironment) {
    DataStreamSource<Long> data = executionEnvironment.addSource(new JavaCustomParallelSourceFunction()).setParallelism(2);
    data.print().setParallelism(1);
}

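This time the source parallelism can be set to 2. Each of the two instances has its own counter, so every value appears twice; the print sink runs with parallelism 1, so the lines carry no prefix: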

1
1
2
2
3
3
4
4
5
5

Extending the abstract class RichParallelSourceFunction

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class JavaCustomRichParallelSourceFunction extends RichParallelSourceFunction<Long> {

    // volatile because cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;
    private long count = 1;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count += 1;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

// In JavaDataStreamSourceApp:
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    // socketFunction(environment);
    // nonParallelSourceFunction(environment);
    // parallelSourceFunction(environment);
    richParallelSourceFunction(environment);
    environment.execute("JavaDataStreamSourceApp");
}

public static void richParallelSourceFunction(StreamExecutionEnvironment executionEnvironment) {
    DataStreamSource<Long> data = executionEnvironment.addSource(new JavaCustomRichParallelSourceFunction()).setParallelism(2);
    data.print().setParallelism(1);
}

Output (again, each value appears once per source instance):

1
1
2
2
3
3
4
4
5
5
6
6
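What the Rich variant adds over ParallelSourceFunction is the open()/close() lifecycle and access to getRuntimeContext(). One common use, sketched here as an assumption rather than something the article does, is to read getIndexOfThisSubtask() and getNumberOfParallelSubtasks() in open() so that the parallel instances emit disjoint values instead of the duplicates above. The standalone Java model below has no Flink dependency: the subtaskIndex and parallelism arguments stand in for what the runtime context would supply, and StridedCounter and emitted are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone model: each subtask starts at (index + 1) and steps by the parallelism,
// so parallel instances never emit the same value.
public class SubtaskPartitionSketch {

    static class StridedCounter {
        private long next;
        private long stride;

        // Plays the role of open(): in Flink these values would come from
        // getRuntimeContext().getIndexOfThisSubtask() and getNumberOfParallelSubtasks()
        void open(int subtaskIndex, int parallelism) {
            this.next = subtaskIndex + 1; // subtask 0 starts at 1, subtask 1 at 2, ...
            this.stride = parallelism;    // each subtask then jumps by the parallelism
        }

        long nextValue() {
            long v = next;
            next += stride;
            return v;
        }
    }

    // Values emitted by one subtask over the given number of steps
    static List<Long> emitted(int subtaskIndex, int parallelism, int steps) {
        StridedCounter counter = new StridedCounter();
        counter.open(subtaskIndex, parallelism);
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < steps; i++) {
            out.add(counter.nextValue());
        }
        return out;
    }

    public static void main(String[] args) {
        // Parallelism 2, as in the Java example: the two subtasks no longer overlap
        System.out.println(emitted(0, 2, 3)); // [1, 3, 5]
        System.out.println(emitted(1, 2, 3)); // [2, 4, 6]
    }
}
```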

Relationship between SourceFunction, ParallelSourceFunction, and RichParallelSourceFunction
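As far as I know, in Flink 1.x ParallelSourceFunction<OUT> is a marker interface that extends SourceFunction<OUT> without adding methods, and RichParallelSourceFunction<OUT> is an abstract class that extends AbstractRichFunction (which supplies open(), close(), and getRuntimeContext()) and implements ParallelSourceFunction<OUT>; addSource() decides whether a source may run in parallel with an instanceof ParallelSourceFunction test. The standalone Java model below mirrors that shape with hypothetical stand-in types (SourceFn, ParallelSourceFn, RichFn, RichParallelSourceFn); they are not the real Flink interfaces.

```java
import java.util.function.Consumer;

// Standalone model of the Flink 1.x source type hierarchy, using stand-in types.
public class SourceHierarchySketch {

    // Stand-in for SourceFunction<T>: the run()/cancel() contract
    interface SourceFn<T> {
        void run(Consumer<T> ctx) throws Exception;

        void cancel();
    }

    // Stand-in for ParallelSourceFunction<T>: declares no new methods,
    // it only marks the source as safe to run with parallelism > 1
    interface ParallelSourceFn<T> extends SourceFn<T> {
    }

    // Stand-in for AbstractRichFunction: lifecycle hooks
    abstract static class RichFn {
        void open() {
        }

        void close() {
        }
    }

    // Stand-in for RichParallelSourceFunction<T>: rich lifecycle plus the parallel marker
    abstract static class RichParallelSourceFn<T> extends RichFn implements ParallelSourceFn<T> {
    }

    // Mirrors the instanceof test used to decide whether a source is parallel
    static boolean isParallel(SourceFn<?> fn) {
        return fn instanceof ParallelSourceFn;
    }

    static SourceFn<Long> plainSource() {
        return new SourceFn<Long>() {
            public void run(Consumer<Long> ctx) {
                ctx.accept(1L);
            }

            public void cancel() {
            }
        };
    }

    static SourceFn<Long> richParallelSource() {
        return new RichParallelSourceFn<Long>() {
            public void run(Consumer<Long> ctx) {
                ctx.accept(1L);
            }

            public void cancel() {
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(isParallel(plainSource()));        // false
        System.out.println(isParallel(richParallelSource())); // true
    }
}
```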

