diff --git a/SecurePivxMasternodeTool.spec b/SecurePivxMasternodeTool.spec index 3e843ad..0c606e9 100644 --- a/SecurePivxMasternodeTool.spec +++ b/SecurePivxMasternodeTool.spec @@ -11,7 +11,7 @@ def libModule(module, source, dest): m = __import__(module) module_path = os_path.dirname(m.__file__) del m - print("libModule %s" % str(( os_path.join(module_path, source), dest ))) + print(f"libModule {(os.path.join(module_path, source), dest)}") return ( os_path.join(module_path, source), dest ) @@ -89,7 +89,7 @@ exe = EXE(pyz, strip=False, upx=False, console=False, - icon=os_path.join(base_dir, 'img', 'spmt.%s' % ('icns' if os_type=='darwin' else 'ico')) ) + icon = os.path.join(base_dir, 'img', f'spmt.{"icns" if os_type == "darwin" else "ico"}') #coll = COLLECT(exe, # a.binaries, @@ -125,7 +125,7 @@ if os_type == 'win32': os.rename(dist_path, dist_path_win) # Create NSIS compressed installer print('Creating Windows installer (requires NSIS)') - os.system('\"c:\\program files (x86)\\NSIS\\makensis.exe\" %s' % os.path.join(base_dir, 'setup.nsi')) + os.system(f'"{os.path.join("c:", "program files (x86)", "NSIS", "makensis.exe")}" {os.path.join(base_dir, "setup.nsi")}') if os_type == 'linux': @@ -135,8 +135,7 @@ if os_type == 'linux': os.rename(dist_path, dist_path_linux) # Compress dist Dir print('Compressing Linux App Folder') - os.system('tar -zcvf %s -C %s %s' % ('SPMT-v' + version_str + '-x86_64-gnu_linux.tar.gz', - base_dir, 'SPMT-v' + version_str + '-gnu_linux')) + os.system(f'tar -zcvf SPMT-v{version_str}-x86_64-gnu_linux.tar.gz -C {base_dir} SPMT-v{version_str}-gnu_linux') if os_type == 'darwin': @@ -151,5 +150,4 @@ if os_type == 'darwin': os.chdir(base_dir) # Compress dist Dir print('Compressing Mac App Folder') - os.system('tar -zcvf %s -C %s %s' % ('SPMT-v' + version_str + '-MacOSX.tar.gz', - base_dir, 'SPMT-v' + version_str + '-MacOSX')) + os.system(f'tar -zcvf SPMT-v{version_str}-MacOSX.tar.gz -C {base_dir} SPMT-v{version_str}-MacOSX') diff --git a/requirements.txt b/requirements.txt index a373f12..4ce8a6f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,9 @@ python-bitcoinrpc==1.0 bitcoin==1.1.42 btchip-python==0.1.27 trezor==0.11.1 -PyQt5>=5.9,<5.14.1 +PyQt5>=5.15.10 requests>=2.18.4,<=2.23 simplejson<=3.13.2 ecdsa==0.13.3 +wheel==0.35.0 +setuptools==53.0.0 \ No newline at end of file diff --git a/spmt.py b/spmt.py index b8803de..de25b17 100644 --- a/spmt.py +++ b/spmt.py @@ -66,7 +66,7 @@ label = QLabel(splash) label.setStyleSheet(labelstyle) - label.setGeometry((splash_pix.width()-500)/2, splash_pix.height()-40, 500, 20) + label.setGeometry(int((splash_pix.width() - 500) / 2), int(splash_pix.height() - 40), 500, 20) label.setAlignment(Qt.AlignCenter) progressText = "loading..." diff --git a/src/blockbookClient.py b/src/blockbookClient.py index cbf9a75..bde2f6d 100644 --- a/src/blockbookClient.py +++ b/src/blockbookClient.py @@ -19,7 +19,7 @@ def process_blockbook_exceptions_int(*args, **kwargs): new_url = "https://testnet.fuzzbawls.pw" else: new_url = "https://zkbitcoin.com/" - message = "BlockBook Client exception on %s\nTrying backup server %s" % (client.url, new_url) + message = f"BlockBook Client exception on {client.url}\nTrying backup server {new_url}" printException(getCallerName(True), getFunctionName(True), message, str(e)) try: @@ -42,9 +42,9 @@ def __init__(self, isTestnet=False): self.url = "https://explorer.rockdev.org/" def checkResponse(self, method, param=""): - url = self.url + "/api/%s" % method + url = f"{self.url}/api/{method}" if param != "": - url += "/%s" % param + url += "/{param}" resp = requests.get(url, data={}, verify=True) if resp.status_code == 200: data = resp.json() diff --git a/src/database.py b/src/database.py index 48e2baf..46635e9 100644 --- a/src/database.py +++ b/src/database.py @@ -113,7 +113,7 @@ def releaseCursor(self, rollingBack=False, vacuum=False): raise Exception("Database closed") def initTables(self): - printDbg("DB: Initializing tables...") + printDbg(f"DB: Initializing tables...") try: cursor = self.conn.cursor() @@ -187,19 +187,19 @@ def initTable_RPC(self, cursor): ''' def clearTable(self, table_name): - printDbg("DB: Clearing table %s..." % table_name) + printDbg(f"DB: Clearing table {table_name}...") cleared_RPC = False try: cursor = self.getCursor() - cursor.execute("DELETE FROM %s" % table_name) + cursor.execute(f"DELETE FROM {table_name}") # in case, reload default RPC and emit changed signal if table_name == 'CUSTOM_RPC_SERVERS': self.initTable_RPC(cursor) cleared_RPC = True - printDbg("DB: Table %s cleared" % table_name) + printDbg(f"DB: Table {table_name} cleared") except Exception as e: - err_msg = 'error clearing %s in database' % table_name + err_msg = f'error clearing {table_name} in database' printException(getCallerName(), getFunctionName(), err_msg, e.args) finally: @@ -208,14 +208,14 @@ def clearTable(self, table_name): self.app.sig_changed_rpcServers.emit() def removeTable(self, table_name): - printDbg("DB: Dropping table %s..." % table_name) + printDbg(f"DB: Dropping table {table_name}...") try: cursor = self.getCursor() - cursor.execute("DROP TABLE IF EXISTS %s" % table_name) - printDbg("DB: Table %s removed" % table_name) + cursor.execute(f"DROP TABLE IF EXISTS {table_name}") + printDbg(f"DB: Table {table_name} removed") except Exception as e: - err_msg = 'error removing table %s from database' % table_name + err_msg = f'error removing table {table_name} from database' printException(getCallerName(), getFunctionName(), err_msg, e.args) finally: @@ -247,7 +247,7 @@ def addRPCServer(self, protocol, host, user, passwd): self.app.sig_changed_rpcServers.emit() def editRPCServer(self, protocol, host, user, passwd, id): - printDbg("DB: Editing RPC server with id %d" % id) + printDbg(f"DB: Editing RPC server with id {id}") changed_RPC = False try: cursor = self.getCursor() @@ -270,15 +270,15 @@ def editRPCServer(self, protocol, host, user, passwd, id): def getRPCServers(self, custom, id=None): tableName = "CUSTOM_RPC_SERVERS" if custom else "PUBLIC_RPC_SERVERS" if id is not None: - printDbg("DB: Getting RPC server with id %d from table %s" % (id, tableName)) + printDbg(f"DB: Getting RPC server with id {id} from table {tableName}") else: - printDbg("DB: Getting all RPC servers from table %s" % tableName) + printDbg(f"DB: Getting all RPC servers from table {tableName}") try: cursor = self.getCursor() if id is None: - cursor.execute("SELECT * FROM %s" % tableName) + cursor.execute(f"SELECT * FROM {tableName}") else: - cursor.execute("SELECT * FROM %s WHERE id = ?" % tableName, (id,)) + cursor.execute(f"SELECT * FROM {tableName} WHERE id = ?", (id,)) rows = cursor.fetchall() except Exception as e: @@ -305,7 +305,7 @@ def getRPCServers(self, custom, id=None): return server_list def removeRPCServer(self, id): - printDbg("DB: Remove RPC server with id %d" % id) + printDbg(f"DB: Remove RPC server with id {id}") removed_RPC = False try: cursor = self.getCursor() @@ -389,7 +389,7 @@ def addMasternode(self, mn, old_mn=None): add_defaultKeys_to_dict(mn, DEFAULT_MN_CONF) if old_mn is not None: - printDbg("DB: Editing masternode %s" % old_mn) + printDbg(f"DB: Editing masternode {old_mn}") try: cursor = self.getCursor() @@ -415,7 +415,7 @@ def addMasternode(self, mn, old_mn=None): self.addNewMasternode(mn) def deleteMasternode(self, mn_name): - printDbg("DB: Deleting masternode %s" % mn_name) + printDbg(f"DB: Deleting masternode {mn_name}") try: cursor = self.getCursor() cursor.execute("DELETE FROM MASTERNODES WHERE name = ? ", (mn_name,)) @@ -475,22 +475,22 @@ def deleteReward(self, tx_hash, tx_ouput_n): except Exception as e: err_msg = 'error deleting UTXO from DB' - printException(getCallerName(), getFunctionName(), err_msg, e.args) + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{err_msg}", f"{e.args}") finally: self.releaseCursor(vacuum=True) - def getReward(self, tx_hash, tx_ouput_n): + def getReward(self, tx_hash, tx_output_n): logging.debug("DB: Getting reward") try: cursor = self.getCursor() cursor.execute("SELECT * FROM REWARDS" - " WHERE tx_hash = ? AND tx_ouput_n = ?", (tx_hash, tx_ouput_n)) + " WHERE tx_hash = ? AND tx_ouput_n = ?", (tx_hash, tx_output_n)) rows = cursor.fetchall() except Exception as e: - err_msg = 'error getting reward %s-%d' % (tx_hash, tx_ouput_n) - printException(getCallerName(), getFunctionName(), err_msg, e) + err_msg = f'error getting reward {tx_hash}-{tx_output_n}' + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{err_msg}", f"{e}") rows = [] finally: self.releaseCursor() @@ -499,6 +499,7 @@ def getReward(self, tx_hash, tx_ouput_n): return self.rewards_from_rows(rows)[0] return None + def getRewardsList(self, mn_name=None): try: cursor = self.getCursor() @@ -507,13 +508,13 @@ def getRewardsList(self, mn_name=None): printDbg("DB: Getting rewards of all masternodes") cursor.execute("SELECT * FROM REWARDS") else: - printDbg("DB: Getting rewards of masternode %s" % mn_name) + printDbg(f"DB: Getting rewards of masternode {mn_name}") cursor.execute("SELECT * FROM REWARDS WHERE mn_name = ?", (mn_name,)) rows = cursor.fetchall() except Exception as e: - err_msg = 'error getting rewards list for masternode %s' % mn_name - printException(getCallerName(), getFunctionName(), err_msg, e) + err_msg = f'error getting rewards list for masternode {mn_name}' + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{err_msg}", f"{e}") rows = [] finally: self.releaseCursor() @@ -538,7 +539,7 @@ def txes_from_rows(self, rows): return txes def addRawTx(self, tx_hash, rawtx, lastfetch=0): - logging.debug("DB: Adding rawtx for %s" % tx_hash) + logging.debug(f"DB: Adding rawtx for {tx_hash}") try: cursor = self.getCursor() @@ -548,26 +549,26 @@ def addRawTx(self, tx_hash, rawtx, lastfetch=0): ) except Exception as e: - err_msg = 'error adding rawtx to DB' - printException(getCallerName(), getFunctionName(), err_msg, e) + err_msg = f'error adding rawtx to DB' + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{err_msg}", f"{e}") finally: self.releaseCursor() def deleteRawTx(self, tx_hash): - logging.debug("DB: Deleting rawtx for %s" % tx_hash) + logging.debug(f"DB: Deleting rawtx for {tx_hash}") try: cursor = self.getCursor() cursor.execute("DELETE FROM RAWTXES WHERE tx_hash = ?", (tx_hash,)) except Exception as e: err_msg = 'error deleting rawtx from DB' - printException(getCallerName(), getFunctionName(), err_msg, e.args) + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{err_msg}", f"{e.args}") finally: self.releaseCursor(vacuum=True) def getRawTx(self, tx_hash): - logging.debug("DB: Getting rawtx for %s" % tx_hash) + logging.debug(f"DB: Getting rawtx for {tx_hash}") try: cursor = self.getCursor() @@ -576,8 +577,8 @@ def getRawTx(self, tx_hash): rows = cursor.fetchall() except Exception as e: - err_msg = 'error getting raw tx for %s' % tx_hash - printException(getCallerName(), getFunctionName(), err_msg, e) + err_msg = f'error getting raw tx for {tx_hash}' + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{err_msg}", f"{e}") rows = [] finally: self.releaseCursor() @@ -597,7 +598,7 @@ def clearRawTxes(self, minTime): except Exception as e: err_msg = 'error deleting rawtx from DB' - printException(getCallerName(), getFunctionName(), err_msg, e.args) + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{err_msg}", f"{e.args}") finally: self.releaseCursor(vacuum=True) @@ -663,7 +664,7 @@ def addProposal(self, p): except Exception as e: err_msg = 'error adding proposal to DB' - printException(getCallerName(), getFunctionName(), err_msg, e) + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{err_msg}", f"{e}") finally: self.releaseCursor() @@ -676,13 +677,13 @@ def getMyVotes(self, p_hash=None): printDbg("DB: Getting votes for all proposals") cursor.execute("SELECT * FROM MY_VOTES") else: - printDbg("DB: Getting votes for proposal %s" % p_hash) + printDbg(f"DB: Getting votes for proposal {p_hash}") cursor.execute("SELECT * FROM MY_VOTES WHERE p_hash = ?", (p_hash,)) rows = cursor.fetchall() except Exception as e: err_msg = 'error getting myVotes from DB' - printException(getCallerName(), getFunctionName(), err_msg, e) + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{err_msg}", f"{e}") rows = [] finally: self.releaseCursor() @@ -698,7 +699,7 @@ def getProposalsList(self): except Exception as e: err_msg = 'error getting proposals from DB' - printException(getCallerName(), getFunctionName(), err_msg, e) + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{err_msg}", f"{e}") rows = [] finally: self.releaseCursor() diff --git a/src/hwdevice.py b/src/hwdevice.py index 7bf1b9e..732a742 100644 --- a/src/hwdevice.py +++ b/src/hwdevice.py @@ -19,7 +19,7 @@ def check_api_init(func): def func_int(*args, **kwargs): hwDevice = args[0] if hwDevice.api is None: - logging.warning("%s: hwDevice.api is None" % func.__name__) + logging.warning(f"{func.__name__}: hwDevice.api is None") raise Exception("HW device: client not initialized") return func(*args, **kwargs) @@ -38,7 +38,7 @@ def __init__(self, main_wnd, *args, **kwargs): printOK("HW: Class initialized") def initDevice(self, hw_index): - printDbg("HW: initializing hw device with index %d" % hw_index) + printDbg(f"HW: initializing hw device with index {hw_index}") if hw_index >= len(HW_devices): raise Exception("Invalid HW index") @@ -53,7 +53,7 @@ def initDevice(self, hw_index): self.api.initDevice() self.sig1done = self.api.sig1done self.api.sig_disconnected.connect(self.main_wnd.clearHWstatus) - printOK("HW: hw device with index %d initialized" % hw_index) + printOK(f"HW: hw device with index {hw_index} initialized") @check_api_init def clearDevice(self): @@ -68,7 +68,7 @@ def clearDevice(self): @check_api_init def getStatus(self): printDbg("HW: checking device status...") - printOK("Status: %d" % self.api.status) + printOK(f"Status: {self.api.status}") return self.api.model, self.api.status, self.api.messages[self.api.status] def prepare_transfer_tx(self, caller, bip32_path, utxos_to_spend, dest_address, tx_fee, isTestnet=False): @@ -86,17 +86,17 @@ def prepare_transfer_tx_bulk(self, caller, rewardsArray, dest_address, tx_fee, i @check_api_init def scanForAddress(self, account, spath, isTestnet=False): - printOK("HW: Scanning for Address n. %d on account n. %d" % (spath, account)) + printOK(f"HW: Scanning for Address n. {spath} on account n. {account}") return self.api.scanForAddress(account, spath, isTestnet) @check_api_init def scanForBip32(self, account, address, starting_spath=0, spath_count=10, isTestnet=False): - printOK("HW: Scanning for Bip32 path of address: %s" % address) + printOK(f"HW: Scanning for Bip32 path of address: {address}") found = False spath = -1 for i in range(starting_spath, starting_spath + spath_count): - printDbg("HW: checking path... %d'/0/%d" % (account, i)) + printDbg(f"HW: checking path... {account}'/0/{i}") curr_addr = self.api.scanForAddress(account, i, isTestnet) if curr_addr == address: @@ -110,7 +110,7 @@ def scanForBip32(self, account, address, starting_spath=0, spath_count=10, isTes @check_api_init def scanForPubKey(self, account, spath, isTestnet=False): - printOK("HW: Scanning for PubKey of address n. %d on account n. %d" % (spath, account)) + printOK(f"HW: Scanning for PubKey of address n. {spath} on account n. {account}") return self.api.scanForPubKey(account, spath, isTestnet) @check_api_init diff --git a/src/ledgerClient.py b/src/ledgerClient.py index f6666d9..1ab1c71 100644 --- a/src/ledgerClient.py +++ b/src/ledgerClient.py @@ -99,9 +99,9 @@ def initDevice(self): printDbg("Ledger Initialized") self.status = 1 ver = self.chip.getFirmwareVersion() - printOK("Ledger HW device connected [v. %s]" % str(ver.get('version'))) + printOK(f"Ledger HW device connected [v. {ver.get('version')}]") # Check device is unlocked - bip32_path = MPATH + "%d'/0/%d" % (0, 0) + bip32_path = MPATH + f"{0}'/0/{0}" _ = self.chip.getWalletPublicKey(bip32_path) self.status = 2 self.sig_progress.connect(self.updateSigProgress) @@ -128,8 +128,7 @@ def append_inputs_to_TX(self, utxo, bip32_path): utxo_tx_index = utxo['vout'] if utxo_tx_index < 0 or utxo_tx_index > len(prev_transaction.outputs): - raise Exception('Incorrect value of outputIndex for UTXO %s-%d' % - (utxo['txid'], utxo['vout'])) + raise Exception(f"Incorrect value of outputIndex for UTXO {utxo['txid']}-{utxo['vout']}") trusted_input = self.chip.getTrustedInput(prev_transaction, utxo_tx_index) self.trusted_inputs.append(trusted_input) @@ -141,8 +140,8 @@ def append_inputs_to_TX(self, utxo, bip32_path): if pubkey_hash != pubkey_hash_from_script: text = "Error: The hashes for the public key for the BIP32 path, and the UTXO locking script do not match." text += "Your signed transaction will not be validated by the network.\n" - text += "pubkey_hash: %s\n" % pubkey_hash.hex() - text += "pubkey_hash_from_script: %s\n" % pubkey_hash_from_script.hex() + text += f"pubkey_hash: {pubkey_hash.hex()}\n" + text += f"pubkey_hash_from_script: {pubkey_hash_from_script.hex()}\n" printDbg(text) self.arg_inputs.append({ @@ -201,10 +200,10 @@ def prepare_transfer_tx_bulk(self, caller, rewardsArray, dest_address, tx_fee, i self.mBox2 = QMessageBox(caller) self.messageText = "

