Fix issue: deleting files before launching a RAG application causes the application to fail to start #434

Merged · 9 commits · Feb 21, 2025 · Changes from all commits
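Summary: on startup, `DocImpl._lazy_init` fed every path recorded by the `DocListManager` straight into the reader. If a file had been deleted from disk while the service was down, loading the missing path failed and the whole RAG application refused to start. This PR filters out stale records first (`_delete_nonexistent_docs_on_startup`), renames the store helpers (`_add_files`/`_delete_files` become `_add_doc_to_store`/`_delete_doc_from_store`), and adds directory monitoring to `SqliteDocListManager`. A minimal reproduction sketch follows; the `Document` constructor arguments are assumptions and may differ across lazyllm versions:

```python
import os
import lazyllm

doc_dir = "/tmp/rag_docs"  # hypothetical dataset directory
os.makedirs(doc_dir, exist_ok=True)
with open(os.path.join(doc_dir, "a.txt"), "w") as f:
    f.write("hello rag")

# First launch registers a.txt in the DocListManager database.
doc = lazyllm.Document(dataset_path=doc_dir, manager=True)

# Simulate the bug: delete the file while the app is down, then relaunch.
# Before this PR, startup tried to parse the recorded-but-missing path and
# failed; with it, the stale record is filtered out during _lazy_init.
os.remove(os.path.join(doc_dir, "a.txt"))
```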
52 changes: 38 additions & 14 deletions lazyllm/tools/rag/doc_impl.py
@@ -2,6 +2,7 @@
import ast
from collections import defaultdict
from functools import wraps
from pathlib import Path
from typing import Callable, Dict, List, Optional, Set, Union, Tuple, Any
from lazyllm import LOG, once_wrapper
from .transform import (NodeTransform, FuncNodeTransform, SentenceSplitter, LLMParser,
@@ -96,6 +97,7 @@ def _lazy_init(self) -> None:

if not self.store.is_group_active(LAZY_ROOT_NAME):
ids, paths, metadatas = self._list_files()
ids, paths, metadatas = self._delete_nonexistent_docs_on_startup(ids, paths, metadatas)
if paths:
if not metadatas: metadatas = [{}] * len(paths)
for idx, meta in enumerate(metadatas):
@@ -107,12 +109,33 @@ def _lazy_init(self) -> None:
self._dlm.update_kb_group(cond_file_ids=ids, cond_group=self._kb_group_name,
new_status=DocListManager.Status.success)
LOG.debug(f"building {LAZY_ROOT_NAME} nodes: {root_nodes}")

if self._dlm:
self._daemon = threading.Thread(target=self.worker)
self._daemon.daemon = True
self._daemon.start()

def _delete_nonexistent_docs_on_startup(self, ids, paths, metadatas):
path_existing = [Path(path).exists() for path in paths]
paths_need_delete = [paths[idx] for idx, exist in enumerate(path_existing) if not exist]
rt_metadatas = [meta for meta, exist in zip(metadatas, path_existing) if exist] if metadatas else None
rt_ids = [ids[idx] for idx, exist in enumerate(path_existing) if exist] if ids else None
rt_paths = [path for path, exist in zip(paths, path_existing) if exist]

if ids:
ids_need_delete = [ids[idx] for idx, exist in enumerate(path_existing) if not exist]
else:
ids_need_delete = [gen_docid(path) for path in paths_need_delete]
if ids_need_delete:
if self._dlm is None:
# without a DocListManager, remove the stale documents from the store directly
self._delete_doc_from_store(paths_need_delete)
else:
LOG.warning(f"Found {len(paths_need_delete)} docs whose files no longer exist on disk: {paths_need_delete}")
# with a DocListManager, path monitoring must be enabled so the deleted files are detected
assert (self._dlm.enable_path_monitoring is True
), 'DocListManager must turn on path monitoring or only use DocManager to delete files'
return rt_ids, rt_paths, rt_metadatas

def _resolve_index_pending_registrations(self):
for index_type, index_cls, index_args, index_kwargs in self.index_pending_registrations:
args = [self._resolve_index_placeholder(arg) for arg in index_args]
@@ -258,6 +281,7 @@ def worker(self):
new_meta_dict = json.loads(row[1]) if row[1] else {}
self.store.update_doc_meta(row[0], new_meta_dict)

# Step 1: do doc-parsing, highest priority
docs = self._dlm.get_docs_need_reparse(group=self._kb_group_name)
if docs:
filepaths = [doc.path for doc in docs]
@@ -266,32 +290,32 @@
# update status and need_reparse
self._dlm.update_kb_group(cond_file_ids=ids, cond_group=self._kb_group_name,
new_status=DocListManager.Status.working, new_need_reparse=False)
self._delete_doc_from_store(filepaths)
self._add_doc_to_store(input_files=filepaths, ids=ids, metadatas=metadatas)
self._dlm.update_kb_group(cond_file_ids=ids, cond_group=self._kb_group_name,
new_status=DocListManager.Status.success)

# Step 2: after a doc has been deleted from its related kb_groups, delete it from the db
if self._kb_group_name == DocListManager.DEFAULT_GROUP_NAME:
self._dlm.delete_unreferenced_doc()

# Step 3: do doc-deleting
ids, files, metadatas = self._list_files(status=DocListManager.Status.deleting)
if files:
self._delete_doc_from_store(files)
self._dlm.delete_files_from_kb_group(ids, self._kb_group_name)
continue

# Step 4: do doc-adding
ids, files, metadatas = self._list_files(status=DocListManager.Status.waiting,
upload_status=DocListManager.Status.success)
if files:
self._dlm.update_kb_group(cond_file_ids=ids, cond_group=self._kb_group_name,
new_status=DocListManager.Status.working)
self._add_doc_to_store(input_files=files, ids=ids, metadatas=metadatas)
# change working to success while leaving other status unchanged
self._dlm.update_kb_group(cond_file_ids=ids, cond_group=self._kb_group_name,
cond_status_list=[DocListManager.Status.working],
new_status=DocListManager.Status.success)
continue
time.sleep(10)
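
Taken together, `worker` is a poll loop: Steps 1 and 2 run on every pass; then at most one of Step 3 (deletions) or Step 4 (additions) does work and `continue`s so the queues are rescanned immediately; the thread sleeps only when a pass was idle. A distilled skeleton of that discipline (the step helpers are hypothetical stand-ins, not lazyllm APIs):

```python
import time

def worker_loop(reparse, purge_unreferenced, delete_pending, add_pending):
    """Each *_pending helper returns True if it actually processed something."""
    while True:
        reparse()             # Step 1: reparse docs flagged for re-parsing
        purge_unreferenced()  # Step 2: drop docs no longer in any kb_group
        if delete_pending():  # Step 3: handle pending deletions first ...
            continue          # ... and rescan immediately
        if add_pending():     # Step 4: then pending additions
            continue
        time.sleep(10)        # idle: poll again in ten seconds
```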

def _list_files(
@@ -307,8 +331,8 @@
metadatas.append(json.loads(row[3]) if row[3] else {})
return ids, paths, metadatas

def _add_doc_to_store(self, input_files: List[str], ids: Optional[List[str]] = None,
                      metadatas: Optional[List[Dict[str, Any]]] = None):
if not input_files:
return
root_nodes = self._reader.load_data(input_files)
@@ -328,7 +352,7 @@ def _add_files(self, input_files: List[str], ids: Optional[List[str]] = None,
self.store.update_nodes(nodes)
LOG.debug(f"Merge {group} with {nodes}")

def _delete_doc_from_store(self, input_files: List[str]) -> None:
docs = self.store.get_index(type='file_node_map').query(input_files)
LOG.info(f"delete_doc_from_store: removing documents {input_files} and nodes {docs}")
if len(docs) == 0:
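The heart of the doc_impl.py change is `_delete_nonexistent_docs_on_startup`: before any nodes are built, records whose files have vanished from disk are split off, and only the surviving ids/paths/metadatas continue through `_lazy_init`. A toy run of the same parallel-list filtering (illustrative data only):

```python
from pathlib import Path

ids = ["id-a", "id-b"]
paths = ["/data/a.txt", "/data/b.txt"]  # suppose b.txt was deleted on disk
path_existing = [Path(p).exists() for p in paths]

rt_ids = [i for i, ok in zip(ids, path_existing) if ok]
rt_paths = [p for p, ok in zip(paths, path_existing) if ok]
missing = [p for p, ok in zip(paths, path_existing) if not ok]
# rt_* flow into the normal startup path; `missing` is either removed from the
# store directly (no DocListManager) or left to path monitoring to clean up.
```
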
7 changes: 7 additions & 0 deletions lazyllm/tools/rag/doc_manager.py
@@ -16,7 +16,14 @@
class DocManager(lazyllm.ModuleBase):
def __init__(self, dlm: DocListManager) -> None:
super().__init__()
# disable path monitoring so the manager's add/delete operations don't race with the monitor thread
self._manager = dlm
self._manager.enable_path_monitoring = False

def __reduce__(self):
# for reasons not yet understood, deserializing _manager always re-enables monitoring, so force it off again
self._manager.enable_path_monitoring = False
return (__class__, (self._manager,))

@app.get("/", response_model=BaseResponse, summary="docs")
def document(self):
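The `__reduce__` override matters because the manager crosses process boundaries (for example, when the FastAPI app is served from a spawned worker): pickle rebuilds the object from the `(callable, args)` pair it returns, so reconstruction re-runs `__init__`, which switches monitoring off again. A standalone sketch of the mechanism (illustrative, not lazyllm code):

```python
import pickle

class Wrapper:
    def __init__(self, inner):
        self.inner = inner
        self.monitoring = False  # stands in for enable_path_monitoring = False

    def __reduce__(self):
        # On unpickling, pickle calls Wrapper(self.inner), re-running __init__.
        return (self.__class__, (self.inner,))

clone = pickle.loads(pickle.dumps(Wrapper(inner=[1, 2, 3])))
assert clone.monitoring is False
```
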
150 changes: 120 additions & 30 deletions lazyllm/tools/rag/utils.py
@@ -1,29 +1,34 @@
import concurrent
import hashlib
import json
import os
import shutil
import sqlite3
import threading
import time
from abc import ABC, abstractmethod
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import (Any, Callable, Dict, Generator, List, Optional, Set, Tuple,
                    Union)

import pydantic
import sqlalchemy
from fastapi import UploadFile
from filelock import FileLock
from pydantic import BaseModel
from sqlalchemy import Column, Row, bindparam, insert, select, update
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import DeclarativeBase, sessionmaker

import lazyllm
from lazyllm import config
from lazyllm.common import override
from lazyllm.common.queue import sqlite3_check_threadsafety

from .doc_node import DocNode
from .global_metadata import RAG_DOC_ID, RAG_DOC_PATH
from .index_base import IndexBase

# min(32, (os.cpu_count() or 1) + 4) is the default number of workers for ThreadPoolExecutor
config.add(
@@ -103,32 +108,49 @@ class Status:
success = 'success'
failed = 'failed'
deleting = 'deleting'
# deleted is no longer used
deleted = 'deleted'

def __init__(self, path, name, enable_path_monitoring=True):
self._path = path
self._name = name
self._enable_path_monitoring = enable_path_monitoring
self._id = hashlib.sha256(f'{name}@+@{path}'.encode()).hexdigest()
if not os.path.isabs(path):
raise ValueError(f"path [{path}] is not an absolute path")

def __reduce__(self):
return (__class__, (self._path, self._name, False,))

def __new__(cls, *args, **kw):
if cls is not DocListManager:
return super().__new__(cls)
return super().__new__(__class__.__pool__[config['default_dlmanager']])

@property
def enable_path_monitoring(self):
return self._enable_path_monitoring

@enable_path_monitoring.setter
@abstractmethod
def enable_path_monitoring(self, val: bool):
pass

def init_tables(self) -> 'DocListManager':
    if not self.table_inited():
        self._init_tables()
    # re-add the default group in case the manager is reused after release()
    self.add_kb_group(DocListManager.DEFAULT_GROUP_NAME)
    return self

def monitor_directory(self) -> Set[str]:
    files_list = []
    for root, _, files in os.walk(self._path):
        files = [os.path.join(root, file_path) for file_path in files]
        files_list.extend(files)
    return set(files_list)

# Note: a more accurate name would be "set_docs_status_deleting"
def delete_files(self, file_ids: List[str]) -> List[DocPartRow]:
document_list = self.update_file_status(file_ids, DocListManager.Status.deleting)
self.update_kb_group(cond_file_ids=file_ids, new_status=DocListManager.Status.deleting)
@@ -181,17 +203,17 @@ def add_files(
status: Optional[str] = Status.waiting,
batch_size: int = 64,
) -> List[DocPartRow]:
documents = self._add_doc_records(files, metadatas, status, batch_size)
if documents:
self.add_files_to_kb_group([doc.doc_id for doc in documents], group=DocListManager.DEFAULT_GROUP_NAME)
return documents

@abstractmethod
def _add_doc_records(self, files: List[str], metadatas: Optional[List] = None,
                     status: Optional[str] = Status.waiting, batch_size: int = 64) -> List[DocPartRow]: pass

@abstractmethod
def delete_unreferenced_doc(self): pass

@abstractmethod
def get_docs_need_reparse(self, group: Optional[str] = None) -> List[KBDocument]: pass
@@ -225,8 +247,8 @@ def release(self): pass


class SqliteDocListManager(DocListManager):
def __init__(self, path, name, enable_path_monitoring=True):
super().__init__(path, name, enable_path_monitoring)
root_dir = os.path.expanduser(os.path.join(config['home'], '.dbs'))
os.makedirs(root_dir, exist_ok=True)
self._db_path = os.path.join(root_dir, f'.lazyllm_dlmanager.{self._id}.db')
@@ -237,6 +259,26 @@ def __init__(self, path, name):
f"sqlite:///{self._db_path}?check_same_thread={self._check_same_thread}"
)
self._Session = sessionmaker(bind=self._engine)
self.init_tables()
self._monitor_thread = threading.Thread(target=self.monitor_directory_worker)
self._monitor_thread.daemon = True
self._monitor_continue = True
if self._enable_path_monitoring:
self._monitor_thread.start()

@DocListManager.enable_path_monitoring.setter
def enable_path_monitoring(self, val: bool):
self._enable_path_monitoring = (val is True)
if val is True:
self._monitor_continue = True
self._monitor_thread.start()
else:
self._monitor_continue = False
if self._monitor_thread.is_alive():
self._monitor_thread.join()

def __del__(self):
self.enable_path_monitoring = False

def _init_tables(self):
KBDataBase.metadata.create_all(bind=self._engine)
@@ -246,6 +288,54 @@ def table_inited(self):
cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='documents'")
return cursor.fetchone() is not None

def monitor_directory_worker(self):
failed_files_count = defaultdict(int)
with self._db_lock, self._Session() as session:
docs_all = session.query(KBDocument).all()
previous_files = set([doc.path for doc in docs_all])
skip_files = set()
while self._monitor_continue:
# 1. Scan files in the directory, find added and deleted files
current_files = set(super().monitor_directory())
to_be_added_files = current_files - previous_files - skip_files
to_be_deleted_files = previous_files - current_files - skip_files
failed_files = set()

to_be_added_doc_ids = set([gen_docid(ele) for ele in to_be_added_files])
to_be_deleted_doc_ids = set([gen_docid(ele) for ele in to_be_deleted_files])
failed_doc_ids = set()
filter_status_list = [DocListManager.Status.success,
DocListManager.Status.failed, DocListManager.Status.waiting]
with self._db_lock, self._Session() as session:
docs_not_expected = session.query(KBDocument).filter(KBDocument.doc_id.in_(to_be_added_doc_ids)).all()
docs_expected = session.query(KBDocument).filter(KBDocument.doc_id.in_(to_be_deleted_doc_ids),
KBDocument.status.in_(filter_status_list)).all()
# 2. Skip new files that are already in the database
failed_files.update([doc.path for doc in docs_not_expected])
failed_doc_ids.update([doc.doc_id for doc in docs_not_expected])
to_be_added_files -= failed_files
# add_files() records the documents with success status, then registers them in the kb_group with waiting status
self.add_files(list(to_be_added_files), status=DocListManager.Status.success)

# 3. Skip deleted files that are not in the database, or whose status is not success/failed/waiting
safe_to_delete_files = set([doc.path for doc in docs_expected])
safe_to_delete_doc_ids = set([doc.doc_id for doc in docs_expected])
failed_doc_ids.update(to_be_deleted_doc_ids - safe_to_delete_doc_ids)
failed_files.update(to_be_deleted_files - safe_to_delete_files)
to_be_deleted_files = safe_to_delete_files
to_be_deleted_doc_ids = safe_to_delete_doc_ids
self.delete_files(list(to_be_deleted_doc_ids))

# 4. update skip_files
for ele in failed_files:
failed_files_count[ele] += 1
if failed_files_count[ele] >= 3:
skip_files.add(ele)
# update previous_files; failed files will be re-processed in the next loop
previous_files = (current_files | to_be_added_files) - to_be_deleted_files
time.sleep(10)
lazyllm.LOG.warning("END MONITORING")

@staticmethod
def get_status_cond_and_params(status: Union[str, List[str]],
exclude_status: Optional[Union[str, List[str]]] = None,
@@ -414,7 +504,7 @@ def list_kb_group_files(self, group: str = None, limit: Optional[int] = None, de
if not details: return [row[:2] for row in rows]
return rows

def delete_unreferenced_doc(self):
with self._db_lock, self._Session() as session:
docs_to_delete = (
session.query(KBDocument)
@@ -423,12 +513,12 @@ def delete_obsolete_files(self):
)
for doc in docs_to_delete:
session.delete(doc)
log = KBOperationLogs(log=f"Delete obsolete file, doc_id:{doc.doc_id}, path:{doc.path}.")
session.add(log)
session.commit()

def _add_doc_records(self, files: List[str], metadatas: Optional[List[Dict[str, Any]]] = None,
                     status: Optional[str] = DocListManager.Status.waiting, batch_size: int = 64):
documents = []

for i in range(0, len(files), batch_size):
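`monitor_directory_worker` is what lets a `DocListManager`-backed deployment notice on-disk deletions at runtime, which is also why `_delete_nonexistent_docs_on_startup` only warns when a dlm is present. Every ten seconds it diffs the current file set against the previous pass, registers new files, marks vanished ones as deleting, and parks a path on a skip list after three consecutive failures. A distilled skeleton of the loop, with the database reconciliation elided (note one constraint the real setter must respect: a `threading.Thread` can be started only once, so re-enabling monitoring after a disable would need a fresh thread object):

```python
import time
from collections import defaultdict
from typing import Callable, List, Set

def monitor_loop(scan: Callable[[], Set[str]],
                 add: Callable[[List[str]], Set[str]],
                 delete: Callable[[List[str]], Set[str]],
                 keep_running: Callable[[], bool]) -> None:
    """scan() lists files on disk; add()/delete() return the paths they failed on."""
    previous, skip = scan(), set()
    failures = defaultdict(int)
    while keep_running():
        current = scan()
        added = current - previous - skip
        removed = previous - current - skip
        failed = add(sorted(added)) | delete(sorted(removed))
        for path in failed:
            failures[path] += 1
            if failures[path] >= 3:  # three strikes: stop retrying this path
                skip.add(path)
        # the real code also excludes failed paths here so they retry next pass
        previous = (current | added) - removed
        time.sleep(10)
```
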
2 changes: 1 addition & 1 deletion lazyllm/tools/sql/sql_manager.py
@@ -216,7 +216,7 @@ def execute_commit(self, statement: str):
def execute_query(self, statement: str) -> str:
statement = re.sub(r"/\*.*?\*/", "", statement, flags=re.DOTALL).strip()
if not statement.upper().startswith("SELECT"):
return "Only select statement supported"
return f"Only select statement supported. Original statement: {statement}"
try:
result = []
with self.get_session() as session:
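The sql_manager.py change only enriches the rejection message, but the guard is easy to exercise in isolation. A self-contained sketch mirroring the check above (the class around `execute_query` is elided):

```python
import re
from typing import Optional

def reject_non_select(statement: str) -> Optional[str]:
    # Strip /* ... */ block comments, then require the statement to begin with SELECT.
    stripped = re.sub(r"/\*.*?\*/", "", statement, flags=re.DOTALL).strip()
    if not stripped.upper().startswith("SELECT"):
        return f"Only select statement supported. Original statement: {stripped}"
    return None

assert reject_non_select("/* planner hint */ SELECT 1") is None
assert reject_non_select("DROP TABLE users") is not None
```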