螞蟻金服SOFA-Boot整合SOFA-RPC(中篇)

前言

上篇文章簡單地介紹了 SOFA-Boot 的功能特性,對 Readiness 健康檢查的配置舉例說明。重點介紹瞭如何在 SOFA-Boot 中引入 SOFA-RPC 中間件,給出了基於 bolt、rest 和 dubbo 等不一樣協議通道的服務發佈與消費的全流程。java

本文將進一步介紹 SOFA-RPC 中間件提供的豐富而強大的功能,包括單向調用、同步調用、Future調用、回調,泛化調用,過濾器配置等。spring

其餘文章

正文

1. 調用方式

SOFA-RPC 提供單向調用、同步調用、異步調用和回調四種調用機制。爲了區分四者的不一樣之處,這裏給出 SOFA 官方提供的原理圖。緩存

下面給出詳細闡述和配置說明:bash

1.1. 單向方式

當前線程發起調用後,不關心調用結果,不作超時控制,只要請求已經發出,就完成本次調用。目前支持 bolt 協議。網絡

配置說明

使用單向方式須要在服務引用的時候經過 sofa:global-attrs 元素的 type 屬性聲明調用方式爲 oneway ,這樣使用該服務引用發起調用時就是使用的單向方式了。多線程

<sofa:reference id="helloOneWayServiceReference" interface="com.ostenant.sofa.rpc.example.invoke.HelloOneWayService">
    <sofa:binding.bolt>
        <sofa:global-attrs type="oneway"/>
    </sofa:binding.bolt>
</sofa:reference>
複製代碼

適用場景

單向調用不保證成功,並且發起方沒法知道調用結果。所以一般用於能夠重試,或者定時通知類的場景,調用過程是有可能由於網絡問題,機器故障等緣由,致使請求失敗。業務場景須要能接受這樣的異常場景,纔可使用。

1.2. 同步方式

當前線程發起調用後,須要在指定的超時時間內,等到響應結果,才能完成本次調用。若是超時時間內沒有獲得結果,那麼會拋出超時異常。

配置說明

服務接口與實現類

SOFA-RPC 缺省採用的就是同步調用,能夠省略 sofa:global-attrs 配置項。

服務端發佈配置

<bean id="helloSyncServiceImpl" class="com.ostenant.sofa.rpc.example.invoke.HelloSyncServiceImpl"/>
<sofa:service ref="helloSyncServiceImpl" interface="com.ostenant.sofa.rpc.example.invoke.HelloSyncService">
    <sofa:binding.bolt/>
</sofa:service>
複製代碼

客戶端引用配置

<sofa:reference id="helloSyncServiceReference" interface="com.ostenant.sofa.rpc.example.invoke.HelloSyncService">
    <sofa:binding.bolt/>
</sofa:reference>
複製代碼

服務端啓動入口

