日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

maven依赖 spark sql_使用Kafka+Spark+Cassandra构建实时处理引擎

發布時間:2024/2/28 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 maven依赖 spark sql_使用Kafka+Spark+Cassandra构建实时处理引擎 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

大數據技術與架構點擊右側關注,大數據開發領域最強公眾號!

暴走大數據點擊右側關注,暴走大數據!

Apache Kafka 是一個可擴展,高性能,低延遲的平臺,允許我們像消息系統一樣讀取和寫入數據。我們可以很容易地在 Java 中使用 Kafka。

Spark Streaming 是 Apache Spark 的一部分,是一個可擴展、高吞吐、容錯的實時流處理引擎。雖然是使用 Scala 開發的,但是支持 Java API。

Apache Cassandra 是分布式的 NoSQL 數據庫。在這篇文章中,我們將介紹如何通過這三個組件構建一個高擴展、容錯的實時數據處理平臺。

準備

在進行下面文章介紹之前,我們需要先創建好 Kafka 的主題以及 Cassandra 的相關表,具體如下:

在 Kafka 中創建名為 messages 的主題

$KAFKA_HOME$\bin\windows\kafka-topics.bat --create \
--zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic messages

在 Cassandra 中創建 KeySpace 和 table

CREATE KEYSPACE vocabulary
WITH REPLICATION = {
'class' : 'SimpleStrategy',
'replication_factor' : 1
};
USE vocabulary;
CREATE TABLE words (word text PRIMARY KEY, count int);

上面我們創建了名為 vocabulary 的 KeySpace,以及名為 words 的表。

添加依賴

我們使用 Maven 進行依賴管理,這個項目使用到的依賴如下:


org.apache.spark
spark-core_2.11
2.3.0
provided


org.apache.spark
spark-sql_2.11
2.3.0
provided


org.apache.spark
spark-streaming_2.11
2.3.0
provided


org.apache.spark
spark-streaming-kafka-0-10_2.11
2.3.0


com.datastax.spark
spark-cassandra-connector_2.11
2.3.0


com.datastax.spark
spark-cassandra-connector-java_2.11
1.5.2

數據管道開發

我們將使用 Spark 在 Java 中創建一個簡單的應用程序,它將與我們之前創建的Kafka主題集成。應用程序將讀取已發布的消息并計算每條消息中的單詞頻率。然后將結果更新到 Cassandra 表中。整個數據架構如下:

現在我們來詳細介紹代碼是如何實現的。

獲取 JavaStreamingContext

Spark Streaming 中的切入點是 JavaStreamingContext,所以我們首先需要獲取這個對象,如下:

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("WordCountingApp");
sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");

JavaStreamingContext streamingContext = new JavaStreamingContext(
sparkConf, Durations.seconds(1));

從 Kafka 中讀取數據

有了 JavaStreamingContext 之后,我們就可以從 Kafka 對應主題中讀取實時流數據,如下:

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("messages");

JavaInputDStreamString, String>> messages =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));

我們在程序中提供了 key 和 value 的 deserializer。這個是 Kafka 內置提供的。我們也可以根據自己的需求自定義 deserializer。

處理 DStream

我們在前面只是定義了從 Kafka 中哪張表中獲取數據,這里我們將介紹如何處理這些獲取的數據:

JavaPairDStream<String, String> results = messages
.mapToPair(
record -> new Tuple2<>(record.key(), record.value())
);
JavaDStream<String> lines = results
.map(
tuple2 -> tuple2._2()
);
JavaDStream<String> words = lines
.flatMap(
x -> Arrays.asList(x.split("\\s+")).iterator()
);
JavaPairDStream<String, Integer> wordCounts = words
.mapToPair(
s -> new Tuple2<>(s, 1)
).reduceByKey(
(i1, i2) -> i1 + i2
);

將數據發送到 Cassandra 中

最后我們需要將結果發送到 Cassandra 中,代碼也很簡單。

wordCounts.foreachRDD(
javaRdd -> {
Map<String, Integer> wordCountMap = javaRdd.collectAsMap();
for (String key : wordCountMap.keySet()) {
List wordList = Arrays.asList(new Word(key, wordCountMap.get(key)));
JavaRDD rdd = streamingContext.sparkContext().parallelize(wordList);
javaFunctions(rdd).writerBuilder("vocabulary", "words", mapToRow(Word.class)).saveToCassandra();
}
}
);

啟動應用程序

最后,我們需要將這個 Spark Streaming 程序啟動起來,如下:

streamingContext.start();
streamingContext.awaitTermination();

