衆所周知,推送和 IM 在 Android 應用中很常見,但真正本身去實現的比較少,咱們大多會去選擇第三方提供的成熟方案,如極光推送、雲信等,由於移動網絡具備不肯定性,所以本身實現一套穩定的方案會耗費不少精力,這對於小公司來講是得不償失的。git
推送和 IM 咱們平時用的不少,但真正瞭解原理的很少,真正動手實現過的很少。推送和 IM 本質上都是長鏈接,無非是業務方向不一樣,所以咱們如下統稱爲長鏈接。今天咱們一塊兒來揭開長鏈接的神祕面紗。github
雖然不少人都對 netty 比較熟悉了,可是可能仍是有不瞭解的同窗,所以咱們先簡單介紹下 netty。編程
Netty是由 JBOSS 開發的一個 Java 開源框架api
Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.promise
Netty是一個異步事件驅動的網絡應用程序框架,用於快速開發可維護的高性能協議服務器和客戶端。緩存
這段簡介摘自 netty 官網,是對 netty 的高度歸納。已經幫大家翻譯好了 ^ _ ^bash
Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.
'Quick and easy' doesn't mean that a resulting application will suffer from a maintainability or a performance issue. Netty has been designed carefully with the experiences earned from the implementation of a lot of protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded to find a way to achieve ease of development, performance, stability, and flexibility without a compromise.服務器Netty是一個NIO客戶端服務器框架,能夠快速簡單地開發協議服務器和客戶端等網絡應用程序。 它極大地簡化和簡化了TCP和UDP套接字服務器等網絡編程。
「快速而簡單」並不意味着由此產生的應用程序將受到可維護性或性能問題的困擾。 Netty的設計經驗很是豐富,包括FTP,SMTP,HTTP以及各類基於二進制和文本的傳統協議。 所以,Netty已經成功地找到了一個方法來實現輕鬆的開發,性能,穩定性和靈活性,而不用妥協。網絡
一複製就停不下來了 =。= 主要是以爲官網介紹的很準確。併發
這裏提到了 事件驅動
,可能你們以爲有點陌生,事件驅動其實很簡單,好比你點了下鼠標,軟件執行相應的操做,這就是一個事件驅動模型,再舉一個例子,Android 中的 Message Looper Handler 也是事件驅動,經過 Handler 發送一個消息,這個消息就至關於一個事件,Looper 取出事件,再由 Handler 處理。
這些特性就使得 netty 很適合用於高併發的長鏈接。
今天,咱們就一塊兒使用 netty 實現一個 Android IM,包括客戶端和服務端。
做爲一個 IM 應用,咱們須要識別用戶,客戶端創建長鏈接後須要彙報本身的信息,服務器驗證經過後將其緩存起來,代表該用戶在線。
客戶端是一個一個的個體,服務器做爲中轉,好比,A 給 B 發送消息,A 先把消息發送到服務器,並告訴服務器這條消息要發給誰,而後服務器把消息發送給 B。
服務器在收到消息後能夠對消息進行存儲,若是 B 不在線,就等 B 上線後再將消息發送過去。
新建一個項目
添加 netty 依賴
implementation 'io.netty:netty-all:4.1.9.Final'
複製代碼
netty 已經出了 5.x 的測試版,爲了穩定,咱們使用最新穩定版。
// 修改成本身的主機和端口
private static final String HOST = "10.240.78.82";
private static final int PORT = 8300;
private SocketChannel socketChannel;
NioEventLoopGroup group = new NioEventLoopGroup();
new Bootstrap()
.channel(NioSocketChannel.class)
.group(group)
.option(ChannelOption.TCP_NODELAY, true) // 不延遲,直接發送
.option(ChannelOption.SO_KEEPALIVE, true) // 保持長鏈接狀態
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new IdleStateHandler(0, 30, 0));
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new ChannelHandle());
}
})
.connect(new InetSocketAddress(HOST, PORT))
.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
// 鏈接成功
socketChannel = (SocketChannel) future.channel();
} else {
Log.e(TAG, "connect failed");
// 這裏必定要關閉,否則一直重試會引起OOM
future.channel().close();
group.shutdownGracefully();
}
});
複製代碼
LoginInfo loginInfo = new LoginInfo();
loginInfo.setAccount(account);
loginInfo.setToken(token);
CMessage loginMsg = new CMessage();
loginMsg.setFrom(account);
loginMsg.setType(MsgType.LOGIN);
loginMsg.setContent(loginInfo.toJson());
socketChannel.writeAndFlush(loginMsg.toJson())
.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
// 發送成功,等待服務器響應
} else {
// 發送成功
close(); // 關閉鏈接,節約資源
}
});
複製代碼
private class ChannelHandle extends SimpleChannelInboundHandler<String> {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
// 鏈接失效
PushService.this.close();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.WRITER_IDLE) {
// 空閒了,發個心跳吧
CMessage message = new CMessage();
message.setFrom(myInfo.getAccount());
message.setType(MsgType.PING);
ctx.writeAndFlush(message.toJson());
}
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
Gson gson = new Gson();
CMessage message = gson.fromJson(msg, CMessage.class);
if (message.getType() == MsgType.LOGIN) {
// 服務器返回登陸結果
} else if (message.getType() == MsgType.PING) {
Log.d(TAG, "receive ping from server");
// 收到服務器迴應的心跳
} else if (message.getType() == MsgType.TEXT) {
Log.d(TAG, "receive text message " + message.getContent());
// 收到消息
}
ReferenceCountUtil.release(msg);
}
}
複製代碼
這些代碼要長期在後臺執行,所以咱們放在 Service 中。
新建一個 Android Library 模塊做爲服務端,添加一樣的依賴
new ServerBootstrap()
.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.TCP_NODELAY, true) // 不延遲,直接發送
.childOption(ChannelOption.SO_KEEPALIVE, true) // 保持長鏈接狀態
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new NettyServerHandler());
}
})
.bind(port)
.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
System.out.println("netty server start");
} else {
System.out.println("netty server start failed");
}
});
複製代碼
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {
@Override
public void channelInactive(ChannelHandlerContext ctx) {
// Channel失效,從Map中移除
NettyChannelMap.remove(ctx.channel());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
Gson gson = new Gson();
CMessage message = gson.fromJson(msg, CMessage.class);
if (message.getType() == MsgType.PING) {
System.out.println("received ping from " + message.getFrom());
// 收到 Ping,迴應一下
Channel channel = NettyChannelMap.get(message.getFrom());
if (channel != null) {
channel.writeAndFlush(message.toJson());
}
} else if (message.getType() == MsgType.LOGIN) {
// 用戶登陸
LoginInfo loginInfo = gson.fromJson(message.getContent(), LoginInfo.class);
if (UserManager.get().verify(loginInfo)) {
loginInfo.setCode(200);
loginInfo.setMsg("success");
message.setContent(loginInfo.toJson());
ctx.channel().writeAndFlush(message.toJson());
NettyChannelMap.add(loginInfo.getAccount(), ctx.channel());
System.out.println(loginInfo.getAccount() + " login");
} else {
loginInfo.setCode(400);
loginInfo.setMsg("用戶名或密碼錯誤");
message.setContent(loginInfo.toJson());
ctx.channel().writeAndFlush(message.toJson());
}
} else if (message.getType() == MsgType.TEXT) {
// 發送消息
Channel channel = NettyChannelMap.get(message.getTo());
if (channel != null) {
channel.isWritable();
channel.writeAndFlush(message.toJson()).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
System.out.println("send msg to " + message.getTo() + " failed");
}
});
}
}
ReferenceCountUtil.release(msg);
}
}
複製代碼
已登陸的用戶緩存在 NettyChannelMap 中。
這裏能夠加入離線消息緩存邏輯,若是消息發送失敗,須要緩存起來,等待用戶上線後再發送。
若是服務端在本機運行,須要和客戶端在同一個局域網,若是是在公網運行則不須要。
只看上面的代碼可能仍是有點懵逼,建議你們跑一下源碼,會對 netty 有一個更清晰的認識。 github.com/wangchenyan…
今天咱們一塊兒認識了 netty,並使用 netty 實現了一個簡單的 IM 應用。這裏咱們僅僅實現了 IM 核心功能,其餘好比保活機制、斷線重連不在本文討論範圍以內。
咱們今天實現的長鏈接和第三方長鏈接服務商提供的長鏈接服務其實並沒有太大差別,無非是後者具備成熟的保活、短線重連機制。
讀完本文,是否以爲長鏈接其實也沒那麼神祕?
可是不要驕傲,咱們今天學習的只是最簡單的用法,這只是皮毛,要想徹底瞭解其中的原理仍是要花費不少功夫的。
遷移自個人簡書 2017.12.27