Skip to content

Commit

Permalink
feat: store provider specific parametrs for the lease in the CRD (aka…
Browse files Browse the repository at this point in the history
…sh-network#108)

Signed-off-by: Artur Troian <[email protected]>
  • Loading branch information
troian committed Apr 17, 2023
1 parent ea8dd56 commit bd49c04
Show file tree
Hide file tree
Showing 78 changed files with 2,231 additions and 1,119 deletions.
63 changes: 30 additions & 33 deletions cluster/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/tools/remotecommand"

manifest "github.com/akash-network/akash-api/go/manifest/v2beta2"
mani "github.com/akash-network/akash-api/go/manifest/v2beta2"
dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3"
mtypes "github.com/akash-network/akash-api/go/node/market/v1beta3"
"github.com/akash-network/akash-api/go/node/types/unit"
Expand All @@ -25,12 +25,11 @@ import (
mquery "github.com/akash-network/node/x/market/query"

ctypes "github.com/akash-network/provider/cluster/types/v1beta3"
akashtypes "github.com/akash-network/provider/pkg/apis/akash.network/v2beta2"
crd "github.com/akash-network/provider/pkg/apis/akash.network/v2beta2"
)

// Errors types returned by the Exec function on the client interface
var (
// Errors types returned by the Exec function on the client interface
ErrExec = errors.New("remote command execute error")
ErrExecNoServiceWithName = fmt.Errorf("%w: no such service exists with that name", ErrExec)
ErrExecServiceNotRunning = fmt.Errorf("%w: service with that name is not running", ErrExec)
Expand Down Expand Up @@ -59,15 +58,15 @@ type ReadClient interface {
GetHostnameDeploymentConnections(ctx context.Context) ([]ctypes.LeaseIDHostnameConnection, error)

ObserveIPState(ctx context.Context) (<-chan ctypes.IPResourceEvent, error)
GetDeclaredIPs(ctx context.Context, leaseID mtypes.LeaseID) ([]akashtypes.ProviderLeasedIPSpec, error)
GetDeclaredIPs(ctx context.Context, leaseID mtypes.LeaseID) ([]crd.ProviderLeasedIPSpec, error)
}

// Client interface lease and deployment methods
//
//go:generate mockery --name Client
type Client interface {
ReadClient
Deploy(ctx context.Context, lID mtypes.LeaseID, mgroup *manifest.Group) error
Deploy(ctx context.Context, lID mtypes.LeaseID, mgroup *mani.Group) error
TeardownLease(context.Context, mtypes.LeaseID) error
Deployments(context.Context) ([]ctypes.Deployment, error)
Inventory(context.Context) (ctypes.Inventory, error)
Expand All @@ -82,23 +81,23 @@ type Client interface {
tty bool,
tsq remotecommand.TerminalSizeQueue) (ctypes.ExecResult, error)

// Connect a given hostname to a deployment
// ConnectHostnameToDeployment Connect a given hostname to a deployment
ConnectHostnameToDeployment(ctx context.Context, directive ctypes.ConnectHostnameToDeploymentDirective) error
// Remove a given hostname from a deployment
// RemoveHostnameFromDeployment Remove a given hostname from a deployment
RemoveHostnameFromDeployment(ctx context.Context, hostname string, leaseID mtypes.LeaseID, allowMissing bool) error

// Declare that a given deployment should be connected to a given hostname
// DeclareHostname Declare that a given deployment should be connected to a given hostname
DeclareHostname(ctx context.Context, lID mtypes.LeaseID, host string, serviceName string, externalPort uint32) error
// Purge any hostnames associated with a given deployment
// PurgeDeclaredHostnames Purge any hostnames associated with a given deployment
PurgeDeclaredHostnames(ctx context.Context, lID mtypes.LeaseID) error

PurgeDeclaredHostname(ctx context.Context, lID mtypes.LeaseID, hostname string) error

// KubeVersion returns the version information of kubernetes running in the cluster
KubeVersion() (*version.Info, error)

DeclareIP(ctx context.Context, lID mtypes.LeaseID, serviceName string, port uint32, externalPort uint32, proto manifest.ServiceProtocol, sharingKey string, overwrite bool) error
PurgeDeclaredIP(ctx context.Context, lID mtypes.LeaseID, serviceName string, externalPort uint32, proto manifest.ServiceProtocol) error
DeclareIP(ctx context.Context, lID mtypes.LeaseID, serviceName string, port uint32, externalPort uint32, proto mani.ServiceProtocol, sharingKey string, overwrite bool) error
PurgeDeclaredIP(ctx context.Context, lID mtypes.LeaseID, serviceName string, externalPort uint32, proto mani.ServiceProtocol) error
PurgeDeclaredIPs(ctx context.Context, lID mtypes.LeaseID) error
}

Expand Down Expand Up @@ -139,7 +138,7 @@ func (rp *resourcePair) subNLZ(val types.ResourceValue) bool {
return true
}

func (rp resourcePair) available() sdk.Int {
func (rp *resourcePair) available() sdk.Int {
return rp.allocatable.Sub(rp.allocated)
}

Expand Down Expand Up @@ -376,7 +375,7 @@ const (
type nullLease struct {
ctx context.Context
cancel func()
group *manifest.Group
group *mani.Group
}

type nullClient struct {
Expand All @@ -401,40 +400,38 @@ func NullClient() Client {
}
}

func (c *nullClient) RemoveHostnameFromDeployment(ctx context.Context, hostname string, leaseID mtypes.LeaseID, allowMissing bool) error {
func (c *nullClient) RemoveHostnameFromDeployment(_ context.Context, _ string, _ mtypes.LeaseID, _ bool) error {
return errNotImplemented
}

func (c *nullClient) ObserveHostnameState(ctx context.Context) (<-chan ctypes.HostnameResourceEvent, error) {
func (c *nullClient) ObserveHostnameState(_ context.Context) (<-chan ctypes.HostnameResourceEvent, error) {
return nil, errNotImplemented
}
func (c *nullClient) GetDeployments(ctx context.Context, dID dtypes.DeploymentID) ([]ctypes.Deployment, error) {
func (c *nullClient) GetDeployments(_ context.Context, _ dtypes.DeploymentID) ([]ctypes.Deployment, error) {
return nil, errNotImplemented
}
func (c *nullClient) GetHostnameDeploymentConnections(ctx context.Context) ([]ctypes.LeaseIDHostnameConnection, error) {

func (c *nullClient) GetHostnameDeploymentConnections(_ context.Context) ([]ctypes.LeaseIDHostnameConnection, error) {
return nil, errNotImplemented
}

// Connect a given hostname to a deployment
func (c *nullClient) ConnectHostnameToDeployment(ctx context.Context, directive ctypes.ConnectHostnameToDeploymentDirective) error {
func (c *nullClient) ConnectHostnameToDeployment(_ context.Context, _ ctypes.ConnectHostnameToDeploymentDirective) error {
return errNotImplemented
}

// Declare that a given deployment should be connected to a given hostname
func (c *nullClient) DeclareHostname(ctx context.Context, lID mtypes.LeaseID, host string, serviceName string, externalPort uint32) error {
func (c *nullClient) DeclareHostname(_ context.Context, _ mtypes.LeaseID, _ string, _ string, _ uint32) error {
return errNotImplemented
}

// Purge any hostnames associated with a given deployment
func (c *nullClient) PurgeDeclaredHostnames(ctx context.Context, lID mtypes.LeaseID) error {
func (c *nullClient) PurgeDeclaredHostnames(_ context.Context, _ mtypes.LeaseID) error {
return errNotImplemented
}

func (c *nullClient) PurgeDeclaredHostname(ctx context.Context, lID mtypes.LeaseID, hostname string) error {
func (c *nullClient) PurgeDeclaredHostname(_ context.Context, _ mtypes.LeaseID, _ string) error {
return errNotImplemented
}

func (c *nullClient) Deploy(ctx context.Context, lid mtypes.LeaseID, mgroup *manifest.Group) error {
func (c *nullClient) Deploy(ctx context.Context, lid mtypes.LeaseID, mgroup *mani.Group) error {
c.mtx.Lock()
defer c.mtx.Unlock()

Expand Down Expand Up @@ -496,7 +493,7 @@ func (c *nullClient) LeaseEvents(ctx context.Context, lid mtypes.LeaseID, _ stri
genEvent := func() *eventsv1.Event {
return &eventsv1.Event{
EventTime: v1.NewMicroTime(time.Now()),
ReportingController: lease.group.Name,
ReportingController: lease.group.GetName(),
}
}

Expand Down Expand Up @@ -611,30 +608,30 @@ func (c *nullClient) KubeVersion() (*version.Info, error) {
return nil, nil
}

func (c *nullClient) DeclareIP(ctx context.Context, lID mtypes.LeaseID, serviceName string, port uint32, externalPort uint32, proto manifest.ServiceProtocol, sharingKey string, overwrite bool) error {
func (c *nullClient) DeclareIP(_ context.Context, _ mtypes.LeaseID, _ string, _ uint32, _ uint32, _ mani.ServiceProtocol, _ string, _ bool) error {
return errNotImplemented
}

func (c *nullClient) PurgeDeclaredIPs(ctx context.Context, lID mtypes.LeaseID) error {
func (c *nullClient) PurgeDeclaredIPs(_ context.Context, _ mtypes.LeaseID) error {
return errNotImplemented
}

func (c *nullClient) ObserveIPState(ctx context.Context) (<-chan ctypes.IPResourceEvent, error) {
func (c *nullClient) ObserveIPState(_ context.Context) (<-chan ctypes.IPResourceEvent, error) {
return nil, errNotImplemented
}

func (c *nullClient) CreateIPPassthrough(ctx context.Context, lID mtypes.LeaseID, directive ctypes.ClusterIPPassthroughDirective) error {
func (c *nullClient) CreateIPPassthrough(_ context.Context, _ mtypes.LeaseID, _ ctypes.ClusterIPPassthroughDirective) error {
return errNotImplemented
}

func (c *nullClient) PurgeIPPassthrough(ctx context.Context, lID mtypes.LeaseID, directive ctypes.ClusterIPPassthroughDirective) error {
func (c *nullClient) PurgeIPPassthrough(_ context.Context, _ mtypes.LeaseID, _ ctypes.ClusterIPPassthroughDirective) error {
return errNotImplemented
}

func (c *nullClient) PurgeDeclaredIP(ctx context.Context, lID mtypes.LeaseID, serviceName string, externalPort uint32, proto manifest.ServiceProtocol) error {
func (c *nullClient) PurgeDeclaredIP(_ context.Context, _ mtypes.LeaseID, _ string, _ uint32, _ mani.ServiceProtocol) error {
return errNotImplemented
}

func (c *nullClient) GetDeclaredIPs(ctx context.Context, leaseID mtypes.LeaseID) ([]akashtypes.ProviderLeasedIPSpec, error) {
func (c *nullClient) GetDeclaredIPs(_ context.Context, _ mtypes.LeaseID) ([]crd.ProviderLeasedIPSpec, error) {
return nil, errNotImplemented
}
1 change: 1 addition & 0 deletions cluster/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3"
mtypes "github.com/akash-network/akash-api/go/node/market/v1beta3"
atypes "github.com/akash-network/akash-api/go/node/types/v1beta3"

"github.com/akash-network/node/pubsub"
sdlutil "github.com/akash-network/node/sdl/util"
"github.com/akash-network/node/util/runner"
Expand Down
2 changes: 1 addition & 1 deletion cluster/inventory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestInventory_ClusterDeploymentDeployed(t *testing.T) {
deployment := &mocks.Deployment{}
deployment.On("LeaseID").Return(lid)

groupServices := make([]manifest.Service, 1)
groupServices := make(manifest.Services, 1)

serviceCount := testutil.RandRangeInt(1, 10)
serviceEndpoints := make([]types.Endpoint, serviceCount)
Expand Down
23 changes: 23 additions & 0 deletions cluster/kube/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,26 @@ func applyManifest(ctx context.Context, kc crdapi.Interface, b builder.Manifest)
}
return err
}

func applyServicesParams(ctx context.Context, kc crdapi.Interface, b builder.ParamsServices) error {
obj, err := kc.AkashV2beta2().LeaseParamsServices(b.NS()).Get(ctx, b.Name(), metav1.GetOptions{})

metricsutils.IncCounterVecWithLabelValuesFiltered(kubeCallsCounter, "akash-parameters-get", err, errors.IsNotFound)

switch {
case err == nil:
// TODO - only run this update if it would change something
obj, err = b.Update(obj)
if err == nil {
_, err = kc.AkashV2beta2().LeaseParamsServices(b.NS()).Update(ctx, obj, metav1.UpdateOptions{})
metricsutils.IncCounterVecWithLabelValues(kubeCallsCounter, "akash-parameters-update", err)
}
case errors.IsNotFound(err):
obj, err = b.Create()
if err == nil {
_, err = kc.AkashV2beta2().LeaseParamsServices(b.NS()).Create(ctx, obj, metav1.CreateOptions{})
metricsutils.IncCounterVecWithLabelValues(kubeCallsCounter, "akash-parameters-create", err)
}
}
return err
}
13 changes: 6 additions & 7 deletions cluster/kube/builder/builder.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
package builder

// nolint:deadcode,golint

import (
"fmt"
"strconv"

mani "github.com/akash-network/akash-api/go/manifest/v2beta2"
"github.com/tendermint/tendermint/libs/log"
corev1 "k8s.io/api/core/v1"

"k8s.io/apimachinery/pkg/util/intstr"

manifesttypes "github.com/akash-network/akash-api/go/manifest/v2beta2"
mtypes "github.com/akash-network/akash-api/go/node/market/v1beta3"

clusterUtil "github.com/akash-network/provider/cluster/util"
crd "github.com/akash-network/provider/pkg/apis/akash.network/v2beta2"
)

const (
Expand All @@ -23,16 +22,15 @@ const (
AkashNetworkStorageClasses = "akash.network/storageclasses"
AkashServiceTarget = "akash.network/service-target"
AkashMetalLB = "metal-lb"
akashDeploymentPolicyName = "akash-deployment-restrictions"

akashNetworkNamespace = "akash.network/namespace"

akashNetworkNamespace = "akash.network/namespace"
AkashLeaseOwnerLabelName = "akash.network/lease.id.owner"
AkashLeaseDSeqLabelName = "akash.network/lease.id.dseq"
AkashLeaseGSeqLabelName = "akash.network/lease.id.gseq"
AkashLeaseOSeqLabelName = "akash.network/lease.id.oseq"
AkashLeaseProviderLabelName = "akash.network/lease.id.provider"
AkashLeaseManifestVersion = "akash.network/manifest.version"
akashDeploymentPolicyName = "akash-deployment-restrictions"
)

const (
Expand Down Expand Up @@ -65,7 +63,8 @@ type builder struct {
log log.Logger
settings Settings
lid mtypes.LeaseID
group *manifesttypes.Group
group *mani.Group
sparams crd.ParamsServices
}

var _ builderBase = (*builder)(nil)
Expand Down
Loading

0 comments on commit bd49c04

Please sign in to comment.