實現分佈式服務註冊及簡易的netty聊天

  如今不少地方都會用到zookeeper, 用到它的地方就是爲了實現分佈式。用到的場景就是服務註冊,好比一個集羣服務器,須要知道哪些服務器在線,哪些服務器不在線。html

  ZK有一個功能,就是建立臨時節點,當機器啓動應用的時候就會鏈接到一個ZK節點,而後建立一個臨時節點,那麼經過獲取監聽該路徑,而且獲取該路徑下的節點數量就知道有哪些服務器在線了。當機器中止應用的時候,zk的臨時節點將會自動被刪除。咱們經過這個機制去實現。java

  此次主要實現是採用springboot, zkui, swagger實現。接下來來看一下主要的代碼實現:node

  在機器啓動的時候獲取本機的IP,而後將本機的IP和指定的端口號註冊到程序中:git

package com.hqs.zk.register;

import com.hqs.zk.register.config.AppConfig;
import com.hqs.zk.register.thread.ZKRegister;
import com.hqs.zk.register.util.ZKUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.net.InetAddress;

@SpringBootApplication
public class ZKApplication implements CommandLineRunner{
    @Autowired
    AppConfig appConfig;

    @Autowired
    ZKUtil zkUtil;

    public static void main(String[] args) {
        SpringApplication.run(ZKApplication.class, args);
        System.out.println("啓動應用成功");
    }

    @Override
    public void run(String... strings) throws Exception {
        //得到本機IP
        String addr = InetAddress.getLocalHost().getHostAddress();
        Thread thread = new Thread(new ZKRegister(appConfig, zkUtil, addr));
        thread.setName("register-thread");
        thread.start();

        Thread scanner = new Thread(new Scanner());
        scanner.start();
    }
}

  建立一個工具類,工具類主要實現建立父節點,建立臨時路徑,監聽事件,獲取全部註冊節點。github

    /**
     * 建立臨時目錄
     */
    public void createEphemeralNode(String path, String value) {
        zkClient.createEphemeral(path, value);
    }

    /**
     * 監聽事件
     */
    public void subscribeEvent(String path) {
        zkClient.subscribeChildChanges(path, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println("parentPath:" + parentPath + ":list:" + currentChilds.toString());
            }
        });
    }

  這塊就基本完成了,下面開始建立controller,目的是爲了獲取全部在線機器的節點。爲了方便測試和查看我使用了Swagger2, 這樣界面話的發請求工具特別好用。web

  接下來看controller的主要內容:spring

    /**
     * 獲取全部路由節點
     * @return
     */
    @ApiOperation("獲取全部路由節點")
    @RequestMapping(value = "getAllRoute",method = RequestMethod.POST)
    @ResponseBody()
    public List<String> getAllRoute(){
        List<String> allNode = zkUtil.getAllNode();
        List<String> result = new ArrayList<>();
        for (String node : allNode) {
            String key = node.split("-")[1];
            result.add(key);
        }
        return result ;
    }

  同時配置對應的Swagger2api

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

/**
 * Created by huangqingshi on 2019/1/8.
 */
@Configuration
@EnableSwagger2
public class SwaggerConfig {

    @Value("${swagger.switch}")
    private boolean swaggerSwitch;

    @Bean
    public Docket api() {
        Docket docket = new Docket(DocumentationType.SWAGGER_2);
        docket.enable(swaggerSwitch);
        docket
                .apiInfo(apiInfo())
                .select()
                .apis(RequestHandlerSelectors.basePackage("com.hqs.zk.register.controller")).paths(PathSelectors.any()).build();
        return docket;
    }

    private ApiInfo apiInfo() {
        return new ApiInfoBuilder()
                .title("Spring boot zk register")
                .description("測試")
                .contact(new Contact("黃青石","http://www.cnblogs.com/huangqingshi","68344150@qq.com"))
                .termsOfServiceUrl("http://www.cnblogs.com/huangqingshi")
                .version("1.0")
                .build();
    }
}

  好了,接下來該啓動工程了,啓動以後訪問: http://localhost:8080/swagger-ui.htmlspringboot

  

  點擊下面的zk-controller,對應controller的方法就會顯示出來,而後點try it out, execute 相應的結果就直接出來了, 經過下面的圖片,能夠發現我本機的IP已經註冊到裏邊了。  服務器

  接下來,我們使用ZKUI鏈接上zookeeper,看一下是否真的有註冊的機器(父節點用的monior),已經存在了,沒有問題:

  註冊這塊就算實現完了,我一直想實現一個簡易的聊天,參考了各類資料而後實現了一把,也算圓夢了。下面開始實現簡易netty版聊天(爲何選擇netty?由於這個工具很是棒),使用google的protobuf進行序列化和反序列化:

  首先從官網上下載protobuf工具,注意對應不一樣的操做系統,個人是WINDOWS的,直接下載一個EXE程序,你下載的哪一個版本,須要使用與該版本對應的版本號,不然會出錯誤。

  

  本身建立好對應的Request.proto和Response.proto,在裏邊指定好對應的字段和包名信息。分別執行命令便可生成對應的文件:protoc.exe ./Response.proto --java_out=./  這個是生成Response的,還須要指定一條生成Request。

  將文件夾放到工程裏邊,工程的大體接入以下:

  Server的主要實現,主要基於protoBuf固定長度的進行實現的(序列化和反序列化通常經過固定長度或者分隔符實現),這樣的話就不會形成粘包、拆包的問題。

