數據庫學習筆記 (三) python操做數據庫

python 操做MYSQL數據庫主要有兩種方式:   html

  使用原生模塊:pymysql
  ORM框架:SQLAchemypython

1、pymysqlmysql

1.1下載安裝模塊sql

第一種:cmd下:執行命令下載安裝:pip3 install pymysql
第二種:IDE下pycharm python環境路徑下添加模塊

1.2使用操做數據庫

#導入模塊
import pymysql

#創建鏈接通道,創建鏈接填入(鏈接數據庫的IP地址,端口號,用戶名,密碼,要操做的數據庫,字符編碼)
conn = pymysql.connect(
	host="",
	port="",
	user='',
	password='',
	database=""
	charset="",
	)  

# 建立遊標,操做設置爲字典類型,返回結果爲字典格式!不寫默認是元組格式!
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)

#操做數據庫的sql語句
sql=""

# 向數據庫發送數據,在方法內部進行拼接!!!

#向數據庫發送操做單條操做指令
# 格式化輸入的值能夠單個按順序傳入 或是寫成列表 (注意 順序和位置)
r = cursor.execute(sql,v1,v2……)

r = cursor.execute(sql,args)
#r 表明接收返回受影響的行數(數字)及執行這一條sql語句,數據庫中有多少行受到了影響。
#sql 指上邊寫的sql語句
#args 指要給sql語句中傳的參數
sql 語句能夠不傳值 及爲空 []
sql 語句能夠傳一個值 及 [v1,]
sql 語句能夠傳多值 及 [v1,v2,v3……]

#向數據庫發送操做多條數據指令 args=[(v1,s1),(v2,s2),(v3,s3)]
r = cursor.executemany(sql,[('egon','sb'),('laoyao','BS')])

#數據庫有四種操做:增刪改查!
# 執行查操做的時候就得接收從數據庫返回的數據!
#執行增刪改操做的時候,就須要像數據庫提交數據!

#查操做:(接收的數據格式由建立的遊標樣式決定!)
#接收數據有三種方式:

res = cursor.fetchone()  #接收返回的第一行數據

ret = cursor.fetchmany(n) #接收返回的n行數據

req = cursor.fetchall() #接收返回的說有數據

#注:在fetch數據時按照順序進行,可使用cursor.scroll(num,mode)來移動遊標位置,如:

cursor.scroll(1,mode='relative')  # 相對當前位置移動
cursor.scroll(2,mode='absolute') # 相對絕對位置移動

#增刪改操做:
#寫完發送操做語句以後,就須要把更改的數據提交,否則數據庫沒法完成新建或是修改操做

conn.commit() #提交

#注:此處有個獲取新建數據自增ID的操做(只能拿到最後那一行的id數)
#執行增長語句,並提交以後,能夠獲取到
new_id=cursor.lastrowid
print(new_id)

#操做完成以後,就須要關閉鏈接
cursor.close() #關閉遊標
conn.close()   #關閉鏈接

操做總結:
  一、重中之重,必定要注意sql注入的問題!!!編程

#格式化寫入sql語句,就會形成sql注入的狀況!!!

import pymysql

user = input("username:")
pwd = input("password:")

conn = pymysql.connect(host="localhost",user='root',password='',database="db666")
cursor = conn.cursor()
sql = "select * from userinfo where username='%s' and password='%s'" %(user,pwd,)
# select * from userinfo where username='uu' or 1=1 -- ' and password='%s'
cursor.execute(sql)
result = cursor.fetchone()
cursor.close()
conn.close()

if result:
    print('登陸成功')
else:
    print('登陸失敗')
sql注入問題示例
import pymysql

user = input("username:")
pwd = input("password:")

conn = pymysql.connect(host="localhost",user='root',password='',database="db666")
cursor = conn.cursor()
sql = "select * from userinfo where username=%s and password=%s"
# sql = "select * from userinfo where username=%(u)s and password=%(p)s"

#傳入數據類型舉例
cursor.execute(sql,user,pwd)  #直接傳值
# cursor.execute(sql,[user,pwd]) #列表形式
# cursor.execute(sql,{'u':user,'p':pwd}) #字典格式
result = cursor.fetchone()
cursor.close()
conn.close()
if result:
    print('登陸成功')
else:
    print('登陸失敗')
傳入數據類型舉例
import pymysql

