OKHTTP3 APNS推送時經過攔截器修改一些優化鏈接

碰到問題

APNS推送的,用okhttp3(3.5.0)作APNS推送,發現APNS服務器的鏈接都連在蘋果推送服務器的一個IP上,並且超時不少,另外連接發送的請求分配也很不均勻,國內到國外的網速也不穩定,幾毫秒到幾十秒都有。git

主要方案:github

1.租用雲的VPC對等連接服務,加快每次調用時間緩存

2.調用軟件優化服務器

蘋果服務器給的性能指導裏提到了,建多鏈接到不一樣的服務器節點。cookie

Best Practices for Managing Connections

You can establish multiple connections to APNs servers to improve performance. When you send a large number of remote notifications, distribute them across connections to several server endpoints. This improves performance

如今就是要如何把請求更均勻的分配到每一個連接,每一個連接的請求併發請求數減小,實現連接儘可能連向跟多多服務器。網絡

OKHttp3框架分析

okhttp3的攔截器

ConnectionPool connectionPool=new ConnectionPool(50, 10, TimeUnit.MINUTES);

OkHttpClient.Builder builder = new OkHttpClient
        .Builder()
        .addNetworkInterceptor(new ApnsNetworkInterceptor())
        .connectionPool(connectionPool)
        .connectTimeout(10, TimeUnit.SECONDS)
        .writeTimeout(10, TimeUnit.SECONDS)
        .readTimeout(20,TimeUnit.SECONDS);

OkHttpClient.Builder builder給了2個接口併發

addInterceptor:應用的攔截,不能再處理連接相關的信息,已經去不到鏈接
addNetworkInterceptor:網絡處理相關的鏈接器,能夠處理connection

OKhttp內部攔截器鏈初始化:

RetryAndFollowUpInterceptor —>建立StreamAllocation對象,處理http的redirect,出錯重試。對後續Interceptor的執行的影響:修改request及StreamAllocation。
BridgeInterceptor————–>補全缺失的一些http header。對後續Interceptor的執行的影響:修改request。
CacheInterceptor————–>處理http緩存。對後續Interceptor的執行的影響:若緩存中有所需請求的響應,則後續Interceptor再也不執行。
ConnectInterceptor————>藉助於前面分配的StreamAllocation對象創建與服務器之間的鏈接,並選定交互所用的協議是HTTP 1.1仍是HTTP 2。對後續Interceptor的執行的影響:建立了httpStream和connection。
CallServerInterceptor———–>處理IO,與服務器進行數據交換。對後續Interceptor的執行的影響:爲Interceptor鏈中的最後一個Interceptor,沒有後續Interceptor。app

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    return chain.proceed(originalRequest);
  }

連接池獲取連接

/** Returns a recycled connection to {@code address}, or null if no such connection exists. */
RealConnection get(Address address, StreamAllocation streamAllocation) {
  assert (Thread.holdsLock(this));
  for (RealConnection connection : connections) {
    if (connection.allocations.size() < connection.allocationLimit
        && address.equals(connection.route().address)
        && !connection.noNewStreams) {
      streamAllocation.acquire(connection);
      return connection;
    }
  }
  return null;
}

攔截器實現

主要就是設置allocationLimit、讓差的連接下次不會被使用。網絡請求差距太大也沒有好方法,並且OkHttp的擴展感受不是很好,人笨沒找到更好的方法。框架

/**
 * APNS推送的攔截器,
 * 1.設置連接最大併發請求參數
 * 2.超時統計並關閉連接,根據平均值統計(1.超過花消閥值,2.接近最大平均時間的)
 *
 * @author shenwr
 * @version V1.0
 * @since 2016-12-13 16:55
 */
public class ApnsNetworkInterceptor implements Interceptor {

    public static final double took_time_precent = 0.9;

    private static Logger logger = LoggerFactory.getLogger(ApnsNetworkInterceptor.class);

    //花費時間閥值
    private final long time_took_throttle = 20000;
    //花費時間閥值(最低閥值)
    private final long time_took_throttle_min = 5000;

    //一個連接到併發請求上限
    private final int allocation_limit = 60;
    //統計次數
    private final int stat_count = 20;
    //統計服務器數上限
    private final int stat_server_limit = 100;
    //動態統計次數
    private final int dynamic_stat_count = 1000;

    private ConcurrentHashMap<String, ServerPointStat> pointStat = new ConcurrentHashMap<String, ServerPointStat>(
        stat_server_limit);

    //最大平均花費時間
    private long maxAvgTookTime = 0;
    //總調用次數
    private AtomicLong invokeTotalCount = new AtomicLong(0);

