[WIP] Add Q-Learning and step-based controller #93

Open
wants to merge 5 commits into base: master
1 change: 1 addition & 0 deletions bolero/behavior_search/__init__.py
@@ -2,6 +2,7 @@
from .black_box_search import (BlackBoxSearch, ContextualBlackBoxSearch,
JustOptimizer, JustContextualOptimizer)
from .monte_carlo_rl import MonteCarloRL
+ from .qlearning import QLearning

__all__ = ["BehaviorSearch", "ContextualBehaviorSearch", "BlackBoxSearch",
"ContextualBlackBoxSearch", "JustOptimizer",
15 changes: 8 additions & 7 deletions bolero/behavior_search/monte_carlo_rl.py
@@ -87,7 +87,7 @@ def get_outputs(self, outputs):
outputs : array-like, shape = (n_outputs,)
outputs, e.g. next action, will be updated
"""
- outputs[0] = self.action
+ outputs[:] = self.action
self.actions_taken.append(self.action)

def step(self):
@@ -96,15 +96,16 @@ def step(self):
Uses the inputs and meta-parameters to compute the outputs.
"""
if self.random_state.rand() < self.epsilon:
- self.action = self.random_state.choice(self.action_space)
+ i = self.random_state.randint(len(self.action_space))
+ self.action = self.action_space[i]
else:
self._select_best_action()

def _select_best_action(self):
Qs = np.array([self.Q[self.s][a] for a in self.action_space])
best_actions = np.where(Qs == max(Qs))[0]
- self.action = self.action_space[
-     self.random_state.choice(best_actions)]
+ i = self.random_state.randint(len(best_actions))
+ self.action = best_actions[i]

def can_step(self):
"""Returns if step() can be called again.
@@ -121,9 +122,9 @@ class MonteCarloRL(BehaviorSearch, PickableMixin):
"""Tabular Monte Carlo is a model-free reinforcement learning method.

This implements the epsilon-soft on-policy Monte Carlo control algorithm
- shown at page 120 of "Reinforcement Learning: An Introduction"
+ shown at page 101 of "Reinforcement Learning: An Introduction"
(Sutton and Barto, 2nd edition,
- http://people.inf.elte.hu/lorincz/Files/RL_2006/SuttonBook.pdf).
+ http://incompleteideas.net/book/bookdraft2018mar21.pdf).
The action space and the state space must be discrete for this
implementation.

@@ -164,7 +165,7 @@ def init(self, n_inputs, n_outputs):
"""
assert n_inputs == 1, "discrete state space required"
assert n_outputs == 1, "discrete action space required"
- self.Q = defaultdict(lambda: dict((a, 0.0) for a in self.action_space))
+ self.Q = defaultdict(lambda: defaultdict(lambda: 0.0))
self.policy = EpsilonGreedyPolicy(
self.Q, self.action_space, self.epsilon, self.random_state)
self.returns = defaultdict(lambda: defaultdict(lambda: []))
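
The Q-table change above replaces a dict pre-filled over the action space with nested defaultdicts, so Q[s][a] reads as 0.0 for any state-action pair that was never visited and the table no longer needs the full action space up front. A minimal sketch of the idea (not part of the PR, just an illustration):

from collections import defaultdict

# Nested defaultdicts: entries are created lazily and default to 0.0,
# which behaves like an all-zero Q-table over an unknown state space.
Q = defaultdict(lambda: defaultdict(lambda: 0.0))
Q["s0"][1] += 1.0
print(Q["s0"][1])  # 1.0 -- the updated entry
print(Q["s0"][0])  # 0.0 -- an action never taken in s0
print(len(Q))      # 1   -- only visited states are stored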
124 changes: 124 additions & 0 deletions bolero/behavior_search/qlearning.py
@@ -0,0 +1,124 @@
import numpy as np
from collections import defaultdict
from .behavior_search import BehaviorSearch, PickableMixin
from ..representation import Behavior
from .monte_carlo_rl import EpsilonGreedyPolicy


class QLearning(BehaviorSearch, PickableMixin):
"""Q-Learning is a model-free reinforcement learning method.

This implements the off-policy TD control algorithm Q-learning (with an
epsilon-greedy behavior policy), shown on page 131 of
"Reinforcement Learning: An Introduction"
(Sutton and Barto, 2nd edition,
http://incompleteideas.net/book/bookdraft2018mar21.pdf).
The action space and the state space must be discrete for this
implementation.

Parameters
----------
action_space : list
Actions that the agent can select from

alpha : float, optional (default: 0.1)
The learning rate. Must be within (0, 1].

gamma : float, optional (default: 0.9)
Discount factor for the discounted infinite horizon model

epsilon : float, optional (default: 0.1)
Exploration probability for epsilon-greedy policy

convergence_threshold : float, optional (default: 1e-3)
Learning will be stopped if the maximum difference of the value
function between iterations is below this threshold

random_state : int or RandomState, optional (default: None)
Seed for the random number generator or RandomState object.
"""
def __init__(self, action_space, alpha=0.1, gamma=0.9, epsilon=0.1,
convergence_threshold=1e-3, random_state=None):
self.alpha = alpha
self.action_space = action_space
self.gamma = gamma
self.epsilon = epsilon
self.convergence_threshold = convergence_threshold
self.random_state = random_state

def init(self, n_inputs, n_outputs):
"""Initialize the behavior search.

Parameters
----------
n_inputs : int
number of inputs of the behavior

n_outputs : int
number of outputs of the behavior
"""
self.Q = defaultdict(lambda: defaultdict(lambda: 0.0))
self.policy = EpsilonGreedyPolicy(
self.Q, self.action_space, self.epsilon, self.random_state)
self.returns = defaultdict(lambda: defaultdict(lambda: []))
self.done = False

def get_next_behavior(self):
"""Obtain next behavior for evaluation.

Returns
-------
behavior : Behavior
mapping from input to output
"""
self.policy.init(1, 1)
return self.policy

def set_evaluation_feedback(self, feedbacks):
"""Set feedback for the last behavior.

Parameters
----------
feedbacks : list of float
feedback for each step or for the episode, depends on the problem
"""
visited_states = self.policy.visited_states
actions_taken = self.policy.actions_taken

if len(visited_states) < 2:
return
if len(feedbacks) < 1:
return

self.policy.visited_states = visited_states[1:]
self.policy.actions_taken = actions_taken[1:]

s = visited_states[0]
s2 = visited_states[1]
a = actions_taken[0]
r = feedbacks[0]

last_Q = self.Q[s][a]
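# Q-learning update: Q(s, a) += alpha * (r + gamma * max_a' Q(s2, a') - Q(s, a))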
best_next_Q = np.max([self.Q[s2][b] for b in self.action_space])
td_error = r + self.gamma * best_next_Q - last_Q
self.Q[s][a] = last_Q + self.alpha * td_error

def is_behavior_learning_done(self):
"""Check if the value function converged.

Returns
-------
finished : bool
Is the learning of a behavior finished?
"""
return False # TODO find a more intelligent way to terminate...

def get_best_behavior(self):
"""Returns the best behavior found so far.

Returns
-------
behavior : Behavior
mapping from input to output
"""
policy = EpsilonGreedyPolicy(self.Q, self.action_space, epsilon=0.0)
policy.init(1, 1)
return policy
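
For reference, a minimal self-contained sketch of the tabular update that set_evaluation_feedback performs on one observed transition (the toy states, actions and numbers below are made up for illustration, not part of the PR):

import numpy as np
from collections import defaultdict

alpha, gamma = 0.1, 0.9
action_space = [0, 1]
Q = defaultdict(lambda: defaultdict(lambda: 0.0))

# One observed transition: state s, action a, reward r, next state s2
s, a, r, s2 = "s0", 1, 1.0, "s1"

best_next_Q = np.max([Q[s2][b] for b in action_space])  # max over a' of Q(s2, a')
td_error = r + gamma * best_next_Q - Q[s][a]            # temporal-difference error
Q[s][a] += alpha * td_error                             # move Q(s, a) toward the TD target
print(Q[s][a])  # 0.1 -- alpha * r on the first update, since Q started at zero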
3 changes: 2 additions & 1 deletion bolero/controller/__init__.py
@@ -1,4 +1,5 @@
from .controller import Controller, ContextualController
+ from .stepbasedcontroller import StepBasedController


__all__ = ["Controller", "ContextualController"]
__all__ = ["Controller", "ContextualController", "StepBasedController"]
158 changes: 158 additions & 0 deletions bolero/controller/stepbasedcontroller.py
@@ -0,0 +1,158 @@
import numpy as np
from .controller import Controller
from ..utils.validation import check_feedback


class StepBasedController(Controller):
def __init__(self, config=None, environment=None, behavior_search=None,
**kwargs):
super(StepBasedController, self).__init__(
config, environment, behavior_search, **kwargs)

def learn(self, meta_parameter_keys=(), meta_parameters=()):
"""Learn the behavior.

Parameters
----------
meta_parameter_keys : list
Meta parameter keys

meta_parameters : list
Meta parameter values

Returns
-------
feedback_history : array, shape (n_episodes or less, dim_feedback)
Feedbacks for each episode. If finish_after_convergence is set and
is_behavior_learning_done returns True before n_episodes is reached,
the length of feedback_history is shorter than n_episodes.
"""
feedback_history = []
for _ in range(self.n_episodes):
feedbacks = self.episode(meta_parameter_keys, meta_parameters)
feedback_history.append(feedbacks)
if (self.finish_after_convergence and
(self.behavior_search.is_behavior_learning_done() or
self.environment.is_behavior_learning_done())):
break
if self.verbose >= 2:
print("[Controller] Terminated because of:\nbehavior_search: %s, "
"environment: %s"
% (self.behavior_search.is_behavior_learning_done(),
self.environment.is_behavior_learning_done()))
return np.array(feedback_history)

def episode(self, meta_parameter_keys=(), meta_parameters=()):
"""Execute one learning episode.

Parameters
----------
meta_parameter_keys : array-like, shape = (n_meta_parameters,)
Meta parameter keys

meta_parameters : array-like, shape = (n_meta_parameters,)
Meta parameter values

Returns
-------
accumulated_feedback : float or array-like, shape = (n_feedbacks,)
Feedback(s) of the episode
"""
if self.behavior_search is None:
raise ValueError("A BehaviorSearch is required to execute an "
"episode without specifying a behavior.")

if self.verbose >= 1:
print("[Controller] Episode: #%d" % (self.episode_cnt + 1))

behavior = self.behavior_search.get_next_behavior()
feedbacks = self.episode_with(behavior, meta_parameter_keys,
meta_parameters, learn=True)
self.behavior_search.set_evaluation_feedback(feedbacks)

if self.verbose >= 2:
if self.accumulate_feedbacks:
print("[Controller] Accumulated feedback: %g"
% np.sum(feedbacks))
else:
print("[Controller] Feedbacks: %s"
% np.array_str(feedbacks, precision=4))

self.episode_cnt += 1

if self.do_test and self.episode_cnt % self.n_episodes_before_test == 0:
self.test_results_.append(
self._perform_test(meta_parameter_keys, meta_parameters))

feedbacks = check_feedback(
feedbacks, compute_sum=self.accumulate_feedbacks)

return feedbacks

def episode_with(self, behavior, meta_parameter_keys=[],
meta_parameters=[], record=True, learn=False):
"""Execute a behavior in the environment.

Parameters
----------
behavior : Behavior
Fixed behavior to execute in the environment

meta_parameter_keys : list, optional (default: [])
Meta parameter keys

meta_parameters : list, optional (default: [])
Meta parameter values

record : bool, optional (default: True)
Record feedbacks or trajectories if activated

learn : bool, optional (default: False)
Use rewards to improve behavior

Returns
-------
feedbacks : array, shape (n_steps,)
Feedback for each step in the environment
"""
behavior.set_meta_parameters(meta_parameter_keys, meta_parameters)
self.environment.reset()

if self.record_inputs:
inputs = []
if self.record_outputs:
outputs = []

feedbacks = []

# Sense initial state
self.environment.get_outputs(self.outputs)
while not self.environment.is_evaluation_done():
behavior.set_inputs(self.outputs)
if behavior.can_step():
behavior.step()
behavior.get_outputs(self.inputs)
# Act
self.environment.set_inputs(self.inputs)
self.environment.step_action()
# Sense
self.environment.get_outputs(self.outputs)
reward = self.environment.get_feedback()
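# Learn online: hand the per-step reward to the behavior search after every
# environment step; this per-step update is what makes the controller
# step-based rather than episode-based.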
if learn:
self.behavior_search.set_evaluation_feedback(reward)
feedbacks.extend(list(reward))

if record:
if self.record_inputs:
inputs.append(self.inputs.copy())
if self.record_outputs:
outputs.append(self.outputs.copy())

if record:
if self.record_inputs:
self.inputs_.append(inputs)
if self.record_outputs:
self.outputs_.append(outputs)
if self.record_feedbacks:
self.feedbacks_.append(feedbacks)
return feedbacks
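
A hedged usage sketch of how the two new pieces might be wired together (not part of the PR; the environment construction is a placeholder and the controller keyword arguments are assumed to be the usual Controller options such as n_episodes and verbose):

from bolero.behavior_search import QLearning
from bolero.controller import StepBasedController

env = ...  # placeholder: a bolero Environment with discrete states and actions

behavior_search = QLearning(action_space=[0, 1, 2, 3], alpha=0.1, gamma=0.9,
                            epsilon=0.1, random_state=0)
controller = StepBasedController(environment=env,
                                 behavior_search=behavior_search,
                                 n_episodes=500, verbose=1)
feedback_history = controller.learn()
greedy_policy = behavior_search.get_best_behavior()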