Skip to content

Commit

Permalink
Add priority to SingleClient::write and MultiClient::write (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nadyita authored Sep 8, 2024
1 parent 03b5965 commit 81cb0d9
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 7 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
vendor
.phpunit.cache
.phpunit.cache
phpstan.neon
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
"scripts": {
"tests": [
"phpunit -c phpunit.xml",
"phpstan analyse --memory-limit 512M --no-progress --no-ansi -c phpstan.neon",
"phpstan analyse --memory-limit 512M --no-progress --no-ansi",
"phpcs --no-colors --report=checkstyle -q src tests"
]
}
Expand Down
File renamed without changes.
6 changes: 3 additions & 3 deletions src/Client/MultiClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

use Amp\Pipeline\{ConcurrentIterator, Queue};
use AO\Exceptions\{AccountFrozenException, AccountsFrozenException, LoginException};
use AO\{Group, Group\GroupId, Package, Package\OutPackage, Parser, Utils};
use AO\{Group, Group\GroupId, Package, Package\OutPackage, Parser, SendPriority, Utils};
use Closure;
use InvalidArgumentException;
use Nadylib\LeakyBucket\LeakyBucket;
Expand Down Expand Up @@ -194,8 +194,8 @@ public function login(): void {
*/
}

public function write(OutPackage $package, ?string $worker=null): void {
$this->getBestWorker($worker)?->write($package);
public function write(OutPackage $package, ?string $worker=null, SendPriority $priority=SendPriority::Medium): void {
$this->getBestWorker($worker)?->write($package, $priority);
}

/** @return ConcurrentIterator<WorkerPackage> */
Expand Down
31 changes: 29 additions & 2 deletions src/Client/SingleClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
namespace AO\Client;

use function Amp\delay;

use Amp\DeferredFuture;
use AO\Internal\SendQueue;
use AO\{
AccountUnfreezer,
Connection,
Expand All @@ -17,6 +20,7 @@
Package\In,
Package\Out,
Parser,
SendPriority,
Utils
};
use Closure;
Expand Down Expand Up @@ -63,6 +67,8 @@ class SingleClient {

private readonly LeakyBucket $bucket;

private SendQueue $sendQueue;

private ?string $loggedInChar = null;

private ?int $loggedInUid = null;
Expand All @@ -73,7 +79,9 @@ public function __construct(
private ?LoggerInterface $logger=null,
?LeakyBucket $bucket=null,
private ?AccountUnfreezer $accountUnfreezer=null,
?SendQueue $sendQueue=null
) {
$this->sendQueue = $sendQueue ?? new SendQueue();
$this->bucket = $bucket ?? new LeakyBucket(size: 5, refillDelay: 1.0);
}

Expand Down Expand Up @@ -248,14 +256,18 @@ public function read(): ?Package\InPackage {
return $package;
}

public function write(Package\OutPackage $package): void {
public function write(Package\OutPackage $package, SendPriority $priority=SendPriority::Medium): void {
$this->logger?->debug('Sending package {package}', [
'package' => $package,
]);
$binPackage = $package->toBinaryPackage();
if ($package instanceof Package\Out\RateLimited) {
$this->logger?->debug('Sending rate-limited package via bucket-queue');
$this->bucket->take(callback: fn () => $this->connection->write($binPackage->toBinary()));
$suspension = new DeferredFuture();
$future = $suspension->getFuture();
$this->sendQueue->push($package, $priority, $suspension);
$this->bucket->take(callback: $this->sendNextQueueItem(...));
$future->await();
} else {
$this->logger?->debug('Sending non-rate-limited package instantly');
$this->connection->write($binPackage->toBinary());
Expand All @@ -265,6 +277,10 @@ public function write(Package\OutPackage $package): void {
}
}

public function getQueueSize(): int {
return $this->sendQueue->getSize();
}

public function disconnect(): void {
$this->connection->end();
}
Expand Down Expand Up @@ -466,6 +482,17 @@ protected function triggerOnReady(): void {
}
}

private function sendNextQueueItem(): void {
$item = $this->sendQueue->shift();
if (!isset($item)) {
return;
}
$this->connection->write($item->package->toBinaryPackage()->toBinary());
if (!$item->future->isComplete()) {
$item->future->complete();
}
}

/** Get the UID of the bot character from the charlist package */
private function getUidFromCharlist(In\LoginCharlist $charlist, string $character): ?int {
$character = Utils::normalizeCharacter($character);
Expand Down
51 changes: 51 additions & 0 deletions src/Internal/SendQueue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?php declare(strict_types=1);

namespace AO\Internal;

use Amp\{DeferredFuture};
use AO\Package\OutPackage;
use AO\SendPriority;

final class SendQueue {
/**
* @var array<int,SendQueueItem[]>
*
* @psalm-var array<int,list<SendQueueItem>>
*/
private array $queue = [];

/** @param DeferredFuture<void> $future */
public function push(OutPackage $package, SendPriority $priority, ?DeferredFuture $future=null): void {
if (!isset($this->queue[$priority->value])) {
$this->queue[$priority->value] = [];
}
$this->queue[$priority->value] []= new SendQueueItem(
future: $future,
package: $package,
);
}

public function shift(): ?SendQueueItem {
$priorities = array_keys($this->queue);
asort($priorities);
foreach ($priorities as $priority) {
if (isset($this->queue[$priority]) && count($this->queue[$priority]) > 0) {
return array_shift($this->queue[$priority]);
}
}
return null;
}

/** Get the number of total items currently in the queue */
public function getSize(): int {
$priorities = SendPriority::cases();
$size = 0;
foreach ($priorities as $priority) {
if (!isset($this->queue[$priority->value])) {
continue;
}
$size += count($this->queue[$priority->value]);
}
return $size;
}
}
15 changes: 15 additions & 0 deletions src/Internal/SendQueueItem.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php declare(strict_types=1);

namespace AO\Internal;

use Amp\DeferredFuture;
use AO\Package\OutPackage;

final class SendQueueItem {
/** @param DeferredFuture<void> $future */
public function __construct(
public readonly ?DeferredFuture $future,
public readonly OutPackage $package,
) {
}
}
9 changes: 9 additions & 0 deletions src/SendPriority.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php declare(strict_types=1);

namespace AO;

enum SendPriority: int {
case High = 1;
case Medium = 2;
case Low = 3;
}
36 changes: 36 additions & 0 deletions tests/SendQueueTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php declare(strict_types=1);

namespace AO\Tests;

use AO\Internal\SendQueue;
use AO\Package\Out\Pong;
use AO\{SendPriority};
use PHPUnit\Framework\Attributes\{Small};
use PHPUnit\Framework\TestCase;

#[Small]
class SendQueueTest extends TestCase {
public function testCorrectOrder(): void {
$queue = new SendQueue();
$queue->push(new Pong('1'), SendPriority::Low);
$queue->push(new Pong('2'), SendPriority::High);
$queue->push(new Pong('3'), SendPriority::Medium);
$queue->push(new Pong('4'), SendPriority::High);
$queue->push(new Pong('5'), SendPriority::Low);
$queue->push(new Pong('6'), SendPriority::Medium);

foreach (['2', '4', '3', '6', '1', '5'] as $prio) {
$next = $queue->shift();
$this->assertNotNull($next);

/** @var Pong */
$package = $next->package;
$this->assertInstanceOf(Pong::class, $package, 'Wrong package dequeued');

$this->assertSame($package->extra, $prio, 'The order of priority is not uphold');
}

$next = $queue->shift();
$this->assertNull($next);
}
}

0 comments on commit 81cb0d9

Please sign in to comment.