RabbitMQ是一個在AMQP基礎上完整的,可複用的企業消息系統。他遵循Mozilla Public License開源協議。html
MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。消 息傳遞指的是程序之間經過在消息中發送數據進行通訊,而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。排隊指的是應用程序經過 隊列來通訊。隊列的使用除去了接收和發送應用程序同時執行的要求。python
RabbitMQ安裝mysql
安裝配置epel源 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 安裝erlang $ yum -y install erlang 安裝RabbitMQ $ yum -y install rabbitmq-server
注意:service rabbitmq-server start/stopsql
安裝API數據庫
pip install pika or easy_install pika or 源碼 https://pypi.python.org/pypi/pika
使用API操做RabbitMQexpress
基於Queue實現生產者消費者模型編程
#!/usr/bin/env python # -*- coding:utf-8 -*- import Queue import threading message = Queue.Queue(10) def producer(i): while True: message.put(i) def consumer(i): while True: msg = message.get() for i in range(12): t = threading.Thread(target=producer, args=(i,)) t.start() for i in range(10): t = threading.Thread(target=consumer, args=(i,)) t.start()
對於RabbitMQ來講,生產和消費再也不針對內存裏的一個Queue對象,而是某臺服務器上的RabbitMQ Server實現的消息隊列。api
#!/usr/bin/env python import pika # ######################### 生產者 ######################### connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
#!/usr/bin/env python import pika # ########################## 消費者 ########################## connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
一、acknowledgment 消息不丟失服務器
no-ack = False,若是消費者遇到狀況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那麼,RabbitMQ會從新將該任務添加到隊列中。session
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.211.55.4')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print 'ok' ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='hello', no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
二、durable 消息不丟失
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) channel = connection.channel() # make message persistent channel.queue_declare(queue='hello', durable=True) channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent 'Hello World!'") connection.close()
#!/usr/bin/env python # -*- coding:utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) channel = connection.channel() # make message persistent channel.queue_declare(queue='hello', durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print 'ok' ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='hello', no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
三、消息獲取順序
默認消息隊列裏的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者1去隊列中獲取 偶數 序列的任務。
channel.basic_qos(prefetch_count=1) 表示誰來誰取,再也不按照奇偶數排列
#!/usr/bin/env python # -*- coding:utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) channel = connection.channel() # make message persistent channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print 'ok' ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='hello', no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
四、發佈訂閱
發佈訂閱和簡單的消息隊列區別在於,發佈訂閱會將消息發送給全部的訂閱者,而消息隊列中的數據被消費一次便消失。因此,RabbitMQ實現發佈和訂閱時,會爲每個訂閱者建立一個隊列,而發佈者發佈消息時,會將消息放置在全部相關隊列中。
exchange type = fanout
複製代碼 #!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
SQLAlchemy是Python編程語言下的一款ORM框架,該框架創建在數據庫API之上,使用關係對象映射進行數據庫操做,簡言之即是:將對象轉換成SQL,而後使用數據API執行SQL並獲取執行結果。
Dialect用於和數據API進行交流,根據配置文件的不一樣調用不一樣的數據庫API,從而實現對數據庫的操做,如:
MySQL-Python mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] 更多詳見:http://docs.sqlalchemy.org/en/latest/dialects/index.html
步驟一:
使用 Engine/ConnectionPooling/Dialect 進行數據庫操做,Engine使用ConnectionPooling鏈接數據庫,而後再經過Dialect執行SQL語句。
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import create_engine engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5) engine.execute( "INSERT INTO ts_test (a, b) VALUES ('2', 'v1')" ) engine.execute( "INSERT INTO ts_test (a, b) VALUES (%s, %s)", ((555, "v1"),(666, "v1"),) ) engine.execute( "INSERT INTO ts_test (a, b) VALUES (%(id)s, %(name)s)", id=999, name="v1" ) result = engine.execute('select * from ts_test') result.fetchall()
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import create_engine engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5) # 事務操做 with engine.begin() as conn: conn.execute("insert into table (x, y, z) values (1, 2, 3)") conn.execute("my_special_procedure(5)") conn = engine.connect() # 事務操做 with conn.begin(): conn.execute("some statement", {'x':5, 'y':10})
注:查看數據庫鏈接:show status like 'Threads%';
步驟二:
使用 Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 進行數據庫操做。Engine使用Schema Type建立一個特定的結構對象,以後經過SQL Expression Language將該對象轉換成SQL語句,而後經過 ConnectionPooling 鏈接數據庫,再而後經過 Dialect 執行SQL,並獲取結果。
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey metadata = MetaData() user = Table('user', metadata, Column('id', Integer, primary_key=True), Column('name', String(20)), ) color = Table('color', metadata, Column('id', Integer, primary_key=True), Column('name', String(20)), ) engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5) metadata.create_all(engine) # metadata.clear() # metadata.remove()
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey metadata = MetaData() user = Table('user', metadata, Column('id', Integer, primary_key=True), Column('name', String(20)), ) color = Table('color', metadata, Column('id', Integer, primary_key=True), Column('name', String(20)), ) engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5) conn = engine.connect() # 建立SQL語句,INSERT INTO "user" (id, name) VALUES (:id, :name) conn.execute(user.insert(),{'id':7,'name':'seven'}) conn.close() # sql = user.insert().values(id=123, name='wu') # conn.execute(sql) # conn.close() # sql = user.delete().where(user.c.id > 1) # sql = user.update().values(fullname=user.c.name) # sql = user.update().where(user.c.name == 'jack').values(name='ed') # sql = select([user, ]) # sql = select([user.c.id, ]) # sql = select([user.c.name, color.c.name]).where(user.c.id==color.c.id) # sql = select([user.c.name]).order_by(user.c.name) # sql = select([user]).group_by(user.c.name) # result = conn.execute(sql) # print result.fetchall() # conn.close()
更多內容詳見:
http://www.jianshu.com/p/e6bba189fcbd
http://docs.sqlalchemy.org/en/latest/core/expression_api.html
注:SQLAlchemy沒法修改表結構,若是須要可使用SQLAlchemy開發者開源的另一個軟件Alembic來完成。
步驟三:
使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 全部組件對數據進行操做。根據類建立對象,對象轉換成SQL,執行SQL。
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5) Base = declarative_base() class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(50)) # 尋找Base的全部子類,按照子類的結構在數據庫中生成對應的數據表信息 # Base.metadata.create_all(engine) Session = sessionmaker(bind=engine) session = Session() # ########## 增 ########## # u = User(id=2, name='sb') # session.add(u) # session.add_all([ # User(id=3, name='sb'), # User(id=4, name='sb') # ]) # session.commit() # ########## 刪除 ########## # session.query(User).filter(User.id > 2).delete() # session.commit() # ########## 修改 ########## # session.query(User).filter(User.id > 2).update({'cluster_id' : 0}) # session.commit() # ########## 查 ########## # ret = session.query(User).filter_by(name='sb').first() # ret = session.query(User).filter_by(name='sb').all() # print ret # ret = session.query(User).filter(User.name.in_(['sb','bb'])).all() # print ret # ret = session.query(User.name.label('name_label')).all() # print ret,type(ret) # ret = session.query(User).order_by(User.id).all() # print ret # ret = session.query(User).order_by(User.id)[1:3] # print ret # session.commit()
使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 全部組件對數據進行操做。根據類建立對象,對象轉換成SQL,執行SQL。
一、建立表
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5) Base = declarative_base() # 建立單表 class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32)) extra = Column(String(16)) __table_args__ = ( UniqueConstraint('id', 'name', name='uix_id_name'), Index('ix_id_name', 'name', 'extra'), ) # 一對多 class Favor(Base): __tablename__ = 'favor' nid = Column(Integer, primary_key=True) caption = Column(String(50), default='red', unique=True) class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) favor_id = Column(Integer, ForeignKey("favor.nid")) # 多對多 class Group(Base): __tablename__ = 'group' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) port = Column(Integer, default=22) class Server(Base): __tablename__ = 'server' id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False) class ServerToGroup(Base): __tablename__ = 'servertogroup' nid = Column(Integer, primary_key=True, autoincrement=True) server_id = Column(Integer, ForeignKey('server.id')) group_id = Column(Integer, ForeignKey('group.id')) def init_db(): Base.metadata.create_all(engine) def drop_db(): Base.metadata.drop_all(engine)
ForeignKeyConstraint(['other_id'], ['othertable.other_id']),
二、操做表
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5) Base = declarative_base() # 建立單表 class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32)) extra = Column(String(16)) __table_args__ = ( UniqueConstraint('id', 'name', name='uix_id_name'), Index('ix_id_name', 'name', 'extra'), ) def __repr__(self): return "%s-%s" %(self.id, self.name) # 一對多 class Favor(Base): __tablename__ = 'favor' nid = Column(Integer, primary_key=True) caption = Column(String(50), default='red', unique=True) def __repr__(self): return "%s-%s" %(self.nid, self.caption) class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) favor_id = Column(Integer, ForeignKey("favor.nid")) # 與生成表結構無關,僅用於查詢方便 favor = relationship("Favor", backref='pers') # 多對多 class ServerToGroup(Base): __tablename__ = 'servertogroup' nid = Column(Integer, primary_key=True, autoincrement=True) server_id = Column(Integer, ForeignKey('server.id')) group_id = Column(Integer, ForeignKey('group.id')) group = relationship("Group", backref='s2g') server = relationship("Server", backref='s2g') class Group(Base): __tablename__ = 'group' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) port = Column(Integer, default=22) # group = relationship('Group',secondary=ServerToGroup,backref='host_list') class Server(Base): __tablename__ = 'server' id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False) def init_db(): Base.metadata.create_all(engine) def drop_db(): Base.metadata.drop_all(engine) Session = sessionmaker(bind=engine) session = Session()
增刪改查
#增 obj = Users(name="alex0", extra='sb') session.add(obj) session.add_all([ Users(name="alex1", extra='sb'), Users(name="alex2", extra='sb'), ]) session.commit() #刪 session.query(Users).filter(Users.id > 2).delete() session.commit() #改 session.query(Users).filter(Users.id > 2).update({"name" : "099"}) session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"}, synchronize_session=False) session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate") session.commit() #查 ret = session.query(Users).all() ret = session.query(Users.name, Users.extra).all() ret = session.query(Users).filter_by(name='alex').all() ret = session.query(Users).filter_by(name='alex').first() #其餘 # 條件 ret = session.query(Users).filter_by(name='alex').all() ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all() ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all() ret = session.query(Users).filter(Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all() from sqlalchemy import and_, or_ ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all() ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all() ret = session.query(Users).filter( or_( Users.id < 2, and_(Users.name == 'eric', Users.id > 3), Users.extra != "" )).all() # 通配符 ret = session.query(Users).filter(Users.name.like('e%')).all() ret = session.query(Users).filter(~Users.name.like('e%')).all() # 限制 ret = session.query(Users)[1:2] # 排序 ret = session.query(Users).order_by(Users.name.desc()).all() ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 分組 from sqlalchemy.sql import func ret = session.query(Users).group_by(Users.extra).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() # 連表 ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() ret = session.query(Person).join(Favor).all() ret = session.query(Person).join(Favor, isouter=True).all() # 組合 q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union(q2).all() q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union_all(q2).all()