diff --git a/src/Utils/Output.php b/src/Utils/Output.php index e2a8f71..dcf63f9 100644 --- a/src/Utils/Output.php +++ b/src/Utils/Output.php @@ -73,7 +73,7 @@ public static function info(string $title, string ...$contents): void */ public static function writeln(string $message): void { - fwrite(STDOUT, $message . PHP_EOL); + Output::write($message . PHP_EOL); } /** @@ -97,4 +97,14 @@ public static function error(string $title, string ...$contents): void { Output::writeln("\033[31m{$title}\033[0m \033[33m" . implode(' ', $contents) . "\033[0m"); } + + /** + * @param string $message + * + * @return void + */ + public static function write(string $message): void + { + fwrite(STDOUT, $message); + } } diff --git a/src/Worker/Manager.php b/src/Worker/Manager.php index b456f9b..de8231d 100644 --- a/src/Worker/Manager.php +++ b/src/Worker/Manager.php @@ -16,6 +16,7 @@ use Ripple\Stream\Exception\ConnectionException; use Ripple\Utils\Output; use Ripple\Utils\Serialization\Zx7e; +use BadMethodCallException; use function getmypid; use function posix_getpid; @@ -23,11 +24,12 @@ /** * @Author cclilshy * @Date 2024/8/16 11:51 + * @method void onCommand(Command $workerCommand, string $name, int $index) */ class Manager { - public const COMMAND_COMMAND_TO_WORKER = 'manager.commandToWorker'; - public const COMMAND_COMMAND_TO_ALL = 'manager.commandToAll'; + public const COMMAND_COMMAND_TO_WORKER = '__manager__.commandToWorker'; + public const COMMAND_COMMAND_TO_ALL = '__manager__.commandToAll'; /** * @var Worker[] @@ -56,8 +58,19 @@ class Manager * @param Worker $worker * * @return void + * @deprecated */ public function addWorker(Worker $worker): void + { + $this->add($worker); + } + + /** + * @param \Ripple\Worker\Worker $worker + * + * @return void + */ + public function add(Worker $worker): void { $workerName = $worker->getName(); if (isset($this->workers[$workerName])) { @@ -77,41 +90,31 @@ public function addWorker(Worker $worker): void */ public function removeWorker(string $name): void { - if ($this->workers[$name] ?? null) { - $this->stopWorker($name); + if ($worker = $this->workers[$name] ?? null) { + $worker->isRunning() && $this->terminate($name); unset($this->workers[$name]); } } /** * @Author cclilshy - * @Date 2024/8/17 10:14 + * @Date 2024/8/17 00:44 * - * @param string $name + * @param string|null $name * * @return void */ - public function stopWorker(string $name): void + public function terminate(string|null $name = null): void { - if ($worker = $this->workers[$name] ?? null) { - foreach ($worker->runtimes as $runtime) { - $runtime->stop(); - } - foreach ($worker->streams as $stream) { - $stream->close(); + if ($name) { + if ($worker = $this->workers[$name] ?? null) { + $worker->terminate(); } + return; } - } - /** - * @Author cclilshy - * @Date 2024/8/17 00:44 - * @return void - */ - public function stop(): void - { foreach ($this->workers as $worker) { - $this->stopWorker($worker->getName()); + $worker->terminate(); } } @@ -132,31 +135,37 @@ public function getWorkers(): array * @param int $index * * @return void - * @throws ConnectionException */ - public function onCommand(Command $workerCommand, string $name, int $index): void + protected function __onCommand(Command $workerCommand, string $name, int $index): void { switch ($workerCommand->name) { - case Worker::COMMAND_RELOAD: + case WorkerContext::COMMAND_RELOAD: $name = $workerCommand->arguments['name'] ?? null; $this->reload($name); - return; + break; + + case WorkerContext::COMMAND_SYNC_ID: + if ($stream = $this->workers[$name]?->streams[$index] ?? null) { + $sync = $this->index++; + $id = $workerCommand->arguments['id']; + $command = Command::make(WorkerContext::COMMAND_SYNC_ID, ['sync' => $sync, 'id' => $id]); + try { + $stream->write($this->zx7e->encodeFrame($command->__toString())); + } catch (ConnectionException $e) { + Output::warning($e->getMessage()); + } + } + break; + case Manager::COMMAND_COMMAND_TO_WORKER: $command = $workerCommand->arguments['command']; $target = $workerCommand->arguments['name']; - $this->commandToWorker($command, $target); + $this->sendCommand($command, $target); break; + case Manager::COMMAND_COMMAND_TO_ALL: $command = $workerCommand->arguments['command']; - $this->commandToAll($command); - break; - case Worker::COMMAND_SYNC_ID: - if ($stream = $this->workers[$name]?->streams[$index] ?? null) { - $sync = $this->index++; - $id = $workerCommand->arguments['id']; - $command = Command::make(Worker::COMMAND_SYNC_ID, ['sync' => $sync, 'id' => $id]); - $stream->write($this->zx7e->encodeFrame($command->__toString())); - } + $this->sendCommand($command); break; } } @@ -168,17 +177,16 @@ public function onCommand(Command $workerCommand, string $name, int $index): voi * @param string|null $name * * @return void - * @throws ConnectionException */ public function reload(string|null $name = null): void { if ($name) { if (isset($this->workers[$name])) { - $this->commandToWorker(Command::make(Worker::COMMAND_RELOAD), $name); + $this->sendCommand(Command::make(WorkerContext::COMMAND_RELOAD), $name); } return; } - $this->commandToAll(Command::make(Worker::COMMAND_RELOAD)); + $this->sendCommand(Command::make(WorkerContext::COMMAND_RELOAD)); } /** @@ -189,13 +197,17 @@ public function reload(string|null $name = null): void * @param string $name * * @return void - * @throws ConnectionException + * @deprecated */ public function commandToWorker(Command $command, string $name): void { if (isset($this->workers[$name])) { foreach ($this->workers[$name]->streams as $stream) { - $stream->write($this->zx7e->encodeFrame($command->__toString())); + try { + $stream->write($this->zx7e->encodeFrame($command->__toString())); + } catch (ConnectionException $e) { + Output::warning($e->getMessage()); + } } } } @@ -207,14 +219,50 @@ public function commandToWorker(Command $command, string $name): void * @param Command $command * * @return void - * @throws ConnectionException + * @deprecated */ public function commandToAll(Command $command): void { $workers = $this->workers; foreach ($workers as $worker) { foreach ($worker->streams as $stream) { - $stream->write($this->zx7e->encodeFrame($command->__toString())); + try { + $stream->write($this->zx7e->encodeFrame($command->__toString())); + } catch (ConnectionException $e) { + Output::warning($e->getMessage()); + } + } + } + } + + /** + * @param \Ripple\Worker\Command $command + * @param string|null $name + * + * @return void + */ + public function sendCommand(Command $command, string|null $name = null): void + { + if ($name) { + if (isset($this->workers[$name])) { + foreach ($this->workers[$name]->streams as $stream) { + try { + $stream->write($this->zx7e->encodeFrame($command->__toString())); + } catch (ConnectionException $e) { + Output::warning($e->getMessage()); + } + } + } + } else { + $workers = $this->workers; + foreach ($workers as $worker) { + foreach ($worker->streams as $stream) { + try { + $stream->write($this->zx7e->encodeFrame($command->__toString())); + } catch (ConnectionException $e) { + Output::warning($e->getMessage()); + } + } } } } @@ -234,22 +282,40 @@ public function run(): bool } $this->zx7e = new Zx7e(); foreach ($this->workers as $worker) { - $worker->register($this); - if (!$worker($this)) { + if (!$worker->run($this)) { Output::error("worker {$worker->getName()} failed to start"); - $this->stop(); + $this->terminate(); } } return true; } + /** + * + */ public function __destruct() { if (!Kernel::getInstance()->supportProcessControl()) { return; } if (isset($this->processID) && $this->processID === posix_getpid()) { - $this->stop(); + $this->terminate(); } } + + /** + * @param string $name + * @param array $arguments + * + * @return void + */ + public function __call(string $name, array $arguments): void + { + if ($name === 'onCommand') { + $this->__onCommand(...$arguments); + return; + } + + throw new BadMethodCallException("Call to undefined method " . static::class . "::{$name}()"); + } } diff --git a/src/Worker/Worker.php b/src/Worker/Worker.php index de4d7e3..6c3bb8b 100644 --- a/src/Worker/Worker.php +++ b/src/Worker/Worker.php @@ -14,218 +14,48 @@ use Closure; use JetBrains\PhpStorm\NoReturn; -use Ripple\Kernel; -use Ripple\Process\Runtime; -use Ripple\Socket; use Ripple\Stream\Exception\ConnectionException; use Ripple\Utils\Output; -use Ripple\Utils\Serialization\Zx7e; use Throwable; -use function Co\delay; -use function Co\process; use function Co\promise; -use function socket_create_pair; -use function socket_export_stream; use function spl_object_hash; -use const AF_INET; -use const AF_UNIX; -use const SOCK_STREAM; - /** * @Author cclilshy * @Date 2024/8/16 11:53 */ -abstract class Worker +abstract class Worker extends WorkerContext { - public const COMMAND_RELOAD = 'worker.reload'; - public const COMMAND_SYNC_ID = 'worker.sync.id'; - - /** - * @Context manager - * @var Runtime[] - */ - public array $runtimes = array(); - - /** - * @Context manager - * @var Socket[] - */ - public array $streams = array(); - /** - * @Context share - * @var bool - */ - protected bool $parent = true; - - /** - * @Context worker - * @var Zx7e - */ - protected Zx7e $zx7e; - - /** - * @Context worker - * @var Socket - */ - protected Socket $parentSocket; - - /** - * @var int - */ - protected int $count = 1; - - /** - * @var string - */ - protected string $name; - - /** - * @var array - */ - protected array $queue = []; - - /** - * @Context manager - * @Author cclilshy - * @Date 2024/8/16 23:50 - * - * @param Manager $manager - * * @return bool */ - public function __invoke(Manager $manager): bool + public function isBooted(): bool { - /*** @compatible:Windows */ - $count = !Kernel::getInstance()->supportProcessControl() ? 1 : $this->getCount(); - for ($index = 1; $index <= $count; $index++) { - if (!$this->guard($manager, $index)) { - return false; - } - } - return true; + return $this->booted; } /** + * 发送指令 + * * @Context worker * @Author cclilshy - * @Date 2024/8/17 01:03 + * @Date 2024/8/17 00:07 * - * @param Command $workerCommand + * @param Command $command * * @return void */ - protected function __onCommand(Command $workerCommand): void - { - switch ($workerCommand->name) { - case Worker::COMMAND_RELOAD: - $this->onReload(); - break; - - case Worker::COMMAND_SYNC_ID: - $id = $workerCommand->arguments['id']; - $sync = $workerCommand->arguments['sync']; - - if ($callback = $this->queue[$id] ?? null) { - unset($this->queue[$id]); - $callback['resolve']($sync); - } - break; - - default: - $this->onCommand($workerCommand); - } - } - - /** - * @Author cclilshy - * @Date 2024/8/17 14:25 - * - * @param Manager $manager - * @param int $index - * - * @return bool - */ - protected function guard(Manager $manager, int $index): bool + public function sendCommandToManager(Command $command): void { - /*** @compatible:Windows */ - $domain = !Kernel::getInstance()->supportProcessControl() ? AF_INET : AF_UNIX; - - if (!socket_create_pair($domain, SOCK_STREAM, 0, $sockets)) { - return false; - } - - $streamA = new Socket(socket_export_stream($sockets[0])); - $streamB = new Socket(socket_export_stream($sockets[1])); - $streamA->setBlocking(false); - $streamB->setBlocking(false); - $streamA->onClose(fn () => $streamB->close()); - - $zx7e = new Zx7e(); - $this->streams[$index] = $streamA; - $this->streams[$index]->onReadable(function (Socket $Socket) use ($streamA, $index, &$zx7e, $manager) { - $content = $Socket->readContinuously(1024); - foreach ($zx7e->decodeStream($content) as $string) { - $manager->onCommand(Command::fromString($string), $this->getName(), $index); - } - }); - - $this->runtimes[$index] = $runtime = process(function () use ($streamB) { - $this->parent = false; - $this->parentSocket = $streamB; - $this->boot(); - - $this->zx7e = new Zx7e(); - $this->parentSocket->onReadable(function (Socket $Socket) { - $content = $Socket->readContinuously(1024); - foreach ($this->zx7e->decodeStream($content) as $string) { - $this->__onCommand(Command::fromString($string)); - } - }); - })->run(); - - $runtime->finally(function () use ($manager, $index) { - if (isset($this->streams[$index])) { - $this->streams[$index]->close(); - unset($this->streams[$index]); - } - - if (isset($this->runtimes[$index])) { - unset($this->runtimes[$index]); - } - delay(function () use ($manager, $index) { - $this->guard($manager, $index); - }, 0.1); - }); - - return true; - } + try { + $this->parentSocket->write($this->zx7e->encodeFrame($command->__toString())); + } catch (ConnectionException $exception) { + Output::error($exception->getMessage()); - /** - * @Context share - * @Author cclilshy - * @Date 2024/8/17 01:05 - * @return string - */ - public function getName(): string - { - if (!isset($this->name)) { - $this->name = static::class; + // Writing a message to the parent process fails. There is only one possibility that the parent process has exited. + exit(1); } - return $this->name; - } - - /** - * @Context share - * @Author cclilshy - * @Date 2024/8/17 01:06 - * @return int - */ - public function getCount(): int - { - return $this->count; } /** @@ -239,36 +69,35 @@ public function getCount(): int * @param string $name * * @return void + * @deprecated */ public function commandToWorker(Command $command, string $name): void { - $this->command(Command::make(Manager::COMMAND_COMMAND_TO_WORKER, [ + $this->sendCommandToManager(Command::make(Manager::COMMAND_COMMAND_TO_WORKER, [ 'command' => $command, 'name' => $name ])); } /** - * 发送指令 - * - * @Context worker - * @Author cclilshy - * @Date 2024/8/17 00:07 - * - * @param Command $command + * @param \Ripple\Worker\Command $command + * @param string|null $name * - * @return void + * @return bool */ - public function command(Command $command): void + public function sendCommand(Command $command, string|null $name = null): bool { - try { - $this->parentSocket->write($this->zx7e->encodeFrame($command->__toString())); - } catch (ConnectionException $exception) { - Output::error($exception->getMessage()); - - // Writing a message to the parent process fails. There is only one possibility that the parent process has exited. - exit(1); + if ($name) { + $this->sendCommandToManager(Command::make(Manager::COMMAND_COMMAND_TO_WORKER, [ + 'command' => $command, + 'name' => $name + ])); + } else { + $this->sendCommandToManager(Command::make(Manager::COMMAND_COMMAND_TO_ALL, [ + 'command' => $command + ])); } + return true; } /** @@ -279,10 +108,11 @@ public function command(Command $command): void * @param Command $command * * @return void + * @deprecated */ public function commandToAll(Command $command): void { - $this->command(Command::make(Manager::COMMAND_COMMAND_TO_ALL, [ + $this->sendCommandToManager(Command::make(Manager::COMMAND_COMMAND_TO_ALL, [ 'command' => $command ])); } @@ -297,8 +127,8 @@ public function syncID(): int|false try { return promise(function (Closure $resolve, Closure $reject) { $id = spl_object_hash($resolve); - $command = Command::make(Worker::COMMAND_SYNC_ID, ['id' => $id]); - $this->command($command); + $command = Command::make(WorkerContext::COMMAND_SYNC_ID, ['id' => $id]); + $this->sendCommandToManager($command); $this->queue[$id] = [ 'resolve' => $resolve, @@ -318,42 +148,31 @@ public function syncID(): int|false * @Date 2024/8/17 00:59 * @return void */ - #[NoReturn] public function onReload(): void + #[NoReturn] protected function onReload(): void { exit(0); } /** - * Triggered when command is received - * - * @Context worker - * @Author cclilshy - * @Date 2024/8/16 11:54 - * - * @param Command $workerCommand - * * @return void */ - public function onCommand(Command $workerCommand): void + #[NoReturn] protected function onTerminate(): void { + exit(0); } /** - * @Context manager - * @Author cclilshy - * @Date 2024/8/16 11:53 - * - * @param Manager $manager + * Triggered when command is received * - * @return void - */ - abstract public function register(Manager $manager): void; - - /** * @Context worker * @Author cclilshy - * @Date 2024/8/16 11:53 + * @Date 2024/8/16 11:54 + * + * @param Command $command + * * @return void */ - abstract public function boot(): void; + public function onCommand(Command $command): void + { + } } diff --git a/src/Worker/WorkerContext.php b/src/Worker/WorkerContext.php new file mode 100644 index 0000000..6a6c704 --- /dev/null +++ b/src/Worker/WorkerContext.php @@ -0,0 +1,315 @@ +terminated) { + return; + } + + $this->terminated = true; + + $this->manager->sendCommand( + Command::make(WorkerContext::COMMAND_TERMINATE), + $this->getName() + ); + + // foreach ($this->runtimes as $runtime) { + // $runtime->stop(); + // } + + // foreach ($this->streams as $stream) { + // $stream->close(); + // } + } + + /** + * @Author cclilshy + * @Date 2024/8/17 14:25 + * + * @param Manager $manager + * @param int $index + * + * @return bool + */ + private function guard(Manager $manager, int $index): bool + { + $this->manager = $manager; + /*** @compatible:Windows */ + $domain = !Kernel::getInstance()->supportProcessControl() ? AF_INET : AF_UNIX; + + if (!socket_create_pair($domain, SOCK_STREAM, 0, $sockets)) { + return false; + } + + $streamA = new Socket(socket_export_stream($sockets[0])); + $streamB = new Socket(socket_export_stream($sockets[1])); + $streamA->setBlocking(false); + $streamB->setBlocking(false); + $streamA->onClose(fn () => $streamB->close()); + + $zx7e = new Zx7e(); + $this->streams[$index] = $streamA; + $this->streams[$index]->onReadable(function (Socket $Socket) use ($streamA, $index, &$zx7e, $manager) { + $content = $Socket->readContinuously(1024); + foreach ($zx7e->decodeStream($content) as $string) { + $manager->onCommand(Command::fromString($string), $this->getName(), $index); + } + }); + + $this->runtimes[$index] = $runtime = process(function () use ($streamB) { + $this->parent = false; + $this->parentSocket = $streamB; + $this->boot(); + $this->booted = true; + + $this->zx7e = new Zx7e(); + $this->parentSocket->onReadable(function (Socket $Socket) { + $content = $Socket->readContinuously(1024); + foreach ($this->zx7e->decodeStream($content) as $string) { + $command = Command::fromString($string); + switch ($command->name) { + case WorkerContext::COMMAND_RELOAD: + $this->onReload(); + break; + + case WorkerContext::COMMAND_TERMINATE: + $this->onTerminate(); + break; + + case WorkerContext::COMMAND_SYNC_ID: + $id = $command->arguments['id']; + $sync = $command->arguments['sync']; + + if ($callback = $this->queue[$id] ?? null) { + unset($this->queue[$id]); + $callback['resolve']($sync); + } + break; + + default: + $this->onCommand($command); + } + } + }); + })->run(); + + $runtime->finally(fn () => $this->onExit($index)); + return true; + } + + /** + * @param int $index + * + * @return void + */ + private function onExit(int $index): void + { + if (isset($this->streams[$index])) { + $this->streams[$index]->close(); + unset($this->streams[$index]); + } + + if (isset($this->runtimes[$index])) { + unset($this->runtimes[$index]); + } + + if (!$this->terminated) { + delay(function () use ($index) { + $this->guard($this->manager, $index); + }, 0.1); + } + } + + /** + * @Context manager + * @Author cclilshy + * @Date 2024/8/16 23:50 + * + * @param Manager $manager + * + * @return bool + */ + public function run(Manager $manager): bool + { + $this->register($manager); + /*** @compatible:Windows */ + $count = !Kernel::getInstance()->supportProcessControl() ? 1 : $this->getCount(); + for ($index = 1; $index <= $count; $index++) { + if (!$this->guard($manager, $index)) { + return false; + } + } + + $this->running = true; + return true; + } + + /** + * @Context share + * @Author cclilshy + * @Date 2024/8/17 01:05 + * @return string + */ + public function getName(): string + { + if (!isset($this->name)) { + $this->name = static::class; + } + return $this->name; + } + + /** + * @Context share + * @Author cclilshy + * @Date 2024/8/17 01:06 + * @return int + */ + public function getCount(): int + { + return $this->count; + } + + /** + * @return bool + */ + public function isRunning(): bool + { + return $this->running; + } + + /** + * @Context manager + * @Author cclilshy + * @Date 2024/8/16 11:53 + * + * @param Manager $manager + * + * @return void + */ + abstract protected function register(Manager $manager): void; + + /** + * @Context worker + * @Author cclilshy + * @Date 2024/8/16 11:53 + * @return void + */ + abstract protected function boot(): void; + + /** + * Triggered during hot restart. The notified process should follow the hot restart rules to release resources and then exit. + * + * @Context worker + * @Author cclilshy + * @Date 2024/8/17 00:59 + * @return void + */ + abstract protected function onReload(): void; + + /** + * @return void + */ + abstract protected function onTerminate(): void; + + /** + * Triggered when command is received + * + * @Context worker + * @Author cclilshy + * @Date 2024/8/16 11:54 + * + * @param Command $command + * + * @return void + */ + abstract public function onCommand(Command $command): void; +} diff --git a/tests/WorkerTest.php b/tests/WorkerTest.php new file mode 100644 index 0000000..29a6051 --- /dev/null +++ b/tests/WorkerTest.php @@ -0,0 +1,109 @@ +add($worker); + $this->assertCount(1, $manager->getWorkers()); + $result = $manager->run(); + $this->assertTrue($result); + $this->assertEquals('Tests\TestWorker', $worker->getName()); + $this->assertEquals(1, $worker->getCount()); + $manager->terminate(); + cancelAll(); + wait(); + } + + /** + * Test worker removal + */ + public function testWorkerRemoval(): void + { + $manager = new Manager(); + $worker = new TestWorker(); + + $manager->add($worker); + $this->assertCount(1, $manager->getWorkers()); + $manager->removeWorker($worker->getName()); + $this->assertCount(0, $manager->getWorkers()); + cancelAll(); + wait(); + } + + /** + * @return void + */ + public function testMultipleWorkers(): void + { + $manager = new Manager(); + $worker1 = new TestWorker(); + $worker2 = new TestWorker(); + $worker2->name = 'worker2'; + $manager->add($worker1); + $manager->add($worker2); + $this->assertCount(2, $manager->getWorkers()); + $manager->run(); + $command = Command::make('test.broadcast', ['data' => 'test']); + $manager->sendCommand($command); + $manager->terminate(); + cancelAll(); + wait(); + } +} + +/** + * Test Worker implementation + */ +class TestWorker extends Worker +{ + /*** @var string */ + public string $name = 'Tests\TestWorker'; + + /*** @var int */ + protected int $count = 1; + + /** + * @param \Ripple\Worker\Manager $manager + * + * @return void + */ + protected function register(Manager $manager): void + { + } + + /** + * @return void + */ + protected function boot(): void + { + } +}