Skip to content

Commit

Permalink
Update: Refactor parallel && for Windows compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
cclilshy committed Feb 8, 2025
1 parent 6c975fd commit d0bb905
Show file tree
Hide file tree
Showing 21 changed files with 588 additions and 399 deletions.
34 changes: 34 additions & 0 deletions example/parallel.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php declare(strict_types=1);
/**
* Copyright © 2024 cclilshy
* Email: [email protected]
*
* This software is licensed under the MIT License.
* For full license details, please visit: https://opensource.org/licenses/MIT
*
* By using this software, you agree to the terms of the license.
* Contributions, suggestions, and feedback are always welcome!
*/

use Ripple\Parallel\Parallel;

use function Co\wait;

include 'vendor/autoload.php';

$parallel = Parallel::getInstance();
$function = function ($input) {
\sleep(1);
return $input;
};

$futures = [];
for ($i = 0; $i < 100; $i++) {
$futures[] = $future = $parallel->run($function, ['name']);
}

foreach ($futures as $future) {
echo \microtime(true) , ':', $future->value() , \PHP_EOL;
}

wait();
118 changes: 4 additions & 114 deletions src/Coroutine/WaitGroup.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,119 +12,9 @@

namespace Ripple\Coroutine;

use LogicException;
use Ripple\Utils\Output;
use RuntimeException;
use Throwable;

use function array_shift;
use function Co\cancel;
use function Co\delay;
use function Co\getSuspension;
use function spl_object_hash;

class WaitGroup
/**
* @deprecated use \Ripple\WaitGroup instead
*/
class WaitGroup extends \Ripple\WaitGroup
{
/*** @var bool */
protected bool $done = true;

/*** @var \Revolt\EventLoop\Suspension[] */
protected array $waiters = [];

/*** @var int */
protected int $count = 0;

/*** @param int $count */
public function __construct(int $count = 0)
{
$this->add($count);
}

/**
* @param int $delta
*
* @return void
*/
public function add(int $delta = 1): void
{
if ($delta > 0) {
$this->count += $delta;
$this->done = false;
} elseif ($delta < 0) {
throw new LogicException('delta must be greater than or equal to 0');
}

// For the case where $delta is 0, no operation is performed
}

/**
* @return void
*/
public function done(): void
{
if ($this->count <= 0) {
throw new LogicException('No tasks to mark as done');
}

$this->count--;
if ($this->count === 0) {
$this->done = true;
while ($suspension = array_shift($this->waiters)) {
try {
Coroutine::resume($suspension);
} catch (Throwable $exception) {
Output::warning($exception->getMessage());
continue;
}
}
}
}

/**
* @param int|float $timeout
*
* @return void
*/
public function wait(int|float $timeout = 0): void
{
if ($this->done) {
return;
}

$suspension = getSuspension();
$this->waiters[$hash = spl_object_hash($suspension)] = $suspension;

if ($timeout > 0) {
$timeoutOID = delay(function () use ($hash) {
$suspension = $this->waiters[$hash];
unset($this->waiters[$hash]);
Coroutine::throw($suspension, new RuntimeException('WaitGroup timeout'));
}, $timeout);
}

try {
Coroutine::suspend($suspension);
if (isset($timeoutOID)) {
cancel($timeoutOID);
}
} catch (Throwable $exception) {
throw new RuntimeException($exception->getMessage(), $exception->getCode());
}
}

/**
* @return int
*/
public function getCount(): int
{
return $this->count;
}

/**
* @return bool
*/
public function isDone(): bool
{
return $this->done;
}
}
10 changes: 10 additions & 0 deletions src/Kernel.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
use function intval;
use function preg_match;
use function shell_exec;
use function getmygid;
use function posix_getpid;

use const PHP_OS_FAMILY;

Expand Down Expand Up @@ -329,4 +331,12 @@ public function getMemorySize(): int

return $this->memorySize = -1;
}

/**
* @return int
*/
public function getProcessId(): int
{
return $this->supportProcessControl() ? posix_getpid() : getmygid();
}
}
145 changes: 143 additions & 2 deletions src/Parallel/Future.php
Original file line number Diff line number Diff line change
@@ -1,7 +1,148 @@
<?php
<?php declare(strict_types=1);
/**
* Copyright © 2024 cclilshy
* Email: [email protected]
*
* This software is licensed under the MIT License.
* For full license details, please visit: https://opensource.org/licenses/MIT
*
* By using this software, you agree to the terms of the license.
* Contributions, suggestions, and feedback are always welcome!
*/

namespace Ripple\Parallel;

class Future {
use parallel\Runtime;
use Ripple\WaitGroup;
use Throwable;

use function extension_loaded;

if (!extension_loaded('parallel')) {
return;
}

class Future
{
public const STATUS_PENDING = 0;
public const STATUS_FULFILLED = 1;
public const STATUS_REJECTED = 2;

/*** @var \Ripple\Coroutine\WaitGroup */
private WaitGroup $waitGroup;

/**
* @var int
*/
private int $status = Future::STATUS_PENDING;

/**
* @var mixed
*/
private mixed $result;

/**
* @param \parallel\Future $parallelFuture
* @param \parallel\Runtime $runtime
*/
public function __construct(private readonly \parallel\Future $parallelFuture, private readonly Runtime $runtime)
{
$this->waitGroup = new WaitGroup(1);
}

/**
* @param mixed $result
*
* @return void
*/
public function resolve(mixed $result): void
{
$this->status = Future::STATUS_FULFILLED;
$this->result = $result;
$this->waitGroup->done();
}

/**
* @param Throwable $exception
*
* @return void
*/
public function reject(Throwable $exception): void
{
$this->status = Future::STATUS_REJECTED;
$this->result = $exception;
$this->waitGroup->done();
}

/**
* @return int
*/
public function getStatus(): int
{
return $this->status;
}

/**
* @return \parallel\Future
*/
public function getParallelFuture(): \parallel\Future
{
return $this->parallelFuture;
}

/**
* @return \parallel\Runtime
*/
public function getRuntime(): Runtime
{
return $this->runtime;
}

/**
* @return bool
*/
public function done(): bool
{
$this->waitGroup->wait();
return $this->status === Future::STATUS_FULFILLED;
}

/**
* @return mixed
* @throws Throwable
*/
public function value(): mixed
{
if (!$this->done()) {
throw $this->result;
}
return $this->result;
}

/**
* @return bool
*/
public function cancel(): bool
{
$bool = $this->getParallelFuture()->cancel();
Parallel::getInstance()->poll();
return $bool;
}

/**
* @return bool
*/
public function canceled(): bool
{
return $this->getParallelFuture()->cancelled();
}

/**
* @return void
*/
public function kill(): void
{
$this->getRuntime()->kill();
Parallel::getInstance()->poll();
}
}
Loading

0 comments on commit d0bb905

Please sign in to comment.