refactor pool (close #247) #250

Closed · wants to merge 4 commits

Changes from all commits:
101 changes: 85 additions & 16 deletions control/pool/dependency.go
@@ -1,29 +1,98 @@
 package pool
 
 import (
+	"context"
+	"time"
+
+	"github.com/DoNewsCode/core/config"
+	"github.com/DoNewsCode/core/contract"
+	"github.com/DoNewsCode/core/contract/lifecycle"
 	"github.com/DoNewsCode/core/di"
 )
 
 // Providers provide a *pool.Pool to the core.
-func Providers(options ...ProviderOptionFunc) di.Deps {
-	return di.Deps{func() *Pool {
-		return NewPool(options...)
-	}}
+func Providers() di.Deps {
+	return di.Deps{
+		provideDefaultPool,
+		providePoolFactory(),
+		di.Bind(new(*Factory), new(Maker)),
+	}
 }
 
-// ProviderOptionFunc is the functional option to Providers.
-type ProviderOptionFunc func(pool *Pool)
+func provideDefaultPool(maker Maker) (*Pool, error) {
+	return maker.Make("default")
+}
 
-// WithConcurrency sets the maximum concurrency for the pool.
-func WithConcurrency(concurrency int) ProviderOptionFunc {
-	return func(pool *Pool) {
-		pool.concurrency = concurrency
-	}
+// factoryIn is the injection parameter for providePoolFactory.
+type factoryIn struct {
+	di.In
+
+	Conf          contract.ConfigUnmarshaler
+	OnReloadEvent lifecycle.ConfigReload `optional:"true"`
+	Counter       *Counter               `optional:"true"`
+}
+
+type poolConfig struct {
+	// Cap is the maximum number of queued tasks. When the queue exceeds this
+	// cap, new workers are added automatically.
+	// Default is 10.
+	Cap int32 `yaml:"cap" json:"cap"`
+	// Concurrency limits the maximum number of workers.
+	// Default is 1000.
+	Concurrency int32 `yaml:"concurrency" json:"concurrency"`
+	// IdleTimeout is the duration after which idle workers are recycled.
+	// Default is 10 minutes.
+	IdleTimeout config.Duration `yaml:"idle_timeout" json:"idle_timeout"`
 }
 
-// WithCounter sets the counter for the pool.
-func WithCounter(counter *Counter) ProviderOptionFunc {
-	return func(pool *Pool) {
-		pool.counter = counter
-	}
+// out is the di.Out of providePoolFactory.
+type out struct {
+	di.Out
+
+	Factory *Factory
+}
+
+func providePoolFactory() func(p factoryIn) (out, func(), error) {
+	ctx, cancel := context.WithCancel(context.Background())
+	var worker = &worker{
+		ch:            make(chan job),
+		incWorkerChan: make(chan int32),
+		cap:           10,
+		timeout:       10 * time.Minute,
+	}
+	return func(factoryIn factoryIn) (out, func(), error) {
+		factory := di.NewFactory[*Pool](func(name string) (pair di.Pair[*Pool], err error) {
+			pool := &Pool{
+				ch:              worker.ch,
+				incJobCountFunc: worker.incJobCount,
+			}
+			if factoryIn.Counter != nil {
+				pool.counter = factoryIn.Counter.PoolName(name)
+			}
+
+			return di.Pair[*Pool]{
+				Conn:   pool,
+				Closer: nil,
+			}, err
+		})
+		var (
+			conf poolConfig
+		)
+		_ = factoryIn.Conf.Unmarshal("pool", &conf)
+		if conf.Cap > 0 {
+			worker.cap = conf.Cap
+		}
+		if conf.Concurrency > 0 {
+			worker.concurrency = conf.Concurrency
+		}
+		if !conf.IdleTimeout.IsZero() {
+			worker.timeout = conf.IdleTimeout.Duration
+		}
+		worker.run(ctx)
+		return out{
+			Factory: factory,
+		}, func() {
+			cancel()
+			factory.Close()
+		}, nil
+	}
}
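For orientation, a sketch of how the refactored providers are consumed from an application. This is not code from the PR: the config keys mirror the poolConfig struct above, and the bootstrap follows the core container API used in example_test.go below; exact wiring may differ.

package main

import (
	"context"

	"github.com/DoNewsCode/core"
	"github.com/DoNewsCode/core/control/pool"
)

func main() {
	c := core.New(
		core.WithInline("pool.cap", 10),          // queued tasks before extra workers spawn
		core.WithInline("pool.concurrency", 100), // upper bound on workers
		core.WithInline("pool.idle_timeout", "10m"),
	)
	c.Provide(pool.Providers())

	c.Invoke(func(maker pool.Maker) {
		// "default" resolves to the same instance injected as *pool.Pool.
		p, err := maker.Make("default")
		if err != nil {
			panic(err)
		}
		p.Go(context.Background(), func(asyncContext context.Context) {
			// async work goes here
		})
	})
}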
2 changes: 1 addition & 1 deletion control/pool/example_test.go
@@ -18,7 +18,7 @@ func Example() {
 		core.WithInline("http.addr", ":9777"),
 		core.WithInline("log.level", "none"),
 	)
-	c.Provide(pool.Providers(pool.WithConcurrency(1)))
+	c.Provide(pool.Providers())
 
 	c.Invoke(func(p *pool.Pool, dispatcher lifecycle.HTTPServerStart) {
 		dispatcher.On(func(ctx context.Context, payload lifecycle.HTTPServerStartPayload) error {
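The deleted WithConcurrency option has no in-code replacement; tuning moves to configuration. A minimal sketch, assuming the "pool" namespace read by providePoolFactory above:

c := core.New(core.WithInline("pool.concurrency", 1))
c.Provide(pool.Providers())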
13 changes: 13 additions & 0 deletions control/pool/factory.go
@@ -0,0 +1,13 @@
+package pool
+
+import (
+	"github.com/DoNewsCode/core/di"
+)
+
+// Maker models Factory
+type Maker interface {
+	Make(name string) (*Pool, error)
+}
+
+// Factory is the *di.Factory that creates *Pool.
+type Factory = di.Factory[*Pool]
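Downstream code that wants named pools should depend on the Maker interface. A hypothetical sketch, given a core container c as in example_test.go (the names "mail" and "audit" are illustrative); note that in this revision all named pools share the same worker set created in providePoolFactory, so the name chiefly distinguishes metrics via Counter.PoolName:

c.Invoke(func(maker pool.Maker) {
	mailPool, err := maker.Make("mail") // hypothetical name
	if err != nil {
		panic(err)
	}
	auditPool, err := maker.Make("audit") // hypothetical name
	if err != nil {
		panic(err)
	}
	_, _ = mailPool, auditPool
})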
66 changes: 9 additions & 57 deletions control/pool/pool.go
@@ -14,7 +14,7 @@
 // Let's go through all the disadvantages. First, the backpressure is lost.
 // There is no way to limit the maximum goroutine the handler can create. clients
 // can easily flood the server. Secondly, the graceful shutdown process is
-// ruined. The http server can shutdown itself without losing any request, but
+// ruined. The http server can shut down itself without losing any request, but
 // the async jobs created with "go" are not protected by the server. You will
 // lose all unfinished jobs once the server shuts down and program exits. lastly,
 // the async job may want to access the original request context, maybe for
@@ -44,25 +44,10 @@ package pool
 
 import (
 	"context"
-	"sync"
 
 	"github.com/DoNewsCode/core/ctxmeta"
-
-	"github.com/oklog/run"
 )
 
-// NewPool returned func(contract.Dispatcher) *Pool
-func NewPool(options ...ProviderOptionFunc) *Pool {
-	pool := Pool{
-		ch:          make(chan job),
-		concurrency: 10,
-	}
-	for _, f := range options {
-		f(&pool)
-	}
-	return &pool
-}
-
 type job struct {
 	fn func()
 }
@@ -71,25 +56,9 @@ type job struct {
 // web servers. See the package documentation about its advantage over creating a
 // goroutine directly.
 type Pool struct {
-	ch          chan job
-	concurrency int
-	counter     *Counter
-}
-
-// ProvideRunGroup implements core.RunProvider
-func (p *Pool) ProvideRunGroup(group *run.Group) {
-	ctx, cancel := context.WithCancel(context.Background())
-
-	group.Add(func() error {
-		return p.Run(ctx)
-	}, func(err error) {
-		cancel()
-	})
-}
-
-// Module implements di.Modular
-func (p *Pool) Module() interface{} {
-	return p
+	counter         *Counter
+	ch              chan job
+	incJobCountFunc func()
 }
 
 // Go dispatchers a job to the async worker pool. requestContext is the context
@@ -103,32 +72,15 @@ func (p *Pool) Go(requestContext context.Context, function func(asyncContext context.Context)) {
 			function(ctxmeta.WithoutCancel(requestContext))
 		},
 	}
+	if p.incJobCountFunc != nil {
+		p.incJobCountFunc()
+	}
 
 	select {
 	case p.ch <- j:
+		p.counter.IncAsyncJob()
 	default:
 		p.counter.IncSyncJob()
 		j.fn()
 	}
 }
-
-// Run starts the async worker pool and block until it finishes.
-func (p *Pool) Run(ctx context.Context) error {
-	var wg sync.WaitGroup
-	for i := 0; i < p.concurrency; i++ {
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			for {
-				select {
-				case j := <-p.ch:
-					p.counter.IncAsyncJob()
-					j.fn()
-				case <-ctx.Done():
-					return
-				}
-			}
-		}()
-	}
-	wg.Wait()
-	return nil
-}
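The select in Go above is the heart of the contract: dispatch is attempted on the job channel, and if no worker picks the job up, it runs inline on the caller's goroutine (counted as a sync job), so callers never block and jobs are never dropped. A sketch of typical handler usage, with a hypothetical sendWelcomeEmail helper:

package main

import (
	"context"
	"net/http"

	"github.com/DoNewsCode/core/control/pool"
)

func sendWelcomeEmail(ctx context.Context) { /* hypothetical async work */ }

func newHandler(p *pool.Pool) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		p.Go(r.Context(), func(asyncContext context.Context) {
			// asyncContext keeps the values of r.Context() but not its
			// cancellation (ctxmeta.WithoutCancel), so the job survives
			// the end of the request.
			sendWelcomeEmail(asyncContext)
		})
		w.WriteHeader(http.StatusAccepted)
	}
}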
64 changes: 19 additions & 45 deletions control/pool/pool_test.go
@@ -5,59 +5,33 @@ import (
 	"testing"
 	"time"
 
-	"github.com/oklog/run"
+	"github.com/DoNewsCode/core/config"
 )
 
 func TestPool_Go(t *testing.T) {
 	t.Parallel()
-	ctx, cancel := context.WithCancel(context.Background())
-	p := NewPool(WithConcurrency(1))
-	go p.Run(ctx)
-
+	f, cancel, _ := providePoolFactory()(factoryIn{
+		Conf: config.MapAdapter{},
+	})
 	time.Sleep(time.Millisecond)
+	p, _ := f.Factory.Make("default")
 	p.Go(context.Background(), func(asyncContext context.Context) {
 		cancel()
 	})
 
 }
 
-func TestPool_FallbackToSyncMode(t *testing.T) {
-	t.Parallel()
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
-	defer cancel()
-	p := NewPool(WithConcurrency(1))
-	go p.Run(ctx)
-	time.Sleep(time.Millisecond)
-
-	ts := time.Now()
-	var executed = make(chan struct{})
-
-	// saturate the pool
-	p.Go(ctx, func(asyncContext context.Context) {
-		time.Sleep(time.Second)
-	})
-	// fallback to sync mode
-	p.Go(ctx, func(asyncContext context.Context) {
-		close(executed)
-	})
-	<-executed
-	// job channel not be blocked, so the interval should be less than 1 second
-	if time.Since(ts) >= time.Second {
-		t.Fatal("timeout: sync mode should be used")
-	}
-	cancel()
-}
-
 func TestPool_contextValue(t *testing.T) {
 	t.Parallel()
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	p := NewPool(WithConcurrency(1))
-	go p.Run(ctx)
+	f, cancel, _ := providePoolFactory()(factoryIn{Conf: config.MapAdapter{}})
 	time.Sleep(time.Millisecond)
 
+	p, _ := f.Factory.Make("default")
+
 	key := struct{}{}
 	requestContext := context.WithValue(context.Background(), key, "foo")
 
+	execute := make(chan struct{})
 	p.Go(requestContext, func(asyncContext context.Context) {
 		if _, ok := asyncContext.Deadline(); ok {
 			t.Fatalf("asyncContext shouldn't have deadline set")
@@ -66,15 +40,15 @@ func TestPool_contextValue(t *testing.T) {
 		if value != "foo" {
 			t.Fatalf("want foo, got %s", value)
 		}
-		cancel()
+		execute <- struct{}{}
 	})
+	<-execute
+	cancel()
 }
 
-func TestPool_ProvideRunGroup(t *testing.T) {
-	t.Parallel()
-	p := NewPool(WithConcurrency(1))
-	var group run.Group
-	group.Add(func() error { return nil }, func(err error) {})
-	p.ProvideRunGroup(&group)
-	group.Run()
+func TestPool_Nil_Valid(t *testing.T) {
+	var p Pool
+	p.Go(context.Background(), func(asyncContext context.Context) {
+
+	})
 }
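TestPool_Nil_Valid pins down a useful property of the new design: a zero-value Pool has a nil job channel, a send on a nil channel can never proceed, so the select in Go always takes the default branch and the job runs synchronously (this presumably also relies on the *Counter methods tolerating a nil receiver). In other words:

var p pool.Pool // zero value, no workers started
p.Go(context.Background(), func(asyncContext context.Context) {
	// runs inline on the caller's goroutine
})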