diff --git a/.github/workflows/e2e-test.yaml b/.github/workflows/e2e-test.yaml new file mode 100644 index 0000000..0832080 --- /dev/null +++ b/.github/workflows/e2e-test.yaml @@ -0,0 +1,179 @@ +name: fluence build test + +on: + pull_request: [] + # Test on demand (dispatch) or once a week, sunday + # We combine the builds into one job to simplify not needing to share + # containers between jobs. We also don't want to push unless the tests pass. + workflow_dispatch: + schedule: + - cron: '0 0 * * 0' + +jobs: + build-fluence: + + # The scheduler and controller are built together with the hack script + # in the upstream scheduler-plugins + env: + container: ghcr.io/flux-framework/fluence + controller: ghcr.io/flux-framework/fluence-controller + runs-on: ubuntu-latest + name: build fluence + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v4 + with: + go-version: ^1.21 + + - name: Build Containers + run: | + make prepare + make build REGISTRY=ghcr.io/flux-framework SCHEDULER_IMAGE=fluence CONTROLLER_IMAGE=fluence-controller + + - name: Save Containers + run: | + docker save ${{ env.container }} | gzip > fluence_latest.tar.gz + docker save ${{ env.controller }} | gzip > fluence_controller_latest.tar.gz + + - name: Upload container artifact + uses: actions/upload-artifact@v4 + with: + name: fluence + path: fluence_latest.tar.gz + + - name: Upload container artifact + uses: actions/upload-artifact@v4 + with: + name: fluence_controller + path: fluence_controller_latest.tar.gz + + build-sidecar: + env: + container: ghcr.io/flux-framework/fluence-sidecar + runs-on: ubuntu-latest + name: build sidecar + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v4 + with: + go-version: ^1.21 + + - name: Build Container + run: | + make prepare + make build-sidecar REGISTRY=ghcr.io/flux-framework SIDECAR_IMAGE=fluence-sidecar + + - name: Save Container + run: docker save ${{ env.container }} | gzip > fluence_sidecar_latest.tar.gz + + - name: Upload 
container artifact + uses: actions/upload-artifact@v4 + with: + name: fluence_sidecar + path: fluence_sidecar_latest.tar.gz + + test-fluence: + needs: [build-fluence, build-sidecar] + permissions: + packages: write + env: + fluence_container: ghcr.io/flux-framework/fluence + sidecar_container: ghcr.io/flux-framework/fluence-sidecar + controller_container: ghcr.io/flux-framework/fluence-controller + + runs-on: ubuntu-latest + name: test fluence + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v4 + with: + go-version: ^1.21 + + - name: Download fluence artifact + uses: actions/download-artifact@v4 + with: + name: fluence + path: /tmp + + - name: Download fluence_sidecar artifact + uses: actions/download-artifact@v4 + with: + name: fluence_sidecar + path: /tmp + + - name: Download fluence_controller artifact + uses: actions/download-artifact@v4 + with: + name: fluence_controller + path: /tmp + + - name: Make Space For Build + run: | + sudo rm -rf /usr/share/dotnet + sudo rm -rf /usr/local/lib/android + sudo rm -rf /opt/ghc + + - name: Load Docker images + run: | + ls /tmp/*.tar.gz + docker load --input /tmp/fluence_sidecar_latest.tar.gz + rm /tmp/fluence_sidecar_latest.tar.gz + docker load --input /tmp/fluence_latest.tar.gz + rm /tmp/fluence_latest.tar.gz + docker load --input /tmp/fluence_controller_latest.tar.gz + rm /tmp/fluence_controller_latest.tar.gz + docker image ls -a | grep fluence + + - name: Create Kind Cluster + uses: helm/kind-action@v1.5.0 + with: + cluster_name: kind + kubectl_version: v1.28.2 + version: v0.20.0 + config: ./.github/test-kind-config.yaml + + - name: Load Docker Containers into Kind + env: + fluence: ${{ env.fluence_container }} + sidecar: ${{ env.sidecar_container }} + controller: ${{ env.controller_container }} + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: | + kind load docker-image ${fluence} + kind load docker-image ${sidecar} + kind load docker-image ${controller} + + - name: Install Cert Manager + run: | + 
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.13.1/cert-manager.yaml + sleep 10 + + - name: Test Fluence + run: /bin/bash ./.github/test.sh + + - name: Tag Weekly Images + run: | + # YEAR-MONTH-DAY or #YYYY-MM-DD + tag=$(echo $(date +%Y-%m-%d)) + echo "Tagging and releasing ${{ env.fluence_container}}:${tag}" + docker tag ${{ env.fluence_container }}:latest ${{ env.fluence_container }}:${tag} + echo "Tagging and releasing ${{ env.sidecar_container}}:${tag}" + docker tag ${{ env.sidecar_container }}:latest ${{ env.sidecar_container }}:${tag} + echo "Tagging and releasing ${{ env.controller_container}}:${tag}" + docker tag ${{ env.controller_container }}:latest ${{ env.controller_container }}:${tag} + + # If we get here, tests pass, and we can deploy + - name: GHCR Login + if: (github.event_name != 'pull_request') + uses: docker/login-action@v2 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Deploy Containers + if: (github.event_name != 'pull_request') + run: | + docker push ${{ env.fluence_container }} --all-tags + docker push ${{ env.sidecar_container }} --all-tags + docker push ${{ env.controller_container }} --all-tags \ No newline at end of file diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index ed45891..593d1a0 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -1,179 +1,19 @@ -name: fluence build test +name: fluence testing on: pull_request: [] - # Test on demand (dispatch) or once a week, sunday - # We combine the builds into one job to simplify not needing to share - # containers between jobs. We also don't want to push unless the tests pass. 
- workflow_dispatch: - schedule: - - cron: '0 0 * * 0' jobs: - build-fluence: - - # The scheduler and controller are built together with the hack script - # in the upstream scheduler-plugins - env: - container: ghcr.io/flux-framework/fluence - controller: ghcr.io/flux-framework/fluence-controller - runs-on: ubuntu-latest - name: build fluence - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-go@v4 - with: - go-version: ^1.19 - - - name: Build Containers - run: | - make prepare - make build REGISTRY=ghcr.io/flux-framework SCHEDULER_IMAGE=fluence CONTROLLER_IMAGE=fluence-controller - - - name: Save Containers - run: | - docker save ${{ env.container }} | gzip > fluence_latest.tar.gz - docker save ${{ env.controller }} | gzip > fluence_controller_latest.tar.gz - - - name: Upload container artifact - uses: actions/upload-artifact@v4 - with: - name: fluence - path: fluence_latest.tar.gz - - - name: Upload container artifact - uses: actions/upload-artifact@v4 - with: - name: fluence_controller - path: fluence_controller_latest.tar.gz - - build-sidecar: - env: - container: ghcr.io/flux-framework/fluence-sidecar - runs-on: ubuntu-latest - name: build sidecar - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-go@v4 - with: - go-version: ^1.19 - - - name: Build Container - run: | - make prepare - make build-sidecar REGISTRY=ghcr.io/flux-framework SIDECAR_IMAGE=fluence-sidecar - - - name: Save Container - run: docker save ${{ env.container }} | gzip > fluence_sidecar_latest.tar.gz - - - name: Upload container artifact - uses: actions/upload-artifact@v4 - with: - name: fluence_sidecar - path: fluence_sidecar_latest.tar.gz - test-fluence: - needs: [build-fluence, build-sidecar] - permissions: - packages: write - env: - fluence_container: ghcr.io/flux-framework/fluence - sidecar_container: ghcr.io/flux-framework/fluence-sidecar - controller_container: ghcr.io/flux-framework/fluence-controller - runs-on: ubuntu-latest - name: test fluence + name: build 
fluence steps: - uses: actions/checkout@v4 - uses: actions/setup-go@v4 with: - go-version: ^1.19 - - - name: Download fluence artifact - uses: actions/download-artifact@v4 - with: - name: fluence - path: /tmp - - - name: Download fluence_sidecar artifact - uses: actions/download-artifact@v4 - with: - name: fluence_sidecar - path: /tmp - - - name: Download fluence_controller artifact - uses: actions/download-artifact@v4 - with: - name: fluence_controller - path: /tmp - - - name: Make Space For Build - run: | - sudo rm -rf /usr/share/dotnet - sudo rm -rf /usr/local/lib/android - sudo rm -rf /opt/ghc - - - name: Load Docker images - run: | - ls /tmp/*.tar.gz - docker load --input /tmp/fluence_sidecar_latest.tar.gz - rm /tmp/fluence_sidecar_latest.tar.gz - docker load --input /tmp/fluence_latest.tar.gz - rm /tmp/fluence_latest.tar.gz - docker load --input /tmp/fluence_controller_latest.tar.gz - rm /tmp/fluence_controller_latest.tar.gz - docker image ls -a | grep fluence - - - name: Create Kind Cluster - uses: helm/kind-action@v1.5.0 - with: - cluster_name: kind - kubectl_version: v1.28.2 - version: v0.20.0 - config: ./.github/test-kind-config.yaml - - - name: Load Docker Containers into Kind - env: - fluence: ${{ env.fluence_container }} - sidecar: ${{ env.sidecar_container }} - controller: ${{ env.controller_container }} - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - run: | - kind load docker-image ${fluence} - kind load docker-image ${sidecar} - kind load docker-image ${controller} - - - name: Install Cert Manager - run: | - kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.13.1/cert-manager.yaml - sleep 10 - - - name: Test Fluence - run: /bin/bash ./.github/test.sh - - - name: Tag Weekly Images - run: | - # YEAR-MONTH-DAY or #YYYY-MM-DD - tag=$(echo $(date +%Y-%m-%d)) - echo "Tagging and releasing ${{ env.fluence_container}}:${tag}" - docker tag ${{ env.fluence_container }}:latest ${{ env.fluence_container }}:${tag} - echo "Tagging 
and releasing ${{ env.sidecar_container}}:${tag}" - docker tag ${{ env.sidecar_container }}:latest ${{ env.sidecar_container }}:${tag} - echo "Tagging and releasing ${{ env.controller_container}}:${tag}" - docker tag ${{ env.controller_container }}:latest ${{ env.controller_container }}:${tag} - - # If we get here, tests pass, and we can deploy - - name: GHCR Login - if: (github.event_name != 'pull_request') - uses: docker/login-action@v2 - with: - registry: ghcr.io - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} + go-version: ^1.21 - - name: Deploy Containers - if: (github.event_name != 'pull_request') + - name: Run Tests run: | - docker push ${{ env.fluence_container }} --all-tags - docker push ${{ env.sidecar_container }} --all-tags - docker push ${{ env.controller_container }} --all-tags \ No newline at end of file + cd src/fluence + make test \ No newline at end of file diff --git a/src/fluence/Makefile b/src/fluence/Makefile new file mode 100644 index 0000000..9e56215 --- /dev/null +++ b/src/fluence/Makefile @@ -0,0 +1,7 @@ + +.PHONY: all +all: test + +.PHONY: test +test: + go test -v ./jgf/ \ No newline at end of file diff --git a/src/fluence/cmd/main.go b/src/fluence/cmd/main.go index e8ef87d..753e56e 100644 --- a/src/fluence/cmd/main.go +++ b/src/fluence/cmd/main.go @@ -40,7 +40,7 @@ func main() { // Fluxion GRPC flux := fluxion.Fluxion{} - flux.InitFluxion(policy, label) + flux.InitFluxion(*policy, *label) lis, err := net.Listen("tcp", port) if err != nil { diff --git a/src/fluence/fluxion/fluxion.go b/src/fluence/fluxion/fluxion.go index f288cdf..ab2a3d7 100644 --- a/src/fluence/fluxion/fluxion.go +++ b/src/fluence/fluxion/fluxion.go @@ -20,11 +20,11 @@ type Fluxion struct { } // InitFluxion creates a new client to interaction with the fluxion API (via go bindings) -func (fluxion *Fluxion) InitFluxion(policy *string, label *string) { +func (fluxion *Fluxion) InitFluxion(policy string, label string) { fluxion.cli = 
fluxcli.NewReapiClient() klog.Infof("[Fluence] Created flux resource client %s", fluxion.cli) - err := utils.CreateJGF(defaults.KubernetesJsonGraphFormat, label) + err := utils.CreateInClusterJGF(defaults.KubernetesJsonGraphFormat, label) if err != nil { return } @@ -36,8 +36,8 @@ func (fluxion *Fluxion) InitFluxion(policy *string, label *string) { } p := "{}" - if *policy != "" { - p = string("{\"matcher_policy\": \"" + *policy + "\"}") + if policy != "" { + p = string("{\"matcher_policy\": \"" + policy + "\"}") klog.Infof("[Fluence] match policy: %s", p) } fluxion.cli.InitContext(string(jgf), p) diff --git a/src/fluence/go.mod b/src/fluence/go.mod index 01fc126..31228d9 100644 --- a/src/fluence/go.mod +++ b/src/fluence/go.mod @@ -4,6 +4,7 @@ go 1.21 require ( github.com/flux-framework/fluxion-go v0.32.1-0.20240420052153-909523c84ca2 + github.com/stretchr/testify v1.7.0 google.golang.org/grpc v1.38.0 google.golang.org/protobuf v1.26.0 gopkg.in/yaml.v2 v2.4.0 @@ -25,6 +26,7 @@ require ( github.com/json-iterator/go v1.1.11 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/net v0.0.0-20210520170846-37e1c6afe023 // indirect golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d // indirect golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 // indirect diff --git a/src/fluence/go.sum b/src/fluence/go.sum index 534497d..c7291ca 100644 --- a/src/fluence/go.sum +++ b/src/fluence/go.sum @@ -98,8 +98,6 @@ github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d/go.mod h1:ZZM github.com/fatih/camelcase v1.0.0/go.mod h1:yN2Sb0lFhZJUdVvtELVWefmrXpuZESvPmqwoZc+/fpc= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flux-framework/fluxion-go v0.32.0 
h1:NY6Y1mlTTTZhHD+CmAsDsdNTxUsAFDQoORpMZj8NFLI= -github.com/flux-framework/fluxion-go v0.32.0/go.mod h1:ZI3QxSvUfgJE2Snur/SntJmVfpMjr6D4ICVmdqJ9fkQ= github.com/flux-framework/fluxion-go v0.32.1-0.20240420052153-909523c84ca2 h1:Yz/vVX0XfB2q51ZLh2p8YI5vphvv0rZF4PqtKPscvsY= github.com/flux-framework/fluxion-go v0.32.1-0.20240420052153-909523c84ca2/go.mod h1:jA5+kOSLxchFzixzYEvMAGjkXB5yszO/HxUwdhX/5/U= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= diff --git a/src/fluence/jgf/jgf.go b/src/fluence/jgf/jgf.go index 8a047f9..ba4d268 100644 --- a/src/fluence/jgf/jgf.go +++ b/src/fluence/jgf/jgf.go @@ -20,8 +20,7 @@ import ( "fmt" "log" "os" - "strconv" - "strings" + filepath "path" ) var ( @@ -52,33 +51,51 @@ var ( containmentKey = "containment" ) -// InitJGF initializes the Flux Json Graph Format object -func InitJGF() (fluxgraph Fluxjgf) { - var g graph - fluxgraph = Fluxjgf{ - Graph: g, - Elements: 0, - NodeMap: make(map[string]node), +// NewFluxJGF creates and returns a new Flux Json Graph Format object +func NewFluxJGF() FluxJGF { + + // Create a new cluster, and count the top level as a resource + // The index 0 (of the element count) is the cluster + counters := map[string]int64{"cluster": int64(1)} + return FluxJGF{ + Graph: graph{}, + NodeMap: make(map[string]Node), + + // Counters and lookup for resources + Resources: ResourceCounter{counts: counters}, + } +} + +// ToJson returns a Json string of the graph +func (g *FluxJGF) ToJson() (string, error) { + toprint, err := json.MarshalIndent(g.Graph, "", "\t") + return string(toprint), err +} + +// GetNodePath returns the node containment path +func getNodePath(root, subpath string) string { + var path string + if subpath == "" { + path = fmt.Sprintf("/%s", root) + } else { + path = fmt.Sprintf("/%s/%s", root, subpath) } - return + return filepath.Clean(path) } -// getDefaultPaths returns a new map with empty containment -// this cannot be a global shared 
variable or we get an error -// about inserting an edge to itself. -func getDefaultPaths() map[string]string { - return map[string]string{containmentKey: ""} +// getContainmentPath returns a new map with containment metadata +func (g *FluxJGF) getContainmentPath(subpath string) map[string]string { + return map[string]string{containmentKey: getNodePath(g.Resources.RootName, subpath)} } -// addNode adds a node to the JGF -func (g *Fluxjgf) addNode(toadd node) { - g.Graph.Nodes = append(g.Graph.Nodes, toadd) - g.NodeMap[toadd.Id] = toadd - g.Elements = g.Elements + 1 +// MakeBidirectionalEdge makes an edge for a parent and child +func (g *FluxJGF) MakeBidirectionalEdge(parent, child string) { + g.MakeEdge(parent, child, ContainsRelation) + g.MakeEdge(child, parent, InRelation) } // MakeEdge creates an edge for the JGF -func (g *Fluxjgf) MakeEdge(source string, target string, contains string) { +func (g *FluxJGF) MakeEdge(source string, target string, contains string) { newedge := edge{ Source: source, Target: target, @@ -87,263 +104,134 @@ func (g *Fluxjgf) MakeEdge(source string, target string, contains string) { }, } g.Graph.Edges = append(g.Graph.Edges, newedge) - if contains == ContainsRelation { - tnode := g.NodeMap[target] - tnode.Metadata.Paths[containmentKey] = g.NodeMap[source].Metadata.Paths[containmentKey] + "/" + tnode.Metadata.Name - } } // MakeSubnet creates a subnet for the graph -func (g *Fluxjgf) MakeSubnet(index int64, ip string) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: SubnetType, - Basename: ip, - Name: ip + fmt.Sprintf("%d", g.Elements), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), - }, - } - g.addNode(newnode) - return newnode.Id -} +// The name is typically the ip address +func (g *FluxJGF) MakeSubnet(name string, index int64) Node { -// MakeNode creates a new node for the 
graph -func (g *Fluxjgf) MakeNode(index int, exclusive bool, subnet string) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: NodeType, - Basename: subnet, - Name: subnet + fmt.Sprintf("%d", g.Elements), - Id: g.Elements, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: exclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), - }, - } - g.addNode(newnode) - return newnode.Id + // Get a resource counter for the subnet + resource := g.Resources.getCounter(name, SubnetType) + resource.Index = index + subpath := resource.NameWithIndex() + return g.makeNewNode(resource, subpath, defaultUnit, defaultSize) } -// MakeSocket creates a socket for the graph -func (g *Fluxjgf) MakeSocket(index int64, name string) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: SocketType, - Basename: name, - Name: name + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), - }, - } - g.addNode(newnode) - return newnode.Id -} +// makeNewNode is a shared function to make a new node from a resource spec +// subpath is the subpath to add to the graph root, e.g., / +// Since there is some variability to this structure, it is assembled by +// the calling function +func (g *FluxJGF) makeNewNode( + resource ResourceCount, + subpath, unit string, + size int64) Node { -// MakeCore creates a core for the graph -func (g *Fluxjgf) MakeCore(index int64, name string) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: CoreType, - Basename: name, - Name: name + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), - }, - } - g.addNode(newnode) - return newnode.Id -} + // A 
subnet comes directly under the cluster, which is the parent + newNode := Node{ -// MakeVCore makes a vcore (I think 2 vcpu == 1 cpu) for the graph -func (g *Fluxjgf) MakeVCore(coreid string, index int64, name string) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), + // Global identifier in graph, as a string + Id: resource.StringElementId(), Metadata: nodeMetadata{ - Type: VirtualCoreType, - Basename: name, - Name: name + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, + Type: resource.Type, + + // The original name without an index + Basename: resource.Name, + + // The name with an index + Name: resource.NameWithIndex(), + + // Integer resource index + Id: resource.Index, + + // Integer global element index + Uniq_id: resource.ElementId, Rank: defaultRank, Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), + Unit: unit, + Size: size, + + // subnet is one above root graph, so just need it's name + Paths: g.getContainmentPath(subpath), }, } - g.addNode(newnode) - g.MakeEdge(coreid, newnode.Id, ContainsRelation) - g.MakeEdge(newnode.Id, coreid, InRelation) - return newnode.Id + + // Add the new node to the graph + g.Graph.Nodes = append(g.Graph.Nodes, newNode) + g.NodeMap[newNode.Id] = newNode + return newNode } -// MakeNFProperties makes the node feature discovery properties for the graph -func (g *Fluxjgf) MakeNFDProperties(coreid string, index int64, filter string, labels *map[string]string) { - for key, _ := range *labels { - if strings.Contains(key, filter) { - name := strings.Split(key, "/")[1] - if strings.Contains(name, ".") { - name = strings.Split(name, ".")[1] - } - - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: name, - Basename: name, - Name: name + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), - }, 
- } - g.addNode(newnode) - g.MakeEdge(coreid, newnode.Id, ContainsRelation) - } - } +// MakeNode creates a new node for the graph +func (g *FluxJGF) MakeNode(name, subpath string, index int64) Node { + + // Get a resource counter for the node, which is under the subnet + resource := g.Resources.getCounter(name, NodeType) + resource.Index = index + + // Here the full containment path will be: + // // + subpath = fmt.Sprintf("%s/%s", subpath, resource.NameWithIndex()) + return g.makeNewNode(resource, subpath, defaultUnit, defaultSize) } -func (g *Fluxjgf) MakeNFDPropertiesByValue(coreid string, index int64, filter string, labels *map[string]string) { - for key, val := range *labels { - if strings.Contains(key, filter) { - name := val - - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: name, - Basename: name, - Name: name + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), - }, - } - g.addNode(newnode) - g.MakeEdge(coreid, newnode.Id, ContainsRelation) - } - } +// MakeCore creates a core for the graph +func (g *FluxJGF) MakeCore(name, subpath string, index int64) Node { + + // A core is located at the subnet->node->core + resource := g.Resources.getCounter(name, CoreType) + resource.Index = index + + // Here the full containment path will be: + // /// + subpath = fmt.Sprintf("%s/%s", subpath, resource.NameWithIndex()) + return g.makeNewNode(resource, subpath, defaultUnit, defaultSize) } // MakeMemory creates memory for the graph -func (g *Fluxjgf) MakeMemory(index int64, name string, unit string, size int64) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: MemoryType, - Basename: name, - Name: name + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: unit, - Size: size, - Paths: 
getDefaultPaths(), - }, - } - g.addNode(newnode) - return newnode.Id +// Flux doesn't understand memory? Not sure if this is doing anything +func (g *FluxJGF) MakeMemory( + name, subpath string, + size, index int64) Node { + + // unit is assumed to be MB + unit := "MB" + + // A core is located at the subnet->node->core + resource := g.Resources.getCounter(name, MemoryType) + resource.Index = index + + // Here the full containment path will be: + // /// + subpath = fmt.Sprintf("%s/%s", subpath, resource.NameWithIndex()) + return g.makeNewNode(resource, subpath, unit, size) } // MakeGPU makes a gpu for the graph -func (g *Fluxjgf) MakeGPU(index int64, name string, size int64) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: GPUType, - Basename: name, - Name: name + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: size, - Paths: getDefaultPaths(), - }, - } - g.addNode(newnode) - return newnode.Id -} +func (g *FluxJGF) MakeGPU(name, subpath string, size, index int64) Node { -// MakeCluster creates the cluster -func (g *Fluxjgf) MakeCluster(clustername string) string { - g.Elements = 0 - newnode := node{ - Id: strconv.Itoa(0), - Metadata: nodeMetadata{ - Type: ClusterType, - Basename: clustername, - Name: clustername + "0", - Id: g.Elements, - Uniq_id: 0, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: map[string]string{ - containmentKey: "/" + clustername + "0", - }, - }, - } - g.addNode(newnode) - return newnode.Id + // Get a resource counter for the gpu, which is under the subnet->node->gpu + resource := g.Resources.getCounter(name, GPUType) + resource.Index = index + + // Here the full containment path will be: + // // + subpath = fmt.Sprintf("%s/%s", subpath, resource.NameWithIndex()) + return g.makeNewNode(resource, subpath, defaultUnit, size) } -// MakeRack makes the 
rack -func (g *Fluxjgf) MakeRack(index int64) string { - newnode := node{ - Id: fmt.Sprintf("%d", g.Elements), - Metadata: nodeMetadata{ - Type: RackType, - Basename: RackType, - Name: RackType + fmt.Sprintf("%d", index), - Id: index, - Uniq_id: g.Elements, - Rank: defaultRank, - Exclusive: defaultExclusive, - Unit: defaultUnit, - Size: defaultSize, - Paths: getDefaultPaths(), - }, +// InitCluster creates a new cluster, primarily the root "cluster" node +func (g *FluxJGF) InitCluster(name string) (Node, error) { + if g.Resources.Elements > 0 { + return Node{}, fmt.Errorf("init can only be called for a new cluster") } - g.addNode(newnode) - return newnode.Id + + // The cluster name is the index (always 0) with the original name + g.Resources.RootName = fmt.Sprintf("%s0", name) + resource := g.Resources.getCounter(name, ClusterType) + return g.makeNewNode(resource, "", defaultUnit, defaultSize), nil } -func (g *Fluxjgf) WriteJGF(path string) error { +func (g *FluxJGF) WriteJGF(path string) error { encodedJGF, err := json.MarshalIndent(g, "", " ") diff --git a/src/fluence/jgf/jgf_test.go b/src/fluence/jgf/jgf_test.go new file mode 100644 index 0000000..1d1a596 --- /dev/null +++ b/src/fluence/jgf/jgf_test.go @@ -0,0 +1,77 @@ +package jgf + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNewFluxJGF(t *testing.T) { + + // Create a new FluxGraph, assert that it is empty + fluxgraph := NewFluxJGF() + assert.Equal(t, len(fluxgraph.Graph.Nodes), 0) + assert.Equal(t, fluxgraph.Resources.Elements, int64(0)) + assert.Equal(t, len(fluxgraph.NodeMap), 0) + + out, err := fluxgraph.ToJson() + assert.Nil(t, err) + fmt.Println() + fmt.Println("== Empty graph:") + fmt.Println(out) + + // Init the cluster (make the root node) + clusterNode, err := fluxgraph.InitCluster("keebler") + assert.Nil(t, err) + + out, err = fluxgraph.ToJson() + assert.Nil(t, err) + fmt.Println() + fmt.Println("== Graph with Cluster Root:") + fmt.Println(out) + + // Add 
subnets to it + subnetNodeA := fluxgraph.MakeSubnet("east", 0) + subnetNodeB := fluxgraph.MakeSubnet("west", 1) + fluxgraph.MakeBidirectionalEdge(clusterNode.Id, subnetNodeA.Id) + fluxgraph.MakeBidirectionalEdge(clusterNode.Id, subnetNodeB.Id) + + out, err = fluxgraph.ToJson() + assert.Nil(t, err) + fmt.Println() + fmt.Println("== Graph with Two Subnets:") + fmt.Println(out) + + // Add some nodes! + computeNodeA := fluxgraph.MakeNode("node", subnetNodeA.Metadata.Name, 0) + computeNodeB := fluxgraph.MakeNode("node", subnetNodeB.Metadata.Name, 1) + fluxgraph.MakeBidirectionalEdge(subnetNodeA.Id, computeNodeA.Id) + fluxgraph.MakeBidirectionalEdge(subnetNodeB.Id, computeNodeB.Id) + + out, err = fluxgraph.ToJson() + assert.Nil(t, err) + fmt.Println() + fmt.Println("== Graph with Two Subnets, Each with a node:") + fmt.Println(out) + + // Add a GPU to one, and cores to the other + subpath := fmt.Sprintf("%s/%s", subnetNodeA.Metadata.Name, computeNodeA.Metadata.Name) + gpuNodeA := fluxgraph.MakeGPU(NvidiaGPU, subpath, 1, 0) + fluxgraph.MakeBidirectionalEdge(computeNodeA.Id, gpuNodeA.Id) + + subpath = fmt.Sprintf("%s/%s", subnetNodeB.Metadata.Name, computeNodeB.Metadata.Name) + coreNode := fluxgraph.MakeCore(CoreType, subpath, 0) + fluxgraph.MakeBidirectionalEdge(computeNodeB.Id, coreNode.Id) + + // Finally, add some memory to the second compute node + memoryNode := fluxgraph.MakeMemory(MemoryType, subpath, 1<<10, 0) + fluxgraph.MakeBidirectionalEdge(computeNodeA.Id, memoryNode.Id) + + out, err = fluxgraph.ToJson() + assert.Nil(t, err) + fmt.Println() + fmt.Println("== Graph with Two Subnets, Two Nodes, with GPU/Core/Memory:") + fmt.Println(out) + +} diff --git a/src/fluence/jgf/types.go b/src/fluence/jgf/types.go index 21ccd00..8359c28 100644 --- a/src/fluence/jgf/types.go +++ b/src/fluence/jgf/types.go @@ -16,7 +16,9 @@ limitations under the License. 
package jgf -type node struct { +import "fmt" + +type Node struct { Id string `json:"id"` Label string `json:"label,omitempty"` Metadata nodeMetadata `json:"metadata,omitempty"` @@ -49,14 +51,97 @@ type nodeMetadata struct { } type graph struct { - Nodes []node `json:"nodes"` + Nodes []Node `json:"nodes"` Edges []edge `json:"edges"` // Metadata metadata `json:"metadata,omitempty"` Directed bool `json:"directed,omitempty"` } -type Fluxjgf struct { - Graph graph `json:"graph"` - Elements int64 `json:"-"` - NodeMap map[string]node `json:"-"` +type FluxJGF struct { + Graph graph `json:"graph"` + NodeMap map[string]Node `json:"-"` + + // Index counters: the global element count plus per-resource counts + Resources ResourceCounter `json:"-"` +} + +// ResourceCounter keeps track of the global element count and of per-resource indices +type ResourceCounter struct { + + // Next index to hand out, keyed by the string given to NextResourceIndex + counts map[string]int64 + + // Total elements in the graph (also the next global index) + Elements int64 + + // Name or path of root + RootName string +} + +// ResourceCount provides complete metadata to populate a new node +// This object is returned by the ResourceCounter (via getCounter) for a node to use +// to quickly derive values, etc.
+type ResourceCount struct { + + // Name of the resource (e.g., "red") + Name string + + // Name of the resource type (e.g., "node") + Type string + + // Element ID, in the context of total elements in the graph + ElementId int64 + + // Index or count for the resource in question + Index int64 +} + +// NameWithIndex returns the resource name followed by its resource index +// The index is scoped to the resource and not global for all the +// elements in the graph +func (r *ResourceCount) NameWithIndex() string { + return fmt.Sprintf("%s%d", r.Name, r.Index) +} + +// StringElementId is the global index as a string +func (r *ResourceCount) StringElementId() string { + return fmt.Sprintf("%d", r.ElementId) +} + +// StringResourceIndex is the string variant of the resource index +func (r *ResourceCount) StringResourceIndex() string { + return fmt.Sprintf("%d", r.Index) +} + +// NextIndex returns the next global index and adds 1 to the count +func (r *ResourceCounter) NextIndex() int64 { + nextIndex := r.Elements + r.Elements = nextIndex + 1 + return nextIndex +} + +// NextResourceIndex returns the next index for the given key (starting at 0) and adds 1 to its count +func (r *ResourceCounter) NextResourceIndex(resourceType string) int64 { + nextIndex, ok := r.counts[resourceType] + if !ok { + nextIndex = int64(0) + } + r.counts[resourceType] = nextIndex + 1 + return nextIndex +} + +// getCounter returns the counter context (resource index and global element id) for a new resource +func (r *ResourceCounter) getCounter( + resourceName string, + resourceType string, +) ResourceCount { + resourceCount := ResourceCount{ + Index: r.NextResourceIndex(resourceName), + Type: resourceType, + Name: resourceName, + ElementId: r.NextIndex(), + } + + // Both counters were advanced above. NOTE(review): the resource index is keyed by resourceName, not resourceType - confirm intended + return resourceCount } diff --git a/src/fluence/utils/utils.go b/src/fluence/utils/utils.go index 490a0e0..19fadf8 100644 --- a/src/fluence/utils/utils.go +++ b/src/fluence/utils/utils.go @@ -19,7 +19,8 @@ import ( ) var ( - controlPlaneLabel =
"node-role.kubernetes.io/control-plane" + controlPlaneLabel = "node-role.kubernetes.io/control-plane" + defaultClusterName = "k8scluster" ) // RegisterExisting uses the in cluster API to get existing pods @@ -67,12 +68,12 @@ func RegisterExisting(clientset *kubernetes.Clientset, ctx context.Context) (map return nodes, nil } -// CreateJGF creates the Json Graph Format +// CreateInClusterJGF creates the Json Graph Format from the Kubernetes API // We currently don't have support in fluxion to allocate jobs for existing pods, // so instead we create the graph with fewer resources. When that support is // added (see sig-scheduler-plugins/pkg/fluence/register.go) we can // remove the adjustment here, which is more of a hack -func CreateJGF(filename string, skipLabel *string) error { +func CreateInClusterJGF(filename string, skipLabel string) error { ctx := context.Background() config, err := rest.InClusterConfig() if err != nil { @@ -91,22 +92,31 @@ func CreateJGF(filename string, skipLabel *string) error { } // Create a Flux Json Graph Format (JGF) with all cluster nodes - fluxgraph := jgf.InitJGF() + fluxgraph := jgf.NewFluxJGF() - // Top level of the graph is the cluster + // Initialize the cluster. The top level of the graph is the cluster // This assumes fluxion is only serving one cluster. // previous comments indicate that we choose between the level // of a rack and a subnet. A rack doesn't make sense (the nodes could // be on multiple racks) so subnet is likely the right abstraction - cluster := fluxgraph.MakeCluster("k8scluster") - - vcores := 0 + clusterNode, err := fluxgraph.InitCluster(defaultClusterName) + if err != nil { + return err + } fmt.Println("Number nodes ", len(nodes.Items)) + + // TODO for follow up / next PR: + // Metrics / summary should be an attribute of the JGF outer flux graph + // Resources should come in from entire group (and not repres. 
pod) var totalAllocCpu int64 totalAllocCpu = 0 - sdnCount := int64(0) - for nodeIndex, node := range nodes.Items { + // Keep a lookup of subnet nodes in case we see one twice + // We don't want to create a new entity for it in the graph + subnetLookup := map[string]jgf.Node{} + var subnetCounter int64 = 0 + + for nodeCount, node := range nodes.Items { // We should not be scheduling to the control plane _, ok := node.Labels[controlPlaneLabel] @@ -117,8 +127,8 @@ func CreateJGF(filename string, skipLabel *string) error { // Anything labeled with "skipLabel" meaning it is present, // should be skipped - if *skipLabel != "" { - _, ok := node.Labels[*skipLabel] + if skipLabel != "" { + _, ok := node.Labels[skipLabel] if ok { fmt.Printf("Skipping node %s\n", node.GetName()) continue @@ -141,12 +151,20 @@ func CreateJGF(filename string, skipLabel *string) error { return err } - // Here we build the subnet according to topology.kubernetes.io/zone label + // Have we seen this subnet node before? 
subnetName := node.Labels["topology.kubernetes.io/zone"] - subnet := fluxgraph.MakeSubnet(sdnCount, subnetName) - sdnCount = sdnCount + 1 - fluxgraph.MakeEdge(cluster, subnet, jgf.ContainsRelation) - fluxgraph.MakeEdge(subnet, cluster, jgf.InRelation) + subnetNode, exists := subnetLookup[subnetName] + if !exists { + // Build the subnet according to topology.kubernetes.io/zone label + subnetNode = fluxgraph.MakeSubnet(subnetName, subnetCounter) + subnetCounter += 1 + + // This is one example of bidirectional, I won't document in + // all following occurrences but this is what the function does + // [cluster] -> contains -> [subnet] + // [subnet] -> in -> [cluster] + fluxgraph.MakeBidirectionalEdge(clusterNode.Id, subnetNode.Id) + } // These are requests for existing pods, for cpu and memory reqs := computeTotalRequests(pods) @@ -174,43 +192,43 @@ func CreateJGF(filename string, skipLabel *string) error { gpuAllocatable, hasGpuAllocatable := node.Status.Allocatable["nvidia.com/gpu"] // TODO possibly look at pod resources vs. 
node.Status.Allocatable + // Make the compute node, which is a child of the subnet + // The parameters here are the node name, and the parent path + computeNode := fluxgraph.MakeNode(node.Name, subnetNode.Metadata.Name, int64(nodeCount)) - workernode := fluxgraph.MakeNode(nodeIndex, false, node.Name) - fluxgraph.MakeEdge(subnet, workernode, jgf.ContainsRelation) - fluxgraph.MakeEdge(workernode, subnet, jgf.InRelation) + // [subnet] -> contains -> [compute node] + fluxgraph.MakeBidirectionalEdge(subnetNode.Id, computeNode.Id) + // Here we are adding GPU resources under nodes if hasGpuAllocatable { fmt.Println("GPU Resource quantity ", gpuAllocatable.Value()) for index := 0; index < int(gpuAllocatable.Value()); index++ { - gpu := fluxgraph.MakeGPU(int64(index), jgf.NvidiaGPU, 1) - fluxgraph.MakeEdge(workernode, gpu, jgf.ContainsRelation) - fluxgraph.MakeEdge(gpu, workernode, jgf.InRelation) + + // The subpath (from and not including root) is the subnet -> node + subpath := fmt.Sprintf("%s/%s", subnetNode.Metadata.Name, computeNode.Metadata.Name) + + // TODO: can this size be greater than 1? + gpuNode := fluxgraph.MakeGPU(jgf.NvidiaGPU, subpath, 1, int64(index)) + + // [compute] -> contains -> [gpu] + fluxgraph.MakeBidirectionalEdge(computeNode.Id, gpuNode.Id) } } + // Here is where we are adding cores for index := 0; index < int(availCpu); index++ { - core := fluxgraph.MakeCore(int64(index), jgf.CoreType) - fluxgraph.MakeEdge(workernode, core, jgf.ContainsRelation) - fluxgraph.MakeEdge(core, workernode, jgf.InRelation) - - // Question from Vanessa: - // How can we get here and have vcores ever not equal to zero? 
- if vcores == 0 { - fluxgraph.MakeNFDProperties(core, int64(index), "cpu-", &node.Labels) - } else { - for virtualCore := 0; virtualCore < vcores; virtualCore++ { - vcore := fluxgraph.MakeVCore(core, int64(virtualCore), jgf.VirtualCoreType) - fluxgraph.MakeNFDProperties(vcore, int64(index), "cpu-", &node.Labels) - } - } + subpath := fmt.Sprintf("%s/%s", subnetNode.Metadata.Name, computeNode.Metadata.Name) + coreNode := fluxgraph.MakeCore(jgf.CoreType, subpath, int64(index)) + fluxgraph.MakeBidirectionalEdge(computeNode.Id, coreNode.Id) } + // Here is where we are adding memory fractionMem := availMem >> 30 for i := 0; i < int(fractionMem); i++ { - mem := fluxgraph.MakeMemory(int64(i), jgf.MemoryType, "MB", 1<<10) - fluxgraph.MakeEdge(workernode, mem, jgf.ContainsRelation) - fluxgraph.MakeEdge(mem, workernode, jgf.InRelation) + subpath := fmt.Sprintf("%s/%s", subnetNode.Metadata.Name, computeNode.Metadata.Name) + memoryNode := fluxgraph.MakeMemory(jgf.MemoryType, subpath, 1<<10, int64(i)) + fluxgraph.MakeBidirectionalEdge(computeNode.Id, memoryNode.Id) } } fmt.Printf("\nCan request at most %d exclusive cpu", totalAllocCpu) @@ -219,7 +237,6 @@ func CreateJGF(filename string, skipLabel *string) error { return err } return nil - } // computeTotalRequests sums up the pod requests for the list. We do not consider limits.