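Spark from Zero to Development (8): Cleaning and Persisting nginx Logs in Practice

This article shows how to clean nginx logs and store the result in MySQL, and how an Azkaban scheduled job can automate the cleaning of access.log.

1. Check the nginx log format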

cd /var/log/nginx

[root@FantJ nginx]# cat access.log
140.205.205.25 - - [19/Aug/2018:03:41:59 +0800] "GET / HTTP/1.1" 404 312 "-" "Scrapy/1.5.0 (+https://scrapy.org)" "-"
185.55.46.110 - - [19/Aug/2018:03:56:16 +0800] "GET / HTTP/1.0" 404 180 "-" "-" "-"
80.107.89.207 - - [19/Aug/2018:03:56:25 +0800] "GET / HTTP/1.1" 404 191 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/601.7.7 (KHTML, like Gecko) Version/9.1.2 Safari/601.7.7" "-"
140.205.205.25 - - [19/Aug/2018:04:13:52 +0800] "HEAD / HTTP/1.1" 404 0 "-" "Go-http-client/1.1" "-"
139.162.88.63 - - [19/Aug/2018:04:31:56 +0800] "GET http://clientapi.ipip.net/echo.php?info=1234567890 HTTP/1.1" 404 207 "-" "Go-http-client/1.1" "-"
......

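We need to write a regular expression that follows this format in order to filter the data. The lines above are in my log format, which is defined as follows: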

log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';

This is my nginx log configuration (the default on CentOS).
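
Among these fields, $time_local is written in the form 19/Aug/2018:03:41:59 +0800. The rest of this article keeps it as a plain string, but if you later need to sort or filter by time, it can be turned into a real timestamp. The following is only a minimal sketch using java.time; the class name `TimeLocalParser` is my own and is not part of the original program.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical helper (not in the original project): parses nginx's $time_local.
class TimeLocalParser {
    // dd/MMM/yyyy:HH:mm:ss followed by a numeric offset such as +0800.
    // Locale.ENGLISH is needed so that "Aug" is recognised on any machine.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);

    // Expects the value without the surrounding square brackets.
    static OffsetDateTime parse(String timeLocal) {
        return OffsetDateTime.parse(timeLocal, FORMAT);
    }
}
```

Note that the regular expression in the next section captures the value together with its square brackets, so those would have to be stripped before calling `parse`.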

2. Testing the regular expression

public static void main(String[] args) {
        // One capture group per field of the log_format above.
        // The final group captures the quoted $http_x_forwarded_for field.
        Pattern p = Pattern.compile("([^ ]*) ([^ ]*) ([^ ]*) (\\[.*\\]) (\\\".*?\\\") (-|[0-9]*) (-|[0-9]*) (\\\".*?\\\") (\\\".*?\\\") (\\\".*?\\\")");
        Matcher m = p.matcher("202.173.10.31 - - [18/Aug/2018:21:16:28 +0800] \"GET / HTTP/1.1\" 404 312 \"http://www.sdf.sdf\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36\" \"-\"\n");
        if (m.find()) {
            // Print groups 1 to 10, one per line
            for (int i = 1; i <= m.groupCount(); i++) {
                System.out.println(m.group(i));
            }
        }
    }

Console output:

202.173.10.31
-
-
[18/Aug/2018:21:16:28 +0800]
"GET / HTTP/1.1"
404
312
"http://www.sdf.sdf"
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36"
"-"

This confirms that our regular expression works.
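
Numbered groups become easy to mix up as a pattern grows. As a rough sketch of an alternative (not what the program in this article uses), the same log_format can be matched with named capture groups, returning an empty result for lines that do not fit. The class name `AccessLogParser` and the group names are my own choices for illustration, and the pattern is somewhat stricter than the one above (for example, it requires a three-digit status).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: parses one line written with the log_format shown earlier.
class AccessLogParser {
    // One named group per nginx variable; the group names are illustrative.
    private static final Pattern LINE = Pattern.compile(
            "^(?<remoteAddr>\\S+) - (?<remoteUser>\\S+) \\[(?<timeLocal>[^\\]]+)\\] "
            + "\"(?<request>[^\"]*)\" (?<status>\\d{3}) (?<bytesSent>\\d+) "
            + "\"(?<referer>[^\"]*)\" \"(?<userAgent>[^\"]*)\" \"(?<forwardedFor>[^\"]*)\"\\s*$");

    private static final String[] FIELDS = {
            "remoteAddr", "remoteUser", "timeLocal", "request", "status",
            "bytesSent", "referer", "userAgent", "forwardedFor"};

    // Returns the fields in log order, or Optional.empty() if the line does not match.
    static Optional<Map<String, String>> parse(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.matches()) {
            return Optional.empty();
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (String name : FIELDS) {
            fields.put(name, m.group(name));
        }
        return Optional.of(fields);
    }
}
```

Here the quotes and brackets sit outside the groups, so the captured values come back without them.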

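3. Implementing the Spark program

The previous chapter covered converting between RDDs and DataFrames and running SQL against temporary tables. This chapter adds persistence: I will store the RDD data set in MySQL.

3.1 Create the MySQL tables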
CREATE TABLE `access` (
  `remote_addr` varchar(255) DEFAULT NULL,
  `remote_user` varchar(255) DEFAULT NULL,
  `time_local` varchar(255) DEFAULT NULL,
  `request` varchar(255) DEFAULT NULL,
  `status` varchar(255) DEFAULT NULL,
  `byte_sent` varchar(255) DEFAULT NULL,
  `refere` varchar(255) DEFAULT NULL,
  `http_agent` varchar(255) DEFAULT NULL,
  `http_forward_for` varchar(255) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `acc_addr_count` (
  `remote_addr` varchar(255) DEFAULT NULL,
  `count` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

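The first table holds the full content of every log entry; the second holds the number of requests per IP address. Both tables live in my database named nginx.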

3.2 Write DBHelper.java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class DBHelper {

    private String url = "jdbc:mysql://192.168.27.166:3306/nginx";
    // JDBC driver class name
    private String name = "com.mysql.jdbc.Driver";
    private String user = "root";
    private String password = "xxx";

    // Database connection, opened in the constructor
    public Connection connection = null;

    public DBHelper(){
        try {
            Class.forName(name);
            connection = DriverManager.getConnection(url,user,password);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public void close() throws SQLException {
        this.connection.close();
    }
}
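
DBHelper only hands out a raw connection, so how statements are created and closed is up to the caller. As a sketch of one reasonable way to use such a connection for many rows (an assumption on my part, not the approach taken in section 3.4), the inserts can be batched and the statement closed with try-with-resources. The class `AccessBatchWriter` and its method `writeAll` are hypothetical names.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

// Hypothetical helper: writes many rows to the `access` table in one batch.
class AccessBatchWriter {
    private static final String INSERT_SQL =
            "INSERT INTO `access` VALUES (?,?,?,?,?,?,?,?,?)";

    // Each String[] is assumed to hold the nine column values in table order.
    // Returns the number of statements in the executed batch.
    static int writeAll(Connection conn, List<String[]> rows) throws SQLException {
        // try-with-resources closes the statement even if an insert fails
        try (PreparedStatement ps = conn.prepareStatement(INSERT_SQL)) {
            for (String[] row : rows) {
                for (int i = 0; i < row.length; i++) {
                    ps.setString(i + 1, row[i]); // JDBC parameters are 1-based
                }
                ps.addBatch();
            }
            return ps.executeBatch().length;
        }
    }
}
```

The caller keeps ownership of the connection, so something like `AccessBatchWriter.writeAll(dbHelper.connection, rows)` followed by `dbHelper.close()` would fit the class above.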
3.3 Write the entity class (JavaBean)

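I will use the reflection approach to clean the full log entries, and the dynamic (Row-based) approach to collect the data for the acc_addr_count table. (If you are not familiar with these two approaches, read the previous chapter first.)

NginxParams.java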

public class NginxParams implements Serializable {
    private String remoteAddr;

    private String remoteUser;

    private String timeLocal;

    private String request;

    private String status;

    private String byteSent;

    private String referer;

    private String httpUserAgent;

    private String httpForwardedFor;

    // ... getters and setters for every field (omitted) ...

    @Override
    public String toString() {
        return "NginxParams{" +
                "remoteAddr='" + remoteAddr + '\'' + ", remoteUser='" + remoteUser + '\'' + ", timeLocal='" + timeLocal + '\'' +
                ", request='" + request + '\'' + ", status='" + status + '\'' + ", byteSent='" + byteSent + '\'' +
                ", referer='" + referer + '\'' + ", httpUserAgent='" + httpUserAgent + '\'' + ", httpForwardedFor='" + httpForwardedFor + '\'' +
                '}';
    }
}
3.4 Write the cleaning code

NginxLogCollect.java

public class NginxLogCollect implements Serializable {

    static DBHelper dbHelper = null;
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("NginxLogCollect").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("ERROR");

        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> lines = sc.textFile("C:\\Users\\84407\\Desktop\\nginx.log");
        JavaRDD<NginxParams> nginxs = lines.map((Function<String, NginxParams>) line -> {
            // The last group captures the quoted $http_x_forwarded_for field
            Pattern p = Pattern.compile("([^ ]*) ([^ ]*) ([^ ]*) (\\[.*\\]) (\\\".*?\\\") (-|[0-9]*) (-|[0-9]*) (\\\".*?\\\") (\\\".*?\\\") (\\\".*?\\\")");
            Matcher m = p.matcher(line);
            NginxParams nginxParams = new NginxParams();
            // A line that does not match leaves every field null
            if (m.find()){
                nginxParams.setRemoteAddr(m.group(1));
                // group(2) is the literal "-" separator; $remote_user is group(3)
                nginxParams.setRemoteUser(m.group(3));
                nginxParams.setTimeLocal(m.group(4));
                nginxParams.setRequest(m.group(5));
                nginxParams.setStatus(m.group(6));
                nginxParams.setByteSent(m.group(7));
                nginxParams.setReferer(m.group(8));
                nginxParams.setHttpUserAgent(m.group(9));
                nginxParams.setHttpForwardedFor(m.group(10));
            }
            return nginxParams;
        });
        /**
         * Convert the RDD to a DataFrame using reflection
         */
        DataFrame nginxDF = sqlContext.createDataFrame(nginxs,NginxParams.class);
        /**
         * Once we have a DataFrame, we can register it as a temporary table
         * and run SQL statements against its data
         */
        nginxDF.registerTempTable("nginxs");

        DataFrame allDF = sqlContext.sql("select * from nginxs");
        // Count the number of requests per IP
        DataFrame addrCount = sqlContext.sql("select remoteAddr, COUNT(remoteAddr) as count from nginxs GROUP BY remoteAddr ORDER BY count DESC");
        /**
         * Convert the resulting DataFrames back to RDDs
         */
        JavaRDD<Row> allRDD = allDF.javaRDD();
        JavaRDD<Row> addrCountRDD = addrCount.javaRDD();
        /**
         * Map each Row in the RDD to an NginxParams object
         */
        JavaRDD<NginxParams> map = allRDD.map((Function<Row, NginxParams>) row -> {
            // Columns inferred from the bean come back in alphabetical order here:
            // byteSent(0), httpForwardedFor(1), httpUserAgent(2), referer(3),
            // remoteAddr(4), remoteUser(5), request(6), status(7), timeLocal(8)
            NginxParams nginxParams = new NginxParams();
            nginxParams.setRemoteAddr(row.getString(4));
            nginxParams.setRemoteUser(row.getString(5));
            nginxParams.setTimeLocal(row.getString(8));
            nginxParams.setRequest(row.getString(6));
            nginxParams.setStatus(row.getString(7));
            nginxParams.setByteSent(row.getString(0));
            nginxParams.setReferer(row.getString(3));
            nginxParams.setHttpUserAgent(row.getString(2));
            nginxParams.setHttpForwardedFor(row.getString(1));
            return nginxParams;
        });

        /**
         * Collect the data back to the driver and print it
         */

//        List<NginxParams> nginxParamsList = map.collect();
//        for (NginxParams np:nginxParamsList){
//            System.out.println(np);
//        }

        // Note: the functions below can see the static dbHelper only because local mode
        // runs the driver and the tasks in the same JVM
        dbHelper = new DBHelper();
        String sql = "INSERT INTO `access` VALUES (?,?,?,?,?,?,?,?,?)";
        map.foreach((VoidFunction<NginxParams>) nginxParams -> {
            // try-with-resources closes the statement after each insert
            try (PreparedStatement pt = dbHelper.connection.prepareStatement(sql)) {
                pt.setString(1,nginxParams.getRemoteAddr());
                pt.setString(2,nginxParams.getRemoteUser());
                pt.setString(3,nginxParams.getTimeLocal());
                pt.setString(4,nginxParams.getRequest());
                pt.setString(5,nginxParams.getStatus());
                pt.setString(6,nginxParams.getByteSent());
                pt.setString(7,nginxParams.getReferer());
                pt.setString(8,nginxParams.getHttpUserAgent());
                pt.setString(9,nginxParams.getHttpForwardedFor());
                pt.executeUpdate();
            }
        });

        String addrCountSql = "insert into `acc_addr_count` values(?,?)";
        addrCountRDD.foreach((VoidFunction<Row>) row -> {
            System.out.println("remoteAddr: " + row.getString(0));
            System.out.println("count: " + row.getLong(1));
            try (PreparedStatement pt = dbHelper.connection.prepareStatement(addrCountSql)) {
                pt.setString(1,row.getString(0));
                // count is an int(11) column, so bind it as a number
                pt.setLong(2, row.getLong(1));
                pt.executeUpdate();
            }
        });
    }
}
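
To make the per-IP statistic concrete: the addrCount query groups the rows by remoteAddr, counts each group, and orders the result by count in descending order. The same computation in plain Java, without Spark, might look like the sketch below. `IpCounter` is a hypothetical name, and the secondary sort by IP is my own addition so that ties come out in a predictable order (the SQL query leaves the order of ties unspecified).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical illustration of what the GROUP BY / ORDER BY query computes.
class IpCounter {
    static List<Map.Entry<String, Long>> countByIp(List<String> remoteAddrs) {
        // GROUP BY remoteAddr with COUNT(remoteAddr)
        Map<String, Long> counts = remoteAddrs.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

        // ORDER BY count DESC, then by IP so that ties are deterministic
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(counts.entrySet());
        sorted.sort(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
                .thenComparing(Map.Entry.<String, Long>comparingByKey()));
        return sorted;
    }
}
```

For the first four sample lines from section 1, this puts 140.205.205.25 first with a count of 2, followed by the two addresses that appear once.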

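4. After the job finishes, check the database:

5. Summary

5.1 Running on a cluster

The example above runs locally. To package it and run it on a server, you need a submit script: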

/home/fantj/spark/bin/spark-submit \
--class com.fantj.nginxlog.NginxLogCollect \
--num-executors 1 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 3 \
--files /home/fantj/hive/conf/hive-site.xml \
--driver-class-path /home/fantj/hive/lib/mysql-connector-java-5.1.17.jar \
/home/fantj/nginxlog.jar

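You also need to change the arguments passed to setMaster() and sc.textFile() so that they match the server environment instead of the local master and the Windows desktop path.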
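
One way to avoid editing the source every time the job moves between machines is to read both values from the command line. The following is only a sketch under that assumption; `JobArgs` is a hypothetical helper, and the argument order (input path first, optional master second) is my own convention.

```java
// Hypothetical helper: takes the input path and an optional master URL from
// the command line instead of hard-coding them in the job.
class JobArgs {
    final String inputPath;
    final String master; // null means: leave the choice of master to spark-submit

    private JobArgs(String inputPath, String master) {
        this.inputPath = inputPath;
        this.master = master;
    }

    static JobArgs parse(String[] args) {
        if (args.length < 1) {
            throw new IllegalArgumentException("usage: <inputPath> [master]");
        }
        String master = args.length > 1 ? args[1] : null;
        return new JobArgs(args[0], master);
    }
}
```

In main, the job would then call `JobArgs.parse(args)`, invoke `conf.setMaster(...)` only when `master` is not null, and pass `inputPath` to `sc.textFile()`. The input path goes after the jar on the spark-submit command line.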

5.2 Scheduling the job

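We can wrap the submit script in a scheduled Azkaban job and produce the statistics every day. There are more details to take care of, of course, such as splitting the nginx log by day, but these are minor issues. (If you are not familiar with Azkaban, see: A Simple Introduction to Azkaban.)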
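
If the nginx log is split by day, the daily job has to work out which file to read. As a small sketch, assuming rotated files are named access-YYYY-MM-DD.log (this naming scheme and the class `DailyLogPath` are my own assumptions; the actual names depend on how your log rotation is configured), the path for the previous day can be computed like this:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: builds the path of yesterday's rotated log file,
// assuming files are named access-YYYY-MM-DD.log.
class DailyLogPath {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ISO_LOCAL_DATE;

    // 'today' is passed in rather than read from the clock so the result is testable.
    static String forPreviousDay(String logDir, LocalDate today) {
        return logDir + "/access-" + today.minusDays(1).format(DAY) + ".log";
    }
}
```

A job that runs shortly after midnight could pass `LocalDate.now()` and hand the resulting path to `sc.textFile()`.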
