以前作過一個IM的項目,裏面涉及了基本的聊天功能,因此注意這系列的文章不是練習,不含基礎和逐步學習的部分,直接開始實戰和思想引導,基礎部分須要額外的去補充,我有精力的話能夠後續出一系列的文章。javascript
爲何第一篇是聊天室,聊天室是最容易實現的部分。也是IM結構最簡單的一部分,其次做單聊和羣聊,業務邏輯層層遞增,完全的拿下聊天室的代碼,進階單聊和羣聊就很簡單了,後續我還會推出直播間的實現。css
若是單純想實現聊天室很簡單,可是我儘可能會把流程都走全,爲了方便理解。html
主要由兩個功能類實現:初始化類+響應處理類前端
添加pom.xmljava
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.2.Final</version>
</dependency>
複製代碼
輔助接口讓服務架構更加清晰,這裏有兩個接口,一個用來處理Http請求,一個處理Webocket請求。web
MyHttpService.javaspring
/**
* 處理 http請求
*/
public interface MyHttpService {
void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request);
}
複製代碼
MyWebSocketService.javanpm
/**
* 處理 WebSocket 請求中的frame
*/
public interface MyWebSocketService {
void handleFrame(ChannelHandlerContext ctx, WebSocketFrame frame);
}
複製代碼
那麼問題來了,誰來實現這兩個類呢,誰來處理這兩種請求的分發呢。bootstrap
下面來看服務響應處理類:WebSocketServerHandler.java瀏覽器
繼承SimpleChannelInboundHandler類,實現
channelRead0()
handlerAdded()
handlerRemoved()
exceptionCaught()
等方法,第一個是必選方法,其餘方法供咱們作一些標記和後續處理。
WebSocketServerHandler.java
@Slf4j
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
private MyHttpService httpService;
private MyWebSocketService webSocketService;
public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
public WebSocketServerHandler(MyHttpService httpService, MyWebSocketService webSocketService) {
super();
this.httpService = httpService;
this.webSocketService = webSocketService;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
httpService.handleHttpRequest(ctx, (FullHttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {
webSocketService.handleFrame(ctx, (WebSocketFrame) msg);
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
channels.add(ctx.channel());
channels.writeAndFlush(new TextWebSocketFrame(ctx.channel() +"上線了"));
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
channels.remove(ctx.channel());
channels.writeAndFlush(new TextWebSocketFrame(ctx.channel() +"下線了"));
}
/**
* 發生異常時處理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
channels.remove(ctx.channel());
ctx.close();
log.info("異常信息:{}",cause.getMessage());
}
}
複製代碼
handlerAdded()
方法中channels.add(ctx.channel());
,相應的在handlerRemoved
方法中remove
。channelRead0()
方法中,實現了對請求的識別和分別處理。exceptionCaught()
方法爲發生異常時處理。@Slf4j
public class WebSocketServer implements MyHttpService, MyWebSocketService {
/**
* 握手用的 變量
*/
private static final AttributeKey<WebSocketServerHandshaker> ATTR_HAND_SHAKER = AttributeKey.newInstance("ATTR_KEY_CHANNEL_ID");
private static final int MAX_CONTENT_LENGTH = 65536;
/**
* 請求類型常量
*/
private static final String WEBSOCKET_UPGRADE = "websocket";
private static final String WEBSOCKET_CONNECTION = "Upgrade";
private static final String WEBSOCKET_URI_ROOT_PATTERN = "ws://%s:%d";
/**
* 用戶字段
*/
private String host;
private int port;
/**
* 保存 全部的鏈接
*/
private Map<ChannelId, Channel> channelMap = new HashMap<>();
private final String WEBSOCKET_URI_ROOT;
public WebSocketServer(String host, int port) {
this.host = host;
this.port = port;
// 將 ip 和端口 按照格式 賦值給 uri
WEBSOCKET_URI_ROOT = String.format(WEBSOCKET_URI_ROOT_PATTERN, host, port);
}
public void start(){
// 實例化 nio監聽事件池
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 實例化 nio工做線程池
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 啓動器
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pl = channel.pipeline();
// 保存 該channel 到map中
channelMap.put(channel.id(),channel);
log.info("new channel {}",channel);
channel.closeFuture().addListener((ChannelFutureListener) channelFuture -> {
log.info("channel close future {}",channelFuture);
//關閉後 從map中移除
channelMap.remove(channelFuture.channel().id());
});
//添加 http 編解碼
pl.addLast(new HttpServerCodec());
// 聚合器
pl.addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH));
// 支持大數據流
pl.addLast(new ChunkedWriteHandler());
// 設置 websocket 服務處理方式
pl.addLast(new WebSocketServerHandler(WebSocketServer.this, WebSocketServer.this));
}
});
/**
* 實例化完畢後,須要完成端口綁定
*/
try {
ChannelFuture channelFuture = bootstrap.bind(host,port).addListener((ChannelFutureListener) channelFuture1 -> {
if (channelFuture1.isSuccess()){
log.info("webSocket started");
}
}).sync();
channelFuture.channel().closeFuture().addListener((ChannelFutureListener) channelFuture12 ->
log.info("server channel {} closed.", channelFuture12.channel())).sync();
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("綁定端口失敗");
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
log.info("webSocket shutdown");
}
@Override
public void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
//判斷是否是 socket 請求
if (isWebSocketUpgrade(request)){
//若是是webSocket請求
log.info("請求是webSocket協議");
// 獲取子協議
String subProtocols = request.headers().get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL);
//握手工廠 設置 uri+協議+不容許擴展
WebSocketServerHandshakerFactory handshakerFactory = new WebSocketServerHandshakerFactory(WEBSOCKET_URI_ROOT,subProtocols,false);
// 從工廠中實例化一個 握手請求
WebSocketServerHandshaker handshaker = handshakerFactory.newHandshaker(request);
if (handshaker == null){
//握手失敗:不支持的協議
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
}else {
//響應請求:將 握手轉交給 channel處理
handshaker.handshake(ctx.channel(),request);
//將 channel 與 handshaker 綁定
ctx.channel().attr(ATTR_HAND_SHAKER).set(handshaker);
}
return;
}else {
// 不處理 HTTP 請求
log.info("不處理 HTTP 請求");
}
}
@Override
public void handleFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
/**
* text frame handler
*/
if (frame instanceof TextWebSocketFrame){
String text = ((TextWebSocketFrame) frame).text();
TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame(text);
log.info("receive textWebSocketFrame from channel: {} , 目前一共有{}個在線",ctx.channel(),channelMap.size());
//發給其它的 channel (羣聊功能)
for (Channel ch: channelMap.values()){
if (ch.equals(ctx.channel())){
continue;
}
//將 text frame 寫出
ch.writeAndFlush(textWebSocketFrame);
log.info("消息已發送給{}",ch);
log.info("write text: {} to channel: {}",textWebSocketFrame,ctx.channel());
}
return;
}
/**
* ping frame , 回覆 pong frame
*/
if (frame instanceof PingWebSocketFrame){
log.info("receive pingWebSocket from channel: {}",ctx.channel());
ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
return;
}
/**
* pong frame, do nothing
*/
if (frame instanceof PongWebSocketFrame){
log.info("receive pongWebSocket from channel: {}",ctx.channel());
return;
}
/**
* close frame, close
*/
if (frame instanceof CloseWebSocketFrame){
log.info("receive closeWebSocketFrame from channel: {}", ctx.channel());
//獲取到握手信息
WebSocketServerHandshaker handshaker = ctx.channel().attr(ATTR_HAND_SHAKER).get();
if (handshaker == null){
log.error("channel: {} has no handShaker", ctx.channel());
return;
}
handshaker.close(ctx.channel(),((CloseWebSocketFrame) frame).retain());
return;
}
/**
* 剩下的都是 二進制 frame ,忽略
*/
log.warn("receive binary frame , ignore to handle");
}
/**
* 判斷是不是 webSocket 請求
*/
private boolean isWebSocketUpgrade(FullHttpRequest req) {
HttpHeaders headers = req.headers();
return req.method().equals(HttpMethod.GET)
&& headers.get(HttpHeaderNames.UPGRADE).contains(WEBSOCKET_UPGRADE)
&& headers.get(HttpHeaderNames.CONNECTION).contains(WEBSOCKET_CONNECTION);
}
}
複製代碼
l.addLast(new WebSocketServerHandler(WebSocketServer.this, WebSocketServer.this));
添加本身的響應處理。WebSocketServerHandler
是第二點實現的請求處理類.private Map<ChannelId, Channel> channelMap = new HashMap<>();
來將ChannelId和CHannel對應保存。方便後來對應獲取。bootstrap.bind(host,port)
也能夠替換成僅bind端口。public ChannelFuture bind(String inetHost, int inetPort) {
return bind(new InetSocketAddress(inetHost, inetPort));
}
複製代碼
public synchronized InetAddress anyLocalAddress() {
if (anyLocalAddress == null) {
anyLocalAddress = new Inet4Address(); // {0x00,0x00,0x00,0x00}
anyLocalAddress.holder().hostName = "0.0.0.0";
}
return anyLocalAddress;
}
複製代碼
它默認會給0.0.0.0
端口開放服務。 4. handleHttpRequest
和handleFrame
是MyWebSocketService
類的一個實現。 5. 各個細節都有註釋,仔細看註釋。
public class Main {
public static void main(String[] args) {
new WebSocketServer("192.168.1.33",9999).start();
}
}
複製代碼
我用的是npm 的一個serve 服務來搞局域網。 官網介紹:www.npmjs.com/package/ser… 個人文章:React打包注意事項及靜態文件服務搭建
這下保證你的手機和電腦都在局域網內,就能夠訪問你本身的羣聊了。
要送就送一套。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
<title>Document</title>
<style type="text/css">
.talk_con{
width:600px;
height:500px;
border:1px solid #666;
margin:50px auto 0;
background:#f9f9f9;
}
.talk_show{
width:580px;
height:420px;
border:1px solid #666;
background:#fff;
margin:10px auto 0;
overflow:auto;
}
.talk_input{
width:580px;
margin:10px auto 0;
}
.whotalk{
width:80px;
height:30px;
float:left;
outline:none;
}
.talk_word{
width:420px;
height:26px;
padding:0px;
float:left;
margin-left:10px;
outline:none;
text-indent:10px;
}
.talk_sub{
width:56px;
height:30px;
float:left;
margin-left:10px;
}
.atalk{
margin:10px;
}
.atalk span{
display:inline-block;
background:#0181cc;
border-radius:10px;
color:#fff;
padding:5px 10px;
}
.btalk{
margin:10px;
text-align:right;
}
.btalk span{
display:inline-block;
background:#ef8201;
border-radius:10px;
color:#fff;
padding:5px 10px;
}
</style>
<script type="text/javascript">
//
document.onkeydown = function (ev) {
if (ev && ev.keyCode == 13){
send();
clear();
}
}
var socket;
if (window.WebSocket) {
socket = new WebSocket("ws://192.168.1.33:9999");
// socket = new WebSocket("ws://127.0.0.1:9999");
// socket = new WebSocket("ws://192.168.43.186:9999");
socket.onmessage = function (ev) {
atalkAppendIn("接收:"+socket.channel + ":" + ev.data)
};
socket.onopen = function () {
btalkAppendIn("鏈接已創建");
}
socket.onclose = function () {
btalkAppendIn("鏈接關閉");
};
}else {
alert("瀏覽器不支持");
}
function send(){
var message = document.getElementById("talkwords");
if (!window.WebSocket){
return
}
if (socket.readyState === WebSocket.OPEN){
socket.send(message.value);
btalkAppendIn("發送:"+ message.value);
clear();
} else {
alert("WebSocket 創建失敗");
}
}
function atalkAppendIn(text) {
var append = document.getElementById("words");
append.innerHTML+= '<div class="atalk"><span>'+ text +'</span></div>';
}
function btalkAppendIn(text) {
var append = document.getElementById("words");
append.innerHTML+= '<div class="btalk"><span>'+ text +'</span></div>';
}
function clear () {
var elementById = document.getElementById("talkwords");
elementById.value = "";
}
</script>
</head>
<body>
<div class="talk_con">
<div class="talk_show" id="words">
</div>
<div class="talk_input">
<!--<select class="whotalk" id="who">-->
<!--<option value="0">A說:</option>-->
<!--<option value="1">B說:</option>-->
<!--</select>-->
<input type="text" class="talk_word" id="talkwords">
<input type="button" onclick="send()" value="發送" class="talk_sub" id="talksub">
</div>
</div>
</body>
</html>
複製代碼
socket = new WebSocket("ws://192.168.1.33:9999");
注意這裏ip和port與服務一一對應。socket.onmessage()
是獲取socket信息。socket.onopen
是建立鏈接。socket.onclose
是關閉鏈接。socket.send(message.value);
是發送socket信息。控制檯輸出:
15:12:42.443 [nioEventLoopGroup-3-6] INFO com.fantj.springbootjpa.netty.WebSocketServer - receive textWebSocketFrame from channel: [id: 0x0d08c657, L:/192.168.1.33:9999 - R:/192.168.1.33:50440] , 目前一共有2個在線
15:12:42.443 [nioEventLoopGroup-3-6] INFO com.fantj.springbootjpa.netty.WebSocketServer - 消息已發送給[id: 0xacd5c1ad, L:/192.168.1.33:9999 - R:/192.168.1.33:50438]
15:12:42.444 [nioEventLoopGroup-3-5] DEBUG io.netty.handler.codec.http.websocketx.WebSocket08FrameEncoder - Encoding WebSocket Frame opCode=1 length=5
15:12:42.443 [nioEventLoopGroup-3-6] INFO com.fantj.springbootjpa.netty.WebSocketServer - write text: TextWebSocketFrame(data: UnpooledUnsafeHeapByteBuf(ridx: 0, widx: 5, cap: 15)) to channel: [id: 0x0d08c657, L:/192.168.1.33:9999 - R:/192.168.1.33:50440]
複製代碼
若是你喜歡個人文章,那麻煩請關注個人公衆號,公衆號重點分析架構師技術,該公衆號還處於初始階段,謝謝你們的支持。
java架構
獲取架構視頻資源(後期還會分享不一樣的優質資源噢)。