diff --git a/src/Events/ClientEvent.php b/src/Events/ClientEvent.php index 701f3aa..eba6dfd 100644 --- a/src/Events/ClientEvent.php +++ b/src/Events/ClientEvent.php @@ -47,12 +47,12 @@ public function response(TcpConnection $connection, array $request): void return; } // 当前链接没有订阅这个channel - if (!isset(PushServer::_getConnectionProperty($connection, 'channels')[$channel])) { + if (!isset(PushServer::getConnectionProperty($connection, 'channels')[$channel])) { PushServer::error($connection, null, 'Client event rejected - you didn\'t subscribe this channel'); return; } // 客户端触发事件必须是private 或者 presence的channel - $channelType = PushServer::_getChannelType($channel); + $channelType = PushServer::getChannelType($channel); if ($channelType !== CHANNEL_TYPE_PRIVATE and $channelType !== CHANNEL_TYPE_PRESENCE) { PushServer::error($connection, null, 'Client event rejected - only supported on private and presence channels'); return; @@ -60,11 +60,11 @@ public function response(TcpConnection $connection, array $request): void try { // 广播 客户端消息 PushServer::publish(PushServer::$publishTypeClient, [ - 'appKey' => PushServer::_getConnectionProperty($connection,'appKey'), + 'appKey' => PushServer::getConnectionProperty($connection,'appKey'), 'channel' => $channel, 'event' => $this->getEvent(), 'data' => $data, - 'socketId' => PushServer::_getConnectionProperty($connection,'socketId') + 'socketId' => PushServer::getConnectionProperty($connection,'socketId') ]); } catch (RedisException $exception) { Log::channel('plugin.workbunny.webman-push-server.error') diff --git a/src/Events/Subscribe.php b/src/Events/Subscribe.php index 9286cf9..fc9b778 100644 --- a/src/Events/Subscribe.php +++ b/src/Events/Subscribe.php @@ -50,14 +50,14 @@ public function response(TcpConnection $connection, array $request): void $channelData = $request['data']['channel_data'] ?? []; $clientAuth = $request['data']['auth'] ?? ''; $auth = self::auth( - $appKey = PushServer::_getConnectionProperty($connection, 'appKey'), + $appKey = PushServer::getConnectionProperty($connection, 'appKey'), PushServer::getConfig('apps_query')($appKey)['app_secret'], - PushServer::_getConnectionProperty($connection, 'socketId'), + PushServer::getConnectionProperty($connection, 'socketId'), $channel, $channelData ); // private- 和 presence- 开头的channel需要验证 - switch ($channelType = PushServer::_getChannelType($channel)){ + switch ($channelType = PushServer::getChannelType($channel)){ case CHANNEL_TYPE_PRESENCE: if (!$channelData) { PushServer::error($connection, null, 'Empty channel_data'); @@ -122,17 +122,17 @@ public static function auth(string $appKey, string $appSecret, string $socketId, public static function subscribeChannel(TcpConnection $connection, string $channel, string $type, string ...$params): void { try { - $appKey = PushServer::_getConnectionProperty($connection, 'appKey'); - $socketId = PushServer::_getConnectionProperty($connection, 'socketId'); - $channels = PushServer::_getConnectionProperty($connection, 'channels'); + $appKey = PushServer::getConnectionProperty($connection, 'appKey'); + $socketId = PushServer::getConnectionProperty($connection, 'socketId'); + $channels = PushServer::getConnectionProperty($connection, 'channels'); $userId = $params[0] ?? 'unknown'; $userInfo = $params[1] ?? '{}'; // 为当前进程增加订阅的通道 - PushServer::_setChannel($appKey, $channel, $socketId); + PushServer::setChannel($appKey, $channel, $socketId); $storage = PushServer::getStorageClient(); // 通道是否已经被建立 - $channelExists = $storage->exists($key = PushServer::_getChannelStorageKey($appKey, $channel)); + $channelExists = $storage->exists($key = PushServer::getChannelStorageKey($appKey, $channel)); if (!$channelExists) { /** @see PushServer::$_storage */ $storage->hSet($key, 'type', $type); @@ -153,15 +153,15 @@ public static function subscribeChannel(TcpConnection $connection, string $chann $type = $channels[$channel] ?? null; if (!$type) { $channels[$channel] = $type; - PushServer::_setConnectionProperty($connection, 'channels', $channels); - PushServer::_setConnection($appKey, $socketId, $socketId); + PushServer::setConnectionProperty($connection, 'channels', $channels); + PushServer::setConnection($appKey, $socketId, $socketId); // 递增订阅数 /** @see PushServer::$_storage */ $storage->hIncrBy($key,'subscription_count', 1); } // 如果是presence通道 if ($isPresence = ($type === CHANNEL_TYPE_PRESENCE)) { - if (!$storage->exists($userKey = PushServer::_getUserStorageKey($appKey, $channel, $userId))) { + if (!$storage->exists($userKey = PushServer::getUserStorageKey($appKey, $channel, $userId))) { $storage->hIncrBy($key ,'user_count', 1); $storage->hMSet($userKey, [ 'user_id' => $userId, @@ -199,7 +199,7 @@ public static function subscribeChannel(TcpConnection $connection, string $chann $channel, EVENT_SUBSCRIPTION_SUCCEEDED, $isPresence ? - PushServer::_getPresenceChannelDataForSubscribe($appKey, $channel) : + PushServer::getPresenceChannelDataForSubscribe($appKey, $channel) : '{}' ); } catch (RedisException $exception){ diff --git a/src/Events/Unsubscribe.php b/src/Events/Unsubscribe.php index 0cd7acd..1754320 100644 --- a/src/Events/Unsubscribe.php +++ b/src/Events/Unsubscribe.php @@ -34,7 +34,7 @@ class Unsubscribe extends AbstractEvent public function response(TcpConnection $connection, array $request): void { $channel = $request['data']['channel'] ?? ''; - switch ($channelType = PushServer::_getChannelType($channel)) { + switch ($channelType = PushServer::getChannelType($channel)) { case CHANNEL_TYPE_PUBLIC: case CHANNEL_TYPE_PRIVATE: self::unsubscribeChannel($connection, $channel, $channelType); @@ -64,16 +64,16 @@ public function response(TcpConnection $connection, array $request): void public static function unsubscribeChannel(TcpConnection $connection, string $channel, ?string $uid = null, bool $send = true): void { try { - $appKey = PushServer::_getConnectionProperty($connection, 'appKey'); - $socketId = PushServer::_getConnectionProperty($connection, 'socketId'); - $channels = PushServer::_getConnectionProperty($connection, 'channels'); + $appKey = PushServer::getConnectionProperty($connection, 'appKey'); + $socketId = PushServer::getConnectionProperty($connection, 'socketId'); + $channels = PushServer::getConnectionProperty($connection, 'channels'); if ($type = $channels[$channel] ?? null) { $storage = PushServer::getStorageClient(); // presence通道 if ($type === CHANNEL_TYPE_PRESENCE) { - if ($users = $storage->keys(PushServer::_getUserStorageKey($appKey, $channel, $uid))) { - $userCount = $storage->hIncrBy(PushServer::_getChannelStorageKey($appKey, $channel), 'user_count', -count($users)); + if ($users = $storage->keys(PushServer::getUserStorageKey($appKey, $channel, $uid))) { + $userCount = $storage->hIncrBy(PushServer::getChannelStorageKey($appKey, $channel), 'user_count', -count($users)); if ($userCount <= 0) { $storage->del(...$users); } @@ -95,7 +95,7 @@ public static function unsubscribeChannel(TcpConnection $connection, string $cha } } // 查询通道订阅数量 - $subCount = $storage->hIncrBy($key = PushServer::_getChannelStorageKey($appKey, $channel), 'subscription_count', -1); + $subCount = $storage->hIncrBy($key = PushServer::getChannelStorageKey($appKey, $channel), 'subscription_count', -1); if ($subCount <= 0) { $storage->del($key); // 内部事件广播 通道被移除事件 @@ -113,8 +113,8 @@ public static function unsubscribeChannel(TcpConnection $connection, string $cha } // 移除通道 unset($channels[$channel]); - PushServer::_setConnectionProperty($connection, 'channels', $channels); - PushServer::_unsetChannels($appKey, $channel, $socketId); + PushServer::setConnectionProperty($connection, 'channels', $channels); + PushServer::unsetChannels($appKey, $channel, $socketId); if ($send) { /** * 发送退订成功事件消息 diff --git a/src/PushServer.php b/src/PushServer.php index 64486ee..bf5539d 100644 --- a/src/PushServer.php +++ b/src/PushServer.php @@ -44,8 +44,8 @@ class PushServer * * @var TcpConnection[][] = [ * 'appKey_1' => [ - * 'socketId_1' => TcpConnection_1, @see self::_getConnectionProperty() - * 'socketId_2' => TcpConnection_2, @see self::_getConnectionProperty() + * 'socketId_1' => TcpConnection_1, @see self::getConnectionProperty() + * 'socketId_2' => TcpConnection_2, @see self::getConnectionProperty() * ], * ] */ @@ -131,11 +131,11 @@ public function onWorkerStop(): void{ public function onConnect(TcpConnection $connection): void { // 为TcpConnection object设置属性 - static::_setConnectionProperty($connection, 'appKey', $appKey = static::$unknownTag); - static::_setConnectionProperty($connection, 'clientNotSendPingCount', 0); - static::_setConnectionProperty($connection, 'socketId', $socketId = static::_createSocketId()); + static::setConnectionProperty($connection, 'appKey', $appKey = static::$unknownTag); + static::setConnectionProperty($connection, 'clientNotSendPingCount', 0); + static::setConnectionProperty($connection, 'socketId', $socketId = static::createSocketId()); // 设置websocket握手事件回调 - static::_setConnectionProperty($connection, 'onWebSocketConnect', + static::setConnectionProperty($connection, 'onWebSocketConnect', // ws 连接会调用该回调 function (TcpConnection $connection, string $header) use ($appKey, $socketId) { $request = new Request($header); @@ -152,9 +152,9 @@ function (TcpConnection $connection, string $header) use ($appKey, $socketId) { } } // 设置push client connection属性 - static::_setConnectionProperty($connection, 'appKey', $appKey); - static::_setConnectionProperty($connection, 'queryString', $request->queryString() ?? ''); - static::_setConnectionProperty($connection, 'channels', []); + static::setConnectionProperty($connection, 'appKey', $appKey); + static::setConnectionProperty($connection, 'queryString', $request->queryString() ?? ''); + static::setConnectionProperty($connection, 'channels', []); /** * 向客户端发送链接成功的消息 * {"event":"pusher:connection_established","data":"{"socket_id":"208836.27464492","activity_timeout":120}"} @@ -165,7 +165,7 @@ function (TcpConnection $connection, string $header) use ($appKey, $socketId) { ]); }); // 设置连接 - static::_setConnection($appKey, $socketId, $connection); + static::setConnection($appKey, $socketId, $connection); } /** @@ -181,7 +181,7 @@ public function onMessage(TcpConnection $connection, $data): void $this->setLastEvent(AbstractEvent::factory($data['event'] ?? '')); if ($event = $this->getLastEvent()) { // 心跳计数归零 - static::_setConnectionProperty($connection, 'clientNotSendPingCount', 0); + static::setConnectionProperty($connection, 'clientNotSendPingCount', 0); // 事件响应 $event->response($connection, $data); return; @@ -198,20 +198,20 @@ public function onMessage(TcpConnection $connection, $data): void public function onClose(TcpConnection $connection): void { if ( - $socketId = static::_getConnectionProperty($connection, 'socketId') and - $appKey = static::_getConnectionProperty($connection, 'appKey') + $socketId = static::getConnectionProperty($connection, 'socketId') and + $appKey = static::getConnectionProperty($connection, 'appKey') ) { // 退订频道 - if ($channels = static::_getConnectionProperty($connection, 'channels', [])) { + if ($channels = static::getConnectionProperty($connection, 'channels', [])) { foreach ($channels as $channel => $type) { // 退订事件 Unsubscribe::unsubscribeChannel($connection, $channel); // 移除通道 - static::_unsetChannels($appKey, $channel, $socketId); + static::unsetChannels($appKey, $channel, $socketId); } } // 移除连接 - static::_unsetConnection($appKey, $socketId); + static::unsetConnection($appKey, $socketId); } } @@ -266,23 +266,6 @@ public function setKeepaliveTimeout(int $keepaliveTimeout): void $this->_keepaliveTimeout = $keepaliveTimeout; } - /** - * @return \Workerman\Connection\TcpConnection[][] - */ - public static function getConnections(): array - { - return self::$_connections; - } - - /** - * @param array $connections - * @return void - */ - public static function setConnections(array $connections): void - { - self::$_connections = $connections; - } - /** * 向连接发送错误消息 * @@ -303,9 +286,9 @@ public static function error(TcpConnection $connection, ?string $code, ?string $ if (static::getConfig('heartbeat', 0) <= 0) { Timer::add(60, function() use ($connection) { $connection->destroy(); - static::_unsetConnection( - static::_getConnectionProperty($connection, 'appKey'), - static::_getConnectionProperty($connection, 'socketId') + static::unsetConnection( + static::getConnectionProperty($connection, 'appKey'), + static::getConnectionProperty($connection, 'socketId') ); }); } @@ -327,7 +310,7 @@ public static function error(TcpConnection $connection, ?string $code, ?string $ */ public static function send(TcpConnection $connection, ?string $channel, ?string $event, mixed $data): void { - $response = static::staticFilter([ + $response = static::filter([ 'timestamp' => intval(microtime(true) * 1000), 'channel' => $channel, 'event' => $event, @@ -357,67 +340,12 @@ public static function terminateConnections(string $appKey, string $socketId, ar } } - /** @inheritDoc */ - public static function _subscribeResponse(string $type, array $data): void - { - try { - // 客户端事件 - if ($type === static::$publishTypeClient) { - static::staticVerify($data, [ - ['appKey', 'is_string', true], - ['channel', 'is_string', true], - ['event', 'is_string', true], - ['socketId', 'is_string', false] - ]); - // 查询通道下的所有socketId - $socketIds = static::_getChannels($appKey = $data['appKey'], $data['channel']); - // 发送至socketId对应的连接 - foreach ($socketIds as $socketId) { - // 如果存在socketId字段,则是需要做忽略发送 - if ($socketId !== ($data['socketId'] ?? null)) { - // 获取对应connection对象 - if ($connection = static::_getConnection($appKey, $socketId)) { - // 发送 - static::send( - $connection, - $data['channel'], - $data['event'], - $data['data'] ?? '{}' - ); - } - } - } - } - // 服务事件 - if ($type === static::$publishTypeServer) { - static::staticVerify($data, [ - ['appKey', 'is_string', true], - ['event', 'is_string', true], - ['socketId', 'is_string', false], - ]); - // 断开连接事件 - if ( - ($socketId = $data['socketId'] ?? null) and - $data['event'] === EVENT_TERMINATE_CONNECTION - ) { - static::terminateConnections($data['appKey'], $socketId, $data['data'] ?? []); - } - } - } catch (\InvalidArgumentException $exception) { - Log::channel('plugin.workbunny.webman-push-server.warning') - ->warning("[PUSH-SERVER] {$exception->getMessage()}", [ - 'args' => func_get_args(), - 'method' => __METHOD__ - ]); - } - } - /** * 创建一个全局的客户端id * * @return string */ - public static function _createSocketId(): string + public static function createSocketId(): string { return uuid(); } @@ -428,7 +356,7 @@ public static function _createSocketId(): string * @param string $channel * @return string */ - public static function _getChannelType(string $channel): string + public static function getChannelType(string $channel): string { return (str_starts_with($channel, 'private-')) ? CHANNEL_TYPE_PRIVATE @@ -443,7 +371,7 @@ public static function _getChannelType(string $channel): string * @param mixed|null $value * @return void */ - public static function _setConnectionProperty(TcpConnection $connection, string $property, mixed $value): void + public static function setConnectionProperty(TcpConnection $connection, string $property, mixed $value): void { $connection->$property = $value; } @@ -456,11 +384,28 @@ public static function _setConnectionProperty(TcpConnection $connection, string * @param mixed|null $default * @return mixed|null */ - public static function _getConnectionProperty(TcpConnection $connection, string $property, mixed $default = null): mixed + public static function getConnectionProperty(TcpConnection $connection, string $property, mixed $default = null): mixed { return $connection->$property ?? $default; } + /** + * @return TcpConnection[][] + */ + public static function getConnections(): array + { + return self::$_connections; + } + + /** + * @param array $connections + * @return void + */ + public static function setConnections(array $connections): void + { + self::$_connections = $connections; + } + /** * 设置连接 * @@ -469,7 +414,7 @@ public static function _getConnectionProperty(TcpConnection $connection, string * @param TcpConnection $connection * @return void */ - public static function _setConnection(string $appKey, string $socketId, TcpConnection $connection): void + public static function setConnection(string $appKey, string $socketId, TcpConnection $connection): void { static::$_connections[$appKey][$socketId] = $connection; } @@ -481,7 +426,7 @@ public static function _setConnection(string $appKey, string $socketId, TcpConne * @param string $socketId * @return TcpConnection|null */ - public static function _getConnection(string $appKey, string $socketId): ?TcpConnection + public static function getConnection(string $appKey, string $socketId): ?TcpConnection { return static::$_connections[$appKey][$socketId] ?? null; } @@ -493,7 +438,7 @@ public static function _getConnection(string $appKey, string $socketId): ?TcpCon * @param string $socketId * @return void */ - public static function _unsetConnection(string $appKey, string $socketId): void + public static function unsetConnection(string $appKey, string $socketId): void { // 移除connections unset(static::$_connections[$appKey][$socketId]); @@ -507,7 +452,7 @@ public static function _unsetConnection(string $appKey, string $socketId): void * @param string $socketId * @return void */ - public static function _setChannel(string $appKey, string $channel, string $socketId): void + public static function setChannel(string $appKey, string $channel, string $socketId): void { static::$_channels[$appKey][$channel][$socketId] = $socketId; } @@ -520,7 +465,7 @@ public static function _setChannel(string $appKey, string $channel, string $sock * @param string|null $socketId * @return string|array|null */ - public static function _getChannels(string $appKey, string $channel, ?string $socketId = null): string|array|null + public static function getChannels(string $appKey, string $channel, ?string $socketId = null): string|array|null { return ($socketId !== null) ? (static::$_channels[$appKey][$channel][$socketId] ?? null) : @@ -535,7 +480,7 @@ public static function _getChannels(string $appKey, string $channel, ?string $so * @param string|null $socketId * @return void */ - public static function _unsetChannels(string $appKey, string $channel, ?string $socketId = null): void + public static function unsetChannels(string $appKey, string $channel, ?string $socketId = null): void { if ($socketId !== null) { unset(static::$_channels[$appKey][$channel][$socketId]); @@ -544,6 +489,61 @@ public static function _unsetChannels(string $appKey, string $channel, ?string $ unset(static::$_channels[$appKey][$channel]); } + /** @inheritDoc */ + public static function _subscribeResponse(string $type, array $data): void + { + try { + // 客户端事件 + if ($type === static::$publishTypeClient) { + static::verify($data, [ + ['appKey', 'is_string', true], + ['channel', 'is_string', true], + ['event', 'is_string', true], + ['socketId', 'is_string', false] + ]); + // 查询通道下的所有socketId + $socketIds = static::getChannels($appKey = $data['appKey'], $data['channel']); + // 发送至socketId对应的连接 + foreach ($socketIds as $socketId) { + // 如果存在socketId字段,则是需要做忽略发送 + if ($socketId !== ($data['socketId'] ?? null)) { + // 获取对应connection对象 + if ($connection = static::getConnection($appKey, $socketId)) { + // 发送 + static::send( + $connection, + $data['channel'], + $data['event'], + $data['data'] ?? '{}' + ); + } + } + } + } + // 服务事件 + if ($type === static::$publishTypeServer) { + static::verify($data, [ + ['appKey', 'is_string', true], + ['event', 'is_string', true], + ['socketId', 'is_string', false], + ]); + // 断开连接事件 + if ( + ($socketId = $data['socketId'] ?? null) and + $data['event'] === EVENT_TERMINATE_CONNECTION + ) { + static::terminateConnections($data['appKey'], $socketId, $data['data'] ?? []); + } + } + } catch (\InvalidArgumentException $exception) { + Log::channel('plugin.workbunny.webman-push-server.warning') + ->warning("[PUSH-SERVER] {$exception->getMessage()}", [ + 'args' => func_get_args(), + 'method' => __METHOD__ + ]); + } + } + /** * @return void */ @@ -553,13 +553,13 @@ public static function _heartbeatChecker(): void * @var string $appKey * @var array $connections */ - foreach (static::$_connections as $appKey => $connections) { + foreach (static::getConnections() as $appKey => $connections) { /** * @var string $socketId * @var TcpConnection $connection */ foreach ($connections as $socketId => $connection) { - $count = static::_getConnectionProperty($connection, 'clientNotSendPingCount'); + $count = static::getConnectionProperty($connection, 'clientNotSendPingCount'); if ($count === null or $count > 1) { static::terminateConnections($appKey, $socketId, [ 'type' => 'heartbeat', @@ -567,7 +567,7 @@ public static function _heartbeatChecker(): void ]); continue; } - static::_setConnectionProperty($connection, 'clientNotSendPingCount', $count + 1); + static::setConnectionProperty($connection, 'clientNotSendPingCount', $count + 1); } } } diff --git a/src/Traits/ChannelMethods.php b/src/Traits/ChannelMethods.php index f92e541..6fca204 100644 --- a/src/Traits/ChannelMethods.php +++ b/src/Traits/ChannelMethods.php @@ -133,6 +133,8 @@ public static function publishUseRetry(string $type, array $data, float $retryIn } /** + * 订阅回调 + * * @param $channel * @param $message * @return void diff --git a/src/Traits/HelperMethods.php b/src/Traits/HelperMethods.php index b17e813..541d167 100644 --- a/src/Traits/HelperMethods.php +++ b/src/Traits/HelperMethods.php @@ -16,17 +16,12 @@ */ trait HelperMethods { - /** @see PackageTrait::staticFilter() */ - public function filter(array $input): array - { - return self::staticFilter($input); - } /** * @param array $input * @return array */ - public static function staticFilter(array $input): array + public static function filter(array $input): array { $result = []; foreach ($input as $key => $value) { @@ -38,17 +33,6 @@ public static function staticFilter(array $input): array return $result; } - /** - * @param array $options - * @param array $validators - * @return void - * @throws InvalidArgumentException - */ - public function verify(array $options, array $validators): void - { - self::staticVerify($options, $validators); - } - /** * @param array $options * @param array $validators = [ @@ -58,7 +42,7 @@ public function verify(array $options, array $validators): void * @return void * @throws InvalidArgumentException */ - public static function staticVerify(mixed $options, array $validators): void + public static function verify(mixed $options, array $validators): void { if (!is_array($options)) { throw new InvalidArgumentException('Invalid Options. ', -1); diff --git a/src/Traits/StorageMethods.php b/src/Traits/StorageMethods.php index 27bdb26..e7f11b8 100644 --- a/src/Traits/StorageMethods.php +++ b/src/Traits/StorageMethods.php @@ -56,7 +56,7 @@ public static function getStorageClient(): \Redis * @param string|null $channel * @return string */ - public static function _getChannelStorageKey(string $appKey, ?string $channel = null): string + public static function getChannelStorageKey(string $appKey, ?string $channel = null): string { $channel = $channel !== null ? $channel : '*'; return "workbunny:webman-push-server:appKey_$appKey:channel_$channel:info"; @@ -68,7 +68,7 @@ public static function _getChannelStorageKey(string $appKey, ?string $channel = * @param string $channelStorageKey * @return string */ - public static function _getChannelName(string $channelStorageKey): string + public static function getChannelName(string $channelStorageKey): string { $channelKey = explode(':', $channelStorageKey, 5)[3]; return explode('_', $channelKey, 2)[1]; @@ -82,7 +82,7 @@ public static function _getChannelName(string $channelStorageKey): string * @param string|null $uid * @return string */ - public static function _getUserStorageKey(string $appKey, ?string $channel = null, ?string $uid = null): string + public static function getUserStorageKey(string $appKey, ?string $channel = null, ?string $uid = null): string { $channel = $channel !== null ? $channel : '*'; $uid = $uid !== null ? $uid : '*'; @@ -95,7 +95,7 @@ public static function _getUserStorageKey(string $appKey, ?string $channel = nul * @param string $userStorageKey * @return string */ - public static function _getUserId(string $userStorageKey): string + public static function getUserId(string $userStorageKey): string { $userIdKey = explode(':', $userStorageKey, 5)[4]; return explode('_', $userIdKey, 2)[1]; @@ -107,12 +107,12 @@ public static function _getUserId(string $userStorageKey): string * @return array[] * @throws RedisException */ - public static function _getPresenceChannelDataForSubscribe(string $appKey, string $channel): array + public static function getPresenceChannelDataForSubscribe(string $appKey, string $channel): array { $hash = []; $storage = self::getStorageClient(); while( - false !== ($keys = $storage->scan($iterator, self::_getUserStorageKey($appKey, $channel),100)) + false !== ($keys = $storage->scan($iterator, self::getUserStorageKey($appKey, $channel),100)) ) { foreach($keys as $key) { $result = $storage->hGetAll($key); diff --git a/src/config/plugin/workbunny/webman-push-server/redis.php b/src/config/plugin/workbunny/webman-push-server/redis.php index bd39520..2f12346 100644 --- a/src/config/plugin/workbunny/webman-push-server/redis.php +++ b/src/config/plugin/workbunny/webman-push-server/redis.php @@ -3,14 +3,14 @@ return [ // push server 储存器 'server-storage' => [ - 'host' => 'redis', + 'host' => 'host.docker.internal', 'password' => '', 'port' => 6379, 'database' => 0, ], // 服务通讯频道 'server-channel' => [ - 'host' => 'redis', + 'host' => 'host.docker.internal', 'password' => '', 'port' => 6379, 'database' => 0, diff --git a/src/config/plugin/workbunny/webman-push-server/route.php b/src/config/plugin/workbunny/webman-push-server/route.php index 898f339..a0b1537 100644 --- a/src/config/plugin/workbunny/webman-push-server/route.php +++ b/src/config/plugin/workbunny/webman-push-server/route.php @@ -47,8 +47,8 @@ return response(400, ['error' => 'Required channel_name']); } if( - PushServer::_getChannelType($channelName) !== CHANNEL_TYPE_PRESENCE and - PushServer::_getChannelType($channelName) !== CHANNEL_TYPE_PRIVATE + PushServer::getChannelType($channelName) !== CHANNEL_TYPE_PRESENCE and + PushServer::getChannelType($channelName) !== CHANNEL_TYPE_PRIVATE ){ return response(400, ['error' => 'Invalid channel_name']); } @@ -79,7 +79,7 @@ 'U2FsdGVkX1+vlfFH8Q9XdZ9t9h2bABGYAZltEYAX6UM=', // TODO 动态配置 $socketId, $channelName, - PushServer::_getChannelType($channelName) === CHANNEL_TYPE_PRESENCE ? $response['channel_data'] : [] + PushServer::getChannelType($channelName) === CHANNEL_TYPE_PRESENCE ? $response['channel_data'] : [] ); /** * @private {"auth": "workbunny:xxxxxxxxxxxxxxxx"} @@ -112,10 +112,10 @@ } try { $storage = PushServer::getStorageClient(); - $keys = $storage->keys(PushServer::_getChannelStorageKey($appKey)); + $keys = $storage->keys(PushServer::getChannelStorageKey($appKey)); foreach ($keys as $key) { - $channel = PushServer::_getChannelName($key); - $channelType = PushServer::_getChannelType($channel); + $channel = PushServer::getChannelName($key); + $channelType = PushServer::getChannelType($channel); if($prefix !== null and $channelType !== $prefix){ continue; @@ -150,7 +150,7 @@ } try { $storage = PushServer::getStorageClient(); - $channels = $storage->hMGet(PushServer::_getChannelStorageKey($appKey,$channelName), $fields); + $channels = $storage->hMGet(PushServer::getChannelStorageKey($appKey,$channelName), $fields); return response(200, $channels ? array_merge([ 'occupied' => true, ], $channels) : '{}'); @@ -184,7 +184,7 @@ } $channels = ($channel !== null) ? [(string)$channel] : $channels; foreach ($channels as $channel) { - PushServer::publish(PushServer::$publishTypeClient, PushServer::staticFilter([ + PushServer::publish(PushServer::$publishTypeClient, PushServer::filter([ 'appKey' => $appKey, 'channel' => $channel, 'event' => $event, @@ -214,7 +214,7 @@ $event = $package['name']; $data = $package['data']; $socketId = $package['socket_id'] ?? null; - PushServer::publish(PushServer::$publishTypeClient, PushServer::staticFilter([ + PushServer::publish(PushServer::$publishTypeClient, PushServer::filter([ 'appKey' => $appKey, 'channel' => $channel, 'event' => $event, @@ -237,7 +237,7 @@ $userId = $urlParams['userId']; $socketIds = []; $storage = PushServer::getStorageClient(); - $userKeys = $storage->keys(PushServer::_getUserStorageKey($appKey, null, $userId)); + $userKeys = $storage->keys(PushServer::getUserStorageKey($appKey, null, $userId)); foreach ($userKeys as $userKey){ $socketIds[] = $storage->hGet($userKey, 'socket_id'); } @@ -266,14 +266,14 @@ $userIdArray = []; try { $storage = PushServer::getStorageClient(); - $channelType = $storage->hGet(PushServer::_getChannelStorageKey($appKey, $channelName), 'type'); + $channelType = $storage->hGet(PushServer::getChannelStorageKey($appKey, $channelName), 'type'); if(!$channelType){ return response(404, ['error' => "Not Found [$channelName]"]); } if($channelType !== CHANNEL_TYPE_PRESENCE) { return response(400, ['error' => "Invalid channel [$channelName]"]); } - $userKeys = $storage->keys(PushServer::_getUserStorageKey($appKey, $channelName)); + $userKeys = $storage->keys(PushServer::getUserStorageKey($appKey, $channelName)); foreach ($userKeys as $userKey) { $userIdArray[] = $storage->hGet($userKey,'user_id'); } diff --git a/tests/PushServerBaseTest.php b/tests/PushServerBaseTest.php index 98478f8..be675af 100644 --- a/tests/PushServerBaseTest.php +++ b/tests/PushServerBaseTest.php @@ -36,15 +36,15 @@ public function testPushServerOnConnect() $this->getPushServer()->onConnect($connection); $this->assertEquals( PushServer::$unknownTag, - PushServer::_getConnectionProperty($connection, 'appKey', 'has-not') + PushServer::getConnectionProperty($connection, 'appKey', 'has-not') ); $this->assertEquals( 0, - PushServer::_getConnectionProperty($connection, 'clientNotSendPingCount', 'has-not') + PushServer::getConnectionProperty($connection, 'clientNotSendPingCount', 'has-not') ); $this->assertNotEquals( 'has-not', - PushServer::_getConnectionProperty($connection, 'socketId', 'has-not') + PushServer::getConnectionProperty($connection, 'socketId', 'has-not') ); $this->assertFalse(property_exists($connection, 'queryString')); $this->assertFalse(property_exists($connection, 'channels')); @@ -53,23 +53,23 @@ public function testPushServerOnConnect() ($connection->onWebSocketConnect)($connection, $this->getWebsocketHeader()); $this->assertEquals( 'workbunny', - PushServer::_getConnectionProperty($connection, 'appKey', 'has-not') + PushServer::getConnectionProperty($connection, 'appKey', 'has-not') ); $this->assertEquals( 0, - PushServer::_getConnectionProperty($connection, 'clientNotSendPingCount', 'has-not') + PushServer::getConnectionProperty($connection, 'clientNotSendPingCount', 'has-not') ); $this->assertEquals( 'protocol=7&client=js&version=3.2.4&flash=false', - PushServer::_getConnectionProperty($connection, 'queryString', 'has-not') + PushServer::getConnectionProperty($connection, 'queryString', 'has-not') ); $this->assertNotEquals( 'has-not', - PushServer::_getConnectionProperty($connection, 'socketId', 'has-not') + PushServer::getConnectionProperty($connection, 'socketId', 'has-not') ); $this->assertEquals( [], - PushServer::_getConnectionProperty($connection, 'channels', 'has-not') + PushServer::getConnectionProperty($connection, 'channels', 'has-not') ); }