日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

实时数据交换平台 - BottledWater-pg with confluent

發布時間:2023/12/10 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 实时数据交换平台 - BottledWater-pg with confluent 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

標簽

PostgreSQL , Bottled Water , Kafka , Confluent , IoT


背景

想必大家都在圖書館借過書,小時候有好看的書也會在小伙伴之間傳閱。

借書和數據泵有點類似,一份數據通過數據泵實時的分享給訂閱者。

例如在IoT的場景中,有流式分析的需求,也有存儲歷史數據的需求,同時還有數據挖掘的需求,搜索引擎可能也需要同一份數據,還有一些業務可能也要用到同一份數據。

但是如果把數據統統放到一個地方,這么多的業務,它們有的要求實時處理,有的要求批量處理,有的可能需要實時的更新數據,有的可能要對大數據進行分析。

顯然一個產品可能無法滿足這么多的需求。

就好比數據庫就分了關系數據庫,NOSQL,OLTP場景,OLAP場景一樣。 也是因為一個產品無法滿足所有的業務需求。

在企業中通常是借助數據冗余來解決各類場景的需求。

那么如何才能夠更好的分享數據,保證數據的一致性,提高分享的實時性呢?

confluent platform

http://docs.confluent.io/3.1.0/platform.html

confluent 是一個實時的數據中轉服務,來自各個平臺的數據可以使用confluent進行流轉,達到分享和交換數據的目的。

例如來自物聯網傳感器的數據,來自數據庫的數據,來自HTTP,移動APP的數據,來自應用日志的數據,來自一些事件觸發的數據 等等。

confluent需要依賴一些基本的組件,核心組件如kafka.

用戶可以自定義消息的生產者和消費者,在confluent提供的平臺上交換數據。

BottledWater-pg

bottledwater-pg是confluent平臺的一種消息生產者,針對PostgreSQL數據庫,即將PostgreSQL數據庫的數據寫入confluent Kafka,從而實時的分享給消息訂閱者。

支持PostgreSQL 9.4以及以上版本,支持全量快照,以及持續的增量數據寫入Kafka。

bottledwater-pg使用PostgreSQL快照技術,可以讀取一致性的快照寫入Kafka。使用數據庫logical decode技術,從PostgreSQL的WAL日志中,解析為ROW數據寫入Kafka。

在Kafka中,每個topic代表一張數據庫的表。

數據在使用decode從WAL取出后,寫入Kafka之前,使用Avro將數據ROW打包成JSON, or Protobuf, or Thrift, or any number of formats,再寫入Kafka。

Avro支持的數據類型比較豐富,可以很好的支撐PostgreSQL豐富的數據類型。

為什么使用Avro請參考

http://radar.oreilly.com/2014/11/the-problem-of-managing-schemas.html

BottledWater-pg依賴環境

BottledWater-pg是PG的一個插件,它的目的是解析WAL,同時使用Avro封裝為json/Protobuf/Thrift/其他formats。 并寫入Kafka。

因此它依賴這些庫或軟件

PostgreSQL 9.4+ development libraries (PGXS and libpq). (Homebrew: brew install postgresql; Ubuntu: sudo apt-get install postgresql-server-dev-9.5 libpq-dev) libsnappy, a dependency of Avro. (Homebrew: brew install snappy; Ubuntu: sudo apt-get install libsnappy-dev) avro-c (1.8.0 or later), the C implementation of Avro. (Homebrew: brew install avro-c; others: build from source) Jansson, a JSON parser. (Homebrew: brew install jansson; Ubuntu: sudo apt-get install libjansson-dev) libcurl, a HTTP client. (Homebrew: brew install curl; Ubuntu: sudo apt-get install libcurl4-openssl-dev) librdkafka (0.9.1 or later), a Kafka client. (Ubuntu universe: sudo apt-get install librdkafka-dev, but see known gotchas; others: build from source)

部署BottledWater-pg

gcc,cmake

最好部署較新版本的,否則可能會有編譯問題。

gcc 6.2.0 python 2.7.12 cmake 3.6.3vi /etc/ld.so.conf /home/digoal/gcc6.2.0/lib /home/digoal/gcc6.2.0/lib64 /home/digoal/python2.7.12/libldconfigexport LD_LIBRARY_PATH=/home/digoal/gcc6.2.0/lib:/home/digoal/gcc6.2.0/lib64:/home/digoal/python2.7.12/lib:$LD_LIBRARY_PATH export PATH=/home/digoal/gcc6.2.0/bin:/home/digoal/python2.7.12/bin:/home/digoal/cmake3.6.3/bin:$PGHOME/bin:$PATH:.

snappy

可選,一種比較高效的壓縮和解壓縮庫。

由于avro還支持xz,可不安裝snappy

/* snappyhttp://google.github.io/snappy/wget https://github.com/google/snappy/archive/1.1.3.tar.gz tar -zxvf 1.1.3.tar.gzcd snappy-1.1.3yum install -y libtool gcc-c++ ./autogen.sh./configure --prefix=/home/digoal/snappy_home make make install- add LIBDIR to the `LD_LIBRARY_PATH' environment variableduring execution- add LIBDIR to the `LD_RUN_PATH' environment variableduring linking- use the `-Wl,-rpath -Wl,LIBDIR' linker flag- have your system administrator add LIBDIR to `/etc/ld.so.conf'*/

libjansson (libjansson >=2.3)

json parser,必須安裝,建議測試時安裝在默認目錄,否則可能遇到編譯問題,或者設置rpath。

http://www.digip.org/jansson/ wget http://www.digip.org/jansson/releases/jansson-2.9.tar.bz2tar -jxvf jansson-2.9.tar.bz2 cd jansson-2.9./configure --prefix=/home/digoal/jansson make make install- add LIBDIR to the 'LD_LIBRARY_PATH' environment variableduring execution- add LIBDIR to the 'LD_RUN_PATH' environment variableduring linking- use the '-Wl,-rpath -Wl,LIBDIR' linker flag- have your system administrator add LIBDIR to '/etc/ld.so.conf'export PKG_CONFIG_PATH=/home/digoal/jansson/lib/pkgconfig:$PKG_CONFIG_PATHpkg-config --cflags --libs jansson -I/home/digoal/jansson/include -L/home/digoal/jansson//home/digoal/jansson/lib -ljansson

建議測試時安裝在默認路徑中,如下。

./configure make make install export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH

liblzma

yum install -y xz-*

boost

可以不裝,如果你要安裝Avro的doc時才需要安裝boost。

/* boosthttp://www.boost.org/https://sourceforge.net/projects/boost/files/boost/1.62.0/ wget http://downloads.sourceforge.net/project/boost/boost/1.62.0/boost_1_62_0.tar.bz2?r=https%3A%2F%2Fsourceforge.net%2Fprojects%2Fboost%2Ffiles%2Fboost%2F1.62.0%2F&ts=1480929211&use_mirror=ncutar -jxvf boost_1_62_0.tar.bz2 cd boost_1_62_0/libs/regex/build如果要使用靜態庫,請執行make -fgcc.mak 如果要使用靜態庫,請執行make -fgcc-shared.makll gcc drwxr-xr-x 2 digoal users 4.0K Dec 5 17:18 boost_regex-gcc-1_53 drwxr-xr-x 2 digoal users 4.0K Dec 5 17:17 boost_regex-gcc-1_53_shared drwxr-xr-x 2 digoal users 4.0K Dec 5 17:19 boost_regex-gcc-d-1_53 drwxr-xr-x 2 digoal users 4.0K Dec 5 17:18 boost_regex-gcc-d-1_53_shared -rw-r--r-- 1 digoal users 2.6M Dec 5 17:18 libboost_regex-gcc-1_53.a -rwxr-xr-x 1 digoal users 1.3M Dec 5 17:17 libboost_regex-gcc-1_53.so -rw-r--r-- 1 digoal users 17M Dec 5 17:19 libboost_regex-gcc-d-1_53.a -rwxr-xr-x 1 digoal users 7.4M Dec 5 17:18 libboost_regex-gcc-d-1_53.solibboost_regex-gcc-1_53.a , 這是release版的靜態庫 libboost_regex-gcc-1_53.so , 這是release版的動態庫(共享庫) libboost_regex-gcc-d-1_53.a , 這是debug版的靜態庫 libboost_regex-gcc-d-1_53.so , 這里debug版的動態庫(共享庫)*/

