Spark Streaming in Practice (Part 5): Spark Streaming and Kafka
Contents
1. WordCount with Spark Streaming and Kafka, Example 1
Send messages to the Kafka cluster:
root@sparkslave01:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-console-producer.sh --broker-list sparkslave01:9092 --sync --topic kafkatopictest

Configure the run arguments of KafkaWordCount:
The arguments are, in order:
sparkmaster:2181: the ZooKeeper listening address
test-consumer-group: the consumer group name, which must match the group.id setting in $KAFKA_HOME/config/consumer.properties
kafkatopictest: the topic name
1: the number of consumer threads
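The four arguments are consumed positionally. In Spark's receiver-based Kafka 0.8 integration, `KafkaUtils.createStream` receives the topics as a `Map[String, Int]` from topic name to the number of consumer threads, so the thread count is attached to every topic. A minimal stdlib-only sketch of that argument handling, assuming the order listed above (the names `ConsumerArgs` and `KafkaWordCountArgs` are hypothetical, not part of Spark):

```scala
// Hypothetical holder for the four positional KafkaWordCount arguments.
final case class ConsumerArgs(zkQuorum: String, group: String, topicMap: Map[String, Int])

object KafkaWordCountArgs {
  // Parses <zkQuorum> <group> <topics> <numThreads>; topics may be comma-separated.
  def parse(args: Array[String]): Either[String, ConsumerArgs] = args match {
    case Array(zkQuorum, group, topics, numThreads)
        if numThreads.nonEmpty && numThreads.forall(_.isDigit) =>
      // Every topic gets the same number of consumer threads.
      val topicMap = topics.split(",").map(t => (t.trim, numThreads.toInt)).toMap
      Right(ConsumerArgs(zkQuorum, group, topicMap))
    case _ =>
      Left("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
  }

  def main(args: Array[String]): Unit =
    // The values used in this article.
    println(parse(Array("sparkmaster:2181", "test-consumer-group", "kafkatopictest", "1")))
}
```

Returning an `Either` instead of calling `System.exit` keeps the parsing testable; a real driver would print the `Left` message and exit.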
After starting KafkaWordCount, type the following into the producer:
root@sparkslave01:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-console-producer.sh --broker-list sparkslave01:9092 --sync --topic kafkatopictest
[2015-11-04 03:25:39,666] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
Spark Spark TEST TEST

Spark Streaming produces the following result:
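For each batch, a WordCount job splits each message into words, pairs every word with 1, and sums the pairs per word. The sketch below replays that chain on plain Scala collections, assuming all the typed words land in the same batch (the object `BatchWordCount` is hypothetical). If your KafkaWordCount aggregates over a sliding window, as Spark's bundled example does with `reduceByKeyAndWindow`, the printed counts can accumulate across batches instead.

```scala
object BatchWordCount {
  // Same transformation chain a WordCount job applies to one batch:
  // split messages into words, pair each word with 1, then sum per word.
  def countWords(lines: Seq[String]): Map[String, Long] =
    lines
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map(word => (word, 1L))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    // The input typed into the console producer above.
    val batch = Seq("Spark Spark TEST TEST")
    countWords(batch).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Whether the words arrive as one message or as several, the counts for a single batch are the same.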
2. WordCount with Spark Streaming and Kafka, Example 2
In the previous example, the producer was Kafka's console script. In this example, the producer is a program we write ourselves:
```scala
import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

// Sends random words (the digits 0-9) to Kafka: <messagesPerSec> messages
// per second, each containing <wordsPerMessage> words
object KafkaWordCountProducer {
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }
    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Kafka producer properties: broker list and key/value serializers
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    // Create the KafkaProducer
    val producer = new KafkaProducer[String, String](props)

    // Send messages to the Kafka cluster, one burst per second
    while (true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt)
          .map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }
      Thread.sleep(1000)
    }
  }
}
```

The run arguments of KafkaWordCountProducer are set as follows:
sparkmaster:9092 kafkatopictest 5 8

sparkmaster:9092: the broker list
kafkatopictest: the topic name
5: the number of messages sent per second
8: the number of words in each message
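With the arguments `5 8`, each loop iteration sends 5 messages of 8 words, so roughly 40 words per second. The rate is only approximate, because the time spent sending adds to the one-second sleep. Since every word comes from `nextInt(10)`, KafkaWordCount should see at most ten distinct words. The message-building step can be isolated with an injected, seedable `Random` for inspection; `WordMessages` and `oneSecond` are hypothetical names for this sketch:

```scala
import scala.util.Random

object WordMessages {
  // Builds one second's worth of messages: `messagesPerSec` strings,
  // each holding `wordsPerMessage` random digits 0-9 separated by spaces.
  def oneSecond(messagesPerSec: Int, wordsPerMessage: Int, rng: Random): Seq[String] =
    (1 to messagesPerSec).map { _ =>
      (1 to wordsPerMessage).map(_ => rng.nextInt(10).toString).mkString(" ")
    }

  def main(args: Array[String]): Unit = {
    // The arguments used in this article: 5 messages per second, 8 words each.
    val burst = oneSecond(5, 8, new Random(42))
    burst.foreach(println)
    println(s"words per second: ${burst.map(_.split(" ").length).sum}")
  }
}
```

Passing the `Random` in, rather than calling `scala.util.Random` directly as the producer does, makes the output reproducible for a fixed seed.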
Start KafkaWordCountProducer first, then run KafkaWordCount. The computed result is as follows: