Tigase組件第三節 – 多線程

Tigase組件第三節 – 多線程

發表評論做者  儲天行 on  2010/11/16

本文翻譯自 – http://www.tigase.org/content/component-implementation-lesson-3-multi-threading java

擁有多個CPU或多核CPU的電腦已經很是普及了。你也許會把像XMPP服務這樣的應用部署到多核或多CPU的電腦上。可是如今你的新組件還只能在單一線程裏面處理全部的packet。 算法

若是packet的處理是複雜的運算(好比垃圾信息過濾),那麼後果可能很嚴重:一個CPU負載爲100%,而另外的幾顆CPU卻還在閒置。你也許很是但願全部CPU或核心都可以分攤負載,共同參與到工做當中。 數據庫

Tigase API提供一種很是方便的方式讓組件的processPacket(Packet packet)在多個線程中運行。int processingThreads()方法返回組件被分配到幾個線程當中。缺省狀況下,返回值爲「1」,這是由於並非全部的組件都可以容許多線程併發處理packet。你能夠經過覆寫(overwrite)processThreads方法來指定processPacket能夠在幾個線程裏面併發執行。 服務器

若是packet的處理只是簡單的CPU運算,那麼你也許會想要儘量多的線程來壓榨CPU的潛力: 網絡

1
2
3
4
@Override
publicintprocessingThreads() {
  returnRuntime.getRuntime().availableProcessors();
}

若是處理中須要使用IO(網絡或者是數據庫),那麼在添加線程數的時候可能須要考慮更多。很難準確的定義到底須要多少個線程,只能經過一些測試來獲取經驗值。 多線程

如今新開發的組件已經有足夠多的線程了。但還有個小問題,在多數狀況下多個packet的處理順序是不能顛倒的。若是組件的processPacket(…)方法由多個線程併發處理,頗有可能會出現一個後發送的消息取代了先前發送的消息首先到達目的地,特別是在第一個packet很大而第二個很小的時候。咱們能夠經過調整「負責packet在線程中作分發的那個方法」來避免這種狀況發生。 併發

包的分發算法很簡單: less

1
intthread_idx = hashCodeForPacket(packet) % threads_total;

因此調整的關鍵在於hashCodeForPacket(…)方法。經過覆寫這個方法能夠保證全部發往同一個用戶的packet都是由同一個線程來進行處理的: ide

1
2
3
4
5
6
7
8
9
10
11
12
@Override
publicinthashCodeForPacket(Packet packet) {
  if(packet.getElemTo() !=null) {
    returnpacket.getElemTo().hashCode();
  }
  // 程序不該該運行到這裏,全部的packet都必須具備一個目的地地址,可是也許垃圾過濾器也許會過濾一些奇怪的地址
  if(packet.getElemFrom() !=null) {
    returnpacket.getElemFrom().hashCode();
  }
  // 若是程序真的運行到這一部,就應該好好檢查一下到達組件的packet是否正常,而後找到一個更好的計算hashCode方法。
  return1;
}

剛剛提到的兩個方法一個能夠控制組件處理packet的線程數,另外一個能夠控制packet分發到線程的邏輯。它們還不是Tigase API在多線程處理時提供的所有方法。 wordpress

有時還會用到週期性任務。你固然能夠建立一個Timer實例,並使用TimerTasks來加載它;但當不少相似的工做都須要在各個組件當中執行的時候,你會須要更多額外的資源來支撐這麼多個TimerTask。Tigase提供了一些容許你重用Timer對象來執行各類任務的方法。

首先,你有三個能夠執行週期性任務的方法:

1
2
3
publicsynchronizedvoideverySecond();
publicsynchronizedvoideveryMinute();
publicsynchronizedvoideveryHour();

一個「週期性向某些特定地址發送通知」的功能能夠這樣來實現:

1
2
3
4
5
6
7
8
9
10
11
@Override
publicsynchronizedvoideveryMinute() {
  super.everyMinute();
  if((++delayCounter) >= notificationFrequency) {
    addOutPacket(Packet.getMessage(abuseAddress, getComponentId(),
      StanzaType.chat,"Detected spam messages: "+ spamCounter,
      "Spam counter",null, newPacketId("spam-")));
    delayCounter =0;
    spamCounter =0;
  }
}

這個方法每分鐘(也就是notificationFrequency)向「abuseAddress」發送一個通知,內容爲上個週期垃圾信息過濾器攔截了多少條消息。須要注意的是:你必定須要調用super.everyMinute()方法,確保父類的語句也可以被執行,同時也要讓本身添加的語句儘量高效率快速得完成,尤爲是在複寫的是everySecond()方法的時候。

還有兩個能夠安排在特定的時間執行任務的方法,它們很是相似於java.util.Timer API,區別在於Timer能夠在類/父類/爺爺類等各個層級被重用。每個類實例都有一個獨立的Timer,這樣可以避免組件間的干擾:

1
2
addTimerTask(TimerTask task,longdelay, TimeUnit unit);
addTimerTask(TimerTask task,longdelay);

