日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 综合教程 >内容正文

综合教程

Structured Streaming系列——输入与输出

發布時間:2023/12/19 综合教程 36 生活家
生活随笔 收集整理的這篇文章主要介紹了 Structured Streaming系列——输入与输出 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、輸入數據源

1. 文件輸入數據源(FIie)

file數據源提供了很多種內置的格式,如csv、parquet、orc、json等等,就以csv為例:

 import spark.implicits._
    val userSchema = new StructType()
.add("name", "string").add("age", "integer") val lines = spark.readStream .option("sep", ";") .schema(userSchema) .csv("file:///data/*") val query = lines.writeStream .outputMode("append") .format("console") .start() query.awaitTermination()

在對應的目錄下新建文件時,就可以在控制臺看到對應的數據了。

還有一些其他可以控制的參數:

maxFilesPerTrigger 每個batch最多的文件數,默認是沒有限制。比如我設置了這個值為1,那么同時增加了5個文件,這5個文件會每個文件作為一波數據,更新streaming dataframe。
latestFirst 是否優先處理最新的文件,默認是false。如果設置為true,那么最近被更新的會優先處理。這種場景一般是在監聽日志文件的時候使用。
fileNameOnly 是否只監聽固定名稱的文件

2.網絡輸入數據源(socket)

一般都是基于這個socket來做測試。首先開啟一個socket服務器(nc -lk 9999),然后streaming這邊連接進行處理。

  spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

3. 輸入數據源(kafka)

// Subscribe to 1 topic
val df= spark                                                        
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","host1:port1,host2:port2")
.option("subscribe","topic1")
.load()
df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
.as[(String,String)]

// Subscribe to multiple topics
val df= spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","host1:port1,host2:port2")
.option("subscribe","topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
.as[(String,String)]

// Subscribe to a pattern
val df= spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","host1:port1,host2:port2")
.option("subscribePattern","topic.*")
.load()
df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
.as[(String,String)]

以批的形式查詢

關于Kafka的offset,structured streaming默認提供了幾種方式:

//設置每個分區的起始和結束值
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

//配置起始和結束的offset值(默認)
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Schema信息

讀取后的數據的Schema是固定的,包含的列如下:

Column Type 說明
key binary 信息key
value binary 信息的value(我們自己的數據)
topic string 主題
partition int 分區
offset long 偏移值
timestamp long 時間戳
timestampType int 類型

source相關的配置

無論是流的形式,還是批的形式,都需要一些必要的參數:

kafka.bootstrap.servers kafka的服務器配置,host:post形式,用逗號進行分割,如host1:9000,host2:9000
assign,以json的形式指定topic信息
subscribe,通過逗號分隔,指定topic信息
subscribePattern,通過java的正則指定多個topic
assign、subscribe、subscribePattern同時之中能使用一個。

其他比較重要的參數有:

startingOffsets, offset開始的值,如果是earliest,則從最早的數據開始讀;如果是latest,則從最新的數據開始讀。默認流是latest,批是earliest
endingOffsets,最大的offset,只在批處理的時候設置,如果是latest則為最新的數據
failOnDataLoss,在流處理時,當數據丟失時(比如topic被刪除了,offset在指定的范圍之外),查詢是否報錯,默認為true。這個功能可以當做是一種告警機制,如果對丟失數據不感興趣,可以設置為false。在批處理時,這個值總是為true。
kafkaConsumer.pollTimeoutMs,excutor連接kafka的超時時間,默認是512ms
fetchOffset.numRetries,獲取kafka的offset信息時,嘗試的次數;默認是3次
fetchOffset.retryIntervalMs,嘗試重新讀取kafka offset信息時等待的時間,默認是10ms
maxOffsetsPerTrigger,trigger暫時不會用,不太明白什么意思。Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.

二、輸出數據源

目前Structed Streaming有四種方式:

1.File sink。寫入到文件中。

2.Foreach sink。對輸出的記錄進行任意計算。比如保存到mysql中。目前spark不支持直接寫入外部數據庫,只提供了Foreach接收器自己來實現,而且官網也沒有示例代碼。

3.Console sink。輸出到控制臺,僅用于測試。

4.Memory sink。以表的形式輸出到內存,spark可以讀取內存中的表,僅用于測試。

5.Kafka sink。spark2.2.1更新了kafka sink,所以可以直接使用,如果你的版本低于2.2.1,那就只能使用第二個方法foreach sink來實現。

在配置完輸入,并針對DataFrame或者DataSet做了一些操作后,想要把結果保存起來。就可以使用DataSet.writeStream()方法,配置輸出需要配置下面的內容:

format : 配置輸出的格式
output mode:輸出的格式
query name:查詢的名稱,類似tempview的名字
trigger interval:觸發的間隔時間,如果前一個batch處理超時了,那么不會立即執行下一個batch,而是等下一個trigger時間在執行。
checkpoint location:為保證數據的可靠性,可以設置檢查點保存輸出的結果。

1.output Mode

只有三種類型

complete,把所有的DataFrame的內容輸出,這種模式只能在做agg聚合操作的時候使用,比如ds.group.count,之后可以使用它
append,普通的dataframe在做完map或者filter之后可以使用。這種模式會把新的batch的數據輸出出來,
update,把此次新增的數據輸出,并更新整個dataframe。有點類似之前的streaming的state處理。

2.輸出的類型

2.1)file:保存成csv或者parquet

