Skip to content

Commit

Permalink
NOISSUE - Refactor single algorithm processing (#117)
Browse files Browse the repository at this point in the history
* Refactor single algorithm processing

Simplified the agent service's algorithm handling logic to process a single algorithm instead of multiple. This change:
- Removed the `Algorithms` type and associated stringer implementation.
- Updated the state machine and service logic to expect a singular algorithm, aligning the agent's internal state transitions with the new model.
- Adjusted the manager service and computations test server to mirror these changes in their respective payload structures, ensuring API and test consistency.
- Altered README files to reflect the simplified interaction model and removed outdated descriptions.
- Reverted the protoc-gen-go version used for generating protobuf files to maintain compatibility with the rest of the codebase.

The single-algorithm approach streamlines the computation running process, reducing complexity and potential error conditions. It directly impacts how external services will construct and send computation requests.

Signed-off-by: SammyOina <[email protected]>

* Update protoc-gen-go version to v1.33.0

Signed-off-by: SammyOina <[email protected]>

* Refactor variable name in computations.go and grpc.go

Signed-off-by: SammyOina <[email protected]>

---------

Signed-off-by: SammyOina <[email protected]>
  • Loading branch information
SammyOina authored Apr 17, 2024
1 parent 3a14896 commit 64f7e7f
Show file tree
Hide file tree
Showing 13 changed files with 88 additions and 111 deletions.
1 change: 0 additions & 1 deletion agent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ The service is configured using the environment variables from the following tab
| AGENT_GRPC_SERVER_KEY | Path to gRPC server key in pem format | "" |
| AGENT_GRPC_SERVER_CA_CERTS | Path to gRPC server CA certificate | "" |
| AGENT_GRPC_CLIENT_CA_CERTS | Path to gRPC client CA certificate | "" |
| COCOS_NOTIFICATION_SERVER_URL | Server to receive notification events from agent. | http:/localhost:9000 |


## Deployment
Expand Down
17 changes: 2 additions & 15 deletions agent/computations.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ import (
"reflect"
)

var (
_ fmt.Stringer = (*Datasets)(nil)
_ fmt.Stringer = (*Algorithms)(nil)
)
var _ fmt.Stringer = (*Datasets)(nil)

type AgentConfig struct {
LogLevel string `json:"log_level"`
Expand All @@ -29,7 +26,7 @@ type Computation struct {
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
Datasets Datasets `json:"datasets,omitempty"`
Algorithms Algorithms `json:"algorithms,omitempty"`
Algorithm Algorithm `json:"algorithms,omitempty"`
ResultConsumers []string `json:"result_consumers,omitempty"`
AgentConfig AgentConfig `json:"agent_config,omitempty"`
}
Expand All @@ -42,14 +39,6 @@ func (d *Datasets) String() string {
return string(dat)
}

func (a *Algorithms) String() string {
dat, err := json.Marshal(a)
if err != nil {
return ""
}
return string(dat)
}

type Dataset struct {
Dataset []byte `json:"-"`
Hash [32]byte `json:"hash,omitempty"`
Expand All @@ -66,8 +55,6 @@ type Algorithm struct {
ID string `json:"id,omitempty"`
}

type Algorithms []Algorithm

func containsID(slice interface{}, id string) int {
rangeOnMe := reflect.ValueOf(slice)
for i := 0; i < rangeOnMe.Len(); i++ {
Expand Down
27 changes: 13 additions & 14 deletions agent/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type Service interface {

type agentService struct {
computation Computation // Holds the current computation request details.
algorithms [][]byte // Stores the algorithms received for the computation.
algorithm []byte // Stores the algorithm received for the computation.
datasets [][]byte // Stores the datasets received for the computation.
result []byte // Stores the result of the computation.
sm *StateMachine // Manages the state transitions of the agent service.
Expand All @@ -84,46 +84,45 @@ func New(ctx context.Context, logger *slog.Logger, eventSvc events.Service, cmp
go svc.sm.Start(ctx)
svc.sm.SendEvent(start)
svc.sm.StateFunctions[idle] = svc.publishEvent("in-progress", json.RawMessage{})
svc.sm.StateFunctions[receivingManifests] = svc.publishEvent("in-progress", json.RawMessage{})
svc.sm.StateFunctions[receivingAlgorithms] = svc.publishEvent("in-progress", json.RawMessage{})
svc.sm.StateFunctions[receivingManifest] = svc.publishEvent("in-progress", json.RawMessage{})
svc.sm.StateFunctions[receivingAlgorithm] = svc.publishEvent("in-progress", json.RawMessage{})
svc.sm.StateFunctions[receivingData] = svc.publishEvent("in-progress", json.RawMessage{})
svc.sm.StateFunctions[resultsReady] = svc.publishEvent("in-progress", json.RawMessage{})
svc.sm.StateFunctions[complete] = svc.publishEvent("in-progress", json.RawMessage{})
svc.sm.StateFunctions[running] = svc.runComputation

svc.computation = cmp
svc.sm.SendEvent(manifestsReceived)
svc.sm.SendEvent(manifestReceived)
return svc
}

func (as *agentService) Algo(ctx context.Context, algorithm Algorithm) error {
if as.sm.GetState() != receivingAlgorithms {
if as.sm.GetState() != receivingAlgorithm {
return errStateNotReady
}
if len(as.computation.Algorithms) == 0 {
if as.algorithm != nil {
return errAllManifestItemsReceived
}

hash := sha3.Sum256(algorithm.Algorithm)

index := containsID(as.computation.Algorithms, algorithm.ID)
index := containsID(as.computation.Algorithm, algorithm.ID)
switch index {
case -1:
return errUndeclaredAlgorithm
default:
if as.computation.Algorithms[index].Provider != algorithm.Provider {
if as.computation.Algorithm.Provider != algorithm.Provider {
return errProviderMissmatch
}
if hash != as.computation.Algorithms[index].Hash {
if hash != as.computation.Algorithm.Hash {
return errHashMismatch
}
as.computation.Algorithms = slices.Delete(as.computation.Algorithms, index, index+1)
}

as.algorithms = append(as.algorithms, algorithm.Algorithm)
as.algorithm = algorithm.Algorithm

if len(as.computation.Algorithms) == 0 {
as.sm.SendEvent(algorithmsReceived)
if as.algorithm != nil {
as.sm.SendEvent(algorithmReceived)
}

return nil
Expand Down Expand Up @@ -202,7 +201,7 @@ func (as *agentService) runComputation() {
as.sm.logger.Debug("computation run started")
defer as.sm.SendEvent(runComplete)
as.publishEvent("in-progress", json.RawMessage{})()
result, err := run(as.algorithms[0], as.datasets[0])
result, err := run(as.algorithm, as.datasets[0])
if err != nil {
as.runError = err
as.publishEvent("failed", json.RawMessage{})()
Expand Down
18 changes: 9 additions & 9 deletions agent/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ type state int

const (
idle state = iota
receivingManifests
receivingAlgorithms
receivingManifest
receivingAlgorithm
receivingData
running
resultsReady
Expand All @@ -26,8 +26,8 @@ type event int

const (
start event = iota
manifestsReceived
algorithmsReceived
manifestReceived
algorithmReceived
dataReceived
runComplete
resultsConsumed
Expand Down Expand Up @@ -56,13 +56,13 @@ func NewStateMachine(logger *slog.Logger) *StateMachine {
}

sm.Transitions[idle] = make(map[event]state)
sm.Transitions[idle][start] = receivingManifests
sm.Transitions[idle][start] = receivingManifest

sm.Transitions[receivingManifests] = make(map[event]state)
sm.Transitions[receivingManifests][manifestsReceived] = receivingAlgorithms
sm.Transitions[receivingManifest] = make(map[event]state)
sm.Transitions[receivingManifest][manifestReceived] = receivingAlgorithm

sm.Transitions[receivingAlgorithms] = make(map[event]state)
sm.Transitions[receivingAlgorithms][algorithmsReceived] = receivingData
sm.Transitions[receivingAlgorithm] = make(map[event]state)
sm.Transitions[receivingAlgorithm][algorithmReceived] = receivingData

sm.Transitions[receivingData] = make(map[event]state)
sm.Transitions[receivingData][dataReceived] = running
Expand Down
4 changes: 2 additions & 2 deletions agent/state_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions agent/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ func TestStateMachineTransitions(t *testing.T) {
event event
expected state
}{
{idle, start, receivingManifests},
{receivingManifests, manifestsReceived, receivingAlgorithms},
{receivingAlgorithms, algorithmsReceived, receivingData},
{idle, start, receivingManifest},
{receivingManifest, manifestReceived, receivingAlgorithm},
{receivingAlgorithm, algorithmReceived, receivingData},
{receivingData, dataReceived, running},
{running, runComplete, resultsReady},
{resultsReady, resultsConsumed, complete},
Expand Down
4 changes: 2 additions & 2 deletions internal/server/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type serviceRegister func(srv *grpc.Server)

var _ server.Server = (*Server)(nil)

func New(ctx context.Context, cancel context.CancelFunc, name string, config server.Config, registerService serviceRegister, logger *slog.Logger, agent *agent.Service) server.Server {
func New(ctx context.Context, cancel context.CancelFunc, name string, config server.Config, registerService serviceRegister, logger *slog.Logger, agentSvc *agent.Service) server.Server {
listenFullAddress := fmt.Sprintf("%s:%s", config.Host, config.Port)
return &Server{
BaseServer: server.BaseServer{
Expand All @@ -65,7 +65,7 @@ func New(ctx context.Context, cancel context.CancelFunc, name string, config ser
Logger: logger,
},
registerService: registerService,
agent: agent,
agent: agentSvc,
}
}

Expand Down
6 changes: 3 additions & 3 deletions manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,10 @@ MANAGER_QEMU_KERNEL_HASH=true \

### Verifying VM launch

NB: To verify that the manager successfully launched the VM, you need to open three terminals on the same machine. In one terminal, you need to launch the Manager test server by executing (with the environment variables of choice):
NB: To verify that the manager successfully launched the VM, you need to open three terminals on the same machine. In one terminal, you need to launch the computations server by executing (with the environment variables of choice):

```bash
go run ./test/manager-server/main.go
go run ./test/computations/main.go <dataset path> <algo path>
```

and in the second the manager by executing (with the environment variables of choice):
Expand All @@ -217,7 +217,7 @@ and in the second the manager by executing (with the environment variables of ch
go run ./cmd/manager/main.go
```

Ensure that the Manager can connect to the Manager test server by setting the MANAGER_GRPC_PORT with the port value of the Manager test server. The Manager test server is listening on the default value of the MANAGER_GRPC_PORT. In the last one, you can run the verification commands.
Ensure that the Manager can connect to the Manager test server by setting the MANAGER_GRPC_PORT with the port value of the Manager test server. In the last terminal, you can run the verification commands.

To verify that the manager launched the VM successfully, run the following command:

Expand Down
2 changes: 1 addition & 1 deletion manager/manager.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ message ComputationRunReq {
string name = 2;
string description = 3;
repeated Dataset datasets = 4;
repeated Algorithm algorithms = 5;
Algorithm algorithm = 5;
repeated string result_consumers = 6;
AgentConfig agent_config = 7;
}
Expand Down
9 changes: 2 additions & 7 deletions manager/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,8 @@ func (ms *managerService) Run(ctx context.Context, c *manager.ComputationRunReq)
LogLevel: c.AgentConfig.LogLevel,
},
}
for _, algo := range c.Algorithms {
if len(algo.Hash) != hashLength {
ms.publishEvent("vm-provision", c.Id, "failed", json.RawMessage{})
return "", errInvalidHashLength
}
ac.Algorithms = append(ac.Algorithms, agent.Algorithm{ID: algo.Id, Provider: algo.Provider, Hash: [hashLength]byte(algo.Hash)})
}
ac.Algorithm = agent.Algorithm{ID: c.Algorithm.Id, Provider: c.Algorithm.Provider, Hash: [hashLength]byte(c.Algorithm.Hash)}

for _, data := range c.Datasets {
if len(data.Hash) != hashLength {
ms.publishEvent("vm-provision", c.Id, "failed", json.RawMessage{})
Expand Down
Loading

0 comments on commit 64f7e7f

Please sign in to comment.