avro (1.8.0 or later)

http://avro.apache.org/

http://www.apache.org/dyn/closer.cgi/avro/

wget http://mirrors.hust.edu.cn/apache/avro/avro-1.8.1/avro-src-1.8.1.tar.gztar -zxvf avro-src-1.8.1.tar.gzcd avro-src-1.8.1/lang/c mkdir build cd build/*cmake .. -DCMAKE_INSTALL_PREFIX=/home/digoal/avro -DCMAKE_BUILD_TYPE=Release -DSNAPPY_LIBRARIES=/home/digoal/snappy_home/lib -DSNAPPY_INCLUDE_DIR=/home/digoal/snappy_home/include */yum install -y zlib-devel.x86_64cmake .. -DCMAKE_INSTALL_PREFIX=/home/digoal/avro -DCMAKE_BUILD_TYPE=Release -DTHREADSAFE=true make make test make installThe "RelWithDebInfo" build type will build an optimized copy of the library, including debugging symbols. Use the "Release" build type if you don't want debugging symbols. Use the "Debug" build type if you want a non-optimized library, with debugging symbols.On Unix, you can request thread-safe versions of the Avro library's global functions by defining the THREADSAFE cmake variable. Just add the following to your cmake invokation:-DTHREADSAFE=true

libcurl

yum install -y libcurl-devel.x86_64

librdkafka

git clone https://github.com/edenhill/librdkafka /*./configure --prefix=/home/digoal/librdkafka_homemake -j 32 make installexport PKG_CONFIG_PATH=/home/digoal/avro/lib/pkgconfig:/home/digoal/librdkafka_home/lib/pkgconfig:/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH*/

測試時建議按照在默認目錄,否則可能又會有編譯錯誤的問題。

./configure make make install export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH

PostgreSQL

安裝略

bottledwater-pg

由于bottledwater-pg是PostgreSQL的一個插件,所以首先要安裝PostgreSQL。

git clone https://github.com/confluentinc/bottledwater-pgcd bottledwater-pgmake make installvi /etc/ld.so.conf /home/digoal/avro/liborexport LD_LIBRARY_PATH=/home/digoal/avro/lib:$LD_LIBRARY_PATH

可能需要重啟數據庫,加載bottledwater.so

在數據庫中創建插件

psql postgres=# create extension bottledwater ; CREATE EXTENSION

部署confluent platform

這一步是基礎,搭建好confluent,后面才能測試一下bottledwater-pg生產消息,以及從confluent platform消費消息的過程。

http://docs.confluent.io/3.1.0/installation.html

--

部署PostgreSQL流復制

bottledwater-pg需要從wal解析logical row,所以必須配置WAL_LEVEL=logical級別。

同時wal sender進程數必須>=1。

worker process數必須>=1。

同時由于bottledwater為了保證可以支持斷點續傳,以及確保沒有轉換的WAL日志不會被主庫刪掉或覆蓋掉,需要用到replication slot,因此需要配置replication_slots>=1。

postgresql.confmax_worker_processes = 8 wal_level = logical max_wal_senders = 8 wal_keep_segments = 256 max_replication_slots = 4

同時為了保證數據庫的WAL訂閱者可以通過流復制協議連接到數據庫,需要配置pg_hba.conf

pg_hba.conflocal replication digoal trust host replication digoal 127.0.0.1/32 trust host replication digoal 0.0.0.0/0 md5

創建replication角色用戶

create role digoal login replication encrypted password 'digoal123';

使用bottledwater-pg生產消息

bottledwater-pg客戶端命令的目的是從WAL解析日志,寫入Kafka。

command option需要配置如何連接到數據庫(使用流復制連接),output格式,topic-prefix(建議為庫名),是否需要初始化快照,是否允許沒有主鍵的表,kafka broker的連接地址和端口,schema-registry的連接地址和端口。

以及一些kafka相關的配置。

cd bottledwater-pg/kafka./bottledwater --help Exports a snapshot of a PostgreSQL database, followed by a stream of changes, and sends the data to a Kafka cluster.Usage:./bottledwater [OPTION]...Options:-d, --postgres=postgres://user:pass@host:port/dbname (required)Connection string or URI of the PostgreSQL server.-s, --slot=slotname Name of replication slot (default: bottledwater)The slot is automatically created on first use.-b, --broker=host1[:port1],host2[:port2]... (default: localhost:9092)Comma-separated list of Kafka broker hosts/ports.-r, --schema-registry=http://hostname:port (default: http://localhost:8081)URL of the service where Avro schemas are registered.Used only for --output-format=avro.Omit when --output-format=json.-f, --output-format=[avro|json] (default: avro)How to encode the messages for writing to Kafka.-u, --allow-unkeyed Allow export of tables that don't have a primary key.This is disallowed by default, because updates anddeletes need a primary key to identify their row.-p, --topic-prefix=prefixString to prepend to all topic names.e.g. with --topic-prefix=postgres, updates from table'users' will be written to topic 'postgres.users'.-e, --on-error=[log|exit] (default: exit)What to do in case of a transient error, such asfailure to publish to Kafka.-x, --skip-snapshot Skip taking a consistent snapshot of the existingdatabase contents and just start streaming any newupdates. (Ignored if the replication slot alreadyexists.)-C, --kafka-config property=valueSet global configuration property for Kafka producer(see --config-help for list of properties).-T, --topic-config property=valueSet topic configuration property for Kafka producer.--config-help Print the list of configuration properties. See also:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md-h, --helpPrint this help text.

bottledwater配置文件說明如下

