Flink实现异步IO实战
Flink實(shí)現(xiàn)異步IO實(shí)戰(zhàn)
基本概念
首先通過(guò)官網(wǎng)的一個(gè)圖片了解一下Asynchronous I/O
Flink source收到一條數(shù)據(jù)就會(huì)進(jìn)行處理,如果需要通過(guò)這條數(shù)據(jù)關(guān)聯(lián)外部數(shù)據(jù)源,例如mysql,在發(fā)出查詢請(qǐng)求后,同步IO的方式是會(huì)等待查詢結(jié)果再處理下一條數(shù)據(jù)的查詢,也就是每一條數(shù)據(jù)都要等待上一個(gè)查詢結(jié)束。而異步IO是指數(shù)據(jù)來(lái)了以后發(fā)出查詢請(qǐng)求,先不等查詢結(jié)果,直接繼續(xù)發(fā)送下一條的查詢請(qǐng)求,對(duì)于查詢結(jié)果是異步返回的,返回結(jié)果之后再進(jìn)入下一個(gè)算子的計(jì)算。這兩種方式性能差距請(qǐng)看下的樣例。
案例
生成6條數(shù)據(jù),從0開(kāi)始遞增的6個(gè)數(shù)字。模擬異步查詢之后,加上時(shí)間戳輸出。
public class AsyncIODemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);final int maxCount = 6;final int taskNum = 1;final long timeout = 40000;DataStream<Integer> inputStream = env.addSource(new SimpleSource(maxCount));AsyncFunction<Integer, String> function = new SampleAsyncFunction();DataStream<String> result = AsyncDataStream.unorderedWait(inputStream,function,timeout,TimeUnit.MILLISECONDS,10).setParallelism(taskNum);result.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {return value + "," + System.currentTimeMillis();}}).print();env.execute("Async IO Demo");}private static class SimpleSource implements SourceFunction<Integer> {private volatile boolean isRunning = true;private int counter = 0;private int start = 0;public SimpleSource(int maxNum) {this.counter = maxNum;}@Overridepublic void run(SourceContext<Integer> ctx) throws Exception {while ((start < counter || counter == -1) && isRunning) {synchronized (ctx.getCheckpointLock()) {System.out.println("send data:" + start);ctx.collect(start);++start;}Thread.sleep(10L);}}@Overridepublic void cancel() {isRunning = false;}} }異步方法
代碼如下(示例):
public class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {private long[] sleep = {100L, 1000L, 5000L, 2000L, 6000L, 100L};@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);}@Overridepublic void close() throws Exception {super.close();}@Overridepublic void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) {System.out.println(System.currentTimeMillis() + "-input:" + input + " will sleep " + sleep[input] + " ms");query(input, resultFuture);}private void query(final Integer input, final ResultFuture<String> resultFuture) {try {Thread.sleep(sleep[input]);resultFuture.complete(Collections.singletonList(String.valueOf(input)));} catch (InterruptedException e) {resultFuture.complete(new ArrayList<>(0));}}private void asyncQuery(final Integer input, final ResultFuture<String> resultFuture) {CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {try {Thread.sleep(sleep[input]);return input;} catch (Exception e) {return null;}}}).thenAccept((Integer dbResult) -> {resultFuture.complete(Collections.singleton(String.valueOf(dbResult)));});} }上面的代碼中有兩個(gè)方法query()和asyncQuery(),其中Thread.sleep(sleep[input]);用來(lái)模擬查詢需要等待的時(shí)間,每條數(shù)據(jù)等待的時(shí)間分別為100L, 1000L, 5000L, 2000L, 6000L, 100L毫秒。
結(jié)果分析
運(yùn)行query()的結(jié)果為
可以看到第一條數(shù)據(jù)進(jìn)入到map算子的時(shí)間與最后一條相差了13115毫秒,執(zhí)行的順序與source中數(shù)據(jù)的順序一致,并且是串行的。
運(yùn)行asyncQuery()的結(jié)果為:
同樣第一條數(shù)據(jù)進(jìn)入map算子的時(shí)間與最后一條僅相差了6903毫秒,而且輸出結(jié)果的順序并不是source中的順序,而是按照查詢時(shí)間遞增的順序輸出,并且查詢請(qǐng)求幾乎是同一時(shí)間發(fā)出的。
通過(guò)上面的例子可以看出,flink所謂的異步IO,并不是只要實(shí)現(xiàn)了asyncInvoke方法就是異步了,這個(gè)方法并不是異步的,而是要依靠這個(gè)方法里面所寫(xiě)的查詢是異步的才可以。否則像是上面query()方法那樣,同樣會(huì)阻塞查詢相當(dāng)于同步IO。在實(shí)現(xiàn)flink異步IO的時(shí)候一定要注意。官方文檔也給出了相關(guān)的說(shuō)明。
總結(jié)
以上是生活随笔為你收集整理的Flink实现异步IO实战的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 数据库优化:SqlServer的with
- 下一篇: 没想到,Git居然有3种“后悔药”!