《C# 爬蟲 破境之道》:第二境 爬蟲應用 — 第二節:以事件驅動狀態、數據處理

續上一節內容,對Web爬蟲進行進一步封裝,經過委託將爬蟲本身的狀態變化以及數據變化暴露給上層業務處理或應用程序。php

爲了方便之後的擴展,我先定義一個螞蟻抽象類(Ant),並讓WorkerAnt(工蟻)繼承自它。編程

[Code 2.2.1]app

 1 using System;
 2 
 3 public abstract class Ant
 4 {
 5     public UInt32 AntId { get; set; }
 6 
 7     public Action<Ant, JobEventArgs> OnJobStatusChanged { get; set; }
 8 
 9     protected virtual JobEventArgs NotifyStatusChanged(JobEventArgs args)
10     {
11         if (null != OnJobStatusChanged)
12             OnJobStatusChanged(args.EventAnt, args);
13         else
14             Console.WriteLine($"Worker { args.EventAnt.AntId } JobStatus: {args.Context.JobStatus}.");
15 
16         return args;
17     }
18 }
螞蟻類(Ant)

螞蟻類比較簡單,定義了一個屬性(AntId),做爲每隻小螞蟻的編號;框架

定義了一個委託(OnJobStatusChanged),當任務狀態發生變化時,用來發出狀態變化通知;其中第二個參數JobEventArgs咱們一會列出它的定義;ide

在有就是定義了一個虛方法NotifyStatusChanged,用來檢查和觸發委託事件;ui

[Code 2.2.2]this

1 using System.ComponentModel;
2 
3 public class JobEventArgs : CancelEventArgs
4 {
5     public Ant EventAnt { get; set; }
6     public JobContext Context { get; set; }
7     public String Message { get; set; }
8 }
委託參數類(JobEventArgs)

委託參數類也比較簡單,編碼

  • 定義了一個屬性(EventAnt),指示事件的觸發者,就是編程世界中頗有名氣的sender,一般是object類型,不過在咱們的爬蟲框架裏,這個事件一般是有螞蟻觸發,因此我就暫定它的類型爲螞蟻了,先把坑占上,若是之後擴展須要外部觸發的話,咱們再升級;
  • 另外一個屬性(Context)就是上節中使用的JobContext,內涵與Job相關的屬性、描述信息;
  • 還有一個屬性Message,作簡單的說明,好比失敗的緣由是什麼;

