日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > C# >内容正文

C#

c#中connect函数_Flink算子使用方法及实例演示:union和connect

發布時間:2025/3/12 C# 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 c#中connect函数_Flink算子使用方法及实例演示:union和connect 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Flink的Transformation轉換主要包括四種:單數據流基本轉換、基于Key的分組轉換、多數據流轉換和數據重分布轉換。讀者可以使用Flink Scala Shell或者Intellij Idea來進行練習:

  • Flink Scala Shell:使用交互式編程環境學習和調試Flink
  • Flink 01 | 十分鐘搭建第一個Flink應用和本地集群
  • Flink算子使用方法及實例演示:map、filter和flatMap
  • Flink算子使用方法及實例演示:keyBy、reduce和aggregations

很多情況下,我們需要對多個數據流進行整合處理,Flink為我們提供了多流轉換算子,本文主要介紹多流轉換。

union

在DataStream上使用union算子可以合并多個同類型的數據流,并生成同類型的數據流,即可以將多個DataStream[T]合并為一個新的DataStream[T]。數據將按照先進先出(First In First Out)的模式合并,且不去重。下圖union對白色和深色兩個數據流進行合并,生成一個數據流。

union示意圖

假設股票價格數據流來自不同的交易所,我們將其合并成一個數據流:

val shenzhenStockStream: DataStream[StockPrice] = ...val hongkongStockStream: DataStream[StockPrice] = ...val shanghaiStockStream: DataStream[StockPrice] = ...val unionStockStream: DataStream[StockPrice] = shenzhenStockStream.union(hongkongStockStream, shanghaiStockStream)

connect

