Skip to content

Commit

Permalink
Add serialization support for account & trade events
Browse files Browse the repository at this point in the history
Certain clients may be interested in printing account or trade events in
JSON format, for interoperability with other utilities. This change adds
such support to the types involved.
  • Loading branch information
d-e-s-o committed Jan 16, 2020
1 parent e986e55 commit 8d11997
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ Unreleased
----------
- Added support for accessing `/v2/account/configurations` endpoint
- Added support for querying `/v1/bars/<timeframe>` endpoint
- Added support for serializing account & trade events
- Switched from using `log` to `tracing` as a logging/tracing provider
- Switched to using `serde_urlencoded` for encoding query parameters
- Bumped `http-endpoint` dependency to `0.1.1`
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ hyper = {version = "0.13", features = ["stream"]}
hyper-tls = {version = "0.4", default-features = false}
num-decimal = {version = "0.1", features = ["serde"]}
serde = {version = "1.0", features = ["derive"]}
serde_json = "1.0"
serde_json = {version = "1.0", default-features = false}
serde_urlencoded = {version = "0.6", default-features = false}
tracing = {version = "0.1", default-features = false, features = ["attributes", "std"]}
tracing-futures = {version = "0.2", default-features = false, features = ["std-future"]}
Expand Down
3 changes: 2 additions & 1 deletion src/api/v2/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::time::SystemTime;
use num_decimal::Num;

use serde::Deserialize;
use serde::Serialize;

use uuid::Uuid;

Expand All @@ -15,7 +16,7 @@ use crate::Str;


/// A type representing an account ID.
#[derive(Clone, Copy, Debug, Deserialize, PartialEq)]
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Serialize)]
pub struct Id(pub Uuid);

