From 1e83feece2cfe9a6e5fd28ffb5f9352676a361d9 Mon Sep 17 00:00:00 2001 From: vsoch Date: Mon, 27 May 2024 07:13:47 -0600 Subject: [PATCH 1/6] chore: rename of Fluxjgf class. Problem: The "JGF" part of fluxjgf should be capitalized to refer to "Json Graph Format" and for easier reading for the developer. Solution: Rename "Fluxjgf to FluxJGF" Signed-off-by: vsoch --- src/fluence/jgf/jgf.go | 32 ++++++++++++++++---------------- src/fluence/jgf/types.go | 2 +- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/fluence/jgf/jgf.go b/src/fluence/jgf/jgf.go index 8a047f9..87da82e 100644 --- a/src/fluence/jgf/jgf.go +++ b/src/fluence/jgf/jgf.go @@ -53,9 +53,9 @@ var ( ) // InitJGF initializes the Flux Json Graph Format object -func InitJGF() (fluxgraph Fluxjgf) { +func InitJGF() (fluxgraph FluxJGF) { var g graph - fluxgraph = Fluxjgf{ + fluxgraph = FluxJGF{ Graph: g, Elements: 0, NodeMap: make(map[string]node), @@ -71,14 +71,14 @@ func getDefaultPaths() map[string]string { } // addNode adds a node to the JGF -func (g *Fluxjgf) addNode(toadd node) { +func (g *FluxJGF) addNode(toadd node) { g.Graph.Nodes = append(g.Graph.Nodes, toadd) g.NodeMap[toadd.Id] = toadd g.Elements = g.Elements + 1 } // MakeEdge creates an edge for the JGF -func (g *Fluxjgf) MakeEdge(source string, target string, contains string) { +func (g *FluxJGF) MakeEdge(source string, target string, contains string) { newedge := edge{ Source: source, Target: target, @@ -94,7 +94,7 @@ func (g *Fluxjgf) MakeEdge(source string, target string, contains string) { } // MakeSubnet creates a subnet for the graph -func (g *Fluxjgf) MakeSubnet(index int64, ip string) string { +func (g *FluxJGF) MakeSubnet(index int64, ip string) string { newnode := node{ Id: fmt.Sprintf("%d", g.Elements), Metadata: nodeMetadata{ @@ -115,7 +115,7 @@ func (g *Fluxjgf) MakeSubnet(index int64, ip string) string { } // MakeNode creates a new node for the graph -func (g *Fluxjgf) MakeNode(index int, exclusive bool, subnet string) string { +func (g *FluxJGF) MakeNode(index int, exclusive bool, subnet string) string { newnode := node{ Id: fmt.Sprintf("%d", g.Elements), Metadata: nodeMetadata{ @@ -136,7 +136,7 @@ func (g *Fluxjgf) MakeNode(index int, exclusive bool, subnet string) string { } // MakeSocket creates a socket for the graph -func (g *Fluxjgf) MakeSocket(index int64, name string) string { +func (g *FluxJGF) MakeSocket(index int64, name string) string { newnode := node{ Id: fmt.Sprintf("%d", g.Elements), Metadata: nodeMetadata{ @@ -157,7 +157,7 @@ func (g *Fluxjgf) MakeSocket(index int64, name string) string { } // MakeCore creates a core for the graph -func (g *Fluxjgf) MakeCore(index int64, name string) string { +func (g *FluxJGF) MakeCore(index int64, name string) string { newnode := node{ Id: fmt.Sprintf("%d", g.Elements), Metadata: nodeMetadata{ @@ -178,7 +178,7 @@ func (g *Fluxjgf) MakeCore(index int64, name string) string { } // MakeVCore makes a vcore (I think 2 vcpu == 1 cpu) for the graph -func (g *Fluxjgf) MakeVCore(coreid string, index int64, name string) string { +func (g *FluxJGF) MakeVCore(coreid string, index int64, name string) string { newnode := node{ Id: fmt.Sprintf("%d", g.Elements), Metadata: nodeMetadata{ @@ -201,7 +201,7 @@ func (g *Fluxjgf) MakeVCore(coreid string, index int64, name string) string { } // MakeNFProperties makes the node feature discovery properties for the graph -func (g *Fluxjgf) MakeNFDProperties(coreid string, index int64, filter string, labels *map[string]string) { +func (g *FluxJGF) MakeNFDProperties(coreid string, index int64, filter string, labels *map[string]string) { for key, _ := range *labels { if strings.Contains(key, filter) { name := strings.Split(key, "/")[1] @@ -230,7 +230,7 @@ func (g *Fluxjgf) MakeNFDProperties(coreid string, index int64, filter string, l } } -func (g *Fluxjgf) MakeNFDPropertiesByValue(coreid string, index int64, filter string, labels *map[string]string) { +func (g *FluxJGF) MakeNFDPropertiesByValue(coreid string, index int64, filter string, labels *map[string]string) { for key, val := range *labels { if strings.Contains(key, filter) { name := val @@ -257,7 +257,7 @@ func (g *Fluxjgf) MakeNFDPropertiesByValue(coreid string, index int64, filter st } // MakeMemory creates memory for the graph -func (g *Fluxjgf) MakeMemory(index int64, name string, unit string, size int64) string { +func (g *FluxJGF) MakeMemory(index int64, name string, unit string, size int64) string { newnode := node{ Id: fmt.Sprintf("%d", g.Elements), Metadata: nodeMetadata{ @@ -278,7 +278,7 @@ func (g *Fluxjgf) MakeMemory(index int64, name string, unit string, size int64) } // MakeGPU makes a gpu for the graph -func (g *Fluxjgf) MakeGPU(index int64, name string, size int64) string { +func (g *FluxJGF) MakeGPU(index int64, name string, size int64) string { newnode := node{ Id: fmt.Sprintf("%d", g.Elements), Metadata: nodeMetadata{ @@ -299,7 +299,7 @@ func (g *Fluxjgf) MakeGPU(index int64, name string, size int64) string { } // MakeCluster creates the cluster -func (g *Fluxjgf) MakeCluster(clustername string) string { +func (g *FluxJGF) MakeCluster(clustername string) string { g.Elements = 0 newnode := node{ Id: strconv.Itoa(0), @@ -323,7 +323,7 @@ func (g *Fluxjgf) MakeCluster(clustername string) string { } // MakeRack makes the rack -func (g *Fluxjgf) MakeRack(index int64) string { +func (g *FluxJGF) MakeRack(index int64) string { newnode := node{ Id: fmt.Sprintf("%d", g.Elements), Metadata: nodeMetadata{ @@ -343,7 +343,7 @@ func (g *Fluxjgf) MakeRack(index int64) string { return newnode.Id } -func (g *Fluxjgf) WriteJGF(path string) error { +func (g *FluxJGF) WriteJGF(path string) error { encodedJGF, err := json.MarshalIndent(g, "", " ") diff --git a/src/fluence/jgf/types.go b/src/fluence/jgf/types.go index 21ccd00..cca3cfe 100644 --- a/src/fluence/jgf/types.go +++ b/src/fluence/jgf/types.go @@ -55,7 +55,7 @@ type graph struct { Directed bool `json:"directed,omitempty"` } -type Fluxjgf struct { +type FluxJGF struct { Graph graph `json:"graph"` Elements int64 `json:"-"` NodeMap map[string]node `json:"-"` From 22ee6b104106c1c9df8cc8f5ecb6635f0d3f69ae Mon Sep 17 00:00:00 2001 From: vsoch Date: Mon, 27 May 2024 07:27:22 -0600 Subject: [PATCH 2/6] chore: rename InitJGF to NewFluxJGF Problem: The convention for creating new structs is typically to use "New$Name" and return a reference or copy to it. Solution: Rename InitJGF to NewFluxJGF to mirror that practice, making it clear we get back a "new flux JGF" and the function is not initializing something that already exists. Signed-off-by: vsoch --- src/fluence/jgf/jgf.go | 10 ++++------ src/fluence/utils/utils.go | 7 ++++--- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/fluence/jgf/jgf.go b/src/fluence/jgf/jgf.go index 87da82e..e7f25f5 100644 --- a/src/fluence/jgf/jgf.go +++ b/src/fluence/jgf/jgf.go @@ -52,15 +52,13 @@ var ( containmentKey = "containment" ) -// InitJGF initializes the Flux Json Graph Format object -func InitJGF() (fluxgraph FluxJGF) { - var g graph - fluxgraph = FluxJGF{ - Graph: g, +// NewFluxJGF creates and returns a new Flux Json Graph Format object +func NewFluxJGF() FluxJGF { + return FluxJGF{ + Graph: graph{}, Elements: 0, NodeMap: make(map[string]node), } - return } // getDefaultPaths returns a new map with empty containment diff --git a/src/fluence/utils/utils.go b/src/fluence/utils/utils.go index 490a0e0..4ee92d4 100644 --- a/src/fluence/utils/utils.go +++ b/src/fluence/utils/utils.go @@ -19,7 +19,8 @@ import ( ) var ( - controlPlaneLabel = "node-role.kubernetes.io/control-plane" + controlPlaneLabel = "node-role.kubernetes.io/control-plane" + defaultClusterName = "k8scluster" ) // RegisterExisting uses the in cluster API to get existing pods @@ -91,14 +92,14 @@ func CreateJGF(filename string, skipLabel *string) error { } // Create a Flux Json Graph Format (JGF) with all cluster nodes - fluxgraph := jgf.InitJGF() + fluxgraph := jgf.NewFluxJGF() // Top level of the graph is the cluster // This assumes fluxion is only serving one cluster. // previous comments indicate that we choose between the level // of a rack and a subnet. A rack doesn't make sense (the nodes could // be on multiple racks) so subnet is likely the right abstraction - cluster := fluxgraph.MakeCluster("k8scluster") + cluster := fluxgraph.MakeCluster(defaultClusterName) vcores := 0 fmt.Println("Number nodes ", len(nodes.Items)) From 3ab00adeb2f9c8e3950ba3382418a5e6310806dd Mon Sep 17 00:00:00 2001 From: vsoch Date: Mon, 27 May 2024 14:31:24 -0600 Subject: [PATCH 3/6] test: rename test to be e2e Problem: we want to have end to end (e2e) and regular go tests. Solution: rename the test.yaml to e2e-test.yaml, and prepare to add simple tests for next round of changes (testify package) Signed-off-by: vsoch --- .github/workflows/{test.yaml => e2e-test.yaml} | 0 src/fluence/go.mod | 2 ++ src/fluence/go.sum | 2 -- 3 files changed, 2 insertions(+), 2 deletions(-) rename .github/workflows/{test.yaml => e2e-test.yaml} (100%) diff --git a/.github/workflows/test.yaml b/.github/workflows/e2e-test.yaml similarity index 100% rename from .github/workflows/test.yaml rename to .github/workflows/e2e-test.yaml diff --git a/src/fluence/go.mod b/src/fluence/go.mod index 01fc126..31228d9 100644 --- a/src/fluence/go.mod +++ b/src/fluence/go.mod @@ -4,6 +4,7 @@ go 1.21 require ( github.com/flux-framework/fluxion-go v0.32.1-0.20240420052153-909523c84ca2 + github.com/stretchr/testify v1.7.0 google.golang.org/grpc v1.38.0 google.golang.org/protobuf v1.26.0 gopkg.in/yaml.v2 v2.4.0 @@ -25,6 +26,7 @@ require ( github.com/json-iterator/go v1.1.11 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/net v0.0.0-20210520170846-37e1c6afe023 // indirect golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d // indirect golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 // indirect diff --git a/src/fluence/go.sum b/src/fluence/go.sum index 534497d..c7291ca 100644 --- a/src/fluence/go.sum +++ b/src/fluence/go.sum @@ -98,8 +98,6 @@ github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d/go.mod h1:ZZM github.com/fatih/camelcase v1.0.0/go.mod h1:yN2Sb0lFhZJUdVvtELVWefmrXpuZESvPmqwoZc+/fpc= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flux-framework/fluxion-go v0.32.0 h1:NY6Y1mlTTTZhHD+CmAsDsdNTxUsAFDQoORpMZj8NFLI= -github.com/flux-framework/fluxion-go v0.32.0/go.mod h1:ZI3QxSvUfgJE2Snur/SntJmVfpMjr6D4ICVmdqJ9fkQ= github.com/flux-framework/fluxion-go v0.32.1-0.20240420052153-909523c84ca2 h1:Yz/vVX0XfB2q51ZLh2p8YI5vphvv0rZF4PqtKPscvsY= github.com/flux-framework/fluxion-go v0.32.1-0.20240420052153-909523c84ca2/go.mod h1:jA5+kOSLxchFzixzYEvMAGjkXB5yszO/HxUwdhX/5/U= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= From 422c313ed6a1c918ceaa2f0c51b8bdbbfa901aaf Mon Sep 17 00:00:00 2001 From: vsoch Date: Mon, 27 May 2024 23:48:37 -0600 Subject: [PATCH 4/6] jgf: refactor to use shared functions and fix containment Problem: the containment paths are currently not set, and each resource type has a separate function. Solution: create a shared node generation function that uses a common resource counter, where the counter manages the global and resource-specific counts. This scoped commit includes refactor of those JGF functions, which also means removing resources that are not present in the graph (e.g., rack and socket) and ensuring that we do not recreate the subnet since we loop through nodes (and might hit the same one twice). I am also removing the NFD features because I think they are changed and might lead to error if someone has them. We should assess which of them we want/need and add them back strategically. I have a local test but want to push this first to give a look over / run current tests to determine if other changes are needed. Signed-off-by: vsoch --- .github/workflows/e2e-test.yaml | 6 +- src/fluence/cmd/main.go | 2 +- src/fluence/fluxion/fluxion.go | 8 +- src/fluence/jgf/jgf.go | 368 +++++++++++--------------------- src/fluence/jgf/types.go | 92 +++++++- src/fluence/utils/utils.go | 92 ++++---- 6 files changed, 275 insertions(+), 293 deletions(-) diff --git a/.github/workflows/e2e-test.yaml b/.github/workflows/e2e-test.yaml index ed45891..0832080 100644 --- a/.github/workflows/e2e-test.yaml +++ b/.github/workflows/e2e-test.yaml @@ -23,7 +23,7 @@ jobs: - uses: actions/checkout@v4 - uses: actions/setup-go@v4 with: - go-version: ^1.19 + go-version: ^1.21 - name: Build Containers run: | @@ -56,7 +56,7 @@ jobs: - uses: actions/checkout@v4 - uses: actions/setup-go@v4 with: - go-version: ^1.19 + go-version: ^1.21 - name: Build Container run: | @@ -87,7 +87,7 @@ jobs: - uses: actions/checkout@v4 - uses: actions/setup-go@v4 with: - go-version: ^1.19 + go-version: ^1.21 - name: Download fluence artifact uses: actions/download-artifact@v4 diff --git a/src/fluence/cmd/main.go b/src/fluence/cmd/main.go index e8ef87d..753e56e 100644 --- a/src/fluence/cmd/main.go +++ b/src/fluence/cmd/main.go @@ -40,7 +40,7 @@ func main() { // Fluxion GRPC flux := fluxion.Fluxion{} - flux.InitFluxion(policy, label) + flux.InitFluxion(*policy, *label) lis, err := net.Listen("tcp", port) if err != nil { diff --git a/src/fluence/fluxion/fluxion.go b/src/fluence/fluxion/fluxion.go index f288cdf..ab2a3d7 100644 --- a/src/fluence/fluxion/fluxion.go +++ b/src/fluence/fluxion/fluxion.go @@ -20,11 +20,11 @@ type Fluxion struct { } // InitFluxion creates a new client to interaction with the fluxion API (via go bindings) -func (fluxion *Fluxion) InitFluxion(policy *string, label *string) { +func (fluxion *Fluxion) InitFluxion(policy string, label string) { fluxion.cli = fluxcli.NewReapiClient() klog.Infof("[Fluence] Created flux resource client %s", fluxion.cli) - err := utils.CreateJGF(defaults.KubernetesJsonGraphFormat, label) + err := utils.CreateInClusterJGF(defaults.KubernetesJsonGraphFormat, label) if err != nil { return } @@ -36,8 +36,8 @@ func (fluxion *Fluxion) InitFluxion(policy *string, label *string) { } p := "{}" - if *policy != "" { - p = string("{\"matcher_policy\": \"" + *policy + "\"}") + if policy != "" { + p = string("{\"matcher_policy\": \"" + policy + "\"}") klog.Infof("[Fluence] match policy: %s", p) } fluxion.cli.InitContext(string(jgf), p) diff --git a/src/fluence/jgf/jgf.go b/src/fluence/jgf/jgf.go index e7f25f5..6600fa4 100644 --- a/src/fluence/jgf/jgf.go +++ b/src/fluence/jgf/jgf.go @@ -20,7 +20,6 @@ import ( "fmt" "log" "os" - "strconv" "strings" ) @@ -54,25 +53,47 @@ var ( // NewFluxJGF creates and returns a new Flux Json Graph Format object func NewFluxJGF() FluxJGF { + + // Create a new cluster, and count the top level as a resource + // The index 0 (of the element count) is the cluster + counters := map[string]int64{"cluster": int64(1)} return FluxJGF{ - Graph: graph{}, - Elements: 0, - NodeMap: make(map[string]node), + Graph: graph{}, + NodeMap: make(map[string]Node), + + // Counters and lookup for resources + Resources: ResourceCounter{counts: counters}, } } -// getDefaultPaths returns a new map with empty containment -// this cannot be a global shared variable or we get an error -// about inserting an edge to itself. -func getDefaultPaths() map[string]string { - return map[string]string{containmentKey: ""} +// ToJson returns a Json string of the graph +func (g *FluxJGF) ToJson() (string, error) { + toprint, err := json.MarshalIndent(g.Graph, "", "\t") + return string(toprint), err +} + +// GetNodePath returns the node containment path +func getNodePath(root, subpath string) string { + var path string + if subpath == "" { + path = fmt.Sprintf("/%s", root) + } else { + path = fmt.Sprintf("/%s/%s", root, subpath) + } + // Hack to allow for imperfection of slash placement + path = strings.ReplaceAll(path, "//", "/") + return path } -// addNode adds a node to the JGF -func (g *FluxJGF) addNode(toadd node) { - g.Graph.Nodes = append(g.Graph.Nodes, toadd) - g.NodeMap[toadd.Id] = toadd - g.Elements = g.Elements + 1 +// getContainmentPath returns a new map with containment metadata +func (g *FluxJGF) getContainmentPath(subpath string) map[string]string { + return map[string]string{containmentKey: getNodePath(g.Resources.RootName, subpath)} +} + +// MakeBidirectionalEdge makes an edge for a parent and child +func (g *FluxJGF) MakeBidirectionalEdge(parent, child string) { + g.MakeEdge(parent, child, ContainsRelation) + g.MakeEdge(child, parent, InRelation) } // MakeEdge creates an edge for the JGF @@ -85,260 +106,125 @@ func (g *FluxJGF) MakeEdge(source string, target string, contains string) { }, } g.Graph.Edges = append(g.Graph.Edges, newedge) - if contains == ContainsRelation { - tnode := g.NodeMap[target] - tnode.Metadata.Paths[containmentKey] = g.NodeMap[source].Metadata.Paths[containmentKey] + "/" + tnode.Metadata.Name - } } // MakeSubnet creates a subnet for the graph -func (g *FluxJGF) MakeSubnet(index int64, ip string) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), +// The name is typically the ip address +func (g *FluxJGF) MakeSubnet(name string) Node { + + // Get a resource counter for the subnet + resource := g.Resources.getCounter(name, SubnetType) + subpath := resource.NameWithIndex() + return g.makeNewNode(resource, subpath, defaultUnit, defaultSize) +} + +// makeNewNode is a shared function to make a new node from a resource spec +// subpath is the subpath to add to the graph root, e.g., / +// Since there is some variability to this structure, it is assembled by +// the calling function +func (g *FluxJGF) makeNewNode( + resource ResourceCount, + subpath, unit string, + size int64, +) Node { + + // A subnet comes directly under the cluster, which is the parent + newNode := Node{ + + // Global identifier in graph, as a string + Id: resource.StringElementId(), Metadata: nodeMetadata{ - Type: SubnetType, - Basename: ip, - Name: ip + fmt.Sprintf("%d", g.Elements), - Id: index, - Uniq_id: g.Elements, + Type: resource.Type, + + // The original name without an index + Basename: resource.Name, + + // The name with an index + Name: resource.NameWithIndex(), + + // Integer resource index + Id: resource.Index, + + // Integer global element index + Uniq_id: resource.ElementId, Rank: defaultRank, Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), + Unit: unit, + Size: size, + + // subnet is one above root graph, so just need it's name + Paths: g.getContainmentPath(subpath), }, } - g.addNode(newnode) - return newnode.Id + + // Add the new node to the graph + g.Graph.Nodes = append(g.Graph.Nodes, newNode) + g.NodeMap[newNode.Id] = newNode + return newNode } // MakeNode creates a new node for the graph -func (g *FluxJGF) MakeNode(index int, exclusive bool, subnet string) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: NodeType, - Basename: subnet, - Name: subnet + fmt.Sprintf("%d", g.Elements), - Id: g.Elements, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: exclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), - }, - } - g.addNode(newnode) - return newnode.Id -} +func (g *FluxJGF) MakeNode(name, subpath string) Node { -// MakeSocket creates a socket for the graph -func (g *FluxJGF) MakeSocket(index int64, name string) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: SocketType, - Basename: name, - Name: name + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), - }, - } - g.addNode(newnode) - return newnode.Id -} + // Get a resource counter for the node, which is under the subnet + resource := g.Resources.getCounter(name, NodeType) -// MakeCore creates a core for the graph -func (g *FluxJGF) MakeCore(index int64, name string) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: CoreType, - Basename: name, - Name: name + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), - }, - } - g.addNode(newnode) - return newnode.Id + // Here the full containment path will be: + // // + subpath = fmt.Sprintf("%s/%s", subpath, resource.NameWithIndex()) + return g.makeNewNode(resource, subpath, defaultUnit, defaultSize) } -// MakeVCore makes a vcore (I think 2 vcpu == 1 cpu) for the graph -func (g *FluxJGF) MakeVCore(coreid string, index int64, name string) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: VirtualCoreType, - Basename: name, - Name: name + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), - }, - } - g.addNode(newnode) - g.MakeEdge(coreid, newnode.Id, ContainsRelation) - g.MakeEdge(newnode.Id, coreid, InRelation) - return newnode.Id -} +// MakeCore creates a core for the graph +func (g *FluxJGF) MakeCore(name, subpath string) Node { -// MakeNFProperties makes the node feature discovery properties for the graph -func (g *FluxJGF) MakeNFDProperties(coreid string, index int64, filter string, labels *map[string]string) { - for key, _ := range *labels { - if strings.Contains(key, filter) { - name := strings.Split(key, "/")[1] - if strings.Contains(name, ".") { - name = strings.Split(name, ".")[1] - } - - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: name, - Basename: name, - Name: name + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), - }, - } - g.addNode(newnode) - g.MakeEdge(coreid, newnode.Id, ContainsRelation) - } - } -} + // A core is located at the subnet->node->core + resource := g.Resources.getCounter(name, CoreType) -func (g *FluxJGF) MakeNFDPropertiesByValue(coreid string, index int64, filter string, labels *map[string]string) { - for key, val := range *labels { - if strings.Contains(key, filter) { - name := val - - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: name, - Basename: name, - Name: name + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), - }, - } - g.addNode(newnode) - g.MakeEdge(coreid, newnode.Id, ContainsRelation) - } - } + // Here the full containment path will be: + // /// + subpath = fmt.Sprintf("%s/%s", subpath, resource.NameWithIndex()) + return g.makeNewNode(resource, subpath, defaultUnit, defaultSize) } // MakeMemory creates memory for the graph -func (g *FluxJGF) MakeMemory(index int64, name string, unit string, size int64) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: MemoryType, - Basename: name, - Name: name + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: unit, - Size: size, - Paths: getDefaultPaths(), - }, - } - g.addNode(newnode) - return newnode.Id +// Flux doesn't understand memory? Not sure if this is doing anything +func (g *FluxJGF) MakeMemory(name, subpath string, size int64) Node { + + // unit is assumed to be MB + unit := "MB" + + // A core is located at the subnet->node->core + resource := g.Resources.getCounter(name, MemoryType) + + // Here the full containment path will be: + // /// + subpath = fmt.Sprintf("%s/%s", subpath, resource.NameWithIndex()) + return g.makeNewNode(resource, subpath, unit, size) } // MakeGPU makes a gpu for the graph -func (g *FluxJGF) MakeGPU(index int64, name string, size int64) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: GPUType, - Basename: name, - Name: name + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: size, - Paths: getDefaultPaths(), - }, - } - g.addNode(newnode) - return newnode.Id -} +func (g *FluxJGF) MakeGPU(name, subpath string, size int64) Node { -// MakeCluster creates the cluster -func (g *FluxJGF) MakeCluster(clustername string) string { - g.Elements = 0 - newnode := node{ - Id: strconv.Itoa(0), - Metadata: nodeMetadata{ - Type: ClusterType, - Basename: clustername, - Name: clustername + "0", - Id: g.Elements, - Uniq_id: 0, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: map[string]string{ - containmentKey: "/" + clustername + "0", - }, - }, - } - g.addNode(newnode) - return newnode.Id + // Get a resource counter for the gpu, which is under the subnet->node->gpu + resource := g.Resources.getCounter(name, GPUType) + + // Here the full containment path will be: + // // + subpath = fmt.Sprintf("%s/%s", subpath, resource.NameWithIndex()) + return g.makeNewNode(resource, subpath, defaultUnit, size) } -// MakeRack makes the rack -func (g *FluxJGF) MakeRack(index int64) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: RackType, - Basename: RackType, - Name: RackType + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), - }, +// InitCluster creates a new cluster, primarily the root "cluster" node +func (g *FluxJGF) InitCluster(name string) (Node, error) { + if g.Resources.Elements > 0 { + return Node{}, fmt.Errorf("init can only be called for a new cluster") } - g.addNode(newnode) - return newnode.Id + + // The cluster name is the index (always 0) with the original name + g.Resources.RootName = fmt.Sprintf("%s0", name) + resource := g.Resources.getCounter(name, ClusterType) + return g.makeNewNode(resource, "", defaultUnit, defaultSize), nil } func (g *FluxJGF) WriteJGF(path string) error { diff --git a/src/fluence/jgf/types.go b/src/fluence/jgf/types.go index cca3cfe..41d8fde 100644 --- a/src/fluence/jgf/types.go +++ b/src/fluence/jgf/types.go @@ -16,7 +16,9 @@ limitations under the License. package jgf -type node struct { +import "fmt" + +type Node struct { Id string `json:"id"` Label string `json:"label,omitempty"` Metadata nodeMetadata `json:"metadata,omitempty"` @@ -49,14 +51,94 @@ type nodeMetadata struct { } type graph struct { - Nodes []node `json:"nodes"` + Nodes []Node `json:"nodes"` Edges []edge `json:"edges"` // Metadata metadata `json:"metadata,omitempty"` Directed bool `json:"directed,omitempty"` } type FluxJGF struct { - Graph graph `json:"graph"` - Elements int64 `json:"-"` - NodeMap map[string]node `json:"-"` + Graph graph `json:"graph"` + NodeMap map[string]Node `json:"-"` + + // Counters for specific resource types (e.g., rack, node) + Resources ResourceCounter `json:"-"` +} + +// ResourceCounter keeps track of indices for each resource type +type ResourceCounter struct { + + // count of elements by resource type + counts map[string]int64 + + // Total elements in the graph + Elements int64 + + // Name or path of root + RootName string +} + +// ResourceCount provides complete metadata to populate a new node +// This object is returned by the resourceCounter for a node to use +// to quickly derive values, etc. +type ResourceCount struct { + + // Name of the resource (e.g., "red") + Name string + + // Name of the resource type (e.g., "node") + Type string + + // Element ID, in the context of total elements in the graph + ElementId int64 + + // Index or count for the resource in question + Index int64 +} + +// Return the resource name + resource +// This is scoped to the resource and not global for all the +// elements in the graph +func (r *ResourceCount) NameWithIndex() string { + return fmt.Sprintf("%s%d", r.Name, r.Index) +} + +// StringElementId is the global index as a string +func (r *ResourceCount) StringElementId() string { + return fmt.Sprintf("%d", r.ElementId) +} + +// StringResourceIndex is the string variant of the resource index +func (r *ResourceCount) StringResourceIndex() string { + return fmt.Sprintf("%d", r.Index) +} + +// NextIndex returns the next global index and adds 1 to the count +func (r *ResourceCounter) NextIndex() int64 { + nextIndex := r.Elements + r.Elements = nextIndex + 1 + return nextIndex +} + +// NextIndex returns the next resource index and adds 1 to the count +func (r *ResourceCounter) NextResourceIndex(resourceType string) int64 { + nextIndex, ok := r.counts[resourceType] + if !ok { + nextIndex = int64(0) + } + r.counts[resourceType] = nextIndex + 1 + return nextIndex +} + +// getCounter returns the counter context for a specific resource type +func (r *ResourceCounter) getCounter(resourceName string, resourceType string) ResourceCount { + resourceCount := ResourceCount{ + Index: r.NextResourceIndex(resourceName), + Type: resourceType, + Name: resourceName, + ElementId: r.NextIndex(), + } + + // Update the count for the next element (global) and resource count + return resourceCount } diff --git a/src/fluence/utils/utils.go b/src/fluence/utils/utils.go index 4ee92d4..25ddb85 100644 --- a/src/fluence/utils/utils.go +++ b/src/fluence/utils/utils.go @@ -68,12 +68,12 @@ func RegisterExisting(clientset *kubernetes.Clientset, ctx context.Context) (map return nodes, nil } -// CreateJGF creates the Json Graph Format +// CreateInClusterJGF creates the Json Graph Format from the Kubernetes API // We currently don't have support in fluxion to allocate jobs for existing pods, // so instead we create the graph with fewer resources. When that support is // added (see sig-scheduler-plugins/pkg/fluence/register.go) we can // remove the adjustment here, which is more of a hack -func CreateJGF(filename string, skipLabel *string) error { +func CreateInClusterJGF(filename string, skipLabel string) error { ctx := context.Background() config, err := rest.InClusterConfig() if err != nil { @@ -94,20 +94,28 @@ func CreateJGF(filename string, skipLabel *string) error { // Create a Flux Json Graph Format (JGF) with all cluster nodes fluxgraph := jgf.NewFluxJGF() - // Top level of the graph is the cluster + // Initialize the cluster. The top level of the graph is the cluster // This assumes fluxion is only serving one cluster. // previous comments indicate that we choose between the level // of a rack and a subnet. A rack doesn't make sense (the nodes could // be on multiple racks) so subnet is likely the right abstraction - cluster := fluxgraph.MakeCluster(defaultClusterName) - - vcores := 0 + clusterNode, err := fluxgraph.InitCluster(defaultClusterName) + if err != nil { + return err + } fmt.Println("Number nodes ", len(nodes.Items)) + + // TODO for follow up / next PR: + // Metrics / summary should be an attribute of the JGF outer flux graph + // Resources should come in from entire group (and not repres. pod) var totalAllocCpu int64 totalAllocCpu = 0 - sdnCount := int64(0) - for nodeIndex, node := range nodes.Items { + // Keep a lookup of subnet nodes in case we see one twice + // We don't want to create a new entity for it in the graph + subnetLookup := map[string]jgf.Node{} + + for _, node := range nodes.Items { // We should not be scheduling to the control plane _, ok := node.Labels[controlPlaneLabel] @@ -118,8 +126,8 @@ func CreateJGF(filename string, skipLabel *string) error { // Anything labeled with "skipLabel" meaning it is present, // should be skipped - if *skipLabel != "" { - _, ok := node.Labels[*skipLabel] + if skipLabel != "" { + _, ok := node.Labels[skipLabel] if ok { fmt.Printf("Skipping node %s\n", node.GetName()) continue @@ -142,12 +150,19 @@ func CreateJGF(filename string, skipLabel *string) error { return err } - // Here we build the subnet according to topology.kubernetes.io/zone label + // Have we seen this subnet node before? subnetName := node.Labels["topology.kubernetes.io/zone"] - subnet := fluxgraph.MakeSubnet(sdnCount, subnetName) - sdnCount = sdnCount + 1 - fluxgraph.MakeEdge(cluster, subnet, jgf.ContainsRelation) - fluxgraph.MakeEdge(subnet, cluster, jgf.InRelation) + subnetNode, exists := subnetLookup[subnetName] + if !exists { + // Build the subnet according to topology.kubernetes.io/zone label + subnetNode = fluxgraph.MakeSubnet(subnetName) + + // This is one example of bidirectional, I won't document in + // all following occurrences but this is what the function does + // [cluster] -> contains -> [subnet] + // [subnet] -> in -> [cluster] + fluxgraph.MakeBidirectionalEdge(clusterNode.Id, subnetNode.Id) + } // These are requests for existing pods, for cpu and memory reqs := computeTotalRequests(pods) @@ -175,43 +190,43 @@ func CreateJGF(filename string, skipLabel *string) error { gpuAllocatable, hasGpuAllocatable := node.Status.Allocatable["nvidia.com/gpu"] // TODO possibly look at pod resources vs. node.Status.Allocatable + // Make the compute node, which is a child of the subnet + // The parameters here are the node name, and the parent path + computeNode := fluxgraph.MakeNode(node.Name, subnetNode.Metadata.Name) - workernode := fluxgraph.MakeNode(nodeIndex, false, node.Name) - fluxgraph.MakeEdge(subnet, workernode, jgf.ContainsRelation) - fluxgraph.MakeEdge(workernode, subnet, jgf.InRelation) + // [subnet] -> contains -> [compute node] + fluxgraph.MakeBidirectionalEdge(subnetNode.Id, computeNode.Id) + // Here we are adding GPU resources under nodes if hasGpuAllocatable { fmt.Println("GPU Resource quantity ", gpuAllocatable.Value()) for index := 0; index < int(gpuAllocatable.Value()); index++ { - gpu := fluxgraph.MakeGPU(int64(index), jgf.NvidiaGPU, 1) - fluxgraph.MakeEdge(workernode, gpu, jgf.ContainsRelation) - fluxgraph.MakeEdge(gpu, workernode, jgf.InRelation) + + // The subpath (from and not including root) is the subnet -> node + subpath := fmt.Sprintf("%s/%s", subnetNode.Metadata.Name, computeNode.Metadata.Name) + + // TODO: can this size be greater than 1? + gpuNode := fluxgraph.MakeGPU(jgf.NvidiaGPU, subpath, 1) + + // [compute] -> contains -> [gpu] + fluxgraph.MakeBidirectionalEdge(computeNode.Id, gpuNode.Id) } } + // Here is where we are adding cores for index := 0; index < int(availCpu); index++ { - core := fluxgraph.MakeCore(int64(index), jgf.CoreType) - fluxgraph.MakeEdge(workernode, core, jgf.ContainsRelation) - fluxgraph.MakeEdge(core, workernode, jgf.InRelation) - - // Question from Vanessa: - // How can we get here and have vcores ever not equal to zero? - if vcores == 0 { - fluxgraph.MakeNFDProperties(core, int64(index), "cpu-", &node.Labels) - } else { - for virtualCore := 0; virtualCore < vcores; virtualCore++ { - vcore := fluxgraph.MakeVCore(core, int64(virtualCore), jgf.VirtualCoreType) - fluxgraph.MakeNFDProperties(vcore, int64(index), "cpu-", &node.Labels) - } - } + subpath := fmt.Sprintf("%s/%s", subnetNode.Metadata.Name, computeNode.Metadata.Name) + coreNode := fluxgraph.MakeCore(jgf.CoreType, subpath) + fluxgraph.MakeBidirectionalEdge(computeNode.Id, coreNode.Id) } + // Here is where we are adding memory fractionMem := availMem >> 30 for i := 0; i < int(fractionMem); i++ { - mem := fluxgraph.MakeMemory(int64(i), jgf.MemoryType, "MB", 1<<10) - fluxgraph.MakeEdge(workernode, mem, jgf.ContainsRelation) - fluxgraph.MakeEdge(mem, workernode, jgf.InRelation) + subpath := fmt.Sprintf("%s/%s", subnetNode.Metadata.Name, computeNode.Metadata.Name) + memoryNode := fluxgraph.MakeMemory(jgf.MemoryType, subpath, 1<<10) + fluxgraph.MakeBidirectionalEdge(computeNode.Id, memoryNode.Id) } } fmt.Printf("\nCan request at most %d exclusive cpu", totalAllocCpu) @@ -220,7 +235,6 @@ func CreateJGF(filename string, skipLabel *string) error { return err } return nil - } // computeTotalRequests sums up the pod requests for the list. We do not consider limits. From a3a4f03bbad8dd7e2acb6333e1b4fff41e91b9f1 Mon Sep 17 00:00:00 2001 From: vsoch Date: Tue, 28 May 2024 00:47:07 -0600 Subject: [PATCH 5/6] test: add start of tests for fluence package Problem: we will want to have scoped tests for fluence. Solution: to start, add a simple test file for JGF (the content of this current PR) and have an easy way to run it with "make test" from that directory. Currently this does not validate most of the structure - I would like to review the output, discuss a testing strategy, and then update the commit here to reflect that decision. Signed-off-by: vsoch --- .github/workflows/test.yaml | 19 +++++++++ src/fluence/Makefile | 7 ++++ src/fluence/jgf/jgf_test.go | 77 +++++++++++++++++++++++++++++++++++++ 3 files changed, 103 insertions(+) create mode 100644 .github/workflows/test.yaml create mode 100644 src/fluence/Makefile create mode 100644 src/fluence/jgf/jgf_test.go diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml new file mode 100644 index 0000000..593d1a0 --- /dev/null +++ b/.github/workflows/test.yaml @@ -0,0 +1,19 @@ +name: fluence testing + +on: + pull_request: [] + +jobs: + test-fluence: + runs-on: ubuntu-latest + name: build fluence + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v4 + with: + go-version: ^1.21 + + - name: Run Tests + run: | + cd src/fluence + make test \ No newline at end of file diff --git a/src/fluence/Makefile b/src/fluence/Makefile new file mode 100644 index 0000000..9e56215 --- /dev/null +++ b/src/fluence/Makefile @@ -0,0 +1,7 @@ + +.PHONY: all +all: test + +.PHONY: test +test: + go test -v ./jgf/ \ No newline at end of file diff --git a/src/fluence/jgf/jgf_test.go b/src/fluence/jgf/jgf_test.go new file mode 100644 index 0000000..c705a48 --- /dev/null +++ b/src/fluence/jgf/jgf_test.go @@ -0,0 +1,77 @@ +package jgf + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNewFluxJGF(t *testing.T) { + + // Create a new FluxGraph, assert that it is empty + fluxgraph := NewFluxJGF() + assert.Equal(t, len(fluxgraph.Graph.Nodes), 0) + assert.Equal(t, fluxgraph.Resources.Elements, int64(0)) + assert.Equal(t, len(fluxgraph.NodeMap), 0) + + out, err := fluxgraph.ToJson() + assert.Nil(t, err) + fmt.Println() + fmt.Println("== Empty graph:") + fmt.Println(out) + + // Init the cluster (make the root node) + clusterNode, err := fluxgraph.InitCluster("keebler") + assert.Nil(t, err) + + out, err = fluxgraph.ToJson() + assert.Nil(t, err) + fmt.Println() + fmt.Println("== Graph with Cluster Root:") + fmt.Println(out) + + // Add subnets to it + subnetNodeA := fluxgraph.MakeSubnet("east") + subnetNodeB := fluxgraph.MakeSubnet("west") + fluxgraph.MakeBidirectionalEdge(clusterNode.Id, subnetNodeA.Id) + fluxgraph.MakeBidirectionalEdge(clusterNode.Id, subnetNodeB.Id) + + out, err = fluxgraph.ToJson() + assert.Nil(t, err) + fmt.Println() + fmt.Println("== Graph with Two Subnets:") + fmt.Println(out) + + // Add some nodes! + computeNodeA := fluxgraph.MakeNode("node", subnetNodeA.Metadata.Name) + computeNodeB := fluxgraph.MakeNode("node", subnetNodeB.Metadata.Name) + fluxgraph.MakeBidirectionalEdge(subnetNodeA.Id, computeNodeA.Id) + fluxgraph.MakeBidirectionalEdge(subnetNodeB.Id, computeNodeB.Id) + + out, err = fluxgraph.ToJson() + assert.Nil(t, err) + fmt.Println() + fmt.Println("== Graph with Two Subnets, Each with a node:") + fmt.Println(out) + + // Add a GPU to one, and cores to the other + subpath := fmt.Sprintf("%s/%s", subnetNodeA.Metadata.Name, computeNodeA.Metadata.Name) + gpuNodeA := fluxgraph.MakeGPU(NvidiaGPU, subpath, 1) + fluxgraph.MakeBidirectionalEdge(computeNodeA.Id, gpuNodeA.Id) + + subpath = fmt.Sprintf("%s/%s", subnetNodeB.Metadata.Name, computeNodeB.Metadata.Name) + coreNode := fluxgraph.MakeCore(CoreType, subpath) + fluxgraph.MakeBidirectionalEdge(computeNodeB.Id, coreNode.Id) + + // Finally, add some memory to the second compute node + memoryNode := fluxgraph.MakeMemory(MemoryType, subpath, 1<<10) + fluxgraph.MakeBidirectionalEdge(computeNodeA.Id, memoryNode.Id) + + out, err = fluxgraph.ToJson() + assert.Nil(t, err) + fmt.Println() + fmt.Println("== Graph with Two Subnets, Two Nodes, with GPU/Core/Memory:") + fmt.Println(out) + +} From a07df5add76176690f6f74d39c7aa48319fdcb30 Mon Sep 17 00:00:00 2001 From: vsoch Date: Fri, 21 Jun 2024 14:55:04 -0600 Subject: [PATCH 6/6] graph: index should be scoped to parent Problem: the current strategy to derive an index is scoped to a resource globally across the graph. Solution: instead, provide a direct index counter for each new resource to ensure it is scoped to the parent Signed-off-by: vsoch --- src/fluence/jgf/jgf.go | 26 +++++++++++++++----------- src/fluence/jgf/jgf_test.go | 14 +++++++------- src/fluence/jgf/types.go | 5 ++++- src/fluence/utils/utils.go | 14 ++++++++------ 4 files changed, 34 insertions(+), 25 deletions(-) diff --git a/src/fluence/jgf/jgf.go b/src/fluence/jgf/jgf.go index 6600fa4..ba4d268 100644 --- a/src/fluence/jgf/jgf.go +++ b/src/fluence/jgf/jgf.go @@ -20,7 +20,7 @@ import ( "fmt" "log" "os" - "strings" + filepath "path" ) var ( @@ -80,9 +80,7 @@ func getNodePath(root, subpath string) string { } else { path = fmt.Sprintf("/%s/%s", root, subpath) } - // Hack to allow for imperfection of slash placement - path = strings.ReplaceAll(path, "//", "/") - return path + return filepath.Clean(path) } // getContainmentPath returns a new map with containment metadata @@ -110,10 +108,11 @@ func (g *FluxJGF) MakeEdge(source string, target string, contains string) { // MakeSubnet creates a subnet for the graph // The name is typically the ip address -func (g *FluxJGF) MakeSubnet(name string) Node { +func (g *FluxJGF) MakeSubnet(name string, index int64) Node { // Get a resource counter for the subnet resource := g.Resources.getCounter(name, SubnetType) + resource.Index = index subpath := resource.NameWithIndex() return g.makeNewNode(resource, subpath, defaultUnit, defaultSize) } @@ -125,8 +124,7 @@ func (g *FluxJGF) MakeSubnet(name string) Node { func (g *FluxJGF) makeNewNode( resource ResourceCount, subpath, unit string, - size int64, -) Node { + size int64) Node { // A subnet comes directly under the cluster, which is the parent newNode := Node{ @@ -164,10 +162,11 @@ func (g *FluxJGF) makeNewNode( } // MakeNode creates a new node for the graph -func (g *FluxJGF) MakeNode(name, subpath string) Node { +func (g *FluxJGF) MakeNode(name, subpath string, index int64) Node { // Get a resource counter for the node, which is under the subnet resource := g.Resources.getCounter(name, NodeType) + resource.Index = index // Here the full containment path will be: // // @@ -176,10 +175,11 @@ func (g *FluxJGF) MakeNode(name, subpath string) Node { } // MakeCore creates a core for the graph -func (g *FluxJGF) MakeCore(name, subpath string) Node { +func (g *FluxJGF) MakeCore(name, subpath string, index int64) Node { // A core is located at the subnet->node->core resource := g.Resources.getCounter(name, CoreType) + resource.Index = index // Here the full containment path will be: // /// @@ -189,13 +189,16 @@ func (g *FluxJGF) MakeCore(name, subpath string) Node { // MakeMemory creates memory for the graph // Flux doesn't understand memory? Not sure if this is doing anything -func (g *FluxJGF) MakeMemory(name, subpath string, size int64) Node { +func (g *FluxJGF) MakeMemory( + name, subpath string, + size, index int64) Node { // unit is assumed to be MB unit := "MB" // A core is located at the subnet->node->core resource := g.Resources.getCounter(name, MemoryType) + resource.Index = index // Here the full containment path will be: // /// @@ -204,10 +207,11 @@ func (g *FluxJGF) MakeMemory(name, subpath string, size int64) Node { } // MakeGPU makes a gpu for the graph -func (g *FluxJGF) MakeGPU(name, subpath string, size int64) Node { +func (g *FluxJGF) MakeGPU(name, subpath string, size, index int64) Node { // Get a resource counter for the gpu, which is under the subnet->node->gpu resource := g.Resources.getCounter(name, GPUType) + resource.Index = index // Here the full containment path will be: // // diff --git a/src/fluence/jgf/jgf_test.go b/src/fluence/jgf/jgf_test.go index c705a48..1d1a596 100644 --- a/src/fluence/jgf/jgf_test.go +++ b/src/fluence/jgf/jgf_test.go @@ -32,8 +32,8 @@ func TestNewFluxJGF(t *testing.T) { fmt.Println(out) // Add subnets to it - subnetNodeA := fluxgraph.MakeSubnet("east") - subnetNodeB := fluxgraph.MakeSubnet("west") + subnetNodeA := fluxgraph.MakeSubnet("east", 0) + subnetNodeB := fluxgraph.MakeSubnet("west", 1) fluxgraph.MakeBidirectionalEdge(clusterNode.Id, subnetNodeA.Id) fluxgraph.MakeBidirectionalEdge(clusterNode.Id, subnetNodeB.Id) @@ -44,8 +44,8 @@ func TestNewFluxJGF(t *testing.T) { fmt.Println(out) // Add some nodes! - computeNodeA := fluxgraph.MakeNode("node", subnetNodeA.Metadata.Name) - computeNodeB := fluxgraph.MakeNode("node", subnetNodeB.Metadata.Name) + computeNodeA := fluxgraph.MakeNode("node", subnetNodeA.Metadata.Name, 0) + computeNodeB := fluxgraph.MakeNode("node", subnetNodeB.Metadata.Name, 1) fluxgraph.MakeBidirectionalEdge(subnetNodeA.Id, computeNodeA.Id) fluxgraph.MakeBidirectionalEdge(subnetNodeB.Id, computeNodeB.Id) @@ -57,15 +57,15 @@ func TestNewFluxJGF(t *testing.T) { // Add a GPU to one, and cores to the other subpath := fmt.Sprintf("%s/%s", subnetNodeA.Metadata.Name, computeNodeA.Metadata.Name) - gpuNodeA := fluxgraph.MakeGPU(NvidiaGPU, subpath, 1) + gpuNodeA := fluxgraph.MakeGPU(NvidiaGPU, subpath, 1, 0) fluxgraph.MakeBidirectionalEdge(computeNodeA.Id, gpuNodeA.Id) subpath = fmt.Sprintf("%s/%s", subnetNodeB.Metadata.Name, computeNodeB.Metadata.Name) - coreNode := fluxgraph.MakeCore(CoreType, subpath) + coreNode := fluxgraph.MakeCore(CoreType, subpath, 0) fluxgraph.MakeBidirectionalEdge(computeNodeB.Id, coreNode.Id) // Finally, add some memory to the second compute node - memoryNode := fluxgraph.MakeMemory(MemoryType, subpath, 1<<10) + memoryNode := fluxgraph.MakeMemory(MemoryType, subpath, 1<<10, 0) fluxgraph.MakeBidirectionalEdge(computeNodeA.Id, memoryNode.Id) out, err = fluxgraph.ToJson() diff --git a/src/fluence/jgf/types.go b/src/fluence/jgf/types.go index 41d8fde..8359c28 100644 --- a/src/fluence/jgf/types.go +++ b/src/fluence/jgf/types.go @@ -131,7 +131,10 @@ func (r *ResourceCounter) NextResourceIndex(resourceType string) int64 { } // getCounter returns the counter context for a specific resource type -func (r *ResourceCounter) getCounter(resourceName string, resourceType string) ResourceCount { +func (r *ResourceCounter) getCounter( + resourceName string, + resourceType string, +) ResourceCount { resourceCount := ResourceCount{ Index: r.NextResourceIndex(resourceName), Type: resourceType, diff --git a/src/fluence/utils/utils.go b/src/fluence/utils/utils.go index 25ddb85..19fadf8 100644 --- a/src/fluence/utils/utils.go +++ b/src/fluence/utils/utils.go @@ -114,8 +114,9 @@ func CreateInClusterJGF(filename string, skipLabel string) error { // Keep a lookup of subnet nodes in case we see one twice // We don't want to create a new entity for it in the graph subnetLookup := map[string]jgf.Node{} + var subnetCounter int64 = 0 - for _, node := range nodes.Items { + for nodeCount, node := range nodes.Items { // We should not be scheduling to the control plane _, ok := node.Labels[controlPlaneLabel] @@ -155,7 +156,8 @@ func CreateInClusterJGF(filename string, skipLabel string) error { subnetNode, exists := subnetLookup[subnetName] if !exists { // Build the subnet according to topology.kubernetes.io/zone label - subnetNode = fluxgraph.MakeSubnet(subnetName) + subnetNode = fluxgraph.MakeSubnet(subnetName, subnetCounter) + subnetCounter += 1 // This is one example of bidirectional, I won't document in // all following occurrences but this is what the function does @@ -192,7 +194,7 @@ func CreateInClusterJGF(filename string, skipLabel string) error { // TODO possibly look at pod resources vs. node.Status.Allocatable // Make the compute node, which is a child of the subnet // The parameters here are the node name, and the parent path - computeNode := fluxgraph.MakeNode(node.Name, subnetNode.Metadata.Name) + computeNode := fluxgraph.MakeNode(node.Name, subnetNode.Metadata.Name, int64(nodeCount)) // [subnet] -> contains -> [compute node] fluxgraph.MakeBidirectionalEdge(subnetNode.Id, computeNode.Id) @@ -206,7 +208,7 @@ func CreateInClusterJGF(filename string, skipLabel string) error { subpath := fmt.Sprintf("%s/%s", subnetNode.Metadata.Name, computeNode.Metadata.Name) // TODO: can this size be greater than 1? - gpuNode := fluxgraph.MakeGPU(jgf.NvidiaGPU, subpath, 1) + gpuNode := fluxgraph.MakeGPU(jgf.NvidiaGPU, subpath, 1, int64(index)) // [compute] -> contains -> [gpu] fluxgraph.MakeBidirectionalEdge(computeNode.Id, gpuNode.Id) @@ -217,7 +219,7 @@ func CreateInClusterJGF(filename string, skipLabel string) error { // Here is where we are adding cores for index := 0; index < int(availCpu); index++ { subpath := fmt.Sprintf("%s/%s", subnetNode.Metadata.Name, computeNode.Metadata.Name) - coreNode := fluxgraph.MakeCore(jgf.CoreType, subpath) + coreNode := fluxgraph.MakeCore(jgf.CoreType, subpath, int64(index)) fluxgraph.MakeBidirectionalEdge(computeNode.Id, coreNode.Id) } @@ -225,7 +227,7 @@ func CreateInClusterJGF(filename string, skipLabel string) error { fractionMem := availMem >> 30 for i := 0; i < int(fractionMem); i++ { subpath := fmt.Sprintf("%s/%s", subnetNode.Metadata.Name, computeNode.Metadata.Name) - memoryNode := fluxgraph.MakeMemory(jgf.MemoryType, subpath, 1<<10) + memoryNode := fluxgraph.MakeMemory(jgf.MemoryType, subpath, 1<<10, int64(i)) fluxgraph.MakeBidirectionalEdge(computeNode.Id, memoryNode.Id) } }