Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Real-time task report function #1001

Merged
merged 71 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
bf0263c
Update task.go
NeganZhao Jun 19, 2024
5e6dc8b
Update task.go
NeganZhao Jun 19, 2024
6c8da2b
Update task.go
NeganZhao Jun 19, 2024
dc4f6ce
Update task.go
NeganZhao Jun 19, 2024
1d048ea
Update task.go
NeganZhao Jun 19, 2024
e643a77
Update task.go
NeganZhao Jun 19, 2024
a814cc7
Update task.go
NeganZhao Jun 19, 2024
3464c22
Update task.go
NeganZhao Jun 19, 2024
7e09590
Update task.go
NeganZhao Jun 19, 2024
fba25cb
d
NeganZhao Jun 19, 2024
2e3e1b3
aa
NeganZhao Jun 20, 2024
52df0b8
Update task.go
NeganZhao Jun 20, 2024
8c09ade
Update task.go
NeganZhao Jun 20, 2024
5f07571
Update task.go
NeganZhao Jun 20, 2024
e89c36f
Update task.go
NeganZhao Jun 20, 2024
3bcf055
Update task.go
NeganZhao Jun 20, 2024
f18b617
Update task.go
NeganZhao Jun 20, 2024
2cce027
Update task.go
NeganZhao Jun 20, 2024
28891f4
Update task.go
NeganZhao Jun 20, 2024
9f62297
Update task.go
NeganZhao Jun 20, 2024
03f3a87
Update task.go
NeganZhao Jun 20, 2024
78910ee
Update task.go
NeganZhao Jun 20, 2024
9b334f2
Update task.go
NeganZhao Jun 20, 2024
4ee6755
Update task.go
NeganZhao Jun 20, 2024
799c6d9
ad
NeganZhao Jun 20, 2024
9994a5e
Update task.go
NeganZhao Jun 20, 2024
8b65423
Update task.go
NeganZhao Jun 20, 2024
9467f96
Update task.go
NeganZhao Jun 20, 2024
1aba411
Update task.go
NeganZhao Jun 20, 2024
483582b
Update task.go
NeganZhao Jun 20, 2024
eb58a79
Update task.go
NeganZhao Jun 20, 2024
8a09434
Update cmd_nix.go
NeganZhao Jun 20, 2024
1ba127d
Update task.go
NeganZhao Jun 20, 2024
dc77436
Update task.go
NeganZhao Jun 20, 2024
531e1a8
Update task.go
NeganZhao Jun 20, 2024
d788b44
Update task.go
NeganZhao Jun 20, 2024
1d68755
Update task.go
NeganZhao Jun 20, 2024
0a45acc
Update task.go
NeganZhao Jun 20, 2024
7998e14
Update task.go
NeganZhao Jun 20, 2024
edfa0dc
Update task.go
NeganZhao Jun 20, 2024
a354a42
Update task.go
NeganZhao Jun 20, 2024
a6bb245
Update task.go
NeganZhao Jun 20, 2024
4250788
Update tasks.go
NeganZhao Jun 20, 2024
0ab829f
s
NeganZhao Jun 20, 2024
f03e28e
Update tasks.go
NeganZhao Jun 20, 2024
686b322
a
NeganZhao Jun 20, 2024
05ad2e5
Update heartbeat.go
NeganZhao Jun 20, 2024
b471dff
aa
NeganZhao Jun 20, 2024
13325da
Update tasks.go
NeganZhao Jun 20, 2024
f4b568f
Update tasks.go
NeganZhao Jun 20, 2024
aae5781
Update tasks.go
NeganZhao Jun 20, 2024
7d7b639
Update heartbeat.go
NeganZhao Jun 20, 2024
b1f5913
Update tasks.go
NeganZhao Jun 20, 2024
33b5cbe
Update tasks.go
NeganZhao Jun 20, 2024
e729a6a
a
NeganZhao Jun 20, 2024
0142811
realtime report
NeganZhao Jun 21, 2024
c06af96
Update task.go
NeganZhao Jun 21, 2024
e1fd24a
Update task.go
NeganZhao Jun 21, 2024
afeaca8
Update task.go
NeganZhao Jun 21, 2024
faaa5b4
Update task.go
NeganZhao Jun 21, 2024
206a0a8
Update task.go
NeganZhao Jun 21, 2024
eb6c3f4
Update task.go
NeganZhao Jun 21, 2024
8e9cdfb
error realtime output
NeganZhao Jun 21, 2024
40335bd
a
NeganZhao Jul 2, 2024
dab45db
Update task.go
NeganZhao Jul 2, 2024
31f8184
Update task.go
NeganZhao Jul 2, 2024
5ca023f
Update task.go
NeganZhao Jul 2, 2024
f65d04f
Update task.go
NeganZhao Jul 2, 2024
18d72d5
Update task.go
NeganZhao Jul 2, 2024
8535793
Merge branch 'flashcatcloud:main' into main
NeganZhao Jul 11, 2024
dd2cee8
Update task.go
NeganZhao Jul 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ibex/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func heartbeat() {
}

