PHP 想要實現 MQTT,需要使用到 PHP 中的 socket 函數。
socket 函數是什麼?
這次使用的是網上開源的 MQTT 案例,其中使用的是 stream_socket_xxxx 系列函數。
(關於 stream 與 socket 擴展的區別)大概意思是:
正如你所指出的,「stream」是 PHP 核心的一部分(內置的,始終可用),而「套接字」(sockets)則屬於一個很少被包含的擴展。除此之外,它們幾乎完全相同。您可以同時使用 TCP 和 UDP 兩種流,也可以使用阻塞和非阻塞模式,這些模式涵蓋了所有用例的 99%。我能想到的唯一常見例外是 ICMP,例如「ping」。但是,看起來目前還沒有一種安全的方式從 PHP 執行 ICMP:這種調用需要通過套接字擴展使用 SOCK_RAW 來實現,而這需要「root」權限。此外,並非所有路由器都會路由 TCP、UDP 和 ICMP 之外的其他數據包類型,這限制了套接字擴展的實用性。
/* phpMQTT */

/**
 * Small MQTT client built on PHP's stream_socket_* functions.
 *
 * The CONNECT packet announces protocol name "MQIsdp" with version byte 3.
 * The socket is switched to non-blocking mode right after connecting, so the
 * blocking form of read() polls until the requested bytes arrive or EOF.
 *
 * Typical use: construct, connect(), then publish() and close(); or
 * connect(), subscribe() and call proc() in a loop to receive messages.
 */
class Mqtt
{
    private $socket;            /* stream resource returned by stream_socket_client() */
    private $msgid = 1;         /* next message id to use (kept within 1..65535) */
    public $keepalive = 10;     /* keep-alive interval in seconds sent in CONNECT */
    public $timesinceping;      /* unix time of the last packet seen; drives ping / reconnect in proc() */
    public $topics = [];        /* currently subscribed topics: filter => ['qos' => int, 'function' => callable] */
    public $debug = false;      /* when true, progress messages are echoed / logged */
    public $address;            /* broker address */
    public $port;               /* broker port */
    public $clientid;           /* client id sent to the broker */
    public $will;               /* last-will settings: ['topic', 'content', 'qos', 'retain'] */
    private $username;          /* username sent in CONNECT (optional) */
    private $password;          /* password sent in CONNECT (optional) */
    public $cafile;             /* CA file path; when set, the connection uses tls:// */

    /**
     * Stores the broker details; does not open a connection.
     *
     * @param string      $address  broker host
     * @param int|string  $port     broker port
     * @param string      $clientid client identifier
     * @param string|null $cafile   CA certificate file; enables TLS when given
     */
    public function __construct($address, $port, $clientid, $cafile = null)
    {
        $this->broker($address, $port, $clientid, $cafile);
    }

    /**
     * Sets the broker details used by the next connect().
     */
    public function broker($address, $port, $clientid, $cafile = null)
    {
        $this->address = $address;
        $this->port = $port;
        $this->clientid = $clientid;
        $this->cafile = $cafile;
    }

    /**
     * Calls connect() repeatedly, sleeping 10 seconds between attempts,
     * until it succeeds. Never returns false.
     *
     * @return bool always true
     */
    public function connect_auto($clean = true, $will = null, $username = null, $password = null)
    {
        while ($this->connect($clean, $will, $username, $password) == false) {
            sleep(10);
        }
        return true;
    }

