From 8d119975253ac3c86f24ca178cf177fec84a9b82 Mon Sep 17 00:00:00 2001 From: Daniel Mueller Date: Thu, 16 Jan 2020 15:45:57 -0800 Subject: [PATCH] Add serialization support for account & trade events Certain clients may be interested in printing account or trade events in JSON format, for interoperability with other utilities. This change adds such support to the types involved. --- CHANGELOG.md | 1 + Cargo.toml | 2 +- src/api/v2/account.rs | 3 ++- src/api/v2/events.rs | 61 ++++++++++++++++++++++++++++++++++++++++--- src/api/v2/order.rs | 19 +++++++++++--- src/events/stream.rs | 54 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 131 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index af0a362d..a256837a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ Unreleased ---------- - Added support for accessing `/v2/account/configurations` endpoint - Added support for querying `/v1/bars/` endpoint +- Added support for serializing account & trade events - Switched from using `log` to `tracing` as a logging/tracing provider - Switched to using `serde_urlencoded` for encoding query parameters - Bumped `http-endpoint` dependency to `0.1.1` diff --git a/Cargo.toml b/Cargo.toml index 3e321d66..49df47fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ hyper = {version = "0.13", features = ["stream"]} hyper-tls = {version = "0.4", default-features = false} num-decimal = {version = "0.1", features = ["serde"]} serde = {version = "1.0", features = ["derive"]} -serde_json = "1.0" +serde_json = {version = "1.0", default-features = false} serde_urlencoded = {version = "0.6", default-features = false} tracing = {version = "0.1", default-features = false, features = ["attributes", "std"]} tracing-futures = {version = "0.2", default-features = false, features = ["std-future"]} diff --git a/src/api/v2/account.rs b/src/api/v2/account.rs index bb3712f8..15d10bad 100644 --- a/src/api/v2/account.rs +++ b/src/api/v2/account.rs @@ -7,6 +7,7 @@ use std::time::SystemTime; use num_decimal::Num; use serde::Deserialize; +use serde::Serialize; use uuid::Uuid; @@ -15,7 +16,7 @@ use crate::Str; /// A type representing an account ID. -#[derive(Clone, Copy, Debug, Deserialize, PartialEq)] +#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Serialize)] pub struct Id(pub Uuid); impl Deref for Id { diff --git a/src/api/v2/events.rs b/src/api/v2/events.rs index 42c0bb51..c311c513 100644 --- a/src/api/v2/events.rs +++ b/src/api/v2/events.rs @@ -6,17 +6,19 @@ use std::time::SystemTime; use num_decimal::Num; use serde::Deserialize; +use serde::Serialize; use crate::api::v2::account; use crate::api::v2::order; use crate::events::EventStream; use crate::events::StreamType; use crate::time_util::optional_system_time_from_str; +use crate::time_util::optional_system_time_to_rfc3339; /// A representation of an account update that we receive through the /// "account_updates" stream. -#[derive(Clone, Debug, Deserialize, PartialEq)] +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct AccountUpdate { /// The corresponding account's ID. #[serde(rename = "id")] @@ -25,18 +27,21 @@ pub struct AccountUpdate { #[serde( rename = "created_at", deserialize_with = "optional_system_time_from_str", + serialize_with = "optional_system_time_to_rfc3339", )] pub created_at: Option, /// The time the account was updated last. #[serde( rename = "updated_at", deserialize_with = "optional_system_time_from_str", + serialize_with = "optional_system_time_to_rfc3339", )] pub updated_at: Option, /// The time the account was deleted at. #[serde( rename = "deleted_at", deserialize_with = "optional_system_time_from_str", + serialize_with = "optional_system_time_to_rfc3339", )] pub deleted_at: Option, /// The account's status. @@ -69,7 +74,7 @@ impl EventStream for AccountUpdates { /// The status of a trade, as reported as part of a `TradeUpdate`. -#[derive(Clone, Copy, Debug, Deserialize, PartialEq)] +#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Serialize)] pub enum TradeStatus { /// The order has been received by Alpaca, and routed to exchanges for /// execution. @@ -122,7 +127,8 @@ pub enum TradeStatus { /// A representation of a trade update that we receive through the /// "trade_updates" stream. -#[derive(Clone, Debug, Deserialize, PartialEq)] +// TODO: There is also a timestamp field that we may want to hook up. +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct TradeUpdate { /// The event that occurred. #[serde(rename = "event")] @@ -157,6 +163,9 @@ mod tests { use http_endpoint::Error as EndpointError; + use serde_json::from_str as from_json; + use serde_json::to_string as to_json; + use test_env_log::test; use url::Url; @@ -169,6 +178,52 @@ mod tests { use crate::Error; + #[test] + fn deserialize_and_serialize_trade_event() { + let response = r#"{ + "event":"canceled", + "order":{ + "asset_class":"us_equity", + "asset_id":"3ece3182-5903-4902-b963-f875a0f416e7", + "canceled_at":"2020-01-19T06:19:40.137087268Z", + "client_order_id":"be7d3030-a53e-47ee-9dd3-d9ff3460a174", + "created_at":"2020-01-19T06:19:34.344561Z", + "expired_at":null, + "extended_hours":false, + "failed_at":null, + "filled_at":null, + "filled_avg_price":null, + "filled_qty":"0", + "id":"7bb4a536-d59b-4e65-aacf-a8b118d815f4", + "legs":null, + "limit_price":"1", + "order_type":"limit", + "qty":"1", + "replaced_at":null, + "replaced_by":null, + "replaces":null, + "side":"buy", + "status":"canceled", + "stop_price":null, + "submitted_at":"2020-01-19T06:19:34.32909Z", + "symbol":"VMW", + "time_in_force":"gtc", + "type":"limit", + "updated_at":"2020-01-19T06:19:40.147946209Z" + } +}"#; + + // It's hard to compare two JSON objects semantically when all we + // have is their textual representation (as white spaces may be + // different and map items reordered). So we just serialize, + // deserialize, and serialize again, checking that what we + // ultimately end up with is what we started off with. + let update = from_json::(&response).unwrap(); + let json = to_json(&update).unwrap(); + let new = from_json::(&json).unwrap(); + assert_eq!(new, update); + } + #[test(tokio::test)] async fn stream_trade_events() -> Result<(), Error> { // TODO: There may be something amiss here. If we don't cancel the diff --git a/src/api/v2/order.rs b/src/api/v2/order.rs index bbf1c7b3..b91dfa61 100644 --- a/src/api/v2/order.rs +++ b/src/api/v2/order.rs @@ -18,12 +18,14 @@ use uuid::Uuid; use crate::api::v2::asset; use crate::time_util::optional_system_time_from_str; +use crate::time_util::optional_system_time_to_rfc3339; use crate::time_util::system_time_from_str; +use crate::time_util::system_time_to_rfc3339; use crate::Str; /// An ID uniquely identifying an order. -#[derive(Clone, Copy, Debug, Deserialize, PartialEq)] +#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Serialize)] pub struct Id(pub Uuid); impl Deref for Id { @@ -36,7 +38,7 @@ impl Deref for Id { /// The status an order can have. -#[derive(Clone, Copy, Debug, Deserialize, PartialEq)] +#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Serialize)] pub enum Status { /// The order has been received by Alpaca, and routed to exchanges for /// execution. This is the usual initial state of an order. @@ -180,7 +182,7 @@ pub struct OrderReq { /// A single order as returned by the /v2/orders endpoint on a GET /// request. -#[derive(Clone, Debug, Deserialize, PartialEq)] +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct Order { /// The order's ID. #[serde(rename = "id")] @@ -192,36 +194,45 @@ pub struct Order { #[serde(rename = "status")] pub status: Status, /// Timestamp this order was created at. - #[serde(rename = "created_at", deserialize_with = "system_time_from_str")] + #[serde( + rename = "created_at", + deserialize_with = "system_time_from_str", + serialize_with = "system_time_to_rfc3339", + )] pub created_at: SystemTime, /// Timestamp this order was updated at last. #[serde( rename = "updated_at", deserialize_with = "optional_system_time_from_str", + serialize_with = "optional_system_time_to_rfc3339", )] pub updated_at: Option, /// Timestamp this order was submitted at. #[serde( rename = "submitted_at", deserialize_with = "optional_system_time_from_str", + serialize_with = "optional_system_time_to_rfc3339", )] pub submitted_at: Option, /// Timestamp this order was filled at. #[serde( rename = "filled_at", deserialize_with = "optional_system_time_from_str", + serialize_with = "optional_system_time_to_rfc3339", )] pub filled_at: Option, /// Timestamp this order expired at. #[serde( rename = "expired_at", deserialize_with = "optional_system_time_from_str", + serialize_with = "optional_system_time_to_rfc3339", )] pub expired_at: Option, /// Timestamp this order expired at. #[serde( rename = "canceled_at", deserialize_with = "optional_system_time_from_str", + serialize_with = "optional_system_time_to_rfc3339", )] pub canceled_at: Option, /// The order's asset class. diff --git a/src/events/stream.rs b/src/events/stream.rs index 8237bb9b..517fe68f 100644 --- a/src/events/stream.rs +++ b/src/events/stream.rs @@ -111,6 +111,8 @@ mod tests { use futures::SinkExt; use futures::TryStreamExt; + use serde_json::from_str as from_json; + use test_env_log::test; use tungstenite::tungstenite::Message; @@ -120,6 +122,9 @@ mod tests { use websocket_util::test::mock_server; use websocket_util::test::WebSocketStream; + use crate::api::v2::events; + use crate::api::v2::order; + const KEY_ID: &str = "USER12345678"; const SECRET: &str = "justletmein"; const AUTH_REQ: &str = { @@ -162,6 +167,55 @@ mod tests { stream::(api_info).await } + #[test] + fn parse_trade_event() { + let response = r#"{ + "stream":"trade_updates", + "data":{ + "event":"canceled", + "order":{ + "asset_class":"us_equity", + "asset_id":"3ece3182-5903-4902-b963-f875a0f416e7", + "canceled_at":"2020-01-19T06:19:40.137087268Z", + "client_order_id":"be7d3030-a53e-47ee-9dd3-d9ff3460a174", + "created_at":"2020-01-19T06:19:34.344561Z", + "expired_at":null, + "extended_hours":false, + "failed_at":null, + "filled_at":null, + "filled_avg_price":null, + "filled_qty":"0", + "id":"7bb4a536-d59b-4e65-aacf-a8b118d815f4", + "legs":null, + "limit_price":"1", + "order_type":"limit", + "qty":"1", + "replaced_at":null, + "replaced_by":null, + "replaces":null, + "side":"buy", + "status":"canceled", + "stop_price":null, + "submitted_at":"2020-01-19T06:19:34.32909Z", + "symbol":"VMW", + "time_in_force":"gtc", + "type":"limit", + "updated_at":"2020-01-19T06:19:40.147946209Z" + }, + "timestamp":"2020-01-19T06:19:40.137087268Z" + } +}"#; + + let event = from_json::>(&response).unwrap(); + assert_eq!(event.stream, StreamType::TradeUpdates); + assert_eq!(event.data.0.event, events::TradeStatus::Canceled); + assert_eq!(event.data.0.order.status, order::Status::Canceled); + assert_eq!( + event.data.0.order.time_in_force, + order::TimeInForce::UntilCanceled + ); + } + #[test(tokio::test)] async fn broken_stream() { async fn test(mut stream: WebSocketStream) -> Result<(), WebSocketError> {