Flink异步IO
本文講解 Flink 用于訪問外部數(shù)據(jù)存儲的異步 I/O API。 對于不熟悉異步或者事件驅(qū)動編程的用戶,建議先儲備一些關(guān)于 Future 和事件驅(qū)動編程的知識。
對于異步 I/O 操作的需求
在與外部系統(tǒng)交互(用數(shù)據(jù)庫中的數(shù)據(jù)擴充流數(shù)據(jù))的時候,需要考慮與外部系統(tǒng)的通信延遲對整個流處理應用的影響。
簡單地訪問外部數(shù)據(jù)庫的數(shù)據(jù),比如使用 MapFunction,通常意味著同步交互: MapFunction 向數(shù)據(jù)庫發(fā)送一個請求然后一直等待,直到收到響應。在許多情況下,等待占據(jù)了函數(shù)運行的大部分時間。
與數(shù)據(jù)庫異步交互是指一個并行函數(shù)實例可以并發(fā)地處理多個請求和接收多個響應。這樣,函數(shù)在等待的時間可以發(fā)送其他請求和接收其他響應。至少等待的時間可以被多個請求攤分。大多數(shù)情況下,異步交互可以大幅度提高流處理的吞吐量。
注意: 僅僅提高 MapFunction 的并行度(parallelism)在有些情況下也可以提升吞吐量,但是這樣做通常會導致非常高的資源消耗:更多的并行 MapFunction 實例意味著更多的 Task、更多的線程、更多的 Flink 內(nèi)部網(wǎng)絡(luò)連接、 更多的與數(shù)據(jù)庫的網(wǎng)絡(luò)連接、更多的緩沖和更多程序內(nèi)部協(xié)調(diào)的開銷。
先決條件
如上節(jié)所述,正確地實現(xiàn)數(shù)據(jù)庫(或鍵/值存儲)的異步 I/O 交互需要支持異步請求的數(shù)據(jù)庫客戶端。許多主流數(shù)據(jù)庫都提供了這樣的客戶端。
如果沒有這樣的客戶端,可以通過創(chuàng)建多個客戶端并使用線程池處理同步調(diào)用的方法,將同步客戶端轉(zhuǎn)換為有限并發(fā)的客戶端。然而,這種方法通常比正規(guī)的異步客戶端效率低。
異步 I/O API
Flink 的異步 I/O API 允許用戶在流處理中使用異步請求客戶端。API 處理與數(shù)據(jù)流的集成,同時還能處理好順序、事件時間和容錯等。
在具備異步數(shù)據(jù)庫客戶端的基礎(chǔ)上,實現(xiàn)數(shù)據(jù)流轉(zhuǎn)換操作與數(shù)據(jù)庫的異步 I/O 交互需要以下三部分:
實現(xiàn)分發(fā)請求的 AsyncFunction
獲取數(shù)據(jù)庫交互的結(jié)果并發(fā)送給 ResultFuture 的 回調(diào) 函數(shù)
將異步 I/O 操作應用于 DataStream 作為 DataStream 的一次轉(zhuǎn)換操作, 啟用或者不啟用重試。
下面是基本的代碼模板:
// 這個例子使用 Java 8 的 Future 接口(與 Flink 的 Future 相同)實現(xiàn)了異步請求和回調(diào)。
/**
* 實現(xiàn) 'AsyncFunction' 用于發(fā)送請求和設(shè)置回調(diào)。
*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
/** 能夠利用回調(diào)函數(shù)并發(fā)發(fā)送請求的數(shù)據(jù)庫客戶端 */
private transient DatabaseClient client;
@Override
public void open(Configuration parameters) throws Exception {
client = new DatabaseClient(host, post, credentials);
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
// 發(fā)送異步請求,接收 future 結(jié)果
final Future<String> result = client.query(key);
// 設(shè)置客戶端完成請求后要執(zhí)行的回調(diào)函數(shù)
// 回調(diào)函數(shù)只是簡單地把結(jié)果發(fā)給 future
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
return result.get();
} catch (InterruptedException | ExecutionException e) {
// 顯示地處理異常。
return null;
}
}
}).thenAccept( (String dbResult) -> {
resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
});
}
}
// 創(chuàng)建初始 DataStream
DataStream<String> stream = ...;
// 應用異步 I/O 轉(zhuǎn)換操作,不啟用重試
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
這是官網(wǎng)的代碼模板,這里給出Flink異步IO訪問mysql數(shù)據(jù)的例子。
數(shù)據(jù)庫有一張people表,字段姓名和國家,F(xiàn)link從nc讀取數(shù)據(jù),根據(jù)空格切分人名,從mysql查出每個人對應的國家,然后打印出來。真實大數(shù)據(jù)場景可能會遇到其它的外部存儲,需要在Flink程序里面訪問這些數(shù)據(jù)庫,擴充數(shù)據(jù)維度,組成大寬表。比如redis、hbase等數(shù)據(jù)庫。
mysql里面提前建好people表,建表語句:
CREATE TABLE `people` (
`id` bigint(13) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(50) NOT NULL,
`country` varchar(50) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of people
-- ----------------------------
INSERT INTO `people` VALUES ('1', 'tom', 'US');
INSERT INTO `people` VALUES ('2', 'zhangsan', 'china');
我們的程序:
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.2.8</version>
</dependency>
package operator;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.sql.*;
import java.util.Collections;
import java.util.concurrent.*;
import java.util.function.Supplier;
public class AsyncDataBaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
// 線程池
private ExecutorService executorService;
// 連接池
private DruidDataSource druidDataSource;
@Override
public void asyncInvoke(String key, ResultFuture<Tuple2<String, String>> resultFuture) {
Future<String> result = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
// 從連接池中獲取連接
DruidPooledConnection connection = druidDataSource.getConnection();
// 預編譯SQL
String sql = "select country from people where name = ?";
PreparedStatement preparedStatement = connection.prepareStatement(sql);
// 設(shè)置參數(shù)
preparedStatement.setString(1, key);
// 執(zhí)行SQL并獲取結(jié)果
ResultSet resultSet = preparedStatement.executeQuery();
String country = "";
try {
// 封裝結(jié)果
while (resultSet.next()) {
country = resultSet.getString("country");
}
} finally {
resultSet.close();
preparedStatement.close();
connection.close();
}
return country;
}
});
// 獲取異步結(jié)果并輸出
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
return result.get();
} catch (InterruptedException | ExecutionException e) {
return null;
}
}
}).thenAccept((String dbResult) -> {
resultFuture.complete(Collections.singleton(Tuple2.of(key, dbResult)));
});
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
druidDataSource = new DruidDataSource();
druidDataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
druidDataSource.setUsername("bigdata");
druidDataSource.setPassword("bigdata");
druidDataSource.setUrl("jdbc:mysql://192.168.1.1:3306/test");
// 創(chuàng)建線程池,用于執(zhí)行異步操作
executorService = new ThreadPoolExecutor(5, 15, 1,
TimeUnit.MINUTES,
new LinkedBlockingDeque<>(100));
}
@Override
public void close() throws Exception {
super.close();
// 關(guān)閉連接池
if (druidDataSource != null){
druidDataSource.close();
}
// 關(guān)閉線程池
if (executorService != null){
executorService.shutdown();
}
}
}
主程序:
package operator;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.concurrent.TimeUnit;
public class AsyncIODemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = senv.socketTextStream("192.168.20.130", 9999)
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] values = value.split(" ");
for(String v : values) {
out.collect(v);
}
}
});
// 應用異步 I/O 轉(zhuǎn)換操作,不啟用重試
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDataBaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
resultStream.print();
senv.execute("AsyncIODemo");
}
}
啟動程序,nc輸入數(shù)據(jù):
[root@hm-001 logs]# nc -lk 9999
tom zhangsan tom
程序輸出:
6> (zhangsan,china)
6> (tom,US)
6> (tom,US)
代碼gitee地址:
https://gitee.com/ddxygq/BigDataTechnical/blob/main/Flink/src/main/java/operator/AsyncIODemo.java
重要提示: 第一次調(diào)用 ResultFuture.complete 后 ResultFuture 就完成了。 后續(xù)的 complete 調(diào)用都將被忽略。
下面兩個參數(shù)控制異步操作:
-
Timeout: 超時參數(shù)定義了異步操作執(zhí)行多久未完成、最終認定為失敗的時長,如果啟用重試,則可能包括多個重試請求。 它可以防止一直等待得不到響應的請求。
-
Capacity: 容量參數(shù)定義了可以同時進行的異步請求數(shù)。 即使異步 I/O 通常帶來更高的吞吐量,執(zhí)行異步 I/O 操作的算子仍然可能成為流處理的瓶頸。 限制并發(fā)請求的數(shù)量可以確保算子不會持續(xù)累積待處理的請求進而造成積壓,而是在容量耗盡時觸發(fā)反壓。
-
AsyncRetryStrategy: 重試策略參數(shù)定義了什么條件會觸發(fā)延遲重試以及延遲的策略,例如,固定延遲、指數(shù)后退延遲、自定義實現(xiàn)等。
超時處理
當異步 I/O 請求超時的時候,默認會拋出異常并重啟作業(yè)。 如果你想處理超時,可以重寫 AsyncFunction#timeout 方法。 重寫 AsyncFunction#timeout 時別忘了調(diào)用 ResultFuture.complete() 或者 ResultFuture.completeExceptionally() 以便告訴Flink這條記錄的處理已經(jīng)完成。如果超時發(fā)生時你不想發(fā)出任何記錄,你可以調(diào)用 ResultFuture.complete(Collections.emptyList()) 。
結(jié)果的順序
AsyncFunction 發(fā)出的并發(fā)請求經(jīng)常以不確定的順序完成,這取決于請求得到響應的順序。 Flink 提供兩種模式控制結(jié)果記錄以何種順序發(fā)出。
-
無序模式: 異步請求一結(jié)束就立刻發(fā)出結(jié)果記錄。 流中記錄的順序在經(jīng)過異步 I/O 算子之后發(fā)生了改變。 當使用 處理時間 作為基本時間特征時,這個模式具有最低的延遲和最少的開銷。 此模式使用 AsyncDataStream.unorderedWait(...) 方法。
-
有序模式: 這種模式保持了流的順序。發(fā)出結(jié)果記錄的順序與觸發(fā)異步請求的順序(記錄輸入算子的順序)相同。為了實現(xiàn)這一點,算子將緩沖一個結(jié)果記錄直到這條記錄前面的所有記錄都發(fā)出(或超時)。由于記錄或者結(jié)果要在 checkpoint 的狀態(tài)中保存更長的時間,所以與無序模式相比,有序模式通常會帶來一些額外的延遲和 checkpoint 開銷。此模式使用 AsyncDataStream.orderedWait(...) 方法。
事件時間
當流處理應用使用事件時間時,異步 I/O 算子會正確處理 watermark。對于兩種順序模式,這意味著以下內(nèi)容:
-
無序模式: Watermark 既不超前于記錄也不落后于記錄,即 watermark 建立了順序的邊界。 只有連續(xù)兩個 watermark 之間的記錄是無序發(fā)出的。 在一個 watermark 后面生成的記錄只會在這個 watermark 發(fā)出以后才發(fā)出。 在一個 watermark 之前的所有輸入的結(jié)果記錄全部發(fā)出以后,才會發(fā)出這個 watermark。這意味著存在 watermark 的情況下,無序模式 會引入一些與有序模式 相同的延遲和管理開銷。開銷大小取決于 watermark 的頻率。
-
有序模式: 連續(xù)兩個 watermark 之間的記錄順序也被保留了。開銷與使用處理時間 相比,沒有顯著的差別。
請記住,攝入時間 是一種特殊的事件時間,它基于數(shù)據(jù)源的處理時間自動生成 watermark。
容錯保證
異步 I/O 算子提供了完全的精確一次容錯保證。它將在途的異步請求的記錄保存在 checkpoint 中,在故障恢復時重新觸發(fā)請求。
重試支持
重試支持為異步 I/O 操作引入了一個內(nèi)置重試機制,它對用戶的異步函數(shù)實現(xiàn)邏輯是透明的。
-
AsyncRetryStrategy: 異步重試策略包含了觸發(fā)重試條件 AsyncRetryPredicate 定義,以及根據(jù)當前已嘗試次數(shù)判斷是否繼續(xù)重試、下次重試間隔時長的接口方法。 需要注意,在滿足觸發(fā)重試條件后,有可能因為當前重試次數(shù)超過預設(shè)的上限放棄重試,或是在任務(wù)結(jié)束時被強制終止重試(這種情況下,系統(tǒng)以最后一次執(zhí)行的結(jié)果或異常作為最終狀態(tài))。
-
AsyncRetryPredicate: 觸發(fā)重試條件可以選擇基于返回結(jié)果、 執(zhí)行異常來定義條件,兩種條件是或的關(guān)系,滿足其一即會觸發(fā)。
實現(xiàn)提示
在實現(xiàn)使用 Executor(或者 Scala 中的 ExecutionContext)和回調(diào)的 Futures 時,建議使用 DirectExecutor,因為通常回調(diào)的工作量很小,DirectExecutor 避免了額外的線程切換開銷。回調(diào)通常只是把結(jié)果發(fā)送給 ResultFuture,也就是把它添加進輸出緩沖。從這里開始,包括發(fā)送記錄和與 chenkpoint 交互在內(nèi)的繁重邏輯都將在專有的線程池中進行處理。
DirectExecutor 可以通過 org.apache.flink.util.concurrent.Executors.directExecutor() 或 com.google.common.util.concurrent.MoreExecutors.directExecutor() 獲得。
總結(jié)
- 上一篇: Sunshine + Moonlight
- 下一篇: Apollo系列之架构设计(一)