From 36bbef2978b38e50e73dca42c6fe9021cdb0365e Mon Sep 17 00:00:00 2001 From: Kei Okada Date: Fri, 28 Oct 2022 19:50:16 +0900 Subject: [PATCH] add more info on mongodb client - add example launch/script for mongo db - add information based on https://github.com/jsk-ros-pkg/jsk_robot/issues/1706 - add robot_databse_mongo_dump.py --- .../launch/robot_database_mongo_server.launch | 36 +++++ .../jsk_robot_startup/lifelog/README.md | 42 ++++++ .../scripts/robot_database_mongo_dump.py | 131 ++++++++++++++++++ .../scripts/robot_database_mongo_example.py | 109 +++++++++++++++ 4 files changed, 318 insertions(+) create mode 100644 jsk_robot_common/jsk_robot_startup/launch/robot_database_mongo_server.launch create mode 100755 jsk_robot_common/jsk_robot_startup/scripts/robot_database_mongo_dump.py create mode 100755 jsk_robot_common/jsk_robot_startup/scripts/robot_database_mongo_example.py diff --git a/jsk_robot_common/jsk_robot_startup/launch/robot_database_mongo_server.launch b/jsk_robot_common/jsk_robot_startup/launch/robot_database_mongo_server.launch new file mode 100644 index 0000000000..9a826b441d --- /dev/null +++ b/jsk_robot_common/jsk_robot_startup/launch/robot_database_mongo_server.launch @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/jsk_robot_common/jsk_robot_startup/lifelog/README.md b/jsk_robot_common/jsk_robot_startup/lifelog/README.md index 7ac192515a..bd7e71515e 100644 --- a/jsk_robot_common/jsk_robot_startup/lifelog/README.md +++ b/jsk_robot_common/jsk_robot_startup/lifelog/README.md @@ -85,3 +85,45 @@ Save object detection result to database * `~robot_frame` (String, default: `base_footprint`) robot base tf frame + + +## Examples + +You can start a mongodb logging system using `lifelog/mongodb.launch` and `lifelog/common_logger.launch` files. + See [jsk_pr2_lifelog/db_client.launch](https://github.com/jsk-ros-pkg/jsk_robot/blob/5d90d483aaa674d33968f34db83f53cd3d018bd4/jsk_pr2_robot/jsk_pr2_startup/jsk_pr2_lifelog/db_client.launch), [fetch_lifelog.xml](https://github.com/jsk-ros-pkg/jsk_robot/blob/921097b7a9c16cd99d0eb8f9f271bda4784dadc5/jsk_fetch_robot/jsk_fetch_startup/launch/fetch_lifelog.xml) and [jsk_baxter_lifelog/db_client.launch](https://github.com/jsk-ros-pkg/jsk_robot/blob/c03dc5af06d8b7786b4212b132047acaa229eb0e/jsk_baxter_robot/jsk_baxter_startup/jsk_baxter_lifelog/db_client.launch) for examples. + +## Sample client + +Sample program to retrieve stored data can be found at [jsk_robot_startup/scripts/robot_database_monogo_example.py](./scripts/robot_database_monogo_example.py). + +To connect replicator server (musca), you need to start [jsk_robot_startup/launch/robot_database_monogo_server.launch](./launch/robot_database_monogo_server.launch). Please be careful about `ROS_MASTER_URI`, because `robot_database_monogo_server.launch` will starts same node name as robot system nodes, thus it may corrupt some settings. + +## Troubleshooting + +If you encounter following error +``` +[ERROR] [1666693339.502586]: Could not get message store services. Maybe the message store has not been started? Retrying.. +[ERROR] [1666693339.502586]: Could not get message store services. Maybe the message store has not been started? Retrying.. +``` + +1) Is mongodb service working correctly? +``` +sudo systemctl status mongodb.service +sudo systemctl start mongodb.service +``` + +2) Is `mongo` command works? +``` +mongo localhost:27017 +``` + +3) Check /etc/mongodb.conf +``` +$ cat /etc/mongodb.conf | grep -v ^# +dbpath=/var/lib/mongodb +logpath=/var/log/mongodb/mongodb.log +logappend=true +bind_ip = 0.0.0.0 +journal=true +``` +Default `bind_jp` is `127.0.0.1`, and it not work on some machines, see [#1706](https://github.com/jsk-ros-pkg/jsk_robot/issues/1706) for more info. diff --git a/jsk_robot_common/jsk_robot_startup/scripts/robot_database_mongo_dump.py b/jsk_robot_common/jsk_robot_startup/scripts/robot_database_mongo_dump.py new file mode 100755 index 0000000000..d770c91c6f --- /dev/null +++ b/jsk_robot_common/jsk_robot_startup/scripts/robot_database_mongo_dump.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# +# Copied from https://github.com/strands-project/mongodb_store/blob/melodic-devel/mongodb_store/scripts/mongodb_play.py +# +from __future__ import print_function + +import argparse +import calendar +import cv2 +import datetime +import json +import mongodb_store.util as mg_util +import os +import pymongo +import rospy +import sys +import yaml + +from cv_bridge import CvBridge +from dateutil import tz +from sensor_msgs.msg import Image, CompressedImage + +UTC = tz.gettz('UTC') +JST = tz.gettz('Asia/Tokyo') + +MongoClient = mg_util.import_MongoClient() + +TIME_KEY = '_meta.inserted_at' + +def max_time(collection): + return collection.find_one(sort=[(TIME_KEY, pymongo.DESCENDING)])['_meta']['inserted_at'] + +def min_time(collection): + return collection.find_one(sort=[(TIME_KEY, pymongo.ASCENDING)])['_meta']['inserted_at'] + +def to_ros_time(dt): + return rospy.Time(calendar.timegm(dt.utctimetuple()), dt.microsecond * 1000) + +def to_datetime(rt): + return datetime.datetime.utcfromtimestamp(rt.secs) + datetime.timedelta(microseconds = rt.nsecs / 1000) + +def ros_time_strftime(rt, format): + """ converts a ros time to a datetime and calls strftime on it with the given format """ + return to_datetime(rt).strftime(format) + +def mkdatetime(date_string): + return datetime.datetime.strptime(date_string, '%Y-%m-%d %H:%M:%S') + +def main(argv): + parser = argparse.ArgumentParser() + parser.add_argument("--host", default="musca.jsk.imi.i.u-tokyo.ac.jp") + parser.add_argument("--port", "-p", default=27017) + parser.add_argument("--database", "-d", default="jsk_robot_lifelog") + parser.add_argument("--collection", "-c", default="basil") + parser.add_argument("--start", "-s", default="", help='start datetime of query, defaults to the earliest date stored in db, across all requested collections. Formatted "Y-m-d H:M" e.g. "2022-07-14 06:38:00"') + parser.add_argument("--end", "-e", default="", help='end datetime of query, defaults to the latest date stored in db, across all requested collections. Formatted "Y-m-d H:M" e.g. "2022-07-14 06:39:00"') + args = parser.parse_args() + bridge = CvBridge() + + print("Connecting ... {}:{}".format(args.host, args.port)) + mongo_client=MongoClient(args.host, args.port) + collection = mongo_client[args.database][args.collection] + print("Selected ... {}/{}".format(args.database,args.collection)) + + # make sure there's an index on time in the collection so the sort operation doesn't require the whole collection to be loaded + collection.ensure_index(TIME_KEY) + + # get the min and max time across all collections, conver to ros time + if args.start: + start_time = mkdatetime(args.start).replace(tzinfo=JST) + else: + start_time = min_time(collection).replace(tzinfo=UTC).astimezone(JST) + + if args.end: + end_time = mkdatetime(args.end).replace(tzinfo=JST) + else: + end_time = max_time(collection).replace(tzinfo=UTC).astimezone(JST) + + print(" From : {}".format(start_time.strftime('%Y-%m-%d %H:%M:%S'))) + print(" To : {}".format(end_time.strftime('%Y-%m-%d %H:%M:%S'))) + + documents = collection.find({TIME_KEY: { '$gte': start_time, '$lte': end_time}}, sort=[(TIME_KEY, pymongo.ASCENDING)]) + documents_count = documents.count() + print("This document contains {} messages".format(documents_count)) + # print(documents[0]['_meta']['inserted_at']) + # print(documents[documents_count-1]['_meta']['inserted_at']) + + dirname = collection.full_name + start_time.strftime('_%Y-%m-%d_%H:%M:%S_') + end_time.strftime('_%Y-%m-%d_%H:%M:%S') + if not os.path.exists(dirname): + os.mkdir(dirname) + msg_classes = {} + for d in documents: + stored_class = d['_meta']['stored_class'] + input_topic = d['_meta']['input_topic'] + inserted_at = d['_meta']['inserted_at'] + ros_time = to_ros_time(inserted_at) + if stored_class in msg_classes: + msg_class = msg_classes[stored_class] + else: + try: + msg_class = msg_classes[stored_class] = mg_util.load_class(stored_class) + except ImportError as e: + print(";;") + print(";; ImportError: {}".format(e)) + print(";;") + print(";; try install ros-{}-{}".format(os.environ['ROS_DISTRO'], stored_class.split('.')[0].replace('_','-'))) + print(";;") + sys.exit(-1) + message = mg_util.dictionary_to_message(d, msg_class) + + if type(message) == Image: + filename = "{}{}.jpg".format(ros_time.to_nsec(), input_topic.replace('/','-')) + image = bridge.imgmsg_to_cv2(message) + cv2.imwrite(os.path.join(dirname, filename), image) + elif type(message) == CompressedImage: + filename = "{}{}.jpg".format(ros_time.to_nsec(), input_topic.replace('/','-')) + image = bridge.compressed_imgmsg_to_cv2(message) + cv2.imwrite(os.path.join(dirname, filename), image) + else: + filename = "{}{}.json".format(ros_time.to_nsec(), input_topic.replace('/','-')) + with open(os.path.join(dirname, filename), "wb") as f: + # f.write(yaml.dump(yaml.load(str(message)), allow_unicode=True)) + f.write(json.dumps(yaml.load(str(message)), indent=4, ensure_ascii=False)) + print("Writing.. {} ({})".format(filename, inserted_at)) + + +# processes load main so move init_node out +if __name__ == "__main__": + main(sys.argv) diff --git a/jsk_robot_common/jsk_robot_startup/scripts/robot_database_mongo_example.py b/jsk_robot_common/jsk_robot_startup/scripts/robot_database_mongo_example.py new file mode 100755 index 0000000000..ccd784752f --- /dev/null +++ b/jsk_robot_common/jsk_robot_startup/scripts/robot_database_mongo_example.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import base64 +import numpy as np +import pickle +import rospy +import sys + +# date +from datetime import datetime, timedelta, tzinfo +from dateutil import tz +import calendar +import pytz + +# mongo +from mongodb_store.message_store import MessageStoreProxy +import mongodb_store.util as MU +import pymongo + +# message +from jsk_robot_startup.lifelog.logger_base import LoggerBase +from sensor_msgs.msg import CompressedImage +from smach_msgs.msg import SmachContainerStatus + +# image processing +from cv_bridge import CvBridge +import cv2 + +# global variabels +JST = tz.gettz('Asia/Tokyo') +bridge = CvBridge() + +# sample functions +def query_latest_smach(): + try: + rospy.loginfo("Loading last smach execution...") + last_msg, _ = msg_store.query( + SmachContainerStatus._type, + {"info": "START"}, + single=True, + sort_query=[("_meta.inserted_at", pymongo.DESCENDING)] + ) + msgs = msg_store.query( + SmachContainerStatus._type, + {"header.stamp.secs": {"$gt": last_msg.header.stamp.secs}}, + sort_query=[("_meta.inserted_at", pymongo.ASCENDING)] + ) + + def status_to_img(msg): + if sys.version_info.major < 3: + local_data_str = pickle.loads(msg.local_data) + else: + local_data_str = pickle.loads( + msg.local_data.encode('utf-8'), encoding='utf-8') + print("{} @{}".format(local_data_str['DESCRIPTION'], + datetime.fromtimestamp(msg.header.stamp.to_sec(), JST))) + imgmsg = None + if 'IMAGE' in local_data_str and local_data_str['IMAGE']: + imgmsg = CompressedImage() + imgmsg.deserialize(base64.b64decode(local_data_str['IMAGE'])) + return imgmsg + + return filter(lambda x: x is not None, map(lambda x: status_to_img(x[0]), msgs)) + except Exception as e: + rospy.logerr('failed to load images from db: %s' % e) + return None + + +def query_images(now = datetime.now(JST)-timedelta(hours=0), + then = datetime.now(JST)-timedelta(hours=1)): + try: + rospy.loginfo("Loading images...") + msgs = msg_store.query( + CompressedImage._type, + {"_meta.inserted_at": {"$lt": now, "$gte": then}}, + sort_query=[("_meta.inserted_at", pymongo.ASCENDING)] + ) + return map(lambda x: x[0], msgs) + except Exception as e: + rospy.logerr('failed to load images from db: %s' % e) + return None + +if __name__ == '__main__': + rospy.init_node('sample_robot_database') + db_name = 'jsk_robot_lifelog' + col_name = 'basil' # pr1012, fetch17 etc.. + msg_store = MessageStoreProxy(database=db_name, collection=col_name) + + print("> sample program for robot database") + print("> 1: get latest smach data") + print("> 2: get last 1 hours image data") + #key = int(input("> {1, 2, 3..} : ")) + key = 2 + + if key == 1: + msgs = query_latest_smach() + elif key == 2: + msgs = query_images(then = datetime(2022, 11, 1, 17, 0, tzinfo=JST), + now = datetime(2022, 11, 1, 20, 10, tzinfo=JST)) + else: + print("unknown inputs...") + + # show data.. + for msg in msgs: + print(" @{}".format(datetime.fromtimestamp(msg.header.stamp.to_sec(), JST))) + cv_image = bridge.compressed_imgmsg_to_cv2(msg, "bgr8") + cv2.imshow('image', cv_image) + cv2.waitKey(50)