From 8d756a130891c1f69ffaf11d6d52f92979012909 Mon Sep 17 00:00:00 2001 From: cclilshy Date: Thu, 6 Feb 2025 17:29:56 +0800 Subject: [PATCH] Extending the process and worker modules --- phpunit.sh | 3 ++ src/Process/Process.php | 44 ++++++++++++++--- src/Process/Runtime.php | 27 +++++++---- src/Worker/Manager.php | 12 +++++ src/Worker/WorkerContext.php | 93 ++++++++++++++++++++++++++++-------- tests/ProcessTest.php | 26 ++++++++-- tests/WorkerTest.php | 44 ++++++++++++++++- 7 files changed, 209 insertions(+), 40 deletions(-) create mode 100644 phpunit.sh diff --git a/phpunit.sh b/phpunit.sh new file mode 100644 index 0000000..8dc1dc5 --- /dev/null +++ b/phpunit.sh @@ -0,0 +1,3 @@ +php vendor/bin/phpunit +php -d extension=ev vendor/bin/phpunit +php -d extension=event vendor/bin/phpunit diff --git a/src/Process/Process.php b/src/Process/Process.php index 8c391c2..66714ee 100644 --- a/src/Process/Process.php +++ b/src/Process/Process.php @@ -36,12 +36,16 @@ use function pcntl_wait; use function pcntl_wexitstatus; use function pcntl_wifexited; +use function pcntl_wifsignaled; +use function pcntl_wifstopped; +use function pcntl_wstopsig; +use function pcntl_wtermsig; use function posix_getpid; use const SIGCHLD; -use const SIGKILL; use const WNOHANG; use const WUNTRACED; +use const SIGTERM; /** * @compatible:Windows @@ -89,6 +93,9 @@ public function __construct() $this->processID = posix_getpid(); } + /** + * + */ public function __destruct() { $this->destroy(); @@ -100,7 +107,7 @@ public function __destruct() private function destroy(): void { foreach ($this->process2runtime as $runtime) { - $runtime->signal(SIGKILL); + $runtime->signal(SIGTERM); } } @@ -301,18 +308,35 @@ private function signalSIGCHLDHandler(): void */ private function onProcessExit(int $processID, int $status): void { - $exit = pcntl_wifexited($status) ? pcntl_wexitstatus($status) : -1; + $exitCode = -1; + $exitReason = ''; + + if (pcntl_wifexited($status)) { + $exitCode = pcntl_wexitstatus($status); + $exitReason = 'normal exit'; + } elseif (pcntl_wifsignaled($status)) { + $exitCode = pcntl_wtermsig($status); + $exitReason = 'terminated by signal'; + } elseif (pcntl_wifstopped($status)) { + $exitCode = pcntl_wstopsig($status); + $exitReason = 'stopped by signal'; + } + $promiseCallback = $this->process2promiseCallback[$processID] ?? null; if (!$promiseCallback) { return; } - if ($exit === -1) { - call_user_func($promiseCallback['reject'], new ProcessException('The process is abnormal.', $exit)); + if ($exitCode !== 0) { + call_user_func( + $promiseCallback['reject'], + new ProcessException("Process failed: {$exitReason}", $exitCode) + ); } else { - call_user_func($promiseCallback['resolve'], $exit); + call_user_func($promiseCallback['resolve'], $exitCode); } + // Clean up resources unset($this->process2promiseCallback[$processID]); unset($this->process2runtime[$processID]); @@ -328,4 +352,12 @@ public function getRootProcessID(): int { return $this->rootProcessID; } + + /** + * @return int + */ + public function getProcessID(): int + { + return $this->processID; + } } diff --git a/src/Process/Runtime.php b/src/Process/Runtime.php index b057d44..a2cff6e 100644 --- a/src/Process/Runtime.php +++ b/src/Process/Runtime.php @@ -43,17 +43,11 @@ public function __construct( * @param bool $force * * @return void + * @deprecated Use stop() instead */ public function stop(bool $force = false): void { - /*** @compatible:Windows */ - if (!Kernel::getInstance()->supportProcessControl()) { - exit(0); - } - - $force - ? $this->kill() - : $this->signal(SIGTERM); + $this->terminate($force); } /*** @return void */ @@ -139,4 +133,21 @@ public function getPromise(): Promise { return $this->promise; } + + /** + * @param bool $force + * + * @return void + */ + public function terminate(bool $force = false): void + { + /*** @compatible:Windows */ + if (!Kernel::getInstance()->supportProcessControl()) { + exit(0); + } + + $force + ? $this->kill() + : $this->signal(SIGTERM); + } } diff --git a/src/Worker/Manager.php b/src/Worker/Manager.php index de8231d..8d6c8b4 100644 --- a/src/Worker/Manager.php +++ b/src/Worker/Manager.php @@ -87,8 +87,19 @@ public function add(Worker $worker): void * @param string $name * * @return void + * @deprecated use remove() */ public function removeWorker(string $name): void + { + $this->remove($name); + } + + /** + * @param string $name + * + * @return void + */ + public function remove(string $name): void { if ($worker = $this->workers[$name] ?? null) { $worker->isRunning() && $this->terminate($name); @@ -298,6 +309,7 @@ public function __destruct() if (!Kernel::getInstance()->supportProcessControl()) { return; } + if (isset($this->processID) && $this->processID === posix_getpid()) { $this->terminate(); } diff --git a/src/Worker/WorkerContext.php b/src/Worker/WorkerContext.php index 6a6c704..6159a16 100644 --- a/src/Worker/WorkerContext.php +++ b/src/Worker/WorkerContext.php @@ -13,14 +13,20 @@ namespace Ripple\Worker; use Ripple\Kernel; +use Ripple\Process\Exception\ProcessException; use Ripple\Process\Runtime; use Ripple\Socket; +use Ripple\Utils\Output; use Ripple\Utils\Serialization\Zx7e; +use Throwable; use function Co\delay; use function Co\process; use function socket_create_pair; use function socket_export_stream; +use function min; +use function pow; +use function is_int; use const AF_INET; use const AF_UNIX; @@ -86,6 +92,14 @@ abstract class WorkerContext /*** @var \Ripple\Worker\Manager */ protected Manager $manager; + /*** @var array */ + protected array $restartAttempts = []; + + /** + * + */ + private const MAX_RESTART_ATTEMPTS = 10; + /** * @return void */ @@ -102,27 +116,19 @@ public function terminate(): void $this->getName() ); - // foreach ($this->runtimes as $runtime) { - // $runtime->stop(); - // } - - // foreach ($this->streams as $stream) { - // $stream->close(); - // } + $this->running = false; } /** * @Author cclilshy * @Date 2024/8/17 14:25 * - * @param Manager $manager * @param int $index * * @return bool */ - private function guard(Manager $manager, int $index): bool + private function guard(int $index): bool { - $this->manager = $manager; /*** @compatible:Windows */ $domain = !Kernel::getInstance()->supportProcessControl() ? AF_INET : AF_UNIX; @@ -138,18 +144,20 @@ private function guard(Manager $manager, int $index): bool $zx7e = new Zx7e(); $this->streams[$index] = $streamA; - $this->streams[$index]->onReadable(function (Socket $Socket) use ($streamA, $index, &$zx7e, $manager) { + $this->streams[$index]->onReadable(function (Socket $Socket) use ($streamA, $index, &$zx7e) { $content = $Socket->readContinuously(1024); foreach ($zx7e->decodeStream($content) as $string) { - $manager->onCommand(Command::fromString($string), $this->getName(), $index); + $this->manager->onCommand( + Command::fromString($string), + $this->getName(), + $index + ); } }); $this->runtimes[$index] = $runtime = process(function () use ($streamB) { $this->parent = false; $this->parentSocket = $streamB; - $this->boot(); - $this->booted = true; $this->zx7e = new Zx7e(); $this->parentSocket->onReadable(function (Socket $Socket) { @@ -180,18 +188,38 @@ private function guard(Manager $manager, int $index): bool } } }); + + try { + $this->boot(); + $this->booted = true; + } catch (Throwable $exception) { + Output::error('Worker boot failed: ' . $exception->getMessage()); + exit(128); + } })->run(); - $runtime->finally(fn () => $this->onExit($index)); + $runtime->finally(function (mixed $result) use ($index) { + if (is_int($result)) { + $exitCode = $result; + } elseif ($result instanceof ProcessException) { + $exitCode = $result->getCode(); + } else { + $exitCode = 0; + } + + $this->onExit($index, $exitCode); + }); + return true; } /** * @param int $index + * @param int $exitCode * * @return void */ - private function onExit(int $index): void + private function onExit(int $index, int $exitCode): void { if (isset($this->streams[$index])) { $this->streams[$index]->close(); @@ -202,10 +230,24 @@ private function onExit(int $index): void unset($this->runtimes[$index]); } + if ($exitCode === 128) { + Output::error("Worker '{$this->getName()}' process has exited with code 1."); + return; + } + + // Restart the process if (!$this->terminated) { + $this->restartAttempts[$index] = ($this->restartAttempts[$index] ?? 0) + 1; + + if ($this->restartAttempts[$index] > WorkerContext::MAX_RESTART_ATTEMPTS) { + Output::warning('Worker process has exited too many times, please check the code.'); + return; + } + + $delay = min(0.1 * pow(2, $this->restartAttempts[$index] - 1), 30); delay(function () use ($index) { - $this->guard($this->manager, $index); - }, 0.1); + $this->guard($index); + }, $delay); } } @@ -220,11 +262,22 @@ private function onExit(int $index): void */ public function run(Manager $manager): bool { - $this->register($manager); + $this->manager = $manager; + + try { + $this->register($manager); + } catch (Throwable $exception) { + Output::error("Worker {$this->getName()} registration failed: {$exception->getMessage()}, will be removed"); + $manager->remove($this->getName()); + return false; + } + /*** @compatible:Windows */ $count = !Kernel::getInstance()->supportProcessControl() ? 1 : $this->getCount(); for ($index = 1; $index <= $count; $index++) { - if (!$this->guard($manager, $index)) { + if (!$this->guard($index)) { + $this->terminate(); + $manager->remove($this->getName()); return false; } } diff --git a/tests/ProcessTest.php b/tests/ProcessTest.php index 4a8cb51..a03fb5b 100644 --- a/tests/ProcessTest.php +++ b/tests/ProcessTest.php @@ -3,6 +3,7 @@ namespace Tests; use PHPUnit\Framework\TestCase; +use Ripple\Kernel; use Ripple\Process\Exception\ProcessException; use Throwable; @@ -27,7 +28,11 @@ public function test_process(): void }); $runtime = $task->run(); - $exitCode = $runtime->await(); + try { + $exitCode = $runtime->await(); + } catch (ProcessException $exception) { + $exitCode = $exception->getCode(); + } $this->assertEquals($code, $exitCode, 'Process exit code'); } @@ -46,7 +51,11 @@ public function test_coroutine(): void $runtime = $task->run(); return $runtime->getPromise()->await(); }); - $exitCode = $async->await(); + try { + $exitCode = $async->await(); + } catch (ProcessException $exception) { + $exitCode = $exception->getCode(); + } $this->assertEquals($code, $exitCode, 'Process exit code'); } @@ -56,6 +65,10 @@ public function test_coroutine(): void */ public function test_parallel(): void { + if (!Kernel::getInstance()->supportParallel()) { + $this->markTestSkipped('The current environment does not support parallel processing'); + } + $code = mt_rand(0, 255); $task = process(function () use ($code) { $thread = thread(static function ($context) { @@ -72,9 +85,12 @@ public function test_parallel(): void $this->assertEquals($code, $exitCode, 'Process exit code'); }); - $runtime->except(function (ProcessException $exception) { - $this->fail($exception->getMessage()); + $runtime->except(function (ProcessException $exception) use ($code) { + $this->assertEquals($code, $exception->getCode(), 'Process exit code'); }); - $runtime->await(); + try { + $runtime->await(); + } catch (Throwable) { + } } } diff --git a/tests/WorkerTest.php b/tests/WorkerTest.php index 29a6051..30cec15 100644 --- a/tests/WorkerTest.php +++ b/tests/WorkerTest.php @@ -20,6 +20,7 @@ use function Co\cancelAll; use function Co\wait; +use function sleep; #[RunClassInSeparateProcess] class WorkerTest extends TestCase @@ -53,7 +54,7 @@ public function testWorkerRemoval(): void $manager->add($worker); $this->assertCount(1, $manager->getWorkers()); - $manager->removeWorker($worker->getName()); + $manager->remove($worker->getName()); $this->assertCount(0, $manager->getWorkers()); cancelAll(); wait(); @@ -78,6 +79,27 @@ public function testMultipleWorkers(): void cancelAll(); wait(); } + + /** + * Test the situation where the worker process exits frequently + */ + public function testWorkerFrequentExit(): void + { + $manager = new Manager(); + $worker = new UnstableWorker(); + $manager->add($worker); + + $result = $manager->run(); + $this->assertTrue($result); + + \Co\sleep(1); + + $this->assertTrue($worker->isRunning()); + + $manager->terminate(); + cancelAll(); + wait(); + } } /** @@ -107,3 +129,23 @@ protected function boot(): void { } } + +/** + * Simulate unstable work processes + */ +class UnstableWorker extends TestWorker +{ + /*** @var int */ + private int $exitCount = 0; + + /** + * @return void + */ + protected function boot(): void + { + if ($this->exitCount < 3) { + $this->exitCount++; + exit(1); + } + } +}