日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spark学习五——归属地计算案例

發(fā)布時間:2023/12/20 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark学习五——归属地计算案例 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

cache方法

他可以將數(shù)據(jù)標記為cache,在觸發(fā)action的時候,會將數(shù)據(jù)緩存進內(nèi)存當中,并進行計算。被標記為cache的RDD第一次觸發(fā)action的時候,因為需要將數(shù)據(jù)緩存入內(nèi)存當中,時間會比平時慢。但是在后續(xù)需要運用該被標記RDD進行計算的時候,計算會快特別多,十分快。所以需要多次重復運用的數(shù)據(jù)的時候可以將其cache,極大提高效率。比如機器學算法的多次迭代什么的

cache的前提

  • 要求的計算速度快,

  • 集群的資源要足夠大 .

  • 重要:cache的數(shù)據(jù)會多次的觸發(fā)Action,這個時候需要緩存,沒這個前提用一次沒必要緩存

  • .先進行過濾,然后將縮小范圍的數(shù)據(jù)在cache到內(nèi)存,過濾拿掉一部分數(shù)據(jù)。

val cache =RDD.cache //將RDD進行緩存放不下只會放一部分,

將RDD釋放內(nèi)存

cache.uppersist(true) //Ture 表示異步(先釋放內(nèi)存,再執(zhí)行后續(xù)代碼),表是False同步(一邊釋放內(nèi)存,一邊執(zhí)行后續(xù)代碼)

cache 底層調(diào)的方法persist()

底層調(diào)用的是persist(),這個方法很靈活,里面可以傳參數(shù)

可以通過該StorageLevel.MEMORY_ONLY ,進行緩存磁盤和內(nèi)存,還可以組合緩存,甚至對數(shù)據(jù)序列化。序列化可以將數(shù)據(jù)壓縮,節(jié)省空間,但是會多花一點時間。

參數(shù)含義:
第一個參數(shù),放到磁盤
第二個參數(shù),放到內(nèi)存
第三個參數(shù),磁盤中的數(shù)據(jù),不是以java對象的方式保存
第四個參數(shù),內(nèi)存中的數(shù)據(jù),以java對象的方式保存

val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)

帶2是可以存副本,防止丟失

Checkpoint

在做了復雜的計算后,可以將數(shù)據(jù)Checkphoint,然后存入HDFS當中,保證數(shù)據(jù)安全。

1.迭代計算,保證數(shù)據(jù)安全
2.對速度的要求不高(相對于cache至內(nèi)存當中)
3.中間結(jié)果存入HDFS

步驟

1.設置checkpoint的目錄,通常是HDFS文件系統(tǒng)的目錄
2.經(jīng)過復雜計算得到中間結(jié)果
3.將中間結(jié)果checkpoint緩存到HDFS中
4后續(xù)的計算可以使用之前checkpoint的數(shù)據(jù)

val sc =SparkContext(conf) //設置checkpoint的目錄 sc.setCheckpointDir("hdfs://node-4:") //對數(shù)據(jù)進行checkpoint RDD.checkpoint()

歸屬地計算案例

首先將URL中的ip地址給提取出來,然后將IP地址根據(jù)ip規(guī)則轉(zhuǎn)化成歸屬地,然后計算每個城市的用戶數(shù)量。

URL

每條URL當中包含了一名顧客的許多信息,比如ip地址,操作系統(tǒng)等等

20090121000133331104000|123.197.66.93|www.pkwutai.cn|/down/downLoad-id-45383.html|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 1.7)|http://www.baidu.com/s?tn=b1ank_pg&ie=gb2312&bs=%C3%C0%C6%BC%B7%FE%D7%B0%B9%DC%C0%ED%C8%ED%BC%FE&sr=&z=&cl=3&f=8&wd=%C6%C6%BD%E2%C3%C0%C6%BC%B7%FE%D7%B0%B9%DC%C0%ED%C8%ED%BC%FE&ct=0|

ip規(guī)則:

1.0.1.0|1.0.3.255|16777472|16778239|亞洲|中國|福建|福州||電信|350100|China|CN|119.306239|26.075302

規(guī)則解釋:第一第二個字段是ip的起始和結(jié)束,然后第三第四就是ip起始和結(jié)束轉(zhuǎn)化為十進制的形式,后面就是對應的地址。

將URL中提取ip,將ip轉(zhuǎn)化為十進制,根據(jù)規(guī)則進行匹配,在該城市的ip區(qū)間,則將其城市輸出。

二分法查找

二分法查找針對的是一個有序的數(shù)據(jù)集合,每次通過與區(qū)間的中間元素對比,將待查找的區(qū)間縮小為之前的一半,直到找到要查找的元素,或者區(qū)間被縮小為0

二分查找非常高效,假設數(shù)據(jù)大小是n,每次查找后數(shù)據(jù)都會縮小為原來的一半,也就是會除以2,最壞情況下,直到查找區(qū)間被縮小為空,才停止

單機計算ip地址

要求

需求:根據(jù)訪問日志的ip地址計算出訪問者的歸屬地,并且按照省份,計算出訪問次數(shù),然后將計算好的結(jié)果寫入到SQL。
步驟:

1,整理數(shù)據(jù),切分出ip字段,然后將ip地址轉(zhuǎn)換成十進制
2然后將數(shù)據(jù)緩存到內(nèi)存中( Executors中的內(nèi)存中)
3,將訪問log和ip規(guī)則進行匹配(二分法查找)
4.取出對份名稱,然后將其和一組合在一起
5.按省份名進行累合
6.將累合后的數(shù)據(jù)寫入到MySQL中

