[Spark Core Study, Part 9] Spark Examples
Environment
Virtual machine: VMware 10
Linux version: CentOS-6.5-x86_64
Client: Xshell 4
FTP: Xftp 4
JDK 1.8
Scala 2.10.4 (requires JDK 1.8)
Spark 1.6
I. PV & UV
PV is a web-analytics term that measures how many pages a site's visitors view. For advertisers, the PV value indicates how much ad revenue a site can be expected to bring in. In general PV grows with the number of visitors, but it does not directly reflect the number of real visitors: a single visitor can produce a very high PV simply by refreshing pages repeatedly.
1. What is PV?
PV (page view) is the page-view or click count, a measure of how much a site or page is visited. Concretely, the PV value is how many pages of a site, or how many times a given page, all visitors viewed within 24 hours (0:00 to 24:00). Every page load or refresh counts as one PV.
The measurement works like this: the browser sends a request to the web server, the server responds with the corresponding page, and that exchange produces one PV. As long as the page was sent back to the browser, it counts as one PV whether or not the page finished loading (downloading).
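The counting rule above (one increment per request, keyed by URL) can be sketched in plain Java outside Spark. This is only a local illustration, not the Spark job; the sample lines are invented, but they follow the tab-separated layout used by the pvuvdata file below, where field 5 holds the URL:

```java
import java.util.HashMap;
import java.util.Map;

public class PvSketch {
    // Count one PV per log line for the URL in field 5 (tab-separated layout).
    public static Map<String, Integer> countPv(String[] lines) {
        Map<String, Integer> pv = new HashMap<>();
        for (String line : lines) {
            String url = line.split("\t")[5];
            pv.merge(url, 1, Integer::sum); // add 1, or start at 1 if unseen
        }
        return pv;
    }

    public static void main(String[] args) {
        // Hypothetical records in the same layout as pvuvdata.
        String[] lines = {
            "1.2.3.4\tJilin\t2018-03-29\t1522294977303\ts1\twww.example.com\tLogin",
            "1.2.3.4\tJilin\t2018-03-29\t1522294977304\ts1\twww.example.com\tView",
            "5.6.7.8\tHebei\t2018-03-29\t1522294977305\ts2\twww.other.com\tLogin"
        };
        System.out.println(countPv(lines)); // www.example.com -> 2, www.other.com -> 1
    }
}
```

The Spark program below does exactly this, distributed: map each line to (url, 1), then sum per key.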
```java
package com.wjy.test;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class Pv {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("PV");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> rdd = sc.textFile("./data/pvuvdata");

        // By the definition of PV, every record is one visit to its URL,
        // so map each line to (url, 1).
        JavaPairRDD<String, Integer> ipwebrdd = rdd.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String line) throws Exception {
                // e.g. 7.213.213.208	吉林	2018-03-29	1522294977303	1920936170939152672	www.dangdang.com	Login
                String[] ss = line.split("\t");
                return new Tuple2<String, Integer>(ss[5], 1);
            }
        });

        // Sum the visits per URL.
        JavaPairRDD<String, Integer> mapToPair = ipwebrdd.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }).mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            private static final long serialVersionUID = 1L;

            // Swap so the count becomes the key, enabling a numeric sort.
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
                return tuple.swap();
            }
        }).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            private static final long serialVersionUID = 1L;

            // Swap back after sorting.
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception {
                return tuple.swap();
            }
        });

        List<Tuple2<String, Integer>> list = mapToPair.take(5);
        for (Tuple2<String, Integer> t : list) {
            System.out.println(t);
        }
        sc.stop();
    }
}
```

Result:
```
(www.baidu.com,18791)
(www.dangdang.com,18751)
(www.suning.com,18699)
(www.mi.com,18678)
(www.taobao.com,18613)
```
2. What is UV?
UV (unique visitor) is the number of distinct IP addresses that visited a site or page. Within a single day, UV counts only the first visit by each independent IP; further visits from that IP on the same day are not counted again. UV measures the number of distinct visitors over a period of time, but it does not reflect the site's overall activity.
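The "count each IP only once per site" rule can be sketched locally with a HashSet that deduplicates (ip, url) pairs. This is plain Java, not the Spark job below, and the sample visits are invented:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class UvSketch {
    // Count unique visitors per URL: each distinct (ip, url) pair counts once.
    public static Map<String, Integer> countUv(String[][] visits) {
        Set<String> seen = new HashSet<>();
        Map<String, Integer> uv = new HashMap<>();
        for (String[] v : visits) {
            String ip = v[0], url = v[1];
            // add() returns true only the first time this pair appears
            if (seen.add(ip + "_" + url)) {
                uv.merge(url, 1, Integer::sum);
            }
        }
        return uv;
    }

    public static void main(String[] args) {
        String[][] visits = {
            {"1.2.3.4", "www.example.com"},
            {"1.2.3.4", "www.example.com"}, // repeat visit: not counted again
            {"5.6.7.8", "www.example.com"},
        };
        System.out.println(countUv(visits).get("www.example.com")); // 2
    }
}
```

The Spark version below uses the same "ip_url" composite key, with `distinct()` playing the role of the HashSet.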
```java
package com.wjy.test;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class Uv {
    /**
     * Treat each distinct (IP, URL) pair as one unique visitor:
     * deduplicate the pairs, then count per URL.
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("UV");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> rdd = sc.textFile("./data/pvuvdata");

        JavaPairRDD<String, Integer> rdd2 = rdd.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String line) throws Exception {
                String[] split = line.split("\t");
                // Key on "ip_url" so distinct() keeps one record per visitor per site.
                return new Tuple2<String, Integer>(split[0] + "_" + split[5], 1);
            }
        }).distinct().mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(Tuple2<String, Integer> tuple) throws Exception {
                // Strip the IP prefix; each remaining (url, 1) is one unique visitor.
                return new Tuple2<String, Integer>(tuple._1.split("_")[1], 1);
            }
        });

        // Sum unique visitors per URL.
        JavaPairRDD<String, Integer> rdd3 = rdd2.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }).mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            private static final long serialVersionUID = 1L;

            // Swap so the count is the key, for numeric sorting.
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
                return tuple.swap();
            }
        }).sortByKey(false) // descending
          .mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            private static final long serialVersionUID = 1L;

            // Swap back after sorting.
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception {
                return tuple.swap();
            }
        });

        // Take the top 5.
        List<Tuple2<String, Integer>> list = rdd3.take(5);
        for (Tuple2<String, Integer> t : list) {
            System.out.println(t);
        }
        sc.stop();
    }
}
```

Result:
```
(www.baidu.com,15830)
(www.suning.com,15764)
(www.mi.com,15740)
(www.jd.com,15682)
(www.dangdang.com,15641)
```
II. Secondary Sort
For data with two or more columns, after sorting by the first column the remaining columns must also be sorted in turn. The idea: sort by the first column; where first-column values are equal, sort by the second column.
Example:
Data to sort: secondSort.txt
```
3 1
5 2
6 5
8 123
1 4
4 123
5 432
3 54
5 121
8 654
3 98
```

```java
package com.wjy.test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class SecondSort {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("SecondSort");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> rdd = sc.textFile("./data/secondSort.txt");

        // Convert to K-V form: the key is a custom SecondSortKey built from the
        // two numbers on the line; the value is the original line.
        JavaPairRDD<SecondSortKey, String> mapToPair = rdd.mapToPair(new PairFunction<String, SecondSortKey, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<SecondSortKey, String> call(String line) throws Exception {
                String[] sps = line.split(" ");
                int first = Integer.valueOf(sps[0]);
                int second = Integer.valueOf(sps[1]);
                SecondSortKey ss = new SecondSortKey(first, second);
                return new Tuple2<SecondSortKey, String>(ss, line);
            }
        });

        // sortByKey uses the key's compareTo, i.e. SecondSortKey.compareTo.
        mapToPair.sortByKey(false).foreach(new VoidFunction<Tuple2<SecondSortKey, String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<SecondSortKey, String> tuple) throws Exception {
                System.out.println(tuple._2);
            }
        });
        sc.stop();
    }
}
```
對(duì)于KEY自定義類(lèi)型 實(shí)現(xiàn)comparable接口 實(shí)現(xiàn)comparTo方法
```java
package com.wjy.test;

import java.io.Serializable;

public class SecondSortKey implements Serializable, Comparable<SecondSortKey> {
    private static final long serialVersionUID = 1L;
    private int first;
    private int second;

    public SecondSortKey(int first, int second) {
        super();
        this.first = first;
        this.second = second;
    }

    public int getFirst() { return first; }
    public void setFirst(int first) { this.first = first; }
    public int getSecond() { return second; }
    public void setSecond(int second) { this.second = second; }

    @Override
    public int compareTo(SecondSortKey o) {
        // Compare the first value; only on a tie fall back to the second value.
        if (getFirst() - o.getFirst() == 0) {
            return getSecond() - o.getSecond();
        } else {
            return getFirst() - o.getFirst();
        }
    }
}
```

Sort result:
```
8 654
8 123
6 5
5 432
5 121
5 2
4 123
3 98
3 54
3 1
1 4
```
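The ordering above (descending on the first column, ties broken by the second column descending) is the same ordering that a chained java.util.Comparator expresses; here is a local sketch without Spark, using rows from secondSort.txt:

```java
import java.util.Arrays;
import java.util.Comparator;

public class SecondSortSketch {
    // Descending on the first column, breaking ties by the second column,
    // matching what sortByKey(false) produces with SecondSortKey.
    public static void sortDesc(int[][] rows) {
        Comparator<int[]> asc = Comparator
                .comparingInt((int[] r) -> r[0])
                .thenComparingInt(r -> r[1]);
        Arrays.sort(rows, asc.reversed());
    }

    public static void main(String[] args) {
        int[][] rows = { {3, 1}, {5, 2}, {8, 123}, {5, 432}, {8, 654}, {3, 98} };
        sortDesc(rows);
        for (int[] r : rows) {
            System.out.println(r[0] + " " + r[1]); // 8 654, 8 123, 5 432, 5 2, 3 98, 3 1
        }
    }
}
```

In Spark a Comparator cannot be passed to sortByKey directly in the Java API of this era, which is why the post wraps the two columns in a Comparable key class instead.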
III. Top N per Group
Given data in multiple groups, take the first N records of each group, for example listing the top three students in each class.
Approach: group first, then sort within each group and take the first N.
Case: scores.txt holds the score lists of three classes; extract the top three of each class.
groupByKey + a sorting pass:
```java
package com.wjy.test;

import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class TopNtest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("TopOs");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> rdd = sc.textFile("./data/scores.txt");

        // Convert to K-V form (class, score) for the grouping and sorting steps.
        JavaPairRDD<String, Integer> mapToPair = rdd.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String line) throws Exception {
                String[] ss = line.split("\t");
                return new Tuple2<String, Integer>(ss[0], Integer.valueOf(ss[1]));
            }
        });

        // groupByKey gathers all scores of one class into a single iterable;
        // a fixed-size array with insertion keeps the three largest.
        mapToPair.groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception {
                String classname = tuple._1;
                Iterator<Integer> it = tuple._2.iterator();
                Integer[] top3 = new Integer[3];
                while (it.hasNext()) {
                    Integer score = it.next();
                    // Insert the score at its position, shifting smaller ones down.
                    for (int i = 0; i < top3.length; i++) {
                        if (top3[i] == null) {
                            top3[i] = score;
                            break;
                        } else if (score > top3[i]) {
                            for (int j = 2; j > i; j--) {
                                top3[j] = top3[j - 1];
                            }
                            top3[i] = score;
                            break;
                        }
                    }
                }
                System.out.println("classname=" + classname);
                for (Integer i : top3) {
                    System.out.println(i);
                }
            }
        });
        sc.stop();
    }
}
```
topN 結(jié)果:
classname=class3 98 70 70 classname=class1 102 100 99 classname=class2 88 85 85?
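The fixed-size array with manual shifting in the foreach above works, but a bounded min-heap is a common alternative for keeping the top N of a stream. This is a plain-Java sketch of that idea, not the author's code; the per-class logic inside the foreach could be replaced by a call like this:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

public class TopNHeap {
    // Keep the n largest scores: a min-heap of size n whose root is the
    // smallest of the current top n, evicted when a larger score arrives.
    public static List<Integer> topN(Iterable<Integer> scores, int n) {
        PriorityQueue<Integer> heap = new PriorityQueue<>(n);
        for (Integer s : scores) {
            if (heap.size() < n) {
                heap.offer(s);
            } else if (s > heap.peek()) { // beats the smallest of the top n
                heap.poll();
                heap.offer(s);
            }
        }
        List<Integer> result = new ArrayList<>(heap);
        result.sort(Collections.reverseOrder()); // descending for display
        return result;
    }

    public static void main(String[] args) {
        List<Integer> scores = Arrays.asList(70, 98, 55, 70, 60, 33);
        System.out.println(topN(scores, 3)); // [98, 70, 70]
    }
}
```

The heap does O(log n) work per score instead of O(n) shifting, which matters once N grows beyond a handful.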
轉(zhuǎn)載于:https://www.cnblogs.com/cac2020/p/10684754.html