Skip to content

Commit

Permalink
Merge branch 'main' into to-file
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh authored Jan 29, 2025
2 parents 43fa624 + 5294779 commit 07b9aa4
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 21 deletions.
14 changes: 10 additions & 4 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,8 +569,8 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
.get(&stream_name)
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;

let stream_info: StreamInfo = StreamInfo {
stream_type: stream_meta.stream_type.clone(),
let stream_info = StreamInfo {
stream_type: stream_meta.stream_type,
created_at: stream_meta.created_at.clone(),
first_event_at: stream_first_event_at,
time_partition: stream_meta.time_partition.clone(),
Expand Down Expand Up @@ -604,7 +604,10 @@ pub async fn put_stream_hot_tier(
}
}

if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) {
if STREAM_INFO
.stream_type(&stream_name)
.is_ok_and(|t| t == StreamType::Internal)
{
return Err(StreamError::Custom {
msg: "Hot tier can not be updated for internal stream".to_string(),
status: StatusCode::BAD_REQUEST,
Expand Down Expand Up @@ -687,7 +690,10 @@ pub async fn delete_stream_hot_tier(
return Err(StreamError::HotTierNotEnabled(stream_name));
};

if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) {
if STREAM_INFO
.stream_type(&stream_name)
.is_ok_and(|t| t == StreamType::Internal)
{
return Err(StreamError::Custom {
msg: "Hot tier can not be deleted for internal stream".to_string(),
status: StatusCode::BAD_REQUEST,
Expand Down
5 changes: 3 additions & 2 deletions src/handlers/http/modal/query/querier_logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,9 @@ pub async fn get_stats(
let stats = stats::get_current_stats(&stream_name, "json")
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;

let ingestor_stats = if STREAM_INFO.stream_type(&stream_name).unwrap()
== Some(StreamType::UserDefined.to_string())
let ingestor_stats = if STREAM_INFO
.stream_type(&stream_name)
.is_ok_and(|t| t == StreamType::Internal)
{
Some(fetch_stats_from_ingestors(&stream_name).await?)
} else {
Expand Down
5 changes: 1 addition & 4 deletions src/handlers/http/modal/utils/logstream_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,10 +485,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
.and_then(|limit| limit.parse().ok());
let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or("");
let static_schema_flag = stream_metadata.static_schema_flag;
let stream_type = stream_metadata
.stream_type
.map(|s| StreamType::from(s.as_str()))
.unwrap_or_default();
let stream_type = stream_metadata.stream_type;
let schema_version = stream_metadata.schema_version;
let log_source = stream_metadata.log_source;
metadata::STREAM_INFO.add_stream(
Expand Down
15 changes: 8 additions & 7 deletions src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub struct LogStreamMetadata {
pub custom_partition: Option<String>,
pub static_schema_flag: bool,
pub hot_tier_enabled: bool,
pub stream_type: Option<String>,
pub stream_type: StreamType,
pub log_source: LogSource,
}

Expand Down Expand Up @@ -334,7 +334,7 @@ impl StreamInfo {
} else {
static_schema
},
stream_type: Some(stream_type.to_string()),
stream_type,
schema_version,
log_source,
..Default::default()
Expand All @@ -359,16 +359,17 @@ impl StreamInfo {
self.read()
.expect(LOCK_EXPECT)
.iter()
.filter(|(_, v)| v.stream_type.clone().unwrap() == StreamType::Internal.to_string())
.filter(|(_, v)| v.stream_type == StreamType::Internal)
.map(|(k, _)| k.clone())
.collect()
}

pub fn stream_type(&self, stream_name: &str) -> Result<Option<String>, MetadataError> {
let map = self.read().expect(LOCK_EXPECT);
map.get(stream_name)
pub fn stream_type(&self, stream_name: &str) -> Result<StreamType, MetadataError> {
self.read()
.expect(LOCK_EXPECT)
.get(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
.map(|metadata| metadata.stream_type.clone())
.map(|metadata| metadata.stream_type)
}

pub fn update_stats(
Expand Down
8 changes: 5 additions & 3 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ pub struct ObjectStoreFormat {
pub static_schema_flag: bool,
#[serde(default)]
pub hot_tier_enabled: bool,
pub stream_type: Option<String>,
#[serde(default)]
pub stream_type: StreamType,
#[serde(default)]
pub log_source: LogSource,
}
Expand All @@ -138,7 +139,8 @@ pub struct StreamInfo {
skip_serializing_if = "std::ops::Not::not"
)]
pub static_schema_flag: bool,
pub stream_type: Option<String>,
#[serde(default)]
pub stream_type: StreamType,
pub log_source: LogSource,
}

Expand Down Expand Up @@ -203,7 +205,7 @@ impl Default for ObjectStoreFormat {
version: CURRENT_SCHEMA_VERSION.to_string(),
schema_version: SchemaVersion::V1, // Newly created streams should be v1
objectstore_format: CURRENT_OBJECT_STORE_VERSION.to_string(),
stream_type: Some(StreamType::UserDefined.to_string()),
stream_type: StreamType::UserDefined,
created_at: Local::now().to_rfc3339(),
first_event_at: None,
owner: Owner::new("".to_string(), "".to_string()),
Expand Down
2 changes: 1 addition & 1 deletion src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
let format = ObjectStoreFormat {
created_at: Local::now().to_rfc3339(),
permissions: vec![Permisssion::new(CONFIG.options.username.clone())],
stream_type: Some(stream_type.to_string()),
stream_type,
time_partition: (!time_partition.is_empty()).then(|| time_partition.to_string()),
time_partition_limit: time_partition_limit.map(|limit| limit.to_string()),
custom_partition: (!custom_partition.is_empty()).then(|| custom_partition.to_string()),
Expand Down

0 comments on commit 07b9aa4

Please sign in to comment.