# 增長,刪,該
# conn = pymysql.connect(host="localhost",user='root',password='',database="db666")
# cursor = conn.cursor()
# sql = "insert into userinfo(username,password) values('root','123123')"
# 受影響的行數
# r = cursor.execute(sql)
# #  ******
# conn.commit()
# cursor.close()
# conn.close()

# conn = pymysql.connect(host="localhost",user='root',password='',database="db666")
# cursor = conn.cursor()
# # sql = "insert into userinfo(username,password) values(%s,%s)"
# # cursor.execute(sql,(user,pwd,))

#插入多條信息
# sql = "insert into userinfo(username,password) values(%s,%s)"
# # 受影響的行數
# r = cursor.executemany(sql,[('egon','sa'),('laoyao','BS')])
# #  ******
# conn.commit()
# cursor.close()
# conn.close()


# 查
# conn = pymysql.connect(host="localhost",user='root',password='',database="db666")
# cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
# sql = "select * from userinfo"
# cursor.execute(sql)

# cursor.scroll(1,mode='relative')  # 相對當前位置移動
# cursor.scroll(2,mode='absolute') # 相對絕對位置移動
# result = cursor.fetchone()
# print(result)
# result = cursor.fetchone()
# print(result)
# result = cursor.fetchone()
# print(result)
# result = cursor.fetchall()
# print(result)
# result = cursor.fetchmany(4)
# print(result)
# cursor.close()
# conn.close()


# 新插入數據的自增ID: cursor.lastrowid
# import pymysql
#
# conn = pymysql.connect(host="localhost",user='root',password='',database="db666")
# cursor = conn.cursor()
# sql = "insert into userinfo(username,password) values('asdfasdf','123123')"
# cursor.execute(sql)
# conn.commit()
# print(cursor.lastrowid)
# cursor.close()
# conn.close()
增刪改查操做

 2、SQLAchemy session

2.1下載安裝模塊oracle

pip3 install SQLAlchemy
IDE下pycharm python環境路徑下添加模塊

2.2原理框架

SQLAlchemy是python編程語言下的一款ORM框架,該框架創建在數據庫API之上,使用關係對象映射進行數據庫操做,簡言之即是:將對象轉換成SQL,而後使用數據API執行SQL並獲取執行結果。編程語言

  利用模塊,按照對應規則,自動生成sql語句!
    做用:提供簡單的規則,自動轉換成sql語句,最終仍是執行sql語句,獲取結果!

ORM操做流程:
  建立一個類,類對應數據庫的表,類能實例一個對象,這個對象對應表裏的數據行
關係對象映射關係:

代碼     數據庫

 類  --->   表
對象 ---> 行

DB first :手動建立數據庫和表,經過ORM框架 根據數據庫,經過類生成一個一個表

code first :手動建立類和數據庫,經過ORM框架 利用類建立表

SQLAlchemy自己沒法操做數據庫,其必須以來pymsql等第三方插件,Dialect用於和數據API進行交流,根據配置文件的不一樣調用不一樣的數據庫API,從而實現對數據庫的操做。

遠程鏈接數據庫引擎類型:
	MySQL-Python
		mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
	   
	pymysql
		mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
	   
	MySQL-Connector
		mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
	   
	cx_Oracle
		oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
	   
	更多詳見:http://docs.sqlalchemy.org/en/latest/dialects/index.html

  SQLAchemy 只負責把類轉換成sql語句,鏈接數據庫仍是須要插件配合着數據庫模塊鏈接的。
    鏈接的數據庫不一樣,轉換成的sql語句也不一樣。
    提早必須有鏈接不一樣數據庫的模塊或是軟件,SQLAchemy再去配置

  規則:導入模塊,生成一個基類,而後再用建立類的方法去建立表,sql語句中的語法,所有轉換成了方法

    雖然沒有使用__init__方法,可是在執行定義的時候,會copy到__init__中
    找到當前全部繼承base的類,而後建立對應的表

  注意:利用SQLAchemy 建立表以前,須要先手動建立一個數據庫!

2.3操做

導入模塊:
	from sqlalchemy.ext.declarative import declarative_base
	from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index,CHAR,VARCHAR
	from sqlalchemy.orm import sessionmaker, relationship
	from sqlalchemy import create_engine

建立基類:
	Base = declarative_base()

經過pymysql與mysql數據庫創建遠程鏈接 和設置最大鏈接數:
	engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/day63?charset=utf8", max_overflow=5)
	
