python DBUtils 線程池 鏈接 Postgresql(多線程公用線程池,DB-API : psycopg2)

1、DBUtilshtml

DBUtils 是一套容許線程化 Python 程序能夠安全和有效的訪問數據庫的模塊,DBUtils提供兩種外部接口: PersistentDB :提供線程專用的數據庫鏈接,並自動管理鏈接。 PooledDB :提供線程間可共享的數據庫鏈接,並自動管理鏈接。sql

操做數據庫模板:數據庫

  1 import datetime
  2 import sys
  3 import os
  4 import configparser
  5 import logging
  6 import psycopg2
  7 
  8 from  DBUtils.PooledDB import PooledDB
  9 
 10 
 11   
 12 
 13 class  DatabaseOperator(object):
 14     '''
 15     class for database operator
 16     '''
 17 
 18 
 19     def __init__(self, 
 20         database_config_path, database_config=None):
 21         '''
 22         Constructor
 23         '''
 24         self._database_config_path = database_config_path
 25         
 26         # load database configuration
 27         if not database_config :
 28             self._database_config = self.parse_postgresql_config(database_config_path)
 29         else:
 30             self._database_config = database_config
 31         self._pool = None
 32 
 33     def database_config_empty(self):
 34         if self._database_config:
 35             return False
 36         else:
 37             return True
 38         
 39     def parse_postgresql_config(self, database_config_path=None):
 40         '''解析pei數據庫配置文件
 41             參數
 42         ---------
 43         arg1 : conf_file
 44                         數據庫配置文件路徑
 45             返回值
 46         --------
 47         dict
 48                         解析配置屬性dict--config
 49     
 50             示例
 51         --------
 52  53        '''
 54         if database_config_path == None and self._database_config_path != None:
 55             database_config_path = self._database_config_path
 56         if not os.path.isfile(database_config_path):
 57             sys.exit("ERROR: Could not find configuration file: {0}".format(database_config_path))
 58         parser =  configparser.SafeConfigParser()
 59         parser.read(database_config_path)
 60         config = {}
 61         config['database'] = parser.get('UniMonDB', 'Database')
 62         config['db_user'] = parser.get('UniMonDB', 'UserName')
 63         config['db_passwd'] = parser.get('UniMonDB', 'Password')
 64         config['db_port'] = parser.getint('UniMonDB', 'Port')
 65         config['db_host'] = parser.get('UniMonDB', 'Servername')
 66         self._database_config = config
 67         
 68         return config  
 69     
 70     
 71     def get_pool_conn(self):
 72         
 73         if not self._pool:
 74             self.init_pgsql_pool()
 75         return self._pool.connection()
 76         
 77     def init_pgsql_pool(self):
 78         '''利用數據庫屬性鏈接數據庫
 79                     參數
 80                 ---------
 81                 arg1 : config
 82                                 數據庫配置屬性
 83                     返回值
 84                 --------
 85                 
 86                     示例
 87                 --------
 88  89         '''
 90         # 字典config是否爲空
 91         config = self.parse_postgresql_config()
 92         POSTGREIP = config['db_host']
 93         POSTGREPORT = config['db_port']
 94         POSTGREDB = config['database']
 95         POSTGREUSER = config['db_user']
 96         POSTGREPASSWD = config['db_passwd']
 97         try:
 98             logging.info('Begin to create {0} postgresql pool on:{1}.\n'.format(POSTGREIP, datetime.datetime.now()))
 99             
100             pool = PooledDB(
101                 creator=psycopg2,  # 使用連接數據庫的模塊mincached
102                 maxconnections=6,  # 鏈接池容許的最大鏈接數,0和None表示不限制鏈接數
103                 mincached=1,  # 初始化時,連接池中至少建立的空閒的連接,0表示不建立
104                 maxcached=4,  # 連接池中最多閒置的連接,0和None不限制
105                 blocking=True,  # 鏈接池中若是沒有可用鏈接後,是否阻塞等待。True,等待;False,不等待而後報錯
106                 maxusage=None,  # 一個連接最多被重複使用的次數,None表示無限制
107                 setsession=[],  # 開始會話前執行的命令列表。
108                 host=POSTGREIP,
109                 port=POSTGREPORT,
110                 user=POSTGREUSER,
111                 password=POSTGREPASSWD,
112                 database=POSTGREDB)
113             self._pool = pool    
114             logging.info('SUCCESS: create postgresql success.\n')
115                     
116         except Exception as e:
117             logging.error('ERROR: create postgresql pool failed:{0}\n')
118             self.close_db_cursor()
119             sys.exit('ERROR: create postgresql pool error caused by {0}'.format(str(e)))
120 
121             
122     def pg_select_operator(self, sql):
123         '''進行查詢操做,函數返回前關閉cursor,conn
124                     參數
125                 ---------
126                 arg1 : sql查詢語句
127                     返回值
128                 --------
129                 list:result
130                                         類型爲list的查詢結果:result
131             
132                     示例
133                 --------
134 135         '''
136         # 執行查詢
137         try:
138             conn = self.get_pool_conn()
139             cursor = conn.cursor()      
140             cursor.execute(sql)
141             result = cursor.fetchall()
142         except Exception as e:
143             logging.error('ERROR: execute  {0} causes error'.format(sql))
144             sys.exit('ERROR: load data from database error caused {0}'.format(str(e)))
145         finally:
146             cursor.close()
147             conn.close()       
148         return result
149 
150     def test_pool_con(self):
151         sql = 'select * from tbl_devprofile'
152         result = self.pg_select_operator(sql)
153         print(result)
154         
155     def pg_insert_operator(self, sql):
156         
157         result = False
158         try:
159             conn = self.get_pool_conn()
160             cursor = conn.cursor()      
161             cursor.execute(sql)
162             result =  True
163         except Exception as e:
164             logging.error('ERROR: execute  {0} causes error'.format(sql))
165             sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))
166         finally:
167             cursor.close()
168             conn.commit()
169             conn.close()    
170         return result
171     
172     def pg_update_operator(self, sql):
173         
174         result = False
175         try:
176             conn = self.get_pool_conn()
177             cursor = conn.cursor()      
178             cursor.execute(sql)
179             result =  True
180         except Exception as e:
181             logging.error('ERROR: execute  {0} causes error'.format(sql))
182             sys.exit('ERROR: update data from database error caused {0}'.format(str(e)))
183         finally:
184             cursor.close()
185             conn.commit()
186             conn.close()    
187         return result
188 
189     def pg_delete_operator(self, sql):
190         result = False
191         # 執行查詢
192         try:
193             conn = self.get_pool_conn()
194             cursor = conn.cursor()   
195             cursor.execute(sql)
196             result =  True
197         except Exception as e:
198             logging.error('ERROR: execute  {0} causes error'.format(sql))
199             sys.exit('ERROR: delete data from database error caused {0}'.format(str(e)))
200         finally:
201             cursor.close()
202             conn.commit()
203             conn.close()       
204         return result
205 
206     
207     def close_pool(self):
208         '''關閉pool
209                     參數
210                 ---------
211 212 
213                     返回值
214                 --------
215 216                     示例
217                 --------
218 219         '''
220         if self._pool != None:
221             self._pool.close()
222             
223 if __name__ == '__main__':
224     path = "E:\\Users\\Administrator\\eclipse-workspace\\com.leagsoft.basemodule\\base\\config\\sql_conf.conf"
225     db = DatabaseOperator(
226     database_config_path=path)
227     db.test_pool_con()