DF.writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()

2.2)console:直接輸出到控制臺。一般做測試的時候用這個比較方便(測試用)

DF.writeStream
  .format("console")
  .start()

2.3)memory:可以保存在內容,供后面的代碼使用(測試用)

DF.writeStream
  .queryName("aggregates")
  .outputMode("complete")
  .format("memory")
  .start()
spark.sql("select * from aggregates").show()  

2.4) kafka: 輸出到kafka, 在spark 2.2.1以前用自定義實現寫入。在spark2.2.1后提供了方法。

spark 2.2.1之前寫入kafka的方法

自定義一個類KafkaSink繼承ForeachWriter

import java.util.Properties
 
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.{ForeachWriter, Row}
 
class KafkaSink(topic: String, servers: String) extends ForeachWriter[Row]{
  val kafkaProperties = new Properties()
  kafkaProperties.put("bootstrap.servers", servers)
  kafkaProperties.put("key.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")
  kafkaProperties.put("value.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")
 
  val results = new scala.collection.mutable.HashMap
  var producer: KafkaProducer[String, String] = _
 
  override def open(partitionId: Long, version: Long): Boolean = {
    producer = new KafkaProducer(kafkaProperties)
    return true
  }
 
  override def process(value: Row): Unit = {
    val word = value.getAs[String]("word")
    val count = value.getAs[String]("count")
    producer.send(new ProducerRecord(topic, word, count))
  }
 
  override def close(errorOrNull: Throwable): Unit = {
    producer.close()
  }
}

spark 2.2.1以后寫入kafka的方法

// spark 2.2.1以后
wordcount.writeStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("topic", "wordcount") .start()

2.5)foreach:參數是一個foreach的方法,用戶可以實現這個方法實現寫入mysql自定義的功能。

import java.sql._
 
import org.apache.spark.sql.{ForeachWriter, Row}
 
class JDBCSink(url: String, userName: String, password: String) extends ForeachWriter[Row]{
 
  var statement: Statement = _
  var resultSet: ResultSet = _
  var connection: Connection = _
  // 初始化信息
  override def open(partitionId: Long, version: Long): Boolean = {
    
    Class.forName("com.mysql.jdbc.Driver")
    connection = DriverManager.getConnection(url, userName, password)
    statement = connection.createStatement()
    return true
  }
   // 執行操作
  override def process(value: Row): Unit = {
 
    val word= value.getAs[String]("word")
    val count = value.getAs[Integer]("count")
 
 
    val insertSql = "insert into webCount(word,count)" +
      "values('" + word + "'," + count + ")"
 
    statement.execute(insertSql)
  }
  // 結束操作
  override def close(errorOrNull: Throwable): Unit = {
      connection.close()
  }
}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
 
object KafkaStructedStreaming {
 
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("local[2]").appName("streaming").getOrCreate()
 
    val df = sparkSession
        .readStream
        .format("socket")
        .option("host", "hadoop102")
        .option("port", "9999")
        .load()
 
    import sparkSession.implicits._
    val lines = df.selectExpr("CAST(value as STRING)").as[String]
    val weblog = lines.as[String].flatMap(_.split(" "))
 
    val wordCount = weblog.groupBy("value").count().toDF("word", "count")
 
    val url ="jdbc:mysql://hadoop102:3306/test"
    val username="root"
    val password="000000"
 
    val writer = new JDBCSink(url, username, password)
 
    val query = wordCount.writeStream
        .foreach(writer)
        .outputMode("update")
        .trigger(ProcessingTime("10 seconds"))
        .start()
    query.awaitTermination()
}

參考原文鏈接:https://blog.csdn.net/a790439710/article/details/103027602

總結

以上是生活随笔為你收集整理的Structured Streaming系列——输入与输出的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。