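[Python Study Notes-006] Validating JMS selectors with stomp.py

Anyone familiar with Jenkins knows that JMS selectors are based on SQL92 syntax. This post shows how to use stomp.py together with ActiveMQ to check that a JMS selector is correct.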

Q: What is stomp.py?

A: stomp.py is a Python client library that implements the STOMP (Simple (or Streaming) Text Orientated Messaging Protocol) protocol. It is used to talk to message brokers such as ActiveMQ and RabbitMQ.
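To make "text oriented" concrete, here is a minimal sketch of how a STOMP frame is laid out: a command line, `name:value` header lines, a blank line, the body, and a terminating NUL byte. The helper names `escape_header` and `build_frame` are made up for illustration and are not part of stomp.py; the escaping follows the STOMP 1.1 header rules, and real clients handle more cases (for example, CONNECT frames are not escaped).

```python
def escape_header(text):
    """Escape backslash, newline and colon the way STOMP 1.1 headers require."""
    return (text.replace("\\", "\\\\")
                .replace("\n", "\\n")
                .replace(":", "\\c"))


def build_frame(command, headers, body=""):
    """Assemble one STOMP frame as text (hypothetical helper, for illustration)."""
    lines = [command]
    for name, value in headers.items():
        lines.append("%s:%s" % (escape_header(str(name)),
                                escape_header(str(value))))
    # A blank line separates headers from the body; NUL terminates the frame.
    return "\n".join(lines) + "\n\n" + body + "\x00"


# A SUBSCRIBE frame carrying a JMS selector in the 'selector' header.
frame = build_frame("SUBSCRIBE", {
    "destination": "/queue/test",
    "id": 1,
    "ack": "auto",
    "selector": "Type LIKE 'Tag%'",
})
print(repr(frame))
```

The selector travels as an ordinary header on the SUBSCRIBE frame, which is why the script later in this post only has to pass it through `headers=`.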

So, before we can use the stomp.py library, we first need to install an ActiveMQ server.

1. Download ActiveMQ

$ export TGZBALL=5.15.2/apache-activemq-5.15.2-bin.tar.gz
$ wget https://archive.apache.org/dist/activemq/$TGZBALL

2. Install ActiveMQ

$ tar zxvf apache-activemq-5.15.2-bin.tar.gz
$ sudo mv apache-activemq-5.15.2 /opt
$ cd /opt
$ sudo ln -s apache-activemq-5.15.2 activemq

3. Start ActiveMQ

$ cd /opt/activemq/bin/linux-x86-64
$ sudo ./activemq start
Starting ActiveMQ Broker...

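Note that 8161 is the default listening port of the ActiveMQ web console (the STOMP connector that stomp.py talks to listens on 61613 by default):

$ netstat -an | egrep 8161
tcp6       0      0 :::8161                 :::*                    LISTEN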
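The same check can be done from Python before running any test script. This is a small sketch using only the standard library; `port_is_open` is a made-up helper name, and the ports in the usage comment assume ActiveMQ's default configuration (8161 for the web console, 61613 for STOMP).

```python
import socket


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts and unreachable hosts.
        return False


# Example usage (assumes a local broker with default ports):
#   port_is_open("localhost", 8161)    # web console
#   port_is_open("localhost", 61613)   # STOMP connector
```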

To stop ActiveMQ:

$ cd /opt/activemq/bin/linux-x86-64
$ sudo ./activemq stop
Stopping ActiveMQ Broker...
Stopped ActiveMQ Broker.

Once ActiveMQ is running, we can open its web console on port 8161. The default login username/password is admin/admin.

4. Clone the stomp.py source code

$ mkdir /var/tmp/sp
$ cd /var/tmp/sp
$ git clone https://github.com/jasonrbriggs/stomp.py.git

5. Prepare a Python script, foo.py

 1 #!/usr/bin/python3
 2 import sys
 3 import json
 4 import collections
 5 import time
 6 import stomp
 7
 8
 9 g_rcvmsg_cnt = 0  # counter of received messages
10
11
12 class MyListener(stomp.ConnectionListener):
13     def on_error(self, headers, message):
14         global g_rcvmsg_cnt
15         g_rcvmsg_cnt = 0
16
17         print('Oops!! received an error "%s"' % message)
18
19     def on_message(self, headers, message):
20         global g_rcvmsg_cnt
21         g_rcvmsg_cnt += 1
22
23         print('Bingo! received a message, rcv_cnt = %d' % g_rcvmsg_cnt)
24         print('o message headers :\n%s' % json.dumps(headers, indent=4))
25         print('o message body : %s\n' % message)
26
27
28 def validate():
29     if g_rcvmsg_cnt > 0:
30         print(">>> PASS")
31         return 0
32     else:
33         print(">>> FAIL")
34         return 1
35
36
37 def usage(argv0):
38     sys.stderr.write("Usage: %s <JMS selector> <message headers>\n" % argv0)
39
40
41 def main(argc, argv):
42     if argc != 3:
43         usage(argv[0])
44         return 1
45
46     jms_selector = argv[1]
47     msg_headers = argv[2]
48
49     d_sub_headers = dict()
50     d_sub_headers['selector'] = jms_selector
51     print('>>> SUB HEADERS', d_sub_headers)
52
53     d_snd_headers = json.loads(msg_headers,
54                                object_pairs_hook=collections.OrderedDict)
55     print('>>> SND HEADERS', d_snd_headers)
56     print()
57
58     conn = stomp.Connection()
59     conn.set_listener('', MyListener())
60     conn.start()
61
62     # Note default user/password of ActiveMQ is admin/admin
63     user = 'admin'
64     password = 'admin'
65     conn.connect(user, password, wait=True)
66
67     conn.subscribe(destination='/queue/test', id=1, ack='auto',
68                    headers=d_sub_headers)
69
70     s_body = ' '.join(argv[1:])
71     conn.send(destination='/queue/test', body=s_body, headers=d_snd_headers)
72
73     time.sleep(2)
74     conn.disconnect()
75
76     return validate()
77
78
79 if __name__ == '__main__':
80     sys.exit(main(len(sys.argv), sys.argv))

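In the code above,

67     conn.subscribe(destination='/queue/test', id=1, ack='auto',
68                    headers=d_sub_headers)

L67-68: subscribe to the queue, passing the JMS selector supplied by the user as the 'selector' header.

71     conn.send(destination='/queue/test', body=s_body, headers=d_snd_headers)

L71: send a message whose headers are built from the JSON text supplied by the user.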
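One detail worth keeping in mind is that STOMP headers are plain text, so a JSON number such as 123 comes back as the string "123" in the received headers (visible in the output of step 6). The sketch below reproduces the parsing step from foo.py and then mimics that stringification. `parse_headers` and `as_text_headers` are hypothetical helper names, and the `str()` conversion is only an assumption modeled on the observed output, not stomp.py's actual encoding code.

```python
import collections
import json


def parse_headers(text):
    """Parse the JSON argument the same way foo.py does, keeping key order."""
    return json.loads(text, object_pairs_hook=collections.OrderedDict)


def as_text_headers(headers):
    """Approximate what the receiver sees: every name and value as text."""
    return collections.OrderedDict(
        (str(name), str(value)) for name, value in headers.items())


sent = parse_headers('{"Type": "Tag", "foo": 123}')
received = as_text_headers(sent)
print(sent)      # 'foo' is still the integer 123 here
print(received)  # 'foo' is now the string '123'
```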

 

6. Test JMS selectors with foo.py

hdan$ export PYTHONPATH=/var/tmp/sp/stomp.py:$PYTHONPATH
hdan$
hdan$ ./foo.py "Type LIKE 'Tag%'" '{"Type": "tag", "foo": 123}'
>>> SUB HEADERS {'selector': "Type LIKE 'Tag%'"}
>>> SND HEADERS OrderedDict([('Type', 'tag'), ('foo', 123)])
>>> FAIL
hdan$
hdan$ ./foo.py "Type LIKE 'Tag%'" '{"Type": "Tag", "foo": 123}'
>>> SUB HEADERS {'selector': "Type LIKE 'Tag%'"}
>>> SND HEADERS OrderedDict([('Type', 'Tag'), ('foo', 123)])
Bingo! received a message, rcv_cnt = 1
o message headers :
{
    "subscription": "1",
    "content-length": "44",
    "timestamp": "1550744115995",
    "destination": "/queue/test",
    "message-id": "ID:huangdan-40431-1550639739706-3:131:-1:1:1",
    "foo": "123",
    "Type": "Tag",
    "expires": "0",
    "priority": "4"
}
o message body : Type LIKE 'Tag%' {"Type": "Tag", "foo": 123}
>>> PASS
hdan$
hdan$ ./foo.py "Type LIKE 'Tag%'" '{"Type": "Tag123", "foo": 123}'
>>> SUB HEADERS {'selector': "Type LIKE 'Tag%'"}
>>> SND HEADERS OrderedDict([('Type', 'Tag123'), ('foo', 123)])
Bingo! received a message, rcv_cnt = 1
o message headers :
{
    "message-id": "ID:huangdan-40431-1550639739706-3:132:-1:1:1",
    "destination": "/queue/test",
    "timestamp": "1550744132347",
    "expires": "0",
    "priority": "4",
    "subscription": "1",
    "content-length": "47",
    "foo": "123",
    "Type": "Tag123"
}
o message body : Type LIKE 'Tag%' {"Type": "Tag123", "foo": 123}
>>> PASS
hdan$
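Since foo.py exits with 0 on PASS and 1 on FAIL, runs like these can be turned into a small table-driven check. The following is only a sketch: `selector_matches`, `run_cases` and `CASES` are names invented here, the script path `./foo.py` is assumed to be in the current directory, and a running broker is still required for real use.

```python
import json
import subprocess


def selector_matches(selector, headers, script=("./foo.py",)):
    """Run the checker once; an exit status of 0 is treated as PASS."""
    cmd = list(script) + [selector, json.dumps(headers)]
    result = subprocess.run(cmd, stdout=subprocess.DEVNULL,
                            stderr=subprocess.DEVNULL)
    return result.returncode == 0


def run_cases(cases, script=("./foo.py",)):
    """Return the cases whose actual outcome differs from the expected one."""
    failures = []
    for selector, headers, expected in cases:
        actual = selector_matches(selector, headers, script)
        if actual != expected:
            failures.append((selector, headers, expected, actual))
    return failures


# The three runs from the session above, with their observed outcomes.
CASES = [
    ("Type LIKE 'Tag%'", {"Type": "tag", "foo": 123}, False),
    ("Type LIKE 'Tag%'", {"Type": "Tag", "foo": 123}, True),
    ("Type LIKE 'Tag%'", {"Type": "Tag123", "foo": 123}, True),
]
```

Calling `run_cases(CASES)` with the broker up should return an empty list if the selector behaves as in the session above. The `script` parameter exists so that a different command line can be substituted for the checker.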

 

Appendix: the SQL92 LIKE operator (quoted from an external source)

A like string consists of a set of characters where the percent (%),
underscore (_) and escape character, such as backslash (\) are
treated differently.
 * The percent (%) matches any number of characters including zero characters
 * The underscore (_) matches only one character

e.g.
+-------------+---------------------------------------+
| Like string | Meaning                               |
+-------------+---------------------------------------+
| Topic%      | All strings starting with Topic       |
| %/abc/%     | Any string containing /abc/           |
| Name_       | Any string starting with Name and     |
|             | having exactly one more character     |
| Name\_2     | Only the string Name_2                |
+-------------+---------------------------------------+
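To experiment with these patterns without a broker, the LIKE rules in the table can be approximated by translating a pattern into a regular expression. This is a rough sketch, not how ActiveMQ evaluates selectors: `like_to_regex` and `like` are names invented here, and treating backslash as the escape character is an assumption that follows the table (in an actual JMS selector the escape character is declared with an ESCAPE clause).

```python
import re


def like_to_regex(pattern, escape="\\"):
    """Translate a SQL92-style LIKE pattern into a compiled regex."""
    parts = []
    i = 0
    while i < len(pattern):
        ch = pattern[i]
        if ch == escape and i + 1 < len(pattern):
            # An escaped character is matched literally.
            parts.append(re.escape(pattern[i + 1]))
            i += 2
            continue
        if ch == "%":
            parts.append(".*")   # any number of characters, including zero
        elif ch == "_":
            parts.append(".")    # exactly one character
        else:
            parts.append(re.escape(ch))
        i += 1
    return re.compile("".join(parts), re.DOTALL)


def like(value, pattern, escape="\\"):
    """Return True if the whole value matches the LIKE pattern."""
    return like_to_regex(pattern, escape).fullmatch(value) is not None


# The rows of the table, plus the values used in step 6.
print(like("Topic/news", "Topic%"))
print(like("x/abc/y", "%/abc/%"))
print(like("Name1", "Name_"))
print(like("Name_2", "Name\\_2"))
print(like("tag", "Tag%"))
```

Matching is case-sensitive, which is consistent with the first run in step 6, where "tag" did not satisfy `Type LIKE 'Tag%'`.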

 

