rabbitmq php 學習

參考文檔:
http://www.cnblogs.com/phpinfo/p/4104551...
http://blog.csdn.net/historyasamirror/ar...javascript

依賴包安裝

yum install ncurses-devel unixODBC unixODBC-devel

erlang環境

wget http://erlang.org/download/otp_src_18.1.tar.gz tar -zxvf otp_src_18.1.tar.gz cd otp_src_18.1 ./configure --prefix=/usr/local/erlang make make install # 配置erlang環境變量 vim /etc/profile # 增長內容: export PATH="$PATH:/usr/local/erlang/bin" # 保存退出,並刷新變量 source /etc/profile # 測試erlang是否安裝成功 # 安裝完成之後,執行erl看是否能打開eshell,用’halt().’退出,注意後面的點號,那是erlang的結束符。 [root@localhost src]# erl Erlang/OTP 17 [erts-6.1] [source] [64-bit] [async-threads:10] [hipe] [kernel-poll:false] Eshell V6.1 (abort with ^G) 2> 9+3. 12 3> halt(). 

安裝rabbitmq依賴文件,安裝rabbitmq

# 安裝rabbitmq依賴包
yum install xmlto # 安裝rabbitmq服務端 wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.7/rabbitmq-server-3.5.7.tar.gz tar zxvf rabbitmq-server-3.5.7.tar.gz cd rabbitmq-server-3.5.7/ make make install TARGET_DIR=/usr/local/rabbitmq SBIN_DIR=/usr/local/rabbitmq/sbin MAN_DIR=/usr/local/rabbitmq/man DOC_INSTALL_DIR=/usr/local/rabbitmq/doc # 配置hosts vim /etc/hosts # 增長一行內容 # 當前IP地址 綁定HOSTNAME名稱(vim /etc/sysconfig/network) 192.168.2.208 localhost.localdomain # 這種會提示錯誤(Warning: PID file not written; -detached was passed.) /usr/local/rabbitmq/sbin/rabbitmq-server -detached 啓動rabbitmq /usr/local/rabbitmq/sbin/rabbitmqctl status 查看狀態 /usr/local/rabbitmq/sbin/rabbitmqctl stop 關閉rabbitmq # 目前我本身使用 /usr/local/rabbitmq/sbin/rabbitmq-server start & 啓動rabbitmq /usr/local/rabbitmq/sbin/rabbitmqctl status 查看狀態 /usr/local/rabbitmq/sbin/rabbitmqctl stop 關閉rabbitmq

啓用管理插件

mkdir /etc/rabbitmq /usr/local/rabbitmq/sbin/rabbitmq-plugins list 查看插件列表 /usr/local/rabbitmq/sbin/rabbitmq-plugins enable rabbitmq_management (啓用插件) /usr/local/rabbitmq/sbin/rabbitmq-plugins disable rabbitmq_management (禁用插件) # 重啓rabbitmq # 訪問 http://127.0.0.1:15672/ # 若是有iptables vim /etc/sysconfig/iptables # 增長一下內容 -A INPUT -m state --state NEW -m tcp -p tcp --dport 15672 -j ACCEPT # 重啓動iptable service iptables restart

開機自啓動配置

#!/bin/sh #start rabbitMq sudo /usr/local/rabbitmq/sbin/rabbitmq-server & > /usr/local/rabbitmq/logs/rabbitmq.log 2>&1 

RabbitMQ PHP擴展安裝

# 安裝rabbitmq-c依賴包 yum install libtool autoconf # 安裝rabbitmq-c ( 最好下載 0.5的,0.6安裝可能會報錯) # 版本下載:https://github.com/alanxz/rabbitmq-c/releases/tag/v0.5.0 wget https://github.com/alanxz/rabbitmq-c/releases/download/v0.5.0/rabbitmq-c-0.5.0.tar.gz tar -zxvf v0.5.0 cd rabbitmq-c-0.5.0/ autoreconf -i ./configure --prefix=/usr/local/rabbitmq-c make make install # 安裝PHP擴展 amqp wget http://pecl.php.net/get/amqp-1.6.1.tgz tar zxvf amqp-1.6.1.tgz cd amqp-1.6.1 /usr/local/php/bin/phpize ./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c make make install # 編輯php.ini文件,增長amqp擴展支持 vim /usr/local/php/etc/php.ini # 增長下面內容 ; rabbitmq擴展支持 extension=amqp.so # 重啓php-fpm /etc/init.d/php-fpm restart

驗證是否成功 phpinfo()查看下是否支持amqp擴展
php


相關配置

hostname mq        // 設置hostname名稱 vim /etc/sysconfig/network // 設置hostname vim /etc/hosts // 編輯hosts ./rabbitmqctl add_user admin admin // 添加用戶 ./rabbitmqctl set_user_tags admin administrator // 添加admin 到 administrator分組 ./rabbitmqctl set_permissions -p / admin "*." "*." "*." // 添加權限

建立配置文件

