日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flink or Spark?实时计算框架在K12场景的应用实践

發布時間:2025/3/21 编程问答 16 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink or Spark?实时计算框架在K12场景的应用实践 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

如今,越來越多的業務場景要求 OLTP 系統能及時得到業務數據計算、分析后的結果,這就需要實時的流式計算如Flink等來保障。例如,在 TB 級別數據量的數據庫中,通過 SQL 語句或相關 API直接對原始數據進行大規模關聯、聚合操作,是無法做到在極短的時間內通過接口反饋到前端進行展示的。若想實現大規模數據的“即席查詢”,就須用實時計算框架構建實時數倉來實現。

本文通過一個教育行業的應用案例,剖析業務系統對實時計算的需求場景,并分析了 Flink和Spark 兩種實現方式的異同,最后通過運用UCloud UFlink產品中封裝的SQL模塊,來加速開發效率,更快地完成需求。

1.1 業務場景簡述

在這個 K12 教育的業務系統中,學生不僅局限于紙質的練習冊進行練習,還可以通過各類移動終端進行練習。基于移動終端,可以更方便地收集學生的學習數據,然后通過大數據分析,量化學習狀態,快速定位薄弱知識點,進行查缺補漏。

在這套業務系統中,學生在手機 App 中對老師布置的作業進行答題訓練,每次答題訓練提交的數據格式如下表所示:

字段含義舉例
student_id學生唯一ID學生ID_16
textbook_id教材唯一ID教材ID_1
grade_id年級唯一ID年級ID_1
subject_id科目唯一ID科目ID_2_語文
chapter_id章節唯一ID章節ID_chapter_2
question_id題目唯一ID題目ID_100
score當前題目扣分(0 ~ 10)2
answer_time當前題目作答完畢的日期與時間2019-09-11 12:44:01
ts當前題目作答完畢的時間戳(java.sql.Timestamp)Sep 11, 2019 12:44:01 PM

?例如,傳入到后臺的單條答題記錄數據格式如下:

{"student_id": "學生ID_16","textbook_id": "教材ID_1","grade_id": "年級ID_1","subject_id": "科目ID_2_語文","chapter_id": "章節ID_chapter_2","question_id": "題目ID_100","score": 2,"answer_time": "2019-09-11 12:44:01","ts": "Sep 11, 2019 12:44:01 PM" }

然后,基于上述實時流入的數據,需要實現如下的分析任務:

? 實時統計每個題目被作答頻次

? 按照年級實時統計題目被作答頻次

? 按照科目實時統計每個科目下題目的作答頻次

1.2 技術方案選型

針對上述幾個需求點,設計了如下的方案。首先會將數據實時發送到 Kafka 中,然后再通過實時計算框架從 Kafka 中讀取數據,并進行分析計算,最后將計算結果重新輸出到 Kafka 另外的主題中,以方便下游框架使用聚合好的結果。

下游框架從 Kafka 中拿到聚合好的數據,并實時錄入到 OLTP 的業務庫中(例如:MySQL、UDW、HBase、ES等),以便于接口將想要的結果實時反饋給前端。

中間的實時計算框架,則在Flink和Spark中選擇。2018 年 08 月 08 日,Flink 1.6.0 推出,支持狀態過期管理(FLINK-9510, FLINK-9938)、支持RocksDB、在 SQL 客戶端中支持 UDXF 函數,大大加強了 SQL 處理功能,同時還支持 DML 語句、支持基于多種時間類型的事件處理、Kafka Table Sink等功能。隨后推出的 Flink 1.6.x 系列版本中,進行了大量優化。這些使得 Flink 成為一個很好的選擇。

早先 Spark 要解決此類需求,是通過 Spark Streaming 組件實現。為此需要先生成 RDD,然后通過 RDD 算子進行分析,或者將 RDD 轉換為 DataSet\DataFrame、創建臨時視圖,并通過 SQL 語法或者 DSL 語法進行分析。相比之下顯得不夠便捷和高效。后來 Spark 2.0.0 新增了 Structured Streaming 組件,具有了更快的流式處理能力,可達到和 Flink 接近的效果。

