搞定thrift雙向消息

thrift作為脫胎於facebook的rpc框架,各方面都非常優秀。清晰的分層設計,多語言的支持,以及不輸protocolbuffer的效率(compact下優於protocolbuffer),都讓thrift擁有越來越多的使用者。

作為一個RPC框架,thrift支持的是open->client--rpc-->server->close的短連接模式。在實際應用中,卻經常會有客戶端建立連接後,等待服務端數據的長連接模式,也可以稱為雙向連接。通常的方案有三種,可參考http://dongxicheng.org/search-engine/thrift-bidirectional-async-rpc/,文中提到第三種方法會修改源碼,而實際操作過程中發現這其實是作者小小的理解錯誤,實現thrift雙向通訊並沒有這麼複雜,經過一番實驗,發現只需要如下理解和實現即可輕鬆實現一個thrift的雙向連接。

  1. 雙向連接的service必須為oneway,否則會因為recv函數拋出remote close異常
  2. 客戶端重用建立client時使用的protocol,開線程使用processor.Process(protocol,protocol)監聽服務端callback的消息。
  3. 服務端使用ProcessorFactory,使用TConnectionInfo中的transport作為向客戶端發送消息的client的transport

搞定以上三步,即可實現一個thrift雙向連接,這裡附上實驗代碼,客戶端使用C#(sorry for my poor C#),服務端使用C++

thrift

// Client -> server service: the client calls HandShake() over the connection
// it opened, and the server handler implements it.
// Declared oneway so the caller does not wait for a reply on the connection
// that is shared with the callback direction.
service HandshakeService{
    oneway void HandShake();
}

// Server -> client service: the server calls Push() on the already-open
// connection, and the client implements it to receive pushed messages.
// Declared oneway so no reply is sent back over the shared connection.
service CallbackService{
    // msg: text payload delivered to the client.
    oneway void Push(1: string msg); 
}

client

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Thrift.Collections;
using Thrift.Protocol;
using Thrift.Server;
using Thrift.Transport;
using System.Threading;
using Thrift;
using System.IO;

namespace ThriftBidirection
{
    /// <summary>
    /// Demo client for a bidirectional Thrift connection: it sends HandShake
    /// calls to the server and, on the same connection, receives the
    /// messages the server pushes back through CallbackService.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Client-side implementation of CallbackService; prints every
        /// message pushed by the server.
        /// </summary>
        class CallbackServiceImply : CallbackService.Iface
        {
            // Running count of the pushes printed so far.
            int msgCount = 0;

            /// <summary>Prints the pushed message together with its sequence number.</summary>
            /// <param name="msg">Text sent by the server.</param>
            public void Push(string msg)
            {
                Console.WriteLine("receive msg {0}: {1}", msgCount++, msg);
            }
        }

        /// <summary>
        /// Service processing thread: dispatches incoming CallbackService
        /// calls read from <paramref name="protocol"/> until the connection
        /// is lost or a Thrift error occurs.
        /// </summary>
        /// <param name="protocol">
        /// The protocol the HandshakeService client was built on; it is used
        /// here as both input and output so that both directions share one
        /// connection.
        /// </param>
        static void ProcessThread(TProtocol protocol)
        {
            TProcessor processor = new CallbackService.Processor(new CallbackServiceImply());
            while (true)
            {
                try
                {
                    // Behave like a server loop while reusing the client's
                    // protocol, i.e. the very same connection.
                    while (processor.Process(protocol, protocol)) { }
                    // Process returned false: connection lost, stop.
                    return;
                }
                catch (IOException)
                {
                    // An I/O error is treated as recoverable, but only while
                    // the transport is still open. Retrying on a closed
                    // transport would fail again immediately and spin.
                    if (!protocol.Transport.IsOpen)
                    {
                        return;
                    }
                }
                catch (TException)
                {
                    // Thrift-level errors are treated as fatal.
                    return;
                }
            }
        }

        /// <summary>
        /// Server status monitor thread: polls the transport every three
        /// seconds and reports once through <paramref name="callback"/> when
        /// the connection is gone or a Thrift error is raised, then exits.
        /// </summary>
        /// <param name="trans">Transport to watch.</param>
        /// <param name="callback">Receives a description of the failure.</param>
        static void MonitorThread(TTransport trans, Action<string> callback)
        {
            while (true)
            {
                try
                {
                    if (!trans.Peek())
                    {
                        callback("鏈接中斷");
                        // Report the disconnect once instead of repeating the
                        // same notification every polling interval.
                        return;
                    }
                    Thread.Sleep(3000);
                }
                catch (Thrift.TException ex)
                {
                    callback(ex.Message);
                    return;
                }
            }
        }

