Kafka Java Client Code Example -- Advanced Usage

When should you use the advanced API (the low-level SimpleConsumer)?

  • You need to read a message more than once
  • You need a single process to consume only a specific subset of a topic's partitions
  • You want to manage transactions yourself so that each message is processed exactly once
What are the drawbacks of the advanced API (calling the lower-level functions)?

SimpleConsumer requires a lot of extra work that is unnecessary when messages are consumed through consumer groups. In particular, your application must:

  • Track the offset of the last processed message itself (the readOffset bookkeeping in the example below)
  • Determine the lead broker for each topic partition (findLeader() in the example)
  • Detect and handle broker leader changes by hand (findNewLeader() in the example)
Steps for developing against the low-level SimpleConsumer API:

  • Query an active broker to determine the lead broker for the topic partition
  • Determine the replica brokers for the topic partition
  • Build fetch requests as needed
  • Fetch the data
  • Detect lead broker changes and recover from them
  • 代碼示例

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

/**
 * https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
 *
 * @author Fung
 */
public class ConsumerSimpleExample {

    public static void main(String[] arg) {
        // Note: the command-line arguments are ignored; the example runs with
        // these hard-coded values (maxReads, topic, partition, seed broker, port).
        String[] args = { "20", "page_visits", "2", "172.168.63.233", "9092" };
        ConsumerSimpleExample example = new ConsumerSimpleExample();
        long maxReads = Long.parseLong(args[0]);
        String topic = args[1];
        int partition = Integer.parseInt(args[2]);
        List<String> seeds = new ArrayList<String>();
        seeds.add(args[3]);
        int port = Integer.parseInt(args[4]);
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public ConsumerSimpleExample() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public void run(long a_maxReads, String a_topic, int a_partition,
            List<String> a_seedBrokers, int a_port) throws Exception {
        // Find the metadata about the topic and partition we are interested in
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic,
                a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,
                100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition,
                kafka.api.OffsetRequest.LatestTime(), clientName);

        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000,
                        64 * 1024, clientName);
            }
            // Note: this fetchSize of 100000 might need to be increased if
            // large batches are written to Kafka
            FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000)
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:"
                        + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For the simple case,
                    // ask for the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition,
                            kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition,
                        a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(
                    a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset
                            + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset())
                        + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic,
            int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
                partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
                whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
                clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: "
                    + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    private String findNewLeader(String a_oldLeader, String a_topic,
            int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port,
                    a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())
                    && i == 0) {
                // first time through if the leader hasn't changed give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers,
            int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop: for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,
                        "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed
                        + "] to find Leader for [" + a_topic + ", "
                        + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}

References

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

Reposted from: https://my.oschina.net/cloudcoder/blog/299222
