Spark Streaming in Action (Part 5): Spark Streaming and Kafka


Main contents:

  • Spark Streaming and Kafka WordCount example (1)
  • Spark Streaming and Kafka WordCount example (2)

1. Spark Streaming and Kafka WordCount Example (1)

First, start the Kafka brokers on every node of the cluster:

    root@sparkslave02:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-server-start.sh config/server.properties
    root@sparkslave01:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-server-start.sh config/server.properties
    root@sparkmaster:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-server-start.sh config/server.properties
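The steps below produce to and consume from a topic named kafkatopictest. If that topic does not exist yet, it can be created with the kafka-topics.sh script that ships with Kafka 0.8.2.1; a hedged example, assuming ZooKeeper listens on sparkmaster:2181 and that a single partition with one replica is enough for this demo:

    root@sparkmaster:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-topics.sh --create --zookeeper sparkmaster:2181 --replication-factor 1 --partitions 1 --topic kafkatopictest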

Then start a console producer to send messages to the Kafka cluster:

    root@sparkslave01:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-console-producer.sh --broker-list sparkslave01:9092 --sync --topic kafkatopictest
Next, write the following consumer program:

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._

    object KafkaWordCount {
      def main(args: Array[String]) {
        if (args.length < 4) {
          System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
          System.exit(1)
        }
        // Quiet the noisy INFO logging. This stands in for the
        // StreamingExamples.setStreamingLogLevels() helper used in Spark's
        // examples module, which is not available in a standalone project.
        Logger.getRootLogger.setLevel(Level.WARN)

        val Array(zkQuorum, group, topics, numThreads) = args
        val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[4]")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        // Checkpointing is required because reduceByKeyAndWindow with an
        // inverse function keeps state across batches
        ssc.checkpoint("checkpoint")

        // One entry per topic, each consumed with numThreads receiver threads
        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
        // Create a ReceiverInputDStream; records are (key, message) pairs,
        // so keep only the message body
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
        val words = lines.flatMap(_.split(" "))
        // Count words over a sliding 10-minute window, recomputed every
        // 2 seconds; `_ - _` subtracts the counts of the batch that slides
        // out of the window
        val wordCounts = words.map(x => (x, 1L))
          .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
        wordCounts.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
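KafkaUtils lives in the separate spark-streaming-kafka artifact, so the project needs that dependency in addition to Spark Streaming itself. A minimal sbt sketch, assuming Scala 2.10 and a Spark 1.5.x build to match the kafka_2.10-0.8.2.1 install used here (the versions are illustrative; adjust them to your environment):

    // build.sbt (illustrative versions)
    scalaVersion := "2.10.5"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming"       % "1.5.1",
      "org.apache.spark" %% "spark-streaming-kafka" % "1.5.1"
    )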

Configure the program's run arguments as follows:

    sparkmaster:2181 test-consumer-group kafkatopictest 1

    sparkmaster:2181 — the ZooKeeper quorum address
    test-consumer-group — the consumer group name; it must match the group.id setting in $KAFKA_HOME/config/consumer.properties
    kafkatopictest — the topic name
    1 — the number of receiver threads

    The createStream API used above is receiver-based and connects through ZooKeeper; a receiver-less alternative is sketched right after this list.
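For reference, spark-streaming-kafka (Spark 1.3 and later) also provides KafkaUtils.createDirectStream, which reads from the brokers directly instead of going through ZooKeeper and a receiver. A minimal sketch, assuming a broker on sparkmaster:9092 and reusing the ssc and topic from the example above:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    // The broker list, not the ZooKeeper quorum, is configured here
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "sparkmaster:9092")
    // Records are (key, message) pairs; keep only the message body
    val directLines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("kafkatopictest")).map(_._2)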

After starting KafkaWordCount, type the following into the producer console:

    root@sparkslave01:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-console-producer.sh --broker-list sparkslave01:9092 --sync --topic kafkatopictest
    [2015-11-04 03:25:39,666] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
    Spark Spark TEST TEST Spark Streaming

The windowed word counts then appear in the KafkaWordCount console.
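Illustratively (the timestamp below is a placeholder), the output of wordCounts.print() for the input typed above has this shape:

    -------------------------------------------
    Time: 1446608284000 ms
    -------------------------------------------
    (Spark,3)
    (TEST,2)
    (Streaming,1)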

2. Spark Streaming and Kafka WordCount Example (2)

In the previous example the producer was Kafka's console script; this example shows a producer written in code:

    import java.util.HashMap

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

    // Produces messages made up of random digits (0-9)
    object KafkaWordCountProducer {
      def main(args: Array[String]) {
        if (args.length < 4) {
          System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
            "<messagesPerSec> <wordsPerMessage>")
          System.exit(1)
        }
        val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

        // Kafka broker connection properties (the producer talks to the
        // brokers directly, not to ZooKeeper)
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        // Create the KafkaProducer
        val producer = new KafkaProducer[String, String](props)

        // Send messages to the Kafka cluster
        while (true) {
          (1 to messagesPerSec.toInt).foreach { messageNum =>
            val str = (1 to wordsPerMessage.toInt)
              .map(x => scala.util.Random.nextInt(10).toString)
              .mkString(" ")
            val message = new ProducerRecord[String, String](topic, null, str)
            producer.send(message)
          }
          Thread.sleep(1000)
        }
      }
    }
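Note that each ProducerRecord is built with a null key, so the producer spreads messages across the topic's partitions rather than hashing on a key. If related messages had to land in the same partition, a keyed send would look like this (the key choice here is hypothetical, purely for illustration):

    // Hypothetical keyed variant: records sharing a key go to one partition
    val keyedMessage = new ProducerRecord[String, String](topic, str.split(" ").head, str)
    producer.send(keyedMessage)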

Set KafkaWordCountProducer's run arguments as follows:

    sparkmaster:9092 kafkatopictest 5 8

    sparkmaster:9092 — the broker list
    kafkatopictest — the topic name
    5 — the number of messages sent per second
    8 — the number of words per message

    With these settings the producer emits 5 × 8 = 40 random digits per second.

Start KafkaWordCountProducer first, then run KafkaWordCount; the computed counts appear in the consumer's console, roughly in the shape sketched below.
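The actual numbers vary from run to run because the words are random digits, but the output has the same shape as before, with the ten digits as keys (the values below are placeholders):

    -------------------------------------------
    Time: 1446609800000 ms
    -------------------------------------------
    (4,31)
    (7,28)
    (0,26)
    ...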
