安裝html
sudo pip install sqlalchemy 或sudo pip3 install sqlalchemypython
下載速度慢可以使用國內源,如:mysql
sudo pip install sqlalchemy -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
測試1redis
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker eng = create_engine('mysql+mysqlconnector://y-user:y-passwd@127.0.0.1:3306/y-db?charset=utf8') #eng = create_engine('postgresql+psycopg2://y-user:y-passwd@127.0.0.1:5432/y-db',echo=True,client_encoding='utf8') DB_Session = sessionmaker(bind=eng) session = DB_Session() data = session.execute("SELECT * FROM y-table") for row in data: for col in row: print col, print session.close()
測試2sql
#coding=utf-8 from sqlalchemy import * from sqlalchemy.orm import * eng = create_engine('mysql+mysqlconnector://y-user:y-passwd@127.0.0.1:3306/y-db?charset=utf8') metadata = MetaData(eng) # 綁定元信息 y_table = Table('y-table', metadata, autoload=True) session = create_session() query = session.query(y_table) data = query.all() for row in data: for col in row: print col, print session.close()
測試3--orm方式數據庫
數據庫表STUDENT的創建參考:session
http://my.oschina.net/u/2245781/blog/653897?fromerr=YEVmVLAxapp
# coding=utf-8 from sqlalchemy import Column, String, create_engine from sqlalchemy.dialects.mysql import DATETIME from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base import sys reload(sys) sys.setdefaultencoding( "utf-8" ) # 建立對象的基類: Base = declarative_base() # 定義User對象: class Stu(Base): # 表的名字: __tablename__ = 'STUDENT' # 表的結構: SNO = Column(String(7), primary_key=True) SNAME = Column(String(8)) SEX = Column(String(2)) BDATE = Column(DATETIME) DIR = Column(String(16)) def __repr__(self): return "<Stu(SNO='%s', SNAME='%s', SEX='%s',BDATE ='%s',DIR ='%s')>" % \ (self.SNO,self.SNAME,self.SEX,self.BDATE.strftime("%Y-%m-%d"),self.DIR) # 初始化數據庫鏈接: engine = create_engine('mysql+mysqlconnector://y-user:y-passwd@127.0.0.1:3306/y-db?charset=utf8') # 建立DBSession類型: DBSession = sessionmaker(bind=engine) # 建立session對象: session = DBSession() query = session.query(Stu) for s in session.query(Stu): print(s) #for s in query.all(): # print s.SNAME+' '+s.SNO+' '+s.SEX +' '+s.BDATE.strftime("%Y-%m-%d")+' ' + s.DIR session.close()
插入async
# coding=utf-8 from sqlalchemy import Column, String, create_engine from sqlalchemy.dialects.mysql import DATETIME from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base # 建立對象的基類: Base = declarative_base() # 定義User對象: class Stu(Base): # 表的名字: __tablename__ = 'STUDENT' # 表的結構: SNO = Column(String(7), primary_key=True) SNAME = Column(String(8)) SEX = Column(String(2)) BDATE = Column(DATETIME) DIR = Column(String(16)) def __repr__(self): return "<Stu(SNO='%s', SNAME='%s', SEX='%s',BDATE ='%s',DIR ='%s')>" % \ (self.SNO,self.SNAME,self.SEX,self.BDATE.strftime("%Y-%m-%d"),self.DIR) # 初始化數據庫鏈接: engine = create_engine('mysql+mysqlconnector://y-user:y-passwd@127.0.0.1:3306/y-db?charset=utf8') # 建立DBSession類型: DBSession = sessionmaker(bind=engine) # 建立session對象: session = DBSession() new_stu = Stu(SNO='9302205', SNAME='方邪真',SEX='男',BDATE='1973-11-18',DIR='數理邏輯') # delete from STUDENT where SNO ='9302205'; # 添加到session: session.add(new_stu) # 提交即保存到數據庫: session.commit() # 關閉session: session.close()
根據sqlalchemy文檔http://docs.sqlalchemy.org/en/latest/dialects/postgresql.htmlpost
當數據庫爲postgresql時DATETIME改成DATE類型,關於查詢的例子以下:
# coding=utf-8 from sqlalchemy import Column, String, create_engine from sqlalchemy.dialects.postgres import DATE from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base from datetime import * import sys reload(sys) sys.setdefaultencoding("utf-8") # 建立對象的基類: Base = declarative_base() # 定義Stu對象: class Stu(Base): # 表的名字: __tablename__ = 'student' # 表的結構: sno = Column(String(7), primary_key=True) sname = Column(String(8)) sex = Column(String(2)) bdate = Column(DATE) dir = Column(String(16)) def __repr__(self): return "<Stu(SNO='%s', SNAME='%s', SEX='%s',BDATE ='%s',DIR ='%s')>" % \ (self.sno,self.sname,self.sex,self.bdate.strftime("%Y-%m-%d"),self.dir) # 初始化數據庫鏈接: engine = create_engine('postgresql+psycopg2://user:passwd@127.0.0.1:5432/db',echo=True,client_encoding='utf8') # 建立DBSession類型: DBSession = sessionmaker(bind=engine) # 建立session對象: session = DBSession() query = session.query(Stu) #for s in query.all(): # print s.sname+' '+s.sno+' '+s.sex +' '+s.bdate.strftime("%Y-%m-%d")+' ' + s.dir for s in session.query(Stu): print(s) session.close()
根據官方文檔中的例子:
表建立crtusers.py
# coding=utf-8 from sqlalchemy import ForeignKey from sqlalchemy import create_engine from sqlalchemy import Column, Integer,String from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, relationship engine = create_engine('mysql+mysqlconnector://y-user:y-passwd@127.0.0.1:3306/y-db?charset=utf8') Base = declarative_base() #創建表users class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(20)) fullname = Column(String(40)) password = Column(String(12)) def __init__(self, name, fullname, password): self.name = name self.fullname = fullname self.password = password def __repr__(self): return "<User(name='%s', fullname='%s', password='%s')>" % (self.name, self.fullname, self.password) Base.metadata.create_all(engine) #創建表addresses class Address(Base): __tablename__ = 'addresses' id = Column(Integer, primary_key=True) email_address = Column(String(40), nullable=False) user_id = Column(Integer, ForeignKey('users.id')) user = relationship("User", back_populates="addresses") def __repr__(self): return "<Address(email_address='%s')>" % self.email_address User.addresses = relationship("Address", order_by=Address.id, back_populates="user") Base.metadata.create_all(engine) #插入一條user記錄 DBSession = sessionmaker(bind=engine) session = DBSession() ed_user = User(name='ed', fullname='Ed Jones', password='edspassword') session.add(ed_user) #插入多條記錄 session.add_all([User(name='wendy', fullname='Wendy Williams', password='foobar'),\ User(name='mary', fullname='Mary Contrary', password='xxg527'),\ User(name='lisa', fullname='lisa Contrary', password='ls123'),\ User(name='cred', fullname='cred Flinstone', password='bla123'),\ User(name='fred', fullname='Fred Flinstone', password='blah')]) #插入1條user記錄,兩條address jack = User(name='jack', fullname='Jack Bean', password='gjffdd') jack.addresses = [Address(email_address='jack@google.com'),Address(email_address='j25@yahoo.com')] session.add(jack) # 提交即保存到數據庫: session.commit() # 關閉session: session.close()
查詢
# coding=utf-8 from operator import or_ from sqlalchemy import text from sqlalchemy import create_engine from sqlalchemy import Column, Integer,String from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from sqlalchemy import func from crtusers import Address, User engine = create_engine('mysql+mysqlconnector://y-user:y-passwd@127.0.0.1:3306/y-db?charset=utf8') DBSession = sessionmaker(bind=engine) session = DBSession() #query sample query = session.query(User) for u in query.all(): print str(u.id)+' '+u.name+' '+u.fullname+' '+u.password for u in session.query(User).order_by(User.id)[1:4]: print(u) for name in session.query(User.name).filter_by(fullname='Ed Jones'): print(name) for u in session.query(User).filter(User.name.isnot(None)): print(u) for u in session.query(User).filter(or_(User.name == 'ed', User.name == 'wendy')): print(u) for u in session.query(User).filter(~User.name.like('%ed')).order_by(User.id): print(u) for u in session.query(User).filter(User.name=='ed').filter(User.fullname == 'Ed Jones'): print(u) for u in session.query(User).filter(User.name.in_(['ed', 'wendy', 'jack'])): print(u) #Using Textual SQL for user in session.query(User).filter(text("id<224")).order_by(text("id")).all(): print(user.name) for u in session.query(User).from_statement\ (text("SELECT * FROM users where id >= :id")).\ params(id=2).all(): print(u) for u in session.query(User).from_statement \ (text("SELECT * FROM users where name=:name")). \ params(name='ed').all(): print(u) #Counting on count() print(session.query(func.count('*')).select_from(User).scalar()) print(session.query(User).filter(User.name.like('%ed')).count()) for n in session.query(func.count(User.name), User.name).group_by(User.name).all(): print(n) #Querying Address for u, a in session.query(User, Address).filter(User.id == Address.user_id).\ filter(Address.email_address == 'jack@google.com').all(): print(u) print(a) for u in session.query(User).join(Address).filter(Address.email_address=='jack@google.com').all(): print(u) for name, in session.query(User.name).filter(User.addresses.any(Address.email_address.like('%google%'))): print(name) # 關閉session: session.close()
安裝psycopg2要指定路徑,故用源碼編譯方式
psycopg2-2.6.2$ python setup.py build_ext --pg-config /opt/PostgreSQL/9.5/bin/pg_config build
psycopg2-2.6.2$ sudo python setup.py build_ext --pg-config /opt/PostgreSQL/9.5/bin/pg_config install
新版地址
wget http://initd.org/psycopg/tarballs/PSYCOPG-2-7/psycopg2-2.7.1.tar.gz
例子:根據psycopg2-2.6.2/examples/simple.py(只修改DSN以適合你的數據庫環境)
# simple.py - very simple example of plain DBAPI-2.0 usage # # currently used as test-me-stress-me script for psycopg 2.0 # # Copyright (C) 2001-2010 Federico Di Gregorio <fog@debian.org> # # psycopg2 is free software: you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # psycopg2 is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. ## put in DSN your DSN string DSN = 'dbname=your-db user=your-user password=your-passwd host=127.0.0.1' ## don't modify anything below this line (except for experimenting) class SimpleQuoter(object): def sqlquote(x=None): return "'bar'" import sys import psycopg2 if len(sys.argv) > 1: DSN = sys.argv[1] print "Opening connection using dsn:", DSN conn = psycopg2.connect(DSN) print "Encoding for this connection is", conn.encoding curs = conn.cursor() curs.execute("SELECT 1 AS foo") print curs.fetchone() curs.execute("SELECT 1 AS foo") print curs.fetchmany() curs.execute("SELECT 1 AS foo") print curs.fetchall() conn.rollback() sys.exit(0) curs.execute("SELECT 1 AS foo", async=1) curs.execute("SELECT %(foo)s AS foo", {'foo':'bar'}) curs.execute("SELECT %(foo)s AS foo", {'foo':None}) curs.execute("SELECT %(foo)f AS foo", {'foo':42}) curs.execute("SELECT %(foo)s AS foo", {'foo':SimpleQuoter()})
PyGreSQL安裝
修改setup.py
f = os.popen('pg_config --%s' % s)
改成:
f = os.popen('/opt/PostgreSQL/10/bin/pg_config --%s' % s)
再安裝:
sudo python3 setup.py install
測試代碼PyGreSQLtest1.py:
#coding=utf-8 from pg import DB db = DB(dbname='testdb', host='localhost', port=5432,user='mymotif', passwd='wxwpxh'); print(db.query("select * from userinfo"));
測試2:
# -*- coding: utf-8 -*- #導入pg模塊 import pg def operate_postgre_tbl_product(): #鏈接數據庫 try: pgdb_conn = pg.connect(dbname = "testdb", host = "127.0.0.1", user = "mymotif", passwd = "wxwpxh") except Exception, e: print e.args[0] return #刪除表 sql_desc = "DROP TABLE IF EXISTS tbl_product3;" try: pgdb_conn.query(sql_desc) except Exception, e: print "drop table failed" pgdb_conn.close() return #建立表 sql_desc = """CREATE TABLE tbl_product3( i_index INTEGER, sv_productname VARCHAR(32) );""" try: pgdb_conn.query(sql_desc) except Exception, e: print "create table failed" pgdb_conn.close() return #插入記錄 sql_desc = "INSERT INTO tbl_product3(sv_productname) values('apple')" try: pgdb_conn.query(sql_desc) except Exception, e: print "insert record into table failed" pgdb_conn.close() return #查詢表 1 sql_desc = "select * from tbl_product3" for row in pgdb_conn.query(sql_desc).dictresult(): print row #關閉數據庫鏈接 pgdb_conn.close() if __name__ == "__main__": #操做數據庫 operate_postgre_tbl_product()
PyGreSQL with SQLAlchemy
# coding=utf-8 from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker # PyGreSQL eng = create_engine('postgresql+pygresql://mymotif:wxwpxh@127.0.0.1:5432/mymotif') DB_Session = sessionmaker(bind=eng) session = DB_Session() data = session.execute("SELECT * FROM STUDENT") for row in data: for col in row: print col, # python3改成print(col, end=' ') print # python3改成print() session.close()
參考:http://blog.csdn.net/phashh/article/details/51077943