Skip to content

Commit

Permalink
Extending the process and worker modules
Browse files Browse the repository at this point in the history
  • Loading branch information
cclilshy committed Feb 6, 2025
1 parent d94fa70 commit b24e212
Show file tree
Hide file tree
Showing 8 changed files with 245 additions and 40 deletions.
36 changes: 36 additions & 0 deletions .github/workflows/phpunit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,39 @@ jobs:

- name: Run PHPUnit
run: ./vendor/bin/phpunit

test-centos:
runs-on: ubuntu-latest
strategy:
matrix:
php-version: [ '8.1', '8.2', '8.3', '8.4' ]
container:
image: centos:7

steps:
- name: Install basic utilities
run: |
yum update -y
yum install -y git wget
- name: Checkout code
uses: actions/checkout@v3

- name: Setup PHP
run: |
yum install -y epel-release
yum install -y https://rpms.remirepo.net/enterprise/remi-release-7.rpm
yum-config-manager --enable remi-php${{ matrix.php-version >= 8.0 ? matrix.php-version * 10 : matrix.php-version }}
yum install -y php php-cli php-common php-curl php-mbstring php-xml php-openssl php-sockets php-pcntl
- name: Install Composer
run: |
php -r "copy('https://getcomposer.org/installer', 'composer-setup.php');"
php composer-setup.php --install-dir=/usr/local/bin --filename=composer
php -r "unlink('composer-setup.php');"
- name: Install dependencies
run: composer install

- name: Run PHPUnit
run: ./vendor/bin/phpunit
3 changes: 3 additions & 0 deletions phpunit.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
php vendor/bin/phpunit
php -d extension=ev vendor/bin/phpunit
php -d extension=event vendor/bin/phpunit
44 changes: 38 additions & 6 deletions src/Process/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,16 @@
use function pcntl_wait;
use function pcntl_wexitstatus;
use function pcntl_wifexited;
use function pcntl_wifsignaled;
use function pcntl_wifstopped;
use function pcntl_wstopsig;
use function pcntl_wtermsig;
use function posix_getpid;

use const SIGCHLD;
use const SIGKILL;
use const WNOHANG;
use const WUNTRACED;
use const SIGTERM;

