【技术教程】SequoiaDB对接Kafka
2019獨角獸企業重金招聘Python工程師標準>>>
?
1、?背景
當前互聯網、金融、政府等行業,活動流數據幾乎無處不在。對這種數據通常的處理方式是先把各種活動以日志的形式寫入某種文件,然后周期性地對這些文件進行統計分析。活動流數據的這種處理方式對實時性要求越來越高的場景已經不在適用并且這種處理方式也增加了整個系統的復雜性,為了解決這種問題,分布式開源消息系統Kakfa已被多家不同類型的公司?作為多種類型的數據管道和消息系統使用。
Kafka是一種分布式的,基于發布/訂閱的消息系統。提供消息持久化能力,支持消息分區,分布式消費,同時保證每個分區內的消息順序傳輸,支持在線水平擴展、高吞吐率,同時支持離線數據處理和實時數據處理。
巨杉數據庫SequoiaDB支持海量分布式數據存儲,并且支持垂直分區和水平分區,利用這些特性可以將Kafka中的消息存儲到SequoiaDB中方便業務系統后續數據分析、數據應用。本文主要講解巨杉數據庫SequoiaDB如何消費Kafka中的消息以及將消息存儲到SequoiaDB中。
2、?產品介紹
巨杉數據庫SequoiaDB是一款分布式非關系型文檔數據庫,可以被用來存取海量非關系型的數據,其底層主要基于分布式,高可用,高性能與動態數據類型設計,它兼顧了關系型數據庫中眾多的優秀設計:如索引、動態查詢和更新等,同時以文檔記錄為基礎更好地處理了動態靈活的數據類型。PostgreSQL支持標準SQL,巨杉SequoiaDB?SSQL套件通過擴展?PostgreSQL功能可以使用標準SQL?語句訪問?SequoiaDB?數據庫,完成對SequoiaDB?數據庫的各種操作。將Kafka中的消息存儲到SequoiaDB后,可利用巨杉SequoiaDB?SSQL對這些消息數據進行在線實時的數據分析和數據應用。
3、?環境搭建
3.1、軟件配置
操作系統:windows?7
JDK:1.7.0_80?64位,下載地址為:http://www.oracle.com/technetwork/java/javase/downloads/java-archive-downloads-javase7-521261.html#jdk-7u80-oth-JPR
eclipse:4.5.2
SequoiaDB:1.12.5或以上版本
Kakfa:0.10.0.0,下載地址為:http://211.162.127.20/files/5115000001D9C0FE/www-us.apache.org/dist/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz
本項目主要實現從Kafka中消費數據并寫入到SequoiaDB中來展示Kafka對接SequoiaDB的整個過程。
創建項目工程如下圖:
?
圖3-1-1
3.2、kafka啟動及topic創建
在kafka啟動前啟動zookeeper,Kafka啟動,執行腳本如下:
./kafka-server-start.sh?../config/server.properties?&
Kafka創建topic,執行腳本如下:
./kafka-topics.sh?--zookeeper?localhost:2181?--create?--topic?kafkaSdb?--partitions?1?--replication-factor?1執行結果如下圖:
?
圖3-2-1
驗證Kafka主題,執行腳本如下:
./kafka-topics.sh?--zookeeper?localhost:2181?–list執行結果如下圖:
?
圖3-2-2
4、?代碼演示
4.1、框架搭建代碼展示
Kafka分布式系統分為生產者和消費者,生產者主要產生消息數據供消費者消費,消費者主要消費存儲在Kafka中的消息數據。本項目主要演示向SequoiaDB中寫入Kafka中的消息,故消息的生產只提供演示代碼。生產者和消費者各種參數分別放在各自的配置文件中。
??生產端配置文件如下:
kafka-producer.propertiesbootstrap.servers=192.168.1.35:9092retries=0linger.ms=1key.serializer=org.apache.kafka.common.serialization.StringSerializervalue.serializer=org.apache.kafka.common.serialization.StringSerializerpartitioner.class=com.sequoiadb.kafka.DefaultPartitioner??消費端配置文件如下:
kafka-consumer.propertiesbootstrap.servers=192.168.1.35:9092?enable.auto.commit=true??auto.commit.interval.ms=60000enable.auto.commit=falseauto.offset.reset=earliestsession.timeout.ms=30000key.deserializer=org.apache.kafka.common.serialization.StringDeserializervalue.deserializer=org.apache.kafka.common.serialization.StringDeserializer??Kafka主題、SequoiaDB集合、消息分區配置文件如下:
config.json[{topicName:'kafkaSdb',sdbCLName:'kafkaSdb',partitionNum:1,topicGroupName:'kafkaSdb-consumer-group',pollTimeout:5000}]4.2、業務實現代碼展示
4.2.1、配置代碼展示
本項目將Kafka的配置放在配置文件中如Kafka的主題,主題的分區數,SequoiaDB集合并用java對象進行封裝,利用工具類進行獲取。
配置信息java實體類如下:
package?com.sequoiadb.kafka.bean;public?class?KafkaConsumerConfig?{private?String?topicName;private?String?sdbCLName;private?int?partitionNum?=?1;private?String?topicGroupName;private?long?pollTimeout?=?Long.MAX_VALUE;public?String?getTopicName()?{return?topicName;}public?void?setTopicName(String?topicName)?{this.topicName?=?topicName;}public?String?getSdbCLName()?{return?sdbCLName;}public?void?setSdbCLName(String?sdbCLName)?{this.sdbCLName?=?sdbCLName;}public?int?getPartitionNum()?{return?partitionNum;}public?void?setPartitionNum(int?partitionNum)?{this.partitionNum?=?partitionNum;}public?String?getTopicGroupName()?{return?topicGroupName;}public?void?setTopicGroupName(String?topicGroupName)?{this.topicGroupName?=?topicGroupName;}public?long?getPollTimeout()?{return?pollTimeout;}public?void?setPollTimeout(long?pollTimeout)?{this.pollTimeout?=?pollTimeout;}public?String?toString(){return?"[topicName="+this.topicName+",sdbCLName="+this.sdbCLName+",partitionNum="+this.partitionNum",topicGroupName="+this.topicGroupName+",pollTimeout="+this.pollTimeout+"]";}}配置信息獲取工具類如下:
package?com.sequoiadb.utils;import?java.io.IOException;import?java.io.InputStream;import?java.util.Properties;public?class?PropertiesUtils?{private?static?Properties?prop?=?null;static{InputStream?in?=?PropertiesUtils.class.getClassLoader().getResourceAsStream("config.properties");prop?=?new?Properties();try?{prop.load(in);}?catch?(IOException?e)?{e.printStackTrace();}}public?static?String?getProperties(String?key){return?(String)prop.get(key);}public?static?void?main(String[]?argc){System.out.println(PropertiesUtils.getProperties("scm.url"));}}4.2.2、業務邏輯代碼演示
生產者業務邏輯代碼展示:
package?com.sequoiadb.kafka;import?java.io.IOException;import?java.io.InputStream;import?java.util.Properties;import?org.apache.commons.io.IOUtils;import?org.apache.kafka.clients.producer.Callback;import?org.apache.kafka.clients.producer.KafkaProducer;import?org.apache.kafka.clients.producer.ProducerRecord;import?org.apache.kafka.clients.producer.RecordMetadata;import?org.slf4j.Logger;import?org.slf4j.LoggerFactory;import?com.sequoiadb.utils.Configuration;public?class?PartitionTest?{private?static?Logger?log?=?LoggerFactory.getLogger(PartitionTest.class);private?static?String?location?=?"kafka-producer.properties";//?配置文件位置public?static?void?main(String[]?args)?{Properties?props?=?new?Properties();String?json?=?null;try?{props.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));InputStream?in?=?Configuration.class.getClassLoader().getResourceAsStream("oracle.json");json?=?IOUtils.toString(in);}?catch?(IOException?e)?{e.printStackTrace();}KafkaProducer<String,?String>?producer?=?new?KafkaProducer<String,?String>(props);for?(int?i?=?0;?i?<?1000;?i++)?{ProducerRecord<String,?String>?record?=?new?ProducerRecord<String,?String>("oracle",?json);producer.send(record,?new?Callback()?{@Overridepublic?void?onCompletion(RecordMetadata?metadata,?Exception?e)?{if?(e?!=?null)?{log.error("the?producer?has?a?error:"?+?e.getMessage());}}});}try?{Thread.sleep(1000);producer.close();}?catch?(InterruptedException?e1)?{e1.printStackTrace();}}}消費者業務邏輯采用一線程一主題的方式進行消息的消費,主程序入口代碼如下:
package?com.sequoiadb.kafka;import?java.util.ArrayList;import?java.util.List;import?java.util.Map;import?java.util.concurrent.ExecutorService;import?java.util.concurrent.Executors;import?java.util.concurrent.TimeUnit;import?org.slf4j.Logger;import?org.slf4j.LoggerFactory;import?com.sequoiadb.kafka.bean.KafkaConsumerConfig;import?com.sequoiadb.utils.Configuration;import?com.sequoiadb.utils.Constants;public?class?KafkaSdb?{private?static?Logger?log?=?LoggerFactory.getLogger(KafkaSdb.class);private?static?ExecutorService?executor;public?static?void?main(String[]?args)?{//?獲取kafka主題配置List<KafkaConsumerConfig>?topicSdbList?=?Configuration.getConfiguration();if?(topicSdbList?!=?null?&&?topicSdbList.size()?>?0)?{executor?=?Executors.newFixedThreadPool(topicSdbList.size());final?List<ConsumerThread>?consumerList?=?new?ArrayList<ConsumerThread>();for?(int?i?=?0;?i?<?topicSdbList.size();?i++)?{KafkaConsumerConfig?consumerConfig?=?topicSdbList.get(i);ConsumerThread?consumer?=?new?ConsumerThread(consumerConfig);consumerList.add(consumer);executor.submit(consumer);}Runtime.getRuntime().addShutdownHook(new?Thread()?{@Overridepublic?void?run()?{for?(ConsumerThread?consumer?:?consumerList)?{consumer.shutdown();}executor.shutdown();try?{executor.awaitTermination(5000,?TimeUnit.MILLISECONDS);}?catch?(InterruptedException?e)?{e.printStackTrace();}}});}?else?{log.error("主題為空,請確認主題配置是否正確!");}}}線程類負責具體的消息的消費,并且將消息數據寫入到SequoiaDB中,具體代碼如下:
package?com.sequoiadb.kafka;import?java.io.IOException;import?java.util.ArrayList;import?java.util.Arrays;import?java.util.Iterator;import?java.util.List;import?java.util.Properties;import?org.apache.kafka.clients.consumer.ConsumerRecord;import?org.apache.kafka.clients.consumer.ConsumerRecords;import?org.apache.kafka.clients.consumer.KafkaConsumer;import?org.apache.kafka.common.errors.WakeupException;import?org.bson.BSONObject;import?org.bson.BasicBSONObject;import?org.slf4j.Logger;import?org.slf4j.LoggerFactory;import?com.sequoiadb.base.CollectionSpace;import?com.sequoiadb.base.DBCollection;import?com.sequoiadb.base.Sequoiadb;import?com.sequoiadb.exception.BaseException;import?com.sequoiadb.kafka.bean.KafkaConsumerConfig;import?com.sequoiadb.utils.ConnectionPool;import?com.sequoiadb.utils.Constants;import?net.sf.json.JSONArray;import?net.sf.json.JSONObject;public?class?ConsumerThread?implements?Runnable?{private?static?Logger?log?=?LoggerFactory.getLogger(ConsumerThread.class);private?String?location?=?"kafka-consumer.properties";//?配置文件位置private?Sequoiadb?sdb?=?null;private?CollectionSpace?cs?=?null;private?DBCollection?cl?=?null;private?KafkaConsumer<String,?String>?consumer?=?null;// private?String?topicName?=?null;// private?String?clName?=?null;// private?String?topicGroupName?=?null;// private?long?pollTimeout?=?1000;private?KafkaConsumerConfig?consumerConfig;public?ConsumerThread(KafkaConsumerConfig?consumerConfig)?{if?(null?==?sdb)?{sdb?=?ConnectionPool.getInstance().getConnection();}if?(sdb.isCollectionSpaceExist(Constants.CS_NAME))?{cs?=?sdb.getCollectionSpace(Constants.CS_NAME);}?else?{throw?new?BaseException("集合空間"?+?Constants.CS_NAME?+?"不存在!");}if?(null?==?cs)?{throw?new?BaseException("集合空間不能為null!");}?else?{this.consumerConfig?=?consumerConfig;this.cl?=?cs.getCollection(this.consumerConfig.getSdbCLName());}Properties?props?=?new?Properties();try?{props.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));}?catch?(IOException?e)?{e.printStackTrace();}props.put("group.id",?this.consumerConfig.getTopicGroupName());consumer?=?new?KafkaConsumer<>(props);}@Overridepublic?void?run()?{log.info("主題為"?+?this.consumerConfig.getTopicName()?+?"的消費者線程啟動!");try?{//?訂閱topicconsumer.subscribe(Arrays.asList(this.consumerConfig.getTopicName()));while?(true)?{ConsumerRecords<String,?String>?records?=?consumer.poll(this.consumerConfig.getPollTimeout());//?consumer.seekToBeginning(Arrays.asList(new//?TopicPartition(this.topicName,?0)));//?consumer.seek(new?TopicPartition(this.topicName,?0),?0);List<BSONObject>?list?=?new?ArrayList<BSONObject>();for?(ConsumerRecord<String,?String>?record?:?records)?{String?value?=?record.value();JSONObject?valueJson?=?JSONObject.fromObject(value);if?(valueJson.containsKey("data"))?{JSONArray?dataJsonArray?=?valueJson.getJSONArray("data");for?(int?i?=?0;?i?<?dataJsonArray.size();?i++)?{BSONObject?httpBson?=?new?BasicBSONObject();JSONObject?dataJson?=?dataJsonArray.getJSONObject(i);Iterator?iter?=?dataJson.keys();while?(iter.hasNext())?{String?key?=?(String)?iter.next();String?bsonValue?=?dataJson.getString(key);httpBson.put(key,?bsonValue);}list.add(httpBson);//?clHttp.insert(httpBson);}}?else?{log.error("消息中不存在data節點!");}}if?(list?!=?null?&&?list.size()?>?0)?{try?{this.cl.bulkInsert(list,?DBCollection.FLG_INSERT_CONTONDUP);log.info("主題為"+this.consumerConfig.getTopicName()+"的消息插入SDB成功,插入記錄數為:"+list.size());}?catch?(BaseException?e)?{e.printStackTrace();}}consumer.commitSync();}}?catch?(WakeupException?e)?{}?finally?{consumer.close();}}public?void?shutdown(){consumer.wakeup();}}5、?總結
從上述對接過程中,Kafka中的消息寫入SequoiaDB難點是Kafka中主題分區的配置以及多線程如何消費各主題分區中的消息,并且處理消息消費失敗的情況。
?
?
? ? ?
轉載于:https://my.oschina.net/wangzhonnew/blog/1559772
總結
以上是生活随笔為你收集整理的【技术教程】SequoiaDB对接Kafka的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: HTML 5 视频,音频
- 下一篇: 【版本更新】Aspose.Slides