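The push-pull pattern is also called the pipeline pattern ("Parallel Pipeline"). Picture this scenario: to compute statistics over the logs of many machines, we need to distribute the statistics tasks to each node, collect the results at the end, and aggregate them. The pipeline pattern fits this scenario well. Its structure is shown in Figure 1.
Figure 1. Diagram from the official documentation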
Ventilator: produces tasks in the pipeline;
Worker: processes tasks;
Sink: collects the results produced by the Workers.
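The example below has three programs: the Ventilator distributes messages, the Worker processes them, and the Sink receives the results the Workers send back. The time-consuming computation is handed to the Workers, so starting several instances of Worker.exe can speed up processing. The ultimate purpose of the Worker is distributed computing: deploy Workers on multiple PCs and let them do the computation (in a distributed crawler, each Worker corresponds to one crawler).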
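To see why extra Workers shorten the batch, here is a minimal sketch that needs no NetMQ at all. It assumes every Worker is already connected before the first task is sent and that tasks are dealt out in strict round-robin order, which is how a ZeroMQ PUSH socket spreads messages over its connected peers. The class name `RoundRobinEstimate` and the method `Distribute` are made up for this illustration, and network overhead is ignored.

```csharp
using System;
using System.Linq;

public static class RoundRobinEstimate
{
    // Deals the workloads out in turn, one per worker, and returns
    // the total sleep time (msec) that each worker ends up with.
    public static int[] Distribute(int[] workloads, int workerCount)
    {
        var totals = new int[workerCount];
        for (int i = 0; i < workloads.Length; i++)
        {
            totals[i % workerCount] += workloads[i];
        }
        return totals;
    }

    public static void Main()
    {
        // Same shape of workload as the Ventilator in this article:
        // 100 tasks, each a random delay produced by rand.Next(0, 100).
        var rand = new Random(0);
        int[] workloads = Enumerable.Range(0, 100)
                                    .Select(_ => rand.Next(0, 100))
                                    .ToArray();

        foreach (int workers in new[] { 1, 2, 3 })
        {
            int[] totals = Distribute(workloads, workers);
            // The batch cannot finish before its busiest worker does.
            Console.WriteLine("{0} worker(s): busiest worker sleeps {1} of {2} msec",
                              workers, totals.Max(), workloads.Sum());
        }
    }
}
```

The busiest Worker's total is only a lower bound on the elapsed time the Sink would report, but it shows the trend: the same total work is split into shares of roughly one half and one third.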
The structure of the three-program example that follows (Ventilator, Worker, Sink) is shown in Figure 2:
Figure 2
Ventilator:

    using System;
    using NetMQ;
    using NetMQ.Sockets;

    class Program
    {
        static void Main(string[] args)
        {
            // Task Ventilator
            // Binds PUSH socket to tcp://*:5557
            // Sends batch of tasks to workers via that socket
            Console.WriteLine("====== VENTILATOR ======");

            // Socket to send tasks on
            using (var sender = new PushSocket())
            // Socket used once, to tell the Sink that the batch is starting
            using (var sink = new PushSocket())
            {
                sender.Bind("tcp://*:5557");
                sink.Connect("tcp://localhost:5558");

                Console.WriteLine("Press enter when workers are ready");
                Console.ReadLine();

                // The first message is "0" and signals the start of the batch.
                // See the Sink's Program.cs for where this is used.
                Console.WriteLine("Sending start of batch to Sink");
                sink.SendFrame("0");

                Console.WriteLine("Sending tasks to workers");

                // Initialise the random number generator with a fixed seed
                Random rand = new Random(0);

                // Expected total cost in msec
                int totalMs = 0;

                // Send 100 tasks. The workload is just a random sleep time for the
                // worker; in real life each task would do more than sleep.
                for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                {
                    // Random workload from 0 to 99 msec (the upper bound is exclusive)
                    int workload = rand.Next(0, 100);
                    totalMs += workload;
                    Console.WriteLine("Workload : {0}", workload);
                    sender.SendFrame(workload.ToString());
                }

                Console.WriteLine("Total expected cost : {0} msec", totalMs);
                Console.WriteLine("Press Enter to quit");
                Console.ReadLine();
            }
        }
    }

Worker:

    using System;
    using System.Threading;
    using NetMQ;
    using NetMQ.Sockets;

    class Program
    {
        static void Main(string[] args)
        {
            // Task Worker
            // Connects PULL socket to tcp://localhost:5557
            // Collects workloads from the Ventilator via that socket
            // Connects PUSH socket to tcp://localhost:5558
            // Sends results to the Sink via that socket
            Console.WriteLine("====== WORKER ======");

            // Socket to receive tasks on
            using (var receiver = new PullSocket())
            // Socket to send results on
            using (var sender = new PushSocket())
            {
                receiver.Connect("tcp://localhost:5557");
                sender.Connect("tcp://localhost:5558");

                // Process tasks forever
                while (true)
                {
                    // The workload from the Ventilator is a simple delay that
                    // simulates some work being done (see the Ventilator's
                    // Program.cs). In real life more meaningful work would be done.
                    string workload = receiver.ReceiveFrameString();

                    // Simulate some work being done
                    Thread.Sleep(int.Parse(workload));

                    // Send the result to the Sink. The Sink only needs to know that
                    // the worker is done: the message content is not important, the
                    // presence of a message is the signal. See the Sink's Program.cs.
                    Console.WriteLine("Sending to Sink");
                    sender.SendFrame(string.Empty);
                }
            }
        }
    }

Sink:

    using System;
    using System.Diagnostics;
    using NetMQ;
    using NetMQ.Sockets;

    class Program
    {
        static void Main(string[] args)
        {
            // Task Sink
            // Binds PULL socket to tcp://localhost:5558
            // Collects results from workers via that socket
            Console.WriteLine("====== SINK ======");

            // Socket to receive messages on
            using (var receiver = new PullSocket())
            {
                receiver.Bind("tcp://localhost:5558");

                // Wait for the start of the batch (see the Ventilator's Program.cs)
                var startOfBatchTrigger = receiver.ReceiveFrameString();
                Console.WriteLine("Seen start of batch");

                // Start our clock now
                Stopwatch watch = new Stopwatch();
                watch.Start();

                for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                {
                    var workerDoneTrigger = receiver.ReceiveFrameString();
                    // Print ":" for every tenth result and "." for the rest
                    if (taskNumber % 10 == 0)
                    {
                        Console.Write(":");
                    }
                    else
                    {
                        Console.Write(".");
                    }
                }

                watch.Stop();

                // Calculate and report the duration of the batch
                Console.WriteLine();
                Console.WriteLine("Total elapsed time {0} msec", watch.ElapsedMilliseconds);
                Console.ReadLine();
            }
        }
    }
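A single Ventilator batch can be processed with different numbers of workers:
One worker:
On my local machine the batch took 5566 msec.
Two workers:
On my local machine the batch took 2917 msec.
Three workers:
On my local machine the batch took 2031 msec.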
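The three timings can be turned into speedup and efficiency figures. The sketch below only does arithmetic on the numbers reported above. The names `SpeedupReport`, `Speedup` and `Efficiency` are made up for this illustration, and "efficiency" here simply means speedup divided by the number of workers.

```csharp
using System;

public static class SpeedupReport
{
    // How many times faster a run is than the one-worker baseline.
    public static double Speedup(double baselineMs, double elapsedMs)
    {
        return baselineMs / elapsedMs;
    }

    // Speedup per worker: 1.0 would mean perfectly linear scaling.
    public static double Efficiency(double baselineMs, double elapsedMs, int workers)
    {
        return Speedup(baselineMs, elapsedMs) / workers;
    }

    public static void Main()
    {
        const double oneWorkerMs = 5566; // one-worker time reported above

        // (worker count, elapsed msec) as measured on the author's machine
        var runs = new (int Workers, double ElapsedMs)[]
        {
            (1, 5566), (2, 2917), (3, 2031)
        };

        foreach (var run in runs)
        {
            Console.WriteLine("{0} worker(s): speedup {1:F2}, efficiency {2:F2}",
                              run.Workers,
                              Speedup(oneWorkerMs, run.ElapsedMs),
                              Efficiency(oneWorkerMs, run.ElapsedMs, run.Workers));
        }
    }
}
```

With these measurements, two workers give a speedup of about 1.91 and three workers about 2.74, so each added worker contributes slightly less than a full worker's share. Scaling is close to linear for this sleep-only workload, but not exactly linear.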