Counting the 100 Most Frequent IPs in a Massive Log

Because of the title length limit, the original problem reads: a system handles 1,000,000 QPS, and every ten minutes we must report the 100 IPs with the most requests. If each request's IP is written to a log, this is really the classic top-k problem over a very large file. Ten minutes at 1M QPS is 600 million records, on the order of 10 GB (roughly 15 bytes per IP line), so a typical single machine cannot load it all into memory at once. Divide and conquer is therefore the basic idea for this kind of problem.

Approach

Divide and conquer was mentioned above. So how, concretely, do we decompose the problem?

The idea is to split the large file into smaller files that can each be processed in memory, solve the top-k problem for each small file, and finally merge all the partial results to obtain the overall top k.

Note that the split cannot be arbitrary, or the final result would obviously be wrong: all records for the same IP must land in the same file. A hash function is the natural fit here, since hashing sends identical IPs to the same file.
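For illustration, a minimal sketch of the bucket assignment (the `ip` string is a hypothetical variable; 1024 matches the SPLIT_NUM constant used in the code below):

int bucket = (ip.hashCode() & Integer.MAX_VALUE) % 1024; // same IP -> same hash -> same split file
// Masking off the sign bit keeps the modulo non-negative; unlike Math.abs(),
// it also handles an Integer.MIN_VALUE hash code correctly.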

For the top-k problem itself, the efficient solutions are a fixed-size min-heap or the partitioning idea borrowed from quicksort, both O(n log k). A min-heap fits best here. Concretely: build a heap of fixed size k from the first k entries; for each later entry, do nothing if it is smaller than the heap top, otherwise replace the top and sift down. Processing the files one after another against this single heap yields the final answer directly, with no need to keep each file's top k and merge afterwards.
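As a reference sketch of this procedure, here is the same idea written against the JDK's PriorityQueue (itself a min-heap) instead of the hand-rolled MinHeap of Implementation 2 below; the `counted` iterable of per-IP totals is hypothetical:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

static List<IP> topK(Iterable<IP> counted, int k) {
    PriorityQueue<IP> heap = new PriorityQueue<>(k, Comparator.comparingInt(IP::getCount));
    for (IP ip : counted) {
        if (heap.size() < k) {
            heap.offer(ip); // the first k entries build the heap
        } else if (ip.getCount() > heap.peek().getCount()) {
            heap.poll();    // evict the smallest of the current top k
            heap.offer(ip); // the heap top is always the k-th largest so far
        }
    }
    return new ArrayList<>(heap); // unordered; sort descending for display
}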

Implementation 1

Being lazy, I used a TreeSet in place of a min-heap to maintain the top-k data. TreeSet is backed by a red-black tree, which is heavier than a min-heap: in effect each small file is fully sorted in the red-black tree and then the first k entries are taken. The complexity is O(n log m), where m is the number of entries per small file and m >> k. When I find time I will optimize this with a min-heap, which should be O(n log k).

PS: the min-heap version is now implemented, see Implementation 2, along with a comparison experiment.

The scheduled task is implemented with Quartz.

The code follows.

The IP class wraps an IP and its count; elements stored in a TreeSet must implement Comparable. Note that the overridden compare method must not return 0 here, otherwise TreeSet treats two IPs with equal counts as the same object and refuses to insert the second one. You can see this in TreeSet's implementation: internally it is just a TreeMap, with the element as the key and the value unused. add() calls TreeMap.put(), which in turn calls compare(); if compare returns 0, put() merely calls setValue() again, which for the TreeSet amounts to doing nothing.
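A minimal sketch of that pitfall, with hypothetical IPs and counts; with a comparator that does return 0 on equal counts, the second IP silently disappears:

TreeSet<IP> set = new TreeSet<>((IP a, IP b) -> Integer.compare(a.getCount(), b.getCount()));
set.add(new IP("1.1.1.1", 5));
set.add(new IP("2.2.2.2", 5)); // compare returns 0 -> treated as a duplicate, not inserted
System.out.println(set.size()); // prints 1: the second IP was dropped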

package com.hellolvs;

import org.apache.commons.lang3.builder.ToStringBuilder;

/**
 * IP count POJO
 *
 * @author lvs
 * @date 2017/12/08.
 */
public class IP implements Comparable<IP> {

    private String ip;
    private int count;

    public IP() {
    }

    public IP(String ip, int count) {
        this.ip = ip;
        this.count = count;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public int compareTo(IP o) {
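        // Descending by count; deliberately never return 0, so IPs with equal counts are not collapsed by TreeSet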
        return o.count < this.count ? -1 : 1;
    }

    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this);
    }
}

The IPCountJob class: a scheduled job that counts the top k IPs in the log file.

Note the file-splitting step: the file must be read and written line by line; it cannot be loaded into memory in one go and then split. Guava IO's readLines pulls everything into memory, so it cannot be used here. You can use the plain java.io classes, or, a bit more elegantly, commons-io's LineIterator.

package com.hellolvs;

import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.base.StandardSystemProperty;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.google.common.io.LineProcessor;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Scheduled job: every ten minutes, count the top k IPs by request count
 *
 * @author lvs
 * @date 2017/12/08.
 */
public class IPCountJob implements Job {

    private static final Logger LOG = LoggerFactory.getLogger(IPCountJob.class);

    private static final String LINE_SEPARATOR = StandardSystemProperty.LINE_SEPARATOR.value();
    private static final Charset UTF_8 = Charsets.UTF_8;

    private static final String INPUT_PATH = "/home/lvs/logs/ip.log";
    private static final String OUTPUT_PATH = "/home/lvs/logs/split/";

    private static final int SPLIT_NUM = 1024;
    private static final int TOP_K = 100;

    /**
     * TreeSet holding the top k IPs by request count
     */
    private TreeSet<IP> resultSet = Sets.newTreeSet();

    /**
     * For file splitting: one writer per split file
     */
    private final Map<Integer, BufferedWriter> bufferMap = Maps.newHashMapWithExpectedSize(SPLIT_NUM);

    /**
     * Scheduled entry point: runs every ten minutes to count the top k IPs
     */
    @Override
    public void execute(JobExecutionContext jobExecutionContext) {
        // Catch everything so an error does not kill the recurring job
        try {
            execute();
        } catch (Exception e) {
            LOG.error("Scheduled job failed: {}", e.getMessage(), e);
        }
    }

    /**
     * Count the top k IPs by request count in the large log file
     * 
     * @throws IOException I/O error
     */
    public void execute() throws IOException {
        // In production this should resolve the currently rotated log file every 10 minutes; a constant path simulates that here
        File ipLogFile = new File(INPUT_PATH);

        splitLog(ipLogFile, SPLIT_NUM);

        File logSplits = new File(OUTPUT_PATH);
        for (File logSplit : logSplits.listFiles()) {
            countTopK(logSplit, TOP_K);
        }

        LOG.info("Result set size: {}", resultSet.size());
        for (IP ip : resultSet) {
            LOG.info("{}", ip);
        }
    }

    /**
     * Generate a simulated log file
     * 
     * @param logNum number of log lines to generate
     * @throws IOException I/O error
     */
    public static void generateLog(long logNum) throws IOException {

        /* Create the file */
        File log = new File(INPUT_PATH);
        File parentDir = log.getParentFile();
        if (!parentDir.exists()) {
            parentDir.mkdirs();
        }
        log.createNewFile();

        /* Write random IPs to the file */
        SecureRandom random = new SecureRandom();
        try (BufferedWriter bw = new BufferedWriter(new FileWriter(log))) {
            for (int i = 0; i < logNum; i++) {
                StringBuilder sb = new StringBuilder();
                sb.append("192.").append(random.nextInt(255)).append(".").append(random.nextInt(255)).append(".")
                        .append(random.nextInt(255)).append(LINE_SEPARATOR);
                bw.write(sb.toString());
            }
            bw.flush();
        }
    }

    /**
     * Split the log file
     *
     * @param logFile the file to split
     * @param fileNum number of split files
     * @throws IOException I/O error
     */
    private void splitLog(File logFile, int fileNum) throws IOException {

        /* Create a writer for each split file */
        for (int i = 0; i < fileNum; i++) {
            File file = new File(OUTPUT_PATH + i);
            File parentDir = file.getParentFile();
            if (!parentDir.exists()) {
                parentDir.mkdirs();
            }
            bufferMap.put(i, new BufferedWriter(new FileWriter(file)));
        }

        /* Route each line to a split file by the IP's hash code */
        LineIterator it = null;
        try {
            it = FileUtils.lineIterator(logFile, "UTF-8");
            while (it.hasNext()) {
                String ip = it.nextLine();
                // Mask off the sign bit rather than negate: -Integer.MIN_VALUE is still
                // negative and would yield a negative index and a null writer
                int hashCode = Objects.hashCode(ip) & Integer.MAX_VALUE;
                BufferedWriter writer = bufferMap.get(hashCode % fileNum);
                writer.write(ip + LINE_SEPARATOR);
            }
        } finally {
            /* Release resources */
            LineIterator.closeQuietly(it);
            for (Map.Entry<Integer, BufferedWriter> buffer : bufferMap.entrySet()) {
                BufferedWriter writer = buffer.getValue();
                writer.flush();
                writer.close();
            }
            bufferMap.clear();
        }
    }

    /**
     * Count the top k IPs within one split file
     *
     * @param logSplit the current split file
     * @param k top k
     * @throws IOException I/O error
     */
    private void countTopK(File logSplit, int k) throws IOException {

        /* Read the file and count occurrences per IP */
        HashMap<String, AtomicInteger> ipCountMap = Files.readLines(logSplit, UTF_8,
                new LineProcessor<HashMap<String, AtomicInteger>>() {
                    private HashMap<String, AtomicInteger> ipCountMap = Maps.newHashMap();

                    @Override
                    public boolean processLine(String line) throws IOException {
                        AtomicInteger ipCount = ipCountMap.get(line.trim());
                        if (ipCount != null) {
                            ipCount.getAndIncrement();
                        } else {
                            ipCountMap.put(line.trim(), new AtomicInteger(1));
                        }
                        return true;
                    }

                    @Override
                    public HashMap<String, AtomicInteger> getResult() {
                        return ipCountMap;
                    }
                });

        /* Add the per-file counts to the TreeSet */
        for (Map.Entry<String, AtomicInteger> entry : ipCountMap.entrySet()) {
            resultSet.add(new IP(entry.getKey(), entry.getValue().get()));
        }

        /* Truncate the TreeSet to its first k IPs */
        TreeSet<IP> temp = Sets.newTreeSet();
        int i = 0;
        for (IP o : resultSet) {
            temp.add(o);
            i++;
            if (i >= k) {
                break;
            }
        }
        resultSet = temp;
    }

    /**
     * Return the counting result
     *
     * @return the result set
     */
    public TreeSet<IP> getResult() {
        return resultSet;
    }
}

Main: starts the scheduled job.

package com.hellolvs;

import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

/**
 * Scheduled job launcher
 * 
 * @author lvs
 * @date 2017/12/11.
 */
public class Main {
    public static void main(String[] args) throws Exception {
        // Generate the simulated log file
        IPCountJob.generateLog(600000000);

        JobDetail job = JobBuilder.newJob(IPCountJob.class)
                .withIdentity("ipCountJob", "group1").build();

        Trigger trigger = TriggerBuilder
                .newTrigger()
                .withIdentity("ipCountTrigger", "group1")
                .withSchedule(
                        SimpleScheduleBuilder.simpleSchedule()
                                .withIntervalInMinutes(10).repeatForever())
                .build();

        Scheduler scheduler = new StdSchedulerFactory().getScheduler();
        scheduler.start();
        scheduler.scheduleJob(job, trigger);
    }
}

Implementation 2

The IP class:

package com.hellolvs;

import org.apache.commons.lang3.builder.ToStringBuilder;

/**
 * IP count POJO
 *
 * @author lvs
 * @date 2017/12/08.
 */
public class IP implements Comparable<IP> {

    private String ip;
    private int count;

    public IP() {
    }

    public IP(String ip, int count) {
        this.ip = ip;
        this.count = count;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public int compareTo(IP o) {
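        // Ascending by count: in the min-heap, the root is the smallest of the current top k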
        return Integer.compare(this.count, o.count);
    }

    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this);
    }
}

The IPCountJob class, min-heap version of the top-k counting:

package com.hellolvs;

import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.base.StandardSystemProperty;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.common.io.LineProcessor;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Scheduled job: every ten minutes, count the top k IPs by request count
 *
 * @author lvs
 * @date 2017/12/08.
 */
public class IPCountJob implements Job {

    private static final Logger LOG = LoggerFactory.getLogger(IPCountJob.class);

    private static final String LINE_SEPARATOR = StandardSystemProperty.LINE_SEPARATOR.value();
    private static final Charset UTF_8 = Charsets.UTF_8;

    private static final String INPUT_PATH = "/home/lvs/logs/ip.log";
    private static final String OUTPUT_PATH = "/home/lvs/logs/split/";

    private static final int SPLIT_NUM = 1024;
    private static final int TOP_K = 100;

    /**
     * Min-heap (backed by a list) holding the top k IPs by request count
     */
    private List<IP> result = Lists.newArrayListWithExpectedSize(TOP_K);

    /**
     * For file splitting: one writer per split file
     */
    private final Map<Integer, BufferedWriter> bufferMap = Maps.newHashMapWithExpectedSize(SPLIT_NUM);

    /**
     * Scheduled entry point: runs every ten minutes to count the top k IPs
     */
    @Override
    public void execute(JobExecutionContext jobExecutionContext) {
        // Catch everything so an error does not kill the recurring job
        try {
            execute();
        } catch (Exception e) {
            LOG.error("Scheduled job failed: {}", e.getMessage(), e);
        }
    }

    /**
     * Count the top k IPs by request count in the large log file
     * 
     * @throws IOException I/O error
     */
    public void execute() throws IOException {
        // In production this should resolve the currently rotated log file every 10 minutes; a constant path simulates that here
        File ipLogFile = new File(INPUT_PATH);

        splitLog(ipLogFile, SPLIT_NUM);
        File logSplits = new File(OUTPUT_PATH);
        for (File logSplit : logSplits.listFiles()) {
            countTopK(logSplit, TOP_K);
        }

        MinHeap.sort(result);
        LOG.info("Result set size: {}", result.size());
        for (int i = result.size() - 1; i >= 0; i--) {
            LOG.info("{}", result.get(i));
        }
    }

    /**
     * Generate a simulated log file
     * 
     * @param logNum number of log lines to generate
     * @throws IOException I/O error
     */
    public static void generateLog(long logNum) throws IOException {

        /* Create the file */
        File log = new File(INPUT_PATH);
        File parentDir = log.getParentFile();
        if (!parentDir.exists()) {
            parentDir.mkdirs();
        }
        log.createNewFile();

        /* Write random IPs to the file */
        SecureRandom random = new SecureRandom();
        try (BufferedWriter bw = new BufferedWriter(new FileWriter(log))) {
            for (int i = 0; i < logNum; i++) {
                StringBuilder sb = new StringBuilder();
                sb.append("192.").append(random.nextInt(255)).append(".").append(random.nextInt(255)).append(".")
                        .append(random.nextInt(255)).append(LINE_SEPARATOR);
                bw.write(sb.toString());
            }
            bw.flush();
        }
    }

    /**
     * Split the log file
     *
     * @param logFile the file to split
     * @param fileNum number of split files
     * @throws IOException I/O error
     */
    private void splitLog(File logFile, int fileNum) throws IOException {

        /* Create a writer for each split file */
        for (int i = 0; i < fileNum; i++) {
            File file = new File(OUTPUT_PATH + i);
            File parentDir = file.getParentFile();
            if (!parentDir.exists()) {
                parentDir.mkdirs();
            }
            bufferMap.put(i, new BufferedWriter(new FileWriter(file)));
        }

        /* Route each line to a split file by the IP's hash code */
        LineIterator it = null;
        try {
            it = FileUtils.lineIterator(logFile, "UTF-8");
            while (it.hasNext()) {
                String ip = it.nextLine();
                // Mask off the sign bit rather than negate: -Integer.MIN_VALUE is still
                // negative and would yield a negative index and a null writer
                int hashCode = Objects.hashCode(ip) & Integer.MAX_VALUE;
                BufferedWriter writer = bufferMap.get(hashCode % fileNum);
                writer.write(ip + LINE_SEPARATOR);
            }
        } finally {
            /* Release resources */
            LineIterator.closeQuietly(it);
            for (Map.Entry<Integer, BufferedWriter> buffer : bufferMap.entrySet()) {
                BufferedWriter writer = buffer.getValue();
                writer.flush();
                writer.close();
            }
            bufferMap.clear();
        }
    }

    /**
     * Count the top k IPs within one split file
     *
     * @param logSplit the current split file
     * @param k top k
     * @throws IOException I/O error
     */
    private void countTopK(File logSplit, int k) throws IOException {

        /* Read the file and count occurrences per IP */
        HashMap<String, AtomicInteger> ipCountMap = Files.readLines(logSplit, UTF_8,
                new LineProcessor<HashMap<String, AtomicInteger>>() {
                    private HashMap<String, AtomicInteger> ipCountMap = Maps.newHashMap();

                    @Override
                    public boolean processLine(String line) throws IOException {
                        AtomicInteger ipCount = ipCountMap.get(line.trim());
                        if (ipCount != null) {
                            ipCount.getAndIncrement();
                        } else {
                            ipCountMap.put(line.trim(), new AtomicInteger(1));
                        }
                        return true;
                    }

                    @Override
                    public HashMap<String, AtomicInteger> getResult() {
                        return ipCountMap;
                    }
                });

        /* The first k entries build the initial min-heap; each later entry that is larger than the heap top replaces it and sifts down */
        for (Map.Entry<String, AtomicInteger> entry : ipCountMap.entrySet()) {
            IP ip = new IP(entry.getKey(), entry.getValue().get());
            if (result.size() != k) {
                result.add(ip);
                if (result.size() == k) {
                    MinHeap.initMinHeap(result);
                }
            } else {
                if (ip.compareTo(result.get(0)) > 0) {
                    result.set(0, ip);
                    MinHeap.adjust(result, 0, k);
                }
            }
        }
    }

    /**
     * Return the counting result
     *
     * @return the result list
     */
    public List<IP> getResult() {
        return result;
    }
}

The MinHeap class, a small min-heap utility:

package com.hellolvs;

import java.util.List;

/**
 * Min-heap utility
 *
 * @author lvs
 * @date 2017-12-12
 */
public class MinHeap {

    /**
     * Heap-sort a list that is already a min-heap; the result is in descending order
     * 
     * @param list a list already arranged as a min-heap
     * @param <T> element type, must implement Comparable
     */
    public static <T extends Comparable<? super T>> void sort(List<T> list) {
        for (int i = list.size() - 1; i > 0; i--) {
            swap(list, 0, i);
            adjust(list, 0, i);
        }
    }

    /**
     * Build a min-heap in place
     *
     * @param list the list to arrange into a min-heap
     * @param <T> element type, must implement Comparable
     */
    public static <T extends Comparable<? super T>> void initMinHeap(List<T> list) {
        /* Sift down from the last non-leaf node up to the root */
        for (int i = list.size() / 2 - 1; i >= 0; i--) {
            adjust(list, i, list.size());
        }
    }

    /**
     * Sift down
     *
     * @param list the heap
     * @param <T> element type, must implement Comparable
     * @param cur index to start sifting from
     * @param length current heap size
     */
    public static <T extends Comparable<? super T>> void adjust(List<T> list, int cur, int length) {
        T tmp = list.get(cur);
        for (int i = 2 * cur + 1; i < length; i = 2 * i + 1) {
            if (i + 1 < length && list.get(i).compareTo(list.get(i + 1)) > 0) {
                i++; // point i at the smaller of the two children
            }
            if (tmp.compareTo(list.get(i)) > 0) {
                list.set(cur, list.get(i)); // move the smaller child up into the parent slot
                cur = i; // continue sifting from the child's position
            } else {
                break; // heap property restored, stop
            }
        }
        list.set(cur, tmp); // final resting position of the sifted element
    }

    /**
     * Swap two elements of a list
     * 
     * @param list the list
     * @param i index of the first element
     * @param j index of the second element
     */
    private static <T extends Comparable<? super T>> void swap(List<T> list, int i, int j) {
        T tmp = list.get(i);
        list.set(i, list.get(j));
        list.set(j, tmp);
    }
}
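A quick sanity check of the helper on plain Integers (hypothetical values): after initMinHeap the minimum sits at index 0, and sort turns the min-heap into a descending list (largest at index 0), so execute() above, which iterates from the last index down, prints the result in ascending order:

List<Integer> xs = Lists.newArrayList(5, 1, 4, 2, 3);
MinHeap.initMinHeap(xs); // xs is now a valid min-heap: xs.get(0) == 1
MinHeap.sort(xs);        // heap-sorting a min-heap gives descending order: [5, 4, 3, 2, 1]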

The Main class, unchanged:

package com.hellolvs;

import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

/**
 * Scheduled job launcher
 * 
 * @author lvs
 * @date 2017/12/11.
 */
public class Main {
    public static void main(String[] args) throws Exception {
        // Generate the simulated log file
        IPCountJob.generateLog(600000000);

        JobDetail job = JobBuilder.newJob(IPCountJob.class)
                .withIdentity("ipCountJob", "group1").build();

        Trigger trigger = TriggerBuilder
                .newTrigger()
                .withIdentity("ipCountTrigger", "group1")
                .withSchedule(
                        SimpleScheduleBuilder.simpleSchedule()
                                .withIntervalInMinutes(10).repeatForever())
                .build();

        Scheduler scheduler = new StdSchedulerFactory().getScheduler();
        scheduler.start();
        scheduler.scheduleJob(job, trigger);
    }
}

The pom file, for reference:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hellolvs</groupId>
    <artifactId>ipCount</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <properties>
        <guava.version>20.0</guava.version>
        <commons-lang3.version>3.1</commons-lang3.version>
        <commons-io.version>2.4</commons-io.version>
        <joda-time.version>2.6</joda-time.version>
        <org.quartz-scheduler.version>2.1.7</org.quartz-scheduler.version>
        <org.slf4j.version>1.7.5</org.slf4j.version>
        <logback.version>1.0.13</logback.version>
        <junit.version>4.10</junit.version>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencyManagement>
        <dependencies>
            <!-- guava -->
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>${guava.version}</version>
            </dependency>

            <!-- commons lang3-->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>${commons-lang3.version}</version>
            </dependency>

            <!-- commons io -->
            <dependency>
                <groupId>commons-io</groupId>
                <artifactId>commons-io</artifactId>
                <version>${commons-io.version}</version>
            </dependency>

            <!-- joda-time -->
            <dependency>
                <groupId>joda-time</groupId>
                <artifactId>joda-time</artifactId>
                <version>${joda-time.version}</version>
            </dependency>

            <!-- quartz -->
            <dependency>
                <groupId>org.quartz-scheduler</groupId>
                <artifactId>quartz</artifactId>
                <version>${org.quartz-scheduler.version}</version>
            </dependency>

            <!-- slf4j -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>${org.slf4j.version}</version>
            </dependency>

            <!-- logback -->
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
                <version>${logback.version}</version>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-core</artifactId>
                <version>${logback.version}</version>
                <scope>runtime</scope>
            </dependency>

            <!-- junit -->
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit-dep</artifactId>
                <version>${junit.version}</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- guava -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </dependency>

        <!-- commons lang3-->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <!-- commons io -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
        </dependency>

        <!-- joda-time -->
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
        </dependency>

        <!-- quartz -->
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
        </dependency>

        <!-- slf4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>

        <!-- logback -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
        </dependency>

        <!-- junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit-dep</artifactId>
        </dependency>
    </dependencies>

    <build>
        <finalName>ROOT</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

Comparison Experiment

A log with 600 million lines was generated.

TreeSet version:

Time to generate 600 million log lines: 521582 ms
File splitting time: 173219 ms
Top-k counting time over the splits: 195037 ms
Scheduled job execution time: 368294 ms

Note: the scheduled job execution time is the total time to process the large file, essentially file splitting plus the top-k counting over the splits.

CPU and heap usage:

The heap curve clearly falls into three phases, corresponding to generating the log, splitting it, and counting top k over the splits.

[figure: CPU and heap usage, TreeSet version]

Min-heap version:

Time to generate 600 million log lines: 513840 ms
File splitting time: 148861 ms
Top-k counting time over the splits: 190966 ms
Scheduled job execution time: 339870 ms

CPU and heap usage:

[figure: CPU and heap usage, min-heap version]

Summary:

Log generation and file splitting are unchanged between the two versions; the differing run times there are likely just measurement noise.

What is surprising is that the top-k counting time barely differs between the two versions; by the analysis above, O(n log m) versus O(n log k) with n = 600,000,000, m ≈ 600,000, and k = 100 should show a clear gap. Readers are welcome to help analyze why the difference is so small. One likely factor: in both versions, the per-file work is dominated by reading lines and updating the HashMap counts, which is identical O(n) work, while the TreeSet or heap only ever sees the distinct per-file totals, so the log m versus log k factor applies to a small fraction of the total work.

Heap memory usage, however, clearly drops by about 100 MB, because the TreeSet has to hold m elements before being truncated to k, while the MinHeap never holds more than k elements.

Personal blog: www.hellolvs.com
