Big Data Spark 2021 (Part 50): Structured Streaming Case Study 1, Real-Time Data ETL Architecture
Contents
Case Study 1: Real-Time Data ETL Architecture
  Preparing the Topics
  Simulating Base-Station Log Data
  Real-Time Incremental ETL
Case Study 1: Real-Time Data ETL Architecture
In real-world streaming projects, whether the stream is processed with Storm, Spark Streaming, Flink, or Structured Streaming, the raw data is usually consumed from Kafka first, passed through ETL, and then written back to a Kafka topic so that other business applications can consume it for real-time processing and analysis. The resulting flow is: Kafka source topic, streaming ETL job, Kafka sink topic, downstream consumers.
In this case study we generate simulated carrier base-station data and send it to Kafka in real time, consume it with Structured Streaming, apply ETL (keeping only records whose call status is success), and write the result back to Kafka so that other real-time applications can consume and analyze it.
Preparing the Topics
Create the topics with the following commands:
# List topics
/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181

# Delete topics
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic stationTopic
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic etlTopic

# Create topics
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic stationTopic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic etlTopic

# Console producer (for manual testing)
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic stationTopic
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic etlTopic

# Console consumer (for manual testing)
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic stationTopic --from-beginning
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic etlTopic --from-beginning
Simulating Base-Station Log Data
Run the following code to generate simulated log records in real time and send them to the Kafka topic:
package cn.itcast.structedstreaming

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import scala.util.Random

/**
 * Generates simulated base-station log records and sends them to a Kafka topic in real time.
 * Fields: base-station ID, caller number, callee number, call status, call time, call duration
 */
object MockStationLog {
  def main(args: Array[String]): Unit = {
    // Kafka producer configuration
    val props = new Properties()
    props.put("bootstrap.servers", "node1:9092")
    props.put("acks", "1")
    props.put("retries", "3")
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)

    val random = new Random()
    // "success" appears 9 times out of 12, so most generated calls succeed
    val allStatus = Array(
      "fail", "busy", "barring",
      "success", "success", "success", "success", "success",
      "success", "success", "success", "success"
    )

    while (true) {
      val callOut: String = "1860000%04d".format(random.nextInt(10000))
      val callIn: String = "1890000%04d".format(random.nextInt(10000))
      val callStatus: String = allStatus(random.nextInt(allStatus.length))
      // Only successful calls have a non-zero duration (1 to 10 seconds, in milliseconds)
      val callDuration = if ("success".equals(callStatus)) (1 + random.nextInt(10)) * 1000L else 0L

      // Build one random base-station log record
      val stationLog: StationLog = StationLog(
        "station_" + random.nextInt(10),
        callOut,
        callIn,
        callStatus,
        System.currentTimeMillis(),
        callDuration
      )
      println(stationLog.toString)
      Thread.sleep(100 + random.nextInt(100))

      val record = new ProducerRecord[String, String]("stationTopic", stationLog.toString)
      producer.send(record)
    }

    // Never reached: the loop above runs until the process is killed
    producer.close()
  }

  /**
   * Base-station call log record
   */
  case class StationLog(
    stationId: String,  // base-station ID
    callOut: String,    // caller number
    callIn: String,     // callee number
    callStatus: String, // call status
    callTime: Long,     // call time (epoch milliseconds)
    duration: Long      // call duration (milliseconds)
  ) {
    override def toString: String = {
      s"$stationId,$callOut,$callIn,$callStatus,$callTime,$duration"
    }
  }
}
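As a rough check on what the generator above produces, the share of successful calls and the approximate send rate can be worked out from its constants. The sketch below is illustrative only: the object name `MockLoadEstimate` is hypothetical, and the rate ignores the time spent in `println` and `producer.send`, so the real rate will be somewhat lower.

```scala
object MockLoadEstimate {
  // Same status pool as the generator: 3 non-success entries plus 9 "success" entries
  val allStatus: Array[String] =
    Array("fail", "busy", "barring") ++ Array.fill(9)("success")

  // Probability that a randomly picked status is "success"
  val successShare: Double =
    allStatus.count(_ == "success").toDouble / allStatus.length

  // Thread.sleep(100 + random.nextInt(100)) is uniform over 100..199 ms
  val meanSleepMs: Double = (100 to 199).sum.toDouble / 100

  // Upper bound on records per second, assuming sending itself takes no time
  val recordsPerSecond: Double = 1000.0 / meanSleepMs
}
```

With these constants, about three quarters of the generated records would pass a filter on call status success, and the generator emits at most roughly 6 to 7 records per second.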
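Run the program. Each base-station call log record is one comma-separated line, for example:
station_0,18600004405,18900009049,success,1589711564033,9000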
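Downstream code often needs the individual fields rather than the raw line. Here is a minimal sketch of parsing one record back into a typed value, assuming exactly six comma-separated fields in the order listed in the generator's comment (base-station ID, caller, callee, status, time, duration). `ParsedStationLog` and `StationLogParser` are hypothetical names, not part of the original project.

```scala
import scala.util.Try

// Typed view of one log line; field order mirrors the generator's output
final case class ParsedStationLog(
  stationId: String,
  callOut: String,
  callIn: String,
  callStatus: String,
  callTime: Long,
  duration: Long
)

object StationLogParser {
  // Returns None for null input, a wrong field count, or non-numeric time/duration
  def parse(line: String): Option[ParsedStationLog] =
    Option(line).map(_.trim.split(",", -1)).flatMap {
      case Array(id, out, in, status, time, dur) =>
        Try(ParsedStationLog(id, out, in, status, time.toLong, dur.toLong)).toOption
      case _ => None
    }
}
```

Returning `Option` instead of throwing keeps a single malformed line from failing the whole job, at the cost of silently dropping bad records; whether that trade-off is acceptable depends on the application.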
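Real-Time Incremental ETL
Write code that consumes data from the Kafka topic stationTopic in real time, filters it, and stores the result in the Kafka topic etlTopic. A checkpoint directory must be set so that the query can record its progress and resume from the last committed offsets after a failure. Note that Spark's Kafka sink provides at-least-once delivery, so the checkpoint by itself does not guarantee exactly-once output: after a restart, some records may be written to etlTopic more than once.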
package cn.itcast.structedstreaming

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * Consumes base-station log records from a Kafka topic in real time, keeps only the
 * records whose call status is success, and writes them to another Kafka topic.
 * 1. Read base-station log records from the Kafka topic
 * 2. ETL: keep only log records whose call status is success
 * 3. Write the ETL output to a Kafka topic
 */
object StructuredEtlSink {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // 1. Read data from Kafka
    val kafkaStreamDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "stationTopic")
      .load()

    // 2. Apply ETL to the base-station log records
    // Sample record: station_0,18600004405,18900009049,success,1589711564033,9000
    val etlStreamDF: Dataset[String] = kafkaStreamDF
      // Take the value column and cast it to String
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      // Keep only records whose call status (field index 3) is success;
      // the length check avoids an ArrayIndexOutOfBoundsException on short lines
      .filter { log =>
        StringUtils.isNotBlank(log) && {
          val fields = log.trim.split(",")
          fields.length > 3 && "success".equals(fields(3))
        }
      }
    etlStreamDF.printSchema()

    // 3. For a streaming application, the output is itself a stream
    val query: StreamingQuery = etlStreamDF.writeStream
      // Set the output mode for the streaming sink
      .outputMode(OutputMode.Append())
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("topic", "etlTopic")
      // Set the checkpoint directory. The timestamp suffix gives every run a fresh
      // checkpoint, which is convenient for demos; use a fixed path if the query
      // must resume from its previous offsets after a restart.
      .option("checkpointLocation", "./ckp" + System.currentTimeMillis())
      .start()

    query.awaitTermination()
    query.stop()
  }
}
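The filter used in the job above can also be pulled out into a plain function, which makes the ETL rule checkable without a Kafka broker or a SparkSession. This is a minimal sketch, assuming the call status is always the fourth comma-separated field; `EtlRule` and `isSuccess` are hypothetical names introduced here for illustration.

```scala
object EtlRule {
  // Position of the call-status field in a comma-separated record
  private val StatusIndex = 3

  // True only for non-blank lines whose status field is exactly "success"
  def isSuccess(line: String): Boolean =
    if (line == null || line.trim.isEmpty) false
    else {
      val fields = line.trim.split(",")
      fields.length > StatusIndex && fields(StatusIndex) == "success"
    }
}
```

Inside the streaming job such a function could be passed directly as `.filter(EtlRule.isSuccess _)`, keeping the rule in one place for both the job and its tests.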