-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implemented event steam #8
base: master
Are you sure you want to change the base?
Changes from 3 commits
531ae8c
9f26dbb
f00f869
75ce459
1ae7fe8
b10157a
78f9ff8
b4becc3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
# Event stream | ||
|
||
Create `EventSource` instance by passing node url to constructor | ||
|
||
```php | ||
$nodeUrl = 'http://3.208.91.63:9999'; | ||
$es = new Casper\EventStream\EventSource($nodeUrl); | ||
``` | ||
|
||
Use `connect()` method for connecting to the node. The method has two parameters `$eventsType` and `$startFrom`. The first one is mandatory and second is optional. | ||
|
||
Available events type: | ||
- `EventSource::EVENT_TYPE_MAIN` - listen system events | ||
- `EventSource::EVENT_TYPE_DEPLOYS` - listen deploy events | ||
- `EventSource::EVENT_TYPE_SIGS` - listen finality signatures events | ||
|
||
```php | ||
$es->connect(\Casper\EventStream\EventSource::EVENT_TYPE_MAIN); | ||
``` | ||
|
||
Set event handler callback function to `EventSource` instance with `onMessage()` method. | ||
For event stream aborting use `abort()` method. | ||
|
||
## Example | ||
|
||
```php | ||
$nodeUrl = 'http://localhost:9999'; | ||
$es = new Casper\EventStream\EventSource($nodeUrl); | ||
|
||
$es->onMessage( | ||
function (\Casper\EventStream\Event $event) use ($es) { | ||
if ($event->getId() === null) { | ||
$es->abort(); | ||
return; | ||
} | ||
|
||
echo json_encode($event->getData()) . "\n"; | ||
} | ||
); | ||
|
||
$es->connect(\Casper\EventStream\EventSource::EVENT_TYPE_MAIN); | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
<?php | ||
|
||
namespace Casper\EventStream; | ||
|
||
class Event | ||
{ | ||
private ?string $id = null; | ||
|
||
private ?string $name = null; | ||
|
||
private ?array $data = null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @RomanovSci we need to add typing here. I guess, the most convenient way would be to have event-specific event classes. The parsing could be moved to the Note, that events include the standard Casper types inside of them. |
||
|
||
/** | ||
* @param string $data | ||
* @throws \Exception | ||
*/ | ||
public function __construct(string $data) | ||
{ | ||
if (preg_match_all("/data:(?<data>.*)/", $data, $match)) { | ||
$this->data = json_decode($match['data'][0], true); | ||
} | ||
|
||
if (preg_match_all("/(?<key>id|name)\:(?<value>.*)/", $data, $match)) { | ||
foreach ($match['key'] as $i => $key) { | ||
$this->{$key} = trim($match['value'][$i]); | ||
} | ||
} | ||
|
||
if (!$this->data) { | ||
throw new \Exception('Invalid event'); | ||
} | ||
} | ||
|
||
public function getId(): ?string | ||
{ | ||
return $this->id; | ||
} | ||
|
||
public function getName(): ?string | ||
{ | ||
return $this->name; | ||
} | ||
|
||
public function getData(): ?array | ||
{ | ||
return $this->data; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
<?php | ||
|
||
namespace Casper\EventStream; | ||
|
||
class EventSource | ||
{ | ||
public const EVENT_TYPE_MAIN = 'main'; | ||
public const EVENT_TYPE_DEPLOYS = 'deploys'; | ||
public const EVENT_TYPE_SIGS = 'sigs'; | ||
|
||
private string $url; | ||
|
||
private $onMessage; | ||
|
||
private bool $aborted = false; | ||
|
||
public function __construct(string $url) | ||
{ | ||
$this->url = $url; | ||
} | ||
|
||
public function onMessage(callable $callback): void | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
{ | ||
$this->onMessage = $callback; | ||
} | ||
|
||
public function abort() | ||
{ | ||
$this->aborted = true; | ||
} | ||
|
||
/** | ||
* @throws \Exception | ||
*/ | ||
public function connect(string $eventsType, int $startFrom = 0): void | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This function offers a blocking event processing. I guess it would be nice to offer an async interface. This way a consumer could subscribe to several event streams simultaneously and process all the events at once if needed. With the current approach it's not possible to process let's say main event stream and the deploys one. |
||
{ | ||
$this->assertEventTypeIsValid($eventsType); | ||
|
||
$curl = curl_init($this->url . '/events/' . $eventsType); | ||
curl_setopt_array($curl, array( | ||
CURLOPT_WRITEFUNCTION => function ($_, $data) { | ||
try { | ||
$event = new Event(trim($data)); | ||
|
||
if (is_callable($this->onMessage)) { | ||
($this->onMessage)($event); | ||
} | ||
} catch (\Exception $_) { | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why emtpy catch? |
||
return strlen($data); | ||
}, | ||
CURLOPT_NOPROGRESS => false, | ||
CURLOPT_PROGRESSFUNCTION => function () { | ||
return $this->aborted; | ||
} | ||
)); | ||
curl_exec($curl); | ||
$error = curl_error($curl); | ||
|
||
if (!$this->aborted && $error) { | ||
throw new \Exception($error); | ||
} | ||
|
||
curl_close($curl); | ||
} | ||
|
||
/** | ||
* @throws \Exception | ||
*/ | ||
private function assertEventTypeIsValid(string $eventType): void | ||
{ | ||
if (!in_array($eventType, [self::EVENT_TYPE_MAIN, self::EVENT_TYPE_DEPLOYS, self::EVENT_TYPE_SIGS])) { | ||
throw new \Exception('Invalid event type'); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The API is a bit confusing here:
EventSource
object and connect it to event source, which isn't logical thus isn't intuitivenull
, why? If that's part of the logic it should be handled by the librarySeparatelly, we should provide possibility to start from the predefined event id, which could be passed in the event stream URL in the
start_from
parameter.Note, the following possible usage:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding 2.
We just show how we can use
abort()
method