-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgenerate_episodes.py
213 lines (161 loc) · 7.92 KB
/
generate_episodes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '1'
from utils.utils import save_episodes
from utils.utils import check_yes
from config import EndToEndConfig
from multiprocessing import Process
from os.path import join
from os import cpu_count
from os import listdir
import numpy as np
import time
def multiprocess_demos(mp_config,
                       mp_headless,
                       mp_request,
                       mp_start_at,
                       mp_demos_per_loop,
                       mp_root_save_path,
                       mp_domain_rand,
                       mp_dr_images,
                       mp_requested_task=None):
    """Generate demonstration episodes in a worker process and save them to disk.

    An environment is obtained from ``mp_config.get_env(...)``, launched, and
    then asked for demos in batches until ``mp_request`` episodes have been
    saved or too many consecutive ``RuntimeError`` failures occur.

    Args:
        mp_config: Object exposing ``get_env(randomized=..., headless=...)``.
        mp_headless: Forwarded to ``get_env`` as ``headless``.
        mp_request: Total number of episodes this worker must produce.
        mp_start_at: Episode index at which this worker starts saving.
        mp_demos_per_loop: Maximum number of demos requested per batch.
        mp_root_save_path: Root directory handed to ``save_episodes``.
        mp_domain_rand: Forwarded to ``get_env`` as ``randomized``.
        mp_dr_images: Currently unused by this function; kept so existing
            callers keep working.
        mp_requested_task: Task handed to ``mp_env.get_task``. When omitted,
            the module-level ``requested_task`` is used, which only exists if
            this function runs in a process forked from the ``__main__``
            script (a ``NameError`` is raised otherwise).

    Any exception other than ``RuntimeError`` propagates to the caller, but
    the environment is always shut down first.
    """
    if mp_requested_task is None:
        # Legacy call path: resolve the task before launching the environment
        # so that a missing global fails fast instead of leaking a simulator.
        mp_requested_task = requested_task

    mp_env = mp_config.get_env(randomized=mp_domain_rand, headless=mp_headless)
    mp_env.launch()

    mp_max_consecutive_error = 30
    mp_error_count = 0  # failures since the last successful batch
    mp_generated = 0
    mp_remaining = mp_request

    # Reset the random seed for each process to ensure unique episodes:
    # forked workers would otherwise all inherit the parent's NumPy RNG state.
    np.random.set_state(np.random.RandomState().get_state())

    try:
        while mp_remaining > 0:
            mp_begin_save_at = mp_start_at + mp_generated
            # Never request more demos than are still outstanding.
            mp_batch = min(mp_demos_per_loop, mp_remaining)
            try:
                mp_task = mp_env.get_task(mp_requested_task)
                mp_demos = mp_task.get_demos(mp_batch, live_demos=True)
                save_episodes(mp_demos, mp_root_save_path, mp_begin_save_at)
                del mp_task
            except RuntimeError:
                # Count every failure (including the first one) so the
                # reported number matches what actually happened.
                mp_error_count += 1
                if mp_error_count >= mp_max_consecutive_error:
                    print(f'[WARN] experienced {mp_error_count} consecutive RuntimeError with CopelliaSim. '
                          f'Abandoning the process. Dataset is likely broken near demonstration episode {mp_begin_save_at}.')
                    break
                # Retry the same batch at the same episode index.
                continue

            mp_error_count = 0
            mp_remaining -= mp_batch
            mp_generated += mp_batch
    finally:
        # Always release the environment, even on an unexpected exception.
        mp_env.shutdown()
def plan_demo_requests(num_demos, num_workers):
    """Split ``num_demos`` episodes as evenly as possible over ``num_workers``.

    The first ``num_demos % num_workers`` workers receive one extra episode,
    so the returned per-worker counts always sum to ``num_demos``.
    ``num_workers`` must be at least 1.
    """
    base, extra = divmod(num_demos, num_workers)
    return [base + 1 if idx < extra else base for idx in range(num_workers)]


def summarize_dataset(episodes_path):
    """Return ``(episode_count, total_steps)`` for the episodes in a folder.

    Every entry of ``episodes_path`` is treated as an episode folder and the
    entries of its ``front_rgb`` sub-folder are counted as steps.

    Raises:
        FileNotFoundError: If ``episodes_path`` or any ``front_rgb`` folder
            does not exist.
    """
    episode_folders = listdir(episodes_path)
    total_steps = sum(len(listdir(join(episodes_path, folder, 'front_rgb')))
                      for folder in episode_folders)
    return len(episode_folders), total_steps


