zeromq rpc原型

/*
Asynchronous request-reply single-threaded server in Python
that spawns a request handler each time a request is received
This is different from other examples because the number of request handler threads is not defined ahead of time.
Request:
Client DEALER --> Server ROUTER --> Request handler (spawned)
1. Clients send requests via a DEALER socket on port 5570
2. Server receives requests via a ROUTER socket on port 5570
3. Server passes both the request and the client identity directly to request handlers when they are spawned
Reply:
Client DEALER <-- Server ROUTER <-- Server DEALER <-- Request handler DEALER
1. Request handler returns the reply to the Server via a DEALER socket on inproc
2. Server receives the reply from the request handler via a DEALER socket on inproc
3. Server sends the reply to the client via a ROUTER socket on port 5570
4. Client receives the reply via a DEALER socket on port 5570
*/

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

namespace NetmqSample
{
    public class ZmqClient
    {
        public void Request(string input)
        {
            var socket = new DealerSocket();
            socket.Options.Identity = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
            socket.Connect("tcp://127.0.0.1:5570");

            socket.SendFrame(input);
            Console.WriteLine($"client send: {input} : {DateTime.Now:T}");

            var answer = socket.ReceiveFrameString();
            Console.WriteLine($"client received: {answer} : {DateTime.Now:T}");

            socket.Dispose();
        }
    }

    public class ZmqServer
    {
        private DealerSocket _backend;
        private RouterSocket _frontend;

        public void Run()
        {
            _frontend = new RouterSocket();
            _frontend.Bind("tcp://*:5570");
            _frontend.ReceiveReady += Frontend_ReceiveReady;

            _backend = new DealerSocket();
            _backend.Bind("inproc://backend");
            _backend.ReceiveReady += Backend_ReceiveReady;

            var poller = new NetMQPoller { _frontend, _backend };
            poller.RunAsync();

            Console.WriteLine("server started");
        }

        private void Backend_ReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            var id = e.Socket.ReceiveFrameString();
            var msg = e.Socket.ReceiveFrameString();

            Console.WriteLine($"server backend response: {id} : {msg}");
            _frontend.SendFrame(id, true);
            _frontend.SendFrame(msg);
        }

        private void Frontend_ReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            var id = e.Socket.ReceiveFrameString();
            var msg = e.Socket.ReceiveFrameString();

            //Console.WriteLine($"server frontend received: {id} : {msg} : {DateTime.Now:T}");
            var task = new Task(() => new RequestHandler().Run(id, msg), TaskCreationOptions.LongRunning);
            task.Start();
        }
    }

    public class RequestHandler
    {
        public void Run(string id, string msg)
        {
            var worker = new DealerSocket("inproc://backend");

            // Simulate a long-running operation
            Thread.Sleep(2000);

            worker.SendFrame(id, true);
            worker.SendFrame(msg + " : " + DateTime.Now.ToLongTimeString());
            worker.Dispose(); 
        }
    }
}

 

    class Program
    {
        static void Main(string[] args)
        {
            var server = new ZmqServer();
            server.Run();

            Enumerable.Range(0, 2000).ToList().ForEach(x =>
            {
                Task.Factory.StartNew(() => new ZmqClient().Request(x.ToString("0000")), TaskCreationOptions.LongRunning);
            });

            Console.ReadLine();
        }
    }
相關文章
相關標籤/搜索