Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented event steam #8

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ $latestBlockHash = $latestBlock->getHash();
## More examples
- [Key management](docs/Example/KeyManagement.md)
- [Sending a Transfer](docs/Example/SendingTransfer.md)
- [Event Stream](docs/Example/EventStream.md)
- CEP78
- [Install](docs/Example/CEP78/Install.md)
- [Mint](docs/Example/CEP78/Mint.md)
Expand Down
35 changes: 35 additions & 0 deletions docs/Example/EventStream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Event stream

Create `EventStream` instance by passing node url and stream path to the constructor. The third argument `$startFromEvent` is not required but can be used to filter out all events with id below `$startFromEvent` value.

Available stream paths:
- `EventStream::STREAM_PATH_MAIN` - listen system events
- `EventStream::STREAM_PATH_DEPLOYS` - listen deploy events
- `EventStream::STREAM_PATH_SIGS` - listen finality signatures events

Set event handler callback function to `EventStream` instance with `onEvent()` method.
For event stream aborting use `abort()` method.

Use `listen()` method for connecting to the node.

## Example

```php
$nodeUrl = 'http://localhost:9999';
$streamPath = \Casper\EventStream\EventStream::STREAM_PATH_MAIN;
$startFromEvent = 12345;

$es = new Casper\EventStream\EventStream($nodeUrl, $streamPath, $startFromEvent);
$es->onEvent(
function (\Casper\EventStream\Event $event) use ($es) {
if ($event->getId() === null) {
$es->abort();
return;
}

echo json_encode($event->getData()) . "\n";
}
);

$es->listen();
```
35 changes: 35 additions & 0 deletions src/EventStream/Event.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

namespace Casper\EventStream;

class Event
{
private ?int $id = null;

private ?array $data = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RomanovSci we need to add typing here. I guess, the most convenient way would be to have event-specific event classes. The parsing could be moved to the EventStream.

Note, that events include the standard Casper types inside of them.


/**
* @param string $data
* @throws \Exception
*/
public function __construct(string $data)
{
if (preg_match_all("/data:(?<data>.*)/", $data, $match)) {
$this->data = json_decode($match['data'][0], true);
}

if (preg_match_all("/id:(?<id>.*)/", $data, $match)) {
$this->id = $match['id'][0];
}
}

public function getId(): ?string
{
return $this->id;
}

public function getData(): ?array
{
return $this->data;
}
}
83 changes: 83 additions & 0 deletions src/EventStream/EventStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<?php

namespace Casper\EventStream;

class EventStream
{
public const STREAM_PATH_MAIN = '/events/main';
public const STREAM_PATH_DEPLOYS = '/events/deploys';
public const STREAM_PATH_SIGS = '/events/sigs';

private string $nodeUrl;

private string $streamPath;

private int $startFromEvent;

private $onEvent;

private bool $aborted = false;

/**
* @throws \Exception
*/
public function __construct(string $nodeUrl, string $streamPath, int $startFromEvent = 0)
{
$this->assertSteamPathIsValid($streamPath);

$this->nodeUrl = $nodeUrl;
$this->streamPath = $streamPath;
$this->startFromEvent = $startFromEvent;
}

public function onEvent(callable $onEvent): void
{
$this->onEvent = $onEvent;
}

public function abort()
{
$this->aborted = true;
}

/**
* @throws \Exception
*/
public function listen(): void
{
$curl = curl_init($this->nodeUrl . $this->streamPath);
curl_setopt_array($curl, array(
CURLOPT_WRITEFUNCTION => function ($_, $data) {
$event = new Event(trim($data));

if (is_callable($this->onEvent) && $event->getId() >= $this->startFromEvent && $event->getData()) {
($this->onEvent)($event);
}

return strlen($data);
},
CURLOPT_NOPROGRESS => false,
CURLOPT_PROGRESSFUNCTION => function () {
return $this->aborted;
}
));
curl_exec($curl);
$error = curl_error($curl);

if (!$this->aborted && $error) {
throw new \Exception($error);
}

curl_close($curl);
}

/**
* @throws \Exception
*/
private function assertSteamPathIsValid(string $streamPath): void
{
if (!in_array($streamPath, [self::STREAM_PATH_MAIN, self::STREAM_PATH_DEPLOYS, self::STREAM_PATH_SIGS])) {
throw new \Exception('Invalid stream path');
}
}
}