基於 SpringBoot,藉助 Netty 控制長連接,使用 WebSocket 協議做一個實時的聊天室。
項目統一登錄路徑: http://localhost:8080/chat/netty
用戶名隨機生成;用戶離線時調用異步方法執行數據寫操作;登錄後顯示歷史聊天消息。
項目名:InChat
項目地址:https://github.com/UncleCatMy...
項目介紹:基於 Netty4 與 SpringBoot 的聊天室:WebSocket(文字、圖片)聊天,加上通過 API 調用 Netty 長連接發送消息(在線數、用戶列表),支持 IoT 物聯網 MQTT 協議、TCP/IP 協議單片機通訊,並異步存儲聊天數據。
/**
 * Produces random one-character user names.
 *
 * <p>Each name is a single {@code char} drawn uniformly from the inclusive
 * range U+4E00..U+9FA5, which lies inside the CJK Unified Ideographs block.
 */
public class RandomNameUtil {

    /** First code unit of the range names are drawn from (inclusive). */
    private static final int RANGE_START = 0x4e00;

    /** Last code unit of the range names are drawn from (inclusive). */
    private static final int RANGE_END = 0x9fa5;

    /** Number of distinct characters in the range. */
    private static final int RANGE_SIZE = RANGE_END - RANGE_START + 1;

    /** Shared random source for all callers. */
    private static final Random GENERATOR = new Random();

    /**
     * Picks a random character to be used as a user name.
     *
     * @return a character between U+4E00 and U+9FA5, both inclusive
     */
    public static char getName() {
        int offset = GENERATOR.nextInt(RANGE_SIZE);
        return (char) (RANGE_START + offset);
    }
}
# Spring Boot settings: MySQL datasource and JPA.
spring:
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    username: root
    password: root
    url: jdbc:mysql://localhost:3306/nettychat?characterEncoding=utf-8&useSSL=false
  jpa:
    show-sql: true

# Netty server settings (properties under the "netty" prefix).
netty:
  port: 8090        # listening port
  bossThread: 2     # number of threads
  workerThread: 2   # number of threads
  keepalive: true   # keep connections alive
  backlog: 100
SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for user_msg
-- ----------------------------
DROP TABLE IF EXISTS `user_msg`;
CREATE TABLE `user_msg` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `msg` varchar(255) DEFAULT NULL,
  -- Set once on insert. It must NOT carry ON UPDATE, otherwise every row
  -- update would overwrite the creation time.
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  -- Refreshed automatically whenever the row is updated.
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4;

-- ----------------------------
-- Records of user_msg
-- ----------------------------
INSERT INTO `user_msg` VALUES ('1', '亪', '今天不開心', '2018-08-14 14:26:02', '2018-08-14 14:26:02');
INSERT INTO `user_msg` VALUES ('2', '祐', '不錯呀', '2018-08-14 15:09:40', '2018-08-14 15:09:40');
INSERT INTO `user_msg` VALUES ('3', '搈', '開心 開心', '2018-08-14 15:09:40', '2018-08-14 15:09:40');
INSERT INTO `user_msg` VALUES ('4', '兇', '能夠的,後面再作個深刻一點的', '2018-08-14 15:18:35', '2018-08-14 15:18:35');
INSERT INTO `user_msg` VALUES ('5', '倎', '開源這個項目', '2018-08-14 15:18:35', '2018-08-14 15:18:35');
INSERT INTO `user_msg` VALUES ('6', '蝡', '1-someting', '2018-08-14 15:24:28', '2018-08-14 15:24:28');
INSERT INTO `user_msg` VALUES ('7', '弔', '不行呀', '2018-08-14 15:24:29', '2018-08-14 15:24:29');
INSERT INTO `user_msg` VALUES ('8', '習', '能夠的', '2018-08-14 15:26:03', '2018-08-14 15:26:03');
INSERT INTO `user_msg` VALUES ('9', '蔫', '開源這個項目', '2018-08-14 15:26:03', '2018-08-14 15:26:03');
/**
 * JPA entity for one chat message.
 *
 * <p>Accessors, {@code equals}, {@code hashCode} and {@code toString} are
 * generated by Lombok's {@code @Data}.
 */
@Data
@Entity
@DynamicUpdate
public class UserMsg implements Serializable {

    private static final long serialVersionUID = 4133316147283239759L;

    /** Primary key, generated by the database identity column. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer id;

    /** Name of the user who sent the message. */
    private String name;

    /** Text of the message. */
    private String msg;

    /** Creation timestamp of the record. */
    private Date createTime;

    /** Last-update timestamp of the record. */
    private Date updateTime;
}
public interface UserMsgRepository extends JpaRepository<UserMsg,Integer> { //本次未使用到自定義方法,JPA原生便可 }
我沒有去配置虛擬機環境,就在本地模擬了(用內存中的集合代替 Redis 等外部存儲)。
保存用戶名稱與連接隨機ID
/**
 * In-memory stand-in for a Redis key/value store.
 *
 * <p>Entries live in a {@link ConcurrentHashMap}, so {@code null} keys or
 * values are rejected by the underlying map.
 */
@Component
public class LikeRedisTemplate {