SpringApplication springApplication = new SpringApplication(SyncServerApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
複製代碼

客戶端啓動入口

SpringApplication springApplication = new SpringApplication(SyncClientApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
複製代碼

客戶端調用

HelloSyncService helloSyncServiceReference = (HelloSyncService) applicationContext.getBean("helloSyncServiceReference");
System.out.println(helloSyncServiceReference.saySync("sync"));
複製代碼

適用場景

同步調用是最經常使用的方式。注意要根據對端的處理能力,合理設置超時時間。

1.3. Future方式

Future 方式下,客戶端發起調用後不會等待服務端的結果,繼續執行後面的業務邏輯。服務端返回的結果會被 SOFA-RPC 緩存,當客戶端須要結果的時候,須要主動獲取。目前支持 bolt 協議。

配置說明

服務接口和實現類

HelloFutureService.java

public interface HelloFutureService {
    String sayFuture(String future);
}
複製代碼

HelloFutureServiceImpl.java

public class HelloFutureServiceImpl implements HelloFutureService {
    @Override
    public String sayFuture(String future) {
        return future;
    }
}
複製代碼

服務端發佈配置

<bean id="helloFutureServiceImpl" class="com.ostenant.sofa.rpc.example.invoke.HelloFutureServiceImpl"/>
<sofa:service ref="helloFutureServiceImpl" interface="com.ostenant.sofa.rpc.example.invoke.HelloFutureService">
    <sofa:binding.bolt/>
</sofa:service>
複製代碼

客戶端引用配置

使用 Future 方式須要在服務引用的時候經過 sofa:global-attrs 元素的 type 屬性聲明調用方式爲 future

<sofa:reference id="helloFutureServiceReference" interface="com.ostenant.sofa.rpc.example.invoke.HelloFutureService">
    <sofa:binding.bolt>
        <sofa:global-attrs type="future"/>
    </sofa:binding.bolt>
</sofa:reference>
複製代碼

這樣使用該服務引用發起調用時就是使用的 Future 方式了。

服務端啓動入口

SpringApplication springApplication = new SpringApplication(FutureServerApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
複製代碼

客戶端啓動入口

SpringApplication springApplication = new SpringApplication(FutureClientApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
複製代碼

客戶端獲取返回結果有兩種方式:

  • 其一,經過 SofaResponseFuture 直接獲取結果。第一個參數是獲取結果的超時時間,第二個參數表示是否清除線程上下文中的結果。
HelloFutureService helloFutureServiceReference = (HelloFutureService) applicationContext
    .getBean("helloFutureServiceReference");
helloFutureServiceReference.sayFuture("future");

try {
    String result = (String)SofaResponseFuture.getResponse(1000, true);
    System.out.println("Future result: " + result)
} catch (InterruptedException e) {
    e.printStackTrace();
}
複製代碼
  • 其二,獲取原生 Future。該種方式會獲取 JDK 原生的 Future ,參數表示是否清除線程上下文中的結果。獲取結果的方式就是 JDK Future 的獲取方式。
HelloFutureService helloFutureServiceReference = (HelloFutureService) applicationContext
    .getBean("helloFutureServiceReference");
helloFutureServiceReference.sayFuture("future");

try {
    Future future = SofaResponseFuture.getFuture(true);
    String result = (String)future.get(1000, TimeUnit.MILLISECONDS);
    System.out.println("Future result: " + result)
} catch (InterruptedException e) {
    e.printStackTrace();
}
複製代碼

適用場景

Future 方式適用於非阻塞編程模式。對於客戶端程序處理後,不須要當即獲取返回結果,能夠先完成後續程序代碼執行,在後續業務中,主動從當前線程上下文獲取調用返回結果。減小了網絡 IO 等待形成的代碼運行阻塞和延遲。

1.4. 回調方式

當前線程發起調用,則本次調用立刻結束,能夠立刻執行下一次調用。發起調用時須要註冊一個回調,該回調須要分配一個異步線程池。待響應返回後,會在回調的異步線程池,來執行回調邏輯。

配置說明

服務接口和實現類

HelloCallbackService.java

public interface HelloCallbackService {
    String sayCallback(String callback);
}
複製代碼

HelloCallbackServiceImpl.java

public class HelloCallbackServiceImpl implements HelloCallbackService {
    @Override
    public String sayCallback(String string) {
        return string;
    }
}
複製代碼

業務回調類

客戶端回調類須要實現 com.alipay.sofa.rpc.core.invoke.SofaResponseCallback 接口。

CallbackImpl.java

public class CallbackImpl implements SofaResponseCallback {
    @Override
    public void onAppResponse(Object appResponse, String methodName, RequestBase request) {
        System.out.println("callback client process:" + appResponse);
    }

    @Override
    public void onAppException(Throwable throwable, String methodName, RequestBase request) {
    }

    @Override
    public void onSofaException(SofaRpcException sofaException, String methodName, RequestBase request) {
    }
}
複製代碼

SofaResponseCallback 接口提供了 3 個方法:

  • onAppResponse: 程序正常運行,則進入該回調方法。
  • onAppException: 服務端程序拋出異常,則進入該回調方法。
  • onSofaException: 框架內部出現錯誤,則進入該回調方法。

服務端發佈配置

<bean id="helloCallbackServiceImpl" class="helloFutureServiceReference" interface="com.ostenant.sofa.rpc.example.invoke.HelloCallbackServiceImpl"/>
<sofa:service ref="helloCallbackServiceImpl" interface="helloFutureServiceReference" interface="com.ostenant.sofa.rpc.example.invoke.HelloCallbackService">
    <sofa:binding.bolt/>
</sofa:service>
複製代碼

客戶端引用配置

在服務引用的時候經過 sofa:global-attrs 元素的 type 屬性聲明調用方式爲 callback ,再經過 callback-ref 聲明回調的實現類。

<bean id="callbackImpl" class="com.ostenant.sofa.rpc.example.invoke.CallbackImpl"/>
<sofa:reference id="helloCallbackServiceReference" interface="com.ostenant.sofa.rpc.example.invoke.HelloCallbackService">
    <sofa:binding.bolt>
        <sofa:global-attrs type="callback" callback-ref="callbackImpl"/>
    </sofa:binding.bolt>
</sofa:reference>
複製代碼

這樣使用該服務引用發起調用時,就是使用的回調方式了。在結果返回時,由 SOFA-RPC 自動調用該回調類的相應方法。

服務端啓動入口

SpringApplication springApplication = new SpringApplication(CallbackServerApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
複製代碼

客戶端啓動入口

SpringApplication springApplication = new SpringApplication(CallbackClientApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
複製代碼

客戶端發起調用

HelloCallbackService helloCallbackServiceReference = (HelloCallbackService) applicationContext
            .getBean("helloCallbackServiceReference");
helloCallbackServiceReference.sayCallback("callback");

try {
    Thread.sleep(3000);
} catch (InterruptedException e) {
    e.printStackTrace();
}
複製代碼

sayCallback() 的返回值不該該直接獲取。在客戶端註冊的回調類中,返回值會以參數的形式傳入正確的方法,以回調的形式完成後續邏輯處理。

適用場景

Callback 方式適用於異步非阻塞編程模式。客戶端程序所在線程發起調用後,繼續執行後續操做,不須要主動去獲取返回值。服務端程序處理完成,將返回值傳回一個異步線程池,由子線程經過回調函數進行返回值處理。很大狀況的減小了網絡 IO 阻塞,解決了單線程的瓶頸,實現了異步編程。

2. 泛化調用

泛化調用方式可以在客戶端不依賴服務端的接口狀況下發起調用,目前支持 bolt 協議。因爲不知道服務端的接口,所以須要經過字符串的方式將服務端的接口,調用的方法,參數及結果類進行描述。

配置說明

泛化參數類

SampleGenericParamModel.java

public class SampleGenericParamModel {
    private String name;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
}
複製代碼

泛化返回類

SampleGenericResultModel.java

public class SampleGenericResultModel {
    private String name;
    private String value;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getValue() {
        return value;
    }
    public void setValue(String value) {
        this.value = value;
    }
}
複製代碼

服務接口和實現類

SampleGenericService.java

public interface SampleGenericService {
    SampleGenericResultModel sayGeneric(SampleGenericParamModel sampleGenericParamModel);
}
複製代碼
  • SampleGenericParamModel:做爲 sayGeneric() 的輸入參數類型,有一個 name 成員變量,做爲真正的方法入參。

  • SampleGenericResultModel:做爲 sayGeneric() 的返回結果類型,聲明瞭 namevalue 兩個成員變量,做爲真實的返回值。

SampleGenericServiceImpl.java

public class SampleGenericServiceImpl implements SampleGenericService {
    @Override
    public SampleGenericResultModel sayGeneric(SampleGenericParamModel sampleGenericParamModel) {
        String name = sampleGenericParamModel.getName();
        SampleGenericResultModel resultModel = new SampleGenericResultModel();
        resultModel.setName(name);
        resultModel.setValue("sample generic value");
        return resultModel;
    }
}
複製代碼

服務端發佈配置

<bean id="sampleGenericServiceImpl" class="com.ostenant.sofa.rpc.example.generic.SampleGenericServiceImpl"/>
<sofa:service ref="sampleGenericServiceImpl" interface="com.ostenant.sofa.rpc.example.generic.SampleGenericService">
    <sofa:binding.bolt/>
</sofa:service>
複製代碼

客戶端引用配置

<sofa:reference id="sampleGenericServiceReference" interface="com.alipay.sofa.rpc.api.GenericService">
    <sofa:binding.bolt>
        <sofa:global-attrs generic-interface="com.ostenant.sofa.rpc.example.generic.SampleGenericService"/>
    </sofa:binding.bolt>
</sofa:reference>
複製代碼

在泛化調用過程當中,客戶端配置有兩點須要注意:

  • sofa:reference 指向的服務接口須要聲明爲 SOFA-RPC 提供的泛化接口 com.alipay.sofa.rpc.api.GenericService
  • sofa:global-attrs 須要聲明屬性 generic-interface,value 爲真實的服務接口名稱。

服務端啓動入口

SpringApplication springApplication = new SpringApplication(SampleGenericServerApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
複製代碼

客戶端啓動入口

SpringApplication springApplication = new SpringApplication(SampleGenericClientApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
複製代碼

客戶端發起調用

  • 獲取服務的泛化引用
GenericService sampleGenericServiceReference = (GenericService) applicationContext
    .getBean("sampleGenericServiceReference");
複製代碼
  • 準備方法參數

因爲客戶端沒有調用服務的參數類,所以經過 com.alipay.hessian.generic.model.GenericObjectGenericObject 進行描述。

// 準備方法參數
GenericObject genericParam = new GenericObject(
    "com.ostenant.sofa.rpc.example.generic.SampleGenericParamModel");
genericParam.putField("name", "Harrison");
複製代碼

GenericObject 持有一個 Map<String, Object> 類型的變量,你可以經過 GenericObject 提供的 putField() 方法,將參數類的屬性和值放到這個 Map 中,以此來描述參數類。

  • 發起泛化調用

經過 GenericService$genericInvoke(arg1, agr2, arg3) 方法能夠發起服務的泛化調用,各個參數含義以下:

參數 含義 參數可選
arg1 目標方法名稱 必填
arg2 參數類型的數組,要求嚴格遵循前後次序 必填
arg3 參數值的數組,要求與參數類型數組保持一致 必填
arg4 返回值的Class類型 可選

方式一:

GenericObject genericResult = (GenericObject) sampleGenericServiceReference.$genericInvoke(
    // 目標方法名稱
    "sayGeneric",
    // 參數類型名稱
    new String[] { "com.ostenant.sofa.rpc.example.generic.SampleGenericParamModel" },
    // 參數的值
    new Object[] { genericParam });

// 驗證返回結果
System.out.println("Type: " + genericResult.getType());
System.out.println("Name: " + genericResult.getField("name"));
System.out.println("Value: " + genericResult.getField("value"));
複製代碼

方式二:

SampleGenericResultModel sampleGenericResult = sampleGenericServiceReference.$genericInvoke(
    // 目標方法名稱
    "sayGeneric",
    // 參數類型名稱
    new String[] { "com.ostenant.sofa.rpc.example.generic.SampleGenericParamModel" },
    // 參數的值
    new Object[] { genericParam },
    // 返回值的Class類型
    SampleGenericResultModel.class);

// 驗證返回結果
System.out.println("Type: " + sampleGenericResult.getClass().getName());
System.out.println("Name: " + sampleGenericResult.getName());
System.out.println("Value: " + sampleGenericResult.getValue());
複製代碼

查看控制檯輸出

兩種方式輸出以下:

Type: com.ostenant.sofa.rpc.example.generic.SampleGenericResultModel
Name: Harrison
Value: sample generic value
複製代碼

3. 過濾器配置

SOFA-RPC 經過過濾器 Filter 來實現對請求和響應的攔截處理。用戶能夠自定義 Filter 實現攔截擴展,目前支持 bolt 協議。開發人員經過繼承 com.alipay.sofa.rpc.filter.Filter 實現過濾器的自定義。

配置說明

服務接口與實現類

FilterService.java

public interface FilterService {
    String sayFilter(String filter);
}
複製代碼

FilterServiceImpl.java

public class FilterServiceImpl implements FilterService {
    @Override
    public String sayFilter(String filter) {
        return filters;
    }
}
複製代碼

服務端過濾器

在 Filter 實現類中,invoke() 方法實現具體的攔截邏輯,經過 FilterInvoker.invoke(SofaRequest) 觸發服務的調用,在該方法先後能夠實現具體的攔截處理。

public class SampleServerFilter extends Filter {
    @Override
    public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException {
        System.out.println("SampleFilter before server process");
        try {
            return invoker.invoke(request);
        } finally {
            System.out.println("SampleFilter after server process");
        }
    }
}
複製代碼

服務端發佈配置

服務端須要配置服務實現類、過濾器,而後在 sofa:servicesofa:global-attrs 標籤配置 filter 屬性,實現二者的綁定。

<bean id="sampleFilter" class="com.ostenant.sofa.rpc.example.filter.SampleServerFilter"/>
<bean id="filterService" class="com.ostenant.sofa.rpc.example.filter.FilterServiceImpl"/>
<sofa:service ref="filterService" interface="com.ostenant.sofa.rpc.example.filter.FilterService">
    <sofa:binding.bolt>
        <sofa:global-attrs filter="sampleFilter"/>
    </sofa:binding.bolt>
</sofa:service>
複製代碼

客戶端過濾器

public class SampleClientFilter extends Filter {
    @Override
    public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException {
        System.out.println("SampleFilter before client invoke");
        try {
            return invoker.invoke(request);
        } finally {
            System.out.println("SampleFilter after client invoke");
        }
    }
}
複製代碼

客戶端引用配置

一樣的,客戶端過濾器須要在 sofa:referencesofa:global-attrs 標籤中配置 filter 屬性,實現客戶端引用類的調用攔截。

<bean id="sampleFilter" class="com.alipay.sofa.rpc.samples.filter.SampleClientFilter"/>
<sofa:reference id="filterServiceReference" interface="com.ostenant.sofa.rpc.example.filter.FilterService">
    <sofa:binding.bolt>
        <sofa:global-attrs filter="sampleFilter"/>
    </sofa:binding.bolt>
</sofa:reference>
複製代碼

服務端啓動類

SpringApplication springApplication = new SpringApplication(FilterServerApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
複製代碼

客戶端啓動類

SpringApplication springApplication = new SpringApplication(FilterClientApplication.class);
ApplicationContext applicationContext = springApplication.run(args);
複製代碼

客戶端調用

FilterService filterServiceReference = (FilterService) applicationContext.getBean("filterServiceReference");
try {
    // sleep 5s, 便於觀察過濾器效果
    Thread.sleep(5000);
} catch (InterruptedException e) {
    e.printStackTrace();
}

String result = filterServiceReference.sayFilter("filter");
System.out.println("Invoke result: " + result);
複製代碼

查看攔截輸出

  • 服務端打印輸出
SampleFilter before server process
SampleFilter after server process
複製代碼
  • 客戶端打印輸出
SampleFilter before client invoke
SampleFilter after client invoke
Invoke result: filter
複製代碼

過濾器配置生效,總結過濾器攔截前後次序以下:

  1. 客戶端發起調用 -> 客戶端前置攔截 -> 服務端前置攔截
  2. 服務端方法執行
  3. 服務端後置攔截 -> 客戶端後置攔截 -> 客戶端接收返回值

小結

本文介紹了 SOFA-RPC 的集中調用方式,包括單向調用、同步調用、Future調用、回調,引入了 SOFA-RPC 獨有的泛化調用機制,同時對過濾器的配置進行了簡單介紹。


歡迎關注技術公衆號: 零壹技術棧

零壹技術棧

本賬號將持續分享後端技術乾貨,包括虛擬機基礎,多線程編程,高性能框架,異步、緩存和消息中間件,分佈式和微服務,架構學習和進階等學習資料和文章。

相關文章
相關標籤/搜索