From c45e3ddefda5e0a500cab71ff0c98310c541f08f Mon Sep 17 00:00:00 2001 From: cclilshy Date: Wed, 13 Nov 2024 13:28:48 +0800 Subject: [PATCH] Pre-release: updated copyrighted content --- composer.json | 4 +- example/file.php | 4 +- example/socket-client.php | 4 +- example/socket-server.php | 9 +- example/socket-tunnel.php | 6 +- src/{ => Channel}/Channel.php | 118 +++--- src/Channel/Exception/ChannelException.php | 36 +- src/Channel/README.md | 0 src/{ => Coroutine}/Coroutine.php | 153 +++---- src/Coroutine/Exception/EscapeException.php | 36 +- src/Coroutine/Exception/Exception.php | 36 +- .../Exception/PromiseAggregateError.php | 36 +- .../Exception/PromiseRejectException.php | 36 +- src/Coroutine/Futures.php | 117 ------ src/Coroutine/Suspension.php | 36 +- src/Coroutine/WaitGroup.php | 65 ++- src/File.php | 146 ------- src/File/Exception/FileException.php | 36 +- src/File/File.php | 122 ++++++ src/File/Lock.php | 125 ++++++ src/File/Lock/Lock.php | 147 ------- src/File/Monitor.php | 90 ++-- src/Futures.php | 94 +++++ src/Kernel.php | 66 +-- src/Parallel/Channel.php | 36 +- src/Parallel/Context.php | 39 +- src/Parallel/Future.php | 42 +- src/{ => Parallel}/Parallel.php | 42 +- src/Parallel/Thread.php | 37 +- src/Proc.php | 84 ---- src/Proc/Proc.php | 54 +++ src/Proc/ProcStream.php | 45 -- src/Proc/Session.php | 58 +-- src/Proc/Stream.php | 21 + src/Process/Exception/ProcessException.php | 36 +- src/{ => Process}/Process.php | 119 +++--- src/Process/Runtime.php | 37 +- src/Process/Task.php | 38 +- src/Promise.php | 279 ++++++------- src/Socket.php | 384 +++++++++++++++--- src/Socket/SocketStream.php | 374 ----------------- src/Socket/Tunnel/Tunnel.php | 112 ----- src/Stream.php | 283 ++++++------- .../Exception/ConnectionCloseException.php | 14 +- src/Stream/Exception/ConnectionException.php | 40 +- .../Exception/ConnectionTimeoutException.php | 14 +- src/Stream/Exception/Exception.php | 36 +- src/Stream/Exception/RuntimeException.php | 36 +- src/Stream/{StreamBase.php => Stream.php} | 76 ++-- src/Stream/StreamInterface.php | 36 +- src/Stream/Transaction.php | 102 ++--- src/Supports/Base.php | 36 +- src/Supports/IO.php | 61 --- src/Supports/System.php | 38 +- src/{Socket => }/Tunnel/Http.php | 42 +- src/{Socket => }/Tunnel/Socks5.php | 42 +- src/Tunnel/Tunnel.php | 89 ++++ src/Utils/Format.php | 10 + src/Utils/Output.php | 36 +- src/Utils/Path.php | 30 ++ src/Utils/Serialization/Zx7e.php | 36 +- src/Utils/Utils.php | 20 - src/Worker/Command.php | 36 +- src/Worker/Manager.php | 37 +- src/{ => Worker}/Worker.php | 61 +-- src/functions.php | 121 +++--- tests/CoroutineException.php | 2 +- tests/FileStreamTest.php | 4 +- tests/PromiseTest.php | 2 +- tests/SocketTest.php | 9 +- tests/UnixTest.php | 13 +- 71 files changed, 1807 insertions(+), 2914 deletions(-) rename src/{ => Channel}/Channel.php (67%) delete mode 100644 src/Channel/README.md rename src/{ => Coroutine}/Coroutine.php (79%) delete mode 100644 src/Coroutine/Futures.php delete mode 100644 src/File.php create mode 100644 src/File/File.php create mode 100644 src/File/Lock.php delete mode 100644 src/File/Lock/Lock.php create mode 100644 src/Futures.php rename src/{ => Parallel}/Parallel.php (80%) delete mode 100644 src/Proc.php create mode 100644 src/Proc/Proc.php delete mode 100644 src/Proc/ProcStream.php create mode 100644 src/Proc/Stream.php rename src/{ => Process}/Process.php (78%) delete mode 100644 src/Socket/SocketStream.php delete mode 100644 src/Socket/Tunnel/Tunnel.php rename src/Stream/{StreamBase.php => Stream.php} (61%) delete mode 100644 src/Supports/IO.php rename src/{Socket => }/Tunnel/Http.php (60%) rename src/{Socket => }/Tunnel/Socks5.php (73%) create mode 100644 src/Tunnel/Tunnel.php create mode 100644 src/Utils/Path.php delete mode 100644 src/Utils/Utils.php rename src/{ => Worker}/Worker.php (75%) diff --git a/composer.json b/composer.json index ff4e25a..e9ac371 100644 --- a/composer.json +++ b/composer.json @@ -25,8 +25,8 @@ "php": ">=8.1", "ext-sockets": "*", "ext-openssl": "*", - "revolt/event-loop": "^1.0", - "psr/http-message": "*" + "psr/http-message": "*", + "revolt/event-loop": "^1.0" }, "require-dev": { "ext-posix": "*", diff --git a/example/file.php b/example/file.php index d001a8d..de1e3b7 100644 --- a/example/file.php +++ b/example/file.php @@ -1,12 +1,12 @@ getContents(__FILE__), \PHP_EOL; + echo File::getContents(__FILE__), \PHP_EOL; } catch (FileException $e) { echo $e->getMessage(), \PHP_EOL; exit(1); diff --git a/example/socket-client.php b/example/socket-client.php index 675022a..0c262fd 100644 --- a/example/socket-client.php +++ b/example/socket-client.php @@ -2,14 +2,14 @@ include __DIR__ . '/../vendor/autoload.php'; -use Co\IO; +use Ripple\Socket; use Ripple\Stream\Exception\ConnectionException; use Ripple\Utils\Output; use function Co\wait; try { - $connection = IO::Socket()->connect('tcp://127.0.0.1:1080'); + $connection = Socket::connect('tcp://127.0.0.1:1080'); # Enable SSL // $connection->enableSSL(); diff --git a/example/socket-server.php b/example/socket-server.php index 814a107..0b4bb08 100644 --- a/example/socket-server.php +++ b/example/socket-server.php @@ -2,16 +2,15 @@ include __DIR__ . '/../vendor/autoload.php'; -use Co\IO; -use Ripple\Socket\SocketStream; +use Ripple\Socket; use function Co\wait; -$onMessage = static function (string $data, SocketStream $stream) { +$onMessage = static function (string $data, Socket $stream) { $stream->write("Received: $data"); }; -$listenClient = static function (SocketStream $stream) use ($onMessage) { +$listenClient = static function (Socket $stream) use ($onMessage) { $stream->setBlocking(false); $stream->onReadable(static function () use ($stream, $onMessage) { $data = $stream->read(1024); @@ -23,7 +22,7 @@ }); }; -$server = IO::Socket()->server('tcp://127.0.0.1:9080'); +$server = Socket::server('tcp://127.0.0.1:9080'); $server->setBlocking(false); $server->setOption(\SOL_SOCKET, \SO_KEEPALIVE, 1); $server->onReadable(fn () => $listenClient($server->accept())); diff --git a/example/socket-tunnel.php b/example/socket-tunnel.php index 69ae1fe..a437bd4 100644 --- a/example/socket-tunnel.php +++ b/example/socket-tunnel.php @@ -2,8 +2,8 @@ include __DIR__ . '/../vendor/autoload.php'; -use Ripple\Socket\Tunnel\Socks5; use Ripple\Stream\Exception\ConnectionException; +use Ripple\Tunnel\Socks5; use Ripple\Utils\Output; use function Co\wait; @@ -15,7 +15,7 @@ 'port' => 443 ]); - $googleStream = $googleSocks5->getSocketStream(); + $googleStream = $googleSocks5->getSocket(); $googleStream->enableSSL(); $googleStream->write("GET / HTTP/1.1\r\nHost: www.google.com\r\nConnection: close\r\n\r\n"); $googleStream->onReadable(function () use ($googleStream) { @@ -42,7 +42,7 @@ ] ); - $connection = $google->getSocketStream(); + $connection = $google->getSocket(); $connection->enableSSL(); $connection->write("GET / HTTP/1.1\r\nHost: www.google.com\r\nConnection: close\r\n\r\n"); $connection->onReadable(function () use ($connection) { diff --git a/src/Channel.php b/src/Channel/Channel.php similarity index 67% rename from src/Channel.php rename to src/Channel/Channel.php index c767344..9e5816c 100644 --- a/src/Channel.php +++ b/src/Channel/Channel.php @@ -1,44 +1,24 @@ path = Utils::tempPath($this->name, 'channel'); + public function __construct(protected readonly string $name, protected bool $owner = false) + { + $this->path = Path::temp($this->name, 'channel'); $this->readLock = \Co\lock("{$this->name}.read"); $this->writeLock = \Co\lock("{$this->name}.write"); @@ -120,6 +98,35 @@ public function __construct( }); } + /** + * @return void + */ + protected function openStream(): void + { + $this->stream = new Stream(fopen($this->path, 'r+')); + $this->stream->setBlocking(false); + $this->zx7e = new Zx7e(); + } + + /*** @return void */ + public function close(): void + { + if ($this->closed) { + return; + } + + $this->stream->close(); + $this->readLock->close(); + $this->writeLock->close(); + + if ($this->owner) { + file_exists($this->path) && unlink($this->path); + } + + $this->closed = true; + cancelForked($this->forkHandlerID); + } + /** * @param string $name * @@ -153,16 +160,6 @@ public function send(mixed $data): bool return true; } - /** - * @return void - */ - protected function openStream(): void - { - $this->stream = new Stream(fopen($this->path, 'r+')); - $this->stream->setBlocking(false); - $this->zx7e = new Zx7e(); - } - /** * @param bool $blocking * @@ -177,8 +174,8 @@ public function receive(bool $blocking = true): mixed while (1) { try { $blocking && $this->stream->waitForReadable(); - } catch (Throwable $e) { - throw new ChannelException($e->getMessage()); + } catch (Throwable $exception) { + throw new ChannelException($exception->getMessage()); } if ($this->readLock->lock(blocking: false)) { @@ -209,8 +206,8 @@ public function receive(bool $blocking = true): mixed } return unserialize($data); - } catch (Exception $e) { - throw new ChannelException($e->getMessage()); + } catch (Exception $exception) { + throw new ChannelException($exception->getMessage()); } finally { $this->readLock->unlock(); } @@ -228,25 +225,6 @@ public function getPath(): string return $this->path; } - /*** @return void */ - public function close(): void - { - if ($this->closed) { - return; - } - - $this->stream->close(); - $this->readLock->close(); - $this->writeLock->close(); - - if ($this->owner) { - file_exists($this->path) && unlink($this->path); - } - - $this->closed = true; - cancelForked($this->forkHandlerID); - } - public function __destruct() { $this->close(); diff --git a/src/Channel/Exception/ChannelException.php b/src/Channel/Exception/ChannelException.php index 88eea30..92b040b 100644 --- a/src/Channel/Exception/ChannelException.php +++ b/src/Channel/Exception/ChannelException.php @@ -1,35 +1,13 @@ resume($result); + } catch (EscapeException $exception) { + Coroutine::getInstance()->handleEscapeException($exception); + } catch (Throwable $exception) { + Output::warning($exception->getMessage()); + } + + return null; + } + /** * @param EscapeException $exception * @@ -189,6 +195,35 @@ public function handleEscapeException(EscapeException $exception): void Process::getInstance()->processedInMain($exception->lastWords); } + /** + * @param \Revolt\EventLoop\Suspension $suspension + * @param Throwable $exception + * + * @return void + */ + public static function throw(EventLoop\Suspension $suspension, Throwable $exception): void + { + try { + $suspension->throw($exception); + } catch (Throwable $exception) { + } + } + + /** + * @param \Revolt\EventLoop\Suspension $suspension + * + * @return mixed + * @throws Throwable + */ + public static function suspend(EventLoop\Suspension $suspension): mixed + { + try { + return $suspension->suspend(); + } catch (EscapeException $exception) { + Coroutine::getInstance()->handleEscapeException($exception); + } + } + /** * @param Closure $closure * @@ -230,60 +265,4 @@ public function sleep(int|float $second): int|float delay(static fn () => Coroutine::resume($suspension, $second), $second); return Coroutine::suspend($suspension); } - - /** - * @param \Revolt\EventLoop\Suspension $suspension - * - * @return mixed - * @throws Throwable - */ - public static function suspend(EventLoop\Suspension $suspension): mixed - { - try { - return $suspension->suspend(); - } catch (EscapeException $exception) { - Coroutine::getInstance()->handleEscapeException($exception); - } - } - - /** - * - * The coroutine that cannot be restored can only throw an exception. - * If it is a ripple type exception, it will be caught and the contract will be rejected. - * - * This method attempts to resume a suspended coroutine and take over the coroutine context. - * When the recovery fails or an exception occurs within the coroutine, an exception will be thrown. - * This method will not return any value yet - * - * @param \Revolt\EventLoop\Suspension $suspension - * @param mixed|null $result - * - * @return mixed - */ - public static function resume(EventLoop\Suspension $suspension, mixed $result = null): mixed - { - try { - $suspension->resume($result); - } catch (EscapeException $exception) { - Coroutine::getInstance()->handleEscapeException($exception); - } catch (Throwable $exception) { - Output::warning($exception->getMessage()); - } - - return null; - } - - /** - * @param \Revolt\EventLoop\Suspension $suspension - * @param Throwable $exception - * - * @return void - */ - public static function throw(EventLoop\Suspension $suspension, Throwable $exception): void - { - try { - $suspension->throw($exception); - } catch (Throwable $exception) { - } - } } diff --git a/src/Coroutine/Exception/EscapeException.php b/src/Coroutine/Exception/EscapeException.php index 90c4307..91989e1 100644 --- a/src/Coroutine/Exception/EscapeException.php +++ b/src/Coroutine/Exception/EscapeException.php @@ -1,35 +1,13 @@ finally(function ($result) { - $this->results[] = $result; - while ($waiter = array_shift($this->waiters)) { - Coroutine::resume($waiter, $result); - } - }); - } - } - - /** - * @return mixed - * @throws Throwable - */ - public function current(): mixed - { - if (isset($this->results[$this->index])) { - return $this->results[$this->index]; - } else { - $this->waiters[] = $suspension = getSuspension(); - return Coroutine::suspend($suspension); - } - } - - /** - * @return void - */ - public function next(): void - { - $this->index++; - } - - /** - * @return int - */ - public function key(): int - { - return $this->index; - } - - /** - * @return bool - */ - public function valid(): bool - { - return isset($this->promises[$this->index]); - } - - /** - * @return void - */ - public function rewind(): void - { - $this->index = 0; - } -} diff --git a/src/Coroutine/Suspension.php b/src/Coroutine/Suspension.php index 492f0f1..5a0ee72 100644 --- a/src/Coroutine/Suspension.php +++ b/src/Coroutine/Suspension.php @@ -1,35 +1,13 @@ waiters)) { try { Coroutine::resume($suspension); - } catch (Throwable $e) { - Output::warning($e->getMessage()); + } catch (Throwable $exception) { + Output::warning($exception->getMessage()); continue; } } @@ -111,11 +91,24 @@ public function wait(int|float $timeout = 0): void return; } - $this->waiters[] = $suspension = getSuspension(); + $suspension = getSuspension(); + $this->waiters[$hash = spl_object_hash($suspension)] = $suspension; + + if ($timeout > 0) { + $timeoutOID = delay(function () use ($hash) { + $suspension = $this->waiters[$hash]; + unset($this->waiters[$hash]); + Coroutine::throw($suspension, new RuntimeException('WaitGroup timeout')); + }, $timeout); + } + try { Coroutine::suspend($suspension); - } catch (Throwable $e) { - throw new BadMethodCallException($e->getMessage(), $e->getCode(), $e); + if (isset($timeoutOID)) { + cancel($timeoutOID); + } + } catch (Throwable $exception) { + throw new RuntimeException($exception->getMessage(), $exception->getCode()); } } diff --git a/src/File.php b/src/File.php deleted file mode 100644 index 99ef762..0000000 --- a/src/File.php +++ /dev/null @@ -1,146 +0,0 @@ -registerOnFork(); - } - - /** - * @return void - */ - private function registerOnFork(): void - { - forked(function () { - while ($monitor = array_shift($this->monitors)) { - $monitor->stop(); - } - $this->registerOnFork(); - }); - } - - /** - * @param string $path - * - * @return string - * @throws FileException - */ - public function getContents(string $path): string - { - try { - return promise(static function (Closure $resolve, Closure $reject) use ($path) { - if (!$resource = fopen($path, 'r')) { - $reject(new FileException('Failed to open file: ' . $path)); - return; - } - - $stream = new Stream($resource); - $stream->setBlocking(false); - $content = ''; - - $stream->onReadable(static function (Stream $stream) use ($resolve, $reject, &$content) { - $fragment = ''; - while ($buffer = $stream->read(8192)) { - $fragment .= $buffer; - } - - if ($fragment === '') { - if ($stream->eof()) { - $stream->close(); - $resolve($content); - } - return; - } - - $content .= $fragment; - - if ($stream->eof()) { - $stream->close(); - $resolve($content); - } - }); - })->await(); - } catch (Throwable $e) { - throw new FileException($e->getMessage()); - } - } - - /** - * @param string $path - * @param string $mode - * - * @return Stream - */ - public function open(string $path, string $mode): Stream - { - return new Stream(fopen($path, $mode)); - } - - /** - * @return Monitor - */ - public function watch(): Monitor - { - $this->monitors[] = $monitor = new Monitor(); - return $monitor; - } -} diff --git a/src/File/Exception/FileException.php b/src/File/Exception/FileException.php index dbc50b4..94420a0 100644 --- a/src/File/Exception/FileException.php +++ b/src/File/Exception/FileException.php @@ -1,35 +1,13 @@ registerOnFork(); + } + + /** + * @return void + */ + private function registerOnFork(): void + { + forked(function () { + while ($monitor = array_shift($this->monitors)) { + $monitor->stop(); + } + $this->registerOnFork(); + }); + } + + /** + * @param string $path + * + * @return string + * @throws FileException + */ + public static function getContents(string $path): string + { + try { + return promise(static function (Closure $resolve, Closure $reject) use ($path) { + if (!$resource = fopen($path, 'r')) { + $reject(new FileException('Failed to open file: ' . $path)); + return; + } + + $stream = new Stream($resource); + $stream->setBlocking(false); + $content = ''; + + $stream->onReadable(static function (Stream $stream) use ($resolve, $reject, &$content) { + $fragment = ''; + while ($buffer = $stream->read(8192)) { + $fragment .= $buffer; + } + + if ($fragment === '') { + if ($stream->eof()) { + $stream->close(); + $resolve($content); + } + return; + } + + $content .= $fragment; + + if ($stream->eof()) { + $stream->close(); + $resolve($content); + } + }); + })->await(); + } catch (Throwable $exception) { + throw new FileException($exception->getMessage()); + } + } + + /** + * @param string $path + * @param string $mode + * + * @return Stream + */ + public static function open(string $path, string $mode): Stream + { + return new Stream(fopen($path, $mode)); + } + + /** + * @return \Ripple\File\Monitor + */ + public function monitor(): Monitor + { + $monitor = new Monitor(); + $this->monitors[] = $monitor; + return $monitor; + } +} diff --git a/src/File/Lock.php b/src/File/Lock.php new file mode 100644 index 0000000..cc713d3 --- /dev/null +++ b/src/File/Lock.php @@ -0,0 +1,125 @@ +path = Path::temp($this->name, 'lock'); + + if (!file_exists($this->path)) { + touch($this->path); + } + + $this->resource = fopen($this->path, 'r'); + + $this->forkHandlerEventID = forked(function () { + fclose($this->resource); + $this->resource = fopen($this->path, 'r'); + }); + } + + public function __destruct() + { + $this->close(); + } + + /** + * @return void + */ + public function close(): void + { + if ($this->closed) { + return; + } + + $this->closed = true; + + if (is_resource($this->resource)) { + fclose($this->resource); + } + + cancelForked($this->forkHandlerEventID); + } + + /** + * @param int $flag + * @param bool $blocking + * + * @return bool + */ + public function lock(int $flag = LOCK_EX, bool $blocking = true): bool + { + return flock($this->resource, $blocking ? $flag : $flag | LOCK_NB); + } + + /** + * @param bool $blocking + * + * @return bool + */ + public function exclusion(bool $blocking = true): bool + { + return flock($this->resource, $blocking ? LOCK_EX : LOCK_EX | LOCK_NB); + } + + /** + * @param bool $blocking + * + * @return bool + */ + public function shareable(bool $blocking = true): bool + { + return flock($this->resource, $blocking ? LOCK_SH : LOCK_SH | LOCK_NB); + } + + /** + * @return bool + */ + public function unlock(): bool + { + return flock($this->resource, LOCK_UN); + } +} diff --git a/src/File/Lock/Lock.php b/src/File/Lock/Lock.php deleted file mode 100644 index ef7bb19..0000000 --- a/src/File/Lock/Lock.php +++ /dev/null @@ -1,147 +0,0 @@ -path = Utils::tempPath($this->name, 'lock'); - - if (!file_exists($this->path)) { - touch($this->path); - } - - $this->resource = fopen($this->path, 'r'); - - $this->forkHandlerEventID = forked(function () { - fclose($this->resource); - $this->resource = fopen($this->path, 'r'); - }); - } - - public function __destruct() - { - $this->close(); - } - - /** - * @return void - */ - public function close(): void - { - if ($this->closed) { - return; - } - - $this->closed = true; - - if (is_resource($this->resource)) { - fclose($this->resource); - } - - cancelForked($this->forkHandlerEventID); - } - - /** - * @param int $flag - * @param bool $blocking - * - * @return bool - */ - public function lock(int $flag = LOCK_EX, bool $blocking = true): bool - { - return flock($this->resource, $blocking ? $flag : $flag | LOCK_NB); - } - - /** - * @param bool $blocking - * - * @return bool - */ - public function exclusiveLock(bool $blocking = true): bool - { - return flock($this->resource, $blocking ? LOCK_EX : LOCK_EX | LOCK_NB); - } - - /** - * @param bool $blocking - * - * @return bool - */ - public function sharedLock(bool $blocking = true): bool - { - return flock($this->resource, $blocking ? LOCK_SH : LOCK_SH | LOCK_NB); - } - - /** - * @return bool - */ - public function unlock(): bool - { - return flock($this->resource, LOCK_UN); - } -} diff --git a/src/File/Monitor.php b/src/File/Monitor.php index 344157b..bc971e1 100644 --- a/src/File/Monitor.php +++ b/src/File/Monitor.php @@ -1,35 +1,13 @@ timer1)) { + cancel($this->timer1); + } + + if (isset($this->timer2)) { + cancel($this->timer2); + } + } + + /** + * @Description Please use `start` method + * @Author cclilshy + * @Date 2024/10/7 17:57 + * @return void + */ + public function run(): void + { + $this->start(); + } + /** * @Author cclilshy * @Date 2024/8/26 21:51 @@ -235,31 +240,4 @@ private function inspector(): void } } } - - /** - * @Author cclilshy - * @Date 2024/8/26 21:53 - * @return void - */ - public function stop(): void - { - if (isset($this->timer1)) { - cancel($this->timer1); - } - - if (isset($this->timer2)) { - cancel($this->timer2); - } - } - - /** - * @Description Please use `start` method - * @Author cclilshy - * @Date 2024/10/7 17:57 - * @return void - */ - public function run(): void - { - $this->start(); - } } diff --git a/src/Futures.php b/src/Futures.php new file mode 100644 index 0000000..9d5e728 --- /dev/null +++ b/src/Futures.php @@ -0,0 +1,94 @@ +finally(function ($result) { + $this->results[] = $result; + while ($waiter = array_shift($this->waiters)) { + Coroutine::resume($waiter, $result); + } + }); + } + } + + /** + * @return mixed + * @throws Throwable + */ + public function current(): mixed + { + if (isset($this->results[$this->index])) { + return $this->results[$this->index]; + } else { + $this->waiters[] = $suspension = getSuspension(); + return Coroutine::suspend($suspension); + } + } + + /** + * @return void + */ + public function next(): void + { + $this->index++; + } + + /** + * @return int + */ + public function key(): int + { + return $this->index; + } + + /** + * @return bool + */ + public function valid(): bool + { + return isset($this->promises[$this->index]); + } + + /** + * @return void + */ + public function rewind(): void + { + $this->index = 0; + } +} diff --git a/src/Kernel.php b/src/Kernel.php index 2eadbce..b39b1ca 100644 --- a/src/Kernel.php +++ b/src/Kernel.php @@ -1,35 +1,13 @@ processControl = extension_loaded('pcntl') && extension_loaded('posix'); } - /** - * @return Kernel - */ - public static function getInstance(): Kernel - { - if (!isset(Kernel::$instance)) { - Kernel::$instance = new self(); - } - return Kernel::$instance; - } - /** * This method is different from onReject, which allows accepting any type of rejected futures object. * When await promise is rejected, an error will be thrown instead of returning the rejected value. @@ -124,7 +91,18 @@ public static function getInstance(): Kernel */ public function await(Promise $promise): mixed { - return \Ripple\Coroutine::getInstance()->await($promise); + return Coroutine\Coroutine::getInstance()->await($promise); + } + + /** + * @return Kernel + */ + public static function getInstance(): Kernel + { + if (!isset(Kernel::$instance)) { + Kernel::$instance = new self(); + } + return Kernel::$instance; } /** @@ -137,7 +115,7 @@ public function await(Promise $promise): mixed */ public function async(Closure $closure): Promise { - return \Ripple\Coroutine::getInstance()->async($closure); + return Coroutine\Coroutine::getInstance()->async($closure); } /** @@ -247,7 +225,7 @@ public function wait(Closure|null $result = null): void if (!$this->mainRunning) { try { - \Ripple\Coroutine::resume($this->mainSuspension, $result); + Coroutine\Coroutine::resume($this->mainSuspension, $result); if (Fiber::getCurrent()) { Fiber::suspend(); } @@ -262,7 +240,7 @@ public function wait(Closure|null $result = null): void try { $this->mainRunning = false; - $result = \Ripple\Coroutine::suspend($this->mainSuspension); + $result = Coroutine\Coroutine::suspend($this->mainSuspension); $this->mainRunning = true; if ($result instanceof Closure) { diff --git a/src/Parallel/Channel.php b/src/Parallel/Channel.php index 60ab345..24a2024 100644 --- a/src/Parallel/Channel.php +++ b/src/Parallel/Channel.php @@ -1,35 +1,13 @@ ['pipe', 'r'], - 1 => ['pipe', 'w'], - 2 => ['pipe', 'w'], - ), $pipes); - - if (is_resource($process)) { - return new Session( - $process, - $pipes[0], - $pipes[1], - $pipes[2], - ); - } - - return false; - } -} diff --git a/src/Proc/Proc.php b/src/Proc/Proc.php new file mode 100644 index 0000000..4659155 --- /dev/null +++ b/src/Proc/Proc.php @@ -0,0 +1,54 @@ + ['pipe', 'r'], + 1 => ['pipe', 'w'], + 2 => ['pipe', 'w'], + ), $pipes); + + if (is_resource($process)) { + return new Session( + $process, + $pipes[0], + $pipes[1], + $pipes[2], + ); + } + + return false; + } +} diff --git a/src/Proc/ProcStream.php b/src/Proc/ProcStream.php deleted file mode 100644 index 77d87d1..0000000 --- a/src/Proc/ProcStream.php +++ /dev/null @@ -1,45 +0,0 @@ -status = proc_get_status($this->proc); - $this->streamStdInput = new ProcStream($streamStdInput); - $this->streamStdOutput = new ProcStream($streamStdOutput); - $this->streamStdError = new ProcStream($streamStdError); + $this->streamStdInput = new Stream($streamStdInput); + $this->streamStdOutput = new Stream($streamStdOutput); + $this->streamStdError = new Stream($streamStdError); $this->streamStdInput->setBlocking(false); $this->streamStdOutput->setBlocking(false); $this->streamStdError->setBlocking(false); @@ -187,8 +165,8 @@ public function write(string $content): bool try { $this->streamStdInput->write($content); return true; - } catch (ConnectionException $e) { - Output::error($e->getMessage()); + } catch (ConnectionException $exception) { + Output::error($exception->getMessage()); $this->close(); return false; } diff --git a/src/Proc/Stream.php b/src/Proc/Stream.php new file mode 100644 index 0000000..abc6624 --- /dev/null +++ b/src/Proc/Stream.php @@ -0,0 +1,21 @@ +create()); + $this->distributeForked(); + } + /** * @return void */ @@ -219,8 +233,8 @@ public function distributeForked(): void try { unset($this->onFork[$key]); $closure(); - } catch (Throwable $e) { - Output::error($e->getMessage()); + } catch (Throwable $exception) { + Output::error($exception->getMessage()); } } @@ -314,41 +328,4 @@ public function getRootProcessID(): int { return $this->rootProcessID; } - - /** - * @return void - */ - public function forgetEvents(): void - { - foreach (EventLoop::getIDentifiers() as $identifier) { - @cancel($identifier); - } - EventLoop::run(); - EventLoop::setDriver((new EventLoop\DriverFactory())->create()); - $this->distributeForked(); - } - - /** - * @param Closure $closure - * - * @return void - */ - public function processedInMain(Closure $closure): void - { - $suspension = getSuspension(); - if ($suspension instanceof Suspension) { - throw new EscapeException($closure); - } else { - // this is main - if (!Fiber::getCurrent()) { - $closure(); - wait(); - exit(0); - } - - // in fiber - wait($closure); - exit(0); - } - } } diff --git a/src/Process/Runtime.php b/src/Process/Runtime.php index cb4f845..b057d44 100644 --- a/src/Process/Runtime.php +++ b/src/Process/Runtime.php @@ -1,35 +1,13 @@ processID, SIGKILL); } diff --git a/src/Process/Task.php b/src/Process/Task.php index de330c5..879d29b 100644 --- a/src/Process/Task.php +++ b/src/Process/Task.php @@ -1,42 +1,20 @@ reject($exception); - } catch (Throwable $e) { - Output::warning($e->getMessage()); + } catch (Throwable $exception) { + Output::warning($exception->getMessage()); } } } @@ -124,8 +100,8 @@ public function resolve(mixed $value): void if ($value instanceof Promise) { try { $this->resolve(await($value)); - } catch (Throwable $e) { - $this->reject($e); + } catch (Throwable $exception) { + $this->reject($exception); } return; } @@ -171,6 +147,33 @@ public function reject(mixed $reason): void } } + /** + * This method returns a Promise object, which will only be triggered successfully when + * all promise objects in the iterable parameter object are successful. + * + * @param Promise[] $promises + * + * @return \Ripple\Promise + */ + public static function all(array $promises): Promise + { + return new Promise(static function (Closure $resolve, Closure $reject) use ($promises) { + Promise::allSettled($promises) + ->then(static function (array $results) use ($resolve, $reject) { + $values = []; + foreach ($results as $result) { + if ($result->getStatus() === Promise::FULFILLED) { + $values[] = $result->getResult(); + } else { + $reject($result->getResult()); + return; + } + } + $resolve($values); + }); + }); + } + /** * Define subsequent behavior. When the Promise state changes, it will be called in the order of the then method. * If the Promise has been completed, execute it immediately @@ -225,65 +228,29 @@ public function except(Closure $onRejected): Promise } /** - * Define subsequent behavior. When the Promise state changes, it will be called in the order of the then method. - * - * @param Closure $onFinally - * - * @return $this - */ - public function finally(Closure $onFinally): Promise - { - if ($this->status !== Promise::PENDING) { - try { - call_user_func($onFinally, $this->result); - } catch (Throwable $exception) { - Output::warning($exception->getMessage()); - } - return $this; - } else { - $this->onFulfilled[] = $onFinally; - $this->onRejected[] = $onFinally; - } - - return $this; - } - - /** - * Define the behavior after rejection. When the Promise state changes, it will be called in the order of the catch method. - * If the Promise has been rejected, execute it immediately + * This method is used to wait for all Promises to be settled, whether fulfilled or rejected. + * The final callback function will receive an array containing all promises * - * @param Closure $onRejected + * @param Promise[] $promises * - * @return $this - * @deprecated You should use the except method because this method is a reserved keyword + * @return \Ripple\Promise */ - public function catch(Closure $onRejected): Promise + public static function allSettled(array $promises): Promise { - return $this->except($onRejected); - } + return async(static function (Closure $resolve) use ($promises) { + $waitGroup = new WaitGroup(count($promises)); - /** - * @return string - */ - public function getStatus(): string - { - return $this->status; - } + foreach ($promises as $promise) { + $promise->then( + static fn () => $waitGroup->done(), + static fn () => $waitGroup->done() + ); + } - /** - * @return mixed - */ - public function getResult(): mixed - { - return $this->result; - } + $waitGroup->wait(); - /** - * @return string - */ - public function getState(): string - { - return $this->status; + $resolve($promises); + }); } /** @@ -319,77 +286,19 @@ public function await(): mixed } /** - * @return void - * @throws Exception - */ - public function cancel(): void - { - // TODO: Implement cancel() method. - throw new Exception('Method not implemented'); - } - - /** - * @param Closure $onRejected - * - * @return Promise - */ - public function otherwise(Closure $onRejected): Promise - { - return $this->except($onRejected); - } - - - /** - * This method returns a Promise object, which will only be triggered successfully when - * all promise objects in the iterable parameter object are successful. - * - * @param Promise[] $promises - * - * @return \Ripple\Promise + * @return string */ - public static function all(array $promises): Promise + public function getStatus(): string { - return new Promise(static function (Closure $resolve, Closure $reject) use ($promises) { - Promise::allSettled($promises) - ->then(static function (array $results) use ($resolve, $reject) { - $values = []; - foreach ($results as $result) { - if ($result->getStatus() === Promise::FULFILLED) { - $values[] = $result->getResult(); - } else { - $reject($result->getResult()); - return; - } - } - $resolve($values); - }); - }); + return $this->status; } /** - * This method is used to wait for all Promises to be settled, whether fulfilled or rejected. - * The final callback function will receive an array containing all promises - * - * @param Promise[] $promises - * - * @return \Ripple\Promise + * @return mixed */ - public static function allSettled(array $promises): Promise + public function getResult(): mixed { - return async(static function (Closure $resolve) use ($promises) { - $waitGroup = new WaitGroup(count($promises)); - - foreach ($promises as $promise) { - $promise->then( - static fn () => $waitGroup->done(), - static fn () => $waitGroup->done() - ); - } - - $waitGroup->wait(); - - $resolve($promises); - }); + return $this->result; } /** @@ -411,6 +320,30 @@ public static function race(array $promises): Promise }); } + /** + * Define subsequent behavior. When the Promise state changes, it will be called in the order of the then method. + * + * @param Closure $onFinally + * + * @return $this + */ + public function finally(Closure $onFinally): Promise + { + if ($this->status !== Promise::PENDING) { + try { + call_user_func($onFinally, $this->result); + } catch (Throwable $exception) { + Output::warning($exception->getMessage()); + } + return $this; + } else { + $this->onFulfilled[] = $onFinally; + $this->onRejected[] = $onFinally; + } + + return $this; + } + /** * When any of the incoming Promise succeeds, this new Promise will succeed. * If all Promises fail, the new Promise will fail with an AggregateError. @@ -435,10 +368,52 @@ public static function any(array $promises): Promise /** * @param array $promises * - * @return \Ripple\Coroutine\Futures + * @return \Ripple\Futures */ public static function futures(array $promises): Futures { return new Futures($promises); } + + /** + * Define the behavior after rejection. When the Promise state changes, it will be called in the order of the catch method. + * If the Promise has been rejected, execute it immediately + * + * @param Closure $onRejected + * + * @return $this + * @deprecated You should use the except method because this method is a reserved keyword + */ + public function catch(Closure $onRejected): Promise + { + return $this->except($onRejected); + } + + /** + * @return string + */ + public function getState(): string + { + return $this->status; + } + + /** + * @return void + * @throws Exception + */ + public function cancel(): void + { + // TODO: Implement cancel() method. + throw new Exception('Method not implemented'); + } + + /** + * @param Closure $onRejected + * + * @return Promise + */ + public function otherwise(Closure $onRejected): Promise + { + return $this->except($onRejected); + } } diff --git a/src/Socket.php b/src/Socket.php index a4c2fa1..0bff48a 100644 --- a/src/Socket.php +++ b/src/Socket.php @@ -1,56 +1,52 @@ stream)) { + throw new RuntimeException('Failed to import stream'); + } + + $this->socket = $socket; + + if (!$peerName) { + $peerName = stream_socket_get_name($this->stream, true); + } + + if ($peerName) { + $this->address = $peerName; + } else { + $this->address = null; + } + + if ($this->address) { + $exploded = explode(':', $this->address); + $this->host = $exploded[0]; + $this->port = intval($exploded[1] ?? 0); + } + } /** * @param string $address * @param int $timeout * @param mixed|null $context * - * @return SocketStream + * @return Socket * @throws \Ripple\Stream\Exception\ConnectionException */ - public function connectWithSSL(string $address, int $timeout = 0, mixed $context = null): SocketStream + public static function connectWithSSL(string $address, int $timeout = 0, mixed $context = null): Socket { $address = str_replace('ssl://', 'tcp://', $address); - $streamSocket = $this->connect($address, $timeout, $context); - $this->enableSSL($streamSocket, $timeout); - return $streamSocket; + $Socket = Socket::connect($address, $timeout, $context); + $Socket->enableSSL(); + return $Socket; } /** @@ -88,10 +135,10 @@ public function connectWithSSL(string $address, int $timeout = 0, mixed $context * @param int $timeout * @param mixed|null $context * - * @return SocketStream + * @return Socket * @throws ConnectionException */ - public function connect(string $address, int $timeout = 0, mixed $context = null): SocketStream + public static function connect(string $address, int $timeout = 0, mixed $context = null): Socket { try { return promise(static function (Closure $resolve, Closure $reject) use ($address, $timeout, $context) { @@ -109,7 +156,7 @@ public function connect(string $address, int $timeout = 0, mixed $context = null return; } - $stream = new SocketStream($connection, $address); + $stream = new static($connection, $address); if ($timeout > 0) { $timeoutEventID = delay(static function () use ($stream, $reject) { @@ -121,28 +168,28 @@ public function connect(string $address, int $timeout = 0, mixed $context = null $timeoutEventCancel = fn () => null; } - $stream->onWriteable(static function (SocketStream $stream, Closure $cancel) use ($resolve, $timeoutEventCancel) { + $stream->onWriteable(static function (Socket $stream, Closure $cancel) use ($resolve, $timeoutEventCancel) { $cancel(); $resolve($stream); $timeoutEventCancel(); }); })->await(); - } catch (Throwable $e) { - throw new ConnectionException('Failed to connect to the server.', ConnectionException::CONNECTION_ERROR, $e); + } catch (Throwable $exception) { + throw new ConnectionException('Failed to connect to the server.', ConnectionException::CONNECTION_ERROR, $exception); } } /** - * @param SocketStream $stream - * @param float $timeout + * @param float $timeout * - * @return SocketStream + * @return Socket * @throws ConnectionException */ - public function enableSSL(SocketStream $stream, float $timeout = 0): SocketStream + public function enableSSL(float $timeout = 0): Socket { try { - return promise(static function (Closure $resolve, Closure $reject, Promise $promise) use ($stream, $timeout) { + return promise(function (Closure $resolve, Closure $reject, Promise $promise) use ($timeout) { + $stream = $this; if ($timeout > 0) { $timeoutEventID = delay(static function () use ($reject) { $reject(new ConnectionException('SSL handshake timeout.', ConnectionException::CONNECTION_TIMEOUT)); @@ -168,7 +215,7 @@ public function enableSSL(SocketStream $stream, float $timeout = 0): SocketStrea } if ($handshakeResult === 0) { - $stream->onReadable(static function (SocketStream $stream, Closure $cancel) use ($resolve, $reject) { + $stream->onReadable(static function (Socket $stream, Closure $cancel) use ($resolve, $reject) { try { $handshakeResult = @stream_socket_enable_crypto( $stream->stream, @@ -195,8 +242,8 @@ public function enableSSL(SocketStream $stream, float $timeout = 0): SocketStrea }); } })->await(); - } catch (Throwable $e) { - throw new ConnectionException('Failed to enable SSL.', ConnectionException::CONNECTION_CRYPTO, $e); + } catch (Throwable $exception) { + throw new ConnectionException('Failed to enable SSL.', ConnectionException::CONNECTION_CRYPTO, $exception); } } @@ -204,9 +251,9 @@ public function enableSSL(SocketStream $stream, float $timeout = 0): SocketStrea * @param string $address * @param mixed|null $context * - * @return SocketStream|false + * @return static|false */ - public function server(string $address, mixed $context = null): SocketStream|false + public static function server(string $address, mixed $context = null): Socket|false { if (is_array($context)) { $context = stream_context_create($context); @@ -220,6 +267,231 @@ public function server(string $address, mixed $context = null): SocketStream|fal $context ); - return $server ? new SocketStream($server) : false; + return $server ? new static($server) : false; + } + + /** + * @param int|float $timeout + * + * @return static|false + */ + public function accept(int|float $timeout = 0): Socket|false + { + $socket = @stream_socket_accept($this->stream, $timeout, $peerName); + if ($socket === false) { + return false; + } + return new static($socket, $peerName); + } + + /** + * @param int $level + * @param int $option + * @param mixed $value + * + * @return void + */ + public function setOption(int $level, int $option, mixed $value): void + { + if (!socket_set_option($this->socket, $level, $option, $value)) { + throw new RuntimeException('Failed to set socket option: ' . socket_strerror(socket_last_error($this->socket))); + } + } + + /** + * @Author cclilshy + * @Date 2024/9/2 20:41 + * + * @param int $length + * @param mixed $target + * @param int|null $flags + * + * @return int + * @throws ConnectionException + */ + public function receive(int $length, mixed &$target, int|null $flags = 0): int + { + $realLength = socket_recv($this->socket, $target, $length, $flags); + if ($realLength === false) { + $this->close(); + throw new ConnectionException('Unable to read from stream', ConnectionException::CONNECTION_READ_FAIL); + } + return $realLength; + } + + /** + * @param string $string + * + * @return int + * @throws ConnectionException + */ + public function write(string $string): int + { + try { + return $this->writeInternal($string); + } catch (Throwable $exception) { + $this->close(); + throw new ConnectionException($exception->getMessage(), ConnectionException::CONNECTION_WRITE_FAIL); + } + } + + /** + * @param string $string + * @param bool $wait + * + * @return int + * @throws \Ripple\Stream\Exception\ConnectionException + */ + public function writeInternal(string $string, bool $wait = true): int + { + $writeLength = 0; + if (!$this->blocking) { + $length = parent::write($string); + $string2cache = substr($string, $length); + if ($string2cache === '') { + return $length; + } + + $writeLength += $length; + + $this->blocking = true; + $tempFilePath = sys_get_temp_dir() . '/' . uniqid('buf_'); + + $this->storageCacheWrite = File::open($tempFilePath, 'w+'); + $this->storageCacheWrite->setBlocking(true); + + $this->storageCacheRead = File::open($tempFilePath, 'r+'); + $this->storageCacheRead->setBlocking(false); + + $eventID = $this->onClose(function () use ($tempFilePath) { + $this->cleanupTempFiles($tempFilePath); + }); + + $this->onWriteable(function (Socket $_, Closure $cancel) use ($tempFilePath, $eventID) { + if ($buffer = $this->storageCacheRead->read($this->getOption(SOL_SOCKET, SO_SNDLOWAT))) { + try { + parent::write($buffer); + } catch (ConnectionException $exception) { + $this->blocking = false; + $this->cleanupTempFiles($tempFilePath); + $cancel(); + $this->cancelOnClose($eventID); + + if (isset($this->writePromise)) { + $this->writePromise->reject($exception); + + unset($this->writePromise); + } + return; + } + } + + if ($this->storageCacheRead->eof()) { + $this->blocking = false; + $this->cleanupTempFiles($tempFilePath); + $cancel(); + $this->cancelOnClose($eventID); + + if (isset($this->writePromise)) { + $this->writePromise->resolve(0); + + unset($this->writePromise); + } + } + }); + } else { + $string2cache = $string; + } + + $writeLength += $this->storageCacheWrite->write($string2cache); + + /*** @var Promise $writePromise */ + if ($wait) { + if (!isset($this->writePromise)) { + $this->writePromise = promise(static function () { + }); + } + + try { + $this->writePromise->await(); + return strlen($string); + } catch (Throwable $exception) { + $this->close(); + throw new ConnectionException($exception->getMessage(), ConnectionException::CONNECTION_WRITE_FAIL); + } + } + + return $writeLength; + } + + /** + * Clean up temp files and close file handles. + * + * @param string $tempFilePath + */ + protected function cleanupTempFiles(string $tempFilePath): void + { + $this->storageCacheWrite->close(); + $this->storageCacheRead->close(); + if (file_exists($tempFilePath)) { + unlink($tempFilePath); + } + $this->storageCacheWrite = null; + $this->storageCacheRead = null; + } + + /** + * @param int $level + * @param int $option + * + * @return array|int + */ + public function getOption(int $level, int $option): array|int + { + $option = socket_get_option($this->socket, $level, $option); + if ($option === false) { + throw new RuntimeException('Failed to get socket option: ' . socket_strerror(socket_last_error($this->socket))); + } + return $option; + } + + /** + * @return string + */ + public function getAddress(): string + { + return $this->address; + } + + /** + * @return string + */ + public function getHost(): string + { + return $this->host; + } + + /** + * @return int + */ + public function getPort(): int + { + return $this->port; + } + + /** + * @param int $length + * + * @return string + * @throws ConnectionException + */ + public function readContinuously(int $length): string + { + $content = ''; + while ($buffer = $this->read($length)) { + $content .= $buffer; + } + + return $content; } } diff --git a/src/Socket/SocketStream.php b/src/Socket/SocketStream.php deleted file mode 100644 index d9c1a03..0000000 --- a/src/Socket/SocketStream.php +++ /dev/null @@ -1,374 +0,0 @@ -stream)) { - throw new RuntimeException('Failed to import stream'); - } - - $this->socket = $socket; - - if (!$peerName) { - $peerName = stream_socket_get_name($this->stream, true); - } - - if ($peerName) { - $this->address = $peerName; - } else { - $this->address = null; - } - - if ($this->address) { - $exploded = explode(':', $this->address); - $this->host = $exploded[0]; - $this->port = intval($exploded[1] ?? 0); - } - } - - /** - * @param int|float $timeout - * - * @return \Ripple\Socket\SocketStream|false - */ - public function accept(int|float $timeout = 0): SocketStream|false - { - $socket = @stream_socket_accept($this->stream, $timeout, $peerName); - if ($socket === false) { - return false; - } - return new static($socket, $peerName); - } - - /** - * @param int $level - * @param int $option - * @param mixed $value - * - * @return void - */ - public function setOption(int $level, int $option, mixed $value): void - { - if (!socket_set_option($this->socket, $level, $option, $value)) { - throw new RuntimeException('Failed to set socket option: ' . socket_strerror(socket_last_error($this->socket))); - } - } - - /** - * @Author cclilshy - * @Date 2024/9/2 20:41 - * - * @param int $length - * @param mixed $target - * @param int|null $flags - * - * @return int - * @throws ConnectionException - */ - public function receive(int $length, mixed &$target, int|null $flags = 0): int - { - $realLength = socket_recv($this->socket, $target, $length, $flags); - if ($realLength === false) { - throw new ConnectionException( - 'Unable to read from stream', - ConnectionException::CONNECTION_READ_FAIL, - null, - $this - ); - } - return $realLength; - } - - /** - * @param string $string - * - * @return int - * @throws ConnectionException - */ - public function write(string $string): int - { - try { - return $this->writeInternal($string); - } catch (Throwable $e) { - throw new ConnectionException( - $e->getMessage(), - ConnectionException::CONNECTION_WRITE_FAIL, - null, - $this - ); - } - } - - /** - * @param string $string - * @param bool $wait - * - * @return int - * @throws \Ripple\Stream\Exception\ConnectionException - */ - public function writeInternal(string $string, bool $wait = true): int - { - $writeLength = 0; - if (!$this->blocking) { - $length = parent::write($string); - $string2cache = substr($string, $length); - if ($string2cache === '') { - return $length; - } - - $writeLength += $length; - - $this->blocking = true; - $tempFilePath = sys_get_temp_dir() . '/' . uniqid('buf_'); - - $this->storageCacheWrite = IO::File()->open($tempFilePath, 'w+'); - $this->storageCacheWrite->setBlocking(true); - - $this->storageCacheRead = IO::File()->open($tempFilePath, 'r+'); - $this->storageCacheRead->setBlocking(false); - - $eventID = $this->onClose(function () use ($tempFilePath) { - $this->cleanupTempFiles($tempFilePath); - }); - - $this->onWriteable(function (SocketStream $_, Closure $cancel) use ($tempFilePath, $eventID) { - if ($buffer = $this->storageCacheRead->read($this->getOption(SOL_SOCKET, SO_SNDLOWAT))) { - try { - parent::write($buffer); - } catch (ConnectionException $e) { - $this->blocking = false; - $this->cleanupTempFiles($tempFilePath); - $cancel(); - $this->cancelOnClose($eventID); - - if (isset($this->writePromise)) { - $this->writePromise->reject($e); - - unset($this->writePromise); - } - return; - } - } - - if ($this->storageCacheRead->eof()) { - $this->blocking = false; - $this->cleanupTempFiles($tempFilePath); - $cancel(); - $this->cancelOnClose($eventID); - - if (isset($this->writePromise)) { - $this->writePromise->resolve(0); - - unset($this->writePromise); - } - } - }); - } else { - $string2cache = $string; - } - - $writeLength += $this->storageCacheWrite->write($string2cache); - - /*** @var Promise $writePromise */ - if ($wait) { - if (!isset($this->writePromise)) { - $this->writePromise = \Co\promise(static function () { - }); - } - - try { - $this->writePromise->await(); - return strlen($string); - } catch (Throwable $e) { - throw new ConnectionException( - $e->getMessage(), - ConnectionException::CONNECTION_WRITE_FAIL, - null, - $this - ); - } - } - - return $writeLength; - } - - /** - * Clean up temp files and close file handles. - * - * @param string $tempFilePath - */ - protected function cleanupTempFiles(string $tempFilePath): void - { - $this->storageCacheWrite->close(); - $this->storageCacheRead->close(); - if (file_exists($tempFilePath)) { - unlink($tempFilePath); - } - $this->storageCacheWrite = null; - $this->storageCacheRead = null; - } - - /** - * @param int $level - * @param int $option - * - * @return array|int - */ - public function getOption(int $level, int $option): array|int - { - $option = socket_get_option($this->socket, $level, $option); - if ($option === false) { - throw new RuntimeException('Failed to get socket option: ' . socket_strerror(socket_last_error($this->socket))); - } - return $option; - } - - /** - * @return string - */ - public function getAddress(): string - { - return $this->address; - } - - /** - * @return string - */ - public function getHost(): string - { - return $this->host; - } - - /** - * @return int - */ - public function getPort(): int - { - return $this->port; - } - - /** - * @param int $length - * - * @return string - * @throws ConnectionException - */ - public function readContinuously(int $length): string - { - $content = ''; - while ($buffer = $this->read($length)) { - $content .= $buffer; - } - - return $content; - } - - /** - * @Author cclilshy - * @Date 2024/9/29 11:01 - * @return SocketStream - * @throws ConnectionException - */ - public function enableSSL(): SocketStream - { - return IO::Socket()->enableSSL($this); - } -} diff --git a/src/Socket/Tunnel/Tunnel.php b/src/Socket/Tunnel/Tunnel.php deleted file mode 100644 index ded2a89..0000000 --- a/src/Socket/Tunnel/Tunnel.php +++ /dev/null @@ -1,112 +0,0 @@ -proxy->setBlocking(false); - } - - /** - * @Author cclilshy - * @Date 2024/8/29 12:38 - * - * @param SocketStream|string $target - * @param array $payload - * @param bool $wait - * - * @return static - * @throws \Ripple\Stream\Exception\ConnectionException - */ - public static function connect(SocketStream|string $target, array $payload, bool $wait = true): static - { - if (is_string($target)) { - $context = stream_context_create([ - 'ssl' => [ - 'peer_name' => $payload['host'], - 'allow_self_signed' => true - ] - ]); - - $target = match (str_starts_with($target, 'ssl://')) { - true => IO::Socket()->connectWithSSL($target, 0, $context), - default => IO::Socket()->connect($target, 0, $context) - }; - } - - $tunnel = new static($target, $payload); - if ($wait) { - $tunnel->handshake(); - } - return $tunnel; - } - - /** - * @Author cclilshy - * @Date 2024/8/29 11:34 - * @return void - */ - abstract public function handshake(): void; - - /** - * @Author cclilshy - * @Date 2024/8/29 12:33 - * @return SocketStream - */ - public function getSocketStream(): SocketStream - { - return $this->proxy; - } -} diff --git a/src/Stream.php b/src/Stream.php index 38b967f..479f708 100644 --- a/src/Stream.php +++ b/src/Stream.php @@ -1,35 +1,13 @@ onCloseCallbacks[$key]); - } - - /** - * - * @param Closure $closure - * - * @return string - */ - public function onReadable(Closure $closure): string - { - $this->cancelReadable(); - return $this->onReadable = EventLoop::onReadable($this->stream, function () use ($closure) { - try { - call_user_func_array($closure, [$this, fn () => $this->cancelReadable()]); - } catch (Throwable $e) { - Output::error($e->getMessage()); - } - }); - } - /** * @return void */ @@ -168,39 +119,6 @@ public function cancelReadable(): void } } - /** - * @return void - */ - public function close(): void - { - if ($this->isClosed()) { - return; - } - - // Effective closing of the stream should occur before any callbacks to prevent the close method from being called again in the callbacks. - parent::close(); - - $this->cancelReadable(); - $this->cancelWriteable(); - - if (isset($this->transaction)) { - $this->failTransaction(new ConnectionException( - 'Stream has been closed', - ConnectionException::CONNECTION_CLOSED, - null, - $this - )); - } - - foreach ($this->onCloseCallbacks as $callback) { - try { - call_user_func($callback); - } catch (Throwable $e) { - Output::error($e->getMessage()); - } - } - } - /** * @return void */ @@ -212,35 +130,6 @@ public function cancelWriteable(): void } } - /** - * @param Throwable $exception - * - * @return void - */ - public function failTransaction(Throwable $exception): void - { - if (isset($this->transaction)) { - $this->transaction->fail($exception); - } - } - - /** - * @param Closure $closure - * - * @return string - */ - public function onWriteable(Closure $closure): string - { - $this->cancelWriteable(); - return $this->onWriteable = EventLoop::onWritable($this->stream, function () use ($closure) { - try { - call_user_func_array($closure, [$this, fn () => $this->cancelWriteable()]); - } catch (Throwable $e) { - Output::error($e->getMessage()); - } - }); - } - /** * @param bool $bool * @@ -267,6 +156,17 @@ public function transaction(Closure $closure): void call_user_func_array($closure, [$this->getTransaction()]); } + /** + * @return Transaction|null + */ + public function getTransaction(): Transaction|null + { + if (isset($this->transaction)) { + return $this->transaction; + } + return null; + } + /** * @param Transaction $transaction * @@ -291,17 +191,6 @@ public function completeTransaction(): void } } - /** - * @return Transaction|null - */ - public function getTransaction(): Transaction|null - { - if (isset($this->transaction)) { - return $this->transaction; - } - return null; - } - /** * Wait for readable events. This method is only valid when there are no readable events to listen for. * After enabling this method, it is forbidden to use the onReadable method elsewhere unless you know what you are doing. @@ -324,16 +213,20 @@ public function waitForReadable(int $timeout = 0): bool } // If the stream is closed, return false directly. - $closeOID = $this->onClose(fn () => Coroutine::throw( - $suspension, - new ConnectionCloseException('Stream has been closed', null, $this) - )); + $closeOID = $this->onClose(function () use ($suspension) { + Coroutine::throw( + $suspension, + new ConnectionCloseException('Stream has been closed', null) + ); + $this->close(); + }); + if ($timeout > 0) { // If a timeout is set, the suspension will be canceled after the timeout - $timeoutOID = delay(fn () => Coroutine::throw( - $suspension, - new ConnectionTimeoutException('Stream read timeout', null, $this, false) - ), $timeout); + $timeoutOID = delay(static function () use ($suspension) { + Coroutine::throw($suspension, new ConnectionTimeoutException('Stream read timeout', null)); + $this->close(); + }, $timeout); } $result = Coroutine::suspend($suspension); @@ -342,6 +235,87 @@ public function waitForReadable(int $timeout = 0): bool return $result; } + /** + * + * @param Closure $closure + * + * @return string + */ + public function onReadable(Closure $closure): string + { + $this->cancelReadable(); + return $this->onReadable = EventLoop::onReadable($this->stream, function () use ($closure) { + try { + call_user_func_array($closure, [$this, fn () => $this->cancelReadable()]); + } catch (Throwable $exception) { + Output::error($exception->getMessage()); + } + }); + } + + /** + * @return void + */ + public function close(): void + { + if ($this->isClosed()) { + return; + } + + // Effective closing of the stream should occur before any callbacks to prevent the close method from being called again in the callbacks. + parent::close(); + + $this->cancelReadable(); + $this->cancelWriteable(); + + if (isset($this->transaction)) { + $this->failTransaction(new ConnectionException( + 'Stream has been closed', + ConnectionException::CONNECTION_CLOSED, + null, + $this + )); + } + + foreach ($this->onCloseCallbacks as $callback) { + try { + call_user_func($callback); + } catch (Throwable $exception) { + Output::error($exception->getMessage()); + } + } + } + + /** + * @return bool + */ + public function isClosed(): bool + { + return !is_resource($this->stream); + } + + /** + * @param Throwable $exception + * + * @return void + */ + public function failTransaction(Throwable $exception): void + { + if (isset($this->transaction)) { + $this->transaction->fail($exception); + } + } + + /** + * @param string $key + * + * @return void + */ + public function cancelOnClose(string $key): void + { + unset($this->onCloseCallbacks[$key]); + } + /** * Wait for writeable events. This method is only valid when there is no writeable event listener. * After enabling this method, it is forbidden to use the onWritable method elsewhere unless you know what you are doing. @@ -366,16 +340,16 @@ public function waitForWriteable(int $timeout = 0): bool } // If the stream is closed, return false directly. - $closeOID = $this->onClose(fn () => Coroutine::throw( - $suspension, - new ConnectionCloseException('Stream has been closed', null, $this) - )); + $closeOID = $this->onClose(function () use ($suspension) { + Coroutine::throw($suspension, new ConnectionCloseException('Stream has been closed')); + $this->close(); + }); + if ($timeout > 0) { // If a timeout is set, the suspension will be canceled after the timeout - $timeoutOID = delay(fn () => Coroutine::throw( - $suspension, - new ConnectionTimeoutException('Stream write timeout', null, $this, false) - ), $timeout); + $timeoutOID = delay(static function () use ($suspension) { + Coroutine::throw($suspension, new ConnectionTimeoutException('Stream write timeout')); + }, $timeout); } $result = Coroutine::suspend($suspension); @@ -385,10 +359,19 @@ public function waitForWriteable(int $timeout = 0): bool } /** - * @return bool + * @param Closure $closure + * + * @return string */ - public function isClosed(): bool + public function onWriteable(Closure $closure): string { - return !is_resource($this->stream); + $this->cancelWriteable(); + return $this->onWriteable = EventLoop::onWritable($this->stream, function () use ($closure) { + try { + call_user_func_array($closure, [$this, fn () => $this->cancelWriteable()]); + } catch (Throwable $exception) { + Output::error($exception->getMessage()); + } + }); } } diff --git a/src/Stream/Exception/ConnectionCloseException.php b/src/Stream/Exception/ConnectionCloseException.php index 46e2cd9..efd2258 100644 --- a/src/Stream/Exception/ConnectionCloseException.php +++ b/src/Stream/Exception/ConnectionCloseException.php @@ -1,4 +1,14 @@ close(); + $close && $stream?->close(); } } diff --git a/src/Stream/Exception/ConnectionTimeoutException.php b/src/Stream/Exception/ConnectionTimeoutException.php index 4aca058..d263769 100644 --- a/src/Stream/Exception/ConnectionTimeoutException.php +++ b/src/Stream/Exception/ConnectionTimeoutException.php @@ -1,4 +1,14 @@ id = get_resource_id($resource); } - /** - * @return void - */ - public function close(): void - { - if (!is_resource($this->stream)) { - return; - } - fclose($this->stream); - } - /** * @param int|null $length * @@ -105,16 +72,23 @@ public function read(int|null $length): string { $content = @fread($this->stream, $length); if ($content === false) { - throw new ConnectionException( - 'Unable to read from stream', - ConnectionException::CONNECTION_READ_FAIL, - null, - $this, - ); + $this->close(); + throw new ConnectionException('Unable to read from stream', ConnectionException::CONNECTION_READ_FAIL); } return $content; } + /** + * @return void + */ + public function close(): void + { + if (!is_resource($this->stream)) { + return; + } + fclose($this->stream); + } + /** * @param string $string * @@ -125,12 +99,8 @@ public function write(string $string): int { $result = @fwrite($this->stream, $string); if ($result === false) { - throw new ConnectionException( - 'Unable to write to stream', - ConnectionException::CONNECTION_WRITE_FAIL, - null, - $this - ); + $this->close(); + throw new ConnectionException('Unable to write to stream', ConnectionException::CONNECTION_WRITE_FAIL); } return $result; } diff --git a/src/Stream/StreamInterface.php b/src/Stream/StreamInterface.php index ab7e311..8661ed0 100755 --- a/src/Stream/StreamInterface.php +++ b/src/Stream/StreamInterface.php @@ -1,35 +1,13 @@ finally(fn () => $this->cancelAll()); } + /** + * @return void + */ + protected function cancelAll(): void + { + foreach ($this->onCloseIDs as $id) { + $this->stream->cancelOnClose($id); + } + + if (isset($this->onReadableID)) { + cancel($this->onReadableID); + unset($this->onReadableID); + } + + if (isset($this->onWriteableID)) { + cancel($this->onWriteableID); + unset($this->onWriteableID); + } + } + /** * @param Closure $closure * @@ -102,6 +100,19 @@ public function onReadable(Closure $closure): string }); } + /** + * @param Throwable $exception + * + * @return void + */ + public function fail(Throwable $exception): void + { + if ($this->promise->getStatus() !== Promise::PENDING) { + return; + } + ($this->reject)($exception); + } + /** * @param Closure $closure * @@ -129,39 +140,6 @@ public function complete(): void ($this->resolve)(); } - /** - * @param Throwable $exception - * - * @return void - */ - public function fail(Throwable $exception): void - { - if ($this->promise->getStatus() !== Promise::PENDING) { - return; - } - ($this->reject)($exception); - } - - /** - * @return void - */ - protected function cancelAll(): void - { - foreach ($this->onCloseIDs as $id) { - $this->stream->cancelOnClose($id); - } - - if (isset($this->onReadableID)) { - cancel($this->onReadableID); - unset($this->onReadableID); - } - - if (isset($this->onWriteableID)) { - cancel($this->onWriteableID); - unset($this->onWriteableID); - } - } - /** * @param Closure $closure * diff --git a/src/Supports/Base.php b/src/Supports/Base.php index cf02a9e..e8612b3 100644 --- a/src/Supports/Base.php +++ b/src/Supports/Base.php @@ -1,35 +1,13 @@ buffer .= $this->proxy->readContinuously(8192); $this->processBuffer($resolve, $reject); - } catch (Exception $e) { - $reject($e); + } catch (Exception $exception) { + $reject($exception); } }); })->finally(function () { diff --git a/src/Socket/Tunnel/Socks5.php b/src/Tunnel/Socks5.php similarity index 73% rename from src/Socket/Tunnel/Socks5.php rename to src/Tunnel/Socks5.php index 03d295f..d0514d4 100644 --- a/src/Socket/Tunnel/Socks5.php +++ b/src/Tunnel/Socks5.php @@ -1,38 +1,16 @@ buffer .= $this->proxy->readContinuously(1024); $this->processBuffer($resolve, $reject); - } catch (Exception $e) { - $reject($e); + } catch (Exception $exception) { + $reject($exception); } }); })->finally(function () { diff --git a/src/Tunnel/Tunnel.php b/src/Tunnel/Tunnel.php new file mode 100644 index 0000000..e70b88b --- /dev/null +++ b/src/Tunnel/Tunnel.php @@ -0,0 +1,89 @@ +proxy->setBlocking(false); + } + + /** + * @Author cclilshy + * @Date 2024/8/29 12:38 + * + * @param Socket|string $target + * @param array $payload + * @param bool $wait + * + * @return static + * @throws \Ripple\Stream\Exception\ConnectionException + */ + public static function connect(Socket|string $target, array $payload, bool $wait = true): Tunnel + { + if (is_string($target)) { + $context = stream_context_create([ + 'ssl' => [ + 'peer_name' => $payload['host'], + 'allow_self_signed' => true + ] + ]); + + $target = match (str_starts_with($target, 'ssl://')) { + true => Socket::connectWithSSL($target, 0, $context), + default => Socket::connect($target, 0, $context) + }; + } + + $tunnel = new static($target, $payload); + if ($wait) { + $tunnel->handshake(); + } + return $tunnel; + } + + /** + * @Author cclilshy + * @Date 2024/8/29 11:34 + * @return void + */ + abstract public function handshake(): void; + + /** + * @Author cclilshy + * @Date 2024/8/29 12:33 + * @return Socket + */ + public function getSocket(): Socket + { + return $this->proxy; + } +} diff --git a/src/Utils/Format.php b/src/Utils/Format.php index 1d32a1c..ab9f7f6 100644 --- a/src/Utils/Format.php +++ b/src/Utils/Format.php @@ -1,4 +1,14 @@ setBlocking(false); $streamB->setBlocking(false); $streamA->onClose(fn () => $streamB->close()); $zx7e = new Zx7e(); $this->streams[$index] = $streamA; - $this->streams[$index]->onReadable(function (SocketStream $socketStream) use ($streamA, $index, $zx7e, $manager) { - $content = $socketStream->readContinuously(1024); + $this->streams[$index]->onReadable(function (Socket $Socket) use ($streamA, $index, &$zx7e, $manager) { + $content = $Socket->readContinuously(1024); foreach ($zx7e->decodeStream($content) as $string) { $manager->onCommand(Command::fromString($string), $this->getName(), $index); } @@ -263,8 +240,8 @@ private function guard(Manager $manager, int $index): bool $this->boot(); $this->zx7e = new Zx7e(); - $this->parentSocket->onReadable(function (SocketStream $socketStream) { - $content = $socketStream->readContinuously(1024); + $this->parentSocket->onReadable(function (Socket $Socket) { + $content = $Socket->readContinuously(1024); foreach ($this->zx7e->decodeStream($content) as $string) { $this->__onCommand(Command::fromString($string)); } diff --git a/src/functions.php b/src/functions.php index e03e447..b8a52be 100644 --- a/src/functions.php +++ b/src/functions.php @@ -1,35 +1,13 @@ sleep($second); - } catch (Throwable $e) { - throw new BadFunctionCallException($e->getMessage(), $e->getCode(), $e); + return Coroutine::getInstance()->sleep($second); + } catch (Throwable $exception) { + throw new BadFunctionCallException($exception->getMessage(), $exception->getCode(), $exception); } } @@ -220,8 +198,8 @@ function wait(Closure|null $closure = null): void { try { Kernel::getInstance()->wait($closure); - } catch (Throwable $e) { - throw new BadFunctionCallException($e->getMessage(), $e->getCode(), $e); + } catch (Throwable $exception) { + throw new BadFunctionCallException($exception->getMessage(), $exception->getCode(), $exception); } } @@ -266,21 +244,13 @@ function cancelForkHandler(string $eventID): void */ function getSuspension(): EventLoop\Suspension { - return \Ripple\Coroutine::getInstance()->getSuspension(); -} - -/** - * @return string - */ -function getID(): string -{ - return spl_object_hash(getSuspension()); + return Coroutine::getInstance()->getSuspension(); } /** * @param string $name * - * @return \Ripple\Channel + * @return \Ripple\Channel\Channel */ function channel(string $name): Channel { @@ -290,7 +260,7 @@ function channel(string $name): Channel /** * @param string $name * - * @return \Ripple\File\Lock\Lock + * @return \Ripple\File\Lock */ function lock(string $name): Lock { @@ -304,5 +274,52 @@ function lock(string $name): Lock */ function proc(string|array $entrance = '/bin/sh'): Session|false { - return Proc::getInstance()->open($entrance); + return Proc::open($entrance); +} + +/** + * @param Closure $closure + * + * @return \Ripple\Process\Task + */ +function fork(Closure $closure): Task +{ + return System::Process()->task($closure); +} + +/** + * @param \Revolt\EventLoop\Suspension|null $suspension + * + * @return mixed + * @throws Throwable + */ +function suspend(EventLoop\Suspension $suspension = null): mixed +{ + if ($suspension === null) { + $suspension = getSuspension(); + } + + return Coroutine::suspend($suspension); +} + +/** + * @param \Revolt\EventLoop\Suspension $suspension + * @param mixed|null $result + * + * @return mixed + */ +function resume(EventLoop\Suspension $suspension, mixed $result = null): mixed +{ + return Coroutine::resume($suspension, $result); +} + +/** + * @param \Revolt\EventLoop\Suspension $suspension + * @param Throwable $exception + * + * @return void + */ +function throw_(EventLoop\Suspension $suspension, Throwable $exception): void +{ + Coroutine::throw($suspension, $exception); } diff --git a/tests/CoroutineException.php b/tests/CoroutineException.php index 68880f8..ebf8272 100644 --- a/tests/CoroutineException.php +++ b/tests/CoroutineException.php @@ -3,7 +3,7 @@ namespace Tests; use PHPUnit\Framework\TestCase; -use Ripple\Coroutine; +use Ripple\Coroutine\Coroutine; use Ripple\Stream\Exception\Exception; use Throwable; diff --git a/tests/FileStreamTest.php b/tests/FileStreamTest.php index 14e949c..e39503f 100644 --- a/tests/FileStreamTest.php +++ b/tests/FileStreamTest.php @@ -2,9 +2,9 @@ namespace Tests; -use Co\IO; use PHPUnit\Framework\Attributes\Test; use PHPUnit\Framework\TestCase; +use Ripple\File\File; use Throwable; use function md5; @@ -26,7 +26,7 @@ class FileStreamTest extends TestCase public function test_fileStream(): void { $hash = md5_file(__FILE__); - $content = IO::File()->getContents(__FILE__); + $content = File::getContents(__FILE__); $this->assertEquals($hash, md5($content)); } } diff --git a/tests/PromiseTest.php b/tests/PromiseTest.php index 18c110b..306941b 100644 --- a/tests/PromiseTest.php +++ b/tests/PromiseTest.php @@ -7,7 +7,7 @@ use PHPUnit\Framework\Attributes\RunClassInSeparateProcess; use PHPUnit\Framework\TestCase; use Ripple\Coroutine\Exception\PromiseAggregateError; -use Ripple\Coroutine\Futures; +use Ripple\Futures; use Ripple\Promise; use function Co\async; diff --git a/tests/SocketTest.php b/tests/SocketTest.php index 0329894..cb127c0 100644 --- a/tests/SocketTest.php +++ b/tests/SocketTest.php @@ -3,9 +3,8 @@ namespace Tests; use Closure; -use Co\IO; use PHPUnit\Framework\TestCase; -use Ripple\Socket\SocketStream; +use Ripple\Socket; use Throwable; use function Co\repeat; @@ -26,14 +25,14 @@ public function test_delayedBlocking(): void { $size = 1024 * 1024 * 20; $current = 0; - $listen = IO::Socket()->server('tcp://127.0.0.1:8002', stream_context_create([ + $listen = Socket::server('tcp://127.0.0.1:8002', stream_context_create([ 'socket' => [ 'so_reuseport' => 1, 'so_reuseaddr' => 1 ], ])); - $listen->onReadable(function (SocketStream $listen) use ($size, &$current) { + $listen->onReadable(function (Socket $listen) use ($size, &$current) { $client = $listen->accept(); $client->setBlocking(false); $listen->close(); @@ -50,7 +49,7 @@ public function test_delayedBlocking(): void }, 0.1); }); - $server = IO::Socket()->connect('tcp://127.0.0.1:8002'); + $server = Socket::connect('tcp://127.0.0.1:8002'); $server->setBlocking(false); $data = str_repeat('A', $size); $server->writeInternal($data, false); diff --git a/tests/UnixTest.php b/tests/UnixTest.php index 95aa0e3..a5d43d2 100644 --- a/tests/UnixTest.php +++ b/tests/UnixTest.php @@ -2,10 +2,9 @@ namespace Tests; -use Co\IO; use PHPUnit\Framework\Attributes\Test; use PHPUnit\Framework\TestCase; -use Ripple\Socket\SocketStream; +use Ripple\Socket; use Ripple\Stream\Exception\ConnectionException; use Ripple\Utils\Output; use Throwable; @@ -33,13 +32,13 @@ class UnixTest extends TestCase public function test_unix(): void { $path = sys_get_temp_dir() . '/' . md5(uniqid()) . '.sock'; - $server = IO::Socket()->server('unix://' . $path); + $server = Socket::server('unix://' . $path); $server->setBlocking(false); - $server->onReadable(function (SocketStream $stream) { + $server->onReadable(function (Socket $stream) { $client = $stream->accept(); $client->setBlocking(false); - $client->onReadable(function (SocketStream $stream) { + $client->onReadable(function (Socket $stream) { $data = $stream->read(1024); $stream->write($data); }); @@ -66,11 +65,11 @@ public function test_unix(): void */ private function call(string $path): void { - $client = IO::Socket()->connect('unix://' . $path); + $client = Socket::connect('unix://' . $path); $client->setBlocking(false); $client->write('hello'); - $client->onReadable(function (SocketStream $stream) { + $client->onReadable(function (Socket $stream) { $data = $stream->read(1024); $this->assertEquals('hello', $data); cancelAll();