
A persistence-based wordcount program with foreachRDD

Published: 2023/12/3

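A wordcount program that persists its results! I hit a pitfall along the way.
The idea: hand-build a static connection pool and use the RDD's foreachPartition operation. Inside that operation, obtain a connection from the static pool through a static method and return it after use. This way connections can be reused across the partitions of multiple RDDs, and the pool can create connections lazily and release them after they have been idle for a while.
Code: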
package com.bynear.spark_Streaming;

import com.bynear.tool.ConnectionPool;
import com.google.common.base.Optional;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * 2018/5/16
 * 11:30
 * Persistence-based wordcount program
 */
public class PersisWordCount {
    public static void main(String[] args) {
        final SparkConf conf = new SparkConf().setAppName("persiswordcount").setMaster("local[2]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(5));
        // updateStateByKey requires a checkpoint directory
        jssc.checkpoint("hdfs://Spark01:9000/zjs/chepoint");

        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        // Map each word to (word, 1)
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Keep a running count per word: previous state plus the values of the current batch
        final JavaPairDStream<String, Integer> wordcount = pairs.updateStateByKey(
                new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
            @Override
            public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {
                Integer newValue = 0;
                if (state.isPresent()) {
                    newValue = state.get();
                }
                for (Integer value : values) {
                    newValue += value;
                }
                return Optional.of(newValue);
            }
        });

        // Write every batch to MySQL, using one pooled connection per partition
        wordcount.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, Integer> wordCountsRDD) throws Exception {
                wordCountsRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
                    @Override
                    public void call(Iterator<Tuple2<String, Integer>> wordcounts) throws Exception {
                        Connection conn = ConnectionPool.getConection();
                        Tuple2<String, Integer> wordcount = null;
                        while (wordcounts.hasNext()) {
                            wordcount = wordcounts.next();
                            String sql = "insert into word (word,count) values ('" + wordcount._1 + "'," + wordcount._2 + ")";
                            System.out.println(sql + conn + "YES");
                            Statement stmt = conn.createStatement();
                            try {
                                stmt.executeUpdate(sql);
                            } finally {
                                // Close the statement so it does not leak when the connection is reused
                                stmt.close();
                            }
                        }
                        // Hand the connection back to the pool for the next partition
                        ConnectionPool.returnConnection(conn);
                    }
                });
                return null;
            }
        });

        jssc.start();
        jssc.awaitTermination();
        jssc.stop();
    }
}
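The listing builds each INSERT by string concatenation, which breaks on words containing a quote character and opens the door to SQL injection. A minimal sketch of an alternative, assuming the same `word` table and columns: a JDBC `PreparedStatement` with one batch per partition. The class `WordCountWriter` and the method `writePartition` are hypothetical names introduced here, and `Map.Entry` stands in for `Tuple2` only to keep the example self-contained.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical helper, not part of the original post. */
final class WordCountWriter {
    // Placeholders instead of concatenated values; table and columns as in the post
    static final String INSERT_SQL = "insert into word (word, count) values (?, ?)";

    /**
     * Queues every (word, count) pair of one partition on a single
     * PreparedStatement and sends them as one batch.
     * Returns the number of rows queued.
     */
    static int writePartition(Connection conn,
                              Iterator<Map.Entry<String, Integer>> rows) throws SQLException {
        int queued = 0;
        // try-with-resources closes the statement even if the batch fails
        try (PreparedStatement stmt = conn.prepareStatement(INSERT_SQL)) {
            while (rows.hasNext()) {
                Map.Entry<String, Integer> row = rows.next();
                stmt.setString(1, row.getKey());
                stmt.setInt(2, row.getValue());
                stmt.addBatch();
                queued++;
            }
            if (queued > 0) {
                stmt.executeBatch();
            }
        }
        return queued;
    }
}
```

Inside `foreachPartition`, such a helper would be called between `ConnectionPool.getConection()` and `ConnectionPool.returnConnection(conn)`, after adapting the iterator from `Tuple2` to whatever pair type is used.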

The hand-built connection pool

package com.bynear.tool;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;

/**
 * 2018/5/16
 * 12:24
 */
public class ConnectionPool {
    // Static queue of connections
    public static LinkedList<Connection> connectionQueue;

    // Load the JDBC driver
    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    // Get a connection; synchronized to control concurrent access from multiple threads
    public synchronized static Connection getConection() {
        // Create the queue only once: re-creating it on every call would
        // throw away the connections that were returned to the pool
        if (connectionQueue == null) {
            connectionQueue = new LinkedList<Connection>();
        }
        try {
            // Lazily open connections when none are available
            if (connectionQueue.isEmpty()) {
                for (int i = 0; i < 2; i++) {
                    Connection conn = DriverManager.getConnection("jdbc:mysql://192.168.2.10:3306/testdb",
                            "root", "123456");
                    connectionQueue.push(conn);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connectionQueue.poll();
    }

    // Return a connection to the pool; synchronized for the same reason as getConection
    public synchronized static void returnConnection(Connection conn) {
        connectionQueue.push(conn);
    }
}
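The pool above opens connections lazily but never releases idle ones, although the introduction mentions that policy. A minimal sketch of how lazy creation and idle release could fit together is shown here. `LazyPool`, its method names, and the explicit `nowMillis` parameters are assumptions made for illustration (the clock is passed in so the behavior is easy to check); none of it comes from the original post, and a production setup would more likely use an existing pooling library.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical generic pool: creates resources lazily and drops ones idle too long. */
final class LazyPool<T> {
    /** A pooled resource together with the time it was given back. */
    private static final class Idle<R> {
        final R resource;
        final long returnedAtMillis;

        Idle(R resource, long returnedAtMillis) {
            this.resource = resource;
            this.returnedAtMillis = returnedAtMillis;
        }
    }

    private final Deque<Idle<T>> idle = new ArrayDeque<>();
    private final Supplier<T> factory;   // how to create a resource, e.g. open a connection
    private final Consumer<T> closer;    // how to release a resource, e.g. close a connection
    private final long maxIdleMillis;

    LazyPool(Supplier<T> factory, Consumer<T> closer, long maxIdleMillis) {
        this.factory = factory;
        this.closer = closer;
        this.maxIdleMillis = maxIdleMillis;
    }

    /** Reuse an idle resource that is still fresh, otherwise create a new one. */
    synchronized T borrow(long nowMillis) {
        evictExpired(nowMillis);
        Idle<T> entry = idle.pollFirst();
        return entry != null ? entry.resource : factory.get();
    }

    /** Give a resource back so that a later borrow can reuse it. */
    synchronized void giveBack(T resource, long nowMillis) {
        idle.addFirst(new Idle<>(resource, nowMillis));
    }

    /** Close resources idle for longer than maxIdleMillis; returns how many were closed. */
    synchronized int evictExpired(long nowMillis) {
        int closed = 0;
        // giveBack adds at the head, so the oldest entries sit at the tail
        // (assuming callers pass non-decreasing timestamps)
        while (!idle.isEmpty()
                && nowMillis - idle.peekLast().returnedAtMillis > maxIdleMillis) {
            closer.accept(idle.pollLast().resource);
            closed++;
        }
        return closed;
    }

    synchronized int idleCount() {
        return idle.size();
    }
}
```

Nothing is created until the first `borrow`, and eviction runs on every `borrow`, so no background thread is needed. The trade-off is that idle resources are only closed when the pool is next used.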

In my first version of the connection pool, the check was:
if (connectionQueue == null) {
    for (int i = 0; i < 2; i++) {
        Connection conn = DriverManager.getConnection("jdbc:mysql://192.168.2.10:3306/testdb",
                "root", "123456");
        connectionQueue.push(conn);
    }
}
After I submitted the code to the cluster, it kept throwing a NullPointerException.
I then added System.out.println(sql+conn+"YES"); to print the conn returned by
conn = ConnectionPool.getConection();
insert into wordcount (word,count) values ('heool,word',1)nullYES
So conn was null.
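A likely explanation, assuming the first version also instantiated the queue before the check: a freshly created queue is never `null`, so the loop that opens connections is skipped, and `LinkedList.poll()` on an empty list returns `null` rather than throwing. The sketch below reproduces both shapes with strings in place of real connections. `NullCheckDemo` and its methods are hypothetical names used only for this illustration.

```java
import java.util.LinkedList;

/** Hypothetical demo comparing the two checks; strings stand in for connections. */
final class NullCheckDemo {
    static LinkedList<String> queue;

    // Failing shape: the queue is created first, then tested against null
    static String getWithNullCheck() {
        queue = new LinkedList<String>();
        if (queue == null) {          // never true, the queue was just created
            queue.push("connection");
        }
        return queue.poll();          // empty queue, so poll() returns null
    }

    // Working shape: test whether the queue is empty instead
    static String getWithEmptyCheck() {
        queue = new LinkedList<String>();
        if (queue.isEmpty()) {        // true for a new queue, so it gets filled
            queue.push("connection");
        }
        return queue.poll();
    }
}
```

The `null` handed back by the first method only fails later, at the first call on the connection, which matches a NullPointerException that appears after the `println`.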

The code that ran successfully:
if (connectionQueue.isEmpty()) {
    for (int i = 0; i < 2; i++) {
        Connection conn = DriverManager.getConnection("jdbc:mysql://192.168.2.10:3306/testdb",
                "root", "123456");
        connectionQueue.push(conn);
    }
}
Result, queried in MySQL:
mysql> select * from word;
+----+---------------------+------------+-------+
| id | updated_time        | word       | count |
+----+---------------------+------------+-------+
|  1 | 2018-05-16 01:11:10 | ???,??     |     1 |
|  2 | 2018-05-16 01:11:15 | ???,??     |     1 |
|  3 | 2018-05-16 01:13:00 | hello,word |     1 |
|  4 | 2018-05-16 01:16:00 | hello      |     1 |
|  5 | 2018-05-16 01:16:00 | word       |     1 |
|  6 | 2018-05-16 01:16:05 | hello      |     1 |
|  7 | 2018-05-16 01:16:05 | word       |     1 |
+----+---------------------+------------+-------+
7 rows in set (0.00 sec)
It works!
