Aci multi part upload pool #91

Open
wants to merge 16 commits into master

Conversation

rescuerunaways
Contributor

No description provided.


for i := 0; i < chunkCount; i++ {
	chunkStart := int64(i) * int64(chunkSize)
	chunkEnd := int64(chunkSize)
Contributor

The end of the chunk is at chunkStart + chunkSize (or chunkStart + lastChunkSize for the last chunk). I see you're actually passing the size of it, so let's just call it currentChunkSize or similar.
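A minimal sketch of what that renaming could look like; currentChunkSizeFor is a hypothetical helper, not code from this PR, and the sizes in main are only illustrative:

```go
package main

import "fmt"

// currentChunkSizeFor returns the size of chunk i when a file of fileSize
// bytes is split into chunks of at most chunkSize bytes. Every chunk is
// chunkSize long except possibly the last one, which may be shorter.
func currentChunkSizeFor(i int, chunkSize, fileSize int64) int64 {
	chunkStart := int64(i) * chunkSize
	currentChunkSize := chunkSize
	if chunkStart+chunkSize > fileSize {
		currentChunkSize = fileSize - chunkStart // shorter final chunk
	}
	return currentChunkSize
}

func main() {
	// A 10-byte file in 4-byte chunks splits as 4 + 4 + 2.
	for i := 0; i < 3; i++ {
		fmt.Println(currentChunkSizeFor(i, 4, 10)) // prints 4, 4, 2
	}
}
```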

ChunkSize int64
ETag string
}, chunkCount)
errors := make(chan error, chunkCount)
Contributor

You don't need a channel with chunkCount capacity for this; just have one channel, and at the end try to select from it to check whether there were any errors.

Also, cancel the context at the first error so that the other workers stop processing chunks if their work will be discarded anyway.

chunkData, err := io.ReadAll(io.NewSectionReader(file, job.ChunkStart, job.ChunkSize))
if err != nil {
	errors <- fmt.Errorf("read chunk %d: %s", job.ChunkNumber, err)
	wg.Done()
Contributor

Use defer wg.Done() instead of adding this call on every exit path.
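A small sketch of that defer pattern; completeJobs is a hypothetical stand-in for the upload worker, not code from this PR:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// completeJobs runs one goroutine per job; odd-numbered jobs bail out
// early. Because wg.Done() is deferred, every exit path, whether an early
// return or normal completion, still decrements the WaitGroup, so
// wg.Wait() cannot hang.
func completeJobs(n int) int32 {
	var wg sync.WaitGroup
	var finished int32
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done() // runs on every return path
			if i%2 == 1 {
				return // early exit: wg.Done() still fires
			}
			atomic.AddInt32(&finished, 1)
		}(i)
	}
	wg.Wait()
	return finished
}

func main() {
	fmt.Println(completeJobs(4)) // prints 2: only the even jobs finish fully
}
```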

Contributor

Even better: use the WaitGroup only for synchronizing the goroutines. Since we're waiting on the goroutines anyway, they can handle their own shutdown when there are no more items in the channel. So, something like:

var wg sync.WaitGroup

ctx, cancelFunc := context.WithCancel(context.Background()) // Or if we already have a parent, use that
defer cancelFunc()
ch := make(chan int, chunkCount)
errCh := make(chan error, 1)
for i := 0; i < chunkCount; i++ {
	ch <- i
}
close(ch)
for i := 0; i < 10; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case job, ok := <-ch:
				if !ok {
					return // channel drained and closed
				}
				if err := uploadChunk(job); err != nil {
					select {
					case errCh <- err: // keep the first error
					default:
					}
					cancelFunc()
					return
				}
			case <-ctx.Done():
				return // another worker failed
			}
		}
	}()
}
wg.Wait() // Wait on workers

select {
case err := <-errCh:
	return err // or otherwise handle the first error
default:
}

wg := sync.WaitGroup{}

worker := func() {
	for job := range jobs {
Contributor

Create a cancelable context and check whether it's been cancelled, to propagate cancellation (and to support error handling; see above).

For this, you'll need to change the range to a select: either the context's cancellation or the closure of the channel should end the goroutine.

@rescuerunaways rescuerunaways marked this pull request as ready for review December 4, 2023 07:16
4 participants