Skip to content

Commit

Permalink
Merge pull request oanda#26 from hootnot/generic-streaming
Browse files Browse the repository at this point in the history
Generic streaming
  • Loading branch information
ayfchan committed Mar 7, 2016
2 parents f102007 + 3968557 commit 9f38e00
Show file tree
Hide file tree
Showing 10 changed files with 434 additions and 189 deletions.
31 changes: 22 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,26 +56,39 @@ Rates Streaming
======
Create a custom streamer class to setup how you want to handle the data.
Each tick is sent through the `on_success` and `on_error` functions.
You can override these functions to handle the streaming data.
Since these methods are abstract methods, you need to override these methods
to handle the streaming data.

The following example prints _count_ ticks from the stream then disconnects.

The following example prints the first 10 ticks from the stream then disconnects.

class MyStreamer(oandapy.Streamer):
def __init__(self, *args, **kwargs):
oandapy.Streamer.__init__(self, *args, **kwargs)
self.ticks = 0
def __init__(self, count=10, *args, **kwargs):
super(MyStreamer, self).__init__(*args, **kwargs)
self.count = count
self.reccnt = 0

def on_success(self, data):
self.ticks += 1
print data
if self.ticks == 10:
print data, "\n"
self.reccnt += 1
if self.reccnt == self.count:
self.disconnect()

def on_error(self, data):
self.disconnect()


Initialize an instance of your custom streamer, and start connecting to the stream.
See http://developer.oanda.com/rest-live/streaming/ for further documentation.

account = "12345"
stream = MyStreamer(environment="practice", access_token="abcdefghijk...")
stream.rates(account, instruments="EUR_USD,EUR_JPY,US30_USD,DE30_EUR")


The same procedure can be used for streaming events.


stream = MyStreamer(environment="practice", access_token="abcdefghijk...")
stream.start(accountId=12345, instruments="EUR_USD,USD_CAD")
stream.events(ignore_heartbeat=False)

4 changes: 3 additions & 1 deletion oandapy/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from .oandapy import *
from .oandapy import API
from .exceptions import OandaError, BadEnvironment
from .stream.stream import Streamer
20 changes: 20 additions & 0 deletions oandapy/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""Exceptions."""


class OandaError(Exception):
""" Generic error class, catches oanda response errors
"""

def __init__(self, error_response):
self.error_response = error_response
msg = "OANDA API returned error code %s (%s) " % \
(error_response['code'], error_response['message'])

super(OandaError, self).__init__(msg)


class BadEnvironment(Exception):
"""environment should be: sandbox, practice or live."""
def __init__(self, environment):
msg = "Environment '%s' does not exist" % environment
super(BadEnvironment, self).__init__(msg)
97 changes: 3 additions & 94 deletions oandapy/oandapy.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import requests
from .exceptions import BadEnvironment, OandaError

""" OANDA API wrapper for OANDA's REST API """

