springboot socketio

pom.xmljava

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sys</groupId>
    <artifactId>springboot-socketio</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>springboot-socketio</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.corundumstudio.socketio/netty-socketio -->
        <dependency>
            <groupId>com.corundumstudio.socketio</groupId>
            <artifactId>netty-socketio</artifactId>
            <version>1.7.12</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

 

2 啓動加載 socketweb

import com.corundumstudio.socketio.SocketIOServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
 
 
@Component
@Order(value=1)
public class MyCommandLineRunner implements CommandLineRunner {
    private final SocketIOServer server;
 
 
    @Autowired
    public MyCommandLineRunner(SocketIOServer server) {
        this.server = server;
    }
 
 
    @Override
    public void run(String... args) throws Exception {
        server.start();
        System.out.println("socket.io啓動成功!");
    }
}

3 創建鏈接spring

import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import com.corundumstudio.socketio.annotation.OnEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import java.util.ArrayList;
import java.util.Date;
import java.util.UUID;
 
 
@Component
public class MessageEventHandler {
    public static SocketIOServer socketIoServer;
    public static ArrayList<UUID> listClient = new ArrayList<>();
    //static final int limitSeconds = 60;
 
    @Autowired
    public MessageEventHandler(SocketIOServer server) {
        MessageEventHandler.socketIoServer = server;
    }
 
    @OnConnect
    public void onConnect(SocketIOClient client) {
        listClient.add(client.getSessionId());
        System.err.println(listClient.size());
        System.out.println("客戶端:" + client.getSessionId() + "已鏈接");
    }
 
    @OnDisconnect
    public void onDisconnect(SocketIOClient client) {
        System.out.println("客戶端:" + client.getSessionId() + "斷開鏈接");
        listClient.remove(client.getSessionId());
    }
 
 
    @OnEvent(value = "messageevent") //value是監聽事件的名稱
    public void onEvent(SocketIOClient client, AckRequest request, Object data) {
        //System.out.println("發來消息:" + data.toString());
        //socketIoServer.getClient(client.getSessionId()).sendEvent("messageevent", "back data"+new Date().getTime());
    }
 
 
    public static void sendBuyLogEvent() {   //這裏就是向客戶端推消息了
        long dateTime = new Date().getTime();
 
        for (UUID clientId : listClient) {
            if (socketIoServer.getClient(clientId) == null) continue;
            socketIoServer.getClient(clientId).sendEvent("messageevent", dateTime, 1);
        }
    }
    
}

4 springboot 啓動類SpringbootSocketioApplication  配置socket beanapache

 

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.SpringAnnotationScanner;
import com.sys.demo.kafka.KafkaSender;


@SpringBootApplication
public class SpringbootSocketioApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =SpringApplication.run(SpringbootSocketioApplication.class, args);
        //SpringApplication.run(SpringBootKakfaApplication.class, args);
        /*KafkaSender sender = context.getBean(KafkaSender.class);

        for (int i = 0; i < 3; i++) {
            //調用消息發送類中的消息發送方法
            sender.send();

            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        */
        
    }
    @Bean
    public SocketIOServer socketIOServer() {
        com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
        
        String os = System.getProperty("os.name");
        if(os.toLowerCase().startsWith("win")){   //在本地window環境測試時用localhost
            System.out.println("this is  windows");
            config.setHostname("localhost");
        } else {
            config.setHostname("123.123.111.222");   //部署到你的遠程服務器正式發佈環境時用服務器公網ip
        }
        config.setPort(9092);
 
        /*config.setAuthorizationListener(new AuthorizationListener() {//相似過濾器
            @Override
            public boolean isAuthorized(HandshakeData data) {
                //http://localhost:8081?username=test&password=test
                //例若是使用上面的連接進行connect,能夠使用以下代碼獲取用戶密碼信息,本文不作身份驗證
                // String username = data.getSingleUrlParam("username");
                // String password = data.getSingleUrlParam("password");
                return true;
            }
        });*/
 
 
        final SocketIOServer server = new SocketIOServer(config);
        return server;
    }
 
    @Bean
    public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
        return new SpringAnnotationScanner(socketServer);
    }
    
}

5 監聽kafka 發送消息給客戶端windows

package com.sys.demo.kafka;

import java.util.Optional;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.sys.demo.MessageEventHandler;

@Component
public class KafkaReceiver {
    @Autowired
    private MessageEventHandler messageEventHandler;
    @KafkaListener(topics = {"mycall_out"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.err.println("to client message:"+message);
            for (UUID clientId : messageEventHandler.listClient) {
                if (messageEventHandler.socketIoServer.getClient(clientId) == null) 
                    continue;
                //發送消息
                messageEventHandler.socketIoServer.getClient(clientId).sendEvent("messageevent", message);
            }
        }

    }
}
相關文章
相關標籤/搜索