Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added discrete states (Closes #117) #178

Open
wants to merge 6 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
541 changes: 541 additions & 0 deletions examples/discrete_state.ipynb

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/progpy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

__all__ = ['predictors', 'uncertain_data', 'state_estimators', 'run_prog_playback', 'metrics']

from progpy.discrete_state import create_discrete_state
from progpy.prognostics_model import PrognosticsModel
from progpy.ensemble_model import EnsembleModel
from progpy.composite_model import CompositeModel
Expand Down
99 changes: 99 additions & 0 deletions src/progpy/discrete_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Copyright © 2021 United States Government as represented by the Administrator of the
# National Aeronautics and Space Administration. All Rights Reserved.

import enum
import random

# Transition Functions
def _random_transition(state, disruption):
"""
Random transition from one state to another if disruption is >= 0.5
"""
if abs(disruption) >= 0.5:
return random.randint(0, len(type(state))-1)
return state

def _sequential_transition(state, disruption):
"""
Sequential transition from one state to the next if disruption is >= 0.5.

Examples:
Mode 1 + 0.5 -> Mode 2
Mode 2 - 0.5 -> Mode 1
Mode 1 + 1.5 -> Mode 3
Mode 1 + 0.4 -> Mode 1
"""
return state._value_ + round(disruption)

def _no_transition(state, disruption):
"""
No transition at all. This is used for case where all transitions are done manually (e.g., in state transition equation). This way state will not be effected by noise
"""
return state

TRANSITION_LOOKUP = {
'random': _random_transition,
'sequential': _sequential_transition,
'none': _no_transition
}

class DiscreteState(enum.Enum):
"""
.. versionadded:: 1.8.0

Class for discrete state. Users wont be constructing this directly, but will instead be using the factory function create_discrete_state.

This is an enum, so discrete states will be accessed directly. For example:
DiscreteState.state_name
DiscreteState(1)

The add method is overwritten to provide logic for transition (according to provided function)
"""
def __add__(self, other):
return type(self)(self._transition(other))

def __sub__(self, other):
return type(self)(self._transition(-other))

def create_discrete_state(n_states: int, names: list=None, transition=_random_transition) -> DiscreteState:
"""
.. versionadded:: 1.8.0

Crate a discrete state for use with a progpy model. Users construct a discrete state for the default x0 to make that state discrete.

Args:
n_states (int): Number of possible states.
names (list[str], optional): Names for states. Defaults to using "State [#]" for each state (e.g., "State 1")
transition ({function, str}, optional): Transition logic. Can be either a string ('random', 'none', or 'sequential') or a function (DiscreteState, float)->int of state and disruption to state number. Defaults to "random".

Returns:
DiscreteState class: Class to construct a discrete state

Example:
>>> Switch = create_discrete_state(2, ['on', 'off'])
>>> x0['switch'] = Switch.off

Example:
>>> # Representing 'gear' of car
>>> Gear = create_discrete_state(5, transition='sequential')
>>> x0['gear] = Gear(1)
"""
# Input Validation
if isinstance(transition, str):
if transition in TRANSITION_LOOKUP:
transition = TRANSITION_LOOKUP[transition]
else:
raise Exception(f'Transition name {transition} not recognized. Supported modes are {str(list(TRANSITION_LOOKUP.keys()))}')

if names is None:
names = [f'State {i}' for i in range(n_states)]
elif len(names) != n_states:
raise ValueError(f'If providing names, must provide one for each state. Provided {len(names)} for {n_states} states.')

# Enumerated states
members = {name: i for i, name in enumerate(names)}

# Transition is set to be nonmember (meaning it's not an enumerated state)
members['_transition'] = enum.nonmember(transition)

return DiscreteState('Discrete State', members)
2 changes: 1 addition & 1 deletion src/progpy/mixture_of_experts.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def next_state(self, x, u, dt):

# If z is present and not none - update score
if (len(set(self.outputs)- set(u.keys())) == 0 and # Not missing an output
not np.any(np.isnan([u[key] for key in self.outputs]))): # Not case where Output is NaN
not np.any(np.isnan([u[key] if isinstance(u[key], (float, int, complex)) else 0 for key in self.outputs]))): # Not case where Output is NaN
# If none in not u, that means that we have an updated output, so update the scores
# u excluded when there is not update
mses = []
Expand Down
40 changes: 23 additions & 17 deletions src/progpy/models/aircraft_model/small_rotorcraft.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def __init__(self, **kwargs):
air_density=self.parameters['air_density']),
'lift': None}

def dx(self, x, u):
def next_state(self, x, u, dt):
# Extract useful values
m = self.parameters['mass']['total'] # vehicle mass
Ixx, Iyy, Izz = self.parameters['mass']['Ixx'], self.parameters['mass']['Iyy'], self.parameters['mass']['Izz'] # vehicle inertia
Expand Down Expand Up @@ -217,27 +217,33 @@ def dx(self, x, u):