Confirm transaction on your device, with the following details:

" - # messageText += "From bip32_path: %s

" % str(bip32_path) - self.messageText += "

Payment to:
%s

" % dest_address - self.messageText += "

Net amount:
%s PIV

" % str(round(self.amount / 1e8, 8)) - self.messageText += "

Fees:
%s PIV

" % str(round(int(tx_fee) / 1e8, 8)) + # messageText += f"From bip32_path: {bip32_path}

" + self.messageText += f"

Payment to:
{dest_address}

" + self.messageText += f"

Net amount:
{round(self.amount / 1e8, 8)} PIV

" + self.messageText += f"

Fees:
{round(int(tx_fee) / 1e8, 8)} PIV

" messageText = self.messageText + "Signature Progress: 0 %" self.mBox2.setText(messageText) self.mBox2.setIconPixmap(caller.tabMain.ledgerImg.scaledToHeight(200, Qt.SmoothTransformation)) @@ -219,10 +218,10 @@ def prepare_transfer_tx_bulk(self, caller, rewardsArray, dest_address, tx_fee, i def scanForAddress(self, account, spath, isTestnet=False): with self.lock: if not isTestnet: - curr_path = MPATH + "%d'/0/%d" % (account, spath) + curr_path = MPATH + f"{account}'/0/{spath}" curr_addr = self.chip.getWalletPublicKey(curr_path).get('address')[12:-2] else: - curr_path = MPATH_TESTNET + "%d'/0/%d" % (account, spath) + curr_path = MPATH_TESTNET + f"{account}'/0/{spath}" pubkey = compress_public_key(self.chip.getWalletPublicKey(curr_path).get('publicKey')).hex() curr_addr = pubkey_to_address(pubkey, isTestnet) @@ -230,7 +229,7 @@ def scanForAddress(self, account, spath, isTestnet=False): @process_ledger_exceptions def scanForPubKey(self, account, spath, isTestnet=False): - hwpath = "%d'/0/%d" % (account, spath) + hwpath = f"{account}'/0/{spath}" if isTestnet: curr_path = MPATH_TESTNET + hwpath else: @@ -277,8 +276,7 @@ def signMess(self, caller, hwpath, message, isTestnet=False): printOK('Signing Message') self.mBox = QMessageBox(caller) - messageText = "Check display of your hardware device\n\n- message hash:\n\n%s\n\n-path:\t%s\n" % ( - message_sha, path) + messageText = f"Check display of your hardware device\n\n- message hash:\n\n{message_sha}\n\n-path:\t{path}\n" self.mBox.setText(messageText) self.mBox.setIconPixmap(caller.tabMain.ledgerImg.scaledToHeight(200, Qt.SmoothTransformation)) self.mBox.setWindowTitle("CHECK YOUR LEDGER") @@ -387,6 +385,6 @@ def signTxFinish(self): self.sigTxabort.emit() def updateSigProgress(self, percent): - messageText = self.messageText + "Signature Progress: " + str(percent) + " %" + messageText = f"{self.messageText}Signature Progress: {percent} %" self.mBox2.setText(messageText) QApplication.processEvents() diff --git a/src/mainWindow.py b/src/mainWindow.py index b8d25cc..2f05bbf 100644 --- a/src/mainWindow.py +++ b/src/mainWindow.py @@ -6,7 +6,6 @@ import logging import os -from time import strftime, gmtime import threading from PyQt5.QtCore import pyqtSignal, Qt, QThread @@ -149,7 +148,7 @@ def __init__(self, parent, masternode_list, imgDir): # -- Let's go self.mnode_to_change = None - printOK("Hello! Welcome to " + parent.title) + printOK(f"Hello! Welcome to {parent.title}") def append_to_console(self, text): self.consoleArea.moveCursor(QTextCursor.End) @@ -260,7 +259,7 @@ def loadIcons(self): def loadMNConf(self, fileName): hot_masternodes = loadMNConfFile(fileName) if hot_masternodes is None: - messText = "Unable to load data from file '%s'" % fileName + messText = f"Unable to load data from file '{fileName}'" myPopUp_sb(self, "warn", "SPMT - Load MN Conf", messText) else: new_masternodes = [] @@ -283,14 +282,14 @@ def loadMNConf(self, fileName): elif new_nodes == 1: final_message = "1 External Masternode " else: - final_message = "%d External Masternodes " % new_nodes + final_message = f"{new_nodes} External Masternodes " final_message += "added to the list." if new_nodes > 0: - final_message += "
" + str([x['name'] for x in new_masternodes]) + ". " + final_message += f"
{[x['name'] for x in new_masternodes]}. " printOK(final_message) if len(skip_masternodes) > 0: - final_message = "Following entries skipped due to duplicate names:
" - final_message += str([x['name'] for x in skip_masternodes]) + ". " + final_message = "Following entries skipped due to duplicate names:
" \ + f"{[x['name'] for x in skip_masternodes]}. " printOK(final_message) def onCheckHw(self): @@ -315,15 +314,15 @@ def checkVersion(self, ctrl): (remote_version[0] == local_version[0] and remote_version[1] > local_version[1]) or \ (remote_version[0] == local_version[0] and remote_version[1] == local_version[1] and remote_version[2] > local_version[2]): - self.versionMess = 'New Version Available: %s ' % (self.gitVersion) - self.versionMess += '(download)' + self.versionMess = f'New Version Available: {self.gitVersion} ' \ + '(download)' else: self.versionMess = "You have the latest version of SPMT" def updateVersion(self): if self.versionMess is not None: self.versionLabel.setText(self.versionMess) - printOK("Remote version: %s" % str(self.gitVersion)) + printOK(f"Remote version: {self.gitVersion}") def onChangeSelectedHW(self, i): # Clear status @@ -343,21 +342,21 @@ def onCleanConsole(self): self.consoleArea.clear() def onSaveConsole(self): - timestamp = strftime('%Y-%m-%d_%H-%M-%S', gmtime(now())) + timestamp = now().strftime('%Y-%m-%d_%H-%M-%S') options = QFileDialog.Options() options |= QFileDialog.DontUseNativeDialog - fileName, _ = QFileDialog.getSaveFileName(self, "Save Logs to file", "SPMT_Logs_%s.txt" % timestamp, "All Files (*);; Text Files (*.txt)", options=options) + fileName, _ = QFileDialog.getSaveFileName(self, "Save Logs to file", f"SPMT_Logs_{timestamp}.txt", "All Files (*);; Text Files (*.txt)", options=options) try: if fileName: - printOK("Saving logs to %s" % fileName) + printOK(f"Saving logs to {fileName}") log_file = open(fileName, 'w+', encoding="utf-8") log_text = self.consoleArea.toPlainText() log_file.write(log_text) log_file.close() except Exception as e: - err_msg = "error writing Log file" - printException(getCallerName(), getFunctionName(), err_msg, e.args) + err_msg = f"error writing Log file" + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{err_msg}", f"{e.args}") def onTabChange(self): # tabRewards @@ -396,14 +395,14 @@ def saveMNListOrder(self): def showHWstatus(self): self.updateHWleds() - myPopUp_sb(self, "info", 'SPMT - hw check', "%s" % self.hwStatusMess) + myPopUp_sb(self, "info", 'SPMT - hw check', f"{self.hwStatusMess}") def showRPCstatus(self, server_index, fDebug): # Update displayed status only if selected server is not changed if server_index == self.header.rpcClientsBox.currentIndex(): self.updateRPCled(fDebug) if fDebug: - myPopUp_sb(self, "info", 'SPMT - rpc check', "%s" % self.rpcStatusMess) + myPopUp_sb(self, "info", 'SPMT - rpc check', f"{self.rpcStausMess}") def updateHWleds(self): if self.hwStatus == 1: @@ -423,7 +422,7 @@ def updateHWstatus(self, ctrl): printDbg(str(e)) pass - printDbg("status:%s - mess: %s" % (self.hwStatus, self.hwStatusMess)) + printDbg("status:{self.hwStatus} - mess: {self.hwStatusMess}") def updateLastBlockLabel(self): text = '--' @@ -451,9 +450,9 @@ def updateLastBlockPing(self): color = "green" self.header.lastPingIcon.setPixmap(self.connGreen_icon) if self.rpcResponseTime is not None: - self.header.responseTimeLabel.setText("%.3f" % self.rpcResponseTime) - self.header.responseTimeLabel.setStyleSheet("color: %s" % color) - self.header.lastPingIcon.setStyleSheet("color: %s" % color) + self.header.responseTimeLabel.setText(f"{self.rpcResponseTime}") + self.header.responseTimeLabel.setStyleSheet(f"color: {color}") + self.header.lastPingIcon.setStyleSheet(f"color: {color}") def updateRPCled(self, fDebug=False): if self.rpcConnected: @@ -509,14 +508,14 @@ def updateRPClist(self): def updateRPCstatus(self, ctrl, fDebug=False): rpc_index, rpc_protocol, rpc_host, rpc_user, rpc_password = self.getRPCserver() if fDebug: - printDbg("Trying to connect to RPC %s://%s..." % (rpc_protocol, rpc_host)) + printDbg(f"Trying to connect to RPC {rpc_protocol}://{rpc_host}...") try: rpcClient = RpcClient(rpc_protocol, rpc_host, rpc_user, rpc_password) status, statusMess, lastBlock, r_time1, isTestnet = rpcClient.getStatus() isBlockchainSynced, r_time2 = rpcClient.isBlockchainSynced() except Exception as e: - printException(getCallerName(), getFunctionName(), "exception updating RPC status:", str(e)) + printException(getCallerName(), getFunctionName(), f"exception updating RPC status: {e}") # clear status self.rpcClient = None self.sig_clearRPCstatus.emit() diff --git a/src/masternode.py b/src/masternode.py index 5f096e1..c6d185d 100644 --- a/src/masternode.py +++ b/src/masternode.py @@ -41,12 +41,12 @@ def __init__(self, tab_main, name, ip, port, mnPrivKey, hwAcc, collateral=None, self.mnPubKey = bitcoin.privkey_to_pubkey(self.mnPrivKey) self.hwAcc = hwAcc self.spath = collateral['spath'] - self.nodePath = "%d'/0/%d" % (self.hwAcc, self.spath) + self.nodePath = f"{self.hwAcc}'/0/{self.spath}" self.collateral = collateral self.isTestnet = isTestnet self.currHeight = 0 Masternode.mnCount += 1 - printOK("Initializing MNode with collateral: %s" % self.nodePath) + printOK(f"Initializing MNode with collateral: {self.nodePath}") def getOldBroadcastMessage(self): self.sig_time = int(time.time()) @@ -77,7 +77,7 @@ def signature1(self, device): serializedData = self.getNewBroadcastMessage() else: serializedData = self.getOldBroadcastMessage() - printDbg("SerializedData: %s" % serializedData) + printDbg(f"SerializedData: {serializedData}") # HW wallet signature device.signMess(self.tab_main.caller, self.nodePath, serializedData, self.isTestnet) # wait for signal when device.sig1 is ready then --> finalizeStartMessage @@ -108,10 +108,10 @@ def signature2(self, block_hash): fNewSigs = NewSigsActive(self.currHeight, self.isTestnet) mnping = self.getPingMessage(fNewSigs, block_hash) if fNewSigs: - printDbg("mnping: %s" % mnping.hex()) + printDbg(f"mnping: {mnping.hex()}") sig2 = ecdsa_sign_bin(mnping, self.mnWIF) # local else: - printDbg("mnping: %s" % mnping) + printDbg(f"mnping: {mnping}") sig2 = ecdsa_sign(mnping, self.mnWIF) return (b64decode(sig2).hex()), fNewSigs @@ -127,7 +127,7 @@ def finalizeStartMessage(self, text): self.sigdone.emit("None") return - printOK("first signature: %s" % sig1) + printOK(f"first signature: {sig1}") # ------ some default config scriptSig = '' sequence = 0xffffffff @@ -135,10 +135,10 @@ def finalizeStartMessage(self, text): try: block_hash = self.rpcClient.getBlockHash(self.currHeight - 12) if block_hash is None: - raise Exception('Unable to get blockhash for block %d' % self.currHeight - 12) + raise Exception(f'Unable to get blockhash for block {self.currHeight-12}') - printDbg("Current block from PIVX client: %s" % str(self.currHeight)) - printDbg("Hash of 12 blocks ago: %s" % block_hash) + printDbg(f"Current block from PIVX client: {self.currHeight}") + printDbg(f"Hash of 12 blocks ago: {block_hash}") vintx = bytes.fromhex(self.collateral['txid'])[::-1].hex() vinno = self.collateral['txidn'].to_bytes(4, byteorder='big')[::-1].hex() @@ -158,7 +158,7 @@ def finalizeStartMessage(self, text): last_ping_block_hash = bytes.fromhex(block_hash)[::-1].hex() sig2, fNewSigs = self.signature2(block_hash) - printOK("second signature: %s" % sig2) + printOK(f"second signature: {sig2}") work = vintx + vinno + vinsig + vinseq work += ipv6map + collateral_in + delegate_in @@ -175,7 +175,7 @@ def finalizeStartMessage(self, text): work += "0" * 16 # Emit signal - printDbg("EMITTING: %s" % work) + printDbg(f"EMITTING: {work}") self.sigdone.emit(work) def startMessage(self, device, rpcClient): diff --git a/src/misc.py b/src/misc.py index 4fa8523..eff7d6a 100644 --- a/src/misc.py +++ b/src/misc.py @@ -33,7 +33,7 @@ def add_defaultKeys_to_dict(dictObj, defaultObj): def appendMasternode(mainWnd, mn): - printDbg("saving MN configuration for %s" % mn['name']) + printDbg(f"saving MN configuration for {mn['name']}") # If we are changing a MN, remove previous entry: if mainWnd.mnode_to_change is not None: @@ -93,11 +93,11 @@ def checkTxInputs(parentWindow, num_of_inputs): return None if num_of_inputs > MAX_INPUTS_NO_WARNING: - warning = "Warning: Trying to spend %d inputs.\nA few minutes could be required " \ + warning = f"Warning: Trying to spend {num_of_inputs} inputs.\nA few minutes could be required " \ "for the transaction to be prepared and signed.\n\nThe hardware device must remain unlocked " \ "during the whole time (it's advised to disable the auto-lock feature)\n\n" \ - "Do you wish to proceed?" % num_of_inputs - title = "SPMT - spending more than %d inputs" % MAX_INPUTS_NO_WARNING + "Do you wish to proceed?" + title = f"SPMT - spending more than {MAX_INPUTS_NO_WARNING} inputs" return myPopUp(parentWindow, "warn", title, warning) return QMessageBox.Yes @@ -120,15 +120,15 @@ def clean_v4_migration(wnd): with open(rpc_file, encoding="utf-8") as data_file: rpc_config = json.load(data_file) # copy to database - rpc_host = "%s:%d" % (rpc_config['rpc_ip'], rpc_config['rpc_port']) + rpc_host = f"{rpc_config['rpc_ip']}:{rpc_config['rpc_port']}" wnd.db.editRPCServer("http", rpc_host, rpc_config['rpc_user'], rpc_config['rpc_password'], 0) printDbg("...saved to Database") # and delete old file os.remove(rpc_file) printDbg("old rpcServer.json file deleted") except Exception as e: - mess = "Error importing old rpc_config file" - printException(getCallerName(), getFunctionName(), mess, e) + mess = f"Error importing old rpc_config file" + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{mess}", f"{e}") if os.path.exists(cache_file): # If cache file exists, delete it @@ -136,8 +136,8 @@ def clean_v4_migration(wnd): os.remove(cache_file) printDbg("old cache.json file deleted") except Exception as e: - mess = "Error deleting old cache file" - printException(getCallerName(), getFunctionName(), mess, e) + mess = f"Error deleting old cache file" + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{mess}", f"{e}") if os.path.exists(mn_file): # If mn file exists @@ -152,8 +152,8 @@ def clean_v4_migration(wnd): os.remove(mn_file) printDbg("old masternodes.json file deleted") except Exception as e: - mess = "Error importing old masternodes_config file" - printException(getCallerName(), getFunctionName(), mess, e) + mess = f"Error importing old masternodes_config file" + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{mess}", f"{e}") # Remove old logs if os.path.exists(log_file): @@ -236,15 +236,15 @@ def ipport(ip, port): if ip is None or port is None: return None elif ip.endswith('.onion'): - return ip + ':' + port + return f"{ip}:{port}" else: ipAddr = ip_address(ip) if ipAddr.version == 4: - return ip + ':' + port + return f"{ip}:{port}" elif ipAddr.version == 6: - return "[" + ip + "]:" + port + return f"[{ip}]:{port}" else: - raise Exception("invalid IP version number") + raise Exception("Invalid IP version number") def is_hex(s): @@ -334,7 +334,7 @@ def now(): def persistCacheSetting(cache_key, cache_value): settings = QSettings('PIVX', 'SecurePivxMasternodeTool') if not settings.contains(cache_key): - printDbg("Cache key %s not found" % str(cache_key)) + printDbg(f"Cache key {cache_key} not found") printOK("Adding new cache key to settings...") if type(cache_value) in [list, dict]: @@ -354,7 +354,7 @@ def printDbg(what): def printDbg_msg(what): what = clean_for_html(what) timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(now())) - log_line = '{} : {}
'.format(timestamp, what) + log_line = f'{timestamp} : {what}
' return log_line @@ -363,7 +363,7 @@ def printError( function_name, what ): - logging.error("%s | %s | %s" % (caller_name, function_name, what)) + logging.error(f"{caller_name} | {function_name} | {what}") log_line = printException_msg(caller_name, function_name, what, None, True) redirect_print(log_line) @@ -376,9 +376,9 @@ def printException( ): what = err_msg if errargs is not None: - what += " ==> %s" % str(errargs) - logging.warning("%s | %s | %s" % (caller_name, function_name, what)) - text = printException_msg(caller_name, function_name, err_msg, errargs) + what += f" ==> {errargs}" + logging.warning(f"{caller_name} | {function_name} | {what}") + text = printException_msg(f"{caller_name}", f"{function_name}", f"{err_msg}", f"{errargs}") redirect_print(text) @@ -393,19 +393,19 @@ def printException_msg( msg = 'ERROR
' else: msg = 'EXCEPTION
' - msg += 'caller : %s
' % caller_name - msg += 'function : %s
' % function_name + msg += f'caller : {caller_name}
' + msg += f'function : {function_name}
' msg += '' if errargs: - msg += 'err: %s
' % str(errargs) + msg += f'err: {errargs}
' - msg += '===> %s

