Skip to content

Commit

Permalink
lang: interfaces, funcs: Port Func API to new Stream signature
Browse files Browse the repository at this point in the history
This removes the `Close() error` and replaces it with a more modern
Stream api that takes a context. This removes boilerplate and makes
integration with concurrent code easier. The only downside is that there
isn't an explicit cleanup step, but only one function was even using
that and it was possible to switch it to a defer in Stream.
  • Loading branch information
purpleidea committed Sep 19, 2023
1 parent 991ac1a commit 9bfd610
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 33 deletions.
14 changes: 1 addition & 13 deletions lang/funcs/dage/dage.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,6 @@ func (obj *Engine) deleteVertex(f interfaces.Func) error {
// lookup when we're in an unlocked state.
delete(obj.state, f)
obj.graph.DeleteVertex(f)
//return f.Close() // XXX: change api to close with ctx instead.
return nil
}

Expand Down Expand Up @@ -364,7 +363,6 @@ Loop:

rollback := func() {
for f := range status {
//f.Close() // shutdown if necessary
obj.graph.DeleteVertex(f)
delete(obj.state, f)
}
Expand Down Expand Up @@ -671,8 +669,7 @@ func (obj *Engine) Run(ctx context.Context) error {
if obj.Debug {
obj.SafeLogf("Running func `%s`", node)
}
//runErr := f.Stream(node.ctx) // XXX: new API!
runErr := f.Stream()
runErr := f.Stream(node.ctx)
if obj.Debug {
obj.SafeLogf("Exiting func `%s`", node)
}
Expand All @@ -688,15 +685,6 @@ func (obj *Engine) Run(ctx context.Context) error {
// if node never loaded, then we error in the node.output loop!
}(f, node)

node.wg.Add(1)
go func(f interfaces.Func, node *state) {
defer node.wg.Done()
select {
case <-ctx.Done():
}
_ = f.Close() // XXX: change api to close with ctx instead.
}(f, node)

// process events
obj.wgAg.Add(1)
node.wg.Add(1)
Expand Down
19 changes: 5 additions & 14 deletions lang/funcs/dage/dage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ type testFunc struct {
Func func(types.Value) (types.Value, error)
Meta *meta

value types.Value
init *interfaces.Init
closeChan chan struct{}
value types.Value
init *interfaces.Init
}

func (obj *testFunc) String() string { return obj.Name }
Expand All @@ -60,12 +59,10 @@ func (obj *testFunc) Validate() error {

func (obj *testFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}

// XXX: replace with ctx closer
func (obj *testFunc) Stream() error {
func (obj *testFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
defer obj.init.Logf("stream closed")
obj.init.Logf("stream startup")
Expand Down Expand Up @@ -104,7 +101,7 @@ func (obj *testFunc) Stream() error {
}
}

case <-obj.closeChan:
case <-ctx.Done():
return nil
}

Expand All @@ -125,18 +122,12 @@ func (obj *testFunc) Stream() error {
}
}()

case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}

// XXX: replace with ctx closer
func (obj *testFunc) Close() error {
close(obj.closeChan)
return nil
}

type meta struct {
EventCount int
Event chan struct{}
Expand Down
12 changes: 6 additions & 6 deletions lang/funcs/dage/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package dage

import (
"context"
"fmt"
"sync"
"testing"
Expand Down Expand Up @@ -105,12 +106,11 @@ type testNullFunc struct {
name string
}

func (obj *testNullFunc) String() string { return obj.name }
func (obj *testNullFunc) Info() *interfaces.Info { return nil }
func (obj *testNullFunc) Validate() error { return nil }
func (obj *testNullFunc) Init(*interfaces.Init) error { return nil }
func (obj *testNullFunc) Stream() error { return nil }
func (obj *testNullFunc) Close() error { return nil }
func (obj *testNullFunc) String() string { return obj.name }
func (obj *testNullFunc) Info() *interfaces.Info { return nil }
func (obj *testNullFunc) Validate() error { return nil }
func (obj *testNullFunc) Init(*interfaces.Init) error { return nil }
func (obj *testNullFunc) Stream(context.Context) error { return nil }

func TestTxn1(t *testing.T) {
graph, err := pgraph.NewGraph("test")
Expand Down

0 comments on commit 9bfd610

Please sign in to comment.