union雖然可以合并多個數據流,但有一個限制,即多個數據流的數據類型必須相同。connect提供了和union類似的功能,用來連接兩個數據流,它與union的區別在于:

  • connect只能連接兩個數據流,union可以連接多個數據流。
  • connect所連接的兩個數據流的數據類型可以不一致,union所連接的兩個數據流的數據類型必須一致。
  • 兩個DataStream經過connect之后被轉化為ConnectedStreams,ConnectedStreams會對兩個流的數據應用不同的處理方法,且雙流之間可以共享狀態。
  • connect經常被應用在對一個數據流使用另外一個流進行控制處理的場景上,如下圖所示。控制流可以是閾值、規則、機器學習模型或其他參數。

    對一個數據流進行控制處理

    對于ConnectedStreams,我們需要重寫CoMapFunction或CoFlatMapFunction。這兩個接口都提供了三個泛型,這三個泛型分別對應第一個輸入流的數據類型、第二個輸入流的數據類型和輸出流的數據類型。在重寫函數時,對于CoMapFunction,map1處理第一個流的數據,map2處理第二個流的數據;對于CoFlatMapFunction,flatMap1處理第一個流的數據,flatMap2處理第二個流的數據。Flink并不能保證兩個函數調用順序,兩個函數的調用依賴于兩個數據流數據的流入先后順序,即第一個數據流有數據到達時,map1或flatMap1會被調用,第二個數據流有數據到達時,map2或flatMap2會被調用。下面的代碼對一個整數流和一個字符串流進行了connect操作。

    val intStream: DataStream[Int] = senv.fromElements(1, 0, 9, 2, 3, 6)val stringStream: DataStream[String] = senv.fromElements("LOW", "HIGH", "LOW", "LOW")val connectedStream: ConnectedStreams[Int, String] = intStream.connect(stringStream)// CoMapFunction三個泛型分別對應第一個流的輸入、第二個流的輸入,map之后的輸出class MyCoMapFunction extends CoMapFunction[Int, String, String] { override def map1(input1: Int): String = input1.toString override def map2(input2: String): String = input2}val mapResult = connectedStream.map(new MyCoMapFunction)

    我們知道,如果不對DataStream按照Key進行分組,數據是隨機分配在各個TaskSlot上的,而絕大多數情況我們是要對某個Key進行分析和處理,Flink允許我們將connect和keyBy或broadcast結合起來使用。例如,我們將之前的股票價格數據流與一個媒體評價數據流結合起來,按照股票代號進行分組。

    // 先將兩個流connect,再進行keyByval keyByConnect1: ConnectedStreams[StockPrice, Media] = stockPriceRawStream .connect(mediaStatusStream) .keyBy(0,0)// 先keyBy再connectval keyByConnect2: ConnectedStreams[StockPrice, Media] = stockPriceRawStream.keyBy(0).connect(mediaStatusStream.keyBy(0))

    無論先keyBy還是先connect,我們都可以將含有相同Key的數據轉發到下游同一個算子實例上。這種操作有點像SQL中的join操作。Flink也提供了join算子,join主要在時間窗口維度上,connect相比而言更廣義一些,關于join的介紹將在后續文章中介紹。

    下面的代碼展示了如何將股票價格和媒體正負面評價結合起來,當媒體評價為正且股票價格大于閾值時,輸出一個正面信號。完整代碼在我的github上:https://github.com/luweizheng/flink-tutorials

    package com.flink.tutorials.demos.stockimport java.util.Calendarimport com.flink.tutorials.demos.stock.StockPriceDemo.{StockPrice, StockPriceSource, StockPriceTimeAssigner}import org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunctionimport org.apache.flink.streaming.api.functions.source.RichSourceFunctionimport org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContextimport org.apache.flink.streaming.api.scala._import org.apache.flink.util.Collectorimport scala.util.Randomobject StockMediaConnectedDemo { def main(args: Array[String]) { // 設置執行環境 val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 每5秒生成一個Watermark env.getConfig.setAutoWatermarkInterval(5000L) // 股票價格數據流 val stockPriceRawStream: DataStream[StockPrice] = env // 該數據流由StockPriceSource類隨機生成 .addSource(new StockPriceSource) // 設置 Timestamp 和 Watermark .assignTimestampsAndWatermarks(new StockPriceTimeAssigner) val mediaStatusStream: DataStream[Media] = env .addSource(new MediaSource) // 先將兩個流connect,再進行keyBy val keyByConnect1: ConnectedStreams[StockPrice, Media] = stockPriceRawStream .connect(mediaStatusStream) .keyBy(0,0) // 先keyBy再connect val keyByConnect2: ConnectedStreams[StockPrice, Media] = stockPriceRawStream.keyBy(0) .connect(mediaStatusStream.keyBy(0)) val alert1 = keyByConnect1.flatMap(new AlertFlatMap).print() val alerts2 = keyByConnect2.flatMap(new AlertFlatMap).print() // 執行程序 env.execute("connect stock price with media status") } /** 媒體評價 * * symbol 股票代號 * timestamp 時間戳 * status 評價 正面/一般/負面 */ case class Media(symbol: String, timestamp: Long, status: String) class MediaSource extends RichSourceFunction[Media]{ var isRunning: Boolean = true val rand = new Random() var stockId = 0 override def run(srcCtx: SourceContext[Media]): Unit = { while (isRunning) { // 每次從列表中隨機選擇一只股票 stockId = rand.nextInt(5) var status: String = "NORMAL" if (rand.nextGaussian() > 0.9) { status = "POSITIVE" } else if (rand.nextGaussian() < 0.05) { status = "NEGATIVE" } val curTime = Calendar.getInstance.getTimeInMillis srcCtx.collect(Media(stockId.toString, curTime, status)) Thread.sleep(rand.nextInt(100)) } } override def cancel(): Unit = { isRunning = false } } case class Alert(symbol: String, timestamp: Long, alert: String) class AlertFlatMap extends RichCoFlatMapFunction[StockPrice, Media, Alert] { var priceMaxThreshold: List[Double] = List(101.0d, 201.0d, 301.0d, 401.0d, 501.0d) var mediaLevel: String = "NORMAL" override def flatMap1(stock: StockPrice, collector: Collector[Alert]) : Unit = { val stockId = stock.symbol.toInt if ("POSITIVE".equals(mediaLevel) && stock.price > priceMaxThreshold(stockId)) { collector.collect(Alert(stock.symbol, stock.timestamp, "POSITIVE")) } } override def flatMap2(media: Media, collector: Collector[Alert]): Unit = { mediaLevel = media.status } }}

    總結

    以上是生活随笔為你收集整理的c#中connect函数_Flink算子使用方法及实例演示:union和connect的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 国产超碰人人爽人人做人人爱 | www.日日夜夜 | 免费看成人片 | 风韵丰满熟妇啪啪区老熟熟女 | 自拍偷拍精品 | 在线高清免费观看 | 日韩欧美小视频 | 激情网络| 色在线综合 | 女同av在线 | 日本人妖网站 | 日批视频免费在线观看 | 成人v| 特黄1级潘金莲 | 伊人网成人 | 找个毛片看看 | 18av在线视频 | 亚洲另类视频 | av国产在线观看 | 在线看国产精品 | 色小姐综合| 亚洲29p| 91欧美在线视频 | 少妇三级全黄 | 伊人久久大香线 | 日日操天天操夜夜操 | 亚瑟av在线 | av网站久久| 久久国产精品波多野结衣 | 国产一级一级国产 | 欧洲a级片| 亚洲黄色在线免费观看 | 精品人妻久久久久一区二区三区 | 老牛嫩草二区三区观影体验 | 亚洲大胆视频 | 日本中出视频 | 激情图片区 | 打屁屁日本xxxxx变态 | 亚洲色图网友自拍 | 成人激情视频在线播放 | 日本成人在线一区 | 五月天天色| 久久久性色精品国产免费观看 | 丝袜美腿亚洲一区二区图片 | 黄片毛片在线看 | 蜜桃久久精品 | 丝袜老师办公室里做好紧好爽 | 日本中文字幕二区 | 国产美女被遭强高潮免费网站 | av在线播放观看 | 亚洲免费国产 | 国产精品传媒在线观看 | 日韩理论片在线观看 | 久久久久久黄 | 亚洲在线| 岛国色图 | 美女扒开腿让男人 | 欧美黄色大片免费观看 | 中国极品少妇xxxx | 色偷偷伊人 | 一区二区三区四区在线播放 | 国产精品久久久久久久久免费软件 | 非洲一级黄色片 | 亚洲图片一区二区 | 亚洲欧美黄色片 | 国产成人超碰人人澡人人澡 | 午夜老司机福利 | 日韩成人短视频 | 色图18p | 精品国产96亚洲一区二区三区 | 人人澡人人爽 | 久久精品国产亚洲av麻豆色欲 | se日韩| 欧美精品首页 | 在线一区二区三区四区 | 可以在线观看av的网站 | 免费观看a级片 | 国产成人自拍视频在线 | jizz视频在线观看 | 欧类av怡春院 | av一区三区 | 在线免费中文字幕 | 一级大黄毛片 | 亚欧洲精品在线视频免费观看 | 蜜桃av久久久亚洲精品 | 成人黄色大全 | 精品国产大片大片大片 | 久久久久一区二区精码av少妇 | 国产无遮挡免费视频 | av优选在线观看 | 播放灌醉水嫩大学生国内精品 | 国产福利一区二区 | 国产成人中文字幕 | 国产日产精品一区二区三区四区 | 99在线精品视频免费观看20 | 1024手机在线看片 | 国产精品熟妇一区二区三区四区 | 日本高清有码 | 第九色 |