續上一節內容,對Web爬蟲進行進一步封裝,通過委託將爬蟲本身的狀態變化以及數據變化暴露給上層業務處理或應用程序。
爲了方便之後的擴展,我先定義一個螞蟻抽象類(Ant),並讓WorkerAnt(工蟻)繼承自它。
[Code 2.2.1]
using System;

/// <summary>
/// Abstract base class for crawler ants. It carries an identifier and a
/// status-change callback that derived ants raise through
/// <see cref="NotifyStatusChanged"/>.
/// </summary>
public abstract class Ant
{
    /// <summary>Identifier of this ant.</summary>
    public UInt32 AntId { get; set; }

    /// <summary>
    /// Callback invoked whenever a job status change is reported. The first
    /// argument is the ant that raised the notification, the second carries
    /// the event data. When no callback is assigned, the status is written
    /// to the console instead.
    /// </summary>
    public Action<Ant, JobEventArgs> OnJobStatusChanged { get; set; }

    /// <summary>
    /// Reports a status change to <see cref="OnJobStatusChanged"/>, or to the
    /// console when no callback is assigned.
    /// </summary>
    /// <param name="args">Event data describing the change.</param>
    /// <returns>
    /// The same <paramref name="args"/> instance, so the caller can inspect
    /// what the handler changed on it (for example its Cancel flag).
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="args"/> is null.</exception>
    protected virtual JobEventArgs NotifyStatusChanged(JobEventArgs args)
    {
        if (null == args)
        {
            throw new ArgumentNullException(nameof(args));
        }

        // Default the sender to this ant so handlers never see a null EventAnt.
        if (null == args.EventAnt)
        {
            args.EventAnt = this;
        }

        // Read the property once: a concurrent reassignment can no longer
        // null it between the check and the invocation.
        var handler = OnJobStatusChanged;
        if (null != handler)
        {
            handler(args.EventAnt, args);
        }
        else
        {
            // Context may be null (e.g. a job rejected before it had one).
            Console.WriteLine($"Worker {args.EventAnt.AntId} JobStatus: {args.Context?.JobStatus}.");
        }

        return args;
    }
}
螞蟻類比較簡單,定義了一個屬性(AntId),作爲每隻小螞蟻的編號;
定義了一個委託(OnJobStatusChanged),當任務狀態發生變化時,用來發出狀態變化通知;其中第二個參數JobEventArgs我們稍後會列出它的定義;
再有就是定義了一個虛方法NotifyStatusChanged,用來檢查和觸發委託事件;
[Code 2.2.2]
using System.ComponentModel;

/// <summary>
/// Event data passed to job status-change handlers. It derives from
/// <see cref="CancelEventArgs"/>, so a handler can set Cancel to request
/// cancellation.
/// </summary>
public class JobEventArgs : CancelEventArgs
{
    /// <summary>The ant that raised the notification.</summary>
    public Ant EventAnt { get; set; }

    /// <summary>The job context the notification refers to.</summary>
    public JobContext Context { get; set; }

    /// <summary>Optional human-readable detail, e.g. the reason for a failure.</summary>
    // The `string` keyword is used instead of `String` because this file
    // only imports System.ComponentModel, not System.
    public string Message { get; set; }
}
委託參數類也比較簡單,它繼承自CancelEventArgs(因此帶有Cancel屬性),並攜帶觸發事件的螞蟻(EventAnt)、任務上下文(Context)和附加消息(Message)。
[Code 2.2.3]
using System;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;

