Skip to content

Commit

Permalink
Docs: Update annotations for boundary handling
Browse files Browse the repository at this point in the history
  • Loading branch information
cclilshy committed Feb 9, 2025
1 parent 2898e0d commit 45d6d8c
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 74 deletions.
6 changes: 3 additions & 3 deletions examples/thread.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@
$group[] = thread(static function () {
\sleep(1);
echo 'Thread 1 ', \microtime(true), \PHP_EOL;
})->run();
});

$group[] = thread(static function () {
\sleep(1);

echo 'Thread 2 ', \microtime(true), \PHP_EOL;
})->run();
});

$group[] = thread(static function () {
\sleep(1);

echo 'Thread 3 ', \microtime(true), \PHP_EOL;
})->run();
});

// Wait for all threads to complete execution
while ($future = \array_shift($group)) {
Expand Down
2 changes: 2 additions & 0 deletions examples/worker.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?php declare(strict_types=1);

use Ripple\Utils\Output;
use Ripple\Worker\Command;
use Ripple\Worker\Manager;

Expand All @@ -19,6 +20,7 @@
/*** @return void */
public function boot(): void
{
Output::info('Worker started');
}

/**
Expand Down
5 changes: 5 additions & 0 deletions src/Coroutine/Exception/EscapeException.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
use Closure;
use RuntimeException;

/**
* 协程中运行子进程时使用 suspension->suspend 会跳出协程从而导致协程上下文逃逸,
* 会发生资源泄漏等不可预料的问题,因此在协程中使用 Process::fork 时,
* ripple会通过 EscapeException 的方式向上抛出异常最终在 mainSuspension 中执行交换 EventDriver
*/
class EscapeException extends RuntimeException
{
/**
Expand Down
114 changes: 60 additions & 54 deletions src/Kernel.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,11 @@
use function define;
use function defined;
use function extension_loaded;
use function file_exists;
use function file_get_contents;
use function fopen;
use function ini_set;
use function intval;
use function preg_match;
use function shell_exec;
use function getmygid;
use function posix_getpid;

use const PHP_OS_FAMILY;

/**
* @Author cclilshy
* @Date 2024/8/29 23:28
Expand All @@ -61,23 +54,73 @@ class Kernel
/*** @var bool */
private bool $mainRunning = true;

/*** @var int */
private int $memorySize;

public function __construct()
{
ini_set('memory_limit', -1);
ini_set('max_execution_time', 0);

if (!defined('STDIN')) {
define('STDIN', fopen('php://stdin', 'r'));
}
if (!defined('STDOUT')) {
define('STDOUT', fopen('php://stdout', 'w'));
}

$this->parallel = extension_loaded('parallel');
$this->processControl = extension_loaded('pcntl') && extension_loaded('posix');
$this->defineConstants();
}

/**
* @return void
*/
private function defineConstants(): void
{
defined('STDIN') || define('STDIN', fopen('php://stdin', 'r'));
defined('STDOUT') || define('STDOUT', fopen('php://stdout', 'w'));

if (!$this->processControl) {
/**
* @see https://www.php.net/manual/en/pcntl.constants.php
*/
defined('WNOHANG') || define('WNOHANG', 1);
defined('WUNTRACED') || define('WUNTRACED', 2);
defined('WCONTINUED') || define('WCONTINUED', 8);
defined('SIG_IGN') || define('SIG_IGN', 1);
defined('SIG_DFL') || define('SIG_DFL', 0);
defined('SIG_ERR') || define('SIG_ERR', -1);
defined('SIGHUP') || define('SIGHUP', 1);
defined('SIGINT') || define('SIGINT', 2);
defined('SIGQUIT') || define('SIGQUIT', 3);
defined('SIGILL') || define('SIGILL', 4);
defined('SIGTRAP') || define('SIGTRAP', 5);
defined('SIGABRT') || define('SIGABRT', 6);
defined('SIGIOT') || define('SIGIOT', 6);
defined('SIGBUS') || define('SIGBUS', 7);
defined('SIGFPE') || define('SIGFPE', 8);
defined('SIGKILL') || define('SIGKILL', 9);
defined('SIGUSR1') || define('SIGUSR1', 10);
defined('SIGSEGV') || define('SIGSEGV', 11);
defined('SIGUSR2') || define('SIGUSR2', 12);
defined('SIGPIPE') || define('SIGPIPE', 13);
defined('SIGALRM') || define('SIGALRM', 14);
defined('SIGTERM') || define('SIGTERM', 15);
defined('SIGSTKFLT') || define('SIGSTKFLT', 16);
defined('SIGCLD') || define('SIGCLD', 17);
defined('SIGCHLD') || define('SIGCHLD', 17);
defined('SIGCONT') || define('SIGCONT', 18);
defined('SIGSTOP') || define('SIGSTOP', 19);
defined('SIGTSTP') || define('SIGTSTP', 20);
defined('SIGTTIN') || define('SIGTTIN', 21);
defined('SIGTTOU') || define('SIGTTOU', 22);
defined('SIGURG') || define('SIGURG', 23);
defined('SIGXCPU') || define('SIGXCPU', 24);
defined('SIGXFSZ') || define('SIGXFSZ', 25);
defined('SIGVTALRM') || define('SIGVTALRM', 26);
defined('SIGPROF') || define('SIGPROF', 27);
defined('SIGWINCH') || define('SIGWINCH', 28);
defined('SIGPOLL') || define('SIGPOLL', 29);
defined('SIGIO') || define('SIGIO', 29);
defined('SIGPWR') || define('SIGPWR', 30);
defined('SIGSYS') || define('SIGSYS', 31);
defined('SIGBABY') || define('SIGBABY', 31);
defined('PRIO_PGRP') || define('PRIO_PGRP', 1);
defined('PRIO_USER') || define('PRIO_USER', 2);
defined('PRIO_PROCESS') || define('PRIO_PROCESS', 0);
}
}

/**
Expand Down Expand Up @@ -287,43 +330,6 @@ public function supportParallel(): bool
return $this->parallel;
}

/**
* @Author cclilshy
* @Date 2024/9/24 14:27
* @return int
*/
public function getMemorySize(): int
{
if (isset($this->memorySize)) {
return $this->memorySize;
}

switch (PHP_OS_FAMILY) {
case 'Linux':
if (file_exists('/proc/meminfo')) {
$data = file_get_contents("/proc/meminfo");
if ($data && preg_match("/MemTotal:\s+(\d+)\skB/", $data, $matches)) {
return $this->memorySize = intval(($matches[1] * 1024));
}
}
break;
case 'Windows':
$memory = shell_exec("wmic computersystem get totalphysicalmemory");
if (preg_match("/\d+/", $memory, $matches)) {
return $this->memorySize = intval($matches[0]);
}
break;
case 'Darwin':
$memory = shell_exec("sysctl -n hw.memsize");
if ($memory) {
return $this->memorySize = intval($memory);
}
break;
}

return $this->memorySize = -1;
}

/**
* @return int
*/
Expand Down
24 changes: 17 additions & 7 deletions src/Process/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,16 @@ public function create(Closure $closure): Task|false
}

if ($processID === 0) {
/**
* 通过 processedInMain 的方式将闭包运行于 mainSuspension 中,
* 实现在 EventDriver 交换之后运行闭包
*/
$this->processedInMain(function () use ($closure, $args) {
$this->forgetEvents();
$this->distributeForked();
call_user_func_array($closure, $args);
wait();
exit(0);
});
}

Expand All @@ -176,6 +183,8 @@ public function create(Closure $closure): Task|false
}