./bottledwater --config-help 2>&1 |less ## Global configuration propertiesProperty | C/P | Range | Default | Description -----------------------------------------|-----|-----------------|--------------:|-------------------------- builtin.features | * | | gzip, snappy, ssl, sasl, regex | Indicates the builtin features for this build of librdkafka. An application can either query this value or attempt to set it with its list of required features to check for library support. <br>*Type: CSV flags* client.id | * | | rdkafka | Client identifier. <br>*Type: string* metadata.broker.list | * | | | Initial list of brokers. The application may also use `rd_kafka_brokers_add()` to add brokers during runtime. <br>*Type: string* bootstrap.servers | * | | | Alias for `metadata.broker.list` message.max.bytes | * | 1000 .. 1000000000 | 1000000 | Maximum transmit message size. <br>*Type: integer* message.copy.max.bytes | * | 0 .. 1000000000 | 65535 | Maximum size for message to be copied to buffer. Messages larger than this will be passed by reference (zero-copy) at the expense of larger iovecs. <br>*Type: integer* receive.message.max.bytes | * | 1000 .. 1000000000 | 100000000 | Maximum receive message size. This is a safety precaution to avoid memory exhaustion in case of protocol hickups. The value should be at least fetch.message.max.bytes * number of partitions consumed from + messaging overhead (e.g. 200000 bytes). <br>*Type: integer* max.in.flight.requests.per.connection | * | 1 .. 1000000 | 1000000 | Maximum number of in-flight requests the client will send. This setting applies per broker connection. <br>*Type: integer* max.in.flight | * | | | Alias for `max.in.flight.requests.per.connection` metadata.request.timeout.ms | * | 10 .. 900000 | 60000 | Non-topic request timeout in milliseconds. This is for metadata requests, etc. <br>*Type: integer* topic.metadata.refresh.interval.ms | * | -1 .. 3600000 | 300000 | Topic metadata refresh interval in milliseconds. The metadata is automatically refreshed on error and connect. Use -1 to disable the intervalled refresh. <br>*Type: integer* metadata.max.age.ms | * | | | Alias for `topic.metadata.refresh.interval.ms` topic.metadata.refresh.fast.cnt | * | 0 .. 1000 | 10 | When a topic looses its leader this number of metadata requests are sent with `topic.metadata.refresh.fast.interval.ms` interval disregarding the `topic.metadata.refresh.interval.ms` value. This is used to recover quickly from transitioning leader brokers. <br>*Type: integer* topic.metadata.refresh.fast.interval.ms | * | 1 .. 60000 | 250 | See `topic.metadata.refresh.fast.cnt` description <br>*Type: integer* topic.metadata.refresh.sparse | * | true, false | true | Sparse metadata requests (consumes less network bandwidth) <br>*Type: boolean* topic.blacklist | * | | | Topic blacklist, a comma-separated list of regular expressions for matching topic names that should be ignored in broker metadata information as if the topics did not exist. <br>*Type: pattern list* debug | * | generic, broker, topic, metadata, queue, msg, protocol, cgrp, security, fetch, feature, all | | A comma-separated list of debug contexts to enable. Debugging the Producer: broker,topic,msg. Consumer: cgrp,topic,fetch <br>*Type: CSV flags* socket.timeout.ms | * | 10 .. 300000 | 60000 | Timeout for network requests. <br>*Type: integer* socket.blocking.max.ms | * | 1 .. 60000 | 100 | Maximum time a broker socket operation may block. A lower value improves responsiveness at the expense of slightly higher CPU usage. <br>*Type: integer* socket.send.buffer.bytes | * | 0 .. 100000000 | 0 | Broker socket send buffer size. System default is used if 0. <br>*Type: integer* socket.receive.buffer.bytes | * | 0 .. 100000000 | 0 | Broker socket receive buffer size. System default is used if 0. <br>*Type: integer* socket.keepalive.enable | * | true, false | false | Enable TCP keep-alives (SO_KEEPALIVE) on broker sockets <br>*Type: boolean* socket.nagle.disable | * | true, false | false | Disable the Nagle algorithm (TCP_NODELAY). <br>*Type: boolean* socket.max.fails | * | 0 .. 1000000 | 3 | Disconnect from broker when this number of send failures (e.g., timed out requests) is reached. Disable with 0. NOTE: The connection is automatically re-established. <br>*Type: integer* broker.address.ttl | * | 0 .. 86400000 | 1000 | How long to cache the broker address resolving results (milliseconds). <br>*Type: integer* broker.address.family | * | any, v4, v6 | any | Allowed broker IP address families: any, v4, v6 <br>*Type: enum value* reconnect.backoff.jitter.ms | * | 0 .. 3600000 | 500 | Throttle broker reconnection attempts by this value +-50%. <br>*Type: integer* statistics.interval.ms | * | 0 .. 86400000 | 0 | librdkafka statistics emit interval. The application also needs to register a stats callback using `rd_kafka_conf_set_stats_cb()`. The granularity is 1000ms. A value of 0 disables statistics. <br>*Type: integer* enabled_events | * | 0 .. 2147483647 | 0 | See `rd_kafka_conf_set_events()` <br>*Type: integer* error_cb | * | | | Error callback (set with rd_kafka_conf_set_error_cb()) <br>*Type: pointer* throttle_cb | * | | | Throttle callback (set with rd_kafka_conf_set_throttle_cb()) <br>*Type: pointer* stats_cb | * | | | Statistics callback (set with rd_kafka_conf_set_stats_cb()) <br>*Type: pointer* log_cb | * | | | Log callback (set with rd_kafka_conf_set_log_cb()) <br>*Type: pointer* log_level | * | 0 .. 7 | 6 | Logging level (syslog(3) levels) <br>*Type: integer* log.thread.name | * | true, false | false | Print internal thread name in log messages (useful for debugging librdkafka internals) <br>*Type: boolean* log.connection.close | * | true, false | true | Log broker disconnects. It might be useful to turn this off when interacting with 0.9 brokers with an aggressive `connection.max.idle.ms` value. <br>*Type: boolean* socket_cb | * | | | Socket creation callback to provide race-free CLOEXEC <br>*Type: pointer* connect_cb | * | | | Socket connect callback <br>*Type: pointer* closesocket_cb | * | | | Socket close callback <br>*Type: pointer* open_cb | * | | | File open callback to provide race-free CLOEXEC <br>*Type: pointer* opaque | * | | | Application opaque (set with rd_kafka_conf_set_opaque()) <br>*Type: pointer* default_topic_conf | * | | | Default topic configuration for automatically subscribed topics <br>*Type: pointer* internal.termination.signal | * | 0 .. 128 | 0 | Signal that librdkafka will use to quickly terminate on rd_kafka_destroy(). If this signal is not set then there will be a delay before rd_kafka_wait_destroyed() returns true as internal threads are timing out their system calls. If this signal is set however the delay will be minimal. The application should mask this signal as an internal signal handler is installed. <br>*Type: integer* api.version.request | * | true, false | false | Request broker's supported API versions to adjust functionality to available protocol features. If set to false the fallback version `broker.version.fallback` will be used. **NOTE**: Depends on broker version >=0.10.0. If the request is not supported by (an older) broker the `broker.version.fallback` fallback is used. <br>*Type: boolean* api.version.fallback.ms | * | 0 .. 604800000 | 1200000 | Dictates how long the `broker.version.fallback` fallback is used in the case the ApiVersionRequest fails. **NOTE**: The ApiVersionRequest is only issued when a new connection to the broker is made (such as after an upgrade). <br>*Type: integer* broker.version.fallback | * | | 0.9.0 | Older broker versions (<0.10.0) provides no way for a client to query for supported protocol features (ApiVersionRequest, see `api.version.request`) making it impossible for the client to know what features it may use. As a workaround a user may set this property to the expected broker version and the client will automatically adjust its feature set accordingly if the ApiVersionRequest fails (or is disabled). The fallback broker version will be used for `api.version.fallback.ms`. Valid values are: 0.9.0, 0.8.2, 0.8.1, 0.8.0. <br>*Type: string* security.protocol | * | plaintext, ssl, sasl_plaintext, sasl_ssl | plaintext | Protocol used to communicate with brokers. <br>*Type: enum value* ssl.cipher.suites | * | | | A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. See manual page for `ciphers(1)` and `SSL_CTX_set_cipher_list(3). <br>*Type: string* ssl.key.location | * | | | Path to client's private key (PEM) used for authentication. <br>*Type: string* ssl.key.password | * | | | Private key passphrase <br>*Type: string* ssl.certificate.location | * | | | Path to client's public key (PEM) used for authentication. <br>*Type: string* ssl.ca.location | * | | | File or directory path to CA certificate(s) for verifying the broker's key. <br>*Type: string* ssl.crl.location | * | | | Path to CRL for verifying broker's certificate validity. <br>*Type: string* sasl.mechanisms | * | | GSSAPI | SASL mechanism to use for authentication. Supported: GSSAPI, PLAIN. **NOTE**: Despite the name only one mechanism must be configured. <br>*Type: string* sasl.kerberos.service.name | * | | kafka | Kerberos principal name that Kafka runs as. <br>*Type: string* sasl.kerberos.principal | * | | kafkaclient | This client's Kerberos principal name. <br>*Type: string* sasl.kerberos.kinit.cmd | * | | kinit -S "%{sasl.kerberos.service.name}/%{broker.name}" -k -t "%{sasl.kerberos.keytab}" %{sasl.kerberos.principal} | Full kerberos kinit command string, %{config.prop.name} is replaced by corresponding config object value, %{broker.name} returns the broker's hostname. <br>*Type: string* sasl.kerberos.keytab | * | | | Path to Kerberos keytab file. Uses system default if not set.**NOTE**: This is not automatically used but must be added to the template in sasl.kerberos.kinit.cmd as ` ... -t %{sasl.kerberos.keytab}`. <br>*Type: string* sasl.kerberos.min.time.before.relogin | * | 1 .. 86400000 | 60000 | Minimum time in milliseconds between key refresh attempts. <br>*Type: integer* sasl.username | * | | | SASL username for use with the PLAIN mechanism <br>*Type: string* sasl.password | * | | | SASL password for use with the PLAIN mechanism <br>*Type: string* group.id | * | | | Client group id string. All clients sharing the same group.id belong to the same group. <br>*Type: string* partition.assignment.strategy | * | | range,roundrobin | Name of partition assignment strategy to use when elected group leader assigns partitions to group members. <br>*Type: string* session.timeout.ms | * | 1 .. 3600000 | 30000 | Client group session and failure detection timeout. <br>*Type: integer* heartbeat.interval.ms | * | 1 .. 3600000 | 1000 | Group session keepalive heartbeat interval. <br>*Type: integer* group.protocol.type | * | | consumer | Group protocol type <br>*Type: string* coordinator.query.interval.ms | * | 1 .. 3600000 | 600000 | How often to query for the current client group coordinator. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment. <br>*Type: integer* enable.auto.commit | C | true, false | true | Automatically and periodically commit offsets in the background. <br>*Type: boolean* auto.commit.interval.ms | C | 0 .. 86400000 | 5000 | The frequency in milliseconds that the consumer offsets are committed (written) to offset storage. (0 = disable) <br>*Type: integer* enable.auto.offset.store | C | true, false | true | Automatically store offset of last message provided to application. <br>*Type: boolean* queued.min.messages | C | 1 .. 10000000 | 100000 | Minimum number of messages per topic+partition in the local consumer queue. <br>*Type: integer* queued.max.messages.kbytes | C | 1 .. 1000000000 | 1000000 | Maximum number of kilobytes per topic+partition in the local consumer queue. This value may be overshot by fetch.message.max.bytes. <br>*Type: integer* fetch.wait.max.ms | C | 0 .. 300000 | 100 | Maximum time the broker may wait to fill the response with fetch.min.bytes. <br>*Type: integer* fetch.message.max.bytes | C | 1 .. 1000000000 | 1048576 | Initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched. <br>*Type: integer* max.partition.fetch.bytes | C | | | Alias for `fetch.message.max.bytes` fetch.min.bytes | C | 1 .. 100000000 | 1 | Minimum number of bytes the broker responds with. If fetch.wait.max.ms expires the accumulated data will be sent to the client regardless of this setting. <br>*Type: integer* fetch.error.backoff.ms | C | 0 .. 300000 | 500 | How long to postpone the next fetch request for a topic+partition in case of a fetch error. <br>*Type: integer* offset.store.method | C | none, file, broker | broker | Offset commit store method: 'file' - local file store (offset.store.path, et.al), 'broker' - broker commit store (requires Apache Kafka 0.8.2 or later on the broker). <br>*Type: enum value* consume_cb | C | | | Message consume callback (set with rd_kafka_conf_set_consume_cb()) <br>*Type: pointer* rebalance_cb | C | | | Called after consumer group has been rebalanced (set with rd_kafka_conf_set_rebalance_cb()) <br>*Type: pointer* offset_commit_cb | C | | | Offset commit result propagation callback. (set with rd_kafka_conf_set_offset_commit_cb()) <br>*Type: pointer* enable.partition.eof | C | true, false | true | Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition. <br>*Type: boolean* queue.buffering.max.messages | P | 1 .. 10000000 | 100000 | Maximum number of messages allowed on the producer queue. <br>*Type: integer* queue.buffering.max.kbytes | P | 1 .. 2147483647 | 4000000 | Maximum total message size sum allowed on the producer queue. <br>*Type: integer* queue.buffering.max.ms | P | 1 .. 900000 | 1000 | Maximum time, in milliseconds, for buffering data on the producer queue. <br>*Type: integer* message.send.max.retries | P | 0 .. 10000000 | 2 | How many times to retry sending a failing MessageSet. **Note:** retrying may cause reordering. <br>*Type: integer* retries | P | | | Alias for `message.send.max.retries` retry.backoff.ms | P | 1 .. 300000 | 100 | The backoff time in milliseconds before retrying a message send. <br>*Type: integer* compression.codec | P | none, gzip, snappy | none | compression codec to use for compressing message sets. This is the default value for all topics, may be overriden by the topic configuration property `compression.codec`. <br>*Type: enum value* batch.num.messages | P | 1 .. 1000000 | 10000 | Maximum number of messages batched in one MessageSet. The total MessageSet size is also limited by message.max.bytes. <br>*Type: integer* delivery.report.only.error | P | true, false | false | Only provide delivery reports for failed messages. <br>*Type: boolean* dr_cb | P | | | Delivery report callback (set with rd_kafka_conf_set_dr_cb()) <br>*Type: pointer* dr_msg_cb | P | | | Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb()) <br>*Type: pointer*## Topic configuration propertiesProperty | C/P | Range | Default | Description -----------------------------------------|-----|-----------------|--------------:|-------------------------- request.required.acks | P | -1 .. 1000 | 1 | This field indicates how many acknowledgements the leader broker must receive from ISR brokers before responding to the request: *0*=Broker does not send any response/ack to client, *1*=Only the leader broker will need to ack the message, *-1* or *all*=broker will block until message is committed by all in sync replicas (ISRs) or broker's `in.sync.replicas` setting before sending response. <br>*Type: integer* acks | P | | | Alias for `request.required.acks` request.timeout.ms | P | 1 .. 900000 | 5000 | The ack timeout of the producer request in milliseconds. This value is only enforced by the broker and relies on `request.required.acks` being != 0. <br>*Type: integer* message.timeout.ms | P | 0 .. 900000 | 300000 | Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. <br>*Type: integer* produce.offset.report | P | true, false | false | Report offset of produced message back to application. The application must be use the `dr_msg_cb` to retrieve the offset from `rd_kafka_message_t.offset`. <br>*Type: boolean* partitioner_cb | P | | | Partitioner callback (set with rd_kafka_topic_conf_set_partitioner_cb()) <br>*Type: pointer* opaque | * | | | Application opaque (set with rd_kafka_topic_conf_set_opaque()) <br>*Type: pointer* compression.codec | P | none, gzip, snappy, inherit | inherit | Compression codec to use for compressing message sets. <br>*Type: enum value* auto.commit.enable | C | true, false | true | If true, periodically commit offset of the last message handed to the application. This committed offset will be used when the process restarts to pick up where it left off. If false, the application will have to call `rd_kafka_offset_store()` to store an offset (optional). **NOTE:** This property should only be used with the simple legacy consumer, when using the high-level KafkaConsumer the global `enable.auto.commit` property must be used instead. **NOTE:** There is currently no zookeeper integration, offsets will be written to broker or local file according to offset.store.method. <br>*Type: boolean* enable.auto.commit | C | | | Alias for `auto.commit.enable` auto.commit.interval.ms | C | 10 .. 86400000 | 60000 | The frequency in milliseconds that the consumer offsets are committed (written) to offset storage. <br>*Type: integer* auto.offset.reset | C | smallest, earliest, beginning, largest, latest, end, error | largest | Action to take when there is no initial offset in offset store or the desired offset is out of range: 'smallest','earliest' - automatically reset the offset to the smallest offset, 'largest','latest' - automatically reset the offset to the largest offset, 'error' - trigger an error which is retrieved by consuming messages and checking 'message->err'. <br>*Type: enum value* offset.store.path | C | | . | Path to local file for storing offsets. If the path is a directory a filename will be automatically generated in that directory based on the topic and partition. <br>*Type: string* offset.store.sync.interval.ms | C | -1 .. 86400000 | -1 | fsync() interval for the offset file, in milliseconds. Use -1 to disable syncing, and 0 for immediate sync after each write. <br>*Type: integer* offset.store.method | C | file, broker | broker | Offset commit store method: 'file' - local file store (offset.store.path, et.al), 'broker' - broker commit store (requires "group.id" to be configured and Apache Kafka 0.8.2 or later on the broker.). <br>*Type: enum value* consume.callback.max.messages | C | 0 .. 1000000 | 0 | Maximum number of messages to dispatch in one `rd_kafka_consume_callback*()` call (0 = unlimited) <br>*Type: integer*### C/P legend: C = Consumer, P = Producer, * = both

