最近在xmake中,用lua的協程實現了多任務編譯,效果仍是不錯的,不事後來發現一個問題:linux
若是全部編譯進程都在處理編譯,沒有退出的時候,xmake的lua主進程會不斷地在這些任務間,不停的切換輪詢進程的狀態,可是有沒有機會執行其餘任務,致使cpu太高,搶佔了編譯進程的cpu時間。。git
那若是在等不到完成的進程時候,加入sleep等待呢,又會致使編譯速度變慢,無法合理利用cpu。。github
所以,爲了解決這個問題,我打算擴展下lua的接口,實現了一個跨平臺的多進程等待接口: process.waitlist
實現多個未完成進程的同時等待,讓出xmake主進程的cpu時間,給其餘編譯進程充分利用shell
xmake中的lua代碼以下:windows
-- wait processes local tasks_finished = {} local procs_count = #procs if procs_count > 0 then -- wait them local procinfos = process.waitlist(procs, ifelse(procs_count < jobs, 0, -1)) for _, procinfo in ipairs(procinfos) do -- the process info local proc = procinfo[1] local procid = procinfo[2] local status = procinfo[3] -- check assert(procs[procid] == proc) -- resume this task local job_task = tasks[procid] local job_proc = coroutine.resume(job_task, 1, status) -- the other process is pending for this task? if coroutine.status(job_task) ~= "dead" then -- check assert(job_proc) -- update the pending process procs[procid] = job_proc -- this task has been finised? else -- mark this task as finised tasks_finished[procid] = true end end end
在os.exec
運行進程的接口實現中,若是當前進程沒有當即退出,就經過協程切換出去,知道上面的多進程等待,獲取到實際的退出進程後,直接定向切換到退出進程的os.exec
中,繼續完成後續操做,這樣就不會有冗餘切換問題:oop
-- execute shell function os.exec(cmd, outfile, errfile) -- open command local ok = -1 local proc = process.open(cmd, outfile, errfile) if proc ~= nil then -- wait process local waitok = -1 local status = -1 if coroutine.running() then -- save the current directory local curdir = os.curdir() -- wait it repeat -- poll it waitok, status = process.wait(proc, 0) if waitok == 0 then -- 外面的多進程等待到實際的狀態值後,直接進行處理 waitok, status = coroutine.yield(proc) end until waitok ~= 0 -- resume the current directory os.cd(curdir) else waitok, status = process.wait(proc, -1) end -- get status if waitok > 0 then ok = status end -- close process process.close(proc) end -- ok? return ok end
lua的上層調用有了,那怎麼去實現這個跨平臺的多進程等待呢?this
在windows上咱們能想到就是WaitForMultipleObjects
這個接口了,我把它封裝到了tbox裏面具體實現以下:lua
tb_long_t tb_process_waitlist(tb_process_ref_t const* processes, tb_process_waitinfo_ref_t infolist, tb_size_t infomaxn, tb_long_t timeout) { // check tb_assert_and_check_return_val(processes && infolist && infomaxn, -1); // make the process list tb_size_t procsize = 0; HANDLE proclist[256] = {0}; tb_process_t const** pprocess = (tb_process_t const**)processes; for (; *pprocess && procsize < tb_arrayn(proclist); pprocess++, procsize++) proclist[procsize] = (*pprocess)->pi.hProcess; tb_assertf(procsize < tb_arrayn(proclist), "too much waited processes!"); // wait processes DWORD exitcode = 0; tb_long_t infosize = 0; DWORD result = tb_kernel32()->WaitForMultipleObjects(procsize, proclist, FALSE, timeout < 0? INFINITE : (DWORD)timeout); switch (result) { case WAIT_TIMEOUT: break; case WAIT_FAILED: return -1; default: { // the process index DWORD index = result - WAIT_OBJECT_0; // the process tb_process_t* process = (tb_process_t*)processes[index]; tb_assert_and_check_return_val(process, -1); // save process info infolist[infosize].index = index; infolist[infosize].process = (tb_process_ref_t)process; infolist[infosize].status = tb_kernel32()->GetExitCodeProcess(process->pi.hProcess, &exitcode)? (tb_long_t)exitcode : -1; infosize++; // close thread handle tb_kernel32()->CloseHandle(process->pi.hThread); process->pi.hThread = INVALID_HANDLE_VALUE; // close process tb_kernel32()->CloseHandle(process->pi.hProcess); process->pi.hProcess = INVALID_HANDLE_VALUE; // next index index++; while (index < procsize) { // attempt to wait next process result = tb_kernel32()->WaitForMultipleObjects(procsize - index, proclist + index, FALSE, 0); switch (result) { case WAIT_TIMEOUT: // no more, exit loop index = procsize; break; case WAIT_FAILED: return -1; default: { // the process index index += result - WAIT_OBJECT_0; // the process process = (tb_process_t*)processes[index]; tb_assert_and_check_return_val(process, -1); // save process info infolist[infosize].index = index; infolist[infosize].process = (tb_process_ref_t)process; infolist[infosize].status = tb_kernel32()->GetExitCodeProcess(process->pi.hProcess, &exitcode)? (tb_long_t)exitcode : -1; infosize++; // close thread handle tb_kernel32()->CloseHandle(process->pi.hThread); process->pi.hThread = INVALID_HANDLE_VALUE; // close process tb_kernel32()->CloseHandle(process->pi.hProcess); process->pi.hProcess = INVALID_HANDLE_VALUE; // next index index++; } break; } } } break; } // ok? return infosize; }
若是在linux以及其餘posix系統上,可使用wait
或者waitpid
接口,其實wait
也就是至關於調用了 waitpid(-1, &status, ..)
,因此我這裏就直接使用waitpid
來實現了。。code
它跟windows的WaitForMultipleObjects
有些不一樣,不能傳遞指定須要等待哪些進程句柄,想要等待多個進程,只能傳遞-1,表示等待全部子進程協程
不過咱們在封裝接口的時候,能夠仍是傳入多個要等待的子進程列表,若是獲取到的子進程不在這個列表裏面,就直接忽略掉,有的話就返回出來,這樣的話,行爲上就跟windows的差很少了。。
tb_long_t tb_process_waitlist(tb_process_ref_t const* processes, tb_process_waitinfo_ref_t infolist, tb_size_t infomaxn, tb_long_t timeout) { // check tb_assert_and_check_return_val(processes && infolist && infomaxn, -1); // done tb_long_t infosize = 0; tb_hong_t time = tb_mclock(); do { // wait it tb_int_t status = -1; tb_long_t result = waitpid(-1, &status, timeout < 0? 0 : WNOHANG | WUNTRACED); tb_check_return_val(result != -1, -1); // exited? if (result != 0) { // find this process tb_process_t const** pprocess = (tb_process_t const**)processes; for (; *pprocess && (*pprocess)->pid != result; pprocess++) ; // found? if (*pprocess) { // save process info infolist[infosize].index = (tb_process_ref_t const*)pprocess - processes; infolist[infosize].process = (tb_process_ref_t)*pprocess; infolist[infosize].status = WIFEXITED(status)? WEXITSTATUS(status) : -1; infosize++; // attempt to wait other processes while (infosize < infomaxn) { // attempt to wait it status = -1; result = waitpid(-1, &status, WNOHANG | WUNTRACED); // error or timeout? end tb_check_break(result != 0); // find this process tb_process_t const** pprocess = (tb_process_t const**)processes; for (; *pprocess && (*pprocess)->pid != result; pprocess++) ; // found? if (*pprocess) { // save process info infolist[infosize].index = (tb_process_ref_t const*)pprocess - processes; infolist[infosize].process = (tb_process_ref_t)*pprocess; infolist[infosize].status = WIFEXITED(status)? WEXITSTATUS(status) : -1; infosize++; } else break; } // end break; } } // wait some time if (timeout > 0) tb_msleep(tb_min(timeout, 60)); } while (timeout > 0 && tb_mclock() - time < (tb_hong_t)timeout); // ok? return infosize; }
最後貼下這個跨平臺接口的是如何使用的,這裏給了一個比較完整的demo
// init processes tb_size_t count1 = 0; tb_process_ref_t processes1[5] = {0}; tb_process_ref_t processes2[5] = {0}; for (; count1 < 4; count1++) { processes1[count1] = tb_process_init(argv[1], (tb_char_t const**)(argv + 1), tb_null); tb_assert_and_check_break(processes1[count1]); } // ok? while (count1) { // trace tb_trace_i("waiting: %ld", count1); // wait processes tb_long_t infosize = -1; tb_process_waitinfo_t infolist[4]; if ((infosize = tb_process_waitlist(processes1, infolist, tb_arrayn(infolist), -1)) > 0) { tb_size_t i = 0; for (i = 0; i < infosize; i++) { // trace tb_trace_i("process(%ld:%p) exited: %ld", infolist[i].index, infolist[i].process, infolist[i].status); // exit process if (infolist[i].process) tb_process_exit(infolist[i].process); // remove this process processes1[infolist[i].index] = tb_null; } // update processes tb_size_t count2 = 0; for (i = 0; i < count1; i++) { if (processes1[i]) processes2[count2++] = processes1[i]; } tb_memcpy(processes1, processes2, count2 * sizeof(tb_process_ref_t)); processes1[count2] = tb_null; count1 = count2; } }