refactor: only compare names #1141

Merged · 8 commits · Jan 31, 2025
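At a glance, this PR flattens the one-field `LogStream` wrapper struct into a plain `String` alias and switches the storage backends' listing methods from `Vec<LogStream>` to `HashSet<LogStream>`, so callers compare names directly. A minimal sketch of the resulting shape (the trait below is a simplified stand-in for `ObjectStorage`, with async and the error type elided):

```rust
use std::collections::HashSet;

/// Name of a stream; before this PR it was `struct LogStream { name: String }`.
pub type LogStream = String;

/// Simplified stand-in for the listing methods touched in this diff.
trait StreamListing {
    fn list_streams(&self) -> HashSet<LogStream>;
}

struct Dummy;

impl StreamListing for Dummy {
    fn list_streams(&self) -> HashSet<LogStream> {
        ["app_logs".to_string()].into_iter().collect()
    }
}

fn main() {
    // Membership checks now work on names; no wrapper struct to construct.
    assert!(Dummy.list_streams().contains("app_logs"));
}
```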
src/handlers/http/logstream.rs: 3 additions & 2 deletions

@@ -49,7 +49,7 @@
 use bytes::Bytes;
 use chrono::Utc;
 use http::{HeaderName, HeaderValue};
 use itertools::Itertools;
-use serde_json::Value;
+use serde_json::{json, Value};
 use std::collections::HashMap;
 use std::fs;
 use std::num::NonZeroU32;
@@ -104,9 +104,10 @@ pub async fn list(req: HttpRequest) -> Result<impl Responder, StreamError> {
         .filter(|logstream| {
             warn!("logstream-\n{logstream:?}");

-            Users.authorize(key.clone(), Action::ListStream, Some(&logstream.name), None)
+            Users.authorize(key.clone(), Action::ListStream, Some(logstream), None)
                 == crate::rbac::Response::Authorized
         })
+        .map(|name| json!({"name": name}))
        .collect_vec();

     Ok(web::Json(res))
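The new `.map(|name| json!({"name": name}))` step matters for wire compatibility: the old `LogStream` struct derived `Serialize` (see the src/storage/mod.rs hunk below), so this endpoint returned `[{"name": ...}]`. With plain strings, that wrapping must now be done by hand. A small sketch of the shape difference, using made-up stream names:

```rust
use serde_json::json;

fn main() {
    // Hypothetical stream names standing in for the authorized results.
    let names = vec!["app_logs".to_string(), "audit".to_string()];

    // A bare Vec<String> would serialize as ["app_logs","audit"], changing the API.
    // Wrapping each name reproduces the old struct's shape: [{"name":"app_logs"},...].
    let res: Vec<_> = names.into_iter().map(|name| json!({ "name": name })).collect();

    assert_eq!(
        serde_json::to_string(&res).unwrap(),
        r#"[{"name":"app_logs"},{"name":"audit"}]"#
    );
}
```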
src/handlers/http/modal/utils/logstream_utils.rs: 2 additions & 4 deletions

@@ -33,7 +33,7 @@ use crate::{
     metadata::{self, SchemaVersion, STREAM_INFO},
     option::{Mode, CONFIG},
     static_schema::{convert_static_schema_to_arrow_schema, StaticSchema},
-    storage::{LogStream, ObjectStoreFormat, StreamType},
+    storage::{ObjectStoreFormat, StreamType},
     validator,
 };
 use tracing::error;
@@ -454,9 +454,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
     // Proceed to create log stream if it doesn't exist
     let storage = CONFIG.storage().get_object_store();
     let streams = storage.list_streams().await?;
-    if streams.contains(&LogStream {
-        name: stream_name.to_owned(),
-    }) {
+    if streams.contains(stream_name) {
         let mut stream_metadata = ObjectStoreFormat::default();
         let stream_metadata_bytes = storage.create_stream_from_ingestor(stream_name).await?;
         if !stream_metadata_bytes.is_empty() {
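Worth noting why `streams.contains(stream_name)` compiles without any allocation: `HashSet<String>::contains` accepts any `&Q` where `String: Borrow<Q>`, and `String: Borrow<str>` holds in the standard library, so a `&str` works directly. A quick illustration:

```rust
use std::collections::HashSet;

fn main() {
    let streams: HashSet<String> = ["app_logs".to_string()].into_iter().collect();

    // No temporary String (or LogStream wrapper) is built for the lookup;
    // the &str is hashed and compared against the owned keys in place.
    let stream_name: &str = "app_logs";
    assert!(streams.contains(stream_name));
}
```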
src/handlers/http/query.rs: 1 addition & 3 deletions

@@ -191,9 +191,7 @@ pub async fn create_streams_for_querier() {
     let querier_streams = STREAM_INFO.list_streams();
     let store = CONFIG.storage().get_object_store();
     let storage_streams = store.list_streams().await.unwrap();
-    for stream in storage_streams {
-        let stream_name = stream.name;
-
+    for stream_name in storage_streams {
         if !querier_streams.contains(&stream_name) {
             let _ = create_stream_and_schema_from_storage(&stream_name).await;
         }
src/migration/mod.rs: 3 additions & 10 deletions

@@ -34,7 +34,6 @@ use crate::{
 };
 use arrow_schema::Schema;
 use bytes::Bytes;
-use itertools::Itertools;
 use relative_path::RelativePathBuf;
 use serde::Serialize;
 use serde_json::Value;
@@ -133,9 +132,8 @@
 /// run the migration for all streams
 pub async fn run_migration(config: &Config) -> anyhow::Result<()> {
     let storage = config.storage().get_object_store();
-    let streams = storage.list_streams().await?;
-    for stream in streams {
-        migration_stream(&stream.name, &*storage).await?;
+    for stream_name in storage.list_streams().await? {
+        migration_stream(&stream_name, &*storage).await?;
     }

     Ok(())
@@ -357,12 +355,7 @@ async fn run_meta_file_migration(
 }

 async fn run_stream_files_migration(object_store: &Arc<dyn ObjectStorage>) -> anyhow::Result<()> {
-    let streams = object_store
-        .list_old_streams()
-        .await?
-        .into_iter()
-        .map(|stream| stream.name)
-        .collect_vec();
+    let streams = object_store.list_old_streams().await?;

     for stream in streams {
         let paths = object_store.get_stream_file_paths(&stream).await?;
src/storage/azure_blob.rs: 9 additions & 11 deletions

@@ -42,7 +42,7 @@
 use crate::metrics::storage::azureblob::REQUEST_RESPONSE_TIME;
 use crate::metrics::storage::StorageMetrics;
 use object_store::limit::LimitStore;
 use object_store::path::Path as StorePath;
-use std::collections::{BTreeMap, HashMap};
+use std::collections::{BTreeMap, HashMap, HashSet};
 use std::sync::Arc;
 use std::time::{Duration, Instant};

@@ -266,8 +266,8 @@ impl BlobStore {
         Ok(())
     }

-    async fn _list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
-        let mut result_file_list: Vec<LogStream> = Vec::new();
+    async fn _list_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError> {
+        let mut result_file_list = HashSet::new();
         let resp = self.client.list_with_delimiter(None).await?;

         let streams = resp
@@ -287,7 +287,7 @@
             .iter()
             .any(|name| name.location.filename().unwrap().ends_with("stream.json"))
         {
-            result_file_list.push(LogStream { name: stream });
+            result_file_list.insert(stream);
         }
     }

@@ -573,19 +573,17 @@
         }
     }

-    async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
-        let streams = self._list_streams().await?;
-
-        Ok(streams)
+    async fn list_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError> {
+        self._list_streams().await
     }

-    async fn list_old_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
+    async fn list_old_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError> {
         let resp = self.client.list_with_delimiter(None).await?;

         let common_prefixes = resp.common_prefixes; // get all dirs

         // return prefixes at the root level
-        let dirs: Vec<_> = common_prefixes
+        let dirs: HashSet<_> = common_prefixes
             .iter()
             .filter_map(|path| path.parts().next())
             .map(|name| name.as_ref().to_string())
@@ -602,7 +600,7 @@

         stream_json_check.try_collect::<()>().await?;

-        Ok(dirs.into_iter().map(|name| LogStream { name }).collect())
+        Ok(dirs.into_iter().collect())
     }

     async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError> {
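One behavioral nuance of moving from `Vec::push` to `HashSet::insert`: a stream name that shows up more than once in a listing is now recorded only once, since `insert` returns `false` and leaves the set unchanged on duplicates. A tiny demonstration:

```rust
use std::collections::HashSet;

fn main() {
    let mut result_file_list: HashSet<String> = HashSet::new();

    // First insert succeeds; the duplicate is silently collapsed.
    assert!(result_file_list.insert("app_logs".to_string()));
    assert!(!result_file_list.insert("app_logs".to_string()));
    assert_eq!(result_file_list.len(), 1);
}
```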
src/storage/localfs.rs: 5 additions & 13 deletions

@@ -17,7 +17,7 @@
 */

 use std::{
-    collections::{BTreeMap, HashMap},
+    collections::{BTreeMap, HashMap, HashSet},
     path::{Path, PathBuf},
     sync::Arc,
     time::Instant,
@@ -295,7 +295,7 @@ impl ObjectStorage for LocalFS {
         Ok(fs::remove_file(path).await?)
     }

-    async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
+    async fn list_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError> {
         let ignore_dir = &[
             "lost+found",
             PARSEABLE_ROOT_DIRECTORY,
@@ -311,16 +311,12 @@
         let logstream_dirs: Vec<Option<String>> =
             FuturesUnordered::from_iter(entries).try_collect().await?;

-        let logstreams = logstream_dirs
-            .into_iter()
-            .flatten()
-            .map(|name| LogStream { name })
-            .collect();
+        let logstreams = logstream_dirs.into_iter().flatten().collect();

         Ok(logstreams)
     }

-    async fn list_old_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
+    async fn list_old_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError> {
         let ignore_dir = &[
             "lost+found",
             PARSEABLE_ROOT_DIRECTORY,
@@ -335,11 +331,7 @@
         let logstream_dirs: Vec<Option<String>> =
             FuturesUnordered::from_iter(entries).try_collect().await?;

-        let logstreams = logstream_dirs
-            .into_iter()
-            .flatten()
-            .map(|name| LogStream { name })
-            .collect();
+        let logstreams = logstream_dirs.into_iter().flatten().collect();

         Ok(logstreams)
     }
src/storage/mod.rs: 4 additions & 5 deletions

@@ -48,6 +48,10 @@ pub use store_metadata::{
     put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata,
 };

+/// Name of a Stream
+/// NOTE: this used to be a struct, flattened out for simplicity
+pub type LogStream = String;
+
 // metadata file names in a Stream prefix
 pub const STREAM_METADATA_FILE_NAME: &str = ".stream.json";
 pub const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json";
@@ -225,11 +229,6 @@
     }
 }

-#[derive(serde::Serialize, PartialEq, Debug)]
-pub struct LogStream {
-    pub name: String,
-}
-
 #[derive(Debug, thiserror::Error)]
 pub enum ObjectStorageError {
     // no such key inside the object storage
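For context, this hunk is the heart of the PR: the struct carried a single field and existed mostly so listings could be compared, which forced callers to build throwaway wrappers. With the alias, `String`'s existing `Hash`/`Eq` impls drive set membership for free. A before/after sketch (the `Old` prefix is only for this example, and the original's `serde::Serialize` derive is omitted to keep the sketch dependency-free):

```rust
use std::collections::HashSet;

// Before: a one-field wrapper; comparing meant constructing a whole struct,
// e.g. `streams.contains(&OldLogStream { name: stream_name.to_owned() })`.
#[derive(PartialEq, Debug)]
pub struct OldLogStream {
    pub name: String,
}

// After: a plain alias, so hashing, equality, and serialization come for free.
pub type LogStream = String;

fn main() {
    let streams: HashSet<LogStream> =
        ["a".to_string(), "a".to_string(), "b".to_string()]
            .into_iter()
            .collect();
    assert_eq!(streams.len(), 2); // duplicates collapse in a set
}
```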
src/storage/object_storage.rs: 5 additions & 6 deletions

@@ -17,12 +17,11 @@
 */

 use super::{
-    retention::Retention, staging::convert_disk_files_to_parquet, LogStream, ObjectStorageError,
+    retention::Retention, staging::convert_disk_files_to_parquet, ObjectStorageError,
     ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata,
 };
 use super::{
-    Owner, StreamType, ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME,
-    PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
+    LogStream, Owner, StreamType, ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY
 };

 use crate::alerts::AlertConfig;
@@ -52,7 +51,7 @@
 use relative_path::RelativePathBuf;
 use tracing::{error, warn};
 use ulid::Ulid;

-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashSet};
 use std::fmt::Debug;
 use std::num::NonZeroU32;
 use std::{
@@ -92,8 +91,8 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
     async fn delete_prefix(&self, path: &RelativePath) -> Result<(), ObjectStorageError>;
     async fn check(&self) -> Result<(), ObjectStorageError>;
     async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>;
-    async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
-    async fn list_old_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
+    async fn list_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError>;
+    async fn list_old_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError>;
     async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError>;
     async fn get_all_saved_filters(
         &self,
src/storage/s3.rs: 12 additions & 13 deletions

@@ -32,7 +32,7 @@
 use object_store::{BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig};
 use relative_path::{RelativePath, RelativePathBuf};
 use tracing::{error, info};

-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashSet};
 use std::fmt::Display;
 use std::iter::Iterator;
 use std::path::Path as StdPath;
@@ -43,11 +43,12 @@
 use super::metrics_layer::MetricLayer;
 use super::object_storage::parseable_json_path;
 use super::{
-    ObjectStorageProvider, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
+    LogStream, ObjectStorageProvider, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME,
+    STREAM_ROOT_DIRECTORY,
 };
 use crate::handlers::http::users::USERS_ROOT_DIR;
 use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
-use crate::storage::{LogStream, ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY};
+use crate::storage::{ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY};
 use std::collections::HashMap;

 // in bytes
@@ -402,8 +403,8 @@ impl S3 {
         Ok(())
     }

-    async fn _list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
-        let mut result_file_list: Vec<LogStream> = Vec::new();
+    async fn _list_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError> {
+        let mut result_file_list = HashSet::new();
         let resp = self.client.list_with_delimiter(None).await?;

         let streams = resp
@@ -423,7 +424,7 @@
             .iter()
             .any(|name| name.location.filename().unwrap().ends_with("stream.json"))
         {
-            result_file_list.push(LogStream { name: stream });
+            result_file_list.insert(stream);
         }
     }

@@ -709,19 +710,17 @@ impl ObjectStorage for S3 {
         }
     }

-    async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
-        let streams = self._list_streams().await?;
-
-        Ok(streams)
+    async fn list_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError> {
+        self._list_streams().await
     }

-    async fn list_old_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
+    async fn list_old_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError> {
         let resp = self.client.list_with_delimiter(None).await?;

         let common_prefixes = resp.common_prefixes; // get all dirs

         // return prefixes at the root level
-        let dirs: Vec<_> = common_prefixes
+        let dirs: HashSet<_> = common_prefixes
             .iter()
             .filter_map(|path| path.parts().next())
             .map(|name| name.as_ref().to_string())
@@ -738,7 +737,7 @@

         stream_json_check.try_collect::<()>().await?;

-        Ok(dirs.into_iter().map(|name| LogStream { name }).collect())
+        Ok(dirs)
     }

     async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError> {