public void bind(int port) throws Exception {
        //配置服務器端NIO線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b  = new ServerBootstrap();
            b.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                    socketChannel.pipeline().addLast(new ProtobufDecoder(RequestProto.ReqProtocol.getDefaultInstance())).
                            addLast(new ProtobufVarint32LengthFieldPrepender()).addLast(new ProtobufEncoder());
                    socketChannel.pipeline().addLast(new ProBufServerHandler());
                }
            });
            //綁定端口,同步等待
            ChannelFuture f = b.bind(port).sync();
            if (f.isSuccess()) {
                System.out.println("啓動 server 成功");
            }

        } catch (Exception  e) {
            e.printStackTrace();
        }
    }

  客戶端主要兩個方式,一個方式是客戶端向服務端發請求,一個方式是羣組發消息,我爲了快速實現,就直接發一條請求,而且將結果輸出到日誌中了。客戶端使用一個線程執行兩個不一樣的方法,而後將一個是發送給Server, 一個是發送給Group。發送給Server比較簡單就直接給Server了。

    @PostConstruct
    public void start() throws Exception{
        connection(appConfig.getNettyServer(), appConfig.getNettyPort());
        for(int i = 1; i <= 1; i++) {
            int j = i;
            Runnable runnable = () -> {
                try {
                    sendMesgToServer(j);
                    sendMesgToGroup(j);
                } catch (Exception e) {
                    e.printStackTrace();
                }

            };

            new Thread(runnable).start();

        }
    }

  發送給Group的話須要記住每次過來的惟一requestId,而且保存對應的channel,而後發送消息的時候遍歷全部requestId,而且與之對應的發送消息:

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RequestProto.ReqProtocol reqProtocol) throws Exception {
        RequestProto.ReqProtocol req = reqProtocol;
        CHANNEL_MAP.putIfAbsent(req.getRequestId(), (NioSocketChannel)channelHandlerContext.channel());
//        System.out.println("get Msg from Client:" + req.getReqMsg());
        handleReq(req);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(cause.getMessage());
        ctx.close();
    }

    public void handleReq(RequestProto.ReqProtocol req) {
        Long originRequestId = req.getRequestId();

        if(req.getType() == Constants.CommandType.SERVER) {

            NioSocketChannel nioSocketChannel = CHANNEL_MAP.get(req.getRequestId());
            sendMsg(nioSocketChannel, originRequestId, originRequestId, Constants.CommandType.SERVER, "hello client");

        } else if(req.getType() == Constants.CommandType.GROUP) {
            for(Map.Entry<Long, NioSocketChannel> entry : CHANNEL_MAP.entrySet()) {
                //過濾本身收消息

                if(entry.getKey() == originRequestId) {
                    continue;
                }
                sendMsg(entry.getValue(), originRequestId, entry.getKey(), Constants.CommandType.GROUP, req.getReqMsg());
            }
        }


    }

   輸出的結果以下,自定義兩個客戶端,一個requestId是1L,另外一個requestId是2L,而後都在啓動的時候sleep 3秒,而後發送給server。sleep5秒發送到Group裏邊去,輸出的結果就是以下這個樣子的。

1L : send message to server successful!
2L : send message to server successful!
get Msg from Server: 2:hello client
received id:2- send to id:2
received id:1- send to id:1
get Msg from Server: 1:hello client

received id:1- send to id:2
get Msg from Group: 1:hello peoole in group
received id:2- send to id:1

get Msg from Group: 2:hello peoole in group

   具體的代碼可參考:https://github.com/stonehqs/ZKRegister

   若是問題,歡迎留言討論。

相關文章
相關標籤/搜索