因爲項目中需要使用HttpClient異步發送大量http請求,因此在此做一記錄。
思路:使用HttpClient連接池,配合多線程發送請求。
public class HttpAsyncClient { private static int socketTimeout = 500;// 設置等待數據超時時間0.5秒鐘 根據業務調整 private static int connectTimeout = 2000;// 鏈接超時 private static int poolSize = 100;// 鏈接池最大鏈接數 private static int maxPerRoute = 100;// 每一個主機的併發最多隻有1500 private static int connectionRequestTimeout = 3000; //從鏈接池中後去鏈接的timeout時間 // http代理相關參數 private String host = ""; private int port = 0; private String username = ""; private String password = ""; // 異步httpclient private CloseableHttpAsyncClient asyncHttpClient; // 異步加代理的httpclient private CloseableHttpAsyncClient proxyAsyncHttpClient; public HttpAsyncClient() { try { this.asyncHttpClient = createAsyncClient(false); this.proxyAsyncHttpClient = createAsyncClient(true); } catch (Exception e) { e.printStackTrace(); } } public CloseableHttpAsyncClient createAsyncClient(boolean proxy) throws KeyManagementException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, MalformedChallengeException, IOReactorException { RequestConfig requestConfig = RequestConfig.custom() .setConnectionRequestTimeout(connectionRequestTimeout) .setConnectTimeout(connectTimeout) .setSocketTimeout(socketTimeout).build(); SSLContext sslcontext = SSLContexts.createDefault(); UsernamePasswordCredentials credentials = new UsernamePasswordCredentials( username, password); CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, credentials); // 設置協議http和https對應的處理socket連接工廠的對象 Registry<SchemeIOSessionStrategy> sessionStrategyRegistry = RegistryBuilder .<SchemeIOSessionStrategy> create() .register("http", NoopIOSessionStrategy.INSTANCE) .register("https", new SSLIOSessionStrategy(sslcontext)) .build(); // 配置io線程 IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setSoKeepAlive(false).setTcpNoDelay(true) .setIoThreadCount(Runtime.getRuntime().availableProcessors()) .build(); // 設置鏈接池大小 ConnectingIOReactor ioReactor; ioReactor = new 
DefaultConnectingIOReactor(ioReactorConfig); PoolingNHttpClientConnectionManager conMgr = new PoolingNHttpClientConnectionManager( ioReactor, null, sessionStrategyRegistry, null); if (poolSize > 0) { conMgr.setMaxTotal(poolSize); } if (maxPerRoute > 0) { conMgr.setDefaultMaxPerRoute(maxPerRoute); } else { conMgr.setDefaultMaxPerRoute(10); } ConnectionConfig connectionConfig = ConnectionConfig.custom() .setMalformedInputAction(CodingErrorAction.IGNORE) .setUnmappableInputAction(CodingErrorAction.IGNORE) .setCharset(Consts.UTF_8).build(); Lookup<AuthSchemeProvider> authSchemeRegistry; authSchemeRegistry = RegistryBuilder .<AuthSchemeProvider> create() .register(AuthSchemes.BASIC, new BasicSchemeFactory()) .register(AuthSchemes.DIGEST, new DigestSchemeFactory()) .register(AuthSchemes.NTLM, new NTLMSchemeFactory()) .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory()) .register(AuthSchemes.KERBEROS, new KerberosSchemeFactory()) .build(); conMgr.setDefaultConnectionConfig(connectionConfig); if (proxy) { return HttpAsyncClients.custom().setConnectionManager(conMgr) .setDefaultCredentialsProvider(credentialsProvider) .setDefaultAuthSchemeRegistry(authSchemeRegistry) .setProxy(new HttpHost(host, port)) .setDefaultCookieStore(new BasicCookieStore()) .setDefaultRequestConfig(requestConfig).build(); } else { return HttpAsyncClients.custom().setConnectionManager(conMgr) .setDefaultCredentialsProvider(credentialsProvider) .setDefaultAuthSchemeRegistry(authSchemeRegistry) .setDefaultCookieStore(new BasicCookieStore()).build(); } } public CloseableHttpAsyncClient getAsyncHttpClient() { return asyncHttpClient; } public CloseableHttpAsyncClient getProxyAsyncHttpClient() { return proxyAsyncHttpClient; } }
/**
 * Singleton access point for the shared {@link HttpAsyncClient}.
 *
 * <p>Both the shared client holder and the single factory instance are created eagerly,
 * when this class is initialised.
 */
public class HttpClientFactory {

    /** The one client holder handed out to every caller. */
    private static final HttpAsyncClient SHARED_CLIENT = new HttpAsyncClient();

