diff --git a/mautrix_telegram/__main__.py b/mautrix_telegram/__main__.py index 83ee0fb1..38295ceb 100644 --- a/mautrix_telegram/__main__.py +++ b/mautrix_telegram/__main__.py @@ -13,8 +13,8 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +import asyncio from typing import Dict, Any - from telethon import __version__ as __telethon_version__ from alchemysession import AlchemySessionContainer @@ -55,6 +55,7 @@ class TelegramBridge(Bridge): matrix_class = MatrixHandler config: Config + context: Context session_container: AlchemySessionContainer bot: Bot @@ -65,33 +66,49 @@ def prepare_db(self) -> None: engine=self.db, table_base=Base, session=False, table_prefix="telethon_", manage_tables=False) - def _prepare_website(self, context: Context) -> None: + def _prepare_website(self) -> None: if self.config["appservice.public.enabled"]: public_website = PublicBridgeWebsite(self.loop) self.az.app.add_subapp(self.config["appservice.public.prefix"], public_website.app) - context.public_website = public_website + self.context.public_website = public_website if self.config["appservice.provisioning.enabled"]: - provisioning_api = ProvisioningAPI(context) + provisioning_api = ProvisioningAPI(self.context) self.az.app.add_subapp(self.config["appservice.provisioning.prefix"], provisioning_api.app) - context.provisioning_api = provisioning_api + self.context.provisioning_api = provisioning_api def prepare_bridge(self) -> None: self.bot = init_bot(self.config) - context = Context(self.az, self.config, self.loop, self.session_container, self, self.bot) - self._prepare_website(context) - self.matrix = context.mx = MatrixHandler(context) - - init_abstract_user(context) - init_formatter(context) - init_portal(context) - self.add_startup_actions(init_puppet(context)) - self.add_startup_actions(init_user(context)) + self.context = Context(self.az, self.config, self.loop, self.session_container, self, self.bot) + self._prepare_website() + self.matrix = self.context.mx = MatrixHandler(self.context) + + init_abstract_user(self.context) + init_formatter(self.context) + init_portal(self.context) + self.add_startup_actions(init_puppet(self.context)) + if self.bot: self.add_startup_actions(self.bot.start()) if self.config["bridge.resend_bridge_info"]: self.add_startup_actions(self.resend_bridge_info()) + + async def start(self) -> None: + await super().start() + + semaphore = None + concurrency = self.config['telegram.connection.concurrent_connections_startup'] + if concurrency: + semaphore = asyncio.Semaphore(concurrency) + + async def sem_task(task): + if not semaphore: + return await task + async with semaphore: + return await task + + await asyncio.gather(*(sem_task(task) for task in init_user(self.context))) async def resend_bridge_info(self) -> None: self.config["bridge.resend_bridge_info"] = False diff --git a/mautrix_telegram/config.py b/mautrix_telegram/config.py index 30a9d691..0defed97 100644 --- a/mautrix_telegram/config.py +++ b/mautrix_telegram/config.py @@ -203,6 +203,7 @@ def do_update(self, helper: ConfigUpdateHelper) -> None: copy("telegram.connection.retry_delay") copy("telegram.connection.flood_sleep_threshold") copy("telegram.connection.request_retries") + copy("telegram.connection.concurrent_connections_startup") copy("telegram.device_info.device_model") copy("telegram.device_info.system_version") diff --git a/mautrix_telegram/example-config.yaml b/mautrix_telegram/example-config.yaml index 4fab863e..15438c28 100644 --- a/mautrix_telegram/example-config.yaml +++ b/mautrix_telegram/example-config.yaml @@ -475,6 +475,9 @@ telegram: # is not recommended, since some requests can always trigger a call fail (such as searching # for messages). request_retries: 5 + # How many concurrent connections should be handled on startup. Set to 0 to allow unlimited connections + # Defualts to 0 + concurrent_connections_startup: 0 # Device info sent to Telegram. device_info: diff --git a/mautrix_telegram/user.py b/mautrix_telegram/user.py index 7430fb16..db834c24 100644 --- a/mautrix_telegram/user.py +++ b/mautrix_telegram/user.py @@ -54,6 +54,7 @@ METRIC_LOGGED_IN = Gauge('bridge_logged_in', 'Users logged into bridge') METRIC_CONNECTED = Gauge('bridge_connected', 'Users connected to Telegram') +METRIC_CONNECTING = Gauge('bridge_connecting', 'Users connecting to Telegram') BridgeState.human_readable_errors.update({ "tg-not-connected": "Your Telegram connection failed", @@ -204,7 +205,11 @@ async def ensure_started(self, even_if_no_session=False) -> 'User': if not self.puppet_whitelisted or self.connected: return self async with self._ensure_started_lock: - return cast(User, await super().ensure_started(even_if_no_session)) + try: + METRIC_CONNECTING.inc() + return cast(User, await super().ensure_started(even_if_no_session)) + finally: + METRIC_CONNECTING.dec() async def start(self, delete_unless_authenticated: bool = False) -> 'User': try: @@ -674,7 +679,6 @@ def find_by_username(cls, username: str) -> Optional['User']: return None # endregion - def init(context: 'Context') -> Iterable[Awaitable['User']]: global config config = context.config