Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
benwaffle committed Aug 1, 2024
1 parent af508f6 commit 6809214
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 36 deletions.
6 changes: 3 additions & 3 deletions cmd/monitor/monitor_dead_members.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ func deadMemberMonitorTick(ctx context.Context, node *flypg.Node, seenAt map[int
}

for _, voter := range votingMembers {
sConn, err := node.RepMgr.NewRemoteConnection(ctx, voter.Hostname)
sConn, err := node.RepMgr.NewRemoteConnection(ctx, voter.NodeName)
if err != nil {
// TODO - Verify the exception that's getting thrown.
if time.Since(seenAt[voter.ID]) >= deadMemberRemovalThreshold {
log.Printf("Removing dead member: %s\n", voter.Hostname)
log.Printf("Removing dead member: %s\n", voter.NodeName)
if err := node.RepMgr.UnregisterMember(voter); err != nil {
log.Printf("failed to unregister member %s: %v", voter.Hostname, err)
log.Printf("failed to unregister member %s: %v", voter.NodeName, err)
continue
}
delete(seenAt, voter.ID)
Expand Down
6 changes: 3 additions & 3 deletions internal/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (n *Node) Init(ctx context.Context) error {
return fmt.Errorf("failed to resolve member over dns: %s", err)
}

if err := n.RepMgr.clonePrimary(cloneTarget.Hostname); err != nil {
if err := n.RepMgr.clonePrimary(cloneTarget.NodeName); err != nil {
// Clean-up the directory so it can be retried.
if rErr := os.Remove(n.DataDir); rErr != nil {
log.Printf("[ERROR] failed to cleanup postgresql dir after clone error: %s\n", rErr)
Expand Down Expand Up @@ -326,7 +326,7 @@ func (n *Node) PostInit(ctx context.Context) error {
}

// Register existing witness to apply any configuration changes.
if err := n.RepMgr.registerWitness(primary.Hostname); err != nil {
if err := n.RepMgr.registerWitness(primary.NodeName); err != nil {
return fmt.Errorf("failed to register existing witness: %s", err)
}
default:
Expand Down Expand Up @@ -408,7 +408,7 @@ func (n *Node) PostInit(ctx context.Context) error {
return fmt.Errorf("failed to resolve primary member: %s", err)
}

if err := n.RepMgr.registerWitness(primary.Hostname); err != nil {
if err := n.RepMgr.registerWitness(primary.NodeName); err != nil {
return fmt.Errorf("failed to register witness: %s", err)
}
} else {
Expand Down
12 changes: 6 additions & 6 deletions internal/flypg/readonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,31 +70,31 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error {

for _, member := range members {
if member.Role == PrimaryRoleName {
endpoint := fmt.Sprintf("http://%s:5500/%s", fmt.Sprintf("%s.vm.%s.internal", member.Hostname, n.AppName), target)
endpoint := fmt.Sprintf("http://%s:5500/%s", fmt.Sprintf("%s.vm.%s.internal", member.NodeName, n.AppName), target)
resp, err := http.Get(endpoint)
if err != nil {
log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.Hostname, err)
log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.NodeName, err)
continue
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode > 299 {
log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %d\n", member.Hostname, resp.StatusCode)
log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %d\n", member.NodeName, resp.StatusCode)
}
}
}

for _, member := range members {
endpoint := fmt.Sprintf("http://%s:5500/%s", fmt.Sprintf("%s.vm.%s.internal", member.Hostname, n.AppName), RestartHaproxyEndpoint)
endpoint := fmt.Sprintf("http://%s:5500/%s", fmt.Sprintf("%s.vm.%s.internal", member.NodeName, n.AppName), RestartHaproxyEndpoint)
resp, err := http.Get(endpoint)
if err != nil {
log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.Hostname, err)
log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.NodeName, err)
continue
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode > 299 {
log.Printf("[WARN] Failed to restart haproxy on member %s: %d\n", member.Hostname, resp.StatusCode)
log.Printf("[WARN] Failed to restart haproxy on member %s: %d\n", member.NodeName, resp.StatusCode)
}
}

Expand Down
19 changes: 10 additions & 9 deletions internal/flypg/repmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ func (r *RepMgr) NewLocalConnection(ctx context.Context) (*pgx.Conn, error) {
return openConnection(ctx, host, r.DatabaseName, r.Credentials)
}

func (r *RepMgr) NewRemoteConnection(ctx context.Context, hostname string) (*pgx.Conn, error) {
func (r *RepMgr) NewRemoteConnection(ctx context.Context, machineID string) (*pgx.Conn, error) {
hostname := fmt.Sprintf("%s.vm.%s.internal", machineID, r.AppName)
host := net.JoinHostPort(hostname, strconv.Itoa(r.Port))
return openConnection(ctx, host, r.DatabaseName, r.Credentials)
}
Expand Down Expand Up @@ -289,10 +290,10 @@ func (r *RepMgr) unregisterWitness(id int) error {
return err
}

func (r *RepMgr) rejoinCluster(hostname string) error {
func (r *RepMgr) rejoinCluster(machineID string) error {
cmdStr := fmt.Sprintf("repmgr -f %s node rejoin -h %s -p %d -U %s -d %s --force-rewind --no-wait",
r.ConfigPath,
hostname,
fmt.Sprintf("%s.vm.%s.internal", machineID, r.AppName),
r.Port,
r.Credentials.Username,
r.DatabaseName,
Expand Down Expand Up @@ -327,7 +328,7 @@ func (r *RepMgr) clonePrimary(machineId string) error {

type Member struct {
ID int
Hostname string
NodeName string
Active bool
Region string
Role string
Expand All @@ -344,7 +345,7 @@ func (*RepMgr) Members(ctx context.Context, pg *pgx.Conn) ([]Member, error) {
var members []Member
for rows.Next() {
var member Member
if err := rows.Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role); err != nil {
if err := rows.Scan(&member.ID, &member.NodeName, &member.Region, &member.Active, &member.Role); err != nil {
return nil, err
}

Expand Down Expand Up @@ -377,7 +378,7 @@ func (r *RepMgr) Member(ctx context.Context, conn *pgx.Conn) (*Member, error) {
func (*RepMgr) PrimaryMember(ctx context.Context, pg *pgx.Conn) (*Member, error) {
var member Member
sql := "select node_id, node_name, location, active, type from repmgr.nodes where type = 'primary' and active = true;"
err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role)
err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.NodeName, &member.Region, &member.Active, &member.Role)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -414,7 +415,7 @@ func (*RepMgr) MemberByID(ctx context.Context, pg *pgx.Conn, id int) (*Member, e
var member Member
sql := fmt.Sprintf("select node_id, node_name, location, active, type from repmgr.nodes where node_id = %d;", id)

err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role)
err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.NodeName, &member.Region, &member.Active, &member.Role)
if err != nil {
return nil, err
}
Expand All @@ -426,7 +427,7 @@ func (*RepMgr) MemberByHostname(ctx context.Context, pg *pgx.Conn, hostname stri
var member Member
sql := fmt.Sprintf("select node_id, node_name, location, active, type from repmgr.nodes where node_name = '%s';", hostname)

err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role)
err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.NodeName, &member.Region, &member.Active, &member.Role)
if err != nil {
return nil, err
}
Expand All @@ -447,7 +448,7 @@ func (r *RepMgr) ResolveMemberOverDNS(ctx context.Context) (*Member, error) {
continue
}

conn, err := r.NewRemoteConnection(ctx, fmt.Sprintf("%s.vm.%s.internal", machineId, r.AppName))
conn, err := r.NewRemoteConnection(ctx, machineId)
if err != nil {
continue
}
Expand Down
29 changes: 14 additions & 15 deletions internal/flypg/zombie.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"log"
"net"
"os"

"github.com/fly-apps/postgres-flex/internal/utils"
Expand Down Expand Up @@ -95,9 +94,9 @@ func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASamp

for _, standby := range standbys {
// Check for connectivity
mConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.Hostname)
mConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.NodeName)
if err != nil {
log.Printf("[WARN] Failed to connect to %s\n", standby.Hostname)
log.Printf("[WARN] Failed to connect to %s\n", standby.NodeName)
sample.totalInactive++
continue
}
Expand All @@ -106,7 +105,7 @@ func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASamp
// Verify the primary
primary, err := node.RepMgr.PrimaryMember(ctx, mConn)
if err != nil {
log.Printf("[WARN] Failed to resolve primary from standby %s\n", standby.Hostname)
log.Printf("[WARN] Failed to resolve primary from standby %s\n", standby.NodeName)
sample.totalInactive++
continue
}
Expand All @@ -118,9 +117,9 @@ func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASamp
sample.totalActive++

// Record conflict when primary doesn't match.
if primary.Hostname != node.PrivateIP {
if primary.NodeName != node.MachineID {
sample.totalConflicts++
sample.conflictMap[primary.Hostname]++
sample.conflictMap[primary.NodeName]++
}
}

Expand Down Expand Up @@ -199,24 +198,24 @@ func handleZombieLock(ctx context.Context, n *Node) error {
// If the zombie lock contains a hostname, it means we were able to
// resolve the real primary and will attempt to rejoin it.
if primaryStr != "" {
ip := net.ParseIP(primaryStr)
if ip == nil {
return fmt.Errorf("zombie.lock file contains an invalid ipv6 address")
}
//ip := net.ParseIP(primaryStr)
//if ip == nil {
// return fmt.Errorf("zombie.lock file contains an invalid ipv6 address")
//}

conn, err := n.RepMgr.NewRemoteConnection(ctx, ip.String())
conn, err := n.RepMgr.NewRemoteConnection(ctx, primaryStr)
if err != nil {
return fmt.Errorf("failed to establish a connection to our rejoin target %s: %s", ip.String(), err)
return fmt.Errorf("failed to establish a connection to our rejoin target %s: %s", primaryStr, err)
}
defer func() { _ = conn.Close(ctx) }()

primary, err := n.RepMgr.PrimaryMember(ctx, conn)
if err != nil {
return fmt.Errorf("failed to confirm primary on recover target %s: %s", ip.String(), err)
return fmt.Errorf("failed to confirm primary on recover target %s: %s", primaryStr, err)
}

// Confirm that our rejoin target still identifies itself as the primary.
if primary.Hostname != ip.String() {
if primary.NodeName != primaryStr {
// Clear the zombie.lock file so we can attempt to re-resolve the correct primary.
if err := RemoveZombieLock(); err != nil {
return fmt.Errorf("failed to remove zombie lock: %s", err)
Expand All @@ -231,7 +230,7 @@ func handleZombieLock(ctx context.Context, n *Node) error {
return ErrZombieLockRegionMismatch
}

if err := n.RepMgr.rejoinCluster(primary.Hostname); err != nil {
if err := n.RepMgr.rejoinCluster(primary.NodeName); err != nil {
return fmt.Errorf("failed to rejoin cluster: %s", err)
}

Expand Down

0 comments on commit 6809214

Please sign in to comment.