From 536988373de7ac52b82e0db0b6e03c43e154a39d Mon Sep 17 00:00:00 2001 From: chaz6chez Date: Thu, 8 Feb 2024 17:44:26 +0800 Subject: [PATCH] save --- src/ChannelServer.php | 3 ++- src/HookServer.php | 15 +++++++++------ src/ServerInterface.php | 4 ++-- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/ChannelServer.php b/src/ChannelServer.php index 65d2400..6964709 100644 --- a/src/ChannelServer.php +++ b/src/ChannelServer.php @@ -20,7 +20,8 @@ public function __construct() { // 由于使用了webman自定义进程启动,所以无须Server原有的构造方式 } - public function onWorkerStart(Worker $worker) { + public function onWorkerStart(Worker $worker): void + { $worker->count = 1; $worker->onMessage = [$this, 'onMessage']; $worker->onClose = [$this, 'onClose']; diff --git a/src/HookServer.php b/src/HookServer.php index d346708..55e0916 100644 --- a/src/HookServer.php +++ b/src/HookServer.php @@ -49,7 +49,7 @@ class HookServer implements ServerInterface protected array $claimStartTags = []; /** @inheritDoc */ - public static function getConfig(string $key, $default = null) + public static function getConfig(string $key, mixed $default = null): mixed { return config('plugin.workbunny.webman-push-server.app.hook-server.' . $key, $default); } @@ -164,7 +164,7 @@ public function ack(string $queue, string $group, array $idArray): bool * @param string $consumer * @return void */ - public function claim(string $queue, string $group, string $consumer) + public function claim(string $queue, string $group, string $consumer): void { try { if ($idArray = self::getStorage()->xAutoClaim( @@ -199,7 +199,7 @@ public function claim(string $queue, string $group, string $consumer) * @return void * @throws Exception */ - public function consumer(string $queue, string $group, string $consumer, int $blockTime) + public function consumer(string $queue, string $group, string $consumer, int $blockTime): void { try { // 创建组 @@ -237,7 +237,7 @@ public function consumer(string $queue, string $group, string $consumer, int $bl /** * @return void */ - protected function _tempInit() + protected function _tempInit(): void { $config = config('database.connections')['plugin.workbunny.webman-push-server.local-storage'] ?? []; if ($config) { @@ -321,8 +321,11 @@ function () use ($queue, $group, $consumer) { $this->_consumerTimer = Timer::add( $interval = self::getConfig('consumer_interval', 1) / 1000, function () use ($worker, $interval, $queue, $group, $consumer) { - // 处理pending消息 - $this->claim($queue, $group, $consumer); + // 如果没有claim定时器,则每次消费时进行claim + if (!$this->_claimTimer) { + // 处理pending消息 + $this->claim($queue, $group, $consumer); + } // 执行消费 $this->consumer($queue, $group, $consumer, (int)($interval * 1000)); }); diff --git a/src/ServerInterface.php b/src/ServerInterface.php index c188798..75a95b1 100644 --- a/src/ServerInterface.php +++ b/src/ServerInterface.php @@ -29,9 +29,9 @@ public static function getConfig(string $key, mixed $default = null): mixed; /** * 获取储存器 - * @return Redis + * @return Redis|null */ - public static function getStorage(): Redis; + public static function getStorage(): ?Redis; /** * 服务启动