-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathvlm.py
296 lines (287 loc) · 17 KB
/
vlm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
import cv2
from shapely.geometry import *
from shapely.affinity import *
from openai import OpenAI
from VLM_CaP.src.key import projectkey
import os
import re
import argparse
import csv
import ffmpy
import ast
from VLM_CaP.src.vlm_video import extract_frame_list # import extract_frames
# set up your openai api key
client = OpenAI(api_key=projectkey)
# def for calling openai api with different prompts
def call_openai_api(prompt_messages):
params = {
"model": "gpt-4o",
"messages": prompt_messages,
"max_tokens": 400,
"temperature": 0
}
result = client.chat.completions.create(**params)
return result.choices[0].message.content
# "Notice that there might be similar objects. You are supposed to use the index annotated on the objects to distinguish between only similar objects that is hard to distinguish with language."
def get_object_list(selected_frames):
# first prompt to get objects in the environment
prompt_messages_state = [
{
"role": "system",
"content": [
"You are a visual object detector. Your task is to count and identify the objects in the provided image that are on the desk. Focus on objects classified as grasped_objects and containers.",
],
},
{
"role": "user",
"content": [
"There are two kinds of objects, grasped_objects and containers in the environment. We only care about objects on the desk. Do not count in hand or person as objects.",
"Based on the input picture, answer:",
"1. How many objects are there in the environment?",
"2. What are these objects?",
"You should respond in the format of the following example:",
"Number: 1",
"Objects: purple eggplant, red tomato, white bowl, white bowl",
*map(lambda x: {"image": x, "resize": 768}, selected_frames[0:1]), # use first picture for environment objects
],
},
]
response_state = call_openai_api(prompt_messages_state)
return response_state
def extract_num_object(response_state):
# extract number of objects
num_match = re.search(r"Number: (\d+)", response_state)
num = int(num_match.group(1)) if num_match else 0
# extract objects
objects_match = re.search(r"Objects: (.+)", response_state)
objects_list = objects_match.group(1).split(", ") if objects_match else []
# construct object list
objects = [obj for obj in objects_list]
return num, objects
def extract_keywords_pick(response):
try:
return response.split(': ')[1]
except IndexError:
print("Error extracting pick keyword from response:", response)
return None
def extract_keywords_drop(response):
try:
return response.split(': ')[1]
except IndexError:
print("Error extracting drop keyword from response:", response)
return None
def extract_keywords_reference(response):
try:
return response.split(': ')[1]
except IndexError:
print("Error extracting reference object from response:", response)
return None
def is_frame_relevant(response):
return "hand is manipulating an object" in response.lower()
def parse_closest_object_and_relationship(response):
pattern = r"Closest Object: ([^,]+), (.+)"
match = re.search(pattern, response)
if match:
return match.group(1), match.group(2)
print("Error parsing reference object and relationship from response:", response)
return None, None
def process_images(selected_frames, obj_list):
string_cache = "" # cache for CaP operations
i = 1
while i < len(selected_frames):
input_frame_pick = selected_frames[i:i+1]
prompt_messages_relevance_pick = [
{
"role": "system",
"content": [
"You are an operations inspector. You need to check whether the hand in operation is holding an object. The objects have been outlined with contours of different colors and labeled with indexes for easier distinction."
],
},
{
"role": "user",
"content": [
"This is a picture from a pick-and-drop task. Please determine if the hand is manipulating an object.",
"Respond with 'Hand is manipulating an object' or 'Hand is not manipulating an object'.",
*map(lambda x: {"image": x, "resize": 768}, input_frame_pick),
],
},
]
response_relevance_pick = call_openai_api(prompt_messages_relevance_pick)
print(response_relevance_pick)
if not is_frame_relevant(response_relevance_pick):
i += 1
continue
# which to pick
prompt_messages_pick = [
{
"role": "system",
"content": [
"You are an operation inspector. You need to check which object is being picked in a pick-and-drop task. Some of the objects have been outlined with contours of different colors and labeled with indexes for easier distinction.",
"The contour and index is only used to help. Due to limitation of vision models, the contours and index labels might not cover every objects in the environment. If you notice any unannotated objects in the demo or in the object list, make sure you name it and handle them properly.",
],
},
{
"role": "user",
"content": [
f"This is a picture describing the pick state of a pick-and-drop task. The objects in the environment are {obj_list}. One of the objects is being picked by a human hand or robot gripper now. The objects have been outlined with contours of different colors and labeled with indexes for easier distinction.",
"Based on the input picture and object list, answer:",
"1. Which object is being picked",
"You should respond in the format of the following example:",
"Object Picked: red block",
*map(lambda x: {"image": x, "resize": 768}, input_frame_pick),
],
},
]
response_pick = call_openai_api(prompt_messages_pick)
print(response_pick)
object_picked = extract_keywords_pick(response_pick)
i += 1
# Ensure there is another frame for drop and relative position reasoning
if i >= len(selected_frames):
break
# Check if the second frame (i) is relevant (i.e., hand is holding an object)
input_frame_drop = selected_frames[i:i+1]
# reference object
prompt_messages_reference = [
{
"role": "system",
"content": [
"You are an operation inspector. You need to find the reference object for the placement location of the picked object in the pick-and-place process. Notice that the reference object can vary based on the task. If this is a storage task, the reference object should be the container into which the items are stored. If this is a stacking task, the reference object should be the object that best expresses the orientation of the arrangement."
],
},
{
"role": "user",
"content": [
f"This is a picture describing the drop state of a pick-and-place task. The objects in the environment are {obj_list}. {object_picked} is being dropped by a human hand or robot gripper now.",
"Based on the input picture and object list, answer:",
f"1. Which object in the rest of object list do you choose as a reference object to {object_picked}",
"You should respond in the format of the following example without any additional information or reason steps:",
"Reference Object: red block",
*map(lambda x: {"image": x, "resize": 768}, input_frame_drop),
],
},
]
response_reference = call_openai_api(prompt_messages_reference)
print(response_reference)
object_reference = extract_keywords_reference(response_reference)
# current_bbx = bbx_list[i] if i < len(bbx_list) else {}
# "Due to limitation of vision models, the contours and index labels might not cover every objects in the environment. If you notice any unannotated objects in the demo or in the object list, make sure you handle them properly.",
prompt_messages_relationship = [
{
"role": "system",
"content": [
"You are a VLMTutor. You will describe the drop state of a pick-and-drop task from a demo picture. You must pay specific attention to the spatial relationship between picked object and reference object in the picture and be correct and accurate with directions.",
],
},
{
"role": "user",
"content": [
f"This is a picture describing the drop state of a pick-and-drop task. The objects in the environment are object list: {obj_list}. {object_picked} is said to be being dropped by a human hand or robot gripper now.",
f"However, the object being dropped might be wrong due to bad visual prompt. If you feel that object being picked is not {object_picked} but some other object, red chili is said to be the object picked but you feel it is an orange carrot, you MUST modify it and change the name!"
# "But notice that due to limitation of vision models, the contours and index labels might not cover every objects in the environment. If you notice any unannotated objects in the demo or in the object list, make sure you mention their name and handle their spatial relationships."
# "The ID is only used to help with your reasoning. You should only mention them when the objects are the same in language description. For example, when there are two white bowls, you must specify white bowl (ID:1), white bowl (ID:2) in your answer. But for different objects like vegetables, you do not need to specify their IDs."
# f"To help you better understand the spatial relationship, a bounding box list is given to you. Notice that the bounding boxes of objects in the bounding box list are distinguished by labels. These labels correspond one-to-one with the labels of the objects in the image. The bounding box list is: {bbx_list}",
# "The coordinates of the bounding box represent the center point of the object. The format is two coordinates (x,y). The origin of the coordinates is at the top-left corner of the image. If there are two objects A(x1, y1) and B(x2, y2), a significantly smaller x2 compared to x1 indicates that B is to the left of A; a significantly greater x2 compared to x1 indicates that B is to the right of A; a significantly smaller y2 compared to y1 indicates that B is at the back of A; a significantly greater y2 compared to y1 indicates that B is in front of A."
# "Pay attention to distinguish between at the back of and on top of. If B and A has a visual gap, they are not in touch. Thus B is at the back of A. However, if they are very close, this means B and A are in contact, thus B is on top of A."
# "Notice that the largest difference in corresponding coordinates often represents the most significant feature. If you have coordinates with small difference in x but large difference in y, then coordinates y will represent most significant feature. Make sure to use the picture together with coordinates."
f"The object picked is being dropped somewhere near {object_reference}. Based on the input picture, object list answer:",
f"Drop object picked to which relative position to the {object_reference}? You need to mention the name of objects in your answer.",
f"There are totally six kinds of relative position, and the direction means the visual direction of the picture.",
f"1. In ((object picked is contained in the {object_reference})",
f"2. On top of (object picked is stacked on the {object_reference}, {object_reference} supports object picked)",
f"3. At the back of (in demo it means object picked is positioned farther to the viewer relative to the {object_reference})",
f"4. In front of (in demo it means object picked is positioned closer to the viewer or relative to the {object_reference})",
"5. to the left",
"6. to the right",
f"You must choose one relative position."
"You should respond in the format of the following example without any additional information or reason steps, be sure to mention the object picked and reference object.",
f"Drop yellow corn to the left of the red chili",
f"Drop red chili in the white bowl",
f"Drop wooden block (ID:1) to the right of the wooden block (ID:0)",
*map(lambda x: {"image": x, "resize": 768}, input_frame_drop),
],
},
]
response_relationship = call_openai_api(prompt_messages_relationship)
print(response_relationship)
string_cache += response_relationship + " and then "
i += 1
return string_cache
def save_results_to_csv(demo_name, num, obj_list, string_cache, output_file):
file_exists = os.path.exists(output_file)
with open(output_file, mode='a', newline='') as file:
writer = csv.writer(file)
if not file_exists:
writer.writerow(["demo", "object", "action list"])
writer.writerow([f"{demo_name}", f"{num} objects: {', '.join(obj_list)}", string_cache])
print(f"Results appended to {output_file}")
def convert_video_to_mp4(input_path):
"""
Converts the input video file to H.264 encoded .mp4 format using ffmpy.
The output path will be the same as the input path with '_converted' appended before the extension.
"""
# Get the file name without extension and append '_converted'
base_name, ext = os.path.splitext(input_path)
output_path = f"{base_name}_converted.mp4"
# Run FFmpeg command to convert the video
ff = ffmpy.FFmpeg(
inputs={input_path: None},
outputs={output_path: '-c:v libx264 -crf 23 -preset fast -r 30'}
)
ff.run()
print(f"Video converted successfully: {output_path}")
return output_path
def main(input_video_path, frame_index_list, bbx_list):
video_path = input_video_path
# list to store key frames
selected_raw_frames1 = []
# list to store key frame indexes
frame_index_list = ast.literal_eval(frame_index_list)
selected_frame_index = frame_index_list
# Convert the video to H.264 encoded .mp4 format
# converted_video_path = convert_video_to_mp4(video_path)
# Open the converted video
cap = cv2.VideoCapture(video_path)
# Manually calculate total number of frames
actual_frame_count = 0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
actual_frame_count += 1
# Reset the capture to the beginning of the video
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
print(f"Actual frame count: {actual_frame_count}")
# Iterate through index list and get the frame list
for index in selected_frame_index:
if index < actual_frame_count:
cap.set(cv2.CAP_PROP_POS_FRAMES, index)
ret, cv2_image = cap.read()
if ret:
selected_raw_frames1.append(cv2_image)
else:
print(f"Failed to retrieve frame at index {index}")
else:
print(f"Frame index {index} is out of range for this video.")
# Release video capture object
cap.release()
selected_frames1 = extract_frame_list(selected_raw_frames1)
response_state = get_object_list(selected_frames1)
num, obj_list = extract_num_object(response_state)
print("Number of objects:", num)
print("available objects:", obj_list)
# obj_list = "green corn, orange carrot, red pepper, white bowl, glass container"
# process the key frames
string_cache = process_images(selected_frames1, obj_list)
if string_cache.endswith(" and then "):
my_string = string_cache.removesuffix(" and then ")
print(my_string)
return my_string
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Process video and key frame extraction.")
parser.add_argument('--input', type=str, required=True, help='Input video path')
parser.add_argument('--list', type=str, required=True, help='List of key frame indexes')
parser.add_argument('--bbx_list', type=str, required=True, help='Bbx of key frames')
args = parser.parse_args()
# Call the main function with arguments
main(args.input, args.list, args.bbx_list)