From c26b6b1cb873f40211684b0bf192e24ae697e220 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 21 Jan 2025 21:43:54 -0500 Subject: [PATCH] fix: fetch first-event-at from storage current: query server fetches first-event-at from all the live ingestors but, this logic fails when ingestor which ingested the events for a stream is not reachable anymore, query server gets `None` change: fetch the first-event-at from all the stream jsons from storage find the earliest value, and update in server's memory map --- src/catalog/mod.rs | 61 ++++++++++++++++++- src/handlers/http/logstream.rs | 32 +++++----- .../http/modal/utils/logstream_utils.rs | 54 ++++++++++++++++ src/metadata.rs | 41 ++++++++++++- src/storage/object_storage.rs | 60 ++++++++++++++++++ src/storage/retention.rs | 4 +- 6 files changed, 227 insertions(+), 25 deletions(-) diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs index aa46afb4f..0f4c44b19 100644 --- a/src/catalog/mod.rs +++ b/src/catalog/mod.rs @@ -27,6 +27,7 @@ use crate::option::{Mode, CONFIG}; use crate::stats::{ event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats, }; +use crate::storage::object_storage::get_stream_meta_from_storage; use crate::{ catalog::manifest::Manifest, event::DEFAULT_TIMESTAMP_KEY, @@ -279,7 +280,9 @@ async fn create_manifest( } }; first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339()); - if let Err(err) = STREAM_INFO.set_first_event_at(stream_name, first_event_at.clone()) { + if let Err(err) = + STREAM_INFO.set_first_event_at(stream_name, first_event_at.as_ref().unwrap()) + { error!( "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", stream_name @@ -330,8 +333,8 @@ pub async fn remove_manifest_from_snapshot( let manifests = &mut meta.snapshot.manifest_list; // Filter out items whose manifest_path contains any of the dates_to_delete manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date))); + STREAM_INFO.reset_first_event_at(stream_name)?; meta.first_event_at = None; - STREAM_INFO.set_first_event_at(stream_name, None)?; storage.put_snapshot(stream_name, meta.snapshot).await?; } match CONFIG.options.mode { @@ -391,7 +394,7 @@ pub async fn get_first_event( first_event_at = lower_bound.with_timezone(&Local).to_rfc3339(); meta.first_event_at = Some(first_event_at.clone()); storage.put_stream_manifest(stream_name, &meta).await?; - STREAM_INFO.set_first_event_at(stream_name, Some(first_event_at.clone()))?; + STREAM_INFO.set_first_event_at(stream_name, &first_event_at)?; } } } @@ -432,6 +435,58 @@ pub async fn get_first_event( Ok(Some(first_event_at)) } +/// Retrieves the earliest first-event-at from the storage for the specified stream. +/// +/// This function fetches the object-store format from all the stream.json files for the given stream from the storage, +/// extracts the `first_event_at` timestamps, and returns the earliest `first_event_at`. +/// +/// # Arguments +/// +/// * `stream_name` - The name of the stream for which `first_event_at` is to be retrieved. +/// +/// # Returns +/// +/// * `Result, ObjectStorageError>` - Returns `Ok(Some(String))` with the earliest +/// first event timestamp if found, `Ok(None)` if no timestamps are found, or an `ObjectStorageError` +/// if an error occurs. +/// +/// # Errors +/// +/// This function will return an error if: +/// * The stream metadata cannot be retrieved from the storage. +/// +/// # Examples +/// ```ignore +/// ```rust +/// let result = get_first_event_from_storage("my_stream").await; +/// match result { +/// Ok(Some(first_event)) => println!("first-event-at: {}", first_event), +/// Ok(None) => println!("first-event-at not found"), +/// Err(e) => eprintln!("Error retrieving first-event-at from storage: {}", e), +/// } +/// ``` +pub async fn get_first_event_from_storage( + stream_name: &str, +) -> Result, ObjectStorageError> { + let mut all_first_events = vec![]; + let stream_metas = get_stream_meta_from_storage(stream_name).await; + if let Ok(stream_metas) = stream_metas { + for stream_meta in stream_metas.iter() { + if let Some(first_event) = &stream_meta.first_event_at { + let first_event = DateTime::parse_from_rfc3339(first_event).unwrap(); + let first_event = first_event.with_timezone(&Utc); + all_first_events.push(first_event); + } + } + } + + if all_first_events.is_empty() { + return Ok(None); + } + let first_event_at = all_first_events.iter().min().unwrap().to_rfc3339(); + Ok(Some(first_event_at)) +} + /// Partition the path to which this manifest belongs. /// Useful when uploading the manifest file. pub fn partition_path( diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 903b51ebd..4b4151bc0 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -21,11 +21,11 @@ use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, St use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME}; use super::ingest::create_stream_if_not_exists; use super::modal::utils::logstream_utils::{ - create_stream_and_schema_from_storage, create_update_stream, + create_stream_and_schema_from_storage, create_update_stream, update_first_event_at, }; use super::query::update_schema_when_distributed; use crate::alerts::Alerts; -use crate::catalog::get_first_event; +use crate::catalog::get_first_event_from_storage; use crate::event::format::{override_data_type, LogSource}; use crate::handlers::STREAM_TYPE_KEY; use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION}; @@ -57,7 +57,7 @@ use std::fs; use std::num::NonZeroU32; use std::str::FromStr; use std::sync::Arc; -use tracing::{error, warn}; +use tracing::warn; pub async fn delete(stream_name: Path) -> Result { let stream_name = stream_name.into_inner(); @@ -551,18 +551,16 @@ pub async fn get_stream_info(stream_name: Path) -> Result = Vec::new(); - if let Ok(Some(first_event_at)) = get_first_event(store, &stream_name, dates).await { - if let Err(err) = - metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at)) - { - error!( - "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", - stream_name - ); - } - } + // if first_event_at is not found in memory map, check if it exists in the storage + // if it exists in the storage, update the first_event_at in memory map + let stream_first_event_at = + if let Ok(Some(first_event_at)) = STREAM_INFO.get_first_event(&stream_name) { + Some(first_event_at) + } else if let Ok(Some(first_event_at)) = get_first_event_from_storage(&stream_name).await { + update_first_event_at(&stream_name, &first_event_at).await + } else { + None + }; let hash_map = STREAM_INFO.read().unwrap(); let stream_meta = &hash_map @@ -572,7 +570,7 @@ pub async fn get_stream_info(stream_name: Path) -> Result) -> Result Result< Ok(true) } + +/// Updates the first-event-at in storage and logstream metadata for the specified stream. +/// +/// This function updates the `first-event-at` in both the object store and the stream info metadata. +/// If either update fails, an error is logged, but the function will still return the `first-event-at`. +/// +/// # Arguments +/// +/// * `stream_name` - The name of the stream to update. +/// * `first_event_at` - The value of first-event-at. +/// +/// # Returns +/// +/// * `Option` - Returns `Some(String)` with the provided timestamp if the update is successful, +/// or `None` if an error occurs. +/// +/// # Errors +/// +/// This function logs an error if: +/// * The `first-event-at` cannot be updated in the object store. +/// * The `first-event-at` cannot be updated in the stream info. +/// +/// # Examples +///```ignore +/// ```rust +/// use parseable::handlers::http::modal::utils::logstream_utils::update_first_event_at; +/// let result = update_first_event_at("my_stream", "2023-01-01T00:00:00Z").await; +/// match result { +/// Some(timestamp) => println!("first-event-at: {}", timestamp), +/// None => eprintln!("Failed to update first-event-at"), +/// } +/// ``` +pub async fn update_first_event_at(stream_name: &str, first_event_at: &str) -> Option { + let storage = CONFIG.storage().get_object_store(); + if let Err(err) = storage + .update_first_event_in_stream(stream_name, first_event_at) + .await + { + error!( + "Failed to update first_event_at in storage for stream {:?}: {err:?}", + stream_name + ); + } + + if let Err(err) = metadata::STREAM_INFO.set_first_event_at(stream_name, first_event_at) { + error!( + "Failed to update first_event_at in stream info for stream {:?}: {err:?}", + stream_name + ); + } + + Some(first_event_at.to_string()) +} diff --git a/src/metadata.rs b/src/metadata.rs index 5c18aa329..8f24a1c15 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -212,13 +212,50 @@ impl StreamInfo { pub fn set_first_event_at( &self, stream_name: &str, - first_event_at: Option, + first_event_at: &str, ) -> Result<(), MetadataError> { let mut map = self.write().expect(LOCK_EXPECT); map.get_mut(stream_name) .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) .map(|metadata| { - metadata.first_event_at = first_event_at; + metadata.first_event_at = Some(first_event_at.to_owned()); + }) + } + + /// Removes the `first_event_at` timestamp for the specified stream from the LogStreamMetadata. + /// + /// This function acquires a write lock, retrieves the LogStreamMetadata for the given stream, + /// and removes the `first_event_at` timestamp if it exists. + /// + /// # Arguments + /// + /// * `stream_name` - The name of the stream for which the `first_event_at` timestamp is to be removed. + /// + /// # Returns + /// + /// * `Result<(), MetadataError>` - Returns `Ok(())` if the `first_event_at` timestamp is successfully removed, + /// or a `MetadataError` if the stream metadata is not found. + /// + /// # Errors + /// + /// This function will return an error if: + /// * The stream metadata cannot be found. + /// + /// # Examples + /// ```ignore + /// ```rust + /// let result = metadata.remove_first_event_at("my_stream"); + /// match result { + /// Ok(()) => println!("first-event-at removed successfully"), + /// Err(e) => eprintln!("Error removing first-event-at from STREAM_INFO: {}", e), + /// } + /// ``` + pub fn reset_first_event_at(&self, stream_name: &str) -> Result<(), MetadataError> { + let mut map = self.write().expect(LOCK_EXPECT); + map.get_mut(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| { + metadata.first_event_at.take(); }) } diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 16d526c8a..d99094ea0 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -217,6 +217,42 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { Ok(()) } + /// Updates the first event timestamp in the object store for the specified stream. + /// + /// This function retrieves the current object-store format for the given stream, + /// updates the `first_event_at` field with the provided timestamp, and then + /// stores the updated format back in the object store. + /// + /// # Arguments + /// + /// * `stream_name` - The name of the stream to update. + /// * `first_event` - The timestamp of the first event to set. + /// + /// # Returns + /// + /// * `Result<(), ObjectStorageError>` - Returns `Ok(())` if the update is successful, + /// or an `ObjectStorageError` if an error occurs. + /// + /// # Examples + /// ```ignore + /// ```rust + /// let result = object_store.update_first_event_in_stream("my_stream", "2023-01-01T00:00:00Z").await; + /// assert!(result.is_ok()); + /// ``` + async fn update_first_event_in_stream( + &self, + stream_name: &str, + first_event: &str, + ) -> Result<(), ObjectStorageError> { + let mut format = self.get_object_store_format(stream_name).await?; + format.first_event_at = Some(first_event.to_string()); + let format_json = to_bytes(&format); + self.put_object(&stream_json_path(stream_name), format_json) + .await?; + + Ok(()) + } + async fn put_alerts( &self, stream_name: &str, @@ -747,3 +783,27 @@ pub fn ingestor_metadata_path(id: Option<&str>) -> RelativePathBuf { &format!("ingestor.{}.json", INGESTOR_META.get_ingestor_id()), ]) } + +pub async fn get_stream_meta_from_storage( + stream_name: &str, +) -> Result, ObjectStorageError> { + let storage = CONFIG.storage().get_object_store(); + let mut stream_metas = vec![]; + let stream_meta_bytes = storage + .get_objects( + Some(&RelativePathBuf::from_iter([ + stream_name, + STREAM_ROOT_DIRECTORY, + ])), + Box::new(|file_name| file_name.ends_with("stream.json")), + ) + .await; + if let Ok(stream_meta_bytes) = stream_meta_bytes { + for stream_meta in stream_meta_bytes { + let stream_meta_ob = serde_json::from_slice::(&stream_meta)?; + stream_metas.push(stream_meta_ob); + } + } + + Ok(stream_metas) +} diff --git a/src/storage/retention.rs b/src/storage/retention.rs index 24ffbf199..2cbee48c7 100644 --- a/src/storage/retention.rs +++ b/src/storage/retention.rs @@ -218,9 +218,9 @@ mod action { return; } } - if let Ok(first_event_at) = res_remove_manifest { + if let Ok(Some(first_event_at)) = res_remove_manifest { if let Err(err) = - metadata::STREAM_INFO.set_first_event_at(&stream_name, first_event_at) + metadata::STREAM_INFO.set_first_event_at(&stream_name, &first_event_at) { error!( "Failed to update first_event_at in streaminfo for stream {:?} {err:?}",