本篇文章主要講解mysqlclient操做MySQL關係型數據庫,安裝mysqlclient的命令行:pip install mysqlclient
;
而後建立一個名爲XKD_Python_Course的數據庫和一張名爲students的數據庫表,咱們先在命令行工具裏面查看一下表名是否存在,登陸mysql數據庫的命令行:mysql -uroot -p
,而後 show databases;
,發現沒有XKD_Python_Course這個數據庫,那咱們就建立一個:create database XKD_Python_Course;
,建立完再: show databases;
一下,能夠看到XKD_Python_Course數據庫已經存在了html
use XKD_Python_Course;
進入這個庫,而後開始建立數據庫表;CREATE TABLE `students` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) NOT NULL, `age` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
show create table students;
,查看一下建立表的建立;desc students;
,查看錶的結構;grant all on XKD_Python_Course.* to 'zengzeng'@'%' identified by '123456'; # 刷新權限 flush privileges;
mysql -uzengzeng -p123456
,就能夠成功登陸,登陸後能夠show databases;
查看一下剛剛建立的數據庫use XKD_Python_Course
剛剛咱們安裝的mysqlclient中有一個MySQLdb類,咱們能夠用這個MySQLdb類去操做mysql數據庫mysql
select * from students;
,而後能夠放心的進行插入了import MySQLdb connect = None # 鏈接對象 cursor = None # 遊標對象 try: # 鏈接對象 connect = MySQLdb.connect(host='localhost', # 主機地址 user='zengzeng', # 帳號 password='123456', # 密碼 database='XKD_Python_Course', # 數據庫名 use_unicode=True, charset='utf8') # 指定字符集 # 遊標對象 cursor = connect.cursor() # 經過鏈接對象調用cursor() except Exception as e: print(e) connect.close() try: if cursor: result = cursor.execute("insert into students (name, age) values ('Angle', 18)") # 插入操做 print('result = {}'.format(result)) connect.commit() # 提交 except Exception as e: print(e) connect.rollback() # 回滾 finally: if cursor: cursor.close() if connect: connect.close()
執行代碼,返回的結果是result = 1,表示操做了一行數據,這時咱們查看數據庫表select * from students;
,是否是發現表中多了一條數據啊;sql
import MySQLdb connect = None # 鏈接對象 cursor = None # 遊標對象 try: # 鏈接對象 connect = MySQLdb.connect(host='localhost', # 主機地址 user='zengzeng', # 帳號 password='123456', # 密碼 database='XKD_Python_Course', # 數據庫名 use_unicode=True, charset='utf8') # 指定字符集 # 遊標對象 cursor = connect.cursor() # 經過鏈接對象調用cursor() except Exception as e: print(e) connect.close() try: if cursor: for i in range(10): result = cursor.execute("insert into students (name, age) values ('Angle', {})".format(i)) # 插入操做 connect.commit() # 提交 except Exception as e: print(e) connect.rollback() # 回滾 finally: if cursor: cursor.close() if connect: connect.close()
import MySQLdb from pprint import pprint # 換行 connect = None # 鏈接對象 cursor = None # 遊標對象 try: # 鏈接對象 connect = MySQLdb.connect(host='localhost', # 主機地址 user='zengzeng', # 帳號 password='123456', # 密碼 database='XKD_Python_Course', # 數據庫名 use_unicode=True, charset='utf8') # 指定字符集 # 遊標對象 cursor = connect.cursor() # 經過鏈接對象調用cursor() except Exception as e: print(e) connect.close() try: if cursor: cursor.execute('select * from students') # 不會返回任何對象,想要拿到數據能夠經過cursor.方法名 one_result = cursor.fetchone() # 查詢結果集的下一條數據 many_result = cursor.fetchmany(5) # 查詢結果集的下五條數據 all_result = cursor.fetchall() # 查詢結果集的剩餘全部數據 # 換行打印 pprint(one_result) print('*' * 100) pprint(many_result) print('*' * 100) pprint(all_result) connect.commit() # 提交 except Exception as e: print(e) connect.rollback() # 回滾 finally: if cursor: cursor.close() if connect: connect.close()
execute(op[,args])
:執行一個數據庫的查詢和命令;fetchmany(size)
:獲取結果集的下幾行務;fetchone()
:獲取結果集的下一行;fetchall()
:獲取結果集中剩下的全部行;rowcount()
:最近一次execute返回數據的行數或影響的行數;close()
:關閉遊標對象;在使用子查詢的時候,咱們就可使用查詢參數數據庫
# 位置參數 cursor.execute('select * from students where id = %s', args=(10, )) # 關鍵字參數 cursor.execute('select * from students where id = %(id)s', args={'id': 10})
會自動關閉遊標編程
import MySQLdb connect = MySQLdb.connect(host='localhost', user='zengzeng', password='123456', database='XKD_Python_Course', use_unicode=True, charset='utf8') with connect as cursor: # 會自動關閉cursor對象 cursor.execute("insert into students (name, age) values ('zengzeng', 22)") # 此時鏈接尚未關閉 cursor.execute("insert into students (name, age) values ('Mark', 23)") connect.close()
Queue模塊實現了多生產者多消費者隊列,尤爲適合多線程編程,Queue類中實現了全部須要的鎖原語,Queue模塊實現了三種類型隊列:多線程
queue模塊中的Queue與multiprocessing模塊的Queue的區別併發
from threading import Thread, Event from queue import Queue import time def write(q: Queue, e: Event): for value in range(100): print('put {} to queue'.format(value)) q.put(value) time.sleep(0.5) else: e.set() def read(q: Queue, e: Event): while True : if not q.empty() or not e.is_set(): value = q.get() print('get {} from queue'.format(value)) time.sleep(1) else: break if __name__ == '__main__': q = Queue() e = Event() tw = Thread(target=write, args=(q,e)) tr = Thread(target=read, args=(q,e)) tw.start() tr.start() tw.join() tr.join() print('finished ')
from multiprocessing import Process,Queue, Event import time def write(q: Queue, e: Event): for value in range(100): print('put {} to queue'.format(value)) q.put(value) time.sleep(0.5) else: e.set() def read(q: Queue, e: Event): while True : if not q.empty() or not e.is_set(): value = q.get() print('get {} from queue'.format(value)) time.sleep(1) else: break if __name__ == '__main__': q = Queue() e = Event() pw = Process(target=write, args=(q,e)) pr = Process(target=read, args=(q,e)) pw.start() pr.start() pw.join() pr.join() print('finished ')
qsize()
:返回隊列的大體大小;empty()
:判斷隊列是否爲空,若是隊列爲空,返回True,反之False;full()
:判斷是否滿了;put()
:將項目放入隊列;put_nowait
:至關於put(item, False);get()
:從隊列中刪除並返回一個項目;get_nowait()
:提供了兩種方法來支持跟蹤守護進程消費者線程是否已徹底處理入隊任務;task_done()
:表示之前排隊的任務已完成;join()
:阻止直到隊列中的全部項目都已獲取並處理完畢;from queue import Queue import MySQLdb class ConnectPool: def __init__(self, size=5, *args, **kwargs): if not isinstance(size, int) or size < 1: size = 10 self.__pool = Queue(size) for i in range(size): self.__pool.put(MySQLdb.connect(*args, **kwargs)) @property def connect(self): return self.__pool.get() @connect.setter def connect(self, conn): self.__pool.put(conn) if __name__ == '__main__': # 構建鏈接池 pool = ConnectPool(host='localhost', user='zengzeng', password='123456', database='XKD_Python_Course', use_unicode=True, charset='utf8') # 獲取一個鏈接 connect = pool.connect # with connect as cursor: with cursor: sql = 'select * from students' cursor.execute(sql) print(cursor.fetchall())
from queue import Queue import MySQLdb import threading class ConnectPool: def __init__(self, size=5, *args, **kwargs): if not isinstance(size, int) or size < 1: size = 10 self.__pool = Queue(size) for i in range(size): self.__pool.put(MySQLdb.connect(*args, **kwargs)) # 建立一個local對象 self.local = threading.local() @property def connect(self): return self.__pool.get() @connect.setter def connect(self, conn): self.__pool.put(conn) def __enter__(self): if getattr(self.local, 'conn', None) is None: self.local.conn = self.connect return self.local.conn.cursor() def __exit__(self, *exc_info): if exc_info: self.local.conn.rollback() else: self.local.conn.commit() # 將鏈接對象歸還到鏈接池 self.connect = self.local.conn # 線程級別的鏈接對象引用計算器減一 self.local.conn = None if __name__ == '__main__': pool = ConnectPool(host='localhost', user='zengzeng', password='123456', database='XKD_Python_Course', use_unicode=True, charset='utf8') def foo(pool): with pool as cursor: with cursor: # 對cursor作上下文管理 sql = 'select * from students' cursor.execute(sql) print(cursor.fetchall()) for i in range(5): t = threading.Thread(target=foo, args=(pool, )) t.start()