大数据技术_ 基础理论 之 数据采集与预处理
2.1 大數(shù)據(jù)采集架構(gòu)
2.1.1概述
如今,社會中各個機(jī)構(gòu)、部門、公司、團(tuán)體等正在實(shí)時不斷地產(chǎn)生大量的信息,這些信息需要以簡單的方式進(jìn)行處理,同時又要十分準(zhǔn)確且能迅速滿足各種類型的數(shù)據(jù)(信息)需求者。這給我們帶來了許多挑戰(zhàn),第一個挑戰(zhàn)就是在大量的數(shù)據(jù)中收集需要的數(shù)據(jù),下面介紹常用的大數(shù)據(jù)采集工具。
2.1.2 常用大數(shù)據(jù)采集工具
數(shù)據(jù)采集最傳統(tǒng)的方式是企業(yè)自己的生產(chǎn)系統(tǒng)產(chǎn)生的數(shù)據(jù),除上述生產(chǎn)系統(tǒng)中的數(shù)據(jù)外,企業(yè)的信息系統(tǒng)還充斥著大量的用戶行為數(shù)據(jù)、日志式的活動數(shù)據(jù)、事件信息等,越來越多的企業(yè)通過架設(shè)日志采集系統(tǒng)來保存這些數(shù)據(jù),希望通過這些數(shù)據(jù)獲取其商業(yè)或社會價值。
2.1.3 Apache Kafka數(shù)據(jù)采集
6、使用Java來編寫Kafka的實(shí)例
首先,編寫KafkaProducer.properties文件:
zk.connect = localhost:2181 broker.list = localhost:9092 serializer.class = kafka.serializer.StringEncoder request.required.acks = 1下面的代碼是使用Java編寫了一個Kafka消息發(fā)布者:
import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class MyKafkaProducer {private Producer<String, String> producer;private final String topic;public MyKafkaProducer(String topic) throws Exception {InputStream in = Properties.class.getResourceAsStream("KafkaProducer.properties");Properties props = new Properties();props.load(in);ProducerConfig config = new ProducerConfig(props);producer = new Producer<String, String>(config);}public void sendMessage(String msg){KeyedMessage<String, String> data =new KeyedMessage<String, String>( topic, msg);producer.send(data);producer.close();}public static void main(String[] args) throws Exception{MyKafkaProducer producer = new MyKafkaProducer("HelloTopic");String msg = "Hello Kafka!";producer. sendMessage(msg);} }下面創(chuàng)建Comsumer,首先編寫KafkaProperties文件:
zk.connect = localhost:2181 group.id = testgroup zookeeper.session.timeout.ms = 500 zookeeper.sync.time.ms = 250 auto.commit.interval.ms = 1000上述參數(shù)配置,十分容易理解,具體的詳細(xì)說明,可以參考Kafka的官方文檔。下面的代碼是使用Java編寫了一個Kafka的Comsumer。
import java.io.InputStream; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.consumer.Consumer;public class MyKafkaConsumer {private final ConsumerConnector consumer;private final String topic;public MyKafkaConsumer(String topic) throws Exception{InputStream in = Properties.class.getResourceAsStream("KafkaProducer.properties");Properties props = new Properties();props.load(in);ConsumerConfig config = new ConsumerConfig(props);consumer = Consumer.createJavaConsumerConnector(config);this.topic = topic;}public void consumeMessage() {Map<String, String> topicMap = new HashMap<String, String>();topicMap.put(topic, new Integer(1));Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap =consumer.createMessageStreams(topicMap);List<KafkaStream<byte[], byte[]>> streamList =consumerStreamsMap.get(topic);for (final KafkaStream<byte[], byte[]> stream : streamList) {ConsumerIterator<byte[], byte[]> consumerIte =stream.iterator();while (consumerIte.hasNext())System.out.println("message :: "+ new String(consumerIte.next().message()));}if (consumer != null)consumer.shutdown();}public static void main(String[] args) throws Exception{String groupId = "testgroup";String topic = "HelloTopic";MyKafkaConsumer consumer = new MyKafkaConsumer(topic);consumer.consumeMessage();} }2.2 數(shù)據(jù)預(yù)處理原理
通過數(shù)據(jù)預(yù)處理工作,可以使殘缺的數(shù)據(jù)完整,并將錯誤的數(shù)據(jù)糾正、多余的數(shù)據(jù)去除,進(jìn)而將所需的數(shù)據(jù)挑選出來,并且進(jìn)行數(shù)據(jù)集成。數(shù)據(jù)預(yù)處理的常見方法有數(shù)據(jù)清洗、數(shù)據(jù)集成與數(shù)據(jù)變換。
2.2.1 數(shù)據(jù)清洗
2.2.2 數(shù)據(jù)集成
2.2.3 數(shù)據(jù)變換
2.3 數(shù)據(jù)倉庫與ETL工具
2.3.1 數(shù)據(jù)倉庫與ETL工具
數(shù)據(jù)倉庫,是在企業(yè)管理和決策中面向主題的、集成的、隨時間變化的、非易失性數(shù)據(jù)的集合。
數(shù)據(jù)倉庫中的數(shù)據(jù)來自于多種業(yè)務(wù)數(shù)據(jù)源,這些數(shù)據(jù)源可能處于不同硬件平臺上,使用不同的操作系統(tǒng),數(shù)據(jù)模型也相差很遠(yuǎn)。如何獲取并向數(shù)據(jù)倉庫加載這些數(shù)據(jù)量大、種類多的數(shù)據(jù),已成為建立數(shù)據(jù)倉庫所面臨的一個關(guān)鍵問題。
2.3.2 常用ETL工具
2.3.3 案例:Kettle數(shù)據(jù)遷移
總結(jié)
以上是生活随笔為你收集整理的大数据技术_ 基础理论 之 数据采集与预处理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 装饰器函数
- 下一篇: 12.4日团队工作总结