日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

string 中的offset_Kafka+Spark Streaming管理offset的几种方法

發布時間:2025/3/19 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 string 中的offset_Kafka+Spark Streaming管理offset的几种方法 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

來源:大數據技術與架構作者:王知無

大數據技術與架構

點擊右側關注,大數據開發領域最強公眾號!

暴走大數據

點擊右側關注,暴走大數據!

By 大數據技術與架構

場景描述:Kafka配合Spark Streaming是大數據領域常見的黃金搭檔之一,主要是用于數據實時入庫或分析。為了應對可能出現的引起Streaming程序崩潰的異常情況,我們一般都需要手動管理好Kafka的offset,而不是讓它自動提交,即需要將enable.auto.commit設為false。只有管理好offset,才能使整個流式系統最大限度地接近exactly once語義。

關鍵詞:offset Spark Streaming

Kafka+Spark Streaming主要用于實時流處理。到目前為止,在大數據領域中是一種非常常見的架構。Kafka在其中主要起著一個緩沖的作用,所有的實時數據都會經過kafka。所以對kafka offset的管理是其中至關重要的一環。

我們一般都需要手動管理好Kafka的offset,而不是讓它自動提交,即需要將enable.auto.commit設為false。

一但管理不善,就會到導致數據丟失或重復消費。

offset的管理方式

一個簡單的流程如下:

  • 在Kafka DirectStream初始化時,取得當前所有partition的存量offset,以讓DirectStream能夠從正確的位置開始讀取數據。
  • 讀取消息數據,處理并存儲結果。
  • 提交offset,并將其持久化在可靠的外部存儲中。
  • 圖中的“process and store results”及“commit offsets”兩項,都可以施加更強的限制,比如存儲結果時保證冪等性,或者提交offset時采用原子操作。

保存offset的方式

Checkpoint:

Spark Streaming的checkpoints是最基本的存儲狀態信息的方式,一般是保存在HDFS中。但是最大的問題是如果streaming程序升級的話,checkpoints的數據無法使用,所以幾乎沒人使用。

offset的三種管理方式:

自動提交offset:

  • enable.auto.commit=true。
  • 一但consumer掛掉,就會導致數據丟失或重復消費。
  • offset不可控。

Kafka自身的offset管理:

  • (屬于At-least-once語義,如果做好了冪等性,可以使用這種方式):
  • 在Kafka 0.10+版本中,offset的默認存儲由ZooKeeper移動到了一個自帶的topic中,名為__consumer_offsets。
  • Spark Streaming也專門提供了commitAsync() API用于提交offset。
  • 需要將參數修改為enable.auto.commit=false。
  • 在我實際測試中發現,這種offset的管理方式,不會丟失數據,但會出現重復消費。
  • 停掉streaming應用程序再次啟動后,會再次消費停掉前最后的一個批次數據,應該是由于offset是異步提交的方式導致,offset更新不及時引起的。
  • 因此需要做好數據的冪等性。
  • (修改源碼將異步改為同步,應該是可以做到Exactly-once語義的)

自定義offset:

  • (推薦,采用這種方式,可以做到At-least-once語義):
  • 可以將offset存放在第三方儲中,包括RDBMS、Redis、ZK、ES等。
  • 若消費數據存儲在帶事務的組件上,則強烈推薦將offset存儲在一起,借助事務實現 Exactly-once 語義。

示例

Kafka自身管理offset:

在Kafka 0.10+版本中,offset的默認存儲由ZooKeeper移動到了一個自帶的topic中,名為__consumer_offsets。所以我們讀寫offset的對象正是這個topic,Spark Streaming也專門提供了commitAsync() API用于提交offset。實際上,一切都已經封裝好了,直接調用相關API即可。

stream.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // 確保結果都已經正確且冪等地輸出了 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}

ZooKeeper

在Spark Streaming連接Kafka應用中使用Zookeeper來存儲offsets也是一種比較可靠的方式。

在這個方案中,Spark Streaming任務在啟動時會去Zookeeper中讀取每個分區的offsets。如果有新的分區出現,那么他的offset將會設置在最開始的位置。在每批數據處理完之后,用戶需要可以選擇存儲已處理數據的一個offset或者最后一個offset。此外,新消費者將使用跟舊的Kafka 消費者API一樣的格式將offset保存在ZooKeeper中。因此,任何追蹤或監控Zookeeper中Kafka Offset的工具仍然生效的。

一個典型的工具類:

class ZkKafkaOffsetManager(zkUrl: String) { private val logger = LoggerFactory.getLogger(classOf[ZkKafkaOffsetManager]) private val zkClientAndConn = ZkUtils.createZkClientAndConnection(zkUrl, 30000, 30000); private val zkUtils = new ZkUtils(zkClientAndConn._1, zkClientAndConn._2, false) def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = { val offsets = mutable.HashMap.empty[TopicPartition, Long] val partitionsForTopics = zkUtils.getPartitionsForTopics(topics) // /consumers//offsets// partitionsForTopics.foreach(partitions => { val topic = partitions._1 val groupTopicDirs = new ZKGroupTopicDirs(groupId, topic) partitions._2.foreach(partition => { val path = groupTopicDirs.consumerOffsetDir + "/" + partition try { val data = zkUtils.readData(path) if (data != null) { offsets.put(new TopicPartition(topic, partition), data._1.toLong) logger.info( "Read offset - topic={}, partition={}, offset={}, path={}

總結

以上是生活随笔為你收集整理的string 中的offset_Kafka+Spark Streaming管理offset的几种方法的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。