一、Redis 發佈訂閱
參考:https://www.runoob.com/redis/redis-pub-sub.htmlhtml
Redis 發佈訂閱(pub/sub)是一種消息通訊模式:發送者(pub)發送消息,訂閱者(sub)接收消息。java
Redis 客戶端能夠訂閱任意數量的頻道。git
二、案例:netty-socketio和redis實現發佈/訂閱功能
本Demo實現:netty-socketio實現訂閱(參考:https://www.cnblogs.com/xy-ouyang/p/10675904.html),redis實現推送消息。本demo保存地址:https://github.com/wenbinouyang/oy_javagithub
demo使用 springboot 2.1.3.RELEASE,項目整體結構:redis
pom.xmlspring
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.3.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.oy</groupId> <artifactId>nettysocketio007</artifactId> <version>0.0.1</version> <name>nettysocketio008</name> <description>nettysocketio008 for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <!-- spring boot start --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- netty-socketio server --> <dependency> <groupId>com.corundumstudio.socketio</groupId> <artifactId>netty-socketio</artifactId> <version>1.7.11</version> </dependency> <!-- springboot data redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <!-- exclusion lettuce,jedis --> <exclusions> <exclusion> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </exclusion> <exclusion> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> </exclusion> </exclusions> </dependency> <!-- jedis --> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <!-- jedis pool dependency --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
application.propertiesapache
logging.file=/home/wwwlogs/nettysocketio008/log.log #redis #spring.redis.host=127.0.0.1 spring.redis.host=47.244.48.230 spring.redis.port=6379 spring.redis.password=Redis0929 spring.redis.jedis.pool.maxActive=8 spring.redis.jedis.pool.max-idle=8 spring.redis.jedis.pool.min-idle=0 spring.redis.timeout=0
Nettysocketio008Application類json
package com.oy; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.core.annotation.Order; import com.corundumstudio.socketio.Configuration; import com.corundumstudio.socketio.SocketIOServer; import com.corundumstudio.socketio.annotation.SpringAnnotationScanner; @SpringBootApplication @Order(1) public class Nettysocketio008Application implements CommandLineRunner { private SocketIOServer server; public static void main(String[] args) { SpringApplication.run(Nettysocketio008Application.class, args); } @Bean public SocketIOServer socketIOServer() { Configuration config = new Configuration(); config.setHostname("localhost"); config.setPort(4001); this.server = new SocketIOServer(config); return server; } @Bean public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) { return new SpringAnnotationScanner(socketServer); } @Override public void run(String... args) throws Exception { server.start(); UtilFunctions.log.info("socket.io run success!"); // 向"channel_1" push數據 // Service.send(args); } }
RedisConfig類tomcat
package com.oy; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.Topic; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @Configuration public class RedisConfig extends CachingConfigurerSupport { public static final Logger log = LoggerFactory.getLogger(RedisConfig.class); @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisSerializer<String> redisSerializer = new StringRedisSerializer(); // RedisTemplate RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(factory); template.setKeySerializer(redisSerializer); template.setValueSerializer(redisSerializer); template.setHashKeySerializer(redisSerializer); template.setHashValueSerializer(redisSerializer); template.afterPropertiesSet(); return template; } @Bean(destroyMethod = "destroy") public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory, MessageListener redisMessageListener) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(factory); ThreadPoolTaskScheduler taskExecutor = new ThreadPoolTaskScheduler(); taskExecutor.setPoolSize(10); taskExecutor.initialize(); container.setTaskExecutor(taskExecutor); Map<MessageListener, Collection<? extends Topic>> listeners = new HashMap<>(); List<Topic> list = new ArrayList<>(); list.add(new ChannelTopic("cfd_md")); listeners.put(redisMessageListener, list); container.setMessageListeners(listeners); return container; } }
MessageEventHandler類springboot
package com.oy; import java.util.Set; import java.util.UUID; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.corundumstudio.socketio.AckRequest; import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.SocketIOServer; import com.corundumstudio.socketio.annotation.OnConnect; import com.corundumstudio.socketio.annotation.OnDisconnect; import com.corundumstudio.socketio.annotation.OnEvent; @Component public class MessageEventHandler { public static SocketIOServer socketIoServer; @Autowired public MessageEventHandler(SocketIOServer server) { MessageEventHandler.socketIoServer = server; } @OnConnect public void onConnect(SocketIOClient client) { UUID socketSessionId = client.getSessionId(); String ip = client.getRemoteAddress().toString(); UtilFunctions.log.info("client connect, socketSessionId:{}, ip:{}", socketSessionId, ip); } @OnEvent("sub") public void sub(SocketIOClient client, AckRequest request, String channel) { UUID socketSessionId = client.getSessionId(); String ip = client.getRemoteAddress().toString(); client.joinRoom(channel); UtilFunctions.log.info("client sub, channel:{}, socketSessionId:{}, ip:{}", channel, socketSessionId, ip); Set<String> rooms = client.getAllRooms(); for (String room : rooms) { UtilFunctions.log.info("after client connect, room:{}", room); } // 客戶端一訂閱,就立刻push一次 if ("channel_1".equals(channel)) { sendAllEvent(Service.getMsg()); } else if ("redis_channel".equals(channel)) { sendAllEvent(RedisSub.getMsg()); } } // @OnEvent("unsub") // public void unsub(SocketIOClient client, AckRequest request, String channel) { // UUID socketSessionId = client.getSessionId(); // String ip = client.getRemoteAddress().toString(); // client.leaveRoom(channel); // UtilFunctions.log.info("client unsub, channel:{}, socketSessionId:{}, ip:{}", channel, socketSessionId, ip); // } @OnDisconnect public void onDisconnect(SocketIOClient client) { UUID socketSessionId = client.getSessionId(); String ip = client.getRemoteAddress().toString(); UtilFunctions.log.info("client disconnect, socketSessionId:{}, ip:{}", socketSessionId, ip); Set<String> rooms = client.getAllRooms(); for (String room : rooms) { UtilFunctions.log.info("after client disconnect, room:{}", room); } } // broadcast to channel "channel_1" public static void sendAllEvent(String data) { socketIoServer.getRoomOperations("channel_1").sendEvent("channel_1", data); } // broadcast to channel "redis_channel" public static void sendAllEvent2(String data) { socketIoServer.getRoomOperations("redis_channel").sendEvent("redis_channel", data); } }
RedisSub類
package com.oy; import java.util.Date; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; @Service public class RedisSub implements MessageListener { private static String msg; private String redisStr; @Autowired private RedisTemplate<String, Object> redisTemplate; public RedisTemplate<String, Object> getRedisTemplate() { return redisTemplate; } @Override public void onMessage(Message message, byte[] pattern) { String msg = (String) redisTemplate.getValueSerializer().deserialize(message.getBody()); String channel = (String) redisTemplate.getValueSerializer().deserialize(message.getChannel()); if (null != channel && !"".equals(channel) && null != msg && !"".equals(msg)) { // 相同數據不push if(redisStr == null) { UtilFunctions.log.info("===== redisStr == null ====="); redisStr = msg; } else if (redisStr.equals(msg)) { UtilFunctions.log.info("===== redisStr is same ====="); return ; } else { UtilFunctions.log.info("===== redisStr:{} =====", redisStr); redisStr = msg; } JSONObject data = new JSONObject(); JSONObject json = JSON.parseObject(msg); data.put(json.getString("contract"), json); JSONObject jsonObj = new JSONObject(); jsonObj.put("channel", channel); jsonObj.put("timestamp", new Date().getTime()); jsonObj.put("data", data); MessageEventHandler.sendAllEvent2(jsonObj.toJSONString()); RedisSub.msg = jsonObj.toJSONString(); UtilFunctions.log.info("message from channel:{}, msg:{} ", channel, jsonObj.toJSONString()); } } public static String getMsg() { return msg; } public static void setMsg(String msg) { RedisSub.msg = msg; } }
客戶端html頁面及測試參考:https://www.cnblogs.com/xy-ouyang/p/10675904.html