[Code 2.2.3]url

  1 using System;
  2 using System.Diagnostics;
  3 using System.IO;
  4 using System.Net;
  5 using System.Security.Cryptography.X509Certificates;
  6 using System.Threading.Tasks;
  7 
  8 /// <summary>
  9 /// 一個爬蟲的最小任務單位,一隻小工蟻。
 10 /// </summary>
 11 public class WorkerAnt : Ant
 12 {
 13     public void Work(JobContext context)
 14     {
 15         if (null == context)
 16         {
 17             context.JobStatus = TaskStatus.Faulted;
 18             NotifyStatusChanged(new JobEventArgs
 19             {
 20                 Context = context,
 21                 EventAnt = this,
 22                 Message = @"can not start a job with no context",
 23             });
 24             return;
 25         }
 26 
 27         switch ((context.Method ?? string.Empty))
 28         {
 29             case WebRequestMethods.Http.Connect:
 30             case WebRequestMethods.Http.Get:
 31             case WebRequestMethods.Http.Head:
 32             case WebRequestMethods.Http.MkCol:
 33             case WebRequestMethods.Http.Post:
 34             case WebRequestMethods.Http.Put:
 35                 break;
 36             default:
 37                 context.JobStatus = TaskStatus.Faulted;
 38                 NotifyStatusChanged(new JobEventArgs
 39                 {
 40                     Context = context,
 41                     EventAnt = this,
 42                     Message = $"can not start a job with request method <{(context.Method ?? "no method")}> is unsupported",
 43                 });
 44                 return;
 45         }
 46 
 47         if (null == context.Uri || !Uri.IsWellFormedUriString(context.Uri, UriKind.RelativeOrAbsolute))
 48         {
 49             context.JobStatus = TaskStatus.Faulted;
 50             NotifyStatusChanged(new JobEventArgs
 51             {
 52                 Context = context,
 53                 EventAnt = this,
 54                 Message = $"can not start a job with uri '{context.Uri}' is not well formed",
 55             });
 56             return;
 57         }
 58 
 59         context.JobStatus = TaskStatus.Created;
 60         if (NotifyStatusChanged(new JobEventArgs { Context = context, EventAnt = this, }).Cancel)
 61         {
 62             context.JobStatus = TaskStatus.Canceled;
 63             NotifyStatusChanged(new JobEventArgs { Context = context, EventAnt = this, });
 64             return;
 65         }
 66 
 67         /* ........... 此處省略上萬字 ......... */
 68     }
 69 
 70     private void GetResponse(JobContext context)
 71     {
 72         context.Request.BeginGetResponse(new AsyncCallback(acGetResponse =>
 73         {
 74             var contextGetResponse = acGetResponse.AsyncState as JobContext;
 75             using (contextGetResponse.Response = contextGetResponse.Request.EndGetResponse(acGetResponse))
 76             using (contextGetResponse.ResponseStream = contextGetResponse.Response.GetResponseStream())
 77             using (contextGetResponse.Memory = new MemoryStream())
 78             {
 79                 var readCount = 0;
 80                 if (null == contextGetResponse.Buffer) contextGetResponse.Buffer = new byte[512];
 81                 IAsyncResult ar = null;
 82                 do
 83                 {
 84                     if (0 < readCount)
 85                     {
 86                         contextGetResponse.Memory.Write(contextGetResponse.Buffer, 0, readCount);
 87                         contextGetResponse.JobStatus = TaskStatus.Running;
 88                         if (NotifyStatusChanged(new JobEventArgs { Context = contextGetResponse, EventAnt = this, }).Cancel)
 89                         {
 90                             contextGetResponse.JobStatus = TaskStatus.Canceled;
 91                             NotifyStatusChanged(new JobEventArgs { Context = contextGetResponse, EventAnt = this, });
 92                             break;
 93                         }
 94                     }
 95                     ar = contextGetResponse.ResponseStream.BeginRead(
 96                         contextGetResponse.Buffer, 0, contextGetResponse.Buffer.Length, null, contextGetResponse);
 97                 } while (0 < (readCount = contextGetResponse.ResponseStream.EndRead(ar))
 98                 && TaskStatus.Running == contextGetResponse.JobStatus); // 與EndRead的順序不能顛倒
 99 
100                 contextGetResponse.Request.Abort();
101                 contextGetResponse.Response.Close();
102                 contextGetResponse.Watch.Stop();
103 
104                 if (TaskStatus.Running == contextGetResponse.JobStatus)
105                 {
106                     contextGetResponse.Buffer = contextGetResponse.Memory.ToArray();
107 
108                     contextGetResponse.JobStatus = TaskStatus.RanToCompletion;
109                     NotifyStatusChanged(new JobEventArgs { Context = context, EventAnt = this, });
110                 }
111                 contextGetResponse.Buffer = null;
112             }
113         }), context);
114     }
115 }
工蟻(WorkerAnt)進行改造

工蟻類抹去了內部輸出,採用狀態變動通知方式向外界傳遞消息。spa

第15~57行,演示瞭如何處理參數異常,發出通知,並中止採集工做。

其中第27~45行,演示瞭如何驗證一個Request Method是否有效,注意,Method須要所有大寫,因此,驗證方法是區分大小寫的;

其中第47~57行,演示瞭如何驗證一個Uri是不是合法的格式;

第60~65行以及82~98,演示瞭如何處理業務邏輯返回的'Cancel'指令,並中止採集工做;

其中第87~93行,演示了在數據下載過程當中,發出狀態通知,業務邏輯層或應用層能夠藉此機會對部分數據進行編碼或更新進度條;若是下載的數據是壓縮數據,也能夠在此時進行解壓縮工做;也能夠對數據進行文件寫入操做;這也將致使在業務層或應用層將收到不止一次JobStatus = TaskStatus.Runing的消息;

第104~110行,演示瞭如何發出的任務完成通知;

[Code 2.2.4] 

 1 Console.WriteLine("/* ************** 第二境 * 第二節 * 以事件驅動狀態、數據處理 ************** */");
 2 
 3 var requestDataBuilder = new StringBuilder();
 4 requestDataBuilder.AppendLine("using System;");
 5 requestDataBuilder.AppendLine("namespace HelloWorldApplication");
 6 requestDataBuilder.AppendLine("{");
 7 requestDataBuilder.AppendLine("    class HelloWorld");
 8 requestDataBuilder.AppendLine("    {");
 9 requestDataBuilder.AppendLine("        static void Main(string[] args)");