使用 Checkpoints

在實時流處理應用中,將每個批次的狀態保存下來通常很有用。比如在前面的例子中,我們只能計算單詞的當前頻率,如果我們想計算單詞的累計頻率怎么辦呢?這時候我們就可以使用 Checkpoints。新的數據架構如下

為了啟用 Checkpoints,我們需要進行一些改變,如下:

streamingContext.checkpoint("./.checkpoint");

這里我們將 checkpoint 的數據寫入到名為 .checkpoint 的本地目錄中。但是在現實項目中,最好使用 HDFS 目錄。

現在我們可以通過下面的代碼計算單詞的累計頻率:

JavaMapWithStateDStream> cumulativeWordCounts = wordCounts
.mapWithState(
StateSpec.function((word, one, state) -> {
int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
Tuple2 output = new Tuple2<>(word, sum);
state.update(sum);return output;
}
)
);

部署應用程序

最后,我們可以使用 spark-submit 來部署我們的應用程序,具體如下:

$SPARK_HOME$\bin\spark-submit \
--class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \
--master local[2]
\target\spark-streaming-app-0.0.1-SNAPSHOT-jar-with-dependencies.jar

最后,我們可以在 Cassandra 中查看到對應的表中有數據生成了。完整的代碼可以參見 https://github.com/eugenp/tutorials/tree/master/apache-spark

歡迎點贊+收藏+轉發朋友圈素質三連

文章不錯?點個【在看】吧!??

總結

以上是生活随笔為你收集整理的maven依赖 spark sql_使用Kafka+Spark+Cassandra构建实时处理引擎的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 在线免费观看不卡av | 成人免费看片又大又黄 | 致命魔术电影高清在线观看 | 男人操女人的视频 | 在线v| 日韩av不卡一区二区 | 久久综合久久88 | 91精品国产乱码在线观看 | 国产无 | 欧美精品一区二区免费 | 毛片久久久久久久 | 国产成人在线影院 | 九九自拍 | 美女色av | 国产精品午夜在线 | 国产午夜精品久久久久 | 51吃瓜网今日吃瓜 | 免费在线网站 | 亚洲午夜久久久久 | 日韩欧美三级在线 | 婷婷色在线视频 | 可以直接观看的av | av一区不卡 | 中文字幕第一页在线播放 | 国产精品第一页在线观看 | 91精品国产一区 | 午夜精品一区二 | 欧美日韩激情一区 | 欧美性生交xxxxx久久久 | 日韩一区二区三区四区五区 | 98精品视频 | 在线免费观看视频网站 | 男生女生插插插 | 夜夜综合网 | 日本美女交配 | 加勒比视频在线观看 | 在线成人一区 | 激情五月婷婷 | 国产精品suv一区二区69 | 中文字幕一区二区三区免费视频 | 亚洲综合色自拍一区 | 中文字幕在线观看日本 | 久久久久久中文 | 日本少妇xxxxxx | 99re在线视频| 日本一本在线 | 午夜羞羞影院 | 亚洲熟女综合一区二区三区 | 色爽 | 丰满大乳奶做爰ⅹxx视频 | 免费av福利 | 免费一级片 | 亚洲精品一区二区三区婷婷月 | 6699嫩草久久久精品影院 | 国产又黄又大又粗视频 | 日韩不卡在线观看 | 视频免费在线 | 玖玖玖在线观看 | 亚洲色成人网站www永久四虎 | 四色网址 | 午夜天堂 | 精品乱码一区二区三区 | 国产成人无码专区 | 五级黄高潮片90分钟视频 | 三级在线国产 | 色综合亚洲| 亚洲精品无码成人 | 新超碰97 | 伊人影视网| 精品福利一区二区三区 | 欧美sese| 精品久久人妻av中文字幕 | 这里只有精品在线观看 | 网友自拍av| 影音先锋毛片 | 菊肠扩张playh | 人妻 丝袜美腿 中文字幕 | 成人免费观看视频大全 | 天天舔天天操天天干 | 伊人干综合| 亚洲一级无毛 | 91亚洲精品久久久蜜桃借种 | 性日本xxx | 蜜臂av| 变态另类丨国产精品 | 日日艹 | 免费一二区 | 欧美女人交配视频 | 男女ss视频 | wwwwxxxxx日本 | 亚洲无码精品免费 | 九九精品影院 | 成人精品一区二区三区视频 | 99re8在线精品视频免费播放 | 日韩欧美网 | 欧美影视一区 | 国产吃瓜黑料一区二区 | 校园春色av | 自拍偷拍第2页 |