006-優化web請求二-應用緩存、異步調用【Future、ListenableFuture、CompletableFuture】、ETag、WebSocket【SockJS、Stomp】

4、應用緩存

  使用spring應用緩存。使用方式:使用@EnableCache註解激活Spring的緩存功能,須要建立一個CacheManager來處理緩存。如使用一個內存緩存示例html

package com.github.bjlhx15.gradle.demotest;

import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.concurrent.ConcurrentMapCache;
import org.springframework.cache.support.SimpleCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;

@Configuration
@EnableCaching
public class CacheConfiguration {
    
    @Bean
    public CacheManager cacheManager(){
        SimpleCacheManager simpleCacheManager=new SimpleCacheManager();
        simpleCacheManager.setCaches(Arrays.asList(new ConcurrentMapCache("searches")));
        return simpleCacheManager;
    }
}

  其餘實現如:EhCacheManager、GuavaCacheManager等前端

  主要標記:java

    @CacheEvict:將會從緩存中移除一個條目jquery

    @CachePut:會將方法結果放到緩存中,而不會影響到方法調用自己。git

    @Caching:將緩存註解從新分組github

    @CacheConfig:指向不一樣的緩存配置web

  更多spring 應用緩存:https://www.cnblogs.com/bjlhx/category/1233985.htmlredis

5、分佈式緩存

  推薦使用redis,系列文章:https://www.cnblogs.com/bjlhx/category/1066467.html算法

  spring使用也比較方便:https://www.cnblogs.com/bjlhx/category/1233985.htmlspring

6、異步方法-EnableAsync

  在程序執行時候還有一個瓶頸,串行執行,能夠經過使用不一樣線程類快速提高應用的速度。

  要啓用Spring的異步功能,必需要使用@EnableAsync註解。這樣將會透明地使用java.util.concurrent.Executor來執行全部帶有@Async註解的方法。

  @Async所修飾的函數不要定義爲static類型,這樣異步調用不會生效

  針對調用的Async,若是不作Future特殊處理,執行完調用方法會當即返回結果,如異步郵件發送,不會真的等郵件發送完畢才響應客戶,如需等待可使用Future阻塞處理。

6.一、原始使用

一、main方法增長@EnableAsync註解 

@SpringBootApplication
@EnableAsync
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
        System.out.println("ThreadId:"+Thread.currentThread().getId());
    }
}

二、在所需方法增長@Async註解

@Component
public class Task {
    @Async
    public void doTaskOne() throws Exception {
        for (int i = 0; i < 3; i++) {
            Thread.sleep(200);
            System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskOne");
        }
    }

    @Async
    public void doTaskTwo() throws Exception {
        for (int i = 0; i < 3; i++) {
            Thread.sleep(200);
            System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskTwo");
        }
    }

    @Async
    public void doTaskThree() throws Exception {
        for (int i = 0; i < 3; i++) {
            Thread.sleep(200);
            System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskThree");
        }
    }
}

三、查看調用

@RestController
public class TestAsyns {
    @Autowired
    private Task task;
    @RequestMapping("/testAsync")
    public ResponseEntity testAsync() throws Exception {
        task.doTaskOne();
        task.doTaskTwo();
        task.doTaskThree();
        return ResponseEntity.ok("ok");
    }
}

上述方法依次調用三個方法。

若是去除@EnableAsync註解,輸出以下:【可見是串行執行】

ThreadId:33:doTaskOne
ThreadId:33:doTaskOne
ThreadId:33:doTaskOne
ThreadId:33:doTaskTwo
ThreadId:33:doTaskTwo
ThreadId:33:doTaskTwo
ThreadId:33:doTaskThree
ThreadId:33:doTaskThree
ThreadId:33:doTaskThree

若是增長@EnableAsync註解,輸出以下:【可見是並行執行】

ThreadId:56:doTaskThree
ThreadId:55:doTaskTwo
ThreadId:54:doTaskOne
ThreadId:54:doTaskOne
ThreadId:55:doTaskTwo
ThreadId:56:doTaskThree
ThreadId:54:doTaskOne
ThreadId:56:doTaskThree
ThreadId:55:doTaskTwo

6.二、自定義執行器使用異步

一、配置類

  方式1、注入Bean方式

import java.util.concurrent.Executor;  
  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.ComponentScan;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.scheduling.annotation.EnableAsync;  
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;  
  
@Configuration  
public class ThreadConfig  {  
  
     // 執行須要依賴線程池,這裏就來配置一個線程池  
     @Bean  
     public Executor getExecutor() {  
          ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
          executor.setCorePoolSize(5);  
          executor.setMaxPoolSize(10);  
          executor.setQueueCapacity(25);  
          executor.initialize();  
          return executor;  
     }  
}  

  方式2、經過實現AsyncConfigurer接口,能夠自定義默認的執行(executor)。新增以下配置類:

@Configuration
public class AsyncConfiguration implements AsyncConfigurer {
    protected final Logger logger = LoggerFactory.getLogger(AsyncConfiguration.class);

    @Override
    public Executor getAsyncExecutor() {
        //作好不超過10個,這裏寫兩個方便測試
        return Executors.newFixedThreadPool(2);
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex,method,params)->logger.error("Uncaught async error",ex);
    }
}

  Executor的初始化配置,還有不少種,能夠參看https://www.cnblogs.com/bjlhx/category/1086008.html

  Spring 已經實現的異步線程池:
    1. SimpleAsyncTaskExecutor:不是真的線程池,這個類不重用線程,每次調用都會建立一個新的線程。
    2. SyncTaskExecutor:這個類沒有實現異步調用,只是一個同步操做。只適用於不須要多線程的地方
    3. ConcurrentTaskExecutor:Executor的適配類,不推薦使用。若是ThreadPoolTaskExecutor不知足要求時,才用考慮使用這個類
    4. SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的類。線程池同時被quartz和非quartz使用,才須要使用此類
    5. ThreadPoolTaskExecutor :最常使用,推薦。 其實質是對java.util.concurrent.ThreadPoolExecutor的包裝

  使用上述配置,可以確保在應用中,用來處理異步任務的線程不會超過10個。這對於web應用很重要,由於每一個客戶端都會有一個專用的線程。你所使用的線程越多,阻塞時間越長那麼可以處理的客戶端就會越少。

  若是設置成兩個,程序中有3個異步線程,也會只有兩個運行,以下

ThreadId:55:doTaskTwo
ThreadId:54:doTaskOne
ThreadId:55:doTaskTwo
ThreadId:54:doTaskOne
ThreadId:55:doTaskTwo
ThreadId:54:doTaskOne
ThreadId:55:doTaskThree
ThreadId:55:doTaskThree
ThreadId:55:doTaskThree

二、使用

  同上述一致。

6.三、異步返回處理

方式1、使用Future【FutureTask是默認實現】處理+輪詢處理【jdk1.5產物,沒有提供Callback機制,只能主動輪詢,經過get去獲取結果】【不推薦】

修改異步執行的方法

@Component
public class TaskFutureDemo {
    @Async
    public Future<String> doTaskOne() throws Exception {
        for (int i = 0; i < 3; i++) {
            Thread.sleep(1000);
            System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskOne");
        }
        return new AsyncResult<>("doTaskOne");
    }

