Skip to content

Commit

Permalink
fix: fetch first-event-at from storage
Browse files Browse the repository at this point in the history
current: the query server fetches first-event-at from all the live ingestors,
but this logic fails when the ingestor that ingested the events for a stream
is no longer reachable; in that case the query server gets `None`

change: fetch first-event-at from all the stream JSON files in storage,
find the earliest value, and update it in the server's in-memory map
  • Loading branch information
nikhilsinhaparseable committed Jan 24, 2025
1 parent 5a39e22 commit c26b6b1
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 25 deletions.
61 changes: 58 additions & 3 deletions src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::option::{Mode, CONFIG};
use crate::stats::{
event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats,
};
use crate::storage::object_storage::get_stream_meta_from_storage;
use crate::{
catalog::manifest::Manifest,
event::DEFAULT_TIMESTAMP_KEY,
Expand Down Expand Up @@ -279,7 +280,9 @@ async fn create_manifest(
}
};
first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339());
if let Err(err) = STREAM_INFO.set_first_event_at(stream_name, first_event_at.clone()) {
if let Err(err) =
STREAM_INFO.set_first_event_at(stream_name, first_event_at.as_ref().unwrap())
{
error!(
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
stream_name
Expand Down Expand Up @@ -330,8 +333,8 @@ pub async fn remove_manifest_from_snapshot(
let manifests = &mut meta.snapshot.manifest_list;
// Filter out items whose manifest_path contains any of the dates_to_delete
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
STREAM_INFO.reset_first_event_at(stream_name)?;
meta.first_event_at = None;
STREAM_INFO.set_first_event_at(stream_name, None)?;
storage.put_snapshot(stream_name, meta.snapshot).await?;
}
match CONFIG.options.mode {
Expand Down Expand Up @@ -391,7 +394,7 @@ pub async fn get_first_event(
first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
meta.first_event_at = Some(first_event_at.clone());
storage.put_stream_manifest(stream_name, &meta).await?;
STREAM_INFO.set_first_event_at(stream_name, Some(first_event_at.clone()))?;
STREAM_INFO.set_first_event_at(stream_name, &first_event_at)?;
}
}
}
Expand Down Expand Up @@ -432,6 +435,58 @@ pub async fn get_first_event(
Ok(Some(first_event_at))
}

/// Retrieves the earliest first-event-at from the storage for the specified stream.
///
/// This function fetches the object-store format from all the stream.json files for the given stream from the storage,
/// extracts the `first_event_at` timestamps, and returns the earliest `first_event_at`.
///
/// # Arguments
///
/// * `stream_name` - The name of the stream for which `first_event_at` is to be retrieved.
///
/// # Returns
///
/// * `Result<Option<String>, ObjectStorageError>` - Returns `Ok(Some(String))` with the earliest
/// first event timestamp if found, `Ok(None)` if no timestamps are found, or an `ObjectStorageError`
/// if an error occurs.
///
/// # Errors
///
/// This function will return an error if:
/// * The stream metadata cannot be retrieved from the storage.
///
/// # Examples
/// ```ignore
/// ```rust
/// let result = get_first_event_from_storage("my_stream").await;
/// match result {
/// Ok(Some(first_event)) => println!("first-event-at: {}", first_event),
/// Ok(None) => println!("first-event-at not found"),
/// Err(e) => eprintln!("Error retrieving first-event-at from storage: {}", e),
/// }
/// ```
pub async fn get_first_event_from_storage(
stream_name: &str,
) -> Result<Option<String>, ObjectStorageError> {
let mut all_first_events = vec![];
let stream_metas = get_stream_meta_from_storage(stream_name).await;
if let Ok(stream_metas) = stream_metas {
for stream_meta in stream_metas.iter() {
if let Some(first_event) = &stream_meta.first_event_at {
let first_event = DateTime::parse_from_rfc3339(first_event).unwrap();
let first_event = first_event.with_timezone(&Utc);
all_first_events.push(first_event);
}
}
}

if all_first_events.is_empty() {
return Ok(None);
}
let first_event_at = all_first_events.iter().min().unwrap().to_rfc3339();
Ok(Some(first_event_at))
}

/// Partition the path to which this manifest belongs.
/// Useful when uploading the manifest file.
pub fn partition_path(
Expand Down
32 changes: 14 additions & 18 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, St
use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME};
use super::ingest::create_stream_if_not_exists;
use super::modal::utils::logstream_utils::{
create_stream_and_schema_from_storage, create_update_stream,
create_stream_and_schema_from_storage, create_update_stream, update_first_event_at,
};
use super::query::update_schema_when_distributed;
use crate::alerts::Alerts;
use crate::catalog::get_first_event;
use crate::catalog::get_first_event_from_storage;
use crate::event::format::{override_data_type, LogSource};
use crate::handlers::STREAM_TYPE_KEY;
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
Expand Down Expand Up @@ -57,7 +57,7 @@ use std::fs;
use std::num::NonZeroU32;
use std::str::FromStr;
use std::sync::Arc;
use tracing::{error, warn};
use tracing::warn;

pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();
Expand Down Expand Up @@ -551,18 +551,16 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
}
}

