Skip to content

Commit

Permalink
Update: Decoupling the worker module
Browse files Browse the repository at this point in the history
  • Loading branch information
cclilshy committed Jan 31, 2025
1 parent dc73eb4 commit c0ea8bd
Show file tree
Hide file tree
Showing 5 changed files with 614 additions and 274 deletions.
12 changes: 11 additions & 1 deletion src/Utils/Output.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public static function info(string $title, string ...$contents): void
*/
public static function writeln(string $message): void
{
fwrite(STDOUT, $message . PHP_EOL);
Output::write($message . PHP_EOL);
}

/**
Expand All @@ -97,4 +97,14 @@ public static function error(string $title, string ...$contents): void
{
Output::writeln("\033[31m{$title}\033[0m \033[33m" . implode(' ', $contents) . "\033[0m");
}

/**
* @param string $message
*
* @return void
*/
public static function write(string $message): void
{
fwrite(STDOUT, $message);
}
}
164 changes: 117 additions & 47 deletions src/Worker/Manager.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@
use Ripple\Stream\Exception\ConnectionException;
use Ripple\Utils\Output;
use Ripple\Utils\Serialization\Zx7e;
use BadMethodCallException;

use function getmypid;
use function posix_getpid;

/**
* @Author cclilshy
* @Date 2024/8/16 11:51
* @method void onCommand(Command $workerCommand, string $name, int $index)
*/
class Manager
{
public const COMMAND_COMMAND_TO_WORKER = 'manager.commandToWorker';
public const COMMAND_COMMAND_TO_ALL = 'manager.commandToAll';
public const COMMAND_COMMAND_TO_WORKER = '__manager__.commandToWorker';
public const COMMAND_COMMAND_TO_ALL = '__manager__.commandToAll';

/**
* @var Worker[]
Expand Down Expand Up @@ -56,8 +58,19 @@ class Manager
* @param Worker $worker
*
* @return void
* @deprecated
*/
public function addWorker(Worker $worker): void
{
$this->add($worker);
}

/**
* @param \Ripple\Worker\Worker $worker
*
* @return void
*/
public function add(Worker $worker): void
{
$workerName = $worker->getName();
if (isset($this->workers[$workerName])) {
Expand All @@ -77,41 +90,31 @@ public function addWorker(Worker $worker): void
*/
public function removeWorker(string $name): void
{
if ($this->workers[$name] ?? null) {
$this->stopWorker($name);
if ($worker = $this->workers[$name] ?? null) {
$worker->isRunning() && $this->terminate($name);
unset($this->workers[$name]);
}
}

/**
* @Author cclilshy
* @Date 2024/8/17 10:14
* @Date 2024/8/17 00:44
*
* @param string $name
* @param string|null $name
*
* @return void
*/
public function stopWorker(string $name): void
public function terminate(string|null $name = null): void
{
if ($worker = $this->workers[$name] ?? null) {
foreach ($worker->runtimes as $runtime) {
$runtime->stop();
}
foreach ($worker->streams as $stream) {
$stream->close();
if ($name) {
if ($worker = $this->workers[$name] ?? null) {
$worker->terminate();
}
return;
}
}

/**
* @Author cclilshy
* @Date 2024/8/17 00:44
* @return void
*/
public function stop(): void
{
foreach ($this->workers as $worker) {
$this->stopWorker($worker->getName());
$worker->terminate();
}
}

Expand All @@ -132,31 +135,37 @@ public function getWorkers(): array
* @param int $index
*
* @return void
* @throws ConnectionException
*/
public function onCommand(Command $workerCommand, string $name, int $index): void
protected function __onCommand(Command $workerCommand, string $name, int $index): void
{
switch ($workerCommand->name) {
case Worker::COMMAND_RELOAD:
case WorkerContext::COMMAND_RELOAD:
$name = $workerCommand->arguments['name'] ?? null;
$this->reload($name);
return;
break;

case WorkerContext::COMMAND_SYNC_ID:
if ($stream = $this->workers[$name]?->streams[$index] ?? null) {
$sync = $this->index++;
$id = $workerCommand->arguments['id'];
$command = Command::make(WorkerContext::COMMAND_SYNC_ID, ['sync' => $sync, 'id' => $id]);
try {
$stream->write($this->zx7e->encodeFrame($command->__toString()));
} catch (ConnectionException $e) {
Output::warning($e->getMessage());
}
}
break;

case Manager::COMMAND_COMMAND_TO_WORKER:
$command = $workerCommand->arguments['command'];
$target = $workerCommand->arguments['name'];
$this->commandToWorker($command, $target);
$this->sendCommand($command, $target);
break;

case Manager::COMMAND_COMMAND_TO_ALL:
$command = $workerCommand->arguments['command'];
$this->commandToAll($command);
break;
case Worker::COMMAND_SYNC_ID:
if ($stream = $this->workers[$name]?->streams[$index] ?? null) {
$sync = $this->index++;
$id = $workerCommand->arguments['id'];
$command = Command::make(Worker::COMMAND_SYNC_ID, ['sync' => $sync, 'id' => $id]);
$stream->write($this->zx7e->encodeFrame($command->__toString()));
}
$this->sendCommand($command);
break;
}
}
Expand All @@ -168,17 +177,16 @@ public function onCommand(Command $workerCommand, string $name, int $index): voi
* @param string|null $name
*
* @return void
* @throws ConnectionException
*/
public function reload(string|null $name = null): void
{
if ($name) {
if (isset($this->workers[$name])) {
$this->commandToWorker(Command::make(Worker::COMMAND_RELOAD), $name);
$this->sendCommand(Command::make(WorkerContext::COMMAND_RELOAD), $name);
}
return;
}
$this->commandToAll(Command::make(Worker::COMMAND_RELOAD));
$this->sendCommand(Command::make(WorkerContext::COMMAND_RELOAD));
}

/**
Expand All @@ -189,13 +197,17 @@ public function reload(string|null $name = null): void
* @param string $name
*
* @return void
* @throws ConnectionException
* @deprecated
*/
public function commandToWorker(Command $command, string $name): void
{
if (isset($this->workers[$name])) {
foreach ($this->workers[$name]->streams as $stream) {
$stream->write($this->zx7e->encodeFrame($command->__toString()));
try {
$stream->write($this->zx7e->encodeFrame($command->__toString()));
} catch (ConnectionException $e) {
Output::warning($e->getMessage());
}
}
}
}
Expand All @@ -207,16 +219,56 @@ public function commandToWorker(Command $command, string $name): void
* @param Command $command
*
* @return void
* @throws ConnectionException
* @deprecated
*/
public function commandToAll(Command $command): void
{
$workers = $this->workers;
foreach ($workers as $worker) {
foreach ($worker->streams as $stream) {
$stream->write($this->zx7e->encodeFrame($command->__toString()));
try {
$stream->write($this->zx7e->encodeFrame($command->__toString()));
} catch (ConnectionException $e) {
Output::warning($e->getMessage());
}
}
}
}

/**
* @param \Ripple\Worker\Command $command
* @param string|null $name
*
* @return bool
*/
public function sendCommand(Command $command, string|null $name = null): bool
{
if ($name) {
if (isset($this->workers[$name])) {
foreach ($this->workers[$name]->streams as $stream) {
try {
$stream->write($this->zx7e->encodeFrame($command->__toString()));
return true;
} catch (ConnectionException $e) {
Output::warning($e->getMessage());
}
}
}
} else {
$workers = $this->workers;
foreach ($workers as $worker) {
foreach ($worker->streams as $stream) {
try {
$stream->write($this->zx7e->encodeFrame($command->__toString()));
return true;
} catch (ConnectionException $e) {
Output::warning($e->getMessage());
}
}
}
}

return false;
}

/**
Expand All @@ -234,22 +286,40 @@ public function run(): bool
}
$this->zx7e = new Zx7e();
foreach ($this->workers as $worker) {
$worker->register($this);
if (!$worker($this)) {
if (!$worker->run($this)) {
Output::error("worker {$worker->getName()} failed to start");
$this->stop();
$this->terminate();
}
}
return true;
}

/**
*
*/
public function __destruct()
{
if (!Kernel::getInstance()->supportProcessControl()) {
return;
}
if (isset($this->processID) && $this->processID === posix_getpid()) {
$this->stop();
$this->terminate();
}
}

/**
* @param string $name
* @param array $arguments
*
* @return void
*/
public function __call(string $name, array $arguments): void
{
if ($name === 'onCommand') {
$this->__onCommand(...$arguments);
return;
}

throw new BadMethodCallException("Call to undefined method " . static::class . "::{$name}()");
}
}
Loading

0 comments on commit c0ea8bd

Please sign in to comment.