impl Deref for Id {
Expand Down
61 changes: 58 additions & 3 deletions src/api/v2/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@ use std::time::SystemTime;
use num_decimal::Num;

use serde::Deserialize;
use serde::Serialize;

use crate::api::v2::account;
use crate::api::v2::order;
use crate::events::EventStream;
use crate::events::StreamType;
use crate::time_util::optional_system_time_from_str;
use crate::time_util::optional_system_time_to_rfc3339;


/// A representation of an account update that we receive through the
/// "account_updates" stream.
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct AccountUpdate {
/// The corresponding account's ID.
#[serde(rename = "id")]
Expand All @@ -25,18 +27,21 @@ pub struct AccountUpdate {
#[serde(
rename = "created_at",
deserialize_with = "optional_system_time_from_str",
serialize_with = "optional_system_time_to_rfc3339",
)]
pub created_at: Option<SystemTime>,
/// The time the account was updated last.
#[serde(
rename = "updated_at",
deserialize_with = "optional_system_time_from_str",
serialize_with = "optional_system_time_to_rfc3339",
)]
pub updated_at: Option<SystemTime>,
/// The time the account was deleted at.
#[serde(
rename = "deleted_at",
deserialize_with = "optional_system_time_from_str",
serialize_with = "optional_system_time_to_rfc3339",
)]
pub deleted_at: Option<SystemTime>,
/// The account's status.
Expand Down Expand Up @@ -69,7 +74,7 @@ impl EventStream for AccountUpdates {


/// The status of a trade, as reported as part of a `TradeUpdate`.
#[derive(Clone, Copy, Debug, Deserialize, PartialEq)]
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Serialize)]
pub enum TradeStatus {
/// The order has been received by Alpaca, and routed to exchanges for
/// execution.
Expand Down Expand Up @@ -122,7 +127,8 @@ pub enum TradeStatus {

/// A representation of a trade update that we receive through the
/// "trade_updates" stream.
#[derive(Clone, Debug, Deserialize, PartialEq)]
// TODO: There is also a timestamp field that we may want to hook up.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct TradeUpdate {
/// The event that occurred.
#[serde(rename = "event")]
Expand Down Expand Up @@ -157,6 +163,9 @@ mod tests {

use http_endpoint::Error as EndpointError;

use serde_json::from_str as from_json;
use serde_json::to_string as to_json;

use test_env_log::test;

use url::Url;
Expand All @@ -169,6 +178,52 @@ mod tests {
use crate::Error;


#[test]
fn deserialize_and_serialize_trade_event() {
let response = r#"{
"event":"canceled",
"order":{
"asset_class":"us_equity",
"asset_id":"3ece3182-5903-4902-b963-f875a0f416e7",
"canceled_at":"2020-01-19T06:19:40.137087268Z",
"client_order_id":"be7d3030-a53e-47ee-9dd3-d9ff3460a174",
"created_at":"2020-01-19T06:19:34.344561Z",
"expired_at":null,
"extended_hours":false,
"failed_at":null,
"filled_at":null,
"filled_avg_price":null,
"filled_qty":"0",
"id":"7bb4a536-d59b-4e65-aacf-a8b118d815f4",
"legs":null,
"limit_price":"1",
"order_type":"limit",
"qty":"1",
"replaced_at":null,
"replaced_by":null,
"replaces":null,
"side":"buy",
"status":"canceled",
"stop_price":null,
"submitted_at":"2020-01-19T06:19:34.32909Z",
"symbol":"VMW",
"time_in_force":"gtc",
"type":"limit",
"updated_at":"2020-01-19T06:19:40.147946209Z"
}
}"#;

// It's hard to compare two JSON objects semantically when all we
// have is their textual representation (as white spaces may be
// different and map items reordered). So we just serialize,
// deserialize, and serialize again, checking that what we
// ultimately end up with is what we started off with.
let update = from_json::<TradeUpdate>(&response).unwrap();
let json = to_json(&update).unwrap();
let new = from_json::<TradeUpdate>(&json).unwrap();
assert_eq!(new, update);
}

#[test(tokio::test)]
async fn stream_trade_events() -> Result<(), Error> {
// TODO: There may be something amiss here. If we don't cancel the
Expand Down
19 changes: 15 additions & 4 deletions src/api/v2/order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ use uuid::Uuid;

use crate::api::v2::asset;
use crate::time_util::optional_system_time_from_str;
use crate::time_util::optional_system_time_to_rfc3339;
use crate::time_util::system_time_from_str;
use crate::time_util::system_time_to_rfc3339;
use crate::Str;


/// An ID uniquely identifying an order.
#[derive(Clone, Copy, Debug, Deserialize, PartialEq)]
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Serialize)]
pub struct Id(pub Uuid);

impl Deref for Id {
Expand All @@ -36,7 +38,7 @@ impl Deref for Id {


/// The status an order can have.
#[derive(Clone, Copy, Debug, Deserialize, PartialEq)]
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Serialize)]
pub enum Status {
/// The order has been received by Alpaca, and routed to exchanges for
/// execution. This is the usual initial state of an order.
Expand Down Expand Up @@ -180,7 +182,7 @@ pub struct OrderReq {

/// A single order as returned by the /v2/orders endpoint on a GET
/// request.
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct Order {
/// The order's ID.
#[serde(rename = "id")]
Expand All @@ -192,36 +194,45 @@ pub struct Order {
#[serde(rename = "status")]
pub status: Status,
/// Timestamp this order was created at.
#[serde(rename = "created_at", deserialize_with = "system_time_from_str")]
#[serde(
rename = "created_at",
deserialize_with = "system_time_from_str",
serialize_with = "system_time_to_rfc3339",
)]
pub created_at: SystemTime,
/// Timestamp this order was updated at last.
#[serde(
rename = "updated_at",
deserialize_with = "optional_system_time_from_str",
serialize_with = "optional_system_time_to_rfc3339",
)]
pub updated_at: Option<SystemTime>,
/// Timestamp this order was submitted at.
#[serde(
rename = "submitted_at",
deserialize_with = "optional_system_time_from_str",
serialize_with = "optional_system_time_to_rfc3339",
)]
pub submitted_at: Option<SystemTime>,
/// Timestamp this order was filled at.
#[serde(
rename = "filled_at",
deserialize_with = "optional_system_time_from_str",
serialize_with = "optional_system_time_to_rfc3339",
)]
pub filled_at: Option<SystemTime>,
/// Timestamp this order expired at.
#[serde(
rename = "expired_at",
deserialize_with = "optional_system_time_from_str",
serialize_with = "optional_system_time_to_rfc3339",
)]
pub expired_at: Option<SystemTime>,
/// Timestamp this order expired at.
#[serde(
rename = "canceled_at",
deserialize_with = "optional_system_time_from_str",
serialize_with = "optional_system_time_to_rfc3339",
)]
pub canceled_at: Option<SystemTime>,
/// The order's asset class.
Expand Down
54 changes: 54 additions & 0 deletions src/events/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ mod tests {
use futures::SinkExt;
use futures::TryStreamExt;

use serde_json::from_str as from_json;

use test_env_log::test;

use tungstenite::tungstenite::Message;
Expand All @@ -120,6 +122,9 @@ mod tests {
use websocket_util::test::mock_server;
use websocket_util::test::WebSocketStream;

use crate::api::v2::events;
use crate::api::v2::order;

const KEY_ID: &str = "USER12345678";
const SECRET: &str = "justletmein";
const AUTH_REQ: &str = {
Expand Down Expand Up @@ -162,6 +167,55 @@ mod tests {
stream::<S>(api_info).await
}

#[test]
fn parse_trade_event() {
let response = r#"{
"stream":"trade_updates",
"data":{
"event":"canceled",
"order":{
"asset_class":"us_equity",
"asset_id":"3ece3182-5903-4902-b963-f875a0f416e7",
"canceled_at":"2020-01-19T06:19:40.137087268Z",
"client_order_id":"be7d3030-a53e-47ee-9dd3-d9ff3460a174",
"created_at":"2020-01-19T06:19:34.344561Z",
"expired_at":null,
"extended_hours":false,
"failed_at":null,
"filled_at":null,
"filled_avg_price":null,
"filled_qty":"0",
"id":"7bb4a536-d59b-4e65-aacf-a8b118d815f4",
"legs":null,
"limit_price":"1",
"order_type":"limit",
"qty":"1",
"replaced_at":null,
"replaced_by":null,
"replaces":null,
"side":"buy",
"status":"canceled",
"stop_price":null,
"submitted_at":"2020-01-19T06:19:34.32909Z",
"symbol":"VMW",
"time_in_force":"gtc",
"type":"limit",
"updated_at":"2020-01-19T06:19:40.147946209Z"
},
"timestamp":"2020-01-19T06:19:40.137087268Z"
}
}"#;

let event = from_json::<stream::Event<events::TradeUpdate>>(&response).unwrap();
assert_eq!(event.stream, StreamType::TradeUpdates);
assert_eq!(event.data.0.event, events::TradeStatus::Canceled);
assert_eq!(event.data.0.order.status, order::Status::Canceled);
assert_eq!(
event.data.0.order.time_in_force,
order::TimeInForce::UntilCanceled
);
}

#[test(tokio::test)]
async fn broken_stream() {
async fn test(mut stream: WebSocketStream) -> Result<(), WebSocketError> {
Expand Down

0 comments on commit 8d11997

Please sign in to comment.