日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

spark 持久化 mysql_Spark 从零到开发(八)nginx日志清洗并持久化实战

發布時間:2025/4/5 数据库 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark 持久化 mysql_Spark 从零到开发(八)nginx日志清洗并持久化实战 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本文將介紹如何清洗nginx日志并存儲到mysql中,附帶azkaban定時任務協作完成對access.log的清洗任務。

1. 查看nginx日志格式

cd /var/log/nginx

[root@FantJ nginx]# cat access.log

140.205.205.25 - - [19/Aug/2018:03:41:59 +0800] "GET / HTTP/1.1" 404 312 "-" "Scrapy/1.5.0 (+https://scrapy.org)" "-"

185.55.46.110 - - [19/Aug/2018:03:56:16 +0800] "GET / HTTP/1.0" 404 180 "-" "-" "-"

80.107.89.207 - - [19/Aug/2018:03:56:25 +0800] "GET / HTTP/1.1" 404 191 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/601.7.7 (KHTML, like Gecko) Version/9.1.2 Safari/601.7.7" "-"

140.205.205.25 - - [19/Aug/2018:04:13:52 +0800] "HEAD / HTTP/1.1" 404 0 "-" "Go-http-client/1.1" "-"

139.162.88.63 - - [19/Aug/2018:04:31:56 +0800] "GET http://clientapi.ipip.net/echo.php?info=1234567890 HTTP/1.1" 404 207 "-" "Go-http-client/1.1" "-"

......

我們需要根據這個格式來寫正則表達式,對數據進行過濾。上面是我的日志格式。

log_format main '$remote_addr - $remote_user [$time_local] "$request" '

'$status $body_bytes_sent "$http_referer" '

'"$http_user_agent" "$http_x_forwarded_for"';

這是我nginx的日志配置。(centos版本默認配置)。

2. 正則表達式測試

public static void main(String[] args) {

Pattern p = Pattern.compile("([^ ]*) ([^ ]*) ([^ ]*) (\\[.*\\]) (\\\".*?\\\") (-|[0-9]*) (-|[0-9]*) (\\\".*?\\\") (\\\".*?\\\")([^ ]*)");

Matcher m = p.matcher("202.173.10.31 - - [18/Aug/2018:21:16:28 +0800] \"GET / HTTP/1.1\" 404 312 \"http://www.sdf.sdf\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36\" \"-\"\n");

while (m.find()) {

System.out.println(m.group(1));

System.out.println(m.group(2));

System.out.println(m.group(3));

System.out.println(m.group(4));

System.out.println(m.group(5));

System.out.println(m.group(6));

System.out.println(m.group(7));

System.out.println(m.group(8));

System.out.println(m.group(9));

System.out.println(m.group(10));

System.out.println(m.toString());

}

}

控制臺輸出:

202.173.10.31

-

-

[18/Aug/2018:21:16:28 +0800]

"GET / HTTP/1.1"

404

312

"http://www.xxx.top"

"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36"

證明我們的正則可以使用。

3. Spark程序實現

上一章我介紹了RDD和DF之間的轉換和臨時表Sql的執行,這章節增加了對RDD數據的持久化操作,我將把RDD數據集存儲到mysql中。

3.1 創建mysql表

CREATE TABLE `access` (

`remote_addr` varchar(255) DEFAULT NULL,

`remote_user` varchar(255) DEFAULT NULL,

`time_local` varchar(255) DEFAULT NULL,

`request` varchar(255) DEFAULT NULL,

`status` varchar(255) DEFAULT NULL,

`byte_sent` varchar(255) DEFAULT NULL,

`refere` varchar(255) DEFAULT NULL,

`http_agent` varchar(255) DEFAULT NULL,

`http_forward_for` varchar(255) DEFAULT NULL

) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `acc_addr_count` (

`remote_addr` varchar(255) DEFAULT NULL,

`count` int(11) DEFAULT NULL

) ENGINE=InnoDB DEFAULT CHARSET=latin1;

第一個表是log的全部數據內容,第二個表是對ip數目做一統計。這兩個表都在我的數據庫nginx中。

3.2 編寫DBHelper.java

public class DBHelper {

private String url = "jdbc:mysql://192.168.27.166:3306/nginx";

private String name = "com.mysql.jdbc.Driver";

private String user = "root";

private String password = "xxx";

//獲取數據庫連接

public Connection connection = null;

public DBHelper(){

try {

Class.forName(name);

connection = DriverManager.getConnection(url,user,password);

} catch (Exception e) {

e.printStackTrace();

}

}

public void close() throws SQLException {

this.connection.close();

}

}

3.3 編寫實體類(javaBean)

我將用反射的方法完成對整條log的清洗,用動態元素創建來完成對acc_addr_count表的收集。(不清楚這兩種方法的可先看下上一章)

