微服務架構方案 ZeroC IceGrid

前言

在聊ICE以前,咱們說說目前主流的幾個微服務架構方案。php

Spring Boot/Cloud

因爲 Spring 社區的影響力和 Netflix 的背書,目前能夠認爲是構建 Java 微服務的一個社區標準,Spring Boot 目前在 GitHub 上有超過 20k 星。前端

基於 Spring 的框架本質上能夠認爲是一種 RESTful 框架(不是 RPC 框架),序列化協議主要採用基於文本的 JSON,通信協議通常基於 HTTP。RESTful 框架自然支持跨語言,任何語言只要有 HTTP 客戶端均可以接入調用,可是客戶端通常須要本身解析 payload。目前 Spring 框架也支持 Swagger 契約編程模型,可以基於契約生成各類語言的強類型客戶端,極大方便不一樣語言棧的應用接入,可是由於 RESTful 框架和 Swagger 規範的弱契約特性,生成的各類語言客戶端的互操做性仍是有很多坑的。vue

Dubbo

Dubbo 是阿里多年構建生產級分佈式微服務的技術結晶,服務治理能力很是豐富,在國內技術社區具備很大影響力,目前 github 上有超過 16k 星。Dubbo 本質上是一套基於 Java 的 RPC 框架,噹噹 Dubbox 擴展了 Dubbo 支持 RESTful 接口暴露能力。java

Dubbo 主要面向 Java 技術棧,跨語言支持不足是它的一個弱項,另外由於治理能力太豐富,以致於這個框架比較重,徹底用好這個框架的門檻比較高,可是若是你的企業基本上投資在 Java 技術棧上,選 Dubbo 可讓你在服務框架一塊站在較高的起點上,不論是性能仍是企業級的服務治理能力,Dubbo 都作的很出色。新浪微博開源的 Motan(GitHub 4k stars)也不錯,功能和 Dubbo 相似,能夠認爲是一個輕量裁剪版的 Dubbo。python

gRPC

gRPC 是谷歌近年新推的一套 RPC 框架,基於 protobuf 的強契約編程模型,能自動生成各類語言客戶端,且保證互操做。支持 HTTP2 是 gRPC 的一大亮點,通信層性能比 HTTP 有很大改進。Protobuf 是在社區具備悠久歷史和良好口碑的高性能序列化協議,加上 Google 公司的背書和社區影響力,目前 gRPC 也比較火,GitHub 上有超過 13.4k 星。ios

目前看 gRPC 更適合內部服務相互調用場景,對外暴露 RESTful 接口能夠實現,可是比較麻煩(須要 gRPC Gateway 配合),因此對於對外暴露 API 場景可能還須要引入第二套 RESTful 框架做爲補充。整體上 gRPC 這個東西還比較新,社區對於 HTTP2 帶來的好處還未造成一致認同,建議謹慎投入,能夠作一些試點。git

Ice

ZeroC IceGrid做爲一種微服務架構,它基於RPC框架發展而來,具備良好的性能與分佈式能力。不過尷尬的是,在國內,彷佛使用它的案例並很少,就我所知,目前Skpye內部一些地方在使用Ice。不過這並不影響它的優勢,那就是它的性能很不錯,如下是源自網上的性能測試:es6

以下所示是它的總體示意圖:github

IceGrid具有微服務架構的以下明顯特徵:web

  • 微服務架構須要一個集中的服務註冊中心,以及某種服務發現機制。IceGrid服務註冊採用XML文件來定義,其服務註冊中心就是Ice Registry,這是一個獨立的進程,而且提供了HA高可用機制;對應的服務發現機制就是命名查詢服務,即LocatorService提供的API,能夠根據服務名查詢對應的服務實例可用地址。

  • 微服務架構中的每一個微服務一般會被部署爲一個獨立的進程,當無狀態服務時,通常會由多個獨立進程提供服務。對應在IceGrid裏,一個IceBox就是一個單獨的進程,當一個IceBox只封裝一個Servant時,就是一個典型的微服務進程了。

  • 微服務架構中一般都須要內嵌某種負載均衡機制。在 IceGrid 裏是經過客戶端API內嵌的負載均衡算法實現的,相對於採用中間件Proxy轉發流量的方式(如SpringCloud),IceGrid的作法更加高效,但增長了平臺開發的工做量與難度,由於採用各類語言的客戶端都須要實現一遍負載均衡的算法邏輯。

  • 一個好的微服務架構平臺應該簡化和方便應用部署。咱們看到 IceGrid提供了grid.xml來描述與定義一個基於微服務架構的Application,一個命令行工具一鍵部署這個Application,還提供了發佈二進制程序的輔助工具——icepatch2。下圖顯示icepatch2的工做機制,icepatch2server相似於FTP Sever,用於存放要發佈到每一個Node上的二進制代碼與配置文件,而位於每一個Node上的icepatch2client則從icepatch2server上拉取文件,這個過程當中採用了壓縮傳輸及差量傳輸等高級特性,以減小沒必要要的文件傳輸過程。客觀地評價,在Docker技術以前,icepatch2這套作法仍是很先進與完備的,也大大減小了分佈式集羣下微服務系統的運維工做量。

