diff --git a/README.md b/README.md index 063fbfa..5e99cfc 100644 --- a/README.md +++ b/README.md @@ -74,6 +74,7 @@ $latestBlockHash = $latestBlock->getHash(); ## More examples - [Key management](docs/Example/KeyManagement.md) - [Sending a Transfer](docs/Example/SendingTransfer.md) +- [Event Stream](docs/Example/EventStream.md) - CEP78 - [Install](docs/Example/CEP78/Install.md) - [Mint](docs/Example/CEP78/Mint.md) diff --git a/docs/Example/EventStream.md b/docs/Example/EventStream.md new file mode 100644 index 0000000..17080ee --- /dev/null +++ b/docs/Example/EventStream.md @@ -0,0 +1,35 @@ +# Event stream + +Create `EventStream` instance by passing node url and stream path to the constructor. The third argument `$startFromEvent` is not required but can be used to filter out all events with id below `$startFromEvent` value. + +Available stream paths: +- `EventStream::STREAM_PATH_MAIN` - listen system events +- `EventStream::STREAM_PATH_DEPLOYS` - listen deploy events +- `EventStream::STREAM_PATH_SIGS` - listen finality signatures events + +Set event handler callback function to `EventStream` instance with `onEvent()` method. +For event stream aborting use `abort()` method. + +Use `listen()` method for connecting to the node. + +## Example + +```php +$nodeUrl = 'http://localhost:9999'; +$streamPath = \Casper\EventStream\EventStream::STREAM_PATH_MAIN; +$startFromEvent = 12345; + +$es = new Casper\EventStream\EventStream($nodeUrl, $streamPath, $startFromEvent); +$es->onEvent( + function (\Casper\EventStream\Event $event) use ($es) { + if ($event->getId() === null) { + $es->abort(); + return; + } + + echo json_encode($event->getData()) . "\n"; + } +); + +$es->listen(); +``` diff --git a/src/EventStream/Event.php b/src/EventStream/Event.php new file mode 100644 index 0000000..0fad767 --- /dev/null +++ b/src/EventStream/Event.php @@ -0,0 +1,35 @@ +.*)/", $data, $match)) { + $this->data = json_decode($match['data'][0], true); + } + + if (preg_match_all("/id:(?.*)/", $data, $match)) { + $this->id = $match['id'][0]; + } + } + + public function getId(): ?string + { + return $this->id; + } + + public function getData(): ?array + { + return $this->data; + } +} diff --git a/src/EventStream/EventStream.php b/src/EventStream/EventStream.php new file mode 100644 index 0000000..132eca4 --- /dev/null +++ b/src/EventStream/EventStream.php @@ -0,0 +1,83 @@ +assertSteamPathIsValid($streamPath); + + $this->nodeUrl = $nodeUrl; + $this->streamPath = $streamPath; + $this->startFromEvent = $startFromEvent; + } + + public function onEvent(callable $onEvent): void + { + $this->onEvent = $onEvent; + } + + public function abort() + { + $this->aborted = true; + } + + /** + * @throws \Exception + */ + public function listen(): void + { + $curl = curl_init($this->nodeUrl . $this->streamPath); + curl_setopt_array($curl, array( + CURLOPT_WRITEFUNCTION => function ($_, $data) { + $event = new Event(trim($data)); + + if (is_callable($this->onEvent) && $event->getId() >= $this->startFromEvent && $event->getData()) { + ($this->onEvent)($event); + } + + return strlen($data); + }, + CURLOPT_NOPROGRESS => false, + CURLOPT_PROGRESSFUNCTION => function () { + return $this->aborted; + } + )); + curl_exec($curl); + $error = curl_error($curl); + + if (!$this->aborted && $error) { + throw new \Exception($error); + } + + curl_close($curl); + } + + /** + * @throws \Exception + */ + private function assertSteamPathIsValid(string $streamPath): void + { + if (!in_array($streamPath, [self::STREAM_PATH_MAIN, self::STREAM_PATH_DEPLOYS, self::STREAM_PATH_SIGS])) { + throw new \Exception('Invalid stream path'); + } + } +}