/// <summary>
/// The smallest unit of work of the crawler: a single worker ant. It writes
/// nothing to the console itself; every state change is reported through
/// <see cref="Ant.NotifyStatusChanged"/>.
/// </summary>
public class WorkerAnt : Ant
{
    /// <summary>
    /// Validates the job and announces it as Created. Every rejection is
    /// reported as a Faulted notification instead of an exception.
    /// </summary>
    /// <param name="context">
    /// The job to run. When null, a Faulted notification is raised with an
    /// empty placeholder context so handlers can still read Context safely.
    /// </param>
    public void Work(JobContext context)
    {
        if (null == context)
        {
            // There is no context to mark as faulted, so report through a
            // placeholder rather than dereferencing null.
            Fail(new JobContext(), @"can not start a job with no context");
            return;
        }

        // The comparison is case-sensitive: the WebRequestMethods constants
        // are upper-case, so the method must be supplied in upper-case too.
        switch (context.Method ?? string.Empty)
        {
            case WebRequestMethods.Http.Connect:
            case WebRequestMethods.Http.Get:
            case WebRequestMethods.Http.Head:
            case WebRequestMethods.Http.MkCol:
            case WebRequestMethods.Http.Post:
            case WebRequestMethods.Http.Put:
                break;
            default:
                Fail(context, $"can not start a job with request method <{(context.Method ?? "no method")}> is unsupported");
                return;
        }

        // NOTE(review): RelativeOrAbsolute also accepts relative URIs, which
        // presumably cannot be requested on their own - confirm whether
        // UriKind.Absolute is what is wanted here.
        if (null == context.Uri || !Uri.IsWellFormedUriString(context.Uri, UriKind.RelativeOrAbsolute))
        {
            Fail(context, $"can not start a job with uri '{context.Uri}' is not well formed");
            return;
        }

        if (EnterStatusOrCancel(context, TaskStatus.Created))
        {
            return;
        }

        /* ........... the remainder of Work is omitted in the original listing ......... */
    }

    /// <summary>
    /// Starts the asynchronous response retrieval for the job, downloads the
    /// body into context.Memory and reports progress, cancellation,
    /// completion or failure through status notifications.
    /// </summary>
    /// <param name="context">The job whose Request has been prepared.</param>
    private void GetResponse(JobContext context)
    {
        context.Request.BeginGetResponse(new AsyncCallback(asyncResult =>
        {
            // Direct cast: a wrong state object should fail loudly here, not
            // as a NullReferenceException on the next line.
            var job = (JobContext)asyncResult.AsyncState;
            try
            {
                using (job.Response = job.Request.EndGetResponse(asyncResult))
                using (job.ResponseStream = job.Response.GetResponseStream())
                using (job.Memory = new MemoryStream())
                {
                    var readCount = 0;
                    if (null == job.Buffer)
                    {
                        job.Buffer = new byte[512];
                    }

                    IAsyncResult pendingRead = null;
                    do
                    {
                        if (0 < readCount)
                        {
                            // Store the chunk read by the previous iteration,
                            // then let the handler observe progress or cancel.
                            job.Memory.Write(job.Buffer, 0, readCount);
                            if (EnterStatusOrCancel(job, TaskStatus.Running))
                            {
                                break;
                            }
                        }

                        pendingRead = job.ResponseStream.BeginRead(
                            job.Buffer, 0, job.Buffer.Length, null, job);
                    } while (0 < (readCount = job.ResponseStream.EndRead(pendingRead))
                        && TaskStatus.Running == job.JobStatus); // EndRead must stay first so every BeginRead is completed

                    job.Request.Abort();
                    job.Response.Close();
                    job.Watch.Stop();

                    if (TaskStatus.Running == job.JobStatus)
                    {
                        // From here on Buffer holds the complete download,
                        // not just the last chunk.
                        job.Buffer = job.Memory.ToArray();

                        job.JobStatus = TaskStatus.RanToCompletion;
                        NotifyStatusChanged(CreateArgs(job, null));
                    }

                    job.Buffer = null;
                }
            }
            catch (WebException ex)
            {
                // This callback runs outside any caller's try block, so an
                // unhandled exception would be unobserved by the job owner.
                Fail(job, ex.Message);
            }
            catch (IOException ex)
            {
                Fail(job, ex.Message);
            }
        }), context);
    }

    /// <summary>Builds the event data for a notification raised by this ant.</summary>
    private JobEventArgs CreateArgs(JobContext context, string message)
    {
        return new JobEventArgs
        {
            Context = context,
            EventAnt = this,
            Message = message,
        };
    }

    /// <summary>Marks the job as Faulted and notifies the handler with the reason.</summary>
    private void Fail(JobContext context, string message)
    {
        context.JobStatus = TaskStatus.Faulted;
        NotifyStatusChanged(CreateArgs(context, message));
    }

