日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

【Spark-core学习之九】 Spark案例

發(fā)布時(shí)間:2025/5/22 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【Spark-core学习之九】 Spark案例 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

環(huán)境
  虛擬機(jī):VMware 10
  Linux版本:CentOS-6.5-x86_64
  客戶端:Xshell4
  FTP:Xftp4
  jdk1.8
  scala-2.10.4(依賴jdk1.8)
  spark-1.6

一、PV & UV

  PV是網(wǎng)站分析的一個(gè)術(shù)語,用以衡量網(wǎng)站用戶訪問的網(wǎng)頁的數(shù)量。對于廣告主,PV值可預(yù)期它可以帶來多少廣告收入。一般來說,PV與來訪者的數(shù)量成正比,但是PV并不直接決定頁面的真實(shí)來訪者數(shù)量,如同一個(gè)來訪者通過不斷的刷新頁面,也可以制造出非常高的PV。


1、什么是PV值
PV(page view)即頁面瀏覽量或點(diǎn)擊量,是衡量一個(gè)網(wǎng)站或網(wǎng)頁用戶訪問量。具體的說,PV值就是所有訪問者在24小時(shí)(0點(diǎn)到24點(diǎn))內(nèi)看了某個(gè)網(wǎng)站多少個(gè)頁面或某個(gè)網(wǎng)頁多少次。PV是指頁面刷新的次數(shù),每一次頁面刷新,就算做一次PV流量。
度量方法就是從瀏覽器發(fā)出一個(gè)對網(wǎng)絡(luò)服務(wù)器的請求(Request),網(wǎng)絡(luò)服務(wù)器接到這個(gè)請求后,會將該請求對應(yīng)的一個(gè)網(wǎng)頁(Page)發(fā)送給瀏覽器,從而產(chǎn)生了一個(gè)PV。那么在這里只要是這個(gè)請求發(fā)送給了瀏覽器,無論這個(gè)頁面是否完全打開(下載完成),那么都是應(yīng)當(dāng)計(jì)為1個(gè)PV。

?