let store = CONFIG.storage().get_object_store();
let dates: Vec<String> = Vec::new();
if let Ok(Some(first_event_at)) = get_first_event(store, &stream_name, dates).await {
if let Err(err) =
metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at))
{
error!(
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
stream_name
);
}
}
// if first_event_at is not found in memory map, check if it exists in the storage
// if it exists in the storage, update the first_event_at in memory map
let stream_first_event_at =
if let Ok(Some(first_event_at)) = STREAM_INFO.get_first_event(&stream_name) {
Some(first_event_at)
} else if let Ok(Some(first_event_at)) = get_first_event_from_storage(&stream_name).await {
update_first_event_at(&stream_name, &first_event_at).await
} else {
None
};

let hash_map = STREAM_INFO.read().unwrap();
let stream_meta = &hash_map
Expand All @@ -572,7 +570,7 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
let stream_info: StreamInfo = StreamInfo {
stream_type: stream_meta.stream_type.clone(),
created_at: stream_meta.created_at.clone(),
first_event_at: stream_meta.first_event_at.clone(),
first_event_at: stream_first_event_at,
time_partition: stream_meta.time_partition.clone(),
time_partition_limit: stream_meta
.time_partition_limit
Expand All @@ -582,8 +580,6 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
log_source: stream_meta.log_source.clone(),
};

// get the other info from

Ok((web::Json(stream_info), StatusCode::OK))
}

Expand Down
54 changes: 54 additions & 0 deletions src/handlers/http/modal/utils/logstream_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::{
storage::{LogStream, ObjectStoreFormat, StreamType},
validator,
};
use tracing::error;

pub async fn create_update_stream(
headers: &HeaderMap,
Expand Down Expand Up @@ -508,3 +509,56 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<

Ok(true)
}

