Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distribute chunks over slaves #53

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 86 additions & 46 deletions prt.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import urllib
import urllib2
import uuid
import Queue

from distutils.spawn import find_executable

Expand Down Expand Up @@ -90,6 +91,8 @@ def colored(msg, *args):
NEW_TRANSCODER_NAME = "plex_transcoder"
ORIGINAL_TRANSCODER_NAME = "Plex Transcoder"

SEGMENTS_PER_NODE = 5

REMOTE_ARGS = ("%(env)s;"
"cd %(working_dir)s;"
"%(command)s %(args)s")
Expand Down Expand Up @@ -303,6 +306,25 @@ def transcode_local():
if output and is_debug:
log.debug(output.strip('\n'))

def get_available_remote_servers():
config = get_config()
servers = config["servers"]

for hostname, host in servers.items():
log.debug("Getting load for host '%s'" % hostname)
host["load"] = get_system_load_remote(hostname, host["port"], host["user"])

if not host["load"]:
# If no load is returned, then it is likely that the host
# is offline or unreachable
log.debug("Couldn't get load for host '%s'" % hostname)
del servers[hostname]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that you're able to remove elements from a dictionary whilst iterating over it.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

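This will work in Python 2.7, but not in Python 3.x. In Python 2.7, dict.items() returns a new list of (key, value) pairs, so deleting from the dictionary inside the loop is safe. In Python 3.x, dict.items() returns a live view of the dictionary, and deleting an entry while iterating over it raises "RuntimeError: dictionary changed size during iteration". Iterating over list(servers.items()) works in both versions.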

Copy link
Author

@JJK801 JJK801 Feb 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, thanks. I'm not a Python developer at all; I will fix it ASAP.

continue

log.debug("Available servers : %s\n" % servers)

return servers

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PEP8: 2 blank lines in between functions please

def transcode_remote():
setup_logging()

Expand Down Expand Up @@ -356,54 +378,73 @@ def transcode_remote():
except Exception, e:
log.error("Error retreiving host list via '%s': %s" % (config["servers_script"], str(e)))

hostname, host = None, None

# Let's try to load-balance
min_load = None
for hostname, host in servers.items():

log.debug("Getting load for host '%s'" % hostname)
load = get_system_load_remote(hostname, host["port"], host["user"])

if not load:
# If no load is returned, then it is likely that the host
# is offline or unreachable
log.debug("Couldn't get load for host '%s'" % hostname)
command = command.replace("127.0.0.1", config["ipaddress"]).split(' ')
segment_time = int(command[command.index("-segment_time") + 1])
ss = int(command[command.index("-ss") + 1])
start_segment = int(command[command.index("-segment_start_number") + 1])
q = Queue.Queue()
init = True
just_finished = False
transcoding_servers = []
consecutive_errors = 0
log.info("Initializing distributed trancode %s" % command)
log.info("Segment time: %s" % segment_time)

while (consecutive_errors < 5) and (q.empty() is False or init is True or just_finished is True):
log.info("Fetching available servers")
available_servers = get_available_remote_servers()
just_finished = False

for hostname, host in available_servers.items():
log.info("Checking server %s" % hostname)
if hostname in transcoding_servers:
log.info("Server already transcoding a segment")
continue

log.info("Starting trancoder with segment %s" % str(start_segment))
proc = process_segment(host, hostname, start_segment, segment_time, ss, command)
transcoding_servers.append(hostname)
start_segment+=SEGMENTS_PER_NODE
ss+=segment_time*SEGMENTS_PER_NODE
q.put((proc, hostname))

if init is True:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use `if init:` here instead of `if init is True:`.

log.info("Distributed transcode initialized")
init = False
continue

log.debug("Log for '%s': %s" % (hostname, str(load)))

# XXX: Use more that just 1-minute load?
if min_load is None or min_load[1] > load[0]:
min_load = (hostname, load[0],)
proc, hostname = q.get()
log.info("Checking if %s finished transcode" % hostname)
code = proc.poll()

if min_load is None:
log.info("No hosts found...using local")
return transcode_local()

# Select lowest-load host
log.info("Host with minimum load is '%s'" % min_load[0])
hostname, host = min_load[0], servers[min_load[0]]
if code is None:
q.put((proc, hostname))
else:
log.info("%s finished transcode" % hostname)
just_finished = True
if code == 1:
consecutive_errors += 1
log.info("Transcode returned an error (%s)" % str(consecutive_errors))
else:
consecutive_errors = 0

log.info("Using transcode host '%s'" % hostname)
transcoding_servers.remove(hostname)

# Remap the 127.0.0.1 reference to the proper address
command = command.replace("127.0.0.1", config["ipaddress"])
time.sleep(.300)

#
# TODO: Remap file-path to PMS URLs
#
log.info("Transcode finished")

args = ["ssh", "%s@%s" % (host["user"], hostname), "-p", host["port"]] + [command]
def process_segment (host, hostname, segment, time, ss, command):
command = ["ssh", "%s@%s" % (host["user"], hostname), "-p", host["port"]] + command
cmd = []

log.info("Launching transcode_remote with args %s\n" % args)
command[command.index("-segment_start_number") + 1] = str(segment)
command[command.index("-ss") + 1] = str(ss)
command.insert(command.index("-i"), "-t")
command.insert(command.index("-i"), str(int(command[command.index("-segment_time") + 1]) * SEGMENTS_PER_NODE))

# Spawn the process
proc = subprocess.Popen(args)
proc.wait()

log.info("Transcode stopped on host '%s'" % hostname)

return subprocess.Popen(command)

def re_get(regex, string, group=0, default=None):
match = regex.search(string)
Expand Down Expand Up @@ -597,13 +638,13 @@ def usage():
print "Usage:\n"
print " %s [options]\n" % os.path.basename(sys.argv[0])
print (
"Options:\n\n"
" usage, help, -h, ? Show usage page\n"
" get_load Show the load of the system\n"
" get_cluster_load Show the load of all systems in the cluster\n"
" install Install PRT for the first time and then sets up configuration\n"
" overwrite Fix PRT after PMS has had a version update breaking PRT\n"
" add_host Add an extra host to the list of slaves PRT is to use\n"
"Options:\n\n"
" usage, help, -h, ? Show usage page\n"
" get_load Show the load of the system\n"
" get_cluster_load Show the load of all systems in the cluster\n"
" install Install PRT for the first time and then sets up configuration\n"
" overwrite Fix PRT after PMS has had a version update breaking PRT\n"
" add_host Add an extra host to the list of slaves PRT is to use\n"
" remove_host Removes a host from the list of slaves PRT is to use\n"
" sessions Display current sessions\n"
" check_config Checks the current configuration for errors\n")
Expand Down Expand Up @@ -707,4 +748,3 @@ def main():
else:
usage()
sys.exit(-1)