[Spark Core Study Notes 9] Spark Examples

Published: 2025/5/22

Environment
  VM: VMware 10
  Linux: CentOS-6.5-x86_64
  Client: Xshell 4
  FTP: Xftp 4
  JDK 1.8
  Scala 2.10.4 (requires JDK 1.8)
  Spark 1.6

I. PV & UV

  PV is a web-analytics term that measures how many pages a site's users view. For advertisers, the PV value suggests how much ad revenue a site can bring in. PV generally rises with the number of visitors, but it does not directly reflect the true number of visitors: a single visitor can produce an arbitrarily high PV simply by refreshing a page over and over.


1. What is the PV value?
PV (page view) is the page-view or click count, a measure of how heavily a site or page is visited. Concretely, the PV value is how many pages of a site, or how many views of a single page, all visitors produced within 24 hours (midnight to midnight). PV counts page loads: every refresh of a page counts as one PV.
It is measured as follows: the browser sends a request to the web server, the server responds with the corresponding page, and that exchange produces one PV. As long as the page is sent back to the browser, it counts as one PV whether or not the page fully finishes loading.


```java
package com.wjy.test;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class Pv {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("PV");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> rdd = sc.textFile("./data/pvuvdata");

        // By the definition of PV, every record is one visit to a site,
        // so map each line to (site, 1).
        JavaPairRDD<String, Integer> ipwebrdd = rdd.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String line) throws Exception {
                // 7.213.213.208	吉林	2018-03-29	1522294977303	1920936170939152672	www.dangdang.com	Login
                String[] ss = line.split("\t");
                return new Tuple2<String, Integer>(ss[5], 1);
            }
        });

        // Sum the visit counts per site.
        JavaPairRDD<String, Integer> mapToPair = ipwebrdd.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }).mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            private static final long serialVersionUID = 1L;

            // Swap so the integer count becomes the key, for sorting by count.
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
                return tuple.swap();
            }
        }).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            private static final long serialVersionUID = 1L;

            // Swap back after sorting.
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception {
                return tuple.swap();
            }
        });

        List<Tuple2<String, Integer>> list = mapToPair.take(5);
        for (Tuple2<String, Integer> t : list) {
            System.out.println(t);
        }
        sc.stop();
    }
}
```

Result:

```
(www.baidu.com,18791)
(www.dangdang.com,18751)
(www.suning.com,18699)
(www.mi.com,18678)
(www.taobao.com,18613)
```
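The mapToPair → reduceByKey → swap → sortByKey pipeline can be sketched in plain Java without Spark, which makes the counting-and-ranking logic easy to test locally. This is only an illustration of the same logic; the sample lines and field layout are assumptions modeled on the log format in the code above:

```java
import java.util.*;
import java.util.stream.*;

public class PvSketch {
    // Count occurrences of the site field (index 5) and rank descending,
    // mirroring mapToPair -> reduceByKey -> sortByKey(false) -> take(n).
    public static List<Map.Entry<String, Long>> topPages(List<String> lines, int n) {
        Map<String, Long> counts = lines.stream()
                .map(l -> l.split("\t")[5])
                .collect(Collectors.groupingBy(s -> s, Collectors.counting()));
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical log lines in the same tab-separated layout.
        List<String> lines = Arrays.asList(
                "7.1.1.1\tregion\t2018-03-29\t1\t1\twww.baidu.com\tLogin",
                "7.1.1.2\tregion\t2018-03-29\t2\t2\twww.baidu.com\tView",
                "7.1.1.3\tregion\t2018-03-29\t3\t3\twww.mi.com\tView");
        System.out.println(topPages(lines, 2));  // → [www.baidu.com=2, www.mi.com=1]
    }
}
```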


2. What is the UV value?
UV (unique visitor) is the number of distinct IP addresses that visit a site or view a page. Within a single day, UV counts only the first visit from a given IP; later visits from the same IP that day are not counted again. UV measures how many distinct visitors arrived in a period, but it does not reflect the site's overall activity.


```java
package com.wjy.test;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class Uv {
    /**
     * Identify a unique visit by the (IP, site) pair, deduplicate, then count.
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("UV");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> rdd = sc.textFile("./data/pvuvdata");

        JavaPairRDD<String, Integer> rdd2 = rdd.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String line) throws Exception {
                String[] split = line.split("\t");
                // Key on "ip_site" so distinct() removes repeat visits.
                return new Tuple2<String, Integer>(split[0] + "_" + split[5], 1);
            }
        }).distinct().mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(Tuple2<String, Integer> tuple) throws Exception {
                return new Tuple2<String, Integer>(tuple._1.split("_")[1], 1);
            }
        });

        // Sum the distinct visitors per site.
        JavaPairRDD<String, Integer> rdd3 = rdd2.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }).mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            private static final long serialVersionUID = 1L;

            // Swap: the count becomes the key, so we can sort by it.
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
                return tuple.swap();
            }
        }).sortByKey(false) // descending
        .mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            private static final long serialVersionUID = 1L;

            // Swap back after sorting.
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception {
                return tuple.swap();
            }
        });

        // Take the first 5 elements.
        List<Tuple2<String, Integer>> list = rdd3.take(5);
        for (Tuple2<String, Integer> t : list) {
            System.out.println(t);
        }
        sc.stop();
    }
}
```

Result:

```
(www.baidu.com,15830)
(www.suning.com,15764)
(www.mi.com,15740)
(www.jd.com,15682)
(www.dangdang.com,15641)
```
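The distinct-then-count step is the heart of the UV job. A minimal plain-Java sketch of that logic (a HashSet plays the role of distinct(), a HashMap the role of reduceByKey; the sample visits are made up for illustration):

```java
import java.util.*;

public class UvSketch {
    // Deduplicate (ip, site) pairs, then count distinct visitors per site —
    // the local equivalent of distinct() followed by reduceByKey.
    public static Map<String, Integer> uv(List<String[]> visits) {
        Set<String> seen = new HashSet<>();        // "ip_site" keys, like the Spark code
        Map<String, Integer> counts = new HashMap<>();
        for (String[] v : visits) {
            String ip = v[0], site = v[1];
            if (seen.add(ip + "_" + site)) {       // first visit of this ip to this site
                counts.merge(site, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> visits = Arrays.asList(
                new String[]{"7.1.1.1", "www.baidu.com"},
                new String[]{"7.1.1.1", "www.baidu.com"},   // repeat visit, not counted
                new String[]{"7.1.1.2", "www.baidu.com"});
        System.out.println(uv(visits));   // → {www.baidu.com=2}
    }
}
```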


II. Secondary Sort

For data with two or more columns, rows must be sorted by the first column, and the remaining columns must then be sorted in turn. The approach: sort on the first column; when first-column values are equal, sort on the second column.

Example:

Data to sort: secondSort.txt

```
3 1
5 2
6 5
8 123
1 4
4 123
5 432
3 54
5 121
8 654
3 98
```

```java
package com.wjy.test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class SecondSort {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("SecondSort");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> rdd = sc.textFile("./data/secondSort.txt");

        // Convert to K-V form: the key is a custom SecondSortKey built from
        // both columns, the value is the original line.
        JavaPairRDD<SecondSortKey, String> mapToPair = rdd.mapToPair(new PairFunction<String, SecondSortKey, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<SecondSortKey, String> call(String line) throws Exception {
                String[] sps = line.split(" ");
                int first = Integer.valueOf(sps[0]);
                int second = Integer.valueOf(sps[1]);
                SecondSortKey ss = new SecondSortKey(first, second);
                return new Tuple2<SecondSortKey, String>(ss, line);
            }
        });

        // sortByKey uses the key's own compareTo, i.e. SecondSortKey.compareTo.
        mapToPair.sortByKey(false).foreach(new VoidFunction<Tuple2<SecondSortKey, String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<SecondSortKey, String> tuple) throws Exception {
                System.out.println(tuple._2);
            }
        });

        sc.stop();
    }
}
```


For a custom key type, implement the Comparable interface and its compareTo method:

```java
package com.wjy.test;

import java.io.Serializable;

public class SecondSortKey implements Serializable, Comparable<SecondSortKey> {
    private static final long serialVersionUID = 1L;
    private int first;
    private int second;

    public SecondSortKey(int first, int second) {
        super();
        this.first = first;
        this.second = second;
    }

    public int getFirst() { return first; }
    public void setFirst(int first) { this.first = first; }
    public int getSecond() { return second; }
    public void setSecond(int second) { this.second = second; }

    @Override
    public int compareTo(SecondSortKey o) {
        // Compare the first values; only on a tie compare the second values.
        if (getFirst() - o.getFirst() == 0) {
            return getSecond() - o.getSecond();
        } else {
            return getFirst() - o.getFirst();
        }
    }
}
```

Sorted result:

```
8 654
8 123
6 5
5 432
5 121
5 2
4 123
3 98
3 54
3 1
1 4
```
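The same ordering can be checked without Spark by sorting the sample rows with a plain java.util comparator. The sketch below uses a chained Comparator rather than the raw subtraction in compareTo (subtraction can overflow for extreme int values, though the ordering is identical for this data); it is a local check, not part of the original job:

```java
import java.util.*;

public class SecondSortCheck {
    public static void main(String[] args) {
        int[][] rows = {{3, 1}, {5, 2}, {6, 5}, {8, 123}, {1, 4}, {4, 123},
                        {5, 432}, {3, 54}, {5, 121}, {8, 654}, {3, 98}};
        // Order by first column, then second column; reversed() mirrors
        // sortByKey(false), i.e. descending on both.
        Arrays.sort(rows, Comparator
                .<int[]>comparingInt(r -> r[0])
                .thenComparingInt(r -> r[1])
                .reversed());
        StringBuilder sb = new StringBuilder();
        for (int[] r : rows) sb.append(r[0]).append(' ').append(r[1]).append('\n');
        System.out.print(sb);   // same order as the Spark job output above
    }
}
```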


III. Grouped Top N

Given multiple groups of data, take the first N records of each group — for example, listing the top three students in every class.
Approach: group first, then sort within each group and take the first N.
Example: given a score list for three classes, scores.txt, find the top three scores in each class.

```
class1 100
class2 85
class3 70
class1 102
class2 65
class1 45
class2 85
class3 70
class1 16
class2 88
class1 95
class2 37
class3 98
class1 99
class2 23
```

groupByKey + sorting approach:

```java
package com.wjy.test;

import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class TopNtest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("TopOs");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> rdd = sc.textFile("./data/scores.txt");

        // Convert to K-V form so the data can be grouped and sorted.
        JavaPairRDD<String, Integer> mapToPair = rdd.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String line) throws Exception {
                String[] ss = line.split("\t");
                return new Tuple2<String, Integer>(ss[0], Integer.valueOf(ss[1]));
            }
        });

        // groupByKey collects all scores of the same class into one iterable.
        mapToPair.groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception {
                String classname = tuple._1;
                Iterator<Integer> it = tuple._2.iterator();
                Integer[] top3 = new Integer[3];
                while (it.hasNext()) {
                    Integer score = it.next();
                    // Fixed-size insertion: shift smaller entries right,
                    // insert the new score in its place.
                    for (int i = 0; i < top3.length; i++) {
                        if (top3[i] == null) {
                            top3[i] = score;
                            break;
                        } else if (score > top3[i]) {
                            for (int j = 2; j > i; j--) {
                                top3[j] = top3[j - 1];
                            }
                            top3[i] = score;
                            break;
                        }
                    }
                }
                System.out.println("classname=" + classname);
                for (Integer i : top3) {
                    System.out.println(i);
                }
            }
        });
        sc.stop();
    }
}
```


Top N result:

```
classname=class3
98
70
70
classname=class1
102
100
99
classname=class2
88
85
85
```
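The fixed-size top-3 insertion inside the foreach is easy to get subtly wrong, so it is worth testing in isolation. The sketch below lifts that exact loop into a standalone method (the class and method names are invented for this illustration):

```java
import java.util.*;

public class Top3 {
    // The same fixed-size insertion used inside the groupByKey foreach:
    // walk the three slots, shift smaller entries right, insert the new score.
    public static Integer[] top3(Iterable<Integer> scores) {
        Integer[] top3 = new Integer[3];
        for (Integer score : scores) {
            for (int i = 0; i < top3.length; i++) {
                if (top3[i] == null) {
                    top3[i] = score;
                    break;
                } else if (score > top3[i]) {
                    for (int j = top3.length - 1; j > i; j--) {
                        top3[j] = top3[j - 1];
                    }
                    top3[i] = score;
                    break;
                }
            }
        }
        return top3;
    }

    public static void main(String[] args) {
        // The class1 scores from the sample data.
        System.out.println(Arrays.toString(top3(Arrays.asList(100, 102, 45, 16, 95, 99))));
        // → [102, 100, 99], matching the job output above
    }
}
```

Note that duplicate scores are kept (class3's "98 70 70" above), since each incoming score is inserted independently.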


Reposted from: https://www.cnblogs.com/cac2020/p/10684754.html
