A Look at Flink's Async I/O
Preface
This article takes a look at Flink's Async I/O.
Example
```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

    // DatabaseClient, host, port and credentials are placeholders for your own client and settings.

    /** The database specific client that can issue concurrent requests with callbacks */
    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new DatabaseClient(host, port, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {

        // issue the asynchronous request, receive a future for result
        final Future<String> result = client.query(key);

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        CompletableFuture.supplyAsync(new Supplier<String>() {

            @Override
            public String get() {
                try {
                    return result.get();
                } catch (InterruptedException | ExecutionException e) {
                    // Normally handled explicitly.
                    return null;
                }
            }
        }).thenAccept((String dbResult) -> {
            resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
        });
    }
}

// create the original stream
DataStream<String> stream = ...;

// apply the async I/O transformation
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
```

- This example shows the basic usage of Flink's Async I/O. First, implement the AsyncFunction interface (here through RichAsyncFunction) to issue the asynchronous request and hand the result, or an exception, to the resultFuture. Then apply the AsyncFunction to a DataStream as a transformation with AsyncDataStream's unorderedWait or orderedWait. Both methods take two parameters that govern the async operations: timeout, which sets how long an async request may run before it times out, and capacity, which caps how many async requests may be in flight concurrently.
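The capacity parameter is easiest to picture as a cap on outstanding requests. The sketch below is plain JDK code, not Flink's implementation: the hypothetical `BoundedAsyncCaller` holds a `Semaphore` with `capacity` permits, so a new request blocks until an earlier one finishes, which is only loosely analogous to the async operator refusing new input while its internal queue is full. `CapacityDemo` and its 10 ms simulated lookup are likewise made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical helper (not a Flink class): at most `capacity` async calls may be in flight. */
class BoundedAsyncCaller<IN, OUT> {
    private final Semaphore permits;
    private final Function<IN, CompletableFuture<OUT>> asyncCall;

    BoundedAsyncCaller(int capacity, Function<IN, CompletableFuture<OUT>> asyncCall) {
        this.permits = new Semaphore(capacity);
        this.asyncCall = asyncCall;
    }

    /** Blocks the caller while `capacity` requests are pending, then issues a new one. */
    CompletableFuture<OUT> submit(IN input) throws InterruptedException {
        permits.acquire();
        try {
            // Free the slot whether the request succeeds or fails.
            return asyncCall.apply(input).whenComplete((value, error) -> permits.release());
        } catch (RuntimeException e) {
            permits.release(); // the request was never issued
            throw e;
        }
    }
}

class CapacityDemo {
    /** Issues `n` simulated 10 ms lookups with the given capacity; returns the peak in-flight count. */
    static int peakInFlight(int n, int capacity) {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        BoundedAsyncCaller<Integer, Integer> caller = new BoundedAsyncCaller<>(capacity,
            key -> CompletableFuture.supplyAsync(() -> {
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(10); // stand-in for a remote call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                inFlight.decrementAndGet();
                return key * 2;
            }, pool));
        List<CompletableFuture<Integer>> results = new ArrayList<>();
        try {
            for (int i = 0; i < n; i++) {
                results.add(caller.submit(i));
            }
            CompletableFuture.allOf(results.toArray(new CompletableFuture<?>[0])).join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }
}
```

Calling `CapacityDemo.peakInFlight(20, 3)` should never observe more than three simulated lookups running at once, even though the pool has eight threads. Timeouts are deliberately left out to keep the sketch short.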
AsyncFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
```java
/**
 * A function to trigger Async I/O operation.
 *
 * <p>For each #asyncInvoke, an async io operation can be triggered, and once it has been done,
 * the result can be collected by calling {@link ResultFuture#complete}. For each async
 * operation, its context is stored in the operator immediately after invoking
 * #asyncInvoke, avoiding blocking for each stream input as long as the internal buffer is not full.
 *
 * <p>{@link ResultFuture} can be passed into callbacks or futures to collect the result data.
 * An error can also be propagate to the async IO operator by
 * {@link ResultFuture#completeExceptionally(Throwable)}.
 *
 * <p>Callback example usage:
 *
 * <pre>{@code
 * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
 *
 *   public void asyncInvoke(String row, ResultFuture<String> result) throws Exception {
 *     HBaseCallback cb = new HBaseCallback(result);
 *     Get get = new Get(Bytes.toBytes(row));
 *     hbase.asyncGet(get, cb);
 *   }
 * }
 * }</pre>
 *
 * <p>Future example usage:
 *
 * <pre>{@code
 * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
 *
 *   public void asyncInvoke(String row, final ResultFuture<String> result) throws Exception {
 *     Get get = new Get(Bytes.toBytes(row));
 *     ListenableFuture<Result> future = hbase.asyncGet(get);
 *     Futures.addCallback(future, new FutureCallback<Result>() {
 *       public void onSuccess(Result result) {
 *         List<String> ret = process(result);
 *         result.complete(ret);
 *       }
 *       public void onFailure(Throwable thrown) {
 *         result.completeExceptionally(thrown);
 *       }
 *     });
 *   }
 * }
 * }</pre>
 *
 * @param <IN> The type of the input elements.
 * @param <OUT> The type of the returned elements.
 */
@PublicEvolving
public interface AsyncFunction<IN, OUT> extends Function, Serializable {

    /**
     * Trigger async operation for each stream input.
     *
     * @param input element coming from an upstream task
     * @param resultFuture to be completed with the result data
     * @exception Exception in case of a user code error. An exception will make the task fail and
     * trigger fail-over process.
     */
    void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

    /**
     * {@link AsyncFunction#asyncInvoke} timeout occurred.
     * By default, the result future is exceptionally completed with a timeout exception.
     *
     * @param input element coming from an upstream task
     * @param resultFuture to be completed with the result data
     */
    default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        resultFuture.completeExceptionally(
            new TimeoutException("Async function call has timed out."));
    }
}
```

- The AsyncFunction interface extends Function (and Serializable). It declares the asyncInvoke method plus a default timeout method. asyncInvoke runs the asynchronous logic and passes the result to the ResultFuture via ResultFuture.complete; on failure, the exception is propagated via ResultFuture.completeExceptionally(Throwable). By default, timeout completes the ResultFuture exceptionally with a TimeoutException.
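To make the asyncInvoke/timeout contract concrete without a Flink dependency, here is a JDK-only model. `Completer`, `AsyncFn`, `AsyncFnDriver` and `UpperCaseLookup` are hypothetical stand-ins for ResultFuture, AsyncFunction and the async operator; they are not Flink classes, and the real operator does considerably more bookkeeping. The driver arms a timer that calls the default `timeout` method (completing exceptionally with a `TimeoutException`, as the interface above does) if no result has arrived. The example function also shows one way to bridge a `CompletableFuture` with `whenComplete`, so that failures reach `completeExceptionally` instead of becoming a `null` result as in the catch block of the earlier example.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for ResultFuture: the first complete/completeExceptionally call wins. */
class Completer<OUT> {
    final CompletableFuture<Collection<OUT>> done = new CompletableFuture<>();

    void complete(Collection<OUT> result) { done.complete(result); }

    void completeExceptionally(Throwable error) { done.completeExceptionally(error); }
}

/** Hypothetical interface shaped like AsyncFunction, including the default timeout. */
interface AsyncFn<IN, OUT> {
    void asyncInvoke(IN input, Completer<OUT> completer) throws Exception;

    default void timeout(IN input, Completer<OUT> completer) {
        completer.completeExceptionally(new TimeoutException("Async function call has timed out."));
    }
}

/** Hypothetical driver: invokes the function once and arms a timer that calls timeout(). */
class AsyncFnDriver {
    private static final ScheduledExecutorService TIMER =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // do not keep the JVM alive
            return t;
        });

    static <IN, OUT> CompletableFuture<Collection<OUT>> run(AsyncFn<IN, OUT> fn, IN input, long timeoutMs) {
        Completer<OUT> completer = new Completer<>();
        TIMER.schedule(() -> {
            if (!completer.done.isDone()) {
                fn.timeout(input, completer);
            }
        }, timeoutMs, TimeUnit.MILLISECONDS);
        try {
            fn.asyncInvoke(input, completer);
        } catch (Exception e) {
            // Simplification: in Flink an exception thrown here fails the task instead.
            completer.completeExceptionally(e);
        }
        return completer.done;
    }
}

/** Example function: forwards success or failure of a CompletableFuture to the completer. */
class UpperCaseLookup implements AsyncFn<String, String> {
    private final long delayMs;

    UpperCaseLookup(long delayMs) { this.delayMs = delayMs; }

    @Override
    public void asyncInvoke(String key, Completer<String> completer) {
        CompletableFuture
            .supplyAsync(() -> {
                sleep(delayMs); // stand-in for a slow remote lookup
                return key.toUpperCase();
            })
            .whenComplete((value, error) -> {
                if (error != null) {
                    completer.completeExceptionally(error);
                } else {
                    completer.complete(Collections.singleton(value));
                }
            });
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In this model the timer and the callback race, and `CompletableFuture` ensures that only the first completion takes effect.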
RichAsyncFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
```java
@PublicEvolving
public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction implements AsyncFunction<IN, OUT> {

    private static final long serialVersionUID = 3858030061138121840L;

    @Override
    public void setRuntimeContext(RuntimeContext runtimeContext) {
        Preconditions.checkNotNull(runtimeContext);

        if (runtimeContext instanceof IterationRuntimeContext) {
            super.setRuntimeContext(
                new RichAsyncFunctionIterationRuntimeContext(
                    (IterationRuntimeContext) runtimeContext));
        } else {
            super.setRuntimeContext(new RichAsyncFunctionRuntimeContext(runtimeContext));
        }
    }

    @Override
    public abstract void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

    //......
}
```

- RichAsyncFunction extends AbstractRichFunction and declares that it implements the AsyncFunction interface. It does not implement asyncInvoke itself but leaves it abstract for subclasses. It overrides setRuntimeContext to wrap the incoming context: an IterationRuntimeContext is wrapped in a RichAsyncFunctionIterationRuntimeContext, any other context in a RichAsyncFunctionRuntimeContext.
RichAsyncFunctionRuntimeContext
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
```java
/**
 * A wrapper class for async function's {@link RuntimeContext}. The async function runtime
 * context only supports basic operations which are thread safe. Consequently, state access,
 * accumulators, broadcast variables and the distributed cache are disabled.
 */
private static class RichAsyncFunctionRuntimeContext implements RuntimeContext {
    private final RuntimeContext runtimeContext;

    RichAsyncFunctionRuntimeContext(RuntimeContext context) {
        runtimeContext = Preconditions.checkNotNull(context);
    }

    @Override
    public String getTaskName() {
        return runtimeContext.getTaskName();
    }

    @Override
    public MetricGroup getMetricGroup() {
        return runtimeContext.getMetricGroup();
    }

    @Override
    public int getNumberOfParallelSubtasks() {
        return runtimeContext.getNumberOfParallelSubtasks();
    }

    @Override
    public int getMaxNumberOfParallelSubtasks() {
        return runtimeContext.getMaxNumberOfParallelSubtasks();
    }

    @Override
    public int getIndexOfThisSubtask() {
        return runtimeContext.getIndexOfThisSubtask();
    }

    @Override
    public int getAttemptNumber() {
        return runtimeContext.getAttemptNumber();
    }

    @Override
    public String getTaskNameWithSubtasks() {
        return runtimeContext.getTaskNameWithSubtasks();
    }

    @Override
    public ExecutionConfig getExecutionConfig() {
        return runtimeContext.getExecutionConfig();
    }

    @Override
    public ClassLoader getUserCodeClassLoader() {
        return runtimeContext.getUserCodeClassLoader();
    }

    // -----------------------------------------------------------------------------------
    // Unsupported operations
    // -----------------------------------------------------------------------------------

    @Override
    public DistributedCache getDistributedCache() {
        throw new UnsupportedOperationException("Distributed cache is not supported in rich async functions.");
    }

    @Override
    public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
        throw new UnsupportedOperationException("State is not supported in rich async functions.");
    }

    @Override
    public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
        throw new UnsupportedOperationException("State is not supported in rich async functions.");
    }

    @Override
    public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
        throw new UnsupportedOperationException("State is not supported in rich async functions.");
    }

    @Override
    public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
        throw new UnsupportedOperationException("State is not supported in rich async functions.");
    }

    @Override
    public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
        throw new UnsupportedOperationException("State is not supported in rich async functions.");
    }

    @Override
    public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
        throw new UnsupportedOperationException("State is not supported in rich async functions.");
    }

    @Override
    public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {
        throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
    }

    @Override
    public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
        throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
    }

    @Override
    public Map<String, Accumulator<?, ?>> getAllAccumulators() {
        throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
    }

    @Override
    public IntCounter getIntCounter(String name) {
        throw new UnsupportedOperationException("Int counters are not supported in rich async functions.");
    }

    @Override
    public LongCounter getLongCounter(String name) {
        throw new UnsupportedOperationException("Long counters are not supported in rich async functions.");
    }

    @Override
    public DoubleCounter getDoubleCounter(String name) {
        throw new UnsupportedOperationException("Long counters are not supported in rich async functions.");
    }

    @Override
    public Histogram getHistogram(String name) {
        throw new UnsupportedOperationException("Histograms are not supported in rich async functions.");
    }

    @Override
    public boolean hasBroadcastVariable(String name) {
        throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
    }

    @Override
    public <RT> List<RT> getBroadcastVariable(String name) {
        throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
    }

    @Override
    public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
        throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
    }
}
```

- RichAsyncFunctionRuntimeContext implements the RuntimeContext interface. It delegates the basic, thread-safe methods (task name, metric group, parallelism, subtask index, attempt number, execution config, user-code class loader) to the wrapped RuntimeContext, and overrides all the unsupported ones (distributed cache, state, accumulators and counters, histograms, broadcast variables) to throw UnsupportedOperationException.
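The delegation pattern itself fits in a few lines. In this hypothetical sketch, `TaskContext` is a tiny stand-in for RuntimeContext and `AsyncSafeContext` plays the role of RichAsyncFunctionRuntimeContext; neither is a Flink type. In line with the Javadoc's note that only thread-safe operations are supported, read-only information is forwarded to the wrapped context while state access throws UnsupportedOperationException.

```java
import java.util.Objects;

/** Hypothetical, much smaller stand-in for Flink's RuntimeContext. */
interface TaskContext {
    String getTaskName();

    int getIndexOfThisSubtask();

    Object getState(String name);
}

/** Hypothetical wrapper in the spirit of RichAsyncFunctionRuntimeContext. */
class AsyncSafeContext implements TaskContext {
    private final TaskContext delegate;

    AsyncSafeContext(TaskContext delegate) {
        this.delegate = Objects.requireNonNull(delegate);
    }

    // Read-only information: forward to the wrapped context.
    @Override
    public String getTaskName() { return delegate.getTaskName(); }

    @Override
    public int getIndexOfThisSubtask() { return delegate.getIndexOfThisSubtask(); }

    // State access is rejected outright rather than forwarded.
    @Override
    public Object getState(String name) {
        throw new UnsupportedOperationException("State is not supported in rich async functions.");
    }
}

/** Trivial context used only to exercise the wrapper. */
class FixedContext implements TaskContext {
    private final String name;
    private final int index;

    FixedContext(String name, int index) {
        this.name = name;
        this.index = index;
    }

    @Override public String getTaskName() { return name; }

    @Override public int getIndexOfThisSubtask() { return index; }

    @Override public Object getState(String stateName) { return "state:" + stateName; }
}
```

Note that the wrapped `FixedContext` would happily return state; the restriction lives entirely in the wrapper, which is why user code in a RichAsyncFunction sees the exception even though the underlying task context supports state.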
RichAsyncFunctionIterationRuntimeContext
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
```java
private static class RichAsyncFunctionIterationRuntimeContext extends RichAsyncFunctionRuntimeContext implements IterationRuntimeContext {

    private final IterationRuntimeContext iterationRuntimeContext;

    RichAsyncFunctionIterationRuntimeContext(IterationRuntimeContext iterationRuntimeContext) {
        super(iterationRuntimeContext);

        this.iterationRuntimeContext = Preconditions.checkNotNull(iterationRuntimeContext);
    }

    @Override
    public int getSuperstepNumber() {
        return iterationRuntimeContext.getSuperstepNumber();
    }

    // -----------------------------------------------------------------------------------
    // Unsupported operations
    // -----------------------------------------------------------------------------------

    @Override
    public <T extends Aggregator<?>> T getIterationAggregator(String name) {
        throw new UnsupportedOperationException("Iteration aggregators are not supported in rich async functions.");
    }

    @Override
    public <T extends Value> T getPreviousIterationAggregate(String name) {
        throw new UnsupportedOperationException("Iteration aggregators are not supported in rich async functions.");
    }
}
```

- RichAsyncFunctionIterationRuntimeContext extends RichAsyncFunctionRuntimeContext and implements the IterationRuntimeContext interface. It delegates getSuperstepNumber to the wrapped IterationRuntimeContext and overrides getIterationAggregator and getPreviousIterationAggregate to throw UnsupportedOperationException.
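The instanceof dispatch in setRuntimeContext, together with the subclassed iteration wrapper, can be sketched the same way. All types here (`Ctx`, `IterCtx`, `SafeCtx`, `SafeIterCtx`, `ContextWrapping`) are hypothetical miniatures of RuntimeContext, IterationRuntimeContext and the two Flink wrappers. An iteration context gets the richer wrapper so it still looks like an iteration context, and anything else gets the basic one.

```java
import java.util.Objects;

/** Hypothetical base context (stand-in for RuntimeContext). */
interface Ctx {
    String taskName();
}

/** Hypothetical iteration context (stand-in for IterationRuntimeContext). */
interface IterCtx extends Ctx {
    int superstep();

    Object aggregator(String name);
}

/** Base wrapper: delegates the supported call. */
class SafeCtx implements Ctx {
    private final Ctx delegate;

    SafeCtx(Ctx delegate) { this.delegate = Objects.requireNonNull(delegate); }

    @Override
    public String taskName() { return delegate.taskName(); }
}

/** Iteration wrapper: reuses SafeCtx, delegates the superstep, rejects aggregators. */
class SafeIterCtx extends SafeCtx implements IterCtx {
    private final IterCtx iterDelegate;

    SafeIterCtx(IterCtx delegate) {
        super(delegate);
        this.iterDelegate = delegate;
    }

    @Override
    public int superstep() { return iterDelegate.superstep(); }

    @Override
    public Object aggregator(String name) {
        throw new UnsupportedOperationException("Iteration aggregators are not supported in rich async functions.");
    }
}

class ContextWrapping {
    /** Picks the wrapper by the runtime type of the context, like setRuntimeContext does. */
    static Ctx wrap(Ctx ctx) {
        Objects.requireNonNull(ctx);
        return (ctx instanceof IterCtx) ? new SafeIterCtx((IterCtx) ctx) : new SafeCtx(ctx);
    }
}
```

Because `SafeIterCtx` extends `SafeCtx`, the shared delegation code exists only once, mirroring how RichAsyncFunctionIterationRuntimeContext extends RichAsyncFunctionRuntimeContext.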
AsyncDataStream
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
```java
@PublicEvolving
public class AsyncDataStream {

    /**
     * Output mode for asynchronous operations.
     */
    public enum OutputMode { ORDERED, UNORDERED }

    private static final int DEFAULT_QUEUE_CAPACITY = 100;

    private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            int bufSize,
            OutputMode mode) {

        TypeInformation<OUT> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
            func,
            AsyncFunction.class,
            0,
            1,
            new int[]{1, 0},
            in.getType(),
            Utils.getCallLocationName(),
            true);

        // create transform
        AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(
            in.getExecutionEnvironment().clean(func),
            timeout,
            bufSize,
            mode);

        return in.transform("async wait operator", outTypeInfo, operator);
    }

    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            TimeUnit timeUnit,
            int capacity) {
        return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.UNORDERED);
    }

    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            TimeUnit timeUnit) {
        return addOperator(
            in,
            func,
            timeUnit.toMillis(timeout),
            DEFAULT_QUEUE_CAPACITY,
            OutputMode.UNORDERED);
    }

    public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            TimeUnit timeUnit,
            int capacity) {
        return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED);
    }

    public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            TimeUnit timeUnit) {
        return addOperator(
            in,
            func,
            timeUnit.toMillis(timeout),
            DEFAULT_QUEUE_CAPACITY,
            OutputMode.ORDERED);
    }
}
```

- AsyncDataStream provides two families of methods, unorderedWait and orderedWait, for applying an AsyncFunction to a DataStream.
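- Both unorderedWait and orderedWait come in a variant with a capacity parameter and one without; the variant without it falls back to DEFAULT_QUEUE_CAPACITY, i.e. 100. All of them end up in the private addOperator method, which creates an AsyncWaitOperator. Every variant takes a timeout (with a TimeUnit, converted to milliseconds via timeUnit.toMillis) that specifies how long to wait for an async operation to complete.
- AsyncDataStream defines two OutputModes. UNORDERED emits a result as soon as its async operation completes; with TimeCharacteristic.ProcessingTime this mode has the lowest latency and the lowest overhead. ORDERED emits results in the order in which their input elements arrived; to preserve that order, the operator has to buffer completed results until all earlier ones have been emitted, which adds some latency and overhead.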
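The latency cost of ORDERED can be seen with a small deterministic simulation. This is a hypothetical model, not AsyncWaitOperator: it assumes every request is issued at time 0 and finishes at a given time, and it ignores capacity, timeouts and watermarks. The UNORDERED variant emits each result at its own completion time; the ORDERED variant holds a finished result until every earlier one has been emitted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation: element i's async call finishes at completionTimes[i] ms. */
class EmissionOrder {

    /** One emitted record: the input index and the simulated emission time. */
    static final class Emit {
        final int index;
        final long time;

        Emit(int index, long time) {
            this.index = index;
            this.time = time;
        }

        @Override
        public String toString() { return index + "@" + time; }
    }

    /** UNORDERED-style: emit each result as soon as its own call finishes. */
    static List<Emit> unordered(long[] completionTimes) {
        List<Emit> out = new ArrayList<>();
        for (int i = 0; i < completionTimes.length; i++) {
            out.add(new Emit(i, completionTimes[i]));
        }
        out.sort((a, b) -> Long.compare(a.time, b.time)); // List.sort is stable: ties keep input order
        return out;
    }

    /** ORDERED-style: result i is buffered until results 0..i-1 have been emitted. */
    static List<Emit> ordered(long[] completionTimes) {
        List<Emit> out = new ArrayList<>();
        long headReady = 0;
        for (int i = 0; i < completionTimes.length; i++) {
            headReady = Math.max(headReady, completionTimes[i]);
            out.add(new Emit(i, headReady));
        }
        return out;
    }
}
```

For completion times `{30, 10, 20}` ms, `unordered` returns `[1@10, 2@20, 0@30]` while `ordered` returns `[0@30, 1@30, 2@30]`: element 1 finished after 10 ms but waits another 20 ms behind element 0.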
Summary
- Flink offers an Asynchronous I/O API for accessing external data, intended to raise streaming throughput. Basic usage is to define a function implementing the AsyncFunction interface and then apply it to a DataStream as a transformation with AsyncDataStream's unorderedWait or orderedWait.
- The AsyncFunction interface extends Function and declares the asyncInvoke method plus a default timeout method. asyncInvoke runs the asynchronous logic and passes the result to the ResultFuture via ResultFuture.complete, or propagates a failure via ResultFuture.completeExceptionally(Throwable). RichAsyncFunction extends AbstractRichFunction and declares that it implements AsyncFunction; it leaves asyncInvoke to subclasses and overrides setRuntimeContext to wrap the context in a RichAsyncFunctionRuntimeContext or a RichAsyncFunctionIterationRuntimeContext.
- AsyncDataStream's unorderedWait and orderedWait take two async-related parameters: timeout, which sets the async timeout, and capacity, which caps how many async requests may be in flight concurrently. AsyncDataStream provides two OutputModes: UNORDERED emits a result as soon as its async operation completes and, with TimeCharacteristic.ProcessingTime, has the lowest latency and overhead; ORDERED emits results in input order and has to buffer results to do so, which adds some latency and overhead.
doc
- Asynchronous I/O for External Data Access