Skip to content

Commit

Permalink
Merge pull request bitcoin-dev-project#284 from willcl-ark/parse-grap…
Browse files Browse the repository at this point in the history
…h-node

Add graph schema
  • Loading branch information
m3dwards authored Feb 22, 2024
2 parents 67960e5 + 1c10aad commit 347a55c
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 55 deletions.
11 changes: 5 additions & 6 deletions src/backends/compose/compose_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,10 @@ def generate_deployment_file(self, warnet):
dirs_exist_ok=True,
)

def config_args(self, tank):
def config_args(self, tank: Tank):
args = self.default_config_args(tank)
if tank.conf is not None:
args = f"{args} -{tank.conf.replace(',', ' -')}"
if tank.bitcoin_config is not None:
args = f"{args} -{tank.bitcoin_config.replace(',', ' -')}"
return args

def default_config_args(self, tank):
Expand All @@ -359,14 +359,13 @@ def add_services(self, tank: Tank, services):
assert tank.index is not None
container_name = self.get_container_name(tank.index, ServiceType.BITCOIN)
services[container_name] = {}
logger.debug(f"{tank.version=}")

# Setup bitcoind, either release binary, pre-built image or built from source on demand
if "/" and "#" in tank.version:
if tank.version and ("/" and "#" in tank.version):
# it's a git branch, building step is necessary
repo, branch = tank.version.split("#")
services[container_name]["image"] = f"{LOCAL_REGISTRY}:{branch}"
build_image(repo, branch, LOCAL_REGISTRY, branch, tank.DEFAULT_BUILD_ARGS + tank.extra_build_args, arches="amd64")
build_image(repo, branch, LOCAL_REGISTRY, branch, tank.DEFAULT_BUILD_ARGS + tank.build_args, arches="amd64")
self.copy_configs(tank)
elif tank.image:
# Pre-built custom image
Expand Down
6 changes: 3 additions & 3 deletions src/backends/kubernetes/kubernetes_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ def default_bitcoind_config_args(self, tank):
defaults += f" -zmqpubrawtx=tcp://0.0.0.0:{tank.zmqtxport}"
return defaults

def create_bitcoind_container(self, tank) -> client.V1Container:
def create_bitcoind_container(self, tank: Tank) -> client.V1Container:
self.log.debug(f"Creating bitcoind container for tank {tank.index}")
container_name = BITCOIN_CONTAINER_NAME
container_image = None
Expand All @@ -388,15 +388,15 @@ def create_bitcoind_container(self, tank) -> client.V1Container:
branch,
LOCAL_REGISTRY,
branch,
tank.DEFAULT_BUILD_ARGS + tank.extra_build_args,
tank.DEFAULT_BUILD_ARGS + tank.build_args,
arches="amd64",
)
# Prebuilt major version
else:
container_image = f"{DOCKER_REGISTRY_CORE}:{tank.version}"

bitcoind_options = self.default_bitcoind_config_args(tank)
bitcoind_options += f" {tank.conf}"
bitcoind_options += f" {tank.bitcoin_config}"
container_env = [client.V1EnvVar(name="BITCOIN_ARGS", value=bitcoind_options)]

bitcoind_container = client.V1Container(
Expand Down
8 changes: 8 additions & 0 deletions src/cli/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,11 @@ def create(
},
)
)

@graph.command()
@click.argument("graph", type=Path)
def validate(graph: Path):
"""
Validate a graph file against the schema.
"""
print(rpc_call("graph_validate", {"graph_path": graph.as_posix()}))
22 changes: 22 additions & 0 deletions src/templates/node_schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"type": "object",
"properties": {
"degree": {"type": "number"},
"x": {"type": "number"},
"y": {"type": "number"},
"version": {"type": "string"},
"image": {"type": "string"},
"bitcoin_config": {"type": "string", "default": ""},
"tc_netem": {"type": "string"},
"exporter": {"type": "boolean", "default": false},
"collect_logs": {"type": "boolean", "default": false},
"build_args": {"type": "string"},
"ln": {"type": "string"}
},
"additionalProperties": false,
"oneOf": [
{"required": ["version"]},
{"required": ["image"]}
],
"required": []
}
46 changes: 41 additions & 5 deletions src/warnet/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,24 @@
import sys
import threading
import time
import traceback
from datetime import datetime
from io import BytesIO
from logging import StreamHandler
from logging.handlers import RotatingFileHandler
from pathlib import Path

