diff --git a/pkg/commands/admin.go b/pkg/commands/admin.go index 13c7d230..90449dd7 100644 --- a/pkg/commands/admin.go +++ b/pkg/commands/admin.go @@ -3,6 +3,8 @@ package commands import ( "encoding/json" "fmt" + "github.com/fly-examples/postgres-ha/pkg/flypg" + "github.com/pkg/errors" "io" "net/http" "os/exec" @@ -174,3 +176,115 @@ func handleUpdateSettings(w http.ResponseWriter, r *http.Request) { render.JSON(w, resp, http.StatusOK) } + +func handleReplicationStats(w http.ResponseWriter, r *http.Request) { + conn, close, err := localConnection(r.Context()) + if err != nil { + render.Err(w, err) + return + } + defer close() + + stats, err := admin.ResolveReplicationLag(r.Context(), conn) + if err != nil { + render.Err(w, err) + return + } + + resp := &Response{Result: stats} + + render.JSON(w, resp, http.StatusOK) +} + +func handleStolonDBUid(w http.ResponseWriter, r *http.Request) { + env, err := util.BuildEnv() + if err != nil { + render.Err(w, err) + } + + data, err := stolon.FetchClusterData(env) + if err != nil { + render.Err(w, err) + } + + node, err := flypg.NewNode() + if err != nil { + render.Err(w, err) + } + + for _, db := range data.DBs { + if db.Spec.KeeperUID == node.KeeperUID { + resp := &Response{Result: db.UID} + render.JSON(w, resp, http.StatusOK) + return + } + } + + render.Err(w, errors.New("can't find db")) +} + +func handleEnableReadonly(w http.ResponseWriter, r *http.Request) { + conn, close, err := localConnection(r.Context()) + if err != nil { + render.Err(w, err) + return + } + defer close() + + err = admin.SetReadonly(r.Context(), conn, true) + if err != nil { + render.Err(w, err) + } + + args := []string{"root", "pkill", "haproxy"} + + cmd := exec.Command("gosu", args...) + + if err := cmd.Run(); err != nil { + render.Err(w, err) + return + } + + if cmd.ProcessState.ExitCode() != 0 { + err := fmt.Errorf(cmd.ProcessState.String()) + render.Err(w, err) + return + } + + resp := &Response{Result: true} + + render.JSON(w, resp, http.StatusOK) +} + +func handleDisableReadonly(w http.ResponseWriter, r *http.Request) { + conn, close, err := localConnection(r.Context()) + if err != nil { + render.Err(w, err) + return + } + defer close() + + err = admin.SetReadonly(r.Context(), conn, false) + if err != nil { + render.Err(w, err) + } + + args := []string{"root", "pkill", "haproxy"} + + cmd := exec.Command("gosu", args...) + + if err := cmd.Run(); err != nil { + render.Err(w, err) + return + } + + if cmd.ProcessState.ExitCode() != 0 { + err := fmt.Errorf(cmd.ProcessState.String()) + render.Err(w, err) + return + } + + resp := &Response{Result: true} + + render.JSON(w, resp, http.StatusOK) +} diff --git a/pkg/commands/handler.go b/pkg/commands/handler.go index 51732151..68c6d3a4 100644 --- a/pkg/commands/handler.go +++ b/pkg/commands/handler.go @@ -32,6 +32,10 @@ func Handler() http.Handler { r.Get("/failover/trigger", handleFailoverTrigger) r.Get("/restart", handleRestart) r.Get("/settings/view", handleViewSettings) + r.Get("/replicationstats", handleReplicationStats) + r.Post("/readonly/enable", handleEnableReadonly) + r.Post("/readonly/disable", handleDisableReadonly) + r.Get("/dbuid", handleStolonDBUid) r.Post("/settings/update", handleUpdateSettings) }) diff --git a/pkg/flypg/admin/admin.go b/pkg/flypg/admin/admin.go index f118bfde..3077b761 100644 --- a/pkg/flypg/admin/admin.go +++ b/pkg/flypg/admin/admin.go @@ -5,6 +5,7 @@ import ( "context" "crypto/md5" "fmt" + "github.com/pkg/errors" "os" "strings" @@ -252,6 +253,59 @@ func ResolveRole(ctx context.Context, pg *pgx.Conn) (string, error) { return "leader", nil } +type ReplicationStat struct { + Name string + Diff int +} + +func ResolveReplicationLag(ctx context.Context, pg *pgx.Conn) ([]*ReplicationStat, error) { + sql := "select application_name, pg_current_wal_lsn() - flush_lsn as diff from pg_stat_replication;" + + rows, err := pg.Query(ctx, sql) + if err != nil { + return nil, err + } + defer rows.Close() + var stats []*ReplicationStat + for rows.Next() { + var s ReplicationStat + if err := rows.Scan(&s.Name, &s.Diff); err != nil { + return nil, err + } + stats = append(stats, &s) + } + return stats, nil +} + +func SetReadonly(ctx context.Context, pg *pgx.Conn, enable bool) error { + role, err := ResolveRole(ctx, pg) + if err != nil { + return err + } + if role != "leader" { + return errors.New("can't set non primary to read-only") + } + + databases, err := ListDatabases(ctx, pg) + if err != nil { + return err + } + + for _, db := range databases { + // exclude administrative dbs + if db.Name == "postgres" { + continue + } + + sql := fmt.Sprintf("ALTER DATABASE %s SET default_transaction_read_only=%v;", db.Name, enable) + if _, err = pg.Exec(ctx, sql); err != nil { + return fmt.Errorf("failed to alter readonly state on db %s: %s", db.Name, err) + } + } + + return nil +} + func ResolveSettings(ctx context.Context, pg *pgx.Conn, list []string) (*flypg.Settings, error) { node, err := flypg.NewNode() if err != nil {