Skip to content

Commit

Permalink
SPU controller uses SVC ingress annotation (#888)
Browse files Browse the repository at this point in the history
* SPU controller uses SVC ingress annotation

* fix case

* add ingress_annotation function

* Include annotation in ingress computation

* reset version
  • Loading branch information
nacardin authored Mar 22, 2021
1 parent 03a750f commit 8acfc05
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 85 deletions.
4 changes: 2 additions & 2 deletions src/sc/src/k8/controllers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod k8_operator {
use crate::core::SharedContext;
use crate::stores::StoreContext;
use crate::dispatcher::dispatcher::K8ClusterStateDispatcher;
use crate::k8::objects::spu_service::SpuServicespec;
use crate::k8::objects::spu_service::SpuServiceSpec;
use crate::k8::objects::spu_k8_config::ScK8Config;
use crate::k8::objects::statefulset::StatefulsetSpec;
use crate::k8::objects::spg_service::SpgServiceSpec;
Expand All @@ -27,7 +27,7 @@ mod k8_operator {
global_ctx: SharedContext,
tls: Option<TlsConfig>,
) {
let spu_service_ctx: StoreContext<SpuServicespec> = StoreContext::new();
let spu_service_ctx: StoreContext<SpuServiceSpec> = StoreContext::new();
let statefulset_ctx: StoreContext<StatefulsetSpec> = StoreContext::new();
let spg_service_ctx: StoreContext<SpgServiceSpec> = StoreContext::new();

Expand Down
138 changes: 67 additions & 71 deletions src/sc/src/k8/controllers/spu.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::{fmt, time::Duration};
use std::{fmt, net::IpAddr, time::Duration};

use fluvio_controlplane_metadata::{
spg::SpuEndpointTemplate,
spu::{Endpoint, IngressPort, SpuType},
store::MetadataStoreObject,
store::{MetadataStoreObject, k8::K8MetaItem},
};
use fluvio_stream_dispatcher::actions::WSAction;
use fluvio_types::SpuId;
Expand All @@ -21,14 +21,14 @@ use crate::{
stores::{StoreContext, K8ChangeListener},
};
use crate::stores::spu::{IngressAddr, SpuSpec};
use crate::k8::objects::spu_service::SpuServicespec;
use crate::k8::objects::spu_service::SpuServiceSpec;
use crate::k8::objects::spg_group::SpuGroupObj;
use crate::stores::spg::{SpuGroupSpec};

/// Maintain Managed SPU
/// sync from spu services and statefulset
pub struct SpuController {
services: StoreContext<SpuServicespec>,
services: StoreContext<SpuServiceSpec>,
groups: StoreContext<SpuGroupSpec>,
spus: StoreContext<SpuSpec>,
disable_update_service: bool,
Expand All @@ -49,7 +49,7 @@ impl fmt::Debug for SpuController {
impl SpuController {
pub fn start(
spus: StoreContext<SpuSpec>,
services: StoreContext<SpuServicespec>,
services: StoreContext<SpuServiceSpec>,
groups: StoreContext<SpuGroupSpec>,
disable_update_service: bool,
) {
Expand Down Expand Up @@ -159,44 +159,21 @@ impl SpuController {
let (updates, deletes) = changes.parts();

debug!(
"received spu changes updates: {},deletes: {},epoch: {}",
"received spu changes updates: {}, deletes: {}, epoch: {}",
updates.len(),
deletes.len(),
epoch,
);

for spu_md in updates.into_iter() {
let spu_id = spu_md.key();
// check if ingress exists
let spu_ingress = &spu_md.spec.public_endpoint.ingress;
let object_meta = spu_md.ctx().item().inner();
let svc_name = SpuServicespec::service_name(&object_meta.name);
let spu_meta = spu_md.ctx().item().inner();
let svc_name = SpuServiceSpec::service_name(&spu_meta.name);
if let Some(svc) = self.services.store().value(&svc_name).await {
// apply ingress
let svc_ingresses = svc.status.ingress();
let computed_spu_ingress: Vec<IngressAddr> =
svc_ingresses.iter().map(convert).collect();
if &computed_spu_ingress != spu_ingress {
let mut update_spu = spu_md.spec.clone();
debug!(
"updating spu:{} public end point: {:#?} from svc: {}",
spu_id,
computed_spu_ingress,
svc.key()
);
update_spu.public_endpoint.ingress = computed_spu_ingress;
if let Err(err) = self.spus.create_spec(spu_id.to_owned(), update_spu).await {
error!("error applying spec: {}", err);
}
} else {
debug!(
"detected no spu: {} ingress changes with svc: {}",
spu_id,
svc.key()
);
}
self.apply_ingress_from_svc(spu_md, svc.inner_owned())
.await?;
} else {
debug!("no svc exists for spu {},skipping", spu_id);
debug!("no svc exists for spu {}, skipping", spu_id);
}
}

Expand All @@ -206,7 +183,7 @@ impl SpuController {
/// svc has been changed, update spu
async fn sync_from_spu_services(
&mut self,
listener: &mut K8ChangeListener<SpuServicespec>,
listener: &mut K8ChangeListener<SpuServiceSpec>,
) -> Result<(), ClientError> {
if !listener.has_change() {
trace!("no service change, skipping");
Expand All @@ -218,53 +195,25 @@ impl SpuController {
let (updates, deletes) = changes.parts();

debug!(
"received spu service changes updates: {},deletes: {},epoch: {}",
"received spu service changes updates: {}, deletes: {}, epoch: {}",
updates.len(),
deletes.len(),
epoch,
);

for svc_md in updates.into_iter() {
let svc_id = svc_md.key();
let svc_meta = svc_md.ctx().item().inner();
// check if ingress exists
let svc_ingresses = svc_md.status.ingress();

if let Some(spu_name) = SpuServicespec::spu_name(&svc_meta) {
if let Some(mut spu) = self.spus.store().value(spu_name).await {
debug!(
"trying sync service: {}, with: spu: {}",
svc_md.key(),
spu_name
);
trace!("svc ingress: {:#?}", svc_ingresses);
let spu_ingress = svc_ingresses.iter().map(convert).collect();
trace!("spu ingress: {:#?}", spu_ingress);
if spu_ingress != spu.spec.public_endpoint.ingress {
debug!(
"updating spu:{} public end point: {:#?}",
spu_name, spu_ingress
);
spu.spec.public_endpoint.ingress = spu_ingress;
if let Err(err) = self
.spus
.create_spec(spu_name.to_owned(), spu.spec.clone())
.await
{
error!("error applying spec: {}", err);
}
} else {
debug!("detected no spu: {} ingress changes", spu_name);
}
if let Some(spu_name) = SpuServiceSpec::spu_name(&svc_meta) {
if let Some(spu) = self.spus.store().value(spu_name).await {
self.apply_ingress_from_svc(spu.inner_owned(), svc_md)
.await?;
} else {
debug!(
svc = %svc_md.key(),
%spu_name,
"spu service update skipped, because spu doesn't exist",
);
debug!("no spu exists for svc {}, skipping", svc_id);
}
} else {
error!(
svc = %svc_md.key(),
svc = %svc_id,
"spu service doesnt have spu name",
);
}
Expand All @@ -273,6 +222,53 @@ impl SpuController {
Ok(())
}

async fn apply_ingress_from_svc(
&mut self,
spu_md: MetadataStoreObject<SpuSpec, K8MetaItem>,
svc_md: MetadataStoreObject<SpuServiceSpec, K8MetaItem>,
) -> Result<(), ClientError> {
let spu_id = spu_md.key();
let svc_id = svc_md.key();

// check if ingress exists
let spu_ingress = &spu_md.spec.public_endpoint.ingress;

let svc_lb_ingresses = svc_md.status.ingress();
let mut computed_spu_ingress: Vec<IngressAddr> =
svc_lb_ingresses.iter().map(convert).collect();

if let Some(address) = SpuServiceSpec::ingress_annotation(svc_md.ctx().item()) {
if let Ok(ip_addr) = address.parse::<IpAddr>() {
computed_spu_ingress.push(IngressAddr {
hostname: None,
ip: Some(ip_addr.to_string()),
});
} else {
computed_spu_ingress.push(IngressAddr {
hostname: Some(address.clone()),
ip: None,
});
}
}

if &computed_spu_ingress != spu_ingress {
let mut update_spu = spu_md.spec.clone();
debug!(
"updating spu: {} public end point: {:#?} from svc: {}",
spu_id, computed_spu_ingress, svc_id
);
update_spu.public_endpoint.ingress = computed_spu_ingress;
self.spus.create_spec(spu_id.to_owned(), update_spu).await?;
} else {
debug!(
"detected no spu: {} ingress changes with svc: {}",
spu_id, svc_id
);
};

Ok(())
}

async fn sync_with_spg(
&mut self,
listener: &mut K8ChangeListener<SpuGroupSpec>,
Expand Down
8 changes: 4 additions & 4 deletions src/sc/src/k8/controllers/spu_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ use crate::stores::spg::{SpuGroupSpec};
use crate::stores::{StoreContext, K8ChangeListener};
use crate::k8::objects::spu_k8_config::ScK8Config;
use crate::k8::objects::spg_group::SpuGroupObj;
use crate::k8::objects::spu_service::SpuServicespec;
use crate::k8::objects::spu_service::SpuServiceSpec;

/// Manages SpuService
/// It is used to update SPU's public ip address from external load balancer service.
/// External load balancer update external ip or hostname out of band.
pub struct SpuServiceController {
client: SharedK8Client,
namespace: String,
services: StoreContext<SpuServicespec>,
services: StoreContext<SpuServiceSpec>,
groups: StoreContext<SpuGroupSpec>,
}

Expand All @@ -43,7 +43,7 @@ impl SpuServiceController {
pub fn start(
client: SharedK8Client,
namespace: String,
services: StoreContext<SpuServicespec>,
services: StoreContext<SpuServiceSpec>,
groups: StoreContext<SpuGroupSpec>,
) {
let controller = Self {
Expand Down Expand Up @@ -153,7 +153,7 @@ impl SpuServiceController {

spu_k8_config.apply_service(&mut k8_service_spec);

let svc_name = SpuServicespec::service_name(spu_name);
let svc_name = SpuServiceSpec::service_name(spu_name);

let mut ctx = spg_obj
.ctx()
Expand Down
20 changes: 12 additions & 8 deletions src/sc/src/k8/objects/spu_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ use crate::stores::spg::SpuGroupSpec;
/// Service associated with SPU
#[derive(Deserialize, Serialize, Debug, PartialEq, Default, Clone)]
#[serde(rename_all = "camelCase")]
pub struct SpuServicespec(K8ServiceSpec);
pub struct SpuServiceSpec(K8ServiceSpec);

impl Spec for SpuServicespec {
impl Spec for SpuServiceSpec {
const LABEL: &'static str = "SpuService";
type IndexKey = String;
type Status = SpuServiceStatus;
type Owner = SpuGroupSpec;
}

impl SpuServicespec {
impl SpuServiceSpec {
pub fn inner(&self) -> &K8ServiceSpec {
&self.0
}
Expand All @@ -38,16 +38,20 @@ impl SpuServicespec {
pub fn spu_name(meta: &ObjectMeta) -> Option<&String> {
meta.labels.get("fluvio.io/spu-name")
}

pub fn ingress_annotation(meta: &ObjectMeta) -> Option<&String> {
meta.annotations.get("fluvio.io/ingress-address")
}
}

impl From<K8ServiceSpec> for SpuServicespec {
impl From<K8ServiceSpec> for SpuServiceSpec {
fn from(k8: K8ServiceSpec) -> Self {
Self(k8)
}
}

impl From<SpuServicespec> for K8ServiceSpec {
fn from(spec: SpuServicespec) -> Self {
impl From<SpuServiceSpec> for K8ServiceSpec {
fn from(spec: SpuServiceSpec) -> Self {
spec.0
}
}
Expand Down Expand Up @@ -101,14 +105,14 @@ mod extended {

use super::*;

impl K8ExtendedSpec for SpuServicespec {
impl K8ExtendedSpec for SpuServiceSpec {
type K8Spec = ServiceSpec;
type K8Status = ServiceStatus;

fn convert_from_k8(
k8_obj: K8Obj<Self::K8Spec>,
) -> Result<MetadataStoreObject<Self, K8MetaItem>, K8ConvertError<Self::K8Spec>> {
if let Some(name) = SpuServicespec::spu_name(&k8_obj.metadata) {
if let Some(name) = SpuServiceSpec::spu_name(&k8_obj.metadata) {
debug!(spu = %name,
service_name = %k8_obj.metadata.name,
"detected spu service");
Expand Down

0 comments on commit 8acfc05

Please sign in to comment.