Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto-discover long-running tasks #608

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions ct-app/core/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from .baseclass import Base
from .components import AsyncLoop, Parameters, Utils
from .components.messages import MessageQueue
from .core import Core
from .node import Node

Expand All @@ -17,11 +16,11 @@ def main(configfile: str):

params = Parameters()
params.parse(config, entrypoint=True)
params.from_env("SUBGRAPH", "PG")
params.from_env("SUBGRAPH")
params.overrides("OVERRIDE")

# create the core and nodes instances
nodes = Node.fromCredentials(*Utils.nodesCredentials("NODE_ADDRESS", "NODE_KEY"))
nodes = [Node(*pair) for pair in zip(*Utils.nodesCredentials("NODE_ADDRESS", "NODE_KEY"))]

# start the prometheus client
try:
Expand All @@ -35,8 +34,5 @@ def main(configfile: str):

AsyncLoop.run(core.start, core.stop)

MessageQueue.clear()


if __name__ == "__main__":
main()
43 changes: 43 additions & 0 deletions ct-app/core/components/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import ast

from core.baseclass import Base
from core.subgraph.entries import Safe

Expand Down Expand Up @@ -148,3 +150,44 @@ async def balanceInChannels(cls, channels: list) -> dict[str, dict]:
c.balance) / 1e18

return results

@classmethod
def decorated_methods(cls, file: str, target: str):
try:
with open(file, "r") as f:
source_code = f.read()

tree = ast.parse(source_code)
except FileNotFoundError as e:
cls().error(f"Could not find file {file}: {e}")
return []
except SyntaxError as e:
cls().error(f"Could not parse {file}: {e}")
return []

keepalive_methods = []

for node in ast.walk(tree):
if not isinstance(node, ast.FunctionDef) and not isinstance(node, ast.AsyncFunctionDef):
continue

for decorator in node.decorator_list:
try:
if isinstance(decorator, ast.Call):
args_name = [arg.id for arg in decorator.args if isinstance(arg, ast.Name)]

if not hasattr(decorator.func, 'id') or (decorator.func.id != target and target not in args_name):
continue

elif isinstance(decorator, ast.Name):
if not hasattr(decorator, 'id') or decorator.id != target:
continue
else:
continue
except AttributeError:
continue

keepalive_methods.append(node.name)
break

return keepalive_methods
36 changes: 9 additions & 27 deletions ct-app/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def __init__(self, nodes: list[Node], params: Parameters):
s: s.provider(URL(self.params.subgraph, s.value)) for s in Type
}

self.running = False
self.running = True

@property
def api(self) -> HoprdAPI:
Expand Down Expand Up @@ -396,37 +396,19 @@ async def safe_fundings(self):
f"Fetched safe fundings ({amount} + {self.params.fundings.constant})"
)

@property
async def tasks(self):
return [getattr(self, method) for method in Utils.decorated_methods(__file__, "formalin")]

async def start(self):
"""
Start the node.
"""
self.info(f"CTCore started with {len(self.nodes)} nodes.")

for node in self.nodes:
node.running = True
await node._healthcheck()
AsyncLoop.update(await node.tasks())

self.running = True

AsyncLoop.update(
[
self.rotate_subgraphs,
self.peers_rewards,
self.ticket_parameters,
self.connected_peers,
self.registered_nodes,
self.topology,
self.nft_holders,
self.allocations,
self.eoa_balances,
self.apply_economic_model,
self.safe_fundings,
]
)

for node in self.nodes:
AsyncLoop.add(node.observe_message_queue)
[await node._healthcheck() for node in self.nodes]
AsyncLoop.update(sum([node.tasks for node in self.nodes], []))
AsyncLoop.update(self.tasks)

await AsyncLoop.gather()

Expand All @@ -435,4 +417,4 @@ def stop(self):
Stop the node.
"""
self.info("CTCore stopped.")
self.running = False
self.running = False
26 changes: 4 additions & 22 deletions ct-app/core/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(self, url: str, key: str):

self.params = Parameters()
self.connected = False
self.running = False
self.running = True

@property
async def safe_address(self):
Expand Down Expand Up @@ -346,24 +346,6 @@ async def observe_message_queue(self):
AsyncLoop.add(self.api.send_message, self.address.hopr, message.format(), [message.relayer])
MESSAGES_STATS.labels("sent", self.address.hopr, message.relayer).inc()

async def tasks(self):
callbacks = [
self.healthcheck,
self.retrieve_peers,
self.retrieve_balances,
self.retrieve_channels,
self.open_channels,
self.fund_channels,
self.close_old_channels,
self.close_incoming_channels,
self.close_pending_channels,
self.get_total_channel_funds,
self.observe_message_queue,
self.observe_relayed_messages,
]

return callbacks

@classmethod
def fromCredentials(cls, addresses: list[str], keys: list[str]):
return [cls(address, key) for address, key in zip(addresses, keys)]
@property
def tasks(self):
return [getattr(self, method) for method in Utils.decorated_methods(__file__, "formalin")]
12 changes: 1 addition & 11 deletions ct-app/test/test_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,4 @@ async def test_get_total_channel_funds(node: Node, channels: Channels):

@pytest.mark.asyncio
async def test_check_inbox(node: Node):
pytest.skip(f"{inspect.stack()[0][3]} not implemented")


@pytest.mark.asyncio
async def test_fromAddressAndKeyLists(node: Node):
addresses = ["LOCALHOST:9091", "LOCALHOST:9092", "LOCALHOST:9093"]
keys = ["key1", "key2", "key3"]

nodes = Node.fromCredentials(addresses, keys)

assert len(nodes) == len(addresses) == len(keys)
pytest.skip(f"{inspect.stack()[0][3]} not implemented")
Loading