# Update state vector
# -------------------
dxdt = np.zeros((len(x),))

dxdt[0] = vx_a # x-position increment (airspeed along x-direction)
dxdt[1] = vy_a # y-position increment (airspeed along y-direction)
dxdt[2] = vz_a # z-position increment (airspeed along z-direction)
# Positions
x['x'] += vx_a*dt
x['y'] += vy_a*dt
x['z'] += vz_a*dt

dxdt[3] = p + q * sin_phi * tan_theta + r * cos_phi * tan_theta # Euler's angle phi increment
dxdt[4] = q * cos_phi - r * sin_phi # Euler's angle theta increment
dxdt[5] = q * sin_phi / cos_theta + r * cos_phi / cos_theta # Euler's angle psi increment
# Euler's angles
x['phi'] += (p + q * sin_phi * tan_theta + r * cos_phi * tan_theta)*dt
x['theta'] += (q * cos_phi - r * sin_phi)*dt
x['psi'] += (q * sin_phi / cos_theta + r * cos_phi / cos_theta)*dt

dxdt[6] = ((sin_theta * cos_psi * cos_phi + sin_phi * sin_psi) * T - fe_drag[0]) / m # Acceleration along x-axis
dxdt[7] = ((sin_theta * sin_psi * cos_phi - sin_phi * cos_psi) * T - fe_drag[1]) / m # Acceleration along y-axis
dxdt[8] = -self.parameters['gravity'] + (cos_phi * cos_theta * T - fe_drag[2]) / m # Acceleration along z-axis
# Velocities
x['vx'] += dt*((sin_theta * cos_psi * cos_phi + sin_phi * sin_psi) * T - fe_drag[0]) / m
x['vy'] += dt*((sin_theta * sin_psi * cos_phi - sin_phi * cos_psi) * T - fe_drag[1]) / m
x['vz'] += dt*((cos_phi * cos_theta * T - fe_drag[2]) / m - self.parameters['gravity'])

dxdt[9] = ((Iyy - Izz) * q * r + tp * self.parameters['geom']['arm_length']) / Ixx # Angular acceleration along body x-axis: roll rate
dxdt[10] = ((Izz - Ixx) * p * r + tq * self.parameters['geom']['arm_length']) / Iyy # Angular acceleration along body y-axis: pitch rate
dxdt[11] = ((Ixx - Iyy) * p * q + tr * 1) / Izz # Angular acceleration along body z-axis: yaw rate
dxdt[12] = 1 # Auxiliary time variable
dxdt[13] = (u['mission_complete'] - x['mission_complete']) / self.parameters['dt'] # Value to keep track of percentage of mission completed
# Angular rates
x['p'] += dt*((Iyy - Izz) * q * r + tp * self.parameters['geom']['arm_length']) / Ixx
x['q'] += dt*((Izz - Ixx) * p * r + tq * self.parameters['geom']['arm_length']) / Iyy
x['r'] += dt*((Ixx - Iyy) * p * q + tr * 1) / Izz

return self.StateContainer(np.array([np.atleast_1d(item) for item in dxdt]))
# Time
x['t'] += dt

x['mission_complete'] = u['mission_complete']

return x

def event_state(self, x) -> dict:
# Based on percentage of reference trajectory completed
Expand Down
106 changes: 106 additions & 0 deletions src/progpy/models/test_models/tank_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Copyright © 2021 United States Government as represented by the Administrator of the
# National Aeronautics and Space Administration. All Rights Reserved.

import math
from progpy import PrognosticsModel, create_discrete_state

ValveState = create_discrete_state(2, ['open', 'closed'])

class Tank(PrognosticsModel):
"""
A simple model of liquid in a tank draining through a drain with a controllable valve. This is used as an example for discrete states. Default parameters represent a tank that is a cube with 1m along each edge

:term:`Events<event>`: (2)
full: The tank is full
empty: The tank is empty

:term:`Inputs/Loading<input>`: (2)
| q_in: Flowrate into the tank (m^3/s)
| valve_command (DiscreteState): Discrete state to command the valve

:term:`States<state>`: (2)
| state (DiscreteState): state of the valve
| h: Height of the fluid in the tank (m)

:term:`Outputs<output>`: (1)
h: Height of the fluid in the tank (m)

Keyword Args
------------
process_noise : Optional, float or dict[str, float]
:term:`Process noise<process noise>` (applied at dx/next_state).
Can be number (e.g., .2) applied to every state, a dictionary of values for each
state (e.g., {'x1': 0.2, 'x2': 0.3}), or a function (x) -> x
process_noise_dist : Optional, str
distribution for :term:`process noise` (e.g., normal, uniform, triangular)
measurement_noise : Optional, float or dict[str, float]
:term:`Measurement noise<measurement noise>` (applied in output eqn).
Can be number (e.g., .2) applied to every output, a dictionary of values for each
output (e.g., {'z1': 0.2, 'z2': 0.3}), or a function (z) -> z
measurement_noise_dist : Optional, str
distribution for :term:`measurement noise` (e.g., normal, uniform, triangular)
crosssection_area : Optional, float
Crosssectional area of the tank in m^2. Default is 1
height : Optional, float
Height of the tank in m. Default is 1
rho : Optional, float
Fluid density in kg/m^3. Default is for water
g : Optional, float
Acceleration due to gravity in m/s^2. Default is earth gravity at surface
valve_r : Optional, float
Radius of valve opening in m
valve_l : Optional, float
Length of valve in m
viscosity : Optional, float
Viscosity of the fluid in Pa*s. Default is for water
x0 : Optional, dict
Initial State
"""
inputs = ['q_in', 'valve_command']
states = ['valve', 'h']
outputs = ['h']
events = ['full', 'empty']