消費消息

由于confluent中存儲的是avro封裝的binary格式,所以消費時,需要使用解析avro的消費者。

./bin/kafka-avro-console-consumer --topic test --zookeeper localhost:2181 \--property print.key=true

風險評估

1. 首次連接數據庫時,會自動創建slot,同時自動開始將快照數據寫入Kafka,如果數據庫很大,這個過程會很漫長。

2. 為了得到一致的數據,會開啟repeatable read的事務隔離級別,如果是9.6,并且配置了snapshot too old參數,可能導致快照拷貝失敗。

3. 由于是邏輯DECODE,被復制的表必須包含逐漸,或指定非空唯一約束列,作為復制時的KEY。

4. 增,刪,改在WAL中被解析為:

insert : key + full row delete : old.key update : old.key + new.key+full row

如果使用--allow-unkeyed跳過了主鍵,那么delete該表時,不會將任何數據寫入Kafka,插入和更新則將所有列發給Kafka。

5. DDL操作不會記錄到wal日志中,如果你需要將DDL也寫入Kafka怎么辦?

你可以使用event trigger,發生ddl時,將DDL封裝并寫入表中,然后這些表的DML會寫入Kafka,從而實現DDL的傳遞。

6. 如果要刪除生產者,務必刪除數據庫中對應的slot ,否則PostgreSQL會一直保留slot未讀取的日志。 導致WAL目錄撐爆。

