1 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the springboot-socketio demo: a Spring Boot web application that
     also depends on netty-socketio and spring-kafka. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Coordinates of the artifact produced by this build. -->
    <groupId>com.sys</groupId>
    <artifactId>springboot-socketio</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>springboot-socketio</name>
    <description>Demo project for Spring Boot</description>

    <!-- Inherits from the Spring Boot starter parent. Dependencies below that declare no
         <version> presumably get theirs from this parent (verify against the parent POM). -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Socket.IO server implementation; the version is pinned explicitly here. -->
        <!-- https://mvnrepository.com/artifact/com.corundumstudio.socketio/netty-socketio -->
        <dependency>
            <groupId>com.corundumstudio.socketio</groupId>
            <artifactId>netty-socketio</artifactId>
            <version>1.7.12</version>
        </dependency>

        <!-- Kafka integration (provides the @KafkaListener support used by the receiver class). -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- Test-only dependency: limited to the test scope. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2 啓動加載 socket
import com.corundumstudio.socketio.SocketIOServer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; @Component @Order(value=1) public class MyCommandLineRunner implements CommandLineRunner { private final SocketIOServer server; @Autowired public MyCommandLineRunner(SocketIOServer server) { this.server = server; } @Override public void run(String... args) throws Exception { server.start(); System.out.println("socket.io啓動成功!"); } }
3 創建鏈接
import com.corundumstudio.socketio.AckRequest; import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.SocketIOServer; import com.corundumstudio.socketio.annotation.OnConnect; import com.corundumstudio.socketio.annotation.OnDisconnect; import com.corundumstudio.socketio.annotation.OnEvent; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.Date; import java.util.UUID; @Component public class MessageEventHandler { public static SocketIOServer socketIoServer; public static ArrayList<UUID> listClient = new ArrayList<>(); //static final int limitSeconds = 60; @Autowired public MessageEventHandler(SocketIOServer server) { MessageEventHandler.socketIoServer = server; } @OnConnect public void onConnect(SocketIOClient client) { listClient.add(client.getSessionId()); System.err.println(listClient.size()); System.out.println("客戶端:" + client.getSessionId() + "已鏈接"); } @OnDisconnect public void onDisconnect(SocketIOClient client) { System.out.println("客戶端:" + client.getSessionId() + "斷開鏈接"); listClient.remove(client.getSessionId()); } @OnEvent(value = "messageevent") //value是監聽事件的名稱 public void onEvent(SocketIOClient client, AckRequest request, Object data) { //System.out.println("發來消息:" + data.toString()); //socketIoServer.getClient(client.getSessionId()).sendEvent("messageevent", "back data"+new Date().getTime()); } public static void sendBuyLogEvent() { //這裏就是向客戶端推消息了 long dateTime = new Date().getTime(); for (UUID clientId : listClient) { if (socketIoServer.getClient(clientId) == null) continue; socketIoServer.getClient(clientId).sendEvent("messageevent", dateTime, 1); } } }
4 springboot 啓動類SpringbootSocketioApplication 配置socket bean
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ApplicationContext; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import com.corundumstudio.socketio.SocketIOServer; import com.corundumstudio.socketio.annotation.SpringAnnotationScanner; import com.sys.demo.kafka.KafkaSender; @SpringBootApplication public class SpringbootSocketioApplication { public static void main(String[] args) { ConfigurableApplicationContext context =SpringApplication.run(SpringbootSocketioApplication.class, args); //SpringApplication.run(SpringBootKakfaApplication.class, args); /*KafkaSender sender = context.getBean(KafkaSender.class); for (int i = 0; i < 3; i++) { //調用消息發送類中的消息發送方法 sender.send(); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } */ } @Bean public SocketIOServer socketIOServer() { com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration(); String os = System.getProperty("os.name"); if(os.toLowerCase().startsWith("win")){ //在本地window環境測試時用localhost System.out.println("this is windows"); config.setHostname("localhost"); } else { config.setHostname("123.123.111.222"); //部署到你的遠程服務器正式發佈環境時用服務器公網ip } config.setPort(9092); /*config.setAuthorizationListener(new AuthorizationListener() {//相似過濾器 @Override public boolean isAuthorized(HandshakeData data) { //http://localhost:8081?username=test&password=test //例若是使用上面的連接進行connect,能夠使用以下代碼獲取用戶密碼信息,本文不作身份驗證 // String username = data.getSingleUrlParam("username"); // String password = data.getSingleUrlParam("password"); return true; } });*/ final SocketIOServer server = new SocketIOServer(config); return server; } @Bean public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) { return new SpringAnnotationScanner(socketServer); } }
5 監聽kafka 發送消息給客戶端
package com.sys.demo.kafka;

import java.util.Optional;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.corundumstudio.socketio.SocketIOClient;
import com.sys.demo.MessageEventHandler;

/**
 * Kafka listener that forwards every non-null record value from the "mycall_out" topic to all
 * recorded Socket.IO clients as a "messageevent" event.
 */
@Component
public class KafkaReceiver {

    /**
     * Not read directly: the members used below are static. Injecting the bean makes Spring
     * construct {@link MessageEventHandler} before this receiver, and that constructor is what
     * assigns the static server reference.
     */
    @Autowired
    private MessageEventHandler messageEventHandler;

    /**
     * Forwards one Kafka record to the connected clients. Records with a null value are ignored.
     *
     * @param record the consumed record; its value is sent as the event payload
     */
    @KafkaListener(topics = {"mycall_out"})
    public void listen(ConsumerRecord<?, ?> record) {
        Object message = record.value();
        if (message == null) {
            return;
        }
        System.err.println("to client message:" + message);

        for (UUID clientId : MessageEventHandler.listClient) {
            // Look the client up once: a second lookup could return null if the client
            // disconnected between the null check and the send.
            SocketIOClient client = MessageEventHandler.socketIoServer.getClient(clientId);
            if (client == null) {
                continue;
            }
            // send the message
            client.sendEvent("messageevent", message);
        }
    }
}