日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

底层框架_你有必要了解一下Flink底层RPC使用的框架和原理

發布時間:2023/12/19 编程问答 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 底层框架_你有必要了解一下Flink底层RPC使用的框架和原理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1. 前言

對于Flink中各個組件(JobMaster、TaskManager、Dispatcher等),其底層RPC框架基于Akka實現,本文著重分析Flink中的Rpc框架實現機制及梳理其通信流程。

2. Akka介紹

由于Flink底層Rpc是基于Akka實現,我們先了解下Akka的基本使用。

Akka是一個開發并發、容錯和可伸縮應用的框架。它是Actor Model的一個實現,和Erlang的并發模型很像。在Actor模型中,所有的實體被認為是獨立的actors。actors和其他actors通過發送異步消息通信。Actor模型的強大來自于異步。它也可以顯式等待響應,這使得可以執行同步操作。但是,強烈不建議同步消息,因為它們限制了系統的伸縮性。每個actor有一個郵箱(mailbox),它收到的消息存儲在里面。另外,每一個actor維護自身單獨的狀態。一個Actors網絡如下所示:

每個actor是一個單一的線程,它不斷地從其郵箱中poll(拉取)消息,并且連續不斷地處理。對于已經處理過的消息的結果,actor可以改變它自身的內部狀態或者發送一個新消息或者孵化一個新的actor。盡管單個的actor是自然有序的,但一個包含若干個actor的系統卻是高度并發的并且極具擴展性的。因為那些處理線程是所有actor之間共享的。這也是我們為什么不該在actor線程里調用可能導致阻塞的“調用”。因為這樣的調用可能會阻塞該線程使得他們無法替其他actor處理消息。

2.1. 創建Akka系統

