From b7a2765b8aa2263838a46fd2ce55921bfdf64736 Mon Sep 17 00:00:00 2001 From: morenol Date: Sat, 5 Aug 2023 16:36:45 +0000 Subject: [PATCH] Decouple from K8MetaItem struct on fluvio-sc (#3454) --- .../src/smartmodule/spec.rs | 5 +- crates/fluvio-sc-schema/src/objects/create.rs | 2 +- .../src/controllers/spus/controller.rs | 9 +-- .../src/controllers/topics/controller.rs | 8 ++- crates/fluvio-sc/src/core/context.rs | 33 ++++++----- crates/fluvio-sc/src/init.rs | 20 +++++-- crates/fluvio-sc/src/k8/controllers/mod.rs | 4 +- .../src/k8/controllers/spu_controller.rs | 8 +-- crates/fluvio-sc/src/k8/fixture.rs | 4 +- .../fluvio-sc/src/k8/objects/spu_k8_config.rs | 6 +- crates/fluvio-sc/src/services/auth/mod.rs | 23 +++++--- .../fluvio-sc/src/services/private_api/mod.rs | 6 +- .../services/private_api/private_server.rs | 56 ++++++++++++------- .../src/services/public_api/create.rs | 20 ++++--- .../src/services/public_api/delete.rs | 16 +++--- .../fluvio-sc/src/services/public_api/list.rs | 19 ++++--- .../fluvio-sc/src/services/public_api/mod.rs | 8 ++- .../src/services/public_api/partition/mod.rs | 7 ++- .../src/services/public_api/public_server.rs | 13 +++-- .../services/public_api/smartmodule/create.rs | 13 +++-- .../services/public_api/smartmodule/delete.rs | 5 +- .../services/public_api/smartmodule/list.rs | 4 +- .../src/services/public_api/spg/create.rs | 11 ++-- .../src/services/public_api/spg/delete.rs | 5 +- .../src/services/public_api/spg/fetch.rs | 5 +- .../src/services/public_api/spu/fetch.rs | 9 +-- .../spu/register_custom_spus_req.rs | 9 +-- .../spu/unregister_custom_spus_req.rs | 11 ++-- .../services/public_api/tableformat/create.rs | 13 +++-- .../services/public_api/tableformat/delete.rs | 5 +- .../src/services/public_api/topic/create.rs | 32 ++++++----- .../src/services/public_api/topic/delete.rs | 5 +- .../src/services/public_api/topic/fetch.rs | 5 +- .../src/services/public_api/watch.rs | 31 +++++----- crates/fluvio-sc/src/start.rs | 4 +- crates/fluvio-sc/src/stores/spu/mod.rs | 2 +- 36 files changed, 253 insertions(+), 183 deletions(-) diff --git a/crates/fluvio-controlplane-metadata/src/smartmodule/spec.rs b/crates/fluvio-controlplane-metadata/src/smartmodule/spec.rs index 5211d90b59..43391a4fdc 100644 --- a/crates/fluvio-controlplane-metadata/src/smartmodule/spec.rs +++ b/crates/fluvio-controlplane-metadata/src/smartmodule/spec.rs @@ -9,10 +9,7 @@ use tracing::debug; use fluvio_protocol::{ByteBuf, Encoder, Decoder, Version}; -use super::{ - SmartModuleMetadata, - spec_v1::{SmartModuleSpecV1}, -}; +use super::{SmartModuleMetadata, spec_v1::SmartModuleSpecV1}; const V2_FORMAT: Version = 10; diff --git a/crates/fluvio-sc-schema/src/objects/create.rs b/crates/fluvio-sc-schema/src/objects/create.rs index 9a9f949330..50302e00c0 100644 --- a/crates/fluvio-sc-schema/src/objects/create.rs +++ b/crates/fluvio-sc-schema/src/objects/create.rs @@ -7,7 +7,7 @@ use fluvio_protocol::api::Request; use crate::{AdminPublicApiKey, CreatableAdminSpec, Status, TryEncodableFrom}; -use super::{COMMON_VERSION}; +use super::COMMON_VERSION; #[derive(Encoder, Decoder, Default, Debug, Clone)] pub struct CommonCreateRequest { diff --git a/crates/fluvio-sc/src/controllers/spus/controller.rs b/crates/fluvio-sc/src/controllers/spus/controller.rs index fcde422353..b1522febe9 100644 --- a/crates/fluvio-sc/src/controllers/spus/controller.rs +++ b/crates/fluvio-sc/src/controllers/spus/controller.rs @@ -5,6 +5,7 @@ use std::time::Duration; use std::io::Error as IoError; use fluvio_future::timer::sleep; +use fluvio_stream_model::core::MetadataItem; use tracing::{debug, info, error, trace, instrument}; use fluvio_future::task::spawn; @@ -15,14 +16,14 @@ use crate::stores::spu::*; /// Reconcile SPU health status with Meta data /// if SPU has not send heart beat within a period, it is considered down -pub struct SpuController { - spus: StoreContext, +pub struct SpuController { + spus: StoreContext, health_check: SharedHealthCheck, counter: u64, // how many time we have been sync } -impl SpuController { - pub fn start(ctx: SharedContext) { +impl SpuController { + pub fn start(ctx: SharedContext) { let controller = Self { spus: ctx.spus().clone(), health_check: ctx.health().clone(), diff --git a/crates/fluvio-sc/src/controllers/topics/controller.rs b/crates/fluvio-sc/src/controllers/topics/controller.rs index 68c883cff3..f2df793b81 100644 --- a/crates/fluvio-sc/src/controllers/topics/controller.rs +++ b/crates/fluvio-sc/src/controllers/topics/controller.rs @@ -25,9 +25,13 @@ pub struct TopicController { reducer: TopicReducer, } -impl TopicController { +impl TopicController +where + C: MetadataItem + 'static, + C::UId: Send + Sync, +{ /// streaming coordinator controller constructor - pub fn start(ctx: SharedContext) { + pub fn start(ctx: SharedContext) { let topics = ctx.topics().clone(); let partitions = ctx.partitions().clone(); diff --git a/crates/fluvio-sc/src/core/context.rs b/crates/fluvio-sc/src/core/context.rs index 6994a2477b..6b3e949c70 100644 --- a/crates/fluvio-sc/src/core/context.rs +++ b/crates/fluvio-sc/src/core/context.rs @@ -5,6 +5,8 @@ //! use std::sync::Arc; +use fluvio_stream_model::core::MetadataItem; + use crate::config::ScConfig; use crate::stores::spu::*; use crate::stores::partition::*; @@ -14,18 +16,19 @@ use crate::stores::smartmodule::*; use crate::stores::tableformat::*; use crate::stores::*; -pub type SharedContext = Arc; +pub type SharedContext = Arc>; +pub type K8SharedContext = Arc>; /// Global Context for SC /// This is where we store globally accessible data #[derive(Debug)] -pub struct Context { - spus: StoreContext, - partitions: StoreContext, - topics: StoreContext, - spgs: StoreContext, - smartmodules: StoreContext, - tableformats: StoreContext, +pub struct Context { + spus: StoreContext, + partitions: StoreContext, + topics: StoreContext, + spgs: StoreContext, + smartmodules: StoreContext, + tableformats: StoreContext, health: SharedHealthCheck, config: ScConfig, } @@ -34,7 +37,7 @@ pub struct Context { // ScMetadata - Implementation // ----------------------------------- -impl Context { +impl Context { pub fn shared_metadata(config: ScConfig) -> Arc { Arc::new(Self::new(config)) } @@ -54,29 +57,29 @@ impl Context { } /// reference to spus - pub fn spus(&self) -> &StoreContext { + pub fn spus(&self) -> &StoreContext { &self.spus } /// reference to partitions - pub fn partitions(&self) -> &StoreContext { + pub fn partitions(&self) -> &StoreContext { &self.partitions } /// reference to topics - pub fn topics(&self) -> &StoreContext { + pub fn topics(&self) -> &StoreContext { &self.topics } - pub fn spgs(&self) -> &StoreContext { + pub fn spgs(&self) -> &StoreContext { &self.spgs } - pub fn smartmodules(&self) -> &StoreContext { + pub fn smartmodules(&self) -> &StoreContext { &self.smartmodules } - pub fn tableformats(&self) -> &StoreContext { + pub fn tableformats(&self) -> &StoreContext { &self.tableformats } diff --git a/crates/fluvio-sc/src/init.rs b/crates/fluvio-sc/src/init.rs index 88bde467ce..f5e3046f7c 100644 --- a/crates/fluvio-sc/src/init.rs +++ b/crates/fluvio-sc/src/init.rs @@ -6,6 +6,7 @@ //! use std::sync::Arc; +use fluvio_stream_model::core::MetadataItem; #[cfg(feature = "k8")] use k8_metadata_client::{MetadataClient, SharedClient}; @@ -25,7 +26,7 @@ use crate::services::auth::basic::BasicRbacPolicy; pub async fn start_main_loop_with_k8( sc_config_policy: (ScConfig, Option), metadata_client: SharedClient, -) -> SharedContext +) -> crate::core::K8SharedContext where C: MetadataClient + 'static, { @@ -81,10 +82,14 @@ where } /// start the main loop -pub async fn start_main_loop( - ctx: Arc, +pub async fn start_main_loop( + ctx: Arc>, auth_policy: Option, -) -> SharedContext { +) -> SharedContext +where + C: MetadataItem + 'static, + C::UId: Send + Sync, +{ let config = ctx.config(); whitelist!(config, "spu", SpuController::start(ctx.clone())); whitelist!(config, "topic", TopicController::start(ctx.clone())); @@ -109,10 +114,15 @@ pub async fn start_main_loop( use crate::services::start_public_server; use crate::core::SharedContext; + use fluvio_controlplane_metadata::core::MetadataItem; use crate::services::auth::{AuthGlobalContext, RootAuthorization}; use crate::services::auth::basic::{BasicAuthorization, BasicRbacPolicy}; - pub fn start(ctx: SharedContext, auth_policy_option: Option) { + pub fn start(ctx: SharedContext, auth_policy_option: Option) + where + C: MetadataItem + 'static, + C::UId: Send + Sync, + { if let Some(policy) = auth_policy_option { info!("using basic authorization"); start_public_server(AuthGlobalContext::new( diff --git a/crates/fluvio-sc/src/k8/controllers/mod.rs b/crates/fluvio-sc/src/k8/controllers/mod.rs index bbe9ed2869..e83ae89d9b 100644 --- a/crates/fluvio-sc/src/k8/controllers/mod.rs +++ b/crates/fluvio-sc/src/k8/controllers/mod.rs @@ -11,7 +11,7 @@ mod k8_operator { use k8_client::SharedK8Client; use crate::cli::TlsConfig; - use crate::core::SharedContext; + use crate::core::K8SharedContext; use crate::stores::StoreContext; use crate::dispatcher::dispatcher::K8ClusterStateDispatcher; use crate::k8::objects::spu_service::SpuServiceSpec; @@ -26,7 +26,7 @@ mod k8_operator { pub async fn run_k8_operators( namespace: String, k8_client: SharedK8Client, - global_ctx: SharedContext, + global_ctx: K8SharedContext, tls: Option, ) { let config = global_ctx.config(); diff --git a/crates/fluvio-sc/src/k8/controllers/spu_controller.rs b/crates/fluvio-sc/src/k8/controllers/spu_controller.rs index ad8dcf2ee9..b991792977 100644 --- a/crates/fluvio-sc/src/k8/controllers/spu_controller.rs +++ b/crates/fluvio-sc/src/k8/controllers/spu_controller.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, fmt, net::IpAddr, time::Duration}; use fluvio_controlplane_metadata::{ - spu::{IngressPort}, + spu::IngressPort, store::{MetadataStoreObject, k8::K8MetaItem}, }; @@ -13,13 +13,11 @@ use fluvio_future::task::spawn; use fluvio_future::timer::sleep; use k8_types::core::service::{LoadBalancerIngress, LoadBalancerType}; -use crate::{ - stores::{StoreContext}, -}; +use crate::stores::StoreContext; use crate::stores::spu::{IngressAddr, SpuSpec}; use crate::k8::objects::spu_service::SpuServiceSpec; use crate::k8::objects::spg_group::SpuGroupObj; -use crate::stores::spg::{SpuGroupSpec}; +use crate::stores::spg::SpuGroupSpec; /// Update SPU from changes in SPU Group and SPU Services /// This is only place where we make changes to SPU diff --git a/crates/fluvio-sc/src/k8/fixture.rs b/crates/fluvio-sc/src/k8/fixture.rs index 1d89a9f8f5..db2e6edf17 100644 --- a/crates/fluvio-sc/src/k8/fixture.rs +++ b/crates/fluvio-sc/src/k8/fixture.rs @@ -14,7 +14,7 @@ use k8_client::{K8Client, SharedK8Client, load_and_share}; use crate::k8::objects::spu_k8_config::ScK8Config; use crate::config::ScConfig; -use crate::core::{Context, SharedContext}; +use crate::core::{Context, K8SharedContext}; type ScConfigMetadata = MetadataStoreObject; @@ -79,7 +79,7 @@ impl TestEnv { &self.client } - pub async fn create_global_ctx(&self) -> (SharedContext, StoreContext) { + pub async fn create_global_ctx(&self) -> (K8SharedContext, StoreContext) { let config_map = ScConfigMetadata::with_spec("fluvio", ScK8Config::default()); let config_store = LocalStore::new_shared(); config_store.sync_all(vec![config_map]).await; diff --git a/crates/fluvio-sc/src/k8/objects/spu_k8_config.rs b/crates/fluvio-sc/src/k8/objects/spu_k8_config.rs index 7a2ad40e3f..7bc3abb474 100644 --- a/crates/fluvio-sc/src/k8/objects/spu_k8_config.rs +++ b/crates/fluvio-sc/src/k8/objects/spu_k8_config.rs @@ -1,12 +1,12 @@ use std::collections::{BTreeMap, HashMap}; use std::fmt; -use serde::{Deserialize}; +use serde::Deserialize; use tracing::{debug, info}; use fluvio_controlplane_metadata::core::MetadataContext; use fluvio_types::defaults::SPU_PUBLIC_PORT; -use k8_client::{ClientError}; +use k8_client::ClientError; use k8_types::Env; use k8_types::core::pod::{ ResourceRequirements, PodSecurityContext, ContainerSpec, VolumeMount, VolumeSpec, @@ -193,7 +193,7 @@ mod extended { use crate::stores::k8::K8ConvertError; use crate::stores::k8::K8ExtendedSpec; use crate::stores::k8::K8MetaItem; - use crate::stores::{MetadataStoreObject}; + use crate::stores::MetadataStoreObject; use super::*; diff --git a/crates/fluvio-sc/src/services/auth/mod.rs b/crates/fluvio-sc/src/services/auth/mod.rs index 9b88e4ce06..52c0a33319 100644 --- a/crates/fluvio-sc/src/services/auth/mod.rs +++ b/crates/fluvio-sc/src/services/auth/mod.rs @@ -12,19 +12,23 @@ mod common { use fluvio_auth::{AuthContext, Authorization, TypeAction, InstanceAction, AuthError}; use fluvio_socket::FluvioSocket; use fluvio_controlplane_metadata::extended::ObjectType; + use fluvio_stream_model::core::MetadataItem; use crate::core::SharedContext; /// SC global context with authorization /// auth is trait object which contains global auth auth policy #[derive(Clone, Debug)] - pub struct AuthGlobalContext { - pub global_ctx: SharedContext, + pub struct AuthGlobalContext { + pub global_ctx: SharedContext, pub auth: Arc, } - impl AuthGlobalContext { - pub fn new(global_ctx: SharedContext, auth: Arc) -> Self { + impl AuthGlobalContext + where + C: MetadataItem, + { + pub fn new(global_ctx: SharedContext, auth: Arc) -> Self { Self { global_ctx, auth } } } @@ -79,13 +83,16 @@ mod common { /// Auth Service Context, this hold individual context that is enough enforce auth /// for this service context #[derive(Debug, Clone)] - pub struct AuthServiceContext { - pub global_ctx: SharedContext, + pub struct AuthServiceContext { + pub global_ctx: SharedContext, pub auth: AC, } - impl AuthServiceContext { - pub fn new(global_ctx: SharedContext, auth: AC) -> Self { + impl AuthServiceContext + where + C: MetadataItem, + { + pub fn new(global_ctx: SharedContext, auth: AC) -> Self { Self { global_ctx, auth } } } diff --git a/crates/fluvio-sc/src/services/private_api/mod.rs b/crates/fluvio-sc/src/services/private_api/mod.rs index 6637c505d4..0c5729622c 100644 --- a/crates/fluvio-sc/src/services/private_api/mod.rs +++ b/crates/fluvio-sc/src/services/private_api/mod.rs @@ -1,5 +1,6 @@ mod private_server; +use fluvio_stream_model::core::MetadataItem; use tracing::info; use tracing::instrument; @@ -14,7 +15,10 @@ use crate::core::SharedContext; skip(ctx), fields(address = &*ctx.config().private_endpoint) )] -pub fn start_internal_server(ctx: SharedContext) { +pub fn start_internal_server(ctx: SharedContext) +where + C: MetadataItem + 'static, +{ info!("starting internal services"); let addr = ctx.config().private_endpoint.clone(); diff --git a/crates/fluvio-sc/src/services/private_api/private_server.rs b/crates/fluvio-sc/src/services/private_api/private_server.rs index 59c430a9e5..f92d35e68f 100644 --- a/crates/fluvio-sc/src/services/private_api/private_server.rs +++ b/crates/fluvio-sc/src/services/private_api/private_server.rs @@ -1,3 +1,4 @@ +use std::marker::PhantomData; use std::sync::Arc; use std::io::Error as IoError; use std::io::ErrorKind; @@ -16,6 +17,8 @@ use fluvio_controlplane::spu_api::update_replica::UpdateReplicaRequest; use fluvio_controlplane::spu_api::update_smartmodule::UpdateSmartModuleRequest; use fluvio_controlplane::spu_api::update_spu::UpdateSpuRequest; use fluvio_controlplane_metadata::message::Message; +use fluvio_stream_model::core::MetadataItem; +use fluvio_stream_model::store::ChangeListener; use tracing::{debug, info, trace, instrument, error}; use async_trait::async_trait; use futures_util::stream::Stream; @@ -30,7 +33,6 @@ use fluvio_service::{FluvioService, wait_for_request}; use fluvio_socket::{FluvioSocket, SocketError, FluvioSink}; use crate::core::SharedContext; -use crate::stores::K8ChangeListener; use crate::stores::partition::PartitonStatusExtension; use crate::stores::partition::{PartitionSpec, PartitionStatus, PartitionResolution}; use crate::stores::spu::SpuLocalStorePolicy; @@ -40,23 +42,28 @@ use crate::stores::actions::WSAction; const HEALTH_DURATION: u64 = 90; #[derive(Debug)] -pub struct ScInternalService {} +pub struct ScInternalService { + data: PhantomData, +} -impl ScInternalService { +impl ScInternalService { pub fn new() -> Self { - Self {} + Self { data: PhantomData } } } #[async_trait] -impl FluvioService for ScInternalService { - type Context = SharedContext; +impl FluvioService for ScInternalService +where + C: MetadataItem, +{ + type Context = SharedContext; type Request = InternalScRequest; #[instrument(skip(self, context))] async fn respond( self: Arc, - context: SharedContext, + context: Self::Context, socket: FluvioSocket, _connection: ConnectInfo, ) -> Result<()> { @@ -109,12 +116,15 @@ impl FluvioService for ScInternalService { // perform internal dispatch #[instrument(name = "ScInternalService", skip(context, api_stream))] -async fn dispatch_loop( - context: SharedContext, +async fn dispatch_loop( + context: SharedContext, spu_id: SpuId, mut api_stream: impl Stream> + Unpin, mut sink: FluvioSink, -) -> Result<(), SocketError> { +) -> Result<(), SocketError> +where + C: MetadataItem, +{ let mut spu_spec_listener = context.spus().change_listener(); let mut partition_spec_listener = context.partitions().change_listener(); let mut sm_spec_listener = context.smartmodules().change_listener(); @@ -191,7 +201,10 @@ async fn dispatch_loop( /// send lrs update to metadata stores #[instrument(skip(ctx, requests))] -async fn receive_lrs_update(ctx: &SharedContext, requests: UpdateLrsRequest) { +async fn receive_lrs_update(ctx: &SharedContext, requests: UpdateLrsRequest) +where + C: MetadataItem, +{ let requests = requests.into_requests(); if requests.is_empty() { trace!("no requests, just health check"); @@ -213,7 +226,7 @@ async fn receive_lrs_update(ctx: &SharedContext, requests: UpdateLrsRequest) { ); current_status.merge(new_status); - actions.push(WSAction::UpdateStatus::(( + actions.push(WSAction::::UpdateStatus(( key, current_status, ))); @@ -237,7 +250,10 @@ async fn receive_lrs_update(ctx: &SharedContext, requests: UpdateLrsRequest) { skip(ctx,request), fields(replica=%request.id) )] -async fn receive_replica_remove(ctx: &SharedContext, request: ReplicaRemovedRequest) { +async fn receive_replica_remove(ctx: &SharedContext, request: ReplicaRemovedRequest) +where + C: MetadataItem, +{ debug!(request=?request); // create action inside to optimize read locking let read_guard = ctx.partitions().store().read().await; @@ -245,7 +261,7 @@ async fn receive_replica_remove(ctx: &SharedContext, request: ReplicaRemovedRequ // force to delete partition regardless if confirm if request.confirm { debug!("force delete"); - Some(WSAction::DeleteFinal::(request.id)) + Some(WSAction::::DeleteFinal(request.id)) } else { debug!("no delete"); None @@ -264,8 +280,8 @@ async fn receive_replica_remove(ctx: &SharedContext, request: ReplicaRemovedRequ /// send spu spec changes only #[instrument(skip(sink))] -async fn send_spu_spec_changes( - listener: &mut K8ChangeListener, +async fn send_spu_spec_changes( + listener: &mut ChangeListener, sink: &mut FluvioSink, spu_id: SpuId, ) -> Result<(), SocketError> { @@ -310,8 +326,8 @@ async fn send_spu_spec_changes( } #[instrument(level = "trace", skip(sink))] -async fn send_replica_spec_changes( - listener: &mut K8ChangeListener, +async fn send_replica_spec_changes( + listener: &mut ChangeListener, sink: &mut FluvioSink, spu_id: SpuId, ) -> Result<(), SocketError> { @@ -381,8 +397,8 @@ async fn send_replica_spec_changes( } #[instrument(level = "trace", skip(sink))] -async fn send_smartmodule_changes( - listener: &mut K8ChangeListener, +async fn send_smartmodule_changes( + listener: &mut ChangeListener, sink: &mut FluvioSink, spu_id: SpuId, ) -> Result<(), SocketError> { diff --git a/crates/fluvio-sc/src/services/public_api/create.rs b/crates/fluvio-sc/src/services/public_api/create.rs index 28560b4f37..760550d7d8 100644 --- a/crates/fluvio-sc/src/services/public_api/create.rs +++ b/crates/fluvio-sc/src/services/public_api/create.rs @@ -1,10 +1,11 @@ use fluvio_protocol::link::ErrorCode; +use fluvio_stream_model::core::MetadataItem; use tracing::{instrument, debug, error}; -use anyhow::{Result}; +use anyhow::Result; -use fluvio_controlplane_metadata::smartmodule::{SmartModuleSpec}; +use fluvio_controlplane_metadata::smartmodule::SmartModuleSpec; use fluvio_controlplane_metadata::spg::SpuGroupSpec; -use fluvio_controlplane_metadata::spu::{CustomSpuSpec}; +use fluvio_controlplane_metadata::spu::CustomSpuSpec; use fluvio_controlplane_metadata::tableformat::TableFormatSpec; use fluvio_controlplane_metadata::topic::TopicSpec; use fluvio_protocol::api::{RequestMessage, ResponseMessage}; @@ -16,9 +17,9 @@ use crate::services::auth::AuthServiceContext; /// Handler for create topic request #[instrument(skip(request, auth_context))] -pub async fn handle_create_request( +pub async fn handle_create_request( request: Box>, - auth_context: &AuthServiceContext, + auth_context: &AuthServiceContext, ) -> Result> { let (header, req) = request.get_header_request(); @@ -53,22 +54,23 @@ mod create_handler { use fluvio_controlplane_metadata::core::Spec; use fluvio_stream_dispatcher::store::StoreContext; + use fluvio_stream_model::core::MetadataItem; use tracing::{info, trace, instrument}; use fluvio_protocol::link::ErrorCode; use fluvio_sc_schema::{AdminSpec, Status}; - use fluvio_sc_schema::objects::{CommonCreateRequest}; + use fluvio_sc_schema::objects::CommonCreateRequest; use fluvio_controlplane_metadata::extended::SpecExt; use fluvio_auth::{AuthContext, TypeAction}; use crate::services::auth::AuthServiceContext; #[instrument(skip(create, spec, auth_ctx, object_ctx, error_code))] - pub async fn process( + pub async fn process( create: CommonCreateRequest, spec: S, - auth_ctx: &AuthServiceContext, - object_ctx: &StoreContext, + auth_ctx: &AuthServiceContext, + object_ctx: &StoreContext, error_code: F, ) -> Result where diff --git a/crates/fluvio-sc/src/services/public_api/delete.rs b/crates/fluvio-sc/src/services/public_api/delete.rs index a11618ae4e..a6df76d9c6 100644 --- a/crates/fluvio-sc/src/services/public_api/delete.rs +++ b/crates/fluvio-sc/src/services/public_api/delete.rs @@ -6,8 +6,9 @@ //! use fluvio_protocol::link::ErrorCode; +use fluvio_stream_model::core::MetadataItem; use tracing::{instrument, trace, debug, error}; -use anyhow::{Result}; +use anyhow::Result; use fluvio_controlplane_metadata::smartmodule::SmartModuleSpec; use fluvio_controlplane_metadata::spg::SpuGroupSpec; @@ -17,15 +18,15 @@ use fluvio_controlplane_metadata::topic::TopicSpec; use fluvio_protocol::api::{RequestMessage, ResponseMessage}; use fluvio_sc_schema::{Status, TryEncodableFrom}; use fluvio_sc_schema::objects::{ObjectApiDeleteRequest, DeleteRequest}; -use fluvio_auth::{AuthContext}; +use fluvio_auth::AuthContext; use crate::services::auth::AuthServiceContext; /// Handler for delete topic request #[instrument(skip(request, auth_ctx))] -pub async fn handle_delete_request( +pub async fn handle_delete_request( request: RequestMessage, - auth_ctx: &AuthServiceContext, + auth_ctx: &AuthServiceContext, ) -> Result> { let (header, del_req) = request.get_header_request(); @@ -63,6 +64,7 @@ mod delete_handler { use fluvio_protocol::link::ErrorCode; use fluvio_stream_dispatcher::store::StoreContext; + use fluvio_stream_model::core::MetadataItem; use tracing::{info, trace, instrument}; use fluvio_sc_schema::{AdminSpec, Status}; @@ -73,10 +75,10 @@ mod delete_handler { /// Handler for object delete #[instrument(skip(auth_ctx, object_ctx, error_code, not_found_code))] - pub async fn process( + pub async fn process( name: String, - auth_ctx: &AuthServiceContext, - object_ctx: &StoreContext, + auth_ctx: &AuthServiceContext, + object_ctx: &StoreContext, error_code: F, not_found_code: G, ) -> Result diff --git a/crates/fluvio-sc/src/services/public_api/list.rs b/crates/fluvio-sc/src/services/public_api/list.rs index 7a3d0c74a5..9e59776f1d 100644 --- a/crates/fluvio-sc/src/services/public_api/list.rs +++ b/crates/fluvio-sc/src/services/public_api/list.rs @@ -6,6 +6,7 @@ use fluvio_controlplane_metadata::{ smartmodule::SmartModuleSpec, tableformat::TableFormatSpec, }; +use fluvio_stream_model::core::MetadataItem; use tracing::{debug, instrument}; use anyhow::Result; @@ -14,15 +15,15 @@ use fluvio_sc_schema::{ objects::{ObjectApiListRequest, ObjectApiListResponse, ListRequest}, TryEncodableFrom, }; -use fluvio_auth::{AuthContext}; +use fluvio_auth::AuthContext; use crate::services::auth::AuthServiceContext; use super::smartmodule::fetch_smart_modules; #[instrument(skip(request, auth_ctx))] -pub async fn handle_list_request( +pub async fn handle_list_request( request: RequestMessage, - auth_ctx: &AuthServiceContext, + auth_ctx: &AuthServiceContext, ) -> Result> { let (header, req) = request.get_header_request(); debug!("list header: {:#?}, request: {:#?}", header, req); @@ -87,32 +88,32 @@ mod fetch { use std::io::{Error, ErrorKind}; use fluvio_controlplane_metadata::core::Spec; - use fluvio_controlplane_metadata::store::k8::K8MetaItem; use fluvio_protocol::{Decoder, Encoder}; use fluvio_sc_schema::AdminSpec; use fluvio_stream_dispatcher::store::StoreContext; + use fluvio_stream_model::core::MetadataItem; use tracing::{debug, trace, instrument}; use fluvio_sc_schema::objects::{ListResponse, Metadata, ListFilters}; use fluvio_auth::{AuthContext, TypeAction}; - use fluvio_controlplane_metadata::store::{MetadataStoreObject}; + use fluvio_controlplane_metadata::store::MetadataStoreObject; use fluvio_controlplane_metadata::extended::SpecExt; use fluvio_controlplane_metadata::store::KeyFilter; use crate::services::auth::AuthServiceContext; #[instrument(skip(filters, auth_ctx))] - pub async fn handle_fetch_request( + pub async fn handle_fetch_request( filters: ListFilters, - auth_ctx: &AuthServiceContext, - object_ctx: &StoreContext, + auth_ctx: &AuthServiceContext, + object_ctx: &StoreContext, ) -> Result, Error> where AC: AuthContext, S: AdminSpec + SpecExt, ::Status: Encoder + Decoder, ::IndexKey: AsRef, - Metadata: From>, + Metadata: From>, { debug!(ty = %S::LABEL,"fetching"); diff --git a/crates/fluvio-sc/src/services/public_api/mod.rs b/crates/fluvio-sc/src/services/public_api/mod.rs index af03487145..994d2b91fa 100644 --- a/crates/fluvio-sc/src/services/public_api/mod.rs +++ b/crates/fluvio-sc/src/services/public_api/mod.rs @@ -18,6 +18,7 @@ mod server { use std::fmt::Debug; + use fluvio_stream_model::core::MetadataItem; use tracing::debug; use fluvio_service::FluvioApiServer; @@ -27,10 +28,13 @@ mod server { use super::public_server::PublicService; /// create public server - pub fn start_public_server(ctx: AuthGlobalContext) + pub fn start_public_server(ctx: AuthGlobalContext) where A: Authorization + Sync + Send + Debug + 'static, - AuthGlobalContext: Clone + Debug, + C: MetadataItem + 'static, + C::UId: Send + Sync, + + AuthGlobalContext: Clone + Debug, ::Context: Send + Sync, { let addr = ctx.global_ctx.config().public_endpoint.clone(); diff --git a/crates/fluvio-sc/src/services/public_api/partition/mod.rs b/crates/fluvio-sc/src/services/public_api/partition/mod.rs index b5e6456a83..ed44d1c19c 100644 --- a/crates/fluvio-sc/src/services/public_api/partition/mod.rs +++ b/crates/fluvio-sc/src/services/public_api/partition/mod.rs @@ -1,19 +1,20 @@ use std::io::{Error, ErrorKind}; +use fluvio_stream_model::core::MetadataItem; use tracing::{trace, debug, instrument}; use anyhow::Result; use fluvio_sc_schema::objects::{ListResponse, Metadata, ListFilters}; -use fluvio_sc_schema::partition::{PartitionSpec}; +use fluvio_sc_schema::partition::PartitionSpec; use fluvio_controlplane_metadata::extended::SpecExt; use fluvio_auth::{AuthContext, TypeAction}; use crate::services::auth::AuthServiceContext; #[instrument(skip(_filters, auth_ctx))] -pub async fn handle_fetch_request( +pub async fn handle_fetch_request( _filters: ListFilters, - auth_ctx: &AuthServiceContext, + auth_ctx: &AuthServiceContext, ) -> Result> { debug!("fetching custom spu list"); diff --git a/crates/fluvio-sc/src/services/public_api/public_server.rs b/crates/fluvio-sc/src/services/public_api/public_server.rs index 170d87260f..3c9dcb86d4 100644 --- a/crates/fluvio-sc/src/services/public_api/public_server.rs +++ b/crates/fluvio-sc/src/services/public_api/public_server.rs @@ -20,6 +20,7 @@ use fluvio_service::ConnectInfo; use fluvio_types::event::StickyEvent; use fluvio_auth::Authorization; //use fluvio_service::aAuthorization; +use fluvio_stream_model::core::MetadataItem; use fluvio_service::api_loop; use fluvio_service::call_service; use fluvio_socket::FluvioSocket; @@ -30,23 +31,25 @@ use fluvio_sc_schema::AdminPublicDecodedRequest; use crate::services::auth::{AuthGlobalContext, AuthServiceContext}; #[derive(Debug)] -pub struct PublicService { - data: PhantomData, +pub struct PublicService { + data: PhantomData<(A, C)>, } -impl PublicService { +impl PublicService { pub fn new() -> Self { PublicService { data: PhantomData } } } #[async_trait] -impl FluvioService for PublicService +impl FluvioService for PublicService where A: Authorization + Sync + Send, + C::UId: Send + Sync, + C: MetadataItem + 'static, ::Context: Send + Sync, { - type Context = AuthGlobalContext; + type Context = AuthGlobalContext; type Request = AdminPublicDecodedRequest; #[instrument(skip(self, ctx))] diff --git a/crates/fluvio-sc/src/services/public_api/smartmodule/create.rs b/crates/fluvio-sc/src/services/public_api/smartmodule/create.rs index fe37226929..cab1c6e892 100644 --- a/crates/fluvio-sc/src/services/public_api/smartmodule/create.rs +++ b/crates/fluvio-sc/src/services/public_api/smartmodule/create.rs @@ -4,12 +4,13 @@ //! Converts SmartModule API request into KV request and sends to KV store for processing. //! +use fluvio_stream_model::core::MetadataItem; use tracing::{info, trace, debug, instrument}; use anyhow::{anyhow, Result}; use fluvio_protocol::link::ErrorCode; -use fluvio_sc_schema::{Status}; -use fluvio_sc_schema::objects::{CreateRequest}; +use fluvio_sc_schema::Status; +use fluvio_sc_schema::objects::CreateRequest; use fluvio_sc_schema::smartmodule::SmartModuleSpec; use fluvio_controlplane_metadata::extended::SpecExt; use fluvio_auth::{AuthContext, TypeAction}; @@ -19,9 +20,9 @@ use crate::services::auth::AuthServiceContext; /// Handler for smartmodule request #[instrument(skip(req, auth_ctx))] -pub async fn handle_create_smartmodule_request( +pub async fn handle_create_smartmodule_request( req: CreateRequest, - auth_ctx: &AuthServiceContext, + auth_ctx: &AuthServiceContext, ) -> Result { let (create, spec) = req.parts(); let name = create.name; @@ -53,8 +54,8 @@ pub async fn handle_create_smartmodule_request( /// Process custom smartmodule, converts smartmodule spec to K8 and sends to KV store #[instrument(skip(ctx, name, smartmodule_spec))] -async fn process_smartmodule_request( - ctx: &Context, +async fn process_smartmodule_request( + ctx: &Context, name: String, smartmodule_spec: SmartModuleSpec, ) -> Status { diff --git a/crates/fluvio-sc/src/services/public_api/smartmodule/delete.rs b/crates/fluvio-sc/src/services/public_api/smartmodule/delete.rs index b42e9c57f3..f0158a8477 100644 --- a/crates/fluvio-sc/src/services/public_api/smartmodule/delete.rs +++ b/crates/fluvio-sc/src/services/public_api/smartmodule/delete.rs @@ -1,5 +1,6 @@ use std::io::{Error, ErrorKind}; +use fluvio_stream_model::core::MetadataItem; use tracing::{debug, trace, instrument, info}; use anyhow::Result; @@ -12,9 +13,9 @@ use crate::services::auth::AuthServiceContext; /// Handler for delete smartmodule request #[instrument(skip(name, auth_ctx))] -pub async fn handle_delete_smartmodule( +pub async fn handle_delete_smartmodule( name: String, - auth_ctx: &AuthServiceContext, + auth_ctx: &AuthServiceContext, ) -> Result { use fluvio_protocol::link::ErrorCode; diff --git a/crates/fluvio-sc/src/services/public_api/smartmodule/list.rs b/crates/fluvio-sc/src/services/public_api/smartmodule/list.rs index a2648670ae..122652587a 100644 --- a/crates/fluvio-sc/src/services/public_api/smartmodule/list.rs +++ b/crates/fluvio-sc/src/services/public_api/smartmodule/list.rs @@ -97,9 +97,7 @@ mod test { SmartModuleSpec, SmartModuleMetadata, SmartModulePackage, FluvioSemVersion, }; - use crate::{ - services::auth::{RootAuthContext}, - }; + use crate::services::auth::RootAuthContext; use super::fetch_smart_modules; diff --git a/crates/fluvio-sc/src/services/public_api/spg/create.rs b/crates/fluvio-sc/src/services/public_api/spg/create.rs index 25e7b76855..04344f59b7 100644 --- a/crates/fluvio-sc/src/services/public_api/spg/create.rs +++ b/crates/fluvio-sc/src/services/public_api/spg/create.rs @@ -6,13 +6,14 @@ use std::time::Duration; +use fluvio_stream_model::core::MetadataItem; use tracing::{info, trace, instrument}; use anyhow::{anyhow, Result}; use fluvio_stream_dispatcher::actions::WSAction; use fluvio_protocol::link::ErrorCode; use fluvio_sc_schema::Status; -use fluvio_sc_schema::objects::{CreateRequest}; +use fluvio_sc_schema::objects::CreateRequest; use fluvio_sc_schema::spg::SpuGroupSpec; use fluvio_controlplane_metadata::extended::SpecExt; use fluvio_auth::{AuthContext, TypeAction}; @@ -24,9 +25,9 @@ const DEFAULT_SPG_CREATE_TIMEOUT: u32 = 120 * 1000; // 2 minutes /// Handler for spu groups request #[instrument(skip(req, auth_ctx))] -pub async fn handle_create_spu_group_request( +pub async fn handle_create_spu_group_request( req: CreateRequest, - auth_ctx: &AuthServiceContext, + auth_ctx: &AuthServiceContext, ) -> Result { let (create, spg) = req.parts(); let name = create.name; @@ -60,8 +61,8 @@ pub async fn handle_create_spu_group_request( /// Process custom spu, converts spu spec to K8 and sends to KV store #[instrument(skip(ctx, spg_spec))] -async fn process_custom_spu_request( - ctx: &Context, +async fn process_custom_spu_request( + ctx: &Context, name: String, timeout: Option, spg_spec: SpuGroupSpec, diff --git a/crates/fluvio-sc/src/services/public_api/spg/delete.rs b/crates/fluvio-sc/src/services/public_api/spg/delete.rs index 4519aa5d4d..dbd5320c85 100644 --- a/crates/fluvio-sc/src/services/public_api/spg/delete.rs +++ b/crates/fluvio-sc/src/services/public_api/spg/delete.rs @@ -1,5 +1,6 @@ use std::io::{Error, ErrorKind}; +use fluvio_stream_model::core::MetadataItem; use tracing::{info, trace, instrument}; use fluvio_sc_schema::Status; @@ -11,9 +12,9 @@ use crate::services::auth::AuthServiceContext; /// Handler for delete spu group request #[instrument(skip(name, auth_ctx))] -pub async fn handle_delete_spu_group( +pub async fn handle_delete_spu_group( name: String, - auth_ctx: &AuthServiceContext, + auth_ctx: &AuthServiceContext, ) -> Result { use fluvio_protocol::link::ErrorCode; diff --git a/crates/fluvio-sc/src/services/public_api/spg/fetch.rs b/crates/fluvio-sc/src/services/public_api/spg/fetch.rs index 2c050fd534..2d01efbdd6 100644 --- a/crates/fluvio-sc/src/services/public_api/spg/fetch.rs +++ b/crates/fluvio-sc/src/services/public_api/spg/fetch.rs @@ -1,5 +1,6 @@ use std::io::{Error, ErrorKind}; +use fluvio_stream_model::core::MetadataItem; use tracing::{debug, trace, instrument}; use anyhow::Result; @@ -12,9 +13,9 @@ use fluvio_controlplane_metadata::extended::SpecExt; use crate::services::auth::AuthServiceContext; #[instrument(skip(filters, auth_ctx))] -pub async fn handle_fetch_spu_groups_request( +pub async fn handle_fetch_spu_groups_request( filters: ListFilters, - auth_ctx: &AuthServiceContext, + auth_ctx: &AuthServiceContext, ) -> Result> { debug!("fetching spu groups"); diff --git a/crates/fluvio-sc/src/services/public_api/spu/fetch.rs b/crates/fluvio-sc/src/services/public_api/spu/fetch.rs index d7fc752977..ee009a9ee6 100644 --- a/crates/fluvio-sc/src/services/public_api/spu/fetch.rs +++ b/crates/fluvio-sc/src/services/public_api/spu/fetch.rs @@ -1,3 +1,4 @@ +use fluvio_stream_model::core::MetadataItem; use tracing::{trace, debug, instrument}; use anyhow::{anyhow, Result}; @@ -11,9 +12,9 @@ use fluvio_controlplane_metadata::extended::SpecExt; use crate::services::auth::AuthServiceContext; #[instrument(skip(filters, auth_ctx))] -pub async fn handle_fetch_custom_spu_request( +pub async fn handle_fetch_custom_spu_request( filters: ListFilters, - auth_ctx: &AuthServiceContext, + auth_ctx: &AuthServiceContext, ) -> Result> { debug!("fetching custom spu list"); @@ -59,9 +60,9 @@ pub async fn handle_fetch_custom_spu_request( } #[instrument(skip(filters, auth_ctx))] -pub async fn handle_fetch_spus_request( +pub async fn handle_fetch_spus_request( filters: ListFilters, - auth_ctx: &AuthServiceContext, + auth_ctx: &AuthServiceContext, ) -> Result> { debug!("fetching spu list"); diff --git a/crates/fluvio-sc/src/services/public_api/spu/register_custom_spus_req.rs b/crates/fluvio-sc/src/services/public_api/spu/register_custom_spus_req.rs index 5fbdfb33c9..1ce4c3660d 100644 --- a/crates/fluvio-sc/src/services/public_api/spu/register_custom_spus_req.rs +++ b/crates/fluvio-sc/src/services/public_api/spu/register_custom_spus_req.rs @@ -3,6 +3,7 @@ //! //! Converts Custom Spu API request into KV request and sends to KV store for processing. //! +use fluvio_stream_model::core::MetadataItem; use tracing::{debug, info, trace, instrument}; use std::io::Error as IoError; @@ -18,18 +19,18 @@ use crate::core::SharedContext; use crate::services::auth::AuthServiceContext; use crate::stores::spu::SpuLocalStorePolicy; -pub struct RegisterCustomSpu { - ctx: SharedContext, +pub struct RegisterCustomSpu { + ctx: SharedContext, name: String, spec: CustomSpuSpec, } -impl RegisterCustomSpu { +impl RegisterCustomSpu { /// Handler for create spus request #[instrument(skip(req, auth_ctx))] pub async fn handle_register_custom_spu_request( req: CreateRequest, - auth_ctx: &AuthServiceContext, + auth_ctx: &AuthServiceContext, ) -> Status { let (create, spec) = req.parts(); let name = create.name; diff --git a/crates/fluvio-sc/src/services/public_api/spu/unregister_custom_spus_req.rs b/crates/fluvio-sc/src/services/public_api/spu/unregister_custom_spus_req.rs index 21822e01e2..a336d4533e 100644 --- a/crates/fluvio-sc/src/services/public_api/spu/unregister_custom_spus_req.rs +++ b/crates/fluvio-sc/src/services/public_api/spu/unregister_custom_spus_req.rs @@ -14,14 +14,15 @@ use fluvio_controlplane_metadata::spu::CustomSpuKey; use fluvio_auth::{AuthContext, InstanceAction}; use fluvio_controlplane_metadata::extended::SpecExt; +use crate::dispatcher::core::MetadataItem; use crate::stores::spu::{SpuAdminMd, SpuLocalStorePolicy}; use crate::services::auth::AuthServiceContext; /// Handler for delete custom spu request #[instrument(skip(key, auth_ctx))] -pub async fn handle_un_register_custom_spu_request( +pub async fn handle_un_register_custom_spu_request( key: CustomSpuKey, - auth_ctx: &AuthServiceContext, + auth_ctx: &AuthServiceContext, ) -> Result { let spu_name = key.to_string(); @@ -88,9 +89,9 @@ pub async fn handle_un_register_custom_spu_request( } /// Generate for delete custom spu operation and return result. -async fn un_register_custom_spu( - auth_ctx: &AuthServiceContext, - spu: SpuAdminMd, +async fn un_register_custom_spu( + auth_ctx: &AuthServiceContext, + spu: SpuAdminMd, ) -> Status { let spu_name = spu.key_owned(); diff --git a/crates/fluvio-sc/src/services/public_api/tableformat/create.rs b/crates/fluvio-sc/src/services/public_api/tableformat/create.rs index 103daca11b..0e1478020b 100644 --- a/crates/fluvio-sc/src/services/public_api/tableformat/create.rs +++ b/crates/fluvio-sc/src/services/public_api/tableformat/create.rs @@ -4,12 +4,13 @@ //! Converts TableFormat API request into KV request and sends to KV store for processing. //! +use fluvio_stream_model::core::MetadataItem; use tracing::{debug, info, trace, instrument}; use anyhow::{anyhow, Result}; use fluvio_protocol::link::ErrorCode; -use fluvio_sc_schema::{Status}; -use fluvio_sc_schema::objects::{CreateRequest}; +use fluvio_sc_schema::Status; +use fluvio_sc_schema::objects::CreateRequest; use fluvio_sc_schema::tableformat::TableFormatSpec; use fluvio_controlplane_metadata::extended::SpecExt; use fluvio_auth::{AuthContext, TypeAction}; @@ -19,9 +20,9 @@ use crate::services::auth::AuthServiceContext; /// Handler for tableformat request #[instrument(skip(req, auth_ctx))] -pub async fn handle_create_tableformat_request( +pub async fn handle_create_tableformat_request( req: CreateRequest, - auth_ctx: &AuthServiceContext, + auth_ctx: &AuthServiceContext, ) -> Result { let (create, spec) = req.parts(); let name = create.name; @@ -68,8 +69,8 @@ pub async fn handle_create_tableformat_request( /// Process custom tableformat, converts tableformat spec to K8 and sends to KV store #[instrument(skip(ctx, name, tableformat_spec))] -async fn process_tableformat_request( - ctx: &Context, +async fn process_tableformat_request( + ctx: &Context, name: String, tableformat_spec: TableFormatSpec, ) -> Status { diff --git a/crates/fluvio-sc/src/services/public_api/tableformat/delete.rs b/crates/fluvio-sc/src/services/public_api/tableformat/delete.rs index 4f1e7ede18..b8648eccbe 100644 --- a/crates/fluvio-sc/src/services/public_api/tableformat/delete.rs +++ b/crates/fluvio-sc/src/services/public_api/tableformat/delete.rs @@ -1,5 +1,6 @@ use std::io::{Error, ErrorKind}; +use fluvio_stream_model::core::MetadataItem; use tracing::{info, trace, instrument}; use fluvio_sc_schema::Status; @@ -11,9 +12,9 @@ use crate::services::auth::AuthServiceContext; /// Handler for delete tableformat request #[instrument(skip(name, auth_ctx))] -pub async fn handle_delete_tableformat( +pub async fn handle_delete_tableformat( name: String, - auth_ctx: &AuthServiceContext, + auth_ctx: &AuthServiceContext, ) -> Result { use fluvio_protocol::link::ErrorCode; diff --git a/crates/fluvio-sc/src/services/public_api/topic/create.rs b/crates/fluvio-sc/src/services/public_api/topic/create.rs index 26498a6d42..af617ded9f 100644 --- a/crates/fluvio-sc/src/services/public_api/topic/create.rs +++ b/crates/fluvio-sc/src/services/public_api/topic/create.rs @@ -9,19 +9,19 @@ //! Assigned Topics allow the users to apply their custom-defined replica assignment. //! -use fluvio_controlplane_metadata::smartmodule::SmartModulePackageKey; -use fluvio_sc_schema::objects::CreateRequest; -use fluvio_stream_model::store::k8::K8MetaItem; use tracing::{info, debug, trace, instrument}; use anyhow::{anyhow, Result}; use fluvio_protocol::link::ErrorCode; use fluvio_controlplane_metadata::topic::ReplicaSpec; +use fluvio_sc_schema::objects::CreateRequest; use fluvio_sc_schema::shared::validate_resource_name; use fluvio_sc_schema::Status; use fluvio_sc_schema::topic::TopicSpec; use fluvio_auth::{AuthContext, TypeAction}; use fluvio_controlplane_metadata::extended::SpecExt; +use fluvio_controlplane_metadata::smartmodule::SmartModulePackageKey; +use fluvio_stream_model::core::MetadataItem; use crate::core::Context; use crate::controllers::topics::generate_replica_map; @@ -32,9 +32,9 @@ use crate::services::auth::AuthServiceContext; /// Handler for create topic request #[instrument(skip(req, auth_ctx))] -pub(crate) async fn handle_create_topics_request( +pub(crate) async fn handle_create_topics_request( req: CreateRequest, - auth_ctx: &AuthServiceContext, + auth_ctx: &AuthServiceContext, ) -> Result { let (create, topic) = req.parts(); let name = create.name; @@ -59,7 +59,7 @@ pub(crate) async fn handle_create_topics_request( } // validate topic request - let mut status = validate_topic_request(&name, &topic, &auth_ctx.global_ctx).await; + let mut status = validate_topic_request::(&name, &topic, &auth_ctx.global_ctx).await; if status.is_error() { return Ok(status); } @@ -73,7 +73,11 @@ pub(crate) async fn handle_create_topics_request( } /// Validate topic, takes advantage of the validation routines inside topic action workflow -async fn validate_topic_request(name: &str, topic_spec: &TopicSpec, metadata: &Context) -> Status { +async fn validate_topic_request( + name: &str, + topic_spec: &TopicSpec, + metadata: &Context, +) -> Status { debug!("validating topic: {}", name); if let Err(err) = validate_resource_name(name) { @@ -132,7 +136,7 @@ async fn validate_topic_request(name: &str, topic_spec: &TopicSpec, metadata: &C match topic_spec.replicas() { ReplicaSpec::Computed(param) => { - let next_state = validate_computed_topic_parameters::(param); + let next_state = validate_computed_topic_parameters::(param); trace!("validating, computed topic: {:#?}", next_state); if next_state.resolution.is_invalid() { Status::new( @@ -141,7 +145,7 @@ async fn validate_topic_request(name: &str, topic_spec: &TopicSpec, metadata: &C Some(next_state.reason), ) } else { - let next_state = generate_replica_map::(spus, param).await; + let next_state = generate_replica_map::(spus, param).await; trace!("validating, generate replica map topic: {:#?}", next_state); if next_state.resolution.no_resource() { Status::new( @@ -155,7 +159,7 @@ async fn validate_topic_request(name: &str, topic_spec: &TopicSpec, metadata: &C } } ReplicaSpec::Assigned(ref partition_map) => { - let next_state = validate_assigned_topic_parameters::(partition_map); + let next_state = validate_assigned_topic_parameters::(partition_map); trace!("validating, computed topic: {:#?}", next_state); if next_state.resolution.is_invalid() { Status::new( @@ -165,7 +169,7 @@ async fn validate_topic_request(name: &str, topic_spec: &TopicSpec, metadata: &C ) } else { let next_state = - update_replica_map_for_assigned_topic::(partition_map, spus).await; + update_replica_map_for_assigned_topic::(partition_map, spus).await; trace!("validating, assign replica map topic: {:#?}", next_state); if next_state.resolution.is_invalid() { Status::new( @@ -183,8 +187,8 @@ async fn validate_topic_request(name: &str, topic_spec: &TopicSpec, metadata: &C /// create new topic and wait until all partitions are fully provisioned /// if any partitions are not provisioned in time, this will generate error -async fn process_topic_request( - auth_ctx: &AuthServiceContext, +async fn process_topic_request( + auth_ctx: &AuthServiceContext, name: String, topic_spec: TopicSpec, ) -> Status { @@ -223,7 +227,7 @@ async fn process_topic_request( "waiting for partitions to be provisioned", ); - let topic_uid = &topic_instance.ctx().item().uid; + let topic_uid = &topic_instance.ctx().item().uid(); let partition_ctx = auth_ctx.global_ctx.partitions(); let mut partition_listener = partition_ctx.change_listener(); diff --git a/crates/fluvio-sc/src/services/public_api/topic/delete.rs b/crates/fluvio-sc/src/services/public_api/topic/delete.rs index 6bc8a22379..6888a68b6c 100644 --- a/crates/fluvio-sc/src/services/public_api/topic/delete.rs +++ b/crates/fluvio-sc/src/services/public_api/topic/delete.rs @@ -4,6 +4,7 @@ //! Delete topic request handler. Lookup topic in local metadata, grab its K8 context //! and send K8 a delete message. //! +use fluvio_stream_model::core::MetadataItem; use tracing::{info, trace, instrument}; use std::io::{Error, ErrorKind}; @@ -17,9 +18,9 @@ use crate::services::auth::AuthServiceContext; /// Handler for delete topic request #[instrument(skip(topic_name, auth_ctx))] -pub async fn handle_delete_topic( +pub async fn handle_delete_topic( topic_name: String, - auth_ctx: &AuthServiceContext, + auth_ctx: &AuthServiceContext, ) -> Result { info!(%topic_name, "Deleting topic"); diff --git a/crates/fluvio-sc/src/services/public_api/topic/fetch.rs b/crates/fluvio-sc/src/services/public_api/topic/fetch.rs index 5655cb31e4..7353bd8cfa 100644 --- a/crates/fluvio-sc/src/services/public_api/topic/fetch.rs +++ b/crates/fluvio-sc/src/services/public_api/topic/fetch.rs @@ -1,3 +1,4 @@ +use fluvio_stream_model::core::MetadataItem; use tracing::{trace, debug, instrument}; use anyhow::{anyhow, Result}; @@ -10,9 +11,9 @@ use fluvio_controlplane_metadata::extended::SpecExt; use crate::services::auth::AuthServiceContext; #[instrument(skip(filters, auth_ctx))] -pub async fn handle_fetch_topics_request( +pub async fn handle_fetch_topics_request( filters: ListFilters, - auth_ctx: &AuthServiceContext, + auth_ctx: &AuthServiceContext, ) -> Result> { debug!("retrieving topic list: {:#?}", filters); diff --git a/crates/fluvio-sc/src/services/public_api/watch.rs b/crates/fluvio-sc/src/services/public_api/watch.rs index 05c5b26b49..1c105a1455 100644 --- a/crates/fluvio-sc/src/services/public_api/watch.rs +++ b/crates/fluvio-sc/src/services/public_api/watch.rs @@ -1,5 +1,7 @@ use std::sync::Arc; +use fluvio_stream_model::core::MetadataItem; +use fluvio_stream_model::store::ChangeListener; use tracing::{debug, trace, error, instrument}; use anyhow::{anyhow, Result}; @@ -20,14 +22,14 @@ use fluvio_controlplane_metadata::smartmodule::SmartModuleSpec; use fluvio_controlplane_metadata::tableformat::TableFormatSpec; use crate::services::auth::AuthServiceContext; -use crate::stores::{StoreContext, K8ChangeListener}; +use crate::stores::StoreContext; use fluvio_controlplane_metadata::spg::SpuGroupSpec; /// handle watch request by spawning watch controller for each store #[instrument(skip(request, auth_ctx, sink, end_event))] -pub fn handle_watch_request( +pub fn handle_watch_request( request: RequestMessage, - auth_ctx: &AuthServiceContext, + auth_ctx: &AuthServiceContext, sink: ExclusiveFlvSink, end_event: Arc, ) -> Result<()> { @@ -35,7 +37,7 @@ pub fn handle_watch_request( debug!("handling watch header: {:#?}, request: {:#?}", header, req); if (req.downcast()? as Option>).is_some() { - WatchController::::update( + WatchController::::update( sink, end_event, auth_ctx.global_ctx.topics().clone(), @@ -43,7 +45,7 @@ pub fn handle_watch_request( false, ) } else if (req.downcast()? as Option>).is_some() { - WatchController::::update( + WatchController::::update( sink, end_event, auth_ctx.global_ctx.spus().clone(), @@ -51,7 +53,7 @@ pub fn handle_watch_request( false, ) } else if (req.downcast()? as Option>).is_some() { - WatchController::::update( + WatchController::::update( sink, end_event, auth_ctx.global_ctx.spgs().clone(), @@ -59,7 +61,7 @@ pub fn handle_watch_request( false, ) } else if (req.downcast()? as Option>).is_some() { - WatchController::::update( + WatchController::::update( sink, end_event, auth_ctx.global_ctx.partitions().clone(), @@ -67,7 +69,7 @@ pub fn handle_watch_request( false, ) } else if let Some(req) = req.downcast()? as Option> { - WatchController::::update( + WatchController::::update( sink, end_event, auth_ctx.global_ctx.smartmodules().clone(), @@ -75,7 +77,7 @@ pub fn handle_watch_request( req.summary, ) } else if (req.downcast()? as Option>).is_some() { - WatchController::::update( + WatchController::::update( sink, end_event, auth_ctx.global_ctx.tableformats().clone(), @@ -91,16 +93,17 @@ pub fn handle_watch_request( } /// Watch controller for each object. Note that return type may or not be the same as the object hence two separate spec -struct WatchController { +struct WatchController { response_sink: ExclusiveFlvSink, - store: StoreContext, + store: StoreContext, header: RequestHeader, summary: bool, end_event: Arc, } -impl WatchController +impl WatchController where + C: MetadataItem + 'static, S: AdminSpec + 'static, S: Encoder + Decoder + Send + Sync, S::Status: Encoder + Decoder + Send + Sync, @@ -110,7 +113,7 @@ where fn update( response_sink: ExclusiveFlvSink, end_event: Arc, - store: StoreContext, + store: StoreContext, header: RequestHeader, summary: bool, ) { @@ -166,7 +169,7 @@ where /// sync with store and send out changes to send response /// if can't send, then signal end and return false #[instrument(skip(self, listener))] - async fn sync_and_send_changes(&mut self, listener: &mut K8ChangeListener) -> bool { + async fn sync_and_send_changes(&mut self, listener: &mut ChangeListener) -> bool { use fluvio_controlplane_metadata::message::*; if !listener.has_change() { diff --git a/crates/fluvio-sc/src/start.rs b/crates/fluvio-sc/src/start.rs index 017949760a..52f0fa3c3f 100644 --- a/crates/fluvio-sc/src/start.rs +++ b/crates/fluvio-sc/src/start.rs @@ -2,7 +2,7 @@ use k8_client::new_shared; use tracing::info; -use crate::cli::ScOpt; +use crate::{cli::ScOpt, core::K8SharedContext}; pub fn main_loop(opt: ScOpt) { // parse configuration (program exits on error) @@ -46,7 +46,7 @@ pub fn main_loop(opt: ScOpt) { info!("starting main loop"); #[cfg(feature = "k8")] - let ctx: std::sync::Arc = crate::init::start_main_loop_with_k8( + let ctx: K8SharedContext = crate::init::start_main_loop_with_k8( (sc_config.clone(), auth_policy), k8_client.clone(), ) diff --git a/crates/fluvio-sc/src/stores/spu/mod.rs b/crates/fluvio-sc/src/stores/spu/mod.rs index 2c654828bc..b2b623f9c2 100644 --- a/crates/fluvio-sc/src/stores/spu/mod.rs +++ b/crates/fluvio-sc/src/stores/spu/mod.rs @@ -5,7 +5,7 @@ pub use fluvio_controlplane_metadata::spu::*; pub use fluvio_controlplane_metadata::store::k8::K8MetaItem; pub use health_check::*; -pub type SpuAdminMd = SpuMetadata; +pub type SpuAdminMd = SpuMetadata; pub type SpuAdminStore = SpuLocalStore; // check if given range is conflict with any of the range