"""
CT-graph.
Copyright (C) 2019-2021 Andrea Soltoggio, Pawel Ladosz, Eseoghene Ben-Iwhiwhu, Jeff Dick.
Launch script to test one single navigation episode in automatic mode and manual mode"""
import numpy as np
import gymnasium as gym
from gym_CTgraph import CTgraph_env
from gym_CTgraph.CTgraph_plot import CTgraph_plot
from gym_CTgraph.CTgraph_conf import CTgraph_conf
from gym_CTgraph.CTgraph_images import CTgraph_images
import argparse
import json
import random
import matplotlib.pyplot as plt
import timeit

def printout(p_obs, p_reward, p_act, p_done, p_info, p_counter):
    """Print out the navigation variables at each step."""
    print("Feeding action: ", p_act)
    print("Step:", p_counter)
    # print("Observation: ", p_obs)
    print("Reward: ", p_reward)
    print("Done: ", p_done)
    print("--\nInfo: ", p_info)

parser = argparse.ArgumentParser(description='Process some integers.')
parser.add_argument('-c', '--case', default=0, dest="CASE",
                    help='execution mode')
args = parser.parse_args()
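
# usage (inferred from the argument above): "python test_graph.py -c 0" runs the
# interactive step-by-step mode (CASE 0), "python test_graph.py -c 1" runs the
# automated multi-episode test (CASE 1)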

# fetch the parameters from the json file
configuration = CTgraph_conf("graph.json")
conf_data = configuration.getParameters()

# print configuration data
print(json.dumps(conf_data, indent=3))

imageDataset = CTgraph_images(conf_data)

# instantiate the maze, timing the construction with wall-clock timestamps
start = timeit.default_timer()
env = gym.make('CTgraph-v1', conf_data=conf_data, images=imageDataset)
end = timeit.default_timer()
# print(end - start)

# initialise and get initial observation and info
# observation, reward, done, info = env.init(conf_data, imageDataset)
# observation = env.init(conf_data, imageDataset)
reward = 0.
done = False
info = {}

# plotting: uncomment the following line to plot the observations
# CTgraph_plot.plotImages(imageDataset, False)

# get a random path from the maze
high_reward_path = env.get_random_path()
# use this random path to set the path to the high reward; note that the maze
# already has a high_reward_path from the initialisation
env.set_high_reward_path(high_reward_path)

print("*--- Testing script ----*")

action = 0
counter = 0
print_results = True
env.reset()
CASE = int(args.CASE)

# interactive case: step-by-step with operator inputs
if CASE == 0:
    (observation, info) = env.complete_reset()
    print("The test script sets the high reward path to: ", env.get_high_reward_path())
    printout(observation, reward, action, done, info, counter)
    print("Observation:", observation)
    print("Observation shape:", np.shape(observation))
    start = timeit.default_timer()
    fig, axs = plt.subplots(nrows=1, ncols=1, figsize=(3, 5))
    # plt.figure(figsize=(2,2))
    axs.imshow(observation)
    plt.show(block=False)
    while not done:
        action = int(input("Action: "))
        observation, reward, done, _, info = env.step(action)
        counter = counter + 1
        if print_results:
            printout(observation, reward, action, done, info, counter)
        axs.imshow(observation)
        plt.draw()
        plt.show(block=False)
    print("close images to end")
    plt.show(block=True)

# automated: for testing many episodes
if CASE == 1:
    # testing high rewards
    total_reward = 0
    nr_episodes = 1000
    probDelayCrash = 0.0
    probDecisionPointCrash = 0.0
    probWrongDecision = 0.5
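    # the three probabilities above inject failures for testing: taking a random
    # non-wait action in a delay state, taking the wait action at a decision
    # point (likely causing a crash, going by the variable name), and
    # deliberately choosing a wrong branch at a decision point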
    for test in range(0, nr_episodes):
        done = False
        # observation, reward, done, info = env.complete_reset()
        (observation, info) = env.complete_reset()
        high_reward_path = env.get_random_path()
        env.set_high_reward_path(high_reward_path)
        index_decision_point_actions = 0
        print("E:%d" % test, end='')
        print(" testing path:", high_reward_path, end='\n')
        while not done:
            # check if I'm in a delay or root stateType
            if "1" in env.info().values() or "0" in env.info().values():
                action = 0
                if random.random() < probDelayCrash:
                    action = np.random.randint(1, env.BRANCH + 2)
                print('x%d' % env.step_counter, end='')
                observation, reward, done, _, info = env.step(action)
                total_reward = total_reward + reward
if "2" in env.info().values():
# correct action
action = high_reward_path[index_decision_point_actions] + 1
if random.random() < probDecisionPointCrash: #do something wrong with a small prob
action = 0
if random.random() < probWrongDecision: #do something wrong with a small prob, cycling through actions
action = action % (env.BRANCH + 1) + 1
print('(a:%d)' % action, end='')
observation, reward, done, _, info = env.step(action)
index_decision_point_actions = index_decision_point_actions + 1
total_reward = total_reward + reward
if "3" in env.info().values():
print("-E, R:%0.1f" % reward ," in %d" % env.step_counter, "steps")
observation, reward, done, _, info = env.step(0)
total_reward = total_reward + reward
if "4" in env.info().values():
print("Crash at step", env.step_counter, end='\n')
print("total reward: ", total_reward)