corba事件服務中的push和pull模型

首先說一下Corba中相對比較簡單的服務模型,事件服務。 java

對於事件服務的話,有push和pull兩種模型。下面就分別說一下這兩種模型具體實現: 異步

首先,push和pull模型都是基於事件通道EventChannel的,兩種模型的通訊最終都必須經過事件通道push或pull對象的引用,那就簡要的說一下事件通道的概念。 ui

事件信道(event channel)是一個既是事件提供者又是事件消費者的插入對象,它容許多個事件提供者和多個事件消費者異步地通訊而不須要相互瞭解。事件信道又是一個標準的CORBA對象,駐留在對象請求中介上,能夠斷開提供者和消費者的通訊。 this

事件信道利用代理(proxy)對象撤消時間的提供者和消費者。提供者和消費者不是直接交互做用,而是從事件信道那裏得到代理對象,讓代理對象在未來的事件交換中表明本身。提供者得到一個消費者代理,而消費者得到一個提供者代理。事件信道經過這些代理對象代理事件的交換。 spa

push模型而言: 代理

1)、下面說一下事件通道的註冊綁定過程,具體實現代碼以下: code

    Properties properties = new Properties();

    //properties用來設置初始化orb所須要的參數信息

    properties.put("org.omg.PortableInterceptor.ORBInitializerClass.bidir_init","org        .jac    orb.orb.giop.BiDirConnectionInitializer");
    
    properties.put("org.omg.CORBA.ORBClass", "org.jacorb.orb.ORB");

    properties.put("org.omg.CORBA.ORBSingletonClass",

            "org.jacorb.orb.ORBSingleton");

    //初始化orb

    org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init(argv, properties);

    try

    {

      //序列化poa

      org.omg.PortableServer.POA poa =

          org.omg.PortableServer.POAHelper.narrow(

              orb.resolve_initial_references("RootPOA"));

     

      NamingContextExt nc =

          NamingContextExtHelper.narrow(

              orb.resolve_initial_references("NameService"));

      //使用orb和poa來建立事件對象

      EventChannelImpl channel = new EventChannelImpl(orb,poa);

 

      poa.the_POAManager().activate();       //激活poa的manager

 

      org.omg.CORBA.Object o = poa.servant_to_reference(channel);

      //將事件通道綁定到"eventchannel.example"名字上

      nc.bind(nc.to_name("eventchannel.example"), o);

 

      orb.run();

    }

    catch( Exception e)

    {

      e.printStackTrace();

    }

2)、下面說一下PushSupplier端的具體實現代碼以下: 對象

class PushSupplierDemo extends PushSupplierPOA {

 

    public PushSupplierDemo(String[] args) {

        Properties properties = new Properties();

        properties.put(

                "org.omg.PortableInterceptor.ORBInitializerClass.bidir_init",

                "org.jacorb.orb.giop.BiDirConnectionInitializer");

        properties.put("org.omg.CORBA.ORBClass", "org.jacorb.orb.ORB");

        properties.put("org.omg.CORBA.ORBSingletonClass",

                "org.jacorb.orb.ORBSingleton");

        org.omg.CosEventChannelAdmin.EventChannel e = null;

        org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init(args, properties);

        org.omg.PortableServer.POA poa = null;

        org.omg.CORBA.Object refObj = null;

 

        try {

            poa = org.omg.PortableServer.POAHelper.narrow(orb

                    .resolve_initial_references("RootPOA"));

            poa.the_POAManager().activate();

 

            NamingContextExt nc = NamingContextExtHelper.narrow(orb

                    .resolve_initial_references("NameService"));

            //

            e = EventChannelHelper.narrow(nc.resolve(nc

                    .to_name("eventchannel.example")));

        } catch (Exception ex) {

            ex.printStackTrace();

        }

 

        SupplierAdmin supplierAdmin = e.for_suppliers();

        ProxyPushConsumer proxyPushConsumer = supplierAdmin

                .obtain_push_consumer();

 

        try {

            proxyPushConsumer.connect_push_supplier(_this(orb));

        } catch (org.omg.CosEventChannelAdmin.AlreadyConnected ex) {

            ex.printStackTrace();

        }

 

        for (int i = 0; i < 10; i++) {

            try {

                Any any = orb.create_any();

                try {

                    System.out.println("supplier sleep");

                    Thread.sleep(1000);

                } catch (InterruptedException e1) {

                    e1.printStackTrace();

                }

                // 實例化一個對象

                Version_IOperationsImpl version_IOperationsImpl = new Version_IOperationsImpl();

                version_IOperationsImpl.setVersion("version0000");

                try {

                    // 從servant得到一個對象引用

                    refObj = poa.servant_to_reference(version_IOperationsImpl); //必須傳送的是對象的引用,而不是對象自己

                } catch (ServantNotActive e3) {

                    e3.printStackTrace();

                } catch (WrongPolicy e3) {

                    e3.printStackTrace();

                }

                try {

                    any.insert_Object(refObj);  //將對象的引用,加入到any對象之中

                } catch (Exception e2) {

                    e2.printStackTrace();

                }

                proxyPushConsumer.push(any);

                System.out.println("Pushing event ### " + (i));

 

            } catch (Disconnected d) {

                d.printStackTrace();

            }

        }

        proxyPushConsumer.disconnect_push_consumer();

    }

 