Expand Down Expand Up @@ -223,6 +224,8 @@ def __init__(self,
self.api_url = 'https://api-fxpractice.oanda.com'
elif environment == 'live':
self.api_url = 'https://api-fxtrade.oanda.com'
else:
raise BadEnvironment(environment)

self.access_token = access_token
self.client = requests.Session()
Expand Down Expand Up @@ -272,97 +275,3 @@ def request(self, endpoint, method='GET', params=None):
raise OandaError(content)

return content

"""HTTPS Streaming"""


class Streamer(object):
""" Provides functionality for HTTPS Streaming
Docs: http://developer.oanda.com/rest-live/streaming
"""

def __init__(self, environment="practice", access_token=None):
"""Instantiates an instance of OandaPy's streaming API wrapper.
:param environment: (optional) Provide the environment for oanda's
REST api, either 'practice', or 'live'. Default: practice
:param access_token: (optional) Provide a valid access token if you
have one. This is required if the environment is not sandbox.
"""

if environment == 'practice':
self.api_url = 'https://stream-fxpractice.oanda.com/v1/prices'
elif environment == 'live':
self.api_url = 'https://stream-fxtrade.oanda.com/v1/prices'

self.access_token = access_token
self.client = requests.Session()
self.client.stream = True
self.connected = False

# personal token authentication
if self.access_token:
self.client.headers['Authorization'] = 'Bearer '+self.access_token

def start(self, ignore_heartbeat=True, **params):
""" Starts the stream with the given parameters
:param accountId: (Required) The account that prices are applicable for
:param instruments: (Required) A (URL encoded) comma separated list of
instruments to fetch prices for.
:param ignore_heartbeat: (optional) Whether or not to display the
heartbeat. Default: True
"""
self.connected = True

request_args = {}
request_args['params'] = params

while self.connected:
response = self.client.get(self.api_url, **request_args)

if response.status_code != 200:
self.on_error(response.content)

for line in response.iter_lines(90):
if not self.connected:
break

if line:
data = json.loads(line.decode("utf-8"))
if not (ignore_heartbeat and "heartbeat" in data):
self.on_success(data)

def on_success(self, data):
""" Called when data is successfully retrieved from the stream
Override this to handle your streaming data.
:param data: response object sent from stream
"""

return True

def on_error(self, data):
""" Called when stream returns non-200 status code
Override this to handle your streaming data.
:param data: error response object sent from stream
"""

return

def disconnect(self):
""" Manually disconnects the streaming client
"""
self.connected = False


""" Contains OANDA exception """


class OandaError(Exception):
""" Generic error class, catches oanda response errors
"""

def __init__(self, error_response):
self.error_response = error_response
msg = "OANDA API returned error code %s (%s) " % \
(error_response['code'], error_response['message'])

super(OandaError, self).__init__(msg)
Empty file added oandapy/stream/__init__.py
Empty file.
134 changes: 134 additions & 0 deletions oandapy/stream/stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import warnings

import json
import requests
from ..exceptions import BadEnvironment
from abc import ABCMeta, abstractmethod

""" OANDA API wrapper for OANDA's REST API """

""" HTTPS Streaming """


class EndpointsMixin(object):

"""Stream"""

def rates(self, account_id, instruments, **params):
""" Get streaming rates
Docs: http://developer.oanda.com/rest-live/streaming
:param accountId: (Required) The account that prices are applicable for
:param instruments: (Required) A (URL encoded) comma separated list of
instruments to fetch prices for.
"""
params['accountId'] = account_id
params['instruments'] = instruments
endpoint = 'v1/prices'
return self.run(endpoint, params=params)

def events(self, **params):
""" Get streaming events
Docs: http://developer.oanda.com/rest-live/streaming
"""
endpoint = 'v1/events'
return self.run(endpoint, params=params)


class Streamer(EndpointsMixin, object):
""" Provides functionality for HTTPS Streaming
"""
__metaclass__ = ABCMeta

def __init__(self, environment, access_token):
"""Instantiates an instance of OandaPy's streaming API wrapper.
:param environment: (required) Provide the environment for oanda's
REST api, either 'practice', or 'live'.
:param access_token: (required)
"""

if environment == 'practice':
self.api_url = 'https://stream-fxpractice.oanda.com'
elif environment == 'live':
self.api_url = 'https://stream-fxtrade.oanda.com'
else:
raise BadEnvironment(environment)

self.access_token = access_token
self.client = requests.Session()
self.client.stream = True
self.connected = False

# personal token authentication
if self.access_token:
self.client.headers['Authorization'] = 'Bearer '+self.access_token

def start(self, ignore_heartbeat=True, **params):
""" This method only serves backwards compatibility with the
pre-EndpointsMixin version that only streamed prices
"""
warnings.warn("Streamer() supports the use of multiple endpoints "
"use the rates() method instead",
stacklevel=2)
params['ignore_heartbeat'] = ignore_heartbeat
self.run("v1/prices", params=params)

def run(self, endpoint, params=None):
""" Starts the stream with the given parameters
:param ignore_heartbeat: (optional) Whether or not to display the
heartbeat. Default: True
"""
self.connected = True

params = params or {}

ignore_heartbeat = None
if "ignore_heartbeat" in params:
ignore_heartbeat = params['ignore_heartbeat']

request_args = {}
request_args['params'] = params

url = '%s/%s' % (self.api_url, endpoint)

while self.connected:
response = self.client.get(url, **request_args)

if response.status_code != 200:
self.on_error(response.content)

for line in response.iter_lines(90):
if not self.connected:
break

if line:
data = json.loads(line.decode("utf-8"))
if not (ignore_heartbeat and "heartbeat" in data):
self.on_success(data)

@abstractmethod
def on_success(self, data):
""" Called when data is successfully retrieved from the stream
Override this to handle your streaming data.
:param data: response object sent from stream
"""

return True

@abstractmethod
def on_error(self, data):
""" Called when stream returns non-200 status code
Override this to handle your streaming data.
:param data: error response object sent from stream
"""

return

def disconnect(self):
""" Manually disconnects the streaming client
"""
self.connected = False


class StreamerError(Exception):
def __init__(self, msg):
super(StreamerError, self).__init__(msg)
Loading

0 comments on commit 9f38e00

Please sign in to comment.