
Spark Streaming + Kafka + Redis in Practice


Operating Redis from Java: http://blog.csdn.net/xyang81/article/details/51918129
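For quick reference, here is a minimal sketch of talking to Redis through Jedis from Scala. The host, port and key names below are placeholders for illustration, not values from this project:

import redis.clients.jedis.Jedis

object JedisQuickStart {
  def main(args: Array[String]): Unit = {
    // connect to a Redis instance (host and port are placeholders)
    val jedis = new Jedis("127.0.0.1", 6379)
    jedis.set("greeting", "hello")           // plain SET
    println(jedis.get("greeting"))           // GET, prints "hello"
    jedis.incrByFloat("demo_income", 99.5)   // atomic float increment, the same call the job uses for totals
    jedis.close()
  }
}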

The data file order.txt:

A 202.106.196.115 手機 iPhone8 8000
B 202.106.0.20 服裝 布萊奧尼西服 199
C 202.102.152.3 家具 嬰兒床 2000
D 202.96.96.68 家電 電飯鍋 1000
F 202.98.0.68 化妝品 迪奧香水 200
H 202.96.75.68 食品 奶粉 600
J 202.97.229.133 圖書 Hadoop編程指南 90
A 202.106.196.115 手機 手機殼 200
B 202.106.0.20 手機 iPhone8 8000
C 202.102.152.3 家具 嬰兒車 2000
D 202.96.96.68 家具 嬰兒車 1000
F 202.98.0.68 化妝品 迪奧香水 200
H 202.96.75.68 食品 嬰兒床 600
J 202.97.229.133 圖書 spark實戰 80
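Each record is space-separated: user id, client IP, product category, product name and order amount. The streaming job below simply calls split(" ") and indexes into the resulting array; the sketch here only makes the field positions explicit (the Order case class and parseOrder helper are illustrative and do not appear in the project code):

object OrderParser {
  // illustrative only; the streaming job just uses split(" ") and array indexes
  case class Order(user: String, ip: String, category: String, product: String, price: Double)

  def parseOrder(line: String): Order = {
    val arr = line.split(" ")
    // arr(0) = user, arr(1) = IP, arr(2) = category, arr(3) = product, arr(4) = price
    Order(arr(0), arr(1), arr(2), arr(3), arr(4).toDouble)
  }

  def main(args: Array[String]): Unit = {
    println(parseOrder("A 202.106.196.115 手機 iPhone8 8000"))
  }
}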

The JedisConnectionPool class

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object JedisConnectionPool {

  val config = new JedisPoolConfig()
  // maximum number of connections
  config.setMaxTotal(20)
  // maximum number of idle connections
  config.setMaxIdle(10)
  // validate a connection when it is borrowed from the pool
  config.setTestOnBorrow(true)

  // 10000 is the connection timeout (10 seconds)
  val pool = new JedisPool(config, "192.168.1.207", 6379, 10000, "123")

  def getConnection(): Jedis = {
    pool.getResource
  }

  def main(args: Array[String]) {
    val conn = JedisConnectionPool.getConnection()
//    conn.set("income", "1000")
//    val r1 = conn.get("xiaoniu")
//    println(r1)
//    conn.incrBy("xiaoniu", -50)
//    val r2 = conn.get("xiaoniu")
//    println(r2)
//    conn.close()
    // list every key currently stored in Redis
    val r = conn.keys("*")
    import scala.collection.JavaConversions._
    for (p <- r) {
      println(p + " : " + conn.get(p))
    }
  }
}

The OrderCount class

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}

object OrderCount {

  def main(args: Array[String]): Unit = {

    // consumer group name
    val group = "g1"
    // create the SparkConf
    val conf = new SparkConf().setAppName("OrderCount").setMaster("local[4]")
    // create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(conf, Duration(5000))

    val broadcastRef = IPUtils.broadcastIpRules(ssc, "/Users/zx/Desktop/temp/spark-24/spark-4/ip/ip.txt")

    // the topic to consume
    val topic = "orders"
    // Kafka broker list (the direct approach connects the Spark Streaming tasks straight to the Kafka
    // partitions through the lower-level API, which is more efficient)
    val brokerList = "node-4:9092,node-5:9092,node-6:9092"
    // ZooKeeper quorum, used later to store the consumed offsets (Redis or MySQL could be used instead)
    val zkQuorum = "node-1:2181,node-2:2181,node-3:2181"
    // the set of topics to create the stream from; Spark Streaming can consume several topics at once
    val topics: Set[String] = Set(topic)

    // ZKGroupTopicDirs points at the directory in ZooKeeper where the offsets are stored
    val topicDirs = new ZKGroupTopicDirs(group, topic)
    // the ZooKeeper path, e.g. "/g001/offsets/wordcount/"
    val zkTopicPath = s"${topicDirs.consumerOffsetDir}"

    // Kafka parameters
    val kafkaParams = Map(
      //"key.deserializer" -> classOf[StringDeserializer],
      //"value.deserializer" -> classOf[StringDeserializer],
      //"deserializer.encoding" -> "GB2312", // encoding used when reading data from Kafka
      "metadata.broker.list" -> brokerList,
      "group.id" -> group,
      // start reading from the earliest offset
      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
    )

    // ZooKeeper client built from the host:port list; it reads and updates the stored offsets
    val zkClient = new ZkClient(zkQuorum)

    // check whether the path has child nodes (they exist if offsets were saved per partition before)
    // /g001/offsets/wordcount/0/10001
    // /g001/offsets/wordcount/1/30001
    // /g001/offsets/wordcount/2/10001
    // zkTopicPath -> /g001/offsets/wordcount/
    val children = zkClient.countChildren(zkTopicPath)

    var kafkaStream: InputDStream[(String, String)] = null

    // if ZooKeeper already holds offsets, they become the starting position of kafkaStream
    var fromOffsets: Map[TopicAndPartition, Long] = Map()

    // offsets were saved before
    // note: the offset lookup happens on the Driver
    if (children > 0) {
      for (i <- 0 until children) {
        // /g001/offsets/wordcount/0/10001
        // /g001/offsets/wordcount/0
        val partitionOffset = zkClient.readData[String](s"$zkTopicPath/${i}")
        // wordcount/0
        val tp = TopicAndPartition(topic, i)
        // add each partition's offset to fromOffsets
        // wordcount/0 -> 10001
        fromOffsets += (tp -> partitionOffset.toLong)
      }

      // Key: the Kafka key, Value: "hello tom hello jerry"
      // transform every Kafka message into a (key, message) tuple
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())

      // create the direct DStream with KafkaUtils (fromOffsets makes it resume from the saved offsets)
      // [String, String, StringDecoder, StringDecoder, (String, String)]
      //  key     value   key decoder    value decoder
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    } else {
      // no saved offsets: start from the largest or smallest offset according to kafkaParams
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    }

    // the offset ranges of the current batch
    var offsetRanges = Array[OffsetRange]()

    // with the direct approach the offsets can only be read from the KafkaRDD itself, so no DStream
    // transformation may run before they are captured: call foreachRDD on kafkaStream, grab the offsets,
    // then operate on the RDD
    // when accumulating with the direct approach, accumulate in an external store (a key-value NoSQL store such as Redis)
    // the code directly inside kafkaStream.foreachRDD runs on the Driver
    kafkaStream.foreachRDD { kafkaRDD =>
      // skip empty batches
      if (!kafkaRDD.isEmpty()) {
        // only a KafkaRDD can be cast to HasOffsetRanges to obtain the offsets
        offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
        val lines: RDD[String] = kafkaRDD.map(_._2)

        // split each line into fields
        val fields: RDD[Array[String]] = lines.map(_.split(" "))

        // total revenue
        CalculateUtil.calculateIncome(fields)
        // revenue per product category
        CalculateUtil.calculateItem(fields)
        // revenue per region
        CalculateUtil.calculateZone(fields, broadcastRef)

        // the offsets are updated here, on the Driver
        for (o <- offsetRanges) {
          // /g001/offsets/wordcount/0
          val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
          // save the partition's offset to ZooKeeper
          // /g001/offsets/wordcount/0/20000
          ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
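The comments above mention that Redis or MySQL could replace ZooKeeper for storing offsets. As a rough sketch of the Redis variant, reusing the JedisConnectionPool from this post (the key layout and the helper names are my own assumptions, not part of the original code):

import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.OffsetRange

object RedisOffsetStore {
  // assumed key layout: offsets:<group>:<topic>:<partition> -> untilOffset
  private def key(group: String, topic: String, partition: Int) = s"offsets:$group:$topic:$partition"

  // called after each batch, in place of ZkUtils.updatePersistentPath
  def save(group: String, topic: String, offsetRanges: Array[OffsetRange]): Unit = {
    val conn = JedisConnectionPool.getConnection()
    offsetRanges.foreach(o => conn.set(key(group, topic, o.partition), o.untilOffset.toString))
    conn.close()
  }

  // called once on startup to build the fromOffsets map
  def load(group: String, topic: String, partitions: Int): Map[TopicAndPartition, Long] = {
    val conn = JedisConnectionPool.getConnection()
    val offsets = (0 until partitions).flatMap { p =>
      Option(conn.get(key(group, topic, p))).map(off => TopicAndPartition(topic, p) -> off.toLong)
    }.toMap
    conn.close()
    offsets
  }
}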

The CalculateUtil class

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

object CalculateUtil {

  def calculateIncome(fields: RDD[Array[String]]) = {
    // compute the batch total, then write it to Redis
    val priceRDD: RDD[Double] = fields.map(arr => {
      val price = arr(4).toDouble
      price
    })
    // reduce is an Action, so the result comes back to the Driver
    // this is the total amount of the current batch
    val sum: Double = priceRDD.reduce(_ + _)
    // get a Jedis connection
    val conn = JedisConnectionPool.getConnection()
    // add the current batch total to the historical total
    //conn.set(Constant.TOTAL_INCOME, sum.toString)
    conn.incrByFloat(Constant.TOTAL_INCOME, sum)
    // return the connection
    conn.close()
  }

  /**
    * compute the revenue per product category
    * @param fields
    */
  def calculateItem(fields: RDD[Array[String]]) = {
    // on which side is fields.map invoked? The Driver (the transformation is defined there)
    val itemAndPrice: RDD[(String, Double)] = fields.map(arr => {
      // category
      val item = arr(2)
      // amount
      val price = arr(4).toDouble
      (item, price)
    })
    // aggregate by product category
    val reduced: RDD[(String, Double)] = itemAndPrice.reduceByKey(_ + _)
    // accumulate the current batch into Redis
    // foreachPartition is an Action
    // the commented-out line below would create the Jedis connection on the Driver, which is not what we want
    //val conn = JedisConnectionPool.getConnection()
    reduced.foreachPartition(part => {
      // get a Jedis connection
      // this connection is obtained inside the Executor
      // JedisConnectionPool is a singleton inside each Executor process
      val conn = JedisConnectionPool.getConnection()
      part.foreach(t => {
        // one connection updates multiple keys
        conn.incrByFloat(t._1, t._2)
      })
      // close the connection after the partition has been written
      conn.close()
    })
  }

  // compute revenue per region based on the client IP
  def calculateZone(fields: RDD[Array[String]], broadcastRef: Broadcast[Array[(Long, Long, String)]]) = {
    val provinceAndPrice: RDD[(String, Double)] = fields.map(arr => {
      val ip = arr(1)
      val price = arr(4).toDouble
      val ipNum = MyUtils.ip2Long(ip)
      // fetch all broadcast rules inside the Executor
      val allRules: Array[(Long, Long, String)] = broadcastRef.value
      // binary search
      val index = MyUtils.binarySearch(allRules, ipNum)
      var province = "未知" // "unknown"
      if (index != -1) {
        province = allRules(index)._3
      }
      // (province, order amount)
      (province, price)
    })
    // aggregate by province
    val reduced: RDD[(String, Double)] = provinceAndPrice.reduceByKey(_ + _)
    // update Redis
    reduced.foreachPartition(part => {
      val conn = JedisConnectionPool.getConnection()
      part.foreach(t => {
        conn.incrByFloat(t._1, t._2)
      })
      conn.close()
    })
  }
}
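Because every batch only increments counters, the accumulated results can be read back from Redis at any time with plain GET calls. A small sketch (the MetricsReader object is illustrative; the key names come from the code above):

object MetricsReader {
  def main(args: Array[String]): Unit = {
    val conn = JedisConnectionPool.getConnection()
    // total revenue accumulated across all batches
    println("total income: " + conn.get(Constant.TOTAL_INCOME))
    // category and province counters live in the same Redis instance,
    // so they can be listed the same way as in JedisConnectionPool.main
    import scala.collection.JavaConversions._
    for (k <- conn.keys("*")) {
      println(k + " -> " + conn.get(k))
    }
    conn.close()
  }
}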

The Constant class

object Constant {
  val TOTAL_INCOME = "TOTAL_INCOME"
}

The MyUtils class

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.streaming.StreamingContext

import scala.io.{BufferedSource, Source}

object MyUtils {

  def ip2Long(ip: String): Long = {
    val fragments = ip.split("[.]")
    var ipNum = 0L
    for (i <- 0 until fragments.length) {
      ipNum = fragments(i).toLong | ipNum << 8L
    }
    ipNum
  }

  def readRules(path: String): Array[(Long, Long, String)] = {
    // read the IP rules file
    val bf: BufferedSource = Source.fromFile(path)
    val lines: Iterator[String] = bf.getLines()
    // parse the rules and keep them in memory
    val rules: Array[(Long, Long, String)] = lines.map(line => {
      val fields = line.split("[|]")
      val startNum = fields(2).toLong
      val endNum = fields(3).toLong
      val province = fields(6)
      (startNum, endNum, province)
    }).toArray
    rules
  }

  def binarySearch(lines: Array[(Long, Long, String)], ip: Long): Int = {
    var low = 0
    var high = lines.length - 1
    while (low <= high) {
      val middle = (low + high) / 2
      if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))
        return middle
      if (ip < lines(middle)._1)
        high = middle - 1
      else {
        low = middle + 1
      }
    }
    -1
  }

  def data2MySQL(it: Iterator[(String, Int)]): Unit = {
    // one iterator corresponds to one partition, which holds multiple records
    // get a JDBC connection first
    val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "123568")
    // write the data to the database through this connection
    val pstm: PreparedStatement = conn.prepareStatement("INSERT INTO access_log VALUES (?, ?)")
    // write the partition's records to MySQL one by one
    it.foreach(tp => {
      pstm.setString(1, tp._1)
      pstm.setInt(2, tp._2)
      pstm.executeUpdate()
    })
    // close the connection after the whole partition has been written
    if (pstm != null) {
      pstm.close()
    }
    if (conn != null) {
      conn.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // the rules are held in memory
    val rules: Array[(Long, Long, String)] = readRules("/Users/zx/Desktop/ip/ip.txt")
    // convert the IP address to its decimal form
    val ipNum = ip2Long("114.215.43.42")
    // binary-search the rules
    val index = binarySearch(rules, ipNum)
    // use the index to look up the matching rule
    val tp = rules(index)
    val province = tp._3
    println(province)
  }
}
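To make ip2Long concrete: each octet is OR-ed into the value accumulated so far after shifting it left by 8 bits, which is the same as reading the address as a base-256 number, so a.b.c.d becomes ((a*256 + b)*256 + c)*256 + d. A quick sanity check (the expected values are computed by hand, not taken from the original post):

object Ip2LongCheck {
  def main(args: Array[String]): Unit = {
    // ((1*256 + 2)*256 + 3)*256 + 4 = 16909060
    println(MyUtils.ip2Long("1.2.3.4"))        // expected: 16909060
    // the IP used in MyUtils.main above
    println(MyUtils.ip2Long("114.215.43.42"))  // expected: 1926703914
  }
}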

The IPUtils class

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext

object IPUtils {

  def broadcastIpRules(ssc: StreamingContext, ipRulesPath: String): Broadcast[Array[(Long, Long, String)]] = {
    // get the SparkContext first
    val sc = ssc.sparkContext
    val rulesLines: RDD[String] = sc.textFile(ipRulesPath)
    // parse the IP rules
    val ipRulesRDD: RDD[(Long, Long, String)] = rulesLines.map(line => {
      val fields = line.split("[|]")
      val startNum = fields(2).toLong
      val endNum = fields(3).toLong
      val province = fields(6)
      (startNum, endNum, province)
    })
    // collect the rules, scattered across the Executors, back to the Driver
    val rulesInDriver: Array[(Long, Long, String)] = ipRulesRDD.collect()
    // broadcast the rules from the Driver to the Executors
    // the broadcast variable reference stays on the Driver
    sc.broadcast(rulesInDriver)
  }
}
