diff --git a/README.md b/README.md index 0edc063..dcaa1ce 100644 --- a/README.md +++ b/README.md @@ -27,13 +27,13 @@ from cent import Client Required init arguments: -* `api_url` (str) - Centrifugo HTTP API URL address -* `api_key` (str) - Centrifugo HTTP API key +* `api_url` (`str`) - Centrifugo HTTP API URL address, for example, `http://localhost:8000/api` +* `api_key` (`str`) - Centrifugo HTTP API key Optional arguments: -* `request_timeout` (float) - base timeout for all requests in seconds, default is 10 seconds. -* `session` (requests.Session) - custom `requests` session to use. +* `timeout` (`float`) - base timeout for all requests in seconds, default is 10 seconds. +* `session` (`requests.Session`) - custom `requests` session to use. Example: @@ -56,13 +56,13 @@ from cent import AsyncClient Required init arguments: -* `api_url` (str) - Centrifugo HTTP API URL address -* `api_key` (str) - Centrifugo HTTP API key +* `api_url` (`str`) - Centrifugo HTTP API URL address, for example, `http://localhost:8000 +* `api_key` (`str`) - Centrifugo HTTP API key Optional arguments: -* `request_timeout` (float) - base timeout for all requests in seconds, default is 10 seconds. -* `session` (aiohttp.ClientSession) - custom `aiohttp` session to use. +* `timeout` (`float`) - base timeout for all requests in seconds, default is 10 seconds. +* `session` (`aiohttp.ClientSession`) - custom `aiohttp` session to use. Example: @@ -91,9 +91,9 @@ This library raises exceptions if sth goes wrong. All exceptions are subclasses * `CentNetworkError` - raised in case of network related errors (connection refused) * `CentTransportError` - raised in case of transport related errors (HTTP status code is not 2xx) * `CentTimeoutError` - raised in case of timeout -* `CentUnauthorizedError` - raised in case of unauthorized access +* `CentUnauthorizedError` - raised in case of unauthorized access (signal of invalid API key) * `CentDecodeError` - raised in case of server response decoding error -* `CentAPIError` - raised in case of API error (error returned by Centrifugo itself, you can inspect code and message in this case) +* `CentResponseError` - raised in case of API response error (i.e. error returned by Centrifugo itself, you can inspect code and message returned by Centrifugo in this case) ## For contributors @@ -123,3 +123,11 @@ To run benchmarks, run: ```bash make bench ``` + +## Migrate to Cent v5 + +Cent v5 contains the following notable changes compared to Cent v4: + +* Constructor slightly changed, refer to the examples above. +* Base exception class is now `CentError` instead of `CentException`, exceptions SDK raises were refactored. +* To send multiple commands in one HTTP request SDK provides `batch` method. diff --git a/cent/__init__.py b/cent/__init__.py index 8828aec..7659bdc 100644 --- a/cent/__init__.py +++ b/cent/__init__.py @@ -2,7 +2,7 @@ Client, AsyncClient, ) -from cent.requests import ( +from cent.dto import ( BroadcastRequest, PublishRequest, SubscribeRequest, @@ -16,8 +16,6 @@ DisconnectRequest, InfoRequest, BatchRequest, -) -from cent.results import ( PublishResult, BroadcastResult, SubscribeResult, @@ -31,8 +29,6 @@ DisconnectResult, InfoResult, BatchResult, -) -from cent.types import ( StreamPosition, ChannelOptionsOverride, Disconnect, @@ -48,7 +44,7 @@ CentTransportError, CentUnauthorizedError, CentDecodeError, - CentAPIError, + CentResponseError, ) __all__ = ( @@ -58,10 +54,10 @@ "BoolValue", "BroadcastRequest", "BroadcastResult", - "CentAPIError", "CentDecodeError", "CentError", "CentNetworkError", + "CentResponseError", "CentTransportError", "CentUnauthorizedError", "ChannelOptionsOverride", diff --git a/cent/base.py b/cent/base.py deleted file mode 100644 index 75d488d..0000000 --- a/cent/base.py +++ /dev/null @@ -1,60 +0,0 @@ -from abc import ABC, abstractmethod -from typing import TypeVar, Any, Generic, TYPE_CHECKING, ClassVar, Optional - -from pydantic import BaseModel, ConfigDict - - -CentType = TypeVar("CentType", bound=Any) - - -class Error(BaseModel): - code: int - message: str - - -class Response(BaseModel, Generic[CentType]): - error: Optional[Error] = None - result: Optional[CentType] = None - - -class CentResult(BaseModel, ABC): - model_config = ConfigDict( - use_enum_values=True, - extra="allow", - validate_assignment=True, - frozen=True, - populate_by_name=True, - arbitrary_types_allowed=True, - defer_build=True, - ) - - -class CentRequest(BaseModel, Generic[CentType], ABC): - model_config = ConfigDict( - extra="allow", - populate_by_name=True, - arbitrary_types_allowed=True, - ) - - if TYPE_CHECKING: - __returning__: ClassVar[type] - __api_method__: ClassVar[str] - else: - - @property - @abstractmethod - def __returning__(self) -> type: - pass - - @property - @abstractmethod - def __api_method__(self) -> str: - pass - - -class NestedModel(BaseModel, ABC): - model_config = ConfigDict( - extra="allow", - populate_by_name=True, - arbitrary_types_allowed=True, - ) diff --git a/cent/client/async_client.py b/cent/client/async_client.py index c0c1718..791fd54 100644 --- a/cent/client/async_client.py +++ b/cent/client/async_client.py @@ -3,8 +3,8 @@ from aiohttp import ClientSession from cent.client.session import AiohttpSession -from cent.base import CentRequest -from cent.requests import ( +from cent.dto import ( + CentRequest, BroadcastRequest, PublishRequest, SubscribeRequest, @@ -18,8 +18,6 @@ DisconnectRequest, InfoRequest, BatchRequest, -) -from cent.results import ( BatchResult, PublishResult, BroadcastResult, @@ -33,8 +31,6 @@ ChannelsResult, DisconnectResult, InfoResult, -) -from cent.types import ( StreamPosition, ChannelOptionsOverride, Disconnect, @@ -49,19 +45,19 @@ def __init__( self, api_url: str, api_key: str, - request_timeout: Optional[float] = 10.0, + timeout: Optional[float] = 10.0, session: Optional[ClientSession] = None, ) -> None: """ :param api_url: Centrifugo API URL :param api_key: Centrifugo API key - :param request_timeout: Base timeout for all requests + :param timeout: Base timeout for all requests :param session: Custom `aiohttp` session """ self._api_key = api_key self._session = AiohttpSession( api_url, - timeout=request_timeout, + timeout=timeout, session=session, ) @@ -73,7 +69,7 @@ async def publish( tags: Optional[Dict[str, str]] = None, b64data: Optional[str] = None, idempotency_key: Optional[str] = None, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> PublishResult: call = PublishRequest( channel=channel, @@ -83,7 +79,7 @@ async def publish( b64data=b64data, idempotency_key=idempotency_key, ) - return await self(call, request_timeout=request_timeout) + return await self(call, timeout=timeout) async def broadcast( self, @@ -93,7 +89,7 @@ async def broadcast( tags: Optional[Dict[str, str]] = None, b64data: Optional[str] = None, idempotency_key: Optional[str] = None, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> BroadcastResult: call = BroadcastRequest( channels=channels, @@ -103,7 +99,7 @@ async def broadcast( b64data=b64data, idempotency_key=idempotency_key, ) - return await self(call, request_timeout=request_timeout) + return await self(call, timeout=timeout) async def subscribe( self, @@ -117,7 +113,7 @@ async def subscribe( b64data: Optional[str] = None, recover_since: Optional[StreamPosition] = None, override: Optional[ChannelOptionsOverride] = None, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> SubscribeResult: call = SubscribeRequest( user=user, @@ -131,7 +127,7 @@ async def subscribe( recover_since=recover_since, override=override, ) - return await self(call, request_timeout=request_timeout) + return await self(call, timeout=timeout) async def unsubscribe( self, @@ -139,7 +135,7 @@ async def unsubscribe( channel: str, client: Optional[str] = None, session: Optional[str] = None, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> UnsubscribeResult: call = UnsubscribeRequest( user=user, @@ -147,27 +143,27 @@ async def unsubscribe( client=client, session=session, ) - return await self(call, request_timeout=request_timeout) + return await self(call, timeout=timeout) async def presence( self, channel: str, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> PresenceResult: call = PresenceRequest( channel=channel, ) - return await self(call, request_timeout=request_timeout) + return await self(call, timeout=timeout) async def presence_stats( self, channel: str, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> PresenceStatsResult: call = PresenceStatsRequest( channel=channel, ) - return await self(call, request_timeout=request_timeout) + return await self(call, timeout=timeout) async def history( self, @@ -175,7 +171,7 @@ async def history( limit: Optional[int] = None, since: Optional[StreamPosition] = None, reverse: Optional[bool] = None, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> HistoryResult: call = HistoryRequest( channel=channel, @@ -183,17 +179,17 @@ async def history( since=since, reverse=reverse, ) - return await self(call, request_timeout=request_timeout) + return await self(call, timeout=timeout) async def history_remove( self, channel: str, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> HistoryRemoveResult: call = HistoryRemoveRequest( channel=channel, ) - return await self(call, request_timeout=request_timeout) + return await self(call, timeout=timeout) async def refresh( self, @@ -202,7 +198,7 @@ async def refresh( session: Optional[str] = None, expire_at: Optional[int] = None, expired: Optional[bool] = None, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> RefreshResult: call = RefreshRequest( user=user, @@ -211,17 +207,17 @@ async def refresh( expire_at=expire_at, expired=expired, ) - return await self(call, request_timeout=request_timeout) + return await self(call, timeout=timeout) async def channels( self, pattern: Optional[str] = None, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> ChannelsResult: call = ChannelsRequest( pattern=pattern, ) - return await self(call, request_timeout=request_timeout) + return await self(call, timeout=timeout) async def disconnect( self, @@ -230,7 +226,7 @@ async def disconnect( session: Optional[str] = None, whitelist: Optional[List[str]] = None, disconnect: Optional[Disconnect] = None, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> DisconnectResult: call = DisconnectRequest( user=user, @@ -239,37 +235,35 @@ async def disconnect( whitelist=whitelist, disconnect=disconnect, ) - return await self(call, request_timeout=request_timeout) + return await self(call, timeout=timeout) async def info( self, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> InfoResult: call = InfoRequest() - return await self(call, request_timeout=request_timeout) + return await self(call, timeout=timeout) async def batch( self, commands: List[CentRequest[Any]], parallel: Optional[bool] = None, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> BatchResult: call = BatchRequest.model_construct(commands=commands, parallel=parallel) - return await self(call, request_timeout=request_timeout) + return await self(call, timeout=timeout) async def close(self) -> None: await self._session.close() - async def __call__( - self, request: CentRequest[T], request_timeout: Optional[float] = None - ) -> T: + async def __call__(self, request: CentRequest[T], timeout: Optional[float] = None) -> T: """ Call API method :param request: Centrifugo request :return: Centrifugo response """ - return await self._session(self._api_key, request, timeout=request_timeout) + return await self._session(self._api_key, request, timeout=timeout) async def __aenter__(self) -> "AsyncClient": return self diff --git a/cent/client/session/aiohttp.py b/cent/client/session/aiohttp.py index 792acf4..1cbab00 100644 --- a/cent/client/session/aiohttp.py +++ b/cent/client/session/aiohttp.py @@ -1,11 +1,10 @@ import asyncio from typing import Optional -from aiohttp import ClientSession, ClientError, ClientTimeout +from aiohttp import ClientSession, ClientError from cent.client.session.base_http_async import BaseHttpAsyncSession -from cent.base import CentType, CentRequest -from cent.requests import BatchRequest +from cent.dto import CentType, CentRequest, BatchRequest from cent.exceptions import CentNetworkError, CentTimeoutError @@ -23,13 +22,7 @@ def __init__( if session: self._session = session else: - self._session = ClientSession( - headers={ - "User-Agent": "centrifugal/pycent", - "Content-Type": "application/json", - }, - timeout=ClientTimeout(total=self._timeout), - ) + self._session = ClientSession() async def close(self) -> None: if self._session is not None and not self._session.closed: diff --git a/cent/client/session/base_http.py b/cent/client/session/base_http.py index f3e58e1..d81c5ad 100644 --- a/cent/client/session/base_http.py +++ b/cent/client/session/base_http.py @@ -6,16 +6,16 @@ from cent.exceptions import ( CentDecodeError, - CentAPIError, + CentResponseError, CentUnauthorizedError, CentTransportError, ) -from cent.base import ( +from cent.dto import ( CentRequest, CentType, Response, + BatchRequest, ) -from cent.requests import BatchRequest class BaseHttpSession: @@ -76,7 +76,7 @@ def check_response( raise CentDecodeError from err if response.error: - raise CentAPIError( + raise CentResponseError( request=request, code=response.error.code, message=response.error.message, diff --git a/cent/client/session/base_http_async.py b/cent/client/session/base_http_async.py index 56d0ce8..b0fb057 100644 --- a/cent/client/session/base_http_async.py +++ b/cent/client/session/base_http_async.py @@ -2,7 +2,7 @@ from typing import Optional from cent.client.session.base_http import BaseHttpSession -from cent.base import CentType, CentRequest +from cent.dto import CentType, CentRequest class BaseHttpAsyncSession(BaseHttpSession, ABC): diff --git a/cent/client/session/base_http_sync.py b/cent/client/session/base_http_sync.py index 1d535af..5770817 100644 --- a/cent/client/session/base_http_sync.py +++ b/cent/client/session/base_http_sync.py @@ -2,7 +2,7 @@ from typing import Optional from cent.client.session.base_http import BaseHttpSession -from cent.base import CentType, CentRequest +from cent.dto import CentType, CentRequest class BaseHttpSyncSession(BaseHttpSession, ABC): diff --git a/cent/client/session/requests.py b/cent/client/session/requests.py index d5cc03e..570095a 100644 --- a/cent/client/session/requests.py +++ b/cent/client/session/requests.py @@ -4,8 +4,7 @@ from requests import Session from cent.client.session.base_http_sync import BaseHttpSyncSession -from cent.base import CentType, CentRequest -from cent.requests import BatchRequest +from cent.dto import CentType, CentRequest, BatchRequest from cent.exceptions import CentNetworkError, CentTimeoutError @@ -24,10 +23,6 @@ def __init__( self._session = session else: self._session = Session() - self._session.headers.update({ - "User-Agent": "centrifugal/pycent", - "Content-Type": "application/json", - }) def close(self) -> None: if self._session is not None: diff --git a/cent/client/sync_client.py b/cent/client/sync_client.py index 50c1ac4..b146cfa 100644 --- a/cent/client/sync_client.py +++ b/cent/client/sync_client.py @@ -3,8 +3,8 @@ from requests import Session from cent.client.session import RequestsSession -from cent.base import CentRequest -from cent.requests import ( +from cent.dto import ( + CentRequest, BroadcastRequest, PublishRequest, SubscribeRequest, @@ -18,8 +18,6 @@ DisconnectRequest, InfoRequest, BatchRequest, -) -from cent.results import ( BatchResult, PublishResult, BroadcastResult, @@ -33,8 +31,6 @@ ChannelsResult, DisconnectResult, InfoResult, -) -from cent.types import ( StreamPosition, ChannelOptionsOverride, Disconnect, @@ -49,13 +45,13 @@ def __init__( self, api_url: str, api_key: str, - request_timeout: Optional[float] = 10.0, + timeout: Optional[float] = 10.0, session: Optional[Session] = None, ) -> None: """ :param api_url: Centrifugo API URL :param api_key: Centrifugo API key - :param request_timeout: Base timeout for all requests. + :param timeout: Base timeout for all requests. :param session: Custom `requests` session. """ @@ -63,7 +59,7 @@ def __init__( self._api_key = api_key self._session = RequestsSession( api_url, - timeout=request_timeout, + timeout=timeout, session=session, ) @@ -75,7 +71,7 @@ def publish( tags: Optional[Dict[str, str]] = None, b64data: Optional[str] = None, idempotency_key: Optional[str] = None, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> PublishResult: call = PublishRequest( channel=channel, @@ -85,7 +81,7 @@ def publish( b64data=b64data, idempotency_key=idempotency_key, ) - return self(call, request_timeout=request_timeout) + return self(call, timeout=timeout) def broadcast( self, @@ -95,7 +91,7 @@ def broadcast( tags: Optional[Dict[str, str]] = None, b64data: Optional[str] = None, idempotency_key: Optional[str] = None, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> BroadcastResult: call = BroadcastRequest( channels=channels, @@ -105,7 +101,7 @@ def broadcast( b64data=b64data, idempotency_key=idempotency_key, ) - return self(call, request_timeout=request_timeout) + return self(call, timeout=timeout) def subscribe( self, @@ -119,7 +115,7 @@ def subscribe( b64data: Optional[str] = None, recover_since: Optional[StreamPosition] = None, override: Optional[ChannelOptionsOverride] = None, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> SubscribeResult: call = SubscribeRequest( user=user, @@ -133,7 +129,7 @@ def subscribe( recover_since=recover_since, override=override, ) - return self(call, request_timeout=request_timeout) + return self(call, timeout=timeout) def unsubscribe( self, @@ -141,7 +137,7 @@ def unsubscribe( channel: str, client: Optional[str] = None, session: Optional[str] = None, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> UnsubscribeResult: call = UnsubscribeRequest( user=user, @@ -149,27 +145,27 @@ def unsubscribe( client=client, session=session, ) - return self(call, request_timeout=request_timeout) + return self(call, timeout=timeout) def presence( self, channel: str, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> PresenceResult: call = PresenceRequest( channel=channel, ) - return self(call, request_timeout=request_timeout) + return self(call, timeout=timeout) def presence_stats( self, channel: str, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> PresenceStatsResult: call = PresenceStatsRequest( channel=channel, ) - return self(call, request_timeout=request_timeout) + return self(call, timeout=timeout) def history( self, @@ -177,7 +173,7 @@ def history( limit: Optional[int] = None, since: Optional[StreamPosition] = None, reverse: Optional[bool] = None, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> HistoryResult: call = HistoryRequest( channel=channel, @@ -185,17 +181,17 @@ def history( since=since, reverse=reverse, ) - return self(call, request_timeout=request_timeout) + return self(call, timeout=timeout) def history_remove( self, channel: str, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> HistoryRemoveResult: call = HistoryRemoveRequest( channel=channel, ) - return self(call, request_timeout=request_timeout) + return self(call, timeout=timeout) def refresh( self, @@ -204,7 +200,7 @@ def refresh( session: Optional[str] = None, expired: Optional[bool] = None, expire_at: Optional[int] = None, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> RefreshResult: call = RefreshRequest( user=user, @@ -213,17 +209,17 @@ def refresh( expired=expired, expire_at=expire_at, ) - return self(call, request_timeout=request_timeout) + return self(call, timeout=timeout) def channels( self, pattern: Optional[str] = None, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> ChannelsResult: call = ChannelsRequest( pattern=pattern, ) - return self(call, request_timeout=request_timeout) + return self(call, timeout=timeout) def disconnect( self, @@ -232,7 +228,7 @@ def disconnect( session: Optional[str] = None, whitelist: Optional[List[str]] = None, disconnect: Optional[Disconnect] = None, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> DisconnectResult: call = DisconnectRequest( user=user, @@ -241,35 +237,35 @@ def disconnect( whitelist=whitelist, disconnect=disconnect, ) - return self(call, request_timeout=request_timeout) + return self(call, timeout=timeout) def info( self, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> InfoResult: call = InfoRequest() - return self(call, request_timeout=request_timeout) + return self(call, timeout=timeout) def batch( self, commands: List[CentRequest[Any]], parallel: Optional[bool] = False, - request_timeout: Optional[float] = None, + timeout: Optional[float] = None, ) -> BatchResult: call = BatchRequest.model_construct(commands=commands, parallel=parallel) - return self(call, request_timeout=request_timeout) + return self(call, timeout=timeout) def close(self) -> None: self._session.close() - def __call__(self, request: CentRequest[T], request_timeout: Optional[float] = None) -> T: + def __call__(self, request: CentRequest[T], timeout: Optional[float] = None) -> T: """ Call API method :param request: Centrifugo request :return: Centrifugo response """ - return self._session(self._api_key, request, timeout=request_timeout) + return self._session(self._api_key, request, timeout=timeout) def __enter__(self) -> "Client": return self diff --git a/cent/requests.py b/cent/dto.py similarity index 51% rename from cent/requests.py rename to cent/dto.py index 665448d..5bc43c2 100644 --- a/cent/requests.py +++ b/cent/dto.py @@ -1,23 +1,321 @@ -from typing import Any, Optional, Dict, List - -from cent.base import CentRequest -from cent.types import StreamPosition, ChannelOptionsOverride, Disconnect - -from cent.results import ( - BatchResult, - BroadcastResult, - ChannelsResult, - DisconnectResult, - HistoryResult, - HistoryRemoveResult, - InfoResult, - PresenceResult, - PresenceStatsResult, - PublishResult, - RefreshResult, - SubscribeResult, - UnsubscribeResult, -) +from abc import ABC, abstractmethod +from typing import TypeVar, Any, Generic, TYPE_CHECKING, ClassVar, Optional, List, Dict +from pydantic import BaseModel, ConfigDict, Field + + +class Error(BaseModel): + code: int + message: str + + +CentType = TypeVar("CentType", bound=Any) + + +class Response(BaseModel, Generic[CentType]): + error: Optional[Error] = None + result: Optional[CentType] = None + + +class CentRequest(BaseModel, Generic[CentType], ABC): + model_config = ConfigDict( + extra="allow", + populate_by_name=True, + arbitrary_types_allowed=True, + ) + + if TYPE_CHECKING: + __returning__: ClassVar[type] + __api_method__: ClassVar[str] + else: + + @property + @abstractmethod + def __returning__(self) -> type: + pass + + @property + @abstractmethod + def __api_method__(self) -> str: + pass + + +class CentResult(BaseModel, ABC): + model_config = ConfigDict( + use_enum_values=True, + extra="allow", + validate_assignment=True, + frozen=True, + populate_by_name=True, + arbitrary_types_allowed=True, + defer_build=True, + ) + + +class NestedModel(BaseModel, ABC): + model_config = ConfigDict( + extra="allow", + populate_by_name=True, + arbitrary_types_allowed=True, + ) + + +class Disconnect(NestedModel): + """Disconnect data. + + Attributes: + code (int): Disconnect code. + reason (str): Disconnect reason. + """ + + code: int + reason: str + + +class BoolValue(NestedModel): + """Bool value. + + Attributes: + value (bool): Value. + """ + + value: bool + + +class StreamPosition(NestedModel): + """ + Stream position representation. + + Attributes: + offset (int): Offset of publication in history stream. + epoch (str): Epoch of current stream. + """ + + offset: int + epoch: str + + +class ChannelOptionsOverride(NestedModel): + """ + Override object for channel options. + + Attributes: + presence (Optional[BoolValue]): Override for presence. + join_leave (Optional[BoolValue]): Override for join_leave behavior. + force_push_join_leave (Optional[BoolValue]): Force push for join_leave events. + force_positioning (Optional[BoolValue]): Override for force positioning. + force_recovery (Optional[BoolValue]): Override for force recovery. + """ + + presence: Optional[BoolValue] = None + join_leave: Optional[BoolValue] = None + force_push_join_leave: Optional[BoolValue] = None + force_positioning: Optional[BoolValue] = None + force_recovery: Optional[BoolValue] = None + + +class ProcessStats(CentResult): + """ + Represents statistics of a process. + + Attributes: + cpu (float): Process CPU usage as a percentage. Defaults to 0.0. + rss (int): Process Resident Set Size (RSS) in bytes. + """ + + cpu: float = Field(default=0.0) + rss: int + + +class ClientInfo(CentResult): + """ + Represents the result containing client information. + + Attributes: + client (str): Client ID. + user (str): User ID. + conn_info (Optional[Any]): Optional connection info. This can include details + such as IP address, location, etc. + chan_info (Optional[Any]): Optional channel info. This might include specific + settings or preferences related to the channel. + """ + + client: str + user: str + conn_info: Optional[Any] = None + chan_info: Optional[Any] = None + + +class Publication(CentResult): + """Publication result. + + Attributes: + offset (int): Offset of publication in history stream. + data (Any): Custom JSON inside publication. + tags (Optional[Dict[str, str]]): Tags are optional. + """ + + data: Any + offset: int = Field(default=0) + tags: Optional[Dict[str, str]] = None + + +class Metrics(CentResult): + """Metrics result. + + Attributes: + interval (float): Metrics aggregation interval. + items (Dict[str, float]): metric values. + """ + + interval: float = Field(default=0.0) + items: Dict[str, float] + + +class Node(CentResult): + """Node result. + + Attributes: + uid (str): Node unique identifier. + name (str): Node name. + version (str): Node version. + num_clients (int): Total number of connections. + num_subs (int): Total number of subscriptions. + num_users (int): Total number of users. + num_channels (int): Total number of channels. + uptime (int): Node uptime. + metrics (Optional[Metrics]): Node metrics. + process (Optional[ProcessStats]): Node process stats. + """ + + uid: str + name: str + version: str + num_clients: int = Field(default=0) + num_subs: int = Field(default=0) + num_users: int = Field(default=0) + num_channels: int = Field(default=0) + uptime: int = Field(default=0) + metrics: Optional[Metrics] = None + process: Optional[ProcessStats] = None + + +class BatchResult(CentResult): + """Batch response. + + Attributes: + replies: List of results from batch request. + """ + + replies: List[Any] + + +class PublishResult(CentResult): + """Publish result. + + Attributes: + offset: Offset of publication in history stream. + epoch: Epoch of current stream. + """ + + offset: Optional[int] = None + epoch: Optional[str] = None + + +class BroadcastResult(CentResult): + """Broadcast result. + + Attributes: + responses: List of responses for each individual publish + (with possible error and publish result) + """ + + responses: List[Response[PublishResult]] = Field(default_factory=list) + + +class ChannelInfoResult(CentResult): + """Channel info result. + + Attributes: + num_clients: Total number of connections currently subscribed to a channel. + """ + + num_clients: int = Field(default=0) + + +class ChannelsResult(CentResult): + """Channels result. + + Attributes: + channels: Map where key is channel and value is ChannelInfoResult. + """ + + channels: Dict[str, ChannelInfoResult] + + +class DisconnectResult(CentResult): + """Disconnect result.""" + + +class HistoryRemoveResult(CentResult): + """History remove result.""" + + +class HistoryResult(CentResult): + """History result. + + Attributes: + publications: List of publications in channel. + offset: Top offset in history stream. + epoch: Epoch of current stream. + """ + + publications: List[Publication] = Field(default_factory=list) + offset: Optional[int] = None + epoch: Optional[str] = None + + +class InfoResult(CentResult): + """Info result. + + Attributes: + nodes: Information about all nodes in a cluster. + """ + + nodes: List[Node] + + +class PresenceResult(CentResult): + """Presence result. + + Attributes: + presence: Map where key is client ID and value is ClientInfo. + """ + + presence: Dict[str, ClientInfo] + + +class PresenceStatsResult(CentResult): + """Presence stats result. + + Attributes: + num_clients: Total number of clients in channel. + num_users: Total number of unique users in channel. + """ + + num_clients: int = Field(default=0) + num_users: int = Field(default=0) + + +class RefreshResult(CentResult): + """Refresh result.""" + + +class SubscribeResult(CentResult): + """Subscribe result.""" + + +class UnsubscribeResult(CentResult): + """Unsubscribe result.""" class BatchRequest(CentRequest[BatchResult]): diff --git a/cent/exceptions.py b/cent/exceptions.py index bc0449d..de1e01b 100644 --- a/cent/exceptions.py +++ b/cent/exceptions.py @@ -1,4 +1,4 @@ -from cent.base import CentType, CentRequest +from cent.dto import CentType, CentRequest class CentError(Exception): @@ -8,7 +8,7 @@ class CentError(Exception): class CentNetworkError(CentError): - """CentNetworkError raised when Centrifugo is not available.""" + """CentNetworkError raised when Centrifugo is unreachable or not available.""" def __init__(self, request: CentRequest[CentType], message: str) -> None: self.request = request @@ -22,7 +22,7 @@ def __repr__(self) -> str: class CentTransportError(CentError): - """CentTransportError raised when returns non-200 status code.""" + """CentTransportError raised when HTTP request results into non-200 status code.""" def __init__(self, request: CentRequest[CentType], status_code: int): self.request = request @@ -61,7 +61,7 @@ class CentDecodeError(CentError): """ -class CentAPIError(CentError): +class CentResponseError(CentError): """ CentAPIError raised when response from Centrifugo contains any error as a result of API command execution. @@ -73,7 +73,7 @@ def __init__(self, request: CentRequest[CentType], code: int, message: str) -> N self.message = message def __str__(self) -> str: - return f"API error #{self.code}: {self.message}" + return f"Server API response error #{self.code}: {self.message}" def __repr__(self) -> str: return f"{type(self).__name__}('{self}')" diff --git a/cent/results.py b/cent/results.py deleted file mode 100644 index 4599ef2..0000000 --- a/cent/results.py +++ /dev/null @@ -1,126 +0,0 @@ -from typing import List, Any, Optional, Dict - -from pydantic import Field - -from cent.base import CentResult -from cent.base import Response -from cent.types import Publication, Node, ClientInfo - - -class BatchResult(CentResult): - """Batch response. - - Attributes: - replies: List of results from batch request. - """ - - replies: List[Any] - - -class PublishResult(CentResult): - """Publish result. - - Attributes: - offset: Offset of publication in history stream. - epoch: Epoch of current stream. - """ - - offset: Optional[int] = None - epoch: Optional[str] = None - - -class BroadcastResult(CentResult): - """Broadcast result. - - Attributes: - responses: List of responses for each individual publish - (with possible error and publish result) - """ - - responses: List[Response[PublishResult]] = Field(default_factory=list) - - -class ChannelInfoResult(CentResult): - """Channel info result. - - Attributes: - num_clients: Total number of connections currently subscribed to a channel. - """ - - num_clients: int = Field(default=0) - - -class ChannelsResult(CentResult): - """Channels result. - - Attributes: - channels: Map where key is channel and value is ChannelInfoResult. - """ - - channels: Dict[str, ChannelInfoResult] - - -class DisconnectResult(CentResult): - """Disconnect result.""" - - -class HistoryRemoveResult(CentResult): - """History remove result.""" - - -class HistoryResult(CentResult): - """History result. - - Attributes: - publications: List of publications in channel. - offset: Top offset in history stream. - epoch: Epoch of current stream. - """ - - publications: List[Publication] = Field(default_factory=list) - offset: Optional[int] = None - epoch: Optional[str] = None - - -class InfoResult(CentResult): - """Info result. - - Attributes: - nodes: Information about all nodes in a cluster. - """ - - nodes: List[Node] - - -class PresenceResult(CentResult): - """Presence result. - - Attributes: - presence: Map where key is client ID and value is ClientInfo. - """ - - presence: Dict[str, ClientInfo] - - -class PresenceStatsResult(CentResult): - """Presence stats result. - - Attributes: - num_clients: Total number of clients in channel. - num_users: Total number of unique users in channel. - """ - - num_clients: int = Field(default=0) - num_users: int = Field(default=0) - - -class RefreshResult(CentResult): - """Refresh result.""" - - -class SubscribeResult(CentResult): - """Subscribe result.""" - - -class UnsubscribeResult(CentResult): - """Unsubscribe result.""" diff --git a/cent/types.py b/cent/types.py deleted file mode 100644 index 3f3480f..0000000 --- a/cent/types.py +++ /dev/null @@ -1,145 +0,0 @@ -from typing import Optional, Any, Dict - -from pydantic import Field - -from cent.base import CentResult, NestedModel - - -class Disconnect(NestedModel): - """Disconnect data. - - Attributes: - code (int): Disconnect code. - reason (str): Disconnect reason. - """ - - code: int - reason: str - - -class BoolValue(NestedModel): - """Bool value. - - Attributes: - value (bool): Value. - """ - - value: bool - - -class StreamPosition(NestedModel): - """ - Stream position representation. - - Attributes: - offset (int): Offset of publication in history stream. - epoch (str): Epoch of current stream. - """ - - offset: int - epoch: str - - -class ChannelOptionsOverride(NestedModel): - """ - Override object for channel options. - - Attributes: - presence (Optional[BoolValue]): Override for presence. - join_leave (Optional[BoolValue]): Override for join_leave behavior. - force_push_join_leave (Optional[BoolValue]): Force push for join_leave events. - force_positioning (Optional[BoolValue]): Override for force positioning. - force_recovery (Optional[BoolValue]): Override for force recovery. - """ - - presence: Optional[BoolValue] = None - join_leave: Optional[BoolValue] = None - force_push_join_leave: Optional[BoolValue] = None - force_positioning: Optional[BoolValue] = None - force_recovery: Optional[BoolValue] = None - - -class ProcessStats(CentResult): - """ - Represents statistics of a process. - - Attributes: - cpu (float): Process CPU usage as a percentage. Defaults to 0.0. - rss (int): Process Resident Set Size (RSS) in bytes. - """ - - cpu: float = Field(default=0.0) - rss: int - - -class ClientInfo(CentResult): - """ - Represents the result containing client information. - - Attributes: - client (str): Client ID. - user (str): User ID. - conn_info (Optional[Any]): Optional connection info. This can include details - such as IP address, location, etc. - chan_info (Optional[Any]): Optional channel info. This might include specific - settings or preferences related to the channel. - """ - - client: str - user: str - conn_info: Optional[Any] = None - chan_info: Optional[Any] = None - - -class Publication(CentResult): - """Publication result. - - Attributes: - offset (int): Offset of publication in history stream. - data (Any): Custom JSON inside publication. - tags (Optional[Dict[str, str]]): Tags are optional. - """ - - data: Any - offset: int = Field(default=0) - tags: Optional[Dict[str, str]] = None - - -class Metrics(CentResult): - """Metrics result. - - Attributes: - interval (float): Metrics aggregation interval. - items (Dict[str, float]): metric values. - """ - - interval: float = Field(default=0.0) - items: Dict[str, float] - - -class Node(CentResult): - """Node result. - - Attributes: - uid (str): Node unique identifier. - name (str): Node name. - version (str): Node version. - num_clients (int): Total number of connections. - num_subs (int): Total number of subscriptions. - num_users (int): Total number of users. - num_channels (int): Total number of channels. - uptime (int): Node uptime. - metrics (Optional[Metrics]): Node metrics. - process (Optional[ProcessStats]): Node process stats. - """ - - uid: str - name: str - version: str - num_clients: int = Field(default=0) - num_subs: int = Field(default=0) - num_users: int = Field(default=0) - num_channels: int = Field(default=0) - uptime: int = Field(default=0) - metrics: Optional[Metrics] = None - process: Optional[ProcessStats] = None diff --git a/tests/test_async_validation.py b/tests/test_async_validation.py index b2b70fb..c0bbe5e 100644 --- a/tests/test_async_validation.py +++ b/tests/test_async_validation.py @@ -3,7 +3,7 @@ from cent import ( AsyncClient, - CentAPIError, + CentResponseError, PublishRequest, StreamPosition, Disconnect, @@ -154,7 +154,7 @@ async def test_batch(async_client: AsyncClient) -> None: async def test_error_publish(async_client: AsyncClient) -> None: - with pytest.raises(CentAPIError, match="unknown channel") as exc_info: + with pytest.raises(CentResponseError, match="unknown channel") as exc_info: await async_client.publish( "undefined_channel:123", {"data": "data"}, diff --git a/tests/test_sync_validation.py b/tests/test_sync_validation.py index 07a3aec..edc187e 100644 --- a/tests/test_sync_validation.py +++ b/tests/test_sync_validation.py @@ -3,7 +3,7 @@ from cent import ( Client, - CentAPIError, + CentResponseError, PublishRequest, BroadcastRequest, PresenceRequest, @@ -155,7 +155,7 @@ def test_batch(sync_client: Client) -> None: def test_error_publish(sync_client: Client) -> None: - with pytest.raises(CentAPIError, match="unknown channel") as exc_info: + with pytest.raises(CentResponseError, match="unknown channel") as exc_info: sync_client.publish( "undefined_channel:123", {"data": "data"},