/**
* 通过 processedInMain 的方式将闭包运行于 mainSuspension 中,
* 实现在 EventDriver 交换之后运行闭包
* @param Closure $closure
*
* @return void
Expand All @@ -184,18 +193,18 @@ public function processedInMain(Closure $closure): void
{
$suspension = getSuspension();
if ($suspension instanceof Suspension) {
// 属于ripple协程时将向上抛出异常,该异常最终会在 Suspension::start 时被捕获
throw new EscapeException($closure);
} else {
// this is main
// 该闭包运行于 mainSuspension 中, 可以直接执行
if (!Fiber::getCurrent()) {
$closure();
wait();
exit(0);
return;
}

// in fiber
// 通过 wait 的方式将闭包运行于 mainSuspension 中
wait($closure);
exit(0);
return;
}
}

Expand All @@ -209,7 +218,6 @@ public function forgetEvents(): void
}
EventLoop::run();
EventLoop::setDriver((new EventLoop\DriverFactory())->create());
$this->distributeForked();
}

/**
Expand All @@ -221,12 +229,14 @@ public function distributeForked(): void
$this->unregisterSignalHandler();
}

// onFork可能在运行过程中被写入,因此不能使用while+array_shift方式重构
foreach ($this->onFork as $key => $closure) {
try {
unset($this->onFork[$key]);
$closure();
} catch (Throwable $exception) {
Output::error($exception->getMessage());
} finally {
unset($this->onFork[$key]);
}
}

Expand Down
5 changes: 0 additions & 5 deletions src/Process/Task.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,6 @@ public function __construct(
*/
public function run(...$argv): Runtime|false
{
// if (Kernel::getInstance()->supportParallel() && Parallel::hasInstance()) {
// // @bug: Unable to run child processes in an environment that has multithreading enabled
// // throw: new RuntimeException('Unable to run child processes in an environment with multithreading enabled');
// return false;
// }
return call_user_func($this->closure, ...$argv);
}
}
28 changes: 23 additions & 5 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
use Ripple\Utils\Output;
use Throwable;

use function function_exists;
use function getmypid;

/**
* This method is different from onReject, which allows accepting any type of rejected futures object.
* When await promise is rejected, an error will be thrown instead of returning the rejected value.
Expand Down Expand Up @@ -194,13 +197,14 @@ function forked(Closure $closure): string
return Kernel::getInstance()->forked($closure);
}

/**
* @param Closure|null $closure
*
* @return void
*/
function wait(Closure|null $closure = null): void
{
try {
Kernel::getInstance()->wait($closure);
} catch (Throwable $exception) {
throw new BadFunctionCallException($exception->getMessage(), $exception->getCode(), $exception);
}
Kernel::getInstance()->wait($closure);
}

/**
Expand Down Expand Up @@ -324,3 +328,17 @@ function __throw(EventLoop\Suspension $suspension, Throwable $exception): void
{
Coroutine::throw($suspension, $exception);
}


/**
*
*/
if (!function_exists('posix_getpid')) {
/**
* @return int
*/
function posix_getpid(): int
{
return getmypid();
}
}

0 comments on commit 45d6d8c

Please sign in to comment.