Skip to content

Commit

Permalink
add more info on mongodb client
Browse files Browse the repository at this point in the history
- add example launch/script for mongo db

- add information based on jsk-ros-pkg#1706

- add robot_databse_mongo_dump.py
  • Loading branch information
k-okada committed Mar 31, 2023
1 parent 6743071 commit 36bbef2
Show file tree
Hide file tree
Showing 4 changed files with 318 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<launch>
<!-- logging database -->
<param name="robot/database" value="jsk_robot_lifelog"/>

<!-- use central data base -->
<!-- <arg name="host" default="localhost" /> -->
<arg name="host" default="musca.jsk.imi.i.u-tokyo.ac.jp" />
<arg name="port" default="27017" />

<!-- default value -->
<arg name="use_daemon" default="true" />
<arg name="db_path" default="/var/lib/robot/mongodb_store"/>
<arg name="defaults_path" default=""/>
<arg name="replicator_dump_path" default="/tmp/replicator_dumps"/>
<arg name="queue_size" default="100" />
<arg name="use_localdatacenter" default="true" />

<!-- mongodb_store_inc.launch -->
<param name="mongodb_use_daemon" value="$(arg use_daemon)" />
<param name="mongodb_port" value="$(arg port)" />
<param name="mongodb_host" value="$(arg host)" />

<node name="config_manager" pkg="mongodb_store" type="config_manager.py" >
<param name="defaults_path" value="$(arg defaults_path)"/>
</node>

<node name="message_store" pkg="mongodb_store" type="message_store_node.py" >
<param name="mongodb_use_localdatacenter" value="$(arg use_localdatacenter)" />
<param name="queue_size" value="$(arg queue_size)" />
</node>

<node name="replicator_node" pkg="mongodb_store" type="replicator_node.py" >
<param name="replicator_dump_path" value="$(arg replicator_dump_path)"/>
</node>

</launch>
42 changes: 42 additions & 0 deletions jsk_robot_common/jsk_robot_startup/lifelog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,45 @@ Save object detection result to database
* `~robot_frame` (String, default: `base_footprint`)

robot base tf frame


## Examples

You can start a mongodb logging system using `lifelog/mongodb.launch` and `lifelog/common_logger.launch` files.
See [jsk_pr2_lifelog/db_client.launch](https://github.com/jsk-ros-pkg/jsk_robot/blob/5d90d483aaa674d33968f34db83f53cd3d018bd4/jsk_pr2_robot/jsk_pr2_startup/jsk_pr2_lifelog/db_client.launch), [fetch_lifelog.xml](https://github.com/jsk-ros-pkg/jsk_robot/blob/921097b7a9c16cd99d0eb8f9f271bda4784dadc5/jsk_fetch_robot/jsk_fetch_startup/launch/fetch_lifelog.xml) and [jsk_baxter_lifelog/db_client.launch](https://github.com/jsk-ros-pkg/jsk_robot/blob/c03dc5af06d8b7786b4212b132047acaa229eb0e/jsk_baxter_robot/jsk_baxter_startup/jsk_baxter_lifelog/db_client.launch) for examples.

## Sample client

Sample program to retrieve stored data can be found at [jsk_robot_startup/scripts/robot_database_monogo_example.py](./scripts/robot_database_monogo_example.py).

To connect replicator server (musca), you need to start [jsk_robot_startup/launch/robot_database_monogo_server.launch](./launch/robot_database_monogo_server.launch). Please be careful about `ROS_MASTER_URI`, because `robot_database_monogo_server.launch` will starts same node name as robot system nodes, thus it may corrupt some settings.

## Troubleshooting

If you encounter following error
```
[ERROR] [1666693339.502586]: Could not get message store services. Maybe the message store has not been started? Retrying..
[ERROR] [1666693339.502586]: Could not get message store services. Maybe the message store has not been started? Retrying..
```
1) Is mongodb service working correctly?
```
sudo systemctl status mongodb.service
sudo systemctl start mongodb.service
```
2) Is `mongo` command works?
```
mongo localhost:27017
```
3) Check /etc/mongodb.conf
```
$ cat /etc/mongodb.conf | grep -v ^#
dbpath=/var/lib/mongodb
logpath=/var/log/mongodb/mongodb.log
logappend=true
bind_ip = 0.0.0.0
journal=true
```
Default `bind_jp` is `127.0.0.1`, and it not work on some machines, see [#1706](https://github.com/jsk-ros-pkg/jsk_robot/issues/1706) for more info.
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
# Copied from https://github.com/strands-project/mongodb_store/blob/melodic-devel/mongodb_store/scripts/mongodb_play.py
#
from __future__ import print_function

import argparse
import calendar
import cv2
import datetime
import json
import mongodb_store.util as mg_util
import os
import pymongo
import rospy
import sys
import yaml

from cv_bridge import CvBridge
from dateutil import tz
from sensor_msgs.msg import Image, CompressedImage

UTC = tz.gettz('UTC')
JST = tz.gettz('Asia/Tokyo')

MongoClient = mg_util.import_MongoClient()

TIME_KEY = '_meta.inserted_at'

def max_time(collection):
return collection.find_one(sort=[(TIME_KEY, pymongo.DESCENDING)])['_meta']['inserted_at']

def min_time(collection):
return collection.find_one(sort=[(TIME_KEY, pymongo.ASCENDING)])['_meta']['inserted_at']

def to_ros_time(dt):
return rospy.Time(calendar.timegm(dt.utctimetuple()), dt.microsecond * 1000)

def to_datetime(rt):
return datetime.datetime.utcfromtimestamp(rt.secs) + datetime.timedelta(microseconds = rt.nsecs / 1000)

def ros_time_strftime(rt, format):
""" converts a ros time to a datetime and calls strftime on it with the given format """
return to_datetime(rt).strftime(format)

def mkdatetime(date_string):
return datetime.datetime.strptime(date_string, '%Y-%m-%d %H:%M:%S')

def main(argv):
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="musca.jsk.imi.i.u-tokyo.ac.jp")
parser.add_argument("--port", "-p", default=27017)
parser.add_argument("--database", "-d", default="jsk_robot_lifelog")
parser.add_argument("--collection", "-c", default="basil")
parser.add_argument("--start", "-s", default="", help='start datetime of query, defaults to the earliest date stored in db, across all requested collections. Formatted "Y-m-d H:M" e.g. "2022-07-14 06:38:00"')
parser.add_argument("--end", "-e", default="", help='end datetime of query, defaults to the latest date stored in db, across all requested collections. Formatted "Y-m-d H:M" e.g. "2022-07-14 06:39:00"')
args = parser.parse_args()
bridge = CvBridge()

