diff --git a/universe/auto_syncer.go b/universe/auto_syncer.go
index 3a32df1ab..2058c5324 100644
--- a/universe/auto_syncer.go
+++ b/universe/auto_syncer.go
@@ -76,6 +76,12 @@ type FederationPushReq struct {
 	// federation proof push was successful.
 	resp chan *Proof
 
+	// proofSyncLogged is a boolean that indicates whether the
+	// proof leaf sync attempt should be logged and actively managed to
+	// ensure that the federation push procedure is repeated in the event
+	// of a failure.
+	proofSyncLogged bool
+
 	err chan error
 }
 
@@ -227,48 +233,117 @@ func (f *FederationEnvoy) syncServerState(ctx context.Context,
 	return nil
 }
 
-// pushProofToFederation attempts to push out a new proof to the current
-// federation in parallel.
-func (f *FederationEnvoy) pushProofToFederation(uniID Identifier, key LeafKey,
-	leaf *Leaf) {
+// pushProofToServer attempts to push out a new proof to the target server.
+func (f *FederationEnvoy) pushProofToServer(ctx context.Context,
+	uniID Identifier, key LeafKey, leaf *Leaf, addr ServerAddr) error {
 
-	// Fetch all universe servers in our federation.
-	fedServers, err := f.tryFetchServers()
-	if err != nil || len(fedServers) == 0 {
-		return
+	remoteUniverseServer, err := f.cfg.NewRemoteRegistrar(addr)
+	if err != nil {
+		return fmt.Errorf("cannot push proof, unable to connect "+
+			"to remote server(%v): %w", addr.HostStr(), err)
+	}
+
+	_, err = remoteUniverseServer.UpsertProofLeaf(
+		ctx, uniID, key, leaf,
+	)
+	if err != nil {
+		return fmt.Errorf("cannot push proof to remote "+
+			"server(%v): %w", addr.HostStr(), err)
+	}
+
+	return nil
+}
+
+// pushProofToServerLogged attempts to push out a new proof to the target
+// server, and records the attempt in the federation proof sync log.
+func (f *FederationEnvoy) pushProofToServerLogged(ctx context.Context,
+	uniID Identifier, key LeafKey, leaf *Leaf, addr ServerAddr) error {
+
+	// Ensure that we have a pending sync log entry for this
+	// leaf and server pair. This will allow us to handle all
+	// pending syncs in the event of a restart or at a different
+	// point in the envoy.
+	_, err := f.cfg.FederationDB.UpsertFederationProofSyncLog(
+		ctx, uniID, key, addr, SyncDirectionPush,
+		ProofSyncStatusPending, true,
+	)
+	if err != nil {
+		log.Warnf("unable to log proof sync as pending: %v",
+			err)
+		return nil
+	}
+
+	remoteUniverseServer, err := f.cfg.NewRemoteRegistrar(addr)
+	if err != nil {
+		log.Warnf("cannot push proof, unable to connect "+
+			"to remote server(%v): %v", addr.HostStr(),
+			err)
+		return nil
+	}
+
+	_, err = remoteUniverseServer.UpsertProofLeaf(
+		ctx, uniID, key, leaf,
+	)
+	if err != nil {
+		log.Warnf("cannot push proof to remote "+
+			"server(%v): %v", addr.HostStr(), err)
+		return nil
+	}
+
+	// We did not encounter an error in our proof push
+	// attempt. Log the proof sync attempt as complete.
+	_, err = f.cfg.FederationDB.UpsertFederationProofSyncLog(
+		ctx, uniID, key, addr, SyncDirectionPush,
+		ProofSyncStatusComplete, false,
+	)
+	if err != nil {
+		log.Warnf("unable to log proof sync attempt: %v",
+			err)
+		return nil
 	}
 
+	return nil
+}
+
+// pushProofToFederation attempts to push out a new proof to the given set of
+// federation servers in parallel.
+func (f *FederationEnvoy) pushProofToFederation(ctx context.Context,
+	uniID Identifier, key LeafKey, leaf *Leaf, fedServers []ServerAddr,
+	proofSyncLogged bool) {
+
 	log.Infof("Pushing new proof to %v federation members, proof_key=%v",
 		len(fedServers), spew.Sdump(key))
 
-	ctx, cancel := f.WithCtxQuitNoTimeout()
-	defer cancel()
-
 	// To push a new proof out, we'll attempt to dial to the remote
 	// registrar, then will attempt to push the new proof directly to the
 	// register.
 	pushNewProof := func(ctx context.Context, addr ServerAddr) error {
-		remoteUniverseServer, err := f.cfg.NewRemoteRegistrar(addr)
-		if err != nil {
-			log.Warnf("cannot push proof unable to connect "+
-				"to remote server(%v): %v", addr.HostStr(),
-				err)
+		// If we are logging proof sync attempts, we will use the
+		// logged version of the push function.
+		if proofSyncLogged {
+			err := f.pushProofToServerLogged(
+				ctx, uniID, key, leaf, addr,
+			)
+			if err != nil {
+				log.Warnf("cannot push proof via logged "+
+					"server push: %v", err)
+				return nil
+			}
+
 			return nil
 		}
 
-		_, err = remoteUniverseServer.UpsertProofLeaf(
-			ctx, uniID, key, leaf,
-		)
+		// If we are not logging proof sync attempts, we will use the
+		// non-logged version of the push function.
+		err := f.pushProofToServer(ctx, uniID, key, leaf, addr)
 		if err != nil {
-			log.Warnf("cannot push proof to remote "+
-				"server(%v): %v", addr.HostStr(), err)
+			log.Warnf("cannot push proof: %v", err)
+			return nil
 		}
+
 		return nil
 	}
 
 	// To conclude, we'll attempt to push the new proof to all the universe
 	// servers in parallel.
-	err = fn.ParSlice(ctx, fedServers, pushNewProof)
+	err := fn.ParSlice(ctx, fedServers, pushNewProof)
 	if err != nil {
 		// TODO(roasbeef): retry in the background until successful?
 		log.Errorf("unable to push proof to federation: %v", err)
@@ -276,6 +351,60 @@ func (f *FederationEnvoy) pushProofToFederation(uniID Identifier, key LeafKey,
 	}
 }
 
+// filterProofSyncPending filters out servers that have already been synced
+// with for the given leaf.
+func (f *FederationEnvoy) filterProofSyncPending(fedServers []ServerAddr,
+	uniID Identifier, key LeafKey) ([]ServerAddr, error) {
+
+	// If there are no servers to filter, then we'll return early. This
+	// saves us from querying the database unnecessarily.
+	if len(fedServers) == 0 {
+		return nil, nil
+	}
+
+	ctx, cancel := f.WithCtxQuit()
+	defer cancel()
+
+	// Select all sync push complete log entries for the given universe
+	// leaf. If there are any servers which are sync complete within this
+	// log set, we will filter them out of our target server set.
+	logs, err := f.cfg.FederationDB.QueryFederationProofSyncLog(
+		ctx, uniID, key, SyncDirectionPush,
+		ProofSyncStatusComplete,
+	)
+	if err != nil {
+		return nil, fmt.Errorf("unable to query federation sync "+
+			"log: %w", err)
+	}
+
+	// Filter out servers that we've already pushed to.
+	filteredFedServers := fn.Filter(fedServers, func(a ServerAddr) bool {
+		for idx := range logs {
+			log := logs[idx]
+
+			// Filter out servers that have a log entry with sync
+			// status complete. We've already restricted the query
+			// to sync complete logs above, but we'll check
+			// explicitly here to make our logic clear.
+			if log.ServerAddr.HostStr() == a.HostStr() &&
+				log.SyncStatus == ProofSyncStatusComplete {
+
+				return false
+			}
+
+			// TODO(ffranr): Add timestamp check to filter out
+			// servers that we've pushed to recently.
+		}
+
+		// At this point we haven't found any log entry corresponding
+		// to the given server, so we'll return true and include the
+		// server as a sync target for the given leaf.
+		return true
+	})
+
+	return filteredFedServers, nil
+}
+
 // syncer is the main goroutine that's responsible for interacting with the
 // federation envoy. It also accepts incoming requests to push out new updates
 // to the federation.
@@ -289,6 +418,16 @@ func (f *FederationEnvoy) syncer() {
 	syncTicker := time.NewTicker(f.cfg.SyncInterval)
 	defer syncTicker.Stop()
 
+	// We'll use a timeout that's slightly less than the sync interval to
+	// help avoid ticking into a new sync event before the previous event
+	// has finished.
+	syncContextTimeout := f.cfg.SyncInterval - 1*time.Second
+	if syncContextTimeout <= 0 {
+		// If the sync interval is a second or less, then we'll use
+		// the sync interval as the timeout.
+		syncContextTimeout = f.cfg.SyncInterval
+	}
+
 	for {
 		select {
 		// A new sync event has just been triggered, so we'll attempt
@@ -313,6 +452,51 @@ func (f *FederationEnvoy) syncer() {
 				continue
 			}
 
+			// After we've synced with the federation, we'll
+			// attempt to push out any pending proofs that we
+			// haven't yet completed.
+			ctxFetchLog, cancelFetchLog := f.WithCtxQuitNoTimeout()
+			syncDirection := SyncDirectionPush
+			logEntries, err :=
+				f.cfg.FederationDB.FetchPendingProofsSyncLog(
+					ctxFetchLog, &syncDirection,
+				)
+			cancelFetchLog()
+			if err != nil {
+				log.Warnf("unable to query pending push "+
+					"sync log: %v", err)
+				continue
+			}
+
+			// TODO(ffranr): Take account of any new servers that
+			// have been added since the last time we populated the
+			// log for a given proof leaf. Pending proof sync log
+			// entries are only relevant for the set of servers
+			// that existed at the time the log entry was created.
+			// If a new server is added, then we should create a
+			// new log entry for the new server.
+
+			for idx := range logEntries {
+				entry := logEntries[idx]
+
+				go func() {
+					ctx, cancel := f.CtxBlockingCustomTimeout(
+						syncContextTimeout,
+					)
+					defer cancel()
+
+					servers := []ServerAddr{
+						entry.ServerAddr,
+					}
+
+					f.pushProofToFederation(
+						ctx, entry.UniID, entry.LeafKey,
+						&entry.Leaf, servers,
+						true,
+					)
+				}()
+			}
+
 		// A new push request has just arrived. We'll perform an
 		// asynchronous registration with the local Universe registrar,
 		// then push it out in an async manner to the federation
@@ -341,11 +525,50 @@ func (f *FederationEnvoy) syncer() {
 			// proof out to the federation in the background.
 			pushReq.resp <- newProof
 
-			// With the response sent above, we'll push this out to
-			// all the Universe servers in the background.
-			go f.pushProofToFederation(
-				pushReq.ID, pushReq.Key, pushReq.Leaf,
-			)
+			// Fetch all universe servers in our federation.
+			fedServers, err := f.tryFetchServers()
+			if err != nil {
+				err := fmt.Errorf("unable to fetch "+
+					"federation servers: %w", err)
+				log.Warnf(err.Error())
+				pushReq.err <- err
+				continue
+			}
+
+			if len(fedServers) == 0 {
+				log.Warnf("could not find any federation " +
+					"servers")
+				continue
+			}
+
+			if pushReq.proofSyncLogged {
+				// We are attempting to sync using the
+				// logged proof sync procedure. We will
+				// therefore narrow down the set of target
+				// servers based on the sync log. Only servers
+				// that are not yet push sync complete will be
+				// targeted.
+				fedServers, err = f.filterProofSyncPending(
+					fedServers, pushReq.ID, pushReq.Key,
+				)
+				if err != nil {
+					log.Warnf("failed to filter "+
						"federation servers: %v", err)
+					continue
+				}
+			}
+
+			// With the response sent above, we'll push this
+			// out to all the Universe servers in the
+			// background.
+			go func() {
+				ctx, cancel := f.WithCtxQuitNoTimeout()
+				defer cancel()
+				f.pushProofToFederation(
+					ctx, pushReq.ID, pushReq.Key,
+					pushReq.Leaf, fedServers,
+					pushReq.proofSyncLogged,
+				)
+			}()
 
 		case pushReq := <-f.batchPushRequests:
 			ctx, cancel := f.WithCtxQuitNoTimeout()
@@ -370,13 +593,34 @@ func (f *FederationEnvoy) syncer() {
 			// we'll return back to the caller.
 			pushReq.resp <- struct{}{}
 
+			// Fetch all universe servers in our federation.
+			fedServers, err := f.tryFetchServers()
+			if err != nil {
+				err := fmt.Errorf("unable to fetch "+
+					"federation servers: %w", err)
+				log.Warnf(err.Error())
+				pushReq.err <- err
+				continue
+			}
+
+			if len(fedServers) == 0 {
+				log.Warnf("could not find any federation " +
+					"servers")
+				continue
+			}
+
 			// With the response sent above, we'll push this out to
 			// all the Universe servers in the background.
 			go func() {
+				ctxPush, cancelPush := f.WithCtxQuitNoTimeout()
+				defer cancelPush()
+
 				for idx := range pushReq.Batch {
 					item := pushReq.Batch[idx]
+
 					f.pushProofToFederation(
-						item.ID, item.Key, item.Leaf,
+						ctxPush, item.ID, item.Key,
+						item.Leaf, fedServers, false,
 					)
 				}
 			}()
@@ -395,12 +639,18 @@ func (f *FederationEnvoy) syncer() {
 func (f *FederationEnvoy) UpsertProofLeaf(_ context.Context, id Identifier,
 	key LeafKey, leaf *Leaf) (*Proof, error) {
 
+	// If we're attempting to push an issuance proof, then we'll track the
+	// sync attempt to ensure that the push is retried in the event of a
+	// failure.
+	logProofSync := id.ProofType == ProofTypeIssuance
+
 	pushReq := &FederationPushReq{
-		ID:   id,
-		Key:  key,
-		Leaf: leaf,
-		resp: make(chan *Proof, 1),
-		err:  make(chan error, 1),
+		ID:              id,
+		Key:             key,
+		Leaf:            leaf,
+		proofSyncLogged: logProofSync,
+		resp:            make(chan *Proof, 1),
+		err:             make(chan error, 1),
 	}
 
 	if !fn.SendOrQuit(f.pushRequests, pushReq, f.Quit) {
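
For context, the following is a minimal caller-side sketch (not part of the diff) of how the new behaviour is expected to surface, assuming the exported types of the universe package and the github.com/lightninglabs/taproot-assets module path; the pushIssuanceProof helper and the example package name are hypothetical:

// Package example is an illustrative sketch only; it is not part of the
// change above.
package example

import (
	"context"

	"github.com/lightninglabs/taproot-assets/universe"
)

// pushIssuanceProof forwards an issuance proof to the federation envoy.
// Because id.ProofType is ProofTypeIssuance, UpsertProofLeaf sets
// proofSyncLogged on the push request, so a failed federation push is
// recorded as pending in the proof sync log and retried on a later sync
// tick of the envoy's syncer goroutine.
func pushIssuanceProof(ctx context.Context, envoy *universe.FederationEnvoy,
	id universe.Identifier, key universe.LeafKey,
	leaf *universe.Leaf) (*universe.Proof, error) {

	return envoy.UpsertProofLeaf(ctx, id, key, leaf)
}

Transfer proofs (any ProofType other than ProofTypeIssuance) take the same call path but skip the sync log, so they retain the previous one-shot push behaviour.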