    /**
     * Opens the socket and performs the CONNECT / CONNACK handshake.
     *
     * @param bool        $clean    whether to set the clean-session flag
     * @param array|null  $will     last-will settings (topic, content, qos, retain)
     * @param string|null $username optional username
     * @param string|null $password optional password
     * @return bool true when the broker accepted the connection, false otherwise
     */
    public function connect($clean = true, $will = null, $username = null, $password = null)
    {
        if ($will) {
            $this->will = $will;
        }
        if ($username) {
            $this->username = $username;
        }
        if ($password) {
            $this->password = $password;
        }

        $errno = 0;
        $errstr = '';
        if ($this->cafile) {
            $socketContext = stream_context_create([
                "ssl" => [
                    "verify_peer_name" => true,
                    "cafile" => $this->cafile,
                ],
            ]);
            $this->socket = stream_socket_client(
                "tls://" . $this->address . ":" . $this->port,
                $errno,
                $errstr,
                60,
                STREAM_CLIENT_CONNECT,
                $socketContext
            );
        } else {
            $this->socket = stream_socket_client(
                "tcp://" . $this->address . ":" . $this->port,
                $errno,
                $errstr,
                60,
                STREAM_CLIENT_CONNECT
            );
        }

        if (!$this->socket) {
            if ($this->debug) {
                error_log("stream_socket_create() $errno, $errstr \n");
            }
            return false;
        }

        stream_set_timeout($this->socket, 5);
        stream_set_blocking($this->socket, false);

        // Decide once which optional fields are present, so the connect-flag
        // bits and the payload fields can never disagree.
        $hasWill = !empty($this->will);
        $hasUser = (string)$this->username !== '';
        $hasPass = (string)$this->password !== '';

        $i = 0;
        // Variable header: length-prefixed protocol name, then version byte 3.
        $buffer = $this->strwritestring('MQIsdp', $i);
        $buffer .= chr(0x03);
        $i++;

        // Connect flags.
        $var = 0;
        if ($clean) {
            $var += 2;
        }
        if ($hasWill) {
            $var += 4;                              // will flag
            $var += ($this->will['qos'] << 3);      // will QoS
            if ($this->will['retain']) {
                $var += 32;                         // will retain
            }
        }
        if ($hasUser) {
            $var += 128;
        }
        if ($hasPass) {
            $var += 64;
        }
        $buffer .= chr($var);
        $i++;

        // Keep alive, big-endian 16 bit.
        $buffer .= chr($this->keepalive >> 8);
        $i++;
        $buffer .= chr($this->keepalive & 0xff);
        $i++;

        // Payload: client id, then will topic / message, username, password.
        $buffer .= $this->strwritestring($this->clientid, $i);
        if ($hasWill) {
            $buffer .= $this->strwritestring($this->will['topic'], $i);
            $buffer .= $this->strwritestring($this->will['content'], $i);
        }
        if ($hasUser) {
            $buffer .= $this->strwritestring($this->username, $i);
        }
        if ($hasPass) {
            $buffer .= $this->strwritestring($this->password, $i);
        }

        // The remaining length must use the variable-length encoding; a raw
        // chr($i) is only valid for packets up to 127 bytes.
        $head = chr(0x10) . $this->setmsglength($i);
        fwrite($this->socket, $head, strlen($head));
        fwrite($this->socket, $buffer);

        $string = $this->read(4);
        if (strlen($string) < 4) {
            // Connection closed before a complete CONNACK arrived.
            error_log("Connection failed! (no CONNACK received)\n");
            return false;
        }

        if (ord($string[0]) >> 4 == 2 && $string[3] == chr(0)) {
            if ($this->debug) {
                echo "Connected to Broker\n";
            }
        } else {
            error_log(sprintf(
                "Connection failed! (Error: 0x%02x 0x%02x)\n",
                ord($string[0]),
                ord($string[3])
            ));
            return false;
        }

        $this->timesinceping = time();
        return true;
    }

    /**
     * Reads bytes from the socket.
     *
     * @param int  $int number of bytes wanted
     * @param bool $nb  when true, performs a single fread() and returns
     *                  whatever it yields (possibly empty or false); when
     *                  false, keeps reading until $int bytes were collected
     *                  or the stream reports EOF
     * @return string|false
     */
    public function read($int = 8192, $nb = false)
    {
        if ($nb) {
            return fread($this->socket, $int);
        }

        $string = "";
        $togo = $int;
        while (!feof($this->socket) && $togo > 0) {
            $fread = fread($this->socket, $togo);
            $string .= $fread;
            $togo = $int - strlen($string);
        }
        return $string;
    }