' % err_msg + msg += f'===> {err_msg}
' return msg def printOK(what): logging.debug(what) - msg = '===> ' + what + '
' + msg = f'===> {what}
' redirect_print(msg) @@ -476,12 +476,12 @@ def saveCacheSettings(cache): def sec_to_time(seconds): days = seconds // 86400 - seconds -= days * 86400 + seconds %= days * 86400 hrs = seconds // 3600 - seconds -= hrs * 3600 + seconds %= hrs * 3600 mins = seconds // 60 - seconds -= mins * 60 - return "{} days, {} hrs, {} mins, {} secs".format(days, hrs, mins, seconds) + seconds %= mins * 60 + return f"{days} days, {hrs} hrs, {mins} mins, {seconds} secs" def splitString(text, n): diff --git a/src/qt/dlg_budgetProjection.py b/src/qt/dlg_budgetProjection.py index aad1742..65c3de2 100644 --- a/src/qt/dlg_budgetProjection.py +++ b/src/qt/dlg_budgetProjection.py @@ -35,11 +35,11 @@ def displayBudgetProjection(self): # blocks to next superBlock (== minutes) blocks_to_SB = self.next_superBlock - self.main_wnd.caller.rpcLastBlock - self.ui.nextSuperBlock_label.setText("%s" % str(self.next_superBlock)) - timeToNextSB = "%s" % sec_to_time(60 * blocks_to_SB) + self.ui.nextSuperBlock_label.setText(f"{self.next_superBlock}") + timeToNextSB = f"{sec_to_time(60 * blocks_to_SB)}" self.ui.timeToNextSB_label.setText(timeToNextSB) total = self.projection[-1].get('Total_Allotted') - total_label = "%s PIV" % str(total) + total_label = f"{total} PIV" self.ui.allottedBudget_label.setText(total_label) self.ui.remainingBudget_label.setText("%s PIV" % str(round(432000.0 - total, 8))) self.ui.passingProposals_label.setText("%s" % str(len(self.projection))) @@ -59,7 +59,7 @@ def item(value): def loadBudgetProjection_thread(self, ctrl): self.projection = [] if not self.main_wnd.caller.rpcConnected: - printException(getCallerName(), getFunctionName(), "RPC server not connected", "") + printException(f"{getCallerName()}", f"{getFunctionName()}", "RPC server not connected", "") return # get next superBlock self.next_superBlock = self.main_wnd.caller.rpcClient.getNextSuperBlock() diff --git a/src/qt/dlg_configureRPCservers.py b/src/qt/dlg_configureRPCservers.py index 4fd0f50..06b3b77 100644 --- a/src/qt/dlg_configureRPCservers.py +++ b/src/qt/dlg_configureRPCservers.py @@ -37,12 +37,12 @@ def insert_server_list(self, server): index = self.main_wnd.mainWindow.getServerListIndex(server) server_line = QWidget() server_row = QHBoxLayout() - server_text = "%s://%s" % (server['protocol'], server['host']) + server_text = f"{server['protocol']}://{server['host']}" if server['id'] == 0 and server['isCustom']: # Local Wallet server_text = server_text + "  Local Wallet" elif not server['isCustom']: - server_text = "%s" % server_text + server_text = f"{server_text}" server_row.addWidget(QLabel(server_text)) server_row.addStretch(1) # -- Edit button @@ -122,9 +122,8 @@ def onClose(self): self.close() def onRemoveServer(self, index): - mess = "Are you sure you want to remove server with index %d (%s) from list?" % ( - index, self.main_wnd.mainWindow.rpcServersList[index].get('host')) - ans = myPopUp(self, QMessageBox.Question, 'SPMT - remove server', mess) + mess = f"Are you sure you want to remove server with index {index} ({self.main_wnd.mainWindow.rpcServersList[index].get('host')}) from list?" + ans = myPopUp(self, QMessageBox.Question, 'SPMT - remove server', f"{mess}") if ans == QMessageBox.Yes: # Remove entry from database id = self.main_wnd.mainWindow.rpcServersList[index].get('id') @@ -137,7 +136,7 @@ def onSave(self): user = self.ui.user_edt.text() passwd = self.ui.passwd_edt.text() # Check malformed URL - url_string = "%s://%s:%s@%s" % (protocol, user, passwd, host) + url_string = f"{protocol}://{user}:{passwd}@{host}" if checkRPCstring(url_string): if self.changing_index is None: # Save new entry in DB. diff --git a/src/qt/dlg_findCollTx.py b/src/qt/dlg_findCollTx.py index 3f0b5f2..15c3fd4 100644 --- a/src/qt/dlg_findCollTx.py +++ b/src/qt/dlg_findCollTx.py @@ -71,8 +71,8 @@ def load_utxos_thread(self, ctrl): round(int(utxo.get('satoshis', 0)) / 1e8, 8) == 10000.00000000] except Exception as e: - errorMsg = 'Error occurred while calling getaddressutxos method: ' + str(e) - printError(getCallerName(), getFunctionName(), errorMsg) + errorMsg = f'Error occurred while calling getaddressutxos method: {e}' + printError(f"{getCallerName()}", f"{getFunctionName()}", f"{errorMsg}") except Exception: pass diff --git a/src/qt/dlg_mnStatus.py b/src/qt/dlg_mnStatus.py index 013c0af..4505ef5 100644 --- a/src/qt/dlg_mnStatus.py +++ b/src/qt/dlg_mnStatus.py @@ -30,7 +30,7 @@ def setupUi(self, MnStatusDlg): MnStatusDlg.setModal(True) layout = QVBoxLayout(MnStatusDlg) layout.setContentsMargins(10, 15, 10, 10) - name = QLabel("%s" % self.mnAlias) + name = QLabel(f"{self.mnAlias}") name.setAlignment(Qt.AlignCenter) layout.addWidget(name) body = QFormLayout() diff --git a/src/qt/dlg_pinMatrix.py b/src/qt/dlg_pinMatrix.py index 6c3f498..ee49994 100644 --- a/src/qt/dlg_pinMatrix.py +++ b/src/qt/dlg_pinMatrix.py @@ -39,12 +39,12 @@ def onOK(self): if self.pin: if len(self.pin) > 9: text = "The PIN entered exceeds the 9-character limit." - myPopUp_sb(self, "warn", 'Wrong PIN!', text) + myPopUp_sb(self, "warn", 'Wrong PIN!', f"{text}") else: self.accept() else: text = "No PIN entered" - myPopUp_sb(self, "warn", 'Wrong PIN!', text) + myPopUp_sb(self, "warn", 'Wrong PIN!', f"{text}") def setupUI(self): Ui_pinMatrixDlg.setupUi(self, self) @@ -64,7 +64,7 @@ def setupUi(self, PinMatrixDlg): layout.setContentsMargins(10, 8, 10, 10) # Header - title = QLabel("%s" % PinMatrixDlg.text) + title = QLabel(f"{PinMatrixDlg.text}") title.setAlignment(Qt.AlignCenter) layout.addWidget(title) diff --git a/src/qt/dlg_proposalDetails.py b/src/qt/dlg_proposalDetails.py index dee41d1..7c0b349 100644 --- a/src/qt/dlg_proposalDetails.py +++ b/src/qt/dlg_proposalDetails.py @@ -53,41 +53,41 @@ def setupUi(self, PropDetailsDlg): PropDetailsDlg.setModal(True) layout = QVBoxLayout(PropDetailsDlg) layout.setContentsMargins(10, 15, 10, 10) - name = QLabel("%s" % PropDetailsDlg.data.name) + name = QLabel(f"{PropDetailsDlg.data.name}") name.setAlignment(Qt.AlignCenter) layout.addWidget(name) body = QFormLayout() body.setLabelAlignment(Qt.AlignRight) body.setVerticalSpacing(20) body.setContentsMargins(25, 10, 25, 30) - link = "%s" % (PropDetailsDlg.data.URL, PropDetailsDlg.data.URL) + link = f"{PropDetailsDlg.data.URL}" link_label = QLabel(link) link_label.setOpenExternalLinks(True) - body.addRow(QLabel("URL: "), link_label) - body.addRow(QLabel("TotalPayment: "), QLabel(str(PropDetailsDlg.data.ToalPayment))) - body.addRow(QLabel("MonthlyPayment: "), QLabel(str(PropDetailsDlg.data.MonthlyPayment))) + body.addRow(QLabel(f"URL: "), QLabel(link_label)) + body.addRow(QLabel(f"TotalPayment: "), QLabel(f"{PropDetailsDlg.data.TotalPayment}")) + body.addRow(QLabel(f"MonthlyPayment: "), QLabel(f"{PropDetailsDlg.data.MonthlyPayment}")) hashLabel = self.selectable_line(PropDetailsDlg.data.Hash) - body.addRow(QLabel("Hash: "), hashLabel) + body.addRow(QLabel(f"Hash: "), hashLabel) feeHashLabel = self.selectable_line(PropDetailsDlg.data.FeeHash) - body.addRow(QLabel("FeeHash: "), feeHashLabel) - body.addRow(QLabel("BlockStart: "), QLabel(str(PropDetailsDlg.data.BlockStart))) - body.addRow(QLabel("BlockEnd: "), QLabel(str(PropDetailsDlg.data.BlockEnd))) - body.addRow(QLabel("TotalPayCount: "), QLabel(str(PropDetailsDlg.data.TotalPayCount))) - body.addRow(QLabel("RemainingPayCount: "), QLabel(str(PropDetailsDlg.data.RemainingPayCount))) + body.addRow(QLabel(f"FeeHash: "), feeHashLabel) + body.addRow(QLabel(f"BlockStart: "), QLabel(f"{PropDetailsDlg.data.BlockStart}")) + body.addRow(QLabel(f"BlockEnd: "), QLabel(f"{PropDetailsDlg.data.BlockEnd}")) + body.addRow(QLabel(f"TotalPayCount: "), QLabel(f"{PropDetailsDlg.data.TotalPayCount}")) + body.addRow(QLabel(f"RemainingPayCount: "), QLabel(f"{PropDetailsDlg.data.RemainingPayCount}")) addyLabel = self.selectable_line(PropDetailsDlg.data.PaymentAddress) - body.addRow(QLabel("PaymentAddress: "), addyLabel) - votes = "%d YEAS / " % PropDetailsDlg.data.Yeas - votes += "%d ABSTAINS / " % PropDetailsDlg.data.Abstains - votes += "%d NAYS" % PropDetailsDlg.data.Nays - body.addRow(QLabel("Votes: "), QLabel(votes)) - my_yeas = ["%s (%s)" % (x[0], strftime('%Y-%m-%d %H:%M:%S', - gmtime(x[1]))) for x in PropDetailsDlg.myYeas] + body.addRow(QLabel(f"PaymentAddress: "), addyLabel) + votes = (f"{PropDetailsDlg.data.Yeas} YEAS / " + + f"{PropDetailsDlg.data.Abstains} ABSTAINS / " + + f"{PropDetailsDlg.data.Nays} NAYS") + body.addRow(QLabel(f"Votes: "), QLabel(votes)) + my_yeas = [f"{x[0]} ({strftime('%Y-%m-%d %H:%M:%S', gmtime(x[1]))})" + for x in PropDetailsDlg.myYeas] body.addRow(QLabel("My Yeas: "), self.scroll(my_yeas)) - my_abstains = ["%s (%s)" % (x[0], strftime('%Y-%m-%d %H:%M:%S', - gmtime(x[1]))) for x in PropDetailsDlg.myAbstains] + my_abstains = [f"{x[0]} ({strftime('%Y-%m-%d %H:%M:%S', gmtime(x[1]))})" + for x in PropDetailsDlg.myAbstains] body.addRow(QLabel("My Abstains: "), self.scroll(my_abstains)) - my_nays = ["%s (%s)" % (x[0], strftime('%Y-%m-%d %H:%M:%S', - gmtime(x[1]))) for x in PropDetailsDlg.myNays] + my_nays = [f"{x[0]} ({strftime('%Y-%m-%d %H:%M:%S', gmtime(x[1]))})" + for x in PropDetailsDlg.myNays] body.addRow(QLabel("My Nays: "), self.scroll(my_nays)) layout.addLayout(body) self.okButton = QPushButton('OK') diff --git a/src/qt/dlg_signmessage.py b/src/qt/dlg_signmessage.py index 89ebb36..9cd6c0a 100644 --- a/src/qt/dlg_signmessage.py +++ b/src/qt/dlg_signmessage.py @@ -81,7 +81,7 @@ def loadAddressComboBox(self, mn_list): address = x['collateral'].get('address') hwAcc = x['hwAcc'] spath = x['collateral'].get('spath') - hwpath = "%d'/0/%d" % (hwAcc, spath) + hwpath = f"{hwAcc}'/0/{spath}" isTestnet = x['isTestnet'] comboBox.addItem(name, [address, hwpath, isTestnet]) # add generic address (bold) @@ -104,7 +104,7 @@ def displaySignature(self, sig): ok = ecdsa_verify_addr(self.ui.messageTextEdt.toPlainText(), b64encode(sig), self.currAddress) if not ok: mess = "Signature doesn't verify." - myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - no signature', mess) + myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - no signature', f"{mess}") def findPubKey(self): device = self.main_wnd.hwdevice @@ -114,7 +114,7 @@ def findPubKey(self): mess = "Unable to find public key. The action was refused on the device or another application " mess += "might have taken over the USB communication with the device.