/**
* @compatible:Windows
Expand Down Expand Up @@ -89,6 +93,9 @@ public function __construct()
$this->processID = posix_getpid();
}

/**
*
*/
public function __destruct()
{
$this->destroy();
Expand All @@ -100,7 +107,7 @@ public function __destruct()
private function destroy(): void
{
foreach ($this->process2runtime as $runtime) {
$runtime->signal(SIGKILL);
$runtime->signal(SIGTERM);
}
}

Expand Down Expand Up @@ -301,18 +308,35 @@ private function signalSIGCHLDHandler(): void
*/
private function onProcessExit(int $processID, int $status): void
{
$exit = pcntl_wifexited($status) ? pcntl_wexitstatus($status) : -1;
$exitCode = -1;
$exitReason = '';

if (pcntl_wifexited($status)) {
$exitCode = pcntl_wexitstatus($status);
$exitReason = 'normal exit';
} elseif (pcntl_wifsignaled($status)) {
$exitCode = pcntl_wtermsig($status);
$exitReason = 'terminated by signal';
} elseif (pcntl_wifstopped($status)) {
$exitCode = pcntl_wstopsig($status);
$exitReason = 'stopped by signal';
}

$promiseCallback = $this->process2promiseCallback[$processID] ?? null;
if (!$promiseCallback) {
return;
}

if ($exit === -1) {
call_user_func($promiseCallback['reject'], new ProcessException('The process is abnormal.', $exit));
if ($exitCode !== 0) {
call_user_func(
$promiseCallback['reject'],
new ProcessException("Process failed: {$exitReason}", $exitCode)
);
} else {
call_user_func($promiseCallback['resolve'], $exit);
call_user_func($promiseCallback['resolve'], $exitCode);
}

// Clean up resources
unset($this->process2promiseCallback[$processID]);
unset($this->process2runtime[$processID]);

Expand All @@ -328,4 +352,12 @@ public function getRootProcessID(): int
{
return $this->rootProcessID;
}

/**
* @return int
*/
public function getProcessID(): int
{
return $this->processID;
}
}
27 changes: 19 additions & 8 deletions src/Process/Runtime.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,11 @@ public function __construct(
* @param bool $force
*
* @return void
* @deprecated Use stop() instead
*/
public function stop(bool $force = false): void
{
/*** @compatible:Windows */
if (!Kernel::getInstance()->supportProcessControl()) {
exit(0);
}

$force
? $this->kill()
: $this->signal(SIGTERM);
$this->terminate($force);
}

/*** @return void */
Expand Down Expand Up @@ -139,4 +133,21 @@ public function getPromise(): Promise
{
return $this->promise;
}

/**
* @param bool $force
*
* @return void
*/
public function terminate(bool $force = false): void
{
/*** @compatible:Windows */
if (!Kernel::getInstance()->supportProcessControl()) {
exit(0);
}

$force
? $this->kill()
: $this->signal(SIGTERM);
}
}
12 changes: 12 additions & 0 deletions src/Worker/Manager.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,19 @@ public function add(Worker $worker): void
* @param string $name
*
* @return void
* @deprecated use remove()
*/
public function removeWorker(string $name): void
{
$this->remove($name);
}

/**
* @param string $name
*
* @return void
*/
public function remove(string $name): void
{
if ($worker = $this->workers[$name] ?? null) {
$worker->isRunning() && $this->terminate($name);
Expand Down Expand Up @@ -298,6 +309,7 @@ public function __destruct()
if (!Kernel::getInstance()->supportProcessControl()) {
return;
}

if (isset($this->processID) && $this->processID === posix_getpid()) {
$this->terminate();
}
Expand Down
93 changes: 73 additions & 20 deletions src/Worker/WorkerContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,20 @@
namespace Ripple\Worker;

use Ripple\Kernel;
use Ripple\Process\Exception\ProcessException;
use Ripple\Process\Runtime;
use Ripple\Socket;
use Ripple\Utils\Output;
use Ripple\Utils\Serialization\Zx7e;
use Throwable;

use function Co\delay;
use function Co\process;
use function socket_create_pair;
use function socket_export_stream;
use function min;
use function pow;
use function is_int;

use const AF_INET;
use const AF_UNIX;
Expand Down Expand Up @@ -86,6 +92,14 @@ abstract class WorkerContext
/*** @var \Ripple\Worker\Manager */
protected Manager $manager;

/*** @var array */
protected array $restartAttempts = [];

/**
*
*/
private const MAX_RESTART_ATTEMPTS = 10;

/**
* @return void
*/
Expand All @@ -102,27 +116,19 @@ public function terminate(): void
$this->getName()
);

// foreach ($this->runtimes as $runtime) {
// $runtime->stop();
// }

// foreach ($this->streams as $stream) {
// $stream->close();
// }
$this->running = false;
}

/**
* @Author cclilshy
* @Date 2024/8/17 14:25
*
* @param Manager $manager
* @param int $index
*
* @return bool
*/
private function guard(Manager $manager, int $index): bool
private function guard(int $index): bool
{
$this->manager = $manager;
/*** @compatible:Windows */
$domain = !Kernel::getInstance()->supportProcessControl() ? AF_INET : AF_UNIX;

Expand All @@ -138,18 +144,20 @@ private function guard(Manager $manager, int $index): bool

$zx7e = new Zx7e();
$this->streams[$index] = $streamA;
$this->streams[$index]->onReadable(function (Socket $Socket) use ($streamA, $index, &$zx7e, $manager) {
$this->streams[$index]->onReadable(function (Socket $Socket) use ($streamA, $index, &$zx7e) {
$content = $Socket->readContinuously(1024);
foreach ($zx7e->decodeStream($content) as $string) {
$manager->onCommand(Command::fromString($string), $this->getName(), $index);
$this->manager->onCommand(
Command::fromString($string),
$this->getName(),
$index
);
}
});

$this->runtimes[$index] = $runtime = process(function () use ($streamB) {
$this->parent = false;
$this->parentSocket = $streamB;
$this->boot();
$this->booted = true;

$this->zx7e = new Zx7e();
$this->parentSocket->onReadable(function (Socket $Socket) {
Expand Down Expand Up @@ -180,18 +188,38 @@ private function guard(Manager $manager, int $index): bool
}
}
});

try {
$this->boot();
$this->booted = true;
} catch (Throwable $exception) {
Output::error('Worker boot failed: ' . $exception->getMessage());
exit(128);
}
})->run();

$runtime->finally(fn () => $this->onExit($index));
$runtime->finally(function (mixed $result) use ($index) {
if (is_int($result)) {
$exitCode = $result;
} elseif ($result instanceof ProcessException) {
$exitCode = $result->getCode();
} else {
$exitCode = 0;
}

$this->onExit($index, $exitCode);
});

return true;
}

/**
* @param int $index
* @param int $exitCode
*
* @return void
*/
private function onExit(int $index): void
private function onExit(int $index, int $exitCode): void
{
if (isset($this->streams[$index])) {
$this->streams[$index]->close();
Expand All @@ -202,10 +230,24 @@ private function onExit(int $index): void
unset($this->runtimes[$index]);
}

if ($exitCode === 128) {
Output::error("Worker '{$this->getName()}' process has exited with code 1.");
return;
}

// Restart the process
if (!$this->terminated) {
$this->restartAttempts[$index] = ($this->restartAttempts[$index] ?? 0) + 1;

if ($this->restartAttempts[$index] > WorkerContext::MAX_RESTART_ATTEMPTS) {
Output::warning('Worker process has exited too many times, please check the code.');
return;
}

$delay = min(0.1 * pow(2, $this->restartAttempts[$index] - 1), 30);
delay(function () use ($index) {
$this->guard($this->manager, $index);
}, 0.1);
$this->guard($index);
}, $delay);
}
}

Expand All @@ -220,11 +262,22 @@ private function onExit(int $index): void
*/
public function run(Manager $manager): bool
{
$this->register($manager);
$this->manager = $manager;

try {
$this->register($manager);
} catch (Throwable $exception) {
Output::error("Worker {$this->getName()} registration failed: {$exception->getMessage()}, will be removed");
$manager->remove($this->getName());
return false;
}

/*** @compatible:Windows */
$count = !Kernel::getInstance()->supportProcessControl() ? 1 : $this->getCount();
for ($index = 1; $index <= $count; $index++) {
if (!$this->guard($manager, $index)) {
if (!$this->guard($index)) {
$this->terminate();
$manager->remove($this->getName());
return false;
}
}
Expand Down
Loading

0 comments on commit b24e212

Please sign in to comment.