PHP進程間通訊

PHP是用C編寫的,所以它對系統底層API的操做與C很像,同大多數語言同樣,PHP進程間通訊的方式有如下幾種:消息隊列,管道,共享內存,socket和信號。本文是對這幾種通訊方式對整理:php

管道通訊PIPE

管道用於承載簡稱之間的通信數據。爲了方便理解,能夠將管道比做文件,進程A將數據寫到管道P中,而後進程B從管道P中讀取數據。php提供的管道操做API與操做文件的API基本同樣,除了建立管道使用posix_mkfifo函數,讀寫等操做均與文件操做函數相同。固然,你能夠直接使用文件模擬管道,可是那樣沒法使用管道的特性了。數據結構

經過管道通訊的大概思路是,首先建立一個管道,而後子進程向管道中寫入信息,父進程從管道中讀取信息,這樣就能夠作到父子進程直接實現通訊了。socket

<?php

// 建立管道
$pipePath = "pipe";
if( !file_exists( $pipePath ) ){
    if( !posix_mkfifo( $pipePath, 0666) ){
        exit('make pipe false!' . PHP_EOL);
    }
}

// 建立進程,子進程寫管道,父進程讀管道
// 經過 pcntl_fork函數建立一個子進程。
// pcntl_fork 函數 很特殊,它調用一次擁有 多個返回值。
// 在父進程中:它返回 子進程的ID 這個值是 大於0 的。
// 在子進程中,它返回0。當返回 -1 時表示建立進程失敗。
$pid = pcntl_fork();

if( $pid == 0 ){
    // 子進程寫管道
    $file = fopen( $pipePath, 'w');
    fwrite( $file, 'hello world');
    sleep(1);
    exit;
}else{
    // 父進程讀管道
    $file = fopen( $pipePath, 'r');
    // 設置成讀取非阻塞
    // 當讀取是非阻塞的狀況下,父進程進行讀取信息的時候不會等待,
    // 管道中沒有消息也會立馬返回。
    // stream_set_blocking( $file, False); 
    echo fread( $file, 20) . PHP_EOL;

    pcntl_wait($status); // 回收子進程
}

消息隊列

消息隊列是存放在內存中的一種隊列數據結構。函數

<?php

// 獲取父進程id
$parentPid = posix_getpid();
echo "parent progress pid:{$parentPid}\n";
$childList = array();
// 建立消息隊列,定義消息類型
$id = ftok(__FILE__, 'm');
$msgQueue = msg_get_queue($id);
const MSG_TYEP = 1;

// 生產者
function producer()
{
    global $msgQueue;
    $pid = posix_getpid();
    $repeatNum = 5;
    for ($i = 0; $i <= $repeatNum; $i++) {
        $str = "({$pid}) progress create! {$i}";
        msg_send($msgQueue, MSG_TYEP, $str);
        $rand = rand(1, 3);
        sleep($rand);
    }
}

// 消費者
function consumer()
{
    global $msgQueue;
    $pid = posix_getpid();
    $repeatNum = 6;
    for ($i = 1; $i<= $repeatNum; $i++) {
        $rel = msg_receive($msgQueue, MSG_TYEP, $msgType, 1024, $message);
        echo "{$message} | consumer({$pid}) destroy \n";
        $rand = rand(1, 3);
        sleep($rand);
    }
}

function createProgress($callback)
{
    $pid = pcntl_fork();
    if ($pid == -1) {
        // 建立失敗
        exit("fork progresses error\n");
    } elseif ($pid == 0) {
        // 子進程執行程序
        $pid = posix_getpid();
        $callback();
        exit("({$pid})child progress end!\n");
    } else {
        // 父進程
        return $pid;
    }
}

for ($i = 0; $i < 3; $i++) {
    $pid = createProgress('producer');
    $childList[$pid] = 1;
    echo "create producer progresses: {$pid}\n";
}

for ($i = 0; $i < 2; $i++) {
    $pid = createProgress('consumer');
    $childList[$pid] = 1;
    echo "create consumer progresses: {$pid}\n";
}

while (!empty($childList)) {
    $childPid = pcntl_wait($status);
    if ($childPid > 0) {
        unset($childList[$childPid]);
    }
}
echo "({$parentPid})main progress end!\n";

運行結果:ui