    /** The only instance of this factory. */
    private static final HttpClientFactory INSTANCE = new HttpClientFactory();

    /** Not instantiable from outside; use {@link #getInstance()}. */
    private HttpClientFactory() {
    }

    /**
     * @return the single factory instance
     */
    public static HttpClientFactory getInstance() {
        return INSTANCE;
    }

    /**
     * @return the shared client holder; the same object on every call
     */
    public HttpAsyncClient getHttpAsyncClientPool() {
        return SHARED_CLIENT;
    }
}
public void sendThredPost(List<FaceBookUserQuitEntity> list,String title,String subTitle,String imgUrl){ if(list == null || list.size() == 0){ new BusinessException("亞洲查詢用戶數據爲空"); } int number = list.size(); int num = number / 10; PostThread[] threads = new PostThread[1]; if(num > 0){ threads = new PostThread[10]; for(int i = 0; i <= 9; i++) { List<FaceBookUserQuitEntity> threadList = list.subList(i * num, (i + 1) * num > number ? number : (i + 1) * num); if (threadList == null || threadList.size() == 0) { new BusinessException("亞洲切分用戶數據爲空"); } threads[i] = new PostThread(HttpClientFactory.getInstance().getHttpAsyncClientPool().getAsyncHttpClient(), threadList, title, subTitle, imgUrl); } for (int k = 0; k< threads.length; k++) { threads[k].start(); logger.info("亞洲線程: {} 啓動",k); } for (int j = 0; j < threads.length; j++) { try { threads[j].join(); } catch (InterruptedException e) { e.printStackTrace(); } } }else{ threads[0] = new PostThread(HttpClientFactory.getInstance().getHttpAsyncClientPool().getAsyncHttpClient(), list,title,subTitle, imgUrl); threads[0].start(); try { threads[0].join(); } catch (InterruptedException e) { e.printStackTrace(); } }
public PostThread(CloseableHttpAsyncClient httpClient, List<FaceBookUserQuitEntity> list, String title, String subTitle,String imgUrl){ this.httpClient = httpClient; this.list = list; this. title= title; this. subTitle= subTitle; this. imgUrl= imgUrl; } @Override public void run() { try { int size = list.size(); for (int k = 0; k < size; k += 100) { List<FaceBookUserQuitEntity> subList = new ArrayList<FaceBookUserQuitEntity>(); if (k + 100 < size) { subList = list.subList(k, k + 100); } else { subList = list.subList(k, size); } if(subList.size() > 0){ httpClient.start(); final long startTime = System.currentTimeMillis(); final CountDownLatch latch = new CountDownLatch(subList.size()); for (FaceBookUserQuitEntity faceBookEntity : subList) { String senderId = faceBookEntity.getSenderId(); String player_id = faceBookEntity.getPlayer_id(); logger.info("開始發送消息:playerid=" + player_id); String bodyStr = getPostbody(senderId, player_id, title, subTitle, imgUrl, "Play Game", ""); if (!bodyStr.isEmpty()) { final HttpPost httpPost = new HttpPost(URL); StringEntity stringEntity = new StringEntity(bodyStr, "utf-8"); stringEntity.setContentEncoding("UTF-8"); stringEntity.setContentType("application/json"); httpPost.setEntity(stringEntity); httpClient.execute(httpPost, new FutureCallback<HttpResponse>() { @Override public void completed(HttpResponse result) { latch.countDown(); int statusCode = result.getStatusLine().getStatusCode(); if(200 == statusCode){ logger.info("請求發消息成功="+bodyStr); try { logger.info(EntityUtils.toString(result.getEntity(), "UTF-8")); } catch (IOException e) { e.printStackTrace(); } }else{ logger.info("請求返回狀態="+statusCode); logger.info("請求發消息失敗="+bodyStr); try { logger.info(EntityUtils.toString(result.getEntity(), "UTF-8")); } catch (IOException e) { e.printStackTrace(); } } } @Override public void failed(Exception ex) { latch.countDown(); logger.info("請求發消息失敗e="+ex); } @Override public void cancelled() { latch.countDown(); } }); } } try { latch.await(); } 
catch (InterruptedException e) { e.printStackTrace(); } long leftTime = 10000 - (System.currentTimeMillis() - startTime); if (leftTime > 0) { try { Thread.sleep(leftTime); } catch (InterruptedException e) { e.printStackTrace(); } } } } } catch (UnsupportedCharsetException e) { e.printStackTrace(); } }
以上工具類代碼可直接使用,發送邏輯部分的代碼需根據實際業務適當修改。