Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for the configuration of the size of the fetch queue and workers #120

Merged
merged 3 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/mercury/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func main() {
// Populate the manifest with the contents of the config file
manifest.Populate(config.Feeds)
if !*flags.NoFetch {
manifest.Prime(config.Cache, config.Timeout.Duration)
manifest.Prime(config.Cache, config.Timeout.Duration, config.Parallelism, config.JobQueueDepth)
}
if err := manifest.Save(manifestPath); err != nil {
log.Fatal(err)
Expand Down
40 changes: 27 additions & 13 deletions internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io/fs"
"os"
"path/filepath"
"runtime"

"github.com/BurntSushi/toml"
"github.com/kgaughan/mercury/internal/manifest"
Expand All @@ -14,19 +15,21 @@ import (

// Config describes our configuration.
type Config struct {
Name string `toml:"name"`
URL string `toml:"url"`
Owner string `toml:"owner"`
Email string `toml:"email"`
FeedID string `toml:"feed_id"`
Cache string `toml:"cache"`
Timeout utils.Duration `toml:"timeout"`
themePath string `toml:"theme"`
Theme fs.FS `toml:"-"`
Output string `toml:"output"`
Feeds []manifest.Feed `toml:"feed"`
ItemsPerPage int `toml:"items"`
MaxPages int `toml:"max_pages"`
Name string `toml:"name"`
URL string `toml:"url"`
Owner string `toml:"owner"`
Email string `toml:"email"`
FeedID string `toml:"feed_id"`
Cache string `toml:"cache"`
Timeout utils.Duration `toml:"timeout"`
themePath string `toml:"theme"`
Theme fs.FS `toml:"-"`
Output string `toml:"output"`
Feeds []manifest.Feed `toml:"feed"`
ItemsPerPage int `toml:"items"`
MaxPages int `toml:"max_pages"`
JobQueueDepth int `toml:"job_queue_depth"`
Parallelism int `toml:"parallelism"`
}

// Load loads our configuration file.
Expand All @@ -37,6 +40,9 @@ func (c *Config) Load(path string) error {
c.Output = "./output"
c.ItemsPerPage = 10
c.MaxPages = 5
// These are both somewhat arbitrary
c.JobQueueDepth = 2 * runtime.NumCPU()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (performance): Consider adding upper bounds to JobQueueDepth and Parallelism to prevent resource exhaustion on many-CPU systems

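While lower bounds are enforced, there is no upper bound, so on many-CPU systems the CPU-derived defaults could lead to excessive resource usage. Consider adding a reasonable upper limit, e.g. capping Parallelism at 32 with min(runtime.NumCPU(), 32). Note that this is the built-in min available from Go 1.21; math.Min operates on float64 values and needs explicit conversions, as in the implementation below.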

Suggested implementation:

	"math"
	"github.com/BurntSushi/toml"
	// Cap values to prevent resource exhaustion on many-CPU systems
	maxCPUs := 32 // Maximum reasonable number of CPUs to utilize
	c.JobQueueDepth = int(math.Min(float64(2*runtime.NumCPU()), float64(2*maxCPUs)))
	c.Parallelism = int(math.Min(float64(runtime.NumCPU()), float64(maxCPUs)))

c.Parallelism = runtime.NumCPU()

if _, err := toml.DecodeFile(path, c); err != nil {
return fmt.Errorf("cannot load configuration: %w", err)
Expand All @@ -47,6 +53,14 @@ func (c *Config) Load(path string) error {
return fmt.Errorf("cannot normalize configuration path: %w", err)
}

// Enforce some sensible lower bounds on feed fetching parallelism
if c.Parallelism < 1 {
c.Parallelism = 1
}
if c.JobQueueDepth < c.Parallelism {
c.JobQueueDepth = c.Parallelism
}

c.Cache = filepath.Join(configDir, c.Cache)
if c.themePath == "" {
c.Theme = dflt.Theme
Expand Down
10 changes: 4 additions & 6 deletions internal/manifest/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"log"
"os"
"runtime"
"sync"
"time"

Expand Down Expand Up @@ -56,13 +55,11 @@ func (m *Manifest) Save(path string) error {
return nil
}

func (m *Manifest) Prime(cache string, timeout time.Duration) {
func (m *Manifest) Prime(cache string, timeout time.Duration, parallelism, jobQueueDepth int) {
var wg sync.WaitGroup
jobs := make(chan *fetchJob, jobQueueDepth)

// The channel depth is kind of arbitrary.
jobs := make(chan *fetchJob, 2*runtime.NumCPU())

for i := 0; i < runtime.NumCPU(); i++ {
for i := 0; i < parallelism; i++ {
wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -80,6 +77,7 @@ func (m *Manifest) Prime(cache string, timeout time.Duration) {
Item: item,
}
}

close(jobs)
wg.Wait()
}
Loading