diff --git a/.github/workflows/e2e-test.yaml b/.github/workflows/e2e-test.yaml index ed45891..0832080 100644 --- a/.github/workflows/e2e-test.yaml +++ b/.github/workflows/e2e-test.yaml @@ -23,7 +23,7 @@ jobs: - uses: actions/checkout@v4 - uses: actions/setup-go@v4 with: - go-version: ^1.19 + go-version: ^1.21 - name: Build Containers run: | @@ -56,7 +56,7 @@ jobs: - uses: actions/checkout@v4 - uses: actions/setup-go@v4 with: - go-version: ^1.19 + go-version: ^1.21 - name: Build Container run: | @@ -87,7 +87,7 @@ jobs: - uses: actions/checkout@v4 - uses: actions/setup-go@v4 with: - go-version: ^1.19 + go-version: ^1.21 - name: Download fluence artifact uses: actions/download-artifact@v4 diff --git a/src/fluence/cmd/main.go b/src/fluence/cmd/main.go index e8ef87d..753e56e 100644 --- a/src/fluence/cmd/main.go +++ b/src/fluence/cmd/main.go @@ -40,7 +40,7 @@ func main() { // Fluxion GRPC flux := fluxion.Fluxion{} - flux.InitFluxion(policy, label) + flux.InitFluxion(*policy, *label) lis, err := net.Listen("tcp", port) if err != nil { diff --git a/src/fluence/fluxion/fluxion.go b/src/fluence/fluxion/fluxion.go index f288cdf..ab2a3d7 100644 --- a/src/fluence/fluxion/fluxion.go +++ b/src/fluence/fluxion/fluxion.go @@ -20,11 +20,11 @@ type Fluxion struct { } // InitFluxion creates a new client to interaction with the fluxion API (via go bindings) -func (fluxion *Fluxion) InitFluxion(policy *string, label *string) { +func (fluxion *Fluxion) InitFluxion(policy string, label string) { fluxion.cli = fluxcli.NewReapiClient() klog.Infof("[Fluence] Created flux resource client %s", fluxion.cli) - err := utils.CreateJGF(defaults.KubernetesJsonGraphFormat, label) + err := utils.CreateInClusterJGF(defaults.KubernetesJsonGraphFormat, label) if err != nil { return } @@ -36,8 +36,8 @@ func (fluxion *Fluxion) InitFluxion(policy *string, label *string) { } p := "{}" - if *policy != "" { - p = string("{\"matcher_policy\": \"" + *policy + "\"}") + if policy != "" { + p = string("{\"matcher_policy\": \"" + policy + "\"}") klog.Infof("[Fluence] match policy: %s", p) } fluxion.cli.InitContext(string(jgf), p) diff --git a/src/fluence/jgf/jgf.go b/src/fluence/jgf/jgf.go index e7f25f5..6600fa4 100644 --- a/src/fluence/jgf/jgf.go +++ b/src/fluence/jgf/jgf.go @@ -20,7 +20,6 @@ import ( "fmt" "log" "os" - "strconv" "strings" ) @@ -54,25 +53,47 @@ var ( // NewFluxJGF creates and returns a new Flux Json Graph Format object func NewFluxJGF() FluxJGF { + + // Create a new cluster, and count the top level as a resource + // The index 0 (of the element count) is the cluster + counters := map[string]int64{"cluster": int64(1)} return FluxJGF{ - Graph: graph{}, - Elements: 0, - NodeMap: make(map[string]node), + Graph: graph{}, + NodeMap: make(map[string]Node), + + // Counters and lookup for resources + Resources: ResourceCounter{counts: counters}, } } -// getDefaultPaths returns a new map with empty containment -// this cannot be a global shared variable or we get an error -// about inserting an edge to itself. -func getDefaultPaths() map[string]string { - return map[string]string{containmentKey: ""} +// ToJson returns a Json string of the graph +func (g *FluxJGF) ToJson() (string, error) { + toprint, err := json.MarshalIndent(g.Graph, "", "\t") + return string(toprint), err +} + +// GetNodePath returns the node containment path +func getNodePath(root, subpath string) string { + var path string + if subpath == "" { + path = fmt.Sprintf("/%s", root) + } else { + path = fmt.Sprintf("/%s/%s", root, subpath) + } + // Hack to allow for imperfection of slash placement + path = strings.ReplaceAll(path, "//", "/") + return path } -// addNode adds a node to the JGF -func (g *FluxJGF) addNode(toadd node) { - g.Graph.Nodes = append(g.Graph.Nodes, toadd) - g.NodeMap[toadd.Id] = toadd - g.Elements = g.Elements + 1 +// getContainmentPath returns a new map with containment metadata +func (g *FluxJGF) getContainmentPath(subpath string) map[string]string { + return map[string]string{containmentKey: getNodePath(g.Resources.RootName, subpath)} +} + +// MakeBidirectionalEdge makes an edge for a parent and child +func (g *FluxJGF) MakeBidirectionalEdge(parent, child string) { + g.MakeEdge(parent, child, ContainsRelation) + g.MakeEdge(child, parent, InRelation) } // MakeEdge creates an edge for the JGF @@ -85,260 +106,125 @@ func (g *FluxJGF) MakeEdge(source string, target string, contains string) { }, } g.Graph.Edges = append(g.Graph.Edges, newedge) - if contains == ContainsRelation { - tnode := g.NodeMap[target] - tnode.Metadata.Paths[containmentKey] = g.NodeMap[source].Metadata.Paths[containmentKey] + "/" + tnode.Metadata.Name - } } // MakeSubnet creates a subnet for the graph -func (g *FluxJGF) MakeSubnet(index int64, ip string) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), +// The name is typically the ip address +func (g *FluxJGF) MakeSubnet(name string) Node { + + // Get a resource counter for the subnet + resource := g.Resources.getCounter(name, SubnetType) + subpath := resource.NameWithIndex() + return g.makeNewNode(resource, subpath, defaultUnit, defaultSize) +} + +// makeNewNode is a shared function to make a new node from a resource spec +// subpath is the subpath to add to the graph root, e.g., / +// Since there is some variability to this structure, it is assembled by +// the calling function +func (g *FluxJGF) makeNewNode( + resource ResourceCount, + subpath, unit string, + size int64, +) Node { + + // A subnet comes directly under the cluster, which is the parent + newNode := Node{ + + // Global identifier in graph, as a string + Id: resource.StringElementId(), Metadata: nodeMetadata{ - Type: SubnetType, - Basename: ip, - Name: ip + fmt.Sprintf("%d", g.Elements), - Id: index, - Uniq_id: g.Elements, + Type: resource.Type, + + // The original name without an index + Basename: resource.Name, + + // The name with an index + Name: resource.NameWithIndex(), + + // Integer resource index + Id: resource.Index, + + // Integer global element index + Uniq_id: resource.ElementId, Rank: defaultRank, Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), + Unit: unit, + Size: size, + + // subnet is one above root graph, so just need it's name + Paths: g.getContainmentPath(subpath), }, } - g.addNode(newnode) - return newnode.Id + + // Add the new node to the graph + g.Graph.Nodes = append(g.Graph.Nodes, newNode) + g.NodeMap[newNode.Id] = newNode + return newNode } // MakeNode creates a new node for the graph -func (g *FluxJGF) MakeNode(index int, exclusive bool, subnet string) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: NodeType, - Basename: subnet, - Name: subnet + fmt.Sprintf("%d", g.Elements), - Id: g.Elements, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: exclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), - }, - } - g.addNode(newnode) - return newnode.Id -} +func (g *FluxJGF) MakeNode(name, subpath string) Node { -// MakeSocket creates a socket for the graph -func (g *FluxJGF) MakeSocket(index int64, name string) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: SocketType, - Basename: name, - Name: name + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), - }, - } - g.addNode(newnode) - return newnode.Id -} + // Get a resource counter for the node, which is under the subnet + resource := g.Resources.getCounter(name, NodeType) -// MakeCore creates a core for the graph -func (g *FluxJGF) MakeCore(index int64, name string) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: CoreType, - Basename: name, - Name: name + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), - }, - } - g.addNode(newnode) - return newnode.Id + // Here the full containment path will be: + // // + subpath = fmt.Sprintf("%s/%s", subpath, resource.NameWithIndex()) + return g.makeNewNode(resource, subpath, defaultUnit, defaultSize) } -// MakeVCore makes a vcore (I think 2 vcpu == 1 cpu) for the graph -func (g *FluxJGF) MakeVCore(coreid string, index int64, name string) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: VirtualCoreType, - Basename: name, - Name: name + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), - }, - } - g.addNode(newnode) - g.MakeEdge(coreid, newnode.Id, ContainsRelation) - g.MakeEdge(newnode.Id, coreid, InRelation) - return newnode.Id -} +// MakeCore creates a core for the graph +func (g *FluxJGF) MakeCore(name, subpath string) Node { -// MakeNFProperties makes the node feature discovery properties for the graph -func (g *FluxJGF) MakeNFDProperties(coreid string, index int64, filter string, labels *map[string]string) { - for key, _ := range *labels { - if strings.Contains(key, filter) { - name := strings.Split(key, "/")[1] - if strings.Contains(name, ".") { - name = strings.Split(name, ".")[1] - } - - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: name, - Basename: name, - Name: name + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), - }, - } - g.addNode(newnode) - g.MakeEdge(coreid, newnode.Id, ContainsRelation) - } - } -} + // A core is located at the subnet->node->core + resource := g.Resources.getCounter(name, CoreType) -func (g *FluxJGF) MakeNFDPropertiesByValue(coreid string, index int64, filter string, labels *map[string]string) { - for key, val := range *labels { - if strings.Contains(key, filter) { - name := val - - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: name, - Basename: name, - Name: name + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), - }, - } - g.addNode(newnode) - g.MakeEdge(coreid, newnode.Id, ContainsRelation) - } - } + // Here the full containment path will be: + // /// + subpath = fmt.Sprintf("%s/%s", subpath, resource.NameWithIndex()) + return g.makeNewNode(resource, subpath, defaultUnit, defaultSize) } // MakeMemory creates memory for the graph -func (g *FluxJGF) MakeMemory(index int64, name string, unit string, size int64) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: MemoryType, - Basename: name, - Name: name + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: unit, - Size: size, - Paths: getDefaultPaths(), - }, - } - g.addNode(newnode) - return newnode.Id +// Flux doesn't understand memory? Not sure if this is doing anything +func (g *FluxJGF) MakeMemory(name, subpath string, size int64) Node { + + // unit is assumed to be MB + unit := "MB" + + // A core is located at the subnet->node->core + resource := g.Resources.getCounter(name, MemoryType) + + // Here the full containment path will be: + // /// + subpath = fmt.Sprintf("%s/%s", subpath, resource.NameWithIndex()) + return g.makeNewNode(resource, subpath, unit, size) } // MakeGPU makes a gpu for the graph -func (g *FluxJGF) MakeGPU(index int64, name string, size int64) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: GPUType, - Basename: name, - Name: name + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: size, - Paths: getDefaultPaths(), - }, - } - g.addNode(newnode) - return newnode.Id -} +func (g *FluxJGF) MakeGPU(name, subpath string, size int64) Node { -// MakeCluster creates the cluster -func (g *FluxJGF) MakeCluster(clustername string) string { - g.Elements = 0 - newnode := node{ - Id: strconv.Itoa(0), - Metadata: nodeMetadata{ - Type: ClusterType, - Basename: clustername, - Name: clustername + "0", - Id: g.Elements, - Uniq_id: 0, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: map[string]string{ - containmentKey: "/" + clustername + "0", - }, - }, - } - g.addNode(newnode) - return newnode.Id + // Get a resource counter for the gpu, which is under the subnet->node->gpu + resource := g.Resources.getCounter(name, GPUType) + + // Here the full containment path will be: + // // + subpath = fmt.Sprintf("%s/%s", subpath, resource.NameWithIndex()) + return g.makeNewNode(resource, subpath, defaultUnit, size) } -// MakeRack makes the rack -func (g *FluxJGF) MakeRack(index int64) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: RackType, - Basename: RackType, - Name: RackType + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), - }, +// InitCluster creates a new cluster, primarily the root "cluster" node +func (g *FluxJGF) InitCluster(name string) (Node, error) { + if g.Resources.Elements > 0 { + return Node{}, fmt.Errorf("init can only be called for a new cluster") } - g.addNode(newnode) - return newnode.Id + + // The cluster name is the index (always 0) with the original name + g.Resources.RootName = fmt.Sprintf("%s0", name) + resource := g.Resources.getCounter(name, ClusterType) + return g.makeNewNode(resource, "", defaultUnit, defaultSize), nil } func (g *FluxJGF) WriteJGF(path string) error { diff --git a/src/fluence/jgf/types.go b/src/fluence/jgf/types.go index cca3cfe..41d8fde 100644 --- a/src/fluence/jgf/types.go +++ b/src/fluence/jgf/types.go @@ -16,7 +16,9 @@ limitations under the License. package jgf -type node struct { +import "fmt" + +type Node struct { Id string `json:"id"` Label string `json:"label,omitempty"` Metadata nodeMetadata `json:"metadata,omitempty"` @@ -49,14 +51,94 @@ type nodeMetadata struct { } type graph struct { - Nodes []node `json:"nodes"` + Nodes []Node `json:"nodes"` Edges []edge `json:"edges"` // Metadata metadata `json:"metadata,omitempty"` Directed bool `json:"directed,omitempty"` } type FluxJGF struct { - Graph graph `json:"graph"` - Elements int64 `json:"-"` - NodeMap map[string]node `json:"-"` + Graph graph `json:"graph"` + NodeMap map[string]Node `json:"-"` + + // Counters for specific resource types (e.g., rack, node) + Resources ResourceCounter `json:"-"` +} + +// ResourceCounter keeps track of indices for each resource type +type ResourceCounter struct { + + // count of elements by resource type + counts map[string]int64 + + // Total elements in the graph + Elements int64 + + // Name or path of root + RootName string +} + +// ResourceCount provides complete metadata to populate a new node +// This object is returned by the resourceCounter for a node to use +// to quickly derive values, etc. +type ResourceCount struct { + + // Name of the resource (e.g., "red") + Name string + + // Name of the resource type (e.g., "node") + Type string + + // Element ID, in the context of total elements in the graph + ElementId int64 + + // Index or count for the resource in question + Index int64 +} + +// Return the resource name + resource +// This is scoped to the resource and not global for all the +// elements in the graph +func (r *ResourceCount) NameWithIndex() string { + return fmt.Sprintf("%s%d", r.Name, r.Index) +} + +// StringElementId is the global index as a string +func (r *ResourceCount) StringElementId() string { + return fmt.Sprintf("%d", r.ElementId) +} + +// StringResourceIndex is the string variant of the resource index +func (r *ResourceCount) StringResourceIndex() string { + return fmt.Sprintf("%d", r.Index) +} + +// NextIndex returns the next global index and adds 1 to the count +func (r *ResourceCounter) NextIndex() int64 { + nextIndex := r.Elements + r.Elements = nextIndex + 1 + return nextIndex +} + +// NextIndex returns the next resource index and adds 1 to the count +func (r *ResourceCounter) NextResourceIndex(resourceType string) int64 { + nextIndex, ok := r.counts[resourceType] + if !ok { + nextIndex = int64(0) + } + r.counts[resourceType] = nextIndex + 1 + return nextIndex +} + +// getCounter returns the counter context for a specific resource type +func (r *ResourceCounter) getCounter(resourceName string, resourceType string) ResourceCount { + resourceCount := ResourceCount{ + Index: r.NextResourceIndex(resourceName), + Type: resourceType, + Name: resourceName, + ElementId: r.NextIndex(), + } + + // Update the count for the next element (global) and resource count + return resourceCount } diff --git a/src/fluence/utils/utils.go b/src/fluence/utils/utils.go index 4ee92d4..25ddb85 100644 --- a/src/fluence/utils/utils.go +++ b/src/fluence/utils/utils.go @@ -68,12 +68,12 @@ func RegisterExisting(clientset *kubernetes.Clientset, ctx context.Context) (map return nodes, nil } -// CreateJGF creates the Json Graph Format +// CreateInClusterJGF creates the Json Graph Format from the Kubernetes API // We currently don't have support in fluxion to allocate jobs for existing pods, // so instead we create the graph with fewer resources. When that support is // added (see sig-scheduler-plugins/pkg/fluence/register.go) we can // remove the adjustment here, which is more of a hack -func CreateJGF(filename string, skipLabel *string) error { +func CreateInClusterJGF(filename string, skipLabel string) error { ctx := context.Background() config, err := rest.InClusterConfig() if err != nil { @@ -94,20 +94,28 @@ func CreateJGF(filename string, skipLabel *string) error { // Create a Flux Json Graph Format (JGF) with all cluster nodes fluxgraph := jgf.NewFluxJGF() - // Top level of the graph is the cluster + // Initialize the cluster. The top level of the graph is the cluster // This assumes fluxion is only serving one cluster. // previous comments indicate that we choose between the level // of a rack and a subnet. A rack doesn't make sense (the nodes could // be on multiple racks) so subnet is likely the right abstraction - cluster := fluxgraph.MakeCluster(defaultClusterName) - - vcores := 0 + clusterNode, err := fluxgraph.InitCluster(defaultClusterName) + if err != nil { + return err + } fmt.Println("Number nodes ", len(nodes.Items)) + + // TODO for follow up / next PR: + // Metrics / summary should be an attribute of the JGF outer flux graph + // Resources should come in from entire group (and not repres. pod) var totalAllocCpu int64 totalAllocCpu = 0 - sdnCount := int64(0) - for nodeIndex, node := range nodes.Items { + // Keep a lookup of subnet nodes in case we see one twice + // We don't want to create a new entity for it in the graph + subnetLookup := map[string]jgf.Node{} + + for _, node := range nodes.Items { // We should not be scheduling to the control plane _, ok := node.Labels[controlPlaneLabel] @@ -118,8 +126,8 @@ func CreateJGF(filename string, skipLabel *string) error { // Anything labeled with "skipLabel" meaning it is present, // should be skipped - if *skipLabel != "" { - _, ok := node.Labels[*skipLabel] + if skipLabel != "" { + _, ok := node.Labels[skipLabel] if ok { fmt.Printf("Skipping node %s\n", node.GetName()) continue @@ -142,12 +150,19 @@ func CreateJGF(filename string, skipLabel *string) error { return err } - // Here we build the subnet according to topology.kubernetes.io/zone label + // Have we seen this subnet node before? subnetName := node.Labels["topology.kubernetes.io/zone"] - subnet := fluxgraph.MakeSubnet(sdnCount, subnetName) - sdnCount = sdnCount + 1 - fluxgraph.MakeEdge(cluster, subnet, jgf.ContainsRelation) - fluxgraph.MakeEdge(subnet, cluster, jgf.InRelation) + subnetNode, exists := subnetLookup[subnetName] + if !exists { + // Build the subnet according to topology.kubernetes.io/zone label + subnetNode = fluxgraph.MakeSubnet(subnetName) + + // This is one example of bidirectional, I won't document in + // all following occurrences but this is what the function does + // [cluster] -> contains -> [subnet] + // [subnet] -> in -> [cluster] + fluxgraph.MakeBidirectionalEdge(clusterNode.Id, subnetNode.Id) + } // These are requests for existing pods, for cpu and memory reqs := computeTotalRequests(pods) @@ -175,43 +190,43 @@ func CreateJGF(filename string, skipLabel *string) error { gpuAllocatable, hasGpuAllocatable := node.Status.Allocatable["nvidia.com/gpu"] // TODO possibly look at pod resources vs. node.Status.Allocatable + // Make the compute node, which is a child of the subnet + // The parameters here are the node name, and the parent path + computeNode := fluxgraph.MakeNode(node.Name, subnetNode.Metadata.Name) - workernode := fluxgraph.MakeNode(nodeIndex, false, node.Name) - fluxgraph.MakeEdge(subnet, workernode, jgf.ContainsRelation) - fluxgraph.MakeEdge(workernode, subnet, jgf.InRelation) + // [subnet] -> contains -> [compute node] + fluxgraph.MakeBidirectionalEdge(subnetNode.Id, computeNode.Id) + // Here we are adding GPU resources under nodes if hasGpuAllocatable { fmt.Println("GPU Resource quantity ", gpuAllocatable.Value()) for index := 0; index < int(gpuAllocatable.Value()); index++ { - gpu := fluxgraph.MakeGPU(int64(index), jgf.NvidiaGPU, 1) - fluxgraph.MakeEdge(workernode, gpu, jgf.ContainsRelation) - fluxgraph.MakeEdge(gpu, workernode, jgf.InRelation) + + // The subpath (from and not including root) is the subnet -> node + subpath := fmt.Sprintf("%s/%s", subnetNode.Metadata.Name, computeNode.Metadata.Name) + + // TODO: can this size be greater than 1? + gpuNode := fluxgraph.MakeGPU(jgf.NvidiaGPU, subpath, 1) + + // [compute] -> contains -> [gpu] + fluxgraph.MakeBidirectionalEdge(computeNode.Id, gpuNode.Id) } } + // Here is where we are adding cores for index := 0; index < int(availCpu); index++ { - core := fluxgraph.MakeCore(int64(index), jgf.CoreType) - fluxgraph.MakeEdge(workernode, core, jgf.ContainsRelation) - fluxgraph.MakeEdge(core, workernode, jgf.InRelation) - - // Question from Vanessa: - // How can we get here and have vcores ever not equal to zero? - if vcores == 0 { - fluxgraph.MakeNFDProperties(core, int64(index), "cpu-", &node.Labels) - } else { - for virtualCore := 0; virtualCore < vcores; virtualCore++ { - vcore := fluxgraph.MakeVCore(core, int64(virtualCore), jgf.VirtualCoreType) - fluxgraph.MakeNFDProperties(vcore, int64(index), "cpu-", &node.Labels) - } - } + subpath := fmt.Sprintf("%s/%s", subnetNode.Metadata.Name, computeNode.Metadata.Name) + coreNode := fluxgraph.MakeCore(jgf.CoreType, subpath) + fluxgraph.MakeBidirectionalEdge(computeNode.Id, coreNode.Id) } + // Here is where we are adding memory fractionMem := availMem >> 30 for i := 0; i < int(fractionMem); i++ { - mem := fluxgraph.MakeMemory(int64(i), jgf.MemoryType, "MB", 1<<10) - fluxgraph.MakeEdge(workernode, mem, jgf.ContainsRelation) - fluxgraph.MakeEdge(mem, workernode, jgf.InRelation) + subpath := fmt.Sprintf("%s/%s", subnetNode.Metadata.Name, computeNode.Metadata.Name) + memoryNode := fluxgraph.MakeMemory(jgf.MemoryType, subpath, 1<<10) + fluxgraph.MakeBidirectionalEdge(computeNode.Id, memoryNode.Id) } } fmt.Printf("\nCan request at most %d exclusive cpu", totalAllocCpu) @@ -220,7 +235,6 @@ func CreateJGF(filename string, skipLabel *string) error { return err } return nil - } // computeTotalRequests sums up the pod requests for the list. We do not consider limits.