print("Connecting ... {}:{}".format(args.host, args.port))
mongo_client=MongoClient(args.host, args.port)
collection = mongo_client[args.database][args.collection]
print("Selected ... {}/{}".format(args.database,args.collection))

# make sure there's an index on time in the collection so the sort operation doesn't require the whole collection to be loaded
collection.ensure_index(TIME_KEY)

# get the min and max time across all collections, conver to ros time
if args.start:
start_time = mkdatetime(args.start).replace(tzinfo=JST)
else:
start_time = min_time(collection).replace(tzinfo=UTC).astimezone(JST)

if args.end:
end_time = mkdatetime(args.end).replace(tzinfo=JST)
else:
end_time = max_time(collection).replace(tzinfo=UTC).astimezone(JST)

print(" From : {}".format(start_time.strftime('%Y-%m-%d %H:%M:%S')))
print(" To : {}".format(end_time.strftime('%Y-%m-%d %H:%M:%S')))

documents = collection.find({TIME_KEY: { '$gte': start_time, '$lte': end_time}}, sort=[(TIME_KEY, pymongo.ASCENDING)])
documents_count = documents.count()
print("This document contains {} messages".format(documents_count))
# print(documents[0]['_meta']['inserted_at'])
# print(documents[documents_count-1]['_meta']['inserted_at'])

dirname = collection.full_name + start_time.strftime('_%Y-%m-%d_%H:%M:%S_') + end_time.strftime('_%Y-%m-%d_%H:%M:%S')
if not os.path.exists(dirname):
os.mkdir(dirname)
msg_classes = {}
for d in documents:
stored_class = d['_meta']['stored_class']
input_topic = d['_meta']['input_topic']
inserted_at = d['_meta']['inserted_at']
ros_time = to_ros_time(inserted_at)
if stored_class in msg_classes:
msg_class = msg_classes[stored_class]
else:
try:
msg_class = msg_classes[stored_class] = mg_util.load_class(stored_class)
except ImportError as e:
print(";;")
print(";; ImportError: {}".format(e))
print(";;")
print(";; try install ros-{}-{}".format(os.environ['ROS_DISTRO'], stored_class.split('.')[0].replace('_','-')))
print(";;")
sys.exit(-1)
message = mg_util.dictionary_to_message(d, msg_class)