    @Async
    public Future<String> doTaskTwo() throws Exception {
        for (int i = 0; i < 3; i++) {
            Thread.sleep(1000);
            System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskTwo");
        }
        return new AsyncResult<>("doTaskTwo");
    }

    @Async
    public Future<String> doTaskThree() throws Exception {
        for (int i = 0; i < 3; i++) {
            Thread.sleep(1000);
            System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskThree");
        }
        return new AsyncResult<>("doTaskThree");
    }
}
View Code

修改調用的方法

@RestController
public class TestAsynsFutureController {
    @Autowired
    private TaskFutureDemo task;

    @RequestMapping("/testAsyncFuture")
    public ResponseEntity testAsyncFuture() throws Exception {
        Future<String> taskOne = task.doTaskOne();
        Future<String> taskTwo = task.doTaskTwo();
        Future<String> taskThree = task.doTaskThree();
        while (true) {
            if (taskOne.isDone() && taskTwo.isDone() && taskThree.isDone()) {
                break;
            }
        }
        return ResponseEntity.ok("ok");
    }
}
View Code

方式2、Spring的ListenableFuture和CountDownLatch處理

Service實現類

@Service
public class TaskListenableFutureService {
    private AsyncSearch asyncSearch;

    @Autowired
    public TaskListenableFutureService(AsyncSearch asyncSearch) {
        this.asyncSearch=asyncSearch;
    }

    public List<String> search(List<String> keywords){
        CountDownLatch latch=new CountDownLatch(keywords.size());
        List<String> allResult=Collections.synchronizedList(new ArrayList<>());
        keywords.stream()
                .forEach(keyword->asyncFetch(latch,allResult,keyword));
        await(latch);
        return allResult;
    }

    private void asyncFetch(CountDownLatch latch, List<String> result, String keyword){
        asyncSearch.asyncFetch(keyword)
                .addCallback(
                        key->onSuccess(result,latch,key),
                        ex -> onError(latch,ex)
                );
    }

    private void await(CountDownLatch latch){
        try {
            latch.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
    private static void onSuccess(List<String> result,CountDownLatch latch,String keyword){
        result.add(keyword);
        latch.countDown();
    }
    private static void onError(CountDownLatch latch,Throwable ex){
        ex.printStackTrace();
        latch.countDown();
    }
    @Component
    private static class AsyncSearch{
        @Autowired
        public AsyncSearch() {
        }

        protected final Logger logger = LoggerFactory.getLogger(AsyncSearch.class);
        @Async
        public ListenableFuture<String> asyncFetch(String keyword){
            logger.info(Thread.currentThread().getName()+"-"+keyword);
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new AsyncResult<>("keyword="+keyword);
        }
    }
}
View Code

調用方法

@RestController
public class TestAsynsListenableFutureController {
    protected final Logger logger = LoggerFactory.getLogger(TestAsynsListenableFutureController.class);
    @Autowired
    private TaskListenableFutureService task;

    @RequestMapping("/testAsyncListenableFuture")
    public ResponseEntity testAsyncListenableFuture() throws Exception {
        List<String> list = task.search(Arrays.asList("java", "html", "spring"));
        list.stream().forEach(p-> logger.info(p));

        return ResponseEntity.ok("ok");
    }
}
View Code

方式3、使用CompletableFuture【推薦】

User實體類

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown=true)
public class User {

    private String name;
    private String blog;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getBlog() {
        return blog;
    }

    public void setBlog(String blog) {
        this.blog = blog;
    }

    @Override
    public String toString() {
        return "User [name=" + name + ", blog=" + blog + "]";
    }

}
View Code

CompletableFuture的服務

@Service
public class GitHubLookupService {

    private static final Logger logger = LoggerFactory.getLogger(GitHubLookupService.class);

    private final RestTemplate restTemplate;

    public GitHubLookupService(RestTemplateBuilder restTemplateBuilder) {
        this.restTemplate = restTemplateBuilder.build();
    }

    @Async
    public CompletableFuture<User> findUser(String user) throws InterruptedException {
        logger.info("Looking up " + user);
        String url = String.format("https://api.github.com/users/%s", user);
        User results = restTemplate.getForObject(url, User.class);
        // Artificial delay of 1s for demonstration purposes
        Thread.sleep(1000L);
        return CompletableFuture.completedFuture(results);
    }

}
View Code

調用方法

@RestController
public class CompletableFutureController {


    private static final Logger logger = LoggerFactory.getLogger(CompletableFutureController.class);
    @Autowired
    private  GitHubLookupService gitHubLookupService;

    @RequestMapping("/testCompletableFuture")
    public ResponseEntity testCompletableFuture() throws Exception {
        // Start the clock
        long start = System.currentTimeMillis();

        // Kick of multiple, asynchronous lookups
        CompletableFuture<User> page1 = gitHubLookupService.findUser("PivotalSoftware");
        CompletableFuture<User> page2 = gitHubLookupService.findUser("CloudFoundry");
        CompletableFuture<User> page3 = gitHubLookupService.findUser("Spring-Projects");

        // Wait until they are all done
        CompletableFuture.allOf(page1,page2,page3).join();

        // Print results, including elapsed time
        logger.info("Elapsed time: " + (System.currentTimeMillis() - start));
        logger.info("--> " + page1.get());
        logger.info("--> " + page2.get());
        logger.info("--> " + page3.get());

        return ResponseEntity.ok("ok");
    }
}
View Code

7、ETag

7.一、什麼是ETag? 

  ETag:是實體標籤(Entity Tag)的縮寫。ETag通常不以明文形式相應給客戶端。在資源的各個生命週期中,它都具備不一樣的值,用於標識出資源的狀態。當資源發生變動時,若是其頭信息中一個或者多個發生變化,或者消息實體發生變化,那麼ETag也隨之發生變化。

  ETag值的變動說明資源狀態已經被修改。每每能夠經過時間戳就能夠便宜的獲得ETag頭信息。在服務端中若是發回給消費者的相應從一開始起就由ETag控制,那麼能夠確保更細粒度的ETag升級徹底由服務來進行控制。服務計算ETag值,並在相應客戶端請求時將它返回給客戶端。

7.二、計算ETag值

  在HTTP1.1協議中並無規範如何計算ETag。ETag值能夠是惟一標識資源的任何東西,如持久化存儲中的某個資源關聯的版本、一個或者多個文件屬性,實體頭信息和校驗值、(CheckSum),也能夠計算實體信息的散列值。有時候,爲了計算一個ETag值可能有比較大的代價,此時能夠採用生成惟一值等方式(如常見的GUID)。不管怎樣,服務都應該儘量的將ETag值返回給客戶端。客戶端不用關心ETag值如何產生,只要服務在資源狀態發生變動的狀況下將ETag值發送給它就行。

  ETag值能夠經過uuid、整數、長整形、字符串等四種類型。

  計算ETag值時,須要考慮兩個問題:計算與存儲。若是一個ETag值只須要很小的代價以及佔用很低的存儲空間,那麼咱們能夠在每次須要發送給客戶端ETag值值的時候計算一遍就行行了。相反的,咱們須要將以前就已經計算並存儲好的ETag值發送給客戶端。以前說:將時間戳做爲字符串做爲一種廉價的方式來獲取ETag值。對於不是常常變化的消息,它是一種足夠好的方案。注意:若是將時間戳作爲ETag值,一般不該該用Last-Modified的值。因爲HTTP機制中,因此當咱們在經過服務校驗資源狀態時,客戶端不須要進行相應的改動。計算ETag值開銷最大的通常是計算採用哈希算法獲取資源的表述值。能夠只計算資源的哈希值,也能夠將頭信息和頭信息的值也包含進去。若是包含頭信息,那麼注意不要包含計算機標識的頭信息。一樣也應該避免包含Expires、Cache-Control和Vary頭信息。注意:在經過哈希算法。

7.三、ETag的類型以及他們之間的區別

  ETag有兩種類型:強ETag(strong ETag)與弱ETag(weak ETag)。

    強ETag表示形式:"22FAA065-2664-4197-9C5E-C92EA03D0A16"。

    弱ETag表現形式:w/"22FAA065-2664-4197-9C5E-C92EA03D0A16"。

  強、弱ETag類型的出現與Apache服務器計算ETag的方式有關。Apache默認經過FileEtag中FileEtag INode Mtime Size的配置自動生成ETag(固然也能夠經過用戶自定義的方式)。假設服務端的資源頻繁被修改(如1秒內修改了N次),此時若是有用戶將Apache的配置改成MTime,因爲MTime只能精確到秒,那麼就能夠避免強ETag在1秒內的ETag老是不一樣而頻繁刷新Cache(若是資源在秒級常常被修改,也能夠經過Last-Modified來解決)。

7.四、Etag - 做用

Etag 主要爲了解決 Last-Modified 沒法解決的一些問題。

一、 一些文件也許會週期性的更改,可是他的內容並不改變(僅僅改變的修改時間),這個時候咱們並不但願客戶端認爲這個文件被修改了,而從新GET;

二、某些文件修改很是頻繁,好比在秒如下的時間內進行修改,(比方說1s內修改了N次),If-Modified-Since能檢查到的粒度是s級的,這種修改沒法判斷(或者說UNIX記錄MTIME只能精確到秒 

三、某些服務器不能精確的獲得文件的最後修改時間;

  爲此,HTTP/1.1 引入了 Etag(Entity Tags).Etag僅僅是一個和文件相關的標記,能夠是一個版本標記,好比說v1.0.0或者說"2e681a-6-5d044840"這麼一串看起來很神祕的編碼。可是HTTP/1.1標準並無規定Etag的內容是什麼或者說要怎麼實現,惟一規定的是Etag須要放在""內。

7.五、Etag - 工做原理

Etag由服務器端生成,客戶端經過If-Match或者說If-None-Match這個條件判斷請求來驗證資源是否修改。常見的是使用If-None-Match.請求一個文件的流程可能以下:

====第一次請求===
1.客戶端發起 HTTP GET 請求一個文件;

2.服務器處理請求,返回文件內容和一堆Header,固然包括Etag(例如"2e681a-6-5d044840")(假設服務器支持Etag生成和已經開啓了Etag).狀態碼200

====第二次請求===
1.客戶端發起 HTTP GET 請求一個文件,注意這個時候客戶端同時發送一個If-None-Match頭,這個頭的內容就是第一次請求時服務器返回的Etag:2e681a-6-5d044840

2.服務器判斷髮送過來的Etag和計算出來的Etag匹配,所以If-None-Match爲False,不返回200,返回304,客戶端繼續使用本地緩存;

  流程很簡單,問題是,若是服務器又設置了Cache-Control:max-age和Expires呢,怎麼辦?
  答案是同時使用,也就是說在徹底匹配If-Modified-Since和If-None-Match即檢查完修改時間和Etag以後,服務器才能返回304.

 7.六、在spring中實踐

  雖然對請求已經作了應用緩存等處理,可是持續請求一個restful接口請求仍是會發送到服務端去讀取緩存,即便結果沒有發生改變,但結果自己仍是會屢次發送給用戶,形成浪費帶寬。

  ETag是Web響應數據的一個散列(Hash),而且會在頭信息中進行發送。客戶端能夠記住資源的ETag,而且經過If-None-Match頭信息將最新的已知版本發送給服務器。若是在這段時間內請求沒有發生變化的話,服務器就會返回304 Not Modified。

  在Spring中有一個特殊的Servlet過濾器來處理ETag,名爲ShallowEtagHeaderFilter。只需將此類注入便可:

    @Bean
    public Filter etagFilter(){
        return new ShallowEtagHeaderFilter();
    }

  只要響應頭沒有緩存控制頭信息的話,系統就會爲你的響應生成ETag。

示例

    @GetMapping("/testNoChangeContent")
    public ResponseEntity testNoChangeContent(){
        return ResponseEntity.ok("OK");
    }
    @GetMapping("/testChangeContent")
    public ResponseEntity testChangeContent(){
        return ResponseEntity.ok("OK:"+LocalDateTime.now());
    }

接口1、testNoChangeContent,是測試內容沒有改變的,第一次請求是200,之後請求是304

接口2、testChangeContent,是測試內容有改變的,第一次請求是200,之後請求均是200

8、WebSocket

在優化web請求時,這是一種優化方案,在服務器端有可用數據時,就當即將其發送到客戶端。經過多線程方式獲取搜索結果,因此數據會分爲多個塊。這時能夠一點點地進行發送,而沒必要等待全部結果。

8.一、概述

一、WebSocket

  Http鏈接爲一次請求(request)一次響應(response),必須爲同步調用方式。WebSocket 協議提供了經過一個套接字實現全雙工通訊的功能。一次鏈接之後,會創建tcp鏈接,後續客戶端與服務器交互爲全雙工方式的交互方式,客戶端能夠發送消息到服務端,服務端也可將消息發送給客戶端。

  WebSocket 是發送和接收消息的底層API,WebSocket 協議提供了經過一個套接字實現全雙工通訊的功能。也可以實現 web 瀏覽器和 server 間的異步通訊,全雙工意味着 server 與瀏覽器間能夠發送和接收消息。須要注意的是必須考慮瀏覽器是否支持。

二、SockJS

  SockJS 是 WebSocket 技術的一種模擬。爲了應對許多瀏覽器不支持WebSocket協議的問題,設計了備選SockJs。開啓並使用SockJS後,它會優先選用Websocket協議做爲傳輸協議,若是瀏覽器不支持Websocket協議,則會在其餘方案中,選擇一個較好的協議進行通信。原來在不支持WebSocket的狀況下,也能夠很簡單地實現WebSocket的功能的,方法就是使用 SockJS。它會優先選擇WebSocket進行鏈接,可是當服務器或客戶端不支持WebSocket時,會自動在 XHR流、XDR流、iFrame事件源、iFrame HTML文件、XHR輪詢、XDR輪詢、iFrame XHR輪詢、JSONP輪詢 這幾個方案中擇優進行鏈接。

三、Stomp

        STOMP 中文爲: 面向消息的簡單文本協議。websocket定義了兩種傳輸信息類型: 文本信息和二進制信息。類型雖然被肯定,可是他們的傳輸體是沒有規定的。因此,須要用一種簡單的文本傳輸類型來規定傳輸內容,它能夠做爲通信中的文本傳輸協議,即交互中的高級協議來定義交互信息。

  STOMP自己能夠支持流類型的網絡傳輸協議: websocket協議和tcp協議。

  Stomp還提供了一個stomp.js,用於瀏覽器客戶端使用STOMP消息協議傳輸的js庫。

  STOMP的優勢以下:

  (1)不須要自建一套自定義的消息格式

  (2)現有stomp.js客戶端(瀏覽器中使用)能夠直接使用

  (3)能路由信息到指定消息地點

  (4)能夠直接使用成熟的STOMP代理進行廣播 如:RabbitMQ, ActiveMQ

四、WebSocket、SockJs、STOMP三者關係

  簡而言之,WebSocket 是底層協議,SockJS 是WebSocket 的備選方案,也是 底層協議,而 STOMP 是基於 WebSocket(SockJS) 的上層協議

  1. 假設HTTP協議並不存在,只能使用TCP套接字來編寫web應用,你可能認爲這是一件瘋狂的事情。
  2. 不過幸虧,咱們有HTTP協議,它解決了 web 瀏覽器發起請求以及 web 服務器響應請求的細節。
  3. 直接使用 WebSocket(SockJS) 就很相似於 使用 TCP 套接字來編寫 web 應用;由於沒有高層協議,所以就須要咱們定義應用間所發送消息的語義,還須要確保 鏈接的兩端都能遵循這些語義。
  4. 同HTTP在TCP套接字上添加請求-響應模型層同樣,STOMP在 WebSocket之上提供了一個基於幀的線路格式層,用來定義消息語義。

8.二、不支持WebSocket的場景有:

  瀏覽器不支持
  Web容器不支持,如tomcat7之前的版本不支持WebSocket
  防火牆不容許
  Nginx沒有開啓WebSocket支持

  當遇到不支持WebSocket的狀況時,SockJS會嘗試使用其餘的方案來鏈接,剛開始打開的時候由於須要嘗試各類方案,因此會阻塞一下子,以後能夠看到鏈接有異常,那就是嘗試失敗的狀況。

  爲了測試,使用Nginx作反向代理,把www.test.com指到項目啓動的端口上,而後本地配HOST來達到模擬真實場景的效果。由於Nginx默認是不支持WebSocket的,因此這裏模擬出了服務器不支持WebSocket的場景。、

8.三、spring下的WebSocket使用【WebSocket→sockJs→stomp】

項目中使用的pom

    compile 'org.springframework.boot:spring-boot-starter-websocket'
    compile 'org.springframework.boot:spring-messaging'
    compile group: 'org.webjars', name: 'sockjs-client', version: '1.1.2'
    compile group: 'org.webjars', name: 'stomp-websocket', version: '2.3.3'
    compile group: 'org.webjars', name: 'jquery', version: '3.3.1-1'

客戶端JS

    <script src="/webjars/jquery/3.3.1-1/jquery.js"></script>
    <script src="/webjars/sockjs-client/1.1.2/sockjs.min.js"></script>
    <script src="/webjars/stomp-websocket/2.3.3/stomp.min.js"></script>
    <script src="test.js"></script>

8.3.一、使用Spring的底層級WebSocket API

  Spring爲WebSocket提供了良好支持,WebSocket協議容許客戶端維持與服務器的長鏈接。數據能夠經過WebSocket在這兩個端點之間進行雙向傳輸,所以消費數據的一方可以實時獲取數據。

  按照其最簡單的形式,WebSocket只是兩個應用之間通訊的通道。位於WebSocket一端的應用發送消息,另一端處理消息。由於它是全雙工的,因此每一端均可以發送和處理消息。如圖18.1所示。

    
  WebSocket通訊能夠應用於任何類型的應用中,可是WebSocket最多見的應用場景是實現服務器和基於瀏覽器的應用之間的通訊。

實現步驟:

一、編寫Handler消息處理器類

方法一:實現 WebSocketHandler 接口,WebSocketHandler 接口以下 

public interface WebSocketHandler {
    void afterConnectionEstablished(WebSocketSession session) throws Exception;
    void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception;
    void handleTransportError(WebSocketSession session, Throwable exception) throws Exception; 
    void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception; 
    boolean supportsPartialMessages();
}

方法二:擴展 AbstractWebSocketHandler

@Service
public class ChatHandler extends AbstractWebSocketHandler {
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        session.sendMessage(new TextMessage("hello world."));
    }
}

  爲了在Spring使用較低層級的API來處理消息,必須編寫一個實現WebSocketHandler的類.WebSocketHandler須要咱們實現五個方法。相比直接實現WebSocketHandler,更爲簡單的方法是擴展AbstractWebSocketHandler,這是WebSocketHandler的一個抽象實現。

  除了重載WebSocketHandler中所定義的五個方法之外,咱們還能夠重載AbstractWebSocketHandler中所定義的三個方法:

    • handleBinaryMessage()
    • handlePongMessage()
    • handleTextMessage() 
      這三個方法只是handleMessage()方法的具體化,每一個方法對應於某一種特定類型的消息。

方案3、擴展TextWebSocketHandler或BinaryWebSocketHandler。

  TextWebSocketHandler是AbstractWebSocketHandler的子類,它會拒絕處理二進制消息。它重載了handleBinaryMessage()方法,若是收到二進制消息的時候,將會關閉WebSocket鏈接。與之相似,BinaryWebSocketHandler也是AbstractWeb-SocketHandler的子類,它重載了handleTextMessage()方法,若是接收到文本消息的話,將會關閉鏈接。

二、增長websocket攔截器,管理用戶

@Component
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        if (request instanceof ServletServerHttpRequest) {
            attributes.put("username","lhx");
        }
        return true;
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
    }
}
  • beforeHandshake()方法,在調用 handler 前調用。經常使用來註冊用戶信息,綁定 WebSocketSession,在 handler 里根據用戶信息獲取WebSocketSession發送消息

三、WebSocketConfig配置

方式1、註解配置

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    @Autowired
    private ChatHandler chatHandler;
    @Autowired
    private WebSocketHandshakeInterceptor webSocketHandshakeInterceptor;
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(chatHandler,"/chat")
                .addInterceptors(webSocketHandshakeInterceptor);
    }
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(8192*4);
        container.setMaxBinaryMessageBufferSize(8192*4);
        return container;
    }
}
  • 實現 WebSocketConfigurer 接口,重寫 registerWebSocketHandlers 方法,這是一個核心實現方法,配置 websocket 入口,容許訪問的域、註冊 Handler、SockJs 支持和攔截器。
  • registry.addHandler()註冊和路由的功能,當客戶端發起 websocket 鏈接,把 /path 交給對應的 handler 處理,而不實現具體的業務邏輯,能夠理解爲收集和任務分發中心。
  • addInterceptors,顧名思義就是爲 handler 添加攔截器,能夠在調用 handler 先後加入咱們本身的邏輯代碼。
  • ServletServerContainerFactoryBean能夠添加對WebSocket的一些配置

方式2、xml配置

四、客戶端配置

function contect() {
    var  wsServer = 'ws://'+window.location.host+'/chat';
    var  websocket = new WebSocket(wsServer);
    websocket.onopen = function (evt) { onOpen(evt) };
    websocket.onclose = function (evt) { onClose(evt) };
    websocket.onmessage = function (evt) { onMessage(evt) };
    websocket.onerror = function (evt) { onError(evt) };
    function onOpen(evt) {
        console.log("Connected to WebSocket server.");
        websocket.send("test");//客戶端向服務器發送消息
    }
    function onClose(evt) {
        console.log("Disconnected");
    }
    function onMessage(evt) {
        console.log('Retrieved data from server: ' + evt.data);
    }
    function onError(evt) {
        console.log('Error occured: ' + evt.data);
    }
}

