日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spark 车流量项目实战

發布時間:2023/12/10 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark 车流量项目实战 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

spark 車流量項目實戰

這里目錄標題

  • spark 車流量項目實戰
    • 1、車流向項目介紹
      • 1.1. 數據采集
      • 1.2. 模塊介紹
        • 1.2.1. 卡扣流量分析模塊介紹
      • 1.3. 項目架構介紹
      • 1.4. 數據介紹
        • 1.4.1. 基本概念
        • 1.4.2. 表
      • 1.5. 需求分析
    • 2、數據流程
    • 3、spark任務
      • 如何將Spark任務提交到集群運行?
    • 4、模塊功能
      • 4.1、卡口流量分析
        • 4.1.1. 卡口狀態監控
          • 4.1.1.1. 統計卡口壞攝像頭
          • 4.1.1.2. 統計每個區域車流量
          • 4.1.1.3. 統計每個區域的攝像頭
          • 4.1.1.5 卡口流量分析 代碼解析
        • 4.1.2. 區域車流量Top3及其速度
        • 4.1.3. 區域中高低速數量
        • 4.1.3. 指定卡口對應卡口車輛軌跡
      • 4.2、行車軌跡
        • 4.2.1. 車輛行車軌跡
        • 4.2.2. 車輛套牌
        • 4.2.3. 車輛抽樣--蓄水池抽樣算法
        • 4.2.4. 道路轉換率
      • 4.3、區域道路流量Top3
        • 4.3.1 RDD解決
        • 4.3.2. Java連接Hive---SQL解決
      • hive中導入模擬數據
      • 4.4、Streaming 實時
        • 4.4.1. 道路實時擁堵情況 --kafka
        • 4.4.2. 動態改變廣播變量(布控)

1、車流向項目介紹

1.1. 數據采集

