Skip to content

Commit

Permalink
Merge pull request #99 from fly-apps/add_repl_slot_admin
Browse files Browse the repository at this point in the history
Add admin api to view Replication slots
  • Loading branch information
DAlperin authored Apr 5, 2023
2 parents 559cfcc + 06fcd85 commit 9dcff36
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 0 deletions.
114 changes: 114 additions & 0 deletions pkg/commands/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package commands
import (
"encoding/json"
"fmt"
"github.com/fly-examples/postgres-ha/pkg/flypg"
"github.com/pkg/errors"
"io"
"net/http"
"os/exec"
Expand Down Expand Up @@ -174,3 +176,115 @@ func handleUpdateSettings(w http.ResponseWriter, r *http.Request) {

render.JSON(w, resp, http.StatusOK)
}

func handleReplicationStats(w http.ResponseWriter, r *http.Request) {
conn, close, err := localConnection(r.Context())
if err != nil {
render.Err(w, err)
return
}
defer close()

stats, err := admin.ResolveReplicationLag(r.Context(), conn)
if err != nil {
render.Err(w, err)
return
}

resp := &Response{Result: stats}

render.JSON(w, resp, http.StatusOK)
}

func handleStolonDBUid(w http.ResponseWriter, r *http.Request) {
env, err := util.BuildEnv()
if err != nil {
render.Err(w, err)
}

data, err := stolon.FetchClusterData(env)
if err != nil {
render.Err(w, err)
}

node, err := flypg.NewNode()
if err != nil {
render.Err(w, err)
}

for _, db := range data.DBs {
if db.Spec.KeeperUID == node.KeeperUID {
resp := &Response{Result: db.UID}
render.JSON(w, resp, http.StatusOK)
return
}
}

render.Err(w, errors.New("can't find db"))
}

func handleEnableReadonly(w http.ResponseWriter, r *http.Request) {
conn, close, err := localConnection(r.Context())
if err != nil {
render.Err(w, err)
return
}
defer close()

err = admin.SetReadonly(r.Context(), conn, true)
if err != nil {
render.Err(w, err)
}

args := []string{"root", "pkill", "haproxy"}

cmd := exec.Command("gosu", args...)

if err := cmd.Run(); err != nil {
render.Err(w, err)
return
}

if cmd.ProcessState.ExitCode() != 0 {
err := fmt.Errorf(cmd.ProcessState.String())
render.Err(w, err)
return
}

resp := &Response{Result: true}

render.JSON(w, resp, http.StatusOK)
}

func handleDisableReadonly(w http.ResponseWriter, r *http.Request) {
conn, close, err := localConnection(r.Context())
if err != nil {
render.Err(w, err)
return
}
defer close()

err = admin.SetReadonly(r.Context(), conn, false)
if err != nil {
render.Err(w, err)
}

args := []string{"root", "pkill", "haproxy"}

cmd := exec.Command("gosu", args...)

if err := cmd.Run(); err != nil {
render.Err(w, err)
return
}

if cmd.ProcessState.ExitCode() != 0 {
err := fmt.Errorf(cmd.ProcessState.String())
render.Err(w, err)
return
}

resp := &Response{Result: true}

render.JSON(w, resp, http.StatusOK)
}
4 changes: 4 additions & 0 deletions pkg/commands/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func Handler() http.Handler {
r.Get("/failover/trigger", handleFailoverTrigger)
r.Get("/restart", handleRestart)
r.Get("/settings/view", handleViewSettings)
r.Get("/replicationstats", handleReplicationStats)
r.Post("/readonly/enable", handleEnableReadonly)
r.Post("/readonly/disable", handleDisableReadonly)
r.Get("/dbuid", handleStolonDBUid)
r.Post("/settings/update", handleUpdateSettings)
})

Expand Down
54 changes: 54 additions & 0 deletions pkg/flypg/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"crypto/md5"
"fmt"
"github.com/pkg/errors"
"os"
"strings"

Expand Down Expand Up @@ -252,6 +253,59 @@ func ResolveRole(ctx context.Context, pg *pgx.Conn) (string, error) {
return "leader", nil
}

type ReplicationStat struct {
Name string
Diff int
}

func ResolveReplicationLag(ctx context.Context, pg *pgx.Conn) ([]*ReplicationStat, error) {
sql := "select application_name, pg_current_wal_lsn() - flush_lsn as diff from pg_stat_replication;"

rows, err := pg.Query(ctx, sql)
if err != nil {
return nil, err
}
defer rows.Close()
var stats []*ReplicationStat
for rows.Next() {
var s ReplicationStat
if err := rows.Scan(&s.Name, &s.Diff); err != nil {
return nil, err
}
stats = append(stats, &s)
}
return stats, nil
}

func SetReadonly(ctx context.Context, pg *pgx.Conn, enable bool) error {
role, err := ResolveRole(ctx, pg)
if err != nil {
return err
}
if role != "leader" {
return errors.New("can't set non primary to read-only")
}

databases, err := ListDatabases(ctx, pg)
if err != nil {
return err
}

for _, db := range databases {
// exclude administrative dbs
if db.Name == "postgres" {
continue
}

sql := fmt.Sprintf("ALTER DATABASE %s SET default_transaction_read_only=%v;", db.Name, enable)
if _, err = pg.Exec(ctx, sql); err != nil {
return fmt.Errorf("failed to alter readonly state on db %s: %s", db.Name, err)
}
}

return nil
}

func ResolveSettings(ctx context.Context, pg *pgx.Conn, list []string) (*flypg.Settings, error) {
node, err := flypg.NewNode()
if err != nil {
Expand Down

0 comments on commit 9dcff36

Please sign in to comment.