基於IceGrid的微服務方案

若是基於IceGrid開發系統,則一般有三種典型的技術方案,下圖展現了這三種技術方案:

  • 其中方案一是比較符合傳統Java Web項目的一種漸進改造方案,Spring Boot裏只有Controller組件而沒有數據訪問層與Service對象,這些Controller組件經過Ice RPC方式調用部署在IceGrid裏的遠程的Ice微服務,面向前端包裝爲REST服務。此方案的總體思路清晰,分工明確。Leader在開源項目中給出了這種方式的一個基本框架以供參考:github.com/MyCATApach.…

  • 方案二與方案三則比較適合前端JavaScript能力強的團隊,好比很擅長Node.js的團隊能夠考慮方案二,即用JavaScript來替代Spring Boot實現REST服務。主要作互聯網App的系統則能夠考慮方案三,瀏覽器端的JavaScript以HTML5的WebSocket技術與Ice Glacier2直接通訊,總體高效敏捷。

  • IceGrid在3.6版本以後還增長了容器化的運行方式,即Ice Node與Ice Registry能夠經過Docker容器的方式啓動,這就簡化了IceGrid在Linux上的部署。對於用Java編寫的Ice微服務架構系統,咱們還能夠藉助Java遠程類加載機制,讓每臺Node自動從某個遠程HTTP Server下載指定的Jar包並加載相關的Servant類,從而實現相似Docker Hub的機制。下圖顯示了前面提到mycat-ice開源項目時給出的具體實現方案。

Ice運做原理

如下是我畫的一個簡單運做流程圖:

Ice多客戶端調用

Java調用

如下是我以前寫的Ice調用ElasticSearch中間件的代碼,僅作參考:

package com.bigdata.cloudshield.pentration.adapter;

import com.bigdata.cloudshield.pentration.adapter.entity.EsInsertRequest;
import com.bigdata.cloudshield.pentration.adapter.entity.EsRequest;
import com.bigdata.cloudshield.pentration.adapter.entity.EsResponseException;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.ice.common.CommonServicePrx;
import com.ice.common.CommonServicePrxHelper;

/** * Description: ice工具類 <BR> * * @author ran.chunlin * @date 2018/8/20 11:00 */
class IceUtil {

    /** * 初始化參數 */
    private static String[] iniParams = new String[]{"--Ice.MessageSizeMax=2097152", "--Ice.ThreadPool.Client.Size=4", "--Ice.ThreadPool.Client.SizeMax=4"};
    /** * Communicator */
    private static Ice.Communicator ic = Ice.Util.initialize(iniParams);
    /** * 代理對象 */
    private static Ice.ObjectPrx pro = ic.stringToProxy(EsConfig.EsDaoConf.getEsDaoPath());
    /** * 服務 */
    private static CommonServicePrx proxy = CommonServicePrxHelper.checkedCast(pro);
    /** * json轉換器 */
    private static JsonParser jsonParser = new JsonParser();

    /** * 返回碼 */
    private static final String RESULTCODE = "resultCode";
    /** * 結果 */
    private static final String RESULT = "result";
    /** * cat結果 */
    private static final String CATRESULT = "catResult";
    /** * 異常 */
    private static final String EXCEPTION = "exception";
    /** * 正常狀態碼 */
    private static final int OK_CODE = 200;

    /** * 通用 * * @param request 請求實體 * * @return json結果 * * @throws EsResponseException 返回異常 */
    public static String getResponseJson(EsRequest request) throws EsResponseException {
        String json = GsonUtil.toJson(request);
        String result = proxy.request(1500, json);
        JsonObject response = jsonParser.parse(result).getAsJsonObject();
        if (response.get(RESULTCODE).getAsInt() == OK_CODE) {
            String resultStr = response.get(RESULT).toString();
            response = null;
            return resultStr;
        } else {
            handleCode(response);
            return null;
        }
    }

    /** * insert * * @param request 請求實體 * * @throws EsResponseException 返回異常 */
    public static void getResponseForInsert(EsInsertRequest request) throws EsResponseException {
        String json = GsonUtil.toJson(request);
        String result = proxy.request(1501, json);
        JsonObject response = jsonParser.parse(result).getAsJsonObject();
        handleCode(response);
    }

