Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented event steam #8

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ $latestBlockHash = $latestBlock->getHash();
## More examples
- [Key management](docs/Example/KeyManagement.md)
- [Sending a Transfer](docs/Example/SendingTransfer.md)
- [Event Stream](docs/Example/EventStream.md)

## API
### RpcClient
Expand Down
42 changes: 42 additions & 0 deletions docs/Example/EventStream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Event stream

Create `EventSource` instance by passing node url to constructor

```php
$nodeUrl = 'http://3.208.91.63:9999';
$es = new Casper\EventStream\EventSource($nodeUrl);
```

Use `connect()` method for connecting to the node. The method has two parameters `$eventsType` and `$startFrom`. The first one is mandatory and second is optional.

Available events type:
- `EventSource::EVENT_TYPE_MAIN` - listen system events
- `EventSource::EVENT_TYPE_DEPLOYS` - listen deploy events
- `EventSource::EVENT_TYPE_SIGS` - listen finality signatures events

```php
$es->connect(\Casper\EventStream\EventSource::EVENT_TYPE_MAIN);
```

Set event handler callback function to `EventSource` instance with `onMessage()` method.
For event stream aborting use `abort()` method.

## Example

```php
$nodeUrl = 'http://localhost:9999';
$es = new Casper\EventStream\EventSource($nodeUrl);

$es->onMessage(
function (\Casper\EventStream\Event $event) use ($es) {
if ($event->getId() === null) {
$es->abort();
return;
}

echo json_encode($event->getData()) . "\n";
}
);

$es->connect(\Casper\EventStream\EventSource::EVENT_TYPE_MAIN);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The API is a bit confusing here:

  1. We create an EventSource object and connect it to event source, which isn't logical thus isn't intuitive
  2. We check if event id is not null, why? If that's part of the logic it should be handled by the library

Separatelly, we should provide possibility to start from the predefined event id, which could be passed in the event stream URL in the start_from parameter.

Note, the following possible usage:

$nodeUrl = 'http://localhost:9999';
$streamPath = \Casper\EventStream\EventSource::EVENT_TYPE_MAIN;
$startFromEvent = 12345;

$stream = new Casper\EventStream($nodeUrl, $streamPath, $startFromEvent);
$es->onEvent(
   function (\Casper\EventStream\DeployProcessedEvent $event) {
       echo json_encode($event->getDeployHeader()->getHash()) . "\n";
   }
);

$stream->listen();

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding 2.
We just show how we can use abort() method

```
48 changes: 48 additions & 0 deletions src/EventStream/Event.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?php

namespace Casper\EventStream;

class Event
{
private ?string $id = null;

private ?string $name = null;

private ?array $data = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RomanovSci we need to add typing here. I guess, the most convenient way would be to have event-specific event classes. The parsing could be moved to the EventStream.

Note, that events include the standard Casper types inside of them.


/**
* @param string $data
* @throws \Exception
*/
public function __construct(string $data)
{
if (preg_match_all("/data:(?<data>.*)/", $data, $match)) {
$this->data = json_decode($match['data'][0], true);
}

if (preg_match_all("/(?<key>id|name)\:(?<value>.*)/", $data, $match)) {
foreach ($match['key'] as $i => $key) {
$this->{$key} = trim($match['value'][$i]);
}
}

if (!$this->data) {
throw new \Exception('Invalid event');
}
}

public function getId(): ?string
{
return $this->id;
}

public function getName(): ?string
{
return $this->name;
}

public function getData(): ?array
{
return $this->data;
}
}
76 changes: 76 additions & 0 deletions src/EventStream/EventSource.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
<?php

namespace Casper\EventStream;

class EventSource
{
public const EVENT_TYPE_MAIN = 'main';
public const EVENT_TYPE_DEPLOYS = 'deploys';
public const EVENT_TYPE_SIGS = 'sigs';

private string $url;

private $onMessage;

private bool $aborted = false;

public function __construct(string $url)
{
$this->url = $url;
}

public function onMessage(callable $callback): void
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onEvent would be a better name

{
$this->onMessage = $callback;
}

public function abort()
{
$this->aborted = true;
}

/**
* @throws \Exception
*/
public function connect(string $eventsType, int $startFrom = 0): void
Copy link
Member

@ihor ihor Jun 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. $startFrom isn't used anywhere
  2. A better name would be listen

This function offers a blocking event processing. I guess it would be nice to offer an async interface. This way a consumer could subscribe to several event streams simultaneously and process all the events at once if needed. With the current approach it's not possible to process let's say main event stream and the deploys one.

{
$this->assertEventTypeIsValid($eventsType);

$curl = curl_init($this->url . '/events/' . $eventsType);
curl_setopt_array($curl, array(
CURLOPT_WRITEFUNCTION => function ($_, $data) {
try {
$event = new Event(trim($data));

if (is_callable($this->onMessage)) {
($this->onMessage)($event);
}
} catch (\Exception $_) {
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why emtpy catch?

return strlen($data);
},
CURLOPT_NOPROGRESS => false,
CURLOPT_PROGRESSFUNCTION => function () {
return $this->aborted;
}
));
curl_exec($curl);
$error = curl_error($curl);

if (!$this->aborted && $error) {
throw new \Exception($error);
}

curl_close($curl);
}

/**
* @throws \Exception
*/
private function assertEventTypeIsValid(string $eventType): void
{
if (!in_array($eventType, [self::EVENT_TYPE_MAIN, self::EVENT_TYPE_DEPLOYS, self::EVENT_TYPE_SIGS])) {
throw new \Exception('Invalid event type');
}
}
}