spark 车流量项目实战
spark 車流量項目實戰
這里目錄標題
- spark 車流量項目實戰
- 1、車流向項目介紹
- 1.1. 數據采集
- 1.2. 模塊介紹
- 1.2.1. 卡扣流量分析模塊介紹
- 1.3. 項目架構介紹
- 1.4. 數據介紹
- 1.4.1. 基本概念
- 1.4.2. 表
- 1.5. 需求分析
- 2、數據流程
- 3、spark任務
- 如何將Spark任務提交到集群運行?
- 4、模塊功能
- 4.1、卡口流量分析
- 4.1.1. 卡口狀態監控
- 4.1.1.1. 統計卡口壞攝像頭
- 4.1.1.2. 統計每個區域車流量
- 4.1.1.3. 統計每個區域的攝像頭
- 4.1.1.5 卡口流量分析 代碼解析
- 4.1.2. 區域車流量Top3及其速度
- 4.1.3. 區域中高低速數量
- 4.1.3. 指定卡口對應卡口車輛軌跡
- 4.2、行車軌跡
- 4.2.1. 車輛行車軌跡
- 4.2.2. 車輛套牌
- 4.2.3. 車輛抽樣--蓄水池抽樣算法
- 4.2.4. 道路轉換率
- 4.3、區域道路流量Top3
- 4.3.1 RDD解決
- 4.3.2. Java連接Hive---SQL解決
- hive中導入模擬數據
- 4.4、Streaming 實時
- 4.4.1. 道路實時擁堵情況 --kafka
- 4.4.2. 動態改變廣播變量(布控)
1、車流向項目介紹
1.1. 數據采集
數據從哪兒來?
1.2. 模塊介紹
- 卡扣流量分析 Spark Core
- 卡扣車流量轉化率 Spark Core
- 各區域車流量最高top5的道路統計 SparkSQL
- 稽查布控,道路實時擁堵統計 SparkStreaming
1.2.1. 卡扣流量分析模塊介紹
根據使用者(平臺使用者)指定的某些條件,篩選出指定的一批卡扣信息(比如根據區域、時間篩選)
檢測卡扣狀態,對于篩選出來的所有的卡口(不代表一個攝像頭)信息統計
1.3. 項目架構介紹
使用架構
J2EE平臺,前端頁面,在頁面中可以指定任務類型,提交任務的參數(比如時間范圍,區域設定)平臺會接受到用戶的提交請求,會調用底層封裝的Spark-submit的shell腳本,怎么調用?運行的作業可以獲取到用戶指定的篩選條件,然后根據篩選條件進行計算。Spark任務的計算結果會寫入到數據庫中,比如MySQL,Redis等
最后J2EE平臺可以通過前端頁面,展示結果(表格或者圖表的方式展示數據庫中的結果)。
1.4. 數據介紹
1.4.1. 基本概念
卡扣號:在一條道路相同位置會有兩個卡扣,這兩個卡扣的編號是不同的,分別拍攝不同方向的車輛
攝像頭編號:每一個卡扣拍攝的是一個方向的車輛,每一個方向都會有多個不同的車道,每一個車道對應一個攝像頭,所以卡扣號與攝像頭的對應關系是一對多的關系。
1.4.2. 表
| date | 日期 單位:天 |
| monitor_id | 卡口號 |
| camera_id | 攝像頭編號 |
| car | 車牌 |
| action_time | 某個攝像頭拍攝時間 單位:秒 |
| speed | 通過卡扣的速度 |
| road_id | 道路id |
| area_id | 區域ID |
| monitor_id | 卡扣編號 |
| camera_id | 攝像頭編號 |
具體內容見建表語句。
1.5. 需求分析
- 按條件篩選卡扣信息
- 可以指定 不同的條件,時間范圍、區域范圍、卡扣號等 可以靈活的分析不同區域的卡扣信息
- 監測卡扣狀態
- 對符合條件的卡扣信息,可以動態的檢查每一個卡扣的狀態,查看卡扣是否正常工作,也可以查看攝像頭
- 車流量最多的TonN卡扣
- 查看哪些卡扣的車流量最高,為什么會出現這么高的車流量。分析原因,例如今天出城的車輛非常多,啥原因,今天進城的車輛非常多,啥原因? 集會還是聚集? 這個功能點里面也會拿到具體的車輛的信息,分析一下本地車牌造成的還是外地車牌?
- 在符合條件的卡扣信息中隨機抽取N個車輛信息
- 隨機抽取N輛車的信息,可以權威的代表整個區域的車輛,這時候可以分析這些車的軌跡,看一下在不同的時間點車輛的流動方向。以便于道路的規劃。
- 計算出經常高速通過的TopN卡口
- 統計出是否存在飆車現象,或者經常進行超速行駛,可以在此處安裝違章拍攝設備
2、數據流程
數據處理流程:
公司有集群沒有數據 分布式爬取數據,多節點爬取數據,一般將數據爬取到flume中,或者將數據直接爬取放入HDFS中。 公司有集群有數據 每天每時每刻在產生數據,數據直接清洗放在HBase或者HDFS中。或者日志數據直接使用flum導入分布式文件中。一般有了數據之后又分為兩個大的方向處理數據:
3、spark任務
如何將Spark任務提交到集群運行?
最次也是腳本化執行Spark任務。
平臺化提交Spark任務。流程圖如下:
submit后先將數據存入Mysql中,task當做唯一主鍵,這樣做是為了簡化任務執行失敗時,可以直接在數據庫中查詢之前的提交的業務參數,當任務失敗后,下次retry時方便執行。
submit后可以使用java調用liunx系統腳本,通過taskId得到系統中的業務參數數據。
注意: 假如使用tomcat實現平臺化,那么tomcat應該部署在客戶端。
l java代碼中如何執行liunx腳本?
Process proc = Runtime.**getRuntime**().exec(“sh 腳本”); proc.waitFor();4、模塊功能
4.1、卡口流量分析
- 全部使用SparkCore實現。
4.1.1. 卡口狀態監控
4.1.1.1. 統計卡口壞攝像頭
def main(args: Array[String]): Unit = {//獲取數據源val sparkSession = ContextUtils.getSparkSession("Hello01MonitorState")//讀取數據MockDataUtil.mock2view(sparkSession)//------------------------------統計卡口攝像頭通過的車輛的合計----------------------------import sparkSession.implicits._//開始讀取數據val dataFrame: DataFrame = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-19' ")//開始操作val mcRdd: RDD[(String, Int)] = dataFrame.map(ele => Tuple2(ele.getString(1) + ":" + ele.getString(2), 1)).rdd//開始進行合并val flowRdd: RDD[(String, Int)] = mcRdd.reduceByKey(_ + _)//------------------------------統計卡口所有的攝像頭----------------------------val cameraDataFrame = sparkSession.sql("select * from " + MockDataUtil.MONITOR_CAMERA_ACTION)val cameraRdd: RDD[(String, Int)] = cameraDataFrame.map(ele => ((ele.getString(0) + ":" + ele.getString(1)), 1)).rdd//------------------------------合并車流量和攝像頭RDD----------------------------val allRDD: RDD[(String, (Option[Int], Int))] = flowRdd.rightOuterJoin(cameraRdd).filter(ele => ele._2._1.isEmpty)allRDD.foreach(println)}4.1.1.2. 統計每個區域車流量
def main(args: Array[String]): Unit = {//獲取數據源val sparkSession = ContextUtils.getSparkSession("Hello02MonitorFlowCount")//讀取數據MockDataUtil.mock2view(sparkSession)//------------------------------統計卡口通過的車輛的合計----------------------------import sparkSession.implicits._//開始讀取數據val dataFrame: DataFrame = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ")//開始操作val mcRdd: RDD[(String, Int)] = dataFrame.map(ele => Tuple2(ele.getString(1), 1)).rdd//開始進行合并val flowRdd: RDD[(String, Int)] = mcRdd.reduceByKey(_ + _)flowRdd.foreach(println)}4.1.1.3. 統計每個區域的攝像頭
def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("Hello03MonitorStateAnalyze")MockDataUtil.mock2view(sparkSession)//---------------------開始操作車流量信息,假設任務編號為1 日期參數為今天val flowInfo: RDD[(String, String)] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd.map(row => (row.getString(1), row)).groupByKey().map(ele => {val monitorId: String = ele._1val cameraIdSet = new mutable.HashSet[String]()ele._2.foreach(row => cameraIdSet.add(row.getString(2)))//拼接字符串val info: String = Constants.FIELD_MONITOR_ID + "=" + monitorId + "|" + Constants.FIELD_AREA_ID + "=浦東新區|" + Constants.FIELD_CAMERA_IDS + "=" + cameraIdSet.mkString("-") + "|" + Constants.FIELD_CAMERA_COUNT + "=" + cameraIdSet.size + "|" + Constants.FIELD_CAR_COUNT + "=" + ele._2.size//返回結果(monitorId, info)})//-----------------------開始操作攝像頭數據val monitorInfo: RDD[(String, String)] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_CAMERA_ACTION).rdd.map(row => (row.getString(0), row.getString(1))).groupByKey().map(ele => {val monitorId: String = ele._1//拼接字符串val info: String = Constants.FIELD_CAMERA_IDS + "=" + ele._2.toList.mkString("-") + "|" + Constants.FIELD_CAMERA_COUNT + "=" + ele._2.size//返回結果(monitorId, info)})//-----------------------將數據Join到一起monitorInfo.leftOuterJoin(flowInfo).foreach(println)}4.1.1.5 卡口流量分析 代碼解析
4.1.2. 區域車流量Top3及其速度
- 區域車流量top3
- 區域各路速度
4.1.3. 區域中高低速數量
object Hello04MonitorTopNSpeed {def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("Hello04MonitorTopNSpeed")MockDataUtil.mock2view(sparkSession)//---------------------開始操作車流量信息,假設任務編號為1 日期參數為今天val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-20' ").rddval monitor2speedRDD: RDD[(String, Iterable[String])] = flowRdd.map(row => (row.getString(1), row.getString(5))).groupByKey()val speedCount2monitorRDD: RDD[(SpeedCount, String)] = monitor2speedRDD.map(ele => {//獲取卡口號val monitorId: String = ele._1//聲明一個Map[0,60,100,120]var high = 0;var normal = 0;var low = 0;//獲取所有的速度的車輛技術ele._2.foreach(speed => {//判斷速度if (speed.toInt > 100) {high += 1} else if (speed.toInt > 60) {normal += 1} else {low += 1}})//創建速度對象(SpeedCount(high, normal, low), monitorId)})speedCount2monitorRDD.sortByKey(false).map(x => (x._2, x._1)).foreach(println)} }case class SpeedCount(high: Int, normal: Int, low: Int) extends Ordered[SpeedCount] with KryoRegistrator {override def compare(that: SpeedCount): Int = {var result = this.high - that.highif (result == 0) {result = this.normal - that.normalif (result == 0) {result = this.low - that.low}}return result}override def registerClasses(kryo: Kryo): Unit = {kryo.register(SpeedCount.getClass)} }4.1.3. 指定卡口對應卡口車輛軌跡
def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("Hello04MonitorTopNSpeed")MockDataUtil.mock2view(sparkSession)//獲取數據val area01Rdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' and area_id = '01' ").rddval area02Rdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' and area_id = '02' ").rddval area01CarRdd = area01Rdd.map(row => (row.getString(3), row.getString(7))).groupByKey()val area02CarRdd = area02Rdd.map(row => (row.getString(3), row.getString(7))).groupByKey()area01CarRdd.join(area02CarRdd).foreach(println)}4.2、行車軌跡
4.2.1. 車輛行車軌跡
def main(args: Array[String]): Unit = { val sparkSession = ContextUtils.getSparkSession("AreaCar") MockDataUtil.mock2view(sparkSession)//查詢 車子行駛軌跡 跟車分析 val c1Rdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd val carRdd: RDD[(String, StringBuilder)] = c1Rdd.map(e => { (e.getString(3), (e.getString(4), e.getString(6), e.getString(2))) }).groupByKey() .map(e => { val tuples: List[(String, String, String)] = e._2.toList.sortBy(_._1) val list = new StringBuilder for (i <- tuples) { //println(i) val str: String = i._2 + ":" + i._3 list.append(str + "-") } (e._1, list) }) //carRdd.foreach(println) }4.2.2. 車輛套牌
def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("AreaCar")MockDataUtil.mock2view(sparkSession) //假設任何的卡口距離都是 10分鐘車程 ,如果同一分鐘出現在不同的卡口就懷疑是套牌val deckRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdddeckRdd.map(e => {val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")(e.getString(3), (dateFormat.parse(e.getString(4)),e.getString(1)))}).groupByKey(1).map(e => {val list: List[(util.Date, String)] = e._2.toList.sortBy(x=>x._1)var bool = falsevar d: util.Date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-23 00:00:00")var mid="?"for (i <- list) {if (d.getTime - i._1.getTime < 600000 && i._2!=mid )bool = trued = i._1mid=i._2}(e._1, bool)}).filter(f => f._2).foreach(println)}4.2.3. 車輛抽樣–蓄水池抽樣算法
def main(args: Array[String]): Unit = {val sparkSession = ContextUtils.getSparkSession("Hello04MonitorTopNSpeed")MockDataUtil.mock2view(sparkSession)//獲取數據val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-21' ").rdd//yyyy-MM-dd_HH , rowval hourRDD: RDD[(String, Row)] = flowRdd.map(row => (DateUtils.getDateHour(row.getString(4)), row))//車流量的總數,并進行廣播val flowAllCount: Long = hourRDD.count()val broadcastFlowAllCount: Broadcast[Long] = sparkSession.sparkContext.broadcast(flowAllCount)//計算每個小時的比例 并進行廣播val hourRatio: collection.Map[String, Double] = hourRDD.countByKey().map(e => {(e._1, e._2 * 1.0 / broadcastFlowAllCount.value)})val broadcastHourRatio: Broadcast[collection.Map[String, Double]] = sparkSession.sparkContext.broadcast(hourRatio)//開始進行抽樣val sampleRDD: RDD[Row] = hourRDD.groupByKey().flatMap(ele => {val hour: String = ele._1val list: List[Row] = ele._2.iterator.toList//計算本時段要抽樣的數據量val sampleRatio: Double = broadcastHourRatio.value.get(hour).getOrElse(0)val sampleNum: Long = Math.round(sampleRatio * 100)//開始進行取樣(蓄水池抽樣)val sampleList: ListBuffer[Row] = new ListBuffer[Row]()sampleList.appendAll(list.take(sampleNum.toInt))for (i <- sampleNum until list.size) {//隨機生成一個數字val num = (Math.random() * list.size).toIntif (num < sampleNum) {sampleList.update(num, list(i.toInt))}}sampleList})sampleRDD.foreach(println)}4.2.4. 道路轉換率
def main(args: Array[String]): Unit = {//創建會話val sparkSession = ContextUtils.getSparkSession("Hello07MonitorConvertRatio")MockDataUtil.mock2view(sparkSession)//開始計算val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd//計算每個卡口的總通車量val monitorCountMap: collection.Map[String, Long] = flowRdd.map(row => (row.getString(1), row)).countByKey()//計算卡口到卡口的通行率val sortRDD: RDD[(String, List[Row])] = flowRdd.map(row => (row.getString(3), row)).groupByKey().map(ele => (ele._1, ele._2.iterator.toList.sortBy(_.getString(4))))val m2mMap: collection.Map[String, Long] = sortRDD.flatMap(ele => {//存放映射關系val map: mutable.HashMap[String, Int] = mutable.HashMap[String, Int]()val list: List[Row] = ele._2.toListfor (i <- 0 until list.size; j <- i + 1 until list.size) {//拼接Keyval key = list(i).getString(1) + "->" + list(j).getString(1)map.put(key, map.get(key).getOrElse(0) + 1);}//返回結果map.toList}).countByKey()//開始進行計算m2mMap.foreach(ele => {println("卡口[" + ele._1 + "]的轉換率為:" + ele._2.toDouble / monitorCountMap.get(ele._1.split("->")(0)).get)}) }4.3、區域道路流量Top3
- 數據傾斜問題
- key添加后綴擴組,減小數據傾斜
4.3.1 RDD解決
def main(args: Array[String]): Unit = {//創建會話val sparkSession = ContextUtils.getSparkSession("Hello07MonitorConvertRatio")MockDataUtil.mock2view(sparkSession)//開始計算val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd//開始計算flowRdd.map(row => (row.getString(7) + "_" + row.getString(6) + "&" + (Math.random() * 30 + 10).toInt, 1)).reduceByKey(_ + _).map(ele => {val area_road_random = ele._1val count = ele._2(area_road_random.split("_")(0), area_road_random.split("_")(1).split("&")(0) + "_" + count)}).groupByKey().map(ele => {val map = new mutable.HashMap[String, Int]()ele._2.foreach(e => {val key = e.split("_")(0)val value = e.split("_")(1).toIntmap.put(key, map.get(key).getOrElse(0) + value)})"區劃【" + ele._1 + "】車輛最多的三條道路分別為:" + map.toList.sortBy(_._2).takeRight(3).reverse.mkString("-")}).foreach(println) }4.3.2. Java連接Hive—SQL解決
-
hive中導入模擬數據
打開hive手動創建database 為traffic 。
生產模擬數據,將模擬數據提交到linux上。
將data2hive代碼 打包,提交到客戶端運行。
查詢hive中數據庫為traffic中monitor_flow_action和monitor_camera_info兩張數據庫表是否導入數據。
若出現mysql數據庫亂碼的問題
在安裝mysql的linux節點路徑/etc/my.cnf中加入:
- 在[client]下添加
default-character-set=utf8 - 在[mysqld]下添加
default-character-set=utf8
如圖:
運行項目中AreaTop3RoadFlowAnalyze代碼4.4、Streaming 實時
4.4.1. 道路實時擁堵情況 --kafka
- 生產者
- 消費者
4.4.2. 動態改變廣播變量(布控)
總結
以上是生活随笔為你收集整理的spark 车流量项目实战的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 前端学习(1530):钩子函数--代码演
- 下一篇: unity3d 挂载脚本_Unity3D