建立單表:
	class 類名(Base):
		__tablename__="表名" #建立表名
		列名=Column(數據類型,是否爲空,主鍵,自增,索引,惟一索引)
		__table_args__(
			UniqueConstraint("列名1","列名2","聯合惟一索引名"),
			index("索引名","列名1","列名2"),
		) #建立聯合惟一索引
	數據類型:Integer 整型;String 字符串類型(CHAR,VARCHAR也能夠);
	是否爲空:nullable=True,
	
	是否爲主鍵:primary_key=True, 
	是否自增:autoincrement=True
	 
	索引:index=True
	惟一索引:unique=True
	例:
	class Users(Base):
		__tablename__ = 'users'
		id = Column(Integer, primary_key=True, autoincrement=True)
		name = Column(VARCHAR(32), nullable=True, index=True)
		email = Column(VARCHAR(16), unique=True)
		__table_args__ = (
			UniqueConstraint('id', 'name', name='uix_id_name'),
			Index('ix_n_ex','name', 'email',),
		 )

建立有外鍵關係的多表:
	一、先建立一個繼承Base基類 存放數據的普通表
	二、建立一個繼承Base基類 與其有外鍵關係的表
	三、語法:外鍵名("表名.列名")ForeignKey("usertype.id") 
	
	例:
		class UserType(Base):
    __tablename__ = "usertype"
    id = Column(Integer,primary_key=True,autoincrement=True)
    title = Column(String(32),nullable=True,index=True)

	class Users(Base):
		__tablename__ = "users"
		id = Column(Integer,primary_key=True,autoincrement=True)
		name = Column(String(32),nullable=True,index=True)
		email = Column(String(16),unique=True)
		u_type_id = Column(Integer,ForeignKey("usertype.id"))
		#外鍵名 = Column(數據類型,ForeignKey("表名.列名"))
		 
生成表或是刪除表(能夠把操做寫成一個函數!):
	#找到當前全部繼承base的類,而後建立全部的表
	def create_table():
		Base.metadata.create_all(engine)
	#找到當前全部繼承base的類,而後刪除全部的表
	def del_table():
		Base.metadata.drop_all(engine)

