日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka 学习 非常详细的经典教程

發布時間:2023/12/3 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka 学习 非常详细的经典教程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

轉自:?https://blog.csdn.net/tangdong3415/article/details/53432166?

?

一、基本概念

介紹

Kafka是一個分布式的、可分區的、可復制的消息系統。它提供了普通消息系統的功能,但具有自己獨特的設計。

這個獨特的設計是什么樣的呢?

首先讓我們看幾個基本的消息系統術語:
Kafka將消息以topic為單位進行歸納。
將向Kafka topic發布消息的程序成為producers.
將預訂topics并消費消息的程序成為consumer.
Kafka以集群的方式運行,可以由一個或多個服務組成,每個服務叫做一個broker.
producers通過網絡將消息發送到Kafka集群,集群向消費者提供消息,如下圖所示:

?

客戶端和服務端通過TCP協議通信。Kafka提供了Java客戶端,并且對多種語言都提供了支持。


Topics 和Logs

先來看一下Kafka提供的一個抽象概念:topic.
一個topic是對一組消息的歸納。對每個topic,Kafka 對它的日志進行了分區,如下圖所示:
?

每個分區都由一系列有序的、不可變的消息組成,這些消息被連續的追加到分區中。分區中的每個消息都有一個連續的序列號叫做offset,用來在分區中唯一的標識這個消息。
在一個可配置的時間段內,Kafka集群保留所有發布的消息,不管這些消息有沒有被消費。比如,如果消息的保存策略被設置為2天,那么在一個消息被發布的兩天時間內,它都是可以被消費的。之后它將被丟棄以釋放空間。Kafka的性能是和數據量無關的常量級的,所以保留太多的數據并不是問題。

實際上每個consumer唯一需要維護的數據是消息在日志中的位置,也就是offset.這個offset有consumer來維護:一般情況下隨著consumer不斷的讀取消息,這offset的值不斷增加,但其實consumer可以以任意的順序讀取消息,比如它可以將offset設置成為一個舊的值來重讀之前的消息。

以上特點的結合,使Kafka consumers非常的輕量級:它們可以在不對集群和其他consumer造成影響的情況下讀取消息。你可以使用命令行來"tail"消息而不會對其他正在消費消息的consumer造成影響。

將日志分區可以達到以下目的:首先這使得每個日志的數量不會太大,可以在單個服務上保存。另外每個分區可以單獨發布和消費,為并發操作topic提供了一種可能。

分布式

每個分區在Kafka集群的若干服務中都有副本,這樣這些持有副本的服務可以共同處理數據和請求,副本數量是可以配置的。副本使Kafka具備了容錯能力。
每個分區都由一個服務器作為“leader”,零或若干服務器作為“followers”,leader負責處理消息的讀和寫,followers則去復制leader.如果leader down了,followers中的一臺則會自動成為leader。集群中的每個服務都會同時扮演兩個角色:作為它所持有的一部分分區的leader,同時作為其他分區的followers,這樣集群就會據有較好的負載均衡。

Producers

Producer將消息發布到它指定的topic中,并負責決定發布到哪個分區。通常簡單的由負載均衡機制隨機選擇分區,但也可以通過特定的分區函數選擇分區。使用的更多的是第二種。


Consumers

發布消息通常有兩種模式:隊列模式(queuing)和發布-訂閱模式(publish-subscribe)。隊列模式中,consumers可以同時從服務端讀取消息,每個消息只被其中一個consumer讀到;發布-訂閱模式中消息被廣播到所有的consumer中。Consumers可以加入一個consumer 組,共同競爭一個topic,topic中的消息將被分發到組中的一個成員中。同一組中的consumer可以在不同的程序中,也可以在不同的機器上。如果所有的consumer都在一個組中,這就成為了傳統的隊列模式,在各consumer中實現負載均衡。如果所有的consumer都不在不同的組中,這就成為了發布-訂閱模式,所有的消息都被分發到所有的consumer中。更常見的是,每個topic都有若干數量的consumer組,每個組都是一個邏輯上的“訂閱者”,為了容錯和更好的穩定性,每個組由若干consumer組成。這其實就是一個發布-訂閱模式,只不過訂閱者是個組而不是單個consumer。

?

由兩個機器組成的集群擁有4個分區 (P0-P3) 2個consumer組. A組有兩個consumerB組有4個

相比傳統的消息系統,Kafka可以很好的保證有序性。
傳統的隊列在服務器上保存有序的消息,如果多個consumers同時從這個服務器消費消息,服務器就會以消息存儲的順序向consumer分發消息。雖然服務器按順序發布消息,但是消息是被異步的分發到各consumer上,所以當消息到達時可能已經失去了原來的順序,這意味著并發消費將導致順序錯亂。為了避免故障,這樣的消息系統通常使用“專用consumer”的概念,其實就是只允許一個消費者消費消息,當然這就意味著失去了并發性。

在這方面Kafka做的更好,通過分區的概念,Kafka可以在多個consumer組并發的情況下提供較好的有序性和負載均衡。將每個分區分只分發給一個consumer組,這樣一個分區就只被這個組的一個consumer消費,就可以順序的消費這個分區的消息。因為有多個分區,依然可以在多個consumer組之間進行負載均衡。注意consumer組的數量不能多于分區的數量,也就是有多少分區就允許多少并發消費。

Kafka只能保證一個分區之內消息的有序性,在不同的分區之間是不可以的,這已經可以滿足大部分應用的需求。如果需要topic中所有消息的有序性,那就只能讓這個topic只有一個分區,當然也就只有一個consumer組消費它。

###########################################

二、環境搭建


Step 1: 下載Kafka

