多進程等待的跨平臺實現

最近在xmake中,用lua的協程實現了多任務編譯,效果仍是不錯的,不事後來發現一個問題:linux

若是全部編譯進程都在處理編譯,沒有退出的時候,xmake的lua主進程會不斷地在這些任務間,不停的切換輪詢進程的狀態,可是有沒有機會執行其餘任務,致使cpu太高,搶佔了編譯進程的cpu時間。。git

那若是在等不到完成的進程時候,加入sleep等待呢,又會致使編譯速度變慢,無法合理利用cpu。。github

所以,爲了解決這個問題,我打算擴展下lua的接口,實現了一個跨平臺的多進程等待接口: process.waitlist 實現多個未完成進程的同時等待,讓出xmake主進程的cpu時間,給其餘編譯進程充分利用shell

xmake中的lua代碼以下:windows

-- wait processes
        local tasks_finished = {}
        local procs_count = #procs
        if procs_count > 0 then

            -- wait them
            local procinfos = process.waitlist(procs, ifelse(procs_count < jobs, 0, -1))
            for _, procinfo in ipairs(procinfos) do
                
                -- the process info
                local proc      = procinfo[1]
                local procid    = procinfo[2]
                local status    = procinfo[3]

                -- check
                assert(procs[procid] == proc)

                -- resume this task
                local job_task = tasks[procid]
                local job_proc = coroutine.resume(job_task, 1, status)

                -- the other process is pending for this task?
                if coroutine.status(job_task) ~= "dead" then

                    -- check
                    assert(job_proc)

                    -- update the pending process
                    procs[procid] = job_proc

                -- this task has been finised?
                else

                    -- mark this task as finised
                    tasks_finished[procid] = true
                end
            end
        end

os.exec運行進程的接口實現中,若是當前進程沒有當即退出,就經過協程切換出去,知道上面的多進程等待,獲取到實際的退出進程後,直接定向切換到退出進程的os.exec中,繼續完成後續操做,這樣就不會有冗餘切換問題:oop

-- execute shell 
function os.exec(cmd, outfile, errfile)

    -- open command
    local ok = -1
    local proc = process.open(cmd, outfile, errfile)
    if proc ~= nil then

        -- wait process
        local waitok = -1
        local status = -1 
        if coroutine.running() then

            -- save the current directory
            local curdir = os.curdir()

            -- wait it
            repeat
                -- poll it
                waitok, status = process.wait(proc, 0)
                if waitok == 0 then
                    -- 外面的多進程等待到實際的狀態值後,直接進行處理
                    waitok, status = coroutine.yield(proc)
                end
            until waitok ~= 0

            -- resume the current directory
            os.cd(curdir)
        else
            waitok, status = process.wait(proc, -1)
        end

        -- get status
        if waitok > 0 then
            ok = status
        end

        -- close process
        process.close(proc)
    end

    -- ok?
    return ok
end

lua的上層調用有了,那怎麼去實現這個跨平臺的多進程等待呢?this

在windows上咱們能想到就是WaitForMultipleObjects這個接口了,我把它封裝到了tbox裏面具體實現以下:lua

tb_long_t tb_process_waitlist(tb_process_ref_t const* processes, tb_process_waitinfo_ref_t infolist, tb_size_t infomaxn, tb_long_t timeout)
{
    // check
    tb_assert_and_check_return_val(processes && infolist && infomaxn, -1);

    // make the process list
    tb_size_t               procsize = 0;
    HANDLE                  proclist[256] = {0};
    tb_process_t const**    pprocess = (tb_process_t const**)processes;
    for (; *pprocess && procsize < tb_arrayn(proclist); pprocess++, procsize++)
        proclist[procsize] = (*pprocess)->pi.hProcess;
    tb_assertf(procsize < tb_arrayn(proclist), "too much waited processes!");


    // wait processes
    DWORD       exitcode = 0;
    tb_long_t   infosize = 0;
    DWORD result = tb_kernel32()->WaitForMultipleObjects(procsize, proclist, FALSE, timeout < 0? INFINITE : (DWORD)timeout);
    switch (result)
    {
    case WAIT_TIMEOUT:
        break;
    case WAIT_FAILED:
        return -1;
    default:
        {
            // the process index
            DWORD index = result - WAIT_OBJECT_0;

            // the process
            tb_process_t* process = (tb_process_t*)processes[index];
            tb_assert_and_check_return_val(process, -1);

            // save process info
            infolist[infosize].index    = index;
            infolist[infosize].process  = (tb_process_ref_t)process;
            infolist[infosize].status   = tb_kernel32()->GetExitCodeProcess(process->pi.hProcess, &exitcode)? (tb_long_t)exitcode : -1;  
            infosize++;

            // close thread handle
            tb_kernel32()->CloseHandle(process->pi.hThread);
            process->pi.hThread = INVALID_HANDLE_VALUE;

            // close process
            tb_kernel32()->CloseHandle(process->pi.hProcess);
            process->pi.hProcess = INVALID_HANDLE_VALUE;

            // next index
            index++;
            while (index < procsize)
            {
                // attempt to wait next process
                result = tb_kernel32()->WaitForMultipleObjects(procsize - index, proclist + index, FALSE, 0);
                switch (result)
                {
                case WAIT_TIMEOUT:
                    // no more, exit loop
                    index = procsize;
                    break;
                case WAIT_FAILED:
                    return -1;
                default:
                    {
                        // the process index
                        index += result - WAIT_OBJECT_0;

                        // the process
                        process = (tb_process_t*)processes[index];
                        tb_assert_and_check_return_val(process, -1);

                        // save process info
                        infolist[infosize].index    = index;
                        infolist[infosize].process  = (tb_process_ref_t)process;
                        infolist[infosize].status   = tb_kernel32()->GetExitCodeProcess(process->pi.hProcess, &exitcode)? (tb_long_t)exitcode : -1;  
                        infosize++;

                        // close thread handle
                        tb_kernel32()->CloseHandle(process->pi.hThread);
                        process->pi.hThread = INVALID_HANDLE_VALUE;

                        // close process
                        tb_kernel32()->CloseHandle(process->pi.hProcess);
                        process->pi.hProcess = INVALID_HANDLE_VALUE;

                        // next index
                        index++;
                    }
                    break;
                }
            }
        }
        break;
    }

    // ok?
    return infosize;
}

