2021 Big Data Flink (Part 46): Further Reading: Asynchronous I/O
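Contents
Further Reading: Asynchronous I/O
Introduction
Why asynchronous I/O is needed
Prerequisites for using Async I/O
Async I/O API
Example
Further Reading: How It Works
AsyncDataStream
Message ordering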
Further Reading: Asynchronous I/O
Introduction
Why asynchronous I/O is needed
Apache Flink 1.12 Documentation: Asynchronous I/O for External Data Access
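Async I/O is a highly requested feature that Alibaba contributed to the community; it was introduced in Flink 1.2. Its main purpose is to keep network latency from becoming the system bottleneck when a job interacts with external systems.
Stream processing jobs often need to interact with external systems. The usual approach is to send a query for user a to the database and wait for the result; until it comes back, the program cannot send the query for user b. This is synchronous access, as shown in the figure below.
- Left: the usual implementation sends the query for user a to the database (for example inside a MapFunction) and waits for the result; until it returns, the query for user b cannot be sent. This is the synchronous access pattern. The brown bars in the figure represent waiting time, and they show how much network waiting hurts both throughput and latency.
- Right: to solve this, the asynchronous mode handles multiple requests and replies concurrently. Requests for users a, b, c, d, and so on are sent to the database back to back, and whichever reply returns first is processed first, so consecutive requests do not block on one another. This is exactly the principle Async I/O is built on.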
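As a rough illustration of the two figures, the plain-Java sketch below (no Flink involved) simulates a lookup with hypothetical per-user latencies. `runSync` waits for each reply before sending the next request, while `runAsync` puts all requests in flight on a thread pool and records replies in the order they arrive. The class name, the latencies, and the `lookup` method are illustrative assumptions, not part of Flink or of the demo further down.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SyncVsAsyncLookup {
    // Hypothetical round-trip latencies (ms) per user; stand-ins for real database calls.
    static final Map<String, Long> LATENCY_MS = Map.of("a", 400L, "b", 100L, "c", 250L, "d", 50L);

    // Simulated blocking lookup: sleeps for the user's latency, then returns a value.
    static String lookup(String user) {
        try {
            Thread.sleep(LATENCY_MS.get(user));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "profile-" + user;
    }

    // Synchronous (left figure): the next request is sent only after the previous reply.
    public static List<String> runSync(List<String> users) {
        List<String> results = new ArrayList<>();
        for (String user : users) {
            results.add(lookup(user));
        }
        return results;
    }

    // Asynchronous (right figure): all requests are in flight at once,
    // and each reply is handled as soon as it arrives.
    public static List<String> runAsync(List<String> users) {
        ExecutorService pool = Executors.newFixedThreadPool(users.size());
        List<String> arrivalOrder = Collections.synchronizedList(new ArrayList<>());
        try {
            CompletableFuture<?>[] inFlight = users.stream()
                    .map(user -> CompletableFuture.supplyAsync(() -> lookup(user), pool)
                            .thenAccept(arrivalOrder::add))
                    .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(inFlight).join(); // wait for every reply
        } finally {
            pool.shutdown();
        }
        return arrivalOrder;
    }

    public static void main(String[] args) {
        List<String> users = List.of("a", "b", "c", "d");
        long t0 = System.nanoTime();
        System.out.println("sync:  " + runSync(users));
        long t1 = System.nanoTime();
        System.out.println("async: " + runAsync(users));
        long t2 = System.nanoTime();
        System.out.printf("sync %d ms, async %d ms%n", (t1 - t0) / 1_000_000, (t2 - t1) / 1_000_000);
    }
}
```

With these made-up latencies the synchronous run takes roughly the sum of all waits (about 800 ms) and the asynchronous run roughly the longest single wait (about 400 ms); replies typically arrive in the order d, b, c, a.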
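Prerequisites for using Async I/O
- The database (or key/value store) provides a client that supports asynchronous requests (for example Vert.x for Java).
- If there is no asynchronous client, a synchronous client can be run in a thread pool so that it behaves like an asynchronous one.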
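The second option can be sketched with only the JDK: the blocking call is handed to an `ExecutorService`, and the caller immediately gets a `CompletableFuture` that a pool thread completes later. `BlockingClient` and `BlockingToAsync` are hypothetical names standing in for any synchronous client (for example a JDBC lookup); inside a Flink `AsyncFunction` the completion step would call `ResultFuture.complete` instead, as `ASyncIOFunction2` in the demo does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingToAsync {
    // Hypothetical synchronous client, e.g. a JDBC-style lookup that blocks the calling thread.
    public interface BlockingClient {
        String query(int id) throws Exception;
    }

    private final BlockingClient client;
    private final ExecutorService pool;

    public BlockingToAsync(BlockingClient client, int threads) {
        this.client = client;
        this.pool = Executors.newFixedThreadPool(threads);
    }

    // Returns at once; the blocking call runs on a pool thread and completes the future later.
    public CompletableFuture<String> queryAsync(int id) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pool.execute(() -> {
            try {
                future.complete(client.query(id));
            } catch (Exception e) {
                // Report failures; a future that is never completed would leave its caller hanging.
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    public void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        BlockingToAsync async = new BlockingToAsync(id -> {
            if (id < 0) {
                throw new IllegalArgumentException("bad id " + id);
            }
            Thread.sleep(50); // simulated network latency
            return "category-" + id;
        }, 4);
        try {
            CompletableFuture<String> ok = async.queryAsync(1);
            CompletableFuture<String> bad = async.queryAsync(-1);
            System.out.println(ok.join()); // category-1
            System.out.println(bad.handle((value, error) -> error == null ? value : "failed: " + error.getMessage()).join());
        } finally {
            async.shutdown();
        }
    }
}
```

The pool size bounds how many blocking calls can be in flight at once, which plays the same role as the capacity argument of `AsyncDataStream`.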
Async I/O API
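The Async I/O API lets users access external storage from a data stream through an asynchronous client. The API takes care of integrating with the data stream and of the tedious work around message ordering (Order), event time (EventTime), and consistency (fault tolerance), so users only need to focus on the business logic.
If the target database has an asynchronous client, an asynchronous streaming transformation against that database takes three steps:
- Implement an AsyncFunction that dispatches the requests: it sends asynchronous requests to the database and sets up callbacks.
- Implement the callback that receives the operation result and hands it to the ResultFuture.
- Apply the async I/O operation to a DataStream.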
Example
Reference: Two ways to implement Flink async I/O queries against MySQL (优优我心's blog, CSDN)
Requirement:
Use async I/O to read data from MySQL.
Data preparation:
DROP TABLE IF EXISTS `t_category`;
CREATE TABLE `t_category` (
  `id` int(11) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of t_category
-- ----------------------------
INSERT INTO `t_category` VALUES ('1', 'Phones');
INSERT INTO `t_category` VALUES ('2', 'Computers');
INSERT INTO `t_category` VALUES ('3', 'Clothing');
INSERT INTO `t_category` VALUES ('4', 'Cosmetics');
INSERT INTO `t_category` VALUES ('5', 'Food');
Code demo: reading MySQL with async I/O
package cn.i.extend;

import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.sql.SQLClient;
import io.vertx.ext.sql.SQLConnection;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Prerequisites for using async I/O:
 * 1. The database (or key/value store) provides a client that supports asynchronous requests.
 * 2. Without an asynchronous client, a synchronous client can be run in a thread pool to act as one.
 */
public class ASyncIODemo {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Source: DataStreamSource[1,2,3,4,5]
        DataStreamSource<CategoryInfo> categoryDS = env.addSource(new RichSourceFunction<CategoryInfo>() {
            private volatile boolean flag = true;

            @Override
            public void run(SourceContext<CategoryInfo> ctx) throws Exception {
                Integer[] ids = {1, 2, 3, 4, 5};
                for (Integer id : ids) {
                    if (!flag) {
                        break; // stop emitting once the source has been cancelled
                    }
                    ctx.collect(new CategoryInfo(id, null));
                }
            }

            @Override
            public void cancel() {
                this.flag = false;
            }
        });

        // 3. Transformation
        // Approach 1: async I/O with the asynchronous client provided by Vert.x for Java
        // unorderedWait: unordered output; timeout 1000 s; capacity 10 in-flight requests
        SingleOutputStreamOperator<CategoryInfo> result1 =
                AsyncDataStream.unorderedWait(categoryDS, new ASyncIOFunction1(), 1000, TimeUnit.SECONDS, 10);
        // Approach 2: MySQL synchronous client + thread pool simulating async I/O
        // unorderedWait: unordered output
        SingleOutputStreamOperator<CategoryInfo> result2 =
                AsyncDataStream.unorderedWait(categoryDS, new ASyncIOFunction2(), 1000, TimeUnit.SECONDS, 10);

        // 4. Sink
        result1.print("Approach 1: async I/O with the Vert.x async client \n");
        result2.print("Approach 2: MySQL sync client + thread pool simulating async I/O \n");

        // 5. execute
        env.execute();
    }
}

@Data
@NoArgsConstructor
@AllArgsConstructor
class CategoryInfo {
    private Integer id;
    private String name;
}

class MysqlSyncClient {
    private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
    private static final String URL = "jdbc:mysql://localhost:3306/bigdata";
    private static final String USER = "root";
    private static final String PASSWORD = "root";

    // One connection per client instance, so closing one subtask's client does not affect others.
    // Note: all threads of ASyncIOFunction2's pool share this single connection.
    private final Connection connection;

    public MysqlSyncClient() {
        try {
            Class.forName(JDBC_DRIVER);
            connection = DriverManager.getConnection(URL, USER, PASSWORD);
        } catch (ClassNotFoundException | SQLException e) {
            // Fail fast instead of continuing with a null connection
            throw new IllegalStateException("init connection failed! " + e.getMessage(), e);
        }
    }

    public void close() {
        try {
            connection.close();
        } catch (SQLException e) {
            System.out.println("close connection failed!" + e.getMessage());
        }
    }

    public CategoryInfo query(CategoryInfo category) {
        String sql = "select id,name from t_category where id = ?";
        // try-with-resources closes the statement and the result set after every query
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            statement.setInt(1, category.getId());
            try (ResultSet rs = statement.executeQuery()) {
                if (rs.next()) {
                    category.setName(rs.getString("name"));
                }
            }
        } catch (SQLException e) {
            System.out.println("query failed!" + e.getMessage());
        }
        return category;
    }
}

/**
 * Approach 1: async I/O with the asynchronous client provided by Vert.x for Java
 */
class ASyncIOFunction1 extends RichAsyncFunction<CategoryInfo, CategoryInfo> {
    private transient Vertx vertx;
    private transient SQLClient mySQLClient;

    @Override
    public void open(Configuration parameters) throws Exception {
        JsonObject mySQLClientConfig = new JsonObject();
        mySQLClientConfig.put("driver_class", "com.mysql.jdbc.Driver")
                .put("url", "jdbc:mysql://localhost:3306/bigdata")
                .put("user", "root")
                .put("password", "root")
                .put("max_pool_size", 20);
        VertxOptions options = new VertxOptions();
        options.setEventLoopPoolSize(10);
        options.setWorkerPoolSize(20);
        vertx = Vertx.vertx(options);
        // Create the asynchronous client from the configuration above
        mySQLClient = JDBCClient.createNonShared(vertx, mySQLClientConfig);
    }

    // Send an asynchronous request with the asynchronous client
    @Override
    public void asyncInvoke(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
        mySQLClient.getConnection(connResult -> {
            if (connResult.failed()) {
                // Fail this record instead of leaving it pending until the timeout
                resultFuture.completeExceptionally(connResult.cause());
                return;
            }
            SQLConnection connection = connResult.result();
            connection.query("select id,name from t_category where id = " + input.getId(), queryResult -> {
                // Return the connection to the pool in every case
                connection.close();
                if (queryResult.failed()) {
                    resultFuture.completeExceptionally(queryResult.cause());
                    return;
                }
                // complete() may be called only once per record, so gather all rows first
                List<CategoryInfo> categories = new ArrayList<>();
                for (JsonObject row : queryResult.result().getRows()) {
                    categories.add(new CategoryInfo(row.getInteger("id"), row.getString("name")));
                }
                resultFuture.complete(categories);
            });
        });
    }

    @Override
    public void close() throws Exception {
        if (mySQLClient != null) {
            mySQLClient.close();
        }
        if (vertx != null) {
            vertx.close();
        }
    }

    @Override
    public void timeout(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
        System.out.println("async call time out!");
        input.setName("unknown");
        resultFuture.complete(Collections.singleton(input));
    }
}

/**
 * Approach 2: synchronous calls + thread pool simulating async I/O
 */
class ASyncIOFunction2 extends RichAsyncFunction<CategoryInfo, CategoryInfo> {
    private transient MysqlSyncClient client;
    private transient ExecutorService executorService; // thread pool

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        client = new MysqlSyncClient();
        executorService = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    // Send the request asynchronously: the blocking query runs on a pool thread
    @Override
    public void asyncInvoke(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
        executorService.execute(() -> {
            try {
                resultFuture.complete(Collections.singletonList(client.query(input)));
            } catch (Exception e) {
                resultFuture.completeExceptionally(e);
            }
        });
    }

    @Override
    public void close() throws Exception {
        // Release the pool threads, then the JDBC connection
        if (executorService != null) {
            executorService.shutdown();
            executorService.awaitTermination(10, TimeUnit.SECONDS);
        }
        if (client != null) {
            client.close();
        }
    }

    @Override
    public void timeout(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
        System.out.println("async call time out!");
        input.setName("unknown");
        resultFuture.complete(Collections.singleton(input));
    }
}
Reading Redis data with async I/O (not really necessary!!!)
package cn.it.extend;

import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.redis.RedisClient;
import io.vertx.redis.RedisOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * Access Redis with async I/O.
 * Test data in Redis:
 * hset AsyncReadRedis beijing 1
 * hset AsyncReadRedis shanghai 2
 * hset AsyncReadRedis guangzhou 3
 * hset AsyncReadRedis shenzhen 4
 * hset AsyncReadRedis hangzhou 5
 * hset AsyncReadRedis wuhan 6
 * hset AsyncReadRedis chengdu 7
 * hset AsyncReadRedis tianjin 8
 * hset AsyncReadRedis chongqing 9
 *
 * city.txt:
 * 1,beijing
 * 2,shanghai
 * 3,guangzhou
 * 4,shenzhen
 * 5,hangzhou
 * 6,wuhan
 * 7,chengdu
 * 8,tianjin
 * 9,chongqing
 */
public class AsyncIODemo_Redis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<String> lines = env.readTextFile("data/input/city.txt");
        // orderedWait: results keep the input order; timeout 10 s;
        // capacity 1: at most one in-flight request per subtask, so requests go out one at a time
        SingleOutputStreamOperator<String> result1 = AsyncDataStream.orderedWait(lines, new AsyncRedis(), 10, TimeUnit.SECONDS, 1);
        SingleOutputStreamOperator<String> result2 = AsyncDataStream.orderedWait(lines, new AsyncRedisByVertx(), 10, TimeUnit.SECONDS, 1);
        result1.print().setParallelism(1);
        result2.print().setParallelism(1);
        env.execute();
    }
}

/**
 * Read Redis data asynchronously
 */
class AsyncRedis extends RichAsyncFunction<String, String> {
    // Redis connection pool configuration
    private transient JedisPoolConfig config = null;
    private static String ADDR = "localhost";
    private static int PORT = 6379;
    // Connection/socket timeout in milliseconds. The maximum wait for a free pooled connection is a
    // separate setting (JedisPoolConfig#setMaxWaitMillis, default -1 = wait forever).
    private static int TIMEOUT = 10000;
    // Redis connection pool
    private transient JedisPool jedisPool = null;

    // Initialize the Redis connection pool
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        config = new JedisPoolConfig();
        jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (jedisPool != null) {
            jedisPool.close();
        }
    }

    // Query Redis asynchronously
    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
        System.out.println("input:" + input);
        // Issue an asynchronous request (runs on ForkJoinPool.commonPool() since no executor is passed)
        CompletableFuture.supplyAsync(() -> {
            String name = input.split(",")[1];
            // Jedis is not thread-safe: borrow a connection per request and return it to the pool
            try (Jedis jedis = jedisPool.getResource()) {
                String value = jedis.hget("AsyncReadRedis", name);
                System.out.println("output:" + value);
                return value;
            }
        }).whenComplete((dbResult, error) -> {
            // Callback when the request completes: hand the result (or the failure) to Flink
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else if (dbResult == null) {
                resultFuture.complete(Collections.emptyList()); // field not found: emit nothing
            } else {
                resultFuture.complete(Collections.singleton(dbResult));
            }
        });
    }

    // Called when a request times out. The default implementation fails with a TimeoutException;
    // overriding it lets us log the timeout and emit nothing for this record instead.
    @Override
    public void timeout(String input, ResultFuture<String> resultFuture) throws Exception {
        System.out.println("redis connect timeout!");
        resultFuture.complete(Collections.emptyList());
    }
}

/**
 * Use the high-performance async toolkit Vert.x for connection-pool-like behavior, more efficient than a connection pool.
 * 1) It can be used directly from Java.
 * 2) From Scala, Scala 2.12+ is required.
 */
class AsyncRedisByVertx extends RichAsyncFunction<String, String> {
    private static final Logger logger = LoggerFactory.getLogger(AsyncRedisByVertx.class);
    // Redis server IP
    private static String ADDR = "localhost";
    // Redis port
    private static int PORT = 6379;
    // Fields marked transient are excluded from serialization
    private transient Vertx vertx;
    private transient RedisClient redisClient;

    // Initialize the client
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RedisOptions config = new RedisOptions();
        config.setHost(ADDR);
        config.setPort(PORT);
        VertxOptions vo = new VertxOptions();
        vo.setEventLoopPoolSize(10);
        vo.setWorkerPoolSize(20);
        vertx = Vertx.vertx(vo);
        redisClient = RedisClient.create(vertx, config);
    }

    // Asynchronous lookup
    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
        System.out.println("input:" + input);
        String name = input.split(",")[1];
        // Issue an asynchronous request
        redisClient.hget("AsyncReadRedis", name, res -> {
            if (res.failed()) {
                logger.warn("hget failed for {}", name, res.cause());
                resultFuture.complete(Collections.emptyList()); // complete(null) is not allowed
            } else if (res.result() == null) {
                resultFuture.complete(Collections.emptyList()); // field not found: emit nothing
            } else {
                // Request-completion callback: pass the result to the collector
                resultFuture.complete(Collections.singleton(res.result()));
            }
        });
    }

    @Override
    public void timeout(String input, ResultFuture<String> resultFuture) throws Exception {
        // Complete the record so it does not stay in the queue; emit nothing for it
        resultFuture.complete(Collections.emptyList());
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (redisClient != null) {
            redisClient.close(ar -> { });
        }
        if (vertx != null) {
            vertx.close();
        }
    }
}
Further Reading: How It Works
AsyncDataStream
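AsyncDataStream is a utility class for applying an AsyncFunction to a DataStream. The concurrent requests issued by an AsyncFunction complete in no fixed order: whichever request finishes first comes back first. To control the order in which result records are emitted, Flink provides two modes, corresponding to the two static methods of AsyncDataStream, orderedWait and unorderedWait:
- orderedWait (ordered): records are emitted in the same order in which they were received (watermarks included), i.e., first in, first out.
- unorderedWait (unordered):
  - With ProcessingTime: completely unordered; whichever request returns first is emitted first (lowest latency and lowest overhead).
  - With EventTime: watermarks act as boundaries. Records between two watermarks may be emitted out of order, but records are never reordered relative to a watermark. This brings some order back into the unordered mode, and with it some of the same overhead as the ordered mode.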
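The main job of AsyncDataStream.(un)orderedWait is to create an AsyncWaitOperator. AsyncWaitOperator is the operator implementation that supports asynchronous I/O access: it runs the AsyncFunction and processes the asynchronously returned results. Its internals are shown in the figure below.
As shown in the figure, AsyncWaitOperator consists of two main parts:
- StreamElementQueue
- Emitter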
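StreamElementQueue is a queue of Promises. A Promise is an asynchronous abstraction for a value that will exist in the future (like the numbered ticket you are handed while queuing for a table at Haidilao). This queue holds the Promises that are not yet complete, i.e., the in-flight requests. The Emitter is a separate thread that sends messages (the asynchronous replies that have arrived) downstream.
In the figure, E5 denotes the fifth element entering the operator ("Element-5").
- During processing, E5 is first wrapped in a "Promise" P5, and P5 is put into the queue.
- Then the asyncInvoke method of the AsyncFunction is called; it issues an asynchronous request to the external service and registers a callback.
- When the asynchronous request returns successfully, the callback calls AsyncCollector.collect to hand the result to the framework. (AsyncCollector is the name used in early Flink versions; in the code above the same role is played by ResultFuture and its complete method.)
- AsyncCollector is in fact a Promise, namely P5. Calling collect marks the Promise as complete and notifies the Emitter thread that a completed message is ready to be sent.
- The Emitter then pulls the completed Promise from the queue, takes the message out of it, and sends it downstream.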
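The life cycle above can be mimicked with JDK classes: a `CompletableFuture` plays the Promise, a `ScheduledExecutorService` stands in for a hypothetical external service that replies after a delay, and a dedicated thread plays the Emitter. This is a simplified model under those assumptions, not Flink's `AsyncWaitOperator` code; in particular this Emitter simply waits on the head Promise, which corresponds to the ordered mode described next.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PromiseEmitterSketch {

    public static List<String> run(List<String> elements, List<Long> delaysMs) {
        BlockingQueue<CompletableFuture<String>> queue = new LinkedBlockingQueue<>();
        // Hypothetical external service: replies after the given delay.
        ScheduledExecutorService externalService = Executors.newScheduledThreadPool(2);
        List<String> downstream = new ArrayList<>();

        // Emitter: a separate thread that sends completed results downstream.
        Thread emitter = new Thread(() -> {
            for (int i = 0; i < elements.size(); i++) {
                try {
                    // take() the head Promise, then join() until its callback has completed it
                    downstream.add(queue.take().join());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "emitter");
        emitter.start();

        for (int i = 0; i < elements.size(); i++) {
            String element = elements.get(i);
            CompletableFuture<String> promise = new CompletableFuture<>(); // wrap Ei as Promise Pi
            queue.add(promise);                                          // put Pi into the queue
            // "asyncInvoke": send the request and register a callback that completes Pi
            externalService.schedule(() -> {
                promise.complete(element + "->result");
            }, delaysMs.get(i), TimeUnit.MILLISECONDS);
        }

        try {
            emitter.join(); // join() also makes the emitter's writes to downstream visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            externalService.shutdown();
        }
        return downstream;
    }

    public static void main(String[] args) {
        // Replies arrive as E2, E3, E1, but the Emitter still outputs E1, E2, E3.
        System.out.println(run(List.of("E1", "E2", "E3"), List.of(120L, 10L, 60L)));
    }
}
```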
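Message ordering
As mentioned above, Async I/O provides two output modes. Strictly speaking there are three:
- Ordered
- ProcessingTime unordered
- EventTime unordered
Flink implements the different output modes with queues and abstracts them behind a queue interface (StreamElementQueue). This layered design means that AsyncWaitOperator and the Emitter do not need to care about message ordering. StreamElementQueue has two concrete implementations, OrderedStreamElementQueue and UnorderedStreamElementQueue. UnorderedStreamElementQueue is the interesting one: a single piece of logic cleverly implements both the fully unordered mode and the EventTime-unordered mode.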
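- Ordered
The ordered mode is simple and needs only one queue. Every element entering the operator (watermarks included) is wrapped in a Promise and appended to the queue in arrival order. As shown in the figure below, even though P4's result returns first, it is not emitted; only when the result of P1 (the head of the queue) returns is the Emitter triggered to pull the head element and send it.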
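The ordered rule fits in a few lines. In this single-threaded sketch (class and method names are illustrative, not Flink's `OrderedStreamElementQueue`), completing P4 first produces no output; only once the head P1 completes can the Emitter step drain entries from the front. A watermark could be modeled as an entry that is already complete when it is added.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class OrderedQueueSketch {
    // A Promise: a slot whose result may arrive later.
    public static final class Promise {
        private String result; // null until the asynchronous reply arrives

        public void complete(String r) {
            result = r;
        }

        boolean isDone() {
            return result != null;
        }
    }

    private final Deque<Promise> queue = new ArrayDeque<>();

    // Every incoming element is wrapped in a Promise and appended in arrival order.
    public Promise add() {
        Promise p = new Promise();
        queue.addLast(p);
        return p;
    }

    // Emitter step: emit from the head only, and only while the head is complete.
    public List<String> emitReady() {
        List<String> out = new ArrayList<>();
        while (!queue.isEmpty() && queue.peekFirst().isDone()) {
            out.add(queue.pollFirst().result);
        }
        return out;
    }

    public static void main(String[] args) {
        OrderedQueueSketch q = new OrderedQueueSketch();
        Promise p1 = q.add(), p2 = q.add(), p3 = q.add(), p4 = q.add();
        p4.complete("R4");
        System.out.println(q.emitReady()); // [] (P4 is done, but the head P1 is not)
        p1.complete("R1");
        System.out.println(q.emitReady()); // [R1] (P2 is still pending)
        p3.complete("R3");
        p2.complete("R2");
        System.out.println(q.emitReady()); // [R2, R3, R4]
    }
}
```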
- ProcessingTime unordered
ProcessingTime unordered is also simple. Since there are no watermarks, there is no need to keep watermarks and messages in order relative to each other, so two queues are enough: an uncompletedQueue and a completedQueue. Every new element entering the operator is likewise wrapped in a Promise and placed in uncompletedQueue. When any Promise in uncompletedQueue receives its data, that Promise is moved to completedQueue and the Emitter is notified to consume it, as shown in the figure below:
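A matching sketch for the ProcessingTime-unordered case: pending elements sit in an `uncompleted` collection, and each reply moves its result straight into `completed` for the Emitter to drain. A `LinkedHashSet` is used instead of a real queue because replies remove entries from the middle; identifying an element by its name is a simplifying assumption of this sketch (it requires unique names), not how Flink's `UnorderedStreamElementQueue` tracks entries.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ProcessingTimeUnorderedSketch {
    private final Set<String> uncompleted = new LinkedHashSet<>(); // in-flight requests (by element name)
    private final Deque<String> completed = new ArrayDeque<>();    // results ready for the Emitter

    // A new element is recorded among the uncompleted entries.
    public void add(String element) {
        uncompleted.add(element);
    }

    // Callback for a reply: move the entry straight to the completed queue, whatever its position.
    public void complete(String element, String result) {
        if (uncompleted.remove(element)) {
            completed.addLast(result);
        }
    }

    // Emitter: drain everything that has completed, in completion order.
    public List<String> emitReady() {
        List<String> out = new ArrayList<>(completed);
        completed.clear();
        return out;
    }

    public static void main(String[] args) {
        ProcessingTimeUnorderedSketch q = new ProcessingTimeUnorderedSketch();
        for (String e : new String[] {"E1", "E2", "E3", "E4"}) {
            q.add(e);
        }
        q.complete("E4", "R4");
        q.complete("E2", "R2");
        System.out.println(q.emitReady()); // [R4, R2]
        q.complete("E1", "R1");
        q.complete("E3", "R3");
        System.out.println(q.emitReady()); // [R1, R3]
    }
}
```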
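- EventTime unordered
EventTime unordered is something like a combination of the ordered and ProcessingTime-unordered modes. Because there are watermarks, the order between watermarks and messages must be coordinated, so the entries in uncompletedQueue change from single Promises to sets of Promises.
- If a message element enters the operator, it is wrapped in a Promise and added to the set at the tail of the queue.
- If a watermark enters the operator, it is also wrapped in a Promise and placed in a set of its own; that set is appended to the tail of uncompletedQueue, and finally a new empty set is appended after it.
- In this way, watermarks become the boundaries of message order.
- Only a Promise in the set at the head of the queue can be moved to completedQueue once it has returned its data; from there the Emitter consumes it and sends it downstream.
- Only after the head set is empty can the second set be processed.
This guarantees that a watermark is emitted if and only if all messages before it have already been emitted. The process is shown in the figure below: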
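The EventTime-unordered scheme can be modeled the same way: the uncompleted queue holds sets, a watermark gets a set of its own followed by a fresh empty set, and only the head set may release finished entries. This is an illustrative sketch, not Flink's `UnorderedStreamElementQueue`; the class and method names are made up, and watermarks are modeled as entries that are complete from the start.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class EventTimeUnorderedSketch {
    // A Promise for a record or a watermark; result stays null until the entry is complete.
    public static final class Entry {
        private String result;

        private Entry(String initialResult) {
            this.result = initialResult;
        }
    }

    private final Deque<Set<Entry>> uncompleted = new ArrayDeque<>();
    private final Deque<String> completed = new ArrayDeque<>();

    public EventTimeUnorderedSketch() {
        uncompleted.addLast(new LinkedHashSet<>()); // open set for incoming records
    }

    // A record joins the set at the tail of the queue.
    public Entry addRecord() {
        Entry e = new Entry(null);
        uncompleted.peekLast().add(e);
        return e;
    }

    // A watermark gets a set of its own, followed by a fresh empty set for later records.
    public void addWatermark(String watermark) {
        Set<Entry> own = new LinkedHashSet<>();
        own.add(new Entry(watermark));
        uncompleted.addLast(own);
        uncompleted.addLast(new LinkedHashSet<>());
        advance();
    }

    // Callback for a record's reply.
    public void complete(Entry e, String result) {
        e.result = result;
        advance();
    }

    // Only the head set may release finished entries; an emptied head set is dropped
    // (unless it is the open tail set) so that the next set becomes the head.
    private void advance() {
        while (true) {
            Set<Entry> head = uncompleted.peekFirst();
            for (Iterator<Entry> it = head.iterator(); it.hasNext(); ) {
                Entry e = it.next();
                if (e.result != null) {
                    completed.addLast(e.result);
                    it.remove();
                }
            }
            if (!head.isEmpty() || uncompleted.size() == 1) {
                return;
            }
            uncompleted.pollFirst();
        }
    }

    // Emitter: drain what has been released so far.
    public List<String> emitReady() {
        List<String> out = new ArrayList<>(completed);
        completed.clear();
        return out;
    }

    public static void main(String[] args) {
        EventTimeUnorderedSketch q = new EventTimeUnorderedSketch();
        Entry e1 = q.addRecord();
        Entry e2 = q.addRecord();
        q.addWatermark("W1");
        Entry e3 = q.addRecord();
        Entry e4 = q.addRecord();
        q.complete(e4, "R4");              // done, but sits behind W1
        q.complete(e2, "R2");
        System.out.println(q.emitReady()); // [R2]
        q.complete(e1, "R1");              // head set empties, so W1 and then R4 are released
        System.out.println(q.emitReady()); // [R1, W1, R4]
        q.complete(e3, "R3");
        System.out.println(q.emitReady()); // [R3]
    }
}
```

R4 finishes early but is held until E1 and E2 (the records before W1) have been released, and W1 itself goes out only after both of them.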