#在/usr/rabbitmq/sbin/rabbitmq-defaults 查看config文件路徑 # 建立配置文件 touch/usr/rabbitmq/sbin #vm_memory_high_watermark 內存低水位線,若低於該水位線,則開啓流控機制,阻止全部請求,默認值是0.4,即內存總量的40%, #vm_memory_high_watermark_paging_ratio 內存低水位線的多少百分比開始經過寫入磁盤文件來釋放內存 vi /usr/rabbitmq/sbin/rabbitmq.config 輸入 [ {rabbit, [{vm_memory_high_watermark_paging_ratio, 0.75}, {vm_memory_high_watermark, 0.7}]} ].

建立環境文件

touch /etc/rabbitmq/rabbitmq-env.conf #輸入 RABBITMQ_NODENAME=FZTEC-240088 節點名稱 RABBITMQ_NODE_IP_ADDRESS=127.0.0.1 監聽IP RABBITMQ_NODE_PORT=5672 監聽端口 RABBITMQ_LOG_BASE=/data/rabbitmq/log 日誌目錄 RABBITMQ_PLUGINS_DIR=/data/rabbitmq/plugins 插件目錄 RABBITMQ_MNESIA_BASE=/data/rabbitmq/mnesia 後端存儲目錄

操做命令

查看exchange信息
          /usr/rabbitmq/sbin/rabbitmqctl list_exchanges name type durable auto_delete arguments 查看隊列信息 /usr/rabbitmq/sbin/rabbitmqctl list_queues name durable auto_delete messages consumers me 查看綁定信息 /usr/rabbitmq/sbin/rabbitmqctl list_bindings 查看鏈接信息 /usr/rabbitmq/sbin/rabbitmqctl list_connections

php的server端腳本

<?php
$routingkey='key'; //設置你的鏈接 $conn_args = array('host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest'); $conn = new AMQPConnection($conn_args); if ($conn->connect()) { echo "Established a connection to the broker \n"; } else { echo "Cannot connect to the broker \n "; } //你的消息 $message = json_encode(array('Hello World3!','php3','c++3:')); //建立channel $channel = new AMQPChannel($conn); //建立exchange $ex = new AMQPExchange($channel); $ex->setName('exchange');//建立名字 $ex->setType(AMQP_EX_TYPE_DIRECT); $ex->setFlags(AMQP_DURABLE); //$ex->setFlags(AMQP_AUTODELETE); //echo "exchange status:".$ex->declare(); echo "exchange status:".$ex->declareExchange(); echo "\n"; for($i=0;$i<100;$i++){ if($routingkey=='key2'){ $routingkey='key'; }else{ $routingkey='key2'; } $ex->publish($message,$routingkey); } /* $ex->publish($message,$routingkey); 建立隊列 $q = new AMQPQueue($channel); 設置隊列名字 若是不存在則添加 $q->setName('queue'); $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); echo "queue status: ".$q->declare(); echo "\n"; echo 'queue bind: '.$q->bind('exchange','route.key'); 將你的隊列綁定到routingKey echo "\n"; $channel->startTransaction(); echo "send: ".$ex->publish($message, 'route.key'); //將你的消息經過制定routingKey發送 $channel->commitTransaction(); $conn->disconnect(); */

php客戶端腳本

<?php $bindingkey='key2'; //鏈接RabbitMQ $conn_args = array( 'host'=>'127.0.0.1' , 'port'=> '5672', 'login'=>'guest' , 'password'=> 'guest','vhost' =>'/'); $conn = new AMQPConnection($conn_args); $conn->connect(); //設置queue名稱,使用exchange,綁定routingkey $channel = new AMQPChannel($conn); $q = new AMQPQueue($channel); $q->setName('queue2'); $q->setFlags(AMQP_DURABLE); $q->declare(); $q->bind('exchange',$bindingkey); //消息獲取 $messages = $q->get(AMQP_AUTOACK) ; if ($messages){ var_dump(json_decode($messages->getBody(), true )); } $conn->disconnect(); ?>

翻譯了部分mq常量設置,不正確的地方,你們以試驗爲準

