handling grpc._channel._MultiThreadedRendezvous #104

Open
brunoonovais opened this issue Nov 10, 2022 · 6 comments
Labels
enhancement New feature or request

Comments

@brunoonovais
Collaborator

brunoonovais commented Nov 10, 2022

Hi Folks,

A subscribe_stream call can raise the grpc._channel._MultiThreadedRendezvous exception.
For some reason, a script using pygnmi cannot catch it, although it can be caught inside the enqueue_updates call, for example:

            try:
                for update in subscription:
                    self._updates.put(update)
            except grpc.RpcError as e:
                print(f"{e = }")

I couldn't figure out why, so I'm opening this issue to see whether we can fix it and close the channel when such an error occurs; otherwise the error can't be handled. Here is an example of my code, which never reaches the except block:

    try:
        with gNMIclient(target=host, username=username, password=password, insecure=True, debug=True) as gc:
            telemetry_stream = gc.subscribe_stream(subscribe=subscriptionlist)
            for telemetry_entry in telemetry_stream:
                log.info(f"telemetry_entry={telemetry_entry}")
    except grpc.RpcError as rpc_error:
        print(f'RPC failed with code {rpc_error.code()}: {rpc_error}')

A simple way to hit this is by subscribing to a path that doesn't exist.
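
One possible explanation, offered as a sketch rather than a description of pygnmi's internals: an exception raised in a worker thread is not propagated to the thread that consumes the results, so the outer `try/except` never sees it. A common workaround is to put the exception on the queue and re-raise it in the consumer. The names `FakeRpcError`, `failing_stream`, and `ForwardingStream` are hypothetical stand-ins, and a plain generator takes the place of the gRPC response iterator.

```python
import queue
import threading

_END = object()  # sentinel marking a clean end of stream


class FakeRpcError(Exception):
    """Hypothetical stand-in for grpc.RpcError so the sketch runs without gRPC."""


def failing_stream():
    """Stand-in for the gRPC response iterator: two updates, then an error."""
    yield "update-1"
    yield "update-2"
    raise FakeRpcError("path does not exist")


class ForwardingStream:
    """Iterate over updates produced in a worker thread, re-raising its errors."""

    def __init__(self, source):
        self._updates = queue.Queue()
        self._thread = threading.Thread(
            target=self._enqueue_updates, args=(source,), daemon=True
        )
        self._thread.start()

    def _enqueue_updates(self, source):
        try:
            for update in source:
                self._updates.put(update)
        except Exception as error:
            # Forward the failure to the consumer instead of losing it.
            self._updates.put(error)
        else:
            self._updates.put(_END)

    def __iter__(self):
        return self

    def __next__(self):
        item = self._updates.get()
        if item is _END:
            raise StopIteration
        if isinstance(item, Exception):
            raise item  # re-raised in the consuming thread
        return item


received = []
caught = None
try:
    for entry in ForwardingStream(failing_stream()):
        received.append(entry)
except FakeRpcError as rpc_error:
    caught = rpc_error
```

With this pattern the caller's `except` block runs in the consuming thread, where it could also close the channel.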

@brunoonovais
Collaborator Author

It seems the issue is that subscribe_stream runs in its own thread.
I wonder whether using the concurrent.futures module with a future instead would let us handle this better, by capturing the thread's result.
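
A minimal sketch of that idea, assuming a hypothetical `FakeRpcError` and a generator in place of the real gRPC stream: the `Future` returned by `ThreadPoolExecutor.submit` stores any exception raised in the worker, and `future.result()` re-raises it in the calling thread.

```python
from concurrent.futures import ThreadPoolExecutor


class FakeRpcError(Exception):
    """Hypothetical stand-in for grpc.RpcError."""


def failing_stream():
    """Stand-in for the gRPC response iterator: one update, then an error."""
    yield "update-1"
    raise FakeRpcError("RESOURCE_EXHAUSTED")


def consume_stream(stream, sink):
    """Worker: copy updates into sink; an exception here is stored on the future."""
    for update in stream:
        sink.append(update)
    return len(sink)


updates = []
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(consume_stream, failing_stream(), updates)
    error = future.exception()  # blocks until the worker finishes

outcome = None
try:
    future.result()  # re-raises the worker's exception in the caller
except FakeRpcError as rpc_error:
    outcome = f"caught: {rpc_error}"
```

The trade-off is that the error only surfaces when the caller asks the future for its result, so a long-running subscription would still need to poll or wait on it.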

@akarneliuk
Owner

Hey @brunoonovais ,

Running subscribe_stream in a separate thread was a deliberate choice for scalability, so that multiple threads can coexist. We can look into whether changing the structure would bring any benefit.

Best,
Anton

@akarneliuk added the enhancement label on Nov 13, 2022
@brunoonovais
Collaborator Author

brunoonovais commented Nov 13, 2022 via email

@theblues25

I encountered a similar problem while using subscribe2. It generally works well, but when the session to the device has problems, such as network issues, a node reboot, or a service interruption, I don't receive any errors through exceptions. The subscribe session simply stops sending updates and returns nothing, and I'm unsure whether there is another way to handle this. It would be helpful to have a method for checking whether the gRPC session to the device is still healthy while receiving updates from the subscription, or a way to reconnect the existing session.
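
One workaround that needs no library change is a consumer-side watchdog: read updates from a queue with a timeout and treat prolonged silence as a stalled stream. This is only a sketch; `StreamStalled`, `iter_with_timeout`, and `producer` are hypothetical names, and the timeout is only meaningful for subscriptions that are expected to send updates periodically (for example, sample mode).

```python
import queue
import threading
import time


class StreamStalled(Exception):
    """Hypothetical error raised when no update arrives within the timeout."""


def iter_with_timeout(updates, timeout):
    """Yield items from a queue; raise StreamStalled if it stays empty too long."""
    while True:
        try:
            yield updates.get(timeout=timeout)
        except queue.Empty:
            raise StreamStalled(f"no update for {timeout} s") from None


def producer(updates):
    """Stand-in for a device that sends two updates and then goes silent."""
    for n in range(2):
        updates.put(f"update-{n}")
        time.sleep(0.05)


updates = queue.Queue()
threading.Thread(target=producer, args=(updates,), daemon=True).start()

seen = []
stalled = False
try:
    for update in iter_with_timeout(updates, timeout=0.5):
        seen.append(update)
except StreamStalled:
    stalled = True  # a real client might close the channel and reconnect here
```

A stall detected this way says nothing about why the stream went quiet, so the recovery step (reconnect, alert, or give up) is left to the caller.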

@sai1274
Contributor

sai1274 commented Nov 17, 2023

I'm encountering a similar issue when opening more subscriptions than the device limit allows:

    import time

    # client is an open gNMIclient session (setup not shown in the original post)
    subscription_dict = {'subscription': [{'path': 'openconfig-interfaces:interfaces/', 'mode': 'sample'}], 'mode': 'poll', 'encoding': 'json'}
    subscription_ids = []
    for i in range(2):
        subscription_id = client.subscribe_poll(subscribe=subscription_dict)
        subscription_ids.append(subscription_id)
        time.sleep(2)

It throws the error below, but I'm unable to catch it:

Exception in thread Thread-11:
Traceback (most recent call last):
  File "/usr/lib64/python3.6/threading.py", line 937, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.6/threading.py", line 885, in run
    self._target(*self._args, **self._kwargs)
  File "/ws/sampsamp-bgl/venv/lib64/python3.6/site-packages/pygnmi/client.py", line 999, in enqueue_updates
    for update in subscription:
  File "/ws/sampsamp-bgl/venv/lib64/python3.6/site-packages/grpc/_channel.py", line 426, in __next__
    return self._next()
  File "/ws/sampsamp-bgl/venv/lib64/python3.6/site-packages/grpc/_channel.py", line 827, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.RESOURCE_EXHAUSTED
        details = "gNMI: subscribe: exceeded the maximum number of streaming gRPCs per user: 1"
        debug_error_string = "UNKNOWN:Error received from peer ipv4:xx.xx.xx.xx:57400 {created_time:"2023-11-17T17:28:33.376998151+05:30", grpc_status:8, grpc_message:"gNMI: subscribe: exceeded the maximum number of streaming gRPCs per user: 1"}"
>

Is there any fix?
We tried to catch the exception in enqueue_updates, but even then I'm unable to catch it:

    def enqueue_updates():
        stub = gNMIStub(channel)
        subscription = stub.Subscribe(_client_stream, metadata=metadata)
        for update in subscription:
            try:
                self._updates.put(update)
            except Exception as error:
                print(str(error))
                raise Exception(error)
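
A likely reason the `except` above never fires: according to the traceback, the error is raised at `for update in subscription:`, that is, while the iterator fetches the next item. That statement sits outside the `try` block, which only wraps `self._updates.put(update)`. A minimal sketch with a hypothetical stand-in iterator (no gRPC needed) shows the difference between the two placements:

```python
class FakeRpcError(Exception):
    """Hypothetical stand-in for grpc.RpcError."""


def subscription():
    """Stand-in for stub.Subscribe(...): fails when the next item is requested."""
    yield "update-1"
    raise FakeRpcError("exceeded the maximum number of streaming gRPCs")


def try_inside_loop():
    """The try only guards the loop body, so the iterator's error escapes."""
    seen = []
    for update in subscription():
        try:
            seen.append(update)
        except Exception:
            return "caught inside"
    return "finished"


def try_around_loop():
    """The try guards the for statement itself, so the error is caught."""
    seen = []
    try:
        for update in subscription():
            seen.append(update)
    except FakeRpcError as error:
        return f"caught around: {error}"
    return "finished"


try:
    inside_result = try_inside_loop()
except FakeRpcError:
    inside_result = "escaped"

around_result = try_around_loop()
```

Even with the `try` moved around the loop, the handler still runs in the worker thread, so the error would have to be handed to the calling code by some other means.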

@jhanm12

jhanm12 commented Nov 17, 2023

I'm encountering a similar issue when opening more subscriptions than the device limit allows:

    import time

    # client is an open gNMIclient session (setup not shown in the original post)
    subscription_dict = {'subscription': [{'path': 'openconfig-interfaces:interfaces/', 'mode': 'sample'}], 'mode': 'poll', 'encoding': 'json'}
    subscription_ids = []
    for i in range(2):
        subscription_id = client.subscribe_poll(subscribe=subscription_dict)
        subscription_ids.append(subscription_id)
        time.sleep(2)

It throws the error below, but I'm unable to catch it:

Exception in thread Thread-11:
Traceback (most recent call last):
  File "/usr/lib64/python3.6/threading.py", line 937, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.6/threading.py", line 885, in run
    self._target(*self._args, **self._kwargs)
  File "/ws/sampsamp-bgl/venv/lib64/python3.6/site-packages/pygnmi/client.py", line 999, in enqueue_updates
    for update in subscription:
  File "/ws/sampsamp-bgl/venv/lib64/python3.6/site-packages/grpc/_channel.py", line 426, in __next__
    return self._next()
  File "/ws/sampsamp-bgl/venv/lib64/python3.6/site-packages/grpc/_channel.py", line 827, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.RESOURCE_EXHAUSTED
        details = "gNMI: subscribe: exceeded the maximum number of streaming gRPCs per user: 1"
        debug_error_string = "UNKNOWN:Error received from peer ipv4:xx.xx.xx.xx:57400 {created_time:"2023-11-17T17:28:33.376998151+05:30", grpc_status:8, grpc_message:"gNMI: subscribe: exceeded the maximum number of streaming gRPCs per user: 1"}"
>

Is there any fix? We tried to catch the exception in enqueue_updates, but even then I'm unable to catch it:

    def enqueue_updates():
        stub = gNMIStub(channel)
        subscription = stub.Subscribe(_client_stream, metadata=metadata)
        for update in subscription:
            try:
                self._updates.put(update)
            except Exception as error:
                print(str(error))
                raise Exception(error)

@akarneliuk Can you please help us with this? We are interested in capturing the error.
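
Until the library surfaces these errors itself, one option on Python 3.8 or newer is `threading.excepthook`, which is called for exceptions that escape a thread's `run()`. The sketch below uses a hypothetical `FakeRpcError` and a stand-in worker, and it is not a change to pygnmi. It lets a script record the failure, but it does not stop or restart the subscription.

```python
import threading


class FakeRpcError(Exception):
    """Hypothetical stand-in for grpc.RpcError."""


thread_errors = []


def record_thread_error(args):
    """Collect uncaught thread exceptions instead of only printing them."""
    thread_errors.append((args.thread.name, args.exc_value))


def enqueue_updates():
    """Stand-in worker that fails with an uncaught exception."""
    raise FakeRpcError("RESOURCE_EXHAUSTED")


previous_hook = threading.excepthook
threading.excepthook = record_thread_error
try:
    worker = threading.Thread(target=enqueue_updates, name="subscribe-worker")
    worker.start()
    worker.join()
finally:
    threading.excepthook = previous_hook  # restore the previous hook
```

The hook is process-wide, so it also receives uncaught errors from unrelated threads; filtering on `args.thread.name` or the exception type is one way to narrow it down.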


5 participants