    public ApnsNetworkInterceptor() {
        //啓動清除任務
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            public void run() {
                //10分鐘內沒有統計信息的,清楚統計
                long expireTime = System.currentTimeMillis() - 600000;
                for (Map.Entry<String, ServerPointStat> stat : pointStat.entrySet()) {
                    if (stat.getValue().getLastUsedTime() < expireTime) {
                        pointStat.remove(stat.getKey());
                    }
                }
            }
        }, 60000, 60000);
    }

    @Override
    public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();

        //設置一個連接的併發請求數上限
        Connection connection = chain.connection();
        RealConnection rc = null;
        if (connection != null && connection instanceof RealConnection) {
            rc = (RealConnection) connection;
            //只會對後面連接池裏獲取連接有效
            rc.allocationLimit = allocation_limit;

        }

        long startNs = System.nanoTime();
        Response response=null;
        try {
            //處理請求
            response = chain.proceed(request);
        }catch (Exception e ){
            logger.error("exception happened",e);
            throw e;
        }finally {

            long tookMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);

            try {

                if (rc == null) {
                    return response;
                }

                //統計計數
                String ip = rc.socket.getInetAddress().getHostAddress();
                ServerPointStat stat = pointStat.get(ip);
                if (stat == null) {
                    if (pointStat.size() > stat_server_limit) {
                        //再也不統計,直接返回
                        return response;
                    }
                    stat = new ServerPointStat();
                    pointStat.put(ip,stat);
                }

                //調用統計
                long totalCount = invokeTotalCount.incrementAndGet();
                if (totalCount < 0) {
                    //溢出重置
                    invokeTotalCount.set(0);
                }

                //統計
                stat.invokeStat(tookMs);

                //質量差的服務節點處理
                long avgTime = stat.getAvgTime();
                if (stat.getUseCount() > stat_count && avgTime > time_took_throttle && tookMs > avgTime) {
                    //統計次數到了,平均花費時間超過閥值,認定爲很差的服務器節點
                    //noNewStreams(chain);   
                     rc.allocationLimit = 0;                    
                } else if (maxAvgTookTime < time_took_throttle) {

                    //若是請求處理時間都沒有超過閥值
                    if (totalCount > dynamic_stat_count && avgTime > time_took_throttle_min && tookMs > avgTime
                        && avgTime > Math.round(maxAvgTookTime * took_time_precent)) {
                        //若是都沒有超過超時閥值,而且接近平均最大時間,任務是很差的連接
                        //noNewStreams(chain);
                        rc.allocationLimit = 0;
                    }

                    if (avgTime > maxAvgTookTime) {
                        maxAvgTookTime = avgTime;
                    }
                }

            } catch (Exception e) {
                logger.error("統計發送異常", e);
            }

            //返回請求結果
            return response;
        }

    }

    private void noNewStreams(Chain chain) {
        /**這樣處理像是有問題
        if (chain instanceof RealInterceptorChain) {
            StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
            if (streamAllocation.hasMoreRoutes()) {
                //該請求還有其餘服務節點的時候,禁用用這個連接發送請求,等釋放
                streamAllocation.noNewStreams();
            }
        }
        */
    }

    private static class ServerPointStat {
        private long lastUsedTime;
        private AtomicLong useCount = new AtomicLong(0);
        private AtomicLong tookTotalTime = new AtomicLong(0);

        public void invokeStat(long tookTime) {

            tookTotalTime.addAndGet(tookTime);
            useCount.incrementAndGet();
            lastUsedTime = System.currentTimeMillis();

            if (tookTotalTime.get() < 0) {
                //溢出重置
                tookTotalTime.set(0);
                useCount.set(0);
            }
        }

        public long getAvgTime() {
            return tookTotalTime.get() / useCount.get();
        }

        /**
         * 獲取 {@link #lastUsedTime}
         *
         * @return 返回 {@link #lastUsedTime}
         */
        public long getLastUsedTime() {
            return lastUsedTime;
        }

        /**
         * 獲取 {@link #useCount}
         *
         * @return 返回 {@link #useCount}
         */
        public long getUseCount() {
            return useCount.get();
        }

        /*
         * 獲取 {@link #tookTotalTime}
         *
         * @return 返回 {@link #tookTotalTime}
         */
        public long getTookTotalTime() {
            return tookTotalTime.get();
        }

    }
}

攔截器的調用關係socket

參考:http://w4lle.github.io/2016/12/06/OkHttp/

相關文章
相關標籤/搜索