2、多線程安全

原理:建立多個線程類,多個線程類共享一個隊裏Queue,每個線程類能夠操做數據庫session

 1 from threading import Thread
 2     
 3 class Worker(Thread):
 4     def __init__(self, queue):
 5         Thread.__init__(self)
 6         self.queue = queue
 7  
 8     def run(self):
 9         while True:
10             # Get the work from the queue and expand the tuple
11             # 從隊列中獲取任務
12             database_operator, device, stand_alone_result = self.queue.get()
13             operateResult(database_operator, device, stand_alone_result)
14             # 任務執行完以後要通知隊列
15             self.queue.task_done()

填充隊列多線程

 1     # 使用隊列多線程
 2     logging.info('begin to update all device risk score by multi_processing.\n')
 3     from queue import Queue
 4     queue = Queue()
 5     # 六個線程,每一個線程共享一個隊列
 6     for _ in range(6):
 7         worker = Worker(queue)
 8         worker.setDaemon(True)
 9         worker.start()
10           
11     for record in all_devid:
12         device = record[0]
13         devtype = record[1]
14         all_countlist = all_dict.get(device)
15         stand_alone_result = device_assess(all_countlist)
16         if (devtype in (server_devtype + computer_devtype)) and (stand_alone_result < 100):
17             stand_alone_result *= 0.8
18         # 將設備風險評分數據保存到數據庫中
19         queue.put((database_operator, device, stand_alone_result))
20      
21     #等待隊列任務執行完
22     queue.join()
23 
24 
25 def operateResult(database_operator, device, stand_alone_result):
26     '''
27     函數名稱: device_assess
28     描述:  保存單臺設備分數到數據庫
29     調用: 無
30     被調用:  main
31     被訪問的表: tbl_devprofile
32     被修改的表: 無
33     輸入參數: database_operator, device:設備uid, stand_alone_result:單臺設備風險分數
34     輸出參數:無
35     返回值: 單臺設備風險分數值
36     其它:  無
37     '''
38     import time
39     find_profile_sql = "SELECT uiddevrecordid FROM tbl_devprofile WHERE uiddevrecordid='{0}';".format(device)
40     isExistRecord = database_operator.pg_select_operator(find_profile_sql)
41     #currentTime=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
42     currentTime=time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
43     if len(isExistRecord) > 0:
44         updata_profile_sql = "UPDATE tbl_devprofile SET irisklevel={0}, dtrisktime='{1}' \
45                               WHERE uiddevrecordid='{2}';".format(stand_alone_result, currentTime, device)
46         database_operator.pg_update_operator(updata_profile_sql)
47     else:
48         insert_profile_sql = "INSERT INTO tbl_devprofile VALUES('{0}',NULL,NULL,NULL,NULL,NULL,NULL,NULL,{1},'{2}');".format(
49             device, stand_alone_result, currentTime)
50         database_operator.pg_insert_operator(insert_profile_sql)

使用單線程時,執行完代碼花費20s左右,使用多線程時花費5s左右。eclipse

 

 

Reference:函數

[1] https://blog.csdn.net/zhaihaifei/article/details/54016939post

[2] https://www.cnblogs.com/hao-ming/p/7215050.html?utm_source=itdadao&utm_medium=referralfetch

[3] https://www.cnblogs.com/wozijisun/p/6160065.html (多線程)

[4] http://www.lpfrx.com/archives/4431/

[5] https://www.cnblogs.com/95lyj/p/9047554.html

相關文章
相關標籤/搜索