redis,rabbitmq,SqlAlchemy

redis發佈和訂閱

 

發佈者一旦發送消息,那麼全部訂閱者都會收到。python

RedisHelpermysql

 
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import  redis
class redishelper:
    def __init__(self):
        self.__conn = redis.Redis(host='192.168.11.87')
    def public(self, msg, chan):
        self.__conn.publish(chan, msg)
        return msg,chan
    def subscribe(self, chan):
        pub = self.__conn.pubsub()
        pub.subscribe(chan)
        pub.parse_response()
        return pub
 
發佈者
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import b1
obj = b1.redishelper()      #實例化方法
obj.public('aaaaaa','fm111.7')          #執行發佈
訂閱者
obj = b1.redishelper()           #實例化方法
data = obj.subscribe('fm111.7')  #調用訂閱方法
while True:
    msg = data.parse_response()   #接收發布消息
    print(msg)

 RabbitMQ

RabbitMQ是一個在AMQP基礎上完整的,可複用的企業消息系統。他遵循Mozilla Public License開源協議。redis

MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。消 息傳遞指的是程序之間經過在消息中發送數據進行通訊,而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。排隊指的是應用程序經過 隊列來通訊。隊列的使用除去了接收和發送應用程序同時執行的要求。sql

安裝數據庫

 
#安裝配置epel源
   $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
#安裝erlang
   $ yum -y install erlang
#安裝RabbitMQ
   $ yum -y install rabbitmq-server
#啓動、關閉服務
   service rabbitmq-server start/stop
API
python -m pip install pika

1.基於RabbitMQ實現生產者消費者模型。對於RabbitMQ來講,生產和消費再也不針對內存裏的一個Queue對象,而是某臺服務器上的RabbitMQ Server實現的消息隊列。
生產者代碼
 
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import pika

# ######################### 發消息 #########################
#鏈接rabbitmq服務器
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.11.158'))
#建立頻道
channel = connection.channel()

#若是沒有這個隊列會建立一個
channel.queue_declare(queue='hello')

#向隊列插入數值 routing_key是隊列名 body是要插入的內容
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')

print(" [x] Sent 'Hello World!'")

#關閉鏈接
connection.close()

消費者代碼
express

 
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import pika

# ##########################取消息 ##########################

#鏈接rabbitmq服務器
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.11.158'))
#建立頻道
channel = connection.channel()

#若是生產者沒有運行建立隊列,那麼消費者建立隊列
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
 

 當生產者生成一條數據,被消費者接收,消費者中斷後若是不超過10秒,鏈接的時候數據還在。當超過10秒以後,從新連接,數據將消失。消費者等待連接。編程

 

2.消息不丟失(數據持久化)ubuntu

1.當把no_ack=false時,若是消費者遇到狀況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,RabbitMQ會從新將該任務添加到隊列中。緩存

2.durable服務器

生產者代碼

#!/usr/bin/env python
import pika
#連接rabbit服務器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
#建立頻道
channel = connection.channel()
#建立隊列,使用durable方法
channel.queue_declare(queue='hello', durable=True)
#若是想讓隊列實現持久化那麼加上durable=True
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!',
properties=pika.BasicProperties(
delivery_mode=2,
#標記咱們的消息爲持久化的 - 經過設置 delivery_mode 屬性爲 2
#這樣必須設置,讓消息實現持久化
))
#這個exchange參數就是這個exchange的名字. 空字符串標識默認的或者匿名的exchange:若是存在routing_key, 消息路由到routing_key指定的隊列中。
print(" [x] 開始隊列'")
connection.close()

 消費者代碼

 
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
#建立頻道
channel = connection.channel()
#建立隊列,使用durable方法
channel.queue_declare(queue='hello', durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print('ok')
    ch.basic_ack(delivery_tag = method.delivery_tag)

    channel.basic_consume(callback,
                    queue='hello',
                    no_ack=False)

    print(' [*] 等待隊列. To exit press CTRL+C')
    channel.start_consuming()
 

 注:標記消息爲持久化的並不能徹底保證消息不會丟失,儘管告訴RabbitMQ保存消息到磁盤,當RabbitMQ接收到消息尚未保存的時候仍然有一個 短暫的時間窗口. RabbitMQ不會對每一個消息都執行同步fsync(2) --- 可能只是保存到緩存cache尚未寫入到磁盤中,這個持久化保證不是很強,但這比咱們簡單的任務queue要好不少,若是你想很強的保證你可使用 publisher confirms

 

3.消息獲取順序

默認消息隊列裏的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者1去隊列中獲取 偶數 序列的任務。

channel.basic_qos(prefetch_count=1) 表示誰來誰取,再也不按照奇偶數排列

消費者代碼

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)   #表示誰來誰取,再也不按照奇偶排列

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

發佈與訂閱

發佈訂閱和簡單的消息隊列區別在於,發佈訂閱會將消息發送給全部的訂閱者,而消息隊列中的數據被消費一次便消失。因此,RabbitMQ實現發佈和訂閱時,會爲每個訂閱者建立一個隊列,而發佈者發佈消息時,會將消息放置在全部相關隊列中。

