Best Practices for Managing Connections: You can establish multiple connections to APNs servers to improve performance. When you send a large number of remote notifications, distribute them across connections to several server endpoints. This improves performance compared with using a single connection.
ConnectionPool connectionPool=new ConnectionPool(50, 10, TimeUnit.MINUTES); OkHttpClient.Builder builder = new OkHttpClient .Builder() .addNetworkInterceptor(new ApnsNetworkInterceptor()) .connectionPool(connectionPool) .connectTimeout(10, TimeUnit.SECONDS) .writeTimeout(10, TimeUnit.SECONDS) .readTimeout(20,TimeUnit.SECONDS);
OkHttpClient.Builder builder給了2個接口併發
RetryAndFollowUpInterceptor —>建立StreamAllocation對象,處理http的redirect,出錯重試。對後續Interceptor的執行的影響:修改request及StreamAllocation。
BridgeInterceptor————–>補全缺失的一些http header。對後續Interceptor的執行的影響:修改request。
ConnectInterceptor————>藉助於前面分配的StreamAllocation對象創建與服務器之間的鏈接,並選定交互所用的協議是HTTP 1.1還是HTTP 2。對後續Interceptor的執行的影響:建立了httpStream和connection。
Response getResponseWithInterceptorChain() throws IOException { // Build a full stack of interceptors. List<Interceptor> interceptors = new ArrayList<>(); interceptors.addAll(client.interceptors()); interceptors.add(retryAndFollowUpInterceptor); interceptors.add(new BridgeInterceptor(client.cookieJar())); interceptors.add(new CacheInterceptor(client.internalCache())); interceptors.add(new ConnectInterceptor(client)); if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } interceptors.add(new CallServerInterceptor(forWebSocket)); Interceptor.Chain chain = new RealInterceptorChain( interceptors, null, null, null, 0, originalRequest); return chain.proceed(originalRequest); }
/**
 * Looks for a pooled connection that can carry one more stream to {@code address}.
 *
 * <p>A connection qualifies when it is below its allocation limit, its route targets an equal
 * address, and it still accepts new streams. The first match is acquired by
 * {@code streamAllocation} and returned.
 *
 * @return the acquired connection, or null if no pooled connection qualifies
 */
RealConnection get(Address address, StreamAllocation streamAllocation) {
  assert (Thread.holdsLock(this));
  for (RealConnection candidate : connections) {
    // Already carrying as many allocations as it is allowed to.
    if (candidate.allocations.size() >= candidate.allocationLimit) {
      continue;
    }
    // Pooled connection goes to a different address.
    if (!address.equals(candidate.route().address)) {
      continue;
    }
    // Connection has been closed to new streams.
    if (candidate.noNewStreams) {
      continue;
    }
    streamAllocation.acquire(candidate);
    return candidate;
  }
  return null;
}
/** * APNS推送的攔截器, * 1.設置連接最大併發請求參數 * 2.超時統計並關閉連接,根據平均值統計(1.超過花消閥值,2.接近最大平均時間的) * * @author shenwr * @version V1.0 * @since 2016-12-13 16:55 */ public class ApnsNetworkInterceptor implements Interceptor { public static final double took_time_precent = 0.9; private static Logger logger = LoggerFactory.getLogger(ApnsNetworkInterceptor.class); //花費時間閥值 private final long time_took_throttle = 20000; //花費時間閥值(最低閥值) private final long time_took_throttle_min = 5000; //一個連接到併發請求上限 private final int allocation_limit = 60; //統計次數 private final int stat_count = 20; //統計服務器數上限 private final int stat_server_limit = 100; //動態統計次數 private final int dynamic_stat_count = 1000; private ConcurrentHashMap<String, ServerPointStat> pointStat = new ConcurrentHashMap<String, ServerPointStat>( stat_server_limit); //最大平均花費時間 private long maxAvgTookTime = 0; //總調用次數 private AtomicLong invokeTotalCount = new AtomicLong(0); public ApnsNetworkInterceptor() { //啓動清除任務 Timer timer = new Timer(); timer.schedule(new TimerTask() { public void run() { //10分鐘內沒有統計信息的,清楚統計 long expireTime = System.currentTimeMillis() - 600000; for (Map.Entry<String, ServerPointStat> stat : pointStat.entrySet()) { if (stat.getValue().getLastUsedTime() < expireTime) { pointStat.remove(stat.getKey()); } } } }, 60000, 60000); } @Override public Response intercept(Chain chain) throws IOException { Request request = chain.request(); //設置一個連接的併發請求數上限 Connection connection = chain.connection(); RealConnection rc = null; if (connection != null && connection instanceof RealConnection) { rc = (RealConnection) connection; //只會對後面連接池裏獲取連接有效 rc.allocationLimit = allocation_limit; } long startNs = System.nanoTime(); Response response=null; try { //處理請求 response = chain.proceed(request); }catch (Exception e ){ logger.error("exception happened",e); throw e; }finally { long tookMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); try { if (rc == null) { return response; } //統計計數 String ip = 
rc.socket.getInetAddress().getHostAddress(); ServerPointStat stat = pointStat.get(ip); if (stat == null) { if (pointStat.size() > stat_server_limit) { //再也不統計,直接返回 return response; } stat = new ServerPointStat(); pointStat.put(ip,stat); } //調用統計 long totalCount = invokeTotalCount.incrementAndGet(); if (totalCount < 0) { //溢出重置 invokeTotalCount.set(0); } //統計 stat.invokeStat(tookMs); //質量差的服務節點處理 long avgTime = stat.getAvgTime(); if (stat.getUseCount() > stat_count && avgTime > time_took_throttle && tookMs > avgTime) { //統計次數到了,平均花費時間超過閥值,認定爲很差的服務器節點 //noNewStreams(chain); rc.allocationLimit = 0; } else if (maxAvgTookTime < time_took_throttle) { //若是請求處理時間都沒有超過閥值 if (totalCount > dynamic_stat_count && avgTime > time_took_throttle_min && tookMs > avgTime && avgTime > Math.round(maxAvgTookTime * took_time_precent)) { //若是都沒有超過超時閥值,而且接近平均最大時間,任務是很差的連接 //noNewStreams(chain); rc.allocationLimit = 0; } if (avgTime > maxAvgTookTime) { maxAvgTookTime = avgTime; } } } catch (Exception e) { logger.error("統計發送異常", e); } //返回請求結果 return response; } } private void noNewStreams(Chain chain) { /**這樣處理像是有問題 if (chain instanceof RealInterceptorChain) { StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation(); if (streamAllocation.hasMoreRoutes()) { //該請求還有其餘服務節點的時候,禁用用這個連接發送請求,等釋放 streamAllocation.noNewStreams(); } } */ } private static class ServerPointStat { private long lastUsedTime; private AtomicLong useCount = new AtomicLong(0); private AtomicLong tookTotalTime = new AtomicLong(0); public void invokeStat(long tookTime) { tookTotalTime.addAndGet(tookTime); useCount.incrementAndGet(); lastUsedTime = System.currentTimeMillis(); if (tookTotalTime.get() < 0) { //溢出重置 tookTotalTime.set(0); useCount.set(0); } } public long getAvgTime() { return tookTotalTime.get() / useCount.get(); } /** * 獲取 {@link #lastUsedTime} * * @return 返回 {@link #lastUsedTime} */ public long getLastUsedTime() { return lastUsedTime; } /** * 獲取 {@link #useCount} * * @return 返回 {@link 
#useCount} */ public long getUseCount() { return useCount.get(); } /* * 獲取 {@link #tookTotalTime} * * @return 返回 {@link #tookTotalTime} */ public long getTookTotalTime() { return tookTotalTime.get(); } } }