Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[common_logger] logging VQA result #87

Open
wants to merge 5 commits into
base: add_mongo_example
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<launch>
<!-- logging database -->
<param name="robot/database" value="jsk_robot_lifelog"/>

<!-- use central data base -->
<!-- <arg name="host" default="localhost" /> -->
<arg name="host" default="musca.jsk.imi.i.u-tokyo.ac.jp" />
<arg name="port" default="27017" />

<!-- default value -->
<arg name="use_daemon" default="true" />
<arg name="db_path" default="/var/lib/robot/mongodb_store"/>
<arg name="defaults_path" default=""/>
<arg name="replicator_dump_path" default="/tmp/replicator_dumps"/>
<arg name="queue_size" default="100" />
<arg name="use_localdatacenter" default="true" />

<!-- mongodb_store_inc.launch -->
<param name="mongodb_use_daemon" value="$(arg use_daemon)" />
<param name="mongodb_port" value="$(arg port)" />
<param name="mongodb_host" value="$(arg host)" />

<node name="config_manager" pkg="mongodb_store" type="config_manager.py" >
<param name="defaults_path" value="$(arg defaults_path)"/>
</node>

<node name="message_store" pkg="mongodb_store" type="message_store_node.py" >
<param name="mongodb_use_localdatacenter" value="$(arg use_localdatacenter)" />
<param name="queue_size" value="$(arg queue_size)" />
</node>

<node name="replicator_node" pkg="mongodb_store" type="replicator_node.py" >
<param name="replicator_dump_path" value="$(arg replicator_dump_path)"/>
</node>

</launch>
42 changes: 42 additions & 0 deletions jsk_robot_common/jsk_robot_startup/lifelog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,45 @@ Save object detection result to database
* `~robot_frame` (String, default: `base_footprint`)

robot base tf frame


## Examples

You can start a mongodb logging system using `lifelog/mongodb.launch` and `lifelog/common_logger.launch` files.
See [jsk_pr2_lifelog/db_client.launch](https://github.com/jsk-ros-pkg/jsk_robot/blob/5d90d483aaa674d33968f34db83f53cd3d018bd4/jsk_pr2_robot/jsk_pr2_startup/jsk_pr2_lifelog/db_client.launch), [fetch_lifelog.xml](https://github.com/jsk-ros-pkg/jsk_robot/blob/921097b7a9c16cd99d0eb8f9f271bda4784dadc5/jsk_fetch_robot/jsk_fetch_startup/launch/fetch_lifelog.xml) and [jsk_baxter_lifelog/db_client.launch](https://github.com/jsk-ros-pkg/jsk_robot/blob/c03dc5af06d8b7786b4212b132047acaa229eb0e/jsk_baxter_robot/jsk_baxter_startup/jsk_baxter_lifelog/db_client.launch) for examples.

## Sample client

Sample program to retrieve stored data can be found at [jsk_robot_startup/scripts/robot_databse_monogo_example.py](./scripts/robot_databse_monogo_example.py).

To connect replicator server (musca), you need to start [jsk_robot_startup/launch/robot_databse_monogo_server.launch](./launch/robot_databse_monogo_server.launch). Please be careful about `ROS_MASTER_URI`, because `robot_databse_monogo_server.launch` will starts same node name as robot system nodes, thus it may corrupt some settings.

## Troubleshooting

If you encounter following error
```
[ERROR] [1666693339.502586]: Could not get message store services. Maybe the message store has not been started? Retrying..
[ERROR] [1666693339.502586]: Could not get message store services. Maybe the message store has not been started? Retrying..
```

1) Is mongodb service working correctly?
```
sudo systemctl status mongodb.service
sudo systemctl start mongodb.service
```

2) Is `mongo` command works?
```
mongo localhost:27017
```