架構如下圖所示:

本篇將省略下游框架的操作,重點介紹Flink框架進行任務計算的過程(虛線框中的內容),并簡述Spark的實現方法,便于讀者理解其異同。

1.3 實時計算在學情分析系統中的具體實現

1.3.1 Flink 實踐方案

1. 發送數據到 Kafka?

后臺服務通過 Flume 或后臺接口觸發的方式調用 Kafka 生產者 API,實時將數據發送到 Kafka 指定主題中。

例如發送數據如下所示:

{"student_id":"學生ID_16","textbook_id":"教材ID_1","grade_id":"年級ID_1","subject_id":"科目ID_2_語文", "chapter_id":"章節ID_chapter_2","question_id":"題目ID_100","score":2,"answer_time":"2019-09-11 12:44:01", "ts":"Sep 11, 2019 12:44:01 PM"} ………

提示:此處暫且忽略在 Kafka 集群中創建 Topic 的操作。

2. 編寫 Flink 任務分析代碼

使用 Flink 處理上述需求,需要將實時數據轉換為 DataStream 實例,并通過 DataStream 算子進行任務分析,另外,如果想使用 SQL 語法或者 DSL 語法進行任務分析,則需要將 DataStream 轉換為 Table 實例,并注冊臨時視圖。

(1)構建 Flink env

env(StreamExecutionEnvironment) 是 Flink 當前上下文對象,用于后續生成DataStream。代碼如下所示:

val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(3)

(2)從 Kafka 讀取答題數據

在 Flink 中讀取 Kafka 數據需要指定 KafkaSource,代碼如下所示:

val props = new Properties() props.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092") props.setProperty("group.id",?"group_consumer_learning_test01")val flinkKafkaSource = new FlinkKafkaConsumer011[String]("test_topic_learning_1", new SimpleStringSchema(), props) val eventStream = env.addSource[String](flinkKafkaSource)

(3)進行 JSON 解析

這里通過 map 算子實現 JSON 解析,代碼示例如下:

val answerDS = eventStream.map(s => {val gson = new Gson()val answer = gson.fromJson(s, classOf[Answer])answer })

(4)注冊臨時視圖

創建臨時視圖的目的,是為了在稍后可以基于 SQL 語法來進行數據分析,降低開發工作量。需要先獲取TableEnv 實例,再將 DataStream 實例轉換為 Table 實例,最后將其注冊為臨時視圖。代碼如下所示:

val tableEnv = StreamTableEnvironment.create(env) val table = tableEnv.fromDataStream(answerDS) tableEnv.registerTable("t_answer", table)

(5)進行任務分析

接下來,便可以通過 SQL 語句來進行數據分析任務了,3 個需求對應的分析代碼如下所示:

//實時:統計題目被作答頻次 val result1 = tableEnv.sqlQuery( """SELECT| question_id, COUNT(1) AS frequency|FROM| t_answer|GROUP BY| question_id""".stripMargin)//實時:按照年級統計每個題目被作答的頻次 val result2 = tableEnv.sqlQuery( """SELECT | grade_id, COUNT(1) AS frequency |FROM | t_answer |GROUP BY | grade_id """.stripMargin) //實時:統計不同科目下,每個題目被作答的頻次 val result3 = tableEnv.sqlQuery( """SELECT | subject_id, question_id, COUNT(1) AS frequency |FROM | t_answer |GROUP BY | subject_id, question_id """.stripMargin)

此時得到的 result1、result2、result3 均為 Table 實例。

(6)實時輸出分析結果

接下來,將不同需求的統計結果分別輸出到不同的 Kafka 主題中即可。

在 Flink 中,輸出數據之前,需要先將 Table 實例轉換為 DataStream 實例,然后通過 addSink 算子添加 KafkaSink即可。

因為涉及到聚合操作,Table 實例需要通過 RetractStream 來轉換為 DataStream 實例。

該部分代碼如下所示:

