Flow Control:控流的概念
本文開發了一個分佈式流量控制Filter,來限定application的併發量:html
1)對於過量的請求,首先將請求buffer在隊列中。java
2)當buffer隊列滿時,多餘的請求將會被直接拒絕。(過載請求量)node
3)那些buffer中被阻塞的請求,等待必定時間後任然沒法被執行,則直接返回錯誤URL。(溢出請求量)nginx
4)咱們設定一個容許的併發量,經過java中Semaphore控制。只有獲取「鎖」的請求,才能繼續執行。web
web.xml配置
<filter> <filter-name>flowControlFilter</filter-name> <filter-class>com.demo.security.FlowControlFilter</filter-class> <init-param> <param-name>permits</param-name> <param-value>128</param-value> </init-param> <init-param> <param-name>timeout</param-name> <param-value>15000</param-value> </init-param> <init-param> <param-name>bufferSize</param-name> <param-value>500</param-value> </init-param> <init-param> <param-name>errorUrl</param-name> <param-value>/error.html</param-value> </init-param> </filter>
<filter-mapping> <filter-name>flowControlFilter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping>
Java代碼:網絡
package com.src.java.filter; import java.io.IOException; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; import javax.servlet.Filter; import javax.servlet.FilterChain; import javax.servlet.FilterConfig; import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; import javax.servlet.http.HttpServletResponse; /** * * @ClassName: FlowControlFilter * @Description: 分佈式系統流量控制 * @author chinasoft_liuhanlin * @date 2017年6月1日 下午3:57:08 */ public class FlowControlFilter implements Filter { /** * 最大併發量 默認爲500 */ private int permits = Runtime.getRuntime().availableProcessors() + 1; /** * 當併發量達到permits後,新的請求將會被buffer,buffer最大尺寸 若是buffer已滿,則直接拒絕 */ private int bufferSize = 500; /** * buffer中的請求被阻塞,此值用於控制最大阻塞時間 默認阻塞時間 */ private long timeout = 30000; /** * 跳轉的錯誤頁面 */ private String errorUrl; private BlockingQueue<Node> waitingQueue; private Thread selectorThread; private Semaphore semaphore; private Object lock = new Object(); @Override public void destroy() { } /** * <p> * Title: doFilter * </p> * <p> * Description: * </p> * * @param request * @param response * @param chain * @throws IOException * @throws ServletException * @see javax.servlet.Filter#doFilter(javax.servlet.ServletRequest, * javax.servlet.ServletResponse, javax.servlet.FilterChain) */ @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { checkSelector(); Thread t = Thread.currentThread(); HttpServletResponse httpServletResponse = (HttpServletResponse) response; Node node = new Node(t, false); boolean buffered = waitingQueue.offer(node); // 若是buffer已滿 if (!buffered) { if (errorUrl != null) { httpServletResponse.sendRedirect(errorUrl); } return; } long deadline = System.currentTimeMillis() + timeout; // 進入等待隊列後,當前線程阻塞 LockSupport.parkNanos(this, TimeUnit.MICROSECONDS.toNanos(timeout)); if (t.isInterrupted()) { // 若是線程是中斷返回 t.interrupted();// clear status } // 若是等待過時,則直接返回 if (deadline >= System.currentTimeMillis()) { if (errorUrl != null) { httpServletResponse.sendRedirect(errorUrl); } // 對信號量進行補充 synchronized (lock) { if (node.dequeued) { semaphore.release(); } else { node.dequeued = true; } } return; } // 繼續執行 try { chain.doFilter(request, response); } finally { semaphore.release(); checkSelector(); } } /** * <p> * Title: init * </p> * <p> * Description: * </p> * * @param filterConfig * @throws ServletException * @see javax.servlet.Filter#init(javax.servlet.FilterConfig) */ @Override public void init(FilterConfig filterConfig) throws ServletException { String p = filterConfig.getInitParameter("permits"); if (p != null) { permits = Integer.parseInt(p); if (permits < 0) { throw new IllegalArgumentException("FlowControlFilter,permits parameter should be greater than 0 !"); } } String t = filterConfig.getInitParameter("timeout"); if (t != null) { timeout = Long.parseLong(t); if (timeout < 1) { throw new IllegalArgumentException("FlowControlFilter,timeout parameter should be greater than 0 !"); } } String b = filterConfig.getInitParameter("bufferSize"); if (b != null) { bufferSize = Integer.parseInt(b); if (bufferSize < 0) { throw new IllegalArgumentException("FlowControlFilter,bufferSize parameter should be greater than 0 !"); } } errorUrl = filterConfig.getInitParameter("errorUrl"); waitingQueue = new LinkedBlockingQueue<>(bufferSize); semaphore = new Semaphore(permits); selectorThread = new Thread(new SelectorRunner()); selectorThread.setDaemon(true); selectorThread.start(); } /** * @Title: checkSelector * @Description: TODO * @param: * @return: void * @throws */ private void checkSelector() { if (selectorThread != null && selectorThread.isAlive()) { return; } synchronized (lock) { if (selectorThread != null && selectorThread.isAlive()) { return; } selectorThread = new Thread(new SelectorRunner()); selectorThread.setDaemon(true); selectorThread.start(); } } /** * * @ClassName: SelectorRunner * @Description: TODO * @author chinasoft_liuhanlin * @date 2017年6月1日 下午3:59:11 */ private class SelectorRunner implements Runnable { @Override public void run() { try { while (true) { Node node = waitingQueue.take(); // 若是t,阻塞逃逸,只能在pack超時後退出 synchronized (lock) { if (node.dequeued) { // 若是此線程已經park過時而退出了,則直接忽略 continue; } else { node.dequeued = true; } } semaphore.acquire(); LockSupport.unpark(node.currentThread); } } catch (Exception e) { // } finally { // 所有釋放阻塞 Queue<Node> queue = new LinkedList<>(); waitingQueue.drainTo(queue); for (Node n : queue) { if (!n.dequeued) { LockSupport.unpark(n.currentThread); } } } } } private class Node { Thread currentThread; boolean dequeued;// 是否已經出隊 public Node(Thread t, boolean dequeued) { this.currentThread = t; this.dequeued = dequeued; } } }