Skip to content

Commit

Permalink
Rename checkConnection
Browse files Browse the repository at this point in the history
The previous method name could give the impression that the connection
actually would be checked when the method was called.

What the method actually does it just to report the flag that is set when the
initial connection has been established and the client is ready to receive data
from the broker/server. That should hopefully be more clear after the change of name.

Old name kept on "top level" to not introduce any backward incompatible changes.

Fixes eclipse/kuksa.val#523
  • Loading branch information
erikbosch committed Mar 11, 2024
1 parent 3b8bc86 commit f425c17
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 37 deletions.
19 changes: 17 additions & 2 deletions kuksa-client/kuksa_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,23 @@ def __init__(self, config):
self.backend = cli_backend.Backend.from_config(config)
self.loop = None

def checkConnection(self):
return self.backend.checkConnection()
# PEP 702 deprecated decorator available first in Python 3.13
def checkConnection(self) -> bool:
"""
Check if thread has established a connection to the broker/server.
Note that this method does not indicate the current state of the connection,
This method may return True even if the broker/server currently is not reachable.
Deprecated - Use connection_established() instead"
"""
return self.connection_established()

def connection_established(self) -> bool:
"""
Check if thread has established a connection to the broker/server.
Note that this method does not indicate the current state of the connection,
This method may return True even if the broker/server currently is not reachable.
"""
return self.backend.connection_established()

def stop(self):
self.backend.stop()
Expand Down
47 changes: 26 additions & 21 deletions kuksa-client/kuksa_client/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def refresh_metadata(self):
self.metadata = metadata_tree_to_dict(entries["metadata"])

def path_completer(self, text, line, begidx, endidx):
if not self.checkConnection():
if not self.connection_established():
return None

if len(self.pathCompletionItems) == 0:
Expand Down Expand Up @@ -362,15 +362,15 @@ def do_authorize(self, args):
"""Authorize the client to interact with the server"""
if args.token_or_tokenfile is not None:
self.token_or_tokenfile = args.token_or_tokenfile
if self.checkConnection():
if self.connection_established():
resp = self.commThread.authorize(self.token_or_tokenfile)
print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter()))

@with_category(VSS_COMMANDS)
@with_argparser(ap_setValue)
def do_setValue(self, args):
"""Set the value of a path"""
if self.checkConnection():
if self.connection_established():
# If there is a blank before a single/double quote on the kuksa-client cli then
# the argparser shell will remove it, there is nothing we can do to it
# This gives off behavior for examples like:
Expand All @@ -394,7 +394,7 @@ def do_setValue(self, args):
@with_argparser(ap_setValues)
def do_setValues(self, args):
"""Set the value of given paths"""
if self.checkConnection():
if self.connection_established():
resp = self.commThread.setValues(
dict(getattr(args, "Path=Value")), args.attribute
)
Expand All @@ -405,7 +405,7 @@ def do_setValues(self, args):
@with_argparser(ap_setTargetValue)
def do_setTargetValue(self, args):
"""Set the target value of a path"""
if self.checkConnection():
if self.connection_established():
resp = self.commThread.setValue(args.Path, args.Value, "targetValue")
print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter()))
self.pathCompletionItems = []
Expand All @@ -414,7 +414,7 @@ def do_setTargetValue(self, args):
@with_argparser(ap_setTargetValues)
def do_setTargetValues(self, args):
"""Set the target value of given paths"""
if self.checkConnection():
if self.connection_established():
resp = self.commThread.setValues(
dict(getattr(args, "Path=Value")), "targetValue"
)
Expand All @@ -425,7 +425,7 @@ def do_setTargetValues(self, args):
@with_argparser(ap_getValue)
def do_getValue(self, args):
"""Get the value of a path"""
if self.checkConnection():
if self.connection_established():
resp = self.commThread.getValue(args.Path, args.attribute)
print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter()))
self.pathCompletionItems = []
Expand All @@ -434,7 +434,7 @@ def do_getValue(self, args):
@with_argparser(ap_getValues)
def do_getValues(self, args):
"""Get the value of given paths"""
if self.checkConnection():
if self.connection_established():
resp = self.commThread.getValues(args.Path, args.attribute)
print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter()))
self.pathCompletionItems = []
Expand All @@ -443,7 +443,7 @@ def do_getValues(self, args):
@with_argparser(ap_getTargetValue)
def do_getTargetValue(self, args):
"""Get the value of a path"""
if self.checkConnection():
if self.connection_established():
resp = self.commThread.getValue(args.Path, "targetValue")
print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter()))
self.pathCompletionItems = []
Expand All @@ -452,7 +452,7 @@ def do_getTargetValue(self, args):
@with_argparser(ap_getTargetValues)
def do_getTargetValues(self, args):
"""Get the value of given paths"""
if self.checkConnection():
if self.connection_established():
resp = self.commThread.getValues(args.Path, "targetValue")
print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter()))
self.pathCompletionItems = []
Expand All @@ -461,7 +461,7 @@ def do_getTargetValues(self, args):
@with_argparser(ap_subscribe)
def do_subscribe(self, args):
"""Subscribe the value of a path"""
if self.checkConnection():
if self.connection_established():
if args.output_to_file:
logPath = (
pathlib.Path.cwd()
Expand All @@ -485,7 +485,7 @@ def do_subscribe(self, args):
@with_argparser(ap_subscribeMultiple)
def do_subscribeMultiple(self, args):
"""Subscribe to updates of given paths"""
if self.checkConnection():
if self.connection_established():
if args.output_to_file:
logPath = (
pathlib.Path.cwd()
Expand All @@ -510,7 +510,7 @@ def do_subscribeMultiple(self, args):
@with_argparser(ap_unsubscribe)
def do_unsubscribe(self, args):
"""Unsubscribe an existing subscription"""
if self.checkConnection():
if self.connection_established():
resp = self.commThread.unsubscribe(args.SubscribeId)
print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter()))
self.subscribeIds.discard(args.SubscribeId)
Expand All @@ -523,15 +523,15 @@ def stop(self):

