Structured Streaming系列——输入与输出
一、輸入數據源
1. 文件輸入數據源(FIie)
file數據源提供了很多種內置的格式,如csv、parquet、orc、json等等,就以csv為例:
import spark.implicits._
val userSchema = new StructType()
.add("name", "string").add("age", "integer")
val lines = spark.readStream
.option("sep", ";")
.schema(userSchema)
.csv("file:///data/*")
val query = lines.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
在對應的目錄下新建文件時,就可以在控制臺看到對應的數據了。
還有一些其他可以控制的參數:
| maxFilesPerTrigger | 每個batch最多的文件數,默認是沒有限制。比如我設置了這個值為1,那么同時增加了5個文件,這5個文件會每個文件作為一波數據,更新streaming dataframe。 |
| latestFirst | 是否優先處理最新的文件,默認是false。如果設置為true,那么最近被更新的會優先處理。這種場景一般是在監聽日志文件的時候使用。 |
| fileNameOnly | 是否只監聽固定名稱的文件 |
2.網絡輸入數據源(socket)
一般都是基于這個socket來做測試。首先開啟一個socket服務器(nc -lk 9999),然后streaming這邊連接進行處理。
spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
3. 輸入數據源(kafka)
// Subscribe to 1 topic
val df= spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","host1:port1,host2:port2")
.option("subscribe","topic1")
.load()
df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
.as[(String,String)]
// Subscribe to multiple topics
val df= spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","host1:port1,host2:port2")
.option("subscribe","topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
.as[(String,String)]
// Subscribe to a pattern
val df= spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","host1:port1,host2:port2")
.option("subscribePattern","topic.*")
.load()
df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
.as[(String,String)]
以批的形式查詢
關于Kafka的offset,structured streaming默認提供了幾種方式:
//設置每個分區的起始和結束值
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
//配置起始和結束的offset值(默認)
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
Schema信息
讀取后的數據的Schema是固定的,包含的列如下:
| Column | Type | 說明 |
| key | binary | 信息key |
| value | binary | 信息的value(我們自己的數據) |
| topic | string | 主題 |
| partition | int | 分區 |
| offset | long | 偏移值 |
| timestamp | long | 時間戳 |
| timestampType | int | 類型 |
source相關的配置
無論是流的形式,還是批的形式,都需要一些必要的參數:
kafka.bootstrap.servers kafka的服務器配置,host:post形式,用逗號進行分割,如host1:9000,host2:9000
assign,以json的形式指定topic信息
subscribe,通過逗號分隔,指定topic信息
subscribePattern,通過java的正則指定多個topic
assign、subscribe、subscribePattern同時之中能使用一個。
其他比較重要的參數有:
startingOffsets, offset開始的值,如果是earliest,則從最早的數據開始讀;如果是latest,則從最新的數據開始讀。默認流是latest,批是earliest
endingOffsets,最大的offset,只在批處理的時候設置,如果是latest則為最新的數據
failOnDataLoss,在流處理時,當數據丟失時(比如topic被刪除了,offset在指定的范圍之外),查詢是否報錯,默認為true。這個功能可以當做是一種告警機制,如果對丟失數據不感興趣,可以設置為false。在批處理時,這個值總是為true。
kafkaConsumer.pollTimeoutMs,excutor連接kafka的超時時間,默認是512ms
fetchOffset.numRetries,獲取kafka的offset信息時,嘗試的次數;默認是3次
fetchOffset.retryIntervalMs,嘗試重新讀取kafka offset信息時等待的時間,默認是10ms
maxOffsetsPerTrigger,trigger暫時不會用,不太明白什么意思。Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.
二、輸出數據源
目前Structed Streaming有四種方式:
1.File sink。寫入到文件中。
2.Foreach sink。對輸出的記錄進行任意計算。比如保存到mysql中。目前spark不支持直接寫入外部數據庫,只提供了Foreach接收器自己來實現,而且官網也沒有示例代碼。
3.Console sink。輸出到控制臺,僅用于測試。
4.Memory sink。以表的形式輸出到內存,spark可以讀取內存中的表,僅用于測試。
5.Kafka sink。spark2.2.1更新了kafka sink,所以可以直接使用,如果你的版本低于2.2.1,那就只能使用第二個方法foreach sink來實現。
在配置完輸入,并針對DataFrame或者DataSet做了一些操作后,想要把結果保存起來。就可以使用DataSet.writeStream()方法,配置輸出需要配置下面的內容:
format : 配置輸出的格式
output mode:輸出的格式
query name:查詢的名稱,類似tempview的名字
trigger interval:觸發的間隔時間,如果前一個batch處理超時了,那么不會立即執行下一個batch,而是等下一個trigger時間在執行。
checkpoint location:為保證數據的可靠性,可以設置檢查點保存輸出的結果。
1.output Mode
只有三種類型
complete,把所有的DataFrame的內容輸出,這種模式只能在做agg聚合操作的時候使用,比如ds.group.count,之后可以使用它
append,普通的dataframe在做完map或者filter之后可以使用。這種模式會把新的batch的數據輸出出來,
update,把此次新增的數據輸出,并更新整個dataframe。有點類似之前的streaming的state處理。
2.輸出的類型
2.1)file:保存成csv或者parquet
DF.writeStream
.format("parquet")
.option("checkpointLocation", "path/to/checkpoint/dir")
.option("path", "path/to/destination/dir")
.start()
2.2)console:直接輸出到控制臺。一般做測試的時候用這個比較方便(測試用)
DF.writeStream
.format("console")
.start()
2.3)memory:可以保存在內容,供后面的代碼使用(測試用)
DF.writeStream
.queryName("aggregates")
.outputMode("complete")
.format("memory")
.start()
spark.sql("select * from aggregates").show()
2.4) kafka: 輸出到kafka, 在spark 2.2.1以前用自定義實現寫入。在spark2.2.1后提供了方法。
spark 2.2.1之前寫入kafka的方法
自定義一個類KafkaSink繼承ForeachWriter
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.{ForeachWriter, Row}
class KafkaSink(topic: String, servers: String) extends ForeachWriter[Row]{
val kafkaProperties = new Properties()
kafkaProperties.put("bootstrap.servers", servers)
kafkaProperties.put("key.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")
kafkaProperties.put("value.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")
val results = new scala.collection.mutable.HashMap
var producer: KafkaProducer[String, String] = _
override def open(partitionId: Long, version: Long): Boolean = {
producer = new KafkaProducer(kafkaProperties)
return true
}
override def process(value: Row): Unit = {
val word = value.getAs[String]("word")
val count = value.getAs[String]("count")
producer.send(new ProducerRecord(topic, word, count))
}
override def close(errorOrNull: Throwable): Unit = {
producer.close()
}
}
spark 2.2.1以后寫入kafka的方法
// spark 2.2.1以后
wordcount.writeStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("topic", "wordcount") .start()
2.5)foreach:參數是一個foreach的方法,用戶可以實現這個方法實現寫入mysql自定義的功能。
import java.sql._
import org.apache.spark.sql.{ForeachWriter, Row}
class JDBCSink(url: String, userName: String, password: String) extends ForeachWriter[Row]{
var statement: Statement = _
var resultSet: ResultSet = _
var connection: Connection = _
// 初始化信息
override def open(partitionId: Long, version: Long): Boolean = {
Class.forName("com.mysql.jdbc.Driver")
connection = DriverManager.getConnection(url, userName, password)
statement = connection.createStatement()
return true
}
// 執行操作
override def process(value: Row): Unit = {
val word= value.getAs[String]("word")
val count = value.getAs[Integer]("count")
val insertSql = "insert into webCount(word,count)" +
"values('" + word + "'," + count + ")"
statement.execute(insertSql)
}
// 結束操作
override def close(errorOrNull: Throwable): Unit = {
connection.close()
}
}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
object KafkaStructedStreaming {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().master("local[2]").appName("streaming").getOrCreate()
val df = sparkSession
.readStream
.format("socket")
.option("host", "hadoop102")
.option("port", "9999")
.load()
import sparkSession.implicits._
val lines = df.selectExpr("CAST(value as STRING)").as[String]
val weblog = lines.as[String].flatMap(_.split(" "))
val wordCount = weblog.groupBy("value").count().toDF("word", "count")
val url ="jdbc:mysql://hadoop102:3306/test"
val username="root"
val password="000000"
val writer = new JDBCSink(url, username, password)
val query = wordCount.writeStream
.foreach(writer)
.outputMode("update")
.trigger(ProcessingTime("10 seconds"))
.start()
query.awaitTermination()
}
參考原文鏈接:https://blog.csdn.net/a790439710/article/details/103027602
總結
以上是生活随笔為你收集整理的Structured Streaming系列——输入与输出的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 推荐一款功能强大的Tomcat 管理监控
- 下一篇: 沃尔玛计划在美国门店建设快速充电站,20