koopman.py
#%%
import numpy as np
import pykoopman as pk
from pydmd import OptDMD, DMD
'''====================== TESTING ON SIMPLE FUNCTION ======================'''
M = 4
# random initial state?
# potential constant deviation
X = np.zeros((4, M))
X[:,0] = [1,2,3,4] # sample from distribution, add one on it for a while
for val in range(1, M):
    X[:,val] = X[:,val-1] + 1
# Fit Koopman operator using closed-form solution to DMD
optdmd = OptDMD(svd_rank=2)
model_optdmd = pk.Koopman(regressor=optdmd)
model_optdmd.fit(X.T)
test_point = np.array([100,101,102,103])
prediction = model_optdmd.predict(test_point)
print("Prediction:", prediction)
prediction = np.round(np.real(prediction))
expectation = np.array([101,102,103,104], dtype=float)
print("Expectation:", expectation)
print("prediction ~= expectation:", np.array_equal(prediction, expectation))
'''====================== TESTING ON POLICY FUNCTION ======================'''
X = np.load('state-action-inputs.npy') # 20,000 entries
X = X[:int(X.shape[0]*0.0015)] # 30 points!
# Fit Koopman operator using closed-form solution to DMD
optdmd = OptDMD(svd_rank=15)
model_optdmd = pk.Koopman(regressor=optdmd)
model_optdmd.fit(X)
index = np.random.randint(0, X.shape[0] - 1) # leave room for X[index+1] below
print(f"Point {index} of X")
test_point = X[index]
prediction = model_optdmd.predict(test_point)
print("Prediction:", prediction)
prediction = np.round(np.real(prediction))
expectation = X[index+1]
print("Expectation:", expectation)
print("prediction ~= expectation:", np.array_equal(prediction, expectation))
#%%
'''====================== TESTING AGAINST GROUND-TRUTH ======================'''
import math
from sklearn.preprocessing import KBinsDiscretizer
from typing import Tuple
import gym
env = gym.make('CartPole-v0')
koopEnv = gym.make('CartPole-v0')
Q_table = np.load('Q_table.npy')
n_bins = ( 6, 12 )
lower_bounds = [ env.observation_space.low[2], -math.radians(50) ]
upper_bounds = [ env.observation_space.high[2], math.radians(50) ]
def discretizer( _, __, angle, pole_velocity ) -> Tuple[int,...]:
    """Convert continuous state into a discrete state"""
    est = KBinsDiscretizer(n_bins=n_bins, encode='ordinal', strategy='uniform')
    est.fit([ lower_bounds, upper_bounds ])
    return tuple( map( int, est.transform([[ angle, pole_velocity ]])[0] ) )
def policy(state: tuple):
    """Choose an action greedily from the Q-table (no epsilon-exploration here)"""
    return np.argmax(Q_table[state])
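# --- Added note (assumption about the saved file): Q_table.npy is expected to have shape
# (6, 12, env.action_space.n), i.e. one vector of action-values per discretized
# (angle bin, pole-velocity bin) pair, so Q_table[state] indexes cleanly with the tuple
# returned by discretizer above.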
num_steps = 200 # 100
correctness_arr = np.zeros(num_steps)
# START FROM BEGINNING STATE
current_state = discretizer(*env.reset())
action = policy(current_state)
prediction = model_optdmd.predict(np.array([*list(current_state), action]))
prediction = np.round(np.real(prediction))
for i in range(num_steps):
    # ENV CODE
    observation, reward, done, _ = env.step(action)
    new_state = discretizer(*observation)
    next_action = policy(new_state)
    # PREDICT AND GO ON
    prediction = model_optdmd.predict(prediction)
    prediction = np.round(np.real(prediction))
    # CHECK AGAINST GROUND-TRUTH
    expectation = np.array([*list(new_state), next_action])
    correctness_arr[i] = np.array_equal(prediction, expectation)
    # UPDATE
    current_state = new_state
    action = next_action
print(f"Koopman final state: {prediction}")
print(f"Ground-truth final state: {[*list(current_state), action]}")
# These are surprisingly close running a few tests!
for i in range(num_steps):
    if not correctness_arr[i]:
        print(f"Step at which Koopman prediction diverges: {i}")
        break
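# --- Added summary (not in the original): overall fraction of rollout steps where the
# rolled-forward Koopman prediction matched the discretized ground truth exactly.
print(f"Rollout accuracy over {num_steps} steps: {correctness_arr.mean():.2%}")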
#%%
current_state = discretizer(*env.reset())
current_stateK = discretizer(*koopEnv.reset())
action = policy(current_state)
actionK = policy(current_stateK)
q_learner_reward = 0
koopman_reward = 0
for i in range(num_steps):
    # environment details
    observation, reward, done, _ = env.step(action)
    observationK, rewardK, doneK, _ = koopEnv.step(actionK)
    # keep track of rewards
    q_learner_reward += reward
    koopman_reward += rewardK
    # discretize state - hoping generator won't have to!
    new_state = discretizer(*observation)
    new_stateK = discretizer(*observationK)
    # get actions
    next_action = policy(new_state)
    prediction = model_optdmd.predict(np.array([*list(current_stateK), actionK]))
    prediction = np.round(np.real(prediction))
    next_actionK = int(prediction[-1])
    # update environments
    action = next_action
    actionK = next_actionK
    current_state = new_state
    current_stateK = new_stateK
print("Q rewards:", q_learner_reward)
print("K rewards:", koopman_reward)
# %%