Skip to content

Commit

Permalink
Decouple from K8MetaItem struct on fluvio-sc (#3454)
Browse files Browse the repository at this point in the history
  • Loading branch information
morenol committed Aug 5, 2023
1 parent f6c84e9 commit b7a2765
Show file tree
Hide file tree
Showing 36 changed files with 253 additions and 183 deletions.
5 changes: 1 addition & 4 deletions crates/fluvio-controlplane-metadata/src/smartmodule/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ use tracing::debug;

use fluvio_protocol::{ByteBuf, Encoder, Decoder, Version};

use super::{
SmartModuleMetadata,
spec_v1::{SmartModuleSpecV1},
};
use super::{SmartModuleMetadata, spec_v1::SmartModuleSpecV1};

const V2_FORMAT: Version = 10;

Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-sc-schema/src/objects/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use fluvio_protocol::api::Request;

use crate::{AdminPublicApiKey, CreatableAdminSpec, Status, TryEncodableFrom};

use super::{COMMON_VERSION};
use super::COMMON_VERSION;

#[derive(Encoder, Decoder, Default, Debug, Clone)]
pub struct CommonCreateRequest {
Expand Down
9 changes: 5 additions & 4 deletions crates/fluvio-sc/src/controllers/spus/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::time::Duration;
use std::io::Error as IoError;

use fluvio_future::timer::sleep;
use fluvio_stream_model::core::MetadataItem;
use tracing::{debug, info, error, trace, instrument};

use fluvio_future::task::spawn;
Expand All @@ -15,14 +16,14 @@ use crate::stores::spu::*;

/// Reconcile SPU health status with Meta data
/// if SPU has not send heart beat within a period, it is considered down
pub struct SpuController {
spus: StoreContext<SpuSpec>,
pub struct SpuController<C: MetadataItem> {
spus: StoreContext<SpuSpec, C>,
health_check: SharedHealthCheck,
counter: u64, // how many time we have been sync
}

impl SpuController {
pub fn start(ctx: SharedContext) {
impl<C: MetadataItem + 'static> SpuController<C> {
pub fn start(ctx: SharedContext<C>) {
let controller = Self {
spus: ctx.spus().clone(),
health_check: ctx.health().clone(),
Expand Down
8 changes: 6 additions & 2 deletions crates/fluvio-sc/src/controllers/topics/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@ pub struct TopicController<C: MetadataItem = K8MetaItem> {
reducer: TopicReducer<C>,
}

impl TopicController<K8MetaItem> {
impl<C> TopicController<C>
where
C: MetadataItem + 'static,
C::UId: Send + Sync,
{
/// streaming coordinator controller constructor
pub fn start(ctx: SharedContext) {
pub fn start(ctx: SharedContext<C>) {
let topics = ctx.topics().clone();
let partitions = ctx.partitions().clone();

Expand Down
33 changes: 18 additions & 15 deletions crates/fluvio-sc/src/core/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
//!
use std::sync::Arc;

use fluvio_stream_model::core::MetadataItem;

use crate::config::ScConfig;
use crate::stores::spu::*;
use crate::stores::partition::*;
Expand All @@ -14,18 +16,19 @@ use crate::stores::smartmodule::*;
use crate::stores::tableformat::*;
use crate::stores::*;

pub type SharedContext = Arc<Context>;
pub type SharedContext<C> = Arc<Context<C>>;
pub type K8SharedContext = Arc<Context<K8MetaItem>>;

/// Global Context for SC
/// This is where we store globally accessible data
#[derive(Debug)]
pub struct Context {
spus: StoreContext<SpuSpec>,
partitions: StoreContext<PartitionSpec>,
topics: StoreContext<TopicSpec>,
spgs: StoreContext<SpuGroupSpec>,
smartmodules: StoreContext<SmartModuleSpec>,
tableformats: StoreContext<TableFormatSpec>,
pub struct Context<C: MetadataItem> {
spus: StoreContext<SpuSpec, C>,
partitions: StoreContext<PartitionSpec, C>,
topics: StoreContext<TopicSpec, C>,
spgs: StoreContext<SpuGroupSpec, C>,
smartmodules: StoreContext<SmartModuleSpec, C>,
tableformats: StoreContext<TableFormatSpec, C>,
health: SharedHealthCheck,
config: ScConfig,
}
Expand All @@ -34,7 +37,7 @@ pub struct Context {
// ScMetadata - Implementation
// -----------------------------------

impl Context {
impl<C: MetadataItem> Context<C> {
pub fn shared_metadata(config: ScConfig) -> Arc<Self> {
Arc::new(Self::new(config))
}
Expand All @@ -54,29 +57,29 @@ impl Context {
}

/// reference to spus
pub fn spus(&self) -> &StoreContext<SpuSpec> {
pub fn spus(&self) -> &StoreContext<SpuSpec, C> {
&self.spus
}

/// reference to partitions
pub fn partitions(&self) -> &StoreContext<PartitionSpec> {
pub fn partitions(&self) -> &StoreContext<PartitionSpec, C> {
&self.partitions
}

/// reference to topics
pub fn topics(&self) -> &StoreContext<TopicSpec> {
pub fn topics(&self) -> &StoreContext<TopicSpec, C> {
&self.topics
}

pub fn spgs(&self) -> &StoreContext<SpuGroupSpec> {
pub fn spgs(&self) -> &StoreContext<SpuGroupSpec, C> {
&self.spgs
}

pub fn smartmodules(&self) -> &StoreContext<SmartModuleSpec> {
pub fn smartmodules(&self) -> &StoreContext<SmartModuleSpec, C> {
&self.smartmodules
}

pub fn tableformats(&self) -> &StoreContext<TableFormatSpec> {
pub fn tableformats(&self) -> &StoreContext<TableFormatSpec, C> {
&self.tableformats
}

Expand Down
20 changes: 15 additions & 5 deletions crates/fluvio-sc/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//!
use std::sync::Arc;

use fluvio_stream_model::core::MetadataItem;
#[cfg(feature = "k8")]
use k8_metadata_client::{MetadataClient, SharedClient};

Expand All @@ -25,7 +26,7 @@ use crate::services::auth::basic::BasicRbacPolicy;
pub async fn start_main_loop_with_k8<C>(
sc_config_policy: (ScConfig, Option<BasicRbacPolicy>),
metadata_client: SharedClient<C>,
) -> SharedContext
) -> crate::core::K8SharedContext
where
C: MetadataClient + 'static,
{
Expand Down Expand Up @@ -81,10 +82,14 @@ where
}

/// start the main loop
pub async fn start_main_loop(
ctx: Arc<Context>,
pub async fn start_main_loop<C>(
ctx: Arc<Context<C>>,
auth_policy: Option<BasicRbacPolicy>,
) -> SharedContext {
) -> SharedContext<C>
where
C: MetadataItem + 'static,
C::UId: Send + Sync,
{
let config = ctx.config();
whitelist!(config, "spu", SpuController::start(ctx.clone()));
whitelist!(config, "topic", TopicController::start(ctx.clone()));
Expand All @@ -109,10 +114,15 @@ pub async fn start_main_loop(
use crate::services::start_public_server;
use crate::core::SharedContext;

use fluvio_controlplane_metadata::core::MetadataItem;
use crate::services::auth::{AuthGlobalContext, RootAuthorization};
use crate::services::auth::basic::{BasicAuthorization, BasicRbacPolicy};

pub fn start(ctx: SharedContext, auth_policy_option: Option<BasicRbacPolicy>) {
pub fn start<C>(ctx: SharedContext<C>, auth_policy_option: Option<BasicRbacPolicy>)
where
C: MetadataItem + 'static,
C::UId: Send + Sync,
{
if let Some(policy) = auth_policy_option {
info!("using basic authorization");
start_public_server(AuthGlobalContext::new(
Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-sc/src/k8/controllers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ mod k8_operator {
use k8_client::SharedK8Client;

use crate::cli::TlsConfig;
use crate::core::SharedContext;
use crate::core::K8SharedContext;
use crate::stores::StoreContext;
use crate::dispatcher::dispatcher::K8ClusterStateDispatcher;
use crate::k8::objects::spu_service::SpuServiceSpec;
Expand All @@ -26,7 +26,7 @@ mod k8_operator {
pub async fn run_k8_operators(
namespace: String,
k8_client: SharedK8Client,
global_ctx: SharedContext,
global_ctx: K8SharedContext,
tls: Option<TlsConfig>,
) {
let config = global_ctx.config();
Expand Down
8 changes: 3 additions & 5 deletions crates/fluvio-sc/src/k8/controllers/spu_controller.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{collections::HashMap, fmt, net::IpAddr, time::Duration};

use fluvio_controlplane_metadata::{
spu::{IngressPort},
spu::IngressPort,
store::{MetadataStoreObject, k8::K8MetaItem},
};

Expand All @@ -13,13 +13,11 @@ use fluvio_future::task::spawn;
use fluvio_future::timer::sleep;
use k8_types::core::service::{LoadBalancerIngress, LoadBalancerType};

use crate::{
stores::{StoreContext},
};
use crate::stores::StoreContext;
use crate::stores::spu::{IngressAddr, SpuSpec};
use crate::k8::objects::spu_service::SpuServiceSpec;
use crate::k8::objects::spg_group::SpuGroupObj;
use crate::stores::spg::{SpuGroupSpec};
use crate::stores::spg::SpuGroupSpec;

/// Update SPU from changes in SPU Group and SPU Services
/// This is only place where we make changes to SPU
Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-sc/src/k8/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use k8_client::{K8Client, SharedK8Client, load_and_share};

use crate::k8::objects::spu_k8_config::ScK8Config;
use crate::config::ScConfig;
use crate::core::{Context, SharedContext};
use crate::core::{Context, K8SharedContext};

type ScConfigMetadata = MetadataStoreObject<ScK8Config, K8MetaItem>;

Expand Down Expand Up @@ -79,7 +79,7 @@ impl TestEnv {
&self.client
}

pub async fn create_global_ctx(&self) -> (SharedContext, StoreContext<ScK8Config>) {
pub async fn create_global_ctx(&self) -> (K8SharedContext, StoreContext<ScK8Config>) {
let config_map = ScConfigMetadata::with_spec("fluvio", ScK8Config::default());
let config_store = LocalStore::new_shared();
config_store.sync_all(vec![config_map]).await;
Expand Down
6 changes: 3 additions & 3 deletions crates/fluvio-sc/src/k8/objects/spu_k8_config.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::collections::{BTreeMap, HashMap};
use std::fmt;

use serde::{Deserialize};
use serde::Deserialize;
use tracing::{debug, info};

use fluvio_controlplane_metadata::core::MetadataContext;
use fluvio_types::defaults::SPU_PUBLIC_PORT;
use k8_client::{ClientError};
use k8_client::ClientError;
use k8_types::Env;
use k8_types::core::pod::{
ResourceRequirements, PodSecurityContext, ContainerSpec, VolumeMount, VolumeSpec,
Expand Down Expand Up @@ -193,7 +193,7 @@ mod extended {
use crate::stores::k8::K8ConvertError;
use crate::stores::k8::K8ExtendedSpec;
use crate::stores::k8::K8MetaItem;
use crate::stores::{MetadataStoreObject};
use crate::stores::MetadataStoreObject;

use super::*;

Expand Down
23 changes: 15 additions & 8 deletions crates/fluvio-sc/src/services/auth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,23 @@ mod common {
use fluvio_auth::{AuthContext, Authorization, TypeAction, InstanceAction, AuthError};
use fluvio_socket::FluvioSocket;
use fluvio_controlplane_metadata::extended::ObjectType;
use fluvio_stream_model::core::MetadataItem;

use crate::core::SharedContext;

/// SC global context with authorization
/// auth is trait object which contains global auth auth policy
#[derive(Clone, Debug)]
pub struct AuthGlobalContext<A> {
pub global_ctx: SharedContext,
pub struct AuthGlobalContext<A, C: MetadataItem> {
pub global_ctx: SharedContext<C>,
pub auth: Arc<A>,
}

impl<A> AuthGlobalContext<A> {
pub fn new(global_ctx: SharedContext, auth: Arc<A>) -> Self {
impl<A, C> AuthGlobalContext<A, C>
where
C: MetadataItem,
{
pub fn new(global_ctx: SharedContext<C>, auth: Arc<A>) -> Self {
Self { global_ctx, auth }
}
}
Expand Down Expand Up @@ -79,13 +83,16 @@ mod common {
/// Auth Service Context, this hold individual context that is enough enforce auth
/// for this service context
#[derive(Debug, Clone)]
pub struct AuthServiceContext<AC> {
pub global_ctx: SharedContext,
pub struct AuthServiceContext<AC, C: MetadataItem> {
pub global_ctx: SharedContext<C>,
pub auth: AC,
}

impl<AC> AuthServiceContext<AC> {
pub fn new(global_ctx: SharedContext, auth: AC) -> Self {
impl<AC, C> AuthServiceContext<AC, C>
where
C: MetadataItem,
{
pub fn new(global_ctx: SharedContext<C>, auth: AC) -> Self {
Self { global_ctx, auth }
}
}
Expand Down
6 changes: 5 additions & 1 deletion crates/fluvio-sc/src/services/private_api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod private_server;

use fluvio_stream_model::core::MetadataItem;
use tracing::info;
use tracing::instrument;

Expand All @@ -14,7 +15,10 @@ use crate::core::SharedContext;
skip(ctx),
fields(address = &*ctx.config().private_endpoint)
)]
pub fn start_internal_server(ctx: SharedContext) {
pub fn start_internal_server<C>(ctx: SharedContext<C>)
where
C: MetadataItem + 'static,
{
info!("starting internal services");

let addr = ctx.config().private_endpoint.clone();
Expand Down
Loading

0 comments on commit b7a2765

Please sign in to comment.