7. 如果數據庫產生的REDO沒有被及時的解析并寫入Kafka,可能導致未取走的數據庫的wal文件一直留在數據庫服務器,甚至導致數據庫空間撐爆。

請謹慎使用slot,同時請將監控做得健壯。

8. Kafka topic與table一一對應,命名規則如下

由于命名中只有三個部分?[topic_prefix].[postgres_schema_name].table_name?沒有考慮庫名,所以如果有多個數據庫時,建議配置top_prefix,和庫名對應即可。

For each table being streamed, Bottled Water publishes messages to a corresponding Kafka topic. The naming convention for topics is [topic_prefix].[postgres_schema_name].table_name:table_name is the name of the table in Postgres. postgres_schema_name is the name of the Postgres schema the table belongs to; this is omitted if the schema is "public" (the default schema under the default Postgres configuration). N.B. this requires the avro-c library to be at least version 0.8.0. topic_prefix is omitted by default, but may be configured via the --topic-prefix command-line option. A prefix is useful: to prevent name collisions with other topics, if the Kafka broker is also being used for other purposes besides Bottled Water. if you want to stream several databases into the same broker, using a separate Bottled Water instance with a different prefix for each database. to make it easier for a Kafka consumer to consume updates from all Postgres tables, by using a topic regex that matches the prefix. For example:with no prefix configured, a table named "users" in the public (default) schema would be streamed to a topic named "users". with --topic-prefix=bottledwater, a table named "transactions" in the "point-of-sale" schema would be streamed to a topic named "bottledwater.point-of-sale.transactions". (Support for namespaces in Kafka has been proposed that would replace this sort of ad-hoc prefixing, but it's still under discussion.)

參考

1.?https://www.confluent.io/blog/bottled-water-real-time-integration-of-postgresql-and-kafka/

2.?http://docs.confluent.io/3.1.0/platform.html

3.?https://github.com/confluentinc/bottledwater-pg/tree/master#building-from-source

4.?http://docs.confluent.io/3.0.1/quickstart.html

5.?https://www.postgresql.org/message-id/797DF957-CE33-407F-99DB-7C7125E37ACE@kleppmann.com

總結

以上是生活随笔為你收集整理的实时数据交换平台 - BottledWater-pg with confluent的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

婷婷激情av | 五月婷在线播放 | 国产剧情一区二区 | 中文字幕一区二区三区在线播放 | 久草电影在线观看 | 91大神精品视频在线观看 | 中文字幕有码在线观看 | 久久久精品综合 | 国产视频久久久久 | 国产精品久久久久久久久久久久久久 | 成年人黄色免费看 | 伊人成人激情 | 国产精品久久久久久69 | av中文在线 | 日本最新中文字幕 | 欧美成人在线免费 | 亚洲欧美视频在线播放 | 天天操天天干天天插 | 久久精美视频 | 亚洲精品色 | 国产.精品.日韩.另类.中文.在线.播放 | 五月天婷婷狠狠 | 福利网在线 | 中文字幕在线观看一区 | 亚洲激情影院 | 久草网免费 | 国产精品黄色影片导航在线观看 | 亚洲精品视频免费 | 欧美一级免费黄色片 | 国产乱对白刺激视频在线观看女王 | 成人精品一区二区三区电影免费 | 亚洲在线视频免费观看 | 精品在线观看国产 | 中文在线8新资源库 | 亚洲第一区在线播放 | av一级一片 | 久久乐九色婷婷综合色狠狠182 | 国产亚洲aⅴaaaaaa毛片 | 日韩精品一区二区三区不卡 | 天天做天天爱天天爽综合网 | 人人爽久久涩噜噜噜网站 | 91在线最新| 成人wwwxxx视频 | 中文字幕在线观看的网站 | 久久久久久不卡 | 超级碰碰碰视频 | 美女视频黄,久久 | 国产成人精品一区二区三区在线 | 日韩在线 一区二区 | 中午字幕在线观看 | 中文字幕一区二区在线观看 | 超碰97在线人人 | 中文字幕国产在线 | 天天操天天摸天天射 | 久久伊人精品一区二区三区 | 成人一区二区三区在线 | 九九视频网 | 一区二区国产精品 | 天堂在线一区二区 | 狠狠躁18三区二区一区ai明星 | 亚洲精品人人 | 一级黄色视屏 | 美女啪啪图片 | 婷婷成人亚洲综合国产xv88 | 99精品视频播放 | 国产美女视频网站 | 最近中文字幕久久 | 啪啪动态视频 | 国产亚洲综合在线 | 成人理论电影 | 欧美成a人片在线观看久 | 亚洲视频axxx | 最新一区二区三区 | 日韩国产精品一区 | 久久国产视频网站 | 在线免费观看黄色大片 | 成人激情开心网 | 天天综合中文 | 97精品国产97久久久久久粉红 | 国产精彩视频一区 | 亚洲男男gaygayxxxgv | av三级在线看 | 天天射成人 | 国产探花视频在线播放 | 亚洲精品在线观看的 | 欧美不卡视频在线 | 综合网伊人| 久久激情五月丁香伊人 | 91看片看淫黄大片 | 日韩中文字幕视频在线观看 | 欧美一级免费 | 久久久久久久久久久黄色 | 日韩中文字幕免费看 | 久久久久久免费视频 | www.国产高清 | 韩国视频一区二区三区 | 久久观看| 国产乱老熟视频网88av | 在线天堂中文在线资源网 | 91豆麻精品91久久久久久 | 日韩在线看片 | 久久免费成人精品视频 | 欧美日韩在线网站 | 国产视频一区二区三区在线 | 中文字幕日韩电影 | 亚洲婷久久 | 我要色综合天天 | 亚洲精品xxxx | 激情中文字幕 | 九九九九九九精品任你躁 | 在线成人免费电影 | 伊人电影在线观看 | 国产午夜三级一区二区三 | 91av电影在线 | 色视频网站免费观看 | 不卡的av片 | 韩国一区视频 | 亚洲精品999 | 天天干天天搞天天射 | 国产又粗又猛又色又黄网站 | 日韩超碰在线 | 狠狠干 狠狠操 | 中文字幕网站 | 天天草天天色 | 人交video另类hd| www.黄色片.com| 97超碰人人爱 | 免费观看成年人视频 | 免费在线看v | 欧美成人黄 | 亚洲黄色免费网站 | 黄色毛片一级片 | 一区二区三区在线观看中文字幕 | 美女在线国产 | 久久精选视频 | 91看片在线 | 视频国产在线 | 伊人影院在线观看 | 日韩大片在线免费观看 | 久久国产精品久久久 | 午夜成人影视 | 国内精品久久久久影院优 | 久久69av | 久久久久这里只有精品 | 在线中文字幕一区二区 | 黄色免费网站大全 | 人人舔人人射 | 中文字幕一区二区三区久久 | 亚洲视频在线看 | 少妇激情久久 | 免费看一级特黄a大片 | 在线视频电影 | 福利av影院| 亚洲天堂精品 | 97高清免费视频 | 国产第一福利网 | 日韩欧美精品在线视频 | 99视频免费 | 色播五月激情综合网 | 97在线视频免费 | 亚洲国产日韩欧美在线 | 夜夜夜精品 | 婷婷综合网 | 亚州精品天堂中文字幕 | 美女免费视频一区 | 国产一级二级三级在线观看 | 亚洲欧美在线综合 | 久久久色 | 天天操夜夜操天天射 | 日韩免费一区二区三区 | 日韩视频免费 | 少妇bbw撒尿 | 国产成人精品久久亚洲高清不卡 | 婷婷丁香六月天 | 最新黄色av网址 | 国内精品久久久久影院一蜜桃 | 精品欧美一区二区精品久久 | 久久久国产成人 | 国产丝袜一区二区三区 | 国产精品久久久久四虎 | 在线91视频 | 欧美日韩国产在线 | 四虎影视久久久 | 国产精品久久久久毛片大屁完整版 | 久久99精品久久久久婷婷 | 色婷婷精品大在线视频 | 欧美日韩在线免费观看视频 | 久久视频国产精品免费视频在线 | 国产黄色片久久久 | 91丨九色丨国产在线 | 成人羞羞视频在线观看免费 | 久久国精品 | 91视频 - v11av| 久久一区91 | 久久精品牌麻豆国产大山 | 欧美激情综合五月色丁香 | 久久久精品99 | 69久久夜色精品国产69 | 亚洲国产精品日韩 | 福利在线看片 | 欧美精品久久久久久 | 91免费在线 | 日韩免费电影在线观看 | 国产精品久久久久久999 | 久久少妇| 日韩精品免费一线在线观看 | 免费看久久 | 亚洲精品国产综合99久久夜夜嗨 | 国产精品扒开做爽爽的视频 | 国产精品久久久久久久久久久久午夜 | www.狠狠操 | 国产在线观看午夜 | 精品国产三级a∨在线欧美 免费一级片在线观看 | 久久精品黄色 | 天天色视频 | 免费h漫在线观看 | 五月婷久 | 日韩高清免费观看 | 99精品免费在线观看 | 亚洲精品视频在线观看免费视频 | 在线欧美最极品的av | 国产成人一区二区三区久久精品 | 日韩在线观看影院 | 久久午夜电影院 | 日本天天操 | 久久综合色播五月 | 日本不卡一区二区三区在线观看 | 国产精品久久久久一区二区国产 | 天堂久久电影网 | 欧美性生活一级片 | 免费黄色a网站 | 天天干天天做天天操 | 久草视频免费在线播放 | 久久精品波多野结衣 | 99久久免费看 | 91精品国产99久久久久久久 | 日韩成人不卡 | 亚洲精品久久久久www | 亚洲精品国产精品久久99热 | 国产成人av电影在线 | .国产精品成人自产拍在线观看6 | 三级免费黄色 | 日日摸日日 | 99c视频在线| 国产免费成人 | 国内揄拍国产精品 | 91久久精品一区二区二区 | 日韩欧美高清一区二区 | 日日摸日日添夜夜爽97 | 国产精品色婷婷 | 色在线高清 | 日韩欧美网址 | 国产精品va在线观看入 | 久久久精品在线观看 | 8x成人在线 | 国产香蕉视频在线播放 | 色播五月激情综合网 | 一区二区亚洲精品 | 91九色视频观看 | 国产精品免费久久久久 | 视频一区二区三区视频 | 国产精品一区二区三区久久 | 黄色三级免费观看 | 友田真希av | 国产永久免费高清在线观看视频 | 国产精品18久久久久久久久 | 久久综合爱 | 欧美日韩在线精品一区二区 | 国产色网站 | 免费观看黄色12片一级视频 | 精品在线99 | 国产午夜一区二区 | bbb搡bbb爽爽爽 | 狠狠操.com | 91在线九色 | 免费在线观看视频a | 91中文视频 | 成人av在线一区二区 | 国产成人av一区二区三区在线观看 | 国产一区二区不卡视频 | 免费看麻豆 | 在线亚洲成人 | 美女久久 | 在线观看91av | 99在线免费观看视频 | 91成人免费观看视频 | 久久一级片 | 色香网 | 免费av试看| 欧美成人va | 国产一性一爱一乱一交 | 成人av中文字幕在线观看 | 中文字幕之中文字幕 | 美女又爽又黄 | 国产精品原创视频 | 亚洲国产高清视频 | 中文av在线播放 | 999国内精品永久免费视频 | 婷婷综合五月天 | 五月激情姐姐 | 日日夜夜综合 | 欧美亚洲久久 | 天天天天射 | 国产一区二区影院 | 日韩av电影免费在线观看 | 国产精品午夜在线观看 | 最新色视频 | 97成人资源 | 在线国产福利 | 亚洲成人av影片 | 久久视频 | 国产精品国内免费一区二区三区 | 国产日产精品久久久久快鸭 | 色偷偷88欧美精品久久久 | 亚洲欧美激情插 | 六月丁香伊人 | 国产又粗又硬又爽的视频 | 97精品在线视频 | 日韩精品一区二区三区免费观看 | 精品日韩中文字幕 | 99精品视频免费观看视频 | 色婷婷av一区二 | 亚洲视频每日更新 | 成人蜜桃 | 亚洲一级片 | 麻豆久久一区二区 | 免费视频在线观看网站 | 久草9视频 | 国产精品 中文字幕 亚洲 欧美 | 这里只有精彩视频 | freejavvideo日本免费 | 精品免费一区二区三区 | 日韩av电影免费在线观看 | 亚洲视频999 | 日韩中文字幕在线观看 | 91av视频在线观看 | 六月激情丁香 | 一级黄色a视频 | 97天天综合网 | 色99导航 | 激情视频综合网 | 中文字幕av最新 | 黄网站色成年免费观看 | 成人亚洲网 | 香蕉视频在线网站 | 91黄站| 五月婷婷综合久久 | 国内一区二区视频 | 777视频在线观看 | 久久夜视频 | 日韩一级电影在线 | 亚洲最大激情中文字幕 | 久久九九免费 | 青青河边草免费观看完整版高清 | 91丨九色丨国产丨porny精品 | 亚洲成人一区 | 久久精品五月 | 国产在线色视频 | 免费黄色a网站 | 色91在线视频| 色综合激情网 | 国产精品中文字幕在线 | 久久综合婷婷综合 | 中文字幕在线有码 | 激情 一区二区 | 免费在线看成人av | 国产黄色免费在线观看 | 99久久国产免费免费 | 亚洲黄色免费电影 | 99久高清在线观看视频99精品热在线观看视频 | 久久精品欧美日韩精品 | 人人爽久久涩噜噜噜网站 | 国产不卡在线 | 丁香婷婷综合网 | 亚洲精品午夜久久久久久久 | 久久尤物电影视频在线观看 | 国产成人av | 日日夜夜天天操 | 国产尤物一区二区三区 | 国产xx在线| 亚洲天堂在线观看完整版 | 99热这里只有精品8 久久综合毛片 | 免费看的国产视频网站 | 综合网婷婷 | 久久国色夜色精品国产 | av网站免费在线 | 久久久天天操 | 日本三级国产 | 91麻豆免费看 | www国产亚洲精品久久麻豆 | 欧美 激情 国产 91 在线 | 日韩高清免费在线观看 | 日韩免费在线网站 | 国产欧美精品一区二区三区四区 | 亚洲 欧美 另类人妖 | 婷婷色婷婷 | 九九热视频在线播放 | 中文字幕在线观看免费高清电影 | 国产精品福利午夜在线观看 | 亚洲成人精品国产 | 最近日本中文字幕 | 97狠狠操| aaawww| 国产精品嫩草影院99网站 | www.夜夜爽 | 国产一级在线播放 | 国产美女永久免费 | 精品视频在线观看 | 日韩网站中文字幕 | 久久久久久久久久国产精品 | 黄色午夜 | 亚洲黄色小说网址 | 亚洲精品久久久久久久蜜桃 | 精品毛片久久久久久 | 99爱精品视频 | 精品日本视频 | 国产欧美精品一区二区三区 | a在线v| 丁香六月婷婷开心婷婷网 | 乱男乱女www7788 | 久久精品男人的天堂 | www.操.com| 久久久久夜色 | 黄色在线免费观看网站 | 99久久久久免费精品国产 | 手机在线观看国产精品 | 日本精品一区二区三区在线播放视频 | 亚洲精品在线二区 | 99免费在线播放99久久免费 | 一区二区三区免费看 | 亚洲永久精品在线观看 | 黄p网站在线观看 | 天天天色综合a | 91精品资源 | 不卡在线一区 | 成年人精品 | 美女免费视频一区 | 中文字幕丰满人伦在线 | 99久久精品午夜一区二区小说 | 欧美午夜激情网 | 激情在线五月天 | 亚洲精品久久久久久中文传媒 | 6080yy午夜一二三区久久 | 国产香蕉97碰碰久久人人 | sesese图片 | 伊人六月 | 国产欧美综合视频 | 国产99精品在线观看 | 欧美精品三级在线观看 | 免费一区在线 | 亚洲性少妇性猛交wwww乱大交 | 在线播放精品一区二区三区 | 成人禁用看黄a在线 | 天天操福利视频 | 久久在线免费观看视频 | 草久在线观看视频 | 欧美亚洲专区 | 91精品在线免费视频 | 日韩av影视在线 | 中文字幕在线观看日本 | 91视频在线国产 | 久久天天躁夜夜躁狠狠躁2022 | 久久在线影院 | 天天色天天射综合网 | 黄色小网站在线 | 欧美日韩高清在线观看 | 色婷婷久久久 | 国产精品涩涩屋www在线观看 | 亚洲视频 一区 | 在线 视频 一区二区 | 四虎影视成人永久免费观看亚洲欧美 | 操操色| 中文字幕日本特黄aa毛片 | 国产在线精品观看 | 一级片免费观看 | 韩日精品中文字幕 | av网站免费在线 | 日韩精品在线播放 | 国产不卡一二三区 | 亚洲国产一区二区精品专区 | 日韩高清二区 | 综合久久久久 | 免费观看性生活大片3 | 日韩videos | 久久综合九色99 | 美女福利视频一区二区 | 三级黄色欧美 | 99热九九这里只有精品10 | 日韩在线观看网址 | 久草精品视频 | 欧美一区,二区 | 四虎影视成人精品 | 九九热精品视频在线观看 | 亚洲高清精品在线 | 激情导航 | 永久免费毛片在线观看 | 69国产盗摄一区二区三区五区 | 99在线视频免费观看 | 亚洲丁香日韩 | 97超碰在线久草超碰在线观看 | 成人a级网站 | 亚洲另类视频在线 | 色婷婷播放 | 欧美极品在线播放 | 国产一区精品在线观看 | 亚洲精品视频在线观看视频 | 91久久久久久国产精品 | 国产伦精品一区二区三区四区视频 | 国产免费激情久久 | 国产小视频在线观看免费 | 99久久精品费精品 | 狠狠色丁香婷婷综合久小说久 | 久久精品之 | 色狠狠久久av五月综合 | 免费99精品国产自在在线 | 国产丝袜在线 | 免费网站看av片 | 麻豆视频观看 | 超碰在线中文字幕 | 天天操夜夜叫 | 四虎影视8848dvd | 天天射综合网视频 | 中午字幕在线观看 | 精品国产精品久久一区免费式 | 九九亚洲视频 | 亚洲欧美日韩中文在线 | 成人在线观看你懂的 | 欧美一区二区精美视频 | 丁香综合av | 久章草在线观看 | 久久久久久美女 | 久久免费视频一区 | 91精品视频免费看 | 欧美激情视频免费看 | 日韩乱理 | 五月天久久 | 99精品免费久久久久久日本 | 国产又黄又爽无遮挡 | 国产美女免费观看 | 视频在线观看国产 | 欧美 日韩 久久 | 日韩草比 | 91日韩在线播放 | 九九九九精品九九九九 | 麻豆一区在线观看 | 欧美日韩国产在线观看 | 亚洲九九九 | 色就干| 成人久久久精品国产乱码一区二区 | 999久久久久久久久久久 | 在线国产视频观看 | 婷婷六月丁 | 亚洲另类在线视频 | 美女黄频视频大全 | av网站大全免费 | 国产91aaa | 国内精品久久久久久久久 | 免费在线黄网 | 午夜精品久久久久久中宇69 | 免费电影一区二区三区 | 午夜精品久久久久久久久久 | 久久久久久久久久久影院 | 婷婷播播网 | 久久综合电影 | 久久国产欧美日韩精品 | 欧美成年人在线观看 | 五月天亚洲精品 | 亚洲成人av一区 | 超碰在线人人艹 | 欧美日韩中文字幕视频 | 毛片一级免费一级 | 欧美精品一区二区蜜臀亚洲 | 特级黄色电影 | 免费69视频 | 久草综合在线观看 | 日日草天天草 | 成 人 黄 色 视频 免费观看 | 日韩免费一区二区 | 色综合久久中文综合久久牛 | 高清av免费观看 | 日韩av二区| 亚洲va韩国va欧美va精四季 | 欧洲精品码一区二区三区免费看 | 五月天丁香视频 | 91成人免费在线视频 | 性色av香蕉一区二区 | 黄免费在线观看 | 天天干天天摸 | 中文免费在线观看 | 久久久久久久久久久免费av | 美女精品 | 久久精品福利视频 | 综合激情婷婷 | 国产美女视频网站 | 国产一区二区免费看 | 国产a精品 | 国产专区在线播放 | 日韩女同av| 午夜视频在线观看欧美 | 九九免费观看全部免费视频 | 亚洲成a人片在线www | 狠狠操天天射 | 久久久久久久久久久高潮一区二区 | 国产一二区免费视频 | 日本久久久久久久久久 | 欧洲视频一区 | 51久久成人国产精品麻豆 | 91九色视频在线播放 | 久久高视频 | 日本不卡视频 | 日韩高清成人在线 | 国产日韩中文字幕在线 | 国产精品一区二区三区四区在线观看 | 亚洲精品国产成人av在线 | 超碰在线官网 | 24小时日本在线www免费的 | 日韩精品一区在线播放 | 伊人精品在线 | 韩国av不卡| 在线成人高清电影 | wwwav视频 | 午夜精品一区二区三区在线播放 | 啪啪凸凸 | 久草在线手机观看 | 色欧美综合 | 久久天天躁夜夜躁狠狠85麻豆 | .精品久久久麻豆国产精品 亚洲va欧美 | 日韩三级久久 | 久久久久国 | 激情网五月婷婷 | 国产精品aⅴ | 波多野结依在线观看 | 欧美成人高清 | 色综合久久88 | 男女免费视频观看 | 亚洲国产精品久久久久婷婷884 | 欧美va天堂va视频va在线 | 特级西西444www大精品视频免费看 | 亚洲成人av片 | 337p日本欧洲亚洲大胆裸体艺术 | 国产精品igao视频网入口 | 人人盈棋牌 | 国产高清不卡 | 国产小视频国产精品 | 欧洲精品一区二区 | 免费在线观看av的网站 | 欧美福利片在线观看 | 五月天.com | 国产精品一区二区白浆 | 亚洲天堂网站视频 | 天天操天天操天天操 | 91精品国产99久久久久久久 | 国产区 在线 | 国内视频在线 | 久久永久视频 | 久久精品久久精品久久精品 | 成年人在线免费视频观看 | 日韩视频区 | 香蕉在线播放 | 免费视频区 | 日韩欧美精品在线观看视频 | 久久精品99视频 | 国产黄色片免费 | 在线视频a| 久久精品视频在线免费观看 | 麻豆系列在线观看 | 午夜成人免费影院 | 日本久久久亚洲精品 | 国产一级h | 狠狠狠色丁香婷婷综合激情 | 欧美成人a在线 | 麻豆免费看片 | 国产一级视频在线观看 | 欧美激情视频久久 | 亚洲理论在线观看 | 亚洲午夜av电影 | 国产精品美女久久久久久久 | 欧美成人xxxx | 99产精品成人啪免费网站 | 麻豆久久久久 | 伊人天堂网 | 天天色天天射天天干 | 99久久日韩精品免费热麻豆美女 | 深爱五月激情五月 | 99视频免费看 | 国产美女无遮挡永久免费 | 91久久精品一区二区二区 | 亚洲成人二区 | 少妇bbb搡bbbb搡bbbb′ | 少妇av网| 免费午夜视频在线观看 | 国产精品18久久久久久久久久久久 | 欧美精品久久久 | 91精品久久香蕉国产线看观看 | 日韩精品久久久久久久电影99爱 | 国产三级av在线 | 日韩精品一区二区三区中文字幕 | 91视频在线免费 | 91精品久久久久久久久久入口 | 国产中文在线播放 | 中文字幕精品在线 | 亚洲国产免费av | 天天插伊人 | 久久视频免费看 | 五月婷亚洲 | 日本黄色大片免费看 | 99这里只有精品99 | 亚洲一级免费观看 | 日韩国产精品一区 | 91成年人网站 | av高清免费在线 | 欧美精品一区二区在线播放 | 欧美91精品久久久久国产性生爱 | 99欧美精品 | 一本色道久久综合亚洲二区三区 | 国产91精品在线观看 | 欧美综合久久 | 麻豆免费精品视频 | 日韩啪视频 | 曰韩在线 | 91免费的视频在线播放 | 日韩免费观看高清 | 日韩激情网 | 国产丝袜美腿在线 | 色婷婷啪啪免费在线电影观看 | 欧美日韩在线精品 | 成人精品一区二区三区电影免费 | 国产一级性生活 | 久久资源总站 | 国产一级视屏 | 欧美日韩国产页 | 激情网站| 伊人天天干 | 久久高清精品 | 日韩欧美大片免费观看 | 国产色网| www日韩视频 | 天天射射天天 | 午夜精品999 | 精品视频在线播放 | 一级做a爱片性色毛片www | 国产亚洲成人网 | 午夜91在线 | 日日日日| 国产免费国产 | 国产在线精品一区二区 | 国产成人精品一区二区在线观看 | 日韩在线理论 | 国产美女在线精品免费观看 | 成 人 黄 色 片 在线播放 | 久久99国产精品久久 | 中文字幕人成一区 | 日韩精品中文字幕在线观看 | 麻花豆传媒mv在线观看 | 91在线视频免费 | 国产午夜精品一区二区三区四区 | 日韩欧美一区二区三区免费观看 | 亚洲jizzjizz日本少妇 | 久久一区二区三区国产精品 | 99久热精品| 亚洲jizzjizz日本少妇 | 最近2019中文免费高清视频观看www99 | 精品天堂av| 国产视频69 | 国产视频1| av在线日韩 | 精品视频在线观看 | 国产成人久久精品亚洲 | 中文字幕在线视频一区 | 日本久久成人中文字幕电影 | 在线有码中文字幕 | 午夜影院在线观看18 | 黄色成人影视 | av天天色| 玖玖在线视频观看 | av大片免费看 | 91麻豆精品国产91久久久使用方法 | 久久综合色综合88 | 日韩二区在线播放 | 夜色成人av | 激情视频区 | 成年人在线免费看视频 | 五月婷影院 | 国产又粗又硬又长又爽的视频 | 国产一级二级在线 | 亚洲视频久久久 | 天天se天天cao天天干 | 国产综合福利在线 | 日韩av免费一区二区 | 中文字幕亚洲欧美日韩 | 日韩欧美高清视频在线观看 | 久久久麻豆精品一区二区 | 久久观看免费视频 | 欧美一级免费 | 91 中文字幕 | 久久成人精品电影 | 808电影免费观看三年 | 精品久久91| 五月婷婷中文 | 在线免费视频你懂的 | 日韩视频一区二区在线观看 | 欧美视频一区二 | 日本久久久久久久久久 | 亚洲精品久久激情国产片 | 亚洲高清视频在线播放 | 日韩视频免费 | 精品亚洲一区二区 | 中文av字幕在线观看 | 天天综合五月天 | 国产女人免费看a级丨片 | 又污又黄网站 | 免费看日韩片 | 亚洲精品视频免费观看 | 久久人人看| 98超碰人人| 亚洲综合狠狠干 | 婷婷在线视频观看 | 九色视频网 | 精品美女久久久久久免费 | 国产精品久久免费看 | 久久久三级视频 | 国产剧情一区二区在线观看 | 日韩日韩日韩日韩 | 在线观看av小说 | 天天干天天干天天操 | 天天鲁天天干天天射 | 91精品国自产拍天天拍 | 成人在线播放免费观看 | 国产粉嫩在线观看 | 欧美俄罗斯性视频 | av中文字幕日韩 | 久久久免费观看 | 久9在线| 日韩在线视频一区 | 在线观看亚洲电影 | 天天操天天射天天爱 | 美女精品 | 国产精品一区二区在线观看 | 久久久香蕉视频 | 超碰97成人 | 日韩在线国产精品 | 亚洲一级片在线观看 | 麻豆视频一区 | 精品国产精品国产偷麻豆 | 久久午夜免费视频 | 亚洲精品国偷拍自产在线观看 | 午夜美女wwww | 午夜久久精品 | 亚洲精品免费在线播放 | 免费看三级黄色片 | 国产精品热视频 | 91精品国产自产91精品 | 午夜精品视频福利 | 人人爽人人爽人人片av | 欧美乱淫视频 | 99re国产| 激情综合五月天 | 亚洲激情 欧美激情 | 国产精品久久99精品毛片三a | 亚洲香蕉在线观看 | 在线亚洲人成电影网站色www | 欧美黄色免费 | 日本爱爱片 | 国产欧美最新羞羞视频在线观看 | 色播五月激情综合网 | 国产.精品.日韩.另类.中文.在线.播放 | 国产91精品一区二区麻豆网站 | 1024久久 | 中文字幕欧美日韩va免费视频 | 日韩天堂网 | av黄色国产 | 日韩午夜电影 | 麻豆久久精品 | 91亚洲国产 | 国产精品k频道 | 91av在线视频播放 | 96av麻豆蜜桃一区二区 | 久久夜夜操| 免费视频二区 | 96精品高清视频在线观看软件特色 | 国产伦精品一区二区三区无广告 | 久久精品欧美一区二区三区麻豆 | 最新av在线免费观看 | 国产视频一 | 欧美一区二区三区免费观看 | 视频在线一区 | 伊人五月天 | 久久久精品欧美 | 人人干人人添 | 日韩网站在线免费观看 | 超碰在线网 | 伊人一级 | 日本中文字幕网站 | 免费观看v片在线观看 | 精品国产99国产精品 | 日韩午夜一级片 | 欧美精品乱码久久久久 | 久久成人免费 | 黄污网站在线 | 成人免费电影 | 欧美一级电影片 | 亚洲精品视频中文字幕 | 成人动漫精品一区二区 | 国产成人一区二区啪在线观看 | 天天操天天干天天干 | 免费在线激情电影 | av免费在线免费观看 | 婷婷中文字幕 | 久久国产精品99久久久久久进口 | 午夜av免费观看 | 正在播放 国产精品 | 欧美一级裸体视频 | 亚洲成色777777在线观看影院 | 亚洲国产精品va在线看 | 免费看成人a | 500部大龄熟乱视频使用方法 | 日韩久久精品一区二区三区 | av网站在线免费观看 | 欧美日一级片 | 丁香5月婷婷 | 亚洲精品影院在线观看 | 成人免费在线视频 | 日韩av黄| 激情六月婷婷久久 | 欧产日产国产69 | 一区二区三区 中文字幕 | 欧美少妇影院 | 婷婷www| 高清av免费观看 | 免费黄色在线 | 天天干天天射天天操 | 日韩欧美视频一区 | 久久久久国产精品一区 | www.天天射.com | 在线视频你懂 | www.com久久| 日韩av免费一区二区 | 高清一区二区三区av | 97免费在线观看视频 | 亚洲闷骚少妇在线观看网站 | 亚洲激情综合网 | 日韩亚洲精品电影 | 亚洲美女在线国产 | 亚洲视频99 | av中文字幕日韩 | 国产精品99久久久久久久久 | 久久国产精品99久久人人澡 | 欧美日产在线观看 | 国产91免费观看 | 五月婷婷六月丁香 | av中文字幕剧情 | 日韩在线观看网址 | 中文字幕美女免费在线 | 久久亚洲热 | 国产精品剧情在线亚洲 | 国产成本人视频在线观看 | 久久一精品 | 久久久成人精品 | 欧美日韩国产精品一区 | 久久成人久久 | 久久国产精品小视频 | 黄色中文字幕 | 国产精品理论片在线观看 | 三级av免费看 | 久久国产a | 久久久精品成人 | 丝袜美女在线观看 | 精品成人国产 | 色国产精品一区在线观看 | av动态图片| 中文在线a在线 | 正在播放国产一区二区 | 久久久久国产成人免费精品免费 | 日批网站在线观看 | 欧美日韩中文国产一区发布 | 日批视频在线播放 | 婷婷网站天天婷婷网站 | 国产精品 中文字幕 亚洲 欧美 | 五月婷婷香蕉 | 久久在线精品视频 | 人人爽人人片 | 依人成人综合网 | 欧美日韩视频 | 中文字幕高清在线 | 黄色的网站免费看 | 激情五月婷婷综合网 | 免费在线观看成人av | 激情五月婷婷综合 | 国产精品三级视频 | 国产在线视频导航 | 亚洲综合视频在线 | 国产一级片观看 | 成人中文字幕av | 欧美日韩激情视频8区 | 人人干97| 日日碰狠狠添天天爽超碰97久久 | 91免费观看视频在线 | 国内久久精品 |