vertx web连接超时 阻塞_Flink之基于Vertx的Mysql异步IO
導讀
在流計算中,如果以事件流為主,關聯一些維度信息,就需要根據每個事件中的關鍵信息去數據庫執行一次查詢。正常的思路可能是通過mapFunction以阻塞的方式查詢數據庫,等待數據結果返回,然后執行下一個步驟。如果數據庫查詢時間很長,那有可能會阻塞流計算的整體流程。因此可以考慮異步的方式請求數據庫,當數據返回時,該事件再繼續執行下面的操作。這樣提升了流計算的并發度,但是也增加了數據庫的訪問以及網絡帶寬的壓力。
1 Flink中的異步IO
在Flink中提供了一種異步IO的模式,不需要使用map函數阻塞式的加載數據,而是使用異步方法同時處理大量請求。不過這就需要數據庫支持異步請求,如果不支持異步請求也可以手動維護線程池調用,只不過效率上沒有原生的異步client更高效。比如Mysql可以通過Vertx支持異步查詢,HBase2.x也支持異步查詢。
一般要實現Flink的異步查詢需要自定義幾個方法:
class MyAsyncReq extends RichAsyncFunction<IN,OUT>{@Overridepublic void open(..) throws Exception {}@Overridepublic void close() throws Exception {}@Overridepublic void asyncInvoke(..) throws Exception {} }其中open中需要定義連接或者連接池,close中進行釋放,asyncInvoke執行異步查詢。
AsyncDataStream.unorderedWait( stream, new MyAsyncReq(), 1000, TimeUnit.MILLISECONDS, 100);使用的使用執行下面的方法即可,
stream為主要的事件流,
myasyncreq是異步IO類,
1000為異步請求的超時時間,
100是同時進行異步請求的最大數量
另外,由于是異步請求,所以可能請求結束后順序與原來的順序就不一致了。使用unordered時會以異步請求結束的時間為準,ordered會以事件時間為準。
2 基于Vertx實現的Mysql異步IO
如果外部數據源是Mysql,一般的jdbc連接都是同步機制的,看浪尖大大的文章,推薦了一個異步JDBC組件——Vertx,下面就以Vertx為例作為異步IO的Client。
maven引入除flink之外其他的jar:
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.13</version> </dependency> <dependency><groupId>io.vertx</groupId><artifactId>vertx-jdbc-client</artifactId><version>3.8.3</version> </dependency> <dependency><groupId>io.vertx</groupId><artifactId>vertx-core</artifactId><version>3.8.3</version> </dependency>先在open中創建SQLClient,它內部維護了自己的異步請求服務;然后再close中關閉client;在asyncInvoke中調用獲取connection,執行查詢,并釋放連接。
public class JDBCAsyncFunction extends RichAsyncFunction<Click, Store> {private SQLClient client;@Overridepublic void open(Configuration parameters) throws Exception {Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(10).setEventLoopPoolSize(10));JsonObject config = new JsonObject().put("url", "jdbc:mysql://xx:3306/base").put("driver_class", "com.mysql.cj.jdbc.Driver").put("max_pool_size", 10).put("user", "x").put("password", "x");client = JDBCClient.createShared(vertx, config);}@Overridepublic void close() throws Exception {client.close();}@Overridepublic void asyncInvoke(Click input, ResultFuture<Store> resultFuture) throws Exception {client.getConnection(conn -> {if (conn.failed()) {return;}final SQLConnection connection = conn.result();connection.query("select id, name from t where id = " + input.getId(), res2 -> {ResultSet rs = new ResultSet();if (res2.succeeded()) {rs = res2.result();}List<Store> stores = new ArrayList<>();for (JsonObject json : rs.getRows()) {Store s = new Store();s.setId(json.getInteger("id"));s.setName(json.getString("name"));stores.add(s);}connection.close();resultFuture.complete(stores);});});} }注意,一定要在query的返回調用方法中手動釋放connection,不然馬上就會報連接池耗盡的異常。使用時就沒什么區別了:
AsyncDataStream .unorderedWait(clicks,new JDBCPoolFunction(), 100,TimeUnit.SECONDS,10) .print();3 參考
1 vertx:
https://vertx.io/docs/vertx-jdbc-client/java/
2 設計思想參考:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673
總結
以上是生活随笔為你收集整理的vertx web连接超时 阻塞_Flink之基于Vertx的Mysql异步IO的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Atititi tesseract使用总
- 下一篇: SQL Server-数据类型(七)