create producer progresses: 21432
create producer progresses: 21433
create producer progresses: 21434
create consumer progresses: 21435
(21426) progress create! 2 | consumer(21435) destroy
(21424) progress create! 1 | consumer(21436) destroy
create consumer progresses: 21436
(21426) progress create! 3 | consumer(21436) destroy
(21426) progress create! 4 | consumer(21435) destroy
(21425) progress create! 3 | consumer(21436) destroy
(21424) progress create! 2 | consumer(21435) destroy
(21426) progress create! 5 | consumer(21435) destroy
(21424) progress create! 3 | consumer(21436) destroy
(21433)child progress end!
(21425) progress create! 4 | consumer(21435) destroy
(21424) progress create! 4 | consumer(21436) destroy
(21434)child progress end!
(21424) progress create! 5 | consumer(21435) destroy
(21425) progress create! 5 | consumer(21436) destroy
(21432)child progress end!
(21435)child progress end!
(21436)child progress end!
(21431)main progress end!

信號量與共享內存

<?php
$parentPid = posix_getpid();
echo "parent progress pid:{$parentPid}\n";

// 建立共享內存,建立信號量,定義共享key
// ftok(文件路徑,資源標識符) 建立一個IPC通訊所需的id
$shm_id = ftok(__FILE__, 'm');
$shm_id = ftok(__FILE__, 's');
// shm_attach(id) 建立或者打開一個共享內存
$shareMemory = shm_attach($shm_id);
// 返回一個可用戶訪問系統信號量的id
$signal = sem_get($shm_id);
const SHARE_KEY = 1;
// 生產者
function producer() {
    global $shareMemory;
    global $signal;
    $pid = posix_getpid();
    $repeatNum = 5;
    for ($i = 1; $i <= $repeatNum; $i++) {
        // 得到信號量 - 阻塞進程,直到信號量被獲取到[lock鎖機制的關鍵]
        sem_acquire($signal);

        // 檢查某個key是否存在與共享內存中
        if (shm_has_var($shareMemory, SHARE_KEY)) {
            // 獲取共享內存中的key的值
            $count = shm_get_var($shareMemory, SHARE_KEY);
            $count ++;
            // 爲共享內存中的key賦值
            shm_put_var($shareMemory, SHARE_KEY, $count);
            echo "({$pid}) count: {$count}\n";
        } else {
            // 初始化
            shm_put_var($shareMemory, SHARE_KEY, 0);
            echo "({$pid}) count: 0\n";
        }
        // 釋放
        sem_release($signal);
    }
}

function createProgress($callback) {
    $pid = pcntl_fork();
    if ($pid == -1) {
        // 建立失敗
        exit("fork progress error!\n");
    } elseif ($pid == 0) {
        // 子進程
        $pid = posix_getpid();
        $callback();
        exit("({$pid}) child progress end!\n");
    } else {
        // 父進程
        return $pid;
    }
}

// 3個寫進程
for ($i = 0; $i < 3; $i ++) {
    $pid = createProgress('producer');
    $childList[$pid] = 1;
    echo "create producer child progress: {$pid} \n";
}

// 等待全部子進程
while (!empty($childList)) {
    $childPid = pcntl_wait($status);
    if ($childPid > 0) {
        unset($childList[$childPid]);
    }
}

// 釋放共享內存與信號量
shm_remove($shareMemory);
sem_remove($signal);
echo "({$parentPid}) main progress end!\n";

運行結果:
使用信號量來實現共享內存的鎖機制code

parent progress pid:31720
create producer child progress: 31721 
create producer child progress: 31722 
(31721) count: 0
(31721) count: 1
(31721) count: 2
(31721) count: 3
(31721) count: 4
(31721) child progress end!
create producer child progress: 31723 
(31722) count: 5
(31722) count: 6
(31722) count: 7
(31722) count: 8
(31722) count: 9
(31722) child progress end!
(31723) count: 10
(31723) count: 11
(31723) count: 12
(31723) count: 13
(31723) count: 14
(31723) child progress end!
(31720) main progress end!

無鎖狀況隊列

Warning: sem_release(): SysV semaphore 4357894312 (key 0x73048925) is not currently acquired in /Users/easyboom/www/example/信號量與共享內存.php on line 38
相關文章
相關標籤/搜索