You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
(Apologies in advance for the poor formatting - I am unable to control why some text appears in bold and why it doesnt sometimes)
I am facing a problem with two streams of map reduce jobs. Making an attempt to explain as clearly as possible.
I have two map reduce streams.
Stream 1 functions : map1 , reduce1
Stream 2 functions: map2, reduce2
The streams themselves need to be scheduled serially stream 1, Stream 2, Stream1 , Stream 2 ………. and so on till all streams terminated
In both cases map will have many parallel jobs, but there is only one instance of a reduce job. I expect many servers to be available for running the map jobs in parallel.
Now here's the catch. Stream 1 has to run after Stream 2. To maximize the server resources I would like to start running map2 instances on nodes or cpus within nodes which have no map1 instances to run - only one node is going to run reduce1.
Another requirement is that reduce1 has to run immediately after the last map1 job has finished. It should not be queue behind the map2 jobs(from stream 2) if they have been submitted.
I realized from the documentation that JobCluster will not suit my purpose as the nodes will execute jobs in order of submission. They would not give priority to reduce1 if map2 jobs have been submitted prior to it.
So decided to try my luck with SharedjobCluster, with dispyscheduler invoked with options - "--cooperative --early_cluster_scheduler"
New clusters are created for each instance of the stream.
I used the dispy api to start clusters for the streams as follows
If my application has to succeed then all the streams will run in sequence and the program will terminate.
But this happens roughly only 50% of the time.
Many a time I see an entire cluster failing on dispyscheduler like this
.
.
2019-09-16 10:12:51 dispyscheduler - New computation 139651049625024: relegate, /tmp/dispy/scheduler/127.0.0.1/relegate_r55abhr2
2019-09-16 10:12:51 dispyscheduler - Submitted job 139651049487120 / 1568608971.6798518
2019-09-16 10:12:51 dispyscheduler - Running job 139651060019872 on 127.0.0.1 (busy: 2 / 2)
2019-09-16 10:12:51 dispyscheduler - Running job 139651049487720 on 127.0.0.1 (busy: 2 / 2)
2019-09-16 10:12:51 dispyscheduler - Submitted job 139651060019632 / 1568608971.6837986
2019-09-16 10:12:51 dispyscheduler - Submitted job 139651049488080 / 1568608971.6853924
2019-09-16 10:12:51 dispyscheduler - Transfer of computation "relegate" to 127.0.0.1 failed
2019-09-16 10:12:51 dispyscheduler - Submitted job 139651049487480 / 1568608971.687945
2019-09-16 10:12:51 dispyscheduler - Failed to setup 127.0.0.1 for computation "relegate": -1
.
.
The cluster fails sometimes with not a single job run. The lines in between "New computation …" and "Failed to setup …" belong to the previous cluster
I look at the displynode output and I correlate it with the following lines
.
.
.
2019-09-16 10:12:51 dispynode - New computation "939c7c52a9b683a0e1be5f74b70397790961dcbf" from 127.0.0.1
2019-09-16 10:12:51 dispynode - Sending result for job 139651049487720 (11)
2019-09-16 10:12:51 dispynode - Sending result for job 139651060019872 (11)
2019-09-16 10:12:51 dispynode - Computation "939c7c52a9b683a0e1be5f74b70397790961dcbf" from 127.0.0.1 done
.
.
.
The "…done" message for the cluster at dispynode with not a single job from it executed. In this case also the lines between "New computation …" and "…done" belong to the previous cluster
Apart from this I sometimes I see dispynode troubles with the following error message sometimes
Exception in thread Thread-2:
Traceback (most recent call last):
File "/home/manoj/anaconda3/lib/python3.7/threading.py", line 917, in _bootstrap_inner
self.run()
File "/home/manoj/anaconda3/lib/python3.7/threading.py", line 865, in run
self._target(*self._args, **self._kwargs)
File "/home/manoj/anaconda3/bin/dispynode.py", line 2327, in __reply_Q
proc.join(2)
File "/home/manoj/anaconda3/lib/python3.7/multiprocessing/process.py", line 139, in join
assert self._popen is not None, 'can only join a started process'
AssertionError: can only join a started process
Some inferences -
The cluster seems to be failing because dispynode is refusing the job. Again this does not happen all the time.
Here it looks like the entire cluster is failing. I would like to be able to catch this exception and resubmit the cluster, but dispy does not seem to have a facility for catching the exception. Even the cluster_status callback function does not give a cluster status per se, but only a job status and the node status. There is no notification even to the callback function observed used for job level callback.
Requests -
I need help with the following
Get an idea why dispynode is failing ocassionally - then I could find a way to prevent it.
Get an exception in cluster_status callback function for the cluster itself - so I can close the cluster and restart it.
Manoj
The text was updated successfully, but these errors were encountered:
Hi,
(Apologies in advance for the poor formatting - I am unable to control why some text appears in bold and why it doesnt sometimes)
I am facing a problem with two streams of map reduce jobs. Making an attempt to explain as clearly as possible.
I have two map reduce streams.
Stream 1 functions : map1 , reduce1
Stream 2 functions: map2, reduce2
The streams themselves need to be scheduled serially stream 1, Stream 2, Stream1 , Stream 2 ………. and so on till all streams terminated
In both cases map will have many parallel jobs, but there is only one instance of a reduce job. I expect many servers to be available for running the map jobs in parallel.
Now here's the catch. Stream 1 has to run after Stream 2. To maximize the server resources I would like to start running map2 instances on nodes or cpus within nodes which have no map1 instances to run - only one node is going to run reduce1.
Another requirement is that reduce1 has to run immediately after the last map1 job has finished. It should not be queue behind the map2 jobs(from stream 2) if they have been submitted.
I realized from the documentation that JobCluster will not suit my purpose as the nodes will execute jobs in order of submission. They would not give priority to reduce1 if map2 jobs have been submitted prior to it.
So decided to try my luck with SharedjobCluster, with dispyscheduler invoked with options - "--cooperative --early_cluster_scheduler"
New clusters are created for each instance of the stream.
I used the dispy api to start clusters for the streams as follows
For - Stream 1, Stream 2, Stream1 , Stream 2, Stream 1, Stream 2 ……
The cluster numbers are - cluster 1, cluster 2, cluster 3, cluster 4, cluster 5, cluster 6 ………..
Each cluster is closed after last job in the stream (the reduce job) completes.
All this is coded into the attached file sched.py sched.zip
I run my setup in 3 terminal windows as follows - everything on my Ubuntu 18.04 virtual machine.
Environment -
(base) manoj@Ubuntu-MSHV-Virtual-Machine:/Documents/PacManSched$ pip show dispy
Name: dispy
Version: 4.11.0
Summary: Distributed and Parallel Computing with/for Python.
Home-page: http://dispy.sourceforge.net
Author: Giridhar Pemmasani
Author-email: [email protected]
License: Apache 2.0
Location: /home/manoj/anaconda3/lib/python3.7/site-packages
Requires: pycos
Required-by:
(base) manoj@Ubuntu-MSHV-Virtual-Machine:/Documents/PacManSched$ uname -a
Linux Ubuntu-MSHV-Virtual-Machine 4.15.0-62-generic #69-Ubuntu SMP Wed Sep 4 20:55:53 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux
(base) manoj@Ubuntu-MSHV-Virtual-Machine:/Documents/PacManSched$ python -V
Python 3.7.3
Commands I run -
Terminal 1 : dispynode.py -c 2 -d -i 127.0.0.1 --clean --debug --force_cleanup
Terminal 2 : dispyscheduler.py -i 127.0.0.1 --cooperative --early_cluster_scheduler --clean --debug --cleanup_nodes
Terminal 3 : python sched.py
Observations -
If my application has to succeed then all the streams will run in sequence and the program will terminate.
But this happens roughly only 50% of the time.
Many a time I see an entire cluster failing on dispyscheduler like this
.
.
2019-09-16 10:12:51 dispyscheduler - New computation 139651049625024: relegate, /tmp/dispy/scheduler/127.0.0.1/relegate_r55abhr2
2019-09-16 10:12:51 dispyscheduler - Submitted job 139651049487120 / 1568608971.6798518
2019-09-16 10:12:51 dispyscheduler - Running job 139651060019872 on 127.0.0.1 (busy: 2 / 2)
2019-09-16 10:12:51 dispyscheduler - Running job 139651049487720 on 127.0.0.1 (busy: 2 / 2)
2019-09-16 10:12:51 dispyscheduler - Submitted job 139651060019632 / 1568608971.6837986
2019-09-16 10:12:51 dispyscheduler - Submitted job 139651049488080 / 1568608971.6853924
2019-09-16 10:12:51 dispyscheduler - Transfer of computation "relegate" to 127.0.0.1 failed
2019-09-16 10:12:51 dispyscheduler - Submitted job 139651049487480 / 1568608971.687945
2019-09-16 10:12:51 dispyscheduler - Failed to setup 127.0.0.1 for computation "relegate": -1
.
.
The cluster fails sometimes with not a single job run. The lines in between "New computation …" and "Failed to setup …" belong to the previous cluster
I look at the displynode output and I correlate it with the following lines
.
.
.
2019-09-16 10:12:51 dispynode - New computation "939c7c52a9b683a0e1be5f74b70397790961dcbf" from 127.0.0.1
2019-09-16 10:12:51 dispynode - Sending result for job 139651049487720 (11)
2019-09-16 10:12:51 dispynode - Sending result for job 139651060019872 (11)
2019-09-16 10:12:51 dispynode - Computation "939c7c52a9b683a0e1be5f74b70397790961dcbf" from 127.0.0.1 done
.
.
.
The "…done" message for the cluster at dispynode with not a single job from it executed. In this case also the lines between "New computation …" and "…done" belong to the previous cluster
Apart from this I sometimes I see dispynode troubles with the following error message sometimes
Exception in thread Thread-2:
Traceback (most recent call last):
File "/home/manoj/anaconda3/lib/python3.7/threading.py", line 917, in _bootstrap_inner
self.run()
File "/home/manoj/anaconda3/lib/python3.7/threading.py", line 865, in run
self._target(*self._args, **self._kwargs)
File "/home/manoj/anaconda3/bin/dispynode.py", line 2327, in __reply_Q
proc.join(2)
File "/home/manoj/anaconda3/lib/python3.7/multiprocessing/process.py", line 139, in join
assert self._popen is not None, 'can only join a started process'
AssertionError: can only join a started process
Some inferences -
Requests -
I need help with the following
Manoj
The text was updated successfully, but these errors were encountered: