最近的一個項目:咱們的系統接收到上游系統的派單任務後,會推送到指定的門店的相關設備,並進行相應的業務處理。java
在接收到派單任務以後,經過 Netty 推送到指定門店相關的設備。在咱們的系統中 Netty 實現了消息推送、長鏈接以及心跳機制。node
每一個 Netty 服務端經過 ConcurrentHashMap 保存了客戶端的 clientId 以及它鏈接的 SocketChannel。bootstrap
服務器端向客戶端發送消息時,只要獲取 clientId 對應的 SocketChannel,往 SocketChannel 裏寫入相應的 message 便可。api
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(new MessageEncoder());
p.addLast(new MessageDecoder());
p.addLast(new PushServerHandler());
}
});
ChannelFuture future = bootstrap.bind(host,port).sync();
if (future.isSuccess()) {
logger.info("server start...");
}
複製代碼
客戶端用於接收服務端的消息,隨即進行業務處理。客戶端還有心跳機制,它經過 IdleEvent 事件定時向服務端放送 Ping 消息以此來檢測 SocketChannel 是否中斷。服務器
public PushClientBootstrap(String host, int port) throws InterruptedException {
this.host = host;
this.port = port;
start(host,port);
}
private void start(String host, int port) throws InterruptedException {
bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.group(workGroup)
.remoteAddress(host, port)
.handler(new ChannelInitializer(){
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(new IdleStateHandler(20, 10, 0)); // IdleStateHandler 用於檢測心跳
p.addLast(new MessageDecoder());
p.addLast(new MessageEncoder());
p.addLast(new PushClientHandler());
}
});
doConnect(port, host);
}
/** * 創建鏈接,而且能夠實現自動重連. * @param port port. * @param host host. * @throws InterruptedException InterruptedException. */
private void doConnect(int port, String host) throws InterruptedException {
if (socketChannel != null && socketChannel.isActive()) {
return;
}
final int portConnect = port;
final String hostConnect = host;
ChannelFuture future = bootstrap.connect(host, port);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture futureListener) throws Exception {
if (futureListener.isSuccess()) {
socketChannel = (SocketChannel) futureListener.channel();
logger.info("Connect to server successfully!");
} else {
logger.info("Failed to connect to server, try connect after 10s");
futureListener.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
doConnect(portConnect, hostConnect);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 10, TimeUnit.SECONDS);
}
}
}).sync();
}
複製代碼
服務註冊本質上是爲了解耦服務提供者和服務消費者。服務註冊是一個高可用強一致性的服務發現存儲倉庫,主要用來存儲服務的api和地址對應關係。爲了高可用,服務註冊中心通常爲一個集羣,而且可以保證分佈式一致性。目前經常使用的有 ZooKeeper、Etcd 等等。負載均衡
在咱們項目中採用了 ZooKeeper 實現服務註冊。dom
public class ServiceRegistry {
private static final Logger logger = LoggerFactory.getLogger(ServiceRegistry.class);
private CountDownLatch latch = new CountDownLatch(1);
private String registryAddress;
public ServiceRegistry(String registryAddress) {
this.registryAddress = registryAddress;
}
public void register(String data) {
if (data != null) {
ZooKeeper zk = connectServer();
if (zk != null) {
createNode(zk, data);
}
}
}
/** * 鏈接 zookeeper 服務器 * @return */
private ZooKeeper connectServer() {
ZooKeeper zk = null;
try {
zk = new ZooKeeper(registryAddress, Constants.ZK_SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
});
latch.await();
} catch (IOException | InterruptedException e) {
logger.error("", e);
}
return zk;
}
/** * 建立節點 * @param zk * @param data */
private void createNode(ZooKeeper zk, String data) {
try {
byte[] bytes = data.getBytes();
String path = zk.create(Constants.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
logger.debug("create zookeeper node ({} => {})", path, data);
} catch (KeeperException | InterruptedException e) {
logger.error("", e);
}
}
}
複製代碼
有了服務註冊,在 Netty 服務端啓動以後,將 Netty 服務端的 ip 和 port 註冊到 ZooKeeper。socket
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(new MessageEncoder());
p.addLast(new MessageDecoder());
p.addLast(new PushServerHandler());
}
});
ChannelFuture future = bootstrap.bind(host,port).sync();
if (future.isSuccess()) {
logger.info("server start...");
}
if (serviceRegistry != null) {
serviceRegistry.register(host + ":" + port);
}
複製代碼
這裏咱們採用的是客戶端的服務發現,即服務發現機制由客戶端實現。分佈式
客戶端在和服務端創建鏈接以前,經過查詢註冊中心的方式來獲取服務端的地址。若是存在有多個 Netty 服務端的話,能夠作服務的負載均衡。在咱們的項目中只採用了簡單的隨機法進行負載。ide
public class ServiceDiscovery {
private static final Logger logger = LoggerFactory.getLogger(ServiceDiscovery.class);
private CountDownLatch latch = new CountDownLatch(1);
private volatile List<String> serviceAddressList = new ArrayList<>();
private String registryAddress; // 註冊中心的地址
public ServiceDiscovery(String registryAddress) {
this.registryAddress = registryAddress;
ZooKeeper zk = connectServer();
if (zk != null) {
watchNode(zk);
}
}
/** * 經過服務發現,獲取服務提供方的地址 * @return */
public String discover() {
String data = null;
int size = serviceAddressList.size();
if (size > 0) {
if (size == 1) { //只有一個服務提供方
data = serviceAddressList.get(0);
logger.info("unique service address : {}", data);
} else { //使用隨機分配法。簡單的負載均衡法
data = serviceAddressList.get(ThreadLocalRandom.current().nextInt(size));
logger.info("choose an address : {}", data);
}
}
return data;
}
/** * 鏈接 zookeeper * @return */
private ZooKeeper connectServer() {
ZooKeeper zk = null;
try {
zk = new ZooKeeper(registryAddress, Constants.ZK_SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
});
latch.await();
} catch (IOException | InterruptedException e) {
logger.error("", e);
}
return zk;
}
/** * 獲取服務地址列表 * @param zk */
private void watchNode(final ZooKeeper zk) {
try {
//獲取子節點列表
List<String> nodeList = zk.getChildren(Constants.ZK_REGISTRY_PATH, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
//發生子節點變化時再次調用此方法更新服務地址
watchNode(zk);
}
}
});
List<String> dataList = new ArrayList<>();
for (String node : nodeList) {
byte[] bytes = zk.getData(Constants.ZK_REGISTRY_PATH + "/" + node, false, null);
dataList.add(new String(bytes));
}
logger.debug("node data: {}", dataList);
this.serviceAddressList = dataList;
} catch (KeeperException | InterruptedException e) {
logger.error("", e);
}
}
}
複製代碼
Netty 客戶端啓動以後,經過服務發現獲取 Netty 服務端的 ip 和 port。
/** * 支持經過服務發現來獲取 Socket 服務端的 host、port * @param discoveryAddress * @throws InterruptedException */
public PushClientBootstrap(String discoveryAddress) throws InterruptedException {
serviceDiscovery = new ServiceDiscovery(discoveryAddress);
serverAddress = serviceDiscovery.discover();
if (serverAddress!=null) {
String[] array = serverAddress.split(":");
if (array!=null && array.length==2) {
String host = array[0];
int port = Integer.parseInt(array[1]);
start(host,port);
}
}
}
複製代碼
服務註冊和發現一直是分佈式的核心組件。本文介紹了藉助 ZooKeeper 作註冊中心,如何實現一個簡單的服務註冊和發現。其實,註冊中心的選擇有不少,例如 Etcd、Eureka 等等。選擇符合咱們業務需求的纔是最重要的。
Java與Android技術棧:每週更新推送原創技術文章,歡迎掃描下方的公衆號二維碼並關注,期待與您的共同成長和進步。