Once the publisher sends a message, every subscriber receives it.
RedisHelper
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import redis

class redishelper:
    def __init__(self):
        self.__conn = redis.Redis(host='192.168.11.87')

    def public(self, msg, chan):
        # publish a message to the given channel
        self.__conn.publish(chan, msg)
        return msg, chan

    def subscribe(self, chan):
        # subscribe to the channel and consume the subscribe confirmation
        pub = self.__conn.pubsub()
        pub.subscribe(chan)
        pub.parse_response()
        return pub
Publisher
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import b1

obj = b1.redishelper()           # instantiate the helper
obj.public('aaaaaa', 'fm111.7')  # publish a message
Subscriber
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import b1

obj = b1.redishelper()            # instantiate the helper
data = obj.subscribe('fm111.7')   # call the subscribe method
while True:
    msg = data.parse_response()   # block until a published message arrives
    print(msg)
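As an alternative to calling parse_response() in a loop, redis-py's pubsub object also exposes a listen() generator. A minimal sketch (reusing the host address from the helper above):

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import redis

conn = redis.Redis(host='192.168.11.87')
pub = conn.pubsub()
pub.subscribe('fm111.7')

# listen() blocks and yields a dict for every event on the channel,
# including the initial subscribe confirmation (type == 'subscribe')
for item in pub.listen():
    if item['type'] == 'message':
        print(item['data'])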
RabbitMQ is a complete, reusable enterprise messaging system built on AMQP. It is released under the Mozilla Public License.
MQ stands for Message Queue. A message queue is an application-to-application method of communication: applications communicate by reading and writing messages (data intended for applications) to and from queues, without needing a dedicated connection to link them. Messaging means programs communicate by sending data in messages rather than by calling each other directly; direct calls are usually reserved for technologies such as remote procedure calls. Queuing means applications communicate through queues, which removes the requirement that the sending and receiving applications run at the same time.
Installation
# install and configure the EPEL repository
$ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
# install Erlang
$ yum -y install erlang
# install RabbitMQ
$ yum -y install rabbitmq-server
# start / stop the service
service rabbitmq-server start/stop
API
python -m pip install pika
1. A producer-consumer model built on RabbitMQ. With RabbitMQ, producing and consuming no longer target a Queue object in the local process's memory, but a message queue provided by the RabbitMQ Server running on some host.
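For contrast, here is a minimal sketch (not from the original examples) of the in-memory version that the paragraph above alludes to, using the standard library queue module on Python 3:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue
import threading

q = queue.Queue()  # this queue lives only in the current process's memory

def producer():
    q.put('Hello World!')

def consumer():
    print(' [x] Received %r' % q.get())

threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()

The RabbitMQ producer and consumer below replace this in-process queue with a queue hosted by the RabbitMQ server.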
Producer code
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

# ######################### send a message #########################

# connect to the RabbitMQ server
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.11.158'))
# create a channel
channel = connection.channel()
# declare the queue; it is created if it does not exist yet
channel.queue_declare(queue='hello')
# push a message onto the queue: routing_key is the queue name, body is the content
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
# close the connection
connection.close()
Consumer code
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

# ######################### receive messages #########################

# connect to the RabbitMQ server
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.11.158'))
# create a channel
channel = connection.channel()
# declare the queue here as well, so the consumer creates it if the producer has not run yet
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
When the producer publishes a message and a consumer picks it up, the message is still in the queue if the consumer is interrupted and reconnects within 10 seconds (the callbacks in the examples below sleep for 10 seconds before acknowledging). Once more than 10 seconds have passed and the message has been acknowledged, reconnecting shows it is gone. The consumer then simply waits for new messages.
2. Preventing message loss (persistence)
1. With no_ack=False, if a consumer dies before acknowledging (its channel is closed, its connection is closed, or the TCP connection is lost), RabbitMQ re-queues the task.
2. durable
Producer code
#!/usr/bin/env python
import pika

# connect to the RabbitMQ server
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
# create a channel
channel = connection.channel()
# declare the queue with durable=True so the queue itself survives a broker restart
channel.queue_declare(queue='hello', durable=True)

# an empty exchange name means the default (anonymous) exchange:
# the message is routed to the queue named by routing_key
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # mark the message as persistent by setting delivery_mode to 2
                      ))
print(" [x] Sent 'Hello World!'")
connection.close()
Consumer code
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
# create a channel
channel = connection.channel()
# declare the same durable queue
channel.queue_declare(queue='hello', durable=True)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(10)
    print('ok')
    # acknowledge only after the work is done
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
Note: marking messages as persistent does not fully guarantee that a message will never be lost. Even though RabbitMQ is told to save the message to disk, there is still a short time window after RabbitMQ has accepted the message and before it has saved it. RabbitMQ does not do fsync(2) for every message; the message may only have been written to cache and not yet to disk. The persistence guarantee is therefore not strong, but it is much better than the simple task queue; if you need a stronger guarantee you can use publisher confirms.
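A minimal sketch of publisher confirms, using the same older pika API as the rest of these examples (with recent pika versions the confirm API differs: basic_publish raises on failure instead of returning a boolean):

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello', durable=True)

# switch the channel into confirm mode
channel.confirm_delivery()

# with older pika versions basic_publish returns True once the broker has confirmed the message
ok = channel.basic_publish(exchange='',
                           routing_key='hello',
                           body='Hello World!',
                           properties=pika.BasicProperties(delivery_mode=2))
print(' [x] confirmed' if ok else ' [x] not confirmed')
connection.close()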
3. Message dispatch order
By default the messages in the queue are handed to consumers in order, e.g. consumer 1 gets the odd-numbered tasks from the queue and consumer 2 gets the even-numbered ones.
channel.basic_qos(prefetch_count=1) means whoever is ready takes the next message, instead of the fixed odd/even round-robin.
Consumer code
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(10)
    print('ok')
    ch.basic_ack(delivery_tag=method.delivery_tag)

# fair dispatch: a consumer only receives a new message once it has acknowledged the previous one
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
Publish and subscribe
The difference between publish/subscribe and a plain message queue is that publish/subscribe delivers each message to every subscriber, whereas a message in a queue disappears once it has been consumed. To implement publish/subscribe, RabbitMQ therefore creates one queue per subscriber, and when the publisher publishes a message it is placed into all of the related queues.
A published message is actually sent to an exchange first, and the exchange then delivers it to the corresponding queues.
Available exchange types: direct, topic, headers and fanout.
exchange type = fanout
Publisher
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
Subscriber
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)  # temporary queue, deleted automatically when the connection closes
queue_name = result.method.queue                # use the queue name assigned by the server

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()
Keyword routing (exchange type = direct)
In the previous examples the sender explicitly named a queue and sent messages to it. RabbitMQ also supports routing by keyword: queues are bound to an exchange with routing keys, the sender publishes a message to the exchange with a routing key, and the exchange uses that keyword to decide which queue(s) the message should be delivered to.
Producer
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# ########## send a message ##########
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.11.87'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = 'info'
message = 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
Consumer
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.11.87'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = ['error', 'warning', 'info']
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()
Pattern matching (exchange type = topic)
With the topic exchange type, a queue can be bound with wildcard patterns. The sender publishes a message to the exchange with a routing key, the exchange matches the routing key against the binding patterns, and on a match the message is delivered to the bound queue.
#  matches zero or more words
*  matches exactly one word

routing key sent      binding key on the queue
old.boy.python        old.*    -- no match
old.boy.python        old.#    -- match
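To make these rules concrete, here is a small stand-alone sketch (plain Python, an illustration of the matching semantics rather than RabbitMQ's actual implementation) that turns a binding key into a regular expression and tests it against the routing key from the table above:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import re

def matches(binding_key, routing_key):
    # '*' matches exactly one word, '#' matches zero or more words
    # simplified: covers the cases in the table, not every AMQP corner case
    pattern = re.escape(binding_key)
    pattern = pattern.replace(r'\*', r'[^.]+').replace(r'\#', r'.*')
    return re.match('^' + pattern + '$', routing_key) is not None

print(matches('old.*', 'old.boy.python'))  # False -- '*' covers only one word
print(matches('old.#', 'old.boy.python'))  # True  -- '#' spans multiple words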
Producer
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
Consumer
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()
SQLAlchemy is an ORM framework for Python. It sits on top of the database API and performs database operations through relational-object mapping. In short: it converts objects into SQL, then uses the database API to execute the SQL and fetch the results.
schema/type: the mapping definitions that map database tables onto classes.
SQL expression language: wraps the SQL statements for insert, delete, update and select.
engine: the database engine.
connection pooling: the connection pool.
dialect: talks to the database API; depending on the configuration it calls a different database API, which is how the actual database operations are carried out.
MySQL-Python
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
MySQL-Connector
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
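Because create_engine only parses the URL and does not connect until the engine is actually used, you can quickly check which dialect and DBAPI driver a given URL selects. A small sketch, assuming pymysql is installed:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1")
print(engine.dialect.name)  # 'mysql'
print(engine.driver)        # 'pymysql'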
Example: the intermediate layer, executing raw SQL through the engine to show the process.
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy import create_engine

# initialise the database connection
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)

# execute SQL
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES ('1.1.1.22', 3)"
# )

# auto-increment ID of the newly inserted row
# cur.lastrowid

# execute SQL with a list of parameter tuples
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES(%s, %s)", [('1.1.1.22', 3), ('1.1.1.221', 3), ]
# )

# execute SQL with named parameters
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES (%(host)s, %(color_id)s)",
#     host='1.1.1.99', color_id=3
# )

# execute a query
# cur = engine.execute('select * from hosts')

# fetch the first row
# cur.fetchone()

# fetch the next n rows
# cur.fetchmany(3)

# fetch all rows
# cur.fetchall()
Insert, delete, update and select
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey, select

metadata = MetaData()

user = Table('user', metadata,
             Column('id', Integer, primary_key=True),
             Column('name', String(20)),
             )

color = Table('color', metadata,
              Column('id', Integer, primary_key=True),
              Column('name', String(20)),
              )

engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)

conn = engine.connect()

# builds the SQL statement: INSERT INTO "user" (id, name) VALUES (:id, :name)
conn.execute(user.insert(), {'id': 7, 'name': 'seven'})
conn.close()

# sql = user.insert().values(id=123, name='wu')
# conn.execute(sql)
# conn.close()

# sql = user.delete().where(user.c.id > 1)

# sql = user.update().values(fullname=user.c.name)
# sql = user.update().where(user.c.name == 'jack').values(name='ed')

# sql = select([user, ])
# sql = select([user.c.id, ])
# sql = select([user.c.name, color.c.name]).where(user.c.id == color.c.id)
# sql = select([user.c.name]).order_by(user.c.name)
# sql = select([user]).group_by(user.c.name)

# result = conn.execute(sql)
# print(result.fetchall())
# conn.close()
Complete example
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker

Base = declarative_base()  # generate the SqlORM base class

engine = create_engine("mysql+mysqldb://root@localhost:3306/test", echo=False)

class Host(Base):
    __tablename__ = 'hosts'
    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)
    ip_addr = Column(String(128), unique=True, nullable=False)
    port = Column(Integer, default=22)

Base.metadata.create_all(engine)  # create all table structures

if __name__ == '__main__':
    SessionCls = sessionmaker(bind=engine)  # create the session class bound to the engine; note this is a class, not an instance
    session = SessionCls()
    # h1 = Host(hostname='localhost', ip_addr='127.0.0.1')
    # h2 = Host(hostname='ubuntu', ip_addr='192.168.2.243', port=20000)
    # h3 = Host(hostname='ubuntu2', ip_addr='192.168.2.244', port=20000)
    # session.add(h3)
    # session.add_all([h1, h2])
    # h2.hostname = 'ubuntu_test'  # modifying is fine as long as nothing has been committed yet
    # session.rollback()
    # session.commit()  # commit
    res = session.query(Host).filter(Host.hostname.in_(['ubuntu2', 'localhost'])).all()
    print(res)
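As a follow-up, a short sketch of the common session operations (query, update, delete), assuming the Host model and engine from the complete example above and that a 'localhost' row has been inserted:

# continuation of the complete example: Host and engine are assumed to be defined above
from sqlalchemy.orm import sessionmaker

SessionCls = sessionmaker(bind=engine)
session = SessionCls()

# query a single row (None if the host was never inserted)
h = session.query(Host).filter(Host.hostname == 'localhost').first()
if h is not None:
    print(h.hostname, h.ip_addr, h.port)

    # update: change an attribute on the mapped object and commit
    h.port = 2222
    session.commit()

    # delete the row and commit
    session.delete(h)
    session.commit()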