" mess += "The operation was canceled." - myPopUp_sb(self.main_wnd, QMessageBox.Critical, 'SPMT - PK not found', mess) + myPopUp_sb(self.main_wnd, QMessageBox.Critical, 'SPMT - PK not found', f"{mess}") return self.updateGenericAddress(pk) @@ -133,11 +133,10 @@ def findSpath_done(self): addy = self.ui.addressLineEdit.text().strip() starting_spath = self.curr_starting_spath spath_count = self.curr_spath_count - mess = "Scanned addresses %d to %d of HW account %d.
" % ( - starting_spath, starting_spath + spath_count - 1, self.hwAcc) - mess += "Unable to find the address %s.
Maybe it's on a different account.

" % addy - mess += "Do you want to scan %d more addresses of account n.%d ?" % (spath_count, self.hwAcc) - ans = myPopUp(self.main_wnd, QMessageBox.Question, 'SPMT - spath search', mess) + mess = f"Scanned addresses {starting_spath} to {starting_spath + spath_count - 1} of HW account {self.hwAcc}.
" + mess += f"Unable to find the address {addy}.
Maybe it's on a different account.

" + mess += f"Do you want to scan {spath_count} more addresses of account n.{self.hwAcc} ?" + ans = myPopUp(self.main_wnd, QMessageBox.Question, 'SPMT - spath search', f"{mess}") if ans == QMessageBox.Yes: # Look for 10 more addresses starting_spath += spath_count @@ -163,7 +162,7 @@ def onChangeSelectedAddress(self): def onCopy(self): if self.ui.signatureTextEdt.document().isEmpty(): mess = "Nothing to copy. Sign message first." - myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - no signature', mess) + myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - no signature', f"{mess}") return cb = QApplication.clipboard() cb.clear(mode=cb.Clipboard) @@ -177,7 +176,7 @@ def onEdit(self): def onSave(self): if self.ui.signatureTextEdt.document().isEmpty(): mess = "Nothing to save. Sign message first." - myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - no signature', mess) + myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - no signature', f"{mess}") return options = QFileDialog.Options() options |= QFileDialog.DontUseNativeDialog @@ -192,7 +191,7 @@ def onSave(self): return except Exception as e: err_msg = "error writing signature to file" - printException(getCallerName(), getFunctionName(), err_msg, e.args) + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{err_msg}", f"{e.args}") myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - NOT saved', "Signature NOT saved to file") def onSearchPK(self): @@ -203,19 +202,19 @@ def onSearchPK(self): addy = self.ui.addressLineEdit.text().strip() if len(addy) == 0: mess = "No address. Insert PIVX address first." - myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - no address', mess) + myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - no address', f"{mess}") return if not checkPivxAddr(addy, self.currIsTestnet): net = "testnet" if self.currIsTestnet else "mainnet" - mess = "PIVX address not valid. Insert valid PIVX %s address" % net - myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - invalid address', mess) + mess = f"PIVX address not valid. Insert valid PIVX {net} address" + myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - invalid address', f"{mess}") return # check hw connection while self.main_wnd.hwStatus != 2: mess = "HW device not connected. Try to connect?" - ans = myPopUp(self.main_wnd, QMessageBox.Question, 'SPMT - hw check', mess) + ans = myPopUp(self.main_wnd, QMessageBox.Question, 'SPMT - hw check', f"{mess}") if ans == QMessageBox.No: return # re connect @@ -234,12 +233,12 @@ def onSign(self): # check message if self.ui.messageTextEdt.document().isEmpty(): mess = "Nothing to sign. Insert message." - myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - no message', mess) + myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - no message', f"{mess}") return # check hw connection while self.main_wnd.hwStatus != 2: mess = "HW device not connected. Try to connect?" - ans = myPopUp(self.main_wnd, QMessageBox.Question, 'SPMT - hw check', mess) + ans = myPopUp(self.main_wnd, QMessageBox.Question, 'SPMT - hw check', f"{mess}") if ans == QMessageBox.No: return # re connect @@ -257,10 +256,10 @@ def onSign(self): # wait for signal when device.sig1 is ready then --> displaySignature except Exception as e: err_msg = "error during signature" - printException(getCallerName(), getFunctionName(), err_msg, e.args) + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{err_msg}", f"{e.args}") except KeyboardInterrupt: err_msg = "Keyboard Interrupt" - printException(getCallerName(), getFunctionName(), err_msg, '') + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{err_msg}", '') def onToggleRadio(self, isFromAddress): self.ui.fromAddressBox.setVisible(isFromAddress) @@ -283,14 +282,14 @@ def updateGenericAddress(self, pk): # double check address addy = self.ui.addressLineEdit.text().strip() if addy != genericAddy: - mess = "Error! retrieved address (%s) different from input (%s)" % (genericAddy, addy) - myPopUp_sb(self.main_wnd, QMessageBox.Critical, 'SPMT - address mismatch', mess) + mess = f"Error! retrieved address ({genericAddy}) different from input ({addy})" + myPopUp_sb(self.main_wnd, QMessageBox.Critical, 'SPMT - address mismatch', f"{mess}") self.ui.addressLabel.setText("") return # update generic address self.setSignEnabled(True) self.currAddress = genericAddy - self.currHwPath = "%d'/0/%d" % (self.hwAcc, self.spath) + self.currHwPath = f"{self.hwAcc}'/0/{self.spath}" self.ui.addressLabel.setText(self.currAddress) self.ui.editBtn.setVisible(True) @@ -307,19 +306,19 @@ def onVerify(self): addy = self.ui.addressLineEdit.text().strip() if len(addy) == 0: mess = "No address inserted" - myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - no address', mess) + myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - no address', f"{mess}") return if not checkPivxAddr(addy, True) and not checkPivxAddr(addy, False): mess = "PIVX address not valid. Insert valid PIVX address" - myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - invalid address', mess) + myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - invalid address', f"{mess}") return if self.ui.messageTextEdt.document().isEmpty(): mess = "No message inserted" - myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - no message', mess) + myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - no message', f"{mess}") return if self.ui.signatureTextEdt.document().isEmpty(): mess = "No signature inserted" - myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - no signature', mess) + myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - no signature', f"{mess}") return try: @@ -327,14 +326,14 @@ def onVerify(self): self.ui.signatureTextEdt.toPlainText(), self.ui.addressLineEdit.text().strip()) except Exception as e: - mess = "Error decoding signature:\n" + str(e) - myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - invalid signature', mess) + mess = f"Error decoding signature:\n{e}" + myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'SPMT - invalid signature', f"{mess}") ok = False if ok: mess = "Signature OK" else: mess = "Signature doesn't verify" - mess = "" + mess + "" + mess = f"{mess}" self.ui.resultLabel.setText(mess) self.ui.resultLabel.setVisible(True) diff --git a/src/qt/dlg_sweepAll.py b/src/qt/dlg_sweepAll.py index 1d061d8..45495ba 100644 --- a/src/qt/dlg_sweepAll.py +++ b/src/qt/dlg_sweepAll.py @@ -63,7 +63,7 @@ def display_utxos(self): x = {} x['name'] = mn['name'] x['addr'] = mn['collateral'].get('address') - x['path'] = "%d'/0/%d" % (mn['hwAcc'], mn['collateral'].get('spath')) + x['path'] = f"{mn['hwAcc']}'/0/{mn['collateral'].get('spath')}" x['utxos'] = [r for r in rewards if r['mn_name'] == x['name'] # this mn's UTXOs and r['txid'] != mn['collateral'].get('txid') # except the collateral @@ -96,7 +96,7 @@ def item(value): self.ui.tableW.setItem(row, 1, item(mnode['addr'])) newInputs = len(mnode['utxos']) numOfInputs += newInputs - rewards_line = "%s PIV" % mnode['total_rewards'] + rewards_line = f"{mnode['total_rewards']} PIV" self.ui.tableW.setItem(row, 2, item(rewards_line)) self.ui.tableW.setItem(row, 3, item(str(newInputs))) @@ -105,8 +105,8 @@ def item(value): self.ui.tableW.horizontalHeader().setSectionResizeMode(1, QHeaderView.Stretch) total = sum([float(mnode['total_rewards']) for mnode in self.rewardsArray]) - self.ui.totalLine.setText("%s PIV" % str(round(total, 8))) - self.ui.noOfUtxosLine.setText("%s" % str(numOfInputs)) + self.ui.totalLine.setText(f"{round(total, 8)} PIV") + self.ui.noOfUtxosLine.setText(f"{numOfInputs}") # update fee estimatedTxSize = (44 + numOfInputs * 148) * 1.0 / 1000 # kB @@ -124,7 +124,7 @@ def onButtonSend(self): # Check HW device while self.main_tab.caller.hwStatus != 2: mess = "HW device not connected. Try to connect?" - ans = myPopUp(self.main_tab.caller, QMessageBox.Question, 'SPMT - hw check', mess) + ans = myPopUp(self.main_tab.caller, QMessageBox.Question, 'SPMT - hw check', f"{mess}") if ans == QMessageBox.No: return # re connect @@ -161,7 +161,7 @@ def update_loading_utxos(self, percent): if percent < 100: self.ui.buttonSend.setEnabled(False) self.ui.lblMessage.show() - self.ui.lblMessage.setText("Loading rewards...%d%%" % percent) + self.ui.lblMessage.setText(f"Loading rewards...{percent}%") else: self.ui.buttonSend.setEnabled(True) self.ui.lblMessage.hide() diff --git a/src/qt/guiHeader.py b/src/qt/guiHeader.py index 8821c1f..67049a3 100644 --- a/src/qt/guiHeader.py +++ b/src/qt/guiHeader.py @@ -32,7 +32,7 @@ def __init__(self, caller, *args, **kwargs): self.button_checkRpc.setToolTip("try to connect to RPC server") self.centralBox.addWidget(self.button_checkRpc, 0, 2) self.rpcLed = QLabel() - self.rpcLed.setToolTip("%s" % caller.rpcStatusMess) + self.rpcLed.setToolTip(f"{caller.rpcStatusMess}") self.rpcLed.setPixmap(caller.ledGrayH_icon) self.centralBox.addWidget(self.rpcLed, 0, 3) self.lastPingBox = QWidget() @@ -69,7 +69,7 @@ def __init__(self, caller, *args, **kwargs): self.button_checkHw.setToolTip("try to connect to hardware device") self.centralBox.addWidget(self.button_checkHw, 1, 2) self.hwLed = QLabel() - self.hwLed.setToolTip("Status: %s" % caller.hwStatusMess) + self.hwLed.setToolTip(f"Status: {caller.hwStatusMess}") self.hwLed.setPixmap(caller.ledGrayH_icon) self.centralBox.addWidget(self.hwLed, 1, 3) self.layout.addLayout(self.centralBox) diff --git a/src/qt/gui_tabMNConf.py b/src/qt/gui_tabMNConf.py index c9f9e49..5c96ad4 100644 --- a/src/qt/gui_tabMNConf.py +++ b/src/qt/gui_tabMNConf.py @@ -59,7 +59,7 @@ def initConfigForm(self, masternode_alias=None): if not masternode_alias: self.configForm.setTitle("New Masternode") else: - self.configForm.setTitle("Edit Masternode [%s]" % masternode_alias) + self.configForm.setTitle(f"Edit Masternode [{masternode_alias}]") layout = QFormLayout() layout.setFieldGrowthPolicy(QFormLayout.AllNonFixedFieldsGrow) diff --git a/src/qt/gui_tabMain.py b/src/qt/gui_tabMain.py index 0bfceec..a0e47f7 100644 --- a/src/qt/gui_tabMain.py +++ b/src/qt/gui_tabMain.py @@ -87,9 +87,9 @@ def insert_mn_list(self, name, ip, port, row=None, isHardware=True): # --- Label & Balance self.mnLabel[name] = QLabel() if isHardware: - self.mnLabel[name].setText("%s [%s]" % (name, ip)) + self.mnLabel[name].setText(f"{name} [{ip}]") else: - self.mnLabel[name].setText("%s [%s]" % (name, ip)) + self.mnLabel[name].setText(f"{name} [{ip}]") mnRowLayout.addWidget(self.mnLabel[name]) self.mnBalance[name] = QLabel() mnRowLayout.addWidget(self.mnBalance[name]) @@ -109,12 +109,12 @@ def insert_mn_list(self, name, ip, port, row=None, isHardware=True): # --- Details button self.btn_details[name] = QToolButton() self.btn_details[name].setIcon(self.details_icon) - self.btn_details[name].setToolTip('Check status details of masternode "%s"' % name) + self.btn_details[name].setToolTip(f'Check status details of masternode "{name}"') mnRowLayout.addWidget(self.btn_details[name]) self.btn_details[name].hide() # --- Rewards button self.btn_rewards[name] = QPushButton() - self.btn_rewards[name].setToolTip('Transfer rewards from "%s"' % name) + self.btn_rewards[name].setToolTip(f'Transfer rewards from "{name}"') self.btn_rewards[name].setIcon(self.rewards_icon) self.btn_rewards[name].alias = name if not isHardware: @@ -123,7 +123,7 @@ def insert_mn_list(self, name, ip, port, row=None, isHardware=True): mnRowLayout.addWidget(self.btn_rewards[name]) # --- Start button self.btn_start[name] = QPushButton() - self.btn_start[name].setToolTip('Start masternode "%s"' % name) + self.btn_start[name].setToolTip(f'Start masternode "{name}"') self.btn_start[name].setIcon(self.startMN_icon) self.btn_start[name].alias = name if not isHardware: @@ -132,7 +132,7 @@ def insert_mn_list(self, name, ip, port, row=None, isHardware=True): mnRowLayout.addWidget(self.btn_start[name]) # --- Edit button self.btn_edit[name] = QPushButton() - self.btn_edit[name].setToolTip('Edit masternode "%s"' % name) + self.btn_edit[name].setToolTip(f'Edit masternode "{name}"') self.btn_edit[name].setIcon(self.editMN_icon) self.btn_edit[name].alias = name if not isHardware: @@ -141,7 +141,7 @@ def insert_mn_list(self, name, ip, port, row=None, isHardware=True): mnRowLayout.addWidget(self.btn_edit[name]) # --- Remove button self.btn_remove[name] = QPushButton() - self.btn_remove[name].setToolTip('Delete masternode "%s"' % name) + self.btn_remove[name].setToolTip(f'Delete masternode "{name}"') self.btn_remove[name].setIcon(self.removeMN_icon) self.btn_remove[name].alias = name mnRowLayout.addWidget(self.btn_remove[name]) diff --git a/src/rpcClient.py b/src/rpcClient.py index c2327cc..f29774d 100644 --- a/src/rpcClient.py +++ b/src/rpcClient.py @@ -16,22 +16,28 @@ def process_RPC_exceptions(func): - def process_RPC_exceptions_int(*args, **kwargs): + def wrapper(*args, **kwargs): try: args[0].httpConnection.connect() return func(*args, **kwargs) - except Exception as e: message = "Exception in RPC client" printException(getCallerName(True), getFunctionName(True), message, str(e)) + # Return a default value based on the expected return structure of the wrapped function + if func.__name__ == 'getStatus': + return False, "Error: RPC call failed", 0, None, False + elif func.__name__ == 'isBlockchainSynced': + return False, None + # Handle other functions or provide a general default return + else: + return None finally: try: args[0].httpConnection.close() except Exception as e: printDbg(e) pass - - return process_RPC_exceptions_int + return wrapper class RpcClient: @@ -40,11 +46,11 @@ def __init__(self, rpc_protocol, rpc_host, rpc_user, rpc_password): # Lock for threads self.lock = threading.RLock() - self.rpc_url = "%s://%s:%s@%s" % (rpc_protocol, rpc_user, rpc_password, rpc_host) + self.rpc_url = f"{rpc_protocol}://{rpc_user}:{rpc_password}@{rpc_host}" host, port = rpc_host.split(":") if rpc_protocol == "https": - self.httpConnection = httplib.HTTPSConnection(host, port, timeout=20, context=ssl._create_unverified_context()) + self.httpConnection = httplib.HTTPSConnection(host, port, timeout=20, context=ssl.create_default_context()) else: self.httpConnection = httplib.HTTPConnection(host, port, timeout=20) diff --git a/src/spmtApp.py b/src/spmtApp.py index 8d0d692..f7d23ff 100644 --- a/src/spmtApp.py +++ b/src/spmtApp.py @@ -31,7 +31,7 @@ class ServiceExit(Exception): def service_shutdown(signum, frame): - print('Caught signal %d' % signum) + print(f'Caught signal {signum}') raise ServiceExit @@ -54,7 +54,7 @@ def __init__(self, imgDir, app, start_args): # Get version and title self.version = getSPMTVersion() - self.title = 'SPMT - Secure PIVX Masternode Tool - v.%s-%s' % (self.version['number'], self.version['tag']) + self.title = f'SPMT - Secure PIVX Masternode Tool - v.{self.version["number"]}-{self.version["tag"]}' # Open database self.db = Database(self) diff --git a/src/tabGovernance.py b/src/tabGovernance.py index 24358d9..11db802 100644 --- a/src/tabGovernance.py +++ b/src/tabGovernance.py @@ -101,7 +101,7 @@ def itemButton(value, icon_num): btn = QPushButton() if icon_num == 0: btn.setIcon(self.ui.link_icon) - btn.setToolTip("Open WebPage: %s" % str(value)) + btn.setToolTip(f"Open WebPage: {value}") btn.clicked.connect(lambda: QDesktopServices.openUrl(QUrl(str(value)))) else: btn.setIcon(self.ui.search_icon) @@ -116,7 +116,7 @@ def itemButton(value, icon_num): # update MN count mnCount = self.caller.parent.cache['MN_count'] - self.ui.mnCountLabel.setText("Total MN Count: %d" % mnCount) + self.ui.mnCountLabel.setText("Total MN Count: {mnCount}") # Make room for new list self.ui.proposalBox.setRowCount(len(proposals)) @@ -141,11 +141,11 @@ def itemButton(value, icon_num): self.ui.proposalBox.setItem(row, 3, monthlyPay) # 4 - payments - payments = "%d / %d" % (prop.RemainingPayCount, prop.TotalPayCount) + payments = f"{prop.RemainingPayCount} / {prop.TotalPayCount}" self.ui.proposalBox.setItem(row, 4, item(payments)) # 5 - network votes - net_votes = "%d / %d / %d" % (prop.Yeas, prop.Abstains, prop.Nays) + net_votes = f"{prop.Yeas} / {prop.Abstains} / {prop.Nays}" votes = item(net_votes) if (prop.Yeas - prop.Nays) > 0.1 * mnCount: votes.setBackground(Qt.green) @@ -173,7 +173,7 @@ def itemButton(value, icon_num): def loadProposals_thread(self, ctrl): if not self.caller.rpcConnected: - printException(getCallerName(), getFunctionName(), "RPC server not connected", "") + printException(f"{getCallerName()} {getFunctionName()} RPC server not connected") return # clear proposals DB @@ -238,11 +238,11 @@ def onToggleExpiring(self): def onVote(self, vote_code): if len(self.selectedProposals) == 0: message = "NO PROPOSAL SELECTED. Select proposals from the list." - myPopUp_sb(self.caller, "crit", 'Vote on proposals', message) + myPopUp_sb(self.caller, "crit", 'Vote on proposals', f"{message}") return if len(self.votingMasternodes) == 0: message = "NO MASTERNODE SELECTED FOR VOTING. Click on 'Select Masternodes...'" - myPopUp_sb(self.caller, "crit", 'Vote on proposals', message) + myPopUp_sb(self.caller, "crit", 'Vote on proposals', f"{message}") return reply = self.summaryDlg(vote_code) @@ -251,13 +251,13 @@ def onVote(self, vote_code): ThreadFuns.runInThread(self.vote_thread, ([vote_code]), self.vote_thread_end) def summaryDlg(self, vote_code): - message = "Voting %s on the following proposal(s):

" % str(self.vote_codes[vote_code]).upper() + message = f"Voting {self.vote_codes[vote_code].upper()} on the following proposal(s):

" for prop in self.selectedProposals: - message += "  - %s
    (%s)

" % (prop.name, prop.Hash) + message += f"  - {prop.name}
    ({prop.Hash})

" message += "
with following masternode(s):

" - for mn in self.votingMasternodes: - message += "  - %s
" % mn[1] + message += f"  - {mn[1]}
" + dlg = ScrollMessageBox(self.caller, message) @@ -282,7 +282,7 @@ def updateSelectedMNlabel(self): if selected_MN == 1: label = "1 masternode selected for voting" else: - label = "%d masternodes selected for voting" % selected_MN + label = f"{selected_MN} masternodes selected for voting" self.ui.selectedMNlabel.setText(label) def updateSelection(self): @@ -290,7 +290,7 @@ def updateSelection(self): if len(self.selectedProposals) == 1: self.ui.selectedPropLabel.setText("1 proposal selected") else: - self.ui.selectedPropLabel.setText("%d proposals selected" % len(self.selectedProposals)) + self.ui.selectedPropLabel.setText(f"{len(self.selectedProposals)} {'proposal' if len(self.selectedProposals) == 1 else 'proposals'} selected") def getBudgetVoteMess(self, fNewSigs, txid, txidn, hash, vote_code, sig_time): if fNewSigs: @@ -302,14 +302,14 @@ def getBudgetVoteMess(self, fNewSigs, txid, txidn, hash, vote_code, sig_time): ss += (sig_time).to_bytes(8, byteorder='little') return bitcoin.bin_dbl_sha256(ss) else: - serialize_for_sig = '%s-%d' % (txid, txidn) - serialize_for_sig += hash + str(vote_code) + str(sig_time) + serialize_for_sig = f'{txid}-{txidn}' + serialize_for_sig += f'{hash} {vote_code} {sig_time}' return serialize_for_sig def vote_thread(self, ctrl, vote_code): # vote_code index for ["yes", "abstain", "no"] if not isinstance(vote_code, int) or vote_code not in range(3): - raise Exception("Wrong vote_code %s" % str(vote_code)) + raise Exception(f"Wrong vote_code {vote_code}") self.successVotes = 0 self.failedVotes = 0 self.currHeight = self.caller.rpcClient.getBlockCount() @@ -329,7 +329,7 @@ def vote_thread(self, ctrl, vote_code): # Get mnPrivKey currNode = next(x for x in self.caller.masternode_list if x['name'] == mn[1]) if currNode is None: - printDbg("currNode not found for current voting masternode %s" % mn[1]) + printDbg(f"currNode not found for current voting masternode {mn[1]}") self.clear() raise Exception() mnPrivKey = currNode['mnPrivKey'] @@ -343,12 +343,13 @@ def vote_thread(self, ctrl, vote_code): sig_time += delay_secs # Print Debug line to console - mess = "Processing '%s' vote on behalf of masternode [%s]" % (self.vote_codes[vote_code], mn[1]) - mess += " for the proposal {%s}" % prop.name + mess = f"Processing '{self.vote_codes[vote_code]}' vote on behalf of masternode [{mn[1]}] " + mess += f"for the proposal {{{prop.name}}}" if self.ui.randomDelayCheck.isChecked(): - mess += " with offset of %d seconds" % delay_secs + mess += f" with offset of {delay_secs} seconds" printDbg(mess) + # Serialize and sign vote fNewSigs = NewSigsActive(self.currHeight, self.isTestnet) serialize_for_sig = self.getBudgetVoteMess(fNewSigs, @@ -383,9 +384,9 @@ def vote_thread(self, ctrl, vote_code): def vote_thread_end(self): message = '

Votes sent

' if self.successVotes > 0: - message += '

Successful Votes: %d

' % self.successVotes + message += f'

Successful Votes: {self.successVotes}

' if self.failedVotes > 0: - message += '

Failed Votes: %d' % self.failedVotes + message += f'

Failed Votes: {self.failedVotes}' myPopUp_sb(self.caller, "info", 'Vote Finished', message) # refresh my votes on proposals self.ui.selectedPropLabel.setText("0 proposals selected") diff --git a/src/tabMNConf.py b/src/tabMNConf.py index 28f1092..d97b039 100644 --- a/src/tabMNConf.py +++ b/src/tabMNConf.py @@ -42,7 +42,7 @@ def addressToSpath(self): # Check HW device if self.caller.hwStatus != 2: myPopUp_sb(self.caller, "crit", 'SPMT - hw device check', "Connect to HW device first") - printDbg("Unable to connect to hardware device. The device status is: %d" % self.caller.hwStatus) + printDbg(f"Unable to connect to hardware device. The device status is: {self.caller.hwStatus}") return None self.runInThread(self.findSpath, (0, 10), self.findSpath_done) @@ -51,7 +51,7 @@ def findSpath(self, ctrl, starting_spath, spath_count): currHwAcc = self.ui.edt_hwAccount.value() # first scan. Subsequent called by findSpath_done self.spath_found, self.spath = self.caller.hwdevice.scanForBip32(currHwAcc, currAddr, starting_spath, spath_count, self.isTestnet()) - printOK("Bip32 scan complete. result=%s spath=%s" % (self.spath_found, self.spath)) + printOK(f"Bip32 scan complete. result={self.spath_found} spath={self.spath}") self.curr_starting_spath = starting_spath self.curr_spath_count = spath_count @@ -63,17 +63,17 @@ def findSpath_done(self): spath_count = self.curr_spath_count if self.spath_found: - printOK("spath is %d" % spath) - mess = "Found address %s in HW account %s with spath_id %s" % (currAddr, currHwAcc, spath) - myPopUp_sb(self.caller, "info", 'SPMT - spath search', mess) + printOK(f"spath is {spath}") + mess = f"Found address {currAddr} in HW account {currHwAcc} with spath_id {spath}" + myPopUp_sb(self.caller, "info", 'SPMT - spath search', f"{mess}") self.ui.edt_spath.setValue(spath) self.findPubKey() else: - mess = "Scanned addresses %d to %d of HW account %d.
" % (starting_spath, starting_spath + spath_count - 1, currHwAcc) - mess += "Unable to find the address %s.
Maybe it's on a different account.

" % currAddr - mess += "Do you want to scan %d more addresses of account n.%d ?" % (spath_count, currHwAcc) - ans = myPopUp(self.caller, "crit", 'SPMT - spath search', mess) + mess = f"Scanned addresses {starting_spath} to {starting_spath + spath_count - 1} of HW account {currHwAcc}.
" + mess += f"Unable to find the address {currAddr}.
Maybe it's on a different account.

" + mess += f"Do you want to scan {spath_count} more addresses of account n.{currHwAcc} ?" + ans = myPopUp(self.caller, "crit", 'SPMT - spath search', f"{mess}") if ans == QMessageBox.Yes: starting_spath += spath_count self.runInThread(self.findSpath, (starting_spath, spath_count), self.findSpath_done) @@ -85,7 +85,7 @@ def findPubKey(self): # Check HW device if self.caller.hwStatus != 2: myPopUp_sb(self.caller, "crit", 'SPMT - hw device check', "Connect to HW device first") - printDbg("Unable to connect to hardware device. The device status is: %d" % self.caller.hwStatus) + printDbg(f"Unable to connect to hardware device. The device status is: {self.caller.hwStatus}") return None result = self.caller.hwdevice.scanForPubKey(currHwAcc, currSpath, self.isTestnet()) @@ -94,7 +94,7 @@ def findPubKey(self): warningText = "Unable to find public key. The action was refused on the device or another application " warningText += "might have taken over the USB communication with the device.

" warningText += "To continue click the Retry button.\nTo cancel, click the Abort button." - mBox = QMessageBox(QMessageBox.Critical, "WARNING", warningText, QMessageBox.Retry) + mBox = QMessageBox(QMessageBox.Critical, "WARNING", f"{warningText}", QMessageBox.Retry) mBox.setStandardButtons(QMessageBox.Retry | QMessageBox.Abort) while result is None: @@ -107,9 +107,9 @@ def findPubKey(self): result = self.caller.hwdevice.scanForPubKey(currHwAcc, currSpath, self.isTestnet()) - mess = "Found public key:\n%s" % result - myPopUp_sb(self.caller, "info", "SPMT - findPubKey", mess) - printOK("Public Key: %s" % result) + mess = f"Found public key:\n{result}" + myPopUp_sb(self.caller, "info", "SPMT - findPubKey", f"{mess}") + printOK(f"Public Key: {result}") self.ui.edt_pubKey.setText(result) def findRow_mn_list(self, name): @@ -159,7 +159,7 @@ def onLookupTx(self): printDbg("Checking RPC connection") if not self.caller.rpcConnected: myPopUp_sb(self.caller, "crit", 'SPMT - hw device check', "Connect to RPC server first") - printDbg("Unable to connect: %s" % self.caller.rpcStatusMess) + printDbg(f"Unable to connect: {self.caller.rpcStatusMess}") return None try: # Update Lookup dialog @@ -170,7 +170,7 @@ def onLookupTx(self): self.ui.edt_txidn.setValue(txidn) except Exception as e: - printDbg(e) + printDbg(f"{e}") def onGenerateMNkey(self): printDbg("Generate MNkey pressed") @@ -190,17 +190,17 @@ def onSaveMNConf(self): try: if self.ui.edt_pubKey.text() == "" or self.ui.edt_txid.text() == "" or self.ui.edt_mnPrivKey.text() == "": mess_text = 'Attention! Complete the form before saving.
' - mess_text += "pubKey = %s
" % self.ui.edt_pubKey.text() - mess_text += "txId = %s
" % self.ui.edt_txid.text() - mess_text += "mnPrivKey = %s
" % self.ui.edt_mnPrivKey.text() - myPopUp_sb(self.caller, "crit", 'Complete Form', mess_text) + mess_text += f"pubKey = {self.ui.edt_pubKey.text()}
" + mess_text += f"txId = {self.ui.edt_txid.text()}
" + mess_text += f"mnPrivKey = {self.ui.edt_mnPrivKey.text()}
" + myPopUp_sb(self.caller, "crit", 'Complete Form', f"{mess_text}") return if not is_hex(self.ui.edt_txid.text()): mess_text = 'Attention! txid format is not valid.
' - mess_text += "txId = %s
" % self.ui.edt_txid.text() + mess_text += f"txId = {self.ui.edt_txid.text()}
" mess_text += 'transaction id must be in hex format.
' - myPopUp_sb(self.caller, "crit", 'Complete Form', mess_text) + myPopUp_sb(self.caller, "crit", 'Complete Form', f"{mess_text}") return # check for duplicate names @@ -210,9 +210,9 @@ def onSaveMNConf(self): if self.caller.mnode_to_change is not None: old_alias = self.caller.mnode_to_change['name'] if self.caller.isMasternodeInList(mn_alias) and old_alias != mn_alias: - mess_text = 'Attention! The name %s is already in use for another masternode.
' % mn_alias + mess_text = f'Attention! The name {mn_alias} is already in use for another masternode.
' mess_text += 'Choose a different name (alias) for the masternode' - myPopUp_sb(self.caller, "crit", 'Complete Form', mess_text) + myPopUp_sb(self.caller, "crit", 'Complete Form', f"{mess_text}") return # create new item @@ -244,9 +244,9 @@ def onSaveMNConf(self): self.onCancelMNConfig() except Exception as e: - error_msg = "ERROR: %s" % e + error_msg = f"ERROR: {e}" printDbg(error_msg) - myPopUp_sb(self.caller, "crit", 'ERROR', error_msg) + myPopUp_sb(self.caller, "crit", 'ERROR', f"{error_msg}") def spathToAddress(self): printOK("spathToAddress pressed") @@ -255,7 +255,7 @@ def spathToAddress(self): # Check HW device if self.caller.hwStatus != 2: myPopUp_sb(self.caller, "crit", 'SPMT - hw device check', "Connect to HW device first") - printDbg("Unable to connect to hardware device. The device status is: %d" % self.caller.hwStatus) + printDbg(f"Unable to connect to hardware device. The device status is: {self.caller.hwStatus}") return None addr = self.caller.hwdevice.scanForAddress(currHwAcc, currSpath, self.isTestnet()) if addr: diff --git a/src/tabMain.py b/src/tabMain.py index fc8ae29..e627770 100644 --- a/src/tabMain.py +++ b/src/tabMain.py @@ -42,7 +42,7 @@ def __init__(self, caller): def displayMNlistUpdated(self): for masternode in self.caller.masternode_list: - printDbg("Checking %s (%s)..." % (masternode['name'], masternode['collateral'].get('txid'))) + printDbg(f"Checking {masternode['name']} ({masternode['collateral'].get('txid')})...") self.displayMNStatus(masternode) time.sleep(0.1) @@ -55,8 +55,8 @@ def displayMNStatus(self, currMN): try: statusData['balance'] = self.caller.apiClient.getBalance(mn.get('addr')) except Exception as e: - err_msg = "error getting balance of %s" % mn.get('addr') - printException(getCallerName(), getFunctionName(), err_msg, e) + err_msg = f"error getting balance of {mn.get('addr')}" + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{err_msg}", f"{e}") masternode_alias = currMN['name'] self.ui.btn_details[masternode_alias].disconnect() @@ -64,7 +64,7 @@ def displayMNStatus(self, currMN): self.ui.btn_details[masternode_alias].show() if statusData is None: - printOK("%s Not Found" % (masternode_alias)) + printOK(f"{masternode_alias} Not Found") self.ui.mnLed[masternode_alias].setPixmap(self.caller.ledGrayV_icon) msg = "Masternode not found." self.ui.mnStatusLabel[masternode_alias].setText(msg) @@ -73,22 +73,22 @@ def displayMNStatus(self, currMN): else: display_text = "" if statusData['balance'] is not None: - self.ui.mnBalance[masternode_alias].setText(' %s PIV' % str(statusData['balance'])) + self.ui.mnBalance[masternode_alias].setText(f' {statusData["balance"]} PIV') self.ui.mnBalance[masternode_alias].show() - printOK("Got status %s for %s" % (statusData['status'], masternode_alias)) + printOK(f"Got status {statusData['status']} for {masternode_alias}") if statusData['status'] == 'ENABLED': self.ui.mnLed[masternode_alias].setPixmap(self.caller.ledGreenV_icon) - display_text += '%s  ' % statusData['status'] + display_text += f'{statusData["status"]}  ' position = statusData.get('queue_pos') total_count = len(self.all_masternodes.get('masternodes')) - display_text += '%d/%d' % (position, total_count) + display_text += f'{position}/{total_count}' self.ui.mnStatusProgress[masternode_alias].setRange(0, total_count) self.ui.mnStatusProgress[masternode_alias].setValue(total_count - position) self.ui.mnStatusProgress[masternode_alias].show() else: self.ui.mnLed[masternode_alias].setPixmap(self.caller.ledRedV_icon) - display_text += '%s  ' % statusData['status'] + display_text += f'{statusData["status"]}  ' self.ui.mnStatusLabel[masternode_alias].setText(display_text) self.ui.mnStatusLabel[masternode_alias].show() @@ -99,7 +99,7 @@ def onCheckAllMN(self): if not self.caller.rpcConnected: myPopUp_sb(self.caller, "crit", 'SPMT - hw device check', "RPC server must be connected to perform this action.") - printDbg("Unable to connect: %s" % self.caller.rpcStatusMess) + printDbg(f"Unable to connect: {self.caller.rpcStatusMess}") return if self.caller.masternode_list is None or self.caller.masternode_list == []: myPopUp_sb(self.caller, "crit", 'SPMT - Check-All masternodes', @@ -120,7 +120,7 @@ def onDisplayStatusDetails(self, masternode_alias, statusData): except Exception as e: err_msg = "error in displayStatusDetails" - printException(getCallerName(), getFunctionName(), err_msg, e.args) + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{err_msg}", f"{e.args}") def onEditMN(self, data=None): if not data: @@ -146,7 +146,7 @@ def onRemoveMN(self, data=None): masternode_alias = target.alias reply = myPopUp(self.caller, "warn", 'Confirm REMOVE', - "Are you sure you want to remove\nmasternoode:'%s'" % masternode_alias, QMessageBox.No) + f"Are you sure you want to remove\nmasternoode:'{masternode_alias}'", QMessageBox.No) if reply == QMessageBox.No: return @@ -190,7 +190,7 @@ def onStartAllMN(self): except Exception as e: err_msg = "error before starting node" - printException(getCallerName(), getFunctionName(), err_msg, e) + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{err_msg}", f"{e}") def onStartMN(self, data=None): # Check RPC & HW device @@ -202,11 +202,11 @@ def onStartMN(self, data=None): if not data: target = self.ui.sender() masternode_alias = target.alias - printOK("Start-masternode %s pressed" % masternode_alias) + printOK(f"Start-masternode {masternode_alias} pressed") for mn_conf in self.caller.masternode_list: if mn_conf['name'] == masternode_alias: reply = myPopUp(self.caller, QMessageBox.Question, 'Confirm START', - "Are you sure you want to start masternoode:\n'%s'?" % mn_conf['name'], + f"Are you sure you want to start masternoode:\n'{mn_conf['name']}'?", QMessageBox.Yes) if reply == QMessageBox.Yes: self.masternodeToStart = Masternode(self, mn_conf['name'], mn_conf['ip'], mn_conf['port'], @@ -220,7 +220,7 @@ def onStartMN(self, data=None): except Exception as e: err_msg = "error before starting node" - printException(getCallerName(), getFunctionName(), err_msg, e) + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{err_msg}", f"{e}") def onSweepAllRewards(self): if not self.caller.rpcConnected: @@ -231,7 +231,7 @@ def onSweepAllRewards(self): except Exception as e: err_msg = "exception in SweepAll_dlg" - printException(getCallerName(), getFunctionName(), err_msg, e) + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{err_msg}", f"{e}") # Activated by signal 'sigdone' from masternode def sendBroadcast(self, text): @@ -240,7 +240,7 @@ def sendBroadcast(self, text): return printOK("Start Message ready for being relayed...") - logging.debug("Start Message: %s" % text) + logging.debug(f"Start Message: {text}") ret = self.caller.rpcClient.decodemasternodebroadcast(text) if ret is None: myPopUp_sb(self.caller, "crit", 'message decoding failed', 'message decoding failed') @@ -249,7 +249,7 @@ def sendBroadcast(self, text): return msg = "Broadcast START message?\n" + json.dumps(ret, indent=4, sort_keys=True) - reply = myPopUp(self.caller, "quest", 'message decoded', msg, QMessageBox.Yes) + reply = myPopUp(self.caller, "quest", 'message decoded', f"{msg}", QMessageBox.Yes) if reply == QMessageBox.No: self.sendBroadcastCheck() return @@ -262,10 +262,10 @@ def sendBroadcast(self, text): message += "If your remote server is correctly configured and connected to the network, " message += "the output of the ./pivx-cli getmasternodestatus command on the VPS should show:
" message += "
\"message\": \"Masternode successfully started\"" - myPopUp_sb(self.caller, "info", 'message relayed', message) + myPopUp_sb(self.caller, "info", 'message relayed', f"{message}") else: printDbg("Masternode broadcast NOT sent") - printException(getCallerName(), getFunctionName(), json.dumps(ret2), "Error sending masternode broadcast") + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{json.dumps(ret2)}", "Error sending masternode broadcast") self.sendBroadcastCheck() @@ -276,12 +276,12 @@ def sendBroadcastCheck(self): def startMN(self): if self.caller.hwStatus != 2: - myPopUp_sb(self.caller, "warn", 'SPMT - hw device check', self.caller.hwStatusMess) + myPopUp_sb(self.caller, "warn", 'SPMT - hw device check', f"{self.caller.hwStatusMess}") elif not self.caller.rpcConnected: - myPopUp_sb(self.caller, "warn", 'SPMT - rpc device check', self.caller.rpcStatusMess) + myPopUp_sb(self.caller, "warn", 'SPMT - rpc device check', f"{self.caller.rpcStatusMess}") else: self.masternodeToStart = self.mnToStartList.pop() - printDbg("Starting...%s" % self.masternodeToStart.name) + printDbg(f"Starting...{self.masternodeToStart.name}") self.masternodeToStart.startMessage(self.caller.hwdevice, self.caller.rpcClient) # wait for signal when masternode.work is ready then ---> sendBroadcast diff --git a/src/tabRewards.py b/src/tabRewards.py index 80005dd..6019292 100644 --- a/src/tabRewards.py +++ b/src/tabRewards.py @@ -97,7 +97,7 @@ def item(value): # mark cold utxos if utxo['staker'] != "": self.ui.rewardsList.box.item(row, 2).setIcon(self.caller.tabMain.coldStaking_icon) - self.ui.rewardsList.box.item(row, 2).setToolTip("Staked by %s" % utxo['staker']) + self.ui.rewardsList.box.item(row, 2).setToolTip(f"Staked by {utxo['staker']}") # MARK COLLATERAL UTXO if txId == self.curr_txid: @@ -113,7 +113,7 @@ def item(value): self.ui.rewardsList.box.item(row, i).setFlags(Qt.NoItemFlags) ttip = self.ui.rewardsList.box.item(row, i).toolTip() self.ui.rewardsList.box.item(row, i).setToolTip( - ttip + "\n(Immature - %d confirmations required)" % required) + ttip + f"\n(Immature - {required} confirmations required)") self.ui.rewardsList.box.resizeColumnsToContents() @@ -127,7 +127,7 @@ def item(value): if not self.caller.rpcConnected: self.ui.resetStatusLabel('PIVX wallet not connected') else: - self.ui.resetStatusLabel('Found no Rewards for %s' % self.curr_addr) + self.ui.resetStatusLabel(f'Found no Rewards for {self.curr_addr}') def getSelection(self): # Get selected rows indexes @@ -162,7 +162,7 @@ def loadMnSelect(self, isInitializing=False): txidn = x['collateral'].get('txidn') hwAcc = x['hwAcc'] spath = x['collateral'].get('spath') - hwpath = "%d'/0/%d" % (hwAcc, spath) + hwpath = f"{hwAcc}'/0/{spath}" self.ui.mnSelect.addItem(name, [address, txid, txidn, hwpath]) # restore previous index @@ -180,7 +180,7 @@ def load_utxos_thread(self, ctrl): # If rpc is not connected and hw device is Ledger, warn and return. if not self.caller.rpcConnected and self.caller.hwModel == 0: - printError(getCallerName(), getFunctionName(), 'PIVX daemon not connected - Unable to update UTXO list') + printError(f"{getCallerName()}", f"{getFunctionName()}", 'PIVX daemon not connected - Unable to update UTXO list') return total_num_of_utxos = 0 @@ -190,13 +190,13 @@ def load_utxos_thread(self, ctrl): rewards = self.caller.apiClient.getAddressUtxos(mn['collateral'].get('address')) if rewards is None: - printError(getCallerName(), getFunctionName(), 'API client not responding.') + printError(f"{getCallerName()}", f"{getFunctionName()}", 'API client not responding.') return mn_rewards[mn['name']] = rewards total_num_of_utxos += len(rewards) - printDbg("Number of UTXOs to load: %d" % total_num_of_utxos) + printDbg(f"Number of UTXOs to load: {total_num_of_utxos}") curr_utxo = 0 for mn in mn_rewards: @@ -206,7 +206,7 @@ def load_utxos_thread(self, ctrl): # Get raw tx rawtx = TxCache(self.caller)[utxo['txid']] if rawtx is None: - printDbg("Unable to get raw TX with hash=%s from RPC server." % utxo['txid']) + printDbg(f"Unable to get raw TX with hash={utxo['txid']} from RPC server.") # Don't save UTXO if raw TX is unavailable mn_rewards[mn].remove(utxo) continue @@ -280,21 +280,21 @@ def onSendRewards(self): warning1 = "Are you sure you want to transfer the collateral?" warning2 = "Really?" warning3 = "Take a deep breath. Do you REALLY want to transfer your collateral?" - ans = myPopUp(self.caller, "warn", 'SPMT - warning', warning1) + ans = myPopUp(self.caller, "warn", 'SPMT - warning', f"{warning1}") if ans == QMessageBox.No: return None else: - ans2 = myPopUp(self.caller, "warn", 'SPMT - warning', warning2) + ans2 = myPopUp(self.caller, "warn", 'SPMT - warning', f"{warning2}") if ans2 == QMessageBox.No: return None else: - ans2 = myPopUp(self.caller, "crit", 'SPMT - warning', warning3) + ans2 = myPopUp(self.caller, "crit", 'SPMT - warning', f"{warning3}") if ans2 == QMessageBox.No: return None # Check HW device while self.caller.hwStatus != 2: mess = "HW device not connected. Try to connect?" - ans = myPopUp(self.caller, QMessageBox.Question, 'SPMT - hw check', mess) + ans = myPopUp(self.caller, QMessageBox.Question, 'SPMT - hw check', f"{mess}") if ans == QMessageBox.No: return # re connect @@ -343,9 +343,9 @@ def SendRewards(self, inputs=None, gui=None): # LET'S GO if inputs is None: - printDbg("Sending from PIVX address %s to PIVX address %s " % (self.curr_addr, self.dest_addr)) + printDbg(f"Sending from PIVX address {self.curr_addr} to PIVX address {self.dest_addr}") else: - printDbg("Sweeping rewards to PIVX address %s " % self.dest_addr) + printDbg(f"Sweeping rewards to PIVX address {self.dest_addr}") printDbg("Preparing transaction. Please wait...") self.ui.loadingLine.show() self.ui.loadingLinePercent.show() @@ -380,7 +380,7 @@ def SendRewards(self, inputs=None, gui=None): err_msg = "Error while preparing transaction.
" err_msg += "Probably Blockchain wasn't synced when trying to fetch raw TXs.
" err_msg += "Wait for full synchronization then hit 'Clear/Reload'" - printException(getCallerName(), getFunctionName(), err_msg, e.args) + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{err_msg}", f"{e.args}") def onToggleCollateral(self): if self.ui.rewardsList.box.collateralRow is not None: @@ -425,12 +425,12 @@ def FinishSend_int(self, serialized_tx, amount_to_send): try: self.txFinished = True tx_hex = serialized_tx.hex() - printDbg("Raw signed transaction: " + tx_hex) - printDbg("Amount to send :" + amount_to_send) + printDbg(f"Raw signed transaction: {tx_hex}") + printDbg(f"Amount to send: {amount_to_send}") if len(tx_hex) > 90000: - mess = "Transaction's length exceeds 90000 bytes. Select less UTXOs and try again." - myPopUp_sb(self.caller, "crit", 'transaction Warning', mess) + mess = f"Transaction's length exceeds 90000 bytes. Select less UTXOs and try again." + myPopUp_sb(self.caller, "crit", 'transaction Warning', f"{mess}") else: decodedTx = None @@ -438,14 +438,14 @@ def FinishSend_int(self, serialized_tx, amount_to_send): decodedTx = ParseTx(tx_hex, self.caller.isTestnetRPC) destination = decodedTx.get("vout")[0].get("scriptPubKey").get("addresses")[0] amount = decodedTx.get("vout")[0].get("value") - message = '

