diff --git a/.gitignore b/.gitignore index e53dfc9..d6ec641 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ composer.lock .php-cs-fixer.cache .phpunit.cache .phpunit.result.cache +.cursor diff --git a/.tarignore b/.tarignore new file mode 100644 index 0000000..8b18053 --- /dev/null +++ b/.tarignore @@ -0,0 +1,4 @@ +/assets/ +/docs/ +/examples/ +/tests/ diff --git a/README.md b/README.md index e0928eb..0c20555 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@

-Logo +Logo

Build Status @@ -35,19 +35,22 @@ visit [Quick Deployment](https://ripple.cloudtay.com/docs/install/server) > We allow users to choose applicable component libraries by themselves. All components can be used as described in the > document without additional configuration. -**🚀 [Guzzle](https://docs.guzzlephp.org/en/stable/)** +**🚀 [Guzzle](https://docs.guzzlephp.org/en/stable/)** PHP is the most widely used HTTP client -**🔥 [AmPHP](https://amphp.org/)** +**🔥 [AmPHP](https://amphp.org/)** Provides rich PHP asynchronous components for users to encapsulate by themselves -**🚀 [Driver](https://github.com/cloudtay/ripple-driver)** +**🚀 [Laravel-ripple](https://github.com/cloudtay/laravel-ripple)** The official high-performance driver library provides seamless access to your traditional applications. -**🚀 [Webman-coroutine](https://github.com/workbunny/webman-coroutine)** +**🚀 [Workerman-ripple](https://github.com/cloudtay/workerman-ripple)** +The official high-performance driver library provides seamless access to your traditional applications. + +**🚀 [Webman-coroutine](https://github.com/workbunny/webman-coroutine)** The workbunny team's integrated webman coroutine extension provides coroutine support for Webman. -**🟢[ripple](https://github.com/cloudtay/ripple)** +**🟢 [ripple](https://github.com/cloudtay/ripple)** Provides standard coroutine architecture and tools for rapid development or packaging of traditional applications ### Event Library Guide diff --git a/README.zh_cn.md b/README.zh_cn.md index 3ce24f0..0fefa6c 100644 --- a/README.zh_cn.md +++ b/README.zh_cn.md @@ -1,5 +1,5 @@

-Logo +Logo

Build Status @@ -18,9 +18,9 @@ ripple是一个现代化的、高性能的原生PHP协程引擎, 旨在解决PHP **`🎉 加入方式`** 通过以下方式添加作者微信即可加入交流群 -| 微信二维码 | -|----------------------------------------------------------------------------------------------------------------------| -| | +| 微信二维码 | +|----------------------------------------------------| +| | ## 安装 @@ -42,20 +42,23 @@ composer require cloudtay/ripple > 我们允许用户自行选择适用的组件库, 所有组件只需像文档中描述的方式即可无需额外配置 -**🚀 [Guzzle](https://docs.guzzlephp.org/en/stable/)** -PHP应用最为广泛的HTTP客户端 +**🚀 [Guzzle](https://docs.guzzlephp.org/en/stable/)** +PHP 是使用最广泛的 HTTP 客户端 -**🔥 [AmPHP](https://amphp.org/)** -提供丰富的PHP异步组件供用户自行封装 +**🔥 [AmPHP](https://amphp.org/)** +提供丰富的 PHP 异步组件供用户自行封装 -**🚀 [Driver](https://github.com/cloudtay/ripple-driver)** -官方提供的高性能驱动库,无缝接入你的传统应用 +**🚀 [Laravel-ripple](https://github.com/cloudtay/laravel-ripple)** +官方高性能驱动程序库提供对传统应用程序的无缝访问。 -**🚀 [Webman-coroutine](https://github.com/workbunny/webman-coroutine)** -workbunny团队体统的webman协程扩展, 为Webman提供了协程支持 +**🚀 [Workerman-ripple](https://github.com/cloudtay/workerman-ripple)** +官方高性能驱动程序库提供对传统应用程序的无缝访问。 -**🟢 [ripple](https://github.com/cloudtay/ripple)** -提供标准的协程架构与工具用于迅速开发或封装传统应用 +**🚀 [webman-coroutine](https://github.com/workbunny/webman-coroutine)** +workbunny 团队的集成 webman 协程扩展为 Webman 提供协程支持。 + +**🟢[ripple](https://github.com/cloudtay/ripple)** +提供标准协程架构和工具,用于快速开发或打包传统应用程序 ### 事件库指南 diff --git a/assets/images/logo.png b/assets/images/logo.png index 0958c4b..c23619f 100644 Binary files a/assets/images/logo.png and b/assets/images/logo.png differ diff --git a/example/worker.php b/example/worker.php index 81a619b..7a9f39f 100644 --- a/example/worker.php +++ b/example/worker.php @@ -23,7 +23,7 @@ /** * @return void */ - protected function boot(): void + public function boot(): void { \var_dump($this->getIndex()); \Co\sleep(10); @@ -34,7 +34,7 @@ protected function boot(): void * * @return void */ - protected function register(Manager $manager): void + public function register(Manager $manager): void { // TODO: Implement register() method. } @@ -64,13 +64,13 @@ protected function onCommand(Command $command): void /** * @return void */ - protected function boot(): void + public function boot(): void { \Co\sleep(6); $this->forwardCommand(Command::make('test'), 'abc', 2); } - protected function register(Manager $manager): void + public function register(Manager $manager): void { // TODO: Implement register() method. } diff --git a/src/Kernel.php b/src/Kernel.php index 07f9e19..5c02166 100644 --- a/src/Kernel.php +++ b/src/Kernel.php @@ -327,6 +327,6 @@ public function getMemorySize(): int break; } - return $this->memorySize = 0; + return $this->memorySize = -1; } } diff --git a/src/Worker/Manager.php b/src/Worker/Manager.php index 9b196ca..50b10a4 100644 --- a/src/Worker/Manager.php +++ b/src/Worker/Manager.php @@ -17,6 +17,7 @@ use Ripple\Utils\Output; use Ripple\Utils\Serialization\Zx7e; use BadMethodCallException; +use Throwable; use function getmypid; use function posix_getpid; @@ -128,12 +129,12 @@ protected function __onCommand(Command $workerCommand, string $name, int $index) break; case WorkerContext::COMMAND_SYNC_ID: - if ($stream = $this->workers[$name]?->getStreams()[$index] ?? null) { + if ($workerProcess = $this->workers[$name]?->getWorkerProcess($index)) { $sync = $this->index++; $id = $workerCommand->arguments['id']; $command = Command::make(WorkerContext::COMMAND_SYNC_ID, ['sync' => $sync, 'id' => $id]); try { - $stream->write($this->zx7e->encodeFrame($command->__toString())); + $workerProcess->command($command); } catch (ConnectionException $e) { Output::warning($e->getMessage()); } @@ -183,35 +184,22 @@ public function reload(string|null $name = null): void public function sendCommand(Command $command, string|null $name = null, int|null $index = null): void { if ($name) { - if (isset($this->workers[$name])) { - if ($index) { - if ($stream = $this->workers[$name]?->getStreams()[$index] ?? null) { - try { - $stream->write($this->zx7e->encodeFrame($command->__toString())); - } catch (ConnectionException $e) { - Output::warning($e->getMessage()); - } - } - return; - } - foreach ($this->workers[$name]->getStreams() as $stream) { - try { - $stream->write($this->zx7e->encodeFrame($command->__toString())); - } catch (ConnectionException $e) { - Output::warning($e->getMessage()); - } + if (!$worker = $this->workers[$name] ?? null) { + return; + } + + $workerProcesses = $index ? [$worker->getWorkerProcess($index)] : $worker->getWorkerProcess(); + foreach ($workerProcesses as $workerProcess) { + try { + $workerProcess?->command($command); + } catch (ConnectionException $e) { + Output::warning($e->getMessage()); } } } else { $workers = $this->workers; foreach ($workers as $worker) { - foreach ($worker->getStreams() as $stream) { - try { - $stream->write($this->zx7e->encodeFrame($command->__toString())); - } catch (ConnectionException $e) { - Output::warning($e->getMessage()); - } - } + $this->sendCommand($command, $worker->getName()); } } } @@ -229,7 +217,18 @@ public function run(): bool } else { $this->processID = posix_getpid(); } + $this->zx7e = new Zx7e(); + foreach ($this->workers as $worker) { + try { + $worker->register($this); + } catch (Throwable $exception) { + Output::error("Worker {$worker->getName()} registration failed: {$exception->getMessage()}, will be removed"); + $this->remove($worker->getName()); + return false; + } + } + foreach ($this->workers as $worker) { if (!$worker->run($this)) { Output::error("worker {$worker->getName()} failed to start"); diff --git a/src/Worker/Worker.php b/src/Worker/Worker.php index bd478f7..3d772a0 100644 --- a/src/Worker/Worker.php +++ b/src/Worker/Worker.php @@ -12,7 +12,16 @@ namespace Ripple\Worker; +use Closure; use JetBrains\PhpStorm\NoReturn; +use Ripple\Socket; +use Ripple\Stream\Exception\ConnectionException; +use Ripple\Utils\Output; +use Ripple\Utils\Serialization\Zx7e; +use Throwable; + +use function Co\promise; +use function spl_object_hash; /** * @Author cclilshy @@ -20,6 +29,151 @@ */ abstract class Worker extends WorkerContext { + /** + * @Context worker + * @var Zx7e + */ + protected Zx7e $zx7e; + + /*** @var \Ripple\Socket */ + protected Socket $parentSocket; + + /*** @var array */ + private array $queue = []; + + /*** @var int */ + private int $index; + + /** + * Use the worker to send commands to other workers + * + * @param \Ripple\Worker\Command $command + * @param string|null $name + * @param int|null $index + * + * @return void + */ + public function forwardCommand(Command $command, string|null $name = null, int|null $index = null): void + { + if ($name) { + $this->sc2m(Command::make(Manager::COMMAND_COMMAND_TO_WORKER, [ + 'command' => $command, + 'name' => $name, + 'index' => $index + ])); + } else { + $this->sc2m(Command::make(Manager::COMMAND_COMMAND_TO_ALL, [ + 'command' => $command, + ])); + } + } + + /** + * @Author cclilshy + * @Date 2024/8/17 17:32 + * @return int|false + */ + protected function syncID(): int|false + { + try { + return promise(function (Closure $resolve, Closure $reject) { + $id = spl_object_hash($resolve); + $command = Command::make(WorkerContext::COMMAND_SYNC_ID, ['id' => $id]); + $this->sc2m($command); + + $this->queue[$id] = [ + 'resolve' => $resolve, + 'reject' => $reject + ]; + })->await(); + } catch (Throwable) { + return false; + } + } + + /** + * 发送指令 + * + * @Context worker + * @Author cclilshy + * @Date 2024/8/17 00:07 + * + * @param Command $command + * + * @return void + */ + public function sc2m(Command $command): void + { + try { + $this->parentSocket->write($this->zx7e->encodeFrame($command->__toString())); + } catch (ConnectionException $exception) { + Output::error($exception->getMessage()); + + // Writing a message to the parent process fails. There is only one possibility that the parent process has exited. + exit(1); + } + } + + /** + * @param \Ripple\Socket $parentStream + * @param int $index + * + * @return void + */ + final protected function onProcess(Socket $parentStream, int $index): void + { + $this->parentSocket = $parentStream; + + $this->zx7e = new Zx7e(); + $this->parentSocket->onReadable(function (Socket $Socket) { + $content = $Socket->readContinuously(1024); + foreach ($this->zx7e->decodeStream($content) as $string) { + $command = Command::fromString($string); + switch ($command->name) { + case WorkerContext::COMMAND_RELOAD: + $this->onReload(); + break; + + case WorkerContext::COMMAND_TERMINATE: + $this->onTerminate(); + exit(1); + break; + + case WorkerContext::COMMAND_SYNC_ID: + $id = $command->arguments['id']; + $sync = $command->arguments['sync']; + + if ($callback = $this->queue[$id] ?? null) { + unset($this->queue[$id]); + $callback['resolve']($sync); + } + break; + + default: + $this->onCommand($command); + } + } + }); + + try { + $this->onDefinedIndex($index); + $this->boot(); + } catch (Throwable $exception) { + Output::error('Worker boot failed: ' . $exception->getMessage()); + exit(128); + } + } + + /** + * @Context worker + * @Author cclilshy + * @Date 2024/8/16 11:53 + * @return void + */ + public function boot(): void + { + } + /** * Triggered during hot restart. The notified process should follow the hot restart rules to release resources and then exit. * @@ -57,26 +211,20 @@ protected function onCommand(Command $command): void } /** - * Use the worker to send commands to other workers - * - * @param \Ripple\Worker\Command $command - * @param string|null $name - * @param int|null $index + * @param int $index * * @return void */ - public function forwardCommand(Command $command, string|null $name = null, int|null $index = null): void + protected function onDefinedIndex(int $index): void { - if ($name) { - $this->sc2m(Command::make(Manager::COMMAND_COMMAND_TO_WORKER, [ - 'command' => $command, - 'name' => $name, - 'index' => $index - ])); - } else { - $this->sc2m(Command::make(Manager::COMMAND_COMMAND_TO_ALL, [ - 'command' => $command, - ])); - } + $this->index = $index; + } + + /** + * @return int + */ + protected function getIndex(): int + { + return $this->index; } } diff --git a/src/Worker/WorkerContext.php b/src/Worker/WorkerContext.php index 32d68ad..731801b 100644 --- a/src/Worker/WorkerContext.php +++ b/src/Worker/WorkerContext.php @@ -12,25 +12,19 @@ namespace Ripple\Worker; -use Closure; use Ripple\Kernel; use Ripple\Process\Exception\ProcessException; -use Ripple\Process\Runtime; use Ripple\Socket; -use Ripple\Stream\Exception\ConnectionException; use Ripple\Utils\Output; use Ripple\Utils\Serialization\Zx7e; -use Throwable; use function Co\delay; use function Co\process; -use function Co\promise; use function socket_create_pair; use function socket_export_stream; use function min; use function pow; use function is_int; -use function spl_object_hash; use const AF_INET; use const AF_UNIX; @@ -45,39 +39,9 @@ abstract class WorkerContext public const COMMAND_TERMINATE = '__worker__.terminate'; public const COMMAND_SYNC_ID = '__worker__.sync.id'; - /** - * @Context manager - * @var Runtime[] - */ - private array $runtimes = []; - - /** - * @Context manager - * @var Socket[] - */ - private array $streams = []; - - /** - * @Context worker - * @var Zx7e - */ - private Zx7e $zx7e; - /*** @var bool */ private bool $running = false; - /** - * @Context worker - * @var Socket - */ - private Socket $parentSocket; - - /*** @var array */ - private array $queue = []; - - /*** @var bool */ - private bool $booted = false; - /*** @var bool */ private bool $terminated = false; @@ -87,44 +51,66 @@ abstract class WorkerContext /*** @var array */ private array $restartAttempts = []; - /*** @var int */ - private int $index; - /*** @var string */ protected string $name; /*** @var int */ protected int $count = 1; + /*** @var WorkerProcess[] */ + protected array $processes = []; + /** * */ private const MAX_RESTART_ATTEMPTS = 10; /** + * @Context manager + * @Author cclilshy + * @Date 2024/8/16 11:53 + * + * @param Manager $manager + * * @return void */ - public function terminate(): void + public function register(Manager $manager): void { - if ($this->terminated) { - return; - } + } - $this->terminated = true; + /** + * @Context manager + * @Author cclilshy + * @Date 2024/8/16 23:50 + * + * @param Manager $manager + * + * @return bool + */ + public function run(Manager $manager): bool + { + $this->manager = $manager; - $this->manager->sendCommand( - Command::make(WorkerContext::COMMAND_TERMINATE), - $this->getName() - ); + /*** @compatible:Windows */ + $count = !Kernel::getInstance()->supportProcessControl() ? 1 : $this->getCount(); + for ($index = 1; $index <= $count; $index++) { + if (!$this->guard($index)) { + $this->terminate(); + $manager->remove($this->getName()); + return false; + } + } - $this->running = false; + $this->running = true; + return true; } + /** * @Author cclilshy * @Date 2024/8/17 14:25 * - * @param int $index + * @param int $index * * @return bool */ @@ -142,10 +128,11 @@ private function guard(int $index): bool $streamA->setBlocking(false); $streamB->setBlocking(false); $streamA->onClose(fn () => $streamB->close()); + $zx7e = new Zx7e(); + $runtime = process(fn () => $this->onProcess($streamB, $index))->run(); + $this->processes[$index] = $workerProcess = new WorkerProcess($runtime, $streamA, $index); - $zx7e = new Zx7e(); - $this->streams[$index] = $streamA; - $this->streams[$index]->onReadable(function (Socket $Socket) use ($streamA, $index, &$zx7e) { + $workerProcess->getStream()->onReadable(function (Socket $Socket) use ($streamA, $index, &$zx7e) { $content = $Socket->readContinuously(1024); foreach ($zx7e->decodeStream($content) as $string) { $this->manager->onCommand( @@ -156,51 +143,7 @@ private function guard(int $index): bool } }); - $this->runtimes[$index] = $runtime = process(function () use ($streamB, $index) { - $this->parentSocket = $streamB; - - $this->zx7e = new Zx7e(); - $this->parentSocket->onReadable(function (Socket $Socket) { - $content = $Socket->readContinuously(1024); - foreach ($this->zx7e->decodeStream($content) as $string) { - $command = Command::fromString($string); - switch ($command->name) { - case WorkerContext::COMMAND_RELOAD: - $this->onReload(); - break; - - case WorkerContext::COMMAND_TERMINATE: - $this->onTerminate(); - exit(1); - break; - - case WorkerContext::COMMAND_SYNC_ID: - $id = $command->arguments['id']; - $sync = $command->arguments['sync']; - - if ($callback = $this->queue[$id] ?? null) { - unset($this->queue[$id]); - $callback['resolve']($sync); - } - break; - - default: - $this->onCommand($command); - } - } - }); - - try { - $this->onDefinedIndex($index); - $this->boot(); - $this->booted = true; - } catch (Throwable $exception) { - Output::error('Worker boot failed: ' . $exception->getMessage()); - exit(128); - } - })->run(); - - $runtime->finally(function (mixed $result) use ($index) { + $workerProcess->getRuntime()->finally(function (mixed $result) use ($index) { if (is_int($result)) { $exitCode = $result; } elseif ($result instanceof ProcessException) { @@ -223,13 +166,8 @@ private function guard(int $index): bool */ private function onExit(int $index, int $exitCode): void { - if (isset($this->streams[$index])) { - $this->streams[$index]->close(); - unset($this->streams[$index]); - } - - if (isset($this->runtimes[$index])) { - unset($this->runtimes[$index]); + if (isset($this->processes[$index])) { + unset($this->processes[$index]); } if ($exitCode === 128) { @@ -253,39 +191,24 @@ private function onExit(int $index, int $exitCode): void } } + /** - * @Context manager - * @Author cclilshy - * @Date 2024/8/16 23:50 - * - * @param Manager $manager - * - * @return bool + * @return void */ - public function run(Manager $manager): bool + public function terminate(): void { - $this->manager = $manager; - - try { - $this->register($manager); - } catch (Throwable $exception) { - Output::error("Worker {$this->getName()} registration failed: {$exception->getMessage()}, will be removed"); - $manager->remove($this->getName()); - return false; + if ($this->terminated) { + return; } - /*** @compatible:Windows */ - $count = !Kernel::getInstance()->supportProcessControl() ? 1 : $this->getCount(); - for ($index = 1; $index <= $count; $index++) { - if (!$this->guard($index)) { - $this->terminate(); - $manager->remove($this->getName()); - return false; - } - } + $this->terminated = true; - $this->running = true; - return true; + $this->manager->sendCommand( + Command::make(WorkerContext::COMMAND_TERMINATE), + $this->getName() + ); + + $this->running = false; } /** @@ -322,138 +245,23 @@ public function isRunning(): bool } /** - * 发送指令 - * - * @Context worker - * @Author cclilshy - * @Date 2024/8/17 00:07 - * - * @param Command $command - * - * @return void - */ - public function sc2m(Command $command): void - { - try { - $this->parentSocket->write($this->zx7e->encodeFrame($command->__toString())); - } catch (ConnectionException $exception) { - Output::error($exception->getMessage()); - - // Writing a message to the parent process fails. There is only one possibility that the parent process has exited. - exit(1); - } - } - - /** - * @return \Ripple\Socket[] - */ - public function getStreams(): array - { - return $this->streams; - } - - /** + * @param int|null $index * - * @return array - */ - public function getRuntimes(): array - { - return $this->runtimes; - } - - /** - * @Author cclilshy - * @Date 2024/8/17 17:32 - * @return int|false + * @return WorkerProcess[]|WorkerProcess|null */ - protected function syncID(): int|false + public function getWorkerProcess(int|null $index = null): array|WorkerProcess|null { - try { - return promise(function (Closure $resolve, Closure $reject) { - $id = spl_object_hash($resolve); - $command = Command::make(WorkerContext::COMMAND_SYNC_ID, ['id' => $id]); - $this->sc2m($command); - - $this->queue[$id] = [ - 'resolve' => $resolve, - 'reject' => $reject - ]; - })->await(); - } catch (Throwable) { - return false; + if (!$index) { + return $this->processes; } + return $this->processes[$index] ?? null; } /** - * @return bool - */ - public function isBooted(): bool - { - return $this->booted; - } - - /** - * @param int $index - * - * @return void - */ - private function onDefinedIndex(int $index): void - { - $this->index = $index; - } - - /** - * @return int - */ - protected function getIndex(): int - { - return $this->index; - } - - /** - * @Context manager - * @Author cclilshy - * @Date 2024/8/16 11:53 - * - * @param Manager $manager - * - * @return void - */ - abstract protected function register(Manager $manager): void; - - /** - * @Context worker - * @Author cclilshy - * @Date 2024/8/16 11:53 - * @return void - */ - abstract protected function boot(): void; - - /** - * Triggered during hot restart. The notified process should follow the hot restart rules to release resources and then exit. - * - * @Context worker - * @Author cclilshy - * @Date 2024/8/17 00:59 - * @return void - */ - abstract protected function onReload(): void; - - /** - * @return void - */ - abstract protected function onTerminate(): void; - - /** - * Triggered when command is received - * - * @Context worker - * @Author cclilshy - * @Date 2024/8/16 11:54 - * - * @param Command $command + * @param \Ripple\Socket $parentStream + * @param int $index * * @return void */ - abstract protected function onCommand(Command $command): void; + abstract protected function onProcess(Socket $parentStream, int $index): void; } diff --git a/src/Worker/WorkerProcess.php b/src/Worker/WorkerProcess.php new file mode 100644 index 0000000..e8d04d3 --- /dev/null +++ b/src/Worker/WorkerProcess.php @@ -0,0 +1,70 @@ +zx7e = new Zx7e(); + } + + /** + * @return \Ripple\Process\Runtime + */ + public function getRuntime(): Runtime + { + return $this->runtime; + } + + /** + * @return \Ripple\Stream + */ + public function getStream(): Stream + { + return $this->stream; + } + + /** + * @return int + */ + public function getIndex(): int + { + return $this->index; + } + + /** + * @param \Ripple\Worker\Command $command + * + * @return void + * @throws \Ripple\Stream\Exception\ConnectionException + */ + public function command(Command $command): void + { + $this->stream->write($this->zx7e->encodeFrame($command->__toString())); + } +} diff --git a/tests/WorkerTest.php b/tests/WorkerTest.php index 30cec15..c9d9fef 100644 --- a/tests/WorkerTest.php +++ b/tests/WorkerTest.php @@ -20,7 +20,6 @@ use function Co\cancelAll; use function Co\wait; -use function sleep; #[RunClassInSeparateProcess] class WorkerTest extends TestCase @@ -118,14 +117,14 @@ class TestWorker extends Worker * * @return void */ - protected function register(Manager $manager): void + public function register(Manager $manager): void { } /** * @return void */ - protected function boot(): void + public function boot(): void { } } @@ -141,7 +140,7 @@ class UnstableWorker extends TestWorker /** * @return void */ - protected function boot(): void + public function boot(): void { if ($this->exitCount < 3) { $this->exitCount++;