操做表:
	萬年不變的 數據行 的增刪改查

	#首先,先創建連接通道
		Session = sessionmaker(bind=engine)
		session = Session()
	
	#其次,操做表   注意:操做內填如的內容,必定並必須是表達式!
	
		#增   
			對哪張表更改,就用其對應的類進行實例化,生成的對象就表明着數據行
		
			#增長單個  session.add() 
				obj = UserType(title = "黑金用戶")
				session.add(obj)
			
			#增長多個  session.add_all()
				objs =[
					UserType(title = "會員用戶"),
					UserType(title = "超級用戶"),
					UserType(title = "鉑金用戶"),
					UserType(title = "黑金用戶"),
				]
				session.add_all(objs)
		
		#查  session.query(類名).all()  #直接獲取整個類(表)下全部的對象(數據行)
			
			#直接操做,獲取的是像數據庫發送執行的sql語句
			res = session.query(UserType)  #SQL語句
			print(res)
			
			#獲取全部對應類(表)的對象(數據行) 列表類型
			res_list = session.query(UserType).all()  
 			print(res_list)
			
			#查詢操做,獲取表中某列的值!是對接收到的整個列表進行循環遍歷查找
			
			#查詢整個表內的信息 ------->等效於數據庫中:  select xxx from usertype
			res_list = session.query(UserType).all()
			for sss in res_list:
				print(sss.id,sss.title)
			
		#注意點:.filter()方法是過濾的意思,至關於sql語句中的where
			
			#條件查找表內信息 -------->等效於數據庫中: select xxx usertype where 條件
			res_list = session.query(UserType).filter(UserType.id >2)
			for sss in res_list:
				print(sss.id,sss.title)

	#注意點:執行刪除和更改操做時,都是先把數據行找到(查操做),再進行刪或改操做!
		
		#刪  找到對應的數據行,刪除便可  .delete()
		
			#先找後刪,等效於------> delete from usertype where usertype.id > 4
			session.query(UserType).filter(UserType.id > 4).delete()
		
		#改 先查後改  注意傳值的格式!
			#這裏有個參數 synchronize_session 沒別的招,看源碼解釋!!!
			#對錶進行批量更改!
			session.query(UserType).filter(UserType.id>0).update({"title":"黑金"}) 
			#動態獲取原表的數據(char類型),對錶進行批量更改
			session.query(UserType).filter(UserType.id>0).update({UserType.title:UserType.title+"SX"},synchronize_session=False) 
			#動態獲取原表的數據(int類型),對錶進行批量更改
			session.query(UserType).filter(UserType.id>0).update({"title":UserType.id+1},synchronize_session="evaluate") 

		#查找其餘操做:
			# 分組,排序,連表,通配符,子查詢,limit,union,where,原生SQL、
			
			# 條件

				#過濾,又叫條件判斷
				ret = session.query(Users).filter_by(name='alex').all()
				#兩個表達式同時存在,逗號分開,不寫關係默認是 and
				ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
				#between and
				ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
				#in判斷 語法:in_
				ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
				# not in 判斷 語法:表達式最前加 ~
				ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
				#子查詢
				ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
				#邏輯判斷:  and_ or_ 操做  
				from sqlalchemy import and_, or_
				ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
				ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
				ret = session.query(Users).filter(
				or_(
					Users.id < 2,
					and_(Users.name == 'eric', Users.id > 3),
					Users.extra != ""
				)).all()


			# 通配符 .like()的方法調用
				ret = session.query(Users).filter(Users.name.like('e%')).all()
				ret = session.query(Users).filter(~Users.name.like('e%')).all()

			# 限制
				ret = session.query(Users)[1:2]
				
			#分頁 .limit(n) 取n個數據
				res_list = session.query(UserType).limit(2).all()
				for sss in res_list:
					print(sss.id,sss.title)

			# 排序  查表.order_by(列名.desc()/列名.asc())  [.desc() 由大到小;.asc() 由小到大]
				ret = session.query(Users).order_by(Users.name.desc()).all()
				ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

			# 分組 聚合函數func.方法(列名)  和 .group_by(列名) 方法
				from sqlalchemy.sql import func
				ret = session.query(Users).group_by(Users.extra).all()
				ret = session.query(
					func.max(Users.id),
					func.sum(Users.id),
					func.min(Users.id)).group_by(Users.name).all()

				ret = session.query(
					func.max(Users.id),
					func.sum(Users.id),
					func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()

			# 連表

				ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() #直連
				ret = session.query(Person).join(Favor).all()
				ret = session.query(Person).join(Favor, isouter=True).all()
				
			#子查詢 三種樣式

				#查詢的信息做爲另外一張表的條件
				# 1.select * from b where id in (select id from tb2)

				#最爲一張新表進行二次篩選	
				# 2. select * from (select * from tb) as B
					
					#查詢語句.subquery()  查詢結果做爲一個子查詢(新表)好在進行下一步的查詢。不加.subquery()的話會報錯,再也不往下查詢
					# q1 = session.query(UserType).filter(UserType.id > 0).subquery()
					# result = session.query(q1).all()
					# print(result)

				#做爲一個列內的數據,在另外一張表中顯示 ****** .as_scalar()方法
				# 3. select id ,(select * from users where users.user_type_id=usertype.id) from usertype;

					# session.query(UserType,Users)
					
					# result = session.query(UserType.id,session.query(Users).as_scalar())
					# print(result) #查看對應的sql語句
					
					# result = session.query(UserType.id,session.query(Users).filter(Users.user_type_id==UserType.id).as_scalar())
					# print(result) #查看對應的sql語句


			# 組合(上下連表) .union()  和 .union_all()  注意是:先把信息找到再操做!
				q1 = session.query(Users.name).filter(Users.id > 2)
				q2 = session.query(Favor.caption).filter(Favor.nid < 2)
				ret = q1.union(q2).all()

				q1 = session.query(Users.name).filter(Users.id > 2)
				q2 = session.query(Favor.caption).filter(Favor.nid < 2)
				ret = q1.union_all(q2).all()
				
		- 便利的功能 relationship() 與生成表結構無關,僅用於查詢方便  釋放了連表操做的繁瑣查找,直接經過方法定位!
			使用規範:哪一個類中有外鍵列,就在外鍵列下添加。
			語法:自定義名=relationship("外鍵有聯繫的類名",backref="任意命名")
			
			
			# 問題1. 獲取用戶信息以及與其關聯的用戶類型名稱(FK,Relationship=>正向操做)
				
				#初始方法:連表操做
				user_list = session.query(Users,UserType).join(UserType,isouter=True)
				print(user_list)
				for row in user_list:
				    print(row[0].id,row[0].name,row[0].email,row[0].user_type_id,row[1].title)
				
				user_list = session.query(Users.name,UserType.title).join(UserType,isouter=True).all()
				for row in user_list:
				    print(row[0],row[1],row.name,row.title)

				#(FK,Relationship=>正向操做) 先查用戶信息表,經過命名的 自定義名 正向獲取用戶類型
				user_list = session.query(Users)
				for row in user_list:
					print(row.name,row.id,row.user_type.title)

			# 問題2. 獲取用戶類型  (FK,Relationship=>反向操做)
				
				#連表操做:
				type_list = session.query(UserType)
				for row in type_list:
				    print(row.id,row.title,session.query(Users).filter(Users.user_type_id == row.id).all())
				
				#反向操做:先查類型表,再經過backref 自定義的變量 反向查找用戶信息
				type_list = session.query(UserType)
				for row in type_list:
				    print(row.id,row.title,row.xxoo)
							
			PS:正向操做與反向操做,是相對於外鍵來相對判斷的!
				例如:A表與B表,A表中創建了與B表聯繫的外鍵,A表經過外鍵獲取B表中的信息,叫正向操做;反之,叫反向操做!
			
	最後,操做及語法寫完後,都須要提交給數據庫去執行,再也不使用也須要斷開鏈接!
		session.commit()  #提交
		session.close()	  #關閉鏈接
#!/usr/bin/env python
# _*_ coding:utf-8 _*_

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

Base = declarative_base()
engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/day63?charset=utf8",max_overflow=5)

class UserType(Base):
    __tablename__ = "usertype"
    id = Column(Integer,primary_key=True,autoincrement=True)
    title = Column(String(32),nullable=True,index=True)

class Users(Base):
    __tablename__ = "users"
    id = Column(Integer,primary_key=True,autoincrement=True)
    name = Column(String(32),nullable=True,index=True)
    email = Column(String(16),unique=True)
    u_type_id = Column(Integer,ForeignKey("usertype.id"))

    u_type = relationship("UserType",backref="sss")

def create_table():
    Base.metadata.create_all(engine)

def del_table():
    Base.metadata.drop_all(engine)

#類 --> 表
#對象 --> 行

#創建連接通道
Session = sessionmaker(bind=engine)
session = Session()

#操做內填入的內容,必須是表達式

########## 增長 ###################

#增長單個
# obj = UserType(title = "黑金用戶")
# session.add(obj)

#增長多個
# objs =[
#     UserType(title = "會員用戶"),
#     UserType(title = "超級用戶"),
#     UserType(title = "鉑金用戶"),
#     UserType(title = "黑金用戶"),
# ]
# session.add_all(objs)

############ 查詢 ################
# res = session.query(UserType)  #SQL語句
# print(res)

# res_list = session.query(UserType).all()  #獲取全部對應類(表)的對象(數據行) 列表類型
# print(res_list)

#select xxx from usertype
# res_list = session.query(UserType).limit(2).all()
# for sss in res_list:
#     print(sss.id,sss.title)
#
# #select xxx usertype where ***
# res_list = session.query(UserType).filter(UserType.id >2)
# for sss in res_list:
#     print(sss.id,sss.title)

############### 刪除 ###################

# delete from usertype where usertype.id > 4
# session.query(UserType).filter(UserType.id > 4).delete()

################ 更改 ########################

#這裏有個參數 synchronize_session 沒別的招,看源碼解釋!!!
# session.query(UserType).filter(UserType.id>0).update({"title":"黑金"}) #對錶進行批量更改
# session.query(UserType).filter(UserType.id>0).update({UserType.title:UserType.title+"SX"},synchronize_session=False) #動態獲取原先的數據,對錶進行批量更改
# session.query(UserType).filter(UserType.id>0).update({"title":UserType.id+1},synchronize_session="evaluate") #對錶進行批量更改

############# 查詢其餘操做 #################
# 分組,排序,連表,通配符,limit,union,where,原生SQL#

#條件 and or
# ret = session.query(Users).filter(Users.id > 1, Users.name == "sesc").all()
# for row in ret:
#     print(row.email)


# #正向操做
# res = session.query(Users)
# for row in res:
#     print(row.id,row.name,row.u_type.title)
#
# #反向操做
# res = session.query(UserType)
# for row in res:
#     for a in row.sss:
#         print(row.id,row.title,a.name)

session.commit()
session.close()
部分代碼舉例!從上邊粘貼測試便可!
相關文章
相關標籤/搜索