Docker部署Django項目+Nginx+Fluend日誌收集 和redis、memcached、RabbitMQ、Celery

前言

 

 

 

 

 

1、docker

一、docker是什麼?html

Docker的英文本意是「搬運工」,Docker搬運的是集裝箱(Container)能夠成爲容器,我能夠把寫的Django的WEB應用以及Python依賴庫打包進一個可移植的容器裏傳播,解決了應用部署的平臺兼容性問題,同時她也是一種輕量級的虛擬化技術能夠作到秒級啓動一個容器(相似小虛擬機,區別啓動快、能夠傳播);python

jmysql

 

二、docker和鏡像的關係linux

鏡像是docker 建立、啓動一個容器的文件系統,這個文件系統包含依賴包、命令工具、APP等;nginx

 

三、在centos7中安裝dockerweb

yum -y install epel*        #添加epel Yum源  因爲Docker要求Linux內核版本必須在要在3.10,因此centos6.X版本須要 yum install docker-io

 systemctl start docker.service #開啓docker服務ajax

  docker info #查看docker信息正則表達式

 

四、docker基本使用redis

4.0、鏡像製造算法

關於鏡像的製造方法有2種

1)、dockerhub 鏡像------》容器-------》修改容器-------》新的鏡像

2)、docker file:在基礎鏡像上,執行dockerfile 中定義的一系列命令,執行完畢後獲得一個自定義新鏡像;

docker file 語法

From centos  #設置基礎鏡像
MAINTAINER egon zhanggen@le.com  #標註該鏡像的做者 郵箱
ENV name=egon  設置變量
ENV age=18
RUN mkdir /dockerfile  執行的命令
RUN echo 'hellow world' > /dockerfile/file.txt
CMD echo '歡迎來到新的容器'  開機自動執行命令(參數會被覆蓋)
ENTYRPOINT ['hell']                開機自動執行命令(參數不能夠重複,和覆蓋)
WORKDIR: 設置進入容器後的 默認(pwd)工做目錄

ADD: 把宿主機的目錄 打包到容器目錄 (注意使用的是當前 build目錄下的文件) docker history centos 查看鏡像打包的過程 執行了那些指令 EXPOSE
80 指定容器對外開放的端口

 

3)、dockerfile製做Django web應用鏡像 例子

From centos
MAINTAINER egon zhanggen@le.com
CMD /usr/sbin/init
RUN yum -y install epel*
RUN yum -y install python-pip
RUN pip install django
RUN pip install gunicorn
ADD /zhanggen/ /zhanggen/
CMD cd /zhanggen/djproject
CMD systemctl start nginx
WORKDIR /zhanggen/djproject
CMD gunicorn djproject.wsgi:application --bind=0.0.0.0:8000
EXPOSE 8000

 

4)、生成鏡像

docker build -t zhanggen .

[root@localhost nginx]# cd /build/
[root@localhost build]# ls
Dockerfile  web.conf  zhanggen
[root@localhost build]# vim Dockerfile 
[root@localhost build]# ls
Dockerfile  web.conf  zhanggen
[root@localhost build]# docker build -t zhanggen .

 

5)、補充

修改docker的 -log-driver(日誌引擎) 爲fluentd 收集日誌模式
docker run -id --log-driver=fluentd --log-opt fluentd-address=192.168.182.146:24224 -p 8000:8000 zhanggen
docker logs -f -t --since="2017-05-31" --tail=10 容器ID  查看容器的日誌

 

4.一、鏡像管理

docker pull hello-world 從Docker Hub拉取鏡像
docker rmi 18 刪除鏡像
docker run -it centos bash 使用centos鏡像啓動一個容器

 docker鏡像的傳播

 docker save nginx > nginx.tar 打包鏡像

 docker load < nginx.tar       解壓

 

4.二、容器管理

docker ps 查看已經啓動的容器

docker top 容器ID 查看容器中運行的程序

docker run -idt  365 /bin/bash 在後臺啓動容器(exit)

docker exec -it 4a bash 進入容器

docker stop 0 關閉後臺運行的容器

docker ps -a 查看已經關閉的容器

docker rm f 刪除容器信息(容器關閉後,容器信息還會存在)

 docker rmi -f 4a725d3b3b1c  強制刪除

docker rm `docker ps -aq` 刪除全部容器

docker inspect 16 查看容器的相關信息

 

4.三、docker的網絡模型

1)docker network list 查看容器使用的網絡鏈接方式

bridge bridge local                      默認使用橋接主機模式
662180063e96 host host local   HOST模式和主機公用一個IP
9a36997e04cd none null local    none模式容器不使用IP地址

2)添加容器和主機間的端口映射

                宿主機/docker
docker run -d -p 8080:80 nginx  (切記開啓systemctl start firewalld.service
)

 

 

2、Nginx+Gunicorn+Django(docker)+Fluentd 收集docker日誌

 

 

 

一、安裝gunicorn

 pip install gunicorn
pip install gunicorn

 

二、修改Django項目的setings文件

ALLOWED_HOSTS = ['*']

STATIC_URL = '/static/'
STATICFILES_DIRS=(
    os.path.join(BASE_DIR,'static'),
                )
STATIC_ROOT = os.path.join(BASE_DIR,"static_assets")
setings.py

 

 三、建立static文件、media文件路徑,將django admin和django app的靜態文件收集到STATIC_ROOT目錄下並設置 靜態文件的訪問路徑

[root@cmdb /]# ls
WorkOrderSystemData
static_assets 
mkdir static_assets WorkOrderSystemData
python manage.py collectstatic
python manage.py collectstatic
from django.contrib.staticfiles.urls import staticfiles_urlpatterns


urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^$', views.login),
    url(r'^media/(?P<path>.*)$',serve,{'document_root': settings.MEDIA_ROOT}),
    url(r'^arya/', (v1.site.urls,None,'arya')),
    url(r'^login/', views.login,name='login'),
    url(r'^chart/', views.chart),
    url(r'^chartdown/', views.chartdown),
    url(r'^chartdownload/', views.chartdownload),
    url(r'^check_code/', views.check_code),
    url(r'^login_first/$', views.login_first),
    url(r'^login_first/changepwd/$', views.changepwd), #修改密碼
    url(r'^work_order_api/$', views.work_order_api),#zabbix報警轉換成工單的API
    url(r'^users_api/$', views.user_list),
    url(r'^DBshow/', include('DBshow.urls')),
    url(r'^track_work_api/$', views.track_work_api, name='track_work_api'),               #把工單轉移到跟進表
    url(r'^list_trackwork_api/$', views.trackwork_list_api, name='trackwork_list_api'),      #獲取跟進表數據  
    url(r'^multitask/',include('multitask.urls')),
    url(r'^webcron/', include('webcron.urls')),
    url(r'^oauth2/sina/', views.oauth_sina),
    url(r'^pc-geetest/register', views.get_geetest, name='get_geetest'),  # 極驗獲取驗證碼url
    url(r'^ajax_cmdb_details/',views.ajax_cmdb_details,name='ajax_cmdb_details'), #cmdb詳情頁api
    url(r'^chat/', include('chat.urls')),
    url(r'^auto_option/',include('DB_auto.urls')),
    url(r'^chart_warn',views.chart_warn),

]



urlpatterns += [url(r'^static/(?P<path>.*)$', serve,{'document_root': settings.STATIC_ROOT})]
urls.py

 

四、配置文件

Nginx配置文件

user root root;
worker_processes  4;

#error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;

#pid        logs/nginx.pid;


events {
    worker_connections  1024;
}


http {
    include       mime.types;
    default_type  application/octet-stream;

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';

    #access_log  logs/access.log  main;

    sendfile        on;
    tcp_nopush     on;
    #keepalive_timeout  0;
    keepalive_timeout  65;
    gzip  on;

   # upstream app_server {
   #     server 127.0.0.1:8000;  #與gunicorn配置中bind的地址一致
   # }

    server {
        listen       8080;
        server_name  172.17.10.112;  #域名或主機地址

        access_log  logs/host.access.log  main;

        location / {
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header Host $http_host;
            proxy_redirect off;
            proxy_pass http://172.17.10.112:8001;
        }
        
        location /static/ {
            alias /static_assets/;
        }

        location /media/ {
            alias /WorkOrderSystemData/cmdb_media/;
        }

        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }

    }

}
/usr/local/nginx/conf/nginx.conf

PS:

 alias是絕對路徑, root是/下的相對路徑

 

gunicorn配置文件

import logging
import logging.handlers
from logging.handlers import WatchedFileHandler
import os
import multiprocessing

bind = "0.0.0.0:8001"   #綁定的ip與端口
backlog = 512                #監聽隊列數量,64-2048
#chdir = '/home/test/server/bin'  #gunicorn要切換到的目的工做目錄
worker_class = 'sync' #使用gevent模式,還可使用sync 模式,默認的是sync模式
workers = 4 # multiprocessing.cpu_count()    #進程數
threads = 2 #multiprocessing.cpu_count()*4 #指定每一個進程開啓的線程數
loglevel = 'info' #日誌級別,這個日誌級別指的是錯誤日誌的級別,而訪問日誌的級別沒法設置
access_log_format = '%(t)s %(p)s %(h)s "%(r)s" %(s)s %(L)s %(b)s %(f)s" "%(a)s"'

# accesslog = "/home/log/gunicorn_access.log"      #訪問日誌文件
#errorlog = "/home/log/gunicorn_error.log"        #錯誤日誌文件
accesslog = "-"  #訪問日誌文件,"-" 表示標準輸出
errorlog = "-"   #錯誤日誌文件,"-" 表示標準輸出
proc_name = 'fof_api'   #進程名

              
gunicorn.conf.py

 

五、經過gunicorn管理Django項目 (切記在manage.py 這個目錄下)

啓動Django

[root@cmdb cmdb_rbac_arya]# gunicorn -c gunicorn.conf.py cmdb_rbac_arya.wsgi:application
[2019-04-01 15:19:28 +0800] [9222] [INFO] Starting gunicorn 19.9.0
[2019-04-01 15:19:28 +0800] [9222] [INFO] Listening at: http://0.0.0.0:8001 (9222)
[2019-04-01 15:19:28 +0800] [9222] [INFO] Using worker: threads
[2019-04-01 15:19:28 +0800] [9225] [INFO] Booting worker with pid: 9225
[2019-04-01 15:19:28 +0800] [9226] [INFO] Booting worker with pid: 9226
[2019-04-01 15:19:28 +0800] [9228] [INFO] Booting worker with pid: 9228
[2019-04-01 15:19:28 +0800] [9229] [INFO] Booting worker with pid: 9229
gunicorn -c gunicorn.conf.py cmdb_rbac_arya.wsgi:application

重啓Django

