Skip to content

Commit

Permalink
Merge pull request #11 from getoutreach/async
Browse files Browse the repository at this point in the history
feat: async contract
  • Loading branch information
pavelsmejkal authored Oct 15, 2024
2 parents b2b3020 + cbcf5c6 commit c276356
Show file tree
Hide file tree
Showing 29 changed files with 841 additions and 209 deletions.
39 changes: 29 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@

A library to manage application dependency graph and orchestrate service tasks.

<img src="plumber.png" width="300"/>

## Contributing

Please read the [CONTRIBUTING.md](CONTRIBUTING.md) document for guidelines on developing and contributing changes.

## High-level Overview

### Example

For a comprehensive example of a plumber usage please navigate to [example folder](/_user_/_project_/tree/_branch_/example/cmd/example).

### Service dependency management

Simple but effective dependency management that is focused on readability. Main goal is to get rid off the repetitive error checking during the construction.
Expand Down Expand Up @@ -94,7 +100,7 @@ Also when doing graceful shutdown the tasks needs to be closed in reversed order
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

signaler := plumber.NewErrorSignaler()
signal := plumber.NewSignal()

err := plumber.Start(ctx,
// Serial pipeline. Task are started sequentially and closed in reverse order.
Expand All @@ -103,8 +109,7 @@ err := plumber.Start(ctx,
fmt.Println("pipeline is closing")
return nil
}),
plumber.GracefulRunner(func(ctx context.Context, ready plumber.ReadyFunc) error {
ready()
plumber.GracefulRunner(func(ctx context.Context) error {
fmt.Println("Task 1 starting")
<-ctx.Done()
return nil
Expand All @@ -114,12 +119,28 @@ err := plumber.Start(ctx,
}),
// The parallel pipeline all task are stared and closed in parallel.
plumber.Parallel(
plumber.SimpleRunner(func(ctx context.Context) error {
fmt.Println("Task 2 starting")
<-ctx.Done()
// Runner that implements Runner, Readier, Closeable
plumber.NewRunner(
func(ctx context.Context) error {
go func() {
time.Sleep(1 * time.Second)
fmt.Println("Task 2 is ready")
signal.Notify()
}()
fmt.Println("Task 2 starting")
<-ctx.Done()
return nil
},
plumber.WithClose(func(ctx context.Context) error {
fmt.Println("Task 2 closing")
return nil
}),
plumber.WithReady(signal),
)
plumber.NewRunner(func(ctx context.Context) error {
return nil
}),
plumber.SimpleRunner(func(ctx context.Context) error {
plumber.NewRunner(func(ctx context.Context) error {
fmt.Println("Task 3 starting")
<-ctx.Done()
return nil
Expand All @@ -146,7 +167,7 @@ err := plumber.Start(ctx,
// Dependency graph based runner
&a.D4,
&a.HTTP.Server,
).With(plumber.Signaler(signaler)),
),
// The pipeline needs to finish startup phase within 30 seconds. If not, run context is canceled. Close is initiated.
plumber.Readiness(30*time.Second),
// The pipeline needs to gracefully close with 120 seconds. If not, internal run and close contexts are canceled.
Expand All @@ -155,7 +176,5 @@ err := plumber.Start(ctx,
plumber.TTL(120*time.Second),
// When given signals will be received pipeline will be closed gracefully.
plumber.SignalCloser(),
// When some tasks covered with signaler reports and error pipeline will be closed.
plumber.Closing(signaler),
)
```
45 changes: 3 additions & 42 deletions closer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ func (o *Options) Close() {
// Option type pattern for a Start method
type Option func(*Options)

// Readiness allows to limit time for a RunnerCloser to return from the Run method
// Readiness allows to limit time for a Closer to return from the Run method
// When duration is reached the run context is automatically canceled and the Close method is invoked
func Readiness(d time.Duration) Option {
return func(*Options) {}
}

// CloseTimeout allows to limit time for a RunnerCloser to return from the Close method
// CloseTimeout allows to limit time for a Closer to return from the Close method
// When duration is reached the close context is automatically canceled
func CloseTimeout(d time.Duration) Option {
return func(o *Options) {
Expand Down Expand Up @@ -105,7 +105,7 @@ func CloserFunc(func(close func())) Option {
return func(*Options) {}
}

// ContextCloser closes the RunnerCloser based on given context.
// ContextCloser closes the Closer based on given context.
// When given context is ended the closer invokes the Close method.
// It MUST be used with separate or detached context. See DetachCancellation
func ContextCloser(detachedCtx context.Context) Option {
Expand All @@ -121,42 +121,3 @@ func ContextCloser(detachedCtx context.Context) Option {
})
}
}

func Closing(s *ErrorSignaler) Option {
return func(o *Options) {
s.Listen(func(err error) {
o.Close()
})
}
}

func Canceling(s *ErrorSignaler) Option {
return func(o *Options) {
s.Listen(func(err error) {
o.Cancel()
})
}
}

// ErrorSignaler is a struct providing notifications when task is closed with error.
// Signalers are used usually in combination with Closing | Canceling closers.
type ErrorSignaler struct {
listeners []func(error)
}

// NewErrorSignaler return a new instance of a error signaler
func NewErrorSignaler() *ErrorSignaler {
return &ErrorSignaler{}
}

// Signal broadcasts given error to all registered listeners
func (s *ErrorSignaler) Signal(err error) {
for _, l := range s.listeners {
l(err)
}
}

// Listen registers a listener into signaler
func (s *ErrorSignaler) Listen(listener func(err error)) {
s.listeners = append(s.listeners, listener)
}
2 changes: 1 addition & 1 deletion container.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (c *Container) CleanupError(fn func() error) {
c.cleanup = append(c.cleanup, fn)
}

// Close calls all clenaup functions
// Close calls all cleanup functions
func (c *Container) Close() error {
errs := []error{}
for _, cleanup := range c.cleanup {
Expand Down
31 changes: 31 additions & 0 deletions example/adapter/async/async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2024 Outreach Corporation. All Rights Reserved.
// Description: async infra for example application

// Package async provides async infra for example application
package async

import (
"context"
"fmt"

"github.com/getoutreach/plumber"
"github.com/getoutreach/plumber/example/contract"
)

// Publisher service
type Publisher struct {
*plumber.BaseLooper
broker string
}

func NewPublisher(broker string) *Publisher {
return &Publisher{
broker: broker,
BaseLooper: contract.NewWorker("async.Publisher"),
}
}

func (p *Publisher) Publish(ctx context.Context, e *contract.Entity) error {
fmt.Printf("Publishing entity #%v name=%s\n", e.ID, e.Name)
return nil
}
64 changes: 64 additions & 0 deletions example/adapter/database/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2024 Outreach Corporation. All Rights Reserved.
// Description: database infra for example application

// Package database provides database infra for example application
package database

import (
"context"
"fmt"
"sync/atomic"

"github.com/getoutreach/plumber"
"github.com/getoutreach/plumber/example/contract"
)

// EntityMask provides a mask to format an entity
const EntityMask = "Entity %v"

// Repository represents a plain database repository
type Repository struct {
id int64
}

func NewRepository() (*Repository, error) {
return &Repository{}, nil
}

func (s *Repository) Get(ctx context.Context, id int64) (*contract.Entity, error) {
return &contract.Entity{
ID: id,
Name: fmt.Sprintf(EntityMask, id),
}, nil
}

func (s *Repository) Create(ctx context.Context, name string) (*contract.Entity, error) {
nextID := atomic.AddInt64(&s.id, 1)
return &contract.Entity{
ID: nextID,
Name: fmt.Sprintf(EntityMask, nextID),
}, nil
}

// BatchingRepository represents a database repository that can batch single entity reads into batch query
type BatchingRepository struct {
*plumber.BaseLooper
inner contract.Repository
}

func NewBatchingRepository(inner contract.Repository, batchSize int) (*BatchingRepository, error) {
r := &BatchingRepository{
inner: inner,
BaseLooper: contract.NewWorker("database.BatchingRepository"),
}
return r, nil
}

func (s *BatchingRepository) Get(ctx context.Context, id int64) (*contract.Entity, error) {
// Here suppose to be a logic that batches requests into a single batch query
return s.inner.Get(ctx, id)
}

func (s *BatchingRepository) Create(ctx context.Context, name string) (*contract.Entity, error) {
return s.inner.Create(ctx, name)
}
33 changes: 33 additions & 0 deletions example/adapter/graphql/graphql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 Outreach Corporation. All Rights Reserved.
// Description: graphql infra for example application

// Package graphql provides graphql infra for example application
package graphql

import (
"github.com/getoutreach/plumber"
"github.com/getoutreach/plumber/example/contract"
"github.com/getoutreach/plumber/example/service"
)

// Server represents a graphql server
type Server struct {
*plumber.BaseLooper
port int32
querier *service.QueryService
mutator contract.MutatorService
}

// NewServer returns intance of the *Server
func NewServer(
port int32,
querier *service.QueryService,
mutator contract.MutatorService,
) (*Server, error) {
return &Server{
port: port,
querier: querier,
mutator: mutator,
BaseLooper: contract.NewWorker("graphql.Server"),
}, nil
}
33 changes: 33 additions & 0 deletions example/adapter/grpc/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 Outreach Corporation. All Rights Reserved.
// Description: grpc infra for example application

// Package grpc provides grpc infra for example application
package grpc

import (
"github.com/getoutreach/plumber"
"github.com/getoutreach/plumber/example/contract"
"github.com/getoutreach/plumber/example/service"
)

// Server represents a grpc server
type Server struct {
*plumber.BaseLooper
port int32
querier *service.QueryService
mutator contract.MutatorService
}

// NewServer returns intance of the *Server
func NewServer(
port int32,
querier *service.QueryService,
mutator contract.MutatorService,
) (*Server, error) {
return &Server{
port: port,
querier: querier,
mutator: mutator,
BaseLooper: contract.NewWorker("grpc.Server"),
}, nil
}
41 changes: 41 additions & 0 deletions example/application.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2024 Outreach Corporation. All Rights Reserved.
// Description: application root dependency container
package example

import (
"context"

"github.com/getoutreach/plumber"
)

// Config represents a application configuration structure
type Config struct {
AsyncBroker string
}

// Definer allows to redefine container on startup
type Definer = func(ctx context.Context, cf *Config, a *Container)

// Container represents root application dependency container
type Container struct {
plumber.Container
Async *Async
Database *Database
GraphQL *GraphQL
GRPC *GRPC
Service *Service
}

// NewApplication returns instance of the root dependency container
func NewApplication(ctx context.Context, cf *Config, definers ...Definer) *Container {
a := &Container{
GRPC: new(GRPC),
Database: new(Database),
GraphQL: new(GraphQL),
Service: new(Service),
Async: new(Async),
}
return plumber.DefineContainers(ctx, cf, definers, a,
a.Async, a.Database, a.GRPC, a.GraphQL, a.Service,
)
}
22 changes: 22 additions & 0 deletions example/application_async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2024 Outreach Corporation. All Rights Reserved.
// Description: async related dependencies
package example

import (
"context"

"github.com/getoutreach/plumber"
"github.com/getoutreach/plumber/example/adapter/async"
)

// Async service represents async processing related dependency container
type Async struct {
Publisher plumber.R[*async.Publisher]
}

// Define resolves dependencies
func (c *Async) Define(ctx context.Context, cf *Config, a *Container) {
c.Publisher.Define(func() *async.Publisher {
return async.NewPublisher(cf.AsyncBroker)
})
}
Loading

0 comments on commit c276356

Please sign in to comment.