Skip to content

Commit

Permalink
Merge pull request #2447 from actiontech/issue_2348_new
Browse files Browse the repository at this point in the history
feat: plugins process pid file and kill plugins process residual
  • Loading branch information
ColdWaterLW authored Jun 6, 2024
2 parents d665ac2 + 2538005 commit 0c424f8
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 6 deletions.
13 changes: 8 additions & 5 deletions sqle/driver/plugin_adapter_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
sqlDriver "database/sql/driver"
"errors"
"fmt"
"os"
"sync"

driverV2 "github.com/actiontech/sqle/sqle/driver/v2"
Expand All @@ -18,11 +19,12 @@ import (
)

type PluginProcessorV2 struct {
cfg func(cmdBase string, cmdArgs []string) *goPlugin.ClientConfig
cmdBase string
cmdArgs []string
client *goPlugin.Client
meta *driverV2.DriverMetas
cfg func(cmdBase string, cmdArgs []string) *goPlugin.ClientConfig
cmdBase string
cmdArgs []string
client *goPlugin.Client
meta *driverV2.DriverMetas
pluginPidFilePath string
sync.Mutex
}

Expand Down Expand Up @@ -142,6 +144,7 @@ func (d *PluginProcessorV2) Stop() error {
if d.client != nil {
d.client.Kill()
}
os.Remove(d.pluginPidFilePath)
d.Unlock()
return nil
}
Expand Down
143 changes: 142 additions & 1 deletion sqle/driver/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import (
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"

"github.com/actiontech/sqle/sqle/config"
Expand Down Expand Up @@ -177,6 +180,26 @@ func (pm *pluginManager) Start(pluginDir string, pluginConfigList []config.Plugi
return err
}

// kill plugins process residual and remove pidfile
var wg sync.WaitGroup
dir := GetPluginPidDirPath(pluginDir)
if err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if !info.IsDir() && strings.HasSuffix(info.Name(), ".pid") {
wg.Add(1)
go func() {
defer wg.Done()
err = KillResidualPluginsProcess(path)
if err != nil {
log.NewEntry().Warnf("stop residual plugin %s error: %v", path, err)
}
}()
}
return nil
}); err != nil {
log.NewEntry().Warnf("stop residual plugin file path walk error: %v", err)
}
wg.Wait()

// register plugin
for _, p := range plugins {
cmdBase := filepath.Join(pluginDir, p.Name())
Expand All @@ -202,12 +225,17 @@ func (pm *pluginManager) Start(pluginDir string, pluginConfigList []config.Plugi
return fmt.Errorf("plugin %v failed to start, error: %v Please check the sqled.log for more details", p.Name(), err)
}

pluginPidFilePath := GetPluginPidFilePath(pluginDir, p.Name())
err = WritePidFile(pluginPidFilePath, int64(client.ReattachConfig().Pid))
if err != nil {
return fmt.Errorf("write plugin %s pid file failed, error: %v", pluginPidFilePath, err)
}
var pp PluginProcessor
switch client.NegotiatedVersion() {
case driverV1.ProtocolVersion:
pp = &PluginProcessorV1{cfg: getClientConfig, cmdBase: cmdBase, cmdArgs: cmdArgs, client: client}
case driverV2.ProtocolVersion:
pp = &PluginProcessorV2{cfg: getClientConfig, cmdBase: cmdBase, cmdArgs: cmdArgs, client: client}
pp = &PluginProcessorV2{cfg: getClientConfig, cmdBase: cmdBase, cmdArgs: cmdArgs, client: client, pluginPidFilePath: pluginPidFilePath}
}
if err := pm.register(pp); err != nil {
stopErr := pp.Stop()
Expand Down Expand Up @@ -243,3 +271,116 @@ func (pm *pluginManager) OpenPlugin(l *logrus.Entry, pluginName string, cfg *dri
}
return pm.pluginProcessors[pluginName].Open(l, cfg)
}

func KillResidualPluginsProcess(pidFile string) error {
process, err := GetProcessByPidFile(pidFile)
if err != nil {
return fmt.Errorf("get plugin %s process failed, error: %v", pidFile, err)
}
if process != nil {
err = StopProcess(process)
if err != nil {
return fmt.Errorf("stop plugin process [%v] failed, error: %v", process.Pid, err)
}
}
err = os.Remove(pidFile)
if err != nil {
return fmt.Errorf("remove pid file %s error: %v", pidFile, err)
}
return nil
}

// 根据pid文件获取进程信息
func GetProcessByPidFile(pluginPidFile string) (*os.Process, error) {
if _, err := os.Stat(pluginPidFile); err != nil {
if !os.IsNotExist(err) {
return nil, err
}
} else {
pidContent, err := os.ReadFile(pluginPidFile)
if err != nil {
return nil, err
}
if len(pidContent) == 0 {
return nil, nil
}
pid, err := strconv.Atoi(string(pidContent))
if err != nil {
return nil, err
}
// 获取进程
process, err := GetProcessByPid(pid)
if err != nil {
return nil, err
}
return process, nil
}
return nil, nil
}

// 根据pid获取进程信息,若进程已退出则返回nil
func GetProcessByPid(pid int) (*os.Process, error) {
process, err := os.FindProcess(pid)
if err != nil {
return nil, err
}
// 检查进程是否存在的方式
err = process.Signal(syscall.Signal(0))
if err != nil {
if errors.Is(err, os.ErrProcessDone) {
return nil, nil
}
return nil, err
}
return process, nil
}

// 退出进程
func StopProcess(process *os.Process) error {
doneChan := time.NewTicker(2 * time.Second)
defer doneChan.Stop()
for {
select {
case <-doneChan.C:
log.NewEntry().Warnf("stop plugin process [%v] failed, just kill it ", process.Pid)
err := process.Kill()
if err != nil {
return err
}
return nil
default:
err := process.Signal(syscall.SIGTERM)
if errors.Is(err, os.ErrProcessDone) {
return nil
}
}
}
}

func GetPluginPidDirPath(pluginDir string) string {
return filepath.Join(pluginDir, "pidfile")
}

func GetPluginPidFilePath(pluginDir string, pluginName string) string {
return filepath.Join(GetPluginPidDirPath(pluginDir), pluginName+".pid")
}

func WritePidFile(pidFilePath string, pid int64) error {
_, err := os.Stat(pidFilePath)
if os.IsNotExist(err) {
if err := os.MkdirAll(filepath.Dir(pidFilePath), 0644); err != nil {
return err
}
}
file, err := os.OpenFile(pidFilePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
return err
}
defer file.Close()

_, err = fmt.Fprintf(file, "%d", pid)
if err != nil {
return err
}
return nil
}

0 comments on commit 0c424f8

Please sign in to comment.