if type(message) == Image:
filename = "{}{}.jpg".format(ros_time.to_nsec(), input_topic.replace('/','-'))
image = bridge.imgmsg_to_cv2(message)
cv2.imwrite(os.path.join(dirname, filename), image)
elif type(message) == CompressedImage:
filename = "{}{}.jpg".format(ros_time.to_nsec(), input_topic.replace('/','-'))
image = bridge.compressed_imgmsg_to_cv2(message)
cv2.imwrite(os.path.join(dirname, filename), image)
else:
filename = "{}{}.json".format(ros_time.to_nsec(), input_topic.replace('/','-'))
with open(os.path.join(dirname, filename), "wb") as f:
# f.write(yaml.dump(yaml.load(str(message)), allow_unicode=True))
f.write(json.dumps(yaml.load(str(message)), indent=4, ensure_ascii=False))
print("Writing.. {} ({})".format(filename, inserted_at))


# processes load main so move init_node out
if __name__ == "__main__":
main(sys.argv)
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import base64
import numpy as np
import pickle
import rospy
import sys

# date
from datetime import datetime, timedelta, tzinfo
from dateutil import tz
import calendar
import pytz

# mongo
from mongodb_store.message_store import MessageStoreProxy
import mongodb_store.util as MU
import pymongo

# message
from jsk_robot_startup.lifelog.logger_base import LoggerBase
from sensor_msgs.msg import CompressedImage
from smach_msgs.msg import SmachContainerStatus

# image processing
from cv_bridge import CvBridge
import cv2

# global variabels
JST = tz.gettz('Asia/Tokyo')
bridge = CvBridge()

# sample functions
def query_latest_smach():
try:
rospy.loginfo("Loading last smach execution...")
last_msg, _ = msg_store.query(
SmachContainerStatus._type,
{"info": "START"},
single=True,
sort_query=[("_meta.inserted_at", pymongo.DESCENDING)]
)
msgs = msg_store.query(
SmachContainerStatus._type,
{"header.stamp.secs": {"$gt": last_msg.header.stamp.secs}},
sort_query=[("_meta.inserted_at", pymongo.ASCENDING)]
)

def status_to_img(msg):
if sys.version_info.major < 3:
local_data_str = pickle.loads(msg.local_data)
else:
local_data_str = pickle.loads(
msg.local_data.encode('utf-8'), encoding='utf-8')
print("{} @{}".format(local_data_str['DESCRIPTION'],
datetime.fromtimestamp(msg.header.stamp.to_sec(), JST)))
imgmsg = None
if 'IMAGE' in local_data_str and local_data_str['IMAGE']:
imgmsg = CompressedImage()
imgmsg.deserialize(base64.b64decode(local_data_str['IMAGE']))
return imgmsg

return filter(lambda x: x is not None, map(lambda x: status_to_img(x[0]), msgs))
except Exception as e:
rospy.logerr('failed to load images from db: %s' % e)
return None


def query_images(now = datetime.now(JST)-timedelta(hours=0),
then = datetime.now(JST)-timedelta(hours=1)):
try:
rospy.loginfo("Loading images...")
msgs = msg_store.query(
CompressedImage._type,
{"_meta.inserted_at": {"$lt": now, "$gte": then}},
sort_query=[("_meta.inserted_at", pymongo.ASCENDING)]
)
return map(lambda x: x[0], msgs)
except Exception as e:
rospy.logerr('failed to load images from db: %s' % e)
return None

if __name__ == '__main__':
rospy.init_node('sample_robot_database')
db_name = 'jsk_robot_lifelog'
col_name = 'basil' # pr1012, fetch17 etc..
msg_store = MessageStoreProxy(database=db_name, collection=col_name)

print("> sample program for robot database")
print("> 1: get latest smach data")
print("> 2: get last 1 hours image data")
#key = int(input("> {1, 2, 3..} : "))
key = 2

if key == 1:
msgs = query_latest_smach()
elif key == 2:
msgs = query_images(then = datetime(2022, 11, 1, 17, 0, tzinfo=JST),
now = datetime(2022, 11, 1, 20, 10, tzinfo=JST))
else:
print("unknown inputs...")

# show data..
for msg in msgs:
print(" @{}".format(datetime.fromtimestamp(msg.header.stamp.to_sec(), JST)))
cv_image = bridge.compressed_imgmsg_to_cv2(msg, "bgr8")
cv2.imshow('image', cv_image)
cv2.waitKey(50)

0 comments on commit 36bbef2

Please sign in to comment.