日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

获取系统URL访问的前三名(通过Scala方式实现/通过Spark方式实现),Spark将URL访问日志进行分类并通过自定义Partitioner的方式将文件写入到不同分区上

發布時間:2024/9/27 windows 24 豆豆

1、創建Maven項目

創建的過程參考:http://blog.csdn.net/tototuzuoquan/article/details/74571374

2、準備日志文件

url.log的內容類似:

20160321101954 http://java.toto.cn/java/course/javaeeadvanced.shtml 20160321101954 http://java.toto.cn/java/course/javaee.shtml 20160321101954 http://java.toto.cn/java/course/android.shtml 20160321101954 http://java.toto.cn/java/video.shtml 20160321101954 http://java.toto.cn/java/teacher.shtml 20160321101954 http://java.toto.cn/java/course/android.shtml 20160321101954 http://php.toto.cn/php/teacher.shtml 20160321101954 http://net.toto.cn/net/teacher.shtml 20160321101954 http://java.toto.cn/java/course/hadoop.shtml 20160321101954 http://java.toto.cn/java/course/base.shtml 20160321101954 http://net.toto.cn/net/course.shtml 20160321101954 http://php.toto.cn/php/teacher.shtml 20160321101954 http://net.toto.cn/net/video.shtml 20160321101954 http://java.toto.cn/java/course/base.shtml 20160321101954 http://net.toto.cn/net/teacher.shtml 20160321101954 http://java.toto.cn/java/video.shtml 20160321101954 http://java.toto.cn/java/video.shtml

3、編寫UrlCount1,代碼如下:

通過scala的方式獲取日志文件中每類次主機名出現的前3名

package cn.toto.sparkimport java.net.URLimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}/*** 獲取到每類host出現的次數的前三名,下面通過sacle的方式實現* Created by toto on 2017/7/8.*/ object UrlCount1 {def main(args: Array[String]): Unit = {//使用local就是啟動一個線程,local[2]表示啟動2個線程,Local[*]表示根據機器來自動分配val conf = new SparkConf().setAppName("UrlCount1").setMaster("local[2]")val sc = new SparkContext(conf)val lines:RDD[String] = sc.textFile(args(0))//splitval urlAndOne = lines.map(line =>{val fields = line.split("\t")val url = fields(1)//封裝成url,次數(url,1)})//聚合,計算某個url出現了多少次,所以要聚合一下,這里做了Cache,問題是當數據量很大的時候,可能出現內存溢出val summedUrl = urlAndOne.reduceByKey(_+_).cache()println(summedUrl)//返回的是[(host,url,次數)]這樣的元組//groupBy(_._1) 表示按照host進行分組val grouped = summedUrl.map(t => {val host = new URL(t._1).getHost//主機名,url,次數(host,t._1,t._2)}).groupBy(_._1)println(grouped)//_ :表示上面的集合//toList :表示它轉化為集合//sortBy :這里是scala的集合//_._3 :表示按照次數進行排序//.reverse.take(3) :表示取前3名val result = grouped.mapValues(_.toList.sortBy(_._3).reverse.take(3))println(result.collect().toBuffer)sc.stop()} }

運行參數配置:

運行結果:


4、通過Spark的方式計算URL出現的前3名

代碼如下:

package cn.toto.sparkimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}/*** 使用SparkRDD的方式取出每個子Host的出現的次數的前3名,并循環打印出來。* Created by toto on 2017/7/8.*/ object UrlCount2 {/*** 使用了Spark的RDD緩存機制,這樣再進行排序時不會出現內存溢出* @param args*/def main(args: Array[String]): Unit = {//后續這些Url就從數據庫中獲取到val urls = Array("http://java.toto.cn","http://php.toto.cn","http://net.toto.cn")val conf = new SparkConf().setAppName("UrlCount1").setMaster("local[2]")val sc = new SparkContext(conf)val lines:RDD[String] = sc.textFile(args(0))//splitval urlAndOne = lines.map(line => {val fields = line.split("\t")val url = fields(1)//(url,次數)(url,1)})//聚合val summedUrl = urlAndOne.reduceByKey(_+_)//循環過濾for(u <- urls) {//過濾(值過濾出urls這些的內容)val insRdd = summedUrl.filter(t => {val url = t._1url.startsWith(u)})val result = insRdd.sortBy(_._2, false).take(3)println(result.toBuffer)}sc.stop()} }

運行參數配置:

運行結果:


5、將url進行篩選,分類,并通過自定義分區將數據存儲到不同的文件中

package cn.toto.sparkimport java.net.URLimport org.apache.spark.rdd.RDD import org.apache.spark.{Partitioner, SparkConf, SparkContext}import scala.collection.mutable/*** 自定義Partitioner,按照不同的子主機名存儲到不同的分區文件中* Created by toto on 2017/7/8.*/ object UrlCount3 {/*** 如果把每個學員單獨產生的內容都寫入到磁盤文件中* @param args*/def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("UrlCount1").setMaster("local[2]")val sc = new SparkContext(conf)val lines : RDD[String] = sc.textFile("E:\\workspace\\url.log")//splitval urlAndOne = lines.map(line => {val fields = line.split("\t")val url = fields(1)(url,1)})//聚合val summedUrl = urlAndOne.reduceByKey(_+_).cache()val rdd1 = summedUrl.map(t => {val host = new URL(t._1).getHost//(host,(url,出現次數))(host,(t._1,t._2))})val urls = rdd1.map(_._1).distinct().collect()val partitioner = new HostPartitioner(urls)//安裝自定義的分區器重新分區val partitionedRdd = rdd1.partitionBy(partitioner)val result = partitionedRdd.mapPartitions(it => {it.toList.sortBy(_._2._2).reverse.take(3).iterator})result.saveAsTextFile("E:\\workspace\\out")sc.stop()} }class HostPartitioner(urls: Array[String]) extends Partitioner {val rules = new mutable.HashMap[String,Int]()var index = 0for(url <- urls){rules.put(url,index)index += 1}override def getPartition(key: Any): Int = {val url = key.toString//如果取到了值就返回url,否則返回0rules.getOrElse(url,0)}//分區數量override def numPartitions: Int = urls.length }

最終的輸出內容是:

總結

以上是生活随笔為你收集整理的获取系统URL访问的前三名(通过Scala方式实现/通过Spark方式实现),Spark将URL访问日志进行分类并通过自定义Partitioner的方式将文件写入到不同分区上的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。