/** * Passing in this constant as a flag will forcefully disable all other flags. * Use this if you want to temporarily disable the amqp.auto_ack ini setting. * 傳遞這個參數做爲標誌將徹底禁用其餘標誌,若是你想臨時禁用amqp.auto_ack設置起效 */ define('AMQP_NOPARAM', 0); /** * Durable exchanges and queues will survive a broker restart, complete with all of their data. * 持久化交換機和隊列,當代理重啓動後依然存在,幷包括它們中的完整數據 */ define('AMQP_DURABLE', 2); /** * Passive exchanges and queues will not be redeclared, but the broker will throw an error if the exchange or queue does not exist. * 被動模式的交換機和隊列不能被從新定義,可是若是交換機和隊列不存在,代理將扔出一個錯誤提示 */ define('AMQP_PASSIVE', 4); /** * Valid for queues only, this flag indicates that only one client can be listening to and consuming from this queue. * 僅對隊列有效,這我的標誌定義隊列僅容許一個客戶端鏈接而且從其消費消息 */ define('AMQP_EXCLUSIVE', 8); /** * For exchanges, the auto delete flag indicates that the exchange will be deleted as soon as no more queues are bound * to it. If no queues were ever bound the exchange, the exchange will never be deleted. For queues, the auto delete * flag indicates that the queue will be deleted as soon as there are no more listeners subscribed to it. If no * subscription has ever been active, the queue will never be deleted. Note: Exclusive queues will always be * automatically deleted with the client disconnects. * 對交換機而言,自動刪除標誌表示交換機將在沒有隊列綁定的狀況下被自動刪除,若是從沒有隊列和其綁定過,這個交換機將不會被刪除. * 對隊列而言,自動刪除標誌表示若是沒有消費者和你綁定的話將被自動刪除,若是從沒有消費者和其綁定,將不被刪除,獨佔隊列在客戶斷 * 開鏈接的時候將老是會被刪除 */ define('AMQP_AUTODELETE', 16); /** * Clients are not allowed to make specific queue bindings to exchanges defined with this flag. * 這個標誌標識不容許自定義隊列綁定到交換機上 */ define('AMQP_INTERNAL', 32); /** * When passed to the consume method for a clustered environment, do not consume from the local node. * 在集羣環境消費方法中傳遞這個參數,表示將不會從本地站點消費消息 */ define('AMQP_NOLOCAL', 64); /** * When passed to the {@link AMQPQueue::get()} and {@link AMQPQueue::get()} methods as a flag, * the messages will be immediately marked as acknowledged by the server upon delivery. * 當在隊列get方法中做爲標誌傳遞這個參數的時候,消息將在被服務器輸出以前標誌爲acknowledged (已收到) */ define('AMQP_AUTOACK', 128); /** * Passed on queue creation, this flag indicates that the queue should be deleted if it becomes empty. * 在隊列創建時候傳遞這個參數,這個標誌表示隊列將在爲空的時候被刪除 */ define('AMQP_IFEMPTY', 256); /** * Passed on queue or exchange creation, this flag indicates that the queue or exchange should be * deleted when no clients are connected to the given queue or exchange. * 在交換機或者隊列創建的時候傳遞這個參數,這個標誌表示沒有客戶端鏈接的時候,交換機或者隊列將被刪除 */ define('AMQP_IFUNUSED', 512); /** * When publishing a message, the message must be routed to a valid queue. If it is not, an error will be returned. * 當發佈消息的時候,消息必須被正確路由到一個有效的隊列,不然將返回一個錯誤 */ define('AMQP_MANDATORY', 1024); /** * When publishing a message, mark this message for immediate processing by the broker. (High priority message.) * 當發佈消息時候,這個消息將被當即處理. */ define('AMQP_IMMEDIATE', 2048); /** * If set during a call to {@link AMQPQueue::ack()}, the delivery tag is treated as "up to and including", so that multiple * messages can be acknowledged with a single method. If set to zero, the delivery tag refers to a single message. * If the AMQP_MULTIPLE flag is set, and the delivery tag is zero, this indicates acknowledgement of all outstanding * messages. * 當在調用AMQPQueue::ack時候設置這個標誌,傳遞標籤將被視爲最大包含數量,以便經過單個方法標示多個消息爲已收到,若是設置爲0 * 傳遞標籤指向單個消息,若是設置了AMQP_MULTIPLE,而且傳遞標籤是0,將全部未完成消息標示爲已收到 */ define('AMQP_MULTIPLE', 4096); /** * If set during a call to {@link AMQPExchange::bind()}, the server will not respond to the method.The client should not wait * for a reply method. If the server could not complete the method it will raise a channel or connection exception. * 當在調用AMQPExchange::bind()方法的時候,服務器將不響應請求,客戶端將不該該等待響應,若是服務器沒法完成該方法,將會拋出一個異常 */ define('AMQP_NOWAIT', 8192); /** * If set during a call to {@link AMQPQueue::nack()}, the message will be placed back to the queue. * 若是在調用AMQPQueue::nack方法時候設置,消息將會被傳遞迴隊列 */ define('AMQP_REQUEUE', 16384); /** * A direct exchange type. * direct類型交換機 */ define('AMQP_EX_TYPE_DIRECT', 'direct'); /** * A fanout exchange type. * fanout類型交換機 */ define('AMQP_EX_TYPE_FANOUT', 'fanout'); /** * A topic exchange type. * topic類型交換機 */ define('AMQP_EX_TYPE_TOPIC', 'topic'); /** * A header exchange type. * header類型交換機 */ define('AMQP_EX_TYPE_HEADERS', 'headers'); /** * socket鏈接超時設置 */ define('AMQP_OS_SOCKET_TIMEOUT_ERRNO', 536870947);

 

 https://segmentfault.com/a/1190000004627137
相關文章
相關標籤/搜索