        /// <summary>
        /// Connects to localhost:5555, starts the processing and monitor
        /// threads, sends 100 HandShake calls, then waits for a key press
        /// before closing the connection.
        /// </summary>
        static void Main(string[] args)
        {
            TTransport transport = new TBufferedTransport(new TSocket("localhost", 5555));
            TProtocol protocol = new TBinaryProtocol(transport);
            HandshakeService.Client client = new HandshakeService.Client(protocol);
            Action<TProtocol> processAction = new Action<TProtocol>(ProcessThread);
            Action<TTransport, Action<string>> monitorAction = new Action<TTransport, Action<string>>(MonitorThread);

            transport.Open();
            try
            {
                processAction.BeginInvoke(protocol, (result) =>
                {
                    processAction.EndInvoke(result);
                }, null);
                monitorAction.BeginInvoke(transport, (msg) =>
                {
                    Console.WriteLine("鏈接中斷: {0}", msg);
                }, (result) =>
                {
                    // Every BeginInvoke must be paired with EndInvoke so the
                    // asynchronous call's resources are released.
                    monitorAction.EndInvoke(result);
                }, null);

                for (int i = 0; i < 100; ++i)
                {
                    client.HandShake();
                    Thread.Sleep(10);
                }
                Console.Read();
            }
            finally
            {
                // Close the connection even if a HandShake call throws.
                transport.Close();
            }
        }
    }
}

 

server

// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "HandshakeService.h"
#include "CallbackService.h"

#include <cstdio>

#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>

#include <thrift/concurrency/PlatformThreadFactory.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TServerSocket.h>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace apache::thrift::concurrency;

using boost::make_shared;
using boost::shared_ptr;

class HandshakeServiceHandler : virtual public HandshakeServiceIf {
 public:
  HandshakeServiceHandler(const boost::shared_ptr<TTransport> &trans) 
      : m_client(make_shared<TBinaryProtocol>(trans))
  {
      boost::once_flag flag = BOOST_ONCE_INIT;
      m_flag = flag;
  }

  virtual ~HandshakeServiceHandler()
  {
        m_thread->interrupt();
        m_thread->join();
  }

  void CallbackThread()
  {
      while(true)
      {
          try
          {
              m_client.Push("server push msg");
          }
          catch (TException)
          {
              return;
          }
          boost::this_thread::sleep_for(boost::chrono::milliseconds(20));
      }
  }

  void HandShake() {
    // Your implementation goes here
    printf("HandShake\n");
    boost::call_once(boost::bind(&HandshakeServiceHandler::_StartThread, this), m_flag);
  }

  void _StartThread()
  {
    m_thread.reset(new boost::thread(boost::bind(&HandshakeServiceHandler::CallbackThread, this)));
  }

boost::shared_ptr<TTransport> m_trans;
CallbackServiceClient m_client;
shared_ptr<boost::thread> m_thread;
boost::once_flag m_flag;
};

// Processor factory that builds a fresh HandshakeServiceHandler for each
// getProcessor() call, handing it the transport found in the connection
// info so the handler can send callbacks to that peer.
class ProcessorFactoryImply : public TProcessorFactory
{
    // connInfo: describes the connection the processor is requested for;
    //           only its transport is used here.
    // Returns a HandshakeServiceProcessor wrapping the new handler.
    virtual boost::shared_ptr<TProcessor> getProcessor(
        const TConnectionInfo& connInfo)
    {
        const boost::shared_ptr<HandshakeServiceHandler> handler =
            make_shared<HandshakeServiceHandler>(connInfo.transport);
        return make_shared<HandshakeServiceProcessor>(handler);
    }
};


int main(int argc, char **argv) {
  int port = 5555;
  shared_ptr<TProcessorFactory> processorFactory(new ProcessorFactoryImply());
  shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
  shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
  shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
  shared_ptr<ThreadManager> threadMgr = ThreadManager::newSimpleThreadManager(30);
  boost::shared_ptr<PlatformThreadFactory> threadFactory =
      boost::shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());

  threadMgr->threadFactory(threadFactory);
  threadMgr->start();
  TThreadPoolServer server(processorFactory,serverTransport, transportFactory, protocolFactory, threadMgr);
  server.serve();
  return 0;
}

一個簡單的thrift雙向通訊就實現了。

相關文章
相關標籤/搜索