Akka系統的核心ActorSystem和Actor,若需構建一個Akka系統,首先需要創建ActorSystem,創建完ActorSystem后,可通過其創建Actor(注意:Akka不允許直接new一個Actor,只能通過 Akka 提供的某些 API 才能創建或查找 Actor,一般會通過 ActorSystem#actorOf和ActorContext#actorOf來創建 Actor),另外,我們只能通過ActorRef(Actor的引用, 其對原生的 Actor 實例做了良好的封裝,外界不能隨意修改其內部狀態)來與Actor進行通信。如下代碼展示了如何配置一個Akka系統。

// 1. 構建ActorSystem// 使用缺省配置ActorSystem system = ActorSystem.create("sys");// 也可顯示指定appsys配置// ActorSystem system1 = ActorSystem.create("helloakka", ConfigFactory.load("appsys"));// 2. 構建Actor,獲取該Actor的引用,即ActorRefActorRef helloActor = system.actorOf(Props.create(HelloActor.class), "helloActor");// 3. 給helloActor發送消息helloActor.tell("hello helloActor", ActorRef.noSender());// 4. 關閉ActorSystemsystem.terminate();

在Akka中,創建的每個Actor都有自己的路徑,該路徑遵循 ActorSystem 的層級結構,大致如下:

本地:akka://sys/user/helloActor遠程:akka.tcp://sys@l27.0.0.1:2020/user/remoteActor

其中本地路徑含義如下:

  • sys,創建的ActorSystem的名字;

  • user,通過ActorSystem#actorOf和ActorContext#actorOf 方法創建的 Actor 都屬于/user下,與/user對應的是/system, 其是系統層面創建的,與系統整體行為有關,在開發階段并不需要對其過多關注

  • helloActor,我們創建的HelloActor

其中遠程部分路徑含義如下:

  • akka.tcp,遠程通信方式為tcp;

  • sys@127.0.0.1:2020,ActorSystem名字及遠程主機ip和端口號。

2.2. 根據path獲取Actor

若提供了Actor的路徑,可以通過路徑獲取到ActorRef,然后與之通信,代碼如下所示:

ActorSystem system = ActorSystem.create("sys");ActorSelection as = system.actorSelection("/path/to/actor");Timeout timeout = new Timeout(Duration.create(2, "seconds"));Future fu = as.resolveOne(timeout);fu.onSuccess(new OnSuccess() { @Overridepublic void onSuccess(ActorRef actor) { System.out.println("actor:" + actor); actor.tell("hello actor", ActorRef.noSender()); }}, system.dispatcher());fu.onFailure(new OnFailure() { @Overridepublic void onFailure(Throwable failure) { System.out.println("failure:" + failure); }}, system.dispatcher());

由上面可知,若需要與遠端Actor通信,路徑中必須提供ip:port。

2.3. 與Actor通信

2.3.1. tell方式

當使用tell方式時,表示僅僅使用異步方式給某個Actor發送消息,無需等待Actor的響應結果,并且也不會阻塞后續代碼的運行,如:

helloActor.tell("hello helloActor", ActorRef.noSender());

其中:第一個參數為消息,它可以是任何可序列化的數據或對象,第二個參數表示發送者,通常來講是另外一個 Actor 的引用, ActorRef.noSender()表示無發送者((實際上是一個 叫做deadLetters的Actor)。

2.3.2. ask方式

當我們需要從Actor獲取響應結果時,可使用ask方法,ask方法會將返回結果包裝在scala.concurrent.Future中,然后通過異步回調獲取返回結果。如調用方:

// 異步發送消息給Actor,并獲取響應結果Future<Object> fu = Patterns.ask(printerActor, "hello helloActor", timeout);fu.onComplete(new OnComplete<Object>() {@Overridepublic void onComplete(Throwable failure, String success) throws Throwable {if (failure != null) { System.out.println("failure is " + failure); } else { System.out.println("success is " + success); } }}, system.dispatcher());

HelloActor處理消息方法的代碼大致如下:

private void handleMessage(Object object) {if (object instanceof String) {String str = (String) object; log.info("[HelloActor] message is {}, sender is {}", str, getSender().path().toString());// 給發送者發送消息 getSender().tell(str, getSelf()); } }

上面主要介紹了Akka中的ActorSystem、Actor,及與Actor的通信;Flink借此構建了其底層通信系統。

3. RPC類圖結構

下圖展示了Flink中RPC框架中涉及的主要類。

3.1. RpcGateway

Flink的RPC協議通過RpcGateway來定義;由前面可知,若想與遠端Actor通信,則必須提供地址(ip和port),如在Flink-on-Yarn模式下,JobMaster會先啟動ActorSystem,此時TaskExecutor的Container還未分配,后面與TaskExecutor通信時,必須讓其提供對應地址,從類繼承圖可以看到基本上所有組件都實現了RpcGateway接口,其代碼如下:

public interface RpcGateway {/** * Returns the fully qualified address under which the associated rpc endpoint is reachable. * * @return Fully qualified (RPC) address under which the associated rpc endpoint is reachable */String getAddress();/** * Returns the fully qualified hostname under which the associated rpc endpoint is reachable. * * @return Fully qualified hostname under which the associated rpc endpoint is reachable */String getHostname();}

3.2. RpcEndpoint

每個RpcEndpoint對應了一個路徑(endpointId和actorSystem共同確定),每個路徑對應一個Actor,其實現了RpcGateway接口,其構造函數如下:

protected RpcEndpoint(final RpcService rpcService, final String endpointId) {// 保存rpcService和endpointIdthis.rpcService = checkNotNull(rpcService, "rpcService");this.endpointId = checkNotNull(endpointId, "endpointId");// 通過RpcService啟動RpcServerthis.rpcServer = rpcService.startServer(this);// 主線程執行器,所有調用在主線程中串行執行this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);}

在RpcEndpoint中還定義了一些方法如runAsync(Runnable)、callAsync(Callable, Time)方法來執行Rpc調用,值得注意的是在Flink的設計中,對于同一個Endpoint,所有的調用都運行在主線程,因此不會有并發問題,當啟動RpcEndpoint/進行Rpc調用時,其會委托RcpServer進行處理。

3.3. RpcService

Rpc服務的接口,其主要作用如下:

根據提供的RpcEndpoint來啟動RpcServer(Actor);

根據提供的地址連接到RpcServer,并返回一個RpcGateway;

延遲/立刻調度Runnable、Callable;

停止RpcServer(Actor)或自身服務;

在Flink中其實現類為AkkaRpcService。

3.3.1. AkkaRpcService

AkkaRpcService中封裝了ActorSystem,并保存了ActorRef到RpcEndpoint的映射關系,在構造RpcEndpoint時會啟動指定rpcEndpoint上的RpcServer,其會根據Endpoint類型(FencedRpcEndpoint或其他)來創建不同的Actor(FencedAkkaRpcActor或AkkaRpcActor),并將RpcEndpoint和Actor對應的ActorRef保存起來,然后使用動態代理創建RpcServer,具體代碼如下:

public RpcServer startServer(C rpcEndpoint) { checkNotNull(rpcEndpoint, "rpc endpoint"); CompletableFuture terminationFuture = new CompletableFuture<>();final Props akkaRpcActorProps;// 根據RpcEndpoint類型創建不同類型的Propsif (rpcEndpoint instanceof FencedRpcEndpoint) { akkaRpcActorProps = Props.create( FencedAkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion(), configuration.getMaximumFramesize()); } else { akkaRpcActorProps = Props.create( AkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion(), configuration.getMaximumFramesize()); } ActorRef actorRef;// 同步塊,創建Actor,并獲取對應的ActorRefsynchronized (lock) { checkState(!stopped, "RpcService is stopped"); actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId()); actors.put(actorRef, rpcEndpoint); } LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());// 獲取Actor的路徑final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);final String hostname; Option host = actorRef.path().address().host();if (host.isEmpty()) { hostname = "localhost"; } else { hostname = host.get(); }// 解析該RpcEndpoint實現的所有RpcGateway接口 Set> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass())); // 額外添加RpcServer和AkkaBasedEnpoint類 implementedRpcGateways.add(RpcServer.class); implementedRpcGateways.add(AkkaBasedEndpoint.class);final InvocationHandler akkaInvocationHandler;// 根據不同類型動態創建代理對象if (rpcEndpoint instanceof FencedRpcEndpoint) {// a FencedRpcEndpoint needs a FencedAkkaInvocationHandler akkaInvocationHandler = new FencedAkkaInvocationHandler<>( akkaAddress, hostname, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), terminationFuture, ((FencedRpcEndpoint>) rpcEndpoint)::getFencingToken); implementedRpcGateways.add(FencedMainThreadExecutable.class); } else { akkaInvocationHandler = new AkkaInvocationHandler( akkaAddress, hostname, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), terminationFuture); }// Rather than using the System ClassLoader directly, we derive the ClassLoader// from this class . That works better in cases where Flink runs embedded and all Flink// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader ClassLoader classLoader = getClass().getClassLoader();// 生成RpcServer對象,而后對該server的調用都會進入Handler的invoke方法處理,handler實現了多個接口的方法@SuppressWarnings("unchecked") RpcServer server = (RpcServer) Proxy.newProxyInstance( classLoader, implementedRpcGateways.toArray(new Class>[implementedRpcGateways.size()]), akkaInvocationHandler);return server; }

當啟動RpcServer后,即創建了相應的Actor(注意此時Actor的處于停止狀態)和動態代理對象,需要調用RpcEndpoint#start啟動啟動Actor,此時啟動RpcEndpoint流程如下(以非FencedRpcEndpoint為例):

  • 調用RpcEndpoint#start;

  • 委托給RpcServer#start;

  • 調用動態代理的AkkaInvocationHandler#invoke;發現調用的是StartStoppable#start方法,則直接進行本地方法調用;invoke方法的代碼如下:

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Class> declaringClass = method.getDeclaringClass(); Object result;// 先匹配指定類型(handler已實現接口的方法),若匹配成功則直接進行本地方法調用;若匹配為FencedRpcGateway類型,則拋出異常(應該在FencedAkkaInvocationHandler中處理);其他則進行Rpc調用if (declaringClass.equals(AkkaBasedEndpoint.class) || declaringClass.equals(Object.class) || declaringClass.equals(RpcGateway.class) || declaringClass.equals(StartStoppable.class) || declaringClass.equals(MainThreadExecutable.class) || declaringClass.equals(RpcServer.class)) { result = method.invoke(this, args); } else if (declaringClass.equals(FencedRpcGateway.class)) {throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" + method.getName() + ". This indicates that you retrieved a FencedRpcGateway without specifying a " +"fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to " +"retrieve a properly FencedRpcGateway."); } else { result = invokeRpc(method, args); }return result; }
  • 調用AkkaInvocationHandler#start;

  • 通過ActorRef#tell給對應的Actor發送消息rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());;

  • 調用AkkaRpcActor#handleControlMessage處理控制類型消息;

  • 在主線程中將自身狀態變更為Started狀態;

經過上述步驟就完成了Actor的啟動過程,Actor啟動后便可與Acto通信讓其執行代碼(如runSync/callSync等)和處理Rpc請求了。下面分別介紹處理執行代碼和處理Rpc請求;

3.3.1.1. 執行代碼

與Actor通信,通過調用runSync/callSync等方法其直接執行代碼。

下面以scheduleRunAsync方法為例分析請求Actor執行代碼流程,方法代碼如下:public void scheduleRunAsync(Runnable runnable, long delayMillis) { checkNotNull(runnable, "runnable"); checkArgument(delayMillis >= 0, "delay must be zero or greater");// 判斷是否為本地Actorif (isLocal) {long atTimeNanos = delayMillis == 0 ? 0 : System.nanoTime() + (delayMillis * 1_000_000);// 向Actor發送消息runnable tell(new RunAsync(runnable, atTimeNanos)); } else {// 拋出異常,不支持遠程發送Runnable消息throw new RuntimeException("Trying to send a Runnable to a remote actor at " + rpcEndpoint.path() + ". This is not supported."); } }

AkkaInvocationHandler#invoke -> AkkaInvocation#scheduleRunAsync;

AkkaRpcActor#handleMessage->AkkaRpcActor#handleRpcMessage,其中handleRpcMessage方法如下:

protected void handleRpcMessage(Object message) {// 根據消息類型不同進行不同的處理if (message instanceof RunAsync) { handleRunAsync((RunAsync) message); } else if (message instanceof CallAsync) { handleCallAsync((CallAsync) message); } else if (message instanceof RpcInvocation) { handleRpcInvocation((RpcInvocation) message); } else { log.warn("Received message of unknown type {} with value {}. Dropping this message!", message.getClass().getName(), message); sendErrorIfSender(new AkkaUnknownMessageException("Received unknown message " + message +" of type " + message.getClass().getSimpleName() + '.')); } }

AkkaRpcActor#handleRunAsync,其代碼如下:

private void handleRunAsync(RunAsync runAsync) {// 獲取延遲調度時間final long timeToRun = runAsync.getTimeNanos();final long delayNanos;// 若為0或已經到了調度時間,則立刻進行調度if (timeToRun == 0 || (delayNanos = timeToRun - System.nanoTime()) <= 0) {// run immediatelytry { runAsync.getRunnable().run(); } catch (Throwable t) { log.error("Caught exception while executing runnable in main thread.", t); ExceptionUtils.rethrowIfFatalErrorOrOOM(t); } }else {// schedule for later. send a new message after the delay, which will then be immediately executed// 計算出延遲時間 FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS);// 重新封裝消息 RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun);final Object envelopedSelfMessage = envelopeSelfMessage(message);// 等待指定延遲時間后給自己再發送一個消息 getContext().system().scheduler().scheduleOnce(delay, getSelf(), envelopedSelfMessage, getContext().dispatcher(), ActorRef.noSender()); } }

注意:當還未到調度時間時,該Actor會延遲一段時間后再次給自己發送消息;

3.3.1.2. 處理Rpc請求

當調用非AkkaInvocationHandler實現的方法時,則進行Rpc請求。

下面分析處理Rpc調用的流程。

AkkaInvocationHandler#invokeRpc,其方法如下:private Object invokeRpc(Method method, Object[] args) throws Exception {// 獲取方法相應的信息String methodName = method.getName(); Class>[] parameterTypes = method.getParameterTypes(); Annotation[][] parameterAnnotations = method.getParameterAnnotations(); Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);// 創建RpcInvocationMessage(可分為LocalRpcInvocation/RemoteRpcInvocation) final RpcInvocation rpcInvocation = createRpcInvocationMessage(methodName, parameterTypes, args); Class> returnType = method.getReturnType(); final Object result;// 無返回,則使用tell方法if (Objects.equals(returnType, Void.TYPE)) { tell(rpcInvocation); result = null; } else {// execute an asynchronous call// 有返回,則使用ask方法 CompletableFuture> resultFuture = ask(rpcInvocation, futureTimeout); CompletableFuture> completableFuture = resultFuture.thenApply((Object o) -> {// 調用返回后進行反序列化if (o instanceof SerializedValue) {try {return ((SerializedValue>) o).deserializeValue(getClass().getClassLoader()); } catch (IOException | ClassNotFoundException e) {throw new CompletionException(new RpcException("Could not deserialize the serialized payload of RPC method : " + methodName, e)); } } else {// 直接返回return o; } });// 若返回類型為CompletableFuture則直接賦值if (Objects.equals(returnType, CompletableFuture.class)) { result = completableFuture; } else {try {// 從CompletableFuture獲取 result = completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit()); } catch (ExecutionException ee) {throw new RpcException("Failure while obtaining synchronous RPC result.", ExceptionUtils.stripExecutionException(ee)); } } }return result; }
  • AkkaRpcActor#handleRpcInvocation,其代碼如下:

private void handleRpcInvocation(RpcInvocation rpcInvocation) { Method rpcMethod = null;try {// 獲取方法的信息 String methodName = rpcInvocation.getMethodName(); Class>[] parameterTypes = rpcInvocation.getParameterTypes();// 在RpcEndpoint中找指定方法 rpcMethod = lookupRpcMethod(methodName, parameterTypes); } catch (ClassNotFoundException e) { log.error("Could not load method arguments.", e);// 異常處理 RpcConnectionException rpcException = new RpcConnectionException("Could not load method arguments.", e); getSender().tell(new Status.Failure(rpcException), getSelf()); } catch (IOException e) { log.error("Could not deserialize rpc invocation message.", e);// 異常處理 RpcConnectionException rpcException = new RpcConnectionException("Could not deserialize rpc invocation message.", e); getSender().tell(new Status.Failure(rpcException), getSelf()); } catch (final NoSuchMethodException e) { log.error("Could not find rpc method for rpc invocation.", e);// 異常處理 RpcConnectionException rpcException = new RpcConnectionException("Could not find rpc method for rpc invocation.", e); getSender().tell(new Status.Failure(rpcException), getSelf()); }if (rpcMethod != null) {try {// this supports declaration of anonymous classes rpcMethod.setAccessible(true);// 返回類型為空則直接進行invokeif (rpcMethod.getReturnType().equals(Void.TYPE)) {// No return value to send back rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()); }else {final Object result;try { result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()); }catch (InvocationTargetException e) { log.debug("Reporting back error thrown in remote procedure {}", rpcMethod, e);// tell the sender about the failure getSender().tell(new Status.Failure(e.getTargetException()), getSelf());return; }final String methodName = rpcMethod.getName();// 方法返回類型為CompletableFutureif (result instanceof CompletableFuture) {final CompletableFuture> responseFuture = (CompletableFuture>) result;// 發送結果(使用Patterns發送結果給調用者,并會進行序列化并驗證結果大小) sendAsyncResponse(responseFuture, methodName); } else {// 類型非CompletableFuture,發送結果(使用Patterns發送結果給調用者,并會進行序列化并驗證結果大小) sendSyncResponse(result, methodName); } } } catch (Throwable e) { log.error("Error while executing remote procedure call {}.", rpcMethod, e);// tell the sender about the failure getSender().tell(new Status.Failure(e), getSelf()); } } }
  • 將結果返回給調用者AkkaInvocationHandler#ask;

經過上述步驟就完成Rpc(本地/遠程)調用,可以看到底層也是通過Akka提供的tell/ask方法進行通信;經過上述步驟就完成Rpc(本地/遠程)調用,可以看到底層也是通過Akka提供的tell/ask方法進行通信;

4. 總結

RPC框架是Flink任務運行的基礎,Flink整個RPC框架基于Akka實現,并對Akka中的ActorSystem、Actor進行了封裝和使用,文章主要分析了Flink底層RPC通信框架的實現和相關流程,Flink整個通信框架的組件主要由RpcEndpoint、RpcService、RpcServer、AkkaInvocationHandler、AkkaRpcActor等構成。RpcEndpoint定義了一個Actor的路徑;RpcService提供了啟動RpcServer、執行代碼體等方法;RpcServer/AkkaInvocationHandler提供了與Actor通信的接口;AkkaRpcActor為Flink封裝的Actor。

—?THE END —

猜你喜歡:

◤半年文章精選系列◥

Flink從入門到放棄之源碼解析系列

  • 《Flink組件和邏輯計劃》

  • 《Flink執行計劃生成》

  • 《JobManager中的基本組件(1)》

  • 《JobManager中的基本組件(2)》

  • 《JobManager中的基本組件(3)》

  • 《TaskManager》

  • 《算子》

  • 《網絡》

  • 《水印WaterMark》

  • 《CheckPoint》

  • 《任務調度及負載均衡》

  • 《異常處理》

大數據成神之路-基礎篇

  • 《HashSet》

  • 《HashMap》

  • 《LinkedList》

  • 《ArrayList/Vector》

  • 《ConcurrentSkipListMap》

  • 《ConcurrentHashMap1.7》

  • 《ConcurrentHashMap1.8 Part1》

  • 《ConcurrentHashMap1.8 Part2》

  • 《CopyOnWriteArrayList》

  • 《CopyOnWriteArraySet》

  • 《ConcurrentLinkedQueue》

  • 《LinkedBlockingDeque》

  • 《LinkedBlockingQueue》

  • 《ArrayBlockingQueue》

  • 《ConcurrentSkipListSet》

大數據成神之路-進階篇

  • 《JVM&NIO基礎入門》

  • 《分布式理論基礎和原理》

  • 《分布式中的常見問題解決方案(分布式鎖/事務/ID)》

  • 《Zookeeper》

  • 《RPC》

  • 《Netty入門篇》

  • 《Netty源碼篇》

  • 《Linux基礎》

Flink入門系列

  • 《Flink入門》

  • 《Flink DataSet&DataSteam API》

  • 《Flink集群部署》

  • 《Flink重啟策略》

  • 《Flink分布式緩存》

  • 《Flink廣播變量》

  • 《Flink中的Time》

  • 《Flink中的窗口》

  • 《時間戳和水印》

  • 《Broadcast廣播變量》

  • 《Flink-Kafka-Connector》

  • 《Flink之Table-&-SQL》

  • 《Flink實戰項目之實時熱銷排行》

  • 《Flink-Redis-Sink》

  • 《Flink消費Kafka寫入Mysql》

Flink高級進階

  • 《FaultTolerance》

  • 《流表對偶(duality)性》

  • 《持續查詢(ContinuousQueries)》

  • 《DataStream-Connectors之Kafka》

  • 《SQL概覽》

  • 《JOIN 算子》

  • 《TableAPI》

  • 《JOIN-LATERAL》

  • 《JOIN-LATERAL-Time Interval(Time-windowed)》

  • 《Temporal-Table-JOIN》

  • 《State》

  • 《FlinkSQL中的回退更新-Retraction》

  • 《Apache Flink結合Apache Kafka實現端到端的一致性語義》

  • 《Flink1.8.0發布!新功能搶先看》

  • 《Flink1.8.0重大更新-Flink中State的自動清除詳解》

  • 《Flink在滴滴出行的應用與實踐》

  • 《批流統一計算引擎的動力源泉—Flink Shuffle機制的重構與優化》

  • 《HBase分享 | Flink+HBase場景化解決方案》

  • 《騰訊基于Flink的實時流計算平臺演進之路》

  • 《Flink進階-Flink CEP(復雜事件處理)》

  • 《Flink基于EventTime和WaterMark處理亂序事件和晚到的數據》

  • 《Flink 最鋒利的武器:Flink SQL 入門和實戰》

  • 《Flink Back Pressure》

  • 《使用Flink讀取Kafka中的消息》

  • 《Flink on YARN部署快速入門指南》

  • 《Apache Flink狀態管理和容錯機制介紹》

Hadoop生態圈系列

  • 《Hadoop極簡入門》

  • 《MapReduce編程模型和計算框架架構原理》

  • 《分布式文件系統-HDFS》

  • 《YARN》

  • 《Hadoop機架感知》

  • 《HDFS的一個重要知識點-HDFS的數據流》

  • 《Hadoop分布式緩存(DistributedCache)》

  • 《如何從根源上解決 HDFS 小文件問題》(https://dwz.cn/FqDPpRUc)

  • 《Hadoop解決小文件存儲思路》(https://dwz.cn/2oCdmCkw)

  • 《Hadoop所支持的幾種壓縮格式》

  • 《MapReduce Join》

  • 《YARN Capacity Scheduler(容量調度器)》

  • 《hadoop上搭建hive》

  • 《基于Hadoop的數據倉庫Hive基礎知識》

  • 《Hive使用必知必會系列》

  • 《一個小知識點-Hive行轉列實現Pivot》

  • 《面試必備技能-HiveSQL優化》

  • 《HBase和Hive的區別和各自適用的場景》

  • 《一篇文章入門Hbase》

  • 《敲黑板:HBase的RowKey設計》

  • 《HBase讀寫優化》

  • 《HBase在滴滴出行的應用場景和最佳實踐》

  • 《Phoenix=HBase+SQL,讓HBase插上了翅膀》

  • 《一個知識點將你拒之門外之Hbase的二級索引》(https://dwz.cn/umfBOZ5l)

  • 《Phoenix重磅 | Phoenix核心功能原理及應用場景介紹》

  • 《DB、DW、DM、ODS、OLAP、OLTP和BI的概念理解》

  • 《Hive/HiveSQL常用優化方法全面總結》

實時計算系列(spark、kafka等)

  • 《Spark Streaming消費Kafka數據的兩種方案》

  • 《Apache Kafka簡單入門》

  • 《你不得不知道的知識-零拷貝》

  • 《Kafka在字節跳動的實踐和災備方案》

  • 《萬字長文干貨 | Kafka 事務性之冪等性實現》

  • 《Kafka最佳實踐》

  • 《Kafka Exactly-Once 之事務性實現》

  • 《Kafka連接器深度解讀之錯誤處理和死信隊列》

  • 《Spark之數據傾斜調優》

  • 《Structured Streaming 實現思路與實現概述》

  • 《Spark內存調優》

  • 《廣告點擊數實時統計:Spark StructuredStreaming + Redis Streams》

  • 《Spark Shuffle在網易的優化》

  • 《SparkSQL極簡入門》

  • 《下一代分布式消息隊列Apache Pulsar》

  • 《Pulsar與Kafka消費模型對比》

  • 《Spark SQL重點知識總結》

  • 《Structured Streaming 之狀態存儲解析》

  • 《周期性清除Spark Streaming流狀態的方法》

  • 《Spark Structured Streaming特性介紹》

  • 《Spark Streaming 反壓(Back Pressure)機制介紹》

  • 《Spark 從 Kafka 讀數設置子并發度問題》

規范和系統設計

  • 《阿里云10 PB+/天的日志系統設計和實現》

  • 《阿里云Redis開發規范》

  • 《Java中多個ifelse語句的替代設計》

  • 《面試系列:十個海量數據處理方法大總結》

雜談

  • 《作為面試官的一點點感悟,談談技術人的成長之路》

  • 《成年人的世界沒有容易二字》

  • 《我最近在關注的事》

  • 《真香》

  • 《簡單說說學習這件事》

  • 《20多歲做什么,將來才不會后悔》

  • 《2019-05-12最近的總結》

  • 《我軍新聞聯播氣勢+9999》

  • 《周末分享 | 高手的戰略》

  • 《周末分享 | 快速定位自己的缺點》

  • 《周末分享 | 我見過最高級的聰明是靠譜》

總結

以上是生活随笔為你收集整理的底层框架_你有必要了解一下Flink底层RPC使用的框架和原理的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。