本篇文章主要講解mysqlclient操做MySQL關係型數據庫,安裝mysqlclient的命令行:pip install mysqlclient
而後建立一個名爲XKD_Python_Course的數據庫和一張名爲students的數據庫表,咱們先在命令行工具裏面查看一下表名是否存在,登陸mysql數據庫的命令行:mysql -uroot -p
,而後 show databases;
,發現沒有XKD_Python_Course這個數據庫,那咱們就建立一個:create database XKD_Python_Course;
,建立完再: show databases;
use XKD_Python_Course;
進入這個庫,而後開始建立數據庫表;CREATE TABLE `students` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) NOT NULL, `age` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
show create table students;
,查看一下建立表的建立;desc students;
,查看錶的結構;grant all on XKD_Python_Course.* to 'zengzeng'@'%' identified by '123456'; # 刷新權限 flush privileges;
mysql -uzengzeng -p123456
,就能夠成功登陸,登陸後能夠show databases;
查看一下剛剛建立的數據庫use XKD_Python_Course
select * from students;
,而後能夠放心的進行插入了import MySQLdb connect = None # 鏈接對象 cursor = None # 遊標對象 try: # 鏈接對象 connect = MySQLdb.connect(host='localhost', # 主機地址 user='zengzeng', # 帳號 password='123456', # 密碼 database='XKD_Python_Course', # 數據庫名 use_unicode=True, charset='utf8') # 指定字符集 # 遊標對象 cursor = connect.cursor() # 經過鏈接對象調用cursor() except Exception as e: print(e) connect.close() try: if cursor: result = cursor.execute("insert into students (name, age) values ('Angle', 18)") # 插入操做 print('result = {}'.format(result)) connect.commit() # 提交 except Exception as e: print(e) connect.rollback() # 回滾 finally: if cursor: cursor.close() if connect: connect.close()
執行代碼,返回的結果是result = 1,表示操做了一行數據,這時咱們查看數據庫表select * from students;
import MySQLdb connect = None # 鏈接對象 cursor = None # 遊標對象 try: # 鏈接對象 connect = MySQLdb.connect(host='localhost', # 主機地址 user='zengzeng', # 帳號 password='123456', # 密碼 database='XKD_Python_Course', # 數據庫名 use_unicode=True, charset='utf8') # 指定字符集 # 遊標對象 cursor = connect.cursor() # 經過鏈接對象調用cursor() except Exception as e: print(e) connect.close() try: if cursor: for i in range(10): result = cursor.execute("insert into students (name, age) values ('Angle', {})".format(i)) # 插入操做 connect.commit() # 提交 except Exception as e: print(e) connect.rollback() # 回滾 finally: if cursor: cursor.close() if connect: connect.close()
import MySQLdb from pprint import pprint # 換行 connect = None # 鏈接對象 cursor = None # 遊標對象 try: # 鏈接對象 connect = MySQLdb.connect(host='localhost', # 主機地址 user='zengzeng', # 帳號 password='123456', # 密碼 database='XKD_Python_Course', # 數據庫名 use_unicode=True, charset='utf8') # 指定字符集 # 遊標對象 cursor = connect.cursor() # 經過鏈接對象調用cursor() except Exception as e: print(e) connect.close() try: if cursor: cursor.execute('select * from students') # 不會返回任何對象,想要拿到數據能夠經過cursor.方法名 one_result = cursor.fetchone() # 查詢結果集的下一條數據 many_result = cursor.fetchmany(5) # 查詢結果集的下五條數據 all_result = cursor.fetchall() # 查詢結果集的剩餘全部數據 # 換行打印 pprint(one_result) print('*' * 100) pprint(many_result) print('*' * 100) pprint(all_result) connect.commit() # 提交 except Exception as e: print(e) connect.rollback() # 回滾 finally: if cursor: cursor.close() if connect: connect.close()
# 位置參數 cursor.execute('select * from students where id = %s', args=(10, )) # 關鍵字參數 cursor.execute('select * from students where id = %(id)s', args={'id': 10})
import MySQLdb connect = MySQLdb.connect(host='localhost', user='zengzeng', password='123456', database='XKD_Python_Course', use_unicode=True, charset='utf8') with connect as cursor: # 會自動關閉cursor對象 cursor.execute("insert into students (name, age) values ('zengzeng', 22)") # 此時鏈接尚未關閉 cursor.execute("insert into students (name, age) values ('Mark', 23)") connect.close()
from threading import Thread, Event from queue import Queue import time def write(q: Queue, e: Event): for value in range(100): print('put {} to queue'.format(value)) q.put(value) time.sleep(0.5) else: e.set() def read(q: Queue, e: Event): while True : if not q.empty() or not e.is_set(): value = q.get() print('get {} from queue'.format(value)) time.sleep(1) else: break if __name__ == '__main__': q = Queue() e = Event() tw = Thread(target=write, args=(q,e)) tr = Thread(target=read, args=(q,e)) tw.start() tr.start() tw.join() tr.join() print('finished ')
from multiprocessing import Process,Queue, Event import time def write(q: Queue, e: Event): for value in range(100): print('put {} to queue'.format(value)) q.put(value) time.sleep(0.5) else: e.set() def read(q: Queue, e: Event): while True : if not q.empty() or not e.is_set(): value = q.get() print('get {} from queue'.format(value)) time.sleep(1) else: break if __name__ == '__main__': q = Queue() e = Event() pw = Process(target=write, args=(q,e)) pr = Process(target=read, args=(q,e)) pw.start() pr.start() pw.join() pr.join() print('finished ')
:至關於put(item, False);get()
:阻止直到隊列中的全部項目都已獲取並處理完畢;from queue import Queue import MySQLdb class ConnectPool: def __init__(self, size=5, *args, **kwargs): if not isinstance(size, int) or size < 1: size = 10 self.__pool = Queue(size) for i in range(size): self.__pool.put(MySQLdb.connect(*args, **kwargs)) @property def connect(self): return self.__pool.get() @connect.setter def connect(self, conn): self.__pool.put(conn) if __name__ == '__main__': # 構建鏈接池 pool = ConnectPool(host='localhost', user='zengzeng', password='123456', database='XKD_Python_Course', use_unicode=True, charset='utf8') # 獲取一個鏈接 connect = pool.connect # with connect as cursor: with cursor: sql = 'select * from students' cursor.execute(sql) print(cursor.fetchall())
from queue import Queue import MySQLdb import threading class ConnectPool: def __init__(self, size=5, *args, **kwargs): if not isinstance(size, int) or size < 1: size = 10 self.__pool = Queue(size) for i in range(size): self.__pool.put(MySQLdb.connect(*args, **kwargs)) # 建立一個local對象 self.local = threading.local() @property def connect(self): return self.__pool.get() @connect.setter def connect(self, conn): self.__pool.put(conn) def __enter__(self): if getattr(self.local, 'conn', None) is None: self.local.conn = self.connect return self.local.conn.cursor() def __exit__(self, *exc_info): if exc_info: self.local.conn.rollback() else: self.local.conn.commit() # 將鏈接對象歸還到鏈接池 self.connect = self.local.conn # 線程級別的鏈接對象引用計算器減一 self.local.conn = None if __name__ == '__main__': pool = ConnectPool(host='localhost', user='zengzeng', password='123456', database='XKD_Python_Course', use_unicode=True, charset='utf8') def foo(pool): with pool as cursor: with cursor: # 對cursor作上下文管理 sql = 'select * from students' cursor.execute(sql) print(cursor.fetchall()) for i in range(5): t = threading.Thread(target=foo, args=(pool, )) t.start()