Broadcast signed transaction?

Destination address:
%s

' % destination - message += '

Amount: %s PIV
' % str(round(amount / 1e8, 8)) - message += 'Fees: %s PIV
Size: %d Bytes

' % (str(round(self.currFee / 1e8, 8)), len(tx_hex) / 2) + message = f'

Broadcast signed transaction?

Destination address:
{destination}

' + message += f'

Amount: {round(amount / 1e8, 8)} PIV
' + message += f'Fees: {round(self.currFee / 1e8, 8)} PIV
Size: {len(tx_hex) / 2} Bytes

' except Exception as e: - printException(getCallerName(), getFunctionName(), "decoding exception", str(e)) + printException(f"{getCallerName()}", f"{getFunctionName()}", "decoding exception", f"{e}") message = '

Unable to decode TX- Broadcast anyway?

' - mess1 = QMessageBox(QMessageBox.Information, 'Send transaction', message, parent=self.caller) + mess1 = QMessageBox(QMessageBox.Information, 'Send transaction', f"{message}", parent=self.caller) if decodedTx is not None: mess1.setDetailedText(json.dumps(decodedTx, indent=4, sort_keys=False)) mess1.setStandardButtons(QMessageBox.Yes | QMessageBox.No) @@ -455,9 +455,9 @@ def FinishSend_int(self, serialized_tx, amount_to_send): txid = self.caller.rpcClient.sendRawTransaction(tx_hex) if txid is None: raise Exception("Unable to send TX - connection to RPC server lost.") - printDbg("Transaction sent. ID: %s" % txid) + printDbg(f"Transaction sent. ID: {txid}") mess2_text = "

