java-websocket客戶端 斷線重連 注入Service問題

java版客戶端:

使用開源項目java-websocket, github地址: https://github.com/TooTallNate/Java-WebSocket

github上有不少示例,具體能夠去查看github

此處主要是記錄java-websocket實現客戶端,並解決沒法使用Service層方法(service爲null)的問題,以及斷線重連

引用包

<dependency>
    <groupId>org.java-websocket</groupId>
    <artifactId>Java-WebSocket</artifactId>
    <version>1.3.9</version>
</dependency>

初版,使用getBean獲取Service層方法,而且實現斷線重連

使用的是GitHub上的demo示例

import com.alibaba.fastjson.JSONArray;import com.sensor.vibration.utils.ApplicationContextRegister; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.java_websocket.client.WebSocketClient; import org.java_websocket.drafts.Draft; import org.java_websocket.handshake.ServerHandshake; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import java.net.URI; import java.util.Map; /** This example demonstrates how to create a websocket connection to a server. Only the most important callbacks are overloaded. */ @Slf4j public class SensorWebSocketClient extends WebSocketClient { @Autowired private UserService userService; public SensorWebSocketClient(URI serverUri , Draft draft ) { super( serverUri, draft ); } public SensorWebSocketClient(URI serverURI ) { super( serverURI ); } public SensorWebSocketClient(URI serverUri, Map<String, String> httpHeaders ) { super(serverUri, httpHeaders); } @Override public void onOpen( ServerHandshake handshakedata ) { System.out.println( "opened connection" ); // if you plan to refuse connection based on ip or httpfields overload: onWebsocketHandshakeReceivedAsClient
 } @Override public void onMessage( String msg ) { log.info("[websocket] 收到消息={}",msg); } @Override public void onClose( int code, String reason, boolean remote ) { // The codecodes are documented in class org.java_websocket.framing.CloseFrame
        System.out.println( "Connection closed by " + ( remote ? "remote peer" : "us" ) + " Code: " + code + " Reason: " + reason ); } @Override public void onError( Exception ex ) { ex.printStackTrace(); // if the error is fatal then onClose will be called additionally
 } }

新建一個類,建立一個方法,啓動websocket

import java.net.URI; import java.net.URISyntaxException; /** * Simple example to reconnect blocking and non-blocking. */

public class ReconnectClient {

    /** Pause between two keep-alive frames, in milliseconds. */
    private static final long HEARTBEAT_INTERVAL_MS = 3000L;

    /**
     * Connects to the websocket server and starts a background keep-alive thread.
     *
     * <p>The calling thread blocks in {@code connectBlocking()} first. Afterwards
     * a separate thread sends an empty text frame every
     * {@link #HEARTBEAT_INTERVAL_MS} milliseconds; when sending fails with a
     * runtime exception the client is asked to reconnect.
     *
     * @throws URISyntaxException   if the hard-coded server address cannot be parsed
     * @throws InterruptedException if the calling thread is interrupted while connecting
     */
    public static void reconnect() throws URISyntaxException, InterruptedException {
        final SensorWebSocketClient c =
                new SensorWebSocketClient(new URI("ws://localhost:5005/websocket"));
        c.connectBlocking();

        Thread heartbeat = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(HEARTBEAT_INTERVAL_MS);
                    c.send("");
                } catch (InterruptedException e) {
                    // An interrupt is a request to stop, not a lost connection:
                    // restore the flag and leave the loop instead of reconnecting.
                    Thread.currentThread().interrupt();
                    return;
                } catch (RuntimeException e) {
                    // send() failed, so try to re-establish the connection.
                    c.reconnect();
                }
            }
        }, "websocket-heartbeat");
        heartbeat.start();
    }
}

再新建一個類,程序啓動的時候,調用上面的方法

import com.sensor.vibration.utils.Common;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.net.URISyntaxException;
import java.util.Map;

/**
 * Startup hook: being a {@link CommandLineRunner} component, its {@code run}
 * method is the place where the websocket client is launched.
 */
@Slf4j
@Component
public class InitStart implements CommandLineRunner {

    /**
     * Delegates to {@code ReconnectClient.reconnect()}.
     *
     * @param args command-line arguments (not used)
     * @throws URISyntaxException   propagated from {@code ReconnectClient.reconnect()}
     * @throws InterruptedException propagated from {@code ReconnectClient.reconnect()}
     */
    @Override
    public void run(String... args)
            throws URISyntaxException, InterruptedException {
        ReconnectClient.reconnect();
    }
}

中間的啓動類的方法能夠省去,直接寫在InitStart的run方法裏面

如今還不能使用Service層的方法,會報service爲null異常,百度後,參考別人使用getBean方法,寫一個工具類

