做者 | CDA數據分析師
來源 | CDA數據分析研究院
本文涉及到的開發環境:python
兩種方法進行數據庫的鏈接分別是PyMySQL和mysql.connectormysql
步驟:sql
PyMySQL : 是封裝了MySQL驅動的Python驅動,一個能使Python鏈接到MySQL的庫數據庫
環境要求:Python version >= 3.4app
PyMySQL安裝fetch
安裝編碼
Win鍵+X鍵再按I鍵,調出Windows PowerShell窗口,輸入spa
pip install PyMySQL操作系統
回車3d
運行結果以下則安裝成功
pip version ===19.0.3
查看版本
查看PyMySQL的版本,輸入
pip show PyMySQL
回車
首先咱們的MySQL數據庫已安裝,且已建好名爲test的數據庫,其中有名爲student的表
import pymysql
conn=pymysql.connect(host = '127.0.0.1' # 鏈接名稱,默認127.0.0.1
,user = 'root' # 用戶名
,passwd='password' # 密碼
,port= 3306 # 端口,默認爲3306
,db='test' # 數據庫名稱
,charset='utf8' # 字符編碼
)
cur = conn.cursor() # 生成遊標對象
sql="select * from student
" # SQL語句
cur.execute(sql) # 執行SQL語句
data = cur.fetchall() # 經過fetchall方法得到數據
for i in data[:2]: # 打印輸出前2條數據
print (i)
cur.close() # 關閉遊標
conn.close() # 關閉鏈接
上述代碼中,實現了經過Python鏈接MySQL查詢全部的數據,並輸出前2條數據的功能。執行結果以下:
('a', '趙大', '16')
('b', '錢二', '16')
mysql-connector-python:是MySQL官方的純Python驅動;
mysql.connector安裝
安裝
pip install mysql
查看版本
pip show mysql
首先咱們的MySQL數據庫已安裝,且已建好名爲test的數據庫,其中有名爲student的表
import mysql.connector
conn=mysql.connector.connect(host = '127.0.0.1' # 鏈接名稱,默認127.0.0.1
,user = 'root' # 用戶名
,passwd='password' # 密碼
,port= 3306 # 端口,默認爲3306
,db='test' # 數據庫名稱
,charset='utf8' # 字符編碼
)
cur = conn.cursor() # 生成遊標對象
sql="select * from student
" # SQL語句
cur.execute(sql) # 執行SQL語句
data = cur.fetchall() # 經過fetchall方法得到數據
for i in data[:2]: # 打印輸出前2條數據
print (i)
cur.close() # 關閉遊標
conn.close() # 關閉鏈接
上述代碼中,實現了經過Python鏈接MySQL查詢全部的數據,並輸出前2條數據的功能。執行結果以下:
('a', '趙大', '16')
('b', '錢二', '16')
接下來咱們以用pymysql包爲例,介紹一下如何用Python對數據庫中的數據進行增刪改查 。
增
import pymysql
conn=pymysql.connect(host = '127.0.0.1' # 鏈接名稱,默認127.0.0.1
,user = 'root' # 用戶名
,passwd='password' # 密碼
,port= 3306 # 端口,默認爲3306
,db='test' # 數據庫名稱
,charset='utf8' # 字符編碼
)
cur = conn.cursor() # 生成遊標對象
sql= "INSERT INTO student VALUES ('p','魏六','17')"
try:
cur.execute(sql1) # 執行插入的sql語句
conn.commit() # 提交到數據庫執行
except:
coon.rollback()# 若是發生錯誤則回滾
conn.close() # 關閉數據庫鏈接
而後咱們再運行查詢語句
import mysql.connector
conn=mysql.connector.connect(host = '127.0.0.1' # 鏈接名稱,默認127.0.0.1
,user = 'root' # 用戶名
,passwd='password' # 密碼
,port= 3306 # 端口,默認爲3306
,db='test' # 數據庫名稱
,charset='utf8' # 字符編碼
)
cur = conn.cursor() # 生成遊標對象
sql="select * from student
" # SQL語句
cur.execute(sql) # 執行SQL語句
data = cur.fetchall() # 經過fetchall方法得到數據
for i in data[:]: # 打印輸出全部數據
print (i)
cur.close() # 關閉遊標
conn.close() # 關閉鏈接
執行結果就是
('b', '錢二', '16')
('c', '張三', '17')
('d', '李四', '17')
('e', '王五', '16')
('a', '趙大', '16')
('p', '魏六', '17')
刪
import pymysql
conn=pymysql.connect(host = '127.0.0.1' # 鏈接名稱,默認127.0.0.1
,user = 'root' # 用戶名
,passwd='password' # 密碼
,port= 3306 # 端口,默認爲3306
,db='test' # 數據庫名稱
,charset='utf8' # 字符編碼
)
cur = conn.cursor() # 生成遊標對象
sql = "DELETE FROM student WHERE 學號
= "a"
try:
cur.execute(sql) # 執行插入的sql語句
conn.commit() # 提交到數據庫執行
except:
coon.rollback()# 若是發生錯誤則回滾
conn.close() # 關閉數據庫鏈接
改
import pymysql
conn=pymysql.connect(host = '127.0.0.1' # 鏈接名稱,默認127.0.0.1
,user = 'root' # 用戶名
,passwd='password' # 密碼
,port= 3306 # 端口,默認爲3306
,db='test' # 數據庫名稱
,charset='utf8' # 字符編碼
)
cur = conn.cursor() # 生成遊標對象
sql ="UPDATE student SET 學員姓名
= '歐陽' WHERE 學號
= 'b' "
try:
cur.execute(sql) # 執行插入的sql語句
conn.commit() # 提交到數據庫執行
except:
coon.rollback()# 若是發生錯誤則回滾
conn.close() # 關閉數據庫鏈接
查
import pymysql
conn=pymysql.connect(host = '127.0.0.1' # 鏈接名稱,默認127.0.0.1
,user = 'root' # 用戶名
,passwd='password' # 密碼
,port= 3306 # 端口,默認爲3306
,db='test' # 數據庫名稱
,charset='utf8' # 字符編碼
)
cur = conn.cursor() # 生成遊標對象
sql="select * from student
" # SQL語句
try:
cur.execute(sql) # 執行插入的sql語句
data = cur.fetchall()
for i in data[:]:
print (i)
conn.commit() # 提交到數據庫執行
except:
coon.rollback()# 若是發生錯誤則回滾
conn.close() # 關閉數據庫鏈接
小型案例
import pymysql
config = {
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'passwd': 'password',
'charset':'utf8',
}
conn = pymysql.connect(**config)
cursor = conn.cursor()
try:
DB_NAME = 'test_3'
cursor.execute('DROP DATABASE IF EXISTS %s' %DB_NAME)
cursor.execute('CREATE DATABASE IF NOT EXISTS %s' %DB_NAME)
conn.select_db(DB_NAME)
TABLE_NAME = 'bankData'
cursor.execute('CREATE TABLE %s(id int primary key,money int(30))' %TABLE_NAME)
values = []
for i in range(20):
values.append((int(i),int(156*i)))
cursor.executemany('INSERT INTO bankData values(%s,%s)',values)
conn.commit()
count = cursor.execute('SELECT * FROM %s' %TABLE_NAME)
print ('total records:{}'.format(cursor.rowcount))
desc = cursor.description
print ("%s %3s" % (desc0, desc1))
cursor.scroll(10,mode='absolute')
results = cursor.fetchall()
for result in results:
print (result)
except:
import traceback
traceback.print_exc()
conn.rollback()
finally:
cursor.close()
conn.close()
FIFA球員信息系統
from pymysql import *
class Mysqlpython:
def __init__(self, database='test', host='127.0.0.1', user="root",
password='password', port=3306, charset="utf8"):
self.host = host
self.user = user
self.password = password
self.port = port
self.database = database
self.charset = charset
def open(self):
self.db = connect(host=self.host, user=self.user,
password=self.password, port=self.port,
database=self.database,
charset=self.charset)
self.cur = self.db.cursor()
def close(self):
self.cur.close()
self.db.close()
def Operation(self, sql):
try:
self.open()
self.cur.execute(sql)
self.db.commit()
print("ok")
except Exception as e:
self.db.rollback()
print("Failed", e)
self.close()
def Search(self, sql):
try:
self.open()
self.cur.execute(sql)
result = self.cur.fetchall()
return result
except Exception as e:
print("Failed", e)
self.close()
def Insert():#如何從外面將數據錄入到sql語句中
ID = int(input("請輸入球員編號:"))
people_name = input("請輸入球員名字:")
PAC = int(input("請輸入速度評分:"))
DRI = int(input("請輸入盤帶評分:"))
SHO = int(input("請輸入射門評分:"))
DEF = int(input("請輸入防守評分:"))
PAS = int(input("請輸入傳球評分:"))
PHY = int(input("請輸入身體評分:"))
score =(PAC+DRI+SHO+DEF+PAS+PHY)/6
sql_insert = "insert into FIFA(ID, people_name, PAC,DRI,SHO,DEF, PAS, PHY, score) values(%d,'%s',%d,%d,%d,%d,%d,%d,%d)"%(ID, people_name, PAC,DRI,SHO,DEF, PAS, PHY, score)
print(people_name)
return sql_insert
def Project():
print("球員的能力評分有:")
list=['速度','盤帶','射門','防守','傳球','身體','綜合']
print(list)
def Exit():
print("歡迎下次使用!!!")
exit()
def Search_choice(num):
date = Mysqlpython()
date.open()
if num=="2":
sql_insert = Insert()
date.Operation(sql_insert)
print("添加成功!")
Start()
elif num=="1":
input_date=input("請選擇您想要以什麼格式輸出:默認升序排列1.球員編號,2.速度,3.盤帶,4.射門, 5.防守, 6.傳球, 7.身體 , 8.綜合 ")
if input_date=="1":
sql_search = "select * from FIFA order by ID asc"
elif input_date=="2":
sql_search = "select * from FIFA order by PAC asc"
elif input_date=="3":
sql_search = "select * from FIFA order by DRI asc"
elif input_date=="4":
sql_search = "select * from FIFA order by SHO asc"
elif input_date=="5":
sql_search = "select * from FIFA order by DEF asc"
elif input_date=="6":
sql_search = "select * from FIFA order by PAS asc"
elif input_date=="7":
sql_search = "select * from FIFA order by PHY asc"
elif input_date=="8":
sql_search = "select * from FIFA order by PHY score"
else:
print("請從新輸入!")
result = date.Search(sql_search)
print(" 編號 姓名 速度 盤帶 射門 防守 傳球 身體 綜合 ")
for str in result:
print(str)
Start()
elif num=="3":
Project()
Start()
elif num=="4":
del_num=input("請輸入您要刪除球員的編號:")
sql_delete="delete from FIFA where id=%s"%del_num
date.Operation(sql_delete)
print("刪除成功!")
Start()
elif num=="5":
Exit()
else:
print("輸入有誤,請從新輸入!")
def Start():
print("")
print(" 歡迎來到FIFA球員信息系統 ")
print("1.查看球員信息 2.球員信息錄入 ")
print("3.球員能力 4.刪除球員信息 ")
print("5.退出系統 ")
print("")
choice = input("請輸入您的選擇:")
Search_choice(choice)
if __name__=="__main__":
Start()
銀行轉帳系統
先創建數據庫test_3和表bankdata
import pymysql
config = {
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'passwd': 'password',
'charset':'utf8',
}
conn = pymysql.connect(**config)
cursor = conn.cursor()
try:
DB_NAME = 'test_3'
cursor.execute('DROP DATABASE IF EXISTS %s' %DB_NAME)
cursor.execute('CREATE DATABASE IF NOT EXISTS %s' %DB_NAME)
conn.select_db(DB_NAME)
TABLE_NAME = 'bankData'
cursor.execute('CREATE TABLE %s(id int primary key,money int(30))' %TABLE_NAME)
values = []
for i in range(20):
values.append((int(i),int(156*i)))
cursor.executemany('INSERT INTO bankData values(%s,%s)',values)
conn.commit()
count = cursor.execute('SELECT * FROM %s' %TABLE_NAME)
print ('total records:{}'.format(cursor.rowcount))
desc = cursor.description
print ("%s %3s" % (desc0, desc1))
cursor.scroll(10,mode='absolute')
results = cursor.fetchall()
for result in results:
print (result)
except:
import traceback
traceback.print_exc()
conn.rollback()
finally:
cursor.close()
conn.close()
構建系統
import pymysql
class TransferMoney(object):
def __init__(self, conn):
self.conn = conn
self.cur = conn.cursor()
def transfer(self, source_id, target_id, money):
if not self.check_account_avaialbe(source_id):
raise Exception("帳戶不存在")
if not self.check_account_avaialbe(target_id):
raise Exception("帳戶不存在")
if self.has_enough_money(source_id, money):
try:
self.reduce_money(source_id, money)
self.add_money(target_id, money)
except Exception as e:
print("轉帳失敗:", e)
self.conn.rollback()
else:
self.conn.commit()
print("%s給%s轉帳%s金額成功" % (source_id, target_id, money))
def check_account_avaialbe(self, acc_id):
"""判斷賬號是否存在, 傳遞的參數是銀行卡號的id"""
select_sqli = "select * from bankData where id=%d;" % (acc_id)
print("execute sql:", select_sqli)
res_count = self.cur.execute(select_sqli)
if res_count == 1:
return True
else:
return False
def has_enough_money(self, acc_id, money):
"""判斷acc_id帳戶上金額> money"""
select_sqli = "select money from bankData where id=%d;" % (acc_id)
print("execute sql:", select_sqli)
self.cur.execute(select_sqli) # ((1, 500), )
acc_money = self.cur.fetchone()[0]
if acc_money >= money:
return True
else:
return False
def add_money(self, acc_id, money):
update_sqli = "update bankData set money=money+%d where id=%d" % (money, acc_id)
print("add money:", update_sqli)
self.cur.execute(update_sqli)
def reduce_money(self, acc_id, money):
update_sqli = "update bankData set money=money-%d where id=%d" % (money, acc_id)
print("reduce money:", update_sqli)
self.cur.execute(update_sqli)
def __del__(self):
self.cur.close()
self.conn.close()
if name == '__main__':
conn = pymysql.connect(host = '127.0.0.1' # 鏈接名稱,默認127.0.0.1 ,user = 'root' # 用戶名,passwd='password' # 密碼,port= 3306 # 端口,默認爲3306,db='test_3' # 數據庫名稱,charset='utf8',autocommit=True, # 若是插入數據,自動提交給數據庫 )trans = TransferMoney(conn) trans.transfer(15, 12, 200)