netty-socketio(二)整合redis實現發佈訂閱

一、Redis 發佈訂閱

參考:https://www.runoob.com/redis/redis-pub-sub.htmlhtml

Redis 發佈訂閱(pub/sub)是一種消息通訊模式:發送者(pub)發送消息,訂閱者(sub)接收消息。java

Redis 客戶端能夠訂閱任意數量的頻道。git

二、案例:netty-socketio和redis實現發佈/訂閱功能

  本Demo實現:netty-socketio實現訂閱(參考:https://www.cnblogs.com/xy-ouyang/p/10675904.html),redis實現推送消息。本demo保存地址:https://github.com/wenbinouyang/oy_javagithub

  demo使用 springboot 2.1.3.RELEASE,項目整體結構:redis

  

  pom.xmlspring

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.oy</groupId>
    <artifactId>nettysocketio007</artifactId>
    <version>0.0.1</version>
    <name>nettysocketio008</name>
    <description>nettysocketio008 for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- spring boot start -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        
        <!-- netty-socketio server -->
        <dependency>
            <groupId>com.corundumstudio.socketio</groupId>
            <artifactId>netty-socketio</artifactId>
            <version>1.7.11</version>
        </dependency>
        
        <!-- springboot data redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <!-- exclusion lettuce,jedis -->
            <exclusions>
                <exclusion>
                    <groupId>redis.clients</groupId>
                    <artifactId>jedis</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>io.lettuce</groupId>
                    <artifactId>lettuce-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- jedis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>
        
        <!-- jedis pool dependency -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

 

  application.propertiesapache

logging.file=/home/wwwlogs/nettysocketio008/log.log #redis #spring.redis.host=127.0.0.1 spring.redis.host=47.244.48.230 spring.redis.port=6379 spring.redis.password=Redis0929 spring.redis.jedis.pool.maxActive=8 spring.redis.jedis.pool.max-idle=8 spring.redis.jedis.pool.min-idle=0 spring.redis.timeout=0

 

   Nettysocketio008Application類json

package com.oy; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.core.annotation.Order; import com.corundumstudio.socketio.Configuration; import com.corundumstudio.socketio.SocketIOServer; import com.corundumstudio.socketio.annotation.SpringAnnotationScanner; @SpringBootApplication @Order(1) public class Nettysocketio008Application implements CommandLineRunner { private SocketIOServer server; public static void main(String[] args) { SpringApplication.run(Nettysocketio008Application.class, args); } @Bean public SocketIOServer socketIOServer() { Configuration config = new Configuration(); config.setHostname("localhost"); config.setPort(4001); this.server = new SocketIOServer(config); return server; } @Bean public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) { return new SpringAnnotationScanner(socketServer); } @Override public void run(String... args) throws Exception { server.start(); UtilFunctions.log.info("socket.io run success!"); // 向"channel_1" push數據 // Service.send(args);
 } }

 

   RedisConfig類tomcat

package com.oy; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.Topic; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @Configuration public class RedisConfig extends CachingConfigurerSupport { public static final Logger log = LoggerFactory.getLogger(RedisConfig.class); @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisSerializer<String> redisSerializer = new StringRedisSerializer(); // RedisTemplate
        RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(factory); template.setKeySerializer(redisSerializer); template.setValueSerializer(redisSerializer); template.setHashKeySerializer(redisSerializer); template.setHashValueSerializer(redisSerializer); template.afterPropertiesSet(); return template; } @Bean(destroyMethod = "destroy") public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory, MessageListener redisMessageListener) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(factory); ThreadPoolTaskScheduler taskExecutor = new ThreadPoolTaskScheduler(); taskExecutor.setPoolSize(10); taskExecutor.initialize(); container.setTaskExecutor(taskExecutor); Map<MessageListener, Collection<? extends Topic>> listeners = new HashMap<>(); List<Topic> list = new ArrayList<>(); list.add(new ChannelTopic("cfd_md")); listeners.put(redisMessageListener, list); container.setMessageListeners(listeners); return container; } }

 

  MessageEventHandler類springboot

