日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Flink 异步IO访问外部数据(mysql篇)

發(fā)布時(shí)間:2025/5/22 数据库 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink 异步IO访问外部数据(mysql篇) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

  接上篇:【翻譯】Flink 異步I / O訪問外部數(shù)據(jù)

  最近看了大佬的博客,突然想起Async I/O方式是Blink 推給社區(qū)的一大重要功能,可以使用異步的方式獲取外部數(shù)據(jù),想著自己實(shí)現(xiàn)以下,項(xiàng)目上用的時(shí)候,可以不用現(xiàn)去找了。

  最開始想用scala 實(shí)現(xiàn)一個(gè)讀取 hbase數(shù)據(jù)的demo,參照官網(wǎng)demo:  

/*** An implementation of the 'AsyncFunction' that sends requests and sets the callback.*/ class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {/** The database specific client that can issue concurrent requests with callbacks */lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)/** The context used for the future callbacks */implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {// issue the asynchronous request, receive a future for the resultval resultFutureRequested: Future[String] = client.query(str)// set the callback to be executed once the request by the client is complete// the callback simply forwards the result to the result future resultFutureRequested.onSuccess {case result: String => resultFuture.complete(Iterable((str, result)))}} }// create the original stream val stream: DataStream[String] = ...// apply the async I/O transformation val resultStream: DataStream[(String, String)] =AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

失敗了,上圖標(biāo)紅的部分實(shí)現(xiàn)不了

1、Future 找不到可以用的實(shí)現(xiàn)類

2、unorderedWait 一直報(bào)錯(cuò)

源碼example 里面也有Scala 的案例

def main(args: Array[String]) {val timeout = 10000Lval env = StreamExecutionEnvironment.getExecutionEnvironmentval input = env.addSource(new SimpleSource())val asyncMapped = AsyncDataStream.orderedWait(input, timeout, TimeUnit.MILLISECONDS, 10) {(input, collector: ResultFuture[Int]) =>Future {collector.complete(Seq(input))} (ExecutionContext.global)}asyncMapped.print()env.execute("Async I/O job")}

主要部分是這樣的,菜雞表示無力,想繼承RichAsyncFunction,可以使用open 方法初始化鏈接。

網(wǎng)上博客翻了不少,大部分是翻譯官網(wǎng)的原理,案例也沒有可以執(zhí)行的,苦惱。

失敗了。

轉(zhuǎn)為java版本的,昨天在群里問,有個(gè)大佬給我個(gè)Java版本的:?https://github.com/perkinls/flink-local-train/blob/c8b4efe33620352aea0100adef4fae2a068a3b65/src/main/scala/com/lp/test/asyncio/AsyncIoSideTableJoinMysqlJava.java 還沒看過,因?yàn)镴ava版的官網(wǎng)的案例能看懂。

下面開始上mysql 版本 的 源碼(hbase 的還沒測試過,本機(jī)的hbase 掛了):

業(yè)務(wù)如下:

接收kafka數(shù)據(jù),轉(zhuǎn)為user對象,調(diào)用async,使用user.id 查詢對應(yīng)的phone,放回user對象,輸出

?主類:

import com.alibaba.fastjson.JSON; import com.venn.common.Common; import org.apache.flink.formats.json.JsonNodeDeserializationSchema; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.streaming.api.datastream.AsyncDataStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.concurrent.TimeUnit;public class AsyncMysqlRequest {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();FlinkKafkaConsumer<ObjectNode> source = new FlinkKafkaConsumer<>("async", new JsonNodeDeserializationSchema(), Common.getProp());// 接收kafka數(shù)據(jù),轉(zhuǎn)為User 對象DataStream<User> input = env.addSource(source).map(value -> {String id = value.get("id").asText();String username = value.get("username").asText();String password = value.get("password").asText();return new User(id, username, password);});// 異步IO 獲取mysql數(shù)據(jù), timeout 時(shí)間 1s,容量 10(超過10個(gè)請求,會反壓上游節(jié)點(diǎn))DataStream async = AsyncDataStream.unorderedWait(input, new AsyncFunctionForMysqlJava(), 1000, TimeUnit.MICROSECONDS, 10);async.map(user -> {return JSON.toJSON(user).toString();}).print();env.execute("asyncForMysql");} }

函數(shù)類:

import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.*;public class AsyncFunctionForMysqlJava extends RichAsyncFunction<AsyncUser, AsyncUser> {Logger logger = LoggerFactory.getLogger(AsyncFunctionForMysqlJava.class);private transient MysqlClient client;private transient ExecutorService executorService;/*** open 方法中初始化鏈接** @param parameters* @throws Exception*/@Overridepublic void open(Configuration parameters) throws Exception {logger.info("async function for mysql java open ...");super.open(parameters);client = new MysqlClient();executorService = Executors.newFixedThreadPool(30);}/*** use asyncUser.getId async get asyncUser phone** @param asyncUser* @param resultFuture* @throws Exception*/@Overridepublic void asyncInvoke(AsyncUser asyncUser, ResultFuture<AsyncUser> resultFuture) throws Exception {executorService.submit(() -> {// submit querySystem.out.println("submit query : " + asyncUser.getId() + "-1-" + System.currentTimeMillis());AsyncUser tmp = client.query1(asyncUser);// 一定要記得放回 resultFuture,不然數(shù)據(jù)全部是timeout 的 resultFuture.complete(Collections.singletonList(tmp));});}@Overridepublic void timeout(AsyncUser input, ResultFuture<AsyncUser> resultFuture) throws Exception {logger.warn("Async function for hbase timeout");List<AsyncUser> list = new ArrayList();input.setPhone("timeout");list.add(input);resultFuture.complete(list);}/*** close function** @throws Exception*/@Overridepublic void close() throws Exception {logger.info("async function for mysql java close ...");super.close();} }

MysqlClient:

import com.venn.flink.util.MathUtil; import org.apache.flink.shaded.netty4.io.netty.channel.DefaultEventLoop; import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future; import org.apache.flink.shaded.netty4.io.netty.util.concurrent.SucceededFuture;import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException;public class MysqlClient {private static String jdbcUrl = "jdbc:mysql://192.168.229.128:3306?useSSL=false&allowPublicKeyRetrieval=true";private static String username = "root";private static String password = "123456";private static String driverName = "com.mysql.jdbc.Driver";private static java.sql.Connection conn;private static PreparedStatement ps;static {try {Class.forName(driverName);conn = DriverManager.getConnection(jdbcUrl, username, password);ps = conn.prepareStatement("select phone from async.async_test where id = ?");} catch (ClassNotFoundException | SQLException e) {e.printStackTrace();}}/*** execute query** @param user* @return*/public AsyncUser query1(AsyncUser user) {try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}String phone = "0000";try {ps.setString(1, user.getId());ResultSet rs = ps.executeQuery();if (!rs.isClosed() && rs.next()) {phone = rs.getString(1);}System.out.println("execute query : " + user.getId() + "-2-" + "phone : " + phone + "-" + System.currentTimeMillis());} catch (SQLException e) {e.printStackTrace();}user.setPhone(phone);return user;}// 測試代碼public static void main(String[] args) {MysqlClient mysqlClient = new MysqlClient();AsyncUser asyncUser = new AsyncUser();asyncUser.setId("526");long start = System.currentTimeMillis();asyncUser = mysqlClient.query1(asyncUser);System.out.println("end : " + (System.currentTimeMillis() - start));System.out.println(asyncUser.toString());} }

?

函數(shù)類(錯(cuò)誤示范:asyncInvoke 方法中阻塞查詢數(shù)據(jù)庫,是同步的):

import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.ArrayList; import java.util.List;public class AsyncFunctionForMysqlJava extends RichAsyncFunction<User, User> {// 鏈接private static String jdbcUrl = "jdbc:mysql://192.168.229.128:3306?useSSL=false";private static String username = "root";private static String password = "123456";private static String driverName = "com.mysql.jdbc.Driver";java.sql.Connection conn;PreparedStatement ps;Logger logger = LoggerFactory.getLogger(AsyncFunctionForMysqlJava.class);/*** open 方法中初始化鏈接* @param parameters* @throws Exception*/@Overridepublic void open(Configuration parameters) throws Exception {logger.info("async function for hbase java open ...");super.open(parameters);Class.forName(driverName);conn = DriverManager.getConnection(jdbcUrl, username, password);ps = conn.prepareStatement("select phone from async.async_test where id = ?");}/*** use user.getId async get user phone** @param user* @param resultFuture* @throws Exception*/@Overridepublic void asyncInvoke(User user, ResultFuture<User> resultFuture) throws Exception {// 使用 user id 查詢ps.setString(1, user.getId());ResultSet rs = ps.executeQuery();String phone = null;if (rs.next()) {phone = rs.getString(1);}user.setPhone(phone);List<User> list = new ArrayList();list.add(user);// 放回 result 隊(duì)列 resultFuture.complete(list);}@Overridepublic void timeout(User input, ResultFuture<User> resultFuture) throws Exception {logger.info("Async function for hbase timeout");List<User> list = new ArrayList();list.add(input);resultFuture.complete(list);}/*** close function** @throws Exception*/@Overridepublic void close() throws Exception {logger.info("async function for hbase java close ...");super.close();conn.close();} }

測試數(shù)據(jù)如下:

{"id" : 1, "username" : "venn", "password" : 1561709530935} {"id" : 2, "username" : "venn", "password" : 1561709536029} {"id" : 3, "username" : "venn", "password" : 1561709541033} {"id" : 4, "username" : "venn", "password" : 1561709546037} {"id" : 5, "username" : "venn", "password" : 1561709551040} {"id" : 6, "username" : "venn", "password" : 1561709556044} {"id" : 7, "username" : "venn", "password" : 1561709561048}

執(zhí)行結(jié)果如下:

submit query : 1-1-1562763486845 submit query : 2-1-1562763486846 submit query : 3-1-1562763486846 submit query : 4-1-1562763486849 submit query : 5-1-1562763486849 submit query : 6-1-1562763486859 submit query : 7-1-1562763486913 submit query : 8-1-1562763486967 submit query : 9-1-1562763487021 execute query : 1-2-phone : 12345678910-1562763487316 1> {"password":"1562763486506","phone":"12345678910","id":"1","username":"venn"} submit query : 10-1-1562763487408 submit query : 11-1-1562763487408 execute query : 9-2-phone : 1562661110630-1562763487633 1> {"password":"1562763487017","phone":"1562661110630","id":"9","username":"venn"} # 這里可以看到異步,提交查詢的到 11 了,執(zhí)行查詢 的只有 1/9,返回了 1/9(unorderedWait 調(diào)用) submit query : 12-1-1562763487634 execute query : 8-2-phone : 1562661110627-1562763487932 1> {"password":"1562763486963","phone":"1562661110627","id":"8","username":"venn"} submit query : 13-1-1562763487933 execute query : 7-2-phone : 1562661110624-1562763488228 1> {"password":"1562763486909","phone":"1562661110624","id":"7","username":"venn"} submit query : 14-1-1562763488230 execute query : 6-2-phone : 1562661110622-1562763488526 1> {"password":"1562763486855","phone":"1562661110622","id":"6","username":"venn"} submit query : 15-1-1562763488527 execute query : 4-2-phone : 12345678913-1562763488832 1> {"password":"1562763486748","phone":"12345678913","id":"4","username":"venn"}

?

hbase、redis或其他實(shí)現(xiàn)類似

?

轉(zhuǎn)載于:https://www.cnblogs.com/Springmoon-venn/p/11103558.html

總結(jié)

以上是生活随笔為你收集整理的Flink 异步IO访问外部数据(mysql篇)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: youjizz欧美 | 国产精品一区二区三区不卡 | 公侵犯人妻一区二区 | 午夜精品免费 | 日韩伦理av| 涩涩涩涩涩涩涩涩涩涩 | 欧美特级特黄aaaaaa在线看 | 欧美女同视频 | 亚洲国产精品免费 | 一级特黄色 | 成人免费无码大片a毛片抽搐色欲 | 国产激情亚洲 | 国产精品久久久久久久久晋中 | 国产午夜在线一区二区三区 | 黄色av网址在线 | 青青草97国产精品免费观看 | 成人三级黄色片 | 日皮视频免费看 | 深夜福利一区二区三区 | 国产精品一区二区三区免费视频 | 国产精品久久久久久99 | 美女爽爽爽 | av网站国产 | 激情涩涩 | 四虎色网 | 亚洲成人免费在线观看 | 麻豆做爰免费观看 | 成人爽爽视频 | 两个人做羞羞的视频 | 天天爽夜夜爽一区二区三区 | 日本韩国在线播放 | av日韩国产 | 免费黄av | 午夜偷拍福利视频 | 欧美一区二区黄色 | 一区二区在线精品 | 狠狠操女人 | 国产三级久久 | 国产精品theporn88 | 西西午夜| 一区精品在线观看 | 日韩成人无码 | 日韩骚片 | 麻豆一区二区三区精品视频 | 国产免费无遮挡吸奶头视频 | 欧美一区二区在线播放 | 婷婷四房综合激情五月 | 欧美黑人粗大 | 天天色综 | 好吊妞无缓冲视频观看 | 三级三级久久三级久久18 | 国产精品乱子伦 | 激情婷婷小说 | 国产高清在线观看 | 97国产精品| 国产乱码精品一区二三区蜜臂 | 国产中文字幕乱人伦在线观看 | 免费观看黄网站 | 国产不卡视频在线 | 亚洲再线| 我想看一级黄色片 | 日韩乱码一区二区 | 国产对白自拍 | 久久综合亚洲色hezyo国产 | 欧美国产一区二区在线观看 | 999视频| 黄色aaaaa| 国产在线区 | 6080黄色| 最新av网站在线观看 | 午夜精品一区二区三区三上悠亚 | 黄色在线免费网站 | 欧美寡妇性猛交ⅹxxx | 色噜噜日韩精品欧美一区二区 | 国产成人+综合亚洲+天堂 | 人人草超碰| 日韩性生交大片免费看 | 久久久久久久久久国产精品 | 欧美91在线 | 亚洲欧洲在线视频 | 99精品久久99久久久久 | 国产在线精品自拍 | 在线麻豆视频 | 毛片免费一区二区三区 | 97免费观看视频 | 四虎精品永久在线 | 三级特黄| 欧美aaa级片 | 日韩精品――色哟哟 | 人日人视频| 91丨porny丨对白 | 亚洲区一区二 | 丁香花激情网 | 亚洲福利网站 | 国产欧美精品一区二区色综合朱莉 | 三级国产网站 | 波多野结衣视频免费在线观看 | 国产一级二级毛片 | 中文字幕在线播放第一页 |