還有一個和多線程沒有直接聯繫,可是很是有用,能夠在某一個特定時間點執行任務的方法。這個時間點就是在服務器完成初始化的時候,這個時候全部的組件都已經被建立,並接收到它們各自的配置項。這時Tigase服務器會調用每個已加載組件的void initializationComplete()方法,你能夠經過覆寫這個方法來執行一些任務。

下面的代碼使用到了上面提到的全部API:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
importjava.util.Arrays;
importjava.util.Map;
importjava.util.logging.Logger;
importtigase.server.AbstractMessageReceiver;
importtigase.server.Packet;
importtigase.util.JIDUtils;
importtigase.xmpp.StanzaType;
 
publicclassTestComponentextendsAbstractMessageReceiver {
 
  privatestaticfinalLogger log =
    Logger.getLogger(TestComponent.class.getName());
 
  privatestaticfinalString BAD_WORDS_KEY ="bad-words";
  privatestaticfinalString WHITELIST_KEY ="white-list";
  privatestaticfinalString PREPEND_TEXT_KEY ="log-prepend";
  privatestaticfinalString SECURE_LOGGING_KEY ="secure-logging";
  privatestaticfinalString ABUSE_ADDRESS_KEY ="abuse-address";
  privatestaticfinalString NOTIFICATION_FREQ_KEY ="notification-freq";
 
  privateString[] badWords = {"word1","word2","word3"};
  privateString[] whiteList = {"admin@localhost"};
  privateString prependText ="Spam detected: ";
  privateString abuseAddress ="abuse@locahost";
  privateintnotificationFrequency =10;
  privateintdelayCounter =0;
  privatebooleansecureLogging =false;
  privatelongspamCounter =0;
 
  @Override
  publicvoidprocessPacket(Packet packet) {
    // 這是一個message packet嗎?
    if("message"== packet.getElemName()) {
      String from = JIDUtils.getNodeID(packet.getElemFrom());
      // 消息的發送者在白名單內嗎?
      if(Arrays.binarySearch(whiteList, from) <0) {
        // 若是ta不在白名單裏面,那麼檢查消息的內容
        String body = packet.getElemCData("/message/body");
        if(body !=null&& !body.isEmpty()) {
          body = body.toLowerCase();
          for(String word : badWords) {
            if(body.contains(word)) {
              log.finest(prependText + packet.toString(secureLogging));
              ++spamCounter;
              return;
            }
          }
        }
      }
    }
    // 不是垃圾信息,返回以便作下一步處理
    Packet result = packet.swapFromTo();
    addOutPacket(result);
  }
 
  @Override
  publicintprocessingThreads() {
    returnRuntime.getRuntime().availableProcessors();
  }
 
  @Override
  publicinthashCodeForPacket(Packet packet) {
    if(packet.getElemTo() !=null) {
      returnpacket.getElemTo().hashCode();
    }
    // 程序不該該運行到這裏,全部的packet都必須具備一個目的地地址,可是也許垃圾過濾器也許會過濾一些奇怪的地址
    if(packet.getElemFrom() !=null) {
      returnpacket.getElemFrom().hashCode();
    }
    // 若是程序真的運行到這一部,就應該好好檢查一下到達組件的packet是否正常,而後找到一個更好的計算hashCode方法。
    return1;
  }
 
  @Override
  publicMap<String, Object> getDefaults(Map<String, Object> params) {
    Map<String, Object> defs =super.getDefaults(params);
    defs.put(BAD_WORDS_KEY, badWords);
    defs.put(WHITELIST_KEY, whiteList);
    defs.put(PREPEND_TEXT_KEY, prependText);
    defs.put(SECURE_LOGGING_KEY, secureLogging);
    defs.put(ABUSE_ADDRESS_KEY, abuseAddress);
    defs.put(NOTIFICATION_FREQ_KEY, notificationFrequency);
    returndefs;
  }
 
  @Override
  publicvoidsetProperties(Map<String, Object> props) {
    super.setProperties(props);
    badWords = (String[])props.get(BAD_WORDS_KEY);
    whiteList = (String[])props.get(WHITELIST_KEY);
    Arrays.sort(whiteList);
    prependText = (String)props.get(PREPEND_TEXT_KEY);
    secureLogging = (Boolean)props.get(SECURE_LOGGING_KEY);
    abuseAddress = (String)props.get(ABUSE_ADDRESS_KEY);
    notificationFrequency = (Integer)props.get(NOTIFICATION_FREQ_KEY);
  }
 
  @Override
  publicsynchronizedvoideveryMinute() {
    super.everyMinute();
    if((++delayCounter) >= notificationFrequency) {
      addOutPacket(Packet.getMessage(abuseAddress, getComponentId(),
        StanzaType.chat,"Detected spam messages: "+ spamCounter,
        "Spam counter",null, newPacketId("spam-")));
      delayCounter =0;
      spamCounter =0;
    }
  }
 
}
相關文章
相關標籤/搜索