
Spark from Zero to Development (8): Cleaning nginx Logs and Persisting Them to MySQL


This article shows how to clean nginx logs with Spark and store the results in MySQL, together with an Azkaban scheduled job that runs the access.log cleaning task.

1. Inspect the nginx log format

cd /var/log/nginx

[root@FantJ nginx]# cat access.log
140.205.205.25 - - [19/Aug/2018:03:41:59 +0800] "GET / HTTP/1.1" 404 312 "-" "Scrapy/1.5.0 (+https://scrapy.org)" "-"
185.55.46.110 - - [19/Aug/2018:03:56:16 +0800] "GET / HTTP/1.0" 404 180 "-" "-" "-"
80.107.89.207 - - [19/Aug/2018:03:56:25 +0800] "GET / HTTP/1.1" 404 191 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/601.7.7 (KHTML, like Gecko) Version/9.1.2 Safari/601.7.7" "-"
140.205.205.25 - - [19/Aug/2018:04:13:52 +0800] "HEAD / HTTP/1.1" 404 0 "-" "Go-http-client/1.1" "-"
139.162.88.63 - - [19/Aug/2018:04:31:56 +0800] "GET http://clientapi.ipip.net/echo.php?info=1234567890 HTTP/1.1" 404 207 "-" "Go-http-client/1.1" "-"
......

We need a regular expression matching this format to filter the data. The lines above come from my log.

log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                '$status $body_bytes_sent "$http_referer" '
                '"$http_user_agent" "$http_x_forwarded_for"';

This is my nginx log configuration (the CentOS default).

2. Testing the regular expression

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public static void main(String[] args) {
    Pattern p = Pattern.compile("([^ ]*) ([^ ]*) ([^ ]*) (\\[.*\\]) (\\\".*?\\\") (-|[0-9]*) (-|[0-9]*) (\\\".*?\\\") (\\\".*?\\\")([^ ]*)");
    Matcher m = p.matcher("202.173.10.31 - - [18/Aug/2018:21:16:28 +0800] \"GET / HTTP/1.1\" 404 312 \"http://www.sdf.sdf\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36\" \"-\"\n");
    while (m.find()) {
        // Print each capture group in turn.
        for (int i = 1; i <= 10; i++) {
            System.out.println(m.group(i));
        }
    }
}

Console output:

202.173.10.31
-
-
[18/Aug/2018:21:16:28 +0800]
"GET / HTTP/1.1"
404
312
"http://www.sdf.sdf"
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36"

This confirms that our regex works.
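Ten numbered groups are easy to mix up when mapping them to fields later. As a small illustrative sketch (same pattern structure as above; the class and method names here are my own, not from the original code), Java's named capture groups make the mapping self-documenting:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class NginxRegexNamed {
    // Same structure as the pattern above, but with named groups for readability.
    static final Pattern LOG = Pattern.compile(
        "(?<addr>[^ ]*) ([^ ]*) (?<user>[^ ]*) (?<time>\\[.*\\]) (?<request>\".*?\") "
      + "(?<status>-|[0-9]*) (?<bytes>-|[0-9]*) (?<referer>\".*?\") (?<agent>\".*?\")([^ ]*)");

    // Returns the HTTP status of a log line, or null if the line does not match.
    public static String statusOf(String line) {
        Matcher m = LOG.matcher(line);
        return m.find() ? m.group("status") : null;
    }

    public static void main(String[] args) {
        String line = "140.205.205.25 - - [19/Aug/2018:04:13:52 +0800] "
            + "\"HEAD / HTTP/1.1\" 404 0 \"-\" \"Go-http-client/1.1\" \"-\"";
        System.out.println(statusOf(line)); // prints 404
    }
}
```

With names, a later refactor of the pattern cannot silently shift which field ends up in which column.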

3. The Spark implementation

The previous chapter covered converting between RDDs and DataFrames and running SQL against temp tables. This chapter adds persistence: the RDD data set will be stored in MySQL.

3.1 Create the MySQL tables

CREATE TABLE `access` (
  `remote_addr` varchar(255) DEFAULT NULL,
  `remote_user` varchar(255) DEFAULT NULL,
  `time_local` varchar(255) DEFAULT NULL,
  `request` varchar(255) DEFAULT NULL,
  `status` varchar(255) DEFAULT NULL,
  `byte_sent` varchar(255) DEFAULT NULL,
  `refere` varchar(255) DEFAULT NULL,
  `http_agent` varchar(255) DEFAULT NULL,
  `http_forward_for` varchar(255) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `acc_addr_count` (
  `remote_addr` varchar(255) DEFAULT NULL,
  `count` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

The first table holds the full log data; the second holds a per-IP access count. Both tables live in my `nginx` database.
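For reference, `acc_addr_count` will end up holding what the following MySQL aggregation over `access` would produce. This query is only illustrative; the job actually computes the same aggregation with Spark SQL over a temp table in section 3.4:

```
SELECT remote_addr, COUNT(*) AS count
FROM access
GROUP BY remote_addr
ORDER BY count DESC;
```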

3.2 Write DBHelper.java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class DBHelper {

    private String url = "jdbc:mysql://192.168.27.166:3306/nginx";
    private String name = "com.mysql.jdbc.Driver";
    private String user = "root";
    private String password = "xxx";

    // The database connection
    public Connection connection = null;

    public DBHelper() {
        try {
            Class.forName(name);
            connection = DriverManager.getConnection(url, user, password);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void close() throws SQLException {
        this.connection.close();
    }
}

3.3 Write the entity class (JavaBean)

I will use the reflection approach to clean each full log line into a bean, and dynamically created elements to fill the acc_addr_count table. (If these two approaches are unfamiliar, see the previous chapter first.)

NginxParams.java

import java.io.Serializable;

public class NginxParams implements Serializable {

    private String remoteAddr;
    private String remoteUser;
    private String timeLocal;
    private String request;
    private String status;
    private String byteSent;
    private String referer;
    private String httpUserAgent;
    private String httpForwardedFor;

    // getters and setters omitted ...

    @Override
    public String toString() {
        return "NginxParams{" +
                "remoteAddr='" + remoteAddr + '\'' +
                ", remoteUser='" + remoteUser + '\'' +
                ", timeLocal='" + timeLocal + '\'' +
                ", request='" + request + '\'' +
                ", status='" + status + '\'' +
                ", byteSent='" + byteSent + '\'' +
                ", referer='" + referer + '\'' +
                ", httpUserAgent='" + httpUserAgent + '\'' +
                ", httpForwardedFor='" + httpForwardedFor + '\'' +
                '}';
    }
}

3.4 Write the cleaning code

NginxLogCollect.java

import java.io.Serializable;
import java.sql.PreparedStatement;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class NginxLogCollect implements Serializable {

    // Driver-side connection holder; fine for local mode.
    static DBHelper dbHelper = null;

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("NginxLogCollect").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("ERROR");
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> lines = sc.textFile("C:\\Users\\84407\\Desktop\\nginx.log");

        // Parse each log line into a NginxParams bean with the regex from section 2.
        JavaRDD<NginxParams> nginxs = lines.map((Function<String, NginxParams>) line -> {
            Pattern p = Pattern.compile("([^ ]*) ([^ ]*) ([^ ]*) (\\[.*\\]) (\\\".*?\\\") (-|[0-9]*) (-|[0-9]*) (\\\".*?\\\") (\\\".*?\\\")([^ ]*)");
            Matcher m = p.matcher(line);
            NginxParams nginxParams = new NginxParams();
            while (m.find()) {
                nginxParams.setRemoteAddr(m.group(1));
                nginxParams.setRemoteUser(m.group(2));
                nginxParams.setTimeLocal(m.group(4));
                nginxParams.setRequest(m.group(5));
                nginxParams.setStatus(m.group(6));
                nginxParams.setByteSent(m.group(7));
                nginxParams.setReferer(m.group(8));
                nginxParams.setHttpUserAgent(m.group(9));
                nginxParams.setHttpForwardedFor(m.group(10));
            }
            return nginxParams;
        });

        // Convert the RDD to a DataFrame via reflection.
        DataFrame nginxDF = sqlContext.createDataFrame(nginxs, NginxParams.class);

        // Register the DataFrame as a temp table and run SQL against it.
        nginxDF.registerTempTable("nginxs");
        DataFrame allDF = sqlContext.sql("select * from nginxs");

        // Count accesses per IP.
        DataFrame addrCount = sqlContext.sql("select remoteAddr, COUNT(remoteAddr) as count from nginxs GROUP BY remoteAddr ORDER BY count DESC");

        // Convert the query results back to RDDs.
        JavaRDD<Row> allRDD = allDF.javaRDD();
        JavaRDD<Row> addrCountRDD = addrCount.javaRDD();

        // Map each Row back to a NginxParams bean. createDataFrame orders a bean's
        // columns alphabetically by property name, hence the scrambled indices:
        // 0 byteSent, 1 httpForwardedFor, 2 httpUserAgent, 3 referer,
        // 4 remoteAddr, 5 remoteUser, 6 request, 7 status, 8 timeLocal.
        JavaRDD<NginxParams> map = allRDD.map((Function<Row, NginxParams>) row -> {
            NginxParams nginxParams = new NginxParams();
            nginxParams.setRemoteAddr(row.getString(4));
            nginxParams.setRemoteUser(row.getString(5));
            nginxParams.setTimeLocal(row.getString(8));
            nginxParams.setRequest(row.getString(6));
            nginxParams.setStatus(row.getString(7));
            nginxParams.setByteSent(row.getString(0));
            nginxParams.setReferer(row.getString(3));
            nginxParams.setHttpUserAgent(row.getString(2));
            nginxParams.setHttpForwardedFor(row.getString(1));
            return nginxParams;
        });

        // Collect the data back to the driver and print it, for debugging:
        // for (NginxParams np : map.collect()) {
        //     System.out.println(np);
        // }

        dbHelper = new DBHelper();
        String sql = "INSERT INTO `access` VALUES (?,?,?,?,?,?,?,?,?)";
        map.foreach((VoidFunction<NginxParams>) nginxParams -> {
            // try-with-resources closes the statement even if executeUpdate throws.
            try (PreparedStatement pt = dbHelper.connection.prepareStatement(sql)) {
                pt.setString(1, nginxParams.getRemoteAddr());
                pt.setString(2, nginxParams.getRemoteUser());
                pt.setString(3, nginxParams.getTimeLocal());
                pt.setString(4, nginxParams.getRequest());
                pt.setString(5, nginxParams.getStatus());
                pt.setString(6, nginxParams.getByteSent());
                pt.setString(7, nginxParams.getReferer());
                pt.setString(8, nginxParams.getHttpUserAgent());
                pt.setString(9, nginxParams.getHttpForwardedFor());
                pt.executeUpdate();
            }
        });

        String addrCountSql = "insert into `acc_addr_count` values(?,?)";
        addrCountRDD.foreach((VoidFunction<Row>) row -> {
            try (PreparedStatement pt = dbHelper.connection.prepareStatement(addrCountSql)) {
                pt.setString(1, row.getString(0));
                pt.setString(2, String.valueOf(row.getLong(1)));
                pt.executeUpdate();
            }
        });
    }
}
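One caveat: the driver-side DBHelper works with setMaster("local"), but on a real cluster the foreach closures run in executor JVMs, where the static dbHelper is not initialized. A hedged alternative sketch (Spark 1.4+ DataFrameWriter API; the URL and credentials are this article's values, and `selectExpr` is used because Spark 1.x writes JDBC inserts positionally, so the columns must be renamed and reordered to match the MySQL schema):

```
import java.util.Properties;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SaveMode;

// Sketch: persist the two DataFrames from NginxLogCollect via Spark's JDBC writer,
// letting Spark manage the per-partition connections itself.
String url = "jdbc:mysql://192.168.27.166:3306/nginx";
Properties props = new Properties();
props.put("user", "root");
props.put("password", "xxx");
props.put("driver", "com.mysql.jdbc.Driver");

// Rename and reorder the bean-style columns to match the `access` table.
DataFrame ordered = allDF.selectExpr(
    "remoteAddr as remote_addr", "remoteUser as remote_user", "timeLocal as time_local",
    "request", "status", "byteSent as byte_sent", "referer as refere",
    "httpUserAgent as http_agent", "httpForwardedFor as http_forward_for");
ordered.write().mode(SaveMode.Append).jdbc(url, "access", props);

addrCount.selectExpr("remoteAddr as remote_addr", "`count`")
    .write().mode(SaveMode.Append).jdbc(url, "acc_addr_count", props);
```

This also removes the need for DBHelper and the per-record PreparedStatement entirely.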

4. Check the database after the run

5. Summary

5.1 Running on a cluster

The example above runs locally. To package it and run it on a server, submit it with a script:

/home/fantj/spark/bin/spark-submit \
--class com.fantj.nginxlog.NginxLogCollect \
--num-executors 1 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 3 \
--files /home/fantj/hive/conf/hive-site.xml \
--driver-class-path /home/fantj/hive/lib/mysql-connector-java-5.1.17.jar \
/home/fantj/nginxlog.jar

Also change the arguments of setMaster() and sc.textFile() accordingly.

5.2 Scheduling

We can wrap the submit script in an Azkaban scheduled job and produce the statistics daily. There are still details to handle, such as rotating the nginx log by day, but they are minor. (If you are not familiar with Azkaban, see: Azkaban 簡單入門.)
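For reference, a minimal Azkaban job file for this could look as follows. Note this is a sketch: `nginx-submit.sh` is a hypothetical name for a script containing the spark-submit command above, and the daily schedule itself is configured in the Azkaban web UI, not in the file:

```
# nginxlog.job -- a single command-type Azkaban job
type=command
command=sh /home/fantj/nginx-submit.sh
```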

總結(jié)

以上是生活随笔為你收集整理的spark 持久化 mysql_Spark 从零到开发(八)nginx日志清洗并持久化实战的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。