Using Observable with Batches #87

Open
lukepolo opened this issue Dec 3, 2024 · 4 comments

lukepolo commented Dec 3, 2024

Is this possible? I see that manually processing jobs is a listed limitation, though this is kind of a mixture of the two.

Currently I'm getting [TimeoutError]: Timeout has occurred with this worker setup:

this.worker = new WorkerPro(
  this.queueName,
  async (batch, token) => {
    return new Observable((subscriber) => {
      // Resolve the job's children values, run the user callback, and
      // forward the result (or error) to the Observable subscriber.
      async function processJob(job: JobPro) {
        await job.getChildrenValues().then((childrenValues) => {
          callback(this.convertToJob(job, token), childrenValues)
            .then((data) => {
              subscriber.next(data);
              subscriber.complete();
            })
            .catch((error) => {
              subscriber.error(error);
            });
        });
      }

      // Without batching, the processor receives a single job.
      if (!workerOptions.batch?.size) {
        return processJob.bind(this)(batch);
      }

      // With batching enabled, process every job in the batch wrapper.
      return Promise.all(batch.getBatch().map(processJob.bind(this)));
    });
  },
  workerOptions,
)
  .on("error", (error) => {
    this.handleError(BullMQErrorTypes.Worker, error);
  })
  .on("completed", async (batch: JobPro, result) => {
    if (!workerOptions.batch?.size) {
      return successCallback(this.convertToJob(batch), result);
    }

    for (const job of batch.getBatch()) {
      await successCallback(this.convertToJob(job), result);
    }
  })
  .on("failed", async (batch: JobPro, error) => {
    if (!workerOptions.batch?.size) {
      return failedCallback(this.convertToJob(batch), error);
    }

    for (const job of batch.getBatch()) {
      await failedCallback(this.convertToJob(job), error);
    }
  });

lukepolo commented Dec 3, 2024

OK, I found it's my TTL. I set up a TTL for all my jobs, but the batch wrapper didn't have one. Working on how to add it to the batch.
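
For reference, this is roughly how the TTL is wired up on my side (a minimal sketch, not my actual code: `connection` is a placeholder Redis config, and I'm assuming the `ttl` worker option from the Observables cancelation docs applies to a single processor invocation, i.e. to the whole batch wrapper when batching is on):

import { WorkerPro } from "@taskforcesh/bullmq-pro";
import { Observable } from "rxjs";

// Sketch: batching plus a processor TTL on the same worker.
const worker = new WorkerPro(
  "my-queue",
  (job) =>
    new Observable((subscriber) => {
      // ... real processing would happen here
      subscriber.next("done");
      subscriber.complete();
    }),
  {
    connection: { host: "localhost", port: 6379 }, // placeholder connection
    batch: { size: 10 },
    ttl: 5000, // cancel the Observable once the TTL elapses (illustrative value)
  },
);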

lukepolo closed this as completed Dec 3, 2024
lukepolo reopened this Dec 3, 2024

lukepolo commented Dec 3, 2024

OK, after fixing the TTL, it seems that if I have a batch larger than 1 it fails to secure the lock:

QueueWorker <error> Unable to process job (HelloJob): Error: Missing lock for job 2538,2539,2540,2541,2542,2543,2544,2545,2546,2547. storeResult
    at QueueEventsPro.<anonymous> (/Users/luke/code/work/framework/qx-microservice-template/dist/app.js:6494:64)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5) +243ms


manast commented Dec 5, 2024

I think we do not have proper tests for this case, so we need to add them in order to guarantee the correct behaviour.


lukepolo commented Dec 5, 2024

Gotcha. For now, if batches are enabled for the worker, I'll ignore the TTL.
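
Something like this is what I have in mind (a workaround sketch; `batchSize`, `jobTtl`, and `connection` are placeholders for our config values):

// Only set the processor TTL when batching is disabled, since the batch
// wrapper loses its locks when the TTL cancels the Observable.
const workerOptions = {
  connection,
  ...(batchSize ? { batch: { size: batchSize } } : { ttl: jobTtl }),
};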