[root@cmdb arya]# pstree -ap|grep gunicorn
  |-gunicorn,929 /usr/bin/gunicorn -c gunicorn.conf.py cmdb_rbac_arya.wsgi:application
  |   |-gunicorn,979 /usr/bin/gunicorn -c gunicorn.conf.py cmdb_rbac_arya.wsgi:application
  |   |   |-{gunicorn},11343
  |   |   |-{gunicorn},11344
  |   |   |-{gunicorn},11345
  |   |   |-{gunicorn},11346
  |   |   |-{gunicorn},11347
  |   |   |-{gunicorn},11348
  |   |   |-{gunicorn},11349
  |   |   |-{gunicorn},11350
  |   |   |-{gunicorn},11351
  |   |   |-{gunicorn},11352
  |   |   |-{gunicorn},11353
  |   |   |-{gunicorn},11354
  |   |   |-{gunicorn},11355
  |   |   |-{gunicorn},11356
  |   |   |-{gunicorn},11357
  |   |   |-{gunicorn},11358
  |   |   |-{gunicorn},11359
  |   |   |-{gunicorn},11360
  |   |   |-{gunicorn},11361
  |   |   |-{gunicorn},11362
  |   |   |-{gunicorn},11363
  |   |   |-{gunicorn},11364
  |   |   |-{gunicorn},11365
  |   |   |-{gunicorn},11366
  |   |   |-{gunicorn},11367
  |   |   |-{gunicorn},11368
  |   |   |-{gunicorn},11369
  |   |   |-{gunicorn},11370
  |   |   |-{gunicorn},11371
  |   |   |-{gunicorn},11372
  |   |   |-{gunicorn},11373
  |   |   |-{gunicorn},11374
  |   |   `-{gunicorn},11409
  |   |-gunicorn,980 /usr/bin/gunicorn -c gunicorn.conf.py cmdb_rbac_arya.wsgi:application
  |   |   |-{gunicorn},18450
  |   |   |-{gunicorn},18451
  |   |   |-{gunicorn},18452
  |   |   |-{gunicorn},18453
  |   |   |-{gunicorn},18454
  |   |   |-{gunicorn},18455
  |   |   |-{gunicorn},18456
  |   |   |-{gunicorn},18457
  |   |   |-{gunicorn},18458
  |   |   |-{gunicorn},18459
  |   |   |-{gunicorn},18460
  |   |   |-{gunicorn},18461
  |   |   |-{gunicorn},18462
  |   |   |-{gunicorn},18463
  |   |   |-{gunicorn},18464
  |   |   |-{gunicorn},18465
  |   |   |-{gunicorn},18466
  |   |   |-{gunicorn},18467
  |   |   |-{gunicorn},18468
  |   |   |-{gunicorn},18469
  |   |   |-{gunicorn},18470
  |   |   |-{gunicorn},18471
  |   |   |-{gunicorn},18472
  |   |   |-{gunicorn},18473
  |   |   |-{gunicorn},18474
  |   |   |-{gunicorn},18475
  |   |   |-{gunicorn},18476
  |   |   |-{gunicorn},18477
  |   |   |-{gunicorn},18478
  |   |   |-{gunicorn},18479
  |   |   |-{gunicorn},18480
  |   |   |-{gunicorn},18481
  |   |   `-{gunicorn},18482
  |   |-gunicorn,982 /usr/bin/gunicorn -c gunicorn.conf.py cmdb_rbac_arya.wsgi:application
  |   |   |-{gunicorn},11376
  |   |   |-{gunicorn},11377
  |   |   |-{gunicorn},11378
  |   |   |-{gunicorn},11379
  |   |   |-{gunicorn},11380
  |   |   |-{gunicorn},11381
  |   |   |-{gunicorn},11382
  |   |   |-{gunicorn},11383
  |   |   |-{gunicorn},11384
  |   |   |-{gunicorn},11385
  |   |   |-{gunicorn},11386
  |   |   |-{gunicorn},11387
  |   |   |-{gunicorn},11388
  |   |   |-{gunicorn},11389
  |   |   |-{gunicorn},11390
  |   |   |-{gunicorn},11391
  |   |   |-{gunicorn},11392
  |   |   |-{gunicorn},11393
  |   |   |-{gunicorn},11394
  |   |   |-{gunicorn},11395
  |   |   |-{gunicorn},11396
  |   |   |-{gunicorn},11397
  |   |   |-{gunicorn},11398
  |   |   |-{gunicorn},11399
  |   |   |-{gunicorn},11400
  |   |   |-{gunicorn},11401
  |   |   |-{gunicorn},11402
  |   |   |-{gunicorn},11403
  |   |   |-{gunicorn},11404
  |   |   |-{gunicorn},11405
  |   |   |-{gunicorn},11406
  |   |   |-{gunicorn},11407
  |   |   `-{gunicorn},11410
  |   `-gunicorn,984 /usr/bin/gunicorn -c gunicorn.conf.py cmdb_rbac_arya.wsgi:application
  |       |-{gunicorn},11411
  |       |-{gunicorn},11412
  |       |-{gunicorn},11413
  |       |-{gunicorn},11414
  |       |-{gunicorn},11415
  |       |-{gunicorn},11416
  |       |-{gunicorn},11417
  |       |-{gunicorn},11418
  |       |-{gunicorn},11419
  |       |-{gunicorn},11420
  |       |-{gunicorn},11421
  |       |-{gunicorn},11422
  |       |-{gunicorn},11423
  |       |-{gunicorn},11424
  |       |-{gunicorn},11425
  |       |-{gunicorn},11426
  |       |-{gunicorn},11427
  |       |-{gunicorn},11428
  |       |-{gunicorn},11429
  |       |-{gunicorn},11430
  |       |-{gunicorn},11431
  |       |-{gunicorn},11432
  |       |-{gunicorn},11433
  |       |-{gunicorn},11434
  |       |-{gunicorn},11435
  |       |-{gunicorn},11436
  |       |-{gunicorn},11437
  |       |-{gunicorn},11438
  |       |-{gunicorn},11439
  |       |-{gunicorn},11440
  |       |-{gunicorn},11441
  |       |-{gunicorn},11442
  |       `-{gunicorn},11443
  |   |       |-grep,16177 gunicorn
[root@cmdb arya]# kill -HUP 979
pstree -ap|grep gunicorn kill -HUP 979

中止Django

[root@cmdb arya]# pstree -ap|grep gunicorn
  |-gunicorn,929 /usr/bin/gunicorn -c gunicorn.conf.py cmdb_rbac_arya.wsgi:application
  |   |-gunicorn,980 /usr/bin/gunicorn -c gunicorn.conf.py cmdb_rbac_arya.wsgi:application
  |   |   |-{gunicorn},18450
  |   |   |-{gunicorn},18451
  |   |   |-{gunicorn},18452
  |   |   |-{gunicorn},18453
  |   |   |-{gunicorn},18454
  |   |   |-{gunicorn},18455
  |   |   |-{gunicorn},18456
  |   |   |-{gunicorn},18457
  |   |   |-{gunicorn},18458
  |   |   |-{gunicorn},18459
  |   |   |-{gunicorn},18460
  |   |   |-{gunicorn},18461
  |   |   |-{gunicorn},18462
  |   |   |-{gunicorn},18463
  |   |   |-{gunicorn},18464
  |   |   |-{gunicorn},18465
  |   |   |-{gunicorn},18466
  |   |   |-{gunicorn},18467
  |   |   |-{gunicorn},18468
  |   |   |-{gunicorn},18469
  |   |   |-{gunicorn},18470
  |   |   |-{gunicorn},18471
  |   |   |-{gunicorn},18472
  |   |   |-{gunicorn},18473
  |   |   |-{gunicorn},18474
  |   |   |-{gunicorn},18475
  |   |   |-{gunicorn},18476
  |   |   |-{gunicorn},18477
  |   |   |-{gunicorn},18478
  |   |   |-{gunicorn},18479
  |   |   |-{gunicorn},18480
  |   |   |-{gunicorn},18481
  |   |   `-{gunicorn},18482
  |   |-gunicorn,982 /usr/bin/gunicorn -c gunicorn.conf.py cmdb_rbac_arya.wsgi:application
  |   |   |-{gunicorn},11376
  |   |   |-{gunicorn},11377
  |   |   |-{gunicorn},11378
  |   |   |-{gunicorn},11379
  |   |   |-{gunicorn},11380
  |   |   |-{gunicorn},11381
  |   |   |-{gunicorn},11382
  |   |   |-{gunicorn},11383
  |   |   |-{gunicorn},11384
  |   |   |-{gunicorn},11385
  |   |   |-{gunicorn},11386
  |   |   |-{gunicorn},11387
  |   |   |-{gunicorn},11388
  |   |   |-{gunicorn},11389
  |   |   |-{gunicorn},11390
  |   |   |-{gunicorn},11391
  |   |   |-{gunicorn},11392
  |   |   |-{gunicorn},11393
  |   |   |-{gunicorn},11394
  |   |   |-{gunicorn},11395
  |   |   |-{gunicorn},11396
  |   |   |-{gunicorn},11397
  |   |   |-{gunicorn},11398
  |   |   |-{gunicorn},11399
  |   |   |-{gunicorn},11400
  |   |   |-{gunicorn},11401
  |   |   |-{gunicorn},11402
  |   |   |-{gunicorn},11403
  |   |   |-{gunicorn},11404
  |   |   |-{gunicorn},11405
  |   |   |-{gunicorn},11406
  |   |   |-{gunicorn},11407
  |   |   `-{gunicorn},11410
  |   |-gunicorn,984 /usr/bin/gunicorn -c gunicorn.conf.py cmdb_rbac_arya.wsgi:application
  |   |   |-{gunicorn},11411
  |   |   |-{gunicorn},11412
  |   |   |-{gunicorn},11413
  |   |   |-{gunicorn},11414
  |   |   |-{gunicorn},11415
  |   |   |-{gunicorn},11416
  |   |   |-{gunicorn},11417
  |   |   |-{gunicorn},11418
  |   |   |-{gunicorn},11419
  |   |   |-{gunicorn},11420
  |   |   |-{gunicorn},11421
  |   |   |-{gunicorn},11422
  |   |   |-{gunicorn},11423
  |   |   |-{gunicorn},11424
  |   |   |-{gunicorn},11425
  |   |   |-{gunicorn},11426
  |   |   |-{gunicorn},11427
  |   |   |-{gunicorn},11428
  |   |   |-{gunicorn},11429
  |   |   |-{gunicorn},11430
  |   |   |-{gunicorn},11431
  |   |   |-{gunicorn},11432
  |   |   |-{gunicorn},11433
  |   |   |-{gunicorn},11434
  |   |   |-{gunicorn},11435
  |   |   |-{gunicorn},11436
  |   |   |-{gunicorn},11437
  |   |   |-{gunicorn},11438
  |   |   |-{gunicorn},11439
  |   |   |-{gunicorn},11440
  |   |   |-{gunicorn},11441
  |   |   |-{gunicorn},11442
  |   |   `-{gunicorn},11443
  |   `-gunicorn,16185 /usr/bin/gunicorn -c gunicorn.conf.py cmdb_rbac_arya.wsgi:application
  |   |       |-grep,16195 gunicorn
[root@cmdb arya]# kill -9 929 
pstree -ap|grep gunicorn kill -9 929

nohup啓動

[root@cmdb cmdb_rbac_arya]# nohup gunicorn -c gunicorn.conf.py cmdb_rbac_arya.wsgi:application &

 

ps:生產環境中發現Django修改了代碼以後,必定要重啓gunicorn,纔會生效

 

 

六、安裝Fluend

curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent2.sh | sh

七、啓動Fluend

systemctl restart td-agent.service 

八、配置Fluend

1、
vim /etc/td-agent/td-agent.conf 
2<source>
  @type forward
</source>

<source>
    @type http
    port 8888
    bind 0.0.0.0
</source>
<match *> @type file path /log/zhanggen.log flush_interval 10000000000000s #日誌從緩存寫到文件的刷新時間 </match>

 九、Fluend配置參考:

http://www.cnblogs.com/kaituorensheng/p/5146133.html

 

 

 

3、提高網站性能相關

一、 RabbitMQ

 RabbitMQ消息隊列是一個消息隊列,在學習消息隊列前先來回顧一下Python的 queue模塊

import queue
q=queue.Queue() #建立1個隊列 q

q.put('1')   #在q 隊列中存放1個爲1的值
q.put('2')   #在q 隊列中存放1個爲1的值

print(q.get()) #1   #注意獲取順序:先進先出 frist in frist out
print(q.get()) #2

 

隊列解決的2個問題:

問題1:解耦

問題2:異步

 

什麼是同步和異步?

同步:用戶A  向web服務器發送請求,web服務器去鏈接數據庫、連表...查詢,等待數據庫響應給web服務期以後,web服務器在把結果響應給用戶A;

在web服務器鏈接數據庫、查詢數據 期間web服務器和用戶A都不能作其餘的事情,若是其餘B、C、D用戶請求過來也只能排隊;

 

優勢:保證當前任務及時執行

缺點:排隊、耗時問題

 

異步:用戶A若是想要訪問web服務器,就把請求信息放到隊列裏,隊列生成惟一任務ID,隊列響應用戶A;用戶B,C...請求過來也是放在隊列,由隊列生成惟一任務ID;

web服務去隊列裏面拿任務並執行,執行完畢響應給隊列,隊列根據任務ID,再把web服務的執行的結果響應給各位用戶;

                      這就大大下降了用戶和服務端的耦合性;

優勢:解決排隊問題,解耦

缺點:不能任務及時執行

 

消息隊列:

1.存儲消息

2.保證消息順序

3.保證任務的交付

 

爲何要用RabbitMQ而不用Python的queue模塊?

Python的queue模塊只能在同1個進程中 開子線程往隊列裏面放消息,再開子線程往隊列裏面取消息,不能誇進程也不就不能用在生產環境;

 

 

 

 

RabbitMQ的架構

RabbitMQ的架構就是一個分佈式得生產者和消費者模型,它是1個獨立的組件能夠單獨監聽在服務器的某個端口上;

 

 

 

 

 

RabbitMQ使用

1.安裝RabbitMQ

1.安裝erlang
下載rpm倉庫:wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm

安裝rpm倉庫
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm

安裝erlang
yum -y install erlang


2.安裝RabbitMQ
下載RabbitMQ的rpm:wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el6.noarch.rpm
 
yum -y install rabbitmq-server-3.6.6-1.el6.noarch.rpm
 
注:
 
若是報:Requires: socat
 
更新源wget –no-cache http://www.convirture.com/repos/definitions/rhel/6.x/convirt.repo -O /etc/yum.repos.d/convirt.repo
yum install socat
啓動rabbitmq服務:   

關閉防火牆:
systemctl stop firewalld.service  
前臺運行:rabbitmq-server start (用戶關閉鏈接後,自動結束進程)  

後臺運行:rabbitmq-server -detached   (RabbitMQ默認監聽5672端口)
 
參考連接:https://www.cnblogs.com/crazylqy/p/6567253.html

 

二、RabbitMQ服務器

rabbitmqctl list_queues:顯示當前的隊列列表

[root@localhost ~]# rabbitmqctl list_queues
Listing queues ...
q1 隊列中有5個任務
[root@localhost ~]# 

 

add_user zhanggen password  :在rabbitmq server上建立一個用戶並設置密碼

rabbitmqctl set_permissions -p / zhanggen ".*" ".*" ".*":同時還要配置權限,容許從外面訪問;set_permissions [-p vhost] {user} {conf} {write} {read}

vhost
The name of the virtual host to which to grant the user access, defaulting to /.

user
The name of the user to grant access to the specified virtual host.

conf
A regular expression matching resource names for which the user is granted configure permissions.

write
A regular expression matching resource names for which the user is granted write permissions.

read
A regular expression matching resource names for which the user is granted read permissions.

 
參數說明

 

 

 三、生產者使用RabbitMQ

 

#!/usr/bin/env/python
# -*- coding: utf-8 -*-
import pika
auth=pika.PlainCredentials('zhanggen','123.com') #建立鏈接須要認證信息
conn=pika.BlockingConnection(pika.ConnectionParameters(host="192.168.226.128",credentials=auth)) #建立鏈接

channel=conn.channel() #創建鏈接通道


channel.queue_declare(queue='q1') #在RabbitMQ服務器中建立1個隊列

channel.basic_publish(exchange='',routing_key='q1',body='hello world') #在q1隊列裏放消息
#exchange :消息過濾模塊,空表明使用默認
#routing_key:告訴exchange把hello world轉發到q1隊列上

conn.close() #關閉鏈接
生產者

 

四、消費者使用RabbitMQ

import pika
auth=pika.PlainCredentials('zhanggen','123.com') #建立鏈接須要認證信息
conn=pika.BlockingConnection(pika.ConnectionParameters(host="192.168.226.128",credentials=auth)) #建立鏈接

channel=conn.channel() #創建鏈接通道


def callback(ch,method,properties,body):
    print(body)

#callback:獲取到消息以後調用的函數
#queue='q1:獲取消息的隊列
channel.basic_consume(callback,queue='q1',no_ack=True)

channel.start_consuming() #開始消費獲取數據 (阻塞模式有接收,沒有一直等待)
消費者

 

五、RabbitMQ消息的持久化

生產者指定本身發送的數據持久化,消費者把消息消費完畢以後手動向RabbitMQ確認,RabbitMQ確認以後刪除本次任務;(保證消費者在執行任務過程當中宕機,隊列消息不丟失

#!/usr/bin/env/python
# -*- coding: utf-8 -*-
import pika
auth=pika.PlainCredentials('zhanggen','123.com') #建立鏈接須要認證信息
conn=pika.BlockingConnection(pika.ConnectionParameters(host="192.168.226.128",credentials=auth)) #建立鏈接

channel=conn.channel() #創建鏈接通道


channel.queue_declare(queue='q1') #在RabbitMQ服務器中建立1個隊列

channel.basic_publish(exchange='',routing_key='q1',body='hello world',properties=pika.BasicProperties(delivery_mode=2)) #在q1隊列裏放消息
#exchange :消息過濾模塊,空表明使用默認
#routing_key:告訴exchange把hello world轉發到q1隊列上
#properties:發送的消息持久化

conn.close() #關閉鏈接
生產者
import pika
auth=pika.PlainCredentials('zhanggen','123.com') #建立鏈接須要認證信息
conn=pika.BlockingConnection(pika.ConnectionParameters(host="192.168.226.128",credentials=auth)) #建立鏈接

channel=conn.channel() #創建鏈接通道


def callback(ch,method,properties,body):
    print(body)
    ch.basic_ack(delivery_tag=method.delivery_tag) #手動確認

#callback:獲取到消息以後調用的函數
#queue='q1:獲取消息的隊列
#no_ack=True:消息處理完畢後不向ranbit-server彙報任務執行完畢!若是服務器端沒有收到確認,服務端就會一直保留任務
channel.basic_consume(callback,queue='q1',)

channel.start_consuming() #開始消費獲取數據 (阻塞模式有接收,沒有一直等待)
消費者

 

 

 六、隊列持久化(durable=True)

消息持久化是足以應對生產環境的, 若是RabbitMQ-server宕機了,重啓以後全部隊列就會所有消失,這無疑是災難;全部在建立隊列的時候就要設置隊列持久化;

#!/usr/bin/env/python
# -*- coding: utf-8 -*-
import pika
auth=pika.PlainCredentials('zhanggen','123.com') #建立鏈接須要認證信息
conn=pika.BlockingConnection(pika.ConnectionParameters(host="192.168.226.128",credentials=auth)) #建立鏈接

channel=conn.channel() #創建鏈接通道


channel.queue_declare(queue='task1',durable=True) #在RabbitMQ服務器中建立1個隊列,durable=True保證隊列持久化

channel.basic_publish(exchange='',routing_key='task1',body='hello world',properties=pika.BasicProperties(delivery_mode=2)) #在q1隊列裏放消息
#exchange :消息過濾模塊,空表明使用默認
#routing_key:告訴exchange把hello world轉發到q1隊列上
#properties:發送的消息持久化

conn.close() #關閉鏈接
隊列+消息持久化(生產者)
import pika
auth=pika.PlainCredentials('zhanggen','123.com') #建立鏈接須要認證信息
conn=pika.BlockingConnection(pika.ConnectionParameters(host="192.168.226.128",credentials=auth)) #建立鏈接

channel=conn.channel() #創建鏈接通道


def callback(ch,method,properties,body):
    print(body)
    ch.basic_ack(delivery_tag=method.delivery_tag) #手動確認

#callback:獲取到消息以後調用的函數
#queue='q1:獲取消息的隊列
#no_ack=True:消息處理完畢後不向ranbit-server彙報任務執行完畢!若是服務器端沒有收到確認,服務端就會一直保留任務
channel.basic_consume(callback,queue='task1',)

channel.start_consuming() #開始消費獲取數據 (阻塞模式有接收,沒有一直等待)
隊列+消息持久化(消費者)

 

 

七、RabbitMQ消息公平消費

RabbitMQ是生產者和消費者模型,默認狀況下是RabbitMQ-server收到生產者的消息以後,會按照輪詢規則把任務分發 給鏈接到RabbitMQ-server的消費者們;

不考慮消費者負載不一樣的話,極可能出現,一個機器配置不高的消費者那裏堆積了不少消息處理不完,同時配置高的消費者卻一直很輕鬆。爲解決此問題,能夠在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了;

#!/usr/bin/env/python
# -*- coding: utf-8 -*-
import pika
auth=pika.PlainCredentials('zhanggen','123.com') #建立鏈接須要認證信息
conn=pika.BlockingConnection(pika.ConnectionParameters(host="192.168.226.128",credentials=auth)) #建立鏈接

channel=conn.channel() #創建鏈接通道


channel.queue_declare(queue='task1',durable=True) #在RabbitMQ服務器中建立1個隊列,durable=True保證隊列持久化

channel.basic_publish(exchange='',routing_key='task1',body='hello world',properties=pika.BasicProperties(delivery_mode=2)) #在q1隊列裏放消息
#exchange :消息過濾模塊,空表明使用默認
#routing_key:告訴exchange把hello world轉發到q1隊列上
#properties:發送的消息持久化

conn.close() #關閉鏈接
生產者代碼不變
import pika
auth=pika.PlainCredentials('zhanggen','123.com') #建立鏈接須要認證信息
conn=pika.BlockingConnection(pika.ConnectionParameters(host="192.168.226.128",credentials=auth)) #建立鏈接

channel=conn.channel() #創建鏈接通道


def callback(ch,method,properties,body):
    import time
    time.sleep(10)
    print(body)
    ch.basic_ack(delivery_tag=method.delivery_tag) #手動確認


#在開始消費以前聲明消費消息的個數
channel.basic_qos(prefetch_count=1)

#callback:獲取到消息以後調用的函數
#queue='q1:獲取消息的隊列
#no_ack=True:消息處理完畢後不向ranbit-server彙報任務執行完畢!若是服務器端沒有收到確認,服務端就會一直保留任務
channel.basic_consume(callback,queue='task1',)

channel.start_consuming() #開始消費獲取數據 (阻塞模式有接收,沒有一直等待)
屌絲消費者
import pika
auth=pika.PlainCredentials('zhanggen','123.com') #建立鏈接須要認證信息
conn=pika.BlockingConnection(pika.ConnectionParameters(host="192.168.226.128",credentials=auth)) #建立鏈接

channel=conn.channel() #創建鏈接通道


def callback(ch,method,properties,body):
    import time
    time.sleep(2)
    print(body)
    ch.basic_ack(delivery_tag=method.delivery_tag) #手動確認


#在開始消費以前聲明消費消息的個數
channel.basic_qos(prefetch_count=100000)

#callback:獲取到消息以後調用的函數
#queue='q1:獲取消息的隊列
#no_ack=True:消息處理完畢後不向ranbit-server彙報任務執行完畢!若是服務器端沒有收到確認,服務端就會一直保留任務
channel.basic_consume(callback,queue='task1',)

channel.start_consuming() #開始消費獲取數據 (阻塞模式有接收,沒有一直等待)
高富帥消費者

 

 

八、RabbitMQ消息發佈和訂閱

以前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue裏,但有些時候你想讓你的消息被全部的Queue收到,相似廣播的效果,這時候就要用到消息過濾exchange了,

Exchange在定義的時候是有類型的,以決定究竟是哪些Queue符合條件,能夠接收消息,

 

廣播(fanout): 全部bind到此exchange的queue均可以接收消息


組播( direct): 經過routingKey和exchange決定的那個惟一的queue能夠接收消息


 根據特定規則推送(topic):全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息

 

 

 

 

一、消息廣播(exchange_type='fanout')

 

 

生產者推送消息到exchange過濾器,過濾器不作任何 路由規則過濾, 把消息轉發至 全部和本過濾器有訂閱關係的隊列;

 

#!/usr/bin/env/python
# -*- coding: utf-8 -*-
import pika

#一、建立鏈接
conn=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.226.128'))

#二、創建隊列鏈接通道
channel=conn.channel()

#三、建立生產者推送消息的exchange(消息過濾器)和消息過濾器的類型爲廣播
# 注意exchange不負責存消息,只負責轉發消息,若是沒有隊列和exchange綁定,生產者轉發到exchange的消息會丟失。
channel.exchange_declare(exchange='exchange1',exchange_type='fanout')


#四、生產者 推送發送消息到名稱爲exchange1的過濾器
mssage='info:Hello World'
channel.basic_publish(exchange='exchange1',routing_key='',body=mssage)

#五、關閉鏈接
conn.close()


#注意exchange只負責轉發消息,不負責存消息,若是消費者端 沒聲明隊列 並和exchange1進行綁定,生產者轉發到exchange1的消息將會所有丟失!!
生產者

 

注意:exchange只負責轉發生產者端投遞得消息到和本exchange有綁定關係的隊列,不負責存儲消息;

若是消費者端 沒聲明隊列 並和exchange1進行綁定,生產者轉發到exchange1的消息將會所有丟失!!

#!/usr/bin/env/python
# -*- coding: utf-8 -*-

import pika
conn=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.226.128'))

channel=conn.channel()

channel.exchange_declare(exchange='exchange1',exchange_type='fanout')
#爲何生產者已經聲明exchange,消費者也須要聲明exchange,
#這就是爲了防止 消費者先啓動起來沒有exchange報錯,生產者啓動後會檢查 隊列是否已經聲明exchange?若是已經聲明直接使用


#動態生成對隊列名稱惟1、且不重複的隊列
queue_obj=channel.queue_declare(exclusive=True)

#獲取動態隊列的 隊列名稱
queue_name=queue_obj.method.queue

#綁定隊列 和exchange的關係
channel.queue_bind(exchange='exchange1',queue=queue_name)


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


#指定消費者接收 消息的隊列
channel.basic_consume(callback,queue=queue_name)

channel.start_consuming()
消費者1
#!/usr/bin/env/python
# -*- coding: utf-8 -*-

import pika
conn=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.226.128'))

channel=conn.channel()

channel.exchange_declare(exchange='exchange1',exchange_type='fanout')
#爲何生產者已經聲明exchange,消費者也須要聲明exchange,
#這就是爲了防止 消費者先啓動起來沒有exchange報錯,生產者啓動後會檢查 隊列是否已經聲明exchange?若是已經聲明直接使用


#動態生成對隊列名稱惟1、且不重複的隊列
queue_obj=channel.queue_declare(exclusive=True)

#獲取動態隊列的 隊列名稱
queue_name=queue_obj.method.queue

#綁定隊列 和exchange的關係
channel.queue_bind(exchange='exchange1',queue=queue_name)


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


#指定消費者接收 消息的隊列
channel.basic_consume(callback,queue=queue_name)

channel.start_consuming()
消費者2
#!/usr/bin/env/python
# -*- coding: utf-8 -*-

import pika
conn=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.226.128'))

channel=conn.channel()

channel.exchange_declare(exchange='exchange1',exchange_type='fanout')
#爲何生產者已經聲明exchange,消費者也須要聲明exchange,
#這就是爲了防止 消費者先啓動起來沒有exchange報錯,生產者啓動後會檢查 隊列是否已經聲明exchange?若是已經聲明直接使用


#動態生成對隊列名稱惟1、且不重複的隊列
queue_obj=channel.queue_declare(exclusive=True)

#獲取動態隊列的 隊列名稱
queue_name=queue_obj.method.queue

#綁定隊列 和exchange的關係
channel.queue_bind(exchange='exchange1',queue=queue_name)


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


#指定消費者接收 消息的隊列
channel.basic_consume(callback,queue=queue_name)

channel.start_consuming()
消費者3

 

注意:

1.channel.queue_declare(exclusive=True) 會動態生成1個動態生成對隊列名稱惟1、且不重複的隊列;

 

2.爲何建議生產者和消費者端都要聲明隊列?

防止 消費者先啓動起來鏈接不到能夠領取任務的隊列而致使報錯;

若是消費者端已經聲明隊列,生產者啓動後會檢查本身聲明的隊列名稱是否已經存在?若是存在直接使用,開始往改隊列裏聽任務!

 

3.爲何生產者把消息轉發到隊列以後宕機了,消費者卻獲取不到任務呢?

消費者領取消息時,雙方都要在線;

 

 

二、消息組播(exchange type=direct)

routing key: 生產者channel.basic_publish(exchange='direct_logs',body=message,routing_key='路由key')咱們稱之爲routing key;

binding key:消費者channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key='綁定key')咱們稱之爲 binding key;

direct類型的Exchange路由規則也很簡單,它會把消息路由到那些binding key與routing key徹底匹配的Queue中。

 

 

 

 

 

!/usr/bin/env/python
# -*- coding: utf-8 -*-
import pika

#一、建立鏈接
conn=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.226.128',))

#二、創建隊列鏈接通道
channel=conn.channel()

#三、建立生產者推送消息的exchange(消息過濾器)和消息過濾器的類型爲廣播
# 注意exchange不負責存消息,只負責轉發消息,若是沒有隊列和exchange綁定,生產者轉發到exchange的消息會丟失。
channel.exchange_declare(exchange='direct_exchange',exchange_type='direct')


#四、生產者 推送發送消息到名稱爲exchange1的過濾器,設置routing_key='zhanggen
mssage='info:Hello World'
channel.basic_publish(exchange='direct_exchange',routing_key='zhanggen',body=mssage)

#五、關閉鏈接
conn.close()

'''
注意:生產者 推送發送消息到名稱爲direct_exchange的過濾器,設置routing_key='zhanggen;
此時directexchange 就會把生產者發送到direct_exchange過濾器的消息,發送到和directexchange過濾器具備綁定關係,
channel.queue_bind(exchange='direct_logs',routing_key='zhanggen')切綁定key="zhanggen"的隊列上

'''
組播生產者
#!/usr/bin/env/python
# -*- coding: utf-8 -*-

import pika
conn=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.226.128'))

channel=conn.channel()

channel.exchange_declare(exchange='direct_exchange',exchange_type='direct')
#爲何生產者已經聲明exchange,消費者也須要聲明exchange,
#這就是爲了防止 消費者先啓動起來沒有exchange報錯,生產者啓動後會檢查 隊列是否已經聲明exchange?若是已經聲明直接使用


#動態生成對隊列名稱惟1、且不重複的隊列
queue_obj=channel.queue_declare(exclusive=True)

#獲取動態隊列的 隊列名稱
queue_name=queue_obj.method.queue

#綁定隊列 和exchange的關係
channel.queue_bind(exchange='direct_exchange',queue=queue_name,routing_key='zhanggen')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


#指定消費者接收 消息的隊列
channel.basic_consume(callback,queue=queue_name)

channel.start_consuming()
組播消費者

 

 

三、按照規則分發(exchange type=topic)

routing key: 生產者channel.basic_publish(exchange='direct_logs',body=message,routing_key='路由key')咱們稱之爲routing key;

binding key:消費者channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key='綁定key')咱們稱之爲 binding key;

Topic 類型的Exchange路由規則也很簡單,則會按照正則表達式,對RoutingKey與BindingKey進行匹配,若是匹配成功,則發送到對應的Queue中。

 

 

 

#!/usr/bin/env/python
# -*- coding: utf-8 -*-
import pika

#一、建立鏈接
conn=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.226.128',))

#二、創建隊列鏈接通道
channel=conn.channel()

#三、建立生產者推送消息的exchange(消息過濾器)和消息過濾器的類型爲廣播
# 注意exchange不負責存消息,只負責轉發消息,若是沒有隊列和exchange綁定,生產者轉發到exchange的消息會丟失。
channel.exchange_declare(exchange='topic_logs_exchange',exchange_type='topic')


#四、生產者 推送發送消息到名稱爲exchange1的過濾器,設置routing_key='zhanggen
mssage='info:Hello World'
# channel.basic_publish(exchange='topic_logs_exchange',routing_key='dddsss.mysql.error',body=mssage)

# channel.basic_publish(exchange='topic_logs_exchange',routing_key='dddsss.redis.error',body=mssage)

channel.basic_publish(exchange='topic_logs_exchange',routing_key='.dejhjg',body=mssage)

#五、關閉鏈接
conn.close()

'''
 生產者             過濾器正則匹配        消費者
dddsss.mysql.error -------------->   *.mysql.error 
dddsss.redis.error -------------->   *.mysql.error
   all             -------------->   #
   
注意 # 匹配全部 routing_key

'''
topic 生產者
#!/usr/bin/env/python
# -*- coding: utf-8 -*-

import pika
conn=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.226.128'))

channel=conn.channel()

channel.exchange_declare(exchange='topic_logs_exchange',exchange_type='topic')
#爲何生產者已經聲明exchange,消費者也須要聲明exchange,
#這就是爲了防止 消費者先啓動起來沒有exchange報錯,生產者啓動後會檢查 隊列是否已經聲明exchange?若是已經聲明直接使用


#動態生成對隊列名稱惟1、且不重複的隊列
queue_obj=channel.queue_declare(exclusive=True)

#獲取動態隊列的 隊列名稱
queue_name=queue_obj.method.queue

#綁定隊列 和exchange的關係
# channel.queue_bind(exchange='topic_logs_exchange',queue=queue_name,routing_key='*.mysql.error')
# channel.queue_bind(exchange='topic_logs_exchange',queue=queue_name,routing_key='*.redis.error')

channel.queue_bind(exchange='topic_logs_exchange',queue=queue_name,routing_key='#')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


#指定消費者接收 消息的隊列
channel.basic_consume(callback,queue=queue_name)

channel.start_consuming()


'''
 生產者             過濾器正則匹配        消費者
dddsss.mysql.error -------------->   *.mysql.error 
dddsss.redis.error -------------->   *.mysql.error
   all             -------------->   #
   
注意 # 匹配全部 routing_key

'''
topic消費者

 

 

 四、Remote procedure call (RPC)

 經過隊列在遠程服務器執行一條命令,在經過另外一隊列返回執行結果;

 

 

__author__ = 'Administrator'

#1 。 定義fib函數
#2. 聲明接收指令的隊列名rpc_queue
#3. 開始監聽隊列,收到消息後 調用fib函數
#4 把fib執行結果,發送回客戶端指定的reply_to 隊列
import subprocess
import pika
import time
credentials = pika.PlainCredentials('alex', 'alex3714')

parameters = pika.ConnectionParameters(host='192.168.11.106',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #隊列鏈接通道

channel.queue_declare(queue='rpc_queue2')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)


def run_cmd(cmd):
    cmd_obj = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    result = cmd_obj.stdout.read() + cmd_obj.stderr.read()

    return result


def on_request(ch, method, props, body):
    cmd = body.decode("utf-8")

    print(" [.] run (%s)" % cmd)
    response = run_cmd(cmd)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to, #隊列
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=response)

    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(on_request, queue='rpc_queue2')

print(" [x] Awaiting RPC requests")
channel.start_consuming()
rpc-server

 

_author__ = 'Administrator'

# 1.聲明一個隊列,做爲reply_to返回消息結果的隊列
# 2.  發消息到隊列,消息裏帶一個惟一標識符uid,reply_to
# 3.  監聽reply_to 的隊列,直到有結果
import queue

import pika
import uuid

class CMDRpcClient(object):
    def __init__(self):
        credentials = pika.PlainCredentials('alex', 'alex3714')
        parameters = pika.ConnectionParameters(host='192.168.11.106',credentials=credentials)
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue #命令的執行結果的queue

        #聲明要監聽callback_queue
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        """
        收到服務器端命令結果後執行這個函數
        :param ch:
        :param method:
        :param props:
        :param body:
        :return:
        """
        if self.corr_id == props.correlation_id:
            self.response = body.decode("gbk") #把執行結果賦值給Response

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4()) #惟一標識符號
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue2',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))


        while self.response is None:
            self.connection.process_data_events()  #檢測監聽的隊列裏有沒有新消息,若是有,收,若是沒有,返回None
            #檢測有沒有要發送的新指令
        return self.response

cmd_rpc = CMDRpcClient()

print(" [x] Requesting fib(30)")
response = cmd_rpc.call('ipconfig')

print(response)
rpc-client

 

 

非關係型內存數據庫memcached、redis

redis是一種把數據以鍵值對(不使用二維表)形式,存儲在內存中的非關係(無1對一、1對多、多對多..關係)型數據庫,因爲數據存儲在內存中因此redis的讀寫速度很是快,通常應用在頁面緩存;

 

 memcached 和redis有什麼區別?

因爲 memcached 和redis都是非關係型數據庫全部應用十分普遍,兩者功能類似有什麼區別呢?http://blog.csdn.net/e_wsq/article/details/9183749

 

一、redis和memcached都是以鍵值對形式來存儲數據的redis有5大數據類型,而memcached:數據類型單一;

a.memcached:支持數據類型

   k(字符串):v(字符串)

 

b.redis支持數據類型

k='字符串' (注意數字也是以str存儲)

k='列表'

k='hash'

k='set'

k='有序集合'

 

 

二、持久化

memcached:不支持持久化,服務器斷電內存清空

redis:支持持久化(redis啓動後會建立另1個進程作持久化,因爲開啓了2個進程性能會減低。);

 

三、redis只能使用單核,而memcached可使用多核

 

 

 

memcached數據庫

 

準備安裝

 

一、服務端:安裝 memcached服務:

安裝:  yum -y install memcached

啓動服務:  systemctl restart memcached  

檢查服務是否啓動?:ps -aux | grep memcached 、 lsof -i :11211

 

二、客戶端: 安裝 Python鏈接memcached數據庫 :memcache模塊

 

a.源碼包安裝

wget http://ftp.tummy.com/pub/python-memcached/old-releases/python-memcached-1.54.tar.gz

tar -zxvf python-memcached-1.54.tar.gz

cd python-memcached-1.54

python setup.py install   直接安裝到Python的Site package目錄下,就能夠直接 import memcache

 

b. whl文件安裝  

https://pypi.python.org/pypi/python-memcached#downloads

pip install wheel 

pip install python_memcached-1.59-py2.py3-none-any.whl

 

 

                                                                   pycham操做memcache

 

一、第一次操做

import memcache
conn=memcache.Client([('192.168.226.128:11211',2)],debug=True)
# 第一次使用 memcha模塊
# 天生支持機器 memcache.Client(['192.168.226.128:11211',....],)內部使用一致性hash算法

# conn=memcache.Client([ ('192.168.226.128:11211',],)
conn=memcache.Client([('192.168.226.128:11211',2)]) #設置權重

conn.set( 'name','alex',time=37) #time=37:37秒以後消失
ret=conn.get('name')
print(ret)
設置1個鍵值對 37秒後消失

 

 

二、天生支持集羣(一致性hash算法)

python-memcached模塊原生支持集羣操做,其原理是在內存維護一個主機列表,且集羣中主機的權重值和主機在列表中重複出現的次數成正比

     主機    權重
    1.1.1.1   1
    1.1.1.2   2
    1.1.1.3   1
 
那麼在內存中主機列表爲:
    host_list = ["1.1.1.1", "1.1.1.2", "1.1.1.2", "1.1.1.3", ]

若是用戶根據若是要在內存中建立一個鍵值對(如:k1 = "v1"),那麼要執行一下步驟:

  • 根據算法將 k1 轉換成一個數字
  • 將數字和主機列表長度求餘數,獲得一個值 N( 0 <= N < 列表長度 )
  • 在主機列表中根據 第2步獲得的值爲索引獲取主機,例如:host_list[N]
  • 鏈接 將第3步中獲取的主機,將 k1 = "v1" 放置在該服務器的內存中

 

三、add 增長key操做

添加一條鍵值對,若是已經存在的 key,重複執行add操做異常;

#add key操做
conn=memcache.Client([('192.168.226.128:11211',2)],debug=True)
conn.add('name','alex')  #若是添加的key已經在存在,會報錯;(debug=True)
conn.add('name','egon')
ret=conn.get('name')
print(ret)

'''
報錯信息
MemCached: while expecting 'STORED', got unexpected response 'NOT_STORED'
MemCached: while expecting 'STORED', got unexpected response 'NOT_STORED'
zhanggen
'''
添加key操做

 

四、replace  替換key的value操做

replace 修改某個key的值,若是key不存在,則異常;

#replace替換某個鍵的值
conn=memcache.Client([('192.168.226.128:11211',2)],debug=True)
conn.replace('name',252)
conn.replace('ssss',252) #replace 修改某個key的值,若是key不存在,則異常;
ret=conn.get('name')
print(type(ret))
print(ret)

'''
異常信息
MemCached: while expecting 'STORED', got unexpected response 'NOT_STORED'

'''
替換key的值

 

五、set 和 set_multi 設置key的value 、同時設置多個key的value操做

set 設置一個鍵值對,若是key不存在,則建立,若是key存在,則修改;

set_multi 設置多個鍵值對,若是key不存在,則建立,若是key存在,則修改;

#五、set 和 set_multi


conn=memcache.Client([('192.168.226.128:11211',2)],debug=True)
conn.set('name',252)
print()
conn.set('name',304)  # 若是key不存在,則建立,若是key存在,則修改
ret=conn.get('name')
print(ret)  #304

conn.set_multi({'name':'999','gender':'man','age':19,  }) #set_multi 設置多個鍵值對,若是key不存在,則建立;
ret1=conn.get('name')  #若是key存在,則修改;
print(ret1) #999
set 和 set_multi

 

六、delete 和 delete_multi   刪除key 、同時刪除多個key操做

delete             在Memcached中刪除指定的一個鍵值對;
delete_multi    在Memcached中刪除指定的多個鍵值對;

#六、delete 和 delete_multi

conn=memcache.Client([('192.168.226.128:11211',2)],debug=True)
conn.set('name',252)
conn.set('name',304)  # 若是key不存在,則建立,若是key存在,則修改
conn.set_multi({'name':'999','gender':'man','age':19,  }) #set_multi 設置多個鍵值對,若是key不存在,則建立;
ret1=conn.get('name')  #若是key存在,則修改;

conn.delete('name') #刪除單個key
print(conn.get('name'))  # 若是刪除返回 None

conn.delete_multi(['name','gender','age'])  #同時刪除多個[keys] 注意:若是刪除的 key不存在會報錯
print(conn.get('age'))
刪除多個[keys]

 

 

 七、get 和 get_multi  獲取多個key的值

get            獲取一個鍵值對
get_multi   獲取多一個鍵值對  字典

#七、get 和 get_multi

conn=memcache.Client([('192.168.226.128:11211',2)],debug=True)
conn.set('name',252)
conn.set('name',304)  # 若是key不存在,則建立,若是key存在,則修改
conn.set_multi({'name':'999','gender':'man','age':19,  }) #set_multi 設置多個鍵值對,若是key不存在,則建立;




ret1=conn.get('name')  #獲取某個key的值
print(ret1) #999
item_dict = conn.get_multi(["name", "gender", "age",'' ]) #獲取鍵值對字段 注意獲取不到不會報錯
print(item_dict)                           #{'name': '999', 'gender': 'man', 'age': 19}
獲取字典類型的數據

 

 

八、append 和 prepend 編輯 key的值

append    修改指定key的值,在該值 後面 追加內容
prepend   修改指定key的值,在該值 前面 插入內容

conn=memcache.Client([('192.168.226.128:11211',2)],debug=True)
conn.set('name',252)
conn.set('name',304)  # 若是key不存在,則建立,若是key存在,則修改
conn.set_multi({'name':'999','gender':'man','age':19,  }) #set_multi 設置多個鍵值對,若是key不存在,則建立;

conn.append('name','after') #在999後面追加after
conn.prepend('name','befor') #在999前面增長befor
print(conn.get('name'))       # 結果  befor999after
編輯key的值

 

九、decr 和 incr 對 value進行加減操做

incr  自增,將Memcached中的某一個值增長 N ( N默認爲1 )
decr 自減,將Memcached中的某一個值減小 N ( N默認爲1 )

#9.decr 和 incr  + - 操做

conn=memcache.Client([('192.168.226.128:11211',2)],debug=True)
conn.set('name',252)

conn.decr('name') #默認 -1
conn.decr('name',100) #-100
conn.incr('name')  #默認+1
conn.incr('name',100) #+100
print(conn.get('name'))
加法和減法

 

十、gets 和 cas 併發獲取數據控制


假設A 從memcached獲取到了munbr=100的數據,同時B也獲取到了munbr=100的數據;

A進行加1操做設置munbr=101,而後B也想對本身獲取到的number進行加1操做,可是此時B得number(已經被A+1了)已不在是當時B獲取到的100了;

若是容許B進行+1操做,將會形成數據錯亂,正確的作法應該是給B拋出一個異常,提醒B而後讓B從新獲取一下number進行+1操做;

以上的功能 memcached的set 和 get方法是沒法實現的,須要藉助gets 和 cas 進行併發控制;

 

import memcache
conn=memcache.Client([('192.168.226.128:11211',2)],debug=True,cache_cas=True)
print(conn.gets('number'))
v=input('............')
conn.cas('number','111112222')

'''
異常信息

MemCached: while expecting 'STORED', got unexpected response 'EXISTS'
'''
A
import memcache
conn=memcache.Client([('192.168.226.128:11211',2)],debug=True,cache_cas=True)  #開啓併發控制
# print(conn.get_multi(['number',]))
print(conn.gets('number')) #
v=input('............')
conn.cas('number','rrwwwwwwwwwwwwwwwweeqqdd')
B

 

十一、memcached+Django緩存

Django支持把網頁緩存存在memcached中,目前還不支持redis,因此memcached有了用武之地;

STATIC_URL = '/static/'
STATICFILES_DIRS=(
os.path.join(BASE_DIR,'static'),
)



CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
        'LOCATION': '192.168.226.128:11211',
            # 經過本地網絡socket緩存memcache 數據庫
    }
}
setings.py配置文件
from django.shortcuts import render,HttpResponse
from django.views.decorators.cache import cache_page  #導入設置緩存的裝飾器
import time
from app01 import models
@cache_page(3)  #注意 60*3 是緩存時間爲3分鐘,(3)3秒,若是更換了設置,記得更新url
def new2(request):
    users=time.time()
    print(users)
    return HttpResponse(users)
視圖

 

 

                                                                                Python操做redis數據庫

 

 

 前言:

 

redis-py 的API的使用能夠分類爲:

  • 鏈接方式
  • 鏈接池
  • 操做
    • String 操做
    • Hash 操做
    • List 操做
    • Set 操做
    • Sort Set 操做
  • 管道(支持事物)
  • 發佈訂閱

 

 

1.鏈接redis數據庫的2種方式

redis有2種鏈接方式 普通鏈接和鏈接池

使用connection pool來管理對一個redis server的全部鏈接,避免每次創建、釋放鏈接的開銷。默認,每一個Redis實例都會維護一個本身的鏈接池。能夠直接創建一個鏈接池,而後做爲參數Redis,這樣就能夠實現多個Redis實例共享一個鏈接池。

import redis

conn=redis.Redis(host='192.168.226.128',port=6379)
conn.set('name','alex666666')   #ex:過時時間(秒)
# conn.set('name','alex',px=100,xx=True) #px:過時時間爲毫秒
print(conn.get('name')) #注意Python3中 獲取到的結果是 字節類型
普通鏈接
#鏈接池鏈接
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
#開始操做
conn.set('name','zhanggen')
print(conn.get('name'))
鏈接池

 

 

 二、字符串 操做

String操做,redis中的String在在內存中按照一個name對應一個value來存儲。如圖:

 

set()方法:

不存在設置value,若是key已經存在覆蓋原來key的值;

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
#開始操做
conn.set('name','alex')  #set():不存在設置value,若是key已經存在覆蓋原來key的值
conn.set('name','alex666')
print(conn.get('name')) #b'alex666'
set()方法

 

setnx()方法:

若是key不存在設置value,存在不會覆蓋;

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
#開始操做
conn.setnx('name','alex')  #setnx:key不存在設置value成功若是key已經存在不覆蓋key的值
conn.setnx('name','alex666')
print(conn.get('name'))
setnx()方法

 

setex(name, value, time)方法:

設置key,value的時候必需設置過時時間單位爲秒,和set方法同樣,若是設置的key已經存在,會覆蓋原來key的值;

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
#開始操做
conn.setex('name','alex',11)  #key已經存在會覆蓋原有key的值,必需設置過時時間
conn.setex('name','alex666',11)
print(conn.get('name'))
setex(key,value,time)

 

psetex(name, time_ms, value)方法:

key已經存在會覆蓋原有key的value,必需設置過時時間:,注意過時時間在k v中間,且單位爲毫秒;

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
#開始操做
conn.psetex('name',11,'alex',)  #key已經存在會覆蓋原有key的值,必需設置過時時間,注意過時時間在k v中間,單位爲毫秒
conn.psetex('name',11,'alex666')
print(conn.get('name'))
psetex()方法

 

mset(*args, **kwargs)和mget(keys, *args)方法:

批量設置值和獲取值

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)

#批量設置
conn.mset(k1='v1',k2='v2') #關鍵字參
conn.mset({'name':'alex','name1':'egon'}) #字典

#批量獲取
print(conn.mget('k1','k2'))  
print(conn.mget(['name','name1']))
批量操做

 

 

getset(name, value)方法:

獲取原來的value,而且設置新的value進行覆蓋;

#六、getset(*args, **kwargs)
 #獲取老值,設置新值

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)

conn.set('name','alex')
print(conn.getset('name','alex666')) #獲取老值而且設置新值; b'alex'
print(conn.get('name'))                #b'alex666'
getset()方法

 

 

getrange(key, start, end)方法:

對獲取到的value進行切片操做

#七、getrange(key, start, end)

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)

print(conn.getset('name','alex666')) #獲取老值而且設置新值; b'alex'
print(conn.getrange('name',1,2))       #b'le'對獲取到的 value進行切片操做
getrange(key,start,end)方法

 

setrange(name, offset, value)方法:

獲取key的老value進行切片後 和新的value進行組;

#七、setrange(key, start, end)

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)

conn.set('key','Hello ')
conn.setrange('key',6,'zhanggen') #獲取key的老value進行切片後 和新的value進行組合
print(conn.get('key')) #b'Hello zhanggen'
setrange(name, offset, value)

 

 

setbit(key, bit, 0/1)方法:

對key進行二進制操做

參數:

key:鍵

bit:鍵轉換成二進制以後的 位數

0/1:轉成0或者1

#setbit(name, bit, 0/1)

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)

conn.set('key','foo ')
conn.setbit('key',7,1) #對key進行二進制操做
print(conn.get('key')) #b'goo '
setbit(name,bit,0/1)

注:若是在Redis中有一個對應: n1 = "foo",

   那麼字符串foo的二進制表示爲: 01100110  01101111  01101111
   因此,若是執行 setbit( 'n1' 7 1 ),則就會將第 7 位設置爲 1
   那麼最終二進制則變成  01100111  01101111  01101111 ,即: "goo"

 

 

getbit(key, bit) 方法:

獲取name對應的值的二進制表示中的某位的值 (0或1)

#getbit(name, offset)
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.set('name','A')
print(conn.getbit('name',1))  #獲取A轉成二進制以後的第1位
getbit('key',bit))

 

bitcount(key, start=None, end=None)方法:

獲取key對應的value轉換成二進制以後,指定範圍, 1 的個數;

# bitcount(key, start=None, end=None)

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.set('name','Awwww')
print(conn.bitcount('name',1,4)) #獲取key對應的值的二進制表示中 1 的個數; 執行結果:24個1
bitcount(key, start=None,end=None)

 

bitop(operation, dest, *keys)

對老key對應的value作 邏輯運行,生成新key的value;

參數:

operation,AND(並) 、 OR(或) 、 NOT(非) 、 XOR(異或)
dest, 新的Redis的name
*keys,要查找的Redis的name
 
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.set('name1','alex1')
conn.set('name2','alex2')
conn.set('name3','alex3')

conn.bitop('and','name4','name1','name2','name3') #對'name1','name2','name3'對應的value作 邏輯運行,生成新key的value
print(conn.get('name4'))
conn.bitop('and','name4','name1','name2','name3')

 

 

strlen(key)

返回name對應值的字節長度(一個漢字3個字節)

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.set('name1','alex1')
conn.set('name2','1234')
print(conn.strlen('name1')) #返回 key對應value的字節長度 執行結果5
print(conn.strlen('name2')) #執行結果4
print(conn.strlen('name1')

 

incr(self, name, amount=1)和decr(self, name, amount=1)

自增和自減 key對應的值,當name不存在時,則建立key=amount,不然,則自增或自減。

#incr(self, name, amount=1)和decr(self, name, amount=1)

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.set('name2','1234')

print(conn.incr(name='name2',amount=123 )) #整數自增操做1234+123=1357
print(conn.decr('name2',amount=123))       #正數自減操做
自增和自減(整數)

 

incrbyfloat(self, name, amount=浮點型)

自增數爲浮點型增長操做

#incrbyfloat(self, name, amount=1.0) 覺得浮點型增長

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.set('name2','1234')
print(conn.incrbyfloat(name='name2',amount=3.1415926)) #整數自增操做 浮點型 #1237.1415926
conn.incrbyfloat(name='name2',amount=3.1415926)

 

 

append(key, value)

對key對應的值,進行內容追加操做;

#append(key, value)

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.set('name','Hello')
conn.append('name',' World')   #對key對應的value進行追究操做
print(conn.get('name'))  #b'Hello World'
append(key, value)

 

 

 

三、Hash(嵌套字典)操做:

redis中Hash在內存中的存儲格式以下圖:

 

 

hset(name, key, value)和hget(name,key)方法:

name對應的hash中設置一個鍵值對(不存在,則建立;不然,修改)

# hset(name, key, value)

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.hset('SessionID','username','123.com')
print(conn.hget('SessionID','username'))   #b'123.com'
hset(name, key, value)

 

 

hmset(name, mapping) 和hmget()方法:

在name對應的hash(Python字典)中批量設置鍵值對;

# hmset(name, mapping) 批量操做

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.hmset('1',{'name':'alex','pwd':'alec2098'}) #{'1':{'name':'alex','pwd':'alec2098'}}

print(conn.hmget('1',['name','pwd']))
hmset(name, mapping)

 

hgetall(name)方法:

獲取name對應hash的全部鍵值

#hgetall 獲取全部的鍵值對


import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.hmset('1',{'name':'alex','pwd':'alec2098'}) #{'1':{'name':'alex','pwd':'alec2098'}}
print(conn.hgetall('1'))  #{b'name': b'alex', b'pwd': b'alec2098'}
獲取全部鍵值對

 

hlen(name)

獲取name對應的hash(字典)中鍵值對的個數

#  hlen(name) 獲取
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.hmset('1',{'name':'alex','pwd':'alec2098','age':19}) #{'1':{'name':'alex','pwd':'alec2098'}}
print(conn.hlen('1'))  #獲取 key 對應 hash(字典)中鍵值對個數
hlen(key)

 

 

hash(字典)操做方法:(hkeys、hexists、hdel、hincrby、hincrbyfloat、負數爲自減)

#  hlen(name) 獲取
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.hmset('1',{'name':'alex','pwd':'alec2098','age':19,'gender':'woman'}) #{'1':{'name':'alex','pwd':'alec2098'}}
print(conn.hkeys('1')) #獲取hash全部的key
print(conn.hexists('1', 'pwd')) #檢查name對應的hash中是否存在該key! 執行結果:True
print(conn.hdel('1','gender'))  #刪除name對應hash中對應的key      !  執行結果:1
print(conn.hincrby('1','age',100))   #對name對應的hash中的key進行增長操做 119 (整型)
print(conn.hincrbyfloat('1','age',3.2345))  # #對name對應的hash中的key進行增長操做 119(浮點型)


# #ps:有自增爲何沒有自減方法呢?變成 負數就是自減  #注意浮點之間整型不能,自加或者自減。

print(conn.hgetall('1'))
print(conn.hincrbyfloat('1','age',-3.2345))  # #對name對應的hash中的key進行增長操做 119(浮點型)
print(conn.hincrby('1','age',-100))   #對name對應的hash中的key進行增長操做 119 (整型)
hash操做

 

生成器獲取 hash數據

增量式迭代獲取,對於數據大的數據很是有用,hscan能夠實現分片的獲取數據,並不是一次性將數據所有獲取完,從而放置內存被撐爆

hscan(name, cursor=0, match=None, count=None)

參數:

name,redis的name

cursor,遊標(基於遊標分批取獲取數

match,匹配指定key,默認None 表示全部的key (支持模糊正則)

count,每次分片最少獲取個數,默認None表示採用Redis的默認分片個數

 

如:

  第一次:cursor1, data1 = r.hscan('xx', cursor=0, match=None, count=None)

   第二次:cursor2, data1 = r.hscan('xx', cursor=cursor1, match=None, count=None)
  第 N次:  ...
   直到返回值cursor的值爲0時,表示數據已經經過分片獲取完畢
 
# hscan()方法 迭代器獲取 hash數據
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.hmset('1',{'name':'alex','pwd':'alec2098','age':19,'gender':'woman'}) #{'1':{'name':'alex','pwd':'alec2098'}}
cursor1, data1=conn.hscan('1',cursor=0,match=None, count=1)
print(data1,cursor1)
cursor2,data2=conn.hscan('1',cursor=cursor1,match="*",count=2)
print(data2,cursor2)
迭代器獲取hash數據

 

hscan_iter(name, match=None, count=None)

hscan方法雖然解決了大數據獲取的問題,可是操做有點複雜,因此有了hscan_iter() 方法;

# hscan_iter() 方法

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.hmset('1',{'name':'alex','pwd':'alec2098','age':19,'gender':'woman'}) #{'1':{'name':'alex','pwd':'alec2098'}}
gen=conn.hscan_iter('1',match='*',count=None) #獲得1個生成器對象
for item in gen:     #循環獲取
    print(item) 
hscan_iter() 方法

 

 

三、List操做:

redis中的List在在內存中按照一個name對應一個List來存儲。如圖:

 

lpushx(name,value)和rpushx(name,value)列表追加操做方法:

找到name對應的list,

從list的左邊(頭)開始添加,最後1個參數放在最左邊;(lpush())

從list的最右邊(尾)開始添加,最後1個參數放在最右邊(rpush());

#lpush(name,values) 和name,values)
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.lpush('mylist6','.. ','Hello')  #從list的左邊(頭)開始添加,最後的參數放在最左邊;
conn.rpush('mylist6','zhang','gen')  #從list的最右邊(尾)開始添加,最後到參數放在最右邊;
print(conn.lrange('mylist6',0,-1))     #從左到右獲取  [b'Hello', b'.. ', b'zhang', b'gen']
lpush和rpush列表追加操做

 

lpushx(name,value) ,lpushx(name,value)  方法:

在name已經存在的狀況下,

從list的左邊(頭)開始添加,最後1個參數放在最左邊;

從list的最右邊(尾)開始添加,最後1個參數放在最右邊;

#lpushx 和 rpushx
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.lpushx('mylist6','Left ')  #在name已經存在的狀況下,從list的左邊(頭)開始添加,最後1個參數放在最左邊;
conn.rpushx('mylist6','Right')  #在name已經存在的狀況下, 從list的最右邊(尾)開始添加,最後1個參數放在最右邊;
print(conn.lrange('mylist6',0,-1))
lpushx 和 rpushx

 

llen(name)方法:

獲取name對應list的列表長度

#llen(name)

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
print(conn.llen('mylist6'))  #獲取 mylist6對應 list的長度
llen(name)

 

linsert(name, where, refvalue, value))

在 Xname對應的list元素中,值爲X的 左/右邊插入 'x'

參數:

name,redis的name
where,BEFORE(左)或AFTER(右)
refvalue,標杆值,即:在它先後插入數據
value,要插入的數據
#linsert(name, where, refvalue, value))

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.linsert('mylist6','AFTER',44,'88') #在mylist6的元素 44右邊插入 '88'

print(conn.lrange('mylist6',0,-1))
linsert(name, where, refvalue, value))

 

 

r.lset(name, index, value)方法:

對name對應的list中某個索引位置,插入value;

參數:

   name,redis的name
   index,list的索引位置
   value,要設置的值
# r.lset(name, index, value)
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.lset('mylist6',0,'frist')#在name對應的列表中的某個索引位置,插入值;
print(conn.lrange('mylist6',0,-1)) 
r.lset(name, index, value)

 

 

r.lrem(name, value, num)方法:

在name對應的list中刪除指定的值,

參數:

  name,redis的name
 value,要刪除的值
 num,  num=0,刪除列表中全部的指定值;
  num=2,從前到後,刪除2個;
  num=-2,從後向前,刪除2個
# r.lset(name, index, value)
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.lrem('mylist6','Left',99)  #刪除 mylist6對應列表中,Left元素,若是有99個重複的刪除99個;
print(conn.lrange('mylist6',0,-1))
r.lset(name, index, value)

 

 

lpop(name)和rpop(name)方法:

pop最左/最右側的值

# lpop(name)和rpop(name)方法
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.lpop('mylist6')  #彈出mylist6對應的list中最左側的元素
conn.rpop('mylist6')  #彈出mylist6對應的list中最右側的元素
print(conn.lrange('mylist6',0,-1))
pop最左/最右

 

ltrim(name, start, end)方法:

刪除name對應list中指定範圍的元素,從索引1開始;

#ltrim(name, start, end)方法
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.lpush('mylist8', 'alex','egon','eric')  #存儲順序 4,3,2,1,0
conn.ltrim('mylist8',1,2) #從索引1開始,不是從0
print(conn.lrange('mylist8',0,-1))
ltrim(name, start, end)

 

lindex(name, index)方法:

按索引獲取name對應list中元素;

# lindex(name, index)方法
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.lpush('mylist6', 0,1,2,3,4)  #存儲順序 4,3,2,1,0
print(conn.lindex('mylist6',1)) #獲取mylist6對應的list中 索引爲1的value;
按索引取值

 

lrange(name, start, end)方法:

在name對應的list作切邊操做

# lrange(name, start, end)方法
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.lpush('mylist6', 0,1,2,3,4)  #存儲順序 4,3,2,1,0
print(conn.lrange('mylist6',0,3)) # [b'4', b'3', b'2', b'1']
lrange(name, start, end)

 

rpoplpush(src, dst)方法:

從一個列表取出最右邊的元素,同時將其添加至另外一個列表的最左邊;

參數:
    src,要取數據的列表的name
    dst,要添加數據的列表的name
#rpoplpush(src, dst)
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
# #
conn.lpush('List1','5','4','3','2','1','0')
conn.lpush('List2','order')

conn.rpoplpush('List1','List2')
print(conn.lrange('List1',0,-1)) #從一個列表取出最右邊的元素,同時將其添加至另外一個列表的最左邊
print(conn.lrange('List2',0,-1)) #[b'2', b'order', b'3', b'order', b'4', b'order', b'5', b'order']
rpoplpush(src, dst)

 

brpoplpush(src, dst, timeout=0)方法:

 

從一個列表的右側移除一個元素並將其添加到另外一個列表的左側

  參數:
     src,取出並要移除元素的列表對應的name
     dst,要插入元素的列表對應的name
     timeout,當src對應的列表中沒有數據時,阻塞等待其有數據的超時時間(秒),0 表示永遠阻塞

 

 

blpop(keys, timeout)和brpop(keys, timeout)方法:

將多個列表排列,按照從左到右去pop對應列表的元素;

參數:

    keys,redis的name的集合
    timeout,超時時間,當元素全部列表的元素獲取完以後,阻塞等待列表內有數據的時間(秒), 0 表示永遠阻塞
 
更多:
    r.brpop(keys, timeout),從右向左獲取數據
#blpop(keys, timeout),brpop
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.lpush('List1','5','4','3','2','1','0')
print(conn.blpop('List1',1)) #將多個列表排列,按照從左到右去pop對應列表的元素
print(conn.brpop('List1',1)) #將多個列表排列,按照從右到左去pop對應列表的元素
print(conn.lrange('List1',0,-1))
blpop和brpop方法

 

自定義list迭代器獲取

若是redis的list裏面存有大數據,一次性獲取會把內存稱霸,可是list數據類型,沒有想hash那樣迭代器的方法,全部須要本身實現;

# 因爲redis類庫中沒有提供對列表元素的增量迭代,若是想要循環name對應的列表的全部元素,那麼就須要:
    # 一、獲取name對應的全部列表
    # 二、循環列表
# 可是,若是列表很是大,那麼就有可能在第一步時就將程序的內容撐爆,全部有必要自定義一個增量迭代的功能:
 
def list_iter(name):
    """
    自定義redis列表增量迭代
    :param name: redis中的name,即:迭代name對應的列表
    :return: yield 返回 列表元素
    """
    list_count = r.llen(name)
    for index in xrange(list_count):
        yield r.lindex(name, index)
 
# 使用
for item in list_iter('pp'):
    print item
Python2 list迭代器
#blpop(keys, timeout),brpop
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.lpush('List1','5','4','3','2','1','0')
def list_iter(name):
    list_count = conn.llen(name) 
    for index in range (list_count):
        yield conn.lindex(name, index)

for item in list_iter('List1'):
    print(item)
python3 list迭代器

 

 

 四、Set操做

(Set集合就是不容許重複的列表)

 

sadd(name,values)方法:

添加name對應的集合中成員

 

sismember(name, value)

 獲取name對應的集合的全部成員  **經常使用

 

scard(name)方法:

獲取name對應集合中的成員個數;

#添加集合成員
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.sadd( 'set1',1,2,3,4,4,4)  # 在name對應的set1集合中,添加的成員1,2,3,4;注意成員是不會重複的
print(conn.smembers('set1'))  #查看name對應的全部集合成員;{b'2', b'1', b'3', b'4
print(conn.scard('set1'))     #查看 set的成員個數, 4
添加集合成員並獲取成員個數

 

sdiff(keys, *args) 方法:差集

在第1個name對應的集合,和且其餘name對應的集合的差集

#sdiff(keys, *args)
# 求差集
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.sadd( 'set1',1,2,3,4,4,4,)  # 在name對應的set1集合中,添加的成員1,2,3,4;注意成員是不會重複的
conn.sadd('set2',2,6,7)
conn.sadd('set2',2,9,'A')
print(conn.sdiff('set1','set2','set3')) # 求在se1和其餘集合的差集 {b'1', b'3', b'4'}
求差集

 

sdiffstore(dest, keys, *args)方法:

求差集 添加到新的集合

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.sadd( 'set1',1,2,3,4,4,4,)  # 在name對應的set1集合中,添加的成員1,2,3,4;注意成員是不會重複的
conn.sadd('set2',2,6,7)
conn.sadd('set2',2,9,'A')

conn.sdiffstore('set666','set1','set2','set3') #求set1和其餘集合的差集,添加到set666成員中
print(conn.smembers('set666')) #{b'3', b'1', b'4'}
求差集添加至新集合

 

 

 sinter(set1, set2,set3...)方法:

 求set1和其餘集合的並集

#求並集
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.sadd( 'set_name1',1,2,3,4,4,4,)  # 在name對應的set1集合中,添加的成員1,2,3,4;注意成員是不會重複的
conn.sadd('set_name2',2,6,7)
conn.sadd('set_name3',2,9,'A')

print(conn.sinter('set_name1','set_name2','set_name3')) #求set_name1 和其餘集合的並集;
求並集

 

sinterstore(dest, set1, set2,set3...)方法:

求set1和其餘集合的並集,添加到dest集合

#sinterstore(dest, set1, set2,set3...)方法:
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.sadd( 'set1',1,2,3,4,4,4,)  # 在name對應的set1集合中,添加的成員1,2,3,4;注意成員是不會重複的
conn.sadd('set2',2,6,7)
conn.sadd('set3',2,9,'A')
conn.sinterstore( 'set999','set1','set2','set3') #求set1和其餘集合的並集,添加到set999成員中
print(conn.smembers('set999'))
求並集添加到其餘集合

 

smove(src, dst, value)方法:

將某個成員從一個集合中移動到另一個集合

#smove(src, dst, value)方法:
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.smove('set1','set2','1') #把set1集合中的1,移除到set2
print(conn.smembers('set2'))
將某個成員從一個集合中移動到另一個集合

 

spop(name)方法:

從集合的右側(尾部)移除一個成員,並將其返回

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
print(conn.smembers('set2'))
print(conn.spop('set2'))  #把 set2集合的最後1個成員,移除並返回!
移除集合的最後1名成員並返回

 

srandmember(name, numbers)方法:

從name對應集合中獲取numbers個成員,並返回;注意:不刪除

#srandmember(name, numbers)
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
print(conn.smembers('set3'))
print(conn.srandmember('set3',2)) #從set集合中獲取2個成員,並返回;注意不刪除
獲取N個成員並返回

 

 

srem(name, values)方法:

從name對應集合中刪除某個值

#srem(name, values)

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
print(conn.smembers('set3'))
conn.srem('set3',2)    #從set集合中刪除 2
print(conn.smembers('set3'))
刪除集合中某個成員

 

sunion(keys, *args)

獲取多個name對應的集合的的並集   注意:sinter 和sdiff方法是求第1個集合和其餘集合;

#srem(name, values)

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.sadd('set1',1,2,3)
conn.sadd('set2',2,3,4)
conn.sadd('set3',2,5,6)
print(conn.smembers('set1'))
print(conn.smembers('set2'))
print(conn.smembers('set3'))
print(conn.sunion('set1','set2','set3')) #求全部集合的並集


'''

{b'2', b'4', b'1', b'3', b'A'}
{b'2', b'9', b'5', b'A', b'6'}
{b'2', b'1', b'4', b'3', b'9', b'5', b'A', b'6'}

'''
求多個集合的並集

 

sunionstore(dest,keys, *args)方法:

求獲取多個name對應的集合的的並集 ,而後保存到dest集合中;

#sunionstore(dest,keys, *args)
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.sadd('set1',1,2,3)
conn.sadd('set2',2,3,4)
conn.sadd('set3',2,5,6)
print(conn.smembers('set1'))
print(conn.smembers('set2'))
print(conn.smembers('set3'))
conn.sunionstore ('set555' ,'set1','set2','set3') #求全部集合的並集,並將並集所有保存到 set555

print(conn.smembers('set555'))

'''

{b'2', b'3', b'1'}
{b'2', b'1', b'4', b'3', b'A'}
{b'5', b'2', b'6', b'9', b'A'}
{b'5', b'2', b'1', b'6', b'4', b'3', b'9', b'A'}

'''
求多集合並集並保存

 

 

sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)

迭代器獲取集合成員,避免內存消耗太大;

# sscan_iter/sscan
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)

print(conn.sscan('set2',cursor=0,match=None,count=2))
print(conn.sscan_iter('set2',match=None,count=2)) #返回迭代器
迭代器獲取集合成員

 

 

有序集合

有序集合,在集合的基礎上,爲每元素排序;元素的排序須要根據另一個值來進行比較,因此,對於有序集合,每個元素有兩個值,即:值和分數,分數專門用來作排序。

 

zadd(name, *args, **kwargs)方法:

在name對應的有序集合中添加元素

如:
     # zadd('zz', 'n1', 1, 'n2', 2)
     #
     # zadd('zz', n1=11, n2=22)

 

zcard(name)方法:

獲取有序集合的成員個數

 print(conn.zcard('score')) #獲取有序集合的成員個數

 

zcount(name, min, max)方法:

獲取name對應的有序集合中分數 在 [min,max] 之間的個數;例如:考了60-100分的人個數;

# print(conn.zcount('score',60,100)) #獲取name對應的有序集合中分數 在 [min,max] 之間的個數,例如:考了60-100分的人個數

 

zincrby(name, value, amount)方法:

自增name對應的有序集合中,成員對應的分數;

print(conn.zincrby('score','A',amount=100)) #自增name對應的有序集合的 name 對應的分數,默認分數自增1

 

zrank(name, value)方法:

查看有序集合中的排行,zrevrank(name, value),從大到小排行

print(conn.zrank('score','A')) 

 

 

 

r.zrange( name, start, end, desc=False, withscores=False, score_cast_func=float)方法:

按照索引範圍獲取name對應的有序集合的元素;

# print(conn.zrange('score',0,0,desc=True,withscores=True,score_cast_func=float))
'''
 按照索引範圍獲取name對應的有序集合的元素 
 desc=True,按照降序排列,默認是升序
 withscores=True 獲取分值
 score_cast_func  指定獲取分值的類型int/float
 '''


# 參數:
    # name,redis的name
    # start,有序集合索引發始位置(非分數)
    # end,有序集合索引結束位置(非分數)
    # desc,排序規則,默認按照分數從小到大排序
    # withscores,是否獲取元素的分數,默認只獲取元素的值
    # score_cast_func,對分數進行數據轉換的函數
 
# 更多:
    # 從大到小排序
    # zrevrange(name, start, end, withscores=False, score_cast_func=float)
 
    # 按照分數範圍獲取name對應的有序集合的元素
    # zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float)
    # 從大到小排序
    # zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)
View Code

 

 

zrangebylex(name, min, max, start=None, num=None)方法:

當有序集合的全部成員都具備相同的分值時,有序集合的元素會根據成員的   值 (lexicographical ordering)來進行排序

而這個命令則能夠返回給定的有序集合鍵 key 中, 元素的值介於 min 和 max 之間的成員;(區間)

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.zadd('myzset',aa=0,ba=1,ca=2,da=3,ea=4,fa=5,ga=6)
print(conn.zrangebylex('myzset',min='-',max='[ca',start=0,num=2)) #[b'aa', b'ba']

'''
min:+ 表示正無限; - 表示負無限; ( 表示開區間; [ 則表示閉區間

max:+ 表示正無限; - 表示負無限; ( 表示開區間; [ 則表示閉區間

start,對結果進行 二次分片處理,索引位置

num,對結果進行二次分片處理,索引後面的num個元素

'''
View Code

 

 

 zrem(name, values)方法:

刪除name對應有序集合中的成員;

#刪除name對應的有序集合中值是values的成員
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
print(conn.zrange('score',0,-1,))
print(conn.zrem('score','B','C')) #刪除score有序集合中的 'B','C'成員
print(conn.zrange('score',0,-1,))
刪除有序集合中某些成員

 

 

zremrangebyrank(name, min, max)方法:

根據排行 範圍刪除

# zremrangebyrank(name, min, max)
#根據排行 範圍刪除
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
print(conn.zrange('myzset',0,-1))
conn.zremrangebyrank('myzset',0,2) # 根據排行 範圍刪除
print(conn.zrange('myzset',0,-1))
根據排行範圍進行刪除

 

zremrangebyscore(name, min, max)方法:

根據分數範圍刪除

conn.zremrangebyscore('myzset',2,3) #根據分數範圍進行刪除

 

 

zremrangebylex(name, min, max)方法:

根據zrangebylex進行查詢,把返回的結果remove;

conn.zremrangebylex('myzset', '-','+') #根據zrangebylex查詢,並刪除根據返回得結果;
zrangebylex查詢並刪除

 

zscore(name, value)方法:

獲取有序集合中某成員的分值

print(conn.zrange('score',0,-1,withscores=True))
print(conn.zscore('score','A')) #獲取 score對應集合中成員 A對應的分數 執行結果1.0
print(conn.zrange('score',0,-1,withscores=True))#[(b'A', 1.0)]

執行結果:

[(b'A', 1.0)]
1.0
[(b'A', 1.0)]

 

 

zinterstore(dest, keys, aggregate=None)方法:

獲取兩個有序集合的交集,若是遇到相同值不一樣分數,對分數進行 aggregate操做獲取。

aggregate的值爲:  SUM  MIN  MAX

#zinterstore(dest, keys, aggregate=None)方法:
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.zadd('myset',A=1,B=2,C=3,D=4)
conn.zadd('myset1',A=1,B=2,C=3,D=111)
conn.zinterstore('myset2',('myset','myset1'),aggregate='MIN') #MIN保留分值小的
print(conn.zrange('myset2',0,-1))
print(conn.zscore('myset2','D')) #4.0
求交集並保存

 

 

zunionstore(dest, keys, aggregate=None)方法:

獲取兩個有序集合的並集,若是遇到相同值不一樣分數,則按照aggregate進行操做

 aggregate的值爲:  SUM  MIN  MAX  

#zunionstore(dest, keys, aggregate=None)
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
conn.zadd('myset',A=1,B=2,C=3,D=4)
conn.zadd('myset1',A=1,B=2,C=3,D=111)
conn.zunionstore('myset3',('myset','myset1','myset2'),aggregate='MAX') #MIN保留分值小的
print(conn.zscore('myset3','D'))
求並集並保存

 

有序集合的迭代器操做

zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
zscan_iter(name, match=None, count=None,score_cast_func=float)

 

 

5 、redis全局操做

以上是針對不一樣數據類型的操做,如下是全局操做;

 

keys(pattern='*')方法:

獲取redis數據庫中全部key,支持正則匹配;

  更多:
    KEYS * 匹配數據庫中全部 key 。
    KEYS h?llo 匹配 hello , hallo 和 hxllo 等。
    KEYS h*llo 匹配 hllo 和 heeeeello 等。
    KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo
print(conn.keys(pattern='*')) #獲取redis數據庫中全部key

 

delete('set_name1')方法:

刪除name爲set_name1的key

conn.delete('set_name1') #刪除name爲set_name1的key

 

exists(name)方法:

檢查name是否存在

print(conn.exists('name4')) #檢查數據庫中是否存在name4 key,存在返回True,不然False

 

expire(name ,time)方法:

爲某個redis的某個name設置超時時間

conn.expire( 'name4',4)  # 爲某個redis的某個name設置超時時間

 

rename(src, dst)方法:

對redis的name進行重命名

conn.rename('List6666','List6666wwwwwwwwwwwwwwwwwwww') #對redis的name重命名爲

 

move(name, db)方法:

redis爲了合理使用內存空間,啓動redis以後默認會建立16個內存空間大小不一樣的db,以保障對數據的分門別類存放,move就是 把name切換到其餘db中;

conn.move('mylist',9) #9表明第9個數據庫

 

 

randomkey()方法:

隨機獲取一個redis的name(不刪除) 

print(conn.randomkey())

 

type(name)方法:

查看name對應的數據類型

randomkey=conn.randomkey()

print(conn.type(randomkey))

 

 

迭代器方式獲取name

scan(cursor=0, match=None, count=None)
scan_iter(match=None, count=None)

 同字符串操做,用於增量迭代獲取key

 

 

六、redis事物操做

 

redis-py默認在執行每次請求都會建立(鏈接池申請鏈接)和斷開(歸還鏈接池)一次鏈接操做,若是想要在一次請求中指定多個命令,則可使用pipline實現一次請求指定多個命令,而且默認狀況下一次pipline 是原子性操做。

# !/usr/bin/env python
# -*- coding:utf-8 -*-

import redis

pool = redis.ConnectionPool(host='192.168.226.128', port=6379)

r = redis.Redis(connection_pool=pool)

# pipe = r.pipeline(transaction=False)
pipe = r.pipeline(transaction=True) #transaction=True鏈接的時候支持事物

pipe.set('name', 'alex')
pipe.set('role', 'sb')

pipe.execute()
redis事物操做

 

七、redis 發佈訂閱

 

redis也有消息隊列的功能並支持發佈和訂閱,可是對發佈消息的過濾功能不如RabbitMQ;

 

import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)
#建立1個頻道
conn.publish('fm103.9','alexSB')
生產者
import redis
#建立1個鏈接池
conn_pool=redis.ConnectionPool(host='192.168.226.128',port=6379)
#從鏈接池中獲取1個鏈接
conn=redis.Redis(connection_pool=conn_pool)

#開始訂閱
pb=conn.pubsub()
#訂閱fm103.9頻道
pb.subscribe('fm103.9')
#開始循環接收來自 fm103.頻道的消息
while True:
    msg=pb.parse_response()
    print(msg)
消費者

 

 

                                                                      Celery分佈式任務隊列

 

前言

Celery 是一個 基於python開發的分佈式異步消息任務隊列,經過它能夠輕鬆的實現任務的異步處理,其配合redis/RabbitMQ消息隊列,讓生產者發任務,消費者處理任務的操做變得更加簡單,經過它能夠輕鬆的實現任務的異步處理。

異步處理:

客戶端發送請求到server端,客戶端無需等待server端處理完畢以後把結果響應客戶端,而是當即響應1個惟一的任務ID;

客戶端拿着任務ID,再去請求server端查詢任務執行結果;

 

Celery 在執行任務時須要經過一個消息中間件來接收和發送任務消息,以及存儲任務結果, 默認使用rabbitMQ還可使用Redis;

 

舉幾個實例場景中可用的例子:

  1. 你想對100臺機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程序等着結果返回,而是給你返回 一個任務ID,你過一段時間只須要拿着這個任務id就能夠拿到任務執行結果, 在任務執行ing進行時,你能夠繼續作其它的事情。 
  2. 你想作一個定時任務,好比天天檢測一下大家全部客戶的資料,若是發現今天 是客戶的生日,就給他發個短信祝福

   3.  例如你再作1個報警推送模塊,該模塊接收到 報警信息以後,無需響應執行結果,而是再次轉發給tembition平臺;

 

 

celery的優勢:

  1. 簡單:一單熟悉了celery的工做流程後,配置和使用仍是比較簡單的
  2. 高可用:當任務執行失敗或執行過程當中發生鏈接中斷,celery 會自動嘗試從新執行任務
  3. 快速:一個單進程的celery每分鐘可處理上百萬個任務
  4. 靈活:因爲celery是分佈式的消息隊列,全部幾乎celery的各個組件均可以被橫向擴展及自定製

 

Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。

 

 

 

 

 

1、celery的安裝和簡單使用

 

1.安裝celery(注意目前只能在Linux平臺使用)

root@localhost zhanggen]# pip install -i https://pypi.douban.com/simple celery  #使用國內豆瓣源

測試是否安裝成功?

>>> from celery import Celery

 

2.啓動redis消息中間件

[root@localhost celery_study]# pip install -i https://pypi.douban.com/simple redis #安裝Python鏈接redis數據庫的模塊
[root@localhost zhanggen]# systemctl restart redis.service  #啓動redis數據庫

 

 

3.建立任務列表並啓動任務  (ceclery)

# !/usr/bin/env python
# -*- coding:utf-8 -*-

from celery import Celery
app=Celery('tasks',broker='redis://localhost',backend='redis://localhost')  

'''
實例化1個celery的app對象,能夠調用對象封裝的API生產任務,也能夠添加任務;
broker:提交執行任務的消息隊列
backend:存儲任務的執行結果
'''
@app.task #添加裝飾器就等於註冊 1個celery任務 def add(x,y): print("running...",x,y) return x+y #注意return的結果必須是可json的,由於celery要轉成json數據存儲到消息中間件

 

在任務列表中添加任務

# !/usr/bin/env python
# -*- coding:utf-8 -*-

from celery import Celery
import subprocess
import time


app=Celery('tasks',broker='redis://localhost',backend='redis://localhost')

@app.task
def add(x,y):
    print("running...",x,y)
    return x+y



@app.task
def run_cmd(cmd):
    print('run %s'%(cmd))
    time.sleep(5)
    cmd_obj=subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    return cmd_obj.stdout.read().decode('utf-8')
添加1個任務

 

 

四、啓動1個celery work(任務消費者)

 

 

重啓任務注意:celery只能識別新增的代碼,不能識別修改以後的代碼進行reload從新加載,因此任務修改以後要重啓worker;


cd 至項目所在目錄
celery -A task worker --loglevel=info #celery -A 任務列表文件 worker --loglevel=info

 

 

5.生產任務(任務生產者)

 

 

只要當前目錄能導入celery任務.py文件這個模塊,就能夠向celery提交你的任務了;

[zhanggen@localhost ~]$ cd celery_study/
[zhanggen@localhost celery_study]$ ls
task.py  task.pyc
[zhanggen@localhost celery_study]$ python
Python 2.7.5 (default, Nov  6 2016, 00:28:07) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-11)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from task import add                            #導入任務列表 >>> add.delay(6,8)                                 #調用任務 注意必須使用delay方法才能調用task執行任務 <AsyncResult: b1a631d4-214f-4c86-9997-43ccbfa30b00> #異步返回任務ID 注意:這裏就體現出celery的異步了,不用等結果返回任務ID,而是拿着任務ID去get結果;

 >>> r=add.delay(6,8) #提交任務 
 >>> r.get()          #獲取任務執行結果
 14
 >>>

 

在redis消息隊列查看任務狀態

[root@localhost zhanggen]# redis-cli 
127.0.0.1:6379> keys *
 1) "mylist22"
 2) "k2"
 3) "SessionID"
 4) "celery-task-meta-b1a631d4-214f-4c86-9997-43ccbfa30b00"

127.0.0.1:6379> get "celery-task-meta-b1a631d4-214f-4c86-9997-43ccbfa30b00"

"{\"status\": \"SUCCESS\", \"traceback\": null, \"result\": 14, \"task_id\": \"b1a631d4-214f-4c86-9997-43ccbfa30b00\", \"children\": []}" 127.0.0.1:6379>

'''
任務執行信息(json數據),驗證celecy的任務和任務執行結果都存儲在redis消息中間件
status:SUCCESS 執行狀態
traceback:null 執行過程當中出現的錯誤信息
result:14      該任務的執行結果
task_id:b1a631d4-214f-4c86-9997-43ccbfa30b00 該任務的任務ID
children:[]     該任務包含的子任務
'''

 

6.celery經常使用API

>>> r=task.run_cmd.delay('df -h')
>>> r.ready() #查看任務是否執行完畢
True
>>> 

 

>>> r=task.run_cmd.delay('df -h')
>>> r.get(timeout=1)                 #指定獲取任務結果時等待得時間
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python2.7/site-packages/celery/result.py", line 194, in get
    on_message=on_message,
  File "/usr/lib/python2.7/site-packages/celery/backends/async.py", line 189, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "/usr/lib/python2.7/site-packages/celery/backends/async.py", line 260, in _wait_for_pending
    raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.

 

 

>>> r=task.run_cmd.delay('df -h')
>>> r.get(propagate=False) #若是獲取結果時出現錯誤,不會致使程序因異常退出

 

r.traceback  #鎖定保存代碼的位置

 

 

 

 

7.退出任務

worker: Warm shutdown (MainProcess) :保存任務而後退出;
worker: Cold shutdown (MainProcess)  :按2次Ctrl+c 強制退出,重啓以後從新執行任務;

 

 

 

 2、在項目中使用celery

 在實際生產環境能夠把celery配置成1個應用,(celery配置和celery任務列表的分離;)

MyProject
    __init__.py 
    celery.py #注意必須名爲celery.py 
    MyTasks.py #任務列表
from __future__ import  absolute_import,unicode_literals
'''
爲了適應Python 3.x的新的字符串的表示方法,在2.7版本的代碼中,能夠經過unicode_literals來使用Python 3.x的新的語法
在 3.0 之前的舊版本中啓用相對導入等特性所必須的 future 語句。
'''
from celery import Celery

app=Celery(
    'MyProject',
    broker='redis://localhost', # 設置提交任務的消息隊列
    backend='redis://localhost', #設置存儲任務消息隊列
    include=['MyProject.MyTasks']#設置任務列表所在位置
          )


app.conf.update(
    result_expires=3600, #設置執行結果的保質期,1小時以後沒來獲取刪除
)

if __name__ == '__main__':
    app.start()
celery.py
from __future__ import absolute_import, unicode_literals
from . celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)
MyTasks.py

 

 啓動應用:

 celery -A  MyProject worker  -l info

 

 調用任務:

>>> r=MyTasks.add.delay(1,1)
>>> r.get()
2

 

後臺啓動多個worker

經過shell終端啓動的worker,只能啓動單個worker,並且在終端退出後woker也會終止,因此須要celery multi管理多個worker在後臺啓動;

a.後臺啓動

celery multi start w8 -A cmdb_rbac_arya -l info -f nohup.out -c1

   
-f: 指定worker日誌的存儲路徑
-c :指定worker併發的數量
celery multi start w8 -A cmdb_rbac_arya -l info -f nohup.out -c1

 b.後臺重啓

celery multi restart w8 -A cmdb_rbac_arya -l info -f nohup.out -c1
celery multi restart w8 -A cmdb_rbac_arya -l info -f nohup.out -c1

 c.中止

celery multi stop w8 -A cmdb_rbac_arya -l info -f nohup.out -c1
celery multi stop w8 -A cmdb_rbac_arya -l info -f nohup.out -c1

d.中止全部tasks

ps -ef | grep celery | grep -v grep | awk '{print "kill -9 "$2}'|sh
ps -ef | grep celery | grep -v grep | awk '{print "kill -9 "$2}'|sh

 

PS:

有一次我啓動celery提示我裝 各類項目已經安裝的模塊,爲何呢?

後來發現二逼運維給我裝了 2個 3.6.1版本的python,全部我告訴了celery

#!/usr/local/python3/bin/python3
# -*- coding: utf-8 -*-
import re
import sys

from celery.__main__ import main

if __name__ == '__main__':
    sys.argv[0] = re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0])
    sys.exit(main())
~                      
vim /usr/local/bin/celery

 

 

3、定時任務

celery支持定時任務,設定好任務的執行時間,celery就會定時自動幫你執行, 這個定時任務模塊叫celery beat,相似於Linux的crontab服務;

 

# !/usr/bin/env python
# -*- coding:utf-8 -*-

  from celery import Celery

from celery.schedules import crontab #導入crontab模塊支持

app=Celery(
    'periodic_task',
    broker='redis://localhost', # 設置提交任務的消息隊列
    backend='redis://localhost', #設置存儲任務消息隊列
          )

@app.on_after_configure.connect 
def setup_periodic_tasks(sender, **kwargs): #設置定時任務列表,sender固定參數()
    #每隔5秒執行1次 test任務
    sender.add_periodic_task(5.0, test.s('hello'), name='add every 10')
  
    #每隔10秒執行1次
    sender.add_periodic_task(10.0, test.s('world'))

    #每週六 間隔1分鐘執行1次
    sender.add_periodic_task(
       crontab(hour="*", minute="*", day_of_week='sat'),
       test.s('Happy Saturday!'),
     )




@app.task
def test(arg): print('run_test',arg)

 

 生產者(啓動以後自動經過beat配置任務計劃、檢查任務計劃併發送任務)

celery -A periodic_task beat -l debug

消費者(執行任務)

celery -A periodic_task worker -l debug

 

 

 4、Celery結合Django

Django默認是一個阻塞式IO的web框架,沒有Tornado的異步非阻塞功能,但只須要簡單配置 就能夠和Celery分佈式隊列結合,完成web應用的異步功能;

 

 

1.Django配置文件

# 設置celery提交任務的消息隊列
CELERY_BROKER_URL='redis://localhost'
CELERY_RESULT_BACKEND='redis://localhost'
setings.py

 

2.celery配置文件

# !/usr/bin/env python
# -*- coding:utf-8 -*-


# Create your tasks here

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
#設置Django的默認配置文件
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_test.settings')

app = Celery('celery_test')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.

#配置celery到配置文件尋找鏈接中間件的 key名稱 CELERY
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
#配置去Django的全部app裏面尋找celery任務
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
celery.py

 

3.Django項目的__init__.py

# !/usr/bin/env python
# -*- coding:utf-8 -*-

from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app        #Django啓動時自動導入celery

__all__ = ['celery_app']
project/ __init__.py

 

4.在app下建立任務列表

# !/usr/bin/env python
# -*- coding:utf-8 -*-
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time

# Create your tasks here

@shared_task     #支持各個app導入任務
def add(x, y):
    time.sleep(10) #設置任務異常執行
    return x + y


@shared_task
def mul(x, y):
    time.sleep(10)
    return x * y


@shared_task
def xsum(numbers):
    time.sleep(10)
    return sum(numbers)
tasks.py

 

5.Django視圖調用任務

from django.shortcuts import render,HttpResponse
from app01 import tasks #導入任務列表
def index(request):
    res=tasks.add.delay(444,987) #調用任務列表中的 add任務
    return HttpResponse(res.task_id)
app01/views.py

 

6.啓動Django項目(生產者)

  cd celery_test                                                              #進入項目目錄

[root@localhost celery_test]# python manage.py runserver 0.0.0.0:8001       #注意關閉防火牆:systemctl stop firewalld.service 

 

7.啓動worker(消費者)

cd celery_test                                      #進入項目目錄
celery -A celery_test worker -l info #若是Django項目已經配置了和celery的結合直接啓動Django項目就是worker

 

8.經過返回的任務ID獲取任務執行結果API

 
 
# !/usr/bin/env python
# -*- coding:utf-8 -*-
from app01 import views
urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^index/', views.index),      #任務執行API
    url(r'^result/', views.get_result),#經過任務ID獲取結果API

]


'''

http://192.168.226.128:8001/index/     #提交任務

7d74fa4a-0083-49f2-9141-28e6cc7dc5eb   #獲得任ID 


http://192.168.226.128:8001/result/?taskid=7d74fa4a-0083-49f2-9141-28e6cc7dc5eb #經過任務ID獲取任務執行狀態/結果

PENDING/SUCCESS/執行結果
'''

 

def get_result(request):
    task_id=request.GET.get('taskid')
    from celery.result import AsyncResult #導入結果獲取模塊
    res=AsyncResult(id=task_id)             #獲取任務id:AsyncResult(id=task_id)
    if res.status == 'SUCCESS':           #獲取任務狀態:res.status
        msg=res.get()                        #獲取任務執行結果:res.get()
    msg=res.status
    return HttpResponse(msg)

 

5、在django+celery任務計劃

celery有2大功能 分佈式消息隊列+beat模塊的任務計劃,若是Celery和Django結合也可使用django-celery-beat模塊支持任務計劃功能;

此時celery的任務計劃會保存在Django後臺的數據中,全部經過編輯後臺數據的表就能夠編輯任務計劃;(注意Django只負責把任務計劃信息保存在後臺數據庫,不負責生產任務和消費)

 

 

 

 

 

1.yum安裝python-devel(centos7)

yum install python-devel #注意若是一直在嘗試同一個鏡像就把那個鏡像從/etc/yum.repos.d/目錄下移除;

 

 2.pip安裝django-celery-beat  

django-celery-beat  依賴python.h庫,因此在centos7先安裝python-devel;

pip install -i https://pypi.douban.com/simple django-celery-beat # 支持國產,請使用國內豆瓣源!

 

3.Django配置文件中註冊'django_celery_beat'

INSTALLED_APPS =[

'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'app01.apps.App01Config',
'django_celery_beat',

]

 

 

4.把任務計劃信息遷移到Django的後臺數據庫

若是celery和Django結合它的計劃任務是保存在數據庫表中的;

[root@localhost celery_test]# python manage.py migrate
Operations to perform:
  Apply all migrations: admin, auth, contenttypes, django_celery_beat, sessions
Running migrations:
  Applying contenttypes.0001_initial... OK
  Applying auth.0001_initial... OK
  Applying admin.0001_initial... OK
  Applying admin.0002_logentry_remove_auto_add... OK
  Applying contenttypes.0002_remove_content_type_name... OK
  Applying auth.0002_alter_permission_name_max_length... OK
  Applying auth.0003_alter_user_email_max_length... OK
  Applying auth.0004_alter_user_username_opts... OK
  Applying auth.0005_alter_user_last_login_null... OK
  Applying auth.0006_require_contenttypes_0002... OK
  Applying auth.0007_alter_validators_add_error_messages... OK
  Applying auth.0008_alter_user_username_max_length... OK
 Applying django_celery_beat.0001_initial... OK Applying django_celery_beat.0002_auto_20161118_0346... OK Applying django_celery_beat.0003_auto_20161209_0049... OK Applying django_celery_beat.0004_auto_20170221_0000... OK
  Applying sessions.0001_initial... OK

 

5.編輯任務計劃表

因爲Celery和Django結合以後計劃任務會報存到Django後臺,因此對錶的操做就是對celery任務計劃的編輯;

 

a.建立超級用戶

[root@localhost celery_test]# python manage.py createsuperuser 
Username (leave blank to use 'zhanggen'): zhanggen
Email address: 
Password: 
Password (again): 
Superuser created successfully. 
[root@localhost celery_test]# 

 

b.經過Django-admin編輯任務計劃

http://192.168.226.128:8001/admin/ #輸入上邊建立的用戶名密碼,就能夠對如下3張表作編輯操做了!

 

 

 c.添加任務計劃

 

 

 

 

 

 

 

 

 

d.啓動beat和worker

開啓celery beat(生產者)

[root@localhost celery_test]# celery -A celery_test beat -l info  -S django
celery beat v4.1.0 (latentcall) is starting.
__    -    ... __   -        _
LocalTime -> 2018-01-08 11:11:13
Configuration ->
    . broker -> redis://localhost:6379//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> django_celery_beat.schedulers.DatabaseScheduler

    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 seconds (5s)
[2018-01-08 11:11:13,051: INFO/MainProcess] beat: Starting...
[2018-01-08 11:11:13,051: INFO/MainProcess] Writing entries...
[2018-01-08 11:11:13,136: INFO/MainProcess] Scheduler: Sending due task 每10秒中執行1次 (app01.tasks.add)
[2018-01-08 11:11:13,154: INFO/MainProcess] Writing entries...
[2018-01-08 11:11:24,128: INFO/MainProcess] Scheduler: Sending due task 每10秒中執行1次 (app01.tasks.add)
[2018-01-08 11:11:35,130: INFO/MainProcess] Scheduler: Sending due task 每10秒中執行1次 (app01.tasks.add)
[2018-01-08 11:11:46,131: INFO/MainProcess] Scheduler: Sending due task 每10秒中執行1次 (app01.tasks.add)

 

開啓worker消費任務

celery -A celery_test worker -l info

 

e.注意,經測試,每添加或修改一個任務,celery beat都須要重啓一次,要否則新的配置不會被celery beat進程讀到

 

 

 

 

 

 

參考:

源碼安裝Nginx

設置Nginx 經過service 啓動

http://blog.csdn.net/samxx8/article/details/47417133

http://www.cnblogs.com/alex3714/articles/5248247.html

redis和memcached參考: http://www.cnblogs.com/wupeiqi/articles/5132791.html  https://www.cnblogs.com/melonjiang/p/5342505.html

redis命令中文文檔: http://doc.redisfans.com/

Celery:http://www.cnblogs.com/alex3714/p/6351797.html

相關文章
相關標籤/搜索