Java 版客戶端:
使用開源項目 Java-WebSocket,GitHub 地址:https://github.com/TooTallNate/Java-WebSocket
GitHub 上有不少示例,具體可以到 GitHub 查看。
此處主要記錄如何用 Java-WebSocket 實現客戶端,並解決無法使用 Service 層方法(service 爲 null)的問題,以及斷線重連。
引入依賴包:
<!-- Maven dependency for the Java-WebSocket library, version 1.3.9. -->
<dependency> <groupId>org.java-websocket</groupId> <artifactId>Java-WebSocket</artifactId> <version>1.3.9</version> </dependency>
第一版:使用 getBean 獲取 Service 層方法,並且實現斷線重連
使用的是 GitHub 上的 demo 示例:
import com.alibaba.fastjson.JSONArray;import com.sensor.vibration.utils.ApplicationContextRegister; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.java_websocket.client.WebSocketClient; import org.java_websocket.drafts.Draft; import org.java_websocket.handshake.ServerHandshake; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import java.net.URI; import java.util.Map; /** This example demonstrates how to create a websocket connection to a server. Only the most important callbacks are overloaded. */ @Slf4j public class SensorWebSocketClient extends WebSocketClient { @Autowired private UserService userService; public SensorWebSocketClient(URI serverUri , Draft draft ) { super( serverUri, draft ); } public SensorWebSocketClient(URI serverURI ) { super( serverURI ); } public SensorWebSocketClient(URI serverUri, Map<String, String> httpHeaders ) { super(serverUri, httpHeaders); } @Override public void onOpen( ServerHandshake handshakedata ) { System.out.println( "opened connection" ); // if you plan to refuse connection based on ip or httpfields overload: onWebsocketHandshakeReceivedAsClient } @Override public void onMessage( String msg ) { log.info("[websocket] 收到消息={}",msg); } @Override public void onClose( int code, String reason, boolean remote ) { // The codecodes are documented in class org.java_websocket.framing.CloseFrame System.out.println( "Connection closed by " + ( remote ? "remote peer" : "us" ) + " Code: " + code + " Reason: " + reason ); } @Override public void onError( Exception ex ) { ex.printStackTrace(); // if the error is fatal then onClose will be called additionally } }
新建一個類,創建一個方法,啓動 WebSocket:
import java.net.URI;
import java.net.URISyntaxException;

/**
 * Simple example that connects (blocking) and then keeps the connection alive from a
 * background thread, reconnecting whenever a heartbeat cannot be sent.
 */
public class ReconnectClient {

    /** Pause between two heartbeat attempts, in milliseconds. */
    private static final long HEARTBEAT_INTERVAL_MS = 3000L;

    /**
     * Connects to {@code ws://localhost:5005/websocket}, waits for the connection attempt to
     * finish, then starts a thread that sends an empty text frame every three seconds and
     * calls {@code reconnect()} on the client when sending fails.
     *
     * @throws URISyntaxException   if the server URI cannot be parsed
     * @throws InterruptedException if the calling thread is interrupted while connecting
     */
    public static void reconnect() throws URISyntaxException, InterruptedException {
        final SensorWebSocketClient c =
                new SensorWebSocketClient(new URI("ws://localhost:5005/websocket"));
        c.connectBlocking();

        Thread heartbeat = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(HEARTBEAT_INTERVAL_MS);
                    c.send("");
                } catch (InterruptedException e) {
                    // An interrupt is a request to stop, not a connection failure:
                    // restore the flag and leave the loop instead of reconnecting.
                    Thread.currentThread().interrupt();
                    return;
                } catch (RuntimeException sendFailure) {
                    // send() failed, so treat the connection as lost and re-establish it.
                    tryReconnect(c);
                }
            }
        }, "websocket-heartbeat");
        heartbeat.start();
    }

    /** Attempts one reconnect; a failure is reported and retried on the next heartbeat. */
    private static void tryReconnect(SensorWebSocketClient client) {
        try {
            client.reconnect();
        } catch (RuntimeException e) {
            // Keep the heartbeat thread alive so the next tick can try again.
            System.err.println("websocket reconnect failed: " + e);
        }
    }
}
再新建一個類,在程序啓動的時候調用上面的方法:
import com.sensor.vibration.utils.Common;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.net.URISyntaxException;
import java.util.Map;

/**
 * {@link CommandLineRunner} component whose only job is to start the WebSocket client
 * by delegating to {@code ReconnectClient.reconnect()}.
 */
@Slf4j
@Component
public class InitStart implements CommandLineRunner {