package com.wjy.test;import java.util.List;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;public class Pv {public static void main(String[] args) {SparkConf conf = new SparkConf().setMaster("local").setAppName("PV"); JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> rdd = sc.textFile("./data/pvuvdata");//根據(jù)PV定義 某個(gè)頁面/網(wǎng)址的訪問數(shù)量 將每一條記錄根據(jù)網(wǎng)址解析出一條訪問量JavaPairRDD<String, Integer> ipwebrdd = rdd.mapToPair(new PairFunction<String, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(String line) throws Exception {//7.213.213.208 吉林 2018-03-29 1522294977303 1920936170939152672 www.dangdang.com LoginString[] ss = line.split("\t");return new Tuple2<String, Integer>(ss[5],1);}});//累加頁面訪問量JavaPairRDD<String, Integer> mapToPair = ipwebrdd.reduceByKey(new Function2<Integer, Integer, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {private static final long serialVersionUID = 1L;//換個(gè) 用于按照整數(shù)key排序 @Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> tuple)throws Exception {return tuple.swap();}}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> tuple)throws Exception {return tuple.swap();}});List<Tuple2<String, Integer>> list = mapToPair.take(5);for(Tuple2<String, Integer> t:list){System.out.println(t);}sc.stop();}}

結(jié)果:

(www.baidu.com,18791) (www.dangdang.com,18751) (www.suning.com,18699) (www.mi.com,18678) (www.taobao.com,18613)

?

2、什么是UV值
UV(unique visitor)即獨(dú)立訪客數(shù),指訪問某個(gè)站點(diǎn)或點(diǎn)擊某個(gè)網(wǎng)頁的不同IP地址的人數(shù)。在同一天內(nèi),UV只記錄第一次進(jìn)入網(wǎng)站的具有獨(dú)立IP的訪問者,在同一天內(nèi)再次訪問該網(wǎng)站則不計(jì)數(shù)。UV提供了一定時(shí)間內(nèi)不同觀眾數(shù)量的統(tǒng)計(jì)指標(biāo),而沒有反應(yīng)出網(wǎng)站的全面活動(dòng)。

?

package com.wjy.test;import java.util.List;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;public class Uv {/*** 根據(jù)IP網(wǎng)址來確定唯一用戶訪問 然后排重 累計(jì)* @param args*/public static void main(String[] args) {SparkConf conf = new SparkConf().setMaster("local").setAppName("UV");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> rdd = sc.textFile("./data/pvuvdata");JavaPairRDD<String, Integer> rdd2 = rdd.mapToPair(new PairFunction<String, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(String line) throws Exception {String[] split = line.split("\t");return new Tuple2<String, Integer>(split[0]+"_"+split[5],1);}}).distinct().mapToPair(new PairFunction<Tuple2<String,Integer>, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(Tuple2<String, Integer> tuple)throws Exception {return new Tuple2<String, Integer>(tuple._1.split("_")[1],1);}});//累加JavaPairRDD<String, Integer> rdd3 = rdd2.reduceByKey(new Function2<Integer, Integer, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {private static final long serialVersionUID = 1L;//反轉(zhuǎn) 數(shù)值做KEY 用于排序 @Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> tuple)throws Exception {return tuple.swap();}}).sortByKey(false)//降序排序.mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {private static final long serialVersionUID = 1L;//排序之后 反轉(zhuǎn)回來 @Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> tuple)throws Exception {return tuple.swap();}});//取前5個(gè)元素List<Tuple2<String, Integer>> list = rdd3.take(5);for(Tuple2<String, Integer> t:list){System.out.println(t);}sc.stop();}}

結(jié)果:

(www.baidu.com,15830) (www.suning.com,15764) (www.mi.com,15740) (www.jd.com,15682) (www.dangdang.com,15641)

?

二、二次排序

對于兩列以上的數(shù)據(jù),要求對第一列排序之后,之后的列也要依次排序,思路就是:先對第一列進(jìn)行排序,對于第一列數(shù)值相同,再對第二列進(jìn)行排序。

舉例:

待排序數(shù)據(jù):secondSort.txt

3 1 5 2 6 5 8 123 1 4 4 123 5 432 3 54 5 121 8 654 3 98 package com.wjy.test;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;public class SecondSort{public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("SecondSort");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> rdd = sc.textFile("./data/secondSort.txt");//轉(zhuǎn)成K-V格式//PairFunction 入?yún)?-rdd的一行記錄 入?yún)? 入?yún)?是call的出參JavaPairRDD<SecondSortKey, String> mapToPair = rdd.mapToPair(new PairFunction<String, SecondSortKey, String>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<SecondSortKey, String> call(String line)throws Exception {String[] sps = line.split(" ");int first = Integer.valueOf(sps[0]);int second = Integer.valueOf(sps[1]);SecondSortKey ss = new SecondSortKey(first,second);return new Tuple2<SecondSortKey, String>(ss,line);}});//sortByKey 會使用key也就是SecondSortKey的compareTo方法mapToPair.sortByKey(false).foreach(new VoidFunction<Tuple2<SecondSortKey,String>>() {private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<SecondSortKey, String> tuple) throws Exception {System.out.println(tuple._2);}});
    
sc.stop();
    
    
}}

?

對于KEY自定義類型 實(shí)現(xiàn)comparable接口 實(shí)現(xiàn)comparTo方法

package com.wjy.test;import java.io.Serializable;public class SecondSortKey implements Serializable ,Comparable<SecondSortKey>{private static final long serialVersionUID = 1L;private int first;private int second;public SecondSortKey(int first,int second){super();this.first=first;this.second=second;}public int getFirst() {return first;}public void setFirst(int first) {this.first = first;}public int getSecond() {return second;}public void setSecond(int second) {this.second = second;}@Overridepublic int compareTo(SecondSortKey o) {//先比較第一個(gè)數(shù)值 如果相同再比較第二個(gè)值 否則直接返回第一個(gè)值的比較結(jié)果if (getFirst()-o.getFirst() == 0){return getSecond() - o.getSecond();}else{return getFirst()-o.getFirst();}}}

?排序結(jié)果:

8 654 8 123 6 5 5 432 5 121 5 2 4 123 3 98 3 54 3 1 1 4

?

三、分組取topN

對于多組數(shù)據(jù),去每一組數(shù)據(jù)前N個(gè)數(shù)據(jù),比如列出每個(gè)班級的前三名等等問題。
解決的思路:先分組,然后每一組排序,取前N個(gè)。
案例:有三個(gè)班級的分?jǐn)?shù)清單scores.txt,取出每班前三名。

class1 100 class2 85 class3 70 class1 102 class2 65 class1 45 class2 85 class3 70 class1 16 class2 88 class1 95 class2 37 class3 98 class1 99 class2 23

?groupByKey+排序算法:

package com.wjy.test;import java.util.Iterator;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;public class TopNtest {public static void main(String[] args) {SparkConf conf = new SparkConf().setMaster("local").setAppName("TopOs");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> rdd = sc.textFile("./data/scores.txt");//轉(zhuǎn)成K-V格式 方便下一步分組和排序//PairFunction 入?yún)?rdd的一行數(shù)據(jù) 入?yún)?、3是call的出參元素JavaPairRDD<String, Integer> mapToPair = rdd.mapToPair(new PairFunction<String, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(String line) throws Exception {String[] ss = line.split("\t");return new Tuple2<String, Integer>(ss[0],Integer.valueOf(ss[1]));}});//使用groupByKey 將相同班級的數(shù)據(jù)放在一個(gè)集合里mapToPair.groupByKey().foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception {String classname = tuple._1;Iterator<Integer> it = tuple._2.iterator();Integer[] top3 = new Integer[3];while(it.hasNext()){Integer score = it.next();//排序for (int i = 0; i < top3.length; i++) {if(top3[i] == null){top3[i] = score;break;}else if(score > top3[i]){for (int j = 2; j > i; j--) {top3[j] = top3[j-1];}top3[i] = score;break;}}}System.out.println("classname="+classname);for (Integer i:top3){System.out.println(i);}}}); sc.stop(); } }

?

topN 結(jié)果:

classname=class3 98 70 70 classname=class1 102 100 99 classname=class2 88 85 85

?

轉(zhuǎn)載于:https://www.cnblogs.com/cac2020/p/10684754.html

總結(jié)

以上是生活随笔為你收集整理的【Spark-core学习之九】 Spark案例的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。