若是在linux以及其餘posix系統上,可使用wait或者waitpid 接口,其實wait也就是至關於調用了 waitpid(-1, &status, ..),因此我這裏就直接使用waitpid來實現了。。code

它跟windows的WaitForMultipleObjects有些不一樣,不能傳遞指定須要等待哪些進程句柄,想要等待多個進程,只能傳遞-1,表示等待全部子進程協程

不過咱們在封裝接口的時候,能夠仍是傳入多個要等待的子進程列表,若是獲取到的子進程不在這個列表裏面,就直接忽略掉,有的話就返回出來,這樣的話,行爲上就跟windows的差很少了。。

tb_long_t tb_process_waitlist(tb_process_ref_t const* processes, tb_process_waitinfo_ref_t infolist, tb_size_t infomaxn, tb_long_t timeout)
{
    // check
    tb_assert_and_check_return_val(processes && infolist && infomaxn, -1);

    // done
    tb_long_t infosize = 0;
    tb_hong_t time = tb_mclock();
    do
    {
        // wait it
        tb_int_t    status = -1;
        tb_long_t   result = waitpid(-1, &status, timeout < 0? 0 : WNOHANG | WUNTRACED);
        tb_check_return_val(result != -1, -1);

        // exited?
        if (result != 0)
        {
            // find this process 
            tb_process_t const** pprocess = (tb_process_t const**)processes;
            for (; *pprocess && (*pprocess)->pid != result; pprocess++) ;

            // found?
            if (*pprocess)
            {
                // save process info
                infolist[infosize].index = (tb_process_ref_t const*)pprocess - processes;
                infolist[infosize].process = (tb_process_ref_t)*pprocess;
                infolist[infosize].status = WIFEXITED(status)? WEXITSTATUS(status) : -1;
                infosize++;

                // attempt to wait other processes
                while (infosize < infomaxn)
                {
                    // attempt to wait it
                    status = -1;
                    result = waitpid(-1, &status, WNOHANG | WUNTRACED);

                    // error or timeout? end
                    tb_check_break(result != 0);

                    // find this process 
                    tb_process_t const** pprocess = (tb_process_t const**)processes;
                    for (; *pprocess && (*pprocess)->pid != result; pprocess++) ;

                    // found?
                    if (*pprocess)
                    {
                        // save process info
                        infolist[infosize].index = (tb_process_ref_t const*)pprocess - processes;
                        infolist[infosize].process = (tb_process_ref_t)*pprocess;
                        infolist[infosize].status = WIFEXITED(status)? WEXITSTATUS(status) : -1;
                        infosize++;
                    }
                    else break;
                }

                // end
                break;
            }
        }

        // wait some time
        if (timeout > 0) tb_msleep(tb_min(timeout, 60));

    } while (timeout > 0 && tb_mclock() - time < (tb_hong_t)timeout);

    // ok?
    return infosize;
}

最後貼下這個跨平臺接口的是如何使用的,這裏給了一個比較完整的demo

// init processes
    tb_size_t        count1 = 0;
    tb_process_ref_t processes1[5] = {0};
    tb_process_ref_t processes2[5] = {0};
    for (; count1 < 4; count1++)
    {
        processes1[count1] = tb_process_init(argv[1], (tb_char_t const**)(argv + 1), tb_null);
        tb_assert_and_check_break(processes1[count1]);
    }

    // ok?
    while (count1)
    {
        // trace
        tb_trace_i("waiting: %ld", count1);

        // wait processes
        tb_long_t               infosize = -1;
        tb_process_waitinfo_t   infolist[4];
        if ((infosize = tb_process_waitlist(processes1, infolist, tb_arrayn(infolist), -1)) > 0)
        {
            tb_size_t i = 0;
            for (i = 0; i < infosize; i++)
            {
                // trace
                tb_trace_i("process(%ld:%p) exited: %ld", infolist[i].index, infolist[i].process, infolist[i].status);

                // exit process
                if (infolist[i].process) tb_process_exit(infolist[i].process);

                // remove this process
                processes1[infolist[i].index] = tb_null;
            }

            // update processes
            tb_size_t count2 = 0;
            for (i = 0; i < count1; i++) 
            {
                if (processes1[i]) processes2[count2++] = processes1[i];
            }
            tb_memcpy(processes1, processes2, count2 * sizeof(tb_process_ref_t));
            processes1[count2] = tb_null;
            count1 = count2;
        }
    }

相關文章
相關標籤/搜索