---
output:
  pdf_document: default
  html_document: default
---
# Computation Graphs with `delayed`
_Jeremy Coyle_
## Intro
## Architecture
## Other
## Previous Documentation
R supports a range of options to parallelize computation. For an overview, see
the [HPC Task
View](https://CRAN.R-project.org/view=HighPerformanceComputing) on
CRAN. In general, these options work extremely well for problems that are
_embarrassingly parallel_, in that they support procedures such as parallel
`lapply` calls and parallel `for` loops -- essentially `map` operations.
However, there is no easy way to parallelize _dependent_ tasks in R.
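
To make the distinction concrete, the following sketch uses only the base `parallel` package. The first half is an embarrassingly parallel map; the second half is a small dependent computation in which `total` cannot start until both of its inputs exist. The variable names are illustrative and are not part of any package.

```r
library(parallel)

# Embarrassingly parallel: every call is independent, so the work can be
# split across a two-worker cluster with a parallel map.
cl <- makeCluster(2)
squares <- parLapply(cl, 1:4, function(x) x * x)
stopCluster(cl)

# Dependent: `total` needs both `a` and `b`. The two inputs could run at the
# same time, but `total` has to wait for them, which a plain map cannot express.
a <- 3 + 4
b <- 1 + 2
total <- a + b
```
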
In contrast, the Python language has an excellent framework for exactly this
purpose -- [`dask`](http://dask.pydata.org/en/latest/). `dask` makes it easy to
build up a graph of interdependent tasks and then execute them in parallel in an
order that optimizes performance [@daskLibrary]. The present package seeks to
reproduce a subset of that functionality in R, specifically the
[`delayed`](http://dask.pydata.org/en/latest/delayed.html) module. To
parallelize across the tasks, we leverage the excellent
[`future`](https://github.com/HenrikBengtsson/future/) package [@futurePackage].
The power of the `delayed` framework is best appreciated when demonstrated by
example.
---
## Example
The two primary ways to generate `Delayed` objects in R are via the `delayed`
and `delayed_fun` functions.
`delayed` is used to delay expressions:
```{r delay_expr, echo=TRUE, results='markup'}
library(delayed)
# delay a simple expression
delayed_object <- delayed(3 + 4)
print(delayed_object)
# compute its result
delayed_object$compute()
```
...while `delayed_fun` wraps a function so that it returns `Delayed` results:
```{r delay_fun, echo=TRUE, results='markup'}
# delay a function
x2 <- function(x) {x * x}
delayed_x2 <- delayed_fun(x2)
# calling it returns a delayed call
delayed_object <- delayed_x2(4)
print(delayed_object)
# again, we can compute its result
delayed_object$compute()
```
These elements of the functionality of `delayed` are substantially similar to
the facilities already offered by the `future` package. `delayed` diverges from
`future` by offering the ability to chain `Delayed` objects together. For
example:
```{r delay_chain, echo=TRUE, results='markup'}
# delay a simple expression
delayed_object_7 <- delayed(3 + 4)
# and another
delayed_object_3 <- delayed(1 + 2)
# delay a function for addition
adder <- function(x, y){x + y}
delayed_adder <- delayed_fun(adder)
# but now, use one delayed as input to another
chained_delayed_10 <- delayed_adder(delayed_object_7, delayed_object_3)
# We can still compute its result.
chained_delayed_10$compute()
```
We can visualize the dependency structure of these delayed tasks by calling
`plot` on the resultant `Delayed` object:
```{r, fig.show='hold'}
plot(chained_delayed_10)
```
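
One way to picture what chaining involves is a toy version written in base R. This is a sketch for intuition only and makes no claim about how the `delayed` package is implemented; `toy_delayed` and `toy_compute` are hypothetical names introduced here.

```r
# A toy delayed task: a function plus its (possibly delayed) arguments.
toy_delayed <- function(fun, ...) {
  structure(list(fun = fun, args = list(...)), class = "toy_delayed")
}

# Resolve a task by first resolving any delayed arguments, then calling `fun`.
toy_compute <- function(task) {
  if (!inherits(task, "toy_delayed")) {
    return(task)  # plain values are already resolved
  }
  resolved_args <- lapply(task$args, toy_compute)
  do.call(task$fun, resolved_args)
}

# Mirror the chained example: two leaf tasks feed an addition task.
toy_7 <- toy_delayed(function() 3 + 4)
toy_3 <- toy_delayed(function() 1 + 2)
toy_10 <- toy_delayed(function(x, y) x + y, toy_7, toy_3)

result <- toy_compute(toy_10)
```

Nothing is evaluated until `toy_compute` is called, at which point the dependencies are resolved before the task that consumes them. The toy version resolves them one after another; running independent dependencies concurrently is the part that requires a scheduler.
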
---
## Parallelization
Now that we've had an elementary look at the functionality offered by `delayed`,
we can turn to parallelizing dependent computations -- the core problem
addressed by the package. We can easily parallelize across dependency
structures by specifying a `future` `plan`. Let's try it out:
```{r}
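library(future)
# `multicore` runs workers as forked R processes; where forking is unsupported
# (e.g., on Windows), `future` falls back to sequential evaluation
plan(multicore, workers = 2)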
# re-define the delayed object from above
delayed_object_7 <- delayed(3 + 4)
delayed_object_3 <- delayed(1 + 2)
chained_delayed_10 <- delayed_adder(delayed_object_7, delayed_object_3)
# compute it using the future plan (two multicore workers); verbose mode lets
# us see the computation order
chained_delayed_10$compute(nworkers = 2, verbose = TRUE)
```
The above illustrates the typical lifecycle of a delayed task. Each task starts
in the `"waiting"` state, which means that it depends on other delayed tasks
that are not yet complete. Once those dependencies are resolved -- or
immediately, if the task has no delayed dependencies -- the task transitions to
the `"ready"` state and will be run as soon as a worker is available. When the
task is assigned to a worker, its state changes to `"running"`; when processing
is complete, it is finally marked `"resolved"`.
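
The lifecycle can be traced by hand with a small base R simulation. This is a sketch under simplifying assumptions (a single simulated worker and a hypothetical three-task graph); it only mimics the state names described above and does not use the package's scheduler.

```r
# Each task lists the tasks it depends on (names are illustrative).
deps <- list(
  seven = character(0),
  three = character(0),
  ten   = c("seven", "three")
)

# Tasks without dependencies start "ready"; the rest start "waiting".
state <- vapply(
  deps,
  function(d) if (length(d) == 0) "ready" else "waiting",
  character(1)
)
history <- list(state)

while (any(state != "resolved")) {
  # Assign the first ready task to the (single, simulated) worker.
  task <- names(state)[state == "ready"][1]
  state[task] <- "running"
  history[[length(history) + 1]] <- state

  # The worker finishes the task.
  state[task] <- "resolved"

  # Waiting tasks whose dependencies are all resolved become ready.
  for (other in names(state)[state == "waiting"]) {
    if (all(state[deps[[other]]] == "resolved")) {
      state[other] <- "ready"
    }
  }
  history[[length(history) + 1]] <- state
}
```

Each element of `history` is a snapshot of all task states, so printing it shows `ten` remaining `"waiting"` until both `seven` and `three` are `"resolved"`.
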
---
## Future Work
### Scheduling Tasks
When multiple tasks are simultaneously `"ready"`, the scheduler must decide
which to assign to the next available worker. Currently, the scheduler simply
prioritizes tasks that are likely to result in other tasks becoming `"ready"`. In
the future, we plan to build more advanced scheduling features, similar to those
available in the `dask` library. An overview of that functionality is available
at <https://distributed.readthedocs.io/en/latest/scheduling-policies.html>.
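
One possible reading of this heuristic is sketched below in base R: score each ready task by the number of waiting tasks it would unblock, then pick the highest score. The graph, the `n_unblocked` helper, and the scoring rule are assumptions made for illustration; the package's actual prioritization may differ.

```r
# Hypothetical dependency graph: each task lists the tasks it depends on.
deps <- list(
  a = character(0),
  b = character(0),
  c = "a",
  d = "a",
  e = c("b", "c")
)
state <- c(a = "ready", b = "ready", c = "waiting", d = "waiting", e = "waiting")

# Count the waiting tasks that would become ready if `task` were resolved,
# i.e. those whose only unresolved dependency is `task`.
n_unblocked <- function(task, deps, state) {
  waiting <- names(state)[state == "waiting"]
  sum(vapply(waiting, function(w) {
    remaining <- setdiff(deps[[w]], names(state)[state == "resolved"])
    identical(remaining, task)
  }, logical(1)))
}

ready <- names(state)[state == "ready"]
scores <- vapply(ready, n_unblocked, integer(1), deps = deps, state = state)
next_task <- ready[which.max(scores)]
```

Here resolving `a` would make both `c` and `d` ready, whereas resolving `b` would unblock nothing yet (because `e` also needs `c`), so `a` is chosen first.
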
### Distributed Data
Another key feature of `dask` is [_data
locality_](https://distributed.readthedocs.io/en/latest/locality.html). That is,
data is only present on workers that need it for a given task, and is only
shared between workers when necessary. Tasks are then preferentially assigned
to workers that already have all the necessary components. We have begun to
implement a similar framework, though this work remains incomplete.
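
The idea of locality-aware placement can be sketched in a few lines of base R. The worker names, the `worker_data` bookkeeping, and the "fewest missing inputs" rule are all hypothetical; this is not a description of the in-progress implementation.

```r
# Hypothetical record of which results each worker currently holds.
worker_data <- list(
  worker_1 = c("seven"),
  worker_2 = c("seven", "three")
)

# Inputs required by the task we want to place.
task_inputs <- c("seven", "three")

# Count the inputs that would need to be transferred to each worker.
n_missing <- vapply(
  worker_data,
  function(held) length(setdiff(task_inputs, held)),
  integer(1)
)

# Prefer the worker that needs the fewest transfers.
chosen_worker <- names(n_missing)[which.min(n_missing)]
```

In this example `worker_2` already holds both inputs, so placing the task there avoids moving any data between workers.
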