    /// <summary>
    /// Moves the job into <paramref name="status"/> and notifies the handler.
    /// When the handler sets Cancel, the job is marked Canceled and a second
    /// notification is raised.
    /// </summary>
    /// <returns>true when the handler canceled the job; otherwise false.</returns>
    private bool EnterStatusOrCancel(JobContext context, TaskStatus status)
    {
        context.JobStatus = status;
        if (!NotifyStatusChanged(CreateArgs(context, null)).Cancel)
        {
            return false;
        }

        context.JobStatus = TaskStatus.Canceled;
        NotifyStatusChanged(CreateArgs(context, null));
        return true;
    }
}
工蟻類抹去了內部輸出,採用狀態變更通知的方式向外界傳遞消息。
第15~57行(Work方法開頭的三段檢查),演示了如何處理參數異常,發出通知,並中止採集工作。
其中第27~45行(switch語句),演示了如何驗證一個Request Method是否有效,注意,Method需要全部大寫,因此,驗證方法是區分大小寫的;
其中第47~57行(對context.Uri的檢查),演示了如何驗證一個Uri是否是合法的格式;
第60~65行以及第82~98行(進入Created狀態之後,以及GetResponse中的讀取循環),演示了如何處理業務邏輯返回的'Cancel'指令,並中止採集工作;
其中第87~93行(讀取循環內的Running通知),演示了在數據下載過程中發出狀態通知,業務邏輯層或應用層可以藉此機會對部分數據進行編碼或更新進度條;如果下載的數據是壓縮數據,也可以在此時進行解壓縮工作;也可以對數據進行文件寫入操作;這也將導致業務層或應用層收到不止一次JobStatus = TaskStatus.Running的消息;
第104~110行(RanToCompletion通知),演示了如何發出任務完成通知;
[Code 2.2.4]
// Demo driver: posts the same C# snippet to an online compiler ten times and
// reacts to every job status change reported by the worker ants.
Console.WriteLine("/* ************** 第二境 * 第二節 * 以事件驅動狀態、數據處理 ************** */");

// Source code that is sent to the remote compiler as the `code` form field.
var requestDataBuilder = new StringBuilder();
requestDataBuilder.AppendLine("using System;");
requestDataBuilder.AppendLine("namespace HelloWorldApplication");
requestDataBuilder.AppendLine("{");
requestDataBuilder.AppendLine(" class HelloWorld");
requestDataBuilder.AppendLine(" {");
requestDataBuilder.AppendLine(" static void Main(string[] args)");
requestDataBuilder.AppendLine(" {");
requestDataBuilder.AppendLine(" Console.WriteLine(\"《C# 爬蟲 破境之道》\");");
requestDataBuilder.AppendLine(" }");
requestDataBuilder.AppendLine(" }");
requestDataBuilder.AppendLine("}");

var requestData = Encoding.UTF8.GetBytes(
    @"code=" + System.Web.HttpUtility.UrlEncode(requestDataBuilder.ToString())
    + @"&token=4381fe197827ec87cbac9552f14ec62a&language=10&fileext=cs");

