From 064571145db35285cf9dd11400d8673ba29096de Mon Sep 17 00:00:00 2001 From: wanghucheng Date: Wed, 5 Jun 2024 17:51:11 +0800 Subject: [PATCH 1/4] feat: plugins process pid file and kill plugins process residual --- sqle/driver/plugin_adapter_v2.go | 13 +-- sqle/driver/plugin_manager.go | 140 ++++++++++++++++++++++++++++++- 2 files changed, 147 insertions(+), 6 deletions(-) diff --git a/sqle/driver/plugin_adapter_v2.go b/sqle/driver/plugin_adapter_v2.go index d1923b68de..e8e213340f 100644 --- a/sqle/driver/plugin_adapter_v2.go +++ b/sqle/driver/plugin_adapter_v2.go @@ -5,6 +5,7 @@ import ( sqlDriver "database/sql/driver" "errors" "fmt" + "os" "sync" driverV2 "github.com/actiontech/sqle/sqle/driver/v2" @@ -18,11 +19,12 @@ import ( ) type PluginProcessorV2 struct { - cfg func(cmdBase string, cmdArgs []string) *goPlugin.ClientConfig - cmdBase string - cmdArgs []string - client *goPlugin.Client - meta *driverV2.DriverMetas + cfg func(cmdBase string, cmdArgs []string) *goPlugin.ClientConfig + cmdBase string + cmdArgs []string + client *goPlugin.Client + meta *driverV2.DriverMetas + pluginPidFilePath string sync.Mutex } @@ -142,6 +144,7 @@ func (d *PluginProcessorV2) Stop() error { if d.client != nil { d.client.Kill() } + os.Remove(d.pluginPidFilePath) d.Unlock() return nil } diff --git a/sqle/driver/plugin_manager.go b/sqle/driver/plugin_manager.go index 6045d93606..f62325362f 100644 --- a/sqle/driver/plugin_manager.go +++ b/sqle/driver/plugin_manager.go @@ -5,7 +5,10 @@ import ( "os" "os/exec" "path/filepath" + "strconv" "strings" + "sync" + "syscall" "time" "github.com/actiontech/sqle/sqle/config" @@ -177,6 +180,26 @@ func (pm *pluginManager) Start(pluginDir string, pluginConfigList []config.Plugi return err } + // kill plugins process residual and remove pidfile + var wg sync.WaitGroup + dir := GetPluginPidDirPath(pluginDir) + if err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if !info.IsDir() && strings.HasSuffix(info.Name(), ".pid") { + wg.Add(1) + go func() { + defer wg.Done() + err = KillResidualPluginsProcess(path) + if err != nil { + log.NewEntry().Warnf("stop plugin error: %v", err) + } + }() + } + return nil + }); err != nil { + log.NewEntry().Warnf("file path walk stop plugin error: %v", err) + } + wg.Wait() + // register plugin for _, p := range plugins { cmdBase := filepath.Join(pluginDir, p.Name()) @@ -202,12 +225,17 @@ func (pm *pluginManager) Start(pluginDir string, pluginConfigList []config.Plugi return fmt.Errorf("plugin %v failed to start, error: %v Please check the sqled.log for more details", p.Name(), err) } + pluginPidFilePath := GetPluginPidFilePath(pluginDir, p.Name()) + err = WritePidFile(pluginPidFilePath, int64(client.ReattachConfig().Pid)) + if err != nil { + return fmt.Errorf("write plugin %s pid file failed, error: %v", pluginPidFilePath, err) + } var pp PluginProcessor switch client.NegotiatedVersion() { case driverV1.ProtocolVersion: pp = &PluginProcessorV1{cfg: getClientConfig, cmdBase: cmdBase, cmdArgs: cmdArgs, client: client} case driverV2.ProtocolVersion: - pp = &PluginProcessorV2{cfg: getClientConfig, cmdBase: cmdBase, cmdArgs: cmdArgs, client: client} + pp = &PluginProcessorV2{cfg: getClientConfig, cmdBase: cmdBase, cmdArgs: cmdArgs, client: client, pluginPidFilePath: pluginPidFilePath} } if err := pm.register(pp); err != nil { stopErr := pp.Stop() @@ -243,3 +271,113 @@ func (pm *pluginManager) OpenPlugin(l *logrus.Entry, pluginName string, cfg *dri } return pm.pluginProcessors[pluginName].Open(l, cfg) } + +func KillResidualPluginsProcess(pidFile string) error { + process, err := GetProcessByPidFile(pidFile) + if err != nil { + return fmt.Errorf("get plugin %s process failed, error: %v", pidFile, err) + } + if process != nil { + err = StopProcess(process) + if err != nil { + return fmt.Errorf("stop plugin process [%v] failed, error: %v", process.Pid, err) + } + } + os.Remove(pidFile) + return nil +} + +// 根据pid文件获取进程信息 +func GetProcessByPidFile(pluginPidFile string) (*os.Process, error) { + if _, err := os.Stat(pluginPidFile); err != nil { + if !os.IsNotExist(err) { + return nil, err + } + } else { + pidContent, err := os.ReadFile(pluginPidFile) + if err != nil { + return nil, err + } + if len(pidContent) == 0 { + return nil, nil + } + pid, err := strconv.Atoi(string(pidContent)) + if err != nil { + return nil, err + } + // 获取进程 + process, err := GetProcessByPid(pid) + if err != nil { + return nil, err + } + return process, nil + } + return nil, nil +} + +// 根据pid获取进程信息,若进程已退出则返回nil +func GetProcessByPid(pid int) (*os.Process, error) { + process, err := os.FindProcess(pid) + if err != nil { + return nil, err + } + // 检查进程是否存在的方式 + err = process.Signal(syscall.Signal(0)) + if err != nil { + if errors.Is(err, os.ErrProcessDone) { + return nil, nil + } + return nil, err + } + return process, nil +} + +// 退出进程 +func StopProcess(process *os.Process) error { + doneChan := time.NewTicker(2 * time.Second) + defer doneChan.Stop() + for { + select { + case <-doneChan.C: + log.NewEntry().Warnf("stop plugin process [%v] failed, just kill it ", process.Pid) + err := process.Kill() + if err != nil { + return err + } + return nil + default: + err := process.Signal(syscall.SIGTERM) + if errors.Is(err, os.ErrProcessDone) { + return nil + } + } + } +} + +func GetPluginPidDirPath(pluginDir string) string { + return filepath.Join(pluginDir, "pidfile") +} + +func GetPluginPidFilePath(pluginDir string, pluginName string) string { + return filepath.Join(GetPluginPidDirPath(pluginDir), pluginName+".pid") +} + +func WritePidFile(pidFilePath string, pid int64) error { + _, err := os.Stat(pidFilePath) + if os.IsNotExist(err) { + if err := os.MkdirAll(filepath.Dir(pidFilePath), 0644); err != nil { + return err + } + } + file, err := os.OpenFile(pidFilePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + if err != nil { + return err + } + defer file.Close() + + _, err = fmt.Fprintf(file, "%d", pid) + if err != nil { + return err + } + return nil +} From 000236606a8026b2ef9b5f16db9bd20a6751e75c Mon Sep 17 00:00:00 2001 From: wanghucheng Date: Thu, 6 Jun 2024 10:26:46 +0800 Subject: [PATCH 2/4] modify: stop residual plugin log --- sqle/driver/plugin_manager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sqle/driver/plugin_manager.go b/sqle/driver/plugin_manager.go index f62325362f..cefdfd8784 100644 --- a/sqle/driver/plugin_manager.go +++ b/sqle/driver/plugin_manager.go @@ -190,13 +190,13 @@ func (pm *pluginManager) Start(pluginDir string, pluginConfigList []config.Plugi defer wg.Done() err = KillResidualPluginsProcess(path) if err != nil { - log.NewEntry().Warnf("stop plugin error: %v", err) + log.NewEntry().Warnf("stop residual plugin error: %v", err) } }() } return nil }); err != nil { - log.NewEntry().Warnf("file path walk stop plugin error: %v", err) + log.NewEntry().Warnf("stop residual plugin file path walk error: %v", err) } wg.Wait() From 1ea42fb5028ca3bc07a1236e88254b3150a3c790 Mon Sep 17 00:00:00 2001 From: wanghucheng Date: Thu, 6 Jun 2024 10:33:19 +0800 Subject: [PATCH 3/4] add: remove pid file error log --- sqle/driver/plugin_manager.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sqle/driver/plugin_manager.go b/sqle/driver/plugin_manager.go index cefdfd8784..93a21141e5 100644 --- a/sqle/driver/plugin_manager.go +++ b/sqle/driver/plugin_manager.go @@ -283,7 +283,10 @@ func KillResidualPluginsProcess(pidFile string) error { return fmt.Errorf("stop plugin process [%v] failed, error: %v", process.Pid, err) } } - os.Remove(pidFile) + err = os.Remove(pidFile) + if err != nil { + return fmt.Errorf("remove pid file error: %v", err) + } return nil } From 2538005b65dd6299b1449882d09e3ed1ccbc7e84 Mon Sep 17 00:00:00 2001 From: wanghucheng Date: Thu, 6 Jun 2024 15:09:53 +0800 Subject: [PATCH 4/4] modify: add file name for remove pid file error log --- sqle/driver/plugin_manager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sqle/driver/plugin_manager.go b/sqle/driver/plugin_manager.go index 93a21141e5..dc1912f0c1 100644 --- a/sqle/driver/plugin_manager.go +++ b/sqle/driver/plugin_manager.go @@ -190,7 +190,7 @@ func (pm *pluginManager) Start(pluginDir string, pluginConfigList []config.Plugi defer wg.Done() err = KillResidualPluginsProcess(path) if err != nil { - log.NewEntry().Warnf("stop residual plugin error: %v", err) + log.NewEntry().Warnf("stop residual plugin %s error: %v", path, err) } }() } @@ -285,7 +285,7 @@ func KillResidualPluginsProcess(pidFile string) error { } err = os.Remove(pidFile) if err != nil { - return fmt.Errorf("remove pid file error: %v", err) + return fmt.Errorf("remove pid file %s error: %v", pidFile, err) } return nil }