var resp types.ReportResponse

err := client.GetCli().Call("Server.Report", req, &resp)

if err != nil {
Expand Down
48 changes: 43 additions & 5 deletions ibex/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
package ibex

import (
"bufio"
"bytes"
"fmt"
"io"
"log"
"os/exec"
"os/user"
Expand Down Expand Up @@ -281,28 +283,65 @@ func (t *Task) start() {
}
}

cmd.Stdout = &t.Stdout
cmd.Stderr = &t.Stderr
cmd.Stdin = t.Stdin
t.Cmd = cmd

stdout, err := t.Cmd.StdoutPipe()
if err != nil {
log.Printf("E! cannot read ouput of task[%d]: %v", t.Id, err)
}

stderr, err := t.Cmd.StderrPipe()

if err != nil {
log.Printf("E! cannot read err of task[%d]: %v", t.Id, err)
}

err = CmdStart(cmd)

if err != nil {
log.Printf("E! cannot start cmd of task[%d]: %v", t.Id, err)
return
}

go runProcess(t)
go runProcessRealtime(stdout, stderr, t)
}

func (t *Task) kill() {
go killProcess(t)
}

func runProcess(t *Task) {
func runProcessRealtime(stdout io.ReadCloser, stderr io.ReadCloser, t *Task) {
t.SetAlive(true)
defer t.SetAlive(false)

reader := bufio.NewReader(stdout)

go func() {
for {
line, err2 := reader.ReadString('\n')
if err2 != nil || io.EOF == err2 {
break
}
t.Stdout.WriteString(line)

persistResult(t)
}
}()

errReader := bufio.NewReader(stderr)

go func() {
for {
line, err2 := errReader.ReadString('\n')
if err2 != nil || io.EOF == err2 {
break
}
t.Stderr.WriteString(line)
persistResult(t)
}
}()

err := t.Cmd.Wait()
if err != nil {
if strings.Contains(err.Error(), "signal: killed") {
Expand All @@ -326,7 +365,6 @@ func runProcess(t *Task) {

func persistResult(t *Task) {
metadir := config.Config.Ibex.MetaDir

stdout := filepath.Join(metadir, fmt.Sprint(t.Id), "stdout")
stderr := filepath.Join(metadir, fmt.Sprint(t.Id), "stderr")
doneFlag := filepath.Join(metadir, fmt.Sprint(t.Id), fmt.Sprintf("%d.done", t.Clock))
Expand Down
2 changes: 1 addition & 1 deletion ibex/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (lt *LocalTasksT) ReportTasks() []types.ReportTask {
rt := types.ReportTask{Id: id, Clock: t.Clock}

rt.Status = t.GetStatus()
if rt.Status == "running" || rt.Status == "killing" {
if rt.Status == "killing" {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里running为什么去掉了?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

原本的日志上报如果是在running状态的话是不会上报的,这里去掉了可以每次心跳上报一次

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

running的日志是否有必要上报?只上报错误日志是否就够了?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果任务运行的时间过长,比如一两天,但是并不知道该任务运行到哪一步了。实时的话就方便查看

// intermediate state
continue
}
Expand Down
Loading