From c30d2ee374989391234b7ce7bdecd14b8aa4d1fe Mon Sep 17 00:00:00 2001 From: William Casarin Date: Sat, 14 Dec 2024 01:15:59 -0800 Subject: [PATCH 1/5] migration: dont fail v3 -> v4 on 0 migrations Signed-off-by: William Casarin --- src/nostrdb.c | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/nostrdb.c b/src/nostrdb.c index 068ef0c..429e774 100644 --- a/src/nostrdb.c +++ b/src/nostrdb.c @@ -1511,7 +1511,7 @@ static int ndb_rebuild_note_indices(struct ndb_txn *txn, enum ndb_dbs *indices, for (i = 0; i < num_indices; i++) { if (!ndb_db_is_index(indices[i])) { fprintf(stderr, "ndb_rebuild_note_index: %s is not an index db\n", ndb_db_name(index)); - return 0; + return -1; } } @@ -1520,13 +1520,13 @@ static int ndb_rebuild_note_indices(struct ndb_txn *txn, enum ndb_dbs *indices, index = indices[i]; if (mdb_drop(txn->mdb_txn, index, drop_dbi)) { fprintf(stderr, "ndb_rebuild_pubkey_index: mdb_drop failed for %s\n", ndb_db_name(index)); - return 0; + return -1; } } if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE], &cur))) { fprintf(stderr, "ndb_migrate_user_search_indices: mdb_cursor_open failed, error %d\n", rc); - return 0; + return -1; } count = 0; @@ -1549,7 +1549,7 @@ static int ndb_rebuild_note_indices(struct ndb_txn *txn, enum ndb_dbs *indices, case NDB_DBS: // this should never happen since we check at // the start - count = 0; + count = -1; goto cleanup; case NDB_DB_PROFILE_PK: case NDB_DB_NOTE_KIND: @@ -1557,17 +1557,17 @@ static int ndb_rebuild_note_indices(struct ndb_txn *txn, enum ndb_dbs *indices, case NDB_DB_NOTE_BLOCKS: case NDB_DB_NOTE_TAGS: fprintf(stderr, "%s index rebuild not supported yet. sorry.\n", ndb_db_name(index)); - count = 0; + count = -1; goto cleanup; case NDB_DB_NOTE_PUBKEY: if (!ndb_write_note_pubkey_index(txn, note, note_key)) { - count = 0; + count = -1; goto cleanup; } break; case NDB_DB_NOTE_PUBKEY_KIND: if (!ndb_write_note_pubkey_kind_index(txn, note, note_key)) { - count = 0; + count = -1; goto cleanup; } break; @@ -1599,15 +1599,15 @@ static int ndb_migrate_profile_indices(struct ndb *ndb) } enum ndb_dbs indices[] = {NDB_DB_NOTE_PUBKEY, NDB_DB_NOTE_PUBKEY_KIND}; - if ((count = ndb_rebuild_note_indices(&txn, indices, 2))) { + if ((count = ndb_rebuild_note_indices(&txn, indices, 2)) != -1) { fprintf(stderr, "migrated %d notes to have pubkey and pubkey_kind indices\n", count); ndb_end_query(&txn); + return 1; } else { fprintf(stderr, "error migrating notes to have pubkey and pubkey_kind indices, aborting.\n"); mdb_txn_abort(txn.mdb_txn); + return 0; } - - return count; } static int ndb_migrate_user_search_indices(struct ndb *ndb) From 6ca3f8df654c00899f3b23f88dbd4c8f7e09c454 Mon Sep 17 00:00:00 2001 From: William Casarin Date: Sun, 15 Dec 2024 10:00:30 -0800 Subject: [PATCH 2/5] leak: fix memory leak when failing to write like stats --- src/nostrdb.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/nostrdb.c b/src/nostrdb.c index 429e774..deeae80 100644 --- a/src/nostrdb.c +++ b/src/nostrdb.c @@ -2835,6 +2835,7 @@ static int ndb_write_reaction_stats(struct ndb_txn *txn, struct ndb_note *note) if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_META], &key, &val, 0))) { ndb_debug("write reaction stats to db failed: %s\n", mdb_strerror(rc)); + free(root); return 0; } From 522ad8895fb4e751ff703b23e5d5a8ea86306663 Mon Sep 17 00:00:00 2001 From: William Casarin Date: Sun, 15 Dec 2024 11:11:28 -0800 Subject: [PATCH 3/5] writer: rename any_note to needs_commit This is a bit more clear as to what this variable actually means Signed-off-by: William Casarin --- src/nostrdb.c | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/nostrdb.c b/src/nostrdb.c index deeae80..0a92d32 100644 --- a/src/nostrdb.c +++ b/src/nostrdb.c @@ -4313,7 +4313,7 @@ static void *ndb_writer_thread(void *data) struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg; struct written_note written_notes[THREAD_QUEUE_BATCH]; size_t scratch_size; - int i, popped, done, any_note, num_notes; + int i, popped, done, needs_commit, num_notes; uint64_t note_nkey; struct ndb_txn txn; unsigned char *scratch; @@ -4332,20 +4332,20 @@ static void *ndb_writer_thread(void *data) popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH); ndb_debug("writer popped %d items\n", popped); - any_note = 0; + needs_commit = 0; for (i = 0 ; i < popped; i++) { msg = &msgs[i]; switch (msg->type) { - case NDB_WRITER_NOTE: any_note = 1; break; - case NDB_WRITER_PROFILE: any_note = 1; break; - case NDB_WRITER_DBMETA: any_note = 1; break; - case NDB_WRITER_PROFILE_LAST_FETCH: any_note = 1; break; - case NDB_WRITER_BLOCKS: any_note = 1; break; + case NDB_WRITER_NOTE: needs_commit = 1; break; + case NDB_WRITER_PROFILE: needs_commit = 1; break; + case NDB_WRITER_DBMETA: needs_commit = 1; break; + case NDB_WRITER_PROFILE_LAST_FETCH: needs_commit = 1; break; + case NDB_WRITER_BLOCKS: needs_commit = 1; break; case NDB_WRITER_QUIT: break; } } - if (any_note && mdb_txn_begin(txn.lmdb->env, NULL, 0, (MDB_txn **)&txn.mdb_txn)) + if (needs_commit && mdb_txn_begin(txn.lmdb->env, NULL, 0, (MDB_txn **)&txn.mdb_txn)) { fprintf(stderr, "writer thread txn_begin failed"); // should definitely not happen unless DB is full @@ -4410,7 +4410,7 @@ static void *ndb_writer_thread(void *data) } // commit writes - if (any_note) { + if (needs_commit) { if (!ndb_end_query(&txn)) { ndb_debug("writer thread txn commit failed\n"); } else { From 6d63a70f95d2c1420185d459b6dd5cea82994af3 Mon Sep 17 00:00:00 2001 From: William Casarin Date: Sun, 15 Dec 2024 11:23:09 -0800 Subject: [PATCH 4/5] flags: make some indexes optional Make fulltext indices and note blocks optional. This will be useful for quickly building databases when testing, since more stuff in the write queue when writing can slow things down. Signed-off-by: William Casarin --- src/nostrdb.c | 28 ++++++++++++++++++---------- src/nostrdb.h | 5 ++++- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/src/nostrdb.c b/src/nostrdb.c index 0a92d32..704f114 100644 --- a/src/nostrdb.c +++ b/src/nostrdb.c @@ -162,6 +162,7 @@ struct ndb_writer { struct ndb_lmdb *lmdb; struct ndb_monitor *monitor; + uint32_t ndb_flags; void *queue_buf; int queue_buflen; pthread_t thread_id; @@ -4171,7 +4172,8 @@ static int ndb_write_new_blocks(struct ndb_txn *txn, struct ndb_note *note, static uint64_t ndb_write_note(struct ndb_txn *txn, struct ndb_writer_note *note, - unsigned char *scratch, size_t scratch_size) + unsigned char *scratch, size_t scratch_size, + uint32_t ndb_flags) { int rc; uint64_t note_key; @@ -4207,15 +4209,18 @@ static uint64_t ndb_write_note(struct ndb_txn *txn, // only parse content and do fulltext index on text and longform notes if (note->note->kind == 1 || note->note->kind == 30023) { - if (!ndb_write_note_fulltext_index(txn, note->note, note_key)) - return 0; + if (!ndb_flag_set(ndb_flags, NDB_FLAG_NO_FULLTEXT)) { + if (!ndb_write_note_fulltext_index(txn, note->note, note_key)) + return 0; + } // write note blocks - ndb_write_new_blocks(txn, note->note, note_key, scratch, - scratch_size); + if (!ndb_flag_set(ndb_flags, NDB_FLAG_NO_NOTE_BLOCKS)) { + ndb_write_new_blocks(txn, note->note, note_key, scratch, scratch_size); + } } - if (note->note->kind == 7) { + if (note->note->kind == 7 && !ndb_flag_set(ndb_flags, NDB_FLAG_NO_STATS)) { ndb_write_reaction_stats(txn, note->note); } @@ -4365,7 +4370,8 @@ static void *ndb_writer_thread(void *data) case NDB_WRITER_PROFILE: note_nkey = ndb_write_note(&txn, &msg->note, - scratch, scratch_size); + scratch, scratch_size, + writer->ndb_flags); if (note_nkey > 0) { written_notes[num_notes++] = (struct written_note){ @@ -4384,7 +4390,8 @@ static void *ndb_writer_thread(void *data) case NDB_WRITER_NOTE: note_nkey = ndb_write_note(&txn, &msg->note, scratch, - scratch_size); + scratch_size, + writer->ndb_flags); if (note_nkey > 0) { written_notes[num_notes++] = (struct written_note){ @@ -4514,10 +4521,11 @@ static void *ndb_ingester_thread(void *data) static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb, - struct ndb_monitor *monitor) + struct ndb_monitor *monitor, uint32_t ndb_flags) { writer->lmdb = lmdb; writer->monitor = monitor; + writer->ndb_flags = ndb_flags; writer->queue_buflen = sizeof(struct ndb_writer_msg) * DEFAULT_QUEUE_SIZE; writer->queue_buf = malloc(writer->queue_buflen); if (writer->queue_buf == NULL) { @@ -4842,7 +4850,7 @@ int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *c ndb_monitor_init(&ndb->monitor, config->sub_cb, config->sub_cb_ctx); - if (!ndb_writer_init(&ndb->writer, &ndb->lmdb, &ndb->monitor)) { + if (!ndb_writer_init(&ndb->writer, &ndb->lmdb, &ndb->monitor, ndb->flags)) { fprintf(stderr, "ndb_writer_init failed\n"); return 0; } diff --git a/src/nostrdb.h b/src/nostrdb.h index 9e86379..d4722e7 100644 --- a/src/nostrdb.h +++ b/src/nostrdb.h @@ -9,8 +9,11 @@ #define NDB_PACKED_STR 0x1 #define NDB_PACKED_ID 0x2 -#define NDB_FLAG_NOMIGRATE (1 << 0) +#define NDB_FLAG_NOMIGRATE (1 << 0) #define NDB_FLAG_SKIP_NOTE_VERIFY (1 << 1) +#define NDB_FLAG_NO_FULLTEXT (1 << 2) +#define NDB_FLAG_NO_NOTE_BLOCKS (1 << 3) +#define NDB_FLAG_NO_STATS (1 << 4) //#define DEBUG 1 From 423598b0f747920369a8625d9aca5298b8e6aa59 Mon Sep 17 00:00:00 2001 From: William Casarin Date: Sun, 15 Dec 2024 10:30:01 -0800 Subject: [PATCH 5/5] migrations: make migrations asyncronous This also seems to fix some issues with older migrations. Fixes: https://github.com/damus-io/nostrdb/issues/58 --- src/nostrdb.c | 296 ++++++++++++++++++++++++-------------------------- src/nostrdb.h | 2 +- test.c | 11 +- 3 files changed, 153 insertions(+), 156 deletions(-) diff --git a/src/nostrdb.c b/src/nostrdb.c index 704f114..7b52340 100644 --- a/src/nostrdb.c +++ b/src/nostrdb.c @@ -57,7 +57,7 @@ static const int DEFAULT_QUEUE_SIZE = 32768; #define NDB_PARSED_TAGS (1 << 6) #define NDB_PARSED_ALL (NDB_PARSED_ID|NDB_PARSED_PUBKEY|NDB_PARSED_SIG|NDB_PARSED_CREATED_AT|NDB_PARSED_KIND|NDB_PARSED_CONTENT|NDB_PARSED_TAGS) -typedef int (*ndb_migrate_fn)(struct ndb *); +typedef int (*ndb_migrate_fn)(struct ndb_txn *); typedef int (*ndb_word_parser_fn)(void *, const char *word, int word_len, int word_index); @@ -135,6 +135,7 @@ enum ndb_writer_msgtype { NDB_WRITER_DBMETA, // write ndb metadata NDB_WRITER_PROFILE_LAST_FETCH, // when profiles were last fetched NDB_WRITER_BLOCKS, // write parsed note blocks + NDB_WRITER_MIGRATE, // migrate the database }; // keys used for storing data in the NDB metadata database (NDB_DB_NDB_META) @@ -1589,47 +1590,33 @@ static int ndb_rebuild_note_indices(struct ndb_txn *txn, enum ndb_dbs *indices, // // This was before we had note_profile_pubkey{,_kind} indices. Let's create them. -static int ndb_migrate_profile_indices(struct ndb *ndb) +static int ndb_migrate_profile_indices(struct ndb_txn *txn) { - struct ndb_txn txn; int count; - if (!ndb_begin_rw_query(ndb, &txn)) { - fprintf(stderr, "ndb_migrate_profile_indices: ndb_begin_rw_query failed\n"); - return 0; - } - enum ndb_dbs indices[] = {NDB_DB_NOTE_PUBKEY, NDB_DB_NOTE_PUBKEY_KIND}; - if ((count = ndb_rebuild_note_indices(&txn, indices, 2)) != -1) { + if ((count = ndb_rebuild_note_indices(txn, indices, 2)) != -1) { fprintf(stderr, "migrated %d notes to have pubkey and pubkey_kind indices\n", count); - ndb_end_query(&txn); return 1; } else { fprintf(stderr, "error migrating notes to have pubkey and pubkey_kind indices, aborting.\n"); - mdb_txn_abort(txn.mdb_txn); return 0; } } -static int ndb_migrate_user_search_indices(struct ndb *ndb) +static int ndb_migrate_user_search_indices(struct ndb_txn *txn) { int rc; MDB_cursor *cur; MDB_val k, v; void *profile_root; NdbProfileRecord_table_t record; - struct ndb_txn txn; struct ndb_note *note; uint64_t note_key, profile_key; size_t len; int count; - if (!ndb_begin_rw_query(ndb, &txn)) { - fprintf(stderr, "ndb_migrate_user_search_indices: ndb_begin_rw_query failed\n"); - return 0; - } - - if ((rc = mdb_cursor_open(txn.mdb_txn, ndb->lmdb.dbs[NDB_DB_PROFILE], &cur))) { + if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE], &cur))) { fprintf(stderr, "ndb_migrate_user_search_indices: mdb_cursor_open failed, error %d\n", rc); return 0; } @@ -1642,18 +1629,16 @@ static int ndb_migrate_user_search_indices(struct ndb *ndb) profile_key = *((uint64_t*)k.mv_data); record = NdbProfileRecord_as_root(profile_root); note_key = NdbProfileRecord_note_key(record); - note = ndb_get_note_by_key(&txn, note_key, &len); + note = ndb_get_note_by_key(txn, note_key, &len); if (note == NULL) { - fprintf(stderr, "ndb_migrate_user_search_indices: note lookup failed\n"); - return 0; + continue; } - if (!ndb_write_profile_search_indices(&txn, note, profile_key, + if (!ndb_write_profile_search_indices(txn, note, profile_key, profile_root)) { - fprintf(stderr, "ndb_migrate_user_search_indices: ndb_write_profile_search_indices failed\n"); - return 0; + continue; } count++; @@ -1663,51 +1648,33 @@ static int ndb_migrate_user_search_indices(struct ndb *ndb) mdb_cursor_close(cur); - ndb_end_query(&txn); - return 1; } -static int ndb_migrate_lower_user_search_indices(struct ndb *ndb) +static int ndb_migrate_lower_user_search_indices(struct ndb_txn *txn) { - MDB_txn *txn; - - if (mdb_txn_begin(ndb->lmdb.env, NULL, 0, &txn)) { - fprintf(stderr, "ndb_migrate_lower_user_search_indices: ndb_txn_begin failed\n"); - return 0; - } - // just drop the search db so we can rebuild it - if (mdb_drop(txn, ndb->lmdb.dbs[NDB_DB_PROFILE_SEARCH], 0)) { + if (mdb_drop(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], 0)) { fprintf(stderr, "ndb_migrate_lower_user_search_indices: mdb_drop failed\n"); return 0; } - mdb_txn_commit(txn); - - return ndb_migrate_user_search_indices(ndb); + return ndb_migrate_user_search_indices(txn); } int ndb_process_profile_note(struct ndb_note *note, struct ndb_profile_record_builder *profile); -int ndb_db_version(struct ndb *ndb) +int ndb_db_version(struct ndb_txn *txn) { - int rc; uint64_t version, version_key; MDB_val k, v; - MDB_txn *txn; version_key = NDB_META_KEY_VERSION; k.mv_data = &version_key; k.mv_size = sizeof(version_key); - if ((rc = mdb_txn_begin(ndb->lmdb.env, NULL, 0, &txn))) { - fprintf(stderr, "ndb_db_version: mdb_txn_begin failed, error %d\n", rc); - return -1; - } - - if (mdb_get(txn, ndb->lmdb.dbs[NDB_DB_NDB_META], &k, &v)) { + if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &k, &v)) { version = -1; } else { if (v.mv_size != 8) { @@ -1717,7 +1684,6 @@ int ndb_db_version(struct ndb *ndb) version = *((uint64_t*)v.mv_data); } - mdb_txn_abort(txn); return version; } @@ -1857,30 +1823,29 @@ static inline int ndb_writer_queue_msg(struct ndb_writer *writer, return prot_queue_push(&writer->inbox, msg); } -static int ndb_migrate_utf8_profile_names(struct ndb *ndb) +static uint64_t ndb_write_note_and_profile(struct ndb_txn *txn, struct ndb_writer_profile *profile, unsigned char *scratch, size_t scratch_size, uint32_t ndb_flags); +static int ndb_migrate_utf8_profile_names(struct ndb_txn *txn) { int rc; MDB_cursor *cur; MDB_val k, v; void *profile_root; NdbProfileRecord_table_t record; - struct ndb_txn txn; struct ndb_note *note, *copied_note; uint64_t note_key; size_t len; - int count, failed; - struct ndb_writer_msg out; + int count, failed, ret; + struct ndb_writer_profile profile; - if (!ndb_begin_rw_query(ndb, &txn)) { - fprintf(stderr, "ndb_migrate_utf8_profile_names: ndb_begin_rw_query failed\n"); - return 0; - } - - if ((rc = mdb_cursor_open(txn.mdb_txn, ndb->lmdb.dbs[NDB_DB_PROFILE], &cur))) { + if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE], &cur))) { fprintf(stderr, "ndb_migrate_utf8_profile_names: mdb_cursor_open failed, error %d\n", rc); return 0; } + size_t scratch_size = 8 * 1024 * 1024; + unsigned char *scratch = malloc(scratch_size); + + ret = 1; count = 0; failed = 0; @@ -1889,14 +1854,14 @@ static int ndb_migrate_utf8_profile_names(struct ndb *ndb) profile_root = v.mv_data; record = NdbProfileRecord_as_root(profile_root); note_key = NdbProfileRecord_note_key(record); - note = ndb_get_note_by_key(&txn, note_key, &len); + note = ndb_get_note_by_key(txn, note_key, &len); if (note == NULL) { - fprintf(stderr, "ndb_migrate_utf8_profile_names: note lookup failed\n"); - return 0; + failed++; + continue; } - struct ndb_profile_record_builder *b = &out.profile.record; + struct ndb_profile_record_builder *b = &profile.record; // reprocess profile if (!ndb_process_profile_note(note, b)) { @@ -1908,13 +1873,14 @@ static int ndb_migrate_utf8_profile_names(struct ndb *ndb) copied_note = malloc(len); memcpy(copied_note, note, len); - out.type = NDB_WRITER_PROFILE; - out.profile.note.note = copied_note; - out.profile.note.note_len = len; + profile.note.note = copied_note; + profile.note.note_len = len; - ndb_writer_queue_msg(&ndb->writer, &out); - - count++; + // we don't pass in flags when migrating... a bit sketchy but + // whatever. noone is using this to customize nostrdb atm + if (ndb_write_note_and_profile(txn, &profile, scratch, scratch_size, 0)) { + count++; + } } fprintf(stderr, "migrated %d profiles to fix utf8 profile names\n", count); @@ -1923,11 +1889,10 @@ static int ndb_migrate_utf8_profile_names(struct ndb *ndb) fprintf(stderr, "failed to migrate %d profiles to fix utf8 profile names\n", failed); } + free(scratch); mdb_cursor_close(cur); - ndb_end_query(&txn); - - return 1; + return ret; } static struct ndb_migration MIGRATIONS[] = { @@ -4227,29 +4192,6 @@ static uint64_t ndb_write_note(struct ndb_txn *txn, return note_key; } -// only to be called from the writer thread -static void ndb_write_version(struct ndb_txn *txn, uint64_t version) -{ - int rc; - MDB_val key, val; - uint64_t version_key; - - version_key = NDB_META_KEY_VERSION; - - key.mv_data = &version_key; - key.mv_size = sizeof(version_key); - val.mv_data = &version; - val.mv_size = sizeof(version); - - if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) { - ndb_debug("write version to ndb_meta failed: %s\n", - mdb_strerror(rc)); - return; - } - - //fprintf(stderr, "writing version %" PRIu64 "\n", version); -} - static void ndb_monitor_lock(struct ndb_monitor *mon) { pthread_mutex_lock(&mon->mutex); } @@ -4311,6 +4253,93 @@ static void ndb_notify_subscriptions(struct ndb_monitor *monitor, ndb_monitor_unlock(monitor); } +uint64_t ndb_write_note_and_profile( + struct ndb_txn *txn, + struct ndb_writer_profile *profile, + unsigned char *scratch, + size_t scratch_size, + uint32_t ndb_flags) +{ + uint64_t note_nkey; + + note_nkey = ndb_write_note(txn, &profile->note, scratch, scratch_size, ndb_flags); + + if (profile->record.builder) { + // only write if parsing didn't fail + ndb_write_profile(txn, profile, note_nkey); + } + + return note_nkey; +} + +// only to be called from the writer thread +static int ndb_write_version(struct ndb_txn *txn, uint64_t version) +{ + int rc; + MDB_val key, val; + uint64_t version_key; + + version_key = NDB_META_KEY_VERSION; + + key.mv_data = &version_key; + key.mv_size = sizeof(version_key); + val.mv_data = &version; + val.mv_size = sizeof(version); + + if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) { + ndb_debug("write version to ndb_meta failed: %s\n", + mdb_strerror(rc)); + return 0; + } + + //fprintf(stderr, "writing version %" PRIu64 "\n", version); + return 1; +} + + +static int ndb_run_migrations(struct ndb_txn *txn) +{ + int64_t version, latest_version, i; + + latest_version = sizeof(MIGRATIONS) / sizeof(MIGRATIONS[0]); + + if ((version = ndb_db_version(txn)) == -1) { + ndb_debug("run_migrations: no version found, assuming new db\n"); + version = latest_version; + + // no version found. fresh db? + if (!ndb_write_version(txn, version)) { + fprintf(stderr, "run_migrations: failed writing db version"); + return 0; + } + + return 1; + } else { + ndb_debug("ndb: version %" PRIu64 " found\n", version); + } + + if (version < latest_version) + fprintf(stderr, "nostrdb: migrating v%d -> v%d\n", + (int)version, (int)latest_version); + + for (i = version; i < latest_version; i++) { + if (!MIGRATIONS[i].fn(txn)) { + fprintf(stderr, "run_migrations: migration v%d -> v%d failed\n", (int)i, (int)(i+1)); + return 0; + } + + if (!ndb_write_version(txn, i+1)) { + fprintf(stderr, "run_migrations: failed writing db version"); + return 0; + } + + version = i+1; + } + + return 1; +} + + static void *ndb_writer_thread(void *data) { ndb_debug("started writer thread\n"); @@ -4346,6 +4375,7 @@ static void *ndb_writer_thread(void *data) case NDB_WRITER_DBMETA: needs_commit = 1; break; case NDB_WRITER_PROFILE_LAST_FETCH: needs_commit = 1; break; case NDB_WRITER_BLOCKS: needs_commit = 1; break; + case NDB_WRITER_MIGRATE: needs_commit = 1; break; case NDB_WRITER_QUIT: break; } } @@ -4369,24 +4399,22 @@ static void *ndb_writer_thread(void *data) continue; case NDB_WRITER_PROFILE: note_nkey = - ndb_write_note(&txn, &msg->note, - scratch, scratch_size, - writer->ndb_flags); + ndb_write_note_and_profile( + &txn, + &msg->profile, + scratch, + scratch_size, + writer->ndb_flags); + if (note_nkey > 0) { written_notes[num_notes++] = (struct written_note){ .note_id = note_nkey, - .note = &msg->note, + .note = &msg->profile.note, }; } else { ndb_debug("failed to write note\n"); } - if (msg->profile.record.builder) { - // only write if parsing didn't fail - ndb_write_profile(&txn, &msg->profile, - note_nkey); - } - break; case NDB_WRITER_NOTE: note_nkey = ndb_write_note(&txn, &msg->note, scratch, @@ -4407,6 +4435,12 @@ static void *ndb_writer_thread(void *data) ndb_write_blocks(&txn, msg->blocks.note_key, msg->blocks.blocks); break; + case NDB_WRITER_MIGRATE: + if (!ndb_run_migrations(&txn)) { + mdb_txn_abort(txn.mdb_txn); + goto bail; + } + break; case NDB_WRITER_PROFILE_LAST_FETCH: ndb_writer_last_profile_fetch(&txn, msg->last_fetch.pubkey, @@ -4443,6 +4477,7 @@ static void *ndb_writer_thread(void *data) } } +bail: free(scratch); ndb_debug("quitting writer thread\n"); return NULL; @@ -4753,50 +4788,6 @@ static int ndb_queue_write_version(struct ndb *ndb, uint64_t version) return ndb_writer_queue_msg(&ndb->writer, &msg); } -static int ndb_run_migrations(struct ndb *ndb) -{ - int64_t version, latest_version, i; - - latest_version = sizeof(MIGRATIONS) / sizeof(MIGRATIONS[0]); - - if ((version = ndb_db_version(ndb)) == -1) { - ndb_debug("run_migrations: no version found, assuming new db\n"); - version = latest_version; - - // no version found. fresh db? - if (!ndb_queue_write_version(ndb, version)) { - fprintf(stderr, "run_migrations: failed writing db version"); - return 0; - } - - return 1; - } else { - ndb_debug("ndb: version %" PRIu64 " found\n", version); - } - - if (version < latest_version) - fprintf(stderr, "nostrdb: migrating v%d -> v%d\n", - (int)version, (int)latest_version); - - for (i = version; i < latest_version; i++) { - if (!MIGRATIONS[i].fn(ndb)) { - fprintf(stderr, "run_migrations: migration v%d -> v%d failed\n", (int)i, (int)(i+1)); - return 0; - } - - if (!ndb_queue_write_version(ndb, i+1)) { - fprintf(stderr, "run_migrations: failed writing db version"); - return 0; - } - - version = i+1; - } - - ndb->version = version; - - return 1; -} - static void ndb_monitor_init(struct ndb_monitor *monitor, ndb_sub_fn cb, void *sub_cb_ctx) { @@ -4861,10 +4852,9 @@ int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *c return 0; } - if (!ndb_flag_set(config->flags, NDB_FLAG_NOMIGRATE) && - !ndb_run_migrations(ndb)) { - fprintf(stderr, "failed to run migrations\n"); - return 0; + if (!ndb_flag_set(config->flags, NDB_FLAG_NOMIGRATE)) { + struct ndb_writer_msg msg = { .type = NDB_WRITER_MIGRATE }; + ndb_writer_queue_msg(&ndb->writer, &msg); } // Initialize LMDB environment and spin up threads diff --git a/src/nostrdb.h b/src/nostrdb.h index d4722e7..94895e2 100644 --- a/src/nostrdb.h +++ b/src/nostrdb.h @@ -458,7 +458,7 @@ int ndb_note_verify(void *secp_ctx, unsigned char pubkey[32], unsigned char id[3 // NDB int ndb_init(struct ndb **ndb, const char *dbdir, const struct ndb_config *); -int ndb_db_version(struct ndb *ndb); +int ndb_db_version(struct ndb_txn *txn); int ndb_process_event(struct ndb *, const char *json, int len); int ndb_process_events(struct ndb *, const char *ldjson, size_t len); #ifndef _WIN32 diff --git a/test.c b/test.c index cf5154e..bd7bd72 100644 --- a/test.c +++ b/test.c @@ -470,12 +470,16 @@ static void test_migrate() { static const char *v0_dir = "testdata/db/v0"; struct ndb *ndb; struct ndb_config config; + struct ndb_txn txn; + ndb_default_config(&config); ndb_config_set_flags(&config, NDB_FLAG_NOMIGRATE); fprintf(stderr, "testing migrate on v0\n"); assert(ndb_init(&ndb, v0_dir, &config)); - assert(ndb_db_version(ndb) == 0); + assert(ndb_begin_query(ndb, &txn)); + assert(ndb_db_version(&txn) == 0); + assert(ndb_end_query(&txn)); ndb_destroy(ndb); ndb_config_set_flags(&config, 0); @@ -483,7 +487,10 @@ static void test_migrate() { assert(ndb_init(&ndb, v0_dir, &config)); ndb_destroy(ndb); assert(ndb_init(&ndb, v0_dir, &config)); - assert(ndb_db_version(ndb) == 3); + + assert(ndb_begin_query(ndb, &txn)); + assert(ndb_db_version(&txn) == 3); + assert(ndb_end_query(&txn)); test_profile_search(ndb); ndb_destroy(ndb);