日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

日志服务(SLS)集成 Spark 流计算实战

發(fā)布時間:2024/8/23 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 日志服务(SLS)集成 Spark 流计算实战 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

前言

日志服務作為一站式的日志的采集與分析平臺,提供了各種用戶場景的日志采集能力,通過日志服務提供的各種與·與SDK,采集客戶端(Logtail),Producer,用戶可以非常容易的把各種數(shù)據(jù)源中的數(shù)據(jù)采集到日志服務的Logstore中。同時為了便于用戶對日志進行處理,提供了各種支持流式消費的SDK,如各種語言的消費組,與 Spark,Flink,Storm 等各種流計算技術無縫對接的Connector,以便于用戶根據(jù)自己的業(yè)務場景非常便捷的處理海量日志。

從最早的Spark Streaming到最新的Stuctured Streaming,Spark 一直是最流行的流計算框架之一。使用日志服務的Spark SDK,可以非常方便的在Spark 中消費日志服務中的數(shù)據(jù),同時也支持將 Spark 的計算結果寫入日志服務。

日志服務基礎概念

日志服務的存儲層是一個類似Kafka的Append only的FIFO消息隊列,包含如下基本概念:

  • 日志(Log):由時間、及一組不定個數(shù)的Key-Value對組成。
  • 日志組(LogGroup):一組日志的集合,包含相同Meta信息如Topic,Source,Tags等。是讀寫的基本單位。

圖-1 Log與LogGroup的關系

  • Shard:分區(qū),LogGroup讀寫基本單元,對應于Kafka的partition。
  • Logstore:日志庫,用以存放同一類日志數(shù)據(jù)。Logstore會包含1個或多個Shard。
  • Project:Logstore存放容器,包含一個或者多個Logstore。

準備工作

1)添加Maven依賴:

<dependency><groupId>com.aliyun.emr</groupId><artifactId>emr-logservice_2.11</artifactId><version>1.9.0</version> </dependency>

Github源碼下載。
2)計劃消費的日志服務project,logstore以及對應的endpoint。
3)用于訪問日志服務Open API的Access Key。

對 Spark Streaming 的支持

Spark Streaming是Spark最早推出的流計算技術,現(xiàn)在已經(jīng)進入維護狀態(tài),不再會增加新的功能。但是考慮到Spark Streaming 的使用仍然非常廣泛,我們先從Spark Streaming開始介紹。Spark Streaming 提供了一個DStream 的數(shù)據(jù)模型抽象,本質是把無界數(shù)據(jù)集拆分成一個一個的RDD,轉化為有界數(shù)據(jù)集的流式計算。每個批次處理的數(shù)據(jù)就是這段時間內從日志服務消費到的數(shù)據(jù)。

?

圖-2 DStream

Spark Streaming 從日志服務消費支持?Receiver 和?Direct?兩種消費方式。

Receiver模式

Receivers的實現(xiàn)內部實現(xiàn)基于日志服務的消費組(Consumer Library)。數(shù)據(jù)拉取與處理完全分離。消費組自動均勻分配Logstore內的所有shard到所有的Receiver,并且自動提交checkpoint到SLS。這就意味著Logstore內的shard個數(shù)與Spark 實際的并發(fā)沒有對應關系。
對于所有的Receiver,接收到的數(shù)據(jù)默認會保存在Spark Executors中,所以Failover的時候有可能造成數(shù)據(jù)丟失,這個時候就需要開啟WAL日志,Failover的時候可以從WAL中恢復,防止丟失數(shù)據(jù)。

SDK將SLS中的每行日志解析為JSON字符串形式,Receiver使用示例如下所示:

object SLSReceiverSample {def main(args: Array[String]): Unit = {val project = "your project"val logstore = "your logstore"val consumerGroup = "consumer group"val endpoint = "your endpoint"val accessKeyId = "access key id"val accessKeySecret = "access key secret"val batchInterval = Milliseconds(5 * 1000)val conf = new SparkConf().setAppName("Test SLS Loghub")val ssc = new StreamingContext(conf, batchInterval)val stream = LoghubUtils.createStream(ssc,project,logstore,consumerGroup,endpoint,accessKeyId,accessKeySecret,StorageLevel.MEMORY_AND_DISK,LogHubCursorPosition.END_CURSOR)stream.checkpoint(batchInterval * 2).foreachRDD(rdd =>rdd.map(bytes => new String(bytes)).top(10).foreach(println))ssc.checkpoint("hdfs:///tmp/spark/streaming")ssc.start()ssc.awaitTermination()} }