tableEnv.toRetractStream[Result1](result1) .filter(_._1) .map(_._2).map(new Gson().toJson(_)) .addSink(new FlinkKafkaProducer011[String]("linux01:9092,linux02:9092,linux03:9092", "test_topic_learning_2", new SimpleStringSchema()))tableEnv.toRetractStream[Result2](result2) .filter(_._1) .map(_._2) .map(new Gson().toJson(_)) .addSink(new FlinkKafkaProducer011[String]("linux01:9092,linux02:9092,linux03:9092", "test_topic_learning_3", new SimpleStringSchema())) tableEnv.toRetractStream[Result3](result3) .filter(_._1) .map(_._2) .map(new Gson().toJson(_)) .addSink(new FlinkKafkaProducer011[String]("linux01:9092,linux02:9092,linux03:9092", "test_topic_learning_4", new SimpleStringSchema()))

(7)執行分析計劃

Flink 支持多流任務同時運行,執行分析計劃代碼如下所示:

env.execute("Flink StreamingAnalysis")

至此,編譯并運行項目后,即可看到實時的統計結果,如下圖所示,從左至右的 3 個窗體中,分別代表對應需求的輸出結果。

1.3.2 Spark 基于Structured Streaming的實現

Spark發送數據到Kafka,及最后的執行分析計劃,與Flink無區別,不再展開。下面簡述差異點。

1. 編寫 Spark 任務分析代碼

(1)構建 SparkSession

如果需要使用 Spark 的Structured Streaming組件,首先需要創建 SparkSession 實例,代碼如下所示:

val sparkConf = new SparkConf() .setAppName("StreamingAnalysis") .set("spark.local.dir", "F:\\temp") .set("spark.default.parallelism", "3") .set("spark.sql.shuffle.partitions", "3") .set("spark.executor.instances", "3")val spark = SparkSession .builder .config(sparkConf) .getOrCreate()

(2)從 Kafka 讀取答題數據

接下來,從 Kafka 中實時讀取答題數,并生成 streaming-DataSet 實例,代碼如下所示:

val inputDataFrame1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092").option("subscribe", "test_topic_learning_1").load()

(3)進行 JSON 解析

?從 Kafka 讀取到數據后,進行 JSON 解析,并封裝到 Answer 實例中,代碼如下所示:

val keyValueDataset1 = inputDataFrame1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .as[(String, String)] val answerDS = keyValueDataset1.map(t => { val gson = new Gson() val answer = gson.fromJson(t._2, classOf[Answer]) answer})

其中 Answer 為 Scala 樣例類,代碼結構如下所示:

case class Answer(student_id: String,textbook_id: String,grade_id: String,subject_id: String,chapter_id: String,question_id: String,score: Int,answer_time: String,ts: Timestamp) extends Serializable

(4)創建臨時視圖

創建臨時視圖代碼如下所示:

answerDS.createTempView("t_answer")

(5)進行任務分析

僅以需求1(統計題目被作答頻次)為例,編寫代碼如下所示:

? 實時:統計題目被作答頻次

//實時:統計題目被作答頻次 val result1 = spark.sql("""SELECT| question_id, COUNT(1) AS frequency|FROM| t_answer|GROUP BY| question_id""".stripMargin).toJSON

(6)實時輸出分析結果

?僅以需求1為例,輸出到Kafka 的代碼如下所示:

result1.writeStream.outputMode("update").trigger(Trigger.ProcessingTime(0)).format("kafka").option("kafka.bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092").option("topic", "test_topic_learning_2").option("checkpointLocation", "./checkpoint_chapter11_1").start()

1.3.3 使用 UFlink SQL 加速開發

通過上文可以發現,無論基于Flink還是Spark通過編寫代碼實現數據分析任務時,都需要編寫大量的代碼,并且在生產集群上運行時,需要打包程序,然后提交打包后生成的 Jar 文件到集群上運行。

為了簡化開發者的工作量,不少開發者開始致力于 SQL 模塊的封裝,希望能夠實現只寫 SQL 語句,就完成類似上述的需求。UFlink SQL 即是 UCloud 為簡化計算模型、降低用戶使用實時計算UFlink產品門檻而推出的一套符合 SQL 語義的開發套件。通過 UFlink SQL 模塊可以快速完成這一工作,實踐如下。