contect();

8.3.二、SockJs針對WebSocket支持稍差的場景

  爲了應對許多瀏覽器不支持WebSocket協議的問題,設計了備選SockJs

  SockJS 是 WebSocket 技術的一種模擬。SockJS 會 儘量對應 WebSocket API,但若是 WebSocket 技術不可用的話,就會選擇另外的通訊方式協議。

一、服務端只需增長:.withSockJS()便可

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    @Autowired
    private ChatHandler chatHandler;
    @Autowired
    private WebSocketHandshakeInterceptor webSocketHandshakeInterceptor;
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // withSockJS() 方法聲明咱們想要使用 SockJS 功能,若是WebSocket不可用的話,會使用 SockJS;
        registry.addHandler(chatHandler,"/chat")
                .addInterceptors(webSocketHandshakeInterceptor).withSockJS();
    }
}

或者xml配置

<websocket:sockjs />

二、客戶端

只需對請求

    // var  server = 'ws://'+window.location.host+'/chat';
    var  server = 'http://'+window.location.host+'/chatsockjs';
    var  websocket = new SockJS(server);
  • SockJS 所處理的 URL 是 「http://「 或 「https://「 模式,而不是 「ws://「 or 「wss://「;
  • 其餘的函數如 onopen, onmessage, and onclose ,SockJS 客戶端與 WebSocket 同樣,

8.3.三、Stomp方式

  STOMP幀由命令,一個或多個頭信息以及負載所組成。

  直接使用WebSocket(或SockJS)就很相似於使用TCP套接字來編寫Web應用。由於沒有高層級的線路協議(wire protocol),所以就須要咱們定義應用之間所發送消息的語義,還須要確保鏈接的兩端都能遵循這些語義。 
  不過,好消息是咱們並不是必需要使用原生的WebSocket鏈接。就像HTTP在TCP套接字之上添加了請求-響應模型層同樣,STOMP在WebSocket之上提供了一個基於幀的線路格式(frame-based wire format)層,用來定義消息的語義。

  乍看上去,STOMP的消息格式很是相似於HTTP請求的結構。與HTTP請求和響應相似,STOMP幀由命令、一個或多個頭信息以及負載所組成。例如,以下就是發送數據的一個STOMP幀:

SEND
destination:/app/room-message
content-length:20

{\"message\":\"Hello!\"}

對以上代碼分析:

  1. SEND:STOMP命令,代表會發送一些內容;
  2. destination:頭信息,用來表示消息發送到哪裏;
  3. content-length:頭信息,用來表示 負載內容的 大小;
  4. 空行;
  5. 幀內容(負載)內容

8.3.3.一、基本用法

一、服務端Configuration配置

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration  implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //添加一個/socket-server-point 鏈接端點,客戶端就能夠經過這個端點來進行鏈接;withSockJS做用是添加SockJS支持
        registry.addEndpoint("/socket-server-point").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        //定義了一個客戶端訂閱地址的前綴信息,也就是客戶端接收服務端發送消息的前綴信息
        registry.enableSimpleBroker("/topic");
        //定義了服務端接收地址的前綴,也即客戶端給服務端發消息的地址前綴
        registry.setApplicationDestinationPrefixes("/ws");
    }
}

對以上代碼分析:

  1. EnableWebSocketMessageBroker 註解代表: 這個配置類不只配置了 WebSocket,還配置了基於代理的 STOMP 消息;
  2. 它複寫了 registerStompEndpoints() 方法:添加一個服務端點,來接收客戶端的鏈接。將 「/socket-server-point」 路徑註冊爲 STOMP 端點。這個路徑與以前發送和接收消息的目的路徑有所不一樣, 這是一個端點,客戶端在訂閱或發佈消息到目的地址前,要鏈接該端點,即用戶發送請求 :url=’/127.0.0.1:8080/socket-server-point’ 與 STOMP server 進行鏈接,以後再轉發到訂閱url;
  3. 它複寫了 configureMessageBroker() 方法:配置了一個 簡單的消息代理,通俗一點講就是設置消息鏈接請求的各類規範信息。
  4. 發送應用程序的消息將會帶有 「/ws」 前綴。
二、控制器以及邏輯開發
Service服務處理
@Service
public class HandlerService {
    private static final Logger logger = LoggerFactory.getLogger(HandlerService.class);
    @Async
    public CompletableFuture<String> handle(String key) throws Exception {
        Thread.sleep(new Random().nextInt(3000));
        logger.info("Looking up " + key);
        key=key+":"+LocalDateTime.now();
        return CompletableFuture.completedFuture(key);
    }
}

Controller控制開發

    @MessageMapping("/searchBase")
    public ResponseEntity searchBase() throws Exception {
        Consumer<List<String>> callback = p -> websocket.convertAndSend("/topic/searchResults", p);
        List<String> list = Arrays.asList("bba", "aaa", "ccc");
        localSearch(list, callback);
        Map map = new HashMap();
        map.put("list", list);
        map.put("date", LocalDateTime.now());
        return ResponseEntity.ok(map);
    }

    public void localSearch(List<String> keys, Consumer<List<String>> callback) throws Exception {
        Thread.sleep(2000);
        List<String> list = new ArrayList<>();
        for (String key : keys) {
            CompletableFuture<String> completableFuture = handlerService.handle(key);
            completableFuture.thenAcceptAsync(p -> {
                list.clear();
                list.add(p);
                callback.accept(list);
            });
        }
    }

8.3.3.二、消息流

  

8.3.3.三、啓用STOMP代理中繼

  對於生產環境下的應用來講,你可能會但願使用真正支持STOMP的代理來支撐WebSocket消息,如RabbitMQ或ActiveMQ。這樣的代理提供了可擴展性和健壯性更好的消息功能,固然它們也會完整支持STOMP命令。咱們須要根據相關的文檔來爲STOMP搭建代理。搭建就緒以後,就可使用STOMP代理來替換內存代理了,只需按照以下方式重載configureMessageBroker()方法便可:

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/queue", "/topic");
        //定義了一個客戶端訂閱地址的前綴信息,也就是客戶端接收服務端發送消息的前綴信息
//        registry.enableSimpleBroker("/topic");
        //定義了服務端接收地址的前綴,也即客戶端給服務端發消息的地址前綴
        registry.setApplicationDestinationPrefixes("/ws");
    }
  • 上述configureMessageBroker()方法的第一行代碼啓用了STOMP代理中繼(broker relay)功能,並將其目的地前綴設置爲「/topic」和「/queue」。這樣的話,Spring就能知道全部目的地前綴爲「/topic」或「/queue」的消息都會發送到STOMP代理中。

  • 在第二行的configureMessageBroker()方法中將應用的前綴設置爲「/ws」。全部目的地以「/ws」打頭的消息都將會路由到帶有@MessageMapping註解的方法中,而不會發布到代理隊列或主題中。