除Project,Logstore,Access Key 這些基礎配置外,還可以指定StorageLevel,消費開始位置等。

Direct模式

Direct模式不再需要Receiver,也不依賴于消費組,而是使用日志服務的低級API,在每個批次內直接從服務端拉取數(shù)據(jù)處理。對于Logstore中的每個Shard來說,每個批次都會讀取指定位置范圍內的數(shù)據(jù)。為了保證一致性,只有在每個批次確認正常結束之后才能把每個Shard的消費結束位置(checkpoint)保存到服務端。

為了實現(xiàn)Direct模式,SDK依賴一個本地的ZooKeeper,每個shard的checkpoint會臨時保存到本地的ZooKeeper,等用戶手動提交checkpoint時,再從ZooKeeper中同步到服務端。Failover時也是先從本地ZooKeeper中嘗試讀上一次的checkpoint,如果沒有讀到再從服務端獲取。

object SLSDirectSample {def main(args: Array[String]): Unit = {val project = "your project"val logstore = "your logstore"val consumerGroup = "consumerGroup"val endpoint = "endpoint"val accessKeyId = "access key id"val accessKeySecret = "access key secret"val batchInterval = Milliseconds(5 * 1000)val zkAddress = "localhost:2181"val conf = new SparkConf().setAppName("Test Direct SLS Loghub")val ssc = new StreamingContext(conf, batchInterval)val zkParas = Map("zookeeper.connect" -> zkAddress)val loghubStream = LoghubUtils.createDirectStream(ssc,project,logstore,consumerGroup,accessKeyId,accessKeySecret,endpoint,zkParas,LogHubCursorPosition.END_CURSOR)loghubStream.checkpoint(batchInterval).foreachRDD(rdd => {println(s"count by key: ${rdd.map(s => {s.sorted(s.length, s)}).countByKey().size}")// 手動更新checkpointloghubStream.asInstanceOf[CanCommitOffsets].commitAsync()})ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directoryssc.start()ssc.awaitTermination()} }

Direct模式示例

如何限速

在Receiver中,如果需要限制消費速度,我們只需要調整 Consumer Library 本身的參數(shù)即可。而Direct方式是在每個批次開始時從SLS拉取數(shù)據(jù),這就涉及到一個問題:一個批次內拉取多少數(shù)據(jù)才合適。如果太多,一個批次內處理不完,造成處理延時。如果太少會導worker空閑,工作不飽和,消費延時。這個時候我們就需要合理配置拉取的速度和行數(shù),實現(xiàn)一個批次盡可能多處理又能及時完成的目標。理想狀態(tài)下Spark 消費的整體速率應該與SLS采集速率一致,才能實現(xiàn)真正的實時處理。

由于SLS的數(shù)據(jù)模型是以LogGroup作為讀寫的基本單位,而一個LogGroup中可能包含上萬行日志,這就意味著Spark中直接限制每個批次的行數(shù)難以實現(xiàn)。因此,Direct限流涉及到兩個配置參數(shù):

參數(shù)說明默認值
spark.streaming.loghub.maxRatePerShard每個批次每個Shard讀取行數(shù),決定了限流的下限10000
spark.loghub.batchGet.step每次請求讀取LogGroup個數(shù),決定了限流的粒度100

可以通過適當縮小spark.loghub.batchGet.step來控制限流的精度,但是即便如此,在某些情況下還是會存在較大誤差,如一個LogGroup中存在10000行日志,spark.streaming.loghub.maxRatePerShard設置為100,spark.loghub.batchGet.step設置為1,那一個批次內該shard還是會拉取10000行日志。

兩種模式的對比

和Receiver相比,Direct有如下的優(yōu)勢:

  • 降低資源消耗,不需要占用Executor資源來作為Receiver的角色。
  • 魯棒性更好,在計算的時候才會從服務端真正消費數(shù)據(jù),降低內存使用,不再需要WAL,Failover 直接在讀一次就行了,更容易實現(xiàn)exactly once語義。
  • 簡化并行。Spark partition 與 Logstore 的 shard 個數(shù)對應,增加shard個數(shù)就能提高Spark任務處理并發(fā)上限。
  • 但是也存在一些缺點:

  • 在SLS場景下,需要依賴本地的 ZooKeeper 來保存臨時 checkpoint,當調用 commitAsync 時從?ZooKeeper同步到日志服務服務端。所以當需要重置 checkpoint 時,也需要先刪除本地?ZooKeeper?中的 checkpoint 才能生效。
  • 上一個批次保存 checkpoint 之前,下一個批次無法真正開始,否則 ZooKeeper?中的 checkpoint 可能會被更新成一個中間狀態(tài)。目前SDK在每個批次會檢查是否上一個批次的 checkpoint 還沒有提交,如果沒有提交則生成一個空批次,而不是繼續(xù)從服務端消費。
  • 在 SLS 場景下,限流方式不夠精確。
  • Spark Streaming結果寫入SLS

    與消費SLS相反,Spark Streaming的處理結果也可以直接寫入SLS。使用示例:

    ...val lines = loghubStream.map(x => x)// 轉換函數(shù)把結果中每條記錄轉為一行日志def transformFunc(x: String): LogItem = {val r = new LogItem()r.PushBack("key", x)r}val callback = new Callback with Serializable {override def onCompletion(result: Result): Unit = {println(s"Send result ${result.isSuccessful}")}}// SLS producer configval producerConfig = Map("sls.project" -> loghubProject,"sls.logstore" -> targetLogstore,"access.key.id" -> accessKeyId,"access.key.secret" -> accessKeySecret,"sls.endpoint" -> endpoint,"sls.ioThreadCount" -> "2")lines.writeToLoghub(producerConfig,"topic","streaming",transformFunc, Option.apply(callback))ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directoryssc.start()ssc.awaitTermination()

    對Structured Streaming的支持

    Structured? Streaming 并不是最近才出現(xiàn)的技術,而是早在16年就已經(jīng)出現(xiàn),但是直到 Spark 2.2.0 才正式推出。其數(shù)據(jù)模型是基于無界表的概念,流數(shù)據(jù)相當于往一個表上不斷追加行。

    圖-3 無界表模型

    與Spark Streaming相比,Structured Streaming主要有如下特點:

  • 底層實現(xiàn)基于Spark SQL引擎,可以使用大多數(shù)Spark SQL的函數(shù)。和Spark SQL共用大部分API,如果對Spark SQL熟悉的用戶,非常容易上手。復用Spark SQL的執(zhí)行引用,性能更佳。
  • 支持?Process time 和?Event time,而Spark Streaming只支持 Process Time。
  • 批流同一的API。Structured Streaming 復用Spark SQL的 DataSet/DataFrame模型,和 RDD/DStream相比更High level,易用性更好。
  • 實時性更好,默認基于micro-batch模式。在 Spark 2.3 中,還增加了連續(xù)處理模型,號稱可以做到毫秒級延遲。
  • API 對用戶更友好,只保留了SparkSession一個入口,不需要創(chuàng)建各種Context對象,使用起來更簡單。
  • SDK使用示例

    import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.{StringType, StructField, StructType}object StructuredStreamingDemo {def main(args: Array[String]) {val spark = SparkSession.builder.appName("StructuredLoghubWordCount").master("local").getOrCreate()import spark.implicits._val schema = new StructType(Array(StructField("content", StringType)))val lines = spark.readStream.format("loghub").schema(schema).option("sls.project", "your project").option("sls.store", "your logstore").option("access.key.id", "your access key id").option("access.key.secret", "your access key secret").option("endpoint", "your endpoint").option("startingoffsets", "latest").load().select("content").as[String]val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()val query = wordCounts.writeStream.outputMode("complete").format("loghub").option("sls.project", "sink project").option("sls.store", "sink logstore").option("access.key.id", "your access key id").option("access.key.secret", "your access key secret").option("endpoint", "your endpoint").option("checkpointLocation", "your checkpoint dir").start()query.awaitTermination()} }

    代碼解釋:
    1)schema 聲明了我們需要的字段,除了日志中的字段外,還有如下的內部字段:

    __logProject__ __logStore__ __shard__ __time__ __topic__ __source__ __sequence_number__ // 每行日志唯一id

    如果沒有指定schema,SDK默認提供一個__value__字段,其內容為由所有字段組成的一個JSON字符串。

    2)lines 定義了一個流。
    startingoffsets:開始位置,支持:

    • latest :日志服務最新寫入位置。強烈建議從latest開始,從其他位置開始意味著需要先處理歷史數(shù)據(jù),可能需要等待較長時間才能結束。
    • earliest:日志服務中最早的日志對應的位置。
    • 或者為每個shard指定一個開始時間,以JSON形式指定。

    maxOffsetsPerTrigger:批次讀取行數(shù),SDK中默認是64*1024 。

    3)結果寫入到日志服務
    format 指定為Loghub即可。

    不足之處

  • 不支持手動提交checkpoint,SDK內部自動保存checkpoint到checkpointLocation中。
  • 不再需要提供consumerGroup名稱,也就是說checkpoint沒有保存到SLS服務端,無法在日志服務里面監(jiān)控消費延遲,只能通過Spark 任務日志觀察消費進度。

  • 原文鏈接
    本文為阿里云原創(chuàng)內容,未經(jīng)允許不得轉載。

    總結

    以上是生活随笔為你收集整理的日志服务(SLS)集成 Spark 流计算实战的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 欧美专区第一页 | 国产精品成人自拍 | 欧美在线三级 | 深夜免费在线视频 | 乖疼润滑双性初h | 97精品久久人人爽人人爽 | 日韩免费一区二区 | 欧美成人日韩 | 日韩一区二区三区在线看 | 国产精品三级视频 | 黄色性生活一级片 | 日韩欧洲亚洲AV无码精品 | 午夜视频福利网站 | 国产真人无码作爱视频免费 | 熟女国产精品一区二区三 | 欧美一区亚洲 | 好看的中文字幕电影 | 久久久久国色av免费观看性色 | 日韩精品一区二区三区av | 美女擦边视频 | 亚洲区欧美区 | a级一片 | 91在线无精精品白丝 | 毛片内射久久久一区 | 青娱乐在线免费观看 | 久99视频| 男人的天堂avav | free性护士vidos猛交 | 波多野结衣视频一区 | 男男黄网站 | 九色影院| 亚洲大片精品 | 黄色肉肉视频 | 人人干免费 | 欧美三级在线视频 | 疯狂做爰高潮videossex | 国产有码在线观看 | 国产911| 大陆一级片 | 天天干天天操天天舔 | 天天躁夜夜操 | 亚洲一区二区免费视频 | 日朝毛片 | 国产在线第一页 | 午夜激情毛片 | 丰满少妇xoxoxo视频 | 欧美aaaaaaa| 小辣椒导航 | 国产欧美日韩综合精品一区二区三区 | 全程偷拍露脸中年夫妇 | 人人妻人人澡人人爽人人欧美一区 | 天天干天天日夜夜操 | 精品九一| 午夜在线观看影院 | 日本激情免费 | 亚洲美女免费视频 | 九九热在线视频 | www.五月婷婷.com| 红猫大本营在线观看的 | 国产喷白浆一区二区三区 | 求欧美精品网址 | 亚洲在线视频播放 | 国内黄色一级片 | 国产一级黄色片子 | 伊人av综合| 美女黄色免费网站 | 在线观看高清视频 | 久久午夜网 | 欧美做爰猛烈床戏大尺度 | 最新高清无码专区 | 91在线精品播放 | 欧美日韩另类在线 | 亚洲天堂成人在线观看 | 成人久久毛片 | 欧美成人免费观看 | 国模私拍xvideos私拍 | 国产片淫乱18一级毛片动态图 | 9色视频| 特大黑人巨交吊性xxxx视频 | 国产99精品视频 | 国产精品一区在线免费观看 | 日韩免费精品视频 | 福利视频一区 | 一区二区网 | 欧美精品久久久久久久免费 | 天天成人| 超碰激情 | 精品久久久av | 色综合99久久久无码国产精品 | 国产乱淫av公 | 日韩精品一区二区三区免费视频 | 97超碰人| 美女狂揉羞羞的视频 | 不卡av在线免费观看 | 日韩在线黄色 | 久久九 | 亚洲一区二区高清 | 亚洲 小说 欧美 激情 另类 | 国产高清成人久久 |