使用spring應用緩存。使用方式:使用@EnableCache註解激活Spring的緩存功能,須要建立一個CacheManager來處理緩存。如使用一個內存緩存示例html
package com.github.bjlhx15.gradle.demotest; import org.springframework.cache.CacheManager; import org.springframework.cache.annotation.EnableCaching; import org.springframework.cache.concurrent.ConcurrentMapCache; import org.springframework.cache.support.SimpleCacheManager; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Arrays; @Configuration @EnableCaching public class CacheConfiguration { @Bean public CacheManager cacheManager(){ SimpleCacheManager simpleCacheManager=new SimpleCacheManager(); simpleCacheManager.setCaches(Arrays.asList(new ConcurrentMapCache("searches"))); return simpleCacheManager; } }
其餘實現如:EhCacheManager、GuavaCacheManager等前端
主要標記:java
@CacheEvict:將會從緩存中移除一個條目jquery
@CachePut:會將方法結果放到緩存中,而不會影響到方法調用自己。git
@Caching:將緩存註解從新分組github
@CacheConfig:指向不一樣的緩存配置web
更多spring 應用緩存:https://www.cnblogs.com/bjlhx/category/1233985.htmlredis
推薦使用redis,系列文章:https://www.cnblogs.com/bjlhx/category/1066467.html算法
spring使用也比較方便:https://www.cnblogs.com/bjlhx/category/1233985.htmlspring
在程序執行時候還有一個瓶頸,串行執行,能夠經過使用不一樣線程類快速提高應用的速度。
要啓用Spring的異步功能,必需要使用@EnableAsync註解。這樣將會透明地使用java.util.concurrent.Executor來執行全部帶有@Async註解的方法。
@Async所修飾的函數不要定義爲static類型,這樣異步調用不會生效
針對調用的Async,若是不作Future特殊處理,執行完調用方法會當即返回結果,如異步郵件發送,不會真的等郵件發送完畢才響應客戶,如需等待可使用Future阻塞處理。
一、main方法增長@EnableAsync註解
@SpringBootApplication @EnableAsync public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); System.out.println("ThreadId:"+Thread.currentThread().getId()); } }
二、在所需方法增長@Async註解
@Component public class Task { @Async public void doTaskOne() throws Exception { for (int i = 0; i < 3; i++) { Thread.sleep(200); System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskOne"); } } @Async public void doTaskTwo() throws Exception { for (int i = 0; i < 3; i++) { Thread.sleep(200); System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskTwo"); } } @Async public void doTaskThree() throws Exception { for (int i = 0; i < 3; i++) { Thread.sleep(200); System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskThree"); } } }
三、查看調用
@RestController public class TestAsyns { @Autowired private Task task; @RequestMapping("/testAsync") public ResponseEntity testAsync() throws Exception { task.doTaskOne(); task.doTaskTwo(); task.doTaskThree(); return ResponseEntity.ok("ok"); } }
上述方法依次調用三個方法。
若是去除@EnableAsync註解,輸出以下:【可見是串行執行】
ThreadId:33:doTaskOne ThreadId:33:doTaskOne ThreadId:33:doTaskOne ThreadId:33:doTaskTwo ThreadId:33:doTaskTwo ThreadId:33:doTaskTwo ThreadId:33:doTaskThree ThreadId:33:doTaskThree ThreadId:33:doTaskThree
若是增長@EnableAsync註解,輸出以下:【可見是並行執行】
ThreadId:56:doTaskThree ThreadId:55:doTaskTwo ThreadId:54:doTaskOne ThreadId:54:doTaskOne ThreadId:55:doTaskTwo ThreadId:56:doTaskThree ThreadId:54:doTaskOne ThreadId:56:doTaskThree ThreadId:55:doTaskTwo
一、配置類
方式1、注入Bean方式
import java.util.concurrent.Executor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @Configuration public class ThreadConfig { // 執行須要依賴線程池,這裏就來配置一個線程池 @Bean public Executor getExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(10); executor.setQueueCapacity(25); executor.initialize(); return executor; } }
方式2、經過實現AsyncConfigurer接口,能夠自定義默認的執行(executor)。新增以下配置類:
@Configuration public class AsyncConfiguration implements AsyncConfigurer { protected final Logger logger = LoggerFactory.getLogger(AsyncConfiguration.class); @Override public Executor getAsyncExecutor() { //作好不超過10個,這裏寫兩個方便測試 return Executors.newFixedThreadPool(2); } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex,method,params)->logger.error("Uncaught async error",ex); } }
Executor的初始化配置,還有不少種,能夠參看https://www.cnblogs.com/bjlhx/category/1086008.html
Spring 已經實現的異步線程池:
1. SimpleAsyncTaskExecutor:不是真的線程池,這個類不重用線程,每次調用都會建立一個新的線程。
2. SyncTaskExecutor:這個類沒有實現異步調用,只是一個同步操做。只適用於不須要多線程的地方
3. ConcurrentTaskExecutor:Executor的適配類,不推薦使用。若是ThreadPoolTaskExecutor不知足要求時,才用考慮使用這個類
4. SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的類。線程池同時被quartz和非quartz使用,才須要使用此類
5. ThreadPoolTaskExecutor :最常使用,推薦。 其實質是對java.util.concurrent.ThreadPoolExecutor的包裝
使用上述配置,可以確保在應用中,用來處理異步任務的線程不會超過10個。這對於web應用很重要,由於每一個客戶端都會有一個專用的線程。你所使用的線程越多,阻塞時間越長那麼可以處理的客戶端就會越少。
若是設置成兩個,程序中有3個異步線程,也會只有兩個運行,以下
ThreadId:55:doTaskTwo ThreadId:54:doTaskOne ThreadId:55:doTaskTwo ThreadId:54:doTaskOne ThreadId:55:doTaskTwo ThreadId:54:doTaskOne ThreadId:55:doTaskThree ThreadId:55:doTaskThree ThreadId:55:doTaskThree
二、使用
同上述一致。
修改異步執行的方法
@Component public class TaskFutureDemo { @Async public Future<String> doTaskOne() throws Exception { for (int i = 0; i < 3; i++) { Thread.sleep(1000); System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskOne"); } return new AsyncResult<>("doTaskOne"); } @Async public Future<String> doTaskTwo() throws Exception { for (int i = 0; i < 3; i++) { Thread.sleep(1000); System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskTwo"); } return new AsyncResult<>("doTaskTwo"); } @Async public Future<String> doTaskThree() throws Exception { for (int i = 0; i < 3; i++) { Thread.sleep(1000); System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskThree"); } return new AsyncResult<>("doTaskThree"); } }
修改調用的方法
@RestController public class TestAsynsFutureController { @Autowired private TaskFutureDemo task; @RequestMapping("/testAsyncFuture") public ResponseEntity testAsyncFuture() throws Exception { Future<String> taskOne = task.doTaskOne(); Future<String> taskTwo = task.doTaskTwo(); Future<String> taskThree = task.doTaskThree(); while (true) { if (taskOne.isDone() && taskTwo.isDone() && taskThree.isDone()) { break; } } return ResponseEntity.ok("ok"); } }
Service實現類
@Service public class TaskListenableFutureService { private AsyncSearch asyncSearch; @Autowired public TaskListenableFutureService(AsyncSearch asyncSearch) { this.asyncSearch=asyncSearch; } public List<String> search(List<String> keywords){ CountDownLatch latch=new CountDownLatch(keywords.size()); List<String> allResult=Collections.synchronizedList(new ArrayList<>()); keywords.stream() .forEach(keyword->asyncFetch(latch,allResult,keyword)); await(latch); return allResult; } private void asyncFetch(CountDownLatch latch, List<String> result, String keyword){ asyncSearch.asyncFetch(keyword) .addCallback( key->onSuccess(result,latch,key), ex -> onError(latch,ex) ); } private void await(CountDownLatch latch){ try { latch.await(); } catch (InterruptedException e) { throw new IllegalStateException(e); } } private static void onSuccess(List<String> result,CountDownLatch latch,String keyword){ result.add(keyword); latch.countDown(); } private static void onError(CountDownLatch latch,Throwable ex){ ex.printStackTrace(); latch.countDown(); } @Component private static class AsyncSearch{ @Autowired public AsyncSearch() { } protected final Logger logger = LoggerFactory.getLogger(AsyncSearch.class); @Async public ListenableFuture<String> asyncFetch(String keyword){ logger.info(Thread.currentThread().getName()+"-"+keyword); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return new AsyncResult<>("keyword="+keyword); } } }
調用方法
@RestController public class TestAsynsListenableFutureController { protected final Logger logger = LoggerFactory.getLogger(TestAsynsListenableFutureController.class); @Autowired private TaskListenableFutureService task; @RequestMapping("/testAsyncListenableFuture") public ResponseEntity testAsyncListenableFuture() throws Exception { List<String> list = task.search(Arrays.asList("java", "html", "spring")); list.stream().forEach(p-> logger.info(p)); return ResponseEntity.ok("ok"); } }
User實體類
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; @JsonIgnoreProperties(ignoreUnknown=true) public class User { private String name; private String blog; public String getName() { return name; } public void setName(String name) { this.name = name; } public String getBlog() { return blog; } public void setBlog(String blog) { this.blog = blog; } @Override public String toString() { return "User [name=" + name + ", blog=" + blog + "]"; } }
CompletableFuture的服務
@Service public class GitHubLookupService { private static final Logger logger = LoggerFactory.getLogger(GitHubLookupService.class); private final RestTemplate restTemplate; public GitHubLookupService(RestTemplateBuilder restTemplateBuilder) { this.restTemplate = restTemplateBuilder.build(); } @Async public CompletableFuture<User> findUser(String user) throws InterruptedException { logger.info("Looking up " + user); String url = String.format("https://api.github.com/users/%s", user); User results = restTemplate.getForObject(url, User.class); // Artificial delay of 1s for demonstration purposes Thread.sleep(1000L); return CompletableFuture.completedFuture(results); } }
調用方法
@RestController public class CompletableFutureController { private static final Logger logger = LoggerFactory.getLogger(CompletableFutureController.class); @Autowired private GitHubLookupService gitHubLookupService; @RequestMapping("/testCompletableFuture") public ResponseEntity testCompletableFuture() throws Exception { // Start the clock long start = System.currentTimeMillis(); // Kick of multiple, asynchronous lookups CompletableFuture<User> page1 = gitHubLookupService.findUser("PivotalSoftware"); CompletableFuture<User> page2 = gitHubLookupService.findUser("CloudFoundry"); CompletableFuture<User> page3 = gitHubLookupService.findUser("Spring-Projects"); // Wait until they are all done CompletableFuture.allOf(page1,page2,page3).join(); // Print results, including elapsed time logger.info("Elapsed time: " + (System.currentTimeMillis() - start)); logger.info("--> " + page1.get()); logger.info("--> " + page2.get()); logger.info("--> " + page3.get()); return ResponseEntity.ok("ok"); } }
ETag:是實體標籤(Entity Tag)的縮寫。ETag通常不以明文形式相應給客戶端。在資源的各個生命週期中,它都具備不一樣的值,用於標識出資源的狀態。當資源發生變動時,若是其頭信息中一個或者多個發生變化,或者消息實體發生變化,那麼ETag也隨之發生變化。
ETag值的變動說明資源狀態已經被修改。每每能夠經過時間戳就能夠便宜的獲得ETag頭信息。在服務端中若是發回給消費者的相應從一開始起就由ETag控制,那麼能夠確保更細粒度的ETag升級徹底由服務來進行控制。服務計算ETag值,並在相應客戶端請求時將它返回給客戶端。
在HTTP1.1協議中並無規範如何計算ETag。ETag值能夠是惟一標識資源的任何東西,如持久化存儲中的某個資源關聯的版本、一個或者多個文件屬性,實體頭信息和校驗值、(CheckSum),也能夠計算實體信息的散列值。有時候,爲了計算一個ETag值可能有比較大的代價,此時能夠採用生成惟一值等方式(如常見的GUID)。不管怎樣,服務都應該儘量的將ETag值返回給客戶端。客戶端不用關心ETag值如何產生,只要服務在資源狀態發生變動的狀況下將ETag值發送給它就行。
ETag值能夠經過uuid、整數、長整形、字符串等四種類型。
計算ETag值時,須要考慮兩個問題:計算與存儲。若是一個ETag值只須要很小的代價以及佔用很低的存儲空間,那麼咱們能夠在每次須要發送給客戶端ETag值值的時候計算一遍就行行了。相反的,咱們須要將以前就已經計算並存儲好的ETag值發送給客戶端。以前說:將時間戳做爲字符串做爲一種廉價的方式來獲取ETag值。對於不是常常變化的消息,它是一種足夠好的方案。注意:若是將時間戳作爲ETag值,一般不該該用Last-Modified的值。因爲HTTP機制中,因此當咱們在經過服務校驗資源狀態時,客戶端不須要進行相應的改動。計算ETag值開銷最大的通常是計算採用哈希算法獲取資源的表述值。能夠只計算資源的哈希值,也能夠將頭信息和頭信息的值也包含進去。若是包含頭信息,那麼注意不要包含計算機標識的頭信息。一樣也應該避免包含Expires、Cache-Control和Vary頭信息。注意:在經過哈希算法。
ETag有兩種類型:強ETag(strong ETag)與弱ETag(weak ETag)。
強ETag表示形式:"22FAA065-2664-4197-9C5E-C92EA03D0A16"。
弱ETag表現形式:w/"22FAA065-2664-4197-9C5E-C92EA03D0A16"。
強、弱ETag類型的出現與Apache服務器計算ETag的方式有關。Apache默認經過FileEtag中FileEtag INode Mtime Size的配置自動生成ETag(固然也能夠經過用戶自定義的方式)。假設服務端的資源頻繁被修改(如1秒內修改了N次),此時若是有用戶將Apache的配置改成MTime,因爲MTime只能精確到秒,那麼就能夠避免強ETag在1秒內的ETag老是不一樣而頻繁刷新Cache(若是資源在秒級常常被修改,也能夠經過Last-Modified來解決)。
Etag 主要爲了解決 Last-Modified 沒法解決的一些問題。
一、 一些文件也許會週期性的更改,可是他的內容並不改變(僅僅改變的修改時間),這個時候咱們並不但願客戶端認爲這個文件被修改了,而從新GET;
二、某些文件修改很是頻繁,好比在秒如下的時間內進行修改,(比方說1s內修改了N次),If-Modified-Since能檢查到的粒度是s級的,這種修改沒法判斷(或者說UNIX記錄MTIME只能精確到秒
三、某些服務器不能精確的獲得文件的最後修改時間;
爲此,HTTP/1.1 引入了 Etag(Entity Tags).Etag僅僅是一個和文件相關的標記,能夠是一個版本標記,好比說v1.0.0或者說"2e681a-6-5d044840"這麼一串看起來很神祕的編碼。可是HTTP/1.1標準並無規定Etag的內容是什麼或者說要怎麼實現,惟一規定的是Etag須要放在""內。
Etag由服務器端生成,客戶端經過If-Match或者說If-None-Match這個條件判斷請求來驗證資源是否修改。常見的是使用If-None-Match.請求一個文件的流程可能以下:
====第一次請求===
1.客戶端發起 HTTP GET 請求一個文件;
2.服務器處理請求,返回文件內容和一堆Header,固然包括Etag(例如"2e681a-6-5d044840")(假設服務器支持Etag生成和已經開啓了Etag).狀態碼200
====第二次請求===
1.客戶端發起 HTTP GET 請求一個文件,注意這個時候客戶端同時發送一個If-None-Match頭,這個頭的內容就是第一次請求時服務器返回的Etag:2e681a-6-5d044840
2.服務器判斷髮送過來的Etag和計算出來的Etag匹配,所以If-None-Match爲False,不返回200,返回304,客戶端繼續使用本地緩存;
流程很簡單,問題是,若是服務器又設置了Cache-Control:max-age和Expires呢,怎麼辦?
答案是同時使用,也就是說在徹底匹配If-Modified-Since和If-None-Match即檢查完修改時間和Etag以後,服務器才能返回304.
雖然對請求已經作了應用緩存等處理,可是持續請求一個restful接口請求仍是會發送到服務端去讀取緩存,即便結果沒有發生改變,但結果自己仍是會屢次發送給用戶,形成浪費帶寬。
ETag是Web響應數據的一個散列(Hash),而且會在頭信息中進行發送。客戶端能夠記住資源的ETag,而且經過If-None-Match頭信息將最新的已知版本發送給服務器。若是在這段時間內請求沒有發生變化的話,服務器就會返回304 Not Modified。
在Spring中有一個特殊的Servlet過濾器來處理ETag,名爲ShallowEtagHeaderFilter。只需將此類注入便可:
@Bean public Filter etagFilter(){ return new ShallowEtagHeaderFilter(); }
只要響應頭沒有緩存控制頭信息的話,系統就會爲你的響應生成ETag。
示例
@GetMapping("/testNoChangeContent") public ResponseEntity testNoChangeContent(){ return ResponseEntity.ok("OK"); } @GetMapping("/testChangeContent") public ResponseEntity testChangeContent(){ return ResponseEntity.ok("OK:"+LocalDateTime.now()); }
接口1、testNoChangeContent,是測試內容沒有改變的,第一次請求是200,之後請求是304
接口2、testChangeContent,是測試內容有改變的,第一次請求是200,之後請求均是200
在優化web請求時,這是一種優化方案,在服務器端有可用數據時,就當即將其發送到客戶端。經過多線程方式獲取搜索結果,因此數據會分爲多個塊。這時能夠一點點地進行發送,而沒必要等待全部結果。
Http鏈接爲一次請求(request)一次響應(response),必須爲同步調用方式。WebSocket 協議提供了經過一個套接字實現全雙工通訊的功能。一次鏈接之後,會創建tcp鏈接,後續客戶端與服務器交互爲全雙工方式的交互方式,客戶端能夠發送消息到服務端,服務端也可將消息發送給客戶端。
WebSocket 是發送和接收消息的底層API,WebSocket 協議提供了經過一個套接字實現全雙工通訊的功能。也可以實現 web 瀏覽器和 server 間的異步通訊,全雙工意味着 server 與瀏覽器間能夠發送和接收消息。須要注意的是必須考慮瀏覽器是否支持。
SockJS 是 WebSocket 技術的一種模擬。爲了應對許多瀏覽器不支持WebSocket協議的問題,設計了備選SockJs。開啓並使用SockJS後,它會優先選用Websocket協議做爲傳輸協議,若是瀏覽器不支持Websocket協議,則會在其餘方案中,選擇一個較好的協議進行通信。原來在不支持WebSocket的狀況下,也能夠很簡單地實現WebSocket的功能的,方法就是使用 SockJS。它會優先選擇WebSocket進行鏈接,可是當服務器或客戶端不支持WebSocket時,會自動在 XHR流、XDR流、iFrame事件源、iFrame HTML文件、XHR輪詢、XDR輪詢、iFrame XHR輪詢、JSONP輪詢 這幾個方案中擇優進行鏈接。
STOMP 中文爲: 面向消息的簡單文本協議。websocket定義了兩種傳輸信息類型: 文本信息和二進制信息。類型雖然被肯定,可是他們的傳輸體是沒有規定的。因此,須要用一種簡單的文本傳輸類型來規定傳輸內容,它能夠做爲通信中的文本傳輸協議,即交互中的高級協議來定義交互信息。
STOMP自己能夠支持流類型的網絡傳輸協議: websocket協議和tcp協議。
Stomp還提供了一個stomp.js,用於瀏覽器客戶端使用STOMP消息協議傳輸的js庫。
STOMP的優勢以下:
(1)不須要自建一套自定義的消息格式
(2)現有stomp.js客戶端(瀏覽器中使用)能夠直接使用
(3)能路由信息到指定消息地點
(4)能夠直接使用成熟的STOMP代理進行廣播 如:RabbitMQ, ActiveMQ
簡而言之,WebSocket 是底層協議,SockJS 是WebSocket 的備選方案,也是 底層協議,而 STOMP 是基於 WebSocket(SockJS) 的上層協議
瀏覽器不支持
Web容器不支持,如tomcat7之前的版本不支持WebSocket
防火牆不容許
Nginx沒有開啓WebSocket支持
當遇到不支持WebSocket的狀況時,SockJS會嘗試使用其餘的方案來鏈接,剛開始打開的時候由於須要嘗試各類方案,因此會阻塞一下子,以後能夠看到鏈接有異常,那就是嘗試失敗的狀況。
爲了測試,使用Nginx作反向代理,把www.test.com指到項目啓動的端口上,而後本地配HOST來達到模擬真實場景的效果。由於Nginx默認是不支持WebSocket的,因此這裏模擬出了服務器不支持WebSocket的場景。、
項目中使用的pom
compile 'org.springframework.boot:spring-boot-starter-websocket' compile 'org.springframework.boot:spring-messaging' compile group: 'org.webjars', name: 'sockjs-client', version: '1.1.2' compile group: 'org.webjars', name: 'stomp-websocket', version: '2.3.3' compile group: 'org.webjars', name: 'jquery', version: '3.3.1-1'
客戶端JS
<script src="/webjars/jquery/3.3.1-1/jquery.js"></script> <script src="/webjars/sockjs-client/1.1.2/sockjs.min.js"></script> <script src="/webjars/stomp-websocket/2.3.3/stomp.min.js"></script> <script src="test.js"></script>
Spring爲WebSocket提供了良好支持,WebSocket協議容許客戶端維持與服務器的長鏈接。數據能夠經過WebSocket在這兩個端點之間進行雙向傳輸,所以消費數據的一方可以實時獲取數據。
按照其最簡單的形式,WebSocket只是兩個應用之間通訊的通道。位於WebSocket一端的應用發送消息,另一端處理消息。由於它是全雙工的,因此每一端均可以發送和處理消息。如圖18.1所示。
WebSocket通訊能夠應用於任何類型的應用中,可是WebSocket最多見的應用場景是實現服務器和基於瀏覽器的應用之間的通訊。
實現步驟:
一、編寫Handler消息處理器類
方法一:實現 WebSocketHandler 接口,WebSocketHandler 接口以下
public interface WebSocketHandler { void afterConnectionEstablished(WebSocketSession session) throws Exception; void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception; void handleTransportError(WebSocketSession session, Throwable exception) throws Exception; void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception; boolean supportsPartialMessages(); }
方法二:擴展 AbstractWebSocketHandler
@Service public class ChatHandler extends AbstractWebSocketHandler { @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { session.sendMessage(new TextMessage("hello world.")); } }
爲了在Spring使用較低層級的API來處理消息,必須編寫一個實現WebSocketHandler的類.WebSocketHandler須要咱們實現五個方法。相比直接實現WebSocketHandler,更爲簡單的方法是擴展AbstractWebSocketHandler,這是WebSocketHandler的一個抽象實現。
除了重載WebSocketHandler中所定義的五個方法之外,咱們還能夠重載AbstractWebSocketHandler中所定義的三個方法:
方案3、擴展TextWebSocketHandler或BinaryWebSocketHandler。
TextWebSocketHandler是AbstractWebSocketHandler的子類,它會拒絕處理二進制消息。它重載了handleBinaryMessage()方法,若是收到二進制消息的時候,將會關閉WebSocket鏈接。與之相似,BinaryWebSocketHandler也是AbstractWeb-SocketHandler的子類,它重載了handleTextMessage()方法,若是接收到文本消息的話,將會關閉鏈接。
二、增長websocket攔截器,管理用戶
@Component public class WebSocketHandshakeInterceptor implements HandshakeInterceptor { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { if (request instanceof ServletServerHttpRequest) { attributes.put("username","lhx"); } return true; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { } }
三、WebSocketConfig配置
方式1、註解配置
@Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { @Autowired private ChatHandler chatHandler; @Autowired private WebSocketHandshakeInterceptor webSocketHandshakeInterceptor; @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(chatHandler,"/chat") .addInterceptors(webSocketHandshakeInterceptor); } @Bean public ServletServerContainerFactoryBean createWebSocketContainer() { ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean(); container.setMaxTextMessageBufferSize(8192*4); container.setMaxBinaryMessageBufferSize(8192*4); return container; } }
方式2、xml配置
四、客戶端配置
function contect() { var wsServer = 'ws://'+window.location.host+'/chat'; var websocket = new WebSocket(wsServer); websocket.onopen = function (evt) { onOpen(evt) }; websocket.onclose = function (evt) { onClose(evt) }; websocket.onmessage = function (evt) { onMessage(evt) }; websocket.onerror = function (evt) { onError(evt) }; function onOpen(evt) { console.log("Connected to WebSocket server."); websocket.send("test");//客戶端向服務器發送消息 } function onClose(evt) { console.log("Disconnected"); } function onMessage(evt) { console.log('Retrieved data from server: ' + evt.data); } function onError(evt) { console.log('Error occured: ' + evt.data); } } contect();
爲了應對許多瀏覽器不支持WebSocket協議的問題,設計了備選SockJs
。
SockJS 是 WebSocket 技術的一種模擬。SockJS 會 儘量對應 WebSocket API,但若是 WebSocket 技術不可用的話,就會選擇另外的通訊方式協議。
一、服務端只需增長:.withSockJS()便可
@Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { @Autowired private ChatHandler chatHandler; @Autowired private WebSocketHandshakeInterceptor webSocketHandshakeInterceptor; @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { // withSockJS() 方法聲明咱們想要使用 SockJS 功能,若是WebSocket不可用的話,會使用 SockJS; registry.addHandler(chatHandler,"/chat") .addInterceptors(webSocketHandshakeInterceptor).withSockJS(); } }
或者xml配置
<websocket:sockjs />
二、客戶端
只需對請求
// var server = 'ws://'+window.location.host+'/chat'; var server = 'http://'+window.location.host+'/chatsockjs'; var websocket = new SockJS(server);
STOMP幀由命令,一個或多個頭信息以及負載所組成。
直接使用WebSocket(或SockJS)就很相似於使用TCP套接字來編寫Web應用。由於沒有高層級的線路協議(wire protocol),所以就須要咱們定義應用之間所發送消息的語義,還須要確保鏈接的兩端都能遵循這些語義。
不過,好消息是咱們並不是必需要使用原生的WebSocket鏈接。就像HTTP在TCP套接字之上添加了請求-響應模型層同樣,STOMP在WebSocket之上提供了一個基於幀的線路格式(frame-based wire format)層,用來定義消息的語義。
乍看上去,STOMP的消息格式很是相似於HTTP請求的結構。與HTTP請求和響應相似,STOMP幀由命令、一個或多個頭信息以及負載所組成。例如,以下就是發送數據的一個STOMP幀:
SEND destination:/app/room-message content-length:20 {\"message\":\"Hello!\"}
對以上代碼分析:
8.3.3.一、基本用法
一、服務端Configuration配置
@Configuration @EnableWebSocketMessageBroker public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry registry) { //添加一個/socket-server-point 鏈接端點,客戶端就能夠經過這個端點來進行鏈接;withSockJS做用是添加SockJS支持 registry.addEndpoint("/socket-server-point").withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { //定義了一個客戶端訂閱地址的前綴信息,也就是客戶端接收服務端發送消息的前綴信息 registry.enableSimpleBroker("/topic"); //定義了服務端接收地址的前綴,也即客戶端給服務端發消息的地址前綴 registry.setApplicationDestinationPrefixes("/ws"); } }
對以上代碼分析:
@Service public class HandlerService { private static final Logger logger = LoggerFactory.getLogger(HandlerService.class); @Async public CompletableFuture<String> handle(String key) throws Exception { Thread.sleep(new Random().nextInt(3000)); logger.info("Looking up " + key); key=key+":"+LocalDateTime.now(); return CompletableFuture.completedFuture(key); } }
Controller控制開發
@MessageMapping("/searchBase") public ResponseEntity searchBase() throws Exception { Consumer<List<String>> callback = p -> websocket.convertAndSend("/topic/searchResults", p); List<String> list = Arrays.asList("bba", "aaa", "ccc"); localSearch(list, callback); Map map = new HashMap(); map.put("list", list); map.put("date", LocalDateTime.now()); return ResponseEntity.ok(map); } public void localSearch(List<String> keys, Consumer<List<String>> callback) throws Exception { Thread.sleep(2000); List<String> list = new ArrayList<>(); for (String key : keys) { CompletableFuture<String> completableFuture = handlerService.handle(key); completableFuture.thenAcceptAsync(p -> { list.clear(); list.add(p); callback.accept(list); }); } }
8.3.3.二、消息流
8.3.3.三、啓用STOMP代理中繼
對於生產環境下的應用來講,你可能會但願使用真正支持STOMP的代理來支撐WebSocket消息,如RabbitMQ或ActiveMQ。這樣的代理提供了可擴展性和健壯性更好的消息功能,固然它們也會完整支持STOMP命令。咱們須要根據相關的文檔來爲STOMP搭建代理。搭建就緒以後,就可使用STOMP代理來替換內存代理了,只需按照以下方式重載configureMessageBroker()方法便可:
@Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableStompBrokerRelay("/queue", "/topic"); //定義了一個客戶端訂閱地址的前綴信息,也就是客戶端接收服務端發送消息的前綴信息 // registry.enableSimpleBroker("/topic"); //定義了服務端接收地址的前綴,也即客戶端給服務端發消息的地址前綴 registry.setApplicationDestinationPrefixes("/ws"); }
上述configureMessageBroker()方法的第一行代碼啓用了STOMP代理中繼(broker relay)功能,並將其目的地前綴設置爲「/topic」和「/queue」。這樣的話,Spring就能知道全部目的地前綴爲「/topic」或「/queue」的消息都會發送到STOMP代理中。
在第二行的configureMessageBroker()方法中將應用的前綴設置爲「/ws」。全部目的地以「/ws」打頭的消息都將會路由到帶有@MessageMapping註解的方法中,而不會發布到代理隊列或主題中。
默認狀況下,STOMP代理中繼會假設代理監聽localhost的61613端口,而且客戶端的username和password均爲「guest」。若是你的STOMP代理位於其餘的服務器上,或者配置成了不一樣的客戶端憑證,那麼咱們能夠在啓用STOMP代理中繼的時候,須要配置這些細節信息:
@Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableStompBrokerRelay("/queue", "/topic") .setRelayHost("rabbit.someotherserver") .setRelayPort(62623) .setClientLogin("marcopolo") .setClientPasscode("letmein01") registry.setApplicationDestinationPrefixes("/app"); }
Spring 4.0引入了@MessageMapping註解,它用於STOMP消息的處理,相似於Spring MVC的@RequestMapping註解。當消息抵達某個特定的目的地時,帶有@MessageMapping註解的方法可以處理這些消息。
/** * 處理來自客戶端的STOMP消息 * @param incoming * @return */ @MessageMapping("/incoming") public Shout handleShout(Shout incoming) { logger.info("Received message: " + incoming.getMessage()); try { Thread.sleep(2000); } catch (InterruptedException e) {} Shout outgoing = new Shout(); outgoing.setMessage("incoming!"); return outgoing; }
消息接受類
public class Shout { private String message; public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } }
客戶端
function contect() { var socket=new SockJS('/socket-server-point'); stompCliet=Stomp.over(socket); stompCliet.connect({},function (frame) { console.log('Connected :'+frame); stompCliet.send("/ws/incoming",{},"{\"message\":\"Hello!\"}"); }); } contect();
@SubscribeMapping的主要應用場景是實現請求-迴應模式。在請求-迴應模式中,客戶端訂閱某一個目的地,而後預期在這個目的地上得到一個一次性的響應。
例如,考慮以下@SubscribeMapping註解標註的方法:
@SubscribeMapping("/sub") public Shout handleSubscription(){ logger.info("Received message: " +"subscription"); Shout outgoing = new Shout(); outgoing.setMessage("subscription!"); return outgoing; }
當處理這個訂閱時,handleSubscription()方法會產生一個輸出的Shout對象並將其返回。而後,Shout對象會轉換成一條消息,而且會按照客戶端訂閱時相同的目的地發送回客戶端。
客戶端
function contect() { var socket=new SockJS('/socket-server-point'); stompCliet=Stomp.over(socket); stompCliet.connect({},function (frame) { console.log('Connected :'+frame); stompCliet.subscribe('/ws/sub',function (result) { console.log("aaaa",JSON.parse(result.body)); }); stompCliet.send("/ws/sub",{},"{\"message\":\"Hello!\"}"); }); } contect();
這種請求-迴應模式與HTTP GET的請求-響應模式並無太大差異。可是,這裏的關鍵區別在於HTTP GET請求是同步的,而訂閱的請求-迴應模式則是異步的,這樣客戶端可以在迴應可用時再去處理,而沒必要等待。
Spring提供了兩種發送數據給客戶端的方法:
方式1、做爲處理消息或處理訂閱的附帶結果、在處理消息以後,發送消息
@MessageMapping("/incoming") public Shout handleShout(Shout incoming) { logger.info("Received message: " + incoming.getMessage()); try { Thread.sleep(2000); } catch (InterruptedException e) {} Shout outgoing = new Shout(); outgoing.setMessage("incoming!"); return outgoing; }
當@MessageMapping註解標示的方法有返回值的時候,返回的對象將會進行轉換(經過消息轉換器)並放到STOMP幀的負載中,而後發送給消息代理。
默認狀況下,幀所發往的目的地會與觸發處理器方法的目的地相同,只不過會添加上「/topic」前綴。就本例而言,這意味着handleShout()方法所返回的Shout對象會寫入到STOMP幀的負載中,併發布到「/topic/incoming」目的地。不過,咱們能夠經過爲方法添加@SendTo註解,重載目的地:
@MessageMapping("/incoming") @SendTo("/topic/shout") public Shout handleShout(Shout incoming) { logger.info("Received message: " + incoming.getMessage()); try { Thread.sleep(2000); } catch (InterruptedException e) {} Shout outgoing = new Shout(); outgoing.setMessage("incoming!"); return outgoing; }
按照這個@SendTo註解,消息將會發布到「/topic/shout」。全部訂閱這個主題的應用(如客戶端)都會收到這條消息。
按照相似的方式,@SubscribeMapping註解標註的方式也能發送一條消息,做爲訂閱的迴應。
@SubscribeMapping("/sub") public Shout handleSubscription(){ logger.info("Received message: " +"subscription"); Shout outgoing = new Shout(); outgoing.setMessage("subscription!"); return outgoing; }
@SubscribeMapping的區別在於這裏的Shout消息將會直接發送給客戶端,而沒必要通過消息代理。若是你爲方法添加@SendTo註解的話,那麼消息將會發送到指定的目的地,這樣會通過代理。
對應客戶端須要增長訂閱便可
stompCliet.subscribe('/ws/sub',function (result) { console.log("aaaa",JSON.parse(result.body)); });
正如前面看到的那樣,使用 @MessageMapping 或者 @SubscribeMapping 註解能夠處理客戶端發送過來的消息,並選擇方法是否有返回值。
若是 @MessageMapping 註解的控制器方法有返回值的話,返回值會被髮送到消息代理,只不過會添加上"/topic"前綴。可使用@SendTo 重寫消息目的地;
若是 @SubscribeMapping 註解的控制器方法有返回值的話,返回值會直接發送到客戶端,不通過代理。若是加上@SendTo 註解的話,則要通過消息代理。
方式2、使用消息模板【在任意地方發送消息】
@MessageMapping和@SubscribeMapping提供了一種很簡單的方式來發送消息,這是接收消息或處理訂閱的附帶結果。不過,Spring的SimpMessagingTemplate可以在應用的任何地方發送消息,甚至沒必要以首先接收一條消息做爲前提。
咱們沒必要要求用戶刷新頁面,而是讓首頁訂閱一個STOMP主題.
function contect() { var socket=new SockJS('/socket-server-point'); stompCliet=Stomp.over(socket); stompCliet.connect({},function (frame) { console.log('Connected :'+frame); stompCliet.subscribe('/topic/sendDataToClient',function (result) { console.log("aaaa",JSON.parse(result.body)); }); // stompCliet.send("/ws/sub",{},"{\"message\":\"Hello!\"}"); }); } contect();
使用SimpMessagingTemplate可以在應用的任何地方發佈消息
@Autowired(required = false) private SimpMessagingTemplate websocket; @GetMapping("/sendDataToClient") public ResponseEntity sendDataToClient() throws Exception { Map map=new HashMap(); map.put("aa","aaa"); map.put("bb","bbb"); websocket.convertAndSend("/topic/sendDataToClient",map); return ResponseEntity.ok("ok"); }
固然此處的SimpMessagingTemplate也可使用父接口SimpMessageSendingOperations注入
在這個場景下,咱們但願全部的客戶端都能及時看到實時的/topic/sendDataToClient,這種作法是很好的。但有的時候,咱們但願發送消息給指定的用戶,而不是全部的客戶端。
以上說明了如何廣播消息,訂閱目的地的全部用戶都能收到消息。若是消息只想發送給特定的用戶呢?spring-websocket 介紹瞭如下
在使用Spring和STOMP消息功能的時候,咱們有兩種方式利用認證用戶:
一、@MessageMapping和@SubscribeMapping標註的方法基於@SendToUser註解和Principal參數來獲取認證用戶;@MessageMapping、@SubscribeMapping和@MessageException方法返回的值可以以消息的形式發送給認證用戶;
二、SimpMessageSendingOperations接口或SimpMessagingTemplate的convertAndSendToUser方法可以發送消息給特定用戶。
一、在控制器中處理用戶的消息
在控制器的@MessageMapping或@SubscribeMapping方法中,處理消息時有兩種方式瞭解用戶信息。在處理器方法中,經過簡單地添加一個Principal參數,這個方法就能知道用戶是誰並利用該信息關注此用戶相關的數據。除此以外,處理器方法還可使用@SendToUser註解,代表它的返回值要以消息的形式發送給某個認證用戶的客戶端(只發送給該客戶端)。
@SendToUser 表示要將消息發送給指定的用戶,會自動在消息目的地前補上"/user"前綴。以下,最後消息會被髮布在 /user/queue/notifications-username。可是問題來了,這個username是怎麼來的呢?就是經過 principal 參數來得到的。那麼,principal 參數又是怎麼來的呢?須要在spring-websocket 的配置類中重寫 configureClientInboundChannel 方法,添加上用戶的認證。
服務端增長configuration
@Configuration @EnableWebSocketMessageBroker public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer { /** * 一、設置攔截器 * 二、首次鏈接的時候,獲取其Header信息,利用Header裏面的信息進行權限認證 * 三、經過認證的用戶,使用 accessor.setUser(user); 方法,將登錄信息綁定在該 StompHeaderAccessor 上,在Controller方法上能夠獲取 StompHeaderAccessor 的相關信息 * @param registration */ @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(new ChannelInterceptorAdapter() { @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); //一、判斷是否首次鏈接 if (StompCommand.CONNECT.equals(accessor.getCommand())){ //二、判斷用戶名和密碼 String username = accessor.getNativeHeader("username").get(0); String password = accessor.getNativeHeader("password").get(0); if ("admin".equals(username) && "admin".equals(password)){ Principal principal = new Principal() { @Override public String getName() { return username; } }; accessor.setUser(principal); return message; }else { return null; } } //不是首次鏈接,已經登錄成功 return message; } }); } }
服務端處理邏輯
@MessageMapping("/shout") @SendToUser("/queue/notifications") public Shout userStomp(Principal principal, Shout shout) { String name = principal.getName(); String message = shout.getMessage(); logger.info("認證的名字是:{},收到的消息是:{}", name, message); return shout; }
前端js代碼
var headers={ username:'admin', password:'admin' }; function contect() { var socket=new SockJS('/socket-server-point'); stompCliet=Stomp.over(socket); stompCliet.connect(headers,function (frame) { console.log('Connected :'+frame); stompCliet.subscribe('/user/queue/notifications',function (result) { console.log("aaaa",JSON.parse(result.body)); }); stompCliet.send("/ws/shout",{},"{\"message\":\"Hello!\"}"); }); } contect();
二、convertAndSendToUser方法
除了convertAndSend()之外,SimpMessageSendingOperations 還提供了convertAndSendToUser()方法。按照名字就能夠判斷出來,convertAndSendToUser()方法可以讓咱們給特定用戶發送消息。
@MessageMapping("/singleShout") public void singleUser(Shout shout, StompHeaderAccessor stompHeaderAccessor) { String message = shout.getMessage(); LOGGER.info("接收到消息:" + message); Principal user = stompHeaderAccessor.getUser(); simpMessageSendingOperations.convertAndSendToUser(user.getName(), "/queue/shouts", shout); }
如上,這裏雖然我仍是用了認證的信息獲得用戶名。可是,其實大可沒必要這樣,由於 convertAndSendToUser 方法能夠指定要發送給哪一個用戶。也就是說,徹底能夠把用戶名的看成一個參數傳遞給控制器方法,從而繞過身份認證!convertAndSendToUser 方法最終會把消息發送到 /user/sername/queue/shouts 目的地上。
8.3.3.七、處理消息異常
在處理消息的時候,有可能會出錯並拋出異常。由於STOMP消息異步的特色,發送者可能永遠也不會知道出現了錯誤。@MessageExceptionHandler標註的方法可以處理消息方法中所拋出的異常。咱們能夠把錯誤發送給用戶特定的目的地上,而後用戶從該目的地上訂閱消息,從而用戶就能知道本身出現了什麼錯誤
@MessageExceptionHandler(Exception.class) @SendToUser("/queue/errors") public Exception handleExceptions(Exception t){ t.printStackTrace(); return t; }
8.3.3.八、更多stomp配置
一、發起鏈接
其中headers表示客戶端的認證信息:
若無需認證,直接使用空對象 「{}」 便可;
(1)connectCallback 表示鏈接成功時(服務器響應 CONNECTED 幀)的回調方法;
(2)errorCallback 表示鏈接失敗時(服務器響應 ERROR 幀)的回調方法,非必須;
默認連接端點
//默認的和STOMP端點鏈接 /*stomp.connect("guest", "guest", function (franme) { });*/
有用戶認證的
var headers={ username:'admin', password:'admin' }; stomp.connect(headers, function (frame) {
示例
// 創建鏈接對象(還未發起鏈接) var socket=new SockJS("/endpointChat"); // 獲取 STOMP 子協議的客戶端對象 var stompClient = Stomp.over(socket); // 向服務器發起websocket鏈接併發送CONNECT幀 stompClient.connect( {}, function connectCallback (frame) { // 鏈接成功時(服務器響應 CONNECTED 幀)的回調方法 console.log('已鏈接【' + frame + '】'); //訂閱一個消息 stompClient.subscribe('/topic/getResponse', function (response) { showResponse(response.body); }); }, function errorCallBack (error) { // 鏈接失敗時(服務器響應 ERROR 幀)的回調方法 console.log('鏈接失敗【' + error + '】'); } );
二、斷開鏈接
若要從客戶端主動斷開鏈接,可調用 disconnect() 方法:
client.disconnect( function () { alert("斷開鏈接"); });
三、發送消息
鏈接成功後,客戶端可以使用 send() 方法向服務器發送信息:
client.send(destination url, headers, body);
其中:
(1)destination url 爲服務器 controller中 @MessageMapping 中匹配的URL,字符串,必須參數;
(2)headers 爲發送信息的header,JavaScript 對象,可選參數;
(3)body 爲發送信息的 body,字符串,可選參數;
示例
client.send("/queue/test", {priority: 9}, "Hello, STOMP");
client.send("/queue/test", {}, "Hello, STOMP");
四、訂閱、接收消息
STOMP 客戶端要想接收來自服務器推送的消息,必須先訂閱相應的URL,即發送一個 SUBSCRIBE 幀,而後才能不斷接收來自服務器的推送消息。
訂閱和接收消息經過 subscribe() 方法實現:
subscribe(destination url, callback, headers)
其中
(1)destination url 爲服務器 @SendTo 匹配的 URL,字符串;
(2)callback 爲每次收到服務器推送的消息時的回調方法,該方法包含參數 message;
(3)headers 爲附加的headers,JavaScript 對象;該方法返回一個包含了id屬性的 JavaScript 對象,可做爲 unsubscribe() 方法的參數;默認狀況下,若是沒有在headers額外添加,這個庫會默認構建一個獨一無二的ID。在傳遞headers這個參數時,可使用你本身id。
示例
var headers = { ack: 'client', //這個客戶端指定了它會確認接收的信息,只接收符合這個selector : location = 'Europe'的消息。 'selector': "location = 'Europe'", //id:’myid’ }; var callback = function(message) { if (message.body) { alert("got message with body " +JSON.parse( message.body)) } else{ alert("got empty message"); } }); var subscription = client.subscribe("/queue/test", callback, headers); 若是想讓客戶端訂閱多個目的地,你能夠在接收全部信息的時候調用相同的回調函數: onmessage = function(message) { // called every time the client receives a message } var sub1 = client.subscribe("queue/test", onmessage); var sub2 = client.subscribe("queue/another", onmessage)
五、取消訂閱
var subscription = client.subscribe(...); subscription.unsubscribe();
六、事務支持
能夠在將消息的發送和確認接收放在一個事務中。
客戶端調用自身的begin()方法就能夠開始啓動事務了,begin()有一個可選的參數transaction,一個惟一的可標識事務的字符串。若是沒有傳遞這個參數,那麼庫會自動構建一個。
這個方法會返回一個object。這個對象有一個id屬性對應這個事務的ID,還有兩個方法:
commit()提交事務
abort()停止事務
在一個事務中,客戶端能夠在發送/接受消息時指定transaction id來設置transaction。
// start the transaction var tx = client.begin(); // send the message in a transaction client.send("/queue/test", {transaction: tx.id}, "message in a transaction"); // commit the transaction to effectively send the message tx.commit();
若是你在調用send()方法發送消息的時候忘記添加transction header,那麼這不會稱爲事務的一部分,這個消息會直接發送,不會等到事務完成後才發送。
var txid = "unique_transaction_identifier"; // start the transaction var tx = client.begin(); // oops! send the message outside the transaction client.send("/queue/test", {}, "I thought I was in a transaction!"); tx.abort(); // Too late! the message has been sent
七、消息確認
默認狀況,在消息發送給客戶端以前,服務端會自動確認(acknowledged)。
客戶端能夠選擇經過訂閱一個目的地時設置一個ack header爲client或client-individual來處理消息確認。
在下面這個例子,客戶端必須調用message.ack()來通知客戶端它已經接收了消息。
var subscription = client.subscribe("/queue/test", function(message) { // do something with the message ... // and acknowledge it message.ack(); }, {ack: 'client'} );
ack()接受headers參數用來附加確認消息。例如,將消息做爲事務(transaction)的一部分,當要求接收消息時其實代理(broker)已經將ACK STOMP frame處理了。
var tx = client.begin(); message.ack({ transaction: tx.id, receipt: 'my-receipt' }); tx.commit();
ack()也能夠用來通知STOMP 1.1.brokers(代理):客戶端不能消費這個消息。與ack()方法的參數相同。
八、debug調試
有一些測試代碼能有助於你知道庫發送或接收的是什麼,從而來調試程序。
客戶端能夠將其debug屬性設置爲一個函數,傳遞一個字符串參數去觀察庫全部的debug語句。
client.debug = function(str) { // append the debug log to a #debug div somewhere in the page using JQuery: $("#debug").append(str + "\n"); };
默認狀況,debug消息會被記錄在在瀏覽器的控制檯。
九、心跳機制
若是STOMP broker(代理)接收STOMP 1.1版本的幀,heart-beating是默認啓用的。heart-beating也就是頻率,incoming是接收頻率,outgoing是發送頻率。
經過改變incoming和outgoing能夠更改客戶端的heart-beating(默認爲10000ms):
client.heartbeat.outgoing = 20000; // client will send heartbeats every 20000ms client.heartbeat.incoming = 0; // client does not want to receive heartbeats // from the server
heart-beating是利用window.setInterval()去規律地發送heart-beats或者檢查服務端的heart-beats。
更多
stomp.connect(headers, function (frame) { //發送消息 //第二個參數是一個頭信息的Map,它會包含在STOMP的幀中 //事務支持 var tx = stomp.begin(); stomp.send("/app/marco", {transaction: tx.id}, strJson); tx.commit(); //訂閱服務端消息 subscribe(destination url, callback[, headers]) stomp.subscribe("/topic/marco", function (message) { var content = message.body; var obj = JSON.parse(content); console.log("訂閱的服務端消息:" + obj.message); }, {}); stomp.subscribe("/app/getShout", function (message) { var content = message.body; var obj = JSON.parse(content); console.log("訂閱的服務端直接返回的消息:" + obj.message); }, {}); /*如下是針對特定用戶的訂閱*/ var adminJSON = JSON.stringify({'message': 'ADMIN'}); /*第一種*/ stomp.send("/app/singleShout", {}, adminJSON); stomp.subscribe("/user/queue/shouts",function (message) { var content = message.body; var obj = JSON.parse(content); console.log("admin用戶特定的消息1:" + obj.message); }); /*第二種*/ stomp.send("/app/shout", {}, adminJSON); stomp.subscribe("/user/queue/notifications",function (message) { var content = message.body; var obj = JSON.parse(content); console.log("admin用戶特定的消息2:" + obj.message); }); /*訂閱異常消息*/ stomp.subscribe("/user/queue/errors", function (message) { console.log(message.body); }); //若使用STOMP 1.1 版本,默認開啓了心跳檢測機制(默認值都是10000ms) stomp.heartbeat.outgoing = 20000; stomp.heartbeat.incoming = 0; //客戶端不從服務端接收心跳包 });
參看代碼:https://github.com/JMCuixy/SpringWebSocket