1. 創建 UKafka 集群

?在UCloud控制臺UKafka創建頁,選擇配置并設置相關閾值,創建UKafka集群。

?

?更多細節可以參考UKafka產品文檔 https://docs.ucloud.cn/analysis/ukafka/index

提示:此處暫且忽略在 Kafka 集群中創建 Topic 的操作。

2. 創建 UFlink 集群

? 在UCloud控制臺UFlink創建頁,選擇配置和運行模式,創建一個 Flink 集群。

?

? 完成創建

更多細節可以參考UFlink產品文檔 https://docs.ucloud.cn/analysis/uflink/index

3. 編寫 SQL 語句

完成之后,只需要在工作空間中創建如下形式的 SQL 語句,即可完成上述3個需求分析任務。

(1)創建數據源表

創建數據源表,本質上就是為 Flink 當前上下文環境執行 addSource 操作,SQL 語句如下:

CREATE TABLE t_answer(student_id VARCHAR,textbook_id VARCHAR,grade_id VARCHAR,subject_id VARCHAR,chapter_id VARCHAR,question_id VARCHAR,score INT,answer_time VARCHAR,ts TIMESTAMP)WITH(type ='kafka11',bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092',zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka',topic ='test_topic_learning_1',groupId = 'group_consumer_learning_test01',parallelism ='3');

(2)創建結果表

創建結果表,本質上就是為 Flink 當前上下文環境執行 addSink 操作,SQL 語句如下:

CREATE TABLE t_result1( question_id VARCHAR, frequency INT) WITH( type ='kafka11', bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092', zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka', topic ='test_topic_learning_2', parallelism ='3');CREATE TABLE t_result2( grade_id VARCHAR, frequency INT) WITH( type ='kafka11', bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092', zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka', topic ='test_topic_learning_3', parallelism ='3'); CREATE TABLE t_result3( subject_id VARCHAR, question_id VARCHAR, frequency INT) WITH( type ='kafka11', bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092', zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka', topic ='test_topic_learning_4', parallelism ='3');

(3)執行查詢計劃

最后,執行查詢計劃,并向結果表中插入查詢結果,SQL 語句形式如下:

INSERT INTO t_result1 SELECT question_id, COUNT(1) AS frequency FROM t_answer GROUP BY question_id;INSERT INTO t_result2 SELECT grade_id, COUNT(1) AS frequency FROM t_answer GROUP BY grade_id; INSERT INTO t_result3 SELECT subject_id, question_id, COUNT(1) AS frequency FROM t_answer GROUP BY subject_id, question_id;

SQL 語句編寫完畢后,將其直接粘貼到 UFlink 前端頁面對話框中,并提交任務,即可快速完成上述 3 個需求。如下圖所示:

1.3.4. UFlink SQL 支持多流 JOIN

Flink、Spark 目前都支持多流 JOIN,即stream-stream join,并且也都支持Watermark處理延遲數據,以上特性均可以在 SQL 中體現,得益于此,UFlink SQL 也同樣支持純 SQL 環境下進行 JOIN 操作、維表JOIN操作、自定義函數操作、JSON數組解析、嵌套JSON解析等。更多細節歡迎大家參考 UFlink SQL 相關案例展示https://docs.ucloud.cn/analysis/uflink/dev/sql

1.4 總結

UFlink 基于 Apache Flink 構建,除100%兼容開源外,也在不斷推出 UFlink SQL 等模塊,從而提高開發效率,降低使用門檻,在性能、可靠性、易用性上為用戶創造價值。 今年8月新推出的 Flink 1.9.0,大規模變更了 Flink 架構,能夠更好地處理批、流任務,同時引入全新的 SQL 類型系統和更強大的 SQL 式任務編程。UFlink 預計將于10月底支持 Flink 1.9.0,敬請期待。

總結

以上是生活随笔為你收集整理的Flink or Spark?实时计算框架在K12场景的应用实践的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。