refactor: only compare names (#1141)
This PR avoids unnecessary allocations by comparing stream names directly: the `LogStream` struct is flattened into a `String` type alias, and stream listings are returned as a `HashSet<String>`.
de-sh authored Jan 31, 2025
1 parent 11a6378 commit 68e12b2
Showing 9 changed files with 44 additions and 67 deletions.
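For context, a minimal standalone sketch of the pattern this commit moves to (the `stream_exists` helper and the sample stream names are hypothetical, not part of the diff): with `LogStream` flattened to a `String` alias and listings returned as `HashSet<LogStream>`, membership checks compare names directly instead of constructing a throwaway `LogStream { name }` value.

```rust
use std::collections::HashSet;

// As in the commit: a "log stream" is now just its owned name.
pub type LogStream = String;

// Hypothetical helper mirroring the call sites touched by this PR,
// e.g. `streams.contains(stream_name)` in logstream_utils.rs.
fn stream_exists(streams: &HashSet<LogStream>, stream_name: &str) -> bool {
    // A HashSet<String> can be probed with a plain &str (via Borrow<str>),
    // so the lookup needs no extra allocation or struct construction.
    streams.contains(stream_name)
}

fn main() {
    let streams: HashSet<LogStream> =
        HashSet::from(["audit".to_string(), "app-logs".to_string()]);

    assert!(stream_exists(&streams, "audit"));
    assert!(!stream_exists(&streams, "metrics"));
}
```

Before this change, the same check had to allocate a wrapper, roughly `streams.contains(&LogStream { name: stream_name.to_owned() })`, which is the pattern the diff below removes.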
5 changes: 3 additions & 2 deletions src/handlers/http/logstream.rs
@@ -49,7 +49,7 @@ use bytes::Bytes;
use chrono::Utc;
use http::{HeaderName, HeaderValue};
use itertools::Itertools;
-use serde_json::Value;
+use serde_json::{json, Value};
use std::collections::HashMap;
use std::fs;
use std::num::NonZeroU32;
@@ -104,9 +104,10 @@ pub async fn list(req: HttpRequest) -> Result<impl Responder, StreamError> {
.filter(|logstream| {
warn!("logstream-\n{logstream:?}");

-Users.authorize(key.clone(), Action::ListStream, Some(&logstream.name), None)
+Users.authorize(key.clone(), Action::ListStream, Some(logstream), None)
== crate::rbac::Response::Authorized
})
+.map(|name| json!({"name": name}))
.collect_vec();

Ok(web::Json(res))
6 changes: 2 additions & 4 deletions src/handlers/http/modal/utils/logstream_utils.rs
@@ -33,7 +33,7 @@ use crate::{
metadata::{self, SchemaVersion, STREAM_INFO},
option::{Mode, CONFIG},
static_schema::{convert_static_schema_to_arrow_schema, StaticSchema},
-storage::{LogStream, ObjectStoreFormat, StreamType},
+storage::{ObjectStoreFormat, StreamType},
validator,
};
use tracing::error;
@@ -454,9 +454,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
// Proceed to create log stream if it doesn't exist
let storage = CONFIG.storage().get_object_store();
let streams = storage.list_streams().await?;
-if streams.contains(&LogStream {
-name: stream_name.to_owned(),
-}) {
+if streams.contains(stream_name) {
let mut stream_metadata = ObjectStoreFormat::default();
let stream_metadata_bytes = storage.create_stream_from_ingestor(stream_name).await?;
if !stream_metadata_bytes.is_empty() {
4 changes: 1 addition & 3 deletions src/handlers/http/query.rs
@@ -191,9 +191,7 @@ pub async fn create_streams_for_querier() {
let querier_streams = STREAM_INFO.list_streams();
let store = CONFIG.storage().get_object_store();
let storage_streams = store.list_streams().await.unwrap();
-for stream in storage_streams {
-let stream_name = stream.name;
-
+for stream_name in storage_streams {
if !querier_streams.contains(&stream_name) {
let _ = create_stream_and_schema_from_storage(&stream_name).await;
}
13 changes: 3 additions & 10 deletions src/migration/mod.rs
@@ -34,7 +34,6 @@ use crate::{
};
use arrow_schema::Schema;
use bytes::Bytes;
-use itertools::Itertools;
use relative_path::RelativePathBuf;
use serde::Serialize;
use serde_json::Value;
@@ -133,9 +132,8 @@ pub async fn run_metadata_migration(
/// run the migration for all streams
pub async fn run_migration(config: &Config) -> anyhow::Result<()> {
let storage = config.storage().get_object_store();
-let streams = storage.list_streams().await?;
-for stream in streams {
-migration_stream(&stream.name, &*storage).await?;
+for stream_name in storage.list_streams().await? {
+migration_stream(&stream_name, &*storage).await?;
}

Ok(())
@@ -357,12 +355,7 @@ async fn run_meta_file_migration(
}

async fn run_stream_files_migration(object_store: &Arc<dyn ObjectStorage>) -> anyhow::Result<()> {
-let streams = object_store
-.list_old_streams()
-.await?
-.into_iter()
-.map(|stream| stream.name)
-.collect_vec();
+let streams = object_store.list_old_streams().await?;

for stream in streams {
let paths = object_store.get_stream_file_paths(&stream).await?;
20 changes: 9 additions & 11 deletions src/storage/azure_blob.rs
@@ -42,7 +42,7 @@ use crate::metrics::storage::azureblob::REQUEST_RESPONSE_TIME;
use crate::metrics::storage::StorageMetrics;
use object_store::limit::LimitStore;
use object_store::path::Path as StorePath;
-use std::collections::{BTreeMap, HashMap};
+use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};

@@ -266,8 +266,8 @@ impl BlobStore {
Ok(())
}

-async fn _list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
-let mut result_file_list: Vec<LogStream> = Vec::new();
+async fn _list_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError> {
+let mut result_file_list = HashSet::new();
let resp = self.client.list_with_delimiter(None).await?;

let streams = resp
@@ -287,7 +287,7 @@ impl BlobStore {
.iter()
.any(|name| name.location.filename().unwrap().ends_with("stream.json"))
{
-result_file_list.push(LogStream { name: stream });
+result_file_list.insert(stream);
}
}

@@ -573,19 +573,17 @@ impl ObjectStorage for BlobStore {
}
}

-async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
-let streams = self._list_streams().await?;
-
-Ok(streams)
+async fn list_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError> {
+self._list_streams().await
}

-async fn list_old_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
+async fn list_old_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError> {
let resp = self.client.list_with_delimiter(None).await?;

let common_prefixes = resp.common_prefixes; // get all dirs

// return prefixes at the root level
-let dirs: Vec<_> = common_prefixes
+let dirs: HashSet<_> = common_prefixes
.iter()
.filter_map(|path| path.parts().next())
.map(|name| name.as_ref().to_string())
@@ -602,7 +600,7 @@

stream_json_check.try_collect::<()>().await?;

-Ok(dirs.into_iter().map(|name| LogStream { name }).collect())
+Ok(dirs.into_iter().collect())
}

async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError> {
18 changes: 5 additions & 13 deletions src/storage/localfs.rs
@@ -17,7 +17,7 @@
*/

use std::{
-collections::{BTreeMap, HashMap},
+collections::{BTreeMap, HashMap, HashSet},
path::{Path, PathBuf},
sync::Arc,
time::Instant,
@@ -295,7 +295,7 @@ impl ObjectStorage for LocalFS {
Ok(fs::remove_file(path).await?)
}

-async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
+async fn list_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError> {
let ignore_dir = &[
"lost+found",
PARSEABLE_ROOT_DIRECTORY,
@@ -311,16 +311,12 @@
let logstream_dirs: Vec<Option<String>> =
FuturesUnordered::from_iter(entries).try_collect().await?;

-let logstreams = logstream_dirs
-.into_iter()
-.flatten()
-.map(|name| LogStream { name })
-.collect();
+let logstreams = logstream_dirs.into_iter().flatten().collect();

Ok(logstreams)
}

-async fn list_old_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
+async fn list_old_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError> {
let ignore_dir = &[
"lost+found",
PARSEABLE_ROOT_DIRECTORY,
@@ -335,11 +331,7 @@
let logstream_dirs: Vec<Option<String>> =
FuturesUnordered::from_iter(entries).try_collect().await?;

-let logstreams = logstream_dirs
-.into_iter()
-.flatten()
-.map(|name| LogStream { name })
-.collect();
+let logstreams = logstream_dirs.into_iter().flatten().collect();

Ok(logstreams)
}
9 changes: 4 additions & 5 deletions src/storage/mod.rs
@@ -48,6 +48,10 @@ pub use store_metadata::{
put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata,
};

+/// Name of a Stream
+/// NOTE: this used to be a struct, flattened out for simplicity
+pub type LogStream = String;
+
// metadata file names in a Stream prefix
pub const STREAM_METADATA_FILE_NAME: &str = ".stream.json";
pub const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json";
@@ -225,11 +229,6 @@ impl Default for ObjectStoreFormat {
}
}

-#[derive(serde::Serialize, PartialEq, Debug)]
-pub struct LogStream {
-pub name: String,
-}
-
#[derive(Debug, thiserror::Error)]
pub enum ObjectStorageError {
// no such key inside the object storage
11 changes: 5 additions & 6 deletions src/storage/object_storage.rs
@@ -17,12 +17,11 @@
*/

use super::{
-retention::Retention, staging::convert_disk_files_to_parquet, LogStream, ObjectStorageError,
+retention::Retention, staging::convert_disk_files_to_parquet, ObjectStorageError,
ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata,
};
use super::{
-Owner, StreamType, ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME,
-PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
+LogStream, Owner, StreamType, ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY
};

use crate::alerts::AlertConfig;
@@ -52,7 +51,7 @@ use relative_path::RelativePathBuf;
use tracing::{error, warn};
use ulid::Ulid;

-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashSet};
use std::fmt::Debug;
use std::num::NonZeroU32;
use std::{
@@ -92,8 +91,8 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
async fn delete_prefix(&self, path: &RelativePath) -> Result<(), ObjectStorageError>;
async fn check(&self) -> Result<(), ObjectStorageError>;
async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>;
-async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
-async fn list_old_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
+async fn list_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError>;
+async fn list_old_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError>;
async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError>;
async fn get_all_saved_filters(
&self,
25 changes: 12 additions & 13 deletions src/storage/s3.rs
@@ -32,7 +32,7 @@ use object_store::{BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryC
use relative_path::{RelativePath, RelativePathBuf};
use tracing::{error, info};

-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashSet};
use std::fmt::Display;
use std::iter::Iterator;
use std::path::Path as StdPath;
@@ -43,11 +43,12 @@ use std::time::{Duration, Instant};
use super::metrics_layer::MetricLayer;
use super::object_storage::parseable_json_path;
use super::{
-ObjectStorageProvider, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
+LogStream, ObjectStorageProvider, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME,
+STREAM_ROOT_DIRECTORY,
};
use crate::handlers::http::users::USERS_ROOT_DIR;
use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
-use crate::storage::{LogStream, ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY};
+use crate::storage::{ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY};
use std::collections::HashMap;

// in bytes
@@ -402,8 +403,8 @@ impl S3 {
Ok(())
}

-async fn _list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
-let mut result_file_list: Vec<LogStream> = Vec::new();
+async fn _list_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError> {
+let mut result_file_list = HashSet::new();
let resp = self.client.list_with_delimiter(None).await?;

let streams = resp
@@ -423,7 +424,7 @@
.iter()
.any(|name| name.location.filename().unwrap().ends_with("stream.json"))
{
-result_file_list.push(LogStream { name: stream });
+result_file_list.insert(stream);
}
}

@@ -709,19 +710,17 @@ impl ObjectStorage for S3 {
}
}

-async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
-let streams = self._list_streams().await?;
-
-Ok(streams)
+async fn list_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError> {
+self._list_streams().await
}

-async fn list_old_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
+async fn list_old_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError> {
let resp = self.client.list_with_delimiter(None).await?;

let common_prefixes = resp.common_prefixes; // get all dirs

// return prefixes at the root level
-let dirs: Vec<_> = common_prefixes
+let dirs: HashSet<_> = common_prefixes
.iter()
.filter_map(|path| path.parts().next())
.map(|name| name.as_ref().to_string())
@@ -738,7 +737,7 @@

stream_json_check.try_collect::<()>().await?;

-Ok(dirs.into_iter().map(|name| LogStream { name }).collect())
+Ok(dirs)
}

async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError> {
