Skip to content
This repository has been archived by the owner on Jun 2, 2020. It is now read-only.

Commit

Permalink
temporary commit
Browse files Browse the repository at this point in the history
  • Loading branch information
slimeth committed Feb 28, 2019
1 parent 1be3ccb commit 9da14ea
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 37 deletions.
2 changes: 1 addition & 1 deletion cpp/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ venv-doc/
# Archive
obsolete/

# Project specific
# Project specific
104 changes: 71 additions & 33 deletions cpp/src/library.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,75 @@
#include "queue.h"

namespace persipubsub {
static unsigned int MAX_READER_NUM = 1024;
static unsigned int MAX_DB_NUM = 1024;
static unsigned long MAX_DB_SIZE_BYTES = 32UL * 1024UL * 1024UL * 1024UL;

// define all database names here
static char DATA_DB[] = "data_db"; // msg_id | data
static char PENDING_DB[] = "pending_db"; // msg_id | pending subscriber
static char META_DB[] = "meta_db"; // msg_id | metadata
static char QUEUE_DB[] = "queue_db"; // queue_pth | all queue data

static char HWM_DB_SIZE_BYTES_KEY[] = "hwm_db_size_bytes";
static char MAX_MSGS_NUM_KEY[] = "max_msgs_num";
static char MSG_TIMEOUT_SECS_KEY[] = "msg_timeout_secs";
static char STRATEGY_KEY[] = "strategy";
static char SUBSCRIBER_IDS_KEY[] = "subscriber_ids";

struct QueueData{
unsigned int msg_timeout_secs_;
unsigned int max_msgs_num_;
unsigned long hwm_db_size_bytes_;
persipubsub::queue::Strategy strategy_;
std::vector<std::string> subscriber_ids_;

QueueData(unsigned int msg_timeout_secs, unsigned int max_msgs_num,
unsigned long hwm_db_size_bytes,
persipubsub::queue::Strategy strategy,
std::vector<std::string> subscriber_ids) :
msg_timeout_secs_(msg_timeout_secs),
max_msgs_num_(max_msgs_num),
hwm_db_size_bytes_(hwm_db_size_bytes), strategy_(strategy),
subscriber_ids_(std::move(subscriber_ids)) {};
};
QueueData lookup_queue_data(const lmdb::env& env);
static unsigned int MAX_READER_NUM = 1024;
static unsigned int MAX_DB_NUM = 1024;
static unsigned long MAX_DB_SIZE_BYTES = 32UL * 1024UL * 1024UL * 1024UL;

// define all database names here
static char DATA_DB[] = "data_db"; // msg_id | data
static char PENDING_DB[] = "pending_db"; // msg_id | pending subscriber
static char META_DB[] = "meta_db"; // msg_id | metadata
static char QUEUE_DB[] = "queue_db"; // queue_pth | all queue data

static char HWM_DB_SIZE_BYTES_KEY[] = "hwm_db_size_bytes";
static char MAX_MSGS_NUM_KEY[] = "max_msgs_num";
static char MSG_TIMEOUT_SECS_KEY[] = "msg_timeout_secs";
static char STRATEGY_KEY[] = "strategy";
static char SUBSCRIBER_IDS_KEY[] = "subscriber_ids";

struct QueueData {
unsigned int msg_timeout_secs_;
unsigned int max_msgs_num_;
unsigned long hwm_db_size_bytes_;
persipubsub::queue::Strategy strategy_;
std::vector<std::string> subscriber_ids_;

QueueData(unsigned int msg_timeout_secs, unsigned int max_msgs_num,
unsigned long hwm_db_size_bytes,
persipubsub::queue::Strategy strategy,
std::vector<std::string> subscriber_ids) :
msg_timeout_secs_(msg_timeout_secs),
max_msgs_num_(max_msgs_num),
hwm_db_size_bytes_(hwm_db_size_bytes), strategy_(strategy),
subscriber_ids_(std::move(subscriber_ids)) {};
};

QueueData lookup_queue_data(const lmdb::env &env);


class ReadTransaction {

public:
ReadTransaction(const lmdb::env& env) : rtxn_(lmdb::txn::begin(env, nullptr, MDB_RDONLY)) {}

~ReadTransaction() {
rtxn_.abort();
}

lmdb::txn rtxn_;
};

class WriteTransaction {

public:
WriteTransaction(const lmdb::env& env) : wtxn_(lmdb::txn::begin(env, nullptr)) {}

~WriteTransaction() {
wtxn_.commit();
}

lmdb::txn wtxn_;
};

class Cursor {

public:
Cursor(const lmdb::txn& txn, const lmdb::dbi& dbi) : cursor_(lmdb::cursor::open(txn, dbi)) {}

~Cursor() {
cursor_.close();
}

lmdb::cursor cursor_;
};
} // namespace persipubsub
14 changes: 12 additions & 2 deletions cpp/src/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,16 @@ lmdb::env persipubsub::queue::initialize_environment(const fs::path& queue_dir,
void persipubsub::queue::prune_dangling_messages_for(const persipubsub::queue::Queue& queue,
const std::vector<std::string> &subscriber_ids){

auto wtxn = lmdb::txn::begin(queue.env_);
/*todo check mistake:
* terminate called after throwing an instance of 'lmdb::runtime_error'
what(): mdb_txn_commit: Invalid argument
unknown location(0): fatal error in "test_put_to_single_subscriber": signal: SIGABRT (application abort requested)
/home/selim/workspace/pqry/persipubsub/cpp/test/test_queue.cpp(99): last checkpoint
terminate called recursively
*/

auto wtxn = persipubsub::WriteTransaction(queue.env_).wtxn_;
//auto wtxn = lmdb::txn::begin(queue.env_);
auto pending_dbi = lmdb::dbi::open(wtxn, persipubsub::PENDING_DB);
auto meta_dbi = lmdb::dbi::open(wtxn, persipubsub::META_DB);
auto data_dbi = lmdb::dbi::open(wtxn, persipubsub::DATA_DB);
Expand Down Expand Up @@ -116,7 +125,8 @@ void persipubsub::queue::prune_dangling_messages_for(const persipubsub::queue::Q
}
}

wtxn.commit();
//todo check if needed
//wtxn.commit();
}

// todo add transaction and cursor classes
Expand Down
2 changes: 1 addition & 1 deletion cpp/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ target_link_libraries (Test
${Boost_FILESYSTEM_LIBRARY}
${Boost_SYSTEM_LIBRARY}
${Boost_UNIT_TEST_FRAMEWORK_LIBRARY}
)
)

0 comments on commit 9da14ea

Please sign in to comment.