Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add posibility to attach ReceiverLink delayed in code & createReceiverLink promise will resolve immediatelly #311

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

misak113
Copy link

@misak113 misak113 commented Apr 10, 2017

  • it is feature when you want to accept all messages even that it already waiting in queue. Otherwise the immediatelly attach will invoke receiving messages yarlier then event emmitter on('messasge'... can be registered.
  • it has result that first messages are overlooked & never be consumed

This change is Reviewable

…rLink promise will resolve immediatelly

* it is feature when you want to accept all messages even that it already waiting in queue. Otherwise the immediatelly attach will invoke receiving messages yarlier then event emmitter on('messasge'... can be registered.
* it has result that first messages are overlooked & never be consumed
@mbroadst
Copy link
Collaborator

@misak113 can you explain in more detail what this is attempting to accomplish? The reason the attachPromise is pushed to _onAttach is that we don't want to resolve the promise until the link has actually been attached (in order to ensure order)

@misak113
Copy link
Author

misak113 commented Apr 15, 2017

I found the problem, when starts consuming from queue (RabbitMQ) where already exists some messages waiting for consumption.
Immediately after createReceiver (internally) all messages are consumed however Promise with created receiverLink is resolved (returned) after it. So it is not possible to subscribe EventEmitter for "message" before all messages in queue are consumed.
I'm using RabbitMQ with enabled plugin for AMQP 1.0 protocol version.

I try to show it in code:

export async function bindOne<TPayload extends IEventPayload>(
  client: Client, eventType: string,
  onEvent: (event: IEvent<TPayload>) => Promise<void>
) {
  const queueName = QUEUE_NAME_PREFIX + eventType;

  // THIS part is problematic, because after resolving this line are messages
  // already consumed (emitted to receiverLink EventEmitter)
  const receiver = await client.createReceiver(queueName);

  // So this line would consume only new messages appeared in queue.
  // All consumed messages are kept as not acked messages in queue & will be
  // released after receiver is destroyed.
  receiver.on('message', async (message: Message) => {
    try {
      const event = message.body;
      await onEvent(event);
      receiver.accept(message);
    } catch (error) {
      receiver.reject(message);
      throw error;
    }
  });
	
  receiver.on('errorReceived', (error: Error) => console.error(error));
  // this line is necessary to be added if I apply this fixing commit & use receiver
  // policy with manually attach
  receiver.attach();
}

@vIceBerg
Copy link

Hi!

What about this change? I face the same issue and would like it to be resolved....

@amarzavery
Copy link
Collaborator

@vIceBerg - This repository is not being actively maintained. I suggest using rhea.

@vIceBerg
Copy link

Hi!

I know... I'm waiting for your Pull Request on Azure SDK for Service Bus using AMQP... Meanwhile, I try to find some solutions to temporarily patch my problems... This was one of them.

I ended up forking the repo and copy/pasted the changes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants