Skip to content

Commit

Permalink
fix: delete and update (#222)
Browse files Browse the repository at this point in the history
* fix: clean codes

* fix: tests

* fix: revert comments

* fix: meta db backup

* fix: meta db backup

* fix: skip test of remote backup and restore
  • Loading branch information
numb3r3 authored Mar 24, 2023
1 parent 7783b30 commit bfeb7e9
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ repos:
# args:
# - --select=D101,D102,D103
- repo: https://github.com/timothycrosley/isort
rev: 5.8.0
rev: 5.12.0
hooks:
- id: isort
args: ["--profile", "black"]
Expand Down
15 changes: 8 additions & 7 deletions annlite/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def __init__(
CellTable(f'table_{c}', columns=columns) for c in range(n_cells)
]

self._meta_table = MetaTable('metas', data_path=data_path, in_memory=False)
self._meta_table = MetaTable('metas', data_path=data_path, in_memory=True)

def ivf_search(
self,
Expand Down Expand Up @@ -280,10 +280,11 @@ def insert(
offsets = np.array(offsets, dtype=np.int64)

self.vec_index(cell_id).add_with_ids(data, offsets)
self._meta_table.bulk_add_address([d.id for d in docs], cells, offsets)

if not only_index:
self.doc_store(cell_id).insert(docs)
self._meta_table.bulk_add_address([d.id for d in docs], cells, offsets)

else:
for cell_id, cell_count in zip(unique_cells, unique_cell_counts):
# TODO: Jina should allow boolean filtering in docarray to avoid this
Expand All @@ -297,12 +298,13 @@ def insert(
cell_data = data[indices, :]

self.vec_index(cell_id).add_with_ids(cell_data, cell_offsets)
self._meta_table.bulk_add_address(
[d.id for d in cell_docs], [cell_id] * cell_count, cell_offsets
)

if not only_index:
self.doc_store(cell_id).insert(cell_docs)
self._meta_table.bulk_add_address(
[d.id for d in cell_docs], [cell_id] * cell_count, cell_offsets
)

logger.debug(f'{len(docs)} new docs added')

def _add_vecs(self, data: 'np.ndarray', cells: 'np.ndarray', offsets: 'np.ndarray'):
Expand Down Expand Up @@ -392,7 +394,6 @@ def delete(

for doc_id in ids:
cell_id, offset = self._meta_table.get_address(doc_id)
print(f'{doc_id} {cell_id} {offset}')
if cell_id is not None:
self.vec_index(cell_id).delete([offset])
self.cell_table(cell_id).delete_by_offset(offset)
Expand Down Expand Up @@ -423,7 +424,7 @@ def _rebuild_database(self):
)
for _ in range(self.n_cells)
]
self._meta_table = MetaTable('metas', data_path=self.data_path, in_memory=False)
# self._meta_table = MetaTable('metas', data_path=self.data_path, in_memory=False)

def _get_doc_by_id(self, doc_id: str):
cell_id = 0
Expand Down
35 changes: 14 additions & 21 deletions annlite/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,7 @@ def dump_index(self):
for cell_id in range(self.n_cells):
self.vec_index(cell_id).dump(self.index_path / f'cell_{cell_id}.hnsw')
self.cell_table(cell_id).dump(self.index_path / f'cell_{cell_id}.db')
self.meta_table.dump(self.index_path / f'meta.db')
except Exception as ex:
logger.error(f'Failed to dump the indexer, {ex!r}')

Expand Down Expand Up @@ -749,10 +750,10 @@ def _backup_index_to_remote(self, target_name: str, token: str):

# upload meta table
uploader.upload_file(
Path(self.data_path) / 'metas.db',
Path(self.index_path) / 'meta.db',
target_name=target_name,
type='meta_table',
cell_id='all',
cell_id=0,
)

# upload training model
Expand All @@ -773,6 +774,7 @@ def _rebuild_index_from_local(self):
self.snapshot_path / f'cell_{cell_id}.hnsw'
)
self.cell_table(cell_id).load(self.snapshot_path / f'cell_{cell_id}.db')
self.meta_table.load(self.snapshot_path / f'meta.db')
else:
logger.info(f'Rebuild the indexer from scratch')
for cell_id in range(self.n_cells):
Expand Down Expand Up @@ -855,7 +857,9 @@ def _rebuild_index_from_remote(self, source_name: str, token: str):
# download database files and rebuild
logger.info(f'Load the database `{source_name}` from remote store')

database_ids = merger.get_artifact_ids(art_list, type='database')
database_ids = merger.get_artifact_ids(
art_list, type='database', cell_id=cell_id
)
merger.download(ids=database_ids, download_folder='database')
for zip_file in list((restore_path / 'database').iterdir()):
# default has only one cell
Expand All @@ -879,34 +883,23 @@ def _rebuild_index_from_remote(self, source_name: str, token: str):
/ zip_file.name.split('.zip')[0]
)
Path(zip_file).unlink()
self._rebuild_database()
self._rebuild_database()

# download meta_table files
logger.info(f'Load the meta_table `{source_name}` from remote store')

meta_table_ids = merger.get_artifact_ids(art_list, type='meta_table')
meta_table_ids = merger.get_artifact_ids(
art_list, type='meta_table', cell_id=0
)
merger.download(ids=meta_table_ids, download_folder='meta_table')

if len(meta_table_ids) > 1:
merger.merge_file(
inputdir=restore_path / 'meta_table',
outputdir=self.data_path,
outputfilename=Path('metas.db'),
outputdir=restore_path / 'meta_table',
outputfilename=Path('meta.db'),
)
else:
mata_table_file = restore_path / 'meta_table' / 'metas.db'
if platform.system() == 'Windows':
origin_metas_path = self.data_path / 'metas.db'
if origin_metas_path.exists():
self._meta_table.close()
origin_metas_path.unlink()
mata_table_file.rename(self.data_path / 'metas.db')
if platform.system() == 'Windows':
from .storage.table import MetaTable

self._meta_table = MetaTable(
'metas', data_path=self.data_path, in_memory=False
)
self._meta_table.load(restore_path / 'meta_table' / 'meta.db')
shutil.rmtree(restore_path / 'meta_table')

# download model files
Expand Down
9 changes: 7 additions & 2 deletions annlite/storage/kv.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,13 @@ def close(self):
'`DocStorage` had been closed already, will skip this close operation.'
)
return
self._db.flush(wait=True)
self._db.close()
try:
self._db.flush(wait=True)
self._db.close()
except Exception as ex:
if 'No such file or directory' not in str(ex):
# this is a known bug, we can safely ignore it
raise ex
self._is_closed = True

def __len__(self):
Expand Down
1 change: 0 additions & 1 deletion annlite/storage/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,6 @@ def get_address(self, doc_id: str):
def delete_address(self, doc_id: str, commit: bool = True):
sql = f'DELETE from {self.name} WHERE _doc_id = ?'
self._conn.execute(sql, (doc_id,))
print(f'Deleted {doc_id} at: {time_now()}')
if commit:
self._conn.commit()

Expand Down
1 change: 1 addition & 0 deletions tests/test_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ def test_local_backup_restore(tmpdir):
assert int(status['index_size']) == N


@pytest.mark.skip(reason='This test requires a running hubble instance')
def test_remote_backup_restore(tmpdir):
X = np.random.random((N, D))
docs = DocumentArray([Document(id=f'{i}', embedding=X[i]) for i in range(N)])
Expand Down

0 comments on commit bfeb7e9

Please sign in to comment.