mosquitto在Linux環境下的部署/安裝/使用/測試

 mosquitto在Linux環境下的部署

看了有三四天的的源碼,(固然沒怎麼好好看了),忽然發現對mosquitto的源碼有了一點點感受,因而在第五天決定在Linux環境下部署mosquitto。

使用傳統源碼安裝步驟:
步驟1:http://mosquitto.org/files/source/官網下載源碼,放到Linux環境中。解壓後,找到主要配置文件config.mk,其中包含mosquitto的安裝選項,須要注意的是,默認狀況下mosquitto的安裝須要OpenSSL(一個強大的安全套接字層密碼庫)的支持,若不須要SSL,則須要關閉config.mk裏面與SSL功能有關的選項(WITH_TLS、WITH_TLS_PSK)。筆者這裏直接將這兩句話屏蔽掉了。



步驟2 :配置完畢後,輸入「make install」進行安裝(須要root權限),這裏編譯失敗出現了一個問題:
error while loading shared libraries:libmosquitto.so.1 : cannot open shared object file: No such file or directory

因此問題很清楚,沒有找到這個動態連接庫。遇到這種問題就有兩種狀況:
1).確實沒有這個庫或者庫的版本不對  。    2)這個庫所在的路徑不在系統查找範圍內。

筆者感受這個庫名字很眼熟,果真在「make install」命令執行的打印信息中發現蛛絲馬跡:
「install -s --strip-program=strip libmosquitto.so.1 /usr/local/lib/libmosquitto.so.1」
筆者在這個路徑下,找到了該動態庫,說明如今的問題應該是屬於第二種狀況(並且是官方的代碼,也不該該會犯第一種問題),因而在網上找到了解決方案。

1) 若是共享庫文件安裝到了/lib或/usr/lib目錄下, 那麼需執行一下ldconfig命令
       ldconfig命令的用途, 主要是在默認搜尋目錄(/lib和/usr/lib)以及動態庫配置文件/etc/ld.so.conf內所列的目錄下, 搜索出可共享的動態連接庫(格式如lib*.so*), 進而建立出動態裝入程序(ld.so)所需的鏈接和緩存文件. 緩存文件默認爲/etc/ld.so.cache, 此文件保存已排好序的動態連接庫名字列表.
