-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathworkers.go
243 lines (213 loc) · 5.98 KB
/
workers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
// Copyright (c) Paulo Suzart. All rights reserved.
// The use and distribution terms for this software are covered by the
// Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
// which can be found in the file epl-v10.html at the root of this distribution.
// By using this software in any fashion, you are agreeing to be bound by
// the terms of this license.
// You must not remove this notice, or any other, from this software.
package main
import (
"net/http"
"log"
"sync"
"time"
"code.google.com/p/go.exp/old/netchan"
)
//Represents a set of request to be performed
//against Task.Host
type Task struct {
Host, User, Password string
Requests, Id int
MasterAddr, ContentType string
Session Session
Cookie Cookie
}
//Reported by the worker through resultChan
type WorkSummary struct {
ErrCount int //total errors
SucCount int //total success
Avg float64 //average response time
Max, Min int64 //the slowest requiest
}
//Put t to w.Channel()
func (self *Task) Send(w Worker) {
w.Channel() <- *self
}
//The worker interface
type Worker interface {
//Should return the input channel to
//interact with Worker
Channel() chan Task
// Should be called in a go routine
Serve()
}
//A local workers is used in standalone mode
//as well as in worker mode.
type LocalWorker struct {
//the Worker input channel to
//receive tasks
channel chan Task
masterChannel chan WorkSummary
mode *string
//ctrlChan chan bool
}
//Worker interface implemented:w
func (self *LocalWorker) Channel() chan Task {
return self.channel
}
func (self *LocalWorker) SetMasterChan(c chan WorkSummary) {
self.masterChannel = c
}
//Creates a new LocalWorker. If export is true, than
//the LocalWorker exports its input channel in the network address
//provided by workerAddr
func NewLocalWorker(mode, hostAddr *string) (w *LocalWorker) {
defer func() {
if e := recover(); e != nil {
log.Fatalf("Panic starting the worker!!!!", e)
}
}()
w = new(LocalWorker)
w.channel = make(chan Task, 10)
w.mode = mode
//exports the channels
if *mode == WORKER {
e := netchan.NewExporter()
e.Export("workerChannel", w.channel, netchan.Recv)
e.ListenAndServe("tcp", *hostAddr)
}
return
}
//Keeps a sort of cached channels.
//A worker will often submiti many tasks that can
//be received by a single imported channel
//representing such a master.
var _sessions map[int64]chan WorkSummary = make(map[int64]chan WorkSummary)
var mu *sync.RWMutex = new(sync.RWMutex)
//Helper function to import the Master channel from masterAddr
func importMasterChan(t Task) (c chan WorkSummary, err error) {
mu.Lock()
defer mu.Unlock()
if c, present := _sessions[t.Session.Id]; present {
log.Printf("Cached Session %v", t.Session.Id)
return c, nil
}
imp, err := netchan.Import("tcp", t.MasterAddr)
if err != nil {
log.Printf("Failed to create importer for %v", t.MasterAddr)
return nil, err
}
c = make(chan WorkSummary, 10)
imp.Import("masterChannel", c, netchan.Send, 10)
go func() {
err := <-imp.Errors()
log.Print(err)
}()
_sessions[t.Session.Id] = c
go cacheWatcher(t.Session)
return c, nil
}
//A cache watcher function cleans up the cache after
//2 times the session length
func cacheWatcher(session Session) {
time.Sleep(time.Duration(session.Timeout * 2))
mu.Lock()
log.Printf("Cleanning up Session %v", session.Id)
delete(_sessions, session.Id)
mu.Unlock()
}
//Listen to the worker channel. Every Task is executed by a different
//go routine.
//Waits until a task come fom w.channel
func (self *LocalWorker) Serve() {
log.Print("Waiting for tasks...")
for {
task := <-self.channel
if *self.mode == WORKER {
if mchan, err := importMasterChan(task); err != nil {
log.Printf("Unable to Contact Master %s. Ignoring task.", task.MasterAddr)
continue
} else {
self.SetMasterChan(mchan)
}
}
log.Printf("Task Received from %v", task.MasterAddr)
go self.execute(task)
}
}
//Excecutes a task and send back a response to
//w.masterChannel. masterChannel can be set by
//w.SetMasterChan in standalone mode or
//dynamically imported in worker mode
func (w *LocalWorker) execute(task Task) {
defer func() {
if e := recover(); e != nil {
log.Printf("Erro Fatal: %v", e)
}
}()
client := NewHTTPClient(task.Host, task.ContentType, task.Cookie)
client.Auth(task.User, task.Password)
var totalElapsed int64
totalErr := 0
totalSuc := 0
var max int64 = 0
var min int64 = -1
//perform n times the request
for i := 0; i < task.Requests; i++ {
start := time.Now().UnixNano()
resp, err := client.DoRequest()
elapsed := time.Now().UnixNano() - start
if err == nil && resp != nil && resp.StatusCode == http.StatusOK {
totalSuc += 1
totalElapsed += elapsed
max = Max(max, elapsed)
min = Min(min, elapsed)
} else {
//Any response other than 200 will be a
//failure
totalErr += 1
}
}
summary := &WorkSummary{
ErrCount: totalErr,
SucCount: totalSuc,
Max: max,
Min: min,
}
if totalSuc != 0 {
summary.Avg = float64(totalElapsed / int64(totalSuc))
}
w.masterChannel <- *summary
log.Printf("Summary sent to %s", task.MasterAddr)
}
//Holds a reference to an imported channel
//from the actual worker
type ProxyWorker struct {
channel chan Task
importer *netchan.Importer
}
//Creates a new Proxy importing 'workerChannel' from Worker running
//on workerAddr
func NewProxyWorker(workerAddr string) (p *ProxyWorker, err error) {
log.Printf("Setting up a ProxyWorker for %s", workerAddr)
p = new(ProxyWorker)
imp, err := netchan.Import("tcp", workerAddr)
if err != nil {
return
}
p.importer = imp
return
}
//Worker interface implemented
func (self *ProxyWorker) Channel() chan Task {
return self.channel
}
//Import the worker channel represented by this
//Proxy. Better if executed in go Serve()
func (self *ProxyWorker) Serve() {
self.channel = make(chan Task)
err := self.importer.Import("workerChannel", self.channel, netchan.Send, 10)
if err != nil {
log.Print(err)
}
}