Kafka JAVA客户端代码示例--高级应用
生活随笔
收集整理的這篇文章主要介紹了
Kafka JAVA客户端代码示例--高级应用
小編覺得挺不錯的，現在分享給大家，幫大家做個參考。
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
什么時(shí)間使用高級(jí)應(yīng)用?
使用高級(jí)應(yīng)用(調(diào)用較底層函數(shù))的缺點(diǎn)?
SimpleConsumer需要做很多額外的工作（在以groups方式進行消息處理時不需要）
使用底層函數(shù)(SimpleConsumer)開發(fā)的步驟
代碼示例
import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map;import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; import kafka.api.PartitionOffsetRequestInfo; import kafka.cluster.Broker; import kafka.common.ErrorMapping; import kafka.common.TopicAndPartition; import kafka.javaapi.FetchResponse; import kafka.javaapi.OffsetResponse; import kafka.javaapi.PartitionMetadata; import kafka.javaapi.TopicMetadata; import kafka.javaapi.TopicMetadataRequest; import kafka.javaapi.TopicMetadataResponse; import kafka.javaapi.consumer.SimpleConsumer; import kafka.message.MessageAndOffset;/*** https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example* @author Fung**/ public class ConsumerSimpleExample {public static void main(String arg[]) {String[] args={"20","page_visits","2","172.168.63.233","9092"};ConsumerSimpleExample example = new ConsumerSimpleExample();long maxReads = Long.parseLong(args[0]);String topic = args[1];int partition = Integer.parseInt(args[2]);List<String> seeds = new ArrayList<String>();seeds.add(args[3]);int port = Integer.parseInt(args[4]);try {example.run(maxReads, topic, partition, seeds, port);} catch (Exception e) {System.out.println("Oops:" + e);e.printStackTrace();}}private List<String> m_replicaBrokers = new ArrayList<String>();public ConsumerSimpleExample() {m_replicaBrokers = new ArrayList<String>();}public void run(long a_maxReads, String a_topic, int a_partition,List<String> a_seedBrokers, int a_port) throws Exception {// find the meta data about the topic and partition we are interested in//PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic,a_partition);if (metadata == null) {System.out.println("Can't find metadata for Topic and Partition. Exiting");return;}if (metadata.leader() == null) {System.out.println("Can't find Leader for Topic and Partition. 
Exiting");return;}String leadBroker = metadata.leader().host();String clientName = "Client_" + a_topic + "_" + a_partition;SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,100000, 64 * 1024, clientName);long readOffset = getLastOffset(consumer, a_topic, a_partition,kafka.api.OffsetRequest.LatestTime(), clientName);int numErrors = 0;while (a_maxReads > 0) {if (consumer == null) {consumer = new SimpleConsumer(leadBroker, a_port, 100000,64 * 1024, clientName);}// Note: this fetchSize of 100000 might need to be increased if// large batches are written to KafkaFetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();FetchResponse fetchResponse = consumer.fetch(req);if (fetchResponse.hasError()) {numErrors++;// Something went wrong!short code = fetchResponse.errorCode(a_topic, a_partition);System.out.println("Error fetching data from the Broker:"+ leadBroker + " Reason: " + code);if (numErrors > 5)break;if (code == ErrorMapping.OffsetOutOfRangeCode()) {// We asked for an invalid offset. 
For simple case ask for// the last element to resetreadOffset = getLastOffset(consumer, a_topic, a_partition,kafka.api.OffsetRequest.LatestTime(), clientName);continue;}consumer.close();consumer = null;leadBroker = findNewLeader(leadBroker, a_topic, a_partition,a_port);continue;}numErrors = 0;long numRead = 0;for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {long currentOffset = messageAndOffset.offset();if (currentOffset < readOffset) {System.out.println("Found an old offset: " + currentOffset+ " Expecting: " + readOffset);continue;}readOffset = messageAndOffset.nextOffset();ByteBuffer payload = messageAndOffset.message().payload();byte[] bytes = new byte[payload.limit()];payload.get(bytes);System.out.println(String.valueOf(messageAndOffset.offset())+ ": " + new String(bytes, "UTF-8"));numRead++;a_maxReads--;}if (numRead == 0) {try {Thread.sleep(1000);} catch (InterruptedException ie) {}}}if (consumer != null)consumer.close();}public static long getLastOffset(SimpleConsumer consumer, String topic,int partition, long whichTime, String clientName) {TopicAndPartition topicAndPartition = new TopicAndPartition(topic,partition);Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);OffsetResponse response = consumer.getOffsetsBefore(request);if (response.hasError()) {System.out.println("Error fetching data Offset Data the Broker. 
Reason: "+ response.errorCode(topic, partition));return 0;}long[] offsets = response.offsets(topic, partition);return offsets[0];}private String findNewLeader(String a_oldLeader, String a_topic,int a_partition, int a_port) throws Exception {for (int i = 0; i < 3; i++) {boolean goToSleep = false;PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port,a_topic, a_partition);if (metadata == null) {goToSleep = true;} else if (metadata.leader() == null) {goToSleep = true;} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())&& i == 0) {// first time through if the leader hasn't changed give// ZooKeeper a second to recover// second time, assume the broker did recover before failover,// or it was a non-Broker issue//goToSleep = true;} else {return metadata.leader().host();}if (goToSleep) {try {Thread.sleep(1000);} catch (InterruptedException ie) {}}}System.out.println("Unable to find new leader after Broker failure. Exiting");throw new Exception("Unable to find new leader after Broker failure. 
Exiting");}private PartitionMetadata findLeader(List<String> a_seedBrokers,int a_port, String a_topic, int a_partition) {PartitionMetadata returnMetaData = null;loop: for (String seed : a_seedBrokers) {SimpleConsumer consumer = null;try {consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,"leaderLookup");List<String> topics = Collections.singletonList(a_topic);TopicMetadataRequest req = new TopicMetadataRequest(topics);TopicMetadataResponse resp = consumer.send(req);List<TopicMetadata> metaData = resp.topicsMetadata();for (TopicMetadata item : metaData) {for (PartitionMetadata part : item.partitionsMetadata()) {if (part.partitionId() == a_partition) {returnMetaData = part;break loop;}}}} catch (Exception e) {System.out.println("Error communicating with Broker [" + seed+ "] to find Leader for [" + a_topic + ", "+ a_partition + "] Reason: " + e);} finally {if (consumer != null)consumer.close();}}if (returnMetaData != null) {m_replicaBrokers.clear();for (Broker replica : returnMetaData.replicas()) {m_replicaBrokers.add(replica.host());}}return returnMetaData;} }參考
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
轉(zhuǎn)載于:https://my.oschina.net/cloudcoder/blog/299222
總結(jié)
以上是生活随笔為你收集整理的Kafka JAVA客户端代码示例--高级应用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JSP读取My SQL数据乱码问题的解决
- 下一篇: 移动端重构系列6——切入切出动画