日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

sparkstreaming监听hdfs目录_大数据系列之Spark Streaming接入Kafka数据

發布時間:2023/12/1 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 sparkstreaming监听hdfs目录_大数据系列之Spark Streaming接入Kafka数据 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Spark Streaming官方提供Receiver-based和Direct Approach兩種方法接入Kafka數據,本文簡單介紹兩種方式的pyspark實現。


1、Spark Streaming接入Kafka方式介紹

Spark Streaming 官方提供了兩種方式讀取Kafka數據:

  • 一是Receiver-based Approach。該種讀取模式官方最先支持,并在Spark 1.2提供了數據零丟失(zero-data loss)的支持;

  • 一是Direct Approach (No Receivers)。該種讀取方式在Spark 1.3引入。

  • 1.1 Receiver-based Approach

    Receiver-based的Kafka讀取方式是基于Kafka高階(high-level) api來實現對Kafka數據的消費。在提交Spark Streaming任務后,Spark集群會劃出指定的Receivers來專門、持續不斷、異步讀取Kafka的數據,讀取時間間隔以及每次讀取offsets范圍可以由參數來配置。讀取的數據保存在Receiver中,具體StorageLevel方式由用戶指定,諸如MEMORY_ONLY等。當driver 觸發batch任務的時候,Receivers中的數據會轉移到剩余的Executors中去執行。在執行完之后,Receivers會相應更新ZooKeeper的offsets。如要確保at least once的讀取方式,可以設置spark.streaming.receiver.writeAheadLog.enable為true。具體Receiver執行流程見下圖:

  • 需要借助Write Ahead Logs 來保證數據的不丟失,如果啟用了Write Ahead Logs復制到文件系統如HDFS,那么storage level需要設置成StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(…, StorageLevel.MEMORY_AND_DISK_SER)

  • 在Receiver的方式中,Spark中的partition和kafka中的partition并不是相關的,所以如果我們加大每個topic的partition數量,僅僅是增加線程來處理由單一Receiver消費的主題。但是這并沒有增加Spark在處理數據上的并行度

  • 對于不同的Group和topic我們可以使用多個Receiver創建不同的Dstream來并行接收數據,之后可以利用union來統一成一個Dstream

  • 1.2 Direct Approach (No Receivers)

    Direct方式采用Kafka簡單的consumer api方式來讀取數據,無需經由ZooKeeper,此種方式不再需要專門Receiver來持續不斷讀取數據。當batch任務觸發時,由Executor讀取數據,并參與到其他Executor的數據計算過程中去。由drive來決定讀取多少offsets,并將offsets交由checkpoints來維護。將觸發下次batch任務,再由Executor讀取Kafka數據并計算。從此過程可以發現Direct方式無需Receiver讀取數據,而是需要計算時再讀取數據,所以Direct方式的數據消費對內存的要求不高,只需要考慮批量計算所需要的內存即可;另外batch任務堆積時,也不會影響數據堆積。其具體讀取方式如下圖:

  • 簡化的并行:在Receiver的方式中提到創建多個Receiver之后利用union來合并成一個Dstream的方式提高數據傳輸并行度。而在Direct方式中,Kafka中的partition與RDD中的partition是一一對應的并行讀取Kafka數據,這種映射關系也更利于理解和優化。

  • 高效:在Receiver的方式中,為了達到0數據丟失需要將數據存入Write Ahead Log中,這樣在Kafka和日志中就保存了兩份數據,浪費!而第二種方式不存在這個問題,只要我們Kafka的數據保留時間足夠長,我們都能夠從Kafka進行數據恢復。

  • 精確一次:在Receiver的方式中,使用的是Kafka的高階API接口從Zookeeper中獲取offset值,這也是傳統的從Kafka中讀取數據的方式,但由于Spark Streaming消費的數據和Zookeeper中記錄的offset不同步,這種方式偶爾會造成數據重復消費。而第二種方式,直接使用了簡單的低階Kafka API,Offsets則利用Spark Streaming的checkpoints進行記錄,消除了這種不一致性。

  • 2、Spark Streaming接入Kafka數據實現

    以wordcount統計為例,kafka生產端輸入詞組,Spark端讀取kafka流數據,并統計詞頻

    2.1 Receiver方式收取數據

    1)Import KafkaUtils并創建DStream

    from pyspark.streaming.kafka import KafkaUtils

    kafkaStream = KafkaUtils.createStream(streamingContext, \
    [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
  • ZK Quorum:Zookeeper quorum (hostname:port,hostname:port,..)

  • Groupid:消費者的groupid

  • Topics:{topic_name : numPartitions}

  • 2)具體實現代碼如下:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    if __name__ == "__main__":
    #if len(sys.argv) != 3:
    # print("Usage: kafka_wordcount.py ", file=sys.stderr)
    # exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 10)

    zkQuorum = "192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181"
    groupid = "spark-streaming-consumer"
    topic = {"kafka_spark_test1":0,"kafka_spark_test1":1,"kafka_spark_test1":2}
    #zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, groupid, topic)
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

    在Spark目錄執行命令:

    spark-submit --master yarn --deploy-mode client --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /usr/local/spark/ipynotebook/03-kafka2streaming-01.py
    2.2 Direct方式收取數據

    1)Import KafkaUtils并創建DStream

    from pyspark.streaming.kafka import KafkaUtils
    directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    • ssc:StreamingContext

    • topics:消費的topics清單

    • {"metadata.broker.list": brokers}:kafka參數,可以指定為 metadata.broker.list或bootstrap.servers

    • 默認情況下,從每個kafka分區的最新的offset進行消費,如果在kafka參數中設置了auto.offset.reset 為smallest,則會從最小的offset進行消費

    • 如果希望保存每個批量消費的kafka offset,可以進行如下操作:

    offsetRanges = []

    def storeOffsetRanges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd

    def printOffsetRanges(rdd):
    for o in offsetRanges:
    print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)

    directKafkaStream \
    .transform(storeOffsetRanges) \
    .foreachRDD(printOffsetRanges)

    如果希望使用基于Zookeeper的Kafka監控,也可以通過這種方法展現Streaming的進程。

    2)具體實現代碼如下:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    offsetRanges = []

    def storeOffsetRanges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd

    def printOffsetRanges(rdd):
    for o in offsetRanges:
    print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))

    if __name__ == "__main__":
    #if len(sys.argv) != 3:
    # print("Usage: direct_kafka_wordcount.py ", file=sys.stderr)
    # exit(-1)

    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 10)

    #brokers, topic = sys.argv[1:]
    topic="kafka_spark_test1"
    brokers = "192.168.112.101:9092,192.168.112.102:9092,192.168.112.103:9092"
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a+b)
    kvs.transform(storeOffsetRanges).foreachRDD(printOffsetRanges)
    counts.pprint()

    ssc.start() # Start the computation
    ssc.awaitTermination() # Wait for the computation to terminate

    在Spark根目錄執行命令:

    spark-submit --master yarn --deploy-mode client --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /usr/local/spark/ipynotebook/03-kafka2streaming-02.py
    2.3 Kafka生產者配置

    Kafka集群環境的安裝配置,參考之前的文檔"大數據系列之Kafka集群環境部署"中相關內容

    1)啟動zookeeper

    [root@tango-centos01 kafka_2.11-1.1.0]# nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
    [root@tango-centos02 kafka_2.11-1.1.0]# nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
    [root@tango-centos03 kafka_2.11-1.1.0]# nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &

    2)啟動Kafka集群

    [root@tango-centos01 kafka_2.11-1.1.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties &
    [root@tango-centos02 kafka_2.11-1.1.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties &
    [root@tango-centos03 kafka_2.11-1.1.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties &

    3)創建Kafka topic

    [root@tango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-topics.sh --create --zookeeper 192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181 --replication-factor 2 --partitions 3 --topic kafka_spark_test1
    Created topic "kafka_spark_test1".

    創建名為kafka_spark_test1 的Topic,復制因子設為2,同時分區數為3,注意,分區數是read parallelisms的最大值

    4)查看Topic詳情

    [root@tango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-topics.sh --describe --zookeeper 192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181 --topic kafka_spark_test1
    Topic:kafka_spark_test1 PartitionCount:3 ReplicationFactor:2 Configs:
    Topic: kafka_spark_test1 Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
    Topic: kafka_spark_test1 Partition: 1 Leader: 3 Replicas: 3,1 Isr: 3,1
    Topic: kafka_spark_test1 Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2

    指定--zookeeper選項的值為192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181,對應的Topic,即剛創建的kafka_spark_test1

    2.4 Kafka-Spark Streaming流測試

    1)下載依賴的jars包

    2)啟動kafka生產者

    [root@tango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-console-producer.sh --broker-list 192.168.112.101:9092 --topic kafka_spark_test1

    3)運行Spark Streaming流數據處理程序

    [root@tango-spark01 spark-2.3.0]# spark-submit --master yarn --deploy-mode client --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /usr/local/spark/ipynotebook/03-kafka2streaming-01.py
    [root@tango-spark01 spark-2.3.0]# spark-submit --master yarn --deploy-mode client --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /usr/local/spark/ipynotebook/03-kafka2streaming-02.py

    4)在Kafka生產端輸入流數據

    [root@tango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-console-producer.sh --broker-list 192.168.112.101:9092 --topic kafka_spark_test1
    >hello world
    >hello tango hello
    >hello tango tango

    5)終端打印結果

    -------------------------------------------
    Time: 2018-08-08 11:03:15
    -------------------------------------------
    (u'tango', 2)
    (u'hello', 1)

    6)登錄SparkWeb UI,查看Spark Streaming的的運行情況

    a) spark-submit時候指定spark-submit --master spark://192.168.112.121:7077才能在8080端口看到數據

    b) 如果通過yarn模式調度,可通過8088端口查看

    2.5 Spark寫入Kafka

    1)安裝Kafka插件

    Pyspark訪問Kafka需要使用到kafka安裝包,使用以下命令安裝:

    pip install --no-index --find-links=../kafka-1.3.5-py2.py3-none.any.whl kafka

    2)調用KafkaProducer模塊,spark作為生產者將數據傳輸到kafka端

    from kafka import KafkaProducer

    to_kafka = KafkaProducer(bootstrap_servers=broker_list)
    to_kafka.send(topic_name,send_msg,encode(‘utf8’))
    to_kafka.flush()

    參考資料

  • http://spark.apache.org/docs/latest/streaming-kafka-integration.html

  • 大數據系列之Kafka集群環境部署

  • 總結

    以上是生活随笔為你收集整理的sparkstreaming监听hdfs目录_大数据系列之Spark Streaming接入Kafka数据的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。