    /**
     * Subscribes to topics and remembers them in $this->topics.
     *
     * @param array $topics map of topic filter => ['qos' => int, 'function' => callable]
     * @param int   $qos    kept for backward compatibility; the SUBSCRIBE
     *                      fixed header is always 0x82 as the MQTT
     *                      specification requires, so this value is not used
     */
    public function subscribe($topics, $qos = 0)
    {
        $i = 0;
        $id = $this->nextMsgId();
        $buffer = chr($id >> 8) . chr($id % 256);
        $i += 2;

        foreach ($topics as $key => $topic) {
            $buffer .= $this->strwritestring($key, $i);
            $buffer .= chr($topic["qos"]);
            $i++;
            $this->topics[$key] = $topic;
        }

        $head = chr(0x82) . $this->setmsglength($i);
        fwrite($this->socket, $head, strlen($head));
        fwrite($this->socket, $buffer, $i);

        // Consume the SUBACK: two header bytes, then the announced length.
        $string = $this->read(2);
        $bytes = ord(substr($string, 1, 1));
        $this->read($bytes);
    }

    /**
     * Sends a keep-alive PINGREQ (0xc0 0x00).
     */
    public function ping()
    {
        $head = chr(0xc0) . chr(0x00);
        fwrite($this->socket, $head, 2);
        if ($this->debug) {
            echo "ping sent\n";
        }
    }

    /**
     * Sends a DISCONNECT packet (0xe0 0x00).
     */
    public function disconnect()
    {
        $head = chr(0xe0) . chr(0x00);
        fwrite($this->socket, $head, 2);
    }

    /**
     * Sends DISCONNECT, then shuts down the write side of the socket.
     */
    public function close()
    {
        $this->disconnect();
        stream_socket_shutdown($this->socket, STREAM_SHUT_WR);
    }

    /**
     * Publishes $content on $topic.
     *
     * @param string $topic   topic name
     * @param string $content payload bytes
     * @param int    $qos     QoS level; a message id is included when non-zero
     * @param int    $retain  non-zero sets the retain flag
     */
    public function publish($topic, $content, $qos = 0, $retain = 0)
    {
        $i = 0;
        $buffer = $this->strwritestring($topic, $i);

        if ($qos) {
            $id = $this->nextMsgId();
            $buffer .= chr($id >> 8) . chr($id % 256);
            $i += 2;
        }

        $buffer .= $content;
        $i += strlen($content);

        $cmd = 0x30;
        if ($qos) {
            $cmd += $qos << 1;
        }
        if ($retain) {
            $cmd += 1;
        }

        $head = chr($cmd) . $this->setmsglength($i);
        fwrite($this->socket, $head, strlen($head));
        fwrite($this->socket, $buffer, $i);
    }

    /**
     * Processes a received PUBLISH body (length-prefixed topic followed by
     * the payload) and calls the callback of every matching subscription
     * with ($topic, $payload).
     *
     * Topic filters are quoted before being turned into a regular
     * expression, so only "+" (one level) and "#" (rest of the topic) act
     * as wildcards; every other character matches literally.
     *
     * @param string $msg raw packet body as read by proc()
     */
    public function message($msg)
    {
        if (strlen($msg) < 2) {
            // Too short to even carry the topic length prefix.
            if ($this->debug) {
                echo "msg recieved but no match in subscriptions\n";
            }
            return;
        }

        $tlen = (ord($msg[0]) << 8) + ord($msg[1]);
        $topic = (string)substr($msg, 2, $tlen);
        $msg = (string)substr($msg, $tlen + 2);
        $found = 0;

        foreach ($this->topics as $key => $top) {
            // preg_quote() escapes "+" (and "#" on PHP >= 7.3); map the
            // escaped and the bare "#" form back to wildcard expressions.
            $quoted = preg_quote((string)$key, '/');
            $pattern = '/^' . strtr($quoted, [
                '\\+' => '[^\\/]*',
                '\\#' => '.*',
                '#'   => '.*',
            ]) . '$/';

            if (preg_match($pattern, $topic)) {
                if (isset($top['function']) && is_callable($top['function'])) {
                    call_user_func($top['function'], $topic, $msg);
                    $found = 1;
                }
            }
        }

        if ($this->debug && !$found) {
            echo "msg recieved but no match in subscriptions\n";
        }
    }

