因爲協程沒辦法完成(一)中所說的任務模式python
接下來就嘗試一下使用線程和隊列來實現一下這個功能sql
在實現以前,咱們先明確一個問題——python的線程是僞併發的。同一時間只能有一個線程在運行。具體怎樣的運做方式由解釋器決定數組
而後回顧一下上一章遇到的問題——return之後,須要另一個線程去檢測以前的操做是否執行成功session
所以程序的流程設計應該是這樣的:併發
1 # 大體流程步驟以下 2 # 1.獲取參數(接口被訪問時觸發) 3 request_data = request.form 4 # 2.根據參數查詢內容 5 target = Target.query.filter_by(id=request_data).first() 6 # 3.將結果插入隊列 7 ans_queue.put(target) 8 # 4.激活線程 9 thread.set() 10 # 5.將結果從隊列中取出 11 ans_queue.get() 12 # 6.處理結果 13 check() 14 # 7.將線程休眠(阻塞) 15 thread.event.clear()
這樣設計的考慮主要是如下幾點:app
1.簡單less
2.入隊能夠保證消息按時間順序被處理fetch
3.出隊能夠保證當隊列不爲空時,檢查線程會執行到隊列爲空爲止。免去沒必要要的喚醒檢查。而後在有消息入隊時被從新激活url
4.其實咱們的設計正常來講不會出現3中的檢查狀況。基本上隊列一旦有消息入隊,線程就會啓動並清空隊列spa
5.入隊能夠保證消息的完整和獨立性,每次請求獲得的數據入隊後,隊列中都是一列數組。處理邏輯更清晰
6.隊列中的數據不出棧是不可見的
7.我就是寧願用全局隊列也不想用全局變量
實際接口代碼和線程代碼以下:
A.隊列和線程代碼
1 # 消息隊列 2 lock_queue = Queue() 3 4 5 def check_kill(event): 6 while True: 7 # check queue 8 if lock_queue.empty() is True: 9 event.clear() 10 # wait event 11 if event.is_set() is not True: 12 event.wait() 13 # do some work 14 sids, serials, minutes, hosts, insts, opasses, ospasses = [], [], [], [], [], [], [] 15 16 # get data until queue empty or datas more than 10 17 if lock_queue.empty() is not True: 18 data = lock_queue.get() 19 for i in data: 20 sid, serial, minute, host, inst, opass, ospass = i.split(',') 21 sids.append(sid) 22 serials.append(serial) 23 minutes.append(minute) 24 hosts.append(host) 25 insts.append(inst) 26 opasses.append(opass) 27 ospasses.append(ospass) 28 29 # init the command 30 kill_command = 'kill -9' 31 32 # each time we deal less or equal 10 check 33 for i in range(len(minutes)): 34 current = datetime.datetime.now().minute 35 if current >= int(minutes[i]): 36 passtime = current - int(minutes[i]) 37 else: 38 passtime = current + 60 - int(minutes[i]) 39 40 print("passtime is", passtime) 41 if (5 - passtime) >= 0: 42 time.sleep((5 - passtime)*60) 43 44 # split piece of list 45 sql_sids, sids = sids[0], sids[1:] 46 sql_serials, serials = serials[0], serials[1:] 47 sql_hosts, hosts = hosts[0], hosts[1:] 48 sql_insts, insts = insts[0], insts[1:] 49 sql_opass, opasses = opasses[0], opasses[1:] 50 sql_ospass, ospasses = ospasses[0], ospasses[1:] 51 52 print("data", sql_hosts, sql_insts, sql_serials, sql_sids) 53 # create cursor 54 55 try: 56 conn = sqlite3.connect('data-dev.sqlite') 57 c = conn.cursor() 58 cu = c.execute("select ouser,oport,osport,osuser from tool_target where host='%s' and inst='%s'" % (sql_hosts, sql_insts)) 59 60 result = cu.fetchall() 61 62 ouser = result[0][0] 63 opass = sql_opass 64 str_conn = (sql_hosts 65 + ':' 66 + str(result[0][1]) 67 + '/' 68 + sql_insts) 69 odb = cx_Oracle.connect(ouser, opass, str_conn) 70 cursor = odb.cursor() 71 72 # select to find if lock exist 73 sql = '''select b.spid, a.sid, a.serial#, a.event from v$session a, v$process b 74 where a.sid = %s and a.serial# = %s ''' % (sql_sids, sql_serials) 75 76 cursor.execute(sql) 77 answer = cursor.fetchall() 78 print("answer is", answer) 79 kill_command += ' ' + answer[0][0] 80 81 s = paramiko.SSHClient() 82 s.load_system_host_keys() 83 s.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 84 s.connect(sql_hosts, result[0][2], result[0][3], sql_ospass) 85 stdin, stdout, stderr = s.exec_command(kill_command) 86 stdout.read() 87 print('------------------------') 88 s.close() 89 cursor.close() 90 odb.close() 91 c.close() 92 conn.close() 93 except: 94 pass 95 96 97 txkill_ready = threading.Event() 98 t1 = threading.Thread(target=check_kill, args=(txkill_ready,), name='t1') 99 t1.start() 100 # txkill_ready.set()
B.接口代碼
1 @main.route('/txlock/startkillurl', methods=['POST']) 2 def start_kill_url(): 3 if request.method == 'POST': 4 cmd = request.form.getlist('list')[0] 5 host = request.form.getlist('host')[0] 6 inst = request.form.getlist('inst')[0] 7 # print(len(cmd)) 8 # cmd.replace("\n", "") 9 # cmd.replace("\t", "") 10 # print(len(cmd)) 11 12 tooltarget = ToolTarget.query.filter_by(host=host, inst=inst).first() 13 ouser = tooltarget.ouser 14 opass = ToolTarget.de_rsa(pwd=tooltarget.opass) 15 ospass = ToolTarget.de_rsa(pwd=tooltarget.ospass) 16 str_conn = (tooltarget.host 17 + ':' 18 + str(tooltarget.oport) 19 + '/' 20 + tooltarget.inst) 21 odb = cx_Oracle.connect(ouser, opass, str_conn) 22 cursor = odb.cursor() 23 24 # add into queue 25 c = re.findall('\d*,\d*', cmd) 26 d = [i+','+str(datetime.datetime.now().minute)+','+host+','+inst+','+opass+','+ospass for i in c] 27 # data example : ['15,5,17', '16,23,17', '14,5,17', '142,1,17'] 28 lock_queue.put(d) 29 txkill_ready.set() 30 31 try: 32 cursor.execute(cmd) 33 # pass 34 except: 35 return "執行失敗,關閉彈窗後會自動刷新列表" 36 return "執行成功,關閉彈窗後會自動刷新列表"