簡單RPC框架-基於Consul的服務註冊與發現

通常咱們常見的RPC框架都包含以下三個部分:html

  • 註冊中心,用於服務端註冊遠程服務以及客戶端發現服務
  • 服務端,對外提供後臺服務,將本身的服務信息註冊到註冊中心
  • 客戶端,從註冊中心獲取遠程服務的註冊信息,而後進行遠程過程調用
    上面提到的註冊中心其實屬於服務治理,即便沒有註冊中心,RPC的功能也是完整的。以前我大多接觸的是基於zookeeper的註冊中心,這裏基於consul來實現註冊中心的基本功能。

Consul的一些特色:

  • Raft相比Paxos直接

此外很少描述,還沒研究raftjava

  • 支持數據中心,能夠用來解決單點故障之類的問題
  • 集成相比zookeeper更加簡單(代碼量少,邏輯清晰簡單)
  • 支持健康檢查,支持http以及tcp
  • 自帶UI管理功能,不須要額外第三方支持。(zookeeper須要單獨部署zkui之類的第三方工具)
  • 支持key/value存儲

啓動consul以後訪問管理頁面node

RPC集成

提取出服務註冊與服務發現兩個接口,而後使用Consul實現,這裏主要經過consul-client來實現(也能夠是consul-api),須要在pom中引入:git

<dependency>
	<groupId>com.orbitz.consul</groupId>
	<artifactId>consul-client</artifactId>
	<version>0.14.1</version>
</dependency>

服務註冊

  • RegistryService
    提供服務的註冊與刪除功能
public interface RegistryService {
    void register(RpcURL url);
    void unregister(RpcURL url);
}
  • AbstractConsulService
    consul的基類,用於構建Consl對象,服務於服務端以及客戶端。
public class AbstractConsulService {
    private static final Logger logger = LoggerFactory.getLogger(AbstractConsulService.class);

    protected final static String CONSUL_NAME="consul_node_jim";
    protected final static String CONSUL_ID="consul_node_id";
    protected final static String CONSUL_TAGS="v3";
    protected final static String CONSUL_HEALTH_INTERVAL="1s";

    protected Consul buildConsul(String registryHost, int registryPort){
        return Consul.builder().withHostAndPort(HostAndPort.fromString(registryHost+":"+registryPort)).build();
    }
}
  • ConsulRegistryService
    服務註冊實現類,在註冊服務的同時,指定了健康檢查。

服務的刪除暫時未實現github

public class ConsulRegistryService extends AbstractConsulService implements RegistryService {

    private final static int CONSUL_CONNECT_PERIOD=1*1000;

    @Override
    public void register(RpcURL url) {
        Consul consul = this.buildConsul(url.getRegistryHost(),url.getRegistryPort());
        AgentClient agent = consul.agentClient();

        ImmutableRegCheck check = ImmutableRegCheck.builder().tcp(url.getHost()+":"+url.getPort()).interval(CONSUL_HEALTH_INTERVAL).build();
        ImmutableRegistration.Builder builder = ImmutableRegistration.builder();
        builder.id(CONSUL_ID).name(CONSUL_NAME).addTags(CONSUL_TAGS).address(url.getHost()).port(url.getPort()).addChecks(check);

        agent.register(builder.build());

    }

    @Override
    public void unregister(RpcURL url) {

    }

}

因爲我實現的RPC是基於TCP的,因此服務註冊的健康檢查也指定爲TCP,consul會按指定的IP以及端口創建鏈接以此判斷服務的健康狀態。若是是http,則須要調用http方法,同時指定健康檢查地址。web

ImmutableRegCheck check = ImmutableRegCheck.builder().tcp(url.getHost()+":"+url.getPort()).interval(CONSUL_HEALTH_INTERVAL).build();

後臺的監控信息以下:canvas

雖然只是指定了TCP,可能出於某種機制後臺依然會發起HTTP的健康檢查請求,上圖第一條請求日誌。api

服務發現

  • DiscoveryService
    獲取全部註冊的有效的服務信息。
public interface DiscoveryService {