    public void disconnect_push_supplier() {

        System.out.println("Supplier disconnected");

    }

 

    public static void main(String[] args) {

        PushSupplierDemo demo = new PushSupplierDemo(args);

}

3)、下面說一下PushConsumer端的具體實現代碼以下: 接口

static public void main(String[] args) {

        Properties properties = new Properties();

        properties.put(

                "org.omg.PortableInterceptor.ORBInitializerClass.bidir_init",

                "org.jacorb.orb.giop.BiDirConnectionInitializer");

        properties.put("org.omg.CORBA.ORBClass", "org.jacorb.orb.ORB");

        properties.put("org.omg.CORBA.ORBSingletonClass",

                "org.jacorb.orb.ORBSingleton");

 

        EventChannel ecs = null;

        ConsumerAdmin ca = null;

        PushConsumer pushConsumer = null;

        ProxyPushSupplier pps = null;

 

        try {

            orb = org.omg.CORBA.ORB.init(args, properties);

            NamingContextExt nc = NamingContextExtHelper.narrow(orb

                    .resolve_initial_references("NameService"));

 

            ecs = EventChannelHelper.narrow(nc.resolve(nc

                    .to_name("eventchannel.example")));

        } catch (Exception e) {

            e.printStackTrace();

        }

 

        ca = ecs.for_consumers();

        pps = ca.obtain_push_supplier();

 

        try {

            org.omg.PortableServer.POA poa = org.omg.PortableServer.POAHelper

                    .narrow(orb.resolve_initial_references("RootPOA"));

 

            poa.the_POAManager().activate();

 

            PushConsumerPOATie pt = new PushConsumerPOATie(

                    new PushConsumerDemo(pps));

            pt._this_object(orb);

            pushConsumer = PushConsumerHelper.narrow(poa

                    .servant_to_reference(pt));

            pps.connect_push_consumer(pushConsumer);

            System.out.println("PushConsumerImpl registered.");

            orb.run();

        } catch (Exception e) {

            e.printStackTrace();

        }

        System.out.println("Quit.");

    }

 

    public synchronized void push(org.omg.CORBA.Any data)

            throws org.omg.CosEventComm.Disconnected {

        org.omg.CORBA.Object object = data.extract_Object();

        try {

            Thread.sleep(1000);

            System.out.println("consumer sleep");

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        Version_I meImpl =  Version_IHelper.narrow(object); //必須用接口來接受序列化

        System.out.println("Server Version: " + meImpl.getVersion());

        count++;

        System.out.println("event@@@ " + count + " : " + data.extract_Object());

       

        if (count >= limit) {

            System.out.println("unregister");

            myPps.disconnect_push_supplier();

            orb.shutdown(false);

     }

}
相關文章
相關標籤/搜索