netty-socketio(二)整合redis實現發佈訂閱

一、Redis 發佈訂閱

參考:https://www.runoob.com/redis/redis-pub-sub.html

Redis 發佈訂閱(pub/sub)是一種消息通訊模式:發送者(pub)發送消息,訂閱者(sub)接收消息。

Redis 客戶端能夠訂閱任意數量的頻道。

二、案例:netty-socketio和redis實現發佈/訂閱功能

  本Demo實現:netty-socketio實現訂閱(參考:https://www.cnblogs.com/xy-ouyang/p/10675904.html),redis實現推送消息。本demo保存地址:https://github.com/wenbinouyang/oy_java

  demo使用 springboot 2.1.3.RELEASE,項目整體結構:

  

  pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.oy</groupId>
    <artifactId>nettysocketio007</artifactId>
    <version>0.0.1</version>
    <name>nettysocketio008</name>
    <description>nettysocketio008 for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- spring boot start -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        
        <!-- netty-socketio server -->
        <dependency>
            <groupId>com.corundumstudio.socketio</groupId>
            <artifactId>netty-socketio</artifactId>
            <version>1.7.11</version>
        </dependency>
        
        <!-- springboot data redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <!-- exclusion lettuce,jedis -->
            <exclusions>
                <exclusion>
                    <groupId>redis.clients</groupId>
                    <artifactId>jedis</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>io.lettuce</groupId>
                    <artifactId>lettuce-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- jedis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>
        
        <!-- jedis pool dependency -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        
        <!-- fastjson: releases up to and including 1.2.47 are affected by publicly known
             autoType deserialization vulnerabilities. This project parses payloads received
             from Redis with JSON.parseObject, so use the patched 1.2.83 release, which keeps
             the same JSON/JSONObject API. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

 

  application.properties

logging.file=/home/wwwlogs/nettysocketio008/log.log

#redis
#spring.redis.host=127.0.0.1
spring.redis.host=47.244.48.230
spring.redis.port=6379
spring.redis.password=Redis0929

spring.redis.jedis.pool.maxActive=8
spring.redis.jedis.pool.max-idle=8
spring.redis.jedis.pool.min-idle=0
spring.redis.timeout=0

 

   Nettysocketio008Application類

package com.oy;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.annotation.Order;
import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.SpringAnnotationScanner;

/**
 * Spring Boot entry point that also creates and starts the netty-socketio server.
 *
 * <p>The {@link SocketIOServer} is exposed as a bean so that
 * {@link SpringAnnotationScanner} can register annotated handler beans on it. The
 * server itself is started from {@link #run(String...)}.
 */
@SpringBootApplication
@Order(1)
public class Nettysocketio008Application implements CommandLineRunner {
    /** Host name the socket.io server binds to. */
    private static final String SOCKET_HOST = "localhost";
    /** TCP port the socket.io server listens on. */
    private static final int SOCKET_PORT = 4001;

    /** Server instance created by {@link #socketIOServer()} and started in {@link #run(String...)}. */
    private SocketIOServer server;

    public static void main(String[] args) {
        SpringApplication.run(Nettysocketio008Application.class, args);
    }

    /**
     * Creates the socket.io server bean.
     *
     * <p>{@code stop} is registered explicitly as the destroy method so the server is
     * stopped and its port released when the application context closes; the
     * original declaration never stopped the server it started.
     *
     * @return the configured, not yet started, server
     */
    @Bean(destroyMethod = "stop")
    public SocketIOServer socketIOServer() {
        Configuration config = new Configuration();
        config.setHostname(SOCKET_HOST);
        config.setPort(SOCKET_PORT);
        this.server = new SocketIOServer(config);
        return server;
    }

    /**
     * Registers the scanner that wires annotated handler methods to the given server.
     *
     * @param socketServer the server handlers are attached to
     * @return the annotation scanner bean
     */
    @Bean
    public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
        return new SpringAnnotationScanner(socketServer);
    }

    /**
     * Starts the socket.io server once the Spring context is ready.
     *
     * @param args command-line arguments (unused)
     */
    @Override
    public void run(String... args) throws Exception {
        server.start();
        UtilFunctions.log.info("socket.io run success!");

        // Push data to "channel_1"
        // Service.send(args);
    }
}

 

   RedisConfig類