    List<RpcURL> getUrls(String registryHost, int registryPort);
}
  • ConsulDiscoveryService
    首先是獲取有效的服務列表:
List<RpcURL> urls= Lists.newArrayList();
Consul consul = this.buildConsul(registryHost,registryPort);
HealthClient client = consul.healthClient();
String name = CONSUL_NAME;
ConsulResponse object= client.getAllServiceInstances(name);
List<ImmutableServiceHealth> serviceHealths=(List<ImmutableServiceHealth>)object.getResponse();
for(ImmutableServiceHealth serviceHealth:serviceHealths){
    RpcURL url=new RpcURL();
    url.setHost(serviceHealth.getService().getAddress());
    url.setPort(serviceHealth.getService().getPort());
    urls.add(url);
}

服務更新監聽,當可用服務列表發現變化時須要通知調用端。緩存

try {
    ServiceHealthCache serviceHealthCache = ServiceHealthCache.newCache(client, name);
    serviceHealthCache.addListener(new ConsulCache.Listener<ServiceHealthKey, ServiceHealth>() {
        @Override
        public void notify(Map<ServiceHealthKey, ServiceHealth> map) {
            logger.info("serviceHealthCache.addListener notify");
            RpcClientInvokerCache.clear();

        }
    });
    serviceHealthCache.start();
} catch (Exception e) {
    logger.info("serviceHealthCache.start error:",e);
}

因爲以前對客戶端的Invoker有緩存,因此當服務列表有變化時須要對緩存信息進行更新。ruby

這裏簡單的直接對緩存作清除處理,其實好一點的方法應該只對有變化的作處理。

  • RpcClientInvokerCache
    對客戶端實例化後的Invoker的緩存類
public class RpcClientInvokerCache {

    private static CopyOnWriteArrayList<RpcClientInvoker> connectedHandlers = new CopyOnWriteArrayList<>();

    public static CopyOnWriteArrayList<RpcClientInvoker> getConnectedHandlersClone(){
        return (CopyOnWriteArrayList<RpcClientInvoker>) RpcClientInvokerCache.getConnectedHandlers().clone();
    }

    public static void addHandler(RpcClientInvoker handler) {
        CopyOnWriteArrayList<RpcClientInvoker> newHandlers = getConnectedHandlersClone();
        newHandlers.add(handler);
        connectedHandlers=newHandlers;
    }

    public static CopyOnWriteArrayList<RpcClientInvoker> getConnectedHandlers(){
        return connectedHandlers;
    }

    public static RpcClientInvoker get(int i){
        return connectedHandlers.get(i);
    }

    public static int size(){
        return connectedHandlers.size();
    }

    public static void clear(){
        CopyOnWriteArrayList<RpcClientInvoker> newHandlers = getConnectedHandlersClone();
        newHandlers.clear();
        connectedHandlers=newHandlers;
    }
}
  • 負載均衡
    當同一個接口有多個服務同時提供服務時,客戶端須要有必定的負載均衡機制去決策將客戶端的請求分配給哪一臺服務器,這裏實現一個簡易的輪詢實現方式。請求次數累加,累加的值與服務列表的大小作取模操做。

代碼中取服務列表的方法有小問題,未按接口信息取,後續再完成

public class RoundRobinLoadbalanceService implements LoadbalanceService {

    private AtomicInteger roundRobin = new AtomicInteger(0);
    private static final int MAX_VALUE=1000;
    private static final int MIN_VALUE=1;

    private AtomicInteger getRoundRobinValue(){
        if(this.roundRobin.getAndAdd(1)>MAX_VALUE){
            this.roundRobin.set(MIN_VALUE);
        }
        return this.roundRobin;
    }

    @Override
    public int index(int size) {
        return  (this.getRoundRobinValue().get() + size) % size;
    }
}

待完善的功能

  • 代碼中取服務列表的方法有小問題,未按接口信息取
  • 註冊中心的可用服務地址信息變化時,須要優化爲按需更新
  • 註冊中心的服務刪除未實現

源碼地址

https://github.com/jiangmin168168/jim-framework

相關文章
相關標籤/搜索