2) 若是共享庫文件安裝到了/usr/local/lib(不少開源的共享庫都會安裝到該目錄下)或其它"非/lib或/usr/lib"目錄下, 那麼在執行ldconfig命令前, 還要把新共享庫目錄加入到共享庫配置文件/etc/ld.so.conf中, 以下:(須要root權限執行下面命令)
# cat /etc/ld.so.conf
include ld.so.conf.d/*.conf
# echo "/usr/local/lib" >> /etc/ld.so.conf
# ldconfig

(詳情參閱http://blog.chinaunix.net/uid-26212859-id-3256667.html)
這裏筆者就是使用第二種狀況的辦法,成功完成編譯。
完成後會在系統命令行裏發現mosquitto、mosquitto_sub、mosquitto_pub三個工具(網上說有四個,還有一個mosquitto_passwd,用於管理密碼,應該是關閉SSL的緣由),分別用於啓動代理、訂閱消息和發佈消息。
 
 


 測試
步驟1:開啓一個終端:輸入「mosquitto」命令,結果以下圖,服務啓動,由於一直監聽,因此不會看到命令行。
                                                                   正常狀況

輸入「mosquitto」若是以下圖報錯,發現報錯是地址已被使用,可使用 "ps -e "查看進程和「netstat -apn | grep :1883」來查看誰佔用端口,可以使用「kill -s 9 pid號」殺死該進程,而後從新輸入「mosquitto」命令便可獲得上圖正確結果。
                                                                  報錯



步驟2:開啓第二個終端,輸入「mosquitto_sub -t 主題名 -i 用戶名」, (後面的「-i 用戶名可省略」)
例如:mosquitto_sub -t mqtt        結果如圖,因爲一直監聽,因此也不會看到命令行。
                          發佈消息前第二個終端截圖

步驟3:開啓第三個終端,輸入「mosquitto_pub -t 主題名 -i 用戶名 -m 要發送的消息」
(若是要發送的消息中有空格,需用引號括起來)
例如:mosquitto_pub -h localhost -t mqtt -m "hello world"
則第二個終端能夠收到這條信息。筆者看到其命令行有文件傳輸,又嘗試傳一個文件(內容只有一句話),第二個終端會直接顯示文件的內容(截圖中「hello World」下面的那句話就是)。嘗試一個大文件的傳輸,將一個7M的書傳過去,首先是能夠傳,可是第二個窗口顯示的全是亂碼,傳輸的速度也是一個問題。

發佈消息後的第二個終端截圖 這裏之因此會想到傳文件是由於看到mosquitto_pub的命令參數中有關於把文件當作message傳輸的記錄,如圖: 這裏的文件上限默認是256M。邏輯中有對文件大小的判斷,超過256M的文件則不傳。不知道這裏若是吧這個值修改更大,會不會產生影響,筆者沒有嘗試,由於傳7M的文件都感受很慢。(這個問題在MQTT協議介紹中能夠獲得答案,MQTT文件長度的表示是用1至4個字節來表示,而其表示長度的方式又有特殊的加密方式,按照這種方式,其最大表示的長度爲256M) 測試總結 三個終端,一個用來開啓服務,一個執行mosquitto_sub來訂閱消息,與服務器保持長鏈接,隨時接收來自服務器推送的消息,最後一個終端則用來發布消息。這個測試的結果如今是正確的,但仍存在侷限性,還有如下幾個問題須要注意: 1)瞭解mosquitto_sub和mosquitto_pub命令背後是如何執行的,須要修改,訂閱端的處理確定不能僅僅是顯示內容 到標準輸出上。 2)瞭解mosquitto命令的邏輯,這裏包含的內容不少,估計也是最難的。 3)這裏的實驗是在本地傳輸,須要作一個客戶端出來(客戶端多是Android端或者MCU端),看是否能夠正常傳輸,還有就是能傳多大的數據,容許同時連入的客戶數有多少(聽說是20000以上)。

 

 mosquito monitor idea
check process mosquitto with pidfile /var/run/mosquitto.pid
start = "/etc/init.d/mosquitto start"
stop = "/etc/init.d/mosquitto stop"

mosquito 是一個MQTT 服務器。目前只在公司移動互聯網服務器上環境部署過,用的比較少,show 一下安裝過程:

wget http://mosquitto.org/files/source/mosquitto-1.0.3.tar.gz
tar -zxvf mosquitto-1.0.3.tar.gz
cd mosquitto-1.0.3
make WITH_TLS_PSK=no  (遇到undefined reference to `SSL_CTX_set_psk_server_callback' ,添加參數WITH_TLS_PSK=no 禁用SSL PSK 支持)
make install prefix=/home/mosquitto
mkdir /home/mosquitto/etc
mv /etc/mosquitto/*  /home/mosquitto/etc/
strip /home/mosquitto/bin/*
strip /homo/mosquitto/sbin/*

strip /homo/mosquitto/lib/*

echo "/home/mosquitto/lib/" >> /etc/ld.so.conf
ldconfig -f /etc/ld.so.conf

修改 /homo/mosquitto/etc/mosquitto.conf 用戶:
user nobody
(其它參數目前使用默認值)

啓動服務(若有服務相關錯誤,檢查/home/mosquitto/mosquitto.log,端口默認1883)

/home/mosquitto/sbin/mosquitto  -d -c /home/mosquitto/etc/mosquitto.conf > /home/mosquitto/mosquitto.log 2>&1


終端測試:
客戶端
mosquitto_sub -h SERVERIP -t test
服務器端執行後
/home/mosquitto/bin/mosquitto_pub -t test -m "123456"
客戶端會成功收到"123456"

查看Mqtt訂閱者運行情況
mosquitto_sub -v -t \$SYS/#
或者細化爲其中一個命令
mosquitto_sub -v -t ‘$SYS/broker/clients/active’
monitor statitics script 
100萬併發鏈接之筆記
epoll詳解
select,poll和epoll區別
高併發的epoll多線程實例
mosquitto的pthread沒法使用
C1000鏈接
mosquitto源代碼分析
Java NIO 單線程服務器
lighthttp vs apache vs ngix
Java heap space error summary
mqtt benchmark can not create native thread
mosquitto 1.2.1代碼優化
mosquitto 100k測試場景
Mosquitto pub/sub代碼分析
優化:
對於後續的提升優化的地方,簡單記錄幾點:

發送數據用writev
poll -> epoll ,用以支持更高的冰法;
改成單線程版本,下降鎖開銷,目前鎖開銷仍是很是大的。目測能夠改成單進程版本,相似redis,精心維護的話應該能達到不錯的效果;
網絡數據讀寫使用一次儘可能多讀的方式,避免屢次進入系統調用;
內存操做優化。不free,留着下次用;
考慮使用spwan-fcgi的形式或者內置一次啓動多個實例監聽同一個端口。這樣能更好的發揮機器性能,達到更高的性能;


(MaxProcessMemory - JVMMemory - ReservedOsMemory) / (ThreadStackSize) = Number of threads
MaxProcessMemory 指的是一個進程的最大內存
JVMMemory         JVM內存
ReservedOsMemory  保留的操做系統內存
ThreadStackSize      線程棧的大小

在java語言裏, 當你建立一個線程的時候,虛擬機會在JVM內存建立一個Thread對象同時建立一個操做系統線程,而這個系統線程的內存用的不是JVMMemory,而是系統中剩下的內存(MaxProcessMemory - JVMMemory - ReservedOsMemory)。 

一哥們發送線程數和xss以及max user process 有關
epoll + 多線程實現併發網絡鏈接處理
Mosquitto持久層羣推消息實現思路

JAVA使用EPoll來進行NIO處理的方法
JDK 6.0 以及JDK 5.0 update 9 的 nio支持epoll (僅限 Linux 系統 ),對併發idle connection會有大幅度的性能提高,這就是不少網絡服務器應用程序須要的。

啓用的方法以下:

-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider

    單進程FD上限是最大能夠打開文件的數目,

這個數字通常遠大於2048,舉個例子,在1GB內存的機器上大約是10萬左右,具體數目能夠cat /proc/sys/fs/file-max

 


mosquitto -c /etc/mosquitto/mosquitto.conf -d」便可開啓服務

壓力測試: 系統參數修改
ulimit -u 12000 (max processes)
ulimit -n 12000 (max files)

測試最大鏈接數

#!/bin/bash
c=1
while [ $c -le 18000 ]
do
mosquitto_sub -d -t hello/world -k 900 &
 (( c++ ))
done

# netstat -na | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}' 

kill -9 `ps -ef|grep mosquitto|awk '{print $2}'` 
ps -ef|grep report |grep -v grep|awk '{print $2}' |xargs kill -9 
pkill -9 mosquitto
killall -9 mosquitto
查看cpu 佔用高的線程
ps H -eo user,pid,ppid,tid,time,%cpu,cmd --sort=%cpu

查看線程的I/O信息
/proc/15938/task/15942/fd
lsof -c mosquitto

[root@wwwu fd]# pstack 15938
Thread 5 (Thread 0x7f280e377700 (LWP 15939)):
#0  0x0000003b15ae9163 in epoll_wait () from /lib64/libc.so.6
#1  0x000000000040c1d0 in mosquitto_main_loop_4_client ()
#2  0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3  0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 4 (Thread 0x7f280d976700 (LWP 15940)):
#0  0x000000000040a464 in mqtt3_db_message_timeout_check_epoll ()
#1  0x000000000040bc3b in mosquitto_main_loop_4_client_all ()
#2  0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3  0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 3 (Thread 0x7f280cf75700 (LWP 15941)):
#0  0x0000003b15ae9163 in epoll_wait () from /lib64/libc.so.6
#1  0x000000000040c1d0 in mosquitto_main_loop_4_client ()
#2  0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3  0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 2 (Thread 0x7f280c574700 (LWP 15942)):
#0  0x000000000040a464 in mqtt3_db_message_timeout_check_epoll ()
#1  0x000000000040bc3b in mosquitto_main_loop_4_client_all ()
#2  0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3  0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 1 (Thread 0x7f280e59c7c0 (LWP 15938)):
#0  0x0000003b15ae9163 in epoll_wait () from /lib64/libc.so.6
#1  0x000000000040c7c5 in mosquitto_main_loop_4_epoll ()
#2  0x000000000040414b in main ()

    實際測試

Test case 1

1 10934 max connections with QoS 0
[root@ ~]# netstat -na | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}' 
CLOSE_WAIT 1
ESTABLISHED 10934
LISTEN 12

腳本輸出
Client mosqsub/13526-wwwu.mqtt sending CONNECT
Client mosqsub/13526-wwwu.mqtt received CONNACK
Client mosqsub/13526-wwwu.mqtt sending SUBSCRIBE (Mid: 1, Topic: hello/world, QoS: 0)
Client mosqsub/13526-wwwu.mqtt received SUBACK
Subscribed (mid: 1): 0
Client mosqsub/13523-wwwu.mqtt sending CONNECT
Client mosqsub/13523-wwwu.mqtt received CONNACK
Client mosqsub/13523-wwwu.mqtt sending SUBSCRIBE (Mid: 1, Topic: hello/world, QoS: 0)
Client mosqsub/13523-wwwu.mqtt received SUBACK
Subscribed (mid: 1): 0


Test Case 2:
該測試只是驗證能打開的最大鏈接數,意義不太大
max files:
 ulimit -n 15000

[root@wwwu test]# netstat -na|grep ESTAB|grep 1883|wc -l
15088

cpu 2 core 只使用100%(total 200%),目前沒有publish數據,因此memory使用量不多3%,
還能夠繼續提高
vmstat 

[root@wwwu test]# vmstat 3
procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu-----
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st
 1  0  24080 1865596 233880 266228    1    3     2     4    2   11  0  0 99  0  0
 1  0  24080 1865588 233880 266228    0    0     0     0 1030   30 17 33 50  0  0

[root@wwwu test]# top

top - 03:17:46 up 13 days, 10 min,  4 users,  load average: 1.09, 0.83, 0.37
Tasks: 164 total,   2 running, 162 sleeping,   0 stopped,   0 zombie
Cpu0  : 31.2%us, 68.8%sy,  0.0%ni,  0.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu1  :  0.0%us,  0.3%sy,  0.0%ni, 99.7%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Mem:   2957536k total,  1091948k used,  1865588k free,   233880k buffers
Swap:  3096568k total,    24080k used,  3072488k free,   266228k cached

  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                                                
 6709 root      20   0  150m 113m  784 R 100.0  3.9  15:16.63 mosquitto   

發現問題

no route to host;

telnet: Unable to connect to remote host: No route to host

以爲甚是差別,估計是虛擬機裝了有問題,就把虛擬機中的防火牆給清了一下,發現可行。

[zhoulei@localhost ~]$ sudo iptables -F


mosquitto沒法啓動

發現netstat -an|grep 1883 沒有發現記錄,發現mosquitto.db文件很大445M,把該文件刪除後,

能夠正常啓動


2014/05/07 結果

------------------------------------------------------------------------------------------------

【subscriber】


開6000個subscribe active 鏈接,腳本以下

java  -Xms150M -Xmx350M -Djava.ext.dirs=./ -cp /usr/Paul  MQTT31PerfHarness -su -nt 6000 -ss 10 -sc BasicStats -rl 0 -wp true -wc 50 -wi 60 -wt 90 -id 1 -qos 2 -ka 600 -cs false -tc mqtt.Subscriber -d TOPICSINGEL -db 1 -dx 20000 -dn 1 -iu tcp://9.119.154.107:1883


kswapd0 進程跳來跳去


top - 13:16:13 up 31 min,  4 users,  load average: 617.83, 187.04, 65.29

Tasks: 161 total,   1 running, 160 sleeping,   0 stopped,   0 zombie

Cpu(s):  2.9%us, 58.7%sy,  0.0%ni,  0.0%id, 34.5%wa,  0.0%hi,  3.9%si,  0.0%st

Mem:   2957536k total,  2899268k used,    58268k free,      984k buffers

Swap:  3096568k total,   805564k used,  2291004k free,    14656k cached


  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                    2538 root      20   0 25.2g 1.7g 4420 S 122.1 60.3   5:27.12 java    


[root@wwwu ~]# free -m

             total       used       free     shared    buffers     cached

 

Mem:          2888       2810         77          0          0         15


id=1,rate=996.20,threads=6000

id=1,rate=1007.30,threads=6000

以後司機

publish 只有700M可用內存

[root@oc8050533176 Local]# top


top - 17:13:51 up  3:44,  2 users,  load average: 0.00, 0.00, 0.05

Tasks: 124 total,   1 running, 123 sleeping,   0 stopped,   0 zombie

Cpu(s): 14.7%us, 15.2%sy,  0.0%ni, 64.6%id,  0.0%wa,  0.0%hi,  5.4%si,  0.0%st

Mem:   2011348k total,  1736976k used,   274372k free,    23976k buffers

Swap:  8388600k total,        0k used,  8388600k free,   803732k cached


  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                     

 1946 root      20   0 3362m 591m 7292 S 66.7 30.1   3:22.69 java    

最多2500個publish active 鏈接, throughput 3242 msgs/second, 

id=5,rate=3084.80,threads=2000

id=5,rate=2324.60,threads=2000

id=5,rate=3176.30,threads=2000

id=5,rate=3091.40,threads=2000


[mosquitto]

connection 後的top

[root@wwwu mosquitto]# top -b

top - 05:50:55 up  2:00,  4 users,  load average: 1.00, 0.81, 0.74

Tasks: 161 total,   2 running, 159 sleeping,   0 stopped,   0 zombie

Cpu(s): 31.9%us,  7.4%sy,  0.0%ni, 54.8%id,  3.2%wa,  0.0%hi,  2.6%si,  0.0%st

Mem:   2957536k total,  2518020k used,   439516k free,    31976k buffers

Swap:  3096568k total,        0k used,  3096568k free,   209064k cached


  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                     

 2509 root      20   0 1264m 1.2g  740 R 77.9 42.5  93:35.86 mosquitto


 [root@wwwu ~]# netstat -an|grep ESTABLISH|grep 1883|wc -l

8000


遇到問題

[root@wwwu ~]# netstat -na | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}' 

CLOSE_WAIT 1

FIN_WAIT1 298

ESTABLISHED 2372

 

LISTEN 12


tcp        0  21848 9.119.154.107:1883          9.119.154.160:55851(sub)         FIN_WAIT1   

tcp        0  37393 9.119.154.107:1883          9.119.154.160:57275         FIN_WAIT1   

tcp        0      0 9.119.154.107:1883          9.119.154.160:56913         FIN_WAIT2   

 

tcp        0  40545 9.119.154.107:1883          9.119.154.160:55864         FIN_WAIT1   


netstat顯示的鏈接狀態有幾種WAIT: FIN_WAIT_1,FIN_WAIT_2,CLOSE_WAIT和TIME_WAIT. 他們的含義要從TCP的鏈接中斷過程提及


Server              Client

  -------- FIN -------->

  <------- ACK ---------

  <------- FIN ---------

  -------- ACK -------->

假設服務器主動關閉鏈接(Active Close)


服務器首先向客戶機發送FIN包,而後服務器進入FIN_WAIT_1狀態。

客戶機向服務器確認FIN包收到,向服務器發送FIN/ACK,客戶機進入CLOSE_WAIT狀態。

服務器收到來自客戶機的FIN/ACK後,進入FIN_WAIT_2狀態

如今客戶機進入被動關閉(「passive close」)狀態,客戶機操做系統等待他上面的應用程序關閉鏈接。一旦鏈接被關閉,客戶端會發送FIN包到服務器

當服務器收到FIN包後,服務器會向客戶機發送FIN/ACK確認,而後進入著名的TIME_WAIT狀態

 

因爲在鏈接關閉後,還不能肯定全部鏈接關閉前的包都被服務器接受到了(包的接受是沒有前後順序的),所以有了TIME_WAIT狀態。在這個狀態中,服務器仍然在等待客戶機發送的可是還未到達服務器的包。這個狀態將保持2*MSL的時間,這裏的MSL指的是一個TCP包在網絡中存在的最長時間。通常狀況下2*MSL=240秒。

-------------------------------------------------------------------------------------


10000 sub connections 

mosquitto 2G 內存 32% CPU

Tasks: 161 total,   1 running, 160 sleeping,   0 stopped,   0 zombie

Cpu0  :  3.7%us, 11.6%sy,  0.0%ni, 84.7%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st

Cpu1  :  4.3%us, 13.6%sy,  0.0%ni, 82.1%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st

Mem:   2957536k total,  2202836k used,   754700k free,    34916k buffers

Swap:  3096568k total,        0k used,  3096568k free,    48164k cached


  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                                                

 

 2509 root      20   0 1264m 1.2g  740 S 32.6 42.5 148:35.98 mosquitto   



-----------------boss worker multithread model---------------------------

int queue[QUEUE_SIZE];

This is the main thread. It creates a queue struct (defined elsewhere) with methods enqueue,

 dequeue, empty, etc. When the server accepts a connection,

 it enqueues the socket that the incoming connection is on. 

 The worker threads which were dispatched at the beginning are constantly checking this queue to

 see if any jobs have been added, and if there are jobs, then they dequeue the socket, connect to that port, 

 and read/parse/write the incoming http request.


int main(int argc, char* argv[])

{

int hSocket, hServerSocket;  

struct hostent* pHostInfo;  

struct sockaddr_in Address;

int nAddressSize = sizeof(struct sockaddr_in);

int nHostPort;

int numThreads;

int i;


init(&head,&tail);


/

hServerSocket=socket(AF_INET,SOCK_STREAM,0);


if(hServerSocket == SOCKET_ERROR)

{

    printf("\nCould not make a socket\n");

    return 0;

}


 

Address.sin_addr.s_addr = INADDR_ANY;

Address.sin_port = htons(nHostPort);

Address.sin_family = AF_INET;


printf("\nBinding to port %d\n",nHostPort);


 

if(bind(hServerSocket,(struct sockaddr*)&Address,sizeof(Address)) == SOCKET_ERROR) {

    printf("\nCould not connect to host\n");

    return 0;

}

 

getsockname(hServerSocket, (struct sockaddr *) &Address,(socklen_t *)&nAddressSize);


printf("Opened socket as fd (%d) on port (%d) for stream i/o\n",hServerSocket, ntohs(Address.sin_port));


printf("Server\n\

      sin_family        = %d\n\

      sin_addr.s_addr   = %d\n\

      sin_port          = %d\n"

      , Address.sin_family

      , Address.sin_addr.s_addr

      , ntohs(Address.sin_port)

    );

//Up to this point is boring server set up stuff. I need help below this.

/

if(listen(hServerSocket,QUEUE_SIZE) == SOCKET_ERROR) {

    printf("\nCould not listen\n");

    return 0;

}


while(1) {


    pthread_mutex_lock(&mtx);

    printf("\nWaiting for a connection");


    while(!empty(head,tail)) {

        pthread_cond_wait (&cond2, &mtx);

    }


   

    hSocket = accept(hServerSocket,(struct sockaddr*)&Address,(socklen_t *)&nAddressSize);


    printf("\nGot a connection");


    enqueue(queue,&tail,hSocket);


    pthread_mutex_unlock(&mtx);

    pthread_cond_signal(&cond);     // wake worker thread

}

}



void *worker(void *threadarg) {


while(true)

{


pthread_mutex_lock(&mtx);


while(empty(head,tail)) {

    pthread_cond_wait(&cond, &mtx);

}

int hSocket = dequeue(queue,&head);


unsigned nSendAmount, nRecvAmount;

char line[BUFFER_SIZE];


nRecvAmount = read(hSocket,line,sizeof line);

printf("\nReceived %s from client\n",line);



/

if(close(hSocket) == SOCKET_ERROR) {

    printf("\nCould not close socket\n");

    return 0;

}



pthread_mutex_unlock(&mtx);

pthread_cond_signal(&cond);


 

}


    epoll與select、poll區別

一、相比於select與poll,epoll最大的好處在於它不會隨着監聽fd數目的增加而下降效率。內核中的select與poll的實現是採用輪詢來處理的,輪詢的fd數目越多,天然耗時越多。 
二、epoll的實現是基於回調的,若是fd有指望的事件發生就經過回調函數將其加入epoll就緒隊列中,也就是說它只關心「活躍」的fd,與fd數目無關。 
三、內核 / 用戶空間 內存拷貝問題,如何讓內核把 fd消息通知給用戶空間呢?在這個問題上select/poll採起了內存拷貝方法。而epoll採用了共享內存的方式。 
四、epoll不只會告訴應用程序有I/0 事件到來,還會告訴應用程序相關的信息,這些信息是應用程序填充的,所以根據這些信息應用程序就能直接定位到事件,而沒必要遍歷整個fd集合。

epoll 的EPOLLLT (水平觸發,默認)和 EPOLLET(邊沿觸發)模式的區別

一、EPOLLLT:徹底靠kernel epoll驅動,應用程序只須要處理從epoll_wait返回的fds,這些fds咱們認爲它們處於就緒狀態。此時epoll能夠認爲是更快速的poll。

二、EPOLLET:此模式下,系統僅僅通知應用程序哪些fds變成了就緒狀態,一旦fd變成就緒狀態,epoll將再也不關注這個fd的任何狀態信息,(從epoll隊列移除)直到應用程序經過讀寫操做(非阻塞)觸發EAGAIN狀態,epoll認爲這個fd又變爲空閒狀態,那麼epoll又從新關注這個fd的狀態變化(從新加入epoll隊列)。隨着epoll_wait的返回,隊列中的fds是在減小的,因此在大併發的系統中,EPOLLET更有優點,可是對程序員的要求也更高,由於有可能會出現數據讀取不完整的問題,舉例以下:

假設如今對方發送了2k的數據,而咱們先讀取了1k,而後這時調用了epoll_wait,若是是邊沿觸發,那麼這個fd變成就緒狀態就會從epoll 隊列移除,極可能epoll_wait 會一直阻塞,忽略還沒有讀取的1k數據,與此同時對方還在等待着咱們發送一個回覆ack,表示已經接收到數據;若是是電平觸發,那麼epoll_wait 還會檢測到可讀事件而返回,咱們能夠繼續讀取剩下的1k 數據。

    man epoll example


           #define MAX_EVENTS 10
           struct epoll_event ev, events[MAX_EVENTS];
           int listen_sock, conn_sock, nfds, epollfd;

           

           epollfd = epoll_create(10);
           if (epollfd == -1) {
               perror("epoll_create");
               exit(EXIT_FAILURE);
           }

           ev.events = EPOLLIN;
           ev.data.fd = listen_sock;
           if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
               perror("epoll_ctl: listen_sock");
               exit(EXIT_FAILURE);
           }

           for (;;) {
               nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
               if (nfds == -1) {
                   perror("epoll_pwait");
                   exit(EXIT_FAILURE);
               }

               for (n = 0; n < nfds; ++n) {
                   if (events[n].data.fd == listen_sock) {
                       conn_sock = accept(listen_sock,
                                       (struct sockaddr *) &local, &addrlen);
                       if (conn_sock == -1) {
                           perror("accept");
                           exit(EXIT_FAILURE);
                       }
                       setnonblocking(conn_sock);
                       ev.events = EPOLLIN | EPOLLET;
                       ev.data.fd = conn_sock;
                       if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,
                                   &ev) == -1) {
                           perror("epoll_ctl: conn_sock");
                           exit(EXIT_FAILURE);
                       }
                   } else {
                       do_use_fd(events[n].data.fd);
                   }
               }
           }




 

Mosquitto簡要教程(安裝/使用/測試)
上篇文章《Android主流推送方案分析(MQTT/XMPP/GCM)》中,咱們給你們介紹了,如何在移動領域使用靈巧的消息傳輸協議MQTT來完成消息推送,最後也提到了開源項目Mosquitto。實際上,Mosquitto是一個實現了MQTT3.1協議的代理服務器,由MQTT協議創始人之一的Andy Stanford-Clark開發,它爲咱們提供了很是棒的輕量級數據交換的解決方案。本文的主旨在於記錄Mosquitto服務的安裝和使用,以備往後查閱。

1、獲取&安裝
Mosquitto提供了Windows、Linux以及qnx系統的版本,安裝文件可從http://mosquitto.org/files/binary/地址中獲取(建議使用最新的1.1.x版本)。Windows系統下的安裝過程很是簡單,咱們甚至能夠把Mosquitto直接安裝成爲系統服務;可是,在實際應用中,咱們更傾向於使用Linux系統的服務器,接下來咱們就將重點介紹Linux版Mosquitto的安裝方法。

在Linux系統上安裝Mosquitto,本人建議你們使用源碼安裝模式,最新的源碼可從http://mosquitto.org/files/source/地址中獲取。解壓以後,咱們能夠在源碼目錄裏面找到主要的配置文件config.mk,其中包含了全部Mosquitto的安裝選項,詳細的參數說明以下:
[html] view plaincopy

    # 是否支持tcpd/libwrap功能.  
    #WITH_WRAP:=yes  
      
    # 是否開啓SSL/TLS支持  
    #WITH_TLS:=yes  
      
    # 是否開啓TLS/PSK支持  
    #WITH_TLS_PSK:=yes  
      
    # Comment out to disable client client threading support.  
    #WITH_THREADING:=yes  
      
    # 是否使用嚴格的協議版本(老版本兼容會有點問題)  
    #WITH_STRICT_PROTOCOL:=yes  
      
    # 是否開啓橋接模式  
    #WITH_BRIDGE:=yes  
      
    # 是否開啓持久化功能  
    #WITH_PERSISTENCE:=yes  
      
    # 是否監控運行狀態  
    #WITH_MEMORY_TRACKING:=yes  

這裏須要注意的是,默認狀況下Mosquitto的安裝須要OpenSSL的支持;若是不須要SSL,則須要關閉config.mk裏面的某些與SSL功能有關的選項(WITH_TLS、WITH_TLS_PSK)。接着,就是運行make install進行安裝,完成以後會在系統命令行裏發現mosquitto、mosquitto_passwd、mosquitto_pub和mosquitto_sub四個工具(截圖以下),分別用於啓動代理、管理密碼、發佈消息和訂閱消息。



2、配置&運行

安裝完成以後,全部配置文件會被放置於/etc/mosquitto/目錄下,其中最重要的就是Mosquitto的配置文件,即mosquitto.conf,如下是詳細的配置參數說明。
[html] view plaincopy

    # =================================================================  
    # General configuration  
    # =================================================================  
      
    # 客戶端心跳的間隔時間  
    #retry_interval 20  
      
    # 系統狀態的刷新時間  
    #sys_interval 10  
      
    # 系統資源的回收時間,0表示儘快處理  
    #store_clean_interval 10  
      
    # 服務進程的PID  
    #pid_file /var/run/mosquitto.pid  
      
    # 服務進程的系統用戶  
    #user mosquitto  
      
    # 客戶端心跳消息的最大併發數  
    #max_inflight_messages 10  
      
    # 客戶端心跳消息緩存隊列  
    #max_queued_messages 100  
      
    # 用於設置客戶端長鏈接的過時時間,默認永不過時  
    #persistent_client_expiration  
      
    # =================================================================  
    # Default listener  
    # =================================================================  
      
    # 服務綁定的IP地址  
    #bind_address  
      
    # 服務綁定的端口號  
    #port 1883  
      
    # 容許的最大鏈接數,-1表示沒有限制  
    #max_connections -1  
      
    # cafile:CA證書文件  
    # capath:CA證書目錄  
    # certfile:PEM證書文件  
    # keyfile:PEM密鑰文件  
    #cafile  
    #capath  
    #certfile  
    #keyfile  
      
    # 必須提供證書以保證數據安全性  
    #require_certificate false  
      
    # 若require_certificate值爲true,use_identity_as_username也必須爲true  
    #use_identity_as_username false  
      
    # 啓用PSK(Pre-shared-key)支持  
    #psk_hint  
      
    # SSL/TSL加密算法,可使用「openssl ciphers」命令獲取  
    # as the output of that command.  
    #ciphers  
      
    # =================================================================  
    # Persistence  
    # =================================================================  
      
    # 消息自動保存的間隔時間  
    #autosave_interval 1800  
      
    # 消息自動保存功能的開關  
    #autosave_on_changes false  
      
    # 持久化功能的開關  
    persistence true  
      
    # 持久化DB文件  
    #persistence_file mosquitto.db  
      
    # 持久化DB文件目錄  
    #persistence_location /var/lib/mosquitto/  
      
    # =================================================================  
    # Logging  
    # =================================================================  
      
    # 4種日誌模式:stdout、stderr、syslog、topic  
    # none 則表示不記日誌,此配置能夠提高些許性能  
    log_dest none  
      
    # 選擇日誌的級別(可設置多項)  
    #log_type error  
    #log_type warning  
    #log_type notice  
    #log_type information  
      
    # 是否記錄客戶端鏈接信息  
    #connection_messages true  
      
    # 是否記錄日誌時間  
    #log_timestamp true  
      
    # =================================================================  
    # Security  
    # =================================================================  
      
    # 客戶端ID的前綴限制,可用於保證安全性  
    #clientid_prefixes  
      
    # 容許匿名用戶  
    #allow_anonymous true  
      
    # 用戶/密碼文件,默認格式:username:password  
    #password_file  
      
    # PSK格式密碼文件,默認格式:identity:key  
    #psk_file  
      
    # pattern write sensor/%u/data  
    # ACL權限配置,經常使用語法以下:  
    # 用戶限制:user <username>  
    # 話題限制:topic [read|write] <topic>  
    # 正則限制:pattern write sensor/%u/data  
    #acl_file  
      
    # =================================================================  
    # Bridges  
    # =================================================================  
      
    # 容許服務之間使用「橋接」模式(可用於分佈式部署)  
    #connection <name>  
    #address <host>[:<port>]  
    #topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix]  
      
    # 設置橋接的客戶端ID  
    #clientid  
      
    # 橋接斷開時,是否清除遠程服務器中的消息  
    #cleansession false  
      
    # 是否發佈橋接的狀態信息  
    #notifications true  
      
    # 設置橋接模式下,消息將會發布到的話題地址  
    # $SYS/broker/connection/<clientid>/state  
    #notification_topic  
      
    # 設置橋接的keepalive數值  
    #keepalive_interval 60  
      
    # 橋接模式,目前有三種:automatic、lazy、once  
    #start_type automatic  
      
    # 橋接模式automatic的超時時間  
    #restart_timeout 30  
      
    # 橋接模式lazy的超時時間  
    #idle_timeout 60  
      
    # 橋接客戶端的用戶名  
    #username  
      
    # 橋接客戶端的密碼  
    #password  
      
    # bridge_cafile:橋接客戶端的CA證書文件  
    # bridge_capath:橋接客戶端的CA證書目錄  
    # bridge_certfile:橋接客戶端的PEM證書文件  
    # bridge_keyfile:橋接客戶端的PEM密鑰文件  
    #bridge_cafile  
    #bridge_capath  
    #bridge_certfile  
    #bridge_keyfile  
      
    # 本身的配置能夠放到如下目錄中  
    include_dir /etc/mosquitto/conf.d  

最後,啓動Mosquitto服務很簡單,直接運行命令行「mosquitto -c /etc/mosquitto/mosquitto.conf -d」便可開啓服務。接下來,就讓咱們盡情體驗Mosquitto的強大功能吧!固然,有了Mosquitto,咱們就能夠安心地拋棄「簡陋」的rsmb了,有興趣的話,你們還能夠嘗試把Mosquitto服務運用到上一篇的Android推送服務中。

另外,Mosquitto是個異步IO框架,經測試能夠輕鬆處理20000個以上的客戶端鏈接。固然,實際的最大承載量還和業務的複雜度還有比較大的關係。下圖是本人在一臺普通Linux機器上進行的壓力測試結果,你們能夠參考。


友情提醒:測試的時候不要忘記調整系統的最大鏈接數和棧大小,好比:Linux上可用ulimit -n20000 -s512命令設置你須要的系統參數。

http://blog.csdn.net/shagoo/article/details/7910598

 

https://github.com/mqtt/mqtt.github.iohtml

http://mosquitto.org/download/java

相關文章
相關標籤/搜索