後臺程序處理(二) python threading - queue 模塊使用

因爲協程沒辦法完成(一)中所說的任務模式python

接下來就嘗試一下使用線程和隊列來實現一下這個功能sql

在實現以前,咱們先明確一個問題——python的線程是僞併發的。同一時間只能有一個線程在運行。具體怎樣的運做方式由解釋器決定數組

而後回顧一下上一章遇到的問題——return之後,須要另一個線程去檢測以前的操做是否執行成功session

 

所以程序的流程設計應該是這樣的:併發

 1 # 大體流程步驟以下
 2 # 1.獲取參數(接口被訪問時觸發)
 3 request_data = request.form
 4 # 2.根據參數查詢內容
 5 target = Target.query.filter_by(id=request_data).first()
 6 # 3.將結果插入隊列
 7 ans_queue.put(target)
 8 # 4.激活線程
 9 thread.set()
10 # 5.將結果從隊列中取出
11 ans_queue.get()
12 # 6.處理結果
13 check()
14 # 7.將線程休眠(阻塞)
15 thread.event.clear()

 

這樣設計的考慮主要是如下幾點:app

1.簡單less

2.入隊能夠保證消息按時間順序被處理fetch

3.出隊能夠保證當隊列不爲空時,檢查線程會執行到隊列爲空爲止。免去沒必要要的喚醒檢查。而後在有消息入隊時被從新激活url

4.其實咱們的設計正常來講不會出現3中的檢查狀況。基本上隊列一旦有消息入隊,線程就會啓動並清空隊列spa

5.入隊能夠保證消息的完整和獨立性,每次請求獲得的數據入隊後,隊列中都是一列數組。處理邏輯更清晰

6.隊列中的數據不出棧是不可見的

7.我就是寧願用全局隊列也不想用全局變量

 

實際接口代碼和線程代碼以下:

A.隊列和線程代碼

  1 # 消息隊列
  2 lock_queue = Queue()
  3 
  4 
  5 def check_kill(event):
  6     while True:
  7         # check queue
  8         if lock_queue.empty() is True:
  9             event.clear()
 10         # wait event
 11         if event.is_set() is not True:
 12             event.wait()
 13         # do some work
 14         sids, serials, minutes, hosts, insts, opasses, ospasses = [], [], [], [], [], [], []
 15 
 16         # get data until queue empty or datas more than 10
 17         if lock_queue.empty() is not True:
 18             data = lock_queue.get()
 19             for i in data:
 20                 sid, serial, minute, host, inst, opass, ospass = i.split(',')
 21                 sids.append(sid)
 22                 serials.append(serial)
 23                 minutes.append(minute)
 24                 hosts.append(host)
 25                 insts.append(inst)
 26                 opasses.append(opass)
 27                 ospasses.append(ospass)
 28 
 29         # init the command
 30         kill_command = 'kill -9'
 31 
 32         # each time we deal less or equal 10 check
 33         for i in range(len(minutes)):
 34             current = datetime.datetime.now().minute
 35             if current >= int(minutes[i]):
 36                 passtime = current - int(minutes[i])
 37             else:
 38                 passtime = current + 60 - int(minutes[i])
 39             
 40             print("passtime is", passtime)
 41             if (5 - passtime) >= 0:
 42                 time.sleep((5 - passtime)*60)
 43 
 44             # split piece of list
 45             sql_sids, sids = sids[0], sids[1:]
 46             sql_serials, serials = serials[0], serials[1:]
 47             sql_hosts, hosts = hosts[0], hosts[1:]
 48             sql_insts, insts = insts[0], insts[1:]
 49             sql_opass, opasses = opasses[0], opasses[1:]
 50             sql_ospass, ospasses = ospasses[0], ospasses[1:]
 51 
 52             print("data", sql_hosts, sql_insts, sql_serials, sql_sids)
 53             # create cursor
 54             
 55             try:
 56                 conn = sqlite3.connect('data-dev.sqlite')
 57                 c = conn.cursor()
 58                 cu = c.execute("select ouser,oport,osport,osuser from tool_target where host='%s' and inst='%s'" % (sql_hosts, sql_insts))
 59 
 60                 result = cu.fetchall()
 61 
 62                 ouser = result[0][0]
 63                 opass = sql_opass
 64                 str_conn = (sql_hosts
 65                             + ':'
 66                             + str(result[0][1])
 67                             + '/'
 68                             + sql_insts)
 69                 odb = cx_Oracle.connect(ouser, opass, str_conn)
 70                 cursor = odb.cursor()
 71 
 72                 # select to find if lock exist
 73                 sql = '''select b.spid, a.sid, a.serial#, a.event from v$session a, v$process b  
 74                         where a.sid = %s and a.serial# = %s ''' % (sql_sids, sql_serials)
 75                 
 76                 cursor.execute(sql)
 77                 answer = cursor.fetchall()
 78                 print("answer is", answer)
 79                 kill_command += ' ' + answer[0][0]
 80 
 81                 s = paramiko.SSHClient()
 82                 s.load_system_host_keys()
 83                 s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
 84                 s.connect(sql_hosts, result[0][2], result[0][3], sql_ospass)
 85                 stdin, stdout, stderr = s.exec_command(kill_command)
 86                 stdout.read()
 87                 print('------------------------')
 88                 s.close()
 89                 cursor.close()
 90                 odb.close()
 91                 c.close()
 92                 conn.close()
 93             except:
 94                 pass
 95 
 96 
 97 txkill_ready = threading.Event()
 98 t1 = threading.Thread(target=check_kill, args=(txkill_ready,), name='t1')
 99 t1.start()
100 # txkill_ready.set()

 

B.接口代碼

 1 @main.route('/txlock/startkillurl', methods=['POST'])
 2 def start_kill_url():
 3     if request.method == 'POST':
 4         cmd = request.form.getlist('list')[0]
 5         host = request.form.getlist('host')[0]
 6         inst = request.form.getlist('inst')[0]
 7         # print(len(cmd))
 8         # cmd.replace("\n", "")
 9         # cmd.replace("\t", "")
10         # print(len(cmd))
11         
12         tooltarget = ToolTarget.query.filter_by(host=host, inst=inst).first()
13         ouser = tooltarget.ouser
14         opass = ToolTarget.de_rsa(pwd=tooltarget.opass)
15         ospass = ToolTarget.de_rsa(pwd=tooltarget.ospass)
16         str_conn = (tooltarget.host
17                     + ':'
18                     + str(tooltarget.oport)
19                     + '/'
20                     + tooltarget.inst)
21         odb = cx_Oracle.connect(ouser, opass, str_conn)
22         cursor = odb.cursor()
23 
24         # add into queue
25         c = re.findall('\d*,\d*', cmd)
26         d = [i+','+str(datetime.datetime.now().minute)+','+host+','+inst+','+opass+','+ospass for i in c]
27         # data example : ['15,5,17', '16,23,17', '14,5,17', '142,1,17']
28         lock_queue.put(d)
29         txkill_ready.set()
30 
31         try:
32             cursor.execute(cmd)
33             # pass
34         except:
35             return "執行失敗,關閉彈窗後會自動刷新列表"
36     return "執行成功,關閉彈窗後會自動刷新列表"
相關文章
相關標籤/搜索