數據從哪兒來?

  • 我們知道數據來來源,比如網站,APP或者工業設備(比如卡口拍攝設備)實現實時數據采集,它首先有非常重要的一點就是所謂的埋點,也就是說,埋點,在網站的哪個頁面哪些操作發生時,前端的代碼比如javascript或者app android/ios,就通過網絡請求Ajax; socket向后端的服務器發送日志數據。
  • 如果是卡口信息,那么每次拍攝的信息都會傳輸到服務器端。
  • 首先就是說網站或者頁面設置埋點,那么就是你要跟前端的開發人員約定好,在哪些頁面哪些操作發生的時候,網站的話就通過ajax引擎,APP的話就通過Socket網絡請求,向后端的服務器發送指定格式的日志數據。卡口數據的話,是和廠商定制數據格式的,數據以指定的格式向服務器發送實時的數據。
  • 接著通過Flume監控指定的文件夾,轉移到HDFS里面去,實際大多數是放在Hive中因為Hive還有計算的能力,還有另外一條流程,實時數據,通常都是從分布式消息隊列集群中讀取的,比如Kafka,實時的log,實時的寫入消息隊列中,然后再由我們后端實時數據處理程序(storm、spark streaming),實時從kafka中讀取數據,log日志
  • 數據除了從Flume中來,也有可能直接使用kafka 的producer角色往kafka中直接生產數據。
  • 接下來就是大數據實時計算系統,比如說用storm、spark streaming開發的,可以實時的從kafka中拉取數據,然后對實時的數據進行處理和計算,這里可以有非常復雜的業務邏輯,甚至調用復雜的機器學習,數據挖掘,智能推薦的算法!然后實現實時的車輛調度,實時推薦等等。
  • 1.2. 模塊介紹

    • 卡扣流量分析 Spark Core
    • 卡扣車流量轉化率 Spark Core
    • 各區域車流量最高top5的道路統計 SparkSQL
    • 稽查布控,道路實時擁堵統計 SparkStreaming

    1.2.1. 卡扣流量分析模塊介紹

    根據使用者(平臺使用者)指定的某些條件,篩選出指定的一批卡扣信息(比如根據區域、時間篩選)

    檢測卡扣狀態,對于篩選出來的所有的卡口(不代表一個攝像頭)信息統計

  • 卡口正常數
  • 異常數
  • camera的正常數
  • camera的異常數
  • camera的詳細信息(monitor_id:camera_id)
  • 車流量最多的TonN卡扣號,延伸獲取每一個卡扣的詳細信息(Top5 )
  • 隨機抽取N個車輛信息,對這些數據可以進行多維度分析(因為隨機抽取出來的N個車輛信息可以很權威的代表整個區域的車輛)
  • 計算出經常高速通過的TopN卡口 (查看哪些卡扣經常被高速通過,高速,中速,正常,低速 根據三個速度段進行四次排序,高速通過的車輛數相同就比較中速通過的車輛數,以此來推)
  • 1.3. 項目架構介紹

    使用架構

    J2EE平臺,前端頁面,在頁面中可以指定任務類型,提交任務的參數(比如時間范圍,區域設定)平臺會接受到用戶的提交請求,會調用底層封裝的Spark-submit的shell腳本,怎么調用?運行的作業可以獲取到用戶指定的篩選條件,然后根據篩選條件進行計算。Spark任務的計算結果會寫入到數據庫中,比如MySQL,Redis等
    最后J2EE平臺可以通過前端頁面,展示結果(表格或者圖表的方式展示數據庫中的結果)。

    1.4. 數據介紹

    1.4.1. 基本概念

    卡扣號:在一條道路相同位置會有兩個卡扣,這兩個卡扣的編號是不同的,分別拍攝不同方向的車輛

    攝像頭編號:每一個卡扣拍攝的是一個方向的車輛,每一個方向都會有多個不同的車道,每一個車道對應一個攝像頭,所以卡扣號與攝像頭的對應關系是一對多的關系。

    1.4.2. 表

    monitor_flow_action表監控到的車流信息表
    date日期 單位:天
    monitor_id卡口號
    camera_id攝像頭編號
    car車牌
    action_time某個攝像頭拍攝時間 單位:秒
    speed通過卡扣的速度
    road_id道路id
    area_id區域ID
    monitor_camera_info表每個卡扣對應的攝像頭編號(標準表)
    monitor_id卡扣編號
    camera_id攝像頭編號

    具體內容見建表語句。

    1.5. 需求分析

    • 按條件篩選卡扣信息
      • 可以指定 不同的條件,時間范圍、區域范圍、卡扣號等 可以靈活的分析不同區域的卡扣信息
    • 監測卡扣狀態
      • 對符合條件的卡扣信息,可以動態的檢查每一個卡扣的狀態,查看卡扣是否正常工作,也可以查看攝像頭
    • 車流量最多的TonN卡扣
      • 查看哪些卡扣的車流量最高,為什么會出現這么高的車流量。分析原因,例如今天出城的車輛非常多,啥原因,今天進城的車輛非常多,啥原因? 集會還是聚集? 這個功能點里面也會拿到具體的車輛的信息,分析一下本地車牌造成的還是外地車牌?
    • 在符合條件的卡扣信息中隨機抽取N個車輛信息
      • 隨機抽取N輛車的信息,可以權威的代表整個區域的車輛,這時候可以分析這些車的軌跡,看一下在不同的時間點車輛的流動方向。以便于道路的規劃。
    • 計算出經常高速通過的TopN卡口
      • 統計出是否存在飆車現象,或者經常進行超速行駛,可以在此處安裝違章拍攝設備

    2、數據流程

    數據處理流程:

    公司有集群沒有數據 分布式爬取數據,多節點爬取數據,一般將數據爬取到flume中,或者將數據直接爬取放入HDFS中。 公司有集群有數據 每天每時每刻在產生數據,數據直接清洗放在HBase或者HDFS中。或者日志數據直接使用flum導入分布式文件中。

    一般有了數據之后又分為兩個大的方向處理數據:

  • 假設數據放在了HDFS集群中之后,一般下一步就要清洗數據,可以將數據通過Hive清洗,當然這里Hive一般使用外表,這樣做的目的是可以將相同的數據只在HDFS中存入一份,避免過多的重復數據。清洗完成的數據一般又會放入Hive表中或者以結構化的數據放在HDFS上。得到清洗后的數據后一般會使用MR或者使用Spark來對數據進行分析處理,也可以對清洗后的數據使用SparkSQL來進行處理分析。之后,將分析完成的數據放入數據庫中,如Redis,Mysql,Oracle中,供前端查詢展示。
  • 如果數據放入了flume中,一般將數據sink到kafka中,不同數據的種類放入不同的topic中。然后對打入kafka中的數據進行流式處理,一般可以使用storm或者SparkStreaming對數據進行清洗,分析處理,然后將結果放到數據庫中,如Redis,Mysql,Oracle中,以供前端頁面來查詢展示。
  • 3、spark任務

    如何將Spark任務提交到集群運行?

    最次也是腳本化執行Spark任務。

    平臺化提交Spark任務。流程圖如下:

    submit后先將數據存入Mysql中,task當做唯一主鍵,這樣做是為了簡化任務執行失敗時,可以直接在數據庫中查詢之前的提交的業務參數,當任務失敗后,下次retry時方便執行。

    submit后可以使用java調用liunx系統腳本,通過taskId得到系統中的業務參數數據。

    注意: 假如使用tomcat實現平臺化,那么tomcat應該部署在客戶端。

    l java代碼中如何執行liunx腳本?

    Process proc = Runtime.**getRuntime**().exec(“sh 腳本”); proc.waitFor();

    4、模塊功能

    4.1、卡口流量分析

    • 全部使用SparkCore實現。

    4.1.1. 卡口狀態監控

    4.1.1.1. 統計卡口壞攝像頭
    def main(args: Array[String]): Unit = {//獲取數據源val sparkSession = ContextUtils.getSparkSession("Hello01MonitorState")//讀取數據MockDataUtil.mock2view(sparkSession)//------------------------------統計卡口攝像頭通過的車輛的合計----------------------------import sparkSession.implicits._//開始讀取數據val dataFrame: DataFrame = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-19' ")//開始操作val mcRdd: RDD[(String, Int)] = dataFrame.map(ele => Tuple2(ele.getString(1) + ":" + ele.getString(2), 1)).rdd//開始進行合并val flowRdd: RDD[(String, Int)] = mcRdd.reduceByKey(_ + _)//------------------------------統計卡口所有的攝像頭----------------------------val cameraDataFrame = sparkSession.sql("select * from " + MockDataUtil.MONITOR_CAMERA_ACTION)val cameraRdd: RDD[(String, Int)] = cameraDataFrame.map(ele => ((ele.getString(0) + ":" + ele.getString(1)), 1)).rdd//------------------------------合并車流量和攝像頭RDD----------------------------val allRDD: RDD[(String, (Option[Int], Int))] = flowRdd.rightOuterJoin(cameraRdd).filter(ele => ele._2._1.isEmpty)allRDD.foreach(println)}

    4.1.1.2. 統計每個區域車流量
    def main(args: Array[String]): Unit = {//獲取數據源val sparkSession = ContextUtils.getSparkSession("Hello02MonitorFlowCount")//讀取數據MockDataUtil.mock2view(sparkSession)//------------------------------統計卡口通過的車輛的合計----------------------------import sparkSession.implicits._//開始讀取數據val dataFrame: DataFrame = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ")//開始操作val mcRdd: RDD[(String, Int)] = dataFrame.map(ele => Tuple2(ele.getString(1), 1)).rdd//開始進行合并val flowRdd: RDD[(String, Int)] = mcRdd.reduceByKey(_ + _)flowRdd.foreach(println)}

    4.1.1.3. 統計每個區域的攝像頭
    def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("Hello03MonitorStateAnalyze")MockDataUtil.mock2view(sparkSession)//---------------------開始操作車流量信息,假設任務編號為1 日期參數為今天val flowInfo: RDD[(String, String)] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd.map(row => (row.getString(1), row)).groupByKey().map(ele => {val monitorId: String = ele._1val cameraIdSet = new mutable.HashSet[String]()ele._2.foreach(row => cameraIdSet.add(row.getString(2)))//拼接字符串val info: String = Constants.FIELD_MONITOR_ID + "=" + monitorId + "|" + Constants.FIELD_AREA_ID + "=浦東新區|" + Constants.FIELD_CAMERA_IDS + "=" + cameraIdSet.mkString("-") + "|" + Constants.FIELD_CAMERA_COUNT + "=" + cameraIdSet.size + "|" + Constants.FIELD_CAR_COUNT + "=" + ele._2.size//返回結果(monitorId, info)})//-----------------------開始操作攝像頭數據val monitorInfo: RDD[(String, String)] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_CAMERA_ACTION).rdd.map(row => (row.getString(0), row.getString(1))).groupByKey().map(ele => {val monitorId: String = ele._1//拼接字符串val info: String = Constants.FIELD_CAMERA_IDS + "=" + ele._2.toList.mkString("-") + "|" + Constants.FIELD_CAMERA_COUNT + "=" + ele._2.size//返回結果(monitorId, info)})//-----------------------將數據Join到一起monitorInfo.leftOuterJoin(flowInfo).foreach(println)}

    4.1.1.5 卡口流量分析 代碼解析

    4.1.2. 區域車流量Top3及其速度

    • 區域車流量top3
    def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("AreaTop3Road")MockDataUtil.mock2view(sparkSession)//開始計算val fRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rddfRdd.map(row => (row.getString(7) + "_" + row.getString(6) + "&" + (Math.random() * 30 + 10).toInt, 1)).reduceByKey(_ + _).map(ele => {val area_road_random = ele._1val count = ele._2(area_road_random.split("_")(0), area_road_random.split("_")(1).split("&")(0) + "_" + count)}).groupByKey().map(ele => {val map = new mutable.HashMap[String, Int]()ele._2.foreach(e => {val key = e.split("_")(0)val value = e.split("_")(1).toIntmap.put(key, map.get(key).getOrElse(0) + value)})"區劃【" + ele._1 + "】車輛最多的三條道路分別為:" + map.toList.sortBy(_._2).takeRight(3).reverse.mkString("-")}).foreach(println)}

    • 區域各路速度
    def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("AreaTop3Speed")MockDataUtil.mock2view(sparkSession)val sRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rddsRdd.map(e=>{((e.getString(7),e.getString(6)),e.getString(5).toInt)}).groupByKey().map(e=>{val list: List[Int] = e._2.toListval i: Int = list.sum/list.size(e._1._1,(e._1._2,i))}).groupByKey().map(e=>{val tuples = e._2.toList.sortBy(_._2).reverse.take(3)var strBui: StringBuilder = new StringBuilderfor (i <- tuples ){val str: String = i._1 + "-均速度為:" + i._2strBui.append(">>>"+str)}(e._1,strBui)}).foreach(println)}

    4.1.3. 區域中高低速數量

    object Hello04MonitorTopNSpeed {def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("Hello04MonitorTopNSpeed")MockDataUtil.mock2view(sparkSession)//---------------------開始操作車流量信息,假設任務編號為1 日期參數為今天val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-20' ").rddval monitor2speedRDD: RDD[(String, Iterable[String])] = flowRdd.map(row => (row.getString(1), row.getString(5))).groupByKey()val speedCount2monitorRDD: RDD[(SpeedCount, String)] = monitor2speedRDD.map(ele => {//獲取卡口號val monitorId: String = ele._1//聲明一個Map[0,60,100,120]var high = 0;var normal = 0;var low = 0;//獲取所有的速度的車輛技術ele._2.foreach(speed => {//判斷速度if (speed.toInt > 100) {high += 1} else if (speed.toInt > 60) {normal += 1} else {low += 1}})//創建速度對象(SpeedCount(high, normal, low), monitorId)})speedCount2monitorRDD.sortByKey(false).map(x => (x._2, x._1)).foreach(println)} }case class SpeedCount(high: Int, normal: Int, low: Int) extends Ordered[SpeedCount] with KryoRegistrator {override def compare(that: SpeedCount): Int = {var result = this.high - that.highif (result == 0) {result = this.normal - that.normalif (result == 0) {result = this.low - that.low}}return result}override def registerClasses(kryo: Kryo): Unit = {kryo.register(SpeedCount.getClass)} }

    4.1.3. 指定卡口對應卡口車輛軌跡

    def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("Hello04MonitorTopNSpeed")MockDataUtil.mock2view(sparkSession)//獲取數據val area01Rdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' and area_id = '01' ").rddval area02Rdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' and area_id = '02' ").rddval area01CarRdd = area01Rdd.map(row => (row.getString(3), row.getString(7))).groupByKey()val area02CarRdd = area02Rdd.map(row => (row.getString(3), row.getString(7))).groupByKey()area01CarRdd.join(area02CarRdd).foreach(println)}

    4.2、行車軌跡

    4.2.1. 車輛行車軌跡

    def main(args: Array[String]): Unit = { val sparkSession = ContextUtils.getSparkSession("AreaCar") MockDataUtil.mock2view(sparkSession)//查詢 車子行駛軌跡 跟車分析 val c1Rdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd val carRdd: RDD[(String, StringBuilder)] = c1Rdd.map(e => { (e.getString(3), (e.getString(4), e.getString(6), e.getString(2))) }).groupByKey() .map(e => { val tuples: List[(String, String, String)] = e._2.toList.sortBy(_._1) val list = new StringBuilder for (i <- tuples) { //println(i) val str: String = i._2 + ":" + i._3 list.append(str + "-") } (e._1, list) }) //carRdd.foreach(println) }

    4.2.2. 車輛套牌

    def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("AreaCar")MockDataUtil.mock2view(sparkSession) //假設任何的卡口距離都是 10分鐘車程 ,如果同一分鐘出現在不同的卡口就懷疑是套牌val deckRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdddeckRdd.map(e => {val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")(e.getString(3), (dateFormat.parse(e.getString(4)),e.getString(1)))}).groupByKey(1).map(e => {val list: List[(util.Date, String)] = e._2.toList.sortBy(x=>x._1)var bool = falsevar d: util.Date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-23 00:00:00")var mid="?"for (i <- list) {if (d.getTime - i._1.getTime < 600000 && i._2!=mid )bool = trued = i._1mid=i._2}(e._1, bool)}).filter(f => f._2).foreach(println)}

    4.2.3. 車輛抽樣–蓄水池抽樣算法

    def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("Hello04MonitorTopNSpeed")MockDataUtil.mock2view(sparkSession)//獲取數據val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-21' ").rdd//yyyy-MM-dd_HH , rowval hourRDD: RDD[(String, Row)] = flowRdd.map(row => (DateUtils.getDateHour(row.getString(4)), row))//車流量的總數,并進行廣播val flowAllCount: Long = hourRDD.count()val broadcastFlowAllCount: Broadcast[Long] = sparkSession.sparkContext.broadcast(flowAllCount)//計算每個小時的比例 并進行廣播val hourRatio: collection.Map[String, Double] = hourRDD.countByKey().map(e => {(e._1, e._2 * 1.0 / broadcastFlowAllCount.value)})val broadcastHourRatio: Broadcast[collection.Map[String, Double]] = sparkSession.sparkContext.broadcast(hourRatio)//開始進行抽樣val sampleRDD: RDD[Row] = hourRDD.groupByKey().flatMap(ele => {val hour: String = ele._1val list: List[Row] = ele._2.iterator.toList//計算本時段要抽樣的數據量val sampleRatio: Double = broadcastHourRatio.value.get(hour).getOrElse(0)val sampleNum: Long = Math.round(sampleRatio * 100)//開始進行取樣(蓄水池抽樣)val sampleList: ListBuffer[Row] = new ListBuffer[Row]()sampleList.appendAll(list.take(sampleNum.toInt))for (i <- sampleNum until list.size) {//隨機生成一個數字val num = (Math.random() * list.size).toIntif (num < sampleNum) {sampleList.update(num, list(i.toInt))}}sampleList})sampleRDD.foreach(println)}

    4.2.4. 道路轉換率

    def main(args: Array[String]): Unit = {//創建會話val sparkSession = ContextUtils.getSparkSession("Hello07MonitorConvertRatio")MockDataUtil.mock2view(sparkSession)//開始計算val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd//計算每個卡口的總通車量val monitorCountMap: collection.Map[String, Long] = flowRdd.map(row => (row.getString(1), row)).countByKey()//計算卡口到卡口的通行率val sortRDD: RDD[(String, List[Row])] = flowRdd.map(row => (row.getString(3), row)).groupByKey().map(ele => (ele._1, ele._2.iterator.toList.sortBy(_.getString(4))))val m2mMap: collection.Map[String, Long] = sortRDD.flatMap(ele => {//存放映射關系val map: mutable.HashMap[String, Int] = mutable.HashMap[String, Int]()val list: List[Row] = ele._2.toListfor (i <- 0 until list.size; j <- i + 1 until list.size) {//拼接Keyval key = list(i).getString(1) + "->" + list(j).getString(1)map.put(key, map.get(key).getOrElse(0) + 1);}//返回結果map.toList}).countByKey()//開始進行計算m2mMap.foreach(ele => {println("卡口[" + ele._1 + "]的轉換率為:" + ele._2.toDouble / monitorCountMap.get(ele._1.split("->")(0)).get)}) }

    4.3、區域道路流量Top3

    • 數據傾斜問題
      • key添加后綴擴組,減小數據傾斜

    4.3.1 RDD解決

    def main(args: Array[String]): Unit = {//創建會話val sparkSession = ContextUtils.getSparkSession("Hello07MonitorConvertRatio")MockDataUtil.mock2view(sparkSession)//開始計算val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd//開始計算flowRdd.map(row => (row.getString(7) + "_" + row.getString(6) + "&" + (Math.random() * 30 + 10).toInt, 1)).reduceByKey(_ + _).map(ele => {val area_road_random = ele._1val count = ele._2(area_road_random.split("_")(0), area_road_random.split("_")(1).split("&")(0) + "_" + count)}).groupByKey().map(ele => {val map = new mutable.HashMap[String, Int]()ele._2.foreach(e => {val key = e.split("_")(0)val value = e.split("_")(1).toIntmap.put(key, map.get(key).getOrElse(0) + value)})"區劃【" + ele._1 + "】車輛最多的三條道路分別為:" + map.toList.sortBy(_._2).takeRight(3).reverse.mkString("-")}).foreach(println) }

    4.3.2. Java連接Hive—SQL解決

    • hive中導入模擬數據

  • 打開hive手動創建database 為traffic 。

  • 生產模擬數據,將模擬數據提交到linux上。

  • 將data2hive代碼 打包,提交到客戶端運行。

  • 查詢hive中數據庫為traffic中monitor_flow_action和monitor_camera_info兩張數據庫表是否導入數據。

  • 若出現mysql數據庫亂碼的問題

    在安裝mysql的linux節點路徑/etc/my.cnf中加入:

    • 在[client]下添加
      default-character-set=utf8
    • 在[mysqld]下添加
      default-character-set=utf8

    如圖:

    運行項目中AreaTop3RoadFlowAnalyze代碼

    4.4、Streaming 實時

    4.4.1. 道路實時擁堵情況 --kafka

    • 生產者
    public class MockRealTimeData extends Thread {private static final Random random = new Random();private static final String[] locations = new String[]{"魯", "滬", "滬", "滬", "滬", "京", "京", "深", "京", "京"};private static final String topic = "RoadRealTimeLog";private KafkaProducer<String, String> producer;public MockRealTimeData() {//創建配置文件列表Properties properties = new Properties();// kafka地址,多個地址用逗號分割properties.put("bootstrap.servers", "192.168.100.101:9092,192.168.100.102:9092,192.168.100.103:9092");//設置寫出數據的格式properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//寫出的應答方式properties.put("acks", "all");//錯誤重試properties.put("retries", 1);//批量寫出properties.put("batch.size", 16384);//創建生產者對象producer = new KafkaProducer<String, String>(properties);}public void run() {while (true) {String date = DateUtils.getTodayDate();String baseActionTime = date + " " + StringUtils.fullFill(random.nextInt(24) + "");baseActionTime = date + " " + StringUtils.fullFill((Integer.parseInt(baseActionTime.split(" ")[1]) + 1) + "");String actionTime = baseActionTime + ":" + StringUtils.fullFill(random.nextInt(60) + "") + ":" + StringUtils.fullFill(random.nextInt(60) + "");String monitorId = StringUtils.fullFill(4, random.nextInt(9) + "");String car = locations[random.nextInt(10)] + (char) (65 + random.nextInt(26)) + StringUtils.fullFill(5, random.nextInt(99999) + "");String speed = random.nextInt(260) + "";String roadId = random.nextInt(50) + 1 + "";String cameraId = StringUtils.fullFill(5, random.nextInt(9999) + "");String areaId = StringUtils.fullFill(2, random.nextInt(8) + "");//封裝消息對象ProducerRecord<String, String> banRecordBlue = new ProducerRecord<>(topic, "traffic_" + monitorId, date + "\t" + monitorId + "\t" + cameraId + "\t" + car + "\t" + actionTime + "\t" + speed + "\t" + roadId + "\t" + areaId);//發送消息producer.send(banRecordBlue);try {Thread.sleep(50);} catch (InterruptedException e) {e.printStackTrace();}}}/*** 啟動Kafka Producer** @param args*/public static void main(String[] args) {MockRealTimeData mockRealTimeData = new MockRealTimeData();mockRealTimeData.start();} }
    • 消費者
    object Hello10RealRoadState {def main(args: Array[String]): Unit = {//創建Confval sparkConf = new SparkConf().setAppName("Hello10RealRoadState").setMaster("local[2]")val streamingContext = new StreamingContext(sparkConf, Seconds(2))//創建Kafka讀取數據//配置信息val kafkaParams = Map[String, Object]("bootstrap.servers" -> "node01:9092,node02:9092,node03:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "traffic","auto.offset.reset" -> "earliest","enable.auto.commit" -> (false: java.lang.Boolean))val topics = Array("RoadRealTimeLog")//開始創建Kafkaval linesDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils createDirectStream(streamingContext, PreferConsistent, Subscribe[String, String](topics, kafkaParams))//窗口函數linesDStream.map(_.value()).window(Seconds(10), Seconds(10)).map(ele => (ele.split("\t")(1), ele.split("\t")(5).toInt)).groupByKey().map(ele => {(ele._1, ele._2.toList.sum.toDouble / ele._2.size)}).foreachRDD(rdd => {rdd.foreach(ele => {println(ele._1 + "--" + ele._2)})})//啟動任務streamingContext.start()streamingContext.awaitTermination() } }

    4.4.2. 動態改變廣播變量(布控)

    總結

    以上是生活随笔為你收集整理的spark 车流量项目实战的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。