3) Check /etc/mongodb.conf
```
$ cat /etc/mongodb.conf | grep -v ^#
dbpath=/var/lib/mongodb
logpath=/var/log/mongodb/mongodb.log
logappend=true
bind_ip = 0.0.0.0
journal=true
```
Default `bind_jp` is `127.0.0.1`, and it not work on some machines, see [#1706](https://github.com/jsk-ros-pkg/jsk_robot/issues/1706) for more info.
13 changes: 13 additions & 0 deletions jsk_robot_common/jsk_robot_startup/lifelog/common_logger.launch
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
<arg name="save_joint_states" default="true" />
<arg name="save_speech" default="true" />
<arg name="save_smach" default="true" />
<arg name="save_vqa" default="false" />
<arg name="save_base_trajectory" default="false" />
<arg name="save_object_detection" default="false" />
<arg name="save_action" default="false" />
Expand Down Expand Up @@ -236,6 +237,18 @@
</rosparam>
</node>

<!-- save mongo data -->
<node if="$(arg save_vqa)"
name="vqa_logger"
pkg="jsk_robot_startup" type="mongo_record.py"
machine="$(arg machine)"
respawn="$(arg respawn)">
<rosparam subst_value="true">
topics:
- /vqa/caption_result
</rosparam>
</node>

<!-- base trajecotry logger -->
<node if="$(arg save_base_trajectory)"
name="base_trajectory_logger"
Expand Down
131 changes: 131 additions & 0 deletions jsk_robot_common/jsk_robot_startup/scripts/robot_databse_mongo_dump.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
# Copied from https://github.com/strands-project/mongodb_store/blob/melodic-devel/mongodb_store/scripts/mongodb_play.py
#
from __future__ import print_function

import argparse
import calendar
import cv2
import datetime
import json
import mongodb_store.util as mg_util
import os
import pymongo
import rospy
import sys
import yaml

from cv_bridge import CvBridge
from dateutil import tz
from sensor_msgs.msg import Image, CompressedImage

UTC = tz.gettz('UTC')
JST = tz.gettz('Asia/Tokyo')

MongoClient = mg_util.import_MongoClient()

TIME_KEY = '_meta.inserted_at'

def max_time(collection):
return collection.find_one(sort=[(TIME_KEY, pymongo.DESCENDING)])['_meta']['inserted_at']

def min_time(collection):
return collection.find_one(sort=[(TIME_KEY, pymongo.ASCENDING)])['_meta']['inserted_at']

def to_ros_time(dt):
return rospy.Time(calendar.timegm(dt.utctimetuple()), dt.microsecond * 1000)

def to_datetime(rt):
return datetime.datetime.utcfromtimestamp(rt.secs) + datetime.timedelta(microseconds = rt.nsecs / 1000)

def ros_time_strftime(rt, format):
""" converts a ros time to a datetime and calls strftime on it with the given format """
return to_datetime(rt).strftime(format)

def mkdatetime(date_string):
return datetime.datetime.strptime(date_string, '%Y-%m-%d %H:%M:%S')

def main(argv):
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="musca.jsk.imi.i.u-tokyo.ac.jp")
parser.add_argument("--port", "-p", default=27017)
parser.add_argument("--database", "-d", default="jsk_robot_lifelog")
parser.add_argument("--collection", "-c", default="basil")
parser.add_argument("--start", "-s", default="", help='start datetime of query, defaults to the earliest date stored in db, across all requested collections. Formatted "Y-m-d H:M" e.g. "2022/07/14 06:38:00"')
parser.add_argument("--end", "-e", default="", help='end datetime of query, defaults to the latest date stored in db, across all requested collections. Formatted "Y-m-d H:M" e.g. "2022/07/14 06:39:00"')
args = parser.parse_args()
bridge = CvBridge()

print("Connecting ... {}:{}".format(args.host, args.port))
mongo_client=MongoClient(args.host, args.port)
collection = mongo_client[args.database][args.collection]
print("Selected ... {}/{}".format(args.database,args.collection))

# make sure there's an index on time in the collection so the sort operation doesn't require the whole collection to be loaded
collection.ensure_index(TIME_KEY)

# get the min and max time across all collections, conver to ros time
if args.start:
start_time = mkdatetime(args.start).replace(tzinfo=JST)
else:
start_time = min_time(collection).replace(tzinfo=UTC).astimezone(JST)

if args.end:
end_time = mkdatetime(args.end).replace(tzinfo=JST)
else:
end_time = max_time(collection).replace(tzinfo=UTC).astimezone(JST)

print(" From : {}".format(start_time.strftime('%Y-%m-%d %H:%M:%S')))
print(" To : {}".format(end_time.strftime('%Y-%m-%d %H:%M:%S')))

documents = collection.find({TIME_KEY: { '$gte': start_time, '$lte': end_time}}, sort=[(TIME_KEY, pymongo.ASCENDING)])
documents_count = documents.count()
print("This document contains {} messages".format(documents_count))
# print(documents[0]['_meta']['inserted_at'])
# print(documents[documents_count-1]['_meta']['inserted_at'])

dirname = collection.full_name + start_time.strftime('_%Y-%m-%d_%H:%M:%S_') + end_time.strftime('_%Y-%m-%d_%H:%M:%S')
if not os.path.exists(dirname):
os.mkdir(dirname)
msg_classes = {}
for d in documents:
stored_class = d['_meta']['stored_class']
input_topic = d['_meta']['input_topic']
inserted_at = d['_meta']['inserted_at']
ros_time = to_ros_time(inserted_at)
if stored_class in msg_classes:
msg_class = msg_classes[stored_class]
else:
try:
msg_class = msg_classes[stored_class] = mg_util.load_class(stored_class)
except ImportError as e:
print(";;")
print(";; ImportError: {}".format(e))
print(";;")
print(";; try install ros-{}-{}".format(os.environ['ROS_DISTRO'], stored_class.split('.')[0].replace('_','-')))
print(";;")
sys.exit(-1)
message = mg_util.dictionary_to_message(d, msg_class)

