Indexing latency check fix (#3747)
* add logs + update dev script

* update config

* remove prints

* temporarily turn off

* va

* update

* fix

* finalize monitoring updates

* update
pablonyx authored Jan 23, 2025
1 parent 1613a8b commit ccb16b7
Showing 3 changed files with 63 additions and 15 deletions.
47 changes: 33 additions & 14 deletions backend/onyx/background/celery/tasks/monitoring/tasks.py
@@ -32,6 +32,7 @@
from onyx.db.models import IndexAttempt
from onyx.db.models import SyncRecord
from onyx.db.models import UserGroup
from onyx.db.search_settings import get_active_search_settings
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import redis_lock_dump
from onyx.utils.telemetry import optional_telemetry
@@ -184,6 +185,10 @@ def _build_connector_start_latency_metric(

start_latency = (recent_attempt.time_started - desired_start_time).total_seconds()

task_logger.info(
f"Start latency for index attempt {recent_attempt.id}: {start_latency:.2f}s "
f"(desired: {desired_start_time}, actual: {recent_attempt.time_started})"
)
return Metric(
key=metric_key,
name="connector_start_latency",
@@ -217,6 +222,9 @@ def _build_run_success_metrics(
IndexingStatus.FAILED,
IndexingStatus.CANCELED,
]:
task_logger.info(
f"Adding run success metric for index attempt {attempt.id} with status {attempt.status}"
)
metrics.append(
Metric(
key=metric_key,
@@ -237,25 +245,29 @@ def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
# Get all connector credential pairs
cc_pairs = db_session.scalars(select(ConnectorCredentialPair)).all()

active_search_settings = get_active_search_settings(db_session)
metrics = []
for cc_pair in cc_pairs:
# Get all attempts in the last hour

for cc_pair, search_settings in zip(cc_pairs, active_search_settings):
recent_attempts = (
db_session.query(IndexAttempt)
.filter(
IndexAttempt.connector_credential_pair_id == cc_pair.id,
IndexAttempt.time_created >= one_hour_ago,
IndexAttempt.search_settings_id == search_settings.id,
)
.order_by(IndexAttempt.time_created.desc())
.limit(2)
.all()
)
most_recent_attempt = recent_attempts[0] if recent_attempts else None
if not recent_attempts:
continue

most_recent_attempt = recent_attempts[0]
second_most_recent_attempt = (
recent_attempts[1] if len(recent_attempts) > 1 else None
)

# if no metric to emit, skip
if most_recent_attempt is None:
if one_hour_ago > most_recent_attempt.time_created:
continue

# Connector start latency
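One behavioral note on the reworked loop above: zip pairs cc_pairs with active_search_settings element-wise and stops at the shorter sequence, so each credential pair is checked against exactly one search-settings entry. A tiny standalone sketch of that pairing (illustrative placeholder values, not the real ORM objects):

# Placeholder stand-ins for ConnectorCredentialPair rows and search settings.
cc_pairs = ["cc_pair_1", "cc_pair_2", "cc_pair_3"]
active_search_settings = ["settings_primary", "settings_secondary"]

# zip yields two pairs here and never visits cc_pair_3.
for cc_pair, search_settings in zip(cc_pairs, active_search_settings):
    print(cc_pair, "->", search_settings)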
@@ -298,7 +310,7 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
f"{sync_record.entity_id}:{sync_record.id}"
)
if _has_metric_been_emitted(redis_std, metric_key):
task_logger.debug(
task_logger.info(
f"Skipping metric for sync record {sync_record.id} "
"because it has already been emitted"
)
@@ -318,11 +330,15 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]

if sync_speed is None:
task_logger.error(
"Something went wrong with sync speed calculation. "
f"Sync record: {sync_record.id}"
f"Something went wrong with sync speed calculation. "
f"Sync record: {sync_record.id}, duration: {sync_duration_mins}, "
f"docs synced: {sync_record.num_docs_synced}"
)
continue

task_logger.info(
f"Calculated sync speed for record {sync_record.id}: {sync_speed} docs/min"
)
metrics.append(
Metric(
key=metric_key,
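The sync_speed value referenced above is computed earlier in the function and is not part of the lines shown here; judging from the docs/min unit in the new log message, a plausible sketch of that calculation looks like the following (an assumption, not code from this commit):

def compute_sync_speed(num_docs_synced: int, sync_duration_mins: float) -> float | None:
    # Assumed formula: documents synced per minute of sync duration.
    # Returning None lets the caller log an error and skip the metric,
    # matching the "if sync_speed is None" branch in the diff.
    if sync_duration_mins <= 0:
        return None
    return num_docs_synced / sync_duration_mins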
@@ -341,7 +357,7 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
f":{sync_record.entity_id}:{sync_record.id}"
)
if _has_metric_been_emitted(redis_std, start_latency_key):
task_logger.debug(
task_logger.info(
f"Skipping start latency metric for sync record {sync_record.id} "
"because it has already been emitted"
)
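The _has_metric_been_emitted helper is not touched by this commit and its body is not shown; a minimal sketch of the Redis key-existence pattern it presumably uses (key scheme and TTL are assumptions):

from redis import Redis

def _has_metric_been_emitted(redis_std: Redis, metric_key: str) -> bool:
    # Assumed scheme: a marker key is written when a metric is emitted,
    # so later monitoring runs can skip re-emitting it.
    return bool(redis_std.exists(metric_key))

def _mark_metric_as_emitted(redis_std: Redis, metric_key: str, ttl_seconds: int = 24 * 3600) -> None:
    # An expiry keeps stale markers from accumulating indefinitely.
    redis_std.set(metric_key, "1", ex=ttl_seconds)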
@@ -359,7 +375,7 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
)
else:
# Skip other sync types
task_logger.debug(
task_logger.info(
f"Skipping sync record {sync_record.id} "
f"with type {sync_record.sync_type} "
f"and id {sync_record.entity_id} "
@@ -378,12 +394,15 @@
start_latency = (
sync_record.sync_start_time - entity.time_last_modified_by_user
).total_seconds()
task_logger.info(
f"Calculated start latency for sync record {sync_record.id}: {start_latency} seconds"
)
if start_latency < 0:
task_logger.error(
f"Start latency is negative for sync record {sync_record.id} "
f"with type {sync_record.sync_type} and id {sync_record.entity_id}."
"This is likely because the entity was updated between the time the "
"time the sync finished and this job ran. Skipping."
f"with type {sync_record.sync_type} and id {sync_record.entity_id}. "
f"Sync start time: {sync_record.sync_start_time}, "
f"Entity last modified: {entity.time_last_modified_by_user}"
)
continue

6 changes: 5 additions & 1 deletion backend/onyx/utils/long_term_log.py
@@ -35,12 +35,16 @@ def __init__(
def _cleanup_old_files(self, category_path: Path) -> None:
try:
files = sorted(
[f for f in category_path.glob("*.json") if f.is_file()],
[f for f in category_path.glob("*.json")],
key=lambda x: x.stat().st_mtime, # Sort by modification time
reverse=True,
)

# Delete oldest files that exceed the limit
for file in files[self.max_files_per_category :]:
if not file.is_file():
logger.debug(f"File already deleted: {file}")
continue
try:
file.unlink()
except Exception as e:
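The _cleanup_old_files change above drops the is_file() filter from the initial listing and instead re-checks each candidate just before unlinking, so files removed by another process between listing and deletion are skipped quietly. A self-contained sketch of the same pattern, simplified to a free function (assumed names, outside the actual LongTermLogger class):

import logging
from pathlib import Path

logger = logging.getLogger(__name__)

def cleanup_old_files(category_path: Path, max_files_per_category: int) -> None:
    # Newest first by modification time; everything beyond the limit is a candidate.
    files = sorted(
        category_path.glob("*.json"),
        key=lambda p: p.stat().st_mtime,
        reverse=True,
    )
    for file in files[max_files_per_category:]:
        if not file.is_file():
            # Already gone (e.g. deleted by a concurrent cleanup); nothing to do.
            logger.debug(f"File already deleted: {file}")
            continue
        try:
            file.unlink()
        except Exception:
            logger.exception(f"Failed to delete old log file: {file}")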
25 changes: 25 additions & 0 deletions backend/scripts/dev_run_background_jobs.py
@@ -72,6 +72,19 @@ def run_jobs() -> None:
"--queues=connector_indexing",
]

cmd_worker_monitoring = [
"celery",
"-A",
"onyx.background.celery.versioned_apps.monitoring",
"worker",
"--pool=threads",
"--concurrency=1",
"--prefetch-multiplier=1",
"--loglevel=INFO",
"--hostname=monitoring@%n",
"--queues=monitoring",
]

cmd_beat = [
"celery",
"-A",
@@ -97,6 +110,13 @@ def run_jobs() -> None:
cmd_worker_indexing, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
)

worker_monitoring_process = subprocess.Popen(
cmd_worker_monitoring,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)

beat_process = subprocess.Popen(
cmd_beat, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
)
@@ -114,18 +134,23 @@ def run_jobs() -> None:
worker_indexing_thread = threading.Thread(
target=monitor_process, args=("INDEX", worker_indexing_process)
)
worker_monitoring_thread = threading.Thread(
target=monitor_process, args=("MONITORING", worker_monitoring_process)
)
beat_thread = threading.Thread(target=monitor_process, args=("BEAT", beat_process))

worker_primary_thread.start()
worker_light_thread.start()
worker_heavy_thread.start()
worker_indexing_thread.start()
worker_monitoring_thread.start()
beat_thread.start()

worker_primary_thread.join()
worker_light_thread.join()
worker_heavy_thread.join()
worker_indexing_thread.join()
worker_monitoring_thread.join()
beat_thread.join()
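The monitor_process helper used for the new MONITORING thread is defined elsewhere in this script and is not part of the diff; a plausible sketch of what it does, assuming it simply relays the worker's combined output with a name prefix:

import subprocess

def monitor_process(name: str, process: subprocess.Popen) -> None:
    # Assumed behavior: stream the child's stdout (stderr is merged into it by
    # the Popen calls above) and prefix each line so interleaved worker output
    # stays readable.
    assert process.stdout is not None
    for line in process.stdout:
        print(f"{name}: {line.rstrip()}")
    process.wait()
    print(f"{name}: exited with return code {process.returncode}")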