if __name__ == '__main__':
    # NOTE: these statements deliberately stay at module level. The worker
    # function reads the global `requested_task` defined below, so wrapping
    # this script in a function would hide it from forked workers.
    print('[Info] Starting generate_episodes.py')

    config = EndToEndConfig()
    task_name, requested_task = config.get_task_from_user()

    # Define the total number of demos you'd like in the folder
    num_total_demos = int(input('\nHow many episodes should be generated? '))

    tag = input('\nWhat tag should the directory have? Testing (default), training, misc: ').lower()
    if tag not in ['testing', 'training', 'misc']:
        tag = 'testing'

    domain_rand = bool(check_yes('\nWill the scene be randomized? (y/n) '))
    if domain_rand:
        tag += '_randomized'

    dr_images = config.domain_rand_textures

    root_save_path = join(config.data_root, tag, task_name)
    full_save_path = join(root_save_path, 'variation0', 'episodes')

    # Note: from the root, demos are saved as .../variation#/episodes/episode#
    # and the counts for variations and for episodes within a variation both
    # start at 0. The dataset's episode numbering must stay continuous, so be
    # careful of this when generating new demonstration episodes.
    #
    # If demos already exist in the folder, the program tries to add more until
    # there are at least the desired number, continuing the existing numbering.
    # check_episodes.py exists to resolve any errors if this fails.

    num_demo_per_loop = 1
    headless = True  # To save resources by not displaying CoppeliaSim
    processes = []

    try:
        num_existing_demos = len(listdir(full_save_path))
    except FileNotFoundError:
        print(f'[Warn] It looks like {root_save_path} might not exist or is not set up properly. '
              f'Before generating episodes the directory {full_save_path} will be created.')
        num_existing_demos = 0

    required_new_demos = num_total_demos - num_existing_demos

    if required_new_demos <= 0:
        # Nothing to generate: report what is already on disk and stop.
        print('[ERROR] Invalid number of demos requested. Providing summary of dataset instead.')
        try:
            num_collected, total_num_steps = summarize_dataset(full_save_path)
        except FileNotFoundError:
            print(f'[ERROR] No dataset exists at {full_save_path} or the one that does is broken.')
            raise SystemExit(1)
        print(f'\n[Info] A total of {num_collected} '
              f'demonstration episodes have been collected with {total_num_steps} total data points.'
              f'\n[Info] All demonstration episodes are located in: {full_save_path}'
              f'\n[Info] Exiting program successfully.')
        raise SystemExit

    # cpu_count() may return None when the core count cannot be determined.
    available_cores = cpu_count() or 1
    if available_cores <= 8:
        num_processes = available_cores
    else:
        num_processes = int(input(f'\nEnter how many processes to spawn for generating episodes '
                                  f'({available_cores} cores available in CPU, default number is 8): ') or 8)
    # Use at least one worker (guards against 0 or negative input) and never
    # more workers than episodes, so no simulator is launched just to idle.
    num_processes = max(1, min(num_processes, required_new_demos))

    requests_per_process = plan_demo_requests(required_new_demos, num_processes)

    print(f'[Info] Requested a total of {num_total_demos} demonstration episodes and '
          f'found {num_existing_demos} at the desired location. '
          f'\n[Info] Will generate {required_new_demos} new demonstration episodes at {full_save_path}')

    ans = input(f'\n[Info] Are you ready for the episode generation to begin? (y/n): ')
    if ans.strip().lower() not in ('y', 'yes'):
        raise SystemExit(f'[Warn] Answer: {ans} not recognized. Exiting program without generating demonstrations.')

    num_start_at = num_existing_demos
    for i, num_request in enumerate(requests_per_process):
        processes.append(Process(target=multiprocess_demos,
                                 args=(config,
                                       headless,
                                       num_request,
                                       num_start_at,
                                       num_demo_per_loop,
                                       root_save_path,
                                       domain_rand,
                                       dr_images)))
        print(f'[Info] Registered process {i} to generate {num_request} demos. '
              f'Demos will be saved as episodes {num_start_at} to {num_start_at+num_request-1}')
        num_start_at += num_request

    # Start timing only now so the reported duration excludes the prompts.
    mp_start_time = time.perf_counter()
    for process in processes:
        process.start()
    print(f'[Info] All {len(processes)} processes started. Generating demonstrations...')

    for process in processes:
        process.join()
    mp_end_time = time.perf_counter()
    print(f'[Info] All {len(processes)} processes rejoined main.')

    delta_min = (mp_end_time - mp_start_time) / 60

    try:
        num_after_gen, total_num_steps = summarize_dataset(full_save_path)
    except FileNotFoundError:
        # Happens if every worker failed before saving a complete episode.
        print(f'[ERROR] No dataset exists at {full_save_path} or the one that does is broken.')
        raise SystemExit(1)

    # Fewer episodes on disk than requested means some worker gave up early.
    if num_after_gen < num_total_demos:
        print(f'[ERROR] After generation only {num_after_gen} demonstration episodes exist. '
              f'The dataset is broken but potentially fixable with renumber_dataset.py')

    print(f'[Info] A total of {num_after_gen} '
          f'demonstration episodes have been collected with {total_num_steps} total data points.'
          f'\n[Info] The process of adding {required_new_demos} new demonstration episodes '
          f'took {delta_min:.3f} minutes. '
          f'\n[Info] All demonstration episodes are located in: {full_save_path}'
          f'\n[Info] Exiting program successfully.')