import jsonschema
import networkx as nx
import scenarios
from flask import Flask, request
from flask import Flask, jsonify, request
from flask_jsonrpc.app import JSONRPC
from flask_jsonrpc.exceptions import ServerError
from warnet.utils import (
create_cycle_graph,
gen_config_dir,
load_schema,
validate_graph_schema,
)
from warnet.warnet import Warnet

Expand Down Expand Up @@ -56,6 +60,7 @@ def __init__(self, backend):

self.log_file_path = os.path.join(self.basedir, "warnet.log")
self.logger: logging.Logger
self.setup_global_exception_handler()
self.setup_logging()
self.setup_rpc()
self.logger.info(f"Started server version {SERVER_VERSION}")
Expand All @@ -72,6 +77,25 @@ def __init__(self, backend):
# before the config dir is populated with the deployment info
self.image_build_lock = threading.Lock()

def setup_global_exception_handler(self):
"""
Use flask to log traceback of unhandled excpetions
"""
@self.app.errorhandler(Exception)
def handle_exception(e):
trace = traceback.format_exc()
self.logger.error(f"Unhandled exception: {e}\n{trace}")
response = {
"jsonrpc": "2.0",
"error": {
"code": -32603,
"message": "Internal server error",
"data": str(e),
},
"id": request.json.get("id", None) if request.json else None,
}
return jsonify(response), 500

def healthy(self):
return "warnet is healthy"

Expand Down Expand Up @@ -148,6 +172,7 @@ def setup_rpc(self):
self.jsonrpc.register(self.network_export)
# Graph
self.jsonrpc.register(self.graph_generate)
self.jsonrpc.register(self.graph_validate)
# Debug
self.jsonrpc.register(self.generate_deployment)
# Server
Expand Down Expand Up @@ -357,8 +382,8 @@ def thread_start(wn):
f"Resumed warnet named '{network}' from config dir {wn.config_dir}"
)
except Exception as e:
msg = f"Error starting network: {e}"
self.logger.error(msg)
trace = traceback.format_exc()
self.logger.error(f"Unhandled exception bringing network up: {e}\n{trace}")

try:
wn = Warnet.from_network(network, self.backend)
Expand Down Expand Up @@ -389,8 +414,8 @@ def thread_start(wn, lock: threading.Lock):
wn.apply_network_conditions()
wn.connect_edges()
except Exception as e:
msg = f"Error starting warnet: {e}"
self.logger.error(msg)
trace = traceback.format_exc()
self.logger.error(f"Unhandled exception starting warnet: {e}\n{trace}")