Transaction successfully sent.

" - mess2 = QMessageBox(QMessageBox.Information, 'transaction Sent', mess2_text, parent=self.caller) + mess2 = QMessageBox(QMessageBox.Information, 'transaction Sent', f"{mess2_text}", parent=self.caller) mess2.setDetailedText(txid) mess2.exec_() # remove spent rewards from DB @@ -472,7 +472,7 @@ def FinishSend_int(self, serialized_tx, amount_to_send): except Exception as e: err_msg = "Exception in FinishSend" - printException(getCallerName(), getFunctionName(), err_msg, e.args) + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{err_msg}", f"{e.args}") # Activated by signal sigTxabort from hwdevice def AbortSend(self): @@ -500,8 +500,8 @@ def updateSelection(self, clicked_item=None): # update suggested fee and selected rewards estimatedTxSize = (44 + numOfInputs * 148) * 1.0 / 1000 # kB self.suggestedFee = round(self.feePerKb * estimatedTxSize, 8) - printDbg("estimatedTxSize is %s kB" % str(estimatedTxSize)) - printDbg("suggested fee is %s PIV (%s PIV/kB)" % (str(self.suggestedFee), str(self.feePerKb))) + printDbg(f"estimatedTxSize is {estimatedTxSize} kB") + printDbg(f"suggested fee is {self.suggestedFee} PIV ({self.feePerKb} PIV/kB)") self.ui.selectedRewardsLine.setText(str(round(total / 1e8, 8))) @@ -512,7 +512,7 @@ def updateSelection(self, clicked_item=None): def update_loading_utxos(self, percent): if percent < 100: - self.ui.resetStatusLabel('Checking explorer... %d%%' % percent) + self.ui.resetStatusLabel(f'Checking explorer... {percent}%') else: self.display_mn_utxos() @@ -523,4 +523,4 @@ def updateTotalBalance(self, rewards): nAmount = nAmount + utxo['satoshis'] totalBalance = str(round(nAmount / 1e8, 8)) - self.ui.addrAvailLine.setText("%s PIVs" % totalBalance) + self.ui.addrAvailLine.setText(f"{totalBalance} PIVs") diff --git a/src/tests/testHwDeviceMethods.py b/src/tests/testHwDeviceMethods.py index 356ef06..7848e47 100644 --- a/src/tests/testHwDeviceMethods.py +++ b/src/tests/testHwDeviceMethods.py @@ -112,7 +112,7 @@ def test_signature(self): # verify with rpc client result = self.rpcClient.verifyMessage(pivx_address, signature, message) - print("sig = %s\naddress=%s" % (signature, pivx_address)) + print(f"sig = {signature}\naddress={pivx_address}") self.assertTrue(result) # ----------------------------------------------------------------------------------- @@ -178,14 +178,14 @@ def signTx(self, device, bip32_path, utxos_to_spend, dest_address, tx_fee, rawtr raw_tx = bytearray.fromhex(rawtransactions[utxo['tx_hash']]) if not raw_tx: - raise Exception("Can't find raw transaction for txid: " + rawtransactions[utxo['tx_hash']]) + raise Exception(f"Can't find raw transaction for txid: {rawtransactions[utxo['tx_hash']]}") # parse the raw transaction, so that we can extract the UTXO locking script we refer to prev_transaction = bitcoinTransaction(raw_tx) utxo_tx_index = utxo['tx_ouput_n'] if utxo_tx_index < 0 or utxo_tx_index > len(prev_transaction.outputs): - raise Exception('Incorrect value of outputIndex for UTXO %s' % str(idx)) + raise Exception(f'Incorrect value of outputIndex for UTXO {idx}') trusted_input = self.device.chip.getTrustedInput(prev_transaction, utxo_tx_index) self.trusted_inputs.append(trusted_input) @@ -197,8 +197,8 @@ def signTx(self, device, bip32_path, utxos_to_spend, dest_address, tx_fee, rawtr if pubkey_hash != pubkey_hash_from_script: text = "Error: different public key hashes for the BIP32 path and the UTXO" text += "locking script. Your signed transaction will not be validated by the network.\n" - text += "pubkey_hash: %s\n" % str(pubkey_hash) - text += "pubkey_hash_from_script: %s\n" % str(pubkey_hash_from_script) + text += f"pubkey_hash: {pubkey_hash}\n" + text += f"pubkey_hash_from_script: {pubkey_hash_from_script}\n" print(text) self.arg_inputs.append({ diff --git a/src/tests/testPivx_b58Methods.py b/src/tests/testPivx_b58Methods.py index a3968bf..8cd26ac 100644 --- a/src/tests/testPivx_b58Methods.py +++ b/src/tests/testPivx_b58Methods.py @@ -14,20 +14,20 @@ class TestPivx_b58Methods(unittest.TestCase): def test_encodeDecode(self): # get 32 random bytes text = self.randomBytesString(32) - print("\nRandom Bytes: %s" % text.hex()) + print(f"\nRandom Bytes: {text.hex()}") # encode base58 encoded_text = b58encode(text) - print("\nEncoded Text: %s\n" % encoded_text) + print(f"\nEncoded Text: {encoded_text}\n") # verify self.assertEqual(b58decode(encoded_text), text) def test_decodeEncode(self): # get 10 random base58 chars text = self.randomB58String(10) - print("\nRandom Text: %s" % text) + print(f"\nRandom Text: {text}") # decode base58 decoded_text = b58decode(text) - print("\nDecoded Text: %s\n" % decoded_text) + print(f"\nDecoded Text: {decoded_text}\n") # verify self.assertEqual(b58encode(decoded_text), text) diff --git a/src/threads.py b/src/threads.py index 3a22896..4a9cb8a 100644 --- a/src/threads.py +++ b/src/threads.py @@ -44,7 +44,7 @@ def on_thread_finished_int(thread_arg, on_thread_finish_arg, skip_raise_exceptio # starting thread from another thread causes an issue of not passing arguments' # values to on_thread_finished_int function, so on_thread_finish is not called st = traceback.format_stack() - print('Running thread from inside another thread. Stack: \n' + ''.join(st)) + print(r'Running thread from inside another thread. Stack: \n' + '\n'.join(st)) thread = WorkerThread(worker_fun=worker_fun, worker_fun_args=worker_fun_args) diff --git a/src/trezorClient.py b/src/trezorClient.py index f3232ae..543a548 100644 --- a/src/trezorClient.py +++ b/src/trezorClient.py @@ -40,7 +40,7 @@ def process_trezor_exceptions_int(*args, **kwargs): return except Exception as e: err_mess = "Trezor Exception" - printException(getCallerName(True), getFunctionName(True), err_mess, str(e)) + printException(f"{getCallerName(True)}", f"{getFunctionName(True)}", f"{err_mess}", f"{e}") raise DisconnectedException(str(e), hwDevice) return process_trezor_exceptions_int @@ -81,7 +81,7 @@ def __init__(self, model, main_wnd, *args, **kwargs): @process_trezor_exceptions def append_inputs_to_TX(self, utxo, bip32_path, inputs): if utxo['staker'] != "": - printException(getCallerName(), getFunctionName(), "Unable to sing P2CS on Trezor", "") + printException(f"{getCallerName()}", f"{getFunctionName()}", "Unable to sing P2CS on Trezor", "") return # Update amount self.amount += int(utxo['satoshis']) @@ -129,23 +129,17 @@ def initDevice(self): self.client = TrezorClient(d, ui) except IOError: raise Exception("TREZOR device is currently in use") - printOK("Trezor HW device connected [v. %s.%s.%s]" % ( - self.client.features.major_version, - self.client.features.minor_version, - self.client.features.patch_version) - ) + printOK(f"Trezor HW device connected [v.{self.client.features.major_version}.{self.client.features.minor_version}.{self.client.features.patch_version}]") self.status = 1 model = self.client.features.model or "1" if not self.checkModel(model): self.status = 3 - self.messages[3] = "Wrong device model (%s) detected.\nLooking for model %s." % ( - HW_devices[self.model][0], model - ) + self.messages[3] = f"Wrong device model ({HW_devices[self.model][0]}) detected.\nLooking for model {model}." return required_version = MINIMUM_FIRMWARE_VERSION[model] - printDbg("Current version is %s (minimum required: %s)" % (str(self.client.version), str(required_version))) + printDbg(f"Current version is {self.client.version} (minimum required: {required_version})") # Check device is unlocked - bip32_path = parse_path(MPATH + "%d'/0/%d" % (0, 0)) + bip32_path = parse_path(MPATH + f"{0}'/0/{0}") _ = btc.get_address(self.client, 'PIVX', bip32_path, False) self.status = 2 @@ -229,9 +223,9 @@ def prepare_transfer_tx_bulk(self, caller, rewardsArray, dest_address, tx_fee, i self.mBox2 = QMessageBox(caller) self.messageText = "