package com.oy;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/**
 * Redis wiring: a string-serialized {@link RedisTemplate} and the listener container
 * that subscribes the application's {@link MessageListener} to a Redis channel.
 */
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
    public static final Logger log = LoggerFactory.getLogger(RedisConfig.class);

    /** Redis channel the message listener is subscribed to. */
    private static final String SUBSCRIBED_CHANNEL = "cfd_md";
    /** Number of threads in the scheduler handed to the listener container. */
    private static final int LISTENER_POOL_SIZE = 10;

    /**
     * Scheduler created for the listener container. It is kept in a field so it can
     * be shut down in {@link #shutdownListenerExecutor()}; it was previously created
     * and initialized locally and never shut down.
     */
    private ThreadPoolTaskScheduler listenerExecutor;

    /**
     * Builds a template whose keys, values, hash keys and hash values are all
     * serialized with {@link StringRedisSerializer}.
     *
     * @param factory connection factory used by the template
     * @return the initialized template
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();

        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(redisSerializer);
        template.setValueSerializer(redisSerializer);
        template.setHashKeySerializer(redisSerializer);
        template.setHashValueSerializer(redisSerializer);

        template.afterPropertiesSet();
        return template;
    }

    /**
     * Builds the listener container and subscribes {@code redisMessageListener} to
     * the {@code "cfd_md"} channel.
     *
     * @param factory              connection factory used for the subscription
     * @param redisMessageListener listener that receives the published messages
     * @return the configured container
     */
    @Bean(destroyMethod = "destroy")
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory,
            MessageListener redisMessageListener) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);

        ThreadPoolTaskScheduler taskExecutor = new ThreadPoolTaskScheduler();
        taskExecutor.setPoolSize(LISTENER_POOL_SIZE);
        taskExecutor.initialize();
        // Remember the scheduler so its threads can be released on shutdown.
        this.listenerExecutor = taskExecutor;
        container.setTaskExecutor(taskExecutor);

        Map<MessageListener, Collection<? extends Topic>> listeners = new HashMap<>();
        List<Topic> list = new ArrayList<>();
        list.add(new ChannelTopic(SUBSCRIBED_CHANNEL));
        listeners.put(redisMessageListener, list);
        container.setMessageListeners(listeners);

        return container;
    }

    /**
     * Shuts down the scheduler created for the listener container, if one was created.
     */
    @PreDestroy
    public void shutdownListenerExecutor() {
        if (listenerExecutor != null) {
            listenerExecutor.shutdown();
        }
    }

}

 

  MessageEventHandler類

package com.oy;
import java.util.Set;
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import com.corundumstudio.socketio.annotation.OnEvent;

/**
 * Socket.io event handlers: logs connects and disconnects, lets clients subscribe to a
 * room via the {@code "sub"} event, and offers static helpers to broadcast to the
 * {@code "channel_1"} and {@code "redis_channel"} rooms.
 */
@Component
public class MessageEventHandler {
    /** Room and event name used by {@link #sendAllEvent(String)}. */
    private static final String CHANNEL_1 = "channel_1";
    /** Room and event name used by {@link #sendAllEvent2(String)}. */
    private static final String REDIS_CHANNEL = "redis_channel";

    /** Server used by the static broadcast helpers; assigned in the constructor. */
    public static SocketIOServer socketIoServer;

    @Autowired
    public MessageEventHandler(SocketIOServer server) {
        MessageEventHandler.socketIoServer = server;
    }

    /** Logs the session id and remote address of a newly connected client. */
    @OnConnect
    public void onConnect(SocketIOClient client) {
        UUID socketSessionId = client.getSessionId();
        String ip = client.getRemoteAddress().toString();
        UtilFunctions.log.info("client connect, socketSessionId:{}, ip:{}", socketSessionId, ip);
    }

