Memory management in workflow termination #406
Replies: 1 comment
-
I made an error in the last paragraph of the box above where I started this discussion. M/TD is not the right factor for the number of partitions. That formula is not correct even for a single node, as it makes no allowance for the total data set size, which is what controls how partitioning should be done. So, let me break this down a bit more.

Let Ds be the size of the entire data set in bytes and Nds be the number of data objects to be handled in a workflow. Further, let Nw be the number of worker nodes and again let T be the number of cores per node, which I assume dask will use to create worker threads on each node; i.e., I assume that dask will create T Nw worker threads to complete a job made up of a string of map operators. Let D again be the size of the largest data object being run through the chain of map operators, and M the memory size per worker as before.

What I had above is more a measure of the number of data objects each worker node (not thread) can handle without aborting with a memory fault. That quantity, the maximum number of data objects a worker node can handle, is M/(DT). It would be more accurate for modeling to say the maximum number of objects we should let a node try to handle is kM/(DT), where k is some fraction. We need, I think, to design some tests to establish k empirically.

With everything now defined I can maybe get this right. From what I observe when bags are handled by the scheduler, the approach the scheduler seems to use is to send data to workers in blocks of the partition size. I think that is what the documentation says, and it certainly makes sense as a way to handle a large, irregular data set like a bag. With that understanding, kM/(DT) is the number of data objects that each partition should contain. Thus, the number of partitions should be something of the order of Nds divided by that quantity, i.e. (Nds D T)/(k M). Evaluate my simple formula critically and let's discuss this.

Something I can see as useful for the documentation is a graphic showing a large rectangular box made up of squares stacked from left to right to illustrate a data set defined as a bag. Then we would have a set of 3 or 4 smaller rectangular boxes below them representing the memory size of 3 or 4 nodes. The partition size is then visually the number of squares fitting into each node box. I think it is that simple. If we can confirm this simple model is correct and can establish the constant k, then I could easily write documentation on this issue. We will need tests to move forward on this. Something we will need is a cluster memory monitoring tool. I'm not sure dask diagnostics are sufficient, but maybe that is all we need. There is also an issue about whether or not spark behaves the same as dask in handling large data sets.
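To make the arithmetic concrete, here is a minimal sketch of the estimate, taking the per-partition count kM/(DT) at face value and dividing Nds by it to get the partition count. The function names and the default k = 0.5 are placeholders for illustration only; k still has to be established by testing, and the node and data sizes in the example are made up.

```python
import math

def objects_per_partition(M, D, T, k=0.5):
    """Estimate how many data objects one partition should hold: k*M/(D*T).

    M: memory per worker node (bytes), D: size of largest data object (bytes),
    T: threads per worker node, k: safety fraction (placeholder value, not measured).
    """
    # Never return less than one object per partition.
    return max(1, int(k * M / (D * T)))

def number_of_partitions(Nds, M, D, T, k=0.5):
    """Estimate the partition count: Nds divided by objects per partition."""
    return math.ceil(Nds / objects_per_partition(M, D, T, k))

# Made-up example: 64 GiB nodes, 100 MiB objects, 16 threads per node.
M = 64 * 2**30
D = 100 * 2**20
T = 16
Nds = 10000

print(objects_per_partition(M, D, T))      # 20
print(number_of_partitions(Nds, M, D, T))  # 500
```

If this model holds up, the result could be passed as the `npartitions` argument when the bag is created or repartitioned.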
-
This discussion emerged from memory management issues we have encountered with dask in recent tests on very large data sets that caused memory overflows and mysterious behavior. An important result is that using the `compute` method of dask as the terminator of a workflow for a string of map operators on a large bag will always overflow the memory of the scheduler node if the final map output exceeds the memory of the scheduler node. A recommendation of a better approach found here is to use the distributed client `persist` method. Then the results of the final output need only fit in the combined memory of the worker nodes. The above source suggests using `compute` only to gather results from data already distributed through the cluster with `persist`.

Since we understand that fundamental problem now, we need to do several things to make this system usable:
Number 1 requires no discussion. We just need to do it. The only thing is that item 1 is probably the last thing we'll need to do as what is written there will depend upon how we elect to deal with 2 and 3.
For item 2, I suggest we need two things: (a) a new option on database save methods to return something small that summarizes the save result, and (b) a standard function that can be used to return a data structure similar, if not identical, to the optional save return. Let me expand a bit on each of these.
For (a) I suggest a save option parameter should be available for the `save_data` and `save_ensemble_data` methods of `Database`. I suggest it should be a boolean with the name `return_saved_data` and be False by default. With this change a save would behave the same as now only if one set `return_saved_data` to True in the call to the save function. Even though that may break some of our tutorials, it will reduce the occurrence of mysterious memory fault crashes. It would also implicitly tell the user it is a bad idea to do intermediate saves within a sequence of map operators since, from what we now know, load and save are usually the bottleneck to performance. When False I think it should return something simple like the number of samples saved, a boolean indicating the live state of the datum it handled, or some small tuple with several such items; e.g., we might have two booleans: one for live state and one for success or failure of the save. Another might be a pair with the number of samples and number of bytes, using 0 or -1 to indicate dead data and failure respectively. Thoughts?

For (b) I think we need one or more standard functions defined in `mspasspy.db.database`. An initial suggestion is the terminator function I used in the test workflow we've been beating on the past few weeks. It is something like this:

A standard function should be a bit more robust than that, but it illustrates the idea. It would typically be used like this:
Then when the lazy computation is run the return is only a list of booleans which would only overflow scheduler memory in an inconceivably large data set.
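As a rough illustration of the terminator idea, the sketch below reduces each data object to a single boolean. It is not the function from the test workflow: the `Datum` class is a hypothetical stand-in for a data object carrying a boolean `live` flag, and the plain list stands in for the output of the final map operator.

```python
from dataclasses import dataclass

@dataclass
class Datum:
    """Hypothetical stand-in for a data object with a boolean `live` flag."""
    live: bool
    npts: int = 0

def terminator(d):
    """Reduce a (possibly large) data object to a single boolean."""
    # Treat None (e.g. a failed read) the same as dead data.
    return d is not None and bool(d.live)

# Stand-in for the output of the final map operator in a workflow.
results = [Datum(True, 1000), Datum(False), None, Datum(True, 500)]
flags = [terminator(d) for d in results]
print(flags)  # [True, False, False, True]
```

In a dask workflow the same function would be passed as the last `map` call on the bag, e.g. `bag.map(terminator).compute()`, so only the booleans are gathered.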
Finally, item 3 may need some empirical work to flesh out the idea. I know how to get a base estimate from what I've observed watching large jobs run for long periods. Let M be the memory size per worker and D be the approximate size of the largest copy of data objects passing through a workflow (usually the largest time window read on input). Then the number of partitions of the bag needs to be some fraction of M/D to guarantee the scheduler won't push too many copies to a worker. I think it gets a little more complicated when, as is nearly always the case, each worker spawns multiple threads. If T is the number of threads a worker node will spawn, I think the number of partitions needs to be some fraction of M/TD. We need to figure out a test to measure memory use in a real workflow to validate that simple theory. Then we can proceed to produce useful documentation and then some helper tools.