NginxParams.java

public class NginxParams implements Serializable {

private String remoteAddr;

private String remoteUser;

private String timeLocal;

private String request;

private String status;

private String byteSent;

private String referer;

private String httpUserAgent;

private String httpForwardedFor;

setter and getter ...methods...

@Override

public String toString() {

return "NginxParams{" +

"remoteAddr='" + remoteAddr + '\'' +

", remoteUser='" + remoteUser + '\'' +

", timeLocal='" + timeLocal + '\'' +

", request='" + request + '\'' +

", status='" + status + '\'' +

", byteSent='" + byteSent + '\'' +

", referer='" + referer + '\'' +

", httpUserAgent='" + httpUserAgent + '\'' +

", httpForwardedFor='" + httpForwardedFor + '\'' +

'}';

}

}

3.4 編寫清洗代碼

NginxLogCollect.java

public class NginxLogCollect implements Serializable {

static DBHelper dbHelper = null;

public static void main(String[] args) {

SparkConf conf = new SparkConf().setAppName("NginxLogCollect").setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

sc.setLogLevel("ERROR");

SQLContext sqlContext = new SQLContext(sc);

JavaRDD lines = sc.textFile("C:\\Users\\84407\\Desktop\\nginx.log");

JavaRDD nginxs = lines.map((Function) line -> {

Pattern p = Pattern.compile("([^ ]*) ([^ ]*) ([^ ]*) (\\[.*\\]) (\\\".*?\\\") (-|[0-9]*) (-|[0-9]*) (\\\".*?\\\") (\\\".*?\\\")([^ ]*)");

Matcher m = p.matcher(line);

NginxParams nginxParams = new NginxParams();

while (m.find()){

nginxParams.setRemoteAddr(m.group(1));

nginxParams.setRemoteUser(m.group(2));

nginxParams.setTimeLocal(m.group(4));

nginxParams.setRequest(m.group(5));

nginxParams.setStatus(m.group(6));

nginxParams.setByteSent(m.group(7));

nginxParams.setReferer(m.group(8));

nginxParams.setHttpUserAgent(m.group(9));

nginxParams.setHttpForwardedFor(m.group(10));

}

return nginxParams;

});

/**

* 使用反射方式,將RDD轉換為DataFrame

*/

DataFrame nginxDF = sqlContext.createDataFrame(nginxs,NginxParams.class);

/**

* 拿到一個DataFrame之后,就可以將其注冊為一個臨時表,然后針對其中的數據執行sql語句

*/

nginxDF.registerTempTable("nginxs");

DataFrame allDF = sqlContext.sql("select * from nginxs");

//統計ip訪問數

DataFrame addrCount = sqlContext.sql("select remoteAddr,COUNT(remoteAddr)as count from nginxs GROUP BY remoteAddr ORDER BY count DESC");

/**

* 將查詢出來的DataFrame ,再次轉換為RDD

*/

JavaRDD allRDD = allDF.javaRDD();

JavaRDD addrCountRDD = addrCount.javaRDD();

/**

* 將RDD中的數據進行映射,映射為NginxParams

*/

JavaRDD map = allRDD.map((Function) row -> {

NginxParams nginxParams = new NginxParams();

nginxParams.setRemoteAddr(row.getString(4));

nginxParams.setRemoteUser(row.getString(5));

nginxParams.setTimeLocal(row.getString(8));

nginxParams.setRequest(row.getString(6));

nginxParams.setStatus(row.getString(7));

nginxParams.setByteSent(row.getString(0));

nginxParams.setReferer(row.getString(2));

nginxParams.setHttpUserAgent(row.getString(3));

nginxParams.setHttpForwardedFor(row.getString(1));

return nginxParams;

});

/**

* 將數據collect回來,然后打印

*/

// List nginxParamsList = map.collect();

// for (NginxParams np:nginxParamsList){

// System.out.println(np);

// }

dbHelper = new DBHelper();

String sql = "INSERT INTO `access` VALUES (?,?,?,?,?,?,?,?,?)";

map.foreach((VoidFunction) nginxParams -> {

PreparedStatement pt = dbHelper.connection.prepareStatement(sql);

pt.setString(1,nginxParams.getRemoteAddr());

pt.setString(2,nginxParams.getRemoteUser());

pt.setString(3,nginxParams.getTimeLocal());

pt.setString(4,nginxParams.getRequest());

pt.setString(5,nginxParams.getStatus());

pt.setString(6,nginxParams.getByteSent());

pt.setString(7,nginxParams.getReferer());

pt.setString(8,nginxParams.getHttpUserAgent());

pt.setString(9,nginxParams.getHttpForwardedFor());

pt.executeUpdate();

});

String addrCountSql = "insert into `acc_addr_count` values(?,?)";

addrCountRDD.foreach((VoidFunction) row -> {

System.out.println("row.getString(0)"+row.getString(0));

System.out.println("row.getString(1)"+row.getLong(1));

PreparedStatement pt = dbHelper.connection.prepareStatement(addrCountSql);

pt.setString(1,row.getString(0));

pt.setString(2, String.valueOf(row.getLong(1)));

pt.executeUpdate();

});

}

}

