日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

streaming接mysql数据库_[Spark streaming举例]-- 实时统计并且存储到mysql数据库中

發(fā)布時間:2025/3/8 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 streaming接mysql数据库_[Spark streaming举例]-- 实时统计并且存储到mysql数据库中 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

舉例

package com.scala.my

import org.apache.spark.SparkConf

import org.apache.spark.streaming.Durations

import org.apache.spark.streaming.StreamingContext

/**

*

* @author root

* 測試步驟:

* ? ?1\打開h15\h16\h17\h18,啟動zookeeper,再啟動hadoop集群:start-all.sh,再啟動mysql

* ? ?2\在h15上創(chuàng)建文件夾wordcount_checkpoint,用于docheckpoint

* ? ? ? 在h5上mysql的dg數據庫中創(chuàng)建表t_word

* ? ?3\啟動eclipse的本程序,讓他等待著

* ? ?4\在h15的dos窗口下輸入單詞,以空格分隔的單詞(需要在h15上開啟端口9999:#nc -lk 9999)

* ? ?5\查詢h15上的mysql的dg數據庫的t_word表是否有數據即可

*

* 注:建表語句

* ? ? mysql> show create table wordcount; ?//查看表語句

CREATE TABLE ? t_word (

id ?int(11) NOT NULL AUTO_INCREMENT,

updated_time ?timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

word varchar(255) DEFAULT NULL,

count ?int(11) DEFAULT NULL,

PRIMARY KEY (id)

);

*/

*

* 測試結果:通過,注意-----》第74行沒有取得數據,原因在最后沒有觸發(fā)事件(封裝事件),目前已經解決

*

* sh spark-submit --master spark://de2:7077 --class 全類名 --driver-class-path /mysql-connector-java-5.1.26.jar ?sparkstreaming.jar

sh spark-submit --class com.day6.scala.my.PresistMysqlWordCount --master yarn-cluster --driver-class-path /home/spark-1.5.1-bin-hadoop2.4/lib/mysql-connector-

java-5.1.31-bin.jar /home/spark-1.5.1-bin-hadoop2.4/sparkstreaming.jar

$bin/hadoop dfsadmin -safemode leave

也就是關閉Hadoop的安全模式,這樣問題就解決了。

*/

object PresistMysqlWordCount {

def main(args: Array[String]): Unit = {

//獲取streamingContext,并且設置每5秒切割一次rdd

// ? ?val sc = new StreamingContext(new SparkConf().setAppName("mysqlPresist").setMaster("local[2]"), Durations.seconds(8))

val sc = new StreamingContext(new SparkConf().setAppName("mysqlPresist").setMaster("local[2]"), Durations.seconds(8))

//設置checkpoit緩存策略

/**

* 利用 checkpoint 來保留上一個窗口的狀態(tài),

* 這樣可以做到移動窗口的更新統計

*/

sc.checkpoint("hdfs://hh15:8020/wordcount_checkpoint")

// ? ?sc.checkpoint("hdfs://h15:8020/wordcount_checkpoint")

//獲取doc窗口或者hdfs上的words

// ? ?val lines=sc.textFileStream("hdfs://h15:8020/文件夾名稱") ?//實時監(jiān)控hdfs文件夾下新增的數據

val lines = sc.socketTextStream("hh15", 9999)

// ? ?val lines = sc.socketTextStream("h15", 9999)

//壓扁

val words = lines.flatMap { x => x.split(" ") }

//map

val paris = words.map { (_, 1) }

//定義一個函數,用于保持狀態(tài)

val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {

var newValue = prevValueState.getOrElse(0)

for (value wd.foreachPartition(

data => {

val conn = ConnectPool.getConn("root", "1714004716", "hh15", "dg")

// ? ? ? ?val conn = ConnectPool.getConn("root", "1714004716", "h15", "dg")

//插入數據

// ? ? ? ?conn.prepareStatement("insert into t_word2(word,num) values('tom',23)").executeUpdate()

try {

for (row

總結

以上是生活随笔為你收集整理的streaming接mysql数据库_[Spark streaming举例]-- 实时统计并且存储到mysql数据库中的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。