默認狀況下,STOMP代理中繼會假設代理監聽localhost的61613端口,而且客戶端的username和password均爲「guest」。若是你的STOMP代理位於其餘的服務器上,或者配置成了不一樣的客戶端憑證,那麼咱們能夠在啓用STOMP代理中繼的時候,須要配置這些細節信息:

  @Override
  public void configureMessageBroker(MessageBrokerRegistry registry) {
    registry.enableStompBrokerRelay("/queue", "/topic")
            .setRelayHost("rabbit.someotherserver")
            .setRelayPort(62623)
            .setClientLogin("marcopolo")
            .setClientPasscode("letmein01")
    registry.setApplicationDestinationPrefixes("/app");
  }

8.3.3.四、處理來自客戶端的STOMP消息

一、應用消息MessageMapping

Spring 4.0引入了@MessageMapping註解,它用於STOMP消息的處理,相似於Spring MVC的@RequestMapping註解。當消息抵達某個特定的目的地時,帶有@MessageMapping註解的方法可以處理這些消息。

    /**
     * 處理來自客戶端的STOMP消息
     * @param incoming
     * @return
     */
    @MessageMapping("/incoming")
    public Shout handleShout(Shout incoming) {
        logger.info("Received message: " + incoming.getMessage());
        try { Thread.sleep(2000); } catch (InterruptedException e) {}
        Shout outgoing = new Shout();
        outgoing.setMessage("incoming!");
        return outgoing;
    }

消息接受類

public class Shout {
    private String message;

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}

客戶端

function contect() {
    var socket=new SockJS('/socket-server-point');
    stompCliet=Stomp.over(socket);
    stompCliet.connect({},function (frame) {
        console.log('Connected :'+frame);
        stompCliet.send("/ws/incoming",{},"{\"message\":\"Hello!\"}");
    });
}
contect();

二、訂閱模式SubscribeMapping

@SubscribeMapping的主要應用場景是實現請求-迴應模式。在請求-迴應模式中,客戶端訂閱某一個目的地,而後預期在這個目的地上得到一個一次性的響應。 
例如,考慮以下@SubscribeMapping註解標註的方法:

    @SubscribeMapping("/sub")
    public Shout handleSubscription(){
        logger.info("Received message: " +"subscription");
        Shout outgoing = new Shout();
        outgoing.setMessage("subscription!");
        return outgoing;
    }

當處理這個訂閱時,handleSubscription()方法會產生一個輸出的Shout對象並將其返回。而後,Shout對象會轉換成一條消息,而且會按照客戶端訂閱時相同的目的地發送回客戶端。

客戶端

function contect() {
    var socket=new SockJS('/socket-server-point');
    stompCliet=Stomp.over(socket);
    stompCliet.connect({},function (frame) {
        console.log('Connected :'+frame);
        stompCliet.subscribe('/ws/sub',function (result) {
            console.log("aaaa",JSON.parse(result.body));
        });
        stompCliet.send("/ws/sub",{},"{\"message\":\"Hello!\"}");
    });
}
contect();

這種請求-迴應模式與HTTP GET的請求-響應模式並無太大差異。可是,這裏的關鍵區別在於HTTP GET請求是同步的,而訂閱的請求-迴應模式則是異步的,這樣客戶端可以在迴應可用時再去處理,而沒必要等待。

8.3.3.五、發送消息到客戶端

 Spring提供了兩種發送數據給客戶端的方法:

  • 做爲處理消息或處理訂閱的附帶結果;
  • 使用消息模板。

方式1、做爲處理消息或處理訂閱的附帶結果、在處理消息以後,發送消息

    @MessageMapping("/incoming")
    public Shout handleShout(Shout incoming) {
        logger.info("Received message: " + incoming.getMessage());
        try { Thread.sleep(2000); } catch (InterruptedException e) {}
        Shout outgoing = new Shout();
        outgoing.setMessage("incoming!");
        return outgoing;
    }

當@MessageMapping註解標示的方法有返回值的時候,返回的對象將會進行轉換(經過消息轉換器)並放到STOMP幀的負載中,而後發送給消息代理。

默認狀況下,幀所發往的目的地會與觸發處理器方法的目的地相同,只不過會添加上「/topic」前綴。就本例而言,這意味着handleShout()方法所返回的Shout對象會寫入到STOMP幀的負載中,併發布到「/topic/incoming」目的地。不過,咱們能夠經過爲方法添加@SendTo註解,重載目的地:

    @MessageMapping("/incoming")
    @SendTo("/topic/shout")
    public Shout handleShout(Shout incoming) {
        logger.info("Received message: " + incoming.getMessage());
        try { Thread.sleep(2000); } catch (InterruptedException e) {}
        Shout outgoing = new Shout();
        outgoing.setMessage("incoming!");
        return outgoing;
    }

按照這個@SendTo註解,消息將會發布到「/topic/shout」。全部訂閱這個主題的應用(如客戶端)都會收到這條消息。 

按照相似的方式,@SubscribeMapping註解標註的方式也能發送一條消息,做爲訂閱的迴應。

    @SubscribeMapping("/sub")
    public Shout handleSubscription(){
        logger.info("Received message: " +"subscription");
        Shout outgoing = new Shout();
        outgoing.setMessage("subscription!");
        return outgoing;
    }

@SubscribeMapping的區別在於這裏的Shout消息將會直接發送給客戶端,而沒必要通過消息代理。若是你爲方法添加@SendTo註解的話,那麼消息將會發送到指定的目的地,這樣會通過代理。

對應客戶端須要增長訂閱便可

        stompCliet.subscribe('/ws/sub',function (result) {
            console.log("aaaa",JSON.parse(result.body));
        });

正如前面看到的那樣,使用 @MessageMapping 或者 @SubscribeMapping 註解能夠處理客戶端發送過來的消息,並選擇方法是否有返回值。

    若是 @MessageMapping 註解的控制器方法有返回值的話,返回值會被髮送到消息代理,只不過會添加上"/topic"前綴。可使用@SendTo 重寫消息目的地;

    若是 @SubscribeMapping 註解的控制器方法有返回值的話,返回值會直接發送到客戶端,不通過代理。若是加上@SendTo 註解的話,則要通過消息代理。

方式2、使用消息模板【在任意地方發送消息】

  @MessageMapping和@SubscribeMapping提供了一種很簡單的方式來發送消息,這是接收消息或處理訂閱的附帶結果。不過,Spring的SimpMessagingTemplate可以在應用的任何地方發送消息,甚至沒必要以首先接收一條消息做爲前提。

  咱們沒必要要求用戶刷新頁面,而是讓首頁訂閱一個STOMP主題.

function contect() {
    var socket=new SockJS('/socket-server-point');
    stompCliet=Stomp.over(socket);
    stompCliet.connect({},function (frame) {
        console.log('Connected :'+frame);
        stompCliet.subscribe('/topic/sendDataToClient',function (result) {
            console.log("aaaa",JSON.parse(result.body));
        });
        // stompCliet.send("/ws/sub",{},"{\"message\":\"Hello!\"}");
    });
}
contect();

使用SimpMessagingTemplate可以在應用的任何地方發佈消息

    @Autowired(required = false)
    private SimpMessagingTemplate websocket;

    @GetMapping("/sendDataToClient")
    public ResponseEntity sendDataToClient() throws Exception {
        Map map=new HashMap();
        map.put("aa","aaa");
        map.put("bb","bbb");
        websocket.convertAndSend("/topic/sendDataToClient",map);
        return ResponseEntity.ok("ok");
    }