點擊下載最新的版本并解壓.
?

  • > tar -xzf kafka_2.9.2-0.8.1.1.tgz
  • > cd kafka_2.9.2-0.8.1.1
  • 復制代碼




    Step 2: 啟動服務

    Kafka用到了Zookeeper,所有首先啟動Zookper,下面簡單的啟用一個單實例的Zookkeeper服務。可以在命令的結尾加個&符號,這樣就可以啟動后離開控制臺。

  • > bin/zookeeper-server-start.sh config/zookeeper.properties &
  • [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
  • ...
  • 復制代碼



    現在啟動Kafka:

  • > bin/kafka-server-start.sh config/server.properties
  • [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
  • [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
  • ...
  • 復制代碼



    Step 3: 創建 topic

    創建一個叫做“test”的topic,它只有一個分區,一個副本。

  • > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  • 復制代碼



    可以通過list命令查看創建的topic:

  • > bin/kafka-topics.sh --list --zookeeper localhost:2181
  • test
  • 復制代碼



    除了手動創建topic,還可以配置broker讓它自動創建topic.

    Step 4:發送消息.

    Kafka 使用一個簡單的命令行producer,從文件中或者從標準輸入中讀取消息并發送到服務端。默認的每條命令將發送一條消息。

    運行producer并在控制臺中輸一些消息,這些消息將被發送到服務端:

  • > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test?
  • This is a messageThis is another message
  • 復制代碼



    ctrl+c可以退出發送。

    Step 5: 啟動consumer

    Kafka also has a command line consumer that will dump out messages to standard output.
    Kafka也有一個命令行consumer可以讀取消息并輸出到標準輸出:

  • > bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
  • This is a message
  • This is another message
  • 復制代碼



    你在一個終端中運行consumer命令行,另一個終端中運行producer命令行,就可以在一個終端輸入消息,另一個終端讀取消息。
    這兩個命令都有自己的可選參數,可以在運行的時候不加任何參數可以看到幫助信息。

    Step 6: 搭建一個多個broker的集群

    剛才只是啟動了單個broker,現在啟動有3個broker組成的集群,這些broker節點也都是在本機上的:
    首先為每個節點編寫配置文件:
    ?

  • > cp config/server.properties config/server-1.properties
  • > cp config/server.properties config/server-2.properties
  • 復制代碼



    在拷貝出的新文件中添加以下參數:

  • config/server-1.properties:
  • ? ? broker.id=1
  • ? ? port=9093
  • ? ? log.dir=/tmp/kafka-logs-1
  • 復制代碼


    ?

  • config/server-2.properties:
  • ? ? broker.id=2
  • ? ? port=9094
  • ? ? log.dir=/tmp/kafka-logs-2
  • 復制代碼



    broker.id在集群中唯一的標注一個節點,因為在同一個機器上,所以必須制定不同的端口和日志文件,避免數據被覆蓋。

    We already have Zookeeper and our single node started, so we just need to start the two new nodes:
    剛才已經啟動可Zookeeper和一個節點,現在啟動另外兩個節點:

  • > bin/kafka-server-start.sh config/server-1.properties &
  • ...
  • > bin/kafka-server-start.sh config/server-2.properties &
  • ...
  • 復制代碼



    創建一個擁有3個副本的topic:

  • > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
  • 復制代碼



    現在我們搭建了一個集群,怎么知道每個節點的信息呢?運行“"describe topics”命令就可以了:

  • > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
  • 復制代碼

    ?

  • Topic:my-replicated-topic? ?? ? PartitionCount:1? ?? ???ReplicationFactor:3? ???Configs:
  • ? ?? ???Topic: my-replicated-topic? ?? ?Partition: 0? ? Leader: 1? ?? ? Replicas: 1,2,0 Isr: 1,2,0
  • 復制代碼



    下面解釋一下這些輸出。第一行是對所有分區的一個描述,然后每個分區都會對應一行,因為我們只有一個分區所以下面就只加了一行。
    leader?:負責處理消息的讀和寫,leader是從所有節點中隨機選擇的.
    replicas?:列出了所有的副本節點,不管節點是否在服務中.
    isr?:是正在服務中的節點.
    在我們的例子中,節點1是作為leader運行。
    向topic發送消息:
    ?

  • > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
  • 復制代碼

    ?

  • ...
  • my test message 1my test message 2^C
  • 復制代碼



    消費這些消息:

  • > bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
  • 復制代碼

    ?

  • ...
  • my test message 1
  • my test message 2
  • ^C
  • 復制代碼



    測試一下容錯能力.Broker 1作為leader運行,現在我們kill掉它:

  • > ps | grep server-1.properties7564 ttys002? ? 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java...
  • > kill -9 7564
  • 復制代碼



    另外一個節點被選做了leader,node 1 不再出現在 in-sync 副本列表中:

  • > bin/kafka-topics.sh --describe --zookeeper localhost:218192 --topic my-replicated-topic
  • Topic:my-replicated-topic? ?? ? PartitionCount:1? ?? ???ReplicationFactor:3? ???Configs:
  • ? ?? ???Topic: my-replicated-topic? ?? ?Partition: 0? ? Leader: 2? ?? ? Replicas: 1,2,0 Isr: 2,0
  • 復制代碼



    雖然最初負責續寫消息的leader down掉了,但之前的消息還是可以消費的:

  • > bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
  • ...
  • my test message 1
  • my test message 2
  • 復制代碼




    看來Kafka的容錯機制還是不錯的。

    ################################################

    三、搭建Kafka開發環境

    我們搭建了kafka的服務器,并可以使用Kafka的命令行工具創建topic,發送和接收消息。下面我們來搭建kafka的開發環境。
    添加依賴

    搭建開發環境需要引入kafka的jar包,一種方式是將Kafka安裝包中lib下的jar包加入到項目的classpath中,這種比較簡單了。不過我們使用另一種更加流行的方式:使用maven管理jar包依賴。
    創建好maven項目后,在pom.xml中添加以下依賴:
    ?

  • <dependency>
  • ? ?? ???<groupId> org.apache.kafka</groupId >
  • ? ?? ???<artifactId> kafka_2.10</artifactId >
  • ? ?? ???<version> 0.8.0</ version>
  • </dependency>
  • 復制代碼


    添加依賴后你會發現有兩個jar包的依賴找不到。沒關系我都幫你想好了,點擊這里下載這兩個jar包,解壓后你有兩種選擇,第一種是使用mvn的install命令將jar包安裝到本地倉庫,另一種是直接將解壓后的文件夾拷貝到mvn本地倉庫的com文件夾下,比如我的本地倉庫是d:\mvn,完成后我的目錄結構是這樣的:

    ?


    配置程序

    首先是一個充當配置文件作用的接口,配置了Kafka的各種連接參數:

  • package com.sohu.kafkademon;
  • public interface KafkaProperties
  • {
  • ? ? final static String zkConnect = "10.22.10.139:2181";
  • ? ? final static String groupId = "group1";
  • ? ? final static String topic = "topic1";
  • ? ? final static String kafkaServerURL = "10.22.10.139";
  • ? ? final static int kafkaServerPort = 9092;
  • ? ? final static int kafkaProducerBufferSize = 64 * 1024;
  • ? ? final static int connectionTimeOut = 20000;
  • ? ? final static int reconnectInterval = 10000;
  • ? ? final static String topic2 = "topic2";
  • ? ? final static String topic3 = "topic3";
  • ? ? final static String clientId = "SimpleConsumerDemoClient";
  • }
  • 復制代碼


    producer
    ?

  • package com.sohu.kafkademon;
  • import java.util.Properties;
  • import kafka.producer.KeyedMessage;
  • import kafka.producer.ProducerConfig;
  • /**
  • * @author leicui?bourne_cui@163.com
  • */
  • public class KafkaProducer extends Thread
  • {
  • ? ? private final kafka.javaapi.producer.Producer<Integer, String> producer;
  • ? ? private final String topic;
  • ? ? private final Properties props = new Properties();
  • ? ? public KafkaProducer(String topic)
  • ? ? {
  • ? ?? ???props.put("serializer.class", "kafka.serializer.StringEncoder");
  • ? ?? ???props.put("metadata.broker.list", "10.22.10.139:9092");
  • ? ?? ???producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
  • ? ?? ???this.topic = topic;
  • ? ? }
  • ? ? @Override
  • ? ? public void run() {
  • ? ?? ???int messageNo = 1;
  • ? ?? ???while (true)
  • ? ?? ???{
  • ? ?? ?? ?? ?String messageStr = new String("Message_" + messageNo);
  • ? ?? ?? ?? ?System.out.println("Send:" + messageStr);
  • ? ?? ?? ?? ?producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
  • ? ?? ?? ?? ?messageNo++;
  • ? ?? ?? ?? ?try {
  • ? ?? ?? ?? ?? ? sleep(3000);
  • ? ?? ?? ?? ?} catch (InterruptedException e) {
  • ? ?? ?? ?? ?? ? // TODO Auto-generated catch block
  • ? ?? ?? ?? ?? ? e.printStackTrace();
  • ? ?? ?? ?? ?}
  • ? ?? ???}
  • ? ? }
  • }
  • 復制代碼


    consumer

  • package com.sohu.kafkademon;
  • import java.util.HashMap;
  • import java.util.List;
  • import java.util.Map;
  • import java.util.Properties;
  • import kafka.consumer.ConsumerConfig;
  • import kafka.consumer.ConsumerIterator;
  • import kafka.consumer.KafkaStream;
  • import kafka.javaapi.consumer.ConsumerConnector;
  • /**
  • * @author leicui?bourne_cui@163.com
  • */
  • public class KafkaConsumer extends Thread
  • {
  • ? ? private final ConsumerConnector consumer;
  • ? ? private final String topic;
  • ? ? public KafkaConsumer(String topic)
  • ? ? {
  • ? ?? ???consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
  • ? ?? ?? ?? ?? ? createConsumerConfig());
  • ? ?? ???this.topic = topic;
  • ? ? }
  • ? ? private static ConsumerConfig createConsumerConfig()
  • ? ? {
  • ? ?? ???Properties props = new Properties();
  • ? ?? ???props.put("zookeeper.connect", KafkaProperties.zkConnect);
  • ? ?? ???props.put("group.id", KafkaProperties.groupId);
  • ? ?? ???props.put("zookeeper.session.timeout.ms", "40000");
  • ? ?? ???props.put("zookeeper.sync.time.ms", "200");
  • ? ?? ???props.put("auto.commit.interval.ms", "1000");
  • ? ?? ???return new ConsumerConfig(props);
  • ? ? }
  • ? ? @Override
  • ? ? public void run() {
  • ? ?? ???Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
  • ? ?? ???topicCountMap.put(topic, new Integer(1));
  • ? ?? ???Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
  • ? ?? ???KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
  • ? ?? ???ConsumerIterator<byte[], byte[]> it = stream.iterator();
  • ? ?? ???while (it.hasNext()) {
  • ? ?? ?? ?? ?System.out.println("receive:" + new String(it.next().message()));
  • ? ?? ?? ?? ?try {
  • ? ?? ?? ?? ?? ? sleep(3000);
  • ? ?? ?? ?? ?} catch (InterruptedException e) {
  • ? ?? ?? ?? ?? ? e.printStackTrace();
  • ? ?? ?? ?? ?}
  • ? ?? ???}
  • ? ? }
  • }
  • 復制代碼


    簡單的發送接收

    運行下面這個程序,就可以進行簡單的發送接收消息了:

  • package com.sohu.kafkademon;
  • /**
  • * @author leicui?bourne_cui@163.com
  • */
  • public class KafkaConsumerProducerDemo
  • {
  • ? ? public static void main(String[] args)
  • ? ? {
  • ? ?? ???KafkaProducer producerThread = new KafkaProducer(KafkaProperties.topic);
  • ? ?? ???producerThread.start();
  • ? ?? ???KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);
  • ? ?? ???consumerThread.start();
  • ? ? }
  • }
  • 復制代碼


    高級別的consumer

    下面是比較負載的發送接收的程序:

  • package com.sohu.kafkademon;
  • import java.util.HashMap;
  • import java.util.List;
  • import java.util.Map;
  • import java.util.Properties;
  • import kafka.consumer.ConsumerConfig;
  • import kafka.consumer.ConsumerIterator;
  • import kafka.consumer.KafkaStream;
  • import kafka.javaapi.consumer.ConsumerConnector;
  • /**
  • * @author leicui?bourne_cui@163.com
  • */
  • public class KafkaConsumer extends Thread
  • {
  • ? ? private final ConsumerConnector consumer;
  • ? ? private final String topic;
  • ? ? public KafkaConsumer(String topic)
  • ? ? {
  • ? ?? ???consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
  • ? ?? ?? ?? ?? ? createConsumerConfig());
  • ? ?? ???this.topic = topic;
  • ? ? }
  • ? ? private static ConsumerConfig createConsumerConfig()
  • ? ? {
  • ? ?? ???Properties props = new Properties();
  • ? ?? ???props.put("zookeeper.connect", KafkaProperties.zkConnect);
  • ? ?? ???props.put("group.id", KafkaProperties.groupId);
  • ? ?? ???props.put("zookeeper.session.timeout.ms", "40000");
  • ? ?? ???props.put("zookeeper.sync.time.ms", "200");
  • ? ?? ???props.put("auto.commit.interval.ms", "1000");
  • ? ?? ???return new ConsumerConfig(props);
  • ? ? }
  • ? ? @Override
  • ? ? public void run() {
  • ? ?? ???Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
  • ? ?? ???topicCountMap.put(topic, new Integer(1));
  • ? ?? ???Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
  • ? ?? ???KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
  • ? ?? ???ConsumerIterator<byte[], byte[]> it = stream.iterator();
  • ? ?? ???while (it.hasNext()) {
  • ? ?? ?? ?? ?System.out.println("receive:" + new String(it.next().message()));
  • ? ?? ?? ?? ?try {
  • ? ?? ?? ?? ?? ? sleep(3000);
  • ? ?? ?? ?? ?} catch (InterruptedException e) {
  • ? ?? ?? ?? ?? ? e.printStackTrace();
  • ? ?? ?? ?? ?}
  • ? ?? ???}
  • ? ? }
  • }
  • 復制代碼


    ############################################################

    四、數據持久化


    不要畏懼文件系統!

    Kafka大量依賴文件系統去存儲和緩存消息。對于硬盤有個傳統的觀念是硬盤總是很慢,這使很多人懷疑基于文件系統的架構能否提供優異的性能。實際上硬盤的快慢完全取決于使用它的方式。設計良好的硬盤架構可以和內存一樣快。

    在6塊7200轉的SATA RAID-5磁盤陣列的線性寫速度差不多是600MB/s,但是隨即寫的速度卻是100k/s,差了差不多6000倍。現代的操作系統都對次做了大量的優化,使用了 read-ahead 和 write-behind的技巧,讀取的時候成塊的預讀取數據,寫的時候將各種微小瑣碎的邏輯寫入組織合并成一次較大的物理寫入。對此的深入討論可以查看這里,它們發現線性的訪問磁盤,很多時候比隨機的內存訪問快得多。

    為了提高性能,現代操作系統往往使用內存作為磁盤的緩存,現代操作系統樂于把所有空閑內存用作磁盤緩存,雖然這可能在緩存回收和重新分配時犧牲一些性能。所有的磁盤讀寫操作都會經過這個緩存,這不太可能被繞開除非直接使用I/O。所以雖然每個程序都在自己的線程里只緩存了一份數據,但在操作系統的緩存里還有一份,這等于存了兩份數據。

    另外再來討論一下JVM,以下兩個事實是眾所周知的:

    ?Java對象占用空間是非常大的,差不多是要存儲的數據的兩倍甚至更高。

    ?隨著堆中數據量的增加,垃圾回收回變的越來越困難。

    基于以上分析,如果把數據緩存在內存里,因為需要存儲兩份,不得不使用兩倍的內存空間,Kafka基于JVM,又不得不將空間再次加倍,再加上要避免GC帶來的性能影響,在一個32G內存的機器上,不得不使用到28-30G的內存空間。并且當系統重啟的時候,又必須要將數據刷到內存中( 10GB 內存差不多要用10分鐘),就算使用冷刷新(不是一次性刷進內存,而是在使用數據的時候沒有就刷到內存)也會導致最初的時候新能非常慢。但是使用文件系統,即使系統重啟了,也不需要刷新數據。使用文件系統也簡化了維護數據一致性的邏輯。

    所以與傳統的將數據緩存在內存中然后刷到硬盤的設計不同,Kafka直接將數據寫到了文件系統的日志中。

    常量時間的操作效率

    在大多數的消息系統中,數據持久化的機制往往是為每個cosumer提供一個B樹或者其他的隨機讀寫的數據結構。B樹當然是很棒的,但是也帶了一些代價:比如B樹的復雜度是O(log N),O(log N)通常被認為就是常量復雜度了,但對于硬盤操作來說并非如此。磁盤進行一次搜索需要10ms,每個硬盤在同一時間只能進行一次搜索,這樣并發處理就成了問題。雖然存儲系統使用緩存進行了大量優化,但是對于樹結構的性能的觀察結果卻表明,它的性能往往隨著數據的增長而線性下降,數據增長一倍,速度就會降低一倍。

    直觀的講,對于主要用于日志處理的消息系統,數據的持久化可以簡單的通過將數據追加到文件中實現,讀的時候從文件中讀就好了。這樣做的好處是讀和寫都是 O(1) 的,并且讀操作不會阻塞寫操作和其他操作。這樣帶來的性能優勢是很明顯的,因為性能和數據的大小沒有關系了。

    既然可以使用幾乎沒有容量限制(相對于內存來說)的硬盤空間建立消息系統,就可以在沒有性能損失的情況下提供一些一般消息系統不具備的特性。比如,一般的消息系統都是在消息被消費后立即刪除,Kafka卻可以將消息保存一段時間(比如一星期),這給consumer提供了很好的機動性和靈活性,這點在今后的文章中會有詳述。

    ############################################################

    五、消息傳輸的事務定義

    之前討論了consumer和producer是怎么工作的,現在來討論一下數據傳輸方面。數據傳輸的事務定義通常有以下三種級別:

    • 最多一次: 消息不會被重復發送,最多被傳輸一次,但也有可能一次不傳輸。
    • 最少一次: 消息不會被漏發送,最少被傳輸一次,但也有可能被重復傳輸.
    • 精確的一次(Exactly once): 不會漏傳輸也不會重復傳輸,每個消息都傳輸被一次而且僅僅被傳輸一次,這是大家所期望的。

    大多數消息系統聲稱可以做到“精確的一次”,但是仔細閱讀它們的的文檔可以看到里面存在誤導,比如沒有說明當consumer或producer失敗時怎么樣,或者當有多個consumer并行時怎么樣,或寫入硬盤的數據丟失時又會怎么樣。kafka的做法要更先進一些。當發布消息時,Kafka有一個“committed”的概念,一旦消息被提交了,只要消息被寫入的分區的所在的副本broker是活動的,數據就不會丟失。關于副本的活動的概念,下節文檔會討論。現在假設broker是不會down的。

    如果producer發布消息時發生了網絡錯誤,但又不確定實在提交之前發生的還是提交之后發生的,這種情況雖然不常見,但是必須考慮進去,現在Kafka版本還沒有解決這個問題,將來的版本正在努力嘗試解決。

    并不是所有的情況都需要“精確的一次”這樣高的級別,Kafka允許producer靈活的指定級別。比如producer可以指定必須等待消息被提交的通知,或者完全的異步發送消息而不等待任何通知,或者僅僅等待leader聲明它拿到了消息(followers沒有必要)。

    現在從consumer的方面考慮這個問題,所有的副本都有相同的日志文件和相同的offset,consumer維護自己消費的消息的offset,如果consumer不會崩潰當然可以在內存中保存這個值,當然誰也不能保證這點。如果consumer崩潰了,會有另外一個consumer接著消費消息,它需要從一個合適的offset繼續處理。這種情況下可以有以下選擇:
    ?

    • consumer可以先讀取消息,然后將offset寫入日志文件中,然后再處理消息。這存在一種可能就是在存儲offset后還沒處理消息就crash了,新的consumer繼續從這個offset處理,那么就會有些消息永遠不會被處理,這就是上面說的“最多一次”。
    • consumer可以先讀取消息,處理消息,最后記錄offset,當然如果在記錄offset之前就crash了,新的consumer會重復的消費一些消息,這就是上面說的“最少一次”。
    • “精確一次”可以通過將提交分為兩個階段來解決:保存了offset后提交一次,消息處理成功之后再提交一次。但是還有個更簡單的做法:將消息的offset和消息被處理后的結果保存在一起。比如用Hadoop ETL處理消息時,將處理后的結果和offset同時保存在HDFS中,這樣就能保證消息和offser同時被處理了。



    ############################################################

    六、性能優化

    Kafka在提高效率方面做了很大努力。Kafka的一個主要使用場景是處理網站活動日志,吞吐量是非常大的,每個頁面都會產生好多次寫操作。讀方面,假設每個消息只被消費一次,讀的量的也是很大的,Kafka也盡量使讀的操作更輕量化。

    我們之前討論了磁盤的性能問題,線性讀寫的情況下影響磁盤性能問題大約有兩個方面:太多的瑣碎的I/O操作和太多的字節拷貝。I/O問題發生在客戶端和服務端之間,也發生在服務端內部的持久化的操作中。

    消息集(message set)
    為了避免這些問題,Kafka建立了“消息集(message set)”的概念,將消息組織到一起,作為處理的單位。以消息集為單位處理消息,比以單個的消息為單位處理,會提升不少性能。Producer把消息集一塊發送給服務端,而不是一條條的發送;服務端把消息集一次性的追加到日志文件中,這樣減少了瑣碎的I/O操作。consumer也可以一次性的請求一個消息集。

    另外一個性能優化是在字節拷貝方面。在低負載的情況下這不是問題,但是在高負載的情況下它的影響還是很大的。為了避免這個問題,Kafka使用了標準的二進制消息格式,這個格式可以在producer,broker和producer之間共享而無需做任何改動。

    zero copy
    Broker維護的消息日志僅僅是一些目錄文件,消息集以固定隊的格式寫入到日志文件中,這個格式producer和consumer是共享的,這使得Kafka可以一個很重要的點進行優化:消息在網絡上的傳遞。現代的unix操作系統提供了高性能的將數據從頁面緩存發送到socket的系統函數,在linux中,這個函數是sendfile.

    為了更好的理解sendfile的好處,我們先來看下一般將數據從文件發送到socket的數據流向:

    • 操作系統把數據從文件拷貝內核中的頁緩存中
    • 應用程序從頁緩存從把數據拷貝自己的內存緩存中
    • 應用程序將數據寫入到內核中socket緩存中
    • 操作系統把數據從socket緩存中拷貝到網卡接口緩存,從這里發送到網絡上。


    這顯然是低效率的,有4次拷貝和2次系統調用。Sendfile通過直接將數據從頁面緩存發送網卡接口緩存,避免了重復拷貝,大大的優化了性能。
    在一個多consumers的場景里,數據僅僅被拷貝到頁面緩存一次而不是每次消費消息的時候都重復的進行拷貝。這使得消息以近乎網絡帶寬的速率發送出去。這樣在磁盤層面你幾乎看不到任何的讀操作,因為數據都是從頁面緩存中直接發送到網絡上去了。
    這篇文章?詳細介紹了sendfile和zero-copy技術在Java方面的應用。

    數據壓縮
    很多時候,性能的瓶頸并非CPU或者硬盤而是網絡帶寬,對于需要在數據中心之間傳送大量數據的應用更是如此。當然用戶可以在沒有Kafka支持的情況下各自壓縮自己的消息,但是這將導致較低的壓縮率,因為相比于將消息單獨壓縮,將大量文件壓縮在一起才能起到最好的壓縮效果。
    Kafka采用了端到端的壓縮:因為有“消息集”的概念,客戶端的消息可以一起被壓縮后送到服務端,并以壓縮后的格式寫入日志文件,以壓縮的格式發送到consumer,消息從producer發出到consumer拿到都被是壓縮的,只有在consumer使用的時候才被解壓縮,所以叫做“端到端的壓縮”。
    Kafka支持GZIP和Snappy壓縮協議。更詳細的內容可以查看?這里?。

    ##########################################################

    七、Producer和Consumer


    Kafka Producer?消息發送
    producer直接將數據發送到broker的leader(主節點),不需要在多個節點進行分發。為了幫助producer做到這點,所有的Kafka節點都可以及時的告知:哪些節點是活動的,目標topic目標分區的leader在哪。這樣producer就可以直接將消息發送到目的地了。

    客戶端控制消息將被分發到哪個分區。可以通過負載均衡隨機的選擇,或者使用分區函數。Kafka允許用戶實現分區函數,指定分區的key,將消息hash到不同的分區上(當然有需要的話,也可以覆蓋這個分區函數自己實現邏輯).比如如果你指定的key是user id,那么同一個用戶發送的消息都被發送到同一個分區上。經過分區之后,consumer就可以有目的的消費某個分區的消息。

    異步發送
    批量發送可以很有效的提高發送效率。Kafka producer的異步發送模式允許進行批量發送,先將消息緩存在內存中,然后一次請求批量發送出去。這個策略可以配置的,比如可以指定緩存的消息達到某個量的時候就發出去,或者緩存了固定的時間后就發送出去(比如100條消息就發送,或者每5秒發送一次)。這種策略將大大減少服務端的I/O次數。

    既然緩存是在producer端進行的,那么當producer崩潰時,這些消息就會丟失。Kafka0.8.1的異步發送模式還不支持回調,就不能在發送出錯時進行處理。Kafka 0.9可能會增加這樣的回調函數。見?Proposed Producer API?.

    Kafka Consumer
    Kafa consumer消費消息時,向broker發出"fetch"請求去消費特定分區的消息。consumer指定消息在日志中的偏移量(offset),就可以消費從這個位置開始的消息。customer擁有了offset的控制權,可以向后回滾去重新消費之前的消息,這是很有意義的。

    推還是拉?
    Kafka最初考慮的問題是,customer應該從brokes拉取消息還是brokers將消息推送到consumer,也就是pull還push。在這方面,Kafka遵循了一種大部分消息系統共同的傳統的設計:producer將消息推送到broker,consumer從broker拉取消息。

    一些消息系統比如Scribe和Apache Flume采用了push模式,將消息推送到下游的consumer。這樣做有好處也有壞處:由broker決定消息推送的速率,對于不同消費速率的consumer就不太好處理了。消息系統都致力于讓consumer以最大的速率最快速的消費消息,但不幸的是,push模式下,當broker推送的速率遠大于consumer消費的速率時,consumer恐怕就要崩潰了。最終Kafka還是選取了傳統的pull模式。

    Pull模式的另外一個好處是consumer可以自主決定是否批量的從broker拉取數據。Push模式必須在不知道下游consumer消費能力和消費策略的情況下決定是立即推送每條消息還是緩存之后批量推送。如果為了避免consumer崩潰而采用較低的推送速率,將可能導致一次只推送較少的消息而造成浪費。Pull模式下,consumer就可以根據自己的消費能力去決定這些策略。

    Pull有個缺點是,如果broker沒有可供消費的消息,將導致consumer不斷在循環中輪詢,直到新消息到t達。為了避免這點,Kafka有個參數可以讓consumer阻塞知道新消息到達(當然也可以阻塞知道消息的數量達到某個特定的量這樣就可以批量發送)。

    消費狀態跟蹤
    對消費消息狀態的記錄也是很重要的。
    大部分消息系統在broker端的維護消息被消費的記錄:一個消息被分發到consumer后broker就馬上進行標記或者等待customer的通知后進行標記。這樣也可以在消息在消費后立馬就刪除以減少空間占用。

    但是這樣會不會有什么問題呢??如果一條消息發送出去之后就立即被標記為消費過的,一旦consumer處理消息時失敗了(比如程序崩潰)消息就丟失了。為了解決這個問題,很多消息系統提供了另外一個個功能:當消息被發送出去之后僅僅被標記為已發送狀態,當接到consumer已經消費成功的通知后才標記為已被消費的狀態。這雖然解決了消息丟失的問題,但產生了新問題,首先如果consumer處理消息成功了但是向broker發送響應時失敗了,這條消息將被消費兩次。第二個問題時,broker必須維護每條消息的狀態,并且每次都要先鎖住消息然后更改狀態然后釋放鎖。這樣麻煩又來了,且不說要維護大量的狀態數據,比如如果消息發送出去但沒有收到消費成功的通知,這條消息將一直處于被鎖定的狀態,
    Kafka采用了不同的策略。Topic被分成了若干分區,每個分區在同一時間只被一個consumer消費。這意味著每個分區被消費的消息在日志中的位置僅僅是一個簡單的整數:offset。這樣就很容易標記每個分區消費狀態就很容易了,僅僅需要一個整數而已。這樣消費狀態的跟蹤就很簡單了。

    這帶來了另外一個好處:consumer可以把offset調成一個較老的值,去重新消費老的消息。這對傳統的消息系統來說看起來有些不可思議,但確實是非常有用的,誰規定了一條消息只能被消費一次呢?consumer發現解析數據的程序有bug,在修改bug后再來解析一次消息,看起來是很合理的額呀!

    離線處理消息
    高級的數據持久化允許consumer每個隔一段時間批量的將數據加載到線下系統中比如?Hadoop?或者數據倉庫。這種情況下,Hadoop可以將加載任務分拆,拆成每個broker或每個topic或每個分區一個加載任務。Hadoop具有任務管理功能,當一個任務失敗了就可以重啟而不用擔心數據被重新加載,只要從上次加載的位置繼續加載消息就可以了。

    #########################################################


    八、主從同步


    Kafka允許topic的分區擁有若干副本,這個數量是可以配置的,你可以為每個topci配置副本的數量。Kafka會自動在每個個副本上備份數據,所以當一個節點down掉時數據依然是可用的。

    Kafka的副本功能不是必須的,你可以配置只有一個副本,這樣其實就相當于只有一份數據。
    創建副本的單位是topic的分區,每個分區都有一個leader和零或多個followers.所有的讀寫操作都由leader處理,一般分區的數量都比broker的數量多的多,各分區的leader均勻的分布在brokers中。所有的followers都復制leader的日志,日志中的消息和順序都和leader中的一致。flowers向普通的consumer那樣從leader那里拉取消息并保存在自己的日志文件中。

    許多分布式的消息系統自動的處理失敗的請求,它們對一個節點是否

    著(alive)”有著清晰的定義。?Kafka判斷一個節點是否活著有兩個條件:

    • 節點必須可以維護和ZooKeeper的連接,Zookeeper通過心跳機制檢查每個節點的連接。
    • 如果節點是個follower,他必須能及時的同步leader的寫操作,延時不能太久。

    符合以上條件的節點準確的說應該是“同步中的(in sync)”,而不是模糊的說是“活著的”或是“失敗的”。Leader會追蹤所有“同步中”的節點,一旦一個down掉了,或是卡住了,或是延時太久,leader就會把它移除。至于延時多久算是“太久”,是由參數replica.lag.max.messages決定的,怎樣算是卡住了,怎是由參數replica.lag.time.max.ms決定的。?

    只有當消息被所有的副本加入到日志中時,才算是“committed”,只有committed的消息才會發送給consumer,這樣就不用擔心一旦leader down掉了消息會丟失。Producer也可以選擇是否等待消息被提交的通知,這個是由參數request.required.acks決定的。
    Kafka保證只要有一個“同步中”的節點,“committed”的消息就不會丟失。

    Leader的選擇
    Kafka的核心是日志文件,日志文件在集群中的同步是分布式數據系統最基礎的要素。

    如果leaders永遠不會down的話我們就不需要followers了!一旦leader down掉了,需要在followers中選擇一個新的leader.但是followers本身有可能延時太久或者crash,所以必須選擇高質量的follower作為leader.必須保證,一旦一個消息被提交了,但是leader down掉了,新選出的leader必須可以提供這條消息。大部分的分布式系統采用了多數投票法則選擇新的leader,對于多數投票法則,就是根據所有副本節點的狀況動態的選擇最適合的作為leader.Kafka并不是使用這種方法。

    Kafaka動態維護了一個同步狀態的副本的集合(a set of in-sync replicas),簡稱ISR,在這個集合中的節點都是和leader保持高度一致的,任何一條消息必須被這個集合中的每個節點讀取并追加到日志中了,才回通知外部這個消息已經被提交了。因此這個集合中的任何一個節點隨時都可以被選為leader.ISR在ZooKeeper中維護。ISR中有f+1個節點,就可以允許在f個節點down掉的情況下不會丟失消息并正常提供服。ISR的成員是動態的,如果一個節點被淘汰了,當它重新達到“同步中”的狀態時,他可以重新加入ISR.這種leader的選擇方式是非常快速的,適合kafka的應用場景。

    一個邪惡的想法:如果所有節點都down掉了怎么辦?Kafka對于數據不會丟失的保證,是基于至少一個節點是存活的,一旦所有節點都down了,這個就不能保證了。
    實際應用中,當所有的副本都down掉時,必須及時作出反應。可以有以下兩種選擇:

    • 等待ISR中的任何一個節點恢復并擔任leader。
    • 選擇所有節點中(不只是ISR)第一個恢復的節點作為leader.

    這是一個在可用性和連續性之間的權衡。如果等待ISR中的節點恢復,一旦ISR中的節點起不起來或者數據都是了,那集群就永遠恢復不了了。如果等待ISR意外的節點恢復,這個節點的數據就會被作為線上數據,有可能和真實的數據有所出入,因為有些數據它可能還沒同步到。Kafka目前選擇了第二種策略,在未來的版本中將使這個策略的選擇可配置,可以根據場景靈活的選擇。
    這種窘境不只Kafka會遇到,幾乎所有的分布式數據系統都會遇到。

    副本管理
    以上僅僅以一個topic一個分區為例子進行了討論,但實際上一個Kafka將會管理成千上萬的topic分區.Kafka盡量的使所有分區均勻的分布到集群所有的節點上而不是集中在某些節點上,另外主從關系也盡量均衡這樣每個幾點都會擔任一定比例的分區的leader.
    優化leader的選擇過程也是很重要的,它決定了系統發生故障時的空窗期有多久。Kafka選擇一個節點作為“controller”,當發現有節點down掉的時候它負責在游泳分區的所有節點中選擇新的leader,這使得Kafka可以批量的高效的管理所有分區節點的主從關系。如果controller down掉了,活著的節點中的一個會備切換為新的controller.

    ###################################################

    九、客戶端API

    Kafka Producer APIs
    Procuder API有兩種:kafka.producer.SyncProducer和kafka.producer.async.AsyncProducer.它們都實現了同一個接口:

  • class Producer {
  • /* 將消息發送到指定分區 */
  • publicvoid send(kafka.javaapi.producer.ProducerData<K,V> producerData);
  • /* 批量發送一批消息 */
  • publicvoid send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);
  • /* 關閉producer */
  • publicvoid close();
  • }
  • 復制代碼




    Producer API提供了以下功能:

    • 可以將多個消息緩存到本地隊列里,然后異步的批量發送到broker,可以通過參數producer.type=async做到。緩存的大小可以通過一些參數指定:queue.time和batch.size。一個后臺線程((kafka.producer.async.ProducerSendThread)從隊列中取出數據并讓kafka.producer.EventHandler將消息發送到broker,也可以通過參數event.handler定制handler,在producer端處理數據的不同的階段注冊處理器,比如可以對這一過程進行日志追蹤,或進行一些監控。只需實現kafka.producer.async.CallbackHandler接口,并在callback.handler中配置。
    • 自己編寫Encoder來序列化消息,只需實現下面這個接口。默認的Encoder是kafka.serializer.DefaultEncoder。
      • interface Encoder<T> {
      • public Message toMessage(T data);
      • }
    • 提供了基于Zookeeper的broker自動感知能力,可以通過參數zk.connect實現。如果不使用Zookeeper,也可以使用broker.list參數指定一個靜態的brokers列表,這樣消息將被隨機的發送到一個broker上,一旦選中的broker失敗了,消息發送也就失敗了。
    • 通過分區函數kafka.producer.Partitioner類對消息分區。
      • interface Partitioner<T> {
      • int partition(T key, int numPartitions);
      • }

      分區函數有兩個參數:key和可用的分區數量,從分區列表中選擇一個分區并返回id。默認的分區策略是hash(key)%numPartitions.如果key是null,就隨機的選擇一個。可以通過參數partitioner.class定制分區函數。
    • ?

    KafKa Consumer APIs

    Consumer API有兩個級別。低級別的和一個指定的broker保持連接,并在接收完消息后關閉連接,這個級別是無狀態的,每次讀取消息都帶著offset。
    高級別的API隱藏了和brokers連接的細節,在不必關心服務端架構的情況下和服務端通信。還可以自己維護消費狀態,并可以通過一些條件指定訂閱特定的topic,比如白名單黑名單或者正則表達式。

    低級別的API

  • class SimpleConsumer {
  • /*向一個broker發送讀取請求并得到消息集 */
  • public ByteBufferMessageSet fetch(FetchRequest request);
  • /*向一個broker發送讀取請求并得到一個相應集 */
  • public MultiFetchResponse multifetch(List<FetchRequest> fetches);
  • /**
  • * 得到指定時間之前的offsets
  • * 返回值是offsets列表,以倒序排序
  • * @param time: 時間,毫秒,
  • * 如果指定為OffsetRequest$.MODULE$.LATIEST_TIME(), 得到最新的offset.
  • * 如果指定為OffsetRequest$.MODULE$.EARLIEST_TIME(),得到最老的offset.
  • */
  • publiclong[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
  • }
  • 復制代碼



    低級別的API是高級別API實現的基礎,也是為了一些對維持消費狀態有特殊需求的場景,比如?Hadoop??consumer這樣的離線consumer。

    高級別的API

  • /* 創建連接 */
  • ConsumerConnector connector = Consumer.create(consumerConfig);
  • interface ConsumerConnector {
  • /**
  • * 這個方法可以得到一個流的列表,每個流都是MessageAndMetadata的迭代,通過MessageAndMetadata可以拿到消息和其他的元數據(目前之后topic)
  • * Input: a map of <topic, #streams>
  • * Output: a map of <topic, list of message streams>
  • */
  • public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
  • /**
  • * 你也可以得到一個流的列表,它包含了符合TopicFiler的消息的迭代,
  • * 一個TopicFilter是一個封裝了白名單或黑名單的正則表達式。
  • */
  • public List<KafkaStream> createMessageStreamsByFilter(
  • TopicFilter topicFilter, int numStreams);
  • /* 提交目前消費到的offset */
  • public commitOffsets()
  • /* 關閉連接 */
  • public shutdown()
  • }
  • 復制代碼




    這個API圍繞著由KafkaStream實現的迭代器展開,每個流代表一系列從一個或多個分區多和broker上匯聚來的消息,每個流由一個線程處理,所以客戶端可以在創建的時候通過參數指定想要幾個流。一個流是多個分區多個broker的合并,但是每個分區的消息只會流向一個流。

    每調用一次createMessageStreams都會將consumer注冊到topic上,這樣consumer和brokers之間的負載均衡就會進行調整。API鼓勵每次調用創建更多的topic流以減少這種調整。createMessageStreamsByFilter方法注冊監聽可以感知新的符合filter的tipic。

    #######################################################

    十、消息和日志



    消息由一個固定長度的頭部和可變長度的字節數組組成。頭部包含了一個版本號和CRC32校驗碼。

  • /**
  • * 具有N個字節的消息的格式如下
  • *
  • * 如果版本號是0
  • *
  • * 1. 1個字節的 "magic" 標記
  • *
  • * 2. 4個字節的CRC32校驗碼
  • *
  • * 3. N - 5個字節的具體信息
  • *
  • * 如果版本號是1
  • *
  • * 1. 1個字節的 "magic" 標記
  • *
  • * 2.1個字節的參數允許標注一些附加的信息比如是否壓縮了,解碼類型等
  • *
  • * 3.4個字節的CRC32校驗碼
  • *
  • * 4. N - 6 個字節的具體信息
  • *
  • */
  • 復制代碼




    日志?一個叫做“my_topic”且有兩個分區的的topic,它的日志有兩個文件夾組成,my_topic_0和my_topic_1,每個文件夾里放著具體的數據文件,每個數據文件都是一系列的日志實體,每個日志實體有一個4個字節的整數N標注消息的長度,后邊跟著N個字節的消息。每個消息都可以由一個64位的整數offset標注,offset標注了這條消息在發送到這個分區的消息流中的起始位置。每個日志文件的名稱都是這個文件第一條日志的offset.所以第一個日志文件的名字就是00000000000.kafka.所以每相鄰的兩個文件名字的差就是一個數字S,S差不多就是配置文件中指定的日志文件的最大容量。
    消息的格式都由一個統一的接口維護,所以消息可以在producer,broker和consumer之間無縫的傳遞。存儲在硬盤上的消息格式如下所示:

    • 消息長度: 4 bytes (value: 1+4+n)
    • 版本號: 1 byte
    • CRC校驗碼: 4 bytes
    • 具體的消息: n bytes



    ?


    寫操作?消息被不斷的追加到最后一個日志的末尾,當日志的大小達到一個指定的值時就會產生一個新的文件。對于寫操作有兩個參數,一個規定了消息的數量達到這個值時必須將數據刷新到硬盤上,另外一個規定了刷新到硬盤的時間間隔,這對數據的持久性是個保證,在系統崩潰的時候只會丟失一定數量的消息或者一個時間段的消息。

    讀操作
    讀操作需要兩個參數:一個64位的offset和一個S字節的最大讀取量。S通常比單個消息的大小要大,但在一些個別消息比較大的情況下,S會小于單個消息的大小。這種情況下讀操作會不斷重試,每次重試都會將讀取量加倍,直到讀取到一個完整的消息。可以配置單個消息的最大值,這樣服務器就會拒絕大小超過這個值的消息。也可以給客戶端指定一個嘗試讀取的最大上限,避免為了讀到一個完整的消息而無限次的重試。
    在實際執行讀取操縱時,首先需要定位數據所在的日志文件,然后根據offset計算出在這個日志中的offset(前面的的offset是整個分區的offset),然后在這個offset的位置進行讀取。定位操作是由二分查找法完成的,Kafka在內存中為每個文件維護了offset的范圍。

    下面是發送給consumer的結果的格式:
    ?

  • MessageSetSend (fetch result)
  • ?
  • total length? ???: 4 bytes
  • error code? ?? ? : 2 bytes
  • message 1? ?? ???: x bytes
  • ...
  • message n? ?? ???: x bytes
  • MultiMessageSetSend (multiFetch result)
  • ?
  • total length? ?? ? : 4 bytes
  • error code? ?? ?? ?: 2 bytes
  • messageSetSend 1
  • ...
  • messageSetSend n
  • 復制代碼



    刪除
    日志管理器允許定制刪除策略。目前的策略是刪除修改時間在N天之前的日志(按時間刪除),也可以使用另外一個策略:保留最后的N GB數據的策略(按大小刪除)。為了避免在刪除時阻塞讀操作,采用了copy-on-write形式的實現,刪除操作進行時,讀取操作的二分查找功能實際是在一個靜態的快照副本上進行的,這類似于Java的CopyOnWriteArrayList。

    可靠性保證
    日志文件有一個可配置的參數M,緩存超過這個數量的消息將被強行刷新到硬盤。一個日志矯正線程將循環檢查最新的日志文件中的消息確認每個消息都是合法的。合法的標準為:所有文件的大小的和最大的offset小于日志文件的大小,并且消息的CRC32校驗碼與存儲在消息實體中的校驗碼一致。如果在某個offset發現不合法的消息,從這個offset到下一個合法的offset之間的內容將被移除。
    有兩種情況必須考慮:
    1,當發生崩潰時有些數據塊未能寫入。
    2,寫入了一些空白數據塊。第二種情況的原因是,對于每個文件,操作系統都有一個inode(inode是指在許多“類Unix文件系統”中的一種數據結構。每個inode保存了文件系統中的一個文件系統對象,包括文件、目錄、大小、設備文件、socket、管道, 等等),但無法保證更新inode和寫入數據的順序,當inode保存的大小信息被更新了,但寫入數據時發生了崩潰,就產生了空白數據塊。CRC校驗碼可以檢查這些塊并移除,當然因為崩潰而未寫入的數據塊也就丟失了。

    ?

    ?

    創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

    總結

    以上是生活随笔為你收集整理的kafka 学习 非常详细的经典教程的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    麻豆影视在线播放 | 夜夜爽88888免费视频4848 | 国产91aaa| www.久久爱.cn| 国产不卡一 | 欧美在线视频免费 | 日韩一区二区三区在线看 | 亚洲天堂免费视频 | 免费情趣视频 | 久久久久久久久久久久电影 | 亚洲精品午夜久久久久久久久久久 | 人人艹视频 | 久热这里有精品 | 精品久久影院 | 日韩有码中文字幕在线 | 91精品视频观看 | 色综合天| 婷婷中文字幕 | 黄色软件视频大全免费下载 | 色综合久久88色综合天天人守婷 | 日韩一区二区三区不卡 | 日韩理论片在线观看 | 国产色婷婷 | 亚洲成a人片77777潘金莲 | 久久久蜜桃一区二区 | 久久久久久久免费观看 | 免费黄色av.| 人人插人人舔 | 2021国产精品视频 | 国产成人久久 | 97色狠狠 | 丁香五月亚洲综合在线 | 国产精品久久99精品毛片三a | 中文字幕乱码视频 | 粉嫩一区二区三区粉嫩91 | 日韩字幕 | 免费视频网 | 色综久久 | 亚洲最大成人免费网站 | 午夜精品视频在线 | 99久久超碰中文字幕伊人 | 九九色综合 | 亚洲va在线va天堂va偷拍 | 久久精品亚洲一区二区三区观看模式 | 国产免费专区 | 免费看污在线观看 | 欧美日韩高清一区二区三区 | 精品国产伦一区二区三区免费 | 国产精品第52页 | 丁香婷婷自拍 | 激情伊人五月天 | 亚洲人成综合 | 在线视频18在线视频4k | 韩日精品在线观看 | 三级视频国产 | 91九色视频在线播放 | 91完整版观看 | 欧美精品在线免费 | 久久久久亚洲精品中文字幕 | 成人网444ppp | 91伊人影院 | 五月天色网站 | 在线免费观看涩涩 | 深爱激情综合 | 91精品国产福利 | 天天操天天舔天天干 | 日本狠狠干 | 亚洲尺码电影av久久 | 精品国产乱码久久久久久1区2匹 | 日韩欧美在线免费观看 | 久久人人爽视频 | 免费欧美精品 | 欧美男同网站 | 91天天视频| 免费在线观看国产精品 | 性色xxxxhd| 免费在线色 | 色妞色视频一区二区三区四区 | 国产精品9999 | 国内毛片毛片 | 五月天国产 | 91tv国产成人福利 | 国产精品一区二区三区四 | 免费黄色小网站 | 三级黄免费看 | 国产一级片免费视频 | 中文字幕在线免费播放 | 深爱综合网 | 美女网站视频免费黄 | 婷婷干五月 | 色五月成人 | 精品高清美女精品国产区 | 国产精品中文 | 在线黄色免费 | 国产专区免费 | 91麻豆精品国产91久久久无需广告 | 国产亚洲精品av | 日韩精品一区二区三区不卡 | 久久国色夜色精品国产 | 正在播放一区 | 美女网站在线观看 | 国产精品成人自拍 | 久久久久久久国产精品视频 | 成人午夜精品福利免费 | 久久久蜜桃一区二区 | 在线日韩视频 | 97成人在线观看 | 国产一级在线观看 | 丁香av| 国内精品久久久久国产 | 日韩精品一区二区三区免费观看视频 | 在线免费观看国产黄色 | 中文字幕最新精品 | 亚洲伊人av | 蜜臀久久99精品久久久酒店新书 | 国产精品免费在线播放 | 四虎影视成人精品国库在线观看 | 国内精品久久天天躁人人爽 | 午夜黄色 | 国产精品麻豆99久久久久久 | 天天躁天天操 | 黄色一级大片在线免费看产 | 91 在线视频播放 | 伊人永久在线 | 亚洲综合在线五月天 | 91在线日韩 | 337p日本欧洲亚洲大胆裸体艺术 | 岛国av在线免费 | 免费看的黄色的网站 | 久久伊人精品一区二区三区 | 日韩视频一区二区三区在线播放免费观看 | 日韩三级免费观看 | 国产精品成人自产拍在线观看 | 成人一区二区三区在线 | 欧美一区二区伦理片 | av大全在线 | 国产在线观看二区 | 九九热在线播放 | 在线观看视频在线观看 | 久久96国产精品久久99软件 | 国产精品久久久久一区二区三区 | 日韩高清一区在线 | 天天色天天操综合网 | 99精品国产99久久久久久97 | 国产日韩精品在线观看 | 91中文字幕在线 | 999在线视频 | 91探花在线视频 | 成x99人av在线www| 中文字幕在线视频精品 | 免费观看视频的网站 | 国产精品第72页 | 亚洲成人免费在线观看 | 日日爽天天 | 久久午夜电影院 | 婷婷成人综合 | 日韩欧美高清一区二区三区 | 91麻豆高清视频 | 色综合 久久精品 | 99av国产精品欲麻豆 | 激情综合亚洲 | 亚洲人成精品久久久久 | 国产成人黄色 | 久久久久久久久影视 | 国产日产精品一区二区三区四区的观看方式 | 婷婷播播网| 丁香婷婷激情五月 | 成人影音av| 人人干狠狠干 | 国产成人在线免费观看 | 九九欧美视频 | 激情av综合| 国产精品美乳一区二区免费 | 亚洲精品日韩在线观看 | 日本深夜福利视频 | 国产淫片 | 免费视频久久久久久久 | 日韩,精品电影 | 亚州日韩中文字幕 | 国产成人精品av在线观 | 人人爽久久涩噜噜噜网站 | 中文字幕免费观看视频 | 久久久久草| 精品在线看 | 亚洲成a人片77777kkkk1在线观看 | 91爱在线| 色窝资源 | 韩日精品在线 | 91丨九色丨国产女 | av网站有哪些 | 国产999久久久 | 亚洲综合少妇 | 蜜桃视频精品 | av在线激情| 久草视频视频在线播放 | 狠狠操狠狠操 | 一区二区视频欧美 | 99久久99久久精品国产片果冰 | 亚洲黄色片一级 | 超碰97人人爱 | 人人揉人人揉人人揉人人揉97 | 午夜av在线| 国产精品高清一区二区三区 | 国产又粗又猛又色 | 日韩欧美99| 四虎欧美 | 91精品视频在线免费观看 | 少妇性bbb搡bbb爽爽爽欧美 | 日韩综合视频在线观看 | 久久久国产精品成人免费 | 国产精品毛片一区 | 国产福利资源 | 三上悠亚一区二区在线观看 | 一级电影免费在线观看 | 最新久久免费视频 | 日韩三级免费观看 | av观看免费在线 | 国产97色 | 成人在线你懂得 | 国产精品永久久久久久久www | 久久99精品国产91久久来源 | 亚洲一区二区三区91 | 午夜视频在线观看一区二区三区 | 在线免费黄色 | 国产精品每日更新 | 亚洲精区二区三区四区麻豆 | 一区二区网 | 久久视频二区 | 在线观看免费福利 | 在线观看91视频 | 九九免费精品视频在线观看 | 久久这里有精品 | 国产系列 在线观看 | 久久一区二区三区国产精品 | 色资源网免费观看视频 | 麻豆91在线看| 欧美精品乱码99久久影院 | 99久久婷婷 | 中文字幕在线免费看 | 欧美日韩国产精品爽爽 | 欧美 日韩 国产 中文字幕 | 国产 字幕 制服 中文 在线 | 久久免费视频99 | 国产999在线| 国产一区二区久久精品 | 日韩成人免费在线 | 日韩欧美视频二区 | 国产又粗又猛又黄又爽的视频 | av888.com| 黄色小网站在线观看 | 国产亚洲精品成人av久久ww | 国产视频在线观看一区 | a级免费观看 | 91av免费观看 | 97精品国产97久久久久久 | 精品视频亚洲 | 在线视频18在线视频4k | 在线免费av播放 | 8x成人免费视频 | 国产短视频在线播放 | 69亚洲精品 | 日韩在线观看精品 | 国产无吗一区二区三区在线欢 | 亚洲专区一二三 | 六月丁香六月婷婷 | 波多野结衣在线观看一区 | 久久精品9| 在线观看免费高清视频大全追剧 | 天天狠狠干| 日日干天天操 | 超碰资源在线 | 日韩在线视频观看免费 | 黄av在线| 久久精品一区二区三 | 日韩大片在线 | 免费看v片网站 | av在线网站大全 | 国产精品久久久久久69 | 亚洲天堂网视频 | 国产探花视频在线播放 | 97精品久久 | av丝袜制服| 安徽妇搡bbbb搡bbbb | 欧美日韩国产在线观看 | 国产成人一区二区三区电影 | 99精品国产福利在线观看免费 | 国产丝袜 | 在线观看免费一级片 | 国产高清不卡 | 在线成人一区二区 | 国产精品国产自产拍高清av | 中文字幕在线国产 | 国产成人久久77777精品 | av福利在线播放 | 国产高清在线不卡 | 久久久久久久久久亚洲精品 | 81精品国产乱码久久久久久 | 四虎成人网 | 国产一级久久 | 欧美午夜久久 | 99爱在线 | 日本女人逼 | 亚洲精品美女 | 久久久 激情 | 国产高清精 | 欧美另类高清 videos | 99久免费精品视频在线观看 | 奇米影视8888在线观看大全免费 | 麻豆视频入口 | 色视频网站在线观看一=区 a视频免费在线观看 | 日本特黄一级 | av一级免费 | 国产在线美女 | av免费网站 | 日韩精品资源 | 在线免费观看黄网站 | 亚洲高清在线观看视频 | 成年在线观看 | 国产精品久久久久国产精品日日 | 1区2区视频 | 日韩精品视频一二三 | 啪啪激情网 | 青青河边草手机免费 | 永久免费精品视频 | 丁香婷婷激情国产高清秒播 | 91九色porny蝌蚪主页 | 亚洲高清国产视频 | 亚洲 成人 一区 | 91黄色在线视频 | 最近免费观看的电影完整版 | 国产短视频在线播放 | 久草手机视频 | 成人av视屏| 国产精品久久久久久久久久新婚 | 人人澡超碰碰 | 手机在线永久免费观看av片 | 国产精品久99| 欧美做受高潮 | 亚洲精品国产综合99久久夜夜嗨 | 成人av免费看 | 成人三级av | 欧美久久成人 | 中文亚洲欧美日韩 | 欧美日韩午夜爽爽 | 最新日韩中文字幕 | 国产中文字幕精品 | 国产一级特黄电影 | 人人射人人澡 | 久产久精国产品 | 天天看天天操 | 久久久久久久国产精品 | 天天操夜夜操国产精品 | 999一区二区三区 | 国产精品乱码久久久 | 免费成视频 | 91亚州 | 99久高清在线观看视频99精品热在线观看视频 | 久久亚洲精品电影 | 在线免费高清 | 992tv成人免费看片 | 欧美a视频在线观看 | 免费欧美高清视频 | 伊色综合久久之综合久久 | 国产最新在线观看 | 精品亚洲一区二区 | 色婷婷久久久 | 精品中文字幕在线 | 欧美日韩精品在线免费观看 | 丁香六月激情婷婷 | 91在线麻豆 | av综合网址 | 国产黄色精品在线 | 在线视频a | 久久不卡日韩美女 | 国产人成一区二区三区影院 | 91亚洲精品久久久中文字幕 | 精品视频在线观看 | 久久精品看片 | 国产综合香蕉五月婷在线 | 人人爱人人爽 | 99久久久久国产精品免费 | 精品视频成人 | 99综合视频 | 久草免费在线观看视频 | 国产精品原创在线 | 99精品黄色片免费大全 | 福利视频| 极品国产91在线网站 | 久久久久成人精品亚洲国产 | 日日躁天天躁 | 午夜天使 | 日本精品二区 | 99久久国产免费免费 | 久久av一区二区三区亚洲 | 96国产精品视频 | 女人久久久久 | 久久久精品国产免费观看一区二区 | 五月婷婷视频在线观看 | 欧美中文字幕第一页 | 欧美日韩国产伦理 | 最近最新中文字幕视频 | 精品久久在线 | 久久8精品| 国产一二区视频 | 超级碰碰免费视频 | 中文字幕乱码电影 | 欧美日韩免费观看一区二区三区 | 亚洲国产一区在线观看 | 婷婷草| 久久精品国产亚洲aⅴ | 亚洲精品中文字幕视频 | 制服丝袜一区二区 | www日日| 在线观看的黄色 | 91尤物国产尤物福利在线播放 | 久久亚洲热 | 免费视频a | 日本女人逼 | 天天操天天射天天操 | 黄a在线| 亚洲成人黄色在线 | 一本到在线| 中文av在线播放 | 在线观看电影av | 日本性生活一级片 | 久免费视频 | 国产伦精品一区二区三区免费 | 9热精品| 午夜视频亚洲 | 日韩精品在线观看视频 | 国产综合精品一区二区三区 | 久久伦理 | 日本公妇在线观看高清 | 久草电影免费在线观看 | 91精品国产综合久久婷婷香蕉 | 亚洲精品成人在线 | 国产精品va在线观看入 | 亚洲精品18日本一区app | 成人精品福利 | 在线观看免费av网站 | 亚洲精品xxx | 久久久久久国产精品999 | 国产精品av一区二区 | 视频91 | 在线综合色 | 久久国产精品免费一区二区三区 | 亚洲永久精品一区 | 九色精品免费永久在线 | 夜色资源站国产www在线视频 | 免费看片日韩 | 国产高清视频在线观看 | 国产一级高清视频 | 视频一区二区在线观看 | 国产精品videossex国产高清 | 久久99国产精品久久99 | 黄色a一级视频 | 成人黄色大片在线观看 | 久久精品一区二区三区国产主播 | 免费黄色a网站 | 丁香九月激情综合 | 精品视频免费播放 | 少妇bbw搡bbbb搡bbbb| 五月婷婷丁香色 | 狠狠躁18三区二区一区ai明星 | 一本一本久久a久久精品综合小说 | 成人四虎影院 | 日韩中文字幕a | 国产精品手机播放 | 久久精品小视频 | 丁香婷婷激情啪啪 | 国产亚洲午夜高清国产拍精品 | 免费视频久久久久久久 | 二区三区中文字幕 | 天天想夜夜操 | 热久久这里只有精品 | 亚洲 综合 国产 精品 | 免费看短| 国产剧情在线一区 | 国产高清视频在线 | 中文字幕在线日本 | 午夜体验区 | 一级黄色片网站 | 狠狠躁18三区二区一区ai明星 | 99视频免费播放 | 中文字幕在线免费看 | 精品成人在线 | 六月丁香社区 | 热久在线 | 麻豆视频免费在线观看 | 中文字幕免费观看视频 | 成人av亚洲 | 亚洲精品1区2区3区 超碰成人网 | 日韩成人免费在线 | 天天射天天做 | 免费h精品视频在线播放 | 国产亚洲精品无 | 人人讲下载| 婷婷天天色| 久久免费在线视频 | 96av在线视频 | 少妇bbbb| 亚洲国产日韩av | 黄色软件视频网站 | 黄色小说免费在线观看 | 欧美精品九九99久久 | 国产一区二区免费在线观看 | 久久综合狠狠综合久久狠狠色综合 | 欧洲精品视频一区二区 | 亚洲免费专区 | 欧美激情综合色 | 日韩在线色视频 | 久久久久久久久久久久国产精品 | 日日操日日插 | 五月在线视频 | 日韩亚洲在线 | 久在线 | 成人毛片在线观看 | 精品久久一级片 | 中文字幕资源在线观看 | 日韩大片在线观看 | 国产又粗又长的视频 | 91精品亚洲影视在线观看 | 久久精品综合 | 最新免费av在线 | 精品一区二区影视 | 四虎海外影库www4hu | 国产美女精品视频 | 人人爽人人搞 | 日韩一区二区三区高清免费看看 | 亚洲精品资源在线观看 | 国产精品一区二区免费看 | 91精品国产欧美一区二区 | 一区二区男女 | 9999亚洲 | 国产一区二区精品91 | 中文字幕在线久一本久 | 国产99久久久国产精品免费二区 | 国产精品第一页在线观看 | 久久网站免费 | 天天爽夜夜爽人人爽一区二区 | 精品国产免费人成在线观看 | 国产成人高清在线 | 久久久久久久久久久久久国产精品 | 国产精品久久视频 | 久久国产午夜精品理论片最新版本 | 中文字幕视频 | 久久香蕉国产精品麻豆粉嫩av | 999久久国精品免费观看网站 | 一区二区免费不卡在线 | 337p西西人体大胆瓣开下部 | 中文字幕免费高清av | 在线免费观看黄色大片 | 91探花国产综合在线精品 | 中文字幕视频免费观看 | 成年人免费在线观看 | 日本高清dvd| 日日夜色| 久久高清国产 | 日本不卡一区二区 | 欧美精品午夜 | 久久中文欧美 | www.超碰97.com| 免费观看黄色12片一级视频 | 久久免费国产视频 | 在线观看网站黄 | 亚洲精品97 | 国产97在线看 | 免费日韩 精品中文字幕视频在线 | 亚洲黄色网络 | 国产91精品一区二区绿帽 | 久久曰视频 | 韩国av免费观看 | 69久久夜色精品国产69 | 久久久久久久综合色一本 | 成人在线免费看 | 久久精品中文字幕免费mv | 在线观看亚洲精品 | 99在线精品视频在线观看 | 日韩久久精品一区 | 00av视频 | 日韩黄色一区 | 国产精品久久久久9999吃药 | 天天操天天草 | 久久免费黄色大片 | 射射射av | av电影在线免费观看 | 免费黄色在线网址 | 久草在线官网 | 久久首页| 亚洲精品国产成人 | 99免费看片| 五月婷婷六月丁香激情 | 亚洲热久久| 97热久久免费频精品99 | 91成人蝌蚪 | 天天色成人网 | 91九色视频观看 | 成人9ⅰ免费影视网站 | 国产一区二区久久 | 国产精品精品国产 | 黄色软件网站在线观看 | 国产精品入口麻豆 | 成人影音在线 | 欧美韩国日本在线观看 | 成人h电影在线观看 | 国产18精品乱码免费看 | 日韩欧美精品一区二区三区经典 | 日韩精品中文字幕av | 欧美成人影音 | se视频网址 | 日韩激情网 | 成年人国产视频 | 日韩精品电影在线播放 | 色综合久久悠悠 | 久久视频精品 | 日韩午夜精品福利 | 欧美极品少妇xbxb性爽爽视频 | 欧美电影黄色 | 玖玖玖精品 | 亚洲一二三在线 | 99久久电影 | 狠狠色丁香婷婷综合 | 日p视频| 亚洲精品久久久久58 | 国产精品久久在线观看 | 婷婷丁香久久五月婷婷 | 久热免费 | 久久久久欠精品国产毛片国产毛生 | 丰满少妇在线观看 | 97综合视频 | 中文字幕丝袜美腿 | 欧美日韩国产一二 | 久久伊人综合 | 国产1区2区3区精品美女 | 麻豆免费精品视频 | 亚洲综合国产精品 | 亚洲成人午夜在线 | 香蕉视频18 | 在线观看国产区 | 免费视频资源 | 中文字幕在线精品 | 久久久国产一区二区三区 | 久色网 | 456成人精品影院 | 国内99视频 | 99精品黄色片免费大全 | 久久精品美女 | 国产男女爽爽爽免费视频 | 一本一本久久aa综合精品 | 九九视频免费观看视频精品 | 插婷婷| 最新中文字幕在线播放 | 亚洲毛片一区二区三区 | 午夜av电影 | 视频福利在线观看 | 99免费在线视频 | 日韩精品无码一区二区三区 | 中文字幕一区二区三区视频 | 亚洲五月| 999久久国精品免费观看网站 | 日本精品一区二区三区在线播放视频 | 久久精品一区八戒影视 | japanesefreesex中国少妇 | 久久综合免费视频 | 国产日韩精品在线观看 | 国产又粗又长又硬免费视频 | 懂色av一区二区在线播放 | 精品9999 | 婷婷中文字幕在线观看 | 成年人毛片在线观看 | 久久久久久网 | 色综合网 | 毛片的网址| 久久精品久久综合 | 在线免费高清一区二区三区 | 一区二区三区av在线 | 在线一级片 | 国产一区二三区好的 | 激情欧美一区二区三区免费看 | 久草久草久草久草 | 日韩免费在线一区 | 色综合www| 国产69精品久久app免费版 | 欧美a性| 五月激情五月激情 | 日韩动漫免费观看高清完整版在线观看 | 国产福利电影网址 | a在线一区 | 午夜久久久久久久久久影院 | 97视频免费在线观看 | 成年人视频在线观看免费 | 草久久影院 | 国产一区二区午夜 | 久久午夜网| 视色网站| 五月婷婷丁香综合 | 99热 精品在线 | 欧美在线aa | 日韩久久精品一区二区三区下载 | 操久在线 | 国产成在线观看免费视频 | 91资源在线免费观看 | 91av在线播放 | 久久国内免费视频 | 久久国产免 | 色综合天天狠天天透天天伊人 | 亚洲成人午夜av | 久久久久久看片 | 成人黄色免费观看 | 久久爱影视i| 国产成人精品亚洲a | 一区二区av| jizz999| 日韩中出在线 | 在线 你懂 | 正在播放国产91 | 欧美一区二区三区免费看 | 日韩美在线 | 国产一区二区三区在线免费观看 | 天天做天天爱夜夜爽 | 伊人国产在线观看 | 玖玖爱免费视频 | 成人免费看片98欧美 | 亚洲综合成人婷婷小说 | 日韩高清黄色 | 久久这里| 视频一区二区在线 | 日韩av免费一区二区 | 日韩一区二区在线免费观看 | 国产成人在线观看 | 久久亚洲视频 | 黄色一级大片在线观看 | 久久a v视频 | 久久精品久久精品 | 国产高清视频在线观看 | 免费进去里的视频 | 91九色视频在线 | 国产精品成人免费一区久久羞羞 | 最新av中文字幕 | 国产日韩中文字幕 | 中文字幕精 | 偷拍福利视频一区二区三区 | 黄色a视频 | 99资源网 | 欧美国产一区二区 | 日韩电影中文字幕在线观看 | 欧美日韩一区二区三区在线免费观看 | 天天操操操操操 | 国产精品一区二区免费看 | 成人久久久精品国产乱码一区二区 | 久久久999精品视频 国产美女免费观看 | 欧美一区二区三区免费看 | 国产精品久久久久久久久蜜臀 | 夜色资源站wwwcom | 一区二区三区在线不卡 | av中文字幕在线观看网站 | 亚洲视屏在线播放 | 狠狠干夜夜操 | 黄色成人影院 | 69精品久久久 | 亚洲婷婷丁香 | 欧美一区在线看 | 又黄又色又爽 | 欧美一级片在线 | 久久久亚洲国产精品麻豆综合天堂 | www久久国产 | 在线免费视| 狠狠干电影 | 亚洲闷骚少妇在线观看网站 | 欧美精品一级视频 | 狠狠久久伊人 | www.国产精品 | 在线观看日韩av | 亚洲丝袜一区 | 日韩免费看视频 | 亚洲在线视频观看 | 欧美日韩中文国产一区发布 | 久草手机视频 | 在线a视频| 深爱激情久久 | 免费看国产一级片 | 综合网天天 | 免费看污网站 | 永久免费精品视频 | 日韩不卡高清 | 超碰国产人人 | 亚洲久草在线视频 | 天天玩夜夜操 | 五月婷婷色 | 三级黄色在线观看 | 国产精品久久久久久久久久99 | 黄色三级视频片 | av线上看 | 正在播放久久 | 久久99视频 | 91在线在线观看 | 国产一级免费在线 | 成人羞羞免费 | 91高清完整版在线观看 | 婷婷激情在线 | av成人免费 | 国产免费高清视频 | 天天干 夜夜操 | 久久久久久久久久影院 | 免费色视频网址 | 美女黄久久 | 国产在线观看91 | 99久热在线精品视频观看 | 天天干天天操天天入 | 国产九九在线 | av女优中文字幕在线观看 | 香蕉视频免费在线播放 | 国产精品一区二区在线播放 | 色偷偷中文字幕 | 天天天干夜夜夜操 | 久久久久国产精品免费 | 精品毛片一区二区免费看 | 毛片网站在线看 | 亚洲一区二区精品在线 | 国产精品美乳一区二区免费 | 日韩久久久久久久 | 精品国产乱码久久久久久天美 | 波多野结衣视频一区 | 久久久成人精品 | 激情av五月婷婷 | 亚洲丁香日韩 | 日韩欧美在线免费观看 | 色五月成人 | 97视频免费在线看 | 天天弄天天操 | 亚洲视频综合 | 天天干天天操av | 国产精品尤物视频 | 中文字幕有码在线观看 | 91精品少妇偷拍99 | 久久免费av | 国产精品资源 | 香蕉色综合 | 91麻豆精品国产 | 日韩精品视频久久 | 毛片网站观看 | 日本黄色免费在线 | 国产视频在线一区二区 | 色黄www小说 | 免费观看国产精品 | 亚洲欧美视频在线观看 | 人交video另类hd | 中文字幕在线观看2018 | 成人久久视频 | 国产亚洲婷婷 | 超碰人人干人人 | 8x成人免费视频 | 亚州黄色一级 | 99精品免费在线观看 | 视频一区二区精品 | 久久手机在线视频 | av大片免费在线观看 | 久久美女精品 | 波多野结衣在线观看视频 | 一区二区三区在线视频观看58 | 久久久久网址 | 深爱婷婷久久综合 | 午夜久久久久久久久久久 | 午夜电影久久 | 中文字幕日韩一区二区三区不卡 | 激情丁香月 | 91视频免费看片 | 精品影院一区二区久久久 | 日韩免费视频观看 | 亚洲91视频 | 91九色porny在线 | 国产精品久久久久婷婷 | 精品国产91亚洲一区二区三区www | 中文字幕av一区二区三区四区 | 超碰人人草人人 | 久久 一区 | 国产一区在线看 | 99国产精品久久久久久久久久 | 国产一级黄色电影 | 中文字幕在线观看免费高清电影 | 中文在线√天堂 | 伊人欧美| 91成人免费视频 | 一区二区三区精品在线视频 | 夜夜夜| 丰满少妇在线观看 | 亚洲成人家庭影院 | 一区二区精品视频 | 久久久男人的天堂 | 中字幕视频在线永久在线观看免费 | 91人网站 | 久久精品—区二区三区 | 人人爽人人爱 | 草久电影 | 黄色大全免费网站 | 91看片淫黄大片一级在线观看 | 精品久久网 | 色婷婷视频在线 | 99精品小视频 | 国产精品99页 | 国产视频亚洲视频 | 成人在线观看你懂的 | 久久久www成人免费毛片 | 九九精品视频在线观看 | 在线观看免费色 | 一区二区精品 | 亚洲成成品网站 | 色www永久免费 | 亚洲欧洲精品一区二区精品久久久 | 丁香婷婷射 | 精品国产乱码久久 | 91最新地址永久入口 | 二区三区精品 | a级成人毛片 | 亚洲一级黄色av | 美女av免费 | 综合久久五月天 | 国产精品一二 | 国产成人福利片 | 久久免费视频99 | 亚洲精品视频免费看 | 全久久久久久久久久久电影 | 国产精品18毛片一区二区 | 国内精品久久久久影院日本资源 | 亚洲精品女人久久久 | 国产美女视频网站 | 丁香婷婷久久久综合精品国产 | 亚洲91精品 | 久久人人插 | 免费在线观看黄网站 | 色网站国产精品 | 中文字幕乱在线伦视频中文字幕乱码在线 | 国产成人在线观看免费 | 国产精品第7页 | 国产精品99久久久久久宅男 | 久久久91精品国产一区二区精品 | 久久精品综合视频 | 日本黄色大片免费看 | 久久亚洲福利 | 久久男人视频 | 美女福利视频 | 国产精品系列在线观看 | 夜夜躁狠狠躁 | 天天干,天天干 | 女人魂免费观看 | 日韩欧美久久 | 中文字幕资源在线观看 | 在线免费色视频 | 四虎影视成人精品国库在线观看 | 缴情综合网五月天 | 国产群p| 亚洲欧美激情插 | 这里只有精品视频在线观看 | 色婷婷啪啪免费在线电影观看 | 久久人91精品久久久久久不卡 | 欧美精品乱码久久久久久按摩 | 欧美精品在线一区二区 | 久久久久久久av麻豆果冻 | av成人免费观看 | 国产做aⅴ在线视频播放 | 欧美黑人xxxx猛性大交 | 亚洲美女视频在线 | 色诱亚洲精品久久久久久 | 成人影视免费看 | 婷婷色伊人 | 免费看的黄色片 | 精品国产乱子伦一区二区 | 国产高清在线免费观看 | 777久久久| 亚洲日韩精品欧美一区二区 | av在线免费在线 | 亚洲精品国产欧美在线观看 | 色av色av色av | 五月丁色 | 亚洲精品在线观看不卡 | 日韩免费三区 | 国产亚洲va综合人人澡精品 | 久久久精品网站 | 免费亚洲黄色 | 精品一区二区影视 | 91激情视频在线 | 99九九视频 | 国产1区2区3区精品美女 | 亚洲成av人片一区二区梦乃 | 成人黄色在线 | 亚洲婷婷丁香 | 中文视频一区二区 | 日韩免费在线观看 | 99热999| 色av婷婷 | 奇米影视8888在线观看大全免费 | 欧美日韩二三区 | 亚洲国产激情 | 婷婷在线免费观看 | 久草在线官网 | 蜜臀久久99精品久久久酒店新书 | 在线成人免费 | 色视频网页 |