default_parameters = {
'crosssection_area': 1,
'height': 1,
'rho': 1000,
'g': -9.81,
'valve_r': 3e-3,
'valve_l': 0.001,
'viscosity': 1e-3,
'x0': {
'valve': ValveState.closed,
'h': 0,
}
}

state_limits = {
'h': (0, float('inf'))
}

def next_state(self, x, u, dt):
x['valve'] = ValveState(u['valve_command'])

# Relative pressure of fluid
p = self['rho']*self['g']*x['h']
if x['valve'] == ValveState.open:
# flow rate out through valve m^3/s
q_out = p*math.pi*self['valve_r']**4/(8*self['viscosity']*self['valve_l'])
else:
# Valve is closed, no flow
q_out = 0
x['h'] += (u['q_in']+q_out)*dt/self['crosssection_area']

# Limit to height of tank
x['h'] = min(x['h'], self['height'])
return x

def output(self, x):
return self.OutputContainer({'h': x['h']})

def event_state(self, x):
return {
'full': (self['height']-x['h'])/self['height'],
'empty': x['h']/self['height']
}
32 changes: 28 additions & 4 deletions src/progpy/utils/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Union
from warnings import warn

int_fix = lambda x: np.float64(x) if isinstance(x, (int, type(None))) else x

class DictLikeMatrixWrapper():
"""
Expand All @@ -24,17 +25,33 @@ def __init__(self, keys: list, data: Union[dict, np.array]):
keys = list(keys) # creates list with keys
self._keys = keys.copy()
if isinstance(data, np.matrix):
self._matrix = np.array(data, dtype=np.float64)
self._matrix = np.array(data)
if np.issubdtype(data.dtype, (np.integer)):
# If integer, switch to float
# Using int type will force results to remain ints (so if you add float to it
# then there will be an error or it will again round to int
data = np.array(data, dtype=np.float64)
elif isinstance(data, np.ndarray):
if data.ndim == 1:
data = data[np.newaxis].T
if np.issubdtype(data.dtype, np.integer):
# If integer, switch to float
# Using int type will force results to remain ints (so if you add float to it
# then there will be an error or it will again round to int
data = np.array(data, dtype=np.float64)
elif np.issubdtype(data.dtype, np.dtype('O')):
# if "object" (e.g., includes DiscreteState or None)
# Make sure each element if float or object
for i in range(data.shape[0]):
for j in range(data.shape[1]):
data[i][j] = int_fix(data[i][j])
self._matrix = data
elif isinstance(data, (dict, DictLikeMatrixWrapper)):
# ravel is used to prevent vectorized case, where data[key] returns multiple values, from resulting in a 3D matrix
self._matrix = np.array(
[
np.ravel([data[key]]) if key in data else [None] for key in keys
], dtype=np.float64)
np.ravel([int_fix(data[key])]) if key in data else [None] for key in keys
])
else:
raise TypeError(f"Data must be a dictionary or numpy array, not {type(data)}")

Expand Down Expand Up @@ -107,7 +124,14 @@ def __add__(self, other: "DictLikeMatrixWrapper") -> "DictLikeMatrixWrapper":
"""
add another matrix to the existing matrix
"""
return DictLikeMatrixWrapper(self._keys, self._matrix + other.matrix)
if isinstance(other, DictLikeMatrixWrapper):
return DictLikeMatrixWrapper(self._keys, self._matrix + other.matrix)
elif isinstance(other, np.ndarray):
return DictLikeMatrixWrapper(self._keys, self._matrix + other)
elif isinstance(other, dict):
DictLikeMatrixWrapper(self._keys, [self[key] + other[key] for key in self._keys])
else:
raise TypeError()

def __iter__(self):
"""
Expand Down
Loading
Loading