4. 執行完后查看數據庫:

5. 總結

5.1 集群中執行

上面例子執行在本地,如果打包運行在服務器,需要執行腳本。

/home/fantj/spark/bin/spark-submit \

--class com.fantj.nginxlog.NginxLogCollect\

--num-executors 1 \

--driver-memory 100m \

--executor-memory 100m \

--executor-cores 3 \

--files /home/fantj/hive/conf/hive-site.xml \

--driver-class-path /home/fantj/hive/lib/mysql-connector-java-5.1.17.jar \

/home/fantj/nginxlog.jar \

并修改setMaster()和sc.textFile()的參數。

5.2 定時任務實現

我們可以將執行腳本打包寫一個azkaban的定時job,然后做每天的數據統計。當然,這里面還有很多細節,比如nginx日志按天分割等。但是都是一些小問題。(不熟悉azkaban的:Azkaban 簡單入門)

總結

以上是生活随笔為你收集整理的spark 持久化 mysql_Spark 从零到开发(八)nginx日志清洗并持久化实战的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 紧身裙女教师三上悠亚红杏 | 九九热精彩视频 | 国产伦精品一区二区三区88av | 亚洲精品www久久久久久广东 | 国产丝袜在线播放 | 夜夜草导航 | 日韩成人免费av | 黄色a级片在线观看 | 好吊妞视频这里只有精品 | 久久久情| 三上悠亚一区二区三区 | 久久3p| av无毛| 91免费观看入口 | 91一区二区三区四区 | 日韩精品久久 | 国产精品久久久久久亚洲调教 | 视频免费在线观看 | 亚洲日本在线播放 | 大桥未久av一区二区三区中文 | 日韩一区二区三区四区五区 | 在线视频黄| 狂野欧美性猛交xxⅹ李丽珍 | 欧美精品中文 | 国产乱码一区二区三区 | 在线观看中文字幕视频 | 女性向片在线观看 | 成人精品一区二区三区电影黑人 | 亚洲婷婷久久综合 | 苍井空亚洲精品aa片在线播放 | 亚洲综合图片区 | 日韩人妻精品一区二区三区 | 日韩视频一二三区 | 久久亚洲国产精品 | 色玖玖| av中文在线播放 | 翔田千里88av中文字幕 | 在线观看不卡的av | 他揉捏她两乳不停呻吟动态图 | 日韩电影在线一区 | 美女扒开腿让男人捅 | 哺乳期给上司喂奶hd | 中国免费看的片 | a级黄片毛片 | 久久影片| 亚洲不卡中文字幕无码 | 91日韩精品 | 人妻少妇精品中文字幕av蜜桃 | 一本久久道| 少妇综合 | 荷兰av| 日本顶级大片 | 天天射综合网站 | 沈樵精品国产成av片 | 欧美无人区码suv | 欧美成年人视频在线观看 | 风韵少妇性饥渴推油按摩视频 | 天天色天天操天天射 | 女人被狂躁c到高潮喷水电影 | 亚洲在线激情 | 九九爱精品 | 亚洲涩综合 | 2019自拍偷拍 | a级黄色小说 | 色黄大色黄女片免费中国 | 亚洲AV成人无码久久精品同性 | 国产精品伦一区二区三区 | 在线免费观看亚洲 | 欧美日韩高清一区二区 国产亚洲免费看 | 亚洲欧美色图视频 | 一道本视频在线 | 欧美日韩资源 | 成人在线观看小视频 | 欧美久久久久久 | 超碰97国产精品人人cao | 97国产成人无码精品久久久 | 91网站视频在线观看 | 在线免费精品视频 | 波多野结衣中文字幕在线播放 | 日本一区二区三区视频在线播放 | 国产成人a∨ | 国产人妖一区二区三区 | 四虎精品一区二区 | 精品国产污污免费网站入口 | 亚洲天天 | 国语对白少妇spa私密按摩 | 国产精品视频免费观看 | 中文字幕不卡一区 | 黄色片网站视频 | 国产3页| 色窝窝无码一区二区三区 | 亚洲伦理影院 | 亚洲久久在线 | 伊人久久久| 国产亚洲色婷婷久久 | 日韩高清一二三区 | 亚洲一区二区 | se在线观看| 毛片网在线观看 |