    /**
     * Handles a {@code "sub"} event: joins the client to the room named {@code channel}
     * and, for the two known channels, immediately pushes the most recent message.
     *
     * @param client  the subscribing client
     * @param request ack request supplied by netty-socketio (unused)
     * @param channel name of the room to join; a {@code null} value is ignored
     */
    @OnEvent("sub")
    public void sub(SocketIOClient client, AckRequest request, String channel) {
        UUID socketSessionId = client.getSessionId();
        String ip = client.getRemoteAddress().toString();
        if (channel == null) {
            // The channel comes from the client; do not try to join a null room.
            UtilFunctions.log.info("client sub ignored, channel is null, socketSessionId:{}, ip:{}",
                    socketSessionId, ip);
            return;
        }
        client.joinRoom(channel);
        UtilFunctions.log.info("client sub, channel:{}, socketSessionId:{}, ip:{}", channel, socketSessionId, ip);

        Set<String> rooms = client.getAllRooms();
        for (String room : rooms) {
            UtilFunctions.log.info("after client connect, room:{}", room);
        }

        // Push once as soon as the client subscribes. Each channel must use its own
        // broadcast helper: "redis_channel" subscribers were previously served through
        // sendAllEvent, which targets the "channel_1" room. Nothing is pushed while no
        // message has been cached yet.
        if (CHANNEL_1.equals(channel)) {
            String latest = Service.getMsg();
            if (latest != null) {
                sendAllEvent(latest);
            }
        } else if (REDIS_CHANNEL.equals(channel)) {
            String latest = RedisSub.getMsg();
            if (latest != null) {
                sendAllEvent2(latest);
            }
        }
    }

//    @OnEvent("unsub")
//    public void unsub(SocketIOClient client, AckRequest request, String channel) {
//        UUID socketSessionId = client.getSessionId();
//        String ip = client.getRemoteAddress().toString();
//        client.leaveRoom(channel);
//        UtilFunctions.log.info("client unsub, channel:{}, socketSessionId:{}, ip:{}", channel, socketSessionId, ip);
//    }

    /** Logs the disconnecting client and the rooms it is still reported to be in. */
    @OnDisconnect
    public void onDisconnect(SocketIOClient client) {
        UUID socketSessionId = client.getSessionId();
        String ip = client.getRemoteAddress().toString();
        UtilFunctions.log.info("client disconnect, socketSessionId:{}, ip:{}", socketSessionId, ip);

        Set<String> rooms = client.getAllRooms();
        for (String room : rooms) {
            UtilFunctions.log.info("after client disconnect, room:{}", room);
        }
    }

    /**
     * Sends {@code data} as a {@code "channel_1"} event to the {@code "channel_1"} room.
     *
     * @param data payload to send
     */
    public static void sendAllEvent(String data) {
        socketIoServer.getRoomOperations(CHANNEL_1).sendEvent(CHANNEL_1, data);
    }

    /**
     * Sends {@code data} as a {@code "redis_channel"} event to the {@code "redis_channel"} room.
     *
     * @param data payload to send
     */
    public static void sendAllEvent2(String data) {
        socketIoServer.getRoomOperations(REDIS_CHANNEL).sendEvent(REDIS_CHANNEL, data);
    }
}

 

   RedisSub類

package com.oy;

import java.util.Date;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

@Service
public class RedisSub implements MessageListener {
    private static String msg;
    private String redisStr;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public RedisTemplate<String, Object> getRedisTemplate() {
        return redisTemplate;
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String msg = (String) redisTemplate.getValueSerializer().deserialize(message.getBody());
        String channel = (String) redisTemplate.getValueSerializer().deserialize(message.getChannel());

        if (null != channel && !"".equals(channel) && null != msg && !"".equals(msg)) {
            
            // 相同數據不push
            if(redisStr == null) {
                UtilFunctions.log.info("===== redisStr == null =====");
                redisStr = msg;
            } else if (redisStr.equals(msg)) {
                UtilFunctions.log.info("===== redisStr is same =====");
                return ;
            } else {
                UtilFunctions.log.info("===== redisStr:{} =====", redisStr);
                redisStr = msg;
            }
            
            JSONObject data = new JSONObject();
            JSONObject json = JSON.parseObject(msg);
            data.put(json.getString("contract"), json);
            
            JSONObject jsonObj = new JSONObject();
            jsonObj.put("channel", channel);
            jsonObj.put("timestamp", new Date().getTime());
            jsonObj.put("data", data);
            
            MessageEventHandler.sendAllEvent2(jsonObj.toJSONString());
            RedisSub.msg = jsonObj.toJSONString();
            UtilFunctions.log.info("message from channel:{}, msg:{} ", channel, jsonObj.toJSONString());
        }

    }

    public static String getMsg() {
        return msg;
    }

    public static void setMsg(String msg) {
        RedisSub.msg = msg;
    }
}

  

  客戶端html頁面及測試參考:http://www.javashuo.com/article/p-ohfadqey-a.html

相關文章
相關標籤/搜索