    /**
     * One iteration of the receive loop for an always-on client.
     *
     * Reconnects and re-subscribes on EOF, reads at most one packet,
     * dispatches PUBLISH packets (command 3) to message(), sends a ping when
     * nothing was seen for $keepalive seconds, and reconnects after twice
     * that time.
     *
     * @param bool $loop when true, sleeps 100 ms if no data was available;
     *                   pass false when the caller does other work in its loop
     * @return int always 1
     */
    public function proc($loop = true)
    {
        $cmd = 0;

        if (feof($this->socket)) {
            if ($this->debug) {
                echo "eof receive going to reconnect for good measure\n";
            }
            fclose($this->socket);
            $this->connect_auto(false);
            if (count($this->topics)) {
                $this->subscribe($this->topics);
            }
        }

        // Non-blocking single read; fread() may yield false, hence the cast.
        $byte = (string)$this->read(1, true);

        if (!strlen($byte)) {
            if ($loop) {
                usleep(100000);
            }
        } else {
            $cmd = (int)(ord($byte) / 16);
            if ($this->debug) {
                echo "Recevid: $cmd\n";
            }

            // Decode the variable-length "remaining length" field.
            $multiplier = 1;
            $value = 0;
            do {
                $digit = ord($this->read(1));
                $value += ($digit & 127) * $multiplier;
                $multiplier *= 128;
            } while (($digit & 128) != 0);

            if ($this->debug) {
                echo "Fetching: $value\n";
            }

            // Start from an empty body so a zero-length packet never hands
            // an undefined variable to message().
            $string = '';
            if ($value) {
                $string = $this->read($value);
            }

            if ($cmd) {
                switch ($cmd) {
                    case 3:
                        $this->message($string);
                        break;
                }
                $this->timesinceping = time();
            }
        }

        if ($this->timesinceping < (time() - $this->keepalive)) {
            if ($this->debug) {
                echo "not found something so ping\n";
            }
            $this->ping();
        }

        if ($this->timesinceping < (time() - ($this->keepalive * 2))) {
            if ($this->debug) {
                echo "not seen a package in a while, disconnecting\n";
            }
            fclose($this->socket);
            $this->connect_auto(false);
            if (count($this->topics)) {
                $this->subscribe($this->topics);
            }
        }

        return 1;
    }

    /**
     * Decodes a variable-length integer from $msg starting at offset $i.
     *
     * @param string $msg buffer to read from
     * @param int    $i   offset; advanced past the bytes consumed
     * @return int decoded value
     */
    public function getmsglength(&$msg, &$i)
    {
        $multiplier = 1;
        $value = 0;
        do {
            $digit = ord($msg[$i]);
            $value += ($digit & 127) * $multiplier;
            $multiplier *= 128;
            $i++;
        } while (($digit & 128) != 0);
        return $value;
    }

    /**
     * Encodes $len as a variable-length integer: 7 bits per byte, least
     * significant group first, top bit set while more bytes follow.
     *
     * @param int $len value to encode
     * @return string encoded bytes
     */
    public function setmsglength($len)
    {
        $string = "";
        do {
            $digit = $len % 128;
            $len = $len >> 7;
            // if there are more digits to encode, set the top bit of this digit
            if ($len > 0) {
                $digit = ($digit | 0x80);
            }
            $string .= chr($digit);
        } while ($len > 0);
        return $string;
    }

