c#中connect函数_Flink算子使用方法及实例演示:union和connect
Flink的Transformation轉換主要包括四種:單數據流基本轉換、基于Key的分組轉換、多數據流轉換和數據重分布轉換。讀者可以使用Flink Scala Shell或者Intellij Idea來進行練習:
- Flink Scala Shell:使用交互式編程環境學習和調試Flink
- Flink 01 | 十分鐘搭建第一個Flink應用和本地集群
- Flink算子使用方法及實例演示:map、filter和flatMap
- Flink算子使用方法及實例演示:keyBy、reduce和aggregations
很多情況下,我們需要對多個數據流進行整合處理,Flink為我們提供了多流轉換算子,本文主要介紹多流轉換。
union
在DataStream上使用union算子可以合并多個同類型的數據流,并生成同類型的數據流,即可以將多個DataStream[T]合并為一個新的DataStream[T]。數據將按照先進先出(First In First Out)的模式合并,且不去重。下圖union對白色和深色兩個數據流進行合并,生成一個數據流。
union示意圖
假設股票價格數據流來自不同的交易所,我們將其合并成一個數據流:
val shenzhenStockStream: DataStream[StockPrice] = ...val hongkongStockStream: DataStream[StockPrice] = ...val shanghaiStockStream: DataStream[StockPrice] = ...val unionStockStream: DataStream[StockPrice] = shenzhenStockStream.union(hongkongStockStream, shanghaiStockStream)connect
union雖然可以合并多個數據流,但有一個限制,即多個數據流的數據類型必須相同。connect提供了和union類似的功能,用來連接兩個數據流,它與union的區別在于:
connect經常被應用在對一個數據流使用另外一個流進行控制處理的場景上,如下圖所示。控制流可以是閾值、規則、機器學習模型或其他參數。
對一個數據流進行控制處理
對于ConnectedStreams,我們需要重寫CoMapFunction或CoFlatMapFunction。這兩個接口都提供了三個泛型,這三個泛型分別對應第一個輸入流的數據類型、第二個輸入流的數據類型和輸出流的數據類型。在重寫函數時,對于CoMapFunction,map1處理第一個流的數據,map2處理第二個流的數據;對于CoFlatMapFunction,flatMap1處理第一個流的數據,flatMap2處理第二個流的數據。Flink并不能保證兩個函數調用順序,兩個函數的調用依賴于兩個數據流數據的流入先后順序,即第一個數據流有數據到達時,map1或flatMap1會被調用,第二個數據流有數據到達時,map2或flatMap2會被調用。下面的代碼對一個整數流和一個字符串流進行了connect操作。
val intStream: DataStream[Int] = senv.fromElements(1, 0, 9, 2, 3, 6)val stringStream: DataStream[String] = senv.fromElements("LOW", "HIGH", "LOW", "LOW")val connectedStream: ConnectedStreams[Int, String] = intStream.connect(stringStream)// CoMapFunction三個泛型分別對應第一個流的輸入、第二個流的輸入,map之后的輸出class MyCoMapFunction extends CoMapFunction[Int, String, String] { override def map1(input1: Int): String = input1.toString override def map2(input2: String): String = input2}val mapResult = connectedStream.map(new MyCoMapFunction)我們知道,如果不對DataStream按照Key進行分組,數據是隨機分配在各個TaskSlot上的,而絕大多數情況我們是要對某個Key進行分析和處理,Flink允許我們將connect和keyBy或broadcast結合起來使用。例如,我們將之前的股票價格數據流與一個媒體評價數據流結合起來,按照股票代號進行分組。
// 先將兩個流connect,再進行keyByval keyByConnect1: ConnectedStreams[StockPrice, Media] = stockPriceRawStream .connect(mediaStatusStream) .keyBy(0,0)// 先keyBy再connectval keyByConnect2: ConnectedStreams[StockPrice, Media] = stockPriceRawStream.keyBy(0).connect(mediaStatusStream.keyBy(0))無論先keyBy還是先connect,我們都可以將含有相同Key的數據轉發到下游同一個算子實例上。這種操作有點像SQL中的join操作。Flink也提供了join算子,join主要在時間窗口維度上,connect相比而言更廣義一些,關于join的介紹將在后續文章中介紹。
下面的代碼展示了如何將股票價格和媒體正負面評價結合起來,當媒體評價為正且股票價格大于閾值時,輸出一個正面信號。完整代碼在我的github上:https://github.com/luweizheng/flink-tutorials
package com.flink.tutorials.demos.stockimport java.util.Calendarimport com.flink.tutorials.demos.stock.StockPriceDemo.{StockPrice, StockPriceSource, StockPriceTimeAssigner}import org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunctionimport org.apache.flink.streaming.api.functions.source.RichSourceFunctionimport org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContextimport org.apache.flink.streaming.api.scala._import org.apache.flink.util.Collectorimport scala.util.Randomobject StockMediaConnectedDemo { def main(args: Array[String]) { // 設置執行環境 val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 每5秒生成一個Watermark env.getConfig.setAutoWatermarkInterval(5000L) // 股票價格數據流 val stockPriceRawStream: DataStream[StockPrice] = env // 該數據流由StockPriceSource類隨機生成 .addSource(new StockPriceSource) // 設置 Timestamp 和 Watermark .assignTimestampsAndWatermarks(new StockPriceTimeAssigner) val mediaStatusStream: DataStream[Media] = env .addSource(new MediaSource) // 先將兩個流connect,再進行keyBy val keyByConnect1: ConnectedStreams[StockPrice, Media] = stockPriceRawStream .connect(mediaStatusStream) .keyBy(0,0) // 先keyBy再connect val keyByConnect2: ConnectedStreams[StockPrice, Media] = stockPriceRawStream.keyBy(0) .connect(mediaStatusStream.keyBy(0)) val alert1 = keyByConnect1.flatMap(new AlertFlatMap).print() val alerts2 = keyByConnect2.flatMap(new AlertFlatMap).print() // 執行程序 env.execute("connect stock price with media status") } /** 媒體評價 * * symbol 股票代號 * timestamp 時間戳 * status 評價 正面/一般/負面 */ case class Media(symbol: String, timestamp: Long, status: String) class MediaSource extends RichSourceFunction[Media]{ var isRunning: Boolean = true val rand = new Random() var stockId = 0 override def run(srcCtx: SourceContext[Media]): Unit = { while (isRunning) { // 每次從列表中隨機選擇一只股票 stockId = rand.nextInt(5) var status: String = "NORMAL" if (rand.nextGaussian() > 0.9) { status = "POSITIVE" } else if (rand.nextGaussian() < 0.05) { status = "NEGATIVE" } val curTime = Calendar.getInstance.getTimeInMillis srcCtx.collect(Media(stockId.toString, curTime, status)) Thread.sleep(rand.nextInt(100)) } } override def cancel(): Unit = { isRunning = false } } case class Alert(symbol: String, timestamp: Long, alert: String) class AlertFlatMap extends RichCoFlatMapFunction[StockPrice, Media, Alert] { var priceMaxThreshold: List[Double] = List(101.0d, 201.0d, 301.0d, 401.0d, 501.0d) var mediaLevel: String = "NORMAL" override def flatMap1(stock: StockPrice, collector: Collector[Alert]) : Unit = { val stockId = stock.symbol.toInt if ("POSITIVE".equals(mediaLevel) && stock.price > priceMaxThreshold(stockId)) { collector.collect(Alert(stock.symbol, stock.timestamp, "POSITIVE")) } } override def flatMap2(media: Media, collector: Collector[Alert]): Unit = { mediaLevel = media.status } }}總結
以上是生活随笔為你收集整理的c#中connect函数_Flink算子使用方法及实例演示:union和connect的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: (枚举)餐厅点餐(fzu2086)
- 下一篇: 利用matlab绘制图形