Ignite.NET插件示例:分佈式Semaphore(信號量)

Ignite.NET從2.0版本開始,引入了插件系統,插件能夠僅在於.NET環境中,也能夠在於.NET + Java混合環境中,本文會描述如何在後者實現插件。html

爲何須要插件?

Ignite.NET構建於Ignite(用Java編寫)之上,JVM會在.NET進程中啓動,.NET部分與Java部分進行交互,並儘量重用現有的Ignite功能。java

插件系統將此平臺交互機制公開給第三方,主要場景之一是在.NET中可使用Ignite和第三方Java API。git

這種API的一個典型事例是IgniteSemaphore,該功能在Ignite.NET中尚不可用。github

分佈式Semaphore API

Ignite中的Semaphore相似於System.Threading.SemaphoreMSDN),可是是在整個集羣中生效的,限制在全部Ignite節點上執行指定代碼段的線程數。apache

代碼大體以下:分佈式

IIgnite ignite = Ignition.GetIgnite();
ISemaphore semaphore = ignite.GetOrCreateSemaphore(name: "foo", count: 3);

semaphore.WaitOne();  // Enter the semaphore (may block)
// Do work
semaphore.Release();

看起來很簡單並且很是有用,與.NET內置的SemaphoreAPI相同。顯然不能更改IIgnite的接口,所以GetOrCreateSemaphore就是一個擴展點,下面會詳細描述。ide

Java插件

先看Java端,這裏須要一種調用Ignite.semaphore()的方法並向.NET平臺提供訪問該實例的方法。測試

建立一個Java項目並經過Maven引用Ignite(具體內容請參見構建多平臺Ignite集羣文章)。ui

每一個插件都以PluginConfiguration開始,本例的插件不須要任何配置屬性,可是該類必須存在,所以只需建立一個簡單的類便可:this

public class IgniteNetSemaphorePluginConfiguration implements PluginConfiguration {}

而後是插件的入口:PluginProvider<PluginConfiguration>。該接口有不少方法,可是大多數方法均可覺得空(nameversion不能爲空,所以須要爲其賦值)。這裏只需關注initExtensions方法,它是跨平臺互操做的入口點,本例中作的就是註冊PlatformPluginExtension實現:

public class IgniteNetSemaphorePluginProvider implements PluginProvider<IgniteNetSemaphorePluginConfiguration> {
    public String name() { return "DotNetSemaphore"; }
    public String version() { return "1.0"; }

    public void initExtensions(PluginContext pluginContext, ExtensionRegistry extensionRegistry)
            throws IgniteCheckedException {
        extensionRegistry.registerExtension(PlatformPluginExtension.class,
                new IgniteNetSemaphorePluginExtension(pluginContext.grid()));
    }
...
}

PlatformPluginExtension有一個惟一的id,用於從.NET端訪問它,還有一個PlatformTarget createTarget()方法,用於建立能夠從.NET端訪問的對象。

Java中的PlatformTarget會映射到.NET中的IPlatformTarget接口,當在.NET中調用IPlatformTarget.InLongOutLong時,就會調用Java實現中的PlatformTarget.processInLongOutLong。還有許多其餘方法能夠用於交換基本類型、序列化數據和對象。每一個方法都有一個指定了操做代碼的type參數,以防插件上有不少不一樣的方法。

本例中須要兩個PlatformTarget類:一個表明整個插件並具備getOrCreateSemaphore方法,另外一個表明每一個特定信號量。第一個應該持有字符串類型的名稱和整型的計數器並返回一個對象,所以須要實現PlatformTarget.processInStreamOutObject,其餘方法都不須要能夠將其置空:

public class IgniteNetPluginTarget implements PlatformTarget {
    private final Ignite ignite;

    public IgniteNetPluginTarget(Ignite ignite) {
        this.ignite = ignite;
    }

    public PlatformTarget processInStreamOutObject(int i, BinaryRawReaderEx binaryRawReaderEx) throws IgniteCheckedException {
        String name = binaryRawReaderEx.readString();
        int count = binaryRawReaderEx.readInt();

        IgniteSemaphore semaphore = ignite.semaphore(name, count, true, true);

        return new IgniteNetSemaphore(semaphore);
    }
...
}

.NET中的每一個ISemaphore對象在Java中都會有一個對應的IgniteNetSemaphore,它也是一個PlatformTarget。這個對象將處理WaitOneRelease方法,並將它們委託給底層的IgniteSemaphore對象。因爲這兩個方法都是返回void且是無參數的,所以最簡單的PlatformTarget是:

public long processInLongOutLong(int i, long l) throws IgniteCheckedException {
    if (i == 0) semaphore.acquire();
    else semaphore.release();

    return 0;
}