for (int i = 0; i < 10; i++)
{
    new WorkerAnt()
    {
        // A sequential id is unique per ant. The previous timestamp-hash id
        // could repeat within one millisecond, and Math.Abs throws
        // OverflowException when the hash is int.MinValue.
        AntId = (uint)(i + 1),
        OnJobStatusChanged = (sender, args) =>
        {
            Console.WriteLine($"{args.EventAnt.AntId} said: {args.Context.JobName} entered status '{args.Context.JobStatus}'.");
            switch (args.Context.JobStatus)
            {
                case TaskStatus.Created:
                    // Setting Cancel here asks the worker to abandon the job.
                    if (string.IsNullOrEmpty(args.Context.JobName))
                    {
                        Console.WriteLine("Can not execute a job with no name.");
                        args.Cancel = true;
                    }
                    else
                    {
                        Console.WriteLine($"{args.EventAnt.AntId} said: job {args.Context.JobName} created.");
                    }
                    break;
                case TaskStatus.Running:
                    // Memory accumulates everything downloaded so far.
                    if (null != args.Context.Memory)
                    {
                        Console.WriteLine($"{args.EventAnt.AntId} said: {args.Context.JobName} already downloaded {args.Context.Memory.Length} bytes.");
                    }
                    break;
                case TaskStatus.RanToCompletion:
                    if (null != args.Context.Buffer && 0 < args.Context.Buffer.Length)
                    {
                        // The array reference is handed over as task state.
                        // The worker only clears the Buffer property after
                        // this handler returns, so the array itself stays
                        // intact and no copy is needed.
                        Task.Factory.StartNew(oBuffer =>
                        {
                            var content = new UTF8Encoding(false).GetString((byte[])oBuffer);
                            Console.WriteLine(content.Length > 100 ? content.Substring(0, 90) + "..." : content);
                        }, args.Context.Buffer);
                    }
                    if (null != args.Context.Watch)
                    {
                        // Each context's stopwatch is reported for its own
                        // request, so the elapsed time is printed undivided.
                        Console.WriteLine("/* ********************** using {0}ms / request ******************** */"
                            + Environment.NewLine + Environment.NewLine, args.Context.Watch.Elapsed.TotalMilliseconds.ToString("000.00"));
                    }
                    break;
                case TaskStatus.Faulted:
                    Console.WriteLine($"{args.EventAnt.AntId} said: job {args.Context.JobName} faulted because {args.Message}.");
                    break;
                case TaskStatus.WaitingToRun:
                case TaskStatus.WaitingForChildrenToComplete:
                case TaskStatus.Canceled:
                case TaskStatus.WaitingForActivation:
                default:
                    /* Do nothing on this event. */
                    break;
            }
        },
    }.Work(new JobContext
    {
        JobName = "「以事件驅動狀態、數據處理」",
        Uri = @"https://tool.runoob.com/compile.php",
        ContentType = @"application/x-www-form-urlencoded; charset=UTF-8",
        Method = WebRequestMethods.Http.Post,
        // Every job gets its own copy: the worker reuses a non-null Buffer as
        // its read buffer, so sharing one array across the ten jobs would let
        // them overwrite each other's data.
        Buffer = (byte[])requestData.Clone(),
    });
}
對應用層的改造,主要體現在第25~67行(OnJobStatusChanged的處理函數),增加了對OnJobStatusChanged事件的處理。
其中,第30~38行(TaskStatus.Created分支),演示了如何在應用層或業務邏輯層取消採集任務;
其中,第39~42行(TaskStatus.Running分支),演示了如何獲取當前任務的已下載總量,並且可以通過context.Buffer獲取當前下載的增量;如果context.Response.ContentLength不爲-1的話,還可以計算出已下載量的佔比;不過這裏要當心的另一個陷阱就是HTTP 1.1 提供的Transfer-Encoding: Chunked;如果後面能碰到具體的場景,再舉例說明,這裏先點破,不說破吧:)
其中,第43~55行(TaskStatus.RanToCompletion分支),演示了如何獲取下載的完整數據,注意,此時的context.Buffer是context.Memory中的全部數據,而不是當前下載的增量了。本節中所說的context.Memory是指當前Job累計下載的全部數據,爲什麼要加一個條件「本節所說的」呢,因爲MemoryStream並不是無限大的,它也有極限,如果我們用它來處理一個Html文檔或一張普通小照片還好,如果我們用它來處理一個很大的資源(比如一部藍光電影或一個巨大的壓縮包文件),將會發生異常,在那種情況下,我們就要考慮去使用內存映射文件(MemoryMappedFile)或其他技術了,暫且不在本節討論。
至此,一個簡單的事件處理機制就算是改造完成了。畢竟Web資源採集很重要,後面還會繼續改造升級~敬請期待~
喜歡本系列叢書的朋友,可以點擊鏈接加入QQ交流羣(994761602)【C# 破境之道】
方便各位在有疑問的時候能夠及時給我個反饋。同時,也算是給各位志同道合的朋友提供一個交流的平臺。
需要源碼的同學,也可以在羣文件中獲取最新源代碼。