import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component;  @Component @Lazy(false) public class ApplicationContextRegister  implements ApplicationContextAware { private static ApplicationContext APPLICATION_CONTEXT; /** * 設置spring上下文 * * @param applicationContext spring上下文 * @throws BeansException */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { APPLICATION_CONTEXT = applicationContext; } public static ApplicationContext getApplicationContext() { return APPLICATION_CONTEXT; } }

在 SensorWebSocketClient.java 中使用Service

// Usage fragment for SensorWebSocketClient.java: obtain UserService by bean lookup.
// NOTE(review): this is an excerpt, not a complete definition; the two statements
// after the field presumably belong inside a constructor or callback - confirm.
@Autowired private UserService userService;
// Fetch the Spring context that ApplicationContextRegister stored statically.
ApplicationContext act = ApplicationContextRegister.getApplicationContext();
// Look the service bean up by type and assign it to the field by hand.
userService=act.getBean(UserService.class);

可是 領導不讓用getBean這種方法,放棄

第二版,使用Service層方法版本 + 斷線重連

實現:

import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.net.URI; /** * Created by Chow on 2019/8/22 */ @Slf4j @Component public class WebSocketClientStart { @Autowired private UserService userService; @Bean public WebSocketClient webSocketClient() { try { WebSocketClient webSocketClient = new WebSocketClient(new URI("ws://127.0.0.1:5005/websocket")) { @Override public void onOpen(ServerHandshake handshakedata) { log.info("[websocket] 鏈接成功"); } @Override public void onMessage(String msg) { try{ log.info("[websocket] 收到消息={}",msg); if (msg == null || StringUtils.isBlank(msg)){ log.error("the msg message of websocket received is null"); this.send(""); return; } JSONArray jsonArray = JSONArray.parseArray(msg); if (jsonArray == null || jsonArray.size() == 0){ log.info("log: the message of websocket received is empty"); } vibrationAlarmService.alarmAnalysis(jsonArray); this.send(""); }catch (Exception e){ log.error(e.getMessage(), e); this.send(""); } } @Override public void onClose(int code, String reason, boolean remote) { log.info("[websocket] 退出鏈接"); } @Override public void onError(Exception ex) { log.info("[websocket] 鏈接錯誤={}",ex.getMessage()); } }; webSocketClient.connect(); new Thread(new Runnable() { public void run() { System.out.println("Runnable running.."); } }) { public void run() { while (true){ try{ Thread.sleep(3000); webSocketClient.send(""); }catch (Exception e){ webSocketClient.reconnect(); } } } }.start(); return webSocketClient; } catch (Exception e) { e.printStackTrace(); } return null; } }

 

 

 測試服務端代碼

須要引入包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    <version>2.1.3.RELEASE</version>
</dependency>

server:

import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; import org.springframework.stereotype.Component; @ServerEndpoint(value = "/websocket") @Component public class MyWebSocket { //靜態變量,用來記錄當前在線鏈接數。應該把它設計成線程安全的。
    private static int onlineCount = 0; //concurrent包的線程安全Set,用來存放每一個客戶端對應的MyWebSocket對象。
    private static CopyOnWriteArraySet<MyWebSocket> webSocketSet = new CopyOnWriteArraySet<MyWebSocket>(); //與某個客戶端的鏈接會話,須要經過它來給客戶端發送數據
    private Session session; /** * 鏈接創建成功調用的方法*/ @OnOpen public void onOpen(Session session) { this.session = session; webSocketSet.add(this);     //加入set中
        addOnlineCount();           //在線數加1
        System.out.println("有新鏈接加入!當前在線人數爲" + getOnlineCount()); try { sendMessage("當前在線人數爲" + getOnlineCount()); } catch (IOException e) { System.out.println("IO異常"); } } /** * 鏈接關閉調用的方法 */ @OnClose public void onClose() { webSocketSet.remove(this);  //從set中刪除
        subOnlineCount();           //在線數減1
        System.out.println("有一鏈接關閉!當前在線人數爲" + getOnlineCount()); } /** * 收到客戶端消息後調用的方法 * * @param message 客戶端發送過來的消息*/ @OnMessage public void onMessage(String message, Session session) { System.out.println("來自客戶端的消息:" + message); //羣發消息
        for (MyWebSocket item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); } } } @OnError public void onError(Session session, Throwable error) { System.out.println("發生錯誤"); error.printStackTrace(); } public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 羣發自定義消息 * */
    public static void sendInfo(String message) throws IOException { for (MyWebSocket item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { continue; } } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { MyWebSocket.onlineCount++; } public static synchronized void subOnlineCount() { MyWebSocket.onlineCount--; } }
相關文章
相關標籤/搜索