Know how many items are in stream #352
-
My top-level question is: if I have a My use case is this. I've got a bunch of jobs to launch on an external service. At the start of my script I load them all (synchronously) into my stream. Then I start an async worker that pulls jobs from the stream and launches them. I have a second worker that listens for status updates from an external queue and takes some actions based on that, one of which could be to re-queue a job if I need to retry it. For that reason I can't just say in my launch worker anything like "hey, just What I'm trying to work out is how to know I'm done. My launch worker will just keep pulling and launching jobs until there aren't any more, then it blocks waiting. My status update worker will keep listening for updates until there aren't any more, then it blocks waiting. I am also keeping track of the number of jobs "in flight", by which I mean I've launched them and they aren't in a terminal state. What I was thinking is having a third worker just checking if we are done. It could say "is the number of in-flight jobs == 0? and is the number of jobs left to launch == 0? If so, we are done and we can cancel the workers." But that gets me to my original question: I don't know how to get that "number of jobs left to launch" from the stream. I could check Alternatively I could also keep a counter for |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 4 replies
-
Yes, if either
How about just closing the sending side? |
Beta Was this translation helpful? Give feedback.
-
Doesn't the stream being "empty" mean that there are no items queued to be delivered? If so, then
When you create a memory object stream, you define the maximum buffer size. If a task tries to send an object to the stream and there are no tasks waiting to receive, the object goes to the buffer if there are free slots in the stream. In such cases, the |
Beta Was this translation helpful? Give feedback.
Doesn't the stream being "empty" mean that there are no items queued to be delivered? If so, then
tasks_waiting_receive
is a non-factor.When you create a memory object stream, you define the maximum buffer size. If a task tries to send an object to the stream and there are no tasks waiting to receive, the object goes to the buffer if there are free slots in the stream. In such cases, the
send()
method returns immediately without blocking. Thecurrent_buffer_used
statistics tells you how many items there are in the buffer waiting for tasks to …