storm kafkaSpout pitfall notes: the offset problem

Published: 2025/3/20, by 豆豆

There are plenty of examples online of integrating Kafka and Storm; look one up if you need the basic wiring.

Problem description:

  Kafka had been set up long before, and a newly built Storm cluster needed to consume one of its topics. Because the topic already held a large backlog of messages, Storm started consuming from the very beginning.

Solution:

  The following passage is quoted from the official documentation:

How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures

As shown in the above KafkaConfig properties, you can control from where in the Kafka topic the spout begins to read by setting KafkaConfig.startOffsetTime as follows:

  • kafka.api.OffsetRequest.EarliestTime(): read from the beginning of the topic (i.e. from the oldest messages onwards)
  • kafka.api.OffsetRequest.LatestTime(): read from the end of the topic (i.e. any new messages that are being written to the topic)
  • A Unix timestamp aka seconds since the epoch (e.g. via System.currentTimeMillis()): see How do I accurately get offsets of messages for a certain timestamp using OffsetRequest? in the Kafka FAQ

  As the topology runs the Kafka spout keeps track of the offsets it has read and emitted by storing state information under the ZooKeeper path SpoutConfig.zkRoot + "/" + SpoutConfig.id. In the case of failures it recovers from the last written offset in ZooKeeper.

    Important: When re-deploying a topology make sure that the settings for SpoutConfig.zkRoot and SpoutConfig.id were not modified, otherwise the spout will not be able to read its previous consumer state information (i.e. the offsets) from ZooKeeper -- which may lead to unexpected behavior and/or to data loss, depending on your use case.

    This means that when a topology has run once the setting KafkaConfig.startOffsetTime will not have an effect for subsequent runs of the topology because now the topology will rely on the consumer state information (offsets) in ZooKeeper to determine from where it should begin (more precisely: resume) reading. If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should set the parameter KafkaConfig.ignoreZkOffsets to true. If true, the spout will always begin reading from the offset defined by KafkaConfig.startOffsetTime as described above.


      The gist of this passage: the SpoutConfig object's startOffsetTime field controls where consumption starts. The default is kafka.api.OffsetRequest.EarliestTime(), i.e. consume from the oldest messages; if you want to start from the newest messages, you have to set it to kafka.api.OffsetRequest.LatestTime() yourself. There is another catch: this field only takes effect the very first time the topology consumes. On every later run, consumption resumes from the offset recorded in ZooKeeper (the consumer state is stored under the path derived from the SpoutConfig object's zkRoot field; not verified by the author).
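For reference, a minimal sketch of wiring this up with the old storm-kafka API. The class names (ZkHosts, SpoutConfig, KafkaSpout, SchemeAsMultiScheme, StringScheme) are from storm-kafka; the ZooKeeper address, topic name, zkRoot, and id below are made-up placeholders, and ignoreZkOffsets exists only in the later storm-kafka versions (earlier ones used forceFromStart instead):

```java
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.spout.SchemeAsMultiScheme;

public class KafkaSpoutSetup {
    public static KafkaSpout build() {
        // ZooKeeper ensemble where Kafka registers its brokers (placeholder address)
        ZkHosts hosts = new ZkHosts("zk1:2181");

        // zkRoot "/kafka-offsets" and id "my-topology" together determine
        // the ZooKeeper path where the spout stores its consumed offsets
        SpoutConfig conf = new SpoutConfig(hosts, "my-topic", "/kafka-offsets", "my-topology");
        conf.scheme = new SchemeAsMultiScheme(new StringScheme());

        // Start from the newest messages instead of the default EarliestTime()
        conf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
        // Force the spout to ignore any offsets already recorded in ZooKeeper
        conf.ignoreZkOffsets = true;

        return new KafkaSpout(conf);
    }
}
```

This is a configuration sketch, not a complete topology; the spout would still need to be registered with a TopologyBuilder.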

      If you want the current topology to pick up where the previous topology's consumption left off, do not change the SpoutConfig object's id. Put differently: if the first run already started from the earliest messages, then keeping the same id means the spout will keep consuming from the earliest messages all the way to the latest. If at that point you want to skip the intermediate backlog and jump straight to the latest messages, changing the SpoutConfig object's id is enough.
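The "don't change zkRoot and id" rule follows directly from how the state path is built. A tiny self-contained illustration of the path scheme quoted above (SpoutConfig.zkRoot + "/" + SpoutConfig.id); the zkRoot and id values here are placeholders:

```java
public class ZkOffsetPath {
    // Mirrors the path scheme from the docs: zkRoot + "/" + id
    static String offsetPath(String zkRoot, String id) {
        return zkRoot + "/" + id;
    }

    public static void main(String[] args) {
        String oldPath = offsetPath("/kafka-offsets", "my-topology");
        String newPath = offsetPath("/kafka-offsets", "my-topology-v2");
        // Same zkRoot + same id -> same path -> the spout resumes from stored offsets.
        // Changing the id points the spout at a fresh (empty) path, so it falls
        // back to startOffsetTime, effectively skipping the backlog.
        System.out.println(oldPath); // /kafka-offsets/my-topology
        System.out.println(newPath); // /kafka-offsets/my-topology-v2
    }
}
```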


      Below are the meanings of some of SpoutConfig's fields; they are actually inherited from KafkaConfig (see the source code):

      public int fetchSizeBytes = 1024 * 1024;       // total message size requested in the response of each FetchRequest sent to Kafka
      public int socketTimeoutMs = 10000;            // socket timeout for connections to the Kafka broker
      public int fetchMaxWait = 10000;               // how long the consumer waits when the server has no new messages
      public int bufferSizeBytes = 1024 * 1024;      // read buffer size of the SocketChannel used by SimpleConsumer
      public MultiScheme scheme = new RawMultiScheme();  // how to deserialize the byte[] fetched from Kafka
      public boolean forceFromStart = false;         // whether to force reading from the smallest offset in Kafka
      public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();  // from which offset time to start reading; defaults to the oldest
      public long maxOffsetBehind = Long.MAX_VALUE;  // how far the spout's read progress may lag behind the target; if too far behind, the spout discards the intermediate messages
      public boolean useStartOffsetTimeIfOffsetOutOfRange = true;  // if the requested offset no longer exists in Kafka, whether to fall back to startOffsetTime
      public int metricsTimeBucketSizeInSecs = 60;   // how often to report metrics
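To make the fallback described by useStartOffsetTimeIfOffsetOutOfRange concrete, here is a small self-contained simulation of that behavior. This is an illustration of the documented semantics, not storm-kafka's actual code; the method and variable names are invented:

```java
public class OffsetFallback {
    // Mirrors the KafkaConfig default shown above (illustrative only)
    static boolean useStartOffsetTimeIfOffsetOutOfRange = true;

    // Decide which offset to fetch next, given the broker's valid range.
    // If the stored offset has been purged by Kafka retention (out of range)
    // and the fallback flag is set, jump to the position implied by
    // startOffsetTime (represented here by earliestOffset, as for EarliestTime()).
    static long nextOffset(long storedOffset, long earliestOffset, long latestOffset) {
        boolean outOfRange = storedOffset < earliestOffset || storedOffset > latestOffset;
        if (outOfRange && useStartOffsetTimeIfOffsetOutOfRange) {
            return earliestOffset;
        }
        return storedOffset;
    }

    public static void main(String[] args) {
        // Stored offset 100 was purged; the broker now holds 500..900
        System.out.println(nextOffset(100, 500, 900)); // falls back to 500
        // Stored offset 600 is still valid, so it is kept
        System.out.println(nextOffset(600, 500, 900)); // stays 600
    }
}
```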


    Reposted from: https://www.cnblogs.com/wsss/p/6745493.html
