From 73d9e4f07d693445546072268b5dadf44687be73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9my=20JOURDIN?= Date: Thu, 16 Feb 2017 14:43:06 +0100 Subject: [PATCH 1/2] Distribute chunks over slaves --- prt.py | 132 +++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 86 insertions(+), 46 deletions(-) diff --git a/prt.py b/prt.py index ae39332..e8458ba 100644 --- a/prt.py +++ b/prt.py @@ -19,6 +19,7 @@ import urllib import urllib2 import uuid +import Queue from distutils.spawn import find_executable @@ -90,6 +91,8 @@ def colored(msg, *args): NEW_TRANSCODER_NAME = "plex_transcoder" ORIGINAL_TRANSCODER_NAME = "Plex Transcoder" +SEGMENTS_PER_NODE = 5 + REMOTE_ARGS = ("%(env)s;" "cd %(working_dir)s;" "%(command)s %(args)s") @@ -303,6 +306,25 @@ def transcode_local(): if output and is_debug: log.debug(output.strip('\n')) +def get_available_remote_servers(): + config = get_config() + servers = config["servers"] + + for hostname, host in servers.items(): + log.debug("Getting load for host '%s'" % hostname) + host["load"] = get_system_load_remote(hostname, host["port"], host["user"]) + + if not host["load"]: + # If no load is returned, then it is likely that the host + # is offline or unreachable + log.debug("Couldn't get load for host '%s'" % hostname) + del servers[hostname] + continue + + log.debug("Available servers : %s\n" % servers) + + return servers + def transcode_remote(): setup_logging() @@ -356,54 +378,73 @@ def transcode_remote(): except Exception, e: log.error("Error retreiving host list via '%s': %s" % (config["servers_script"], str(e))) - hostname, host = None, None - - # Let's try to load-balance - min_load = None - for hostname, host in servers.items(): - - log.debug("Getting load for host '%s'" % hostname) - load = get_system_load_remote(hostname, host["port"], host["user"]) - - if not load: - # If no load is returned, then it is likely that the host - # is offline or unreachable - log.debug("Couldn't get load for host '%s'" % hostname) + command = command.replace("127.0.0.1", config["ipaddress"]).split(' ') + segment_time = int(command[command.index("-segment_time") + 1]) + ss = int(command[command.index("-ss") + 1]) + start_segment = int(command[command.index("-segment_start_number") + 1]) + q = Queue.Queue() + init = True + just_finished = False + transcoding_servers = [] + consecutive_errors = 0 + log.info("Initializing distributed trancode %s" % command) + log.info("Segment time: %s" % segment_time) + + while (consecutive_errors < 5) and (q.empty() is False or init is True or just_finished is True): + log.info("Fetching available servers") + available_servers = get_available_remote_servers() + just_finished = False + + for hostname, host in available_servers.items(): + log.info("Checking server %s" % hostname) + if hostname in transcoding_servers: + log.info("Server already transcoding a segment") + continue + + log.info("Starting trancoder with segment %s" % str(start_segment)) + proc = process_segment(host, hostname, start_segment, segment_time, ss, command) + transcoding_servers.append(hostname) + start_segment+=SEGMENTS_PER_NODE + ss+=segment_time*SEGMENTS_PER_NODE + q.put((proc, hostname)) + + if init is True: + log.info("Distributed transcode initialized") + init = False continue - log.debug("Log for '%s': %s" % (hostname, str(load))) - - # XXX: Use more that just 1-minute load? - if min_load is None or min_load[1] > load[0]: - min_load = (hostname, load[0],) + proc, hostname = q.get() + log.info("Checking if %s finished transcode" % hostname) + code = proc.poll() - if min_load is None: - log.info("No hosts found...using local") - return transcode_local() - - # Select lowest-load host - log.info("Host with minimum load is '%s'" % min_load[0]) - hostname, host = min_load[0], servers[min_load[0]] + if code is None: + q.put((proc, hostname)) + else: + log.info("%s finished transcode" % hostname) + just_finished = True + if code == 1: + consecutive_errors += 1 + log.info("Transcode returned an error (%s)" % str(consecutive_errors)) + else: + consecutive_errors = 0 - log.info("Using transcode host '%s'" % hostname) + transcoding_servers.remove(hostname) - # Remap the 127.0.0.1 reference to the proper address - command = command.replace("127.0.0.1", config["ipaddress"]) + time.sleep(.300) - # - # TODO: Remap file-path to PMS URLs - # + log.info("Transcode finished") - args = ["ssh", "%s@%s" % (host["user"], hostname), "-p", host["port"]] + [command] +def process_segment (host, hostname, segment, time, ss, command): + command = ["ssh", "%s@%s" % (host["user"], hostname), "-p", host["port"]] + command + cmd = [] - log.info("Launching transcode_remote with args %s\n" % args) + command[command.index("-segment_start_number") + 1] = str(segment) + command[command.index("-ss") + 1] = str(ss) + command.insert(command.index("-i"), "-t") + command.insert(command.index("-i"), str(int(command[command.index("-segment_time") + 1]) * SEGMENTS_PER_NODE)) # Spawn the process - proc = subprocess.Popen(args) - proc.wait() - - log.info("Transcode stopped on host '%s'" % hostname) - + return subprocess.Popen(command) def re_get(regex, string, group=0, default=None): match = regex.search(string) @@ -597,13 +638,13 @@ def usage(): print "Usage:\n" print " %s [options]\n" % os.path.basename(sys.argv[0]) print ( - "Options:\n\n" - " usage, help, -h, ? Show usage page\n" - " get_load Show the load of the system\n" - " get_cluster_load Show the load of all systems in the cluster\n" - " install Install PRT for the first time and then sets up configuration\n" - " overwrite Fix PRT after PMS has had a version update breaking PRT\n" - " add_host Add an extra host to the list of slaves PRT is to use\n" + "Options:\n\n" + " usage, help, -h, ? Show usage page\n" + " get_load Show the load of the system\n" + " get_cluster_load Show the load of all systems in the cluster\n" + " install Install PRT for the first time and then sets up configuration\n" + " overwrite Fix PRT after PMS has had a version update breaking PRT\n" + " add_host Add an extra host to the list of slaves PRT is to use\n" " remove_host Removes a host from the list of slaves PRT is to use\n" " sessions Display current sessions\n" " check_config Checks the current configuration for errors\n") @@ -707,4 +748,3 @@ def main(): else: usage() sys.exit(-1) - From ab2dc3bceb2db4c4d8eb981ef55fd67546da99b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9my=20JOURDIN?= Date: Mon, 20 Feb 2017 10:12:29 +0100 Subject: [PATCH 2/2] Fix get_available_remote_servers compatibility + Lint --- prt.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/prt.py b/prt.py index e8458ba..9e3b58d 100644 --- a/prt.py +++ b/prt.py @@ -131,6 +131,7 @@ def printf(message, *args, **kwargs): sys.stdout.write(colored(message % args, color, attrs=attrs)) sys.stdout.flush() + def get_auth_token(): url = "https://plex.tv/users/sign_in.json" payload = urllib.urlencode({ @@ -251,6 +252,7 @@ def overwrite_transcoder_after_upgrade(): print "Transcoder hasn't been previously installed, please use install option" sys.exit(1) + def build_env(host=None): # TODO: This really should be done in a way that is specific to the target # in the case that the target is a different architecture than the host @@ -306,9 +308,11 @@ def transcode_local(): if output and is_debug: log.debug(output.strip('\n')) + def get_available_remote_servers(): config = get_config() servers = config["servers"] + available_servers = {} for hostname, host in servers.items(): log.debug("Getting load for host '%s'" % hostname) @@ -318,12 +322,14 @@ def get_available_remote_servers(): # If no load is returned, then it is likely that the host # is offline or unreachable log.debug("Couldn't get load for host '%s'" % hostname) - del servers[hostname] continue - log.debug("Available servers : %s\n" % servers) + available_servers[hostname] = host + + log.debug("Available servers : %s\n" % available_servers) + + return available_servers - return servers def transcode_remote(): setup_logging() @@ -408,7 +414,7 @@ def transcode_remote(): ss+=segment_time*SEGMENTS_PER_NODE q.put((proc, hostname)) - if init is True: + if init: log.info("Distributed transcode initialized") init = False continue @@ -434,6 +440,7 @@ def transcode_remote(): log.info("Transcode finished") + def process_segment (host, hostname, segment, time, ss, command): command = ["ssh", "%s@%s" % (host["user"], hostname), "-p", host["port"]] + command cmd = [] @@ -446,6 +453,7 @@ def process_segment (host, hostname, segment, time, ss, command): # Spawn the process return subprocess.Popen(command) + def re_get(regex, string, group=0, default=None): match = regex.search(string) if match: @@ -456,6 +464,7 @@ def re_get(regex, string, group=0, default=None): return match.groups() return default + def et_get(node, attrib, default=None): if node is not None: return node.attrib.get(attrib, default) @@ -478,6 +487,7 @@ def get_plex_sessions(auth_token=None): } return sessions + def get_sessions(): sessions = {} @@ -528,6 +538,7 @@ def get_sessions(): sessions[m.groups()[0]] = data return sessions + def check_config(): """ Run through various diagnostic checks to see if things are configured