/// Writes `first_event_at` for a stream to the object store and to the in-memory stream info.
///
/// Both updates are best-effort: the object store is updated first, then
/// `metadata::STREAM_INFO`. A failure in either step is logged with `error!` and does not
/// stop the other step or change the return value.
///
/// # Arguments
///
/// * `stream_name` - The name of the stream to update.
/// * `first_event_at` - The first-event-at value to record.
///
/// # Returns
///
/// * `Option<String>` - Always `Some` containing an owned copy of `first_event_at`,
///   regardless of whether the two updates succeeded.
///
/// # Examples
/// ```rust,ignore
/// use parseable::handlers::http::modal::utils::logstream_utils::update_first_event_at;
/// let result = update_first_event_at("my_stream", "2023-01-01T00:00:00Z").await;
/// if let Some(timestamp) = result {
///     println!("first-event-at: {}", timestamp);
/// }
/// ```
pub async fn update_first_event_at(stream_name: &str, first_event_at: &str) -> Option<String> {
    let object_store = CONFIG.storage().get_object_store();

    // Step 1: persist the value in the stream's metadata in object storage.
    let persisted = object_store
        .update_first_event_in_stream(stream_name, first_event_at)
        .await;
    if let Err(err) = persisted {
        error!(
            "Failed to update first_event_at in storage for stream {:?}: {err:?}",
            stream_name
        );
    }

    // Step 2: mirror the value into the in-memory stream info map.
    let cached = metadata::STREAM_INFO.set_first_event_at(stream_name, first_event_at);
    if let Err(err) = cached {
        error!(
            "Failed to update first_event_at in stream info for stream {:?}: {err:?}",
            stream_name
        );
    }

    Some(first_event_at.to_owned())
}
41 changes: 39 additions & 2 deletions src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,50 @@ impl StreamInfo {
pub fn set_first_event_at(
&self,
stream_name: &str,
first_event_at: Option<String>,
first_event_at: &str,
) -> Result<(), MetadataError> {
let mut map = self.write().expect(LOCK_EXPECT);
map.get_mut(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
.map(|metadata| {
metadata.first_event_at = first_event_at;
metadata.first_event_at = Some(first_event_at.to_owned());
})
}

/// Clears the `first_event_at` timestamp held in memory for the specified stream.
///
/// Takes the write lock on the stream map, looks up the metadata entry for
/// `stream_name`, and sets its `first_event_at` to `None`.
///
/// # Arguments
///
/// * `stream_name` - The name of the stream whose `first_event_at` should be cleared.
///
/// # Returns
///
/// * `Result<(), MetadataError>` - `Ok(())` once the entry has been cleared (whether or
///   not a value was previously set).
///
/// # Errors
///
/// Returns `MetadataError::StreamMetaNotFound` when no metadata exists for `stream_name`.
///
/// # Examples
/// ```rust,ignore
/// let result = metadata.reset_first_event_at("my_stream");
/// match result {
///     Ok(()) => println!("first-event-at removed successfully"),
///     Err(e) => eprintln!("Error removing first-event-at from STREAM_INFO: {}", e),
/// }
/// ```
pub fn reset_first_event_at(&self, stream_name: &str) -> Result<(), MetadataError> {
    let mut streams = self.write().expect(LOCK_EXPECT);
    match streams.get_mut(stream_name) {
        Some(stream_meta) => {
            stream_meta.first_event_at = None;
            Ok(())
        }
        None => Err(MetadataError::StreamMetaNotFound(
            stream_name.to_string(),
        )),
    }
}

Expand Down
60 changes: 60 additions & 0 deletions src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,42 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
Ok(())
}

/// Records the first-event timestamp in the stored metadata of the specified stream.
///
/// The stream's current object-store format is loaded, its `first_event_at` field is
/// overwritten with `first_event`, and the serialized result is written to the path
/// given by `stream_json_path(stream_name)`.
///
/// # Arguments
///
/// * `stream_name` - The name of the stream to update.
/// * `first_event` - The timestamp of the first event to set.
///
/// # Returns
///
/// * `Result<(), ObjectStorageError>` - `Ok(())` when the updated format has been written.
///
/// # Errors
///
/// Propagates any error from loading the current format or from writing the object.
///
/// # Examples
/// ```rust,ignore
/// let result = object_store.update_first_event_in_stream("my_stream", "2023-01-01T00:00:00Z").await;
/// assert!(result.is_ok());
/// ```
async fn update_first_event_in_stream(
    &self,
    stream_name: &str,
    first_event: &str,
) -> Result<(), ObjectStorageError> {
    // Read-modify-write: start from what is currently stored for this stream.
    let mut stream_format = self.get_object_store_format(stream_name).await?;
    stream_format.first_event_at = Some(first_event.to_owned());

    let payload = to_bytes(&stream_format);
    let target_path = stream_json_path(stream_name);
    self.put_object(&target_path, payload).await?;

    Ok(())
}

async fn put_alerts(
&self,
stream_name: &str,
Expand Down Expand Up @@ -747,3 +783,27 @@ pub fn ingestor_metadata_path(id: Option<&str>) -> RelativePathBuf {
&format!("ingestor.{}.json", INGESTOR_META.get_ingestor_id()),
])
}

/// Loads every stream metadata object stored for `stream_name`.
///
/// Objects under `<stream_name>/<STREAM_ROOT_DIRECTORY>` whose file name ends with
/// `stream.json` are fetched and each one is deserialized into an `ObjectStoreFormat`.
///
/// # Returns
///
/// The deserialized metadata objects. If fetching the objects from storage fails, the
/// failure is not reported and an empty vector is returned.
///
/// # Errors
///
/// Returns an error when any fetched object cannot be deserialized as `ObjectStoreFormat`.
pub async fn get_stream_meta_from_storage(
    stream_name: &str,
) -> Result<Vec<ObjectStoreFormat>, ObjectStorageError> {
    let object_store = CONFIG.storage().get_object_store();
    let stream_dir = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]);

    let fetched = object_store
        .get_objects(
            Some(&stream_dir),
            Box::new(|file_name| file_name.ends_with("stream.json")),
        )
        .await;

    // Best-effort fetch: a storage failure is treated the same as "nothing stored".
    let raw_metas = match fetched {
        Ok(raw_metas) => raw_metas,
        Err(_) => return Ok(vec![]),
    };

    // A single undecodable object aborts the whole call with its decode error.
    let parsed_metas = raw_metas
        .into_iter()
        .map(|raw_meta| serde_json::from_slice::<ObjectStoreFormat>(&raw_meta))
        .collect::<Result<Vec<_>, _>>()?;

    Ok(parsed_metas)
}
4 changes: 2 additions & 2 deletions src/storage/retention.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,9 @@ mod action {
return;
}
}
if let Ok(first_event_at) = res_remove_manifest {
if let Ok(Some(first_event_at)) = res_remove_manifest {
if let Err(err) =
metadata::STREAM_INFO.set_first_event_at(&stream_name, first_event_at)
metadata::STREAM_INFO.set_first_event_at(&stream_name, &first_event_at)
{
error!(
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
Expand Down

0 comments on commit c26b6b1

Please sign in to comment.