    /** Backing store mapping an id to its associated name. */
    private final Map<Object, Object> entries = new ConcurrentHashMap<>();

    /**
     * Stores {@code name} under {@code id}, replacing any previous value.
     *
     * @param id   key of the entry
     * @param name value to associate with the key
     */
    public void save(Object id, Object name) {
        entries.put(id, name);
    }

    /**
     * Removes the entry stored under {@code id}, if any.
     *
     * @param id key of the entry to remove
     */
    public void delete(Object id) {
        entries.remove(id);
    }

    /**
     * Looks up the value stored under {@code id}.
     *
     * @param id key of the entry
     * @return the stored value, or {@code null} when there is none
     */
    public Object get(Object id) {
        return entries.get(id);
    }
}
聊天內容臨時存儲
/**
 * Temporary in-memory buffer for chat messages that have not been persisted yet.
 *
 * <p>The buffer is written from the Netty handler threads and read by the
 * flush task, so every access to the underlying set is synchronized on this
 * instance.
 *
 * <p>The buffer is a {@link Set}: entries that compare equal are kept only once.
 */
@Component
public class LikeSomeCacheTemplate {

    /** Pending messages in insertion order; guarded by {@code this}. */
    private final Set<UserMsg> pending = new LinkedHashSet<>();

    /**
     * Buffers one chat message.
     *
     * @param user sender; stored as {@code String.valueOf(user)}
     * @param msg  message content; stored as {@code String.valueOf(msg)}
     */
    public synchronized void save(Object user, Object msg) {
        UserMsg userMsg = new UserMsg();
        userMsg.setName(String.valueOf(user));
        userMsg.setMsg(String.valueOf(msg));
        pending.add(userMsg);
    }

    /**
     * Returns a snapshot of the buffered messages.
     *
     * <p>The result is an independent copy, so callers can iterate it while
     * other threads keep adding messages without risking a
     * {@code ConcurrentModificationException}.
     *
     * @return a new set holding the currently buffered messages in insertion order
     */
    public synchronized Set<UserMsg> cloneCacheMap() {
        return new LinkedHashSet<>(pending);
    }

    /** Discards every buffered message. */
    public synchronized void clearCacheMap() {
        pending.clear();
    }
}
@Component public class MsgAsyncTesk { @Autowired private LikeSomeCacheTemplate cacheTemplate; @Autowired private UserMsgRepository userMsgRepository; @Async public Future<Boolean> saveChatMsgTask() throws Exception{ // System.out.println("啓動異步任務"); Set<UserMsg> set = cacheTemplate.cloneCacheMap(); for (UserMsg item:set){ //保存用戶消息 userMsgRepository.save(item); } //清空臨時緩存 cacheTemplate.clearCacheMap(); return new AsyncResult<>(true); } }
/**
 * Netty server settings bound from configuration properties under the
 * {@code netty} prefix. Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@Component
@ConfigurationProperties(prefix = "netty")
public class NettyAccountConfig {

    /** Port the server listens on. */
    private int port;

    /** Number of threads for the boss event loop group. */
    private int bossThread;

    /** Number of threads for the worker event loop group. */
    private int workerThread;

    /** Whether keep-alive is requested for connections. */
    private boolean keepalive;