package com.oy; import java.util.Set; import java.util.UUID; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.corundumstudio.socketio.AckRequest; import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.SocketIOServer; import com.corundumstudio.socketio.annotation.OnConnect; import com.corundumstudio.socketio.annotation.OnDisconnect; import com.corundumstudio.socketio.annotation.OnEvent; @Component public class MessageEventHandler { public static SocketIOServer socketIoServer; @Autowired public MessageEventHandler(SocketIOServer server) { MessageEventHandler.socketIoServer = server; } @OnConnect public void onConnect(SocketIOClient client) { UUID socketSessionId = client.getSessionId(); String ip = client.getRemoteAddress().toString(); UtilFunctions.log.info("client connect, socketSessionId:{}, ip:{}", socketSessionId, ip); } @OnEvent("sub") public void sub(SocketIOClient client, AckRequest request, String channel) { UUID socketSessionId = client.getSessionId(); String ip = client.getRemoteAddress().toString(); client.joinRoom(channel); UtilFunctions.log.info("client sub, channel:{}, socketSessionId:{}, ip:{}", channel, socketSessionId, ip); Set<String> rooms = client.getAllRooms(); for (String room : rooms) { UtilFunctions.log.info("after client connect, room:{}", room); } // 客戶端一訂閱,就立刻push一次
        if ("channel_1".equals(channel)) { sendAllEvent(Service.getMsg()); } else if ("redis_channel".equals(channel)) { sendAllEvent(RedisSub.getMsg()); } } // @OnEvent("unsub") // public void unsub(SocketIOClient client, AckRequest request, String channel) { // UUID socketSessionId = client.getSessionId(); // String ip = client.getRemoteAddress().toString(); // client.leaveRoom(channel); // UtilFunctions.log.info("client unsub, channel:{}, socketSessionId:{}, ip:{}", channel, socketSessionId, ip); // }
 @OnDisconnect public void onDisconnect(SocketIOClient client) { UUID socketSessionId = client.getSessionId(); String ip = client.getRemoteAddress().toString(); UtilFunctions.log.info("client disconnect, socketSessionId:{}, ip:{}", socketSessionId, ip); Set<String> rooms = client.getAllRooms(); for (String room : rooms) { UtilFunctions.log.info("after client disconnect, room:{}", room); } } // broadcast to channel "channel_1"
    public static void sendAllEvent(String data) { socketIoServer.getRoomOperations("channel_1").sendEvent("channel_1", data); } // broadcast to channel "redis_channel"
    public static void sendAllEvent2(String data) { socketIoServer.getRoomOperations("redis_channel").sendEvent("redis_channel", data); } }

 

   RedisSub類

package com.oy; import java.util.Date; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; @Service public class RedisSub implements MessageListener { private static String msg; private String redisStr; @Autowired private RedisTemplate<String, Object> redisTemplate; public RedisTemplate<String, Object> getRedisTemplate() { return redisTemplate; } @Override public void onMessage(Message message, byte[] pattern) { String msg = (String) redisTemplate.getValueSerializer().deserialize(message.getBody()); String channel = (String) redisTemplate.getValueSerializer().deserialize(message.getChannel()); if (null != channel && !"".equals(channel) && null != msg && !"".equals(msg)) { // 相同數據不push
            if(redisStr == null) { UtilFunctions.log.info("===== redisStr == null ====="); redisStr = msg; } else if (redisStr.equals(msg)) { UtilFunctions.log.info("===== redisStr is same ====="); return ; } else { UtilFunctions.log.info("===== redisStr:{} =====", redisStr); redisStr = msg; } JSONObject data = new JSONObject(); JSONObject json = JSON.parseObject(msg); data.put(json.getString("contract"), json); JSONObject jsonObj = new JSONObject(); jsonObj.put("channel", channel); jsonObj.put("timestamp", new Date().getTime()); jsonObj.put("data", data); MessageEventHandler.sendAllEvent2(jsonObj.toJSONString()); RedisSub.msg = jsonObj.toJSONString(); UtilFunctions.log.info("message from channel:{}, msg:{} ", channel, jsonObj.toJSONString()); } } public static String getMsg() { return msg; } public static void setMsg(String msg) { RedisSub.msg = msg; } }

  

  客戶端html頁面及測試參考:https://www.cnblogs.com/xy-ouyang/p/10675904.html

相關文章
相關標籤/搜索