10 requestDataBuilder.AppendLine("        {");
11 requestDataBuilder.AppendLine("            Console.WriteLine(\"《C# 爬蟲 破境之道》\");");
12 requestDataBuilder.AppendLine("        }");
13 requestDataBuilder.AppendLine("    }");
14 requestDataBuilder.AppendLine("}");
15 
16 var requestData = Encoding.UTF8.GetBytes(
17     @"code=" + System.Web.HttpUtility.UrlEncode(requestDataBuilder.ToString())
18     + @"&token=4381fe197827ec87cbac9552f14ec62a&language=10&fileext=cs");
19 
20 for (int i = 0; i < 10; i++)
21 {
22     new WorkerAnt()
23     {
24         AntId = (uint)Math.Abs(DateTime.Now.ToString("yyyyMMddHHmmssfff").GetHashCode()),
25         OnJobStatusChanged = (sender, args) =>
26         {
27             Console.WriteLine($"{args.EventAnt.AntId} said: {args.Context.JobName} entered status '{args.Context.JobStatus}'.");
28             switch (args.Context.JobStatus)
29             {
30                 case TaskStatus.Created:
31                     if (string.IsNullOrEmpty(args.Context.JobName))
32                     {
33                         Console.WriteLine($"Can not execute a job with no name.");
34                         args.Cancel = true;
35                     }
36                     else
37                         Console.WriteLine($"{args.EventAnt.AntId} said: job {args.Context.JobName} created.");
38                     break;
39                 case TaskStatus.Running:
40                     if (null != args.Context.Memory)
41                         Console.WriteLine($"{args.EventAnt.AntId} said: {args.Context.JobName} already downloaded {args.Context.Memory.Length} bytes.");
42                     break;
43                 case TaskStatus.RanToCompletion:
44                     if (null != args.Context.Buffer && 0 < args.Context.Buffer.Length)
45                     {
46                         Task.Factory.StartNew(oBuffer =>
47                         {
48                             var content = new UTF8Encoding(false).GetString((byte[])oBuffer);
49                             Console.WriteLine(content.Length > 100 ? content.Substring(0, 90) + "..." : content);
50                         }, new MemoryStream(args.Context.Buffer).ToArray(), TaskCreationOptions.LongRunning);
51                     }
52                     if (null != args.Context.Watch)
53                         Console.WriteLine("/* ********************** using {0}ms / request  ******************** */"
54                             + Environment.NewLine + Environment.NewLine, (args.Context.Watch.Elapsed.TotalMilliseconds / 100).ToString("000.00"));
55                     break;
56                 case TaskStatus.Faulted:
57                     Console.WriteLine($"{args.EventAnt.AntId} said: job {args.Context.JobName} faulted because {args.Message}.");
58                     break;
59                 case TaskStatus.WaitingToRun:
60                 case TaskStatus.WaitingForChildrenToComplete:
61                 case TaskStatus.Canceled:
62                 case TaskStatus.WaitingForActivation:
63                 default:
64                     /* Do nothing on this even. */
65                     break;
66             }
67         },
68     }.Work(new JobContext
69     {
70         JobName = "「以事件驅動狀態、數據處理」",
71         Uri = @"https://tool.runoob.com/compile.php",
72         ContentType = @"application/x-www-form-urlencoded; charset=UTF-8",
73         Method = WebRequestMethods.Http.Post,
74         Buffer = requestData,
75     });
76 }
應用層調用示例改造

對應用層的改造,主要體如今第25~67行,增長了對OnJobStatusChanged事件的處理。

其中,第30~38行,演示瞭如何在應用層或業務邏輯層,取消採集任務;

其中,第39~42行,演示瞭如何獲取當前任務的當前已下載總量,而且能夠經過context.Buffer獲取當前下載的增量;若是context.Response.ContentLength不爲-1的話,還能夠計算出已下載量的佔比;不過這裏要當心的另外一個陷阱就是HTTP 1.1 提供的Transfer-Encoding: Chunked;若是後面能碰到具體的場景,再舉慄說明,這裏先點破,不說破吧:)

其中,第43~55行,演示瞭如何獲取下載的完整數據,注意,此時的context.Buffer是context.Memory中的全部數據,而不是當前下載的增量了。本節中所說的context.Memory是指當前Job累計下載的全部數據,爲何要加一個條件「本節所說的」呢,由於MemoryStream並非無限大的,它也有極限,若是咱們用它來處理一個Html文檔或一張普通小照片還好,若是咱們用它來處理一個很大的資源(好比一部藍光電影或一個巨大的壓縮包文件),將會發生異常,在那種狀況下,咱們就要考慮去使用文件內存映射(MemoryMappedFile)或其餘技術了,暫且不在本節討論。

 

至此,一個簡單的事件處理機制就算是改造完成了。畢竟Web資源採集很重要,後面還會繼續改造升級~敬請期待~

喜歡本系列叢書的朋友,能夠點擊連接加入QQ交流羣(994761602)【C# 破境之道】
方便各位在有疑問的時候能夠及時給我個反饋。同時,也算是給各位志同道合的朋友提供一個交流的平臺。
須要源碼的童鞋,也能夠在羣文件中獲取最新源代碼。

相關文章
相關標籤/搜索