Skip to content

Commit

Permalink
fix application_name on replica startup
Browse files Browse the repository at this point in the history
  • Loading branch information
benwaffle committed Aug 2, 2024
1 parent 74fadf3 commit bbd55f0
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 3 deletions.
47 changes: 46 additions & 1 deletion internal/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/fly-apps/postgres-flex/internal/privnet"
"github.com/fly-apps/postgres-flex/internal/utils"
"github.com/jackc/pgx/v5"
"golang.org/x/exp/slices"
)

type Node struct {
Expand Down Expand Up @@ -272,7 +273,8 @@ func (n *Node) PostInit(ctx context.Context) error {
// Restart repmgrd in the event the IP changes for an already registered node.
// This can happen if the underlying volume is moved to a different node.
// TODO - this isn't an IP anymore
daemonRestartRequired := n.RepMgr.daemonRestartRequired(member)
//daemonRestartRequired := n.RepMgr.daemonRestartRequired(member)
daemonRestartRequired := false

switch member.Role {
case PrimaryRoleName:
Expand Down Expand Up @@ -316,6 +318,49 @@ func (n *Node) PostInit(ctx context.Context) error {
}
}
case StandbyRoleName:
// This section handles migration from 6pn as repmgr node name to machine ID as repmgr node name
primary, err := n.RepMgr.PrimaryMember(ctx, repConn)
if err != nil {
return fmt.Errorf("failed to resolve primary member when updating standby: %s", err)
}

primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.NodeName)
if err != nil {
return fmt.Errorf("failed to establish connection to primary: %s", err)
}
defer func() { _ = primaryConn.Close(ctx) }()

rows, err := primaryConn.Query(ctx, "select application_name from pg_stat_replication")
if err != nil {
return fmt.Errorf("failed to query pg_stat_replication: %s", err)
}
defer rows.Close()

var applicationNames []string
for rows.Next() {
var applicationName string
if err := rows.Scan(&applicationName); err != nil {
return fmt.Errorf("failed to scan application_name: %s", err)
}
applicationNames = append(applicationNames, applicationName)
}
if err := rows.Err(); err != nil {
return fmt.Errorf("failed to iterate over rows: %s", err)
}

if slices.Contains(applicationNames, n.PrivateIP) {
log.Printf("pg_stat_replication on the primary has our ipv6 address as application_name, converting to machine ID...")

if err := n.RepMgr.regenReplicationConf(ctx); err != nil {
return fmt.Errorf("failed to clone standby: %s", err)
}

if err := n.PGConfig.reload(ctx); err != nil {
return fmt.Errorf("failed to reload postgresql: %s", err)
}
}
// end of 6pn -> machine ID migration stuff

// Register existing standby to apply any configuration changes.
if err := n.RepMgr.registerStandby(daemonRestartRequired); err != nil {
return fmt.Errorf("failed to register existing standby: %s", err)
Expand Down
8 changes: 8 additions & 0 deletions internal/flypg/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,3 +606,11 @@ func diskSizeInBytes(dir string) (uint64, error) {
}
return stat.Blocks * uint64(stat.Bsize), nil
}

func (*PGConfig) reload(ctx context.Context) error {
_, err := utils.RunCmd(ctx, "postgres", "pg_ctl", "-D", "/data/postgresql/", "reload")
if err != nil {
return fmt.Errorf("failed to reload postgres: %s", err)
}
return nil
}
30 changes: 28 additions & 2 deletions internal/flypg/repmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,19 @@ func (r *RepMgr) NewLocalConnection(ctx context.Context) (*pgx.Conn, error) {
return openConnection(ctx, host, r.DatabaseName, r.Credentials)
}

func (r *RepMgr) NewRemoteConnection(ctx context.Context, machineID string) (*pgx.Conn, error) {
hostname := fmt.Sprintf("%s.vm.%s.internal", machineID, r.AppName)
// target - can be an IP address, machine ID in the current app, or other hostname
func (r *RepMgr) NewRemoteConnection(ctx context.Context, target string) (*pgx.Conn, error) {
var hostname string

ip := net.ParseIP(target)
if ip != nil {
hostname = target
} else if len(target) == 14 {
hostname = fmt.Sprintf("%s.vm.%s.internal", target, r.AppName)
} else {
hostname = target
}

host := net.JoinHostPort(hostname, strconv.Itoa(r.Port))
return openConnection(ctx, host, r.DatabaseName, r.Credentials)
}
Expand Down Expand Up @@ -326,6 +337,21 @@ func (r *RepMgr) clonePrimary(machineId string) error {
return nil
}

func (r *RepMgr) regenReplicationConf(ctx context.Context) error {
// TODO: do we need -c?
if _, err := utils.RunCmd(ctx, "postgres",
"repmgr", "--replication-conf-only",
"-h", r.PrivateIP, // TODO: should this be the hostname, or even just localhost
"-p", fmt.Sprint(r.Port),
"-d", r.DatabaseName,
"-U", r.Credentials.Username,
"-f", r.ConfigPath,
"standby", "clone", "-F"); err != nil {
return fmt.Errorf("failed to regenerate replication conf: %s", err)
}
return nil
}

type Member struct {
ID int
NodeName string
Expand Down

0 comments on commit bbd55f0

Please sign in to comment.