package cn.edu360.day4import java.sql.{Connection, DriverManager, PreparedStatement}import scala.io.{BufferedSource, Source}/*** Created by zx on 2017/10/9.*/object MyUtils {// 將URL中的ip地址給提取出來def ip2Long(ip: String): Long = {val fragments = ip.split("[.]")var ipNum = 0Lfor (i <- 0 until fragments.length){ipNum = fragments(i).toLong | ipNum << 8L}ipNum}//根據(jù)ip規(guī)則,從中提取出ip的起始和結(jié)尾將其轉(zhuǎn)化十進制的起始和結(jié)尾,并且提取出來,還有省份def readRules(path: String): Array[(Long, Long, String)] = {//讀取ip規(guī)則//從path中讀取文件val bf: BufferedSource = Source.fromFile(path) //取出每一行數(shù)據(jù)val lines: Iterator[String] = bf.getLines()//對ip規(guī)則進行整理,并放入到內(nèi)存val rules: Array[(Long, Long, String)] = lines.map(line => {val fileds = line.split("[|]")val startNum = fileds(2).toLongval endNum = fileds(3).toLongval province = fileds(6)(startNum, endNum, province)}).toArray //將它轉(zhuǎn)成數(shù)組,則會放入內(nèi)存當中rules}//二分法查找def binarySearch(lines: Array[(Long, Long, String)], ip: Long) : Int = {var low = 0var high = lines.length - 1while (low <= high) {val middle = (low + high) / 2if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))return middleif (ip < lines(middle)._1)high = middle - 1else {low = middle + 1}}-1 //如果沒有查找到便輸出-1}//將其存入sqldef data2MySQL(it: Iterator[(String, Int)]): Unit = {//一個迭代器代表一個分區(qū),分區(qū)中有多條數(shù)據(jù)//一個分區(qū)建立,獲得一個JDBC連接val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "123568")//將數(shù)據(jù)通過Connection寫入到數(shù)據(jù)庫val pstm: PreparedStatement = conn.prepareStatement("INSERT INTO access_log VALUES (?, ?)")//將分區(qū)中的數(shù)據(jù)一條一條寫入到MySQL中it.foreach(tp => {pstm.setString(1, tp._1)pstm.setInt(2, tp._2)pstm.executeUpdate()})//將分區(qū)中的數(shù)據(jù)全部寫完之后,在關閉連接if(pstm != null) {pstm.close()}if (conn != null) {conn.close()}}def main(args: Array[String]): Unit = {//數(shù)據(jù)是在內(nèi)存中val rules: Array[(Long, Long, String)] = readRules("/Users/zx/Desktop/ip/ip.txt")//將ip地址轉(zhuǎn)換成十進制val ipNum = ip2Long("114.215.43.42")//查找val index = binarySearch(rules, ipNum)//根據(jù)腳本到rules中查找對應的數(shù)據(jù)val tp = rules(index)val province = tp._3println(province)} }

分布式處理計算ip地址1


步驟:

  • 整理數(shù)據(jù),切分出ip字段,然后將ip地址轉(zhuǎn)換成十進制

  • 然后將數(shù)據(jù)緩存到內(nèi)存中( Executors中的內(nèi)存中)

  • 然后使用廣播變量,將Drive端的數(shù)據(jù)廣播到Executor中

  • 將訪問log和ip規(guī)則進行匹配(二分法查找),得出對應的城市

  • 取出對份名稱,然后將其和一組合在一起

  • 按省份名進行累合

  • 將累合后的數(shù)據(jù)寫入到MySQL中

  • package cn.edu360.day4import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object IpLoaction1 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("IpLoaction1").setMaster("local[4]")val sc = new SparkContext(conf)//在Driver端獲取到全部的IP規(guī)則數(shù)據(jù)(全部的IP規(guī)則數(shù)據(jù)在某一臺機器上,跟Driver在同一臺機器上)//全部的IP規(guī)則在Driver端了(在Driver端的內(nèi)存中了)//arg(0)存入的規(guī)則的存儲地址val rules: Array[(Long, Long, String)] = MyUtils.readRules(args(0))//將Drive端的數(shù)據(jù)廣播到Executor中//調(diào)用sc上的廣播方法//廣播變量的引用(還在Driver端)val broadcastRef: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(rules)//創(chuàng)建RDD,讀取訪問日志,從HDFS中讀取,arg(1)存其存儲地址val accessLines: RDD[String] = sc.textFile(args(1))//這個函數(shù)是在哪一端定義的?(Driver)val func = (line: String) => {val fields = line.split("[|]")val ip = fields(1)//將ip轉(zhuǎn)換成十進制val ipNum = MyUtils.ip2Long(ip)//進行二分法查找,通過Driver端的引用或取到Executor中的廣播變量//(該函數(shù)中的代碼是在Executor中別調(diào)用執(zhí)行的,通過廣播變量的引用,就可以拿到當前Executor中的廣播的規(guī)則了)val rulesInExecutor: Array[(Long, Long, String)] = broadcastRef.value//查找var province = "未知"val index = MyUtils.binarySearch(rulesInExecutor, ipNum)if (index != -1) {province = rulesInExecutor(index)._3}(province, 1)}//整理數(shù)據(jù)val proviceAndOne: RDD[(String, Int)] = accessLines.map(func)//聚合//val sum = (x: Int, y: Int) => x + yval reduced: RDD[(String, Int)] = proviceAndOne.reduceByKey(_+_)//將結(jié)果打印val r = reduced.collect()println(r.toBuffer)sc.stop()} }

    分布式處理計算ip地址2

    與上面的區(qū)別是,當ip規(guī)則過大的時候可以考慮將數(shù)據(jù)存儲至HDFS中,用spark將數(shù)據(jù)讀取至excutor當中,收集到driver,再次廣播至各個excutor。

  • 讀取存儲在HDFS中的IP規(guī)則

  • 然后對ip規(guī)則進行處理

  • 將分散在多個Executor中的部分IP規(guī)則收集到Driver端(collect)

  • 將Driver端的數(shù)據(jù)廣播到Executo

  • 整理數(shù)據(jù),切分出ip字段,然后將ip地址轉(zhuǎn)換成十進制

  • 將訪問log和ip規(guī)則進行匹配(二分法查找),得出對應的城市

  • 取出對份名稱,然后將其和一組合在一起

  • 按省份名進行累合

  • 將累合后的數(shù)據(jù)寫入到MySQL中

  • package cn.edu360.day4import java.sql.{Connection, DriverManager, PreparedStatement}import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object IpLoaction2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("IpLoaction1").setMaster("local[4]")val sc = new SparkContext(conf)//取到HDFS中的ip規(guī)則val rulesLines:RDD[String] = sc.textFile(args(0))//整理ip規(guī)則數(shù)據(jù)val ipRulesRDD: RDD[(Long, Long, String)] = rulesLines.map(line => {val fields = line.split("[|]")val startNum = fields(2).toLongval endNum = fields(3).toLongval province = fields(6)(startNum, endNum, province)})//將分散在多個Executor中的部分IP規(guī)則收集到Driver端val rulesInDriver: Array[(Long, Long, String)] = ipRulesRDD.collect()//將Driver端的數(shù)據(jù)廣播到Executor//廣播變量的引用(還在Driver端)val broadcastRef: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(rulesInDriver)//創(chuàng)建RDD,讀取訪問日志val accessLines: RDD[String] = sc.textFile(args(1))//整理數(shù)據(jù)val proviceAndOne: RDD[(String, Int)] = accessLines.map(log => {//將log日志的每一行進行切分val fields = log.split("[|]")val ip = fields(1)//將ip轉(zhuǎn)換成十進制val ipNum = MyUtils.ip2Long(ip)//進行二分法查找,通過Driver端的引用或取到Executor中的廣播變量//(該函數(shù)中的代碼是在Executor中別調(diào)用執(zhí)行的,通過廣播變量的引用,就可以拿到當前Executor中的廣播的規(guī)則了)//Driver端廣播變量的引用是怎樣跑到Executor中的呢?//Task是在Driver端生成的,廣播變量的引用是伴隨著Task被發(fā)送到Executor中的val rulesInExecutor: Array[(Long, Long, String)] = broadcastRef.value//查找var province = "未知"val index = MyUtils.binarySearch(rulesInExecutor, ipNum)if (index != -1) {province = rulesInExecutor(index)._3}(province, 1)})//聚合//val sum = (x: Int, y: Int) => x + yval reduced: RDD[(String, Int)] = proviceAndOne.reduceByKey(_+_)//將結(jié)果打印//val r = reduced.collect()//println(r.toBuffer)//該方法寫入sql,每行調(diào)用一次jdbc連接,浪費資源/**reduced.foreach(tp => {//將數(shù)據(jù)寫入到MySQL中//問?在哪一端獲取到MySQL的鏈接的?//是在Executor中的Task獲取的JDBC連接val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?charatorEncoding=utf-8", "root", "123568")//寫入大量數(shù)據(jù)的時候,有沒有問題?val pstm = conn.prepareStatement("...")pstm.setString(1, tp._1)pstm.setInt(2, tp._2)pstm.executeUpdate()pstm.close()conn.close()})*///一個分區(qū)調(diào)用建立一次JDBC連接,比較合理//一次拿出一個分區(qū)(一個分區(qū)用一個連接,可以將一個分區(qū)中的多條數(shù)據(jù)寫完在釋放jdbc連接,這樣更節(jié)省資源) // reduced.foreachPartition(it => { // val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "123568") // //將數(shù)據(jù)通過Connection寫入到數(shù)據(jù)庫 // val pstm: PreparedStatement = conn.prepareStatement("INSERT INTO access_log VALUES (?, ?)") // //將一個分區(qū)中的每一條數(shù)據(jù)拿出來 // it.foreach(tp => { // pstm.setString(1, tp._1) // pstm.setInt(2, tp._2) // pstm.executeUpdate() // }) // pstm.close() // conn.close() // })reduced.foreachPartition(it => MyUtils.data2MySQL(it))sc.stop()} }

    總結(jié)

    以上是生活随笔為你收集整理的spark学习五——归属地计算案例的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。