    /**
     * Starts the WebSocket client.
     *
     * @param args command-line arguments (not used)
     * @throws URISyntaxException   propagated from {@code ReconnectClient.reconnect()}
     * @throws InterruptedException propagated from {@code ReconnectClient.reconnect()}
     */
    @Override
    public void run(String... args) throws URISyntaxException, InterruptedException {
        ReconnectClient.reconnect();
    }
}
中間啓動類的方法可以省去,直接寫在 InitStart 的 run 方法裏面。
現在還不能使用 Service 層的方法,會報 service 爲 null 的異常(因爲客戶端是用 new 建立的,不受 Spring 管理,@Autowired 不會生效)。搜尋後,參考別人使用 getBean 的做法,寫一個工具類:
import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; @Component @Lazy(false) public class ApplicationContextRegister implements ApplicationContextAware { private static ApplicationContext APPLICATION_CONTEXT; /** * 設置spring上下文 * * @param applicationContext spring上下文 * @throws BeansException */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { APPLICATION_CONTEXT = applicationContext; } public static ApplicationContext getApplicationContext() { return APPLICATION_CONTEXT; } }
在 SensorWebSocketClient.java 中使用Service
// Snippet for SensorWebSocketClient: the userService field is assigned by hand, looking the
// bean up via getBean(UserService.class) on the context exposed by ApplicationContextRegister.
// NOTE(review): the last two statements must sit inside a constructor or method — confirm placement.
@Autowired private UserService userService; ApplicationContext act = ApplicationContextRegister.getApplicationContext(); userService=act.getBean(UserService.class);
但是領導不讓用 getBean 這種方法,所以放棄。
第二版,使用Service層方法版本 + 斷線重連
實現:
import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.net.URI; /** * Created by Chow on 2019/8/22 */ @Slf4j @Component public class WebSocketClientStart { @Autowired private UserService userService; @Bean public WebSocketClient webSocketClient() { try { WebSocketClient webSocketClient = new WebSocketClient(new URI("ws://127.0.0.1:5005/websocket")) { @Override public void onOpen(ServerHandshake handshakedata) { log.info("[websocket] 鏈接成功"); } @Override public void onMessage(String msg) { try{ log.info("[websocket] 收到消息={}",msg); if (msg == null || StringUtils.isBlank(msg)){ log.error("the msg message of websocket received is null"); this.send(""); return; } JSONArray jsonArray = JSONArray.parseArray(msg); if (jsonArray == null || jsonArray.size() == 0){ log.info("log: the message of websocket received is empty"); } vibrationAlarmService.alarmAnalysis(jsonArray); this.send(""); }catch (Exception e){ log.error(e.getMessage(), e); this.send(""); } } @Override public void onClose(int code, String reason, boolean remote) { log.info("[websocket] 退出鏈接"); } @Override public void onError(Exception ex) { log.info("[websocket] 鏈接錯誤={}",ex.getMessage()); } }; webSocketClient.connect(); new Thread(new Runnable() { public void run() { System.out.println("Runnable running.."); } }) { public void run() { while (true){ try{ Thread.sleep(3000); webSocketClient.send(""); }catch (Exception e){ webSocketClient.reconnect(); } } } }.start(); return webSocketClient; } catch (Exception e) { e.printStackTrace(); } return null; } }
測試服務端代碼
需要引入依賴包:
<!-- Maven dependency for spring-boot-starter-websocket, version 2.1.3.RELEASE. -->
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> <version>2.1.3.RELEASE</version> </dependency>
server:
import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; import org.springframework.stereotype.Component; @ServerEndpoint(value = "/websocket") @Component public class MyWebSocket { //靜態變量,用來記錄當前在線鏈接數。應該把它設計成線程安全的。 private static int onlineCount = 0; //concurrent包的線程安全Set,用來存放每一個客戶端對應的MyWebSocket對象。 private static CopyOnWriteArraySet<MyWebSocket> webSocketSet = new CopyOnWriteArraySet<MyWebSocket>(); //與某個客戶端的鏈接會話,須要經過它來給客戶端發送數據 private Session session; /** * 鏈接創建成功調用的方法*/ @OnOpen public void onOpen(Session session) { this.session = session; webSocketSet.add(this); //加入set中 addOnlineCount(); //在線數加1 System.out.println("有新鏈接加入!當前在線人數爲" + getOnlineCount()); try { sendMessage("當前在線人數爲" + getOnlineCount()); } catch (IOException e) { System.out.println("IO異常"); } } /** * 鏈接關閉調用的方法 */ @OnClose public void onClose() { webSocketSet.remove(this); //從set中刪除 subOnlineCount(); //在線數減1 System.out.println("有一鏈接關閉!當前在線人數爲" + getOnlineCount()); } /** * 收到客戶端消息後調用的方法 * * @param message 客戶端發送過來的消息*/ @OnMessage public void onMessage(String message, Session session) { System.out.println("來自客戶端的消息:" + message); //羣發消息 for (MyWebSocket item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); } } } @OnError public void onError(Session session, Throwable error) { System.out.println("發生錯誤"); error.printStackTrace(); } public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 羣發自定義消息 * */ public static void sendInfo(String message) throws IOException { for (MyWebSocket item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { continue; } } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized 
void addOnlineCount() { MyWebSocket.onlineCount++; } public static synchronized void subOnlineCount() { MyWebSocket.onlineCount--; } }