日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

vertx web连接超时 阻塞_Flink之基于Vertx的Mysql异步IO

發布時間:2023/12/19 数据库 45 豆豆
生活随笔 收集整理的這篇文章主要介紹了 vertx web连接超时 阻塞_Flink之基于Vertx的Mysql异步IO 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

導讀

在流計算中,如果以事件流為主,關聯一些維度信息,就需要根據每個事件中的關鍵信息去數據庫執行一次查詢。正常的思路可能是通過mapFunction以阻塞的方式查詢數據庫,等待數據結果返回,然后執行下一個步驟。如果數據庫查詢時間很長,那有可能會阻塞流計算的整體流程。因此可以考慮異步的方式請求數據庫,當數據返回時,該事件再繼續執行下面的操作。這樣提升了流計算的并發度,但是也增加了數據庫的訪問以及網絡帶寬的壓力。

1 Flink中的異步IO

在Flink中提供了一種異步IO的模式,不需要使用map函數阻塞式的加載數據,而是使用異步方法同時處理大量請求。不過這就需要數據庫支持異步請求,如果不支持異步請求也可以手動維護線程池調用,只不過效率上沒有原生的異步client更高效。比如Mysql可以通過Vertx支持異步查詢,HBase2.x也支持異步查詢。

一般要實現Flink的異步查詢需要自定義幾個方法:

class MyAsyncReq extends RichAsyncFunction<IN,OUT>{@Overridepublic void open(..) throws Exception {}@Overridepublic void close() throws Exception {}@Overridepublic void asyncInvoke(..) throws Exception {} }

其中open中需要定義連接或者連接池,close中進行釋放,asyncInvoke執行異步查詢。

AsyncDataStream.unorderedWait( stream, new MyAsyncReq(), 1000, TimeUnit.MILLISECONDS, 100);

使用的使用執行下面的方法即可,

stream為主要的事件流,

myasyncreq是異步IO類,

1000為異步請求的超時時間,

100是同時進行異步請求的最大數量

另外,由于是異步請求,所以可能請求結束后順序與原來的順序就不一致了。使用unordered時會以異步請求結束的時間為準,ordered會以事件時間為準。

2 基于Vertx實現的Mysql異步IO

如果外部數據源是Mysql,一般的jdbc連接都是同步機制的,看浪尖大大的文章,推薦了一個異步JDBC組件——Vertx,下面就以Vertx為例作為異步IO的Client。

maven引入除flink之外其他的jar:

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.13</version> </dependency> <dependency><groupId>io.vertx</groupId><artifactId>vertx-jdbc-client</artifactId><version>3.8.3</version> </dependency> <dependency><groupId>io.vertx</groupId><artifactId>vertx-core</artifactId><version>3.8.3</version> </dependency>

先在open中創建SQLClient,它內部維護了自己的異步請求服務;然后再close中關閉client;在asyncInvoke中調用獲取connection,執行查詢,并釋放連接。

public class JDBCAsyncFunction extends RichAsyncFunction<Click, Store> {private SQLClient client;@Overridepublic void open(Configuration parameters) throws Exception {Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(10).setEventLoopPoolSize(10));JsonObject config = new JsonObject().put("url", "jdbc:mysql://xx:3306/base").put("driver_class", "com.mysql.cj.jdbc.Driver").put("max_pool_size", 10).put("user", "x").put("password", "x");client = JDBCClient.createShared(vertx, config);}@Overridepublic void close() throws Exception {client.close();}@Overridepublic void asyncInvoke(Click input, ResultFuture<Store> resultFuture) throws Exception {client.getConnection(conn -> {if (conn.failed()) {return;}final SQLConnection connection = conn.result();connection.query("select id, name from t where id = " + input.getId(), res2 -> {ResultSet rs = new ResultSet();if (res2.succeeded()) {rs = res2.result();}List<Store> stores = new ArrayList<>();for (JsonObject json : rs.getRows()) {Store s = new Store();s.setId(json.getInteger("id"));s.setName(json.getString("name"));stores.add(s);}connection.close();resultFuture.complete(stores);});});} }

注意,一定要在query的返回調用方法中手動釋放connection,不然馬上就會報連接池耗盡的異常。使用時就沒什么區別了:

AsyncDataStream .unorderedWait(clicks,new JDBCPoolFunction(), 100,TimeUnit.SECONDS,10) .print();

3 參考

1 vertx:

https://vertx.io/docs/vertx-jdbc-client/java/

2 設計思想參考:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673

總結

以上是生活随笔為你收集整理的vertx web连接超时 阻塞_Flink之基于Vertx的Mysql异步IO的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。