發佈者發送的消息實際是先發送到exchange,而後由exchange發送到相對應的隊列。

exchange類型可用: direct , topic , headers 和 fanout 。

exchange type = fanout

 

發佈者

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

訂閱者

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)       #隊列斷開後自動刪除臨時隊列
queue_name = result.method.queue                     # 隊列名採用服務端分配的臨時隊列

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

關鍵字發送( exchange type = direct)

以前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。

生產者

#!/usr/bin/env python
#-*- coding:utf-8 -*-
##########發送消息

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.11.87'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = 'info'
message = 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
消費者
#!/usr/bin/env python
#-*-coding=utf-8-*-
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.11.87'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = ['error','warning','info']
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
模糊匹配( exchange type = topic)

 在topic類型下,可讓隊列綁定幾個模糊的關鍵字,以後發送者將數據發送到exchange,exchange將傳入」路由值「和 」關鍵字「進行匹配,匹配成功,則將數據發送到指定隊列。

#表示能夠匹配0個或多個單詞
*表示只能匹配一個單詞

發送者路由值              隊列中

old.boy.python          old.*  -- 不匹配
old.boy.python          old.#  -- 匹配

生產者

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
 

消費者

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)

for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
queue=queue_name,
no_ack=True)

channel.start_consuming()

SQLAlchemy

SQLAlchemy是Python編程語言下的一款ORM框架,該框架創建在數據庫API之上,使用關係對象映射進行數據庫操做,簡言之即是:將對象轉換成SQL,而後使用數據API執行SQL並獲取執行結果。

schema/type:定義的一種映射格式,把表映射成類。

sql expression language: 封裝了增刪改查的sql語句

engine:  引擎

connection pooling: 鏈接池

dialect:  用於和數據庫API進行交流,根據配置文件的不一樣調用不一樣的數據庫API,從而實現對數據庫的操做

MySQL-Python
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
MySQL-Connector
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]

 示例,中間狀態 演示一個過程

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy import create_engine
 
 #初始化數據庫鏈接
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)
 
# 執行SQL
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES ('1.1.1.22', 3)"
# )
 
# 新插入行自增ID
# cur.lastrowid
 
# 執行SQL
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES(%s, %s)",[('1.1.1.22', 3),('1.1.1.221', 3),]
# )
 
 
# 執行SQL
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES (%(host)s, %(color_id)s)",
#     host='1.1.1.99', color_id=3
# )
 
# 執行SQL
# cur = engine.execute('select * from hosts')
# 獲取第一行數據
# cur.fetchone()
# 獲取第n行數據
# cur.fetchmany(3)
# 獲取全部數據
# cur.fetchall()

增刪改查

#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey
 
metadata = MetaData()
 
user = Table('user', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
)
 
color = Table('color', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
)
engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)
 
conn = engine.connect()
 
# 建立SQL語句,INSERT INTO "user" (id, name) VALUES (:id, :name)
conn.execute(user.insert(),{'id':7,'name':'seven'})
conn.close()
 
# sql = user.insert().values(id=123, name='wu')
# conn.execute(sql)
# conn.close()
 
# sql = user.delete().where(user.c.id > 1)
 
# sql = user.update().values(fullname=user.c.name)
# sql = user.update().where(user.c.name == 'jack').values(name='ed')
 
# sql = select([user, ])
# sql = select([user.c.id, ])
# sql = select([user.c.name, color.c.name]).where(user.c.id==color.c.id)
# sql = select([user.c.name]).order_by(user.c.name)
# sql = select([user]).group_by(user.c.name)
 
# result = conn.execute(sql)
# print result.fetchall()
# conn.close()

完整示例

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from  sqlalchemy.orm import sessionmaker
 
Base = declarative_base() #生成一個SqlORM 基類
 
 
engine = create_engine("mysql+mysqldb://root@localhost:3306/test",echo=False)
 
 
class Host(Base):
    __tablename__ = 'hosts'
    id = Column(Integer,primary_key=True,autoincrement=True)
    hostname = Column(String(64),unique=True,nullable=False)
    ip_addr = Column(String(128),unique=True,nullable=False)
    port = Column(Integer,default=22)
 
Base.metadata.create_all(engine) #建立全部表結構
 
if __name__ == '__main__':
    SessionCls = sessionmaker(bind=engine) #建立與數據庫的會話session class ,注意,這裏返回給session的是個class,不是實例
    session = SessionCls()
    #h1 = Host(hostname='localhost',ip_addr='127.0.0.1')
    #h2 = Host(hostname='ubuntu',ip_addr='192.168.2.243',port=20000)
    #h3 = Host(hostname='ubuntu2',ip_addr='192.168.2.244',port=20000)
    #session.add(h3)
    #session.add_all( [h1,h2])
    #h2.hostname = 'ubuntu_test' #只要沒提交,此時修改也沒問題
    #session.rollback()
    #session.commit() #提交
    res = session.query(Host).filter(Host.hostname.in_(['ubuntu2','localhost'])).all()
    print(res)
相關文章
相關標籤/搜索