    /** Backlog value for the listening socket. */
    private int backlog;
}
@Component @Qualifier("textWebSocketFrameHandler") @ChannelHandler.Sharable public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{ public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Autowired private LikeRedisTemplate redisTemplate; @Autowired private LikeSomeCacheTemplate cacheTemplate; @Autowired private MsgAsyncTesk msgAsyncTesk; @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { Channel incoming = ctx.channel(); String uName = String.valueOf(redisTemplate.get(incoming.id())); for (Channel channel : channels) { //將當前每一個聊天內容進行存儲 System.out.println("存儲數據:"+uName+"-"+msg.text()); cacheTemplate.save(uName,msg.text()); if (channel != incoming){ channel.writeAndFlush(new TextWebSocketFrame("[" + uName + "]" + msg.text())); } else { channel.writeAndFlush(new TextWebSocketFrame("[you]" + msg.text() )); } } } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress()); String uName = String.valueOf(RandomNameUtil.getName()); //用來獲取一個隨機的用戶名,能夠用其餘方式代替 //新用戶接入 Channel incoming = ctx.channel(); for (Channel channel : channels) { channel.writeAndFlush(new TextWebSocketFrame("[新用戶] - " + uName + " 加入")); } redisTemplate.save(incoming.id(),uName); //存儲用戶 channels.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); String uName = String.valueOf(redisTemplate.get(incoming.id())); //用戶離開 for (Channel channel : channels) { channel.writeAndFlush(new TextWebSocketFrame("[用戶] - " + uName + " 離開")); } redisTemplate.delete(incoming.id()); //刪除用戶 channels.remove(ctx.channel()); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("用戶:"+redisTemplate.get(incoming.id())+"在線"); } @Override public void 
channelInactive(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("用戶:"+redisTemplate.get(incoming.id())+"掉線"); msgAsyncTesk.saveChatMsgTask(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel incoming = ctx.channel(); System.out.println("用戶:" + redisTemplate.get(incoming.id()) + "異常"); cause.printStackTrace(); ctx.close(); } }
@Component @Qualifier("somethingChannelInitializer") public class NettyWebSocketChannelInitializer extends ChannelInitializer<SocketChannel> { @Autowired private TextWebSocketFrameHandler textWebSocketFrameHandler; @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); pipeline.addLast(textWebSocketFrameHandler); //這裏不能使用new,否則在handler中不能注入依賴 } }
@Component public class NettyConfig { @Autowired private NettyAccountConfig nettyAccountConfig; @Bean(name = "bossGroup", destroyMethod = "shutdownGracefully") public NioEventLoopGroup bossGroup(){ return new NioEventLoopGroup(nettyAccountConfig.getBossThread()); } @Bean(name = "workerGroup", destroyMethod = "shutdownGracefully") public NioEventLoopGroup workerGroup(){ return new NioEventLoopGroup(nettyAccountConfig.getWorkerThread()); } @Bean(name = "tcpSocketAddress") public InetSocketAddress tcpPost(){ return new InetSocketAddress(nettyAccountConfig.getPort()); } @Bean(name = "tcpChannelOptions") public Map<ChannelOption<?>, Object> tcpChannelOptions(){ Map<ChannelOption<?>, Object> options = new HashMap<ChannelOption<?>, Object>(); options.put(ChannelOption.SO_KEEPALIVE, nettyAccountConfig.isKeepalive()); options.put(ChannelOption.SO_BACKLOG, nettyAccountConfig.getBacklog()); return options; } @Autowired @Qualifier("somethingChannelInitializer") private NettyWebSocketChannelInitializer nettyWebSocketChannelInitializer; @Bean(name = "serverBootstrap") public ServerBootstrap bootstrap(){ ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup(), workerGroup()) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.DEBUG)) .childHandler(nettyWebSocketChannelInitializer); Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions(); Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet(); for (@SuppressWarnings("rawtypes") ChannelOption option : keySet) { b.option(option, tcpChannelOptions.get(option)); } return b; } }
/**
 * Owns the lifecycle of the Netty server channel: binds it on start and
 * closes it when the Spring context is destroyed.
 */
@Data
@Component
public class TCPServer {

    @Autowired
    @Qualifier("serverBootstrap")
    private ServerBootstrap serverBootstrap;

    @Autowired
    @Qualifier("tcpSocketAddress")
    private InetSocketAddress tcpPort;

    /**
     * The bound server channel, or {@code null} before {@link #start()} has bound it.
     * Volatile because it is written by the starting thread and read at shutdown.
     */
    private volatile Channel serverChannel;

    /**
     * Binds the server and blocks until the server channel is closed.
     *
     * <p>The channel is stored right after the bind succeeds, so that
     * {@link #stop()} can close it while this method is still waiting.
     *
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public void start() throws Exception {
        serverChannel = serverBootstrap.bind(tcpPort).sync().channel();
        serverChannel.closeFuture().sync();
    }

    /**
     * Closes the server channel, and its parent channel when it has one.
     * Does nothing when the server was never started.
     */
    @PreDestroy
    public void stop() throws Exception {
        Channel channel = serverChannel;
        if (channel == null) {
            return;
        }
        channel.close();
        // May be null (e.g. for a server channel), so guard the call.
        Channel parent = channel.parent();
        if (parent != null) {
            parent.close();
        }
    }
}
@SpringBootApplication @EnableScheduling //啓動異步任務 public class NettychatApplication { public static void main(String[] args) throws Exception{ ConfigurableApplicationContext context = SpringApplication.run(NettychatApplication.class, args); //注入NettyConfig 獲取對應Bean NettyConfig nettyConfig = context.getBean(NettyConfig.class); //注入TCPServer 獲取對應Bean TCPServer tcpServer = context.getBean(TCPServer.class); //啓動websocket的服務 tcpServer.start(); } }
項目名:InChat
項目地址:https://github.com/UncleCatMy...
項目介紹:基於 Netty4 與 SpringBoot 的聊天室:WebSocket(文字、圖片)聊天,加上通過 API 調用 Netty 長連接發送消息(在線數、用戶列表),支持 IoT 物聯網 MQTT 協議、TCP/IP 協議單片機通訊,並異步存儲聊天數據。
如果本文對你有所幫助,歡迎關注我的技術公眾號。
web