获取系统URL访问的前三名(通过Scala方式实现/通过Spark方式实现),Spark将URL访问日志进行分类并通过自定义Partitioner的方式将文件写入到不同分区上
生活随笔
收集整理的這篇文章主要介紹了
获取系统URL访问的前三名(通过Scala方式实现/通过Spark方式实现),Spark将URL访问日志进行分类并通过自定义Partitioner的方式将文件写入到不同分区上
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
1、創(chuàng)建Maven項(xiàng)目
創(chuàng)建的過(guò)程參考:http://blog.csdn.net/tototuzuoquan/article/details/74571374
2、準(zhǔn)備日志文件
url.log的內(nèi)容類(lèi)似:
3、編寫(xiě)UrlCount1,代碼如下:
通過(guò)scala的方式獲取日志文件中每類(lèi)次主機(jī)名出現(xiàn)的前3名
package cn.toto.sparkimport java.net.URLimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}/*** 獲取到每類(lèi)host出現(xiàn)的次數(shù)的前三名,下面通過(guò)sacle的方式實(shí)現(xiàn)* Created by toto on 2017/7/8.*/ object UrlCount1 {def main(args: Array[String]): Unit = {//使用local就是啟動(dòng)一個(gè)線程,local[2]表示啟動(dòng)2個(gè)線程,Local[*]表示根據(jù)機(jī)器來(lái)自動(dòng)分配val conf = new SparkConf().setAppName("UrlCount1").setMaster("local[2]")val sc = new SparkContext(conf)val lines:RDD[String] = sc.textFile(args(0))//splitval urlAndOne = lines.map(line =>{val fields = line.split("\t")val url = fields(1)//封裝成url,次數(shù)(url,1)})//聚合,計(jì)算某個(gè)url出現(xiàn)了多少次,所以要聚合一下,這里做了Cache,問(wèn)題是當(dāng)數(shù)據(jù)量很大的時(shí)候,可能出現(xiàn)內(nèi)存溢出val summedUrl = urlAndOne.reduceByKey(_+_).cache()println(summedUrl)//返回的是[(host,url,次數(shù))]這樣的元組//groupBy(_._1) 表示按照host進(jìn)行分組val grouped = summedUrl.map(t => {val host = new URL(t._1).getHost//主機(jī)名,url,次數(shù)(host,t._1,t._2)}).groupBy(_._1)println(grouped)//_ :表示上面的集合//toList :表示它轉(zhuǎn)化為集合//sortBy :這里是scala的集合//_._3 :表示按照次數(shù)進(jìn)行排序//.reverse.take(3) :表示取前3名val result = grouped.mapValues(_.toList.sortBy(_._3).reverse.take(3))println(result.collect().toBuffer)sc.stop()} }運(yùn)行參數(shù)配置:
運(yùn)行結(jié)果:
4、通過(guò)Spark的方式計(jì)算URL出現(xiàn)的前3名
代碼如下:
package cn.toto.sparkimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}/*** 使用SparkRDD的方式取出每個(gè)子Host的出現(xiàn)的次數(shù)的前3名,并循環(huán)打印出來(lái)。* Created by toto on 2017/7/8.*/ object UrlCount2 {/*** 使用了Spark的RDD緩存機(jī)制,這樣再進(jìn)行排序時(shí)不會(huì)出現(xiàn)內(nèi)存溢出* @param args*/def main(args: Array[String]): Unit = {//后續(xù)這些Url就從數(shù)據(jù)庫(kù)中獲取到val urls = Array("http://java.toto.cn","http://php.toto.cn","http://net.toto.cn")val conf = new SparkConf().setAppName("UrlCount1").setMaster("local[2]")val sc = new SparkContext(conf)val lines:RDD[String] = sc.textFile(args(0))//splitval urlAndOne = lines.map(line => {val fields = line.split("\t")val url = fields(1)//(url,次數(shù))(url,1)})//聚合val summedUrl = urlAndOne.reduceByKey(_+_)//循環(huán)過(guò)濾for(u <- urls) {//過(guò)濾(值過(guò)濾出urls這些的內(nèi)容)val insRdd = summedUrl.filter(t => {val url = t._1url.startsWith(u)})val result = insRdd.sortBy(_._2, false).take(3)println(result.toBuffer)}sc.stop()} }運(yùn)行參數(shù)配置:
運(yùn)行結(jié)果:
5、將url進(jìn)行篩選,分類(lèi),并通過(guò)自定義分區(qū)將數(shù)據(jù)存儲(chǔ)到不同的文件中
package cn.toto.sparkimport java.net.URLimport org.apache.spark.rdd.RDD import org.apache.spark.{Partitioner, SparkConf, SparkContext}import scala.collection.mutable/*** 自定義Partitioner,按照不同的子主機(jī)名存儲(chǔ)到不同的分區(qū)文件中* Created by toto on 2017/7/8.*/ object UrlCount3 {/*** 如果把每個(gè)學(xué)員單獨(dú)產(chǎn)生的內(nèi)容都寫(xiě)入到磁盤(pán)文件中* @param args*/def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("UrlCount1").setMaster("local[2]")val sc = new SparkContext(conf)val lines : RDD[String] = sc.textFile("E:\\workspace\\url.log")//splitval urlAndOne = lines.map(line => {val fields = line.split("\t")val url = fields(1)(url,1)})//聚合val summedUrl = urlAndOne.reduceByKey(_+_).cache()val rdd1 = summedUrl.map(t => {val host = new URL(t._1).getHost//(host,(url,出現(xiàn)次數(shù)))(host,(t._1,t._2))})val urls = rdd1.map(_._1).distinct().collect()val partitioner = new HostPartitioner(urls)//安裝自定義的分區(qū)器重新分區(qū)val partitionedRdd = rdd1.partitionBy(partitioner)val result = partitionedRdd.mapPartitions(it => {it.toList.sortBy(_._2._2).reverse.take(3).iterator})result.saveAsTextFile("E:\\workspace\\out")sc.stop()} }class HostPartitioner(urls: Array[String]) extends Partitioner {val rules = new mutable.HashMap[String,Int]()var index = 0for(url <- urls){rules.put(url,index)index += 1}override def getPartition(key: Any): Int = {val url = key.toString//如果取到了值就返回url,否則返回0rules.getOrElse(url,0)}//分區(qū)數(shù)量override def numPartitions: Int = urls.length }最終的輸出內(nèi)容是:
總結(jié)
以上是生活随笔為你收集整理的获取系统URL访问的前三名(通过Scala方式实现/通过Spark方式实现),Spark将URL访问日志进行分类并通过自定义Partitioner的方式将文件写入到不同分区上的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 港股恒生指数交易时间
- 下一篇: 荣耀v40微信视频美颜怎么设置 其它配置