Skip to content

Commit

Permalink
09/17/2023 04:32:24 PM 💻
Browse files Browse the repository at this point in the history
  • Loading branch information
alloc33 committed Sep 17, 2023
1 parent a04ac8e commit a8bbea5
Show file tree
Hide file tree
Showing 15 changed files with 149 additions and 142 deletions.
Binary file modified .DS_Store
Binary file not shown.
Binary file added market/src/.DS_Store
Binary file not shown.
File renamed without changes.
55 changes: 52 additions & 3 deletions market/src/api/alert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use serde::{Deserialize, Serialize};
use strum_macros::{AsRefStr, EnumString};
use uuid::Uuid;

use super::price::Price;
use super::{error::ApiError, price::Price};
use crate::{app_config::AppConfig, strategy::Strategy};

// NOTE: Webhook body example:
// {
Expand All @@ -24,7 +25,7 @@ use super::price::Price;
// }

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct AlertData {
pub struct WebhookAlertData {
pub strategy_id: Uuid,
pub ticker: String,
pub timeframe: String,
Expand All @@ -42,7 +43,6 @@ pub enum AlertType {
Long,
Short,
StopLoss,
Unknown,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
Expand All @@ -54,3 +54,52 @@ pub struct BarData {
pub close: Price,
pub volume: Decimal,
}

#[derive(Debug, Clone)]
pub struct TradeSignal {
pub strategy: Strategy,
pub ticker: String,
pub timeframe: String,
pub exchange: String,
pub alert_type: AlertType,
pub bar: BarData,
pub time: DateTime<Utc>,
}

impl TradeSignal {
pub fn from_alert_data(
alert_data: WebhookAlertData,
config: &AppConfig,
) -> Result<Self, ApiError> {
let strategy_id = alert_data.strategy_id;

let validated_strategy = config
.strategies
.iter()
.find(|strategy| strategy.id == strategy_id)
.ok_or_else(|| {
let msg = format!("Unknown strategy - {}", strategy_id);
tracing::error!(msg);
ApiError::BadRequest(msg)
})?;

if !validated_strategy.enabled {
let msg = format!(
"Strategy {} with id {} is disabled",
validated_strategy.name, validated_strategy.id
);
tracing::error!(msg);
return Err(ApiError::BadRequest(msg));
}

Ok(Self {
strategy: validated_strategy.clone(),
ticker: alert_data.ticker,
timeframe: alert_data.timeframe,
exchange: alert_data.exchange,
alert_type: alert_data.alert_type,
bar: alert_data.bar,
time: alert_data.time,
})
}
}
36 changes: 5 additions & 31 deletions market/src/api/webhook_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,16 @@ use std::sync::Arc;

use axum::{extract::State, http::StatusCode, Json};
use axum_extra::extract::WithRejection;
use uuid::Uuid;

use super::{error::ApiError, Response};
use crate::{alert::AlertData, app_config::Strategy, events::Event, App};
use super::{alert::TradeSignal, error::ApiError, Response};
use crate::{alert::WebhookAlertData, events::Event, App};

pub async fn receive_alert(
State(app): State<Arc<App>>,
WithRejection(alert_data, _): WithRejection<Json<AlertData>, ApiError>,
WithRejection(alert_data, _): WithRejection<Json<WebhookAlertData>, ApiError>,
) -> Response<()> {
validate_strategy(alert_data.strategy_id, &app.config.strategies)?;
let trade_signal = TradeSignal::from_alert_data(alert_data.0.clone(), &app.config)?;

_ = app
.event_sender
.send(Event::WebhookAlert(alert_data.0.clone()));
_ = app.event_sender.send(Event::WebhookAlert(trade_signal));

_ = sqlx::query!(
r#"
Expand Down Expand Up @@ -57,25 +53,3 @@ pub async fn receive_alert(

Ok((StatusCode::OK, Json::default()))
}

fn validate_strategy(strategy_id: Uuid, strategies: &[Strategy]) -> Result<(), ApiError> {
let validated_strategy = strategies
.iter()
.find(|strategy| strategy.id == strategy_id)
.ok_or_else(|| {
let msg = format!("Unknown strategy - {}", strategy_id);
tracing::error!(msg);
ApiError::BadRequest(msg)
})?;

if !validated_strategy.enabled {
let msg = format!(
"Strategy {} with id {} is disabled",
validated_strategy.name, validated_strategy.id
);
tracing::error!(msg);
return Err(ApiError::BadRequest(msg));
}

Ok(())
}
21 changes: 3 additions & 18 deletions market/src/app_config.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use config::{Config, ConfigError, File, Environment};
use config::{Config, ConfigError, File};
use serde::Deserialize;
use uuid::Uuid;
use std::env;

use crate::strategy::Strategy;

#[derive(Debug, Deserialize, Clone)]
pub struct Database {
pub url: String,
Expand All @@ -15,22 +16,6 @@ pub struct Alpaca {
pub apca_api_base_url: String,
}

#[derive(Debug, Deserialize, Clone)]
pub struct Strategy {
pub id: Uuid,
pub name: String,
pub enabled: bool,
pub broker: Broker,
pub max_order_retries: u8,
pub order_retry_delay: f64,
}

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Broker {
Alpaca,
}

#[derive(Debug, Clone, Deserialize)]
pub struct AppConfig {
pub api_key: String,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use serde::Deserialize;

use super::{trade_error::TradeError, Order};
use crate::api::alert::AlertData;

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Broker {
Alpaca,
// TODO: ?add more brokers
}

12 changes: 6 additions & 6 deletions market/src/events/mod.rs → market/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
use std::{fmt::Debug, sync::Arc};

use thiserror::Error as ThisError;
use tokio::sync::{
mpsc::{error::SendError, unbounded_channel, UnboundedReceiver, UnboundedSender},
Mutex,
};
use tracing::error;

use crate::{
api::{alert::AlertData, strategy::UpdateStrategy},
strategy_manager::{trade_error::TradeError, StrategyManager, StrategyManagerError},
api::{alert::TradeSignal, strategy::UpdateStrategy},
strategy_manager::StrategyManager,
};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -43,7 +41,9 @@ pub async fn dispatch_events(
Event::WebhookAlert(alert_data) => {
let strategy_manager = Arc::clone(&strategy_manager);
tokio::spawn(async move {
strategy_manager.process_trade_signal(alert_data).await;
if let Err(err) = strategy_manager.process_trade_signal(alert_data).await {
tracing::error!("Error processing trade signal: {err:?}");
}
});
}
Event::UpdateStrategy(_) => {}
Expand All @@ -53,7 +53,7 @@ pub async fn dispatch_events(

#[derive(Debug, Clone)]
pub enum Event {
WebhookAlert(AlertData),
WebhookAlert(TradeSignal),
// TODO: UpdateStrategy
UpdateStrategy(UpdateStrategy),
}
2 changes: 2 additions & 0 deletions market/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ pub mod events;
pub mod middleware;
pub mod strategy_manager;
pub mod trade_executor;
pub mod strategy;
pub mod broker;

use std::{sync::Arc, time::Duration};

Expand Down
File renamed without changes.
14 changes: 14 additions & 0 deletions market/src/strategy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use serde::Deserialize;
use uuid::Uuid;
use crate::broker::Broker;

#[derive(Debug, Deserialize, Clone)]
pub struct Strategy {
pub id: Uuid,
pub name: String,
pub enabled: bool,
pub broker: Broker,
pub max_order_retries: u8,
pub order_retry_delay: f64,
}

Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
pub mod broker;
pub mod trade_error;

use std::sync::Arc;

use apca::{ApiInfo, Client as AlpacaClient};
Expand All @@ -9,12 +6,13 @@ use serde::Deserialize;
use thiserror::Error as ThisError;
use tokio::time::{sleep, Duration};
use tracing::{error, info};
use trade_error::TradeError;
use uuid::Uuid;
use uuid7::uuid7;

use self::broker::Broker;
use crate::{api::alert::AlertData, events::Event, trade_executor::TradeExecutor, App};
use crate::{
api::alert::{WebhookAlertData, TradeSignal, AlertType},
trade_executor::TradeExecutor, App, broker::Broker,
};

#[derive(Debug, ThisError)]
pub enum StrategyManagerError {
Expand All @@ -30,6 +28,14 @@ pub enum StrategyManagerError {
StrategyDisabled(String, String),
}

// #[derive(Debug, ThisError)]
// pub enum TradeError {
// #[error("{0}")]
// InsufficientFunds(String),
// #[error("Order max retries reached. {0}")]
// MaxRetriesReached(Order),
// }

pub struct StrategyManager {
app_state: Arc<App>,
alpaca_client: AlpacaClient,
Expand All @@ -38,8 +44,10 @@ pub struct StrategyManager {

#[derive(Debug)]
pub struct Order {
id: Uuid,
ticker: String,
pub id: Uuid,
pub broker: Broker,
pub ticker: String,
pub order_type: AlertType,
}

impl StrategyManager {
Expand All @@ -60,34 +68,21 @@ impl StrategyManager {
})
}

pub async fn process_trade_signal(&self, alert_data: AlertData) -> Result<(), StrategyManagerError> {
Ok(())
}

fn create_order(&self, alert_data: &AlertData) -> Result<Order, StrategyManagerError> {
// Validate strategy - check if strategy exists and it's enabled.
// let validated_strategy = self
// .strategies
// .iter()
// .find(|strategy| strategy.id == alert_data.strategy_id)
// .ok_or_else(|| {
// StrategyManagerError::UnknownStrategy(alert_data.strategy_id.to_string())
// })?;

// if !validated_strategy.enabled {
// return Err(StrategyManagerError::StrategyDisabled(
// validated_strategy.name.clone(),
// validated_strategy.id.to_string(),
// ));
// }

// TODO: Complete Order creation
pub async fn process_trade_signal(
&self,
trade_signal: TradeSignal,
) -> Result<(), StrategyManagerError> {
let order = Order {
id: uuid7::new_v7(),
ticker: alert_data.ticker.clone(),
broker: trade_signal.strategy.broker,
ticker: trade_signal.ticker,
order_type: trade_signal.alert_type
};

Ok(order)
// TODO: Retry
// let result = self.trade_executor.execute_order(&order);

Ok(())
}
}

Expand Down
11 changes: 0 additions & 11 deletions market/src/strategy_manager/trade_error.rs

This file was deleted.

39 changes: 39 additions & 0 deletions market/src/trade_executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use std::sync::Arc;

use uuid::Uuid;

use crate::{
api::alert::{WebhookAlertData, AlertType},
App,
};

// pub mod alpaca_client;
// pub mod order;

// pub trait TradeManager {
// fn get_account(&self) -> Result<(), ()>;
// }

// pub struct Account {
// id: Uuid,
// }

pub struct TradeExecutor;

// type TradeExecutorResult = Result<u64, TradeError>;

// impl TradeExecutor {
// // pub fn new(app: Arc<App>) -> Self {
// // Self { app }
// // }

// pub async fn execute_order(&self, order: &Order) -> TradeExecutorResult {
// match order.order_type {
// AlertType::Long => {}
// AlertType::Short => {}
// AlertType::StopLoss => {}
// };

// Ok(chrono::Utc::now().timestamp() as u64)
// }
// }
Loading

0 comments on commit a8bbea5

Please sign in to comment.