Because of the title length limit, the original problem reads: a system handles 1,000,000 QPS; every ten minutes, report the 100 IPs with the most requests. With each request's IP written to a log, this is the classic top-k problem over a huge file. Ten minutes at this rate is 600 million records, on the order of 10 GB, so a typical single machine cannot load it all into memory at once. Divide and conquer is therefore the basic approach to this kind of problem.
With divide and conquer in mind, how exactly do we decompose the problem?
The idea is to split the big file into smaller files that can each be processed in memory, solve top k for each small file, and finally combine the per-file results into the overall top k.
Note that the split cannot be arbitrary, or the final result would obviously be wrong: all records for the same IP must land in the same file. A hash is the natural fit here, since hashing routes identical IPs to the same file.
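As a minimal sketch of that routing (bucketFor and splitNum are illustrative names, not part of the job code below):

/** Pick the split file for an IP; identical IPs always map to the same bucket. */
static int bucketFor(String ip, int splitNum) {
    // Mask the sign bit so the index is never negative
    // (Math.abs would fail for Integer.MIN_VALUE).
    return (ip.hashCode() & Integer.MAX_VALUE) % splitNum;
}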
For the top-k step itself, the efficient solutions are a fixed-size min-heap, at O(n log k), or the partitioning idea behind quicksort (quickselect), at O(n) on average. A min-heap fits better here. Concretely: build a heap of fixed size k from the first k entries; for each later entry, skip it if it does not exceed the heap top, otherwise replace the top and sift down. Processing the files one after another against the same heap yields the final answer directly, with no need to keep each file's top k and merge them afterwards.
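To make the pattern concrete, here is a minimal, self-contained sketch using the JDK's java.util.PriorityQueue (the post's own code below uses a TreeSet first and then a hand-rolled heap):

import java.util.PriorityQueue;

public class TopKSketch {
    /** Keep the k largest counts seen so far in a size-k min-heap. */
    static PriorityQueue<Integer> topK(Iterable<Integer> counts, int k) {
        PriorityQueue<Integer> heap = new PriorityQueue<>(k); // natural order = min-heap
        for (int c : counts) {
            if (heap.size() < k) {
                heap.offer(c);          // still building the initial heap
            } else if (c > heap.peek()) {
                heap.poll();            // evict the current minimum
                heap.offer(c);          // replace it with the larger count
            }
        }
        return heap; // the k largest counts; heap.peek() is the smallest of them
    }
}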
Being lazy, I first used a TreeSet instead of a min-heap to maintain the top k. A TreeSet is backed by a red-black tree, which is heavier than a min-heap: in effect each small file gets fully sorted in the tree and is then truncated to the first k entries. That is O(n log m), where m is the number of entries per small file and m >> k. When I find time I will optimize with a min-heap, which should be O(n log k).
PS: the min-heap version is now implemented, see implementation 2 below, along with a comparison experiment.
The scheduled task is implemented with Quartz.
The code follows.
The IP class wraps an IP and its count; to be stored in a TreeSet it must implement Comparable. Note that compareTo here must not return 0, otherwise the TreeSet treats the two objects as identical and refuses to insert the second one. A look at the TreeSet implementation shows why: internally it is just a TreeMap whose keys are the elements and whose values are unused. add() delegates to TreeMap.put(), which calls compare(); if compare() returns 0, put() merely resets the value, which for a TreeSet amounts to doing nothing. (A small demonstration of this pitfall follows the class.)
package com.hellolvs;

import org.apache.commons.lang3.builder.ToStringBuilder;

/**
 * IP count POJO
 *
 * @author lvs
 * @date 2017/12/08.
 */
public class IP implements Comparable<IP> {

    private String ip;
    private int count;

    public IP() {
    }

    public IP(String ip, int count) {
        this.ip = ip;
        this.count = count;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public int compareTo(IP o) {
        // Descending by count; deliberately never returns 0 so that the TreeSet
        // does not treat two IPs with equal counts as duplicates.
        return o.count < this.count ? -1 : 1;
    }

    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this);
    }
}
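A quick toy illustration of the pitfall described above (TreeSetPitfall is an illustrative name, not part of the project):

import java.util.TreeSet;

public class TreeSetPitfall {
    public static void main(String[] args) {
        // A comparator that returns 0 for equal counts: the second IP looks
        // "equal" to the TreeSet and is silently dropped.
        TreeSet<IP> broken = new TreeSet<>((a, b) -> Integer.compare(a.getCount(), b.getCount()));
        broken.add(new IP("1.1.1.1", 5));
        broken.add(new IP("2.2.2.2", 5));
        System.out.println(broken.size()); // prints 1, not 2
    }
}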
The IPCountJob class: the scheduled job that computes the top k IPs in the log file.
Note the file splitting: the big file must be read and written in a streaming fashion, not loaded into memory and then split. Guava's readLines pulls the whole file into memory, so it cannot be used here. The JDK's own I/O classes work, or commons-io's LineIterator is a bit more elegant.
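A minimal sketch of the streaming pattern with commons-io (the real split logic appears in the class below; forEachLine is an illustrative name):

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import java.io.File;
import java.io.IOException;

public class StreamingRead {
    /** Stream a file line by line without loading it all into memory. */
    static void forEachLine(File file) throws IOException {
        LineIterator it = FileUtils.lineIterator(file, "UTF-8");
        try {
            while (it.hasNext()) {
                String line = it.nextLine(); // only one line buffered at a time
                // ... process the line ...
            }
        } finally {
            LineIterator.closeQuietly(it); // always release the underlying reader
        }
    }
}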
package com.hellolvs;

import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.base.StandardSystemProperty;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.google.common.io.LineProcessor;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Scheduled job: every ten minutes, compute the top k IPs by request count.
 *
 * @author lvs
 * @date 2017/12/08.
 */
public class IPCountJob implements Job {

    private static final Logger LOG = LoggerFactory.getLogger(IPCountJob.class);

    private static final String LINE_SEPARATOR = StandardSystemProperty.LINE_SEPARATOR.value();
    private static final Charset UTF_8 = Charsets.UTF_8;

    private static final String INPUT_PATH = "/home/lvs/logs/ip.log";
    private static final String OUTPUT_PATH = "/home/lvs/logs/split/";

    private static final int SPLIT_NUM = 1024;
    private static final int TOP_K = 100;

    /**
     * TreeSet holding the top k IPs by request count
     */
    private TreeSet<IP> resultSet = Sets.newTreeSet();

    /**
     * Writer for each split file, keyed by split index
     */
    private final Map<Integer, BufferedWriter> bufferMap = Maps.newHashMapWithExpectedSize(SPLIT_NUM);

    /**
     * Scheduled entry point: every ten minutes, compute the top k IPs.
     */
    @Override
    public void execute(JobExecutionContext jobExecutionContext) {
        // Catch everything so an error does not kill the recurring job
        try {
            execute();
        } catch (Exception e) {
            LOG.error("Scheduled job failed: {}", e.getMessage(), e);
        }
    }

    /**
     * Compute the top k IPs by request count in the big log file.
     *
     * @throws IOException I/O error
     */
    public void execute() throws IOException {
        // In production this would resolve the current rotated log file every
        // ten minutes; a constant path is used here for simplicity.
        File ipLogFile = new File(INPUT_PATH);
        splitLog(ipLogFile, SPLIT_NUM);

        File logSplits = new File(OUTPUT_PATH);
        for (File logSplit : logSplits.listFiles()) {
            countTopK(logSplit, TOP_K);
        }

        LOG.info("Result set size: {}", resultSet.size());
        for (IP ip : resultSet) {
            LOG.info("{}", ip);
        }
    }

    /**
     * Generate a simulated log file.
     *
     * @param logNum number of log lines to generate
     * @throws IOException I/O error
     */
    public static void generateLog(long logNum) throws IOException {
        /* Create the file */
        File log = new File(INPUT_PATH);
        File parentDir = log.getParentFile();
        if (!parentDir.exists()) {
            parentDir.mkdirs();
        }
        log.createNewFile();

        /* Write random IPs to the file */
        SecureRandom random = new SecureRandom();
        try (BufferedWriter bw = new BufferedWriter(new FileWriter(log))) {
            for (int i = 0; i < logNum; i++) {
                StringBuilder sb = new StringBuilder();
                sb.append("192.").append(random.nextInt(255)).append(".").append(random.nextInt(255)).append(".")
                        .append(random.nextInt(255)).append(LINE_SEPARATOR);
                bw.write(sb.toString());
            }
            bw.flush();
        }
    }

    /**
     * Split the log file.
     *
     * @param logFile file to split
     * @param fileNum number of split files
     * @throws IOException I/O error
     */
    private void splitLog(File logFile, int fileNum) throws IOException {
        /* Create a writer for each split file */
        for (int i = 0; i < fileNum; i++) {
            File file = new File(OUTPUT_PATH + i);
            File parentDir = file.getParentFile();
            if (!parentDir.exists()) {
                parentDir.mkdirs();
            }
            bufferMap.put(i, new BufferedWriter(new FileWriter(file)));
        }

        /* Route each line to a split file by the hash of the IP */
        LineIterator it = null;
        try {
            it = FileUtils.lineIterator(logFile, "UTF-8");
            while (it.hasNext()) {
                String ip = it.nextLine();
                // Mask the sign bit instead of negating: -Integer.MIN_VALUE is
                // still negative and would produce a negative bucket index.
                int hashCode = Objects.hashCode(ip) & Integer.MAX_VALUE;
                BufferedWriter writer = bufferMap.get(hashCode % fileNum);
                writer.write(ip + LINE_SEPARATOR);
            }
        } finally {
            /* Release resources */
            LineIterator.closeQuietly(it);
            for (Map.Entry<Integer, BufferedWriter> buffer : bufferMap.entrySet()) {
                BufferedWriter writer = buffer.getValue();
                writer.flush();
                writer.close();
            }
            bufferMap.clear();
        }
    }

    /**
     * Count the top k IPs by request count.
     *
     * @param logSplit current split file
     * @param k top k
     * @throws IOException I/O error
     */
    private void countTopK(File logSplit, int k) throws IOException {
        /* Read the file and count each IP */
        HashMap<String, AtomicInteger> ipCountMap =
                Files.readLines(logSplit, UTF_8, new LineProcessor<HashMap<String, AtomicInteger>>() {
                    private HashMap<String, AtomicInteger> ipCountMap = Maps.newHashMap();

                    @Override
                    public boolean processLine(String line) throws IOException {
                        AtomicInteger ipCount = ipCountMap.get(line.trim());
                        if (ipCount != null) {
                            ipCount.getAndIncrement();
                        } else {
                            ipCountMap.put(line.trim(), new AtomicInteger(1));
                        }
                        return true;
                    }

                    @Override
                    public HashMap<String, AtomicInteger> getResult() {
                        return ipCountMap;
                    }
                });

        /* Add the counts to the TreeSet */
        for (Map.Entry<String, AtomicInteger> entry : ipCountMap.entrySet()) {
            resultSet.add(new IP(entry.getKey(), entry.getValue().get()));
        }

        /* Keep only the first k IPs in the TreeSet */
        TreeSet<IP> temp = Sets.newTreeSet();
        int i = 0;
        for (IP o : resultSet) {
            temp.add(o);
            i++;
            if (i >= k) {
                break;
            }
        }
        resultSet = temp;
    }

    /**
     * Return the result.
     *
     * @return result set
     */
    public TreeSet<IP> getResult() {
        return resultSet;
    }
}
Main: starts the scheduled job.
package com.hellolvs;

import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

/**
 * Scheduled job launcher
 *
 * @author lvs
 * @date 2017/12/11.
 */
public class Main {
    public static void main(String[] args) throws Exception {
        // Generate the simulated log file
        IPCountJob.generateLog(600000000);

        JobDetail job = JobBuilder.newJob(IPCountJob.class)
                .withIdentity("ipCountJob", "group1").build();

        Trigger trigger = TriggerBuilder
                .newTrigger()
                .withIdentity("ipCountTrigger", "group1")
                .withSchedule(
                        SimpleScheduleBuilder.simpleSchedule()
                                .withIntervalInMinutes(10).repeatForever())
                .build();

        Scheduler scheduler = new StdSchedulerFactory().getScheduler();
        scheduler.start();
        scheduler.scheduleJob(job, trigger);
    }
}
Implementation 2 (min-heap). The IP class, with compareTo now ordering ascending by count as the min-heap requires:
package com.hellolvs;

import org.apache.commons.lang3.builder.ToStringBuilder;

/**
 * IP count POJO
 *
 * @author lvs
 * @date 2017/12/08.
 */
public class IP implements Comparable<IP> {

    private String ip;
    private int count;

    public IP() {
    }

    public IP(String ip, int count) {
        this.ip = ip;
        this.count = count;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public int compareTo(IP o) {
        // Ascending by count, as the list-backed min-heap expects
        return Integer.compare(this.count, o.count);
    }

    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this);
    }
}
The IPCountJob class, min-heap version of the top-k counting:
package com.hellolvs;

import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.base.StandardSystemProperty;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.common.io.LineProcessor;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Scheduled job: every ten minutes, compute the top k IPs by request count.
 *
 * @author lvs
 * @date 2017/12/08.
 */
public class IPCountJob implements Job {

    private static final Logger LOG = LoggerFactory.getLogger(IPCountJob.class);

    private static final String LINE_SEPARATOR = StandardSystemProperty.LINE_SEPARATOR.value();
    private static final Charset UTF_8 = Charsets.UTF_8;

    private static final String INPUT_PATH = "/home/lvs/logs/ip.log";
    private static final String OUTPUT_PATH = "/home/lvs/logs/split/";

    private static final int SPLIT_NUM = 1024;
    private static final int TOP_K = 100;

    /**
     * List-backed min-heap holding the top k IPs by request count
     */
    private List<IP> result = Lists.newArrayListWithExpectedSize(TOP_K);

    /**
     * Writer for each split file, keyed by split index
     */
    private final Map<Integer, BufferedWriter> bufferMap = Maps.newHashMapWithExpectedSize(SPLIT_NUM);

    /**
     * Scheduled entry point: every ten minutes, compute the top k IPs.
     */
    @Override
    public void execute(JobExecutionContext jobExecutionContext) {
        // Catch everything so an error does not kill the recurring job
        try {
            execute();
        } catch (Exception e) {
            LOG.error("Scheduled job failed: {}", e.getMessage(), e);
        }
    }

    /**
     * Compute the top k IPs by request count in the big log file.
     *
     * @throws IOException I/O error
     */
    public void execute() throws IOException {
        // In production this would resolve the current rotated log file every
        // ten minutes; a constant path is used here for simplicity.
        File ipLogFile = new File(INPUT_PATH);
        splitLog(ipLogFile, SPLIT_NUM);

        File logSplits = new File(OUTPUT_PATH);
        for (File logSplit : logSplits.listFiles()) {
            countTopK(logSplit, TOP_K);
        }

        // Heapsort leaves the list in descending order of count
        MinHeap.sort(result);

        LOG.info("Result set size: {}", result.size());
        for (int i = result.size() - 1; i >= 0; i--) {
            LOG.info("{}", result.get(i));
        }
    }

    /**
     * Generate a simulated log file.
     *
     * @param logNum number of log lines to generate
     * @throws IOException I/O error
     */
    public static void generateLog(long logNum) throws IOException {
        /* Create the file */
        File log = new File(INPUT_PATH);
        File parentDir = log.getParentFile();
        if (!parentDir.exists()) {
            parentDir.mkdirs();
        }
        log.createNewFile();

        /* Write random IPs to the file */
        SecureRandom random = new SecureRandom();
        try (BufferedWriter bw = new BufferedWriter(new FileWriter(log))) {
            for (int i = 0; i < logNum; i++) {
                StringBuilder sb = new StringBuilder();
                sb.append("192.").append(random.nextInt(255)).append(".").append(random.nextInt(255)).append(".")
                        .append(random.nextInt(255)).append(LINE_SEPARATOR);
                bw.write(sb.toString());
            }
            bw.flush();
        }
    }

    /**
     * Split the log file.
     *
     * @param logFile file to split
     * @param fileNum number of split files
     * @throws IOException I/O error
     */
    private void splitLog(File logFile, int fileNum) throws IOException {
        /* Create a writer for each split file */
        for (int i = 0; i < fileNum; i++) {
            File file = new File(OUTPUT_PATH + i);
            File parentDir = file.getParentFile();
            if (!parentDir.exists()) {
                parentDir.mkdirs();
            }
            bufferMap.put(i, new BufferedWriter(new FileWriter(file)));
        }

        /* Route each line to a split file by the hash of the IP */
        LineIterator it = null;
        try {
            it = FileUtils.lineIterator(logFile, "UTF-8");
            while (it.hasNext()) {
                String ip = it.nextLine();
                // Mask the sign bit instead of negating: -Integer.MIN_VALUE is
                // still negative and would produce a negative bucket index.
                int hashCode = Objects.hashCode(ip) & Integer.MAX_VALUE;
                BufferedWriter writer = bufferMap.get(hashCode % fileNum);
                writer.write(ip + LINE_SEPARATOR);
            }
        } finally {
            /* Release resources */
            LineIterator.closeQuietly(it);
            for (Map.Entry<Integer, BufferedWriter> buffer : bufferMap.entrySet()) {
                BufferedWriter writer = buffer.getValue();
                writer.flush();
                writer.close();
            }
            bufferMap.clear();
        }
    }

    /**
     * Count the top k IPs by request count.
     *
     * @param logSplit current split file
     * @param k top k
     * @throws IOException I/O error
     */
    private void countTopK(File logSplit, int k) throws IOException {
        /* Read the file and count each IP */
        HashMap<String, AtomicInteger> ipCountMap =
                Files.readLines(logSplit, UTF_8, new LineProcessor<HashMap<String, AtomicInteger>>() {
                    private HashMap<String, AtomicInteger> ipCountMap = Maps.newHashMap();

                    @Override
                    public boolean processLine(String line) throws IOException {
                        AtomicInteger ipCount = ipCountMap.get(line.trim());
                        if (ipCount != null) {
                            ipCount.getAndIncrement();
                        } else {
                            ipCountMap.put(line.trim(), new AtomicInteger(1));
                        }
                        return true;
                    }

                    @Override
                    public HashMap<String, AtomicInteger> getResult() {
                        return ipCountMap;
                    }
                });

        /* The first k entries build the initial min-heap; after that, any entry
           larger than the heap top replaces the top and the heap is re-adjusted */
        for (Map.Entry<String, AtomicInteger> entry : ipCountMap.entrySet()) {
            IP ip = new IP(entry.getKey(), entry.getValue().get());
            if (result.size() != k) {
                result.add(ip);
                if (result.size() == k) {
                    MinHeap.initMinHeap(result);
                }
            } else {
                if (ip.compareTo(result.get(0)) > 0) {
                    result.set(0, ip);
                    MinHeap.adjust(result, 0, k);
                }
            }
        }
    }

    /**
     * Return the result.
     *
     * @return result list
     */
    public List<IP> getResult() {
        return result;
    }
}
The MinHeap class, a small min-heap utility:
package com.hellolvs;

import java.util.List;

/**
 * Min-heap utility
 *
 * @author lvs
 * @date 2017-12-12
 */
public class MinHeap {

    /**
     * Heapsort a list that already satisfies the min-heap property;
     * the list ends up in descending order.
     *
     * @param list a list that is already a min-heap
     * @param <T> element type, must implement Comparable
     */
    public static <T extends Comparable<? super T>> void sort(List<T> list) {
        for (int i = list.size() - 1; i > 0; i--) {
            swap(list, 0, i);
            adjust(list, 0, i);
        }
    }

    /**
     * Build a min-heap in place.
     *
     * @param list the list to heapify
     * @param <T> element type, must implement Comparable
     */
    public static <T extends Comparable<? super T>> void initMinHeap(List<T> list) {
        /* Adjust every non-leaf node, from the last one up to the root */
        for (int i = list.size() / 2 - 1; i >= 0; i--) {
            adjust(list, i, list.size());
        }
    }

    /**
     * Sift a node down to restore the heap property.
     *
     * @param list the heap
     * @param <T> element type, must implement Comparable
     * @param cur index of the node to adjust
     * @param length current heap size
     */
    public static <T extends Comparable<? super T>> void adjust(List<T> list, int cur, int length) {
        T tmp = list.get(cur);
        for (int i = 2 * cur + 1; i < length; i = 2 * i + 1) {
            if (i + 1 < length && list.get(i).compareTo(list.get(i + 1)) > 0) {
                i++; // i now points at the smaller of the two children
            }
            if (tmp.compareTo(list.get(i)) > 0) {
                list.set(cur, list.get(i)); // move the smaller child up to its parent
                cur = i;                    // continue sifting from the child's slot
            } else {
                break; // heap property holds; stop
            }
        }
        list.set(cur, tmp); // final resting place of the adjusted node
    }

    /**
     * Swap two elements of a list.
     *
     * @param list the list
     * @param i first index
     * @param j second index
     */
    private static <T extends Comparable<? super T>> void swap(List<T> list, int i, int j) {
        T tmp = list.get(i);
        list.set(i, list.get(j));
        list.set(j, tmp);
    }
}
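A quick sanity check of the helper, assuming the implementation-2 IP class above (MinHeapDemo is an illustrative name, not part of the project):

import com.google.common.collect.Lists;
import java.util.List;

public class MinHeapDemo {
    public static void main(String[] args) {
        List<IP> heap = Lists.newArrayList(
                new IP("a", 3), new IP("b", 1), new IP("c", 2));
        MinHeap.initMinHeap(heap); // heap.get(0) now has the smallest count
        MinHeap.sort(heap);        // in-place heapsort: descending by count
        for (IP ip : heap) {
            System.out.println(ip.getIp() + "=" + ip.getCount());
        }
        // prints a=3, c=2, b=1
    }
}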
The Main class is unchanged from implementation 1.
And here is the pom:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.hellolvs</groupId> <artifactId>ipCount</artifactId> <version>1.0.0</version> <packaging>jar</packaging> <properties> <guava.version>20.0</guava.version> <commons-lang3.version>3.1</commons-lang3.version> <commons-io.version>2.4</commons-io.version> <joda-time.version>2.6</joda-time.version> <org.quartz-scheduler.version>2.1.7</org.quartz-scheduler.version> <org.slf4j.version>1.7.5</org.slf4j.version> <logback.version>1.0.13</logback.version> <junit.version>4.10</junit.version> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencyManagement> <dependencies> <!-- guava --> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${guava.version}</version> </dependency> <!-- commons lang3--> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>${commons-lang3.version}</version> </dependency> <!-- commons io --> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>${commons-io.version}</version> </dependency> <!-- joda-time --> <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> <version>${joda-time.version}</version> </dependency> <!-- quartz --> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>${org.quartz-scheduler.version}</version> </dependency> <!-- slf4j --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${org.slf4j.version}</version> </dependency> <!-- logback --> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>${logback.version}</version> <scope>runtime</scope> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>${logback.version}</version> <scope>runtime</scope> </dependency> <!-- junit --> <dependency> <groupId>junit</groupId> <artifactId>junit-dep</artifactId> <version>${junit.version}</version> <scope>test</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <!-- guava --> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> <!-- commons lang3--> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <!-- commons io --> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency> <!-- joda-time --> <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> </dependency> <!-- quartz --> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> </dependency> <!-- slf4j --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> <!-- logback --> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> </dependency> <!-- junit --> <dependency> <groupId>junit</groupId> <artifactId>junit-dep</artifactId> </dependency> </dependencies> <build> <finalName>ROOT</finalName> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> 
<artifactId>maven-compiler-plugin</artifactId> <configuration> <source>${java.version}</source> <target>${java.version}</target> <encoding>${project.build.sourceEncoding}</encoding> </configuration> </plugin> </plugins> </build> </project>
A log of 600 million entries was generated for the experiment.
Implementation 1 (TreeSet), times in milliseconds:
Generating the 600M-line log: 521582
Splitting the file: 173219
Counting top k over the splits: 195037
Scheduled-job execution: 368294
Note: the scheduled-job execution time is the total time to process the big file, essentially splitting plus the per-split top-k counting.
CPU and heap usage:
The heap curve clearly shows three phases, corresponding to generating the log, splitting it, and counting top k over the splits.
Implementation 2 (MinHeap), times in milliseconds:
Generating the 600M-line log: 513840
Splitting the file: 148861
Counting top k over the splits: 190966
Scheduled-job execution: 339870
CPU and heap usage:
Summary:
Log generation and file splitting are identical in both versions; the differences in those run times are presumably just noise between runs.
What is surprising is that the top-k counting times barely differ between the two versions; by the analysis above, O(n log m) versus O(n log k) should show a clear gap, with n = 600,000,000, m ≈ 600,000, and k = 100. Readers are welcome to weigh in on why the gap is so small. One likely factor: both versions first aggregate each split into a HashMap, so the TreeSet or heap only ever handles the distinct IPs per file, while the n-line scanning and hashing dominate the run time.
What does show clearly is that heap memory usage dropped by about 100 MB, because the TreeSet has to hold m elements before being truncated to k, while the MinHeap never holds more than k.
Personal blog: www.hellolvs.com