固然此處的SimpMessagingTemplate也可使用父接口SimpMessageSendingOperations注入

在這個場景下,咱們但願全部的客戶端都能及時看到實時的/topic/sendDataToClient,這種作法是很好的。但有的時候,咱們但願發送消息給指定的用戶,而不是全部的客戶端。 

8.3.3.六、爲目標用戶發送消息

  以上說明了如何廣播消息,訂閱目的地的全部用戶都能收到消息。若是消息只想發送給特定的用戶呢?spring-websocket 介紹瞭如下

在使用Spring和STOMP消息功能的時候,咱們有兩種方式利用認證用戶:

  一、@MessageMapping和@SubscribeMapping標註的方法基於@SendToUser註解和Principal參數來獲取認證用戶;@MessageMapping、@SubscribeMapping和@MessageException方法返回的值可以以消息的形式發送給認證用戶;

  二、SimpMessageSendingOperations接口或SimpMessagingTemplate的convertAndSendToUser方法可以發送消息給特定用戶。

一、在控制器中處理用戶的消息

  在控制器的@MessageMapping或@SubscribeMapping方法中,處理消息時有兩種方式瞭解用戶信息。在處理器方法中,經過簡單地添加一個Principal參數,這個方法就能知道用戶是誰並利用該信息關注此用戶相關的數據。除此以外,處理器方法還可使用@SendToUser註解,代表它的返回值要以消息的形式發送給某個認證用戶的客戶端(只發送給該客戶端)。

  @SendToUser 表示要將消息發送給指定的用戶,會自動在消息目的地前補上"/user"前綴。以下,最後消息會被髮布在  /user/queue/notifications-username。可是問題來了,這個username是怎麼來的呢?就是經過 principal 參數來得到的。那麼,principal 參數又是怎麼來的呢?須要在spring-websocket 的配置類中重寫 configureClientInboundChannel 方法,添加上用戶的認證。

服務端增長configuration

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
    /**
     * 一、設置攔截器
     * 二、首次鏈接的時候,獲取其Header信息,利用Header裏面的信息進行權限認證
     * 三、經過認證的用戶,使用 accessor.setUser(user); 方法,將登錄信息綁定在該 StompHeaderAccessor 上,在Controller方法上能夠獲取 StompHeaderAccessor 的相關信息
     * @param registration
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new ChannelInterceptorAdapter() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
                //一、判斷是否首次鏈接
                if (StompCommand.CONNECT.equals(accessor.getCommand())){
                    //二、判斷用戶名和密碼
                    String username = accessor.getNativeHeader("username").get(0);
                    String password = accessor.getNativeHeader("password").get(0);

                    if ("admin".equals(username) && "admin".equals(password)){
                        Principal principal = new Principal() {
                            @Override
                            public String getName() {
                                return username;
                            }
                        };
                        accessor.setUser(principal);
                        return message;
                    }else {
                        return null;
                    }
                }
                //不是首次鏈接,已經登錄成功
                return message;
            }

        });
    }
}
View Code

服務端處理邏輯

    @MessageMapping("/shout")
    @SendToUser("/queue/notifications")
    public Shout userStomp(Principal principal, Shout shout) {
        String name = principal.getName();
        String message = shout.getMessage();
        logger.info("認證的名字是:{},收到的消息是:{}", name, message);
        return shout;
    }

前端js代碼

var headers={
    username:'admin',
    password:'admin'
};
function contect() {
    var socket=new SockJS('/socket-server-point');
    stompCliet=Stomp.over(socket);
    stompCliet.connect(headers,function (frame) {
        console.log('Connected :'+frame);
        stompCliet.subscribe('/user/queue/notifications',function (result) {
            console.log("aaaa",JSON.parse(result.body));
        });
        stompCliet.send("/ws/shout",{},"{\"message\":\"Hello!\"}");
    });
}
contect();

二、convertAndSendToUser方法

   除了convertAndSend()之外,SimpMessageSendingOperations 還提供了convertAndSendToUser()方法。按照名字就能夠判斷出來,convertAndSendToUser()方法可以讓咱們給特定用戶發送消息。

    @MessageMapping("/singleShout")
    public void singleUser(Shout shout, StompHeaderAccessor stompHeaderAccessor) {
        String message = shout.getMessage();
        LOGGER.info("接收到消息:" + message);
        Principal user = stompHeaderAccessor.getUser();
        simpMessageSendingOperations.convertAndSendToUser(user.getName(), "/queue/shouts", shout);
    }

  如上,這裏雖然我仍是用了認證的信息獲得用戶名。可是,其實大可沒必要這樣,由於 convertAndSendToUser 方法能夠指定要發送給哪一個用戶。也就是說,徹底能夠把用戶名的看成一個參數傳遞給控制器方法,從而繞過身份認證!convertAndSendToUser 方法最終會把消息發送到 /user/sername/queue/shouts 目的地上。

8.3.3.七、處理消息異常

  在處理消息的時候,有可能會出錯並拋出異常。由於STOMP消息異步的特色,發送者可能永遠也不會知道出現了錯誤。@MessageExceptionHandler標註的方法可以處理消息方法中所拋出的異常。咱們能夠把錯誤發送給用戶特定的目的地上,而後用戶從該目的地上訂閱消息,從而用戶就能知道本身出現了什麼錯誤

     @MessageExceptionHandler(Exception.class)
     @SendToUser("/queue/errors")
     public Exception handleExceptions(Exception t){
         t.printStackTrace();
         return t;
     }

8.3.3.八、更多stomp配置

一、發起鏈接

其中headers表示客戶端的認證信息:

若無需認證,直接使用空對象 「{}」 便可;

 (1)connectCallback 表示鏈接成功時(服務器響應 CONNECTED 幀)的回調方法; 

 (2)errorCallback 表示鏈接失敗時(服務器響應 ERROR 幀)的回調方法,非必須;

默認連接端點

//默認的和STOMP端點鏈接
/*stomp.connect("guest", "guest", function (franme) {
});*/

有用戶認證的

var headers={
    username:'admin',
    password:'admin'
};

