From c4bdf213e7523a1c091b6085d1fb9ab16760e8a4 Mon Sep 17 00:00:00 2001 From: Max Date: Wed, 30 Oct 2024 16:26:55 +0800 Subject: [PATCH 1/2] Fix memory leaks, Add ReadCloser and WriteCloser methods to FileSystem interface; refactor Process to manage execution results --- api/handler.go | 41 ++++++++++++++++------ api/http.go | 11 +++--- fs/fs.go | 10 ++++++ fs/fs_test.go | 47 +++++++++++++++++++++++++ fs/process.go | 4 +-- fs/system/system.go | 25 ++++++++++++++ fs/types.go | 3 ++ http/process.go | 5 ++- process/process.go | 76 ++++++++++++++++++++++++++++++++++------- process/process_test.go | 70 +++++++++++++++++++++++++++++++++++++ process/types.go | 1 + 11 files changed, 262 insertions(+), 31 deletions(-) diff --git a/api/handler.go b/api/handler.go index 69aecb94..9f55c088 100644 --- a/api/handler.go +++ b/api/handler.go @@ -56,17 +56,27 @@ func (path Path) defaultHandler(getArgs argsHandler) func(c *gin.Context) { } } + // Release Memory + defer func() { resp = nil; body = nil }() switch data := body.(type) { case maps.Map, map[string]interface{}, []interface{}, []maps.Map, []map[string]interface{}: + defer func() { data = nil }() c.JSON(status, data) c.Done() return case []byte: + defer func() { data = nil }() c.Data(status, contentType, data) c.Done() return + case io.ReadCloser: + defer data.Close() + c.DataFromReader(status, -1, contentType, data, nil) + c.Done() + return + case error: ex := exception.Err(data, 500) c.JSON(ex.Code, gin.H{"message": ex.Message, "code": ex.Code}) @@ -281,13 +291,13 @@ func (path Path) execProcess(ctx context.Context, chRes chan<- interface{}, c *g } process.WithContext(ctx) - res, err := process.Exec() + err = process.Execute() if err != nil { log.Error("[Path] %s %s", path.Path, err.Error()) chRes <- err return } - chRes <- res + chRes <- process.Value() } func (path Path) runProcess(ctx context.Context, c *gin.Context, getArgs argsHandler) interface{} { @@ -308,7 +318,12 @@ func (path Path) runProcess(ctx context.Context, c *gin.Context, getArgs argsHan } process.WithContext(ctx) - return process.Run() + err := process.Execute() + if err != nil { + log.Error("[Path] %s %s", path.Path, err.Error()) + exception.Err(err, 500).Throw() + } + return process.Value() } func (path Path) reqContentType(c *gin.Context) string { @@ -322,23 +337,27 @@ func (path Path) reqContentType(c *gin.Context) string { func (path Path) setResponseHeaders(c *gin.Context, resp interface{}, contentType string) string { if len(path.Out.Headers) > 0 { res := any.Of(resp) - if res.IsMap() { // 处理变量 + + // Parse Headers + if res.IsMap() { data := res.Map().MapStrAny.Dot() for name, value := range path.Out.Headers { v := helper.Bind(value, data) if v != nil { - c.Writer.Header().Set(name, fmt.Sprintf("%v", v)) + path.Out.Headers[name] = fmt.Sprintf("%v", v) } } - } else { - for name, value := range path.Out.Headers { - c.Writer.Header().Set(name, value) - if name == "Content-Type" { - contentType = value - } + } + + // Set Headers and replace Content-Type if exists + for name, value := range path.Out.Headers { + c.Writer.Header().Set(name, value) + if name == "Content-Type" { + contentType = value } } } + return contentType } diff --git a/api/http.go b/api/http.go index ddaa156f..0aff9ccc 100644 --- a/api/http.go +++ b/api/http.go @@ -357,7 +357,7 @@ func (http HTTP) parseIn(in []interface{}) func(c *gin.Context) []interface{} { } ext := filepath.Ext(file.Filename) - dir, err := os.MkdirTemp(os.TempDir(), "upload") + dir, err := os.MkdirTemp("", "upload") if err != nil { return types.UploadFile{Error: fmt.Sprintf("%s %s", arg[1], err.Error())} } @@ -366,13 +366,13 @@ func (http HTTP) parseIn(in []interface{}) func(c *gin.Context) []interface{} { if err != nil { return types.UploadFile{Error: fmt.Sprintf("%s %s", arg[1], err.Error())} } + defer tmpfile.Close() - err = c.SaveUploadedFile(file, tmpfile.Name()) - if err != nil { + if err := c.SaveUploadedFile(file, tmpfile.Name()); err != nil { return types.UploadFile{Error: fmt.Sprintf("%s %s", arg[1], err.Error())} } - return types.UploadFile{ + uploadFile := types.UploadFile{ UID: c.GetHeader("Content-Uid"), Range: c.GetHeader("Content-Range"), Sync: c.GetHeader("Content-Sync") == "true", // sync upload or not @@ -381,6 +381,9 @@ func (http HTTP) parseIn(in []interface{}) func(c *gin.Context) []interface{} { Size: file.Size, Header: file.Header, } + file = nil + tmpfile = nil + return uploadFile }) } else { // 原始数值 new := v diff --git a/fs/fs.go b/fs/fs.go index 001ca55d..5fdc0d5d 100644 --- a/fs/fs.go +++ b/fs/fs.go @@ -144,6 +144,11 @@ func ReadFile(xfs FileSystem, file string) ([]byte, error) { return xfs.ReadFile(file) } +// ReadCloser returns a ReadCloser to read the named file. +func ReadCloser(xfs FileSystem, file string) (io.ReadCloser, error) { + return xfs.ReadCloser(file) +} + // WriteFile writes data to the named file, creating it if necessary. // // If the file does not exist, WriteFile creates it with permissions perm (before umask); otherwise WriteFile truncates it before writing, without changing permissions. @@ -151,6 +156,11 @@ func WriteFile(xfs FileSystem, file string, data []byte, perm uint32) (int, erro return xfs.WriteFile(file, data, perm) } +// WriteCloser returns a WriteCloser that writes to the named file. +func WriteCloser(xfs FileSystem, file string, perm uint32) (io.WriteCloser, error) { + return xfs.WriteCloser(file, perm) +} + // Write writes the content of reader to the named file, creating it if necessary. func Write(xfs FileSystem, file string, reader io.Reader, perm uint32) (int, error) { return xfs.Write(file, reader, perm) diff --git a/fs/fs_test.go b/fs/fs_test.go index 72510f4c..6618c70b 100644 --- a/fs/fs_test.go +++ b/fs/fs_test.go @@ -5,6 +5,7 @@ import ( "encoding/base64" "fmt" "image" + "io" "math/rand" "os" "path/filepath" @@ -286,6 +287,52 @@ func TestWrite(t *testing.T) { } } +func TestReadCloser(t *testing.T) { + stores := testStores(t) + f := testFiles(t) + for name, stor := range stores { + clear(stor, t) + data := testData(t) + + // Write + length, err := WriteFile(stor, f["F1"], data, 0644) + assert.Nil(t, err, name) + checkFileExists(stor, t, f["F1"], name) + checkFileSize(stor, t, f["F1"], length, name) + checkFileMode(stor, t, f["F1"], 0644, name) + + // ReadCloser + rc, err := ReadCloser(stor, f["F1"]) + assert.Nil(t, err, name) + content, err := io.ReadAll(rc) + assert.Nil(t, err, name) + assert.Equal(t, data, content, name) + rc.Close() + } +} + +func TestWriteCloser(t *testing.T) { + stores := testStores(t) + f := testFiles(t) + for name, stor := range stores { + clear(stor, t) + + // WriteCloser + wc, err := WriteCloser(stor, f["F1"], 0644) + assert.Nil(t, err, name) + data := testData(t) + n, err := wc.Write(data) + assert.Nil(t, err, name) + assert.Equal(t, len(data), n, name) + wc.Close() + + // Check the content + fileContent, err := ReadFile(stor, f["F1"]) + assert.Nil(t, err, name) + assert.Equal(t, data, fileContent, name) + } +} + func TestAppendFile(t *testing.T) { stores := testStores(t) f := testFiles(t) diff --git a/fs/process.go b/fs/process.go index 3e3ebc74..c1524c95 100644 --- a/fs/process.go +++ b/fs/process.go @@ -575,7 +575,7 @@ func processDownload(process *process.Process) interface{} { process.ValidateArgNums(1) stor := stor(process) file := process.ArgsString(0) - data, err := ReadFile(stor, file) + reader, err := ReadCloser(stor, file) if err != nil { exception.New(err.Error(), 500).Throw() } @@ -586,7 +586,7 @@ func processDownload(process *process.Process) interface{} { } return map[string]interface{}{ - "content": data, + "content": reader, "type": mimeType, } } diff --git a/fs/system/system.go b/fs/system/system.go index 62f06b31..b3656cbb 100644 --- a/fs/system/system.go +++ b/fs/system/system.go @@ -91,6 +91,31 @@ func (f *File) ReadFile(file string) ([]byte, error) { return os.ReadFile(file) } +// ReadCloser returns a ReadCloser with the file content +func (f *File) ReadCloser(file string) (io.ReadCloser, error) { + file, err := f.absPath(file) + if err != nil { + return nil, err + } + return os.OpenFile(file, os.O_RDONLY, 0) +} + +// WriteCloser returns a WriteCloser with the file content +func (f *File) WriteCloser(file string, perm uint32) (io.WriteCloser, error) { + file, err := f.absPath(file) + if err != nil { + return nil, err + } + + dir := filepath.Dir(file) + err = os.MkdirAll(dir, os.ModePerm) + if err != nil && !os.IsExist(err) { + return nil, err + } + + return os.OpenFile(file, os.O_CREATE|os.O_WRONLY, fs.FileMode(perm)) +} + // WriteFile writes data to the named file, creating it if necessary. // // If the file does not exist, WriteFile creates it with permissions perm (before umask); otherwise WriteFile truncates it before writing, without changing permissions. diff --git a/fs/types.go b/fs/types.go index 7b146923..80027a66 100644 --- a/fs/types.go +++ b/fs/types.go @@ -23,6 +23,9 @@ type FileSystem interface { MkdirTemp(dir string, pattern string) (string, error) Glob(pattern string) ([]string, error) + ReadCloser(file string) (io.ReadCloser, error) + WriteCloser(file string, perm uint32) (io.WriteCloser, error) + Remove(name string) error RemoveAll(name string) error diff --git a/http/process.go b/http/process.go index 418219e8..8ed52c3d 100644 --- a/http/process.go +++ b/http/process.go @@ -349,12 +349,15 @@ func processHTTPStream(p *process.Process) interface{} { return HandlerReturnError } - res, err := procesHandler.WithSID(p.Sid).WithGlobal(p.Global).Exec() + err = procesHandler.WithSID(p.Sid).WithGlobal(p.Global).Execute() if err != nil { log.Error("[http.Stream] %s %s", handler, err.Error()) return HandlerReturnError } + defer procesHandler.Release() + // Get the result + res := procesHandler.Value() if v, ok := res.(int); ok { return v } diff --git a/process/process.go b/process/process.go index 0279ac96..1d81668a 100644 --- a/process/process.go +++ b/process/process.go @@ -30,7 +30,54 @@ func Of(name string, args ...interface{}) (*Process, error) { return process, nil } +// Execute execute the process and return error only +func (process *Process) Execute() error { + var hd Handler + hd, err := process.handler() + if err != nil { + return err + } + + defer func() { err = exception.Catch(recover()) }() + value := hd(process) + process._val = &value + return err +} + +// Release the value of the process +func (process *Process) Release() { + process._val = nil +} + +// Dispose the process after run success +func (process *Process) Dispose() { + if process.Runtime != nil { + process.Runtime.Dispose() + } + + process.Args = nil + process.Global = nil + process.Context = nil + process.Runtime = nil + process._val = nil + process = nil +} + +// Value get the result of the process +func (process *Process) Value() interface{} { + if process._val != nil { + return *process._val + } + return nil +} + // Run the process +// **** +// +// This function casues a memory leak, will be disposed in the future, +// Use Execute() instead +// +// **** func (process *Process) Run() interface{} { hd, err := process.handler() if err != nil { @@ -42,6 +89,22 @@ func (process *Process) Run() interface{} { } // Exec execute the process and return error +// +// **** +// +// This function casues a memory leak, will be disposed in the future, +// Use Execute() instead +// Example: +// +// process := Of("models.user.pet.Find", 1, {}) +// err := process.Execute(); +// if err != nil { +// // handle error +// } +// defer process.Release() // or process.Dispose() if you want to relese the runtime isolate after run success +// result := process.Value() // Get the result +// +// **** func (process *Process) Exec() (value interface{}, err error) { var hd Handler @@ -104,19 +167,6 @@ func (process *Process) WithRuntime(runtime Runtime) *Process { return process } -// Dispose the process after run success -func (process *Process) Dispose() { - if process.Runtime != nil { - process.Runtime.Dispose() - } - - process.Args = nil - process.Global = nil - process.Context = nil - process.Runtime = nil - process = nil -} - // handler get the process handler func (process *Process) handler() (Handler, error) { if hander, has := Handlers[process.Handler]; has && hander != nil { diff --git a/process/process_test.go b/process/process_test.go index 42cf54b4..f7df6fa4 100644 --- a/process/process_test.go +++ b/process/process_test.go @@ -209,6 +209,76 @@ func TestExec(t *testing.T) { assert.Equal(t, "Exception|404:models.widget.Notfound Handler -> models.notfound not found", err.Error()) } +func TestExectueAndRelease(t *testing.T) { + + prepare(t) + var p *Process = nil + + // unit.test.prepare + p = New("unit.test.prepare", "foo", "bar") + err := p.Execute() + if err != nil { + t.Fatal(err) + } + res := p.Value() + data, ok := res.(map[string]interface{}) + assert.True(t, ok) + assert.Equal(t, "unit", data["group"]) + assert.Equal(t, "", data["method"]) + assert.Equal(t, "", data["id"]) + assert.Equal(t, []interface{}{"foo", "bar"}, data["args"]) + p.Release() + assert.Equal(t, nil, p.Value()) + + // models.widget.Test + p = New("models.widget.Test", "foo", "bar") + err = p.Execute() + res = p.Value() + data, ok = res.(map[string]interface{}) + assert.True(t, ok) + assert.Equal(t, "models", data["group"]) + assert.Equal(t, "Test", data["method"]) + assert.Equal(t, "widget", data["id"]) + assert.Equal(t, []interface{}{"foo", "bar"}, data["args"]) + p.Release() + assert.Equal(t, nil, p.Value()) + + // flows.widget + p = New("flows.widget", "foo", "bar") + err = p.Execute() + res = p.Value() + data, ok = res.(map[string]interface{}) + assert.True(t, ok) + assert.Equal(t, "flows", data["group"]) + assert.Equal(t, "", data["method"]) + assert.Equal(t, "widget", data["id"]) + assert.Equal(t, []interface{}{"foo", "bar"}, data["args"]) + p.Release() + assert.Equal(t, nil, p.Value()) + + // session.Get + p = New("session.Get", "foo", "bar") + err = p.Execute() + res = p.Value() + data, ok = res.(map[string]interface{}) + assert.True(t, ok) + assert.Equal(t, "session", data["group"]) + assert.Equal(t, "Get", data["method"]) + assert.Equal(t, "", data["id"]) + assert.Equal(t, []interface{}{"foo", "bar"}, data["args"]) + p.Release() + assert.Equal(t, nil, p.Value()) + + // models.widget.Notfound + p = New("models.widget.Notfound", "foo", "bar") + err = p.Execute() + res = p.Value() + assert.Equal(t, nil, res) + assert.Equal(t, "Exception|404:models.widget.Notfound Handler -> models.notfound not found", err.Error()) + p.Release() + assert.Equal(t, nil, p.Value()) +} + func TestWithSID(t *testing.T) { prepare(t) diff --git a/process/types.go b/process/types.go index 0e7a579d..066fc780 100644 --- a/process/types.go +++ b/process/types.go @@ -16,6 +16,7 @@ type Process struct { Sid string // Session ID Context context.Context // Context Runtime Runtime // Runtime + _val *interface{} // Value // The result of the process } // Runtime interface From b39978d3d64d9ae8030c0020bf08117492e7c92a Mon Sep 17 00:00:00 2001 From: Max Date: Wed, 30 Oct 2024 16:43:12 +0800 Subject: [PATCH 2/2] Fix misspelling --- process/process.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/process/process.go b/process/process.go index 1d81668a..365f8dff 100644 --- a/process/process.go +++ b/process/process.go @@ -74,7 +74,7 @@ func (process *Process) Value() interface{} { // Run the process // **** // -// This function casues a memory leak, will be disposed in the future, +// This function causes a memory leak, will be disposed in the future, // Use Execute() instead // // **** @@ -92,7 +92,7 @@ func (process *Process) Run() interface{} { // // **** // -// This function casues a memory leak, will be disposed in the future, +// This function causes a memory leak, will be disposed in the future, // Use Execute() instead // Example: //