if type(message) == Image:
filename = "{}{}.jpg".format(ros_time.to_nsec(), input_topic.replace('/','-'))
image = bridge.imgmsg_to_cv2(message)
cv2.imwrite(os.path.join(dirname, filename), image)
elif type(message) == CompressedImage:
filename = "{}{}.jpg".format(ros_time.to_nsec(), input_topic.replace('/','-'))
image = bridge.compressed_imgmsg_to_cv2(message)
cv2.imwrite(os.path.join(dirname, filename), image)
else:
filename = "{}{}.json".format(ros_time.to_nsec(), input_topic.replace('/','-'))
with open(os.path.join(dirname, filename), "wb") as f:
# f.write(yaml.dump(yaml.load(str(message)), allow_unicode=True))
f.write(json.dumps(yaml.load(str(message)), indent=4, ensure_ascii=False))
print("Writing.. {} ({})".format(filename, inserted_at))


# processes load main so move init_node out
if __name__ == "__main__":
main(sys.argv)
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import base64
import numpy as np
import pickle
import rospy
import sys

# date
from datetime import datetime, timedelta, tzinfo
from dateutil import tz
import calendar
import pytz

# mongo
from mongodb_store.message_store import MessageStoreProxy
import mongodb_store.util as MU
import pymongo

# message
from jsk_robot_startup.lifelog.logger_base import LoggerBase
from sensor_msgs.msg import CompressedImage
from smach_msgs.msg import SmachContainerStatus

# image processing
from cv_bridge import CvBridge
import cv2

# global variabels
JST = tz.gettz('Asia/Tokyo')
bridge = CvBridge()

# sample functions
def query_latest_smach():
try:
rospy.loginfo("Loading last smach execution...")
last_msg, _ = msg_store.query(
SmachContainerStatus._type,
{"info": "START"},
single=True,
sort_query=[("_meta.inserted_at", pymongo.DESCENDING)]
)
msgs = msg_store.query(
SmachContainerStatus._type,
{"header.stamp.secs": {"$gt": last_msg.header.stamp.secs}},
sort_query=[("_meta.inserted_at", pymongo.ASCENDING)]
)

def status_to_img(msg):
if sys.version_info.major < 3:
local_data_str = pickle.loads(msg.local_data)
else:
local_data_str = pickle.loads(
msg.local_data.encode('utf-8'), encoding='utf-8')
print("{} @{}".format(local_data_str['DESCRIPTION'],
datetime.fromtimestamp(msg.header.stamp.to_sec(), JST)))
imgmsg = None
if 'IMAGE' in local_data_str and local_data_str['IMAGE']:
imgmsg = CompressedImage()
imgmsg.deserialize(base64.b64decode(local_data_str['IMAGE']))
return imgmsg

return filter(lambda x: x is not None, map(lambda x: status_to_img(x[0]), msgs))
except Exception as e:
rospy.logerr('failed to load images from db: %s' % e)
return None


def query_images(now = datetime.now(JST)-timedelta(hours=0),
then = datetime.now(JST)-timedelta(hours=1)):
try:
rospy.loginfo("Loading images...")
msgs = msg_store.query(
CompressedImage._type,
{"_meta.inserted_at": {"$lt": now, "$gte": then}},
sort_query=[("_meta.inserted_at", pymongo.ASCENDING)]
)
return map(lambda x: x[0], msgs)
except Exception as e:
rospy.logerr('failed to load images from db: %s' % e)
return None

if __name__ == '__main__':
rospy.init_node('sample_robot_database')
db_name = 'jsk_robot_lifelog'
col_name = 'basil' # pr1012, fetch17 etc..
msg_store = MessageStoreProxy(database=db_name, collection=col_name)

print("> sample program for robot database")
print("> 1: get latest smach data")
print("> 2: get last 1 hours image data")
#key = int(input("> {1, 2, 3..} : "))
key = 2

if key == 1:
msgs = query_latest_smach()
elif key == 2:
msgs = query_images(then = datetime(2022, 11, 1, 17, 0, tzinfo=JST),
now = datetime(2022, 11, 1, 20, 10, tzinfo=JST))
else:
print("unknown inputs...")

# show data..
for msg in msgs:
print(" @{}".format(datetime.fromtimestamp(msg.header.stamp.to_sec(), JST)))
cv_image = bridge.compressed_imgmsg_to_cv2(msg, "bgr8")
cv2.imshow('image', cv_image)
cv2.waitKey(50)