    /** * 處理返回碼 * * @param response 結果 * * @throws EsResponseException 返回異常 */
    private static void handleCode(JsonObject response) throws EsResponseException {
        int code = response.get(RESULTCODE).getAsInt();
        String message = response.get(EXCEPTION).getAsString();
        switch (code) {
            case 200:
                break;
            case 101:
                throw new EsResponseException("參數中缺乏必要key!\n" + message);
            case 102:
                throw new EsResponseException("將參數以字符串格式的json轉換成map的時候異常!\n" + message);
            case 103:
                throw new EsResponseException("xpack token 認證失敗!\n" + message);
            case 104:
                throw new EsResponseException("新增刪除修改dataList無數據!\n" + message);
            case 105:
                throw new EsResponseException("修改數據時未能找到數據id!\n" + message);
            default:
                if (message.contains("no such index")) {
                    throw new EsResponseException("索引不存在!\n" + message);
                } else {
                    throw new EsResponseException("捕獲到的未知異常!\n" + message);
                }
        }
        response = null;
    }

    /** * update * * @param request 請求實體 * * @throws EsResponseException 返回異常 */
    public static void getResponseForUpdate(EsInsertRequest request) throws EsResponseException {
        String json = GsonUtil.toJson(request);
        String result = proxy.request(1502, json);
        JsonObject response = jsonParser.parse(result).getAsJsonObject();
        handleCode(response);
    }

    /** * cat查詢 * * @param request 請求實體 * * @return json結果 * * @throws EsResponseException 返回異常 */
    public static String getResponseForCat(EsRequest request) throws EsResponseException {
        String json = GsonUtil.toJson(request);
        String result = proxy.request(1504, json);
        JsonObject response = jsonParser.parse(result).getAsJsonObject();
        if (response.get(RESULTCODE).getAsInt() == SUCCESS_CODE) {
            return response.get(CATRESULT).getAsString();
        } else {
            handleCode(response);
            return null;
        }
    }
}
複製代碼

Python調用

如下是我以前寫的Ice調用elasticsearch中間件的代碼,僅作參考:

# -*- coding:utf-8 -*-
import sys
import traceback
import Ice
import os
import json
from cn.localhost.Config import es_dao_address, xpackToken

# 動態加載slice文件並編譯
Ice.loadSlice(os.path.dirname(os.path.realpath(__file__)) + './Es.ice')
from com.ice import *

reload(sys)
sys.setdefaultencoding('utf8')


class EsUtil:
    __ic = Ice.initialize(['--Ice.MessageSizeMax=0'])
    __ObjectPrx = __ic.stringToProxy(es_dao_address)
    __proxy = common.CommonServicePrx.checkedCast(__ObjectPrx)
    if not __proxy:
        raise RuntimeError('init ice_proxy error')

 @staticmethod
    def __search(endpoint, query={}, source_arr=[], sort={}, size=2000):
        try:
            params = {'xpackToken': xpackToken, 'method': 'POST', 'endpoint': endpoint,
                      'params': {'query': query, '_source': source_arr, 'sort': sort, 'size': size}}.__str__().replace(
                '\'', '"').replace(': u"', ': "')
            return json.loads(EsUtil.__proxy.request(1500, params))
        except:
            print traceback.format_exc()

 @staticmethod
    def __search_scroll(endpoint, time, scroll_id):
        try:
            params = {'xpackToken': xpackToken, 'method': 'POST', 'endpoint': endpoint,
                      'params': {'scroll': time, 'scroll_id': scroll_id}}.__str__().replace(
                '\'', '"').replace(': u"', ': "')
            return json.loads(EsUtil.__proxy.request(1500, params))
        except:
            print traceback.format_exc()

 @staticmethod
    def search(index, query={}, source_arr=[], sort={}, size=2000):
        return EsUtil.__search('/' + index + '/_search', query, source_arr, sort, size)

 @staticmethod
    def search_by_scroll(index, query={}, source_arr=[], sort={}, size=2000, time='5m'):
        result_array = []

        try:
            # 1.首次讀取
            es_result = EsUtil.__search('/' + index + '/_search?scroll=' + time, query, source_arr, sort, size)[
                'result']
            if len(es_result['hits']['hits']) == 0:
                return result_array

            # 2.寫入結果集
            for hit in es_result['hits']['hits']:
                record = []
                for field in source_arr:
                    if field in hit['_source']:
                        record.append(hit['_source'][field])
                result_array.append(record)

            # 3.判斷是否還存在未讀scroll數據
            if '_scroll_id' in es_result:
                scroll_id = es_result['_scroll_id']
            else:
                return result_array

            # 4.循環scroll讀取
            while True:
                es_result = EsUtil.__search_scroll('/_search/scroll', time, scroll_id)['result']
                if len(es_result['hits']['hits']) == 0:
                    break

                # 寫入結果集
                for hit in es_result['hits']['hits']:
                    record = []
                    for field in source_arr:
                        if field in hit['_source']:
                            record.append(hit['_source'][field])
                    result_array.append(record)

                # 判斷是否還存在未讀scroll數據
                if '_scroll_id' in es_result:
                    scroll_id = es_result['_scroll_id']
                else:
                    break
        except:
            print traceback.format_exc()
        return result_array

 @staticmethod
    def destroy_ice():
        if EsUtil.__ic:
            EsUtil.__ic.destroy()
複製代碼

摘錄:

相關文章
相關標籤/搜索