日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java消息顺序执行_Apache Flink:如何并行执行但保持消息顺序?

發(fā)布時(shí)間:2023/12/10 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java消息顺序执行_Apache Flink:如何并行执行但保持消息顺序? 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

請(qǐng)?jiān)谙旅嬲业绞褂脗?cè)輸出和插槽組進(jìn)行本地?cái)U(kuò)展的示例 .

package org.example

/*

* Licensed to the Apache Software Foundation (ASF) under one

* or more contributor license agreements. See the NOTICE file

* distributed with this work for additional information

* regarding copyright ownership. The ASF licenses this file

* to you under the Apache License, Version 2.0 (the

* "License"); you may not use this file except in compliance

* with the License. You may obtain a copy of the License at

*

* http://www.apache.org/licenses/LICENSE-2.0

*

* Unless required by applicable law or agreed to in writing, software

* distributed under the License is distributed on an "AS IS" BASIS,

* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

* See the License for the specific language governing permissions and

* limitations under the License.

*/

import org.apache.flink.streaming.api.functions.ProcessFunction

import org.apache.flink.streaming.api.scala._

import org.apache.flink.util.Collector

/**

* This example shows an implementation of WordCount with data from a text socket.

* To run the example make sure that the service providing the text data is already up and running.

*

* To start an example socket text stream on your local machine run netcat from a command line,

* where the parameter specifies the port number:

*

* {{{

* nc -lk 9999

* }}}

*

* Usage:

* {{{

* SocketTextStreamWordCount

* }}}

*

* This example shows how to:

*

* - use StreamExecutionEnvironment.socketTextStream

* - write a simple Flink Streaming program in scala.

* - write and use user-defined functions.

*/

object SocketTextStreamWordCount {

def main(args: Array[String]) {

if (args.length != 2) {

System.err.println("USAGE:\nSocketTextStreamWordCount ")

return

}

val hostName = args(0)

val port = args(1).toInt

val outputTag1 = OutputTag[String]("side-1")

val outputTag2 = OutputTag[String]("side-2")

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.getConfig.enableObjectReuse()

//Create streams for names and ages by mapping the inputs to the corresponding objects

val text = env.socketTextStream(hostName, port).slotSharingGroup("processElement")

val counts = text.flatMap {

_.toLowerCase.split("\\W+") filter {

_.nonEmpty

}

}

.process(new ProcessFunction[String, String] {

override def processElement(

value: String,

ctx: ProcessFunction[String, String]#Context,

out: Collector[String]): Unit = {

if (value.head <= 'm') ctx.output(outputTag1, String.valueOf(value))

else ctx.output(outputTag2, String.valueOf(value))

}

})

val sideOutputStream1: DataStream[String] = counts.getSideOutput(outputTag1)

val sideOutputStream2: DataStream[String] = counts.getSideOutput(outputTag2)

val output1 = sideOutputStream1.map {

(_, 1)

}.slotSharingGroup("map1")

.keyBy(0)

.sum(1)

val output2 = sideOutputStream2.map {

(_, 1)

}.slotSharingGroup("map2")

.keyBy(0)

.sum(1)

output1.print()

output2.print()

env.execute("Scala SocketTextStreamWordCount Example")

}

}

總結(jié)

以上是生活随笔為你收集整理的java消息顺序执行_Apache Flink:如何并行执行但保持消息顺序?的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。