    /**
     * Returns $str prefixed with its 16-bit big-endian byte length and adds
     * the number of bytes produced to $i.
     *
     * @param string $str string to encode
     * @param int    $i   running byte counter, increased by strlen($str) + 2
     * @return string length prefix followed by $str
     */
    public function strwritestring($str, &$i)
    {
        $len = strlen($str);
        $msb = $len >> 8;
        $lsb = $len % 256;
        $ret = chr($msb) . chr($lsb) . $str;
        $i += ($len + 2);
        return $ret;
    }

    /**
     * Debug helper: prints every byte of $string as index, binary, hex and
     * character (bytes below 32 are shown as a space).
     */
    public function printstr($string)
    {
        $strlen = strlen($string);
        for ($j = 0; $j < $strlen; $j++) {
            $num = ord($string[$j]);
            if ($num > 31) {
                $chr = $string[$j];
            } else {
                $chr = " ";
            }
            printf("%4d: %08b : 0x%02x : %s \n", $j, $num, $num, $chr);
        }
    }

    /**
     * Returns the message id to use now and advances the counter, wrapping
     * from 65535 back to 1 so the id always fits in 16 bits and is never 0.
     */
    private function nextMsgId()
    {
        $id = $this->msgid;
        $this->msgid = ($id >= 0xffff) ? 1 : $id + 1;
        return $id;
    }
}
// Publisher example: open a socket to the broker and send one message to a
// topic (no SAM queue involved).
$server = "127.0.0.1";                  // broker address (MQTT server)
$port = 1883;                           // broker port
$username = "";                         // username (if required)
$password = "";                         // password (if required)
$client_id = "clientx9293670xxctr";     // client id used for this connection

$mqtt = new Mqtt($server, $port, $client_id);   // instantiate the MQTT client

if (!$mqtt->connect(true, NULL, $username, $password)) {
    // The connection could not be established.
    echo "Time out!\n";
} else {
    // Send "setr=3xxxxxxxxx" to topic "xxx3809293670ctr" with QoS 0,
    // then close the connection.
    $mqtt->publish("xxx3809293670ctr", "setr=3xxxxxxxxx", 0);
    $mqtt->close();
}
/*
// 訂閱信息,接收一個信息後退出
$server = "127.0.0.1";                  // 服務代理地址(MQTT 服務端地址)
$port = 1883;                           // 通訊端口
$username = "";                         // 用戶名(如果需要)
$password = "";                         // 密碼(如果需要)
$client_id = "clientx9293670xxctr";     // 設置你的連接客戶端 id

$mqtt = new Mqtt($server, $port, $client_id);
if (!$mqtt->connect(true, NULL, $username, $password)) {
    // 連接不成功則直接退出
    exit(1);
}

// 訂閱主題爲 SN69143809293670state,QoS 爲 0
$topics['SN69143809293670state'] = array("qos" => 0, "function" => "procmsg");
$mqtt->subscribe($topics, 0);

while ($mqtt->proc()) {
    // 死循環監聽
}
$mqtt->close();

// 信息回調函數:打印信息
function procmsg($topic, $msg) {
    echo "Msg Recieved: " . date("r") . "\n";
    echo "Topic: {$topic}\n\n";
    echo "\t$msg\n\n";
    $xxx = json_decode($msg);
    var_dump($xxx->aa);
    die;
}
*/
這是 PHP 的實現方法,如果用 PHP 作發送端還是不錯的。但是……
我被這個圖片打擊了,區塊鏈應用還真提莫的是用 JS 寫起來更簡單;
我最終寫出的 MQTT API 使用的是 Node;為什麼?
Node.js 實現 MQTT