diff --git a/.gitignore b/.gitignore index 24d6258..6e0ce26 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ vendor -.phpunit.cache \ No newline at end of file +.phpunit.cache +phpstan.neon diff --git a/composer.json b/composer.json index dedfcca..ce3e168 100644 --- a/composer.json +++ b/composer.json @@ -40,7 +40,7 @@ "scripts": { "tests": [ "phpunit -c phpunit.xml", - "phpstan analyse --memory-limit 512M --no-progress --no-ansi -c phpstan.neon", + "phpstan analyse --memory-limit 512M --no-progress --no-ansi", "phpcs --no-colors --report=checkstyle -q src tests" ] } diff --git a/phpstan.neon b/phpstan.neon.dist similarity index 100% rename from phpstan.neon rename to phpstan.neon.dist diff --git a/src/Client/MultiClient.php b/src/Client/MultiClient.php index 27042a2..647c87e 100644 --- a/src/Client/MultiClient.php +++ b/src/Client/MultiClient.php @@ -8,7 +8,7 @@ use Amp\Pipeline\{ConcurrentIterator, Queue}; use AO\Exceptions\{AccountFrozenException, AccountsFrozenException, LoginException}; -use AO\{Group, Group\GroupId, Package, Package\OutPackage, Parser, Utils}; +use AO\{Group, Group\GroupId, Package, Package\OutPackage, Parser, SendPriority, Utils}; use Closure; use InvalidArgumentException; use Nadylib\LeakyBucket\LeakyBucket; @@ -194,8 +194,8 @@ public function login(): void { */ } - public function write(OutPackage $package, ?string $worker=null): void { - $this->getBestWorker($worker)?->write($package); + public function write(OutPackage $package, ?string $worker=null, SendPriority $priority=SendPriority::Medium): void { + $this->getBestWorker($worker)?->write($package, $priority); } /** @return ConcurrentIterator */ diff --git a/src/Client/SingleClient.php b/src/Client/SingleClient.php index eb56eb7..2534271 100644 --- a/src/Client/SingleClient.php +++ b/src/Client/SingleClient.php @@ -3,6 +3,9 @@ namespace AO\Client; use function Amp\delay; + +use Amp\DeferredFuture; +use AO\Internal\SendQueue; use AO\{ AccountUnfreezer, Connection, @@ -17,6 +20,7 @@ Package\In, Package\Out, Parser, + SendPriority, Utils }; use Closure; @@ -63,6 +67,8 @@ class SingleClient { private readonly LeakyBucket $bucket; + private SendQueue $sendQueue; + private ?string $loggedInChar = null; private ?int $loggedInUid = null; @@ -73,7 +79,9 @@ public function __construct( private ?LoggerInterface $logger=null, ?LeakyBucket $bucket=null, private ?AccountUnfreezer $accountUnfreezer=null, + ?SendQueue $sendQueue=null ) { + $this->sendQueue = $sendQueue ?? new SendQueue(); $this->bucket = $bucket ?? new LeakyBucket(size: 5, refillDelay: 1.0); } @@ -248,14 +256,18 @@ public function read(): ?Package\InPackage { return $package; } - public function write(Package\OutPackage $package): void { + public function write(Package\OutPackage $package, SendPriority $priority=SendPriority::Medium): void { $this->logger?->debug('Sending package {package}', [ 'package' => $package, ]); $binPackage = $package->toBinaryPackage(); if ($package instanceof Package\Out\RateLimited) { $this->logger?->debug('Sending rate-limited package via bucket-queue'); - $this->bucket->take(callback: fn () => $this->connection->write($binPackage->toBinary())); + $suspension = new DeferredFuture(); + $future = $suspension->getFuture(); + $this->sendQueue->push($package, $priority, $suspension); + $this->bucket->take(callback: $this->sendNextQueueItem(...)); + $future->await(); } else { $this->logger?->debug('Sending non-rate-limited package instantly'); $this->connection->write($binPackage->toBinary()); @@ -265,6 +277,10 @@ public function write(Package\OutPackage $package): void { } } + public function getQueueSize(): int { + return $this->sendQueue->getSize(); + } + public function disconnect(): void { $this->connection->end(); } @@ -466,6 +482,17 @@ protected function triggerOnReady(): void { } } + private function sendNextQueueItem(): void { + $item = $this->sendQueue->shift(); + if (!isset($item)) { + return; + } + $this->connection->write($item->package->toBinaryPackage()->toBinary()); + if (!$item->future->isComplete()) { + $item->future->complete(); + } + } + /** Get the UID of the bot character from the charlist package */ private function getUidFromCharlist(In\LoginCharlist $charlist, string $character): ?int { $character = Utils::normalizeCharacter($character); diff --git a/src/Internal/SendQueue.php b/src/Internal/SendQueue.php new file mode 100644 index 0000000..3098c9f --- /dev/null +++ b/src/Internal/SendQueue.php @@ -0,0 +1,51 @@ + + * + * @psalm-var array> + */ + private array $queue = []; + + /** @param DeferredFuture $future */ + public function push(OutPackage $package, SendPriority $priority, ?DeferredFuture $future=null): void { + if (!isset($this->queue[$priority->value])) { + $this->queue[$priority->value] = []; + } + $this->queue[$priority->value] []= new SendQueueItem( + future: $future, + package: $package, + ); + } + + public function shift(): ?SendQueueItem { + $priorities = array_keys($this->queue); + asort($priorities); + foreach ($priorities as $priority) { + if (isset($this->queue[$priority]) && count($this->queue[$priority]) > 0) { + return array_shift($this->queue[$priority]); + } + } + return null; + } + + /** Get the number of total items currently in the queue */ + public function getSize(): int { + $priorities = SendPriority::cases(); + $size = 0; + foreach ($priorities as $priority) { + if (!isset($this->queue[$priority->value])) { + continue; + } + $size += count($this->queue[$priority->value]); + } + return $size; + } +} diff --git a/src/Internal/SendQueueItem.php b/src/Internal/SendQueueItem.php new file mode 100644 index 0000000..4eaf5bf --- /dev/null +++ b/src/Internal/SendQueueItem.php @@ -0,0 +1,15 @@ + $future */ + public function __construct( + public readonly ?DeferredFuture $future, + public readonly OutPackage $package, + ) { + } +} diff --git a/src/SendPriority.php b/src/SendPriority.php new file mode 100644 index 0000000..1bc1bb8 --- /dev/null +++ b/src/SendPriority.php @@ -0,0 +1,9 @@ +push(new Pong('1'), SendPriority::Low); + $queue->push(new Pong('2'), SendPriority::High); + $queue->push(new Pong('3'), SendPriority::Medium); + $queue->push(new Pong('4'), SendPriority::High); + $queue->push(new Pong('5'), SendPriority::Low); + $queue->push(new Pong('6'), SendPriority::Medium); + + foreach (['2', '4', '3', '6', '1', '5'] as $prio) { + $next = $queue->shift(); + $this->assertNotNull($next); + + /** @var Pong */ + $package = $next->package; + $this->assertInstanceOf(Pong::class, $package, 'Wrong package dequeued'); + + $this->assertSame($package->extra, $prio, 'The order of priority is not uphold'); + } + + $next = $queue->shift(); + $this->assertNull($next); + } +}