Signing transaction...

" # messageText += "From bip32_path: %s

" % str(bip32_path) - self.messageText += "

Payment to:
%s

" % dest_address - self.messageText += "

Net amount:
%s PIV

" % str(round(self.amount / 1e8, 8)) - self.messageText += "

Fees:
%s PIV

" % str(round(int(tx_fee) / 1e8, 8)) + self.messageText += f"

Payment to:
{dest_address}

" + self.messageText += f"

Net amount:
{round(self.amount / 1e8, 8)} PIV

" + self.messageText += f"

Fees:
{round(int(tx_fee) / 1e8, 8)} PIV

" messageText = self.messageText + "Signature Progress: 0 %" self.mBox2.setText(messageText) self.setBoxIcon(self.mBox2, caller) @@ -246,17 +240,17 @@ def prepare_transfer_tx_bulk(self, caller, rewardsArray, dest_address, tx_fee, i def scanForAddress(self, account, spath, isTestnet=False): with self.lock: if not isTestnet: - curr_path = parse_path(MPATH + "%d'/0/%d" % (account, spath)) + curr_path = parse_path(MPATH + f"{account}'/0/{spath}") curr_addr = btc.get_address(self.client, 'PIVX', curr_path, False) else: - curr_path = parse_path(MPATH_TESTNET + "%d'/0/%d" % (account, spath)) + curr_path = parse_path(MPATH_TESTNET + f"{account}'/0/{spath}") curr_addr = btc.get_address(self.client, 'PIVX Testnet', curr_path, False) return curr_addr @process_trezor_exceptions def scanForPubKey(self, account, spath, isTestnet=False): - hwpath = "%d'/0/%d" % (account, spath) + hwpath = f"{account}'/0/{spath}" if isTestnet: path = MPATH_TESTNET + hwpath else: @@ -281,8 +275,7 @@ def signMess(self, caller, hwpath, message, isTestnet=False): path = MPATH + hwpath # Connection pop-up self.mBox = QMessageBox(caller) - messageText = "Check display of your hardware device\n\n- message:\n\n%s\n\n-path:\t%s\n" % ( - splitString(message, 32), path) + messageText = f"Check display of your hardware device\n\n- message:\n\n{splitString(message, 32)}\n\n-path:\t{path}\n" self.mBox.setText(messageText) self.setBoxIcon(self.mBox, caller) self.mBox.setWindowTitle("CHECK YOUR TREZOR") @@ -342,7 +335,7 @@ def updateSigProgress(self, percent): t = self.mBox2.text() messageText = t + "
Please confirm action on your Trezor device..." else: - messageText = self.messageText + "Signature Progress: " + str(percent) + " %" + messageText = self.messageText + f"Signature Progress: {percent}%" self.mBox2.setText(messageText) QApplication.processEvents() @@ -407,7 +400,7 @@ def copy_tx_meta(tx): idx = res.serialized.signature_index sig = res.serialized.signature if signatures[idx] is not None: - raise ValueError("Signature for index %d already filled" % idx) + raise ValueError(f"Signature for index {idx} already filled") signatures[idx] = sig # emit completion percent percent = 10 + int(90 * (idx+1) / len(signatures)) @@ -479,7 +472,7 @@ def get_pin(self, code=None) -> str: else: desc = "PIN" - pin = ask_for_pin_callback("Please enter {}".format(desc)) + pin = ask_for_pin_callback(f"Please enter {desc}") if pin is None: raise exceptions.Cancelled return pin diff --git a/src/utils.py b/src/utils.py index 000eefd..5310339 100644 --- a/src/utils.py +++ b/src/utils.py @@ -62,7 +62,7 @@ def compose_tx_locking_script(dest_address, isTestnet): """ pubkey_hash = bytearray.fromhex(b58check_to_hex(dest_address)) # convert address to a public key hash if len(pubkey_hash) != 20: - raise Exception('Invalid length of the public key hash: ' + str(len(pubkey_hash))) + raise Exception(f'Invalid length of the public key hash: {len(pubkey_hash)}') if (((not isTestnet) and (dest_address[0] in P2PKH_PREFIXES)) or (isTestnet and (dest_address[0] in P2PKH_PREFIXES_TNET))): @@ -81,7 +81,7 @@ def compose_tx_locking_script(dest_address, isTestnet): pubkey_hash + \ OP_EQUAL else: - mess = 'Invalid dest address prefix: ' + dest_address[0] + mess = f'Invalid dest address prefix: {dest_address[0]}' if isTestnet: mess += ' for testnet' raise Exception(mess) @@ -151,7 +151,7 @@ def extract_pkh_from_locking_script(script): elif IsPayToColdStaking(script): return script[28:48] - raise Exception('Non-standard locking script type (should be P2PKH or P2PK). len is %d' % len(script)) + raise Exception(f'Non-standard locking script type (should be P2PKH or P2PK). len is {len(script)}') def IsPayToColdStaking(script): @@ -181,7 +181,7 @@ def ipmap(ip, port): vchAddr = base64.b32decode(ip[0:-6], True) vchAddrBytes = vchOnionPrefix + vchAddr[:32] if len(vchAddr) != 35 and vchAddr[-1] != b'\x03': - raise Exception('Invalid TorV3 address %s' % str(ip)) + raise Exception(f'Invalid TorV3 address {ip}') return vchAddrBytes.hex() + int(port).to_bytes(2, byteorder='big').hex() ipAddr = ip_address(ip) @@ -198,16 +198,16 @@ def ipmap(ip, port): ipv6map += a else: - raise Exception("invalid version number (%d)" % ipAddr.version) + raise Exception(f"invalid version number ({ipAddr.version})") ipv6map += int(port).to_bytes(2, byteorder='big').hex() if len(ipv6map) != 36: - raise Exception("Problems! len is %d" % len(ipv6map)) + raise Exception(f"Problems! len is {len(ipv6map)}") return ipv6map except Exception as e: - err_msg = "error in ipmap" - printException(getCallerName(), getFunctionName(), err_msg, e.args) + err_msg = f"error in ipmap" + printException(f"{getCallerName()}", f"{getFunctionName()}", f"{err_msg}", f"{e.args}") def num_to_varint(a): @@ -248,19 +248,19 @@ def serialize_input_str(tx, prevout_n, sequence, script_sig): Based on project: https://github.com/chaeplin/dashmnb. """ s = ['CTxIn('] - s.append('COutPoint(%s, %s)' % (tx, prevout_n)) + s.append(f'COutPoint({tx}, {prevout_n})') s.append(', ') if tx == '00' * 32 and prevout_n == 0xffffffff: - s.append('coinbase %s' % script_sig) + s.append(f'coinbase {script_sig}') else: script_sig2 = script_sig if len(script_sig2) > 24: script_sig2 = script_sig2[0:24] - s.append('scriptSig=%s' % script_sig2) + s.append(f'scriptSig={script_sig2}') if sequence != 0xffffffff: - s.append(', nSequence=%d' % sequence) + s.append(f', nSequence={sequence}') s.append(')') return ''.join(s) diff --git a/src/workerThread.py b/src/workerThread.py index fcbebb9..473945e 100644 --- a/src/workerThread.py +++ b/src/workerThread.py @@ -44,5 +44,5 @@ def run(self): try: self.worker_result = self.worker_fun(self.ctrl_obj, *self.worker_fun_args) except Exception as e: - printError("worker thread", "run", str(e)) + printError("worker thread", "run", f"{e}") self.stop()