config_dir = gen_config_dir(network)
if config_dir.exists():
Expand Down Expand Up @@ -436,6 +461,17 @@ def graph_generate(
self.logger.error(msg)
raise ServerError(message=msg) from e

def graph_validate(self, graph_path: str) -> str:

schema = load_schema()
with open(graph_path) as f:
graph = nx.parse_graphml(f.read(), node_type=int)
try:
validate_graph_schema(schema, graph)
except (jsonschema.ValidationError, jsonschema.SchemaError) as e:
raise ServerError(message=f"Schema of {graph_path} is invalid: {e}") from e
return f"Schema of {graph_path} is valid"

def network_down(self, network: str = "warnet") -> str:
"""
Stop all containers in <network>.
Expand Down
70 changes: 33 additions & 37 deletions src/warnet/tank.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

CONTAINER_PREFIX_PROMETHEUS = "prometheus_exporter"


logger = logging.getLogger("tank")


Expand All @@ -33,67 +32,64 @@ def __init__(self, index: int, config_dir: Path, warnet):
self.bitcoin_network = warnet.bitcoin_network
self.version = "25.1"
self.image: str = ""
self.conf = ""
self.bitcoin_config = ""
self.conf_file = None
self.netem = None
self.exporter = False
self.collect_logs = False
self.build_args = ""
self.lnnode: LNNode | None = None
self.rpc_port = 18443
self.rpc_user = "warnet_user"
self.rpc_password = "2themoon"
self.zmqblockport = 28332
self.zmqtxport = 28333
self._suffix = None
self._ipv4 = None
self._exporter_name = None
self.extra_build_args = ""
self.lnnode: LNNode | None = None
self.zmqblockport = 28332
self.zmqtxport = 28333

def __str__(self) -> str:
return f"Tank(index: {self.index}, version: {self.version}, conf: {self.conf}, conf file: {self.conf_file}, netem: {self.netem}, IPv4: {self._ipv4})"

def parse_version(self, node):
version = node.get("version", "")
image = node.get("image", "")
logger.debug(f"{version=:}")
logger.debug(f"{image=:}")
if version and image:
return f"Tank(index: {self.index}, version: {self.version}, conf: {self.bitcoin_config}, conf file: {self.conf_file}, netem: {self.netem}, IPv4: {self._ipv4})"

def _parse_version(self):
if self.version not in SUPPORTED_TAGS or ("/" in self.version and "#" in self.version):
raise Exception(
f"Unsupported version: can't be generated from Docker images: {self.version}"
)

def parse_graph_node(self, node):
# Dynamically parse properties based on the schema
for property, specs in self.warnet.node_schema["properties"].items():
value = node.get(property, specs.get("default"))
if property == "version":
self._parse_version()
setattr(self, property, value)
logger.debug(f"{property}={value}")

if self.version and self.image:
raise Exception(
f"Tank has {version=:} and {image=:} supplied and can't be built. Provide one or the other."
f"Tank has {self.version=:} and {self.image=:} supplied and can't be built. Provide one or the other."
)
if image:
self.image = image
else:
if (version in SUPPORTED_TAGS) or ("/" in version and "#" in version):
self.version = version
else:
raise Exception(
f"Unsupported version: can't be generated from Docker images: {version}"
)

# Special handling for complex properties
if "ln" in node:
self.lnnode = LNNode(self.warnet, self, node["ln"], self.warnet.container_interface)

self.config_dir = self.warnet.config_dir / str(self.suffix)
self.config_dir.mkdir(parents=True, exist_ok=True)
logger.debug(f"{self=:}")

@classmethod
def from_graph_node(cls, index, warnet, tank=None):
assert index is not None
index = int(index)
config_dir = warnet.config_dir / str(f"{index:06}")
config_dir.mkdir(parents=True, exist_ok=True)

self = tank
if self is None:
self = cls(index, config_dir, warnet)
node = warnet.graph.nodes[index]
self.parse_version(node)
self.conf = node.get("bitcoin_config", self.conf)
self.netem = node.get("tc_netem", self.netem)
self.exporter = node.get("exporter", self.exporter)
self.collect_logs = node.get("collect_logs", self.collect_logs)
self.extra_build_args = node.get("build_args", self.extra_build_args)

if "ln" in node:
self.lnnode = LNNode(self.warnet, self, node["ln"], self.warnet.container_interface)

self.config_dir = self.warnet.config_dir / str(self.suffix)
self.config_dir.mkdir(parents=True, exist_ok=True)
self.parse_graph_node(node)
return self

@property
Expand Down
21 changes: 19 additions & 2 deletions src/warnet/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from pathlib import Path

import networkx as nx
from jsonschema import validate
from templates import TEMPLATES
from test_framework.messages import ser_uint256
from test_framework.p2p import MESSAGEMAP

Expand All @@ -30,6 +32,7 @@
WEIGHTED_TAGS = [
tag for index, tag in enumerate(reversed(SUPPORTED_TAGS)) for _ in range(index + 1)
]
NODE_SCHEMA_PATH = TEMPLATES / "node_schema.json"


def exponential_backoff(max_retries=5, base_delay=1, max_delay=32):
Expand Down Expand Up @@ -378,7 +381,7 @@ def set_execute_permission(file_path):


def default_bitcoin_conf_args() -> str:
default_conf: Path = Path.cwd() / "src" / "templates" / "bitcoin.conf"
default_conf: Path = TEMPLATES / "bitcoin.conf"

with default_conf.open("r") as f:
defaults = parse_bitcoin_conf(f.read())
Expand Down Expand Up @@ -459,7 +462,7 @@ def create_cycle_graph(
return graph


def convert_unsupported_attributes(graph):
def convert_unsupported_attributes(graph: nx.Graph):
# Sometimes networkx complains about invalid types when writing the graph
# (it just generated itself!). Try to convert them here just in case.
for _, node_data in graph.nodes(data=True):
Expand All @@ -479,3 +482,17 @@ def convert_unsupported_attributes(graph):
continue
else:
edge_data[key] = str(value)


def load_schema():
with open(NODE_SCHEMA_PATH) as schema_file:
return json.load(schema_file)


def validate_graph_schema(node_schema: dict, graph: nx.Graph):
"""
Validate a networkx.Graph against the node schema
"""
for i in list(graph.nodes):
validate(instance=graph.nodes[i], schema=node_schema)

Loading

0 comments on commit 347a55c

Please sign in to comment.