這樣Java部分就完成了!建立resources\META-INF.services\org.apache.ignite.plugin.PluginProvider文件,內容爲類名,Java服務加載器就能夠加載該類。使用Maven打包該項目(在終端中執行mvn package或使用IDE)後,target目錄中就應該有一個IgniteNetSemaphorePlugin-1.0-SNAPSHOT.jar文件。

.NET插件

首先建立一個控制檯項目,安裝Ignite NuGet軟件包,並以剛剛建立的jar文件的路徑啓動Ignite:

var cfg = new IgniteConfiguration
{
    JvmClasspath = @"..\..\..\..\Java\target\IgniteNetSemaphorePlugin-1.0-SNAPSHOT.jar"
};

Ignition.Start(cfg);

Ignite節點啓動後就能夠在日誌中看到插件的名稱:

[16:02:38] Configured plugins:
[16:02:38]   ^-- DotNetSemaphore 1.0

對於.NET部分將採用API優先的方法:首先實現擴展方法,而後從那裏繼續。

public static class IgniteExtensions
{
    public static Semaphore GetOrCreateSemaphore(this IIgnite ignite, string name, int count)
    {
        return ignite.GetPlugin<SemaphorePlugin>("semaphorePlugin").GetOrCreateSemaphore(name, count);
    }
}

爲了使該GetPlugin方法生效,須要配置IgniteConfiguration.PluginConfigurations屬性,它持有IPluginConfiguration實現的集合,而且每一個實現又必須連接到IPluginProvider的實現:

[PluginProviderType(typeof(SemaphorePluginProvider))]
class SemaphorePluginConfiguration : IPluginConfiguration  {...}

在節點啓動時,Ignite.NET會迭代插件配置,實例化插件提供者,並調用其Start(IPluginContext<SemaphorePluginConfiguration> context)方法,而後對IIgnite.GetPlugin的調用會委託給指定名字的提供者的IPluginProvider.GetPlugin

class SemaphorePluginProvider : IPluginProvider<SemaphorePluginConfiguration>
{
    private SemaphorePlugin _plugin;

    public T GetPlugin<T>() where T : class
    {
        return _plugin as T;
    }

    public void Start(IPluginContext<SemaphorePluginConfiguration> context)
    {
        _plugin = new SemaphorePlugin(context);
    }

    ...

}

經過IPluginContext能夠訪問Ignite實例、Ignite和插件的配置,還有GetExtension方法,會委託給Java中的PlatformPluginExtension.createTarget()方法,這樣就能夠在兩個平臺之間「創建鏈接」。.NET中的IPlatformTarget連接到Java中的PlatformTarget,它們能夠相互調用,而且Java對象的生存週期與.NET對象的生存週期是關聯的,即一旦垃圾收集器回收了.NET對象,也會釋放Java對象的引用,所以Java對象也會被回收。

下面的實現很簡單,只調用了對應的IPlatformTarget方法:

class SemaphorePlugin
{
    private readonly IPlatformTarget _target;  // Refers to IgniteNetPluginTarget in Java

    public SemaphorePlugin(IPluginContext<SemaphorePluginConfiguration> context)
    {
        _target = context.GetExtension(100);
    }

    public Semaphore GetOrCreateSemaphore(string name, int count)
    {
        var semaphoreTarget = _target.InStreamOutObject(0, w =>
        {
            w.WriteString(name);
            w.WriteInt(count);
        });

        return new Semaphore(semaphoreTarget);
    }
}

class Semaphore
{
    private readonly IPlatformTarget _target;  // Refers to IgniteNetSemaphore in Java

    public Semaphore(IPlatformTarget target)
    {
        _target = target;
    }

    public void WaitOne()
    {
        _target.InLongOutLong(0, 0);
    }

    public void Release()
    {
        _target.InLongOutLong(1, 0);
    }
}

這樣就能夠了,而且向現有插件添加更多邏輯也很容易,只需在兩側實現一對方法便可。Ignite使用JNI和非託管內存在.NET和Java平臺之間使用一個進程交換數據,既簡單又高效。

測試

爲了演示Semaphore的分佈式特性,能夠運行多個Ignite節點,每一個節點都調用WaitOne(),就會看到一次只有兩個節點可以獲取信號量:

var ignite = Ignition.Start(cfg);
var sem = ignite.GetOrCreateSemaphore("foo", 2);

Console.WriteLine("Trying to acquire semaphore...");

sem.WaitOne();

Console.WriteLine("Semaphore acquired. Press any key to release.");
Console.ReadKey();
相關文章
相關標籤/搜索