def getMetaData(self, path):
"""Get MetaData of the path"""
if self.checkConnection():
if self.connection_established():
return self.commThread.getMetaData(path)
return "{}"

@with_category(VSS_COMMANDS_SERVER)
@with_argparser(ap_updateVSSTree)
def do_updateVSSTree(self, args):
"""Update VSS Tree Entry"""
if self.checkConnection():
if self.connection_established():
resp = self.commThread.updateVSSTree(args.Json)
if resp is not None:
print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter()))
Expand All @@ -540,7 +540,7 @@ def do_updateVSSTree(self, args):
@with_argparser(ap_updateMetaData)
def do_updateMetaData(self, args):
"""Update MetaData of a given path"""
if self.checkConnection():
if self.connection_established():
resp = self.commThread.updateMetaData(args.Path, args.Json)
print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter()))

Expand All @@ -561,10 +561,15 @@ def do_disconnect(self, _args):
self.commThread.stop()
self.commThread = None

def checkConnection(self):
if self.commThread is None or not self.commThread.checkConnection():
def connection_established(self) -> bool:
"""
Check if thread has established a connection to the broker/server.
Note that this method does not indicate the current state of the connection,
This method may return True even if the broker/server currently is not reachable.
"""
if self.commThread is None or not self.commThread.connection_established():
self.connect()
return self.commThread.checkConnection()
return self.commThread.connection_established()

def connect(self):
"""Connect to the VISS/gRPC Server"""
Expand Down Expand Up @@ -622,10 +627,10 @@ def connect(self):

waitForConnection = threading.Condition()
waitForConnection.acquire()
waitForConnection.wait_for(self.commThread.checkConnection, timeout=1)
waitForConnection.wait_for(self.commThread.connection_established, timeout=1)
waitForConnection.release()

if self.commThread.checkConnection():
if self.commThread.connection_established():
pass
else:
print(
Expand Down
16 changes: 8 additions & 8 deletions kuksa-client/kuksa_client/cli_backend/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def __init__(self, config):
self.token = str(self.token_or_tokenfile)
else:
self.token = ""
self.grpcConnected = False
self.grpc_connection_established = False

self.sendMsgQueue = queue.Queue()
self.run = False
Expand All @@ -80,11 +80,11 @@ def __init__(self, config):
"metadata": (kuksa_client.grpc.Field.METADATA, kuksa_client.grpc.View.METADATA),
}

# Function to check connection status
def checkConnection(self):
if self.grpcConnected:
return True
return False
def connection_established(self) -> bool:
"""
Function to check connection status
"""
return self.grpc_connection_established

# Function to stop the communication
def stop(self):
Expand Down Expand Up @@ -215,10 +215,10 @@ def _sendReceiveMsg(self, req, timeout):

# Async function to handle the gRPC calls
async def _grpcHandler(self, vss_client: kuksa_client.grpc.aio.VSSClient):
self.grpcConnected = True
self.run = True
subscriber_manager = kuksa_client.grpc.aio.SubscriberManager(
vss_client)
self.grpc_connection_established = True
while self.run:
try:
(call, requestArgs, responseQueue) = self.sendMsgQueue.get_nowait()
Expand Down Expand Up @@ -257,7 +257,7 @@ async def _grpcHandler(self, vss_client: kuksa_client.grpc.aio.VSSClient):
responseQueue.put(
(None, {"error": "ValueError in casting the value."}))

self.grpcConnected = False
self.grpc_connection_established = False

# Update VSS Tree Entry
def updateVSSTree(self, jsonStr, timeout=5):
Expand Down
14 changes: 8 additions & 6 deletions kuksa-client/kuksa_client/cli_backend/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
class Backend(cli_backend.Backend):
def __init__(self, config):
super().__init__(config)
self.wsConnected = False
self.ws_connection_established = False
self.subprotocol = None
self.token = None
self.subscriptionCallbacks = {}
Expand Down Expand Up @@ -72,7 +72,7 @@ async def _sender_handler(self, webSocket):
return

async def _msgHandler(self, webSocket):
self.wsConnected = True
self.ws_connection_established = True
self.run = True
recv = asyncio.Task(self._receiver_handler(webSocket))
send = asyncio.Task(self._sender_handler(webSocket))
Expand Down Expand Up @@ -111,7 +111,7 @@ def _sendReceiveMsg(self, req, timeout):

# Function to stop the communication
def stop(self):
self.wsConnected = False
self.ws_connection_established = False
self.run = False
print("Server disconnected.")

Expand Down Expand Up @@ -284,9 +284,11 @@ def unsubscribe(self, subId, timeout=5):

return res

# Function to check connection
def checkConnection(self):
return self.wsConnected
def connection_established(self) -> bool:
"""
Function to check connection
"""
return self.ws_connection_established

async def connect(self, _=None):
subprotocols = ["VISSv2"]
Expand Down

0 comments on commit f425c17

Please sign in to comment.