job_caster.py
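
"""Broadcast a master's pending jobs over the local network and let workers
discover and claim them.

JobServer (master side) announces each pending partition as a service and
serves it through a zerorpc handler; JobDiscover (worker side) watches for
announced jobs, claims one with `take`, and deserializes the partition.
"""
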
import atexit
import logging
import threading
from collections import deque

import gevent
import zerorpc

from colors import warn, error
from broadcast import Service, Discover, Broadcaster
from helper import get_my_ip, get_open_port, get_zerorpc_address, load

__author__ = 'ranmocy'

_JOB_CASTER_TYPE = '_spark.job.'

logger = logging.getLogger(__name__)


class JobFinished(Exception):
    """The requested job has already finished."""


class JobTaken(Exception):
    """The requested job was already taken by another worker."""


class JobServerHandler(object):
    """zerorpc handler that hands out jobs from the shared `services` dict."""

    def __init__(self, services, address=None):
        # Do not compute the address in the default argument: it would be
        # evaluated only once, at class definition time, and every handler
        # created without an explicit address would share the same port.
        if address is None:
            address = get_zerorpc_address(port=get_open_port())
        self.services = services
        self.address = address
        self.server = zerorpc.Server(self)
        self.server.bind(self.address)
        self.thread = gevent.spawn(self.server.run)
        logger.info("Job server started at " + self.address)
        self.lock = threading.Lock()

    def __del__(self):
        self.thread.kill()

    def take(self, uuid):
        try:
            # self.lock.acquire()
            if uuid not in self.services:  # job is already finished
                print 'finished job', uuid
                raise JobFinished()
            service = self.services[uuid]
            if not service.is_active():  # job is already taken
                print 'taken job'
                raise JobTaken()
            # Deactivate the service so multiple workers cannot take the same job.
            service.deactivate()
            logger.debug("Deactivated job:" + service.name)
            # Set a timer once the job is handed out: if it is still unfinished
            # after the timeout, broadcast it again because that worker is dead
            # or too slow.
            services = self.services

            def reactivate():
                if service.partition.uuid in services:
                    # Still not finished: offer the job again.
                    service.activate()
                    logger.debug("Reactivated job:" + service.name)

            gevent.spawn_later(10, reactivate)
            logger.debug("Return job:" + service.name)
            return service.partition.dump()
        finally:
            # self.lock.release()
            pass
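
# Client-side sketch (illustration only): a worker claims a job by calling
# `take` over zerorpc and gets back the serialized partition. JobTaken and
# JobFinished surface on the client as zerorpc.RemoteError matched by name,
# exactly as JobDiscover.get_partition_from_job does below. The address and
# job_uuid placeholders are assumed to come from discovery.
#
#     client = zerorpc.Client()
#     client.connect("tcp://<job-server-ip>:<port>")
#     try:
#         obj_str = client.take(job_uuid)
#     except zerorpc.RemoteError as e:
#         if e.name in (JobTaken.__name__, JobFinished.__name__):
#             pass  # someone else took it first, or it is already done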


class JobServer(Broadcaster):
    """Master-side broadcaster: announces pending jobs and serves them over zerorpc."""

    def __init__(self):
        super(JobServer, self).__init__(name='Spark.JobServer')
        self.ip = get_my_ip()
        self.port = get_open_port()
        self.address = get_zerorpc_address(port=self.port)
        self.jobs = {}  # uuid => service
        self.handler = JobServerHandler(self.jobs, address=self.address)
        atexit.register(lambda: self.__del__())

    def __del__(self):
        super(JobServer, self).__del__()
        self.handler.__del__()

    def add(self, partition):
        uuid = partition.uuid
        if uuid in self.jobs:
            logger.warning('duplicate job service:' + uuid)
            return
        service = Service(name=uuid, type=_JOB_CASTER_TYPE, location=self.ip, port=self.port)
        service.partition = partition  # attach the partition so the handler can serve it
        self.jobs[uuid] = service
        super(JobServer, self).add(service)

    def remove(self, partition):
        uuid = partition.uuid
        if uuid in self.jobs:
            service = self.jobs[uuid]
            del self.jobs[uuid]
            super(JobServer, self).remove(service)
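
# Master-side sketch (illustration only): `partition` stands for the project's
# Partition objects, which are assumed to expose a `uuid` attribute and a
# `dump()` method and are defined elsewhere.
#
#     server = JobServer()
#     server.add(partition)     # announce the job so workers can discover it
#     ...                       # wait for the worker's result
#     server.remove(partition)  # stop offering the job once it is finished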


class JobDiscover(Discover):
    """Worker-side discoverer: watches for broadcast jobs and fetches their partitions."""

    def __init__(self):
        self.queue = deque()
        self.partition_cache = {}

        def found_func(seeker, result):
            if result.uuid in self.results:
                return
            self.results[result.uuid] = {result}
            self.queue.append(result)
            logger.info("Found " + result.type + ":" + result.sname + " at " + result.address)

        super(JobDiscover, self).__init__(type=_JOB_CASTER_TYPE, found_func=found_func)

    def get_partition_from_job(self, job):
        if job.uuid in self.partition_cache:
            return self.partition_cache[job.uuid]
        try:
            client = zerorpc.Client()
            client.connect(job.address)
            obj_str = client.take(job.uuid)
            if obj_str is None:
                raise Exception("get_partition_from_job: take() returned None.")
        except zerorpc.RemoteError as e:
            if e.name == JobTaken.__name__:
                print warn('Remote job is taken. Skip.')
            elif e.name == JobFinished.__name__:
                print warn('Remote job is finished. Skip.')
            else:
                print error('Remote error at getting partition. Skip.')
            return None
        except zerorpc.LostRemote:
            print error('Lost remote at getting partition. Skip.')
            return None
        else:
            logger.info('take job:' + job.address)
            partition = load(obj_str)
            self.partition_cache[job.uuid] = partition
            return partition

    def take_next_job(self):
        # Block until a live (not yet removed) job is available in the queue.
        while True:
            try:
                result = self.queue.popleft()
                if result.uuid not in self.results:
                    # Outdated result: the job was removed after being queued.
                    print 'Job outdated:' + result.uuid
                    gevent.sleep(0.5)
                    continue
            except IndexError:
                gevent.sleep(0.5)
                continue
            else:
                return result

    def suspend_job(self, result):
        # Put the job back in the queue after a short delay so it can be retried.
        gevent.spawn_later(1, self.queue.append, result)
        # self.queue.append(result)
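

if __name__ == '__main__':
    # Worker-loop sketch (illustration only): it assumes a JobServer elsewhere
    # on the network is broadcasting jobs, and that the actual computation on a
    # partition is handled by code outside this module.
    logging.basicConfig(level=logging.INFO)
    discover = JobDiscover()
    while True:
        job = discover.take_next_job()                    # blocks until a job is announced
        partition = discover.get_partition_from_job(job)  # claim the job over zerorpc
        if partition is None:
            # Already taken/finished, or the remote is unreachable: retry later.
            discover.suspend_job(job)
            continue
        print 'got partition', partition.uuid
        # ... run the actual computation on `partition` here ...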