日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

string 中的offset_Kafka+Spark Streaming管理offset的两种方法

發布時間:2025/3/21 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 string 中的offset_Kafka+Spark Streaming管理offset的两种方法 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

?Kafka配合Spark Streaming是大數據領域常見的黃金搭檔之一,主要是用于數據實時入庫或分析。

為了應對可能出現的引起Streaming程序崩潰的異常情況,我們一般都需要手動管理好Kafka的offset,而不是讓它自動提交,即需要將enable.auto.commit設為false。只有管理好offset,才能使整個流式系統最大限度地接近exactly once語義。

管理offset的流程

下面這張圖能夠簡要地說明管理offset的大致流程。

offset管理流程

  • 在Kafka DirectStream初始化時,取得當前所有partition的存量offset,以讓DirectStream能夠從正確的位置開始讀取數據。
  • 讀取消息數據,處理并存儲結果。
  • 提交offset,并將其持久化在可靠的外部存儲中。
  • 圖中的“process and store results”及“commit offsets”兩項,都可以施加更強的限制,比如存儲結果時保證冪等性,或者提交offset時采用原子操作。
  • 圖中提出了4種offset存儲的選項,分別是HBase、Kafka自身、HDFS和ZooKeeper。綜合考慮實現的難易度和效率,我們目前采用過的是Kafka自身與ZooKeeper兩種方案。

Kafka自身

在Kafka 0.10+版本中,offset的默認存儲由ZooKeeper移動到了一個自帶的topic中,名為__consumer_offsets。Spark Streaming也專門提供了commitAsync() API用于提交offset。使用方法如下。

stream.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // 確保結果都已經正確且冪等地輸出了 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}

上面是Spark Streaming官方文檔中給出的寫法。但在實際上我們總會對DStream進行一些運算,這時我們可以借助DStream的transform()算子。

var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange] stream.transform(rdd => { // 利用transform取得OffsetRanges offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd }).mapPartitions(records => { var result = new ListBuffer[...]() // 處理流程 result.toList.iterator }).foreachRDD(rdd => { if (!rdd.isEmpty()) { // 數據入庫 session.createDataFrame... } // 提交offset stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) })

特別需要注意,在轉換過程中不能破壞RDD分區與Kafka分區之間的映射關系。亦即像map()/mapPartitions()這樣的算子是安全的,而會引起shuffle或者repartition的算子,如reduceByKey()/join()/coalesce()等等都是不安全的。

另外需要注意的是,HasOffsetRanges是KafkaRDD的一個trait,而CanCommitOffsets是DirectKafkaInputDStream的一個trait。從spark-streaming-kafka包的源碼中,可以看得一清二楚。

private[spark] class KafkaRDD[K, V]( sc: SparkContext, val kafkaParams: ju.Map[String, Object], val offsetRanges: Array[OffsetRange], val preferredHosts: ju.Map[TopicPartition, String], useConsumerCache: Boolean) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRangesprivate[spark] class DirectKafkaInputDStream[K, V]( _ssc: StreamingContext, locationStrategy: LocationStrategy, consumerStrategy: ConsumerStrategy[K, V], ppc: PerPartitionConfig ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {

這就意味著不能對stream對象做transformation操作之后的結果進行強制轉換(會直接報ClassCastException),因為RDD與DStream的類型都改變了。只有RDD或DStream的包含類型為ConsumerRecord才行。

ZooKeeper

雖然Kafka將offset從ZooKeeper中移走是考慮到可能的性能問題,但ZooKeeper內部是采用樹形node結構存儲的,這使得它天生適合存儲像offset這樣細碎的結構化數據。并且我們的分區數不是很多,batch間隔也相對長(20秒),因此并沒有什么瓶頸。

Kafka中還保留了一個已經標記為過時的類ZKGroupTopicDirs,其中預先指定了Kafka相關數據的存儲路徑,借助它,我們可以方便地用ZooKeeper來管理offset。為了方便調用,將存取offset的邏輯封裝成一個類如下。

class ZkKafkaOffsetManager(zkUrl: String) { private val logger = LoggerFactory.getLogger(classOf[ZkKafkaOffsetManager]) private val zkClientAndConn = ZkUtils.createZkClientAndConnection(zkUrl, 30000, 30000); private val zkUtils = new ZkUtils(zkClientAndConn._1, zkClientAndConn._2, false) def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = { val offsets = mutable.HashMap.empty[TopicPartition, Long] val partitionsForTopics = zkUtils.getPartitionsForTopics(topics) // /consumers//offsets// partitionsForTopics.foreach(partitions => { val topic = partitions._1 val groupTopicDirs = new ZKGroupTopicDirs(groupId, topic) partitions._2.foreach(partition => { val path = groupTopicDirs.consumerOffsetDir + "/" + partition try { val data = zkUtils.readData(path) if (data != null) { offsets.put(new TopicPartition(topic, partition), data._1.toLong) logger.info( "Read offset - topic={}, partition={}, offset={}, path={}

總結

以上是生活随笔為你收集整理的string 中的offset_Kafka+Spark Streaming管理offset的两种方法的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。