diff --git a/ct-app/core/__main__.py b/ct-app/core/__main__.py index eb112438..04e75c39 100644 --- a/ct-app/core/__main__.py +++ b/ct-app/core/__main__.py @@ -4,7 +4,6 @@ from .baseclass import Base from .components import AsyncLoop, Parameters, Utils -from .components.messages import MessageQueue from .core import Core from .node import Node @@ -17,11 +16,11 @@ def main(configfile: str): params = Parameters() params.parse(config, entrypoint=True) - params.from_env("SUBGRAPH", "PG") + params.from_env("SUBGRAPH") params.overrides("OVERRIDE") # create the core and nodes instances - nodes = Node.fromCredentials(*Utils.nodesCredentials("NODE_ADDRESS", "NODE_KEY")) + nodes = [Node(*pair) for pair in zip(*Utils.nodesCredentials("NODE_ADDRESS", "NODE_KEY"))] # start the prometheus client try: @@ -35,8 +34,5 @@ def main(configfile: str): AsyncLoop.run(core.start, core.stop) - MessageQueue.clear() - - if __name__ == "__main__": main() diff --git a/ct-app/core/components/utils.py b/ct-app/core/components/utils.py index 4c8ca1bb..edfd4c1d 100644 --- a/ct-app/core/components/utils.py +++ b/ct-app/core/components/utils.py @@ -1,3 +1,5 @@ +import ast + from core.baseclass import Base from core.subgraph.entries import Safe @@ -148,3 +150,44 @@ async def balanceInChannels(cls, channels: list) -> dict[str, dict]: c.balance) / 1e18 return results + + @classmethod + def decorated_methods(cls, file: str, target: str): + try: + with open(file, "r") as f: + source_code = f.read() + + tree = ast.parse(source_code) + except FileNotFoundError as e: + cls().error(f"Could not find file {file}: {e}") + return [] + except SyntaxError as e: + cls().error(f"Could not parse {file}: {e}") + return [] + + keepalive_methods = [] + + for node in ast.walk(tree): + if not isinstance(node, ast.FunctionDef) and not isinstance(node, ast.AsyncFunctionDef): + continue + + for decorator in node.decorator_list: + try: + if isinstance(decorator, ast.Call): + args_name = [arg.id for arg in decorator.args if isinstance(arg, ast.Name)] + + if not hasattr(decorator.func, 'id') or (decorator.func.id != target and target not in args_name): + continue + + elif isinstance(decorator, ast.Name): + if not hasattr(decorator, 'id') or decorator.id != target: + continue + else: + continue + except AttributeError: + continue + + keepalive_methods.append(node.name) + break + + return keepalive_methods diff --git a/ct-app/core/core.py b/ct-app/core/core.py index ab152ae8..f8ee8077 100644 --- a/ct-app/core/core.py +++ b/ct-app/core/core.py @@ -59,7 +59,7 @@ def __init__(self, nodes: list[Node], params: Parameters): s: s.provider(URL(self.params.subgraph, s.value)) for s in Type } - self.running = False + self.running = True @property def api(self) -> HoprdAPI: @@ -388,37 +388,19 @@ async def safe_fundings(self): f"Fetched safe fundings ({amount} + {self.params.fundings.constant})" ) + @property + async def tasks(self): + return [getattr(self, method) for method in Utils.decorated_methods(__file__, "formalin")] + async def start(self): """ Start the node. """ self.info(f"CTCore started with {len(self.nodes)} nodes.") - for node in self.nodes: - node.running = True - await node._healthcheck() - AsyncLoop.update(await node.tasks()) - - self.running = True - - AsyncLoop.update( - [ - self.rotate_subgraphs, - self.peers_rewards, - self.ticket_parameters, - self.connected_peers, - self.registered_nodes, - self.topology, - self.nft_holders, - self.allocations, - self.eoa_balances, - self.apply_economic_model, - self.safe_fundings, - ] - ) - - for node in self.nodes: - AsyncLoop.add(node.observe_message_queue) + [await node._healthcheck() for node in self.nodes] + AsyncLoop.update(sum([node.tasks for node in self.nodes], [])) + AsyncLoop.update(self.tasks) await AsyncLoop.gather() diff --git a/ct-app/core/node.py b/ct-app/core/node.py index 811d4802..59a7cbc7 100644 --- a/ct-app/core/node.py +++ b/ct-app/core/node.py @@ -54,7 +54,7 @@ def __init__(self, url: str, key: str): self.session_management = dict[str, SessionToSocket]() self.connected = False - self.running = False + self.running = True @property async def safe_address(self): @@ -349,8 +349,7 @@ async def observe_message_queue(self): self.session_management[message.relayer].send(message.bytes) MESSAGES_STATS.labels("sent", self.address.hopr, message.relayer).inc() - - + @master(flagguard, formalin, connectguard) async def open_sessions(self): known_peers_addresses: set[Address] = set([peer.address for peer in await self.peers.get()]) @@ -365,7 +364,6 @@ async def open_session(self, relayer: Address): if session := await NodeHelper.open_session(self.address, self.api, relayer.address): self.session_management[relayer] = SessionToSocket(session) - @master(flagguard, formalin, connectguard) async def close_sessions(self): active_sessions = await self.api.get_sessions(Protocol.UDP) @@ -382,28 +380,8 @@ async def close_sessions(self): self.address, self.api, peer, self.session_management.pop(peer).session, publish_to_task_set=False) - - - async def tasks(self): - callbacks = [ - self.healthcheck, - self.retrieve_peers, - self.retrieve_balances, - self.retrieve_channels, - self.open_channels, - self.fund_channels, - self.close_old_channels, - self.close_incoming_channels, - self.close_pending_channels, - self.get_total_channel_funds, - self.observe_message_queue, - self.observe_relayed_messages, - self.open_sessions, - self.close_sessions - ] - - return callbacks - - @classmethod - def fromCredentials(cls, addresses: list[str], keys: list[str]): - return [cls(address, key) for address, key in zip(addresses, keys)] + + + @property + def tasks(self): + return [getattr(self, method) for method in Utils.decorated_methods(__file__, "formalin")] diff --git a/ct-app/test/test_node.py b/ct-app/test/test_node.py index 50fa2558..0c489728 100644 --- a/ct-app/test/test_node.py +++ b/ct-app/test/test_node.py @@ -88,14 +88,4 @@ async def test_get_total_channel_funds(node: Node, channels: Channels): @pytest.mark.asyncio async def test_check_inbox(node: Node): - pytest.skip(f"{inspect.stack()[0][3]} not implemented") - - -@pytest.mark.asyncio -async def test_fromAddressAndKeyLists(node: Node): - addresses = ["LOCALHOST:9091", "LOCALHOST:9092", "LOCALHOST:9093"] - keys = ["key1", "key2", "key3"] - - nodes = Node.fromCredentials(addresses, keys) - - assert len(nodes) == len(addresses) == len(keys) + pytest.skip(f"{inspect.stack()[0][3]} not implemented") \ No newline at end of file