stomp.connect(headers, function (frame) {

示例

// 創建鏈接對象(還未發起鏈接)
 var socket=new SockJS("/endpointChat"); 
 // 獲取 STOMP 子協議的客戶端對象 
 var stompClient = Stomp.over(socket); 
 // 向服務器發起websocket鏈接併發送CONNECT幀 
 stompClient.connect( {}, 
function connectCallback (frame) { 
     // 鏈接成功時(服務器響應 CONNECTED 幀)的回調方法 
console.log('已鏈接【' + frame + '】'); 
//訂閱一個消息
     stompClient.subscribe('/topic/getResponse',
function (response) { 
showResponse(response.body);
});
 },
     function errorCallBack (error) { 
     // 鏈接失敗時(服務器響應 ERROR 幀)的回調方法 
console.log('鏈接失敗【' + error + '】'); 
} );

二、斷開鏈接

若要從客戶端主動斷開鏈接,可調用 disconnect() 方法:

client.disconnect(
function () { 
    alert("斷開鏈接");
});

三、發送消息

鏈接成功後,客戶端可以使用 send() 方法向服務器發送信息:

client.send(destination url, headers, body);

其中: 

(1)destination url 爲服務器 controller中 @MessageMapping 中匹配的URL,字符串,必須參數; 

(2)headers 爲發送信息的header,JavaScript 對象,可選參數; 

(3)body 爲發送信息的 body,字符串,可選參數;
示例

client.send("/queue/test", {priority: 9}, "Hello, STOMP");
client.send("/queue/test", {}, "Hello, STOMP");

四、訂閱、接收消息

STOMP 客戶端要想接收來自服務器推送的消息,必須先訂閱相應的URL,即發送一個 SUBSCRIBE 幀,而後才能不斷接收來自服務器的推送消息。

訂閱和接收消息經過 subscribe() 方法實現:

subscribe(destination url, callback, headers)

其中 

(1)destination url 爲服務器 @SendTo 匹配的 URL,字符串; 

(2)callback 爲每次收到服務器推送的消息時的回調方法,該方法包含參數 message; 

(3)headers 爲附加的headers,JavaScript 對象;該方法返回一個包含了id屬性的 JavaScript 對象,可做爲 unsubscribe() 方法的參數;默認狀況下,若是沒有在headers額外添加,這個庫會默認構建一個獨一無二的ID。在傳遞headers這個參數時,可使用你本身id。
示例

var headers = {
ack: 'client',
//這個客戶端指定了它會確認接收的信息,只接收符合這個selector : location = 'Europe'的消息。
 'selector': "location = 'Europe'",
//id:’myid’
}; 
var callback = function(message) {
if (message.body) {
 alert("got message with body " +JSON.parse( message.body)) }
 else{
alert("got empty message"); 
} }); 
var subscription = client.subscribe("/queue/test", callback, headers);
 
若是想讓客戶端訂閱多個目的地,你能夠在接收全部信息的時候調用相同的回調函數:
onmessage = function(message) {
    // called every time the client receives a message
}
var sub1 = client.subscribe("queue/test", onmessage);
var sub2 = client.subscribe("queue/another", onmessage)

五、取消訂閱

var subscription = client.subscribe(...);
 
subscription.unsubscribe();

六、事務支持

能夠在將消息的發送和確認接收放在一個事務中。

客戶端調用自身的begin()方法就能夠開始啓動事務了,begin()有一個可選的參數transaction,一個惟一的可標識事務的字符串。若是沒有傳遞這個參數,那麼庫會自動構建一個。

這個方法會返回一個object。這個對象有一個id屬性對應這個事務的ID,還有兩個方法:

commit()提交事務
 
abort()停止事務

在一個事務中,客戶端能夠在發送/接受消息時指定transaction id來設置transaction。

// start the transaction
 
var tx = client.begin();
 
// send the message in a transaction
 
client.send("/queue/test", {transaction: tx.id}, "message in a transaction");
 
// commit the transaction to effectively send the message
 
tx.commit();

若是你在調用send()方法發送消息的時候忘記添加transction header,那麼這不會稱爲事務的一部分,這個消息會直接發送,不會等到事務完成後才發送。

var txid = "unique_transaction_identifier";
 
// start the transaction
 
var tx = client.begin();
 
// oops! send the message outside the transaction
 
client.send("/queue/test", {}, "I thought I was in a transaction!");
 
tx.abort(); // Too late! the message has been sent

七、消息確認

默認狀況,在消息發送給客戶端以前,服務端會自動確認(acknowledged)。

客戶端能夠選擇經過訂閱一個目的地時設置一個ack header爲client或client-individual來處理消息確認。

在下面這個例子,客戶端必須調用message.ack()來通知客戶端它已經接收了消息。

var subscription = client.subscribe("/queue/test",
    function(message) {
        // do something with the message
        ...
        // and acknowledge it
        message.ack();
    },
    {ack: 'client'}
);

ack()接受headers參數用來附加確認消息。例如,將消息做爲事務(transaction)的一部分,當要求接收消息時其實代理(broker)已經將ACK STOMP frame處理了。

var tx = client.begin();
message.ack({ transaction: tx.id, receipt: 'my-receipt' });
tx.commit();

ack()也能夠用來通知STOMP 1.1.brokers(代理):客戶端不能消費這個消息。與ack()方法的參數相同。

八、debug調試

有一些測試代碼能有助於你知道庫發送或接收的是什麼,從而來調試程序。

客戶端能夠將其debug屬性設置爲一個函數,傳遞一個字符串參數去觀察庫全部的debug語句。

client.debug = function(str) {
 
    // append the debug log to a #debug div somewhere in the page using JQuery:
 
    $("#debug").append(str + "\n");
};

默認狀況,debug消息會被記錄在在瀏覽器的控制檯。

九、心跳機制

若是STOMP broker(代理)接收STOMP 1.1版本的幀,heart-beating是默認啓用的。heart-beating也就是頻率,incoming是接收頻率,outgoing是發送頻率。

經過改變incoming和outgoing能夠更改客戶端的heart-beating(默認爲10000ms):

client.heartbeat.outgoing = 20000;
 
// client will send heartbeats every 20000ms
 
client.heartbeat.incoming = 0;
 
// client does not want to receive heartbeats
 
// from the server

heart-beating是利用window.setInterval()去規律地發送heart-beats或者檢查服務端的heart-beats。

 

更多

stomp.connect(headers, function (frame) {

    //發送消息
    //第二個參數是一個頭信息的Map,它會包含在STOMP的幀中
    //事務支持
    var tx = stomp.begin();
    stomp.send("/app/marco", {transaction: tx.id}, strJson);
    tx.commit();


    //訂閱服務端消息 subscribe(destination url, callback[, headers])
    stomp.subscribe("/topic/marco", function (message) {
        var content = message.body;
        var obj = JSON.parse(content);
        console.log("訂閱的服務端消息:" + obj.message);
    }, {});


    stomp.subscribe("/app/getShout", function (message) {
        var content = message.body;
        var obj = JSON.parse(content);
        console.log("訂閱的服務端直接返回的消息:" + obj.message);
    }, {});


    /*如下是針對特定用戶的訂閱*/
    var adminJSON = JSON.stringify({'message': 'ADMIN'});
    /*第一種*/
    stomp.send("/app/singleShout", {}, adminJSON);
    stomp.subscribe("/user/queue/shouts",function (message) {
        var content = message.body;
        var obj = JSON.parse(content);
        console.log("admin用戶特定的消息1:" + obj.message);
    });
    /*第二種*/
    stomp.send("/app/shout", {}, adminJSON);
    stomp.subscribe("/user/queue/notifications",function (message) {
        var content = message.body;
        var obj = JSON.parse(content);
        console.log("admin用戶特定的消息2:" + obj.message);
    });

    /*訂閱異常消息*/
    stomp.subscribe("/user/queue/errors", function (message) {
        console.log(message.body);
    });

    //若使用STOMP 1.1 版本,默認開啓了心跳檢測機制(默認值都是10000ms)
    stomp.heartbeat.outgoing = 20000;

    stomp.heartbeat.incoming = 0; //客戶端不從服務端接收心跳包
});
View Code

 

參看代碼:https://github.com/JMCuixy/SpringWebSocket 

相關文章
相關標籤/搜索