Commit c5285c5: Remove HTTP mode

kalabukdima committed Nov 14, 2024
1 parent ece920b

Showing 9 changed files with 86 additions and 291 deletions.
9 changes: 4 additions & 5 deletions README.md

@@ -1,12 +1,11 @@
 ## SQD worker
 
 This is a Rust implementation of the Worker. The previous (Python) version can be found [here](https://github.com/subsquid/archive.py/tree/master).
-A worker is a service that downloads assigned data chunks from persistent storage (currently S3) and processes incoming data queries that reference those data chunks. It can be used in two modes:
-- Centralized. In this setup, the assignment is received from a centralized [router](https://github.com/subsquid/archive-router/tree/main/crates/router) that is aware of each worker and relies on it (almost) always being available. In this case, the communication between the worker and the router happens directly through HTTP requests.
-- Decentralized. In this setup, the assignment comes from a [scheduler](https://github.com/subsquid/archive-router/tree/b01d86aaf9fb5e14b16c3d24eb7419d413ce8b46/crates/network-scheduler) via a [P2P communication protocol](https://github.com/subsquid/subsquid-network/tree/main/transport). The chunks can be reassigned as the workers join and leave the network.
+
+A worker is a service that downloads assigned data chunks from persistent storage (currently S3) and processes incoming data queries that reference those data chunks.
 
-The full network architecture can be found [here](https://github.com/subsquid/subsquid-network-contracts/wiki/Network-architecture#panthalassa-testnet).
+For details see [network RFC](https://github.com/subsquid/specs/tree/main/network-rfc) page.
 
 ## Usage
 
-You can find instructions for how to run your own worker [here](https://docs.subsquid.io/subsquid-network/participate/worker/).
+You can find instructions for how to run your own worker [here](https://docs.sqd.dev/subsquid-network/participate/worker/).
47 changes: 9 additions & 38 deletions src/cli.rs

@@ -19,11 +19,8 @@ pub struct Args {
     pub data_dir: PathBuf,
 
     /// Port to listen on
-    #[clap(short, long, env, default_value_t = 8000)]
-    pub port: u16,
-
-    #[command(subcommand)]
-    pub mode: Mode,
+    #[clap(short, long, env, default_value_t = 8000, alias = "port")]
+    pub http_port: u16,
 
     #[clap(env, default_value_t = 20)]
     pub parallel_queries: usize,

@@ -34,33 +31,8 @@ pub struct Args {
     #[clap(env)]
     pub query_threads: Option<usize>,
 
-    #[clap(env = "PING_INTERVAL_SEC", hide(true), value_parser=parse_seconds, default_value = "55")]
-    pub ping_interval: Duration,
-
-    #[clap(env, hide(true))]
-    pub sentry_dsn: Option<String>,
-
-    #[clap(env, hide(true), default_value_t = 0.001)]
-    pub sentry_traces_sample_rate: f32,
-}
-
-#[derive(clap::Args, Debug, Clone)]
-pub struct HttpArgs {
-    /// URL of the router to connect to
-    #[clap(long, env, value_name = "URL")]
-    pub router: String,
-
-    /// Unique id of this worker
-    #[clap(long, env, value_name = "UID")]
-    pub worker_id: String,
-
-    /// Externally visible URL of this worker
-    #[clap(long, env, value_name = "URL")]
-    pub worker_url: String,
-}
-
-#[derive(clap::Args, Clone)]
-pub struct P2PArgs {
+    #[clap(env = "PING_INTERVAL_SEC", hide(true), value_parser=parse_seconds, default_value = "55", alias = "ping-interval")]
+    pub heartbeat_interval: Duration,
     /// Peer ID of the scheduler
     #[clap(long, env)]
     pub scheduler_id: PeerId,

@@ -78,13 +50,12 @@ pub struct P2PArgs {
 
     #[command(flatten)]
     pub transport: TransportArgs,
-}
 
-#[allow(clippy::large_enum_variant)]
-#[derive(clap::Subcommand, Clone)]
-pub enum Mode {
-    Http(HttpArgs),
-    P2P(P2PArgs),
+    #[clap(env, hide(true))]
+    pub sentry_dsn: Option<String>,
+
+    #[clap(env, hide(true), default_value_t = 0.001)]
+    pub sentry_traces_sample_rate: f32,
 }
 
 fn parse_seconds(s: &str) -> Result<Duration> {
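Both renamed flags keep an `alias` for their old command-line spelling (`alias = "port"`, `alias = "ping-interval"`), so launch scripts that still pass the old flag names keep working across the rename. A minimal standalone sketch of that clap pattern (`DemoArgs` is a made-up struct for illustration, not the worker's actual CLI):

```rust
use clap::Parser;

// Made-up demo struct; it mirrors only the aliasing technique used in src/cli.rs.
#[derive(Parser, Debug)]
struct DemoArgs {
    /// Renamed from `--port`; the alias keeps the old flag spelling valid.
    #[clap(long, default_value_t = 8000, alias = "port")]
    http_port: u16,
}

fn main() {
    let new_style = DemoArgs::parse_from(["demo", "--http-port", "9000"]);
    let old_style = DemoArgs::parse_from(["demo", "--port", "9000"]);
    // Both spellings land in the same field.
    assert_eq!(new_style.http_port, old_style.http_port);
}
```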
80 changes: 0 additions & 80 deletions src/controller/http.rs

This file was deleted.

1 change: 0 additions & 1 deletion src/controller/mod.rs
@@ -1,3 +1,2 @@
-pub mod http;
 pub mod p2p;
 pub mod worker;
38 changes: 18 additions & 20 deletions src/controller/p2p.rs

@@ -18,8 +18,11 @@ use tokio_util::sync::CancellationToken;
 use tracing::{error, info, warn};
 
 use crate::{
-    cli::{self, Args},
-    gateway_allocations::{self, allocations_checker::AllocationsChecker},
+    cli::Args,
+    gateway_allocations::{
+        self,
+        allocations_checker::{self, AllocationsChecker},
+    },
     logs_storage::{LoadedLogs, LogsStorage},
     metrics,
     query::result::{QueryError, QueryResult},

@@ -63,21 +66,9 @@ pub struct P2PController<EventStream> {
 pub async fn create_p2p_controller(
     worker: Arc<Worker>,
     transport_builder: P2PTransportBuilder,
-    allocations_checker: AllocationsChecker,
-    scheduler_id: PeerId,
-    logs_collector_id: PeerId,
     args: Args,
 ) -> Result<P2PController<impl Stream<Item = WorkerEvent>>> {
-    let data_dir = args.data_dir.clone();
-    let heartbeat_interval = args.ping_interval;
-    let assignment_check_interval;
-    let network: Network;
-    if let cli::Mode::P2P(p2p_args) = &args.mode {
-        network = p2p_args.transport.rpc.network;
-        assignment_check_interval = p2p_args.assignment_check_interval;
-    } else {
-        panic!("P2PContoller needs p2p config");
-    };
+    let network = args.transport.rpc.network;
 
     let worker_id = transport_builder.local_peer_id();
     let keypair = transport_builder.keypair();

@@ -89,22 +80,29 @@ pub async fn create_p2p_controller(
         .as_ref()
         .to_vec();
     info!("Local peer ID: {worker_id}");
-    check_peer_id(worker_id, data_dir.join("peer_id"));
+    check_peer_id(worker_id, args.data_dir.join("peer_id"));
+
+    let allocations_checker = allocations_checker::AllocationsChecker::new(
+        transport_builder.contract_client(),
+        worker_id,
+        args.network_polling_interval,
+    )
+    .await?;
 
     let mut config = WorkerConfig::new();
-    config.service_nodes = vec![scheduler_id, logs_collector_id];
+    config.service_nodes = vec![args.scheduler_id, args.logs_collector_id];
     let (event_stream, transport_handle) = transport_builder.build_worker(config).await?;
 
     let (queries_tx, queries_rx) = mpsc::channel(QUERIES_POOL_SIZE);
     let (log_requests_tx, log_requests_rx) = mpsc::channel(LOG_REQUESTS_QUEUE_SIZE);
 
     Ok(P2PController {
         worker,
-        heartbeat_interval,
-        assignment_check_interval,
+        heartbeat_interval: args.heartbeat_interval,
+        assignment_check_interval: args.assignment_check_interval,
         raw_event_stream: UseOnce::new(event_stream),
         transport_handle,
-        logs_storage: LogsStorage::new(data_dir.join("logs.db").as_str()).await?,
+        logs_storage: LogsStorage::new(args.data_dir.join("logs.db").as_str()).await?,
         allocations_checker,
         worker_id,
         keypair,
11 changes: 3 additions & 8 deletions src/controller/worker.rs

@@ -26,7 +26,7 @@ pub struct Worker {
     state_manager: Arc<StateManager>,
     queries_running: AtomicUsize,
     max_parallel_queries: usize,
-    pub peer_id: Option<PeerId>,
+    pub peer_id: PeerId,
 }
 
 pub struct QueryTask {

@@ -38,20 +38,15 @@ pub struct QueryTask {
 }
 
 impl Worker {
-    pub fn new(state_manager: StateManager, parallel_queries: usize) -> Self {
+    pub fn new(state_manager: StateManager, parallel_queries: usize, peer_id: PeerId) -> Self {
         Self {
             state_manager: Arc::new(state_manager),
             queries_running: 0.into(),
             max_parallel_queries: parallel_queries,
-            peer_id: None,
+            peer_id,
         }
     }
 
-    pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
-        self.peer_id = Some(peer_id);
-        self
-    }
-
     pub fn set_desired_chunks(&self, chunks: ChunkSet) {
         self.state_manager.set_desired_chunks(chunks);
     }
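With the HTTP mode gone, every worker is a P2P node, so `peer_id` no longer needs to be optional or set after construction via a builder method. A standalone sketch of the shape of this change, using stand-in types rather than the crate's real `StateManager` and `PeerId`:

```rust
// Stand-in types for illustration only.
struct Worker {
    max_parallel_queries: usize,
    peer_id: String, // stands in for libp2p's PeerId
}

impl Worker {
    // Before: `new(...)` left `peer_id: Option<PeerId>` as None until
    // `with_peer_id` was called. Now the id is required up front.
    fn new(max_parallel_queries: usize, peer_id: String) -> Self {
        Self { max_parallel_queries, peer_id }
    }
}

fn main() {
    let worker = Worker::new(20, "12D3KooWExample".to_owned());
    // Callers read the id directly; there is no None case left to handle.
    println!("peer id: {}", worker.peer_id);
}
```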
64 changes: 14 additions & 50 deletions src/http_server.rs

@@ -1,57 +1,28 @@
 use std::sync::Arc;
 
-use crate::{cli::HttpArgs, controller::worker::Worker, metrics, types::dataset::Dataset};
+use crate::controller::worker::Worker;
 
 use axum::{
-    extract::Path,
-    http::{header, HeaderMap},
-    response::{IntoResponse, Response},
-    routing::{get, post},
+    response::IntoResponse,
+    routing::get,
     Json,
 };
 use prometheus_client::{encoding::text::encode, registry::Registry};
-use reqwest::StatusCode;
 use tokio_util::sync::CancellationToken;
 
-async fn get_status(worker: Arc<Worker>, args: Option<HttpArgs>) -> Json<serde_json::Value> {
+async fn get_status(worker: Arc<Worker>) -> Json<serde_json::Value> {
     let status = worker.status();
-    match args {
-        Some(args) => Json(serde_json::json!({
-            "router_url": args.router,
-            "worker_id": args.worker_id,
-            "worker_url": args.worker_url,
-            "state": {
-                "available": status.available,
-                "downloading": status.downloading,
-            }
-        })),
-        None => Json(serde_json::json!({
-            "state": {
-                "available": status.available,
-                "downloading": status.downloading,
-            }
-        })),
-    }
+    Json(serde_json::json!({
+        "state": {
+            "available": status.available,
+            "downloading": status.downloading,
+        }
+    }))
 }
 
-async fn get_peer_id(worker: Arc<Worker>) -> (StatusCode, String) {
-    match worker.peer_id {
-        Some(peer_id) => (StatusCode::OK, peer_id.to_string()),
-        None => (StatusCode::NOT_FOUND, "".to_owned()),
-    }
+async fn get_peer_id(worker: Arc<Worker>) -> String {
+    worker.peer_id.to_string()
 }
 
-async fn run_query(
-    worker: Arc<Worker>,
-    Path(dataset): Path<Dataset>,
-    query_str: String,
-) -> Response {
-    // TODO: remove HTTP transport
-    let result = worker
-        .run_query(&query_str, dataset, None, "<unimplemented>", None)
-        .await;
-    metrics::query_executed(&result);
-    result.map(|result| result.data).into_response()
-}
 
 async fn get_metrics(registry: Arc<Registry>) -> impl IntoResponse {

@@ -79,14 +50,14 @@ pub struct Server {
 }
 
 impl Server {
-    pub fn new(worker: Arc<Worker>, args: Option<HttpArgs>, metrics_registry: Registry) -> Self {
+    pub fn new(worker: Arc<Worker>, metrics_registry: Registry) -> Self {
         let metrics_registry = Arc::new(metrics_registry);
         let router = axum::Router::new()
             .route(
                 "/worker/status",
                 get({
                     let worker = worker.clone();
-                    move || get_status(worker, args)
+                    move || get_status(worker)
                 }),
             )
             .route(

@@ -96,13 +67,6 @@ impl Server {
                     move || get_peer_id(worker)
                 }),
             )
-            .route(
-                "/query/:dataset",
-                post({
-                    let worker = worker.clone();
-                    move |path, body| run_query(worker, path, body)
-                }),
-            )
             .route("/metrics", get(move || get_metrics(metrics_registry)));
         let router = Self::add_common_layers(router);
         Self { router }
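What remains is a read-only HTTP surface: the query endpoint is gone, and queries now arrive only over P2P. Assuming the default `--http-port` of 8000 from src/cli.rs, the surviving routes can be exercised like this (illustrative commands, not from the repository's docs; response values are placeholders):

```sh
curl http://localhost:8000/worker/status    # {"state":{"available":...,"downloading":...}}
curl http://localhost:8000/worker/peer_id   # plain-text libp2p peer ID
curl http://localhost:8000/metrics          # Prometheus text exposition format
```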