package cloud.app.prod.home.utils; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; /** * Author : YongBo Xie </br> * File Name: DSHMessageAndSmsSendThreadUtil.java </br> * Created Date: 2018年3月26日 下午2:59:46 </br> * Modified Date: 2018年3月26日 下午2:59:46 </br> * Version: 1.0 </br> */ public class DSHWechatMsgAndSmsSendThreadUtil { private static Logger logger = Logger.getLogger(DSHWechatMsgAndSmsSendThreadUtil.class); public static void main(String[] args) { List<String> mobileList = new ArrayList<>(); mobileList.add("17721111111"); mobileList.add("15223333333"); String content = "SCRM 多線程發送短信測試"; try { sendSMS(mobileList, content); } catch (Exception e) { e.printStackTrace(); } } /** * 發送短信 * * @param mobileList * 手機號碼 * @param content * 短信內容 * @throws Exception */ public static void sendSMS(List<String> mobileList, String content) throws Exception { try { logger.info("send message start..."); long startTime = System.currentTimeMillis(); BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(20000); ThreadPoolExecutor executors = new ThreadPoolExecutor(5, 10, 60000, TimeUnit.SECONDS, queue); // 要推送的用戶總數 int count = mobileList.size(); logger.info("message all count=" + count); // 初始每一個線程處理的用戶數量 final int eveLength = 5; // 計算處理全部用戶須要的線程數量 int eveBlocks = count / eveLength + (count % eveLength != 0 ? 
1 : 0); logger.info("need thread's count=" + eveBlocks); // 線程計數器 CountDownLatch doneSignal = new CountDownLatch(eveBlocks); // 開啓線程處理 int doneCount = 0; for (int page = 0; page < eveBlocks; page++) { /* blocks太大能夠再細分從新調度 */ content = content + ",線程" + (page + 1); SmsSendThread ms = new SmsSendThread(mobileList, content, page, eveLength, doneSignal); executors.execute(ms); // logger.info("start thread =>{}",page+1); doneCount++; } doneSignal.await();// 等待全部計數器線程執行完 long endTime = System.currentTimeMillis(); logger.info("send message all thread ends!time(s)=" + (startTime - endTime) / 1000); logger.info("all thread count=" + doneCount); } catch (Exception e) { logger.error("send message error=>{}", e); } } /** * 微信公衆號推送消息 * @param accessToken 公衆號token * @param openIdList 微信OpenID列表 * @param content 消息內容 * @throws Exception */ public static void sendWechatMsg(String accessToken, List<String> openIdList, String content) throws Exception { try { logger.info("send message start..."); long startTime = System.currentTimeMillis(); BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(20000); ThreadPoolExecutor executors = new ThreadPoolExecutor(5, 10, 60000, TimeUnit.SECONDS, queue); // 要推送的用戶總數 int count = openIdList.size(); logger.info("message all count=" + count); // 初始每一個線程處理的用戶數量 final int eveLength = 2000; // 計算處理全部用戶須要的線程數量 int eveBlocks = count / eveLength + (count % eveLength != 0 ? 
1 : 0); logger.info("need thread's count=" + eveBlocks); // 線程計數器 CountDownLatch doneSignal = new CountDownLatch(eveBlocks); // 開啓線程處理 int doneCount = 0; for (int page = 0; page < eveBlocks; page++) { /* blocks太大能夠再細分從新調度 */ WachatMagSendThread ms = new WachatMagSendThread(accessToken, openIdList, content, page, eveLength, doneSignal); executors.execute(ms); // logger.info("start thread =>{}",page+1); doneCount++; } doneSignal.await();// 等待全部計數器線程執行完 long endTime = System.currentTimeMillis(); logger.info("send message all thread ends!time(s)=" + (startTime - endTime) / 1000); logger.info("all thread count=" + doneCount); } catch (Exception e) { logger.error("send message error=>{}", e); } } }
package cloud.app.prod.home.utils; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.log4j.Logger; /** * Author : YongBo Xie </br> * File Name: SmsSendThreadUtils.java </br> * Created Date: 2018年3月26日 下午2:17:42 </br> * Modified Date: 2018年3月26日 下午2:17:42 </br> * Version: 1.0 </br> */ public class SmsSendThread implements Runnable { private static Logger logger = Logger.getLogger(SmsSendThread.class); private List<String> mobileList;//手機號碼 private String content;//短信內容 private int currentIndex;//當前索引 private int rows;//處理數據條數 private CountDownLatch doneSignal;//處理線程條數 public SmsSendThread(List<String> mobileList, String content, int currentIndex, int rows, CountDownLatch doneSignal) { this.mobileList = mobileList; this.content = content; this.currentIndex = currentIndex; this.rows = rows; this.doneSignal = doneSignal; } @Override public void run() { // 查詢當前的block範圍內的發送的手機號=>篩選目標客戶羣手機號--------- String mobiles = filterPhones(mobileList, currentIndex, rows); try { DSHSendMessageUtil.postSendMsg(content, mobiles); } catch (Exception e) { logger.error("send message thread exception=>{" + mobiles + "}{" + e.getMessage() + "}"); e.printStackTrace(); } finally { doneSignal.countDown();//工人完成工做,計數器減一 } } /** * 篩選目標客戶羣手機號 * @param mobileList 手機號碼 * @param currentIndex 當前索引 * @param rows 處理數據條數 * @return */ private String filterPhones(List<String> mobileList, int currentIndex, int rows) { int startIndex = currentIndex * rows; int endIndex = (currentIndex + 1) * rows; if (endIndex > mobileList.size()) { endIndex = mobileList.size(); } String mobiles = ""; for (int i = startIndex; i < endIndex; i++) { mobiles = mobileList.get(i) + "," + mobiles; } return mobiles; } }
短信發送
package cloud.app.prod.home.utils; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.net.HttpURLConnection; import java.net.URL; import java.net.URLEncoder; /** * File Name: DSHSendMessageUtil.java<br> * Created Date: 2018年3月15日 上午9:54:12<br> * Modified Date: 2018年3月15日 上午9:54:12<br> * Version: 1.0<br> */ public class DSHSendMessageUtil { private static final String OPER_ID = "ceshfj2"; private static final String OPER_PASS = "jcUiROHM"; private static final String SEND_URL = "http://qxtsms.guodulink.net:8000/QxtSms/QxtFirewall"; private static final String SEND_TIME = ""; private static final String VALID_TIME = ""; private static final String APPEND_ID = "888"; /** * Test * @param args * @throws Exception */ public static void main(String args[]) throws Exception { try { String content = "http://news.sina.com.cn/o/2018-03-15/doc-ifyshvuy1243084.shtml"; String mobiles = "13918159104,15216865591"; /* post方式發送消息 */ boolean postResponse = DSHSendMessageUtil.postSendMsg(content, mobiles); System.out.println("post方式返回的響應爲:" + postResponse); } catch (Exception e) { // TODO: handle exception } } /** * 發送短信 * @param content 短信內容 * @param mobiles 手機號,多個手機號可用「,」隔開 * @return boolean */ public static boolean postSendMsg(String content, String mobiles) throws Exception { boolean flag = false; try { /* 將內容用URLEncoder編一次GBK */ String encoderContent = ""; encoderContent = URLEncoder.encode(content, "GBK"); /* 消息參數 */ StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append("OperID=").append(OPER_ID) .append("&OperPass=").append(OPER_PASS) .append("&SendTime=").append(SEND_TIME) .append("&ValidTime=").append(VALID_TIME) .append("&AppendID=").append(APPEND_ID) .append("&DesMobile=").append(mobiles.trim()) .append("&Content=").append(encoderContent) .append("&ContentType=").append("8"); /* 使用post方式發送消息 */ String response = postURL(stringBuilder.toString(), SEND_URL); 
if (response.indexOf("<code>01</code>") >= 0) { flag = true; } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return flag; } /** * @param commString 須要發送的url參數串 * @param address 須要發送的url地址 * @return 國都返回的XML格式的串 * @catch Exception */ public static String postURL(String commString, String address) { String rec_string = ""; URL url = null; HttpURLConnection urlConn = null; try { /* 獲得url地址的URL類 */ url = new URL(address); /* 得到打開須要發送的url鏈接 */ urlConn = (HttpURLConnection) url.openConnection(); /* 設置鏈接超時時間 */ urlConn.setConnectTimeout(30000); /* 設置讀取響應超時時間 */ urlConn.setReadTimeout(30000); /* 設置post發送方式 */ urlConn.setRequestMethod("POST"); /* 發送commString */ urlConn.setDoOutput(true); urlConn.setDoInput(true); OutputStream out = urlConn.getOutputStream(); out.write(commString.getBytes()); out.flush(); out.close(); /* 發送完畢 獲取返回流,解析流數據 */ BufferedReader rd = new BufferedReader(new InputStreamReader(urlConn.getInputStream(), "GBK")); StringBuffer sb = new StringBuffer(); int ch; while ((ch = rd.read()) > -1) { sb.append((char) ch); } rec_string = sb.toString().trim(); rd.close(); } catch (Exception e) { rec_string = "-107"; } finally { if (urlConn != null) { urlConn.disconnect(); } } return rec_string; } }
微信公衆號推送消息線程
package cloud.app.prod.home.utils; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.log4j.Logger; import cloud.app.prod.home.wechat.DSHWechatAPIHander; import net.sf.json.JSONArray; import net.sf.json.JSONObject; /** * Author : YongBo Xie </br> * File Name: WachatMagSendThread.java </br> * Created Date: 2018年3月26日 下午4:11:23 </br> * Modified Date: 2018年3月26日 下午4:11:23 </br> * Version: 1.0 </br> */ public class WachatMagSendThread implements Runnable { private static Logger logger = Logger.getLogger(WachatMagSendThread.class); private String accessToken;//公衆號token private List<String> openIdList;//微信OpenID列表 private String content;//消息內容 private int currentIndex;//當前索引 private int rows;//處理數據條數 private CountDownLatch doneSignal;//處理線程條數 public WachatMagSendThread(String accessToken, List<String> openIdList, String content, int currentIndex, int rows, CountDownLatch doneSignal) { this.accessToken = accessToken; this.openIdList = openIdList; this.content = content; this.currentIndex = currentIndex; this.rows = rows; this.doneSignal = doneSignal; } @Override public void run() { // 查詢當前的block範圍內的發送的OpenID--------- JSONArray openIdArray = filterOpenIds(openIdList, currentIndex, rows); try { // 設置發送消息的參數 JSONObject msgJson = new JSONObject(); msgJson.put("touser", openIdArray); msgJson.put("msgtype", "text"); JSONObject contentJson = new JSONObject(); contentJson.put("content", content); msgJson.put("text", contentJson); DSHWechatAPIHander.sendMessage(accessToken, msgJson.toString()); } catch (Exception e) { logger.error("send message thread exception=>{" + openIdArray.toString() + "}{" + e.getMessage() + "}"); e.printStackTrace(); } finally { doneSignal.countDown();//工人完成工做,計數器減一 } } /** * 篩選目標客戶羣OpenID * @param openIdList OpenID * @param currentIndex 當前索引 * @param rows 處理數據條數 * @return */ private JSONArray filterOpenIds(List<String> openIdList, int currentIndex, int rows) { int startIndex = currentIndex * rows; int endIndex = 
(currentIndex + 1) * rows; if (endIndex > openIdList.size()) { endIndex = openIdList.size(); } JSONArray openIdArray = new JSONArray(); for (int i = startIndex; i < endIndex; i++) { openIdArray.add(openIdList.get(i)); } return openIdArray; } }
微信公衆號推送消息
package cloud.app.prod.home.wechat; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.text.MessageFormat; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; import org.apache.http.HttpResponse; import org.apache.http.NameValuePair; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.HttpClient; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.client.methods.RequestBuilder; import org.apache.http.conn.routing.HttpRoute; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.message.BasicNameValuePair; import org.apache.http.util.EntityUtils; import org.apache.log4j.Logger; /** * Author : YongBo Xie </br> * File Name: WechatAPIHander.java </br> * Created Date: 2018年3月13日 下午6:29:00 </br> * Modified Date: 2018年3月13日 下午6:29:00 </br> * Version: 1.0 </br> */ public class DSHWechatAPIHander { private static Logger logger = Logger.getLogger(DSHWechatAPIHander.class); /** * 主動推送信息接口(羣發) */ private static String SEND_MSG_URL = "https://api.weixin.qq.com/cgi-bin/message/mass/sendall?access_token={0}"; private static PoolingHttpClientConnectionManager connectionManager = null; private static HttpClientBuilder httpBuilder = null; private static RequestConfig requestConfig = null; private static int MAXCONNECTION = 10; private static int DEFAULTMAXCONNECTION = 5; private static String IP = "127.0.0.1"; private static int PORT = 8888; static { // 設置http的狀態參數 
requestConfig = RequestConfig.custom().setSocketTimeout(5000).setConnectTimeout(5000).setConnectionRequestTimeout(5000).build(); HttpHost target = new HttpHost(IP, PORT); connectionManager = new PoolingHttpClientConnectionManager(); connectionManager.setMaxTotal(MAXCONNECTION);// 客戶端總並行連接最大數 connectionManager.setDefaultMaxPerRoute(DEFAULTMAXCONNECTION);// 每一個主機的最大並行連接數 connectionManager.setMaxPerRoute(new HttpRoute(target), 20); httpBuilder = HttpClients.custom(); httpBuilder.setConnectionManager(connectionManager); } public static CloseableHttpClient getConnection() { CloseableHttpClient httpClient = httpBuilder.build(); return httpClient; } public static HttpUriRequest getRequestMethod(Map<String, String> map, String url, String method) { List<NameValuePair> params = new ArrayList<NameValuePair>(); Set<Map.Entry<String, String>> entrySet = map.entrySet(); for (Map.Entry<String, String> e : entrySet) { String name = e.getKey(); String value = e.getValue(); NameValuePair pair = new BasicNameValuePair(name, value); params.add(pair); } HttpUriRequest reqMethod = null; if ("post".equals(method)) { reqMethod = RequestBuilder.post().setUri(url).addParameters(params.toArray(new BasicNameValuePair[params.size()])).setConfig(requestConfig).build(); } else if ("get".equals(method)) { reqMethod = RequestBuilder.get().setUri(url).addParameters(params.toArray(new BasicNameValuePair[params.size()])).setConfig(requestConfig).build(); } return reqMethod; } /** * @desc 推送信息 * @param token * @param msg * @return */ public static void sendMessage(String token, String msg) { try { logger.info("\n\nsendMessage start.token:" + token + ",msg:" + msg); String url = MessageFormat.format(SEND_MSG_URL, token); // 建立默認的httpClient實例. 
CloseableHttpClient httpclient = HttpClients.createDefault(); // 建立httppost HttpPost httppost = new HttpPost(url); // 設置發送消息的參數 // 這裏必須是一個合法的json格式數據,每一個字段的意義能夠查看上面鏈接的說明,content後面的test是要發送給用戶的數據,這裏是羣發給全部人 // msg = "{\"filter\":{\"is_to_all\":true},\"text\":{\"content\":\"test\"},\"msgtype\":\"text\"}\""; StringEntity sEntity; try { sEntity = new StringEntity(msg); // 解決中文亂碼的問題 sEntity.setContentEncoding("UTF-8"); sEntity.setContentType("application/json"); httppost.setEntity(sEntity); System.out.println("executing request " + httppost.getURI()); // 發送請求 CloseableHttpResponse response = httpclient.execute(httppost); try { HttpEntity hEntity = response.getEntity(); if (hEntity != null) { System.out.println("--------------------------------------"); System.out.println("Response content: " + EntityUtils.toString(hEntity, "UTF-8")); System.out.println("--------------------------------------"); } } finally { response.close(); } } catch (ClientProtocolException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e1) { e1.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { // 關閉鏈接,釋放資源 try { httpclient.close(); } catch (IOException e) { e.printStackTrace(); } } } catch (Exception e) { logger.error("get user info exception", e); } } public static void main(String args[]) throws IOException { Map<String, String> map = new HashMap<String, String>(); map.put("account", ""); map.put("password", ""); HttpClient client = getConnection(); HttpUriRequest post = getRequestMethod(map, "http://baidu.com", "post"); HttpResponse response = client.execute(post); if (response.getStatusLine().getStatusCode() == 200) { HttpEntity entity = response.getEntity(); String message = EntityUtils.toString(entity, "utf-8"); System.out.println(message); } else { System.out.println("請求失敗"); } } }