synchronously index comments for the first time run
vdimir committed Apr 2, 2023
commit f9e7345 (parent 42d6bcd)
Showing 4 changed files with 39 additions and 34 deletions.
backend/app/cmd/server.go (16 changes: 4 additions & 12 deletions)
@@ -18,7 +18,6 @@ import (
"github.com/go-pkgz/lcw/eventbus"
log "github.com/go-pkgz/lgr"
ntf "github.com/go-pkgz/notify"
"github.com/go-pkgz/syncs"
"github.com/golang-jwt/jwt"
"github.com/kyokomi/emoji/v2"
bolt "go.etcd.io/bbolt"
@@ -672,20 +671,13 @@ func (a *serverApp) run(ctx context.Context) error {
log.Printf("[WARN] failed to resubmit comments with staging images, %s", e)
}

// synchronously index comments in several workers for the first time run
numWorkersForIndexing := 8
grp := syncs.NewErrSizedGroup(numWorkersForIndexing)
for _, siteID := range a.Sites {
a.dataService.RunSiteIndexers(ctx, siteID, grp) // index comments for the first time run
err := a.dataService.IndexSites(ctx, a.Sites, numWorkersForIndexing)
if err != nil {
log.Printf("[WARN] failed to build search index, %s", err)
}

// don't lock here, wait in background to log error if any
go func() {
err := grp.Wait()
if err != nil {
log.Printf("[WARN] background task for indexing existing comments failed: %s", err)
}
}()

go a.imageService.Cleanup(ctx) // pictures cleanup for staging images

a.restSrv.Run(a.Address, a.Port)
backend/app/store/search/search_test.go (2 changes: 1 addition & 1 deletion)
@@ -241,7 +241,7 @@ func TestSearch_IndexStartup(t *testing.T) {

grp := syncs.NewErrSizedGroup(4)
for _, siteID := range sites {
err := IndexSite(context.Background(), siteID, searcher, storeEngine, grp)
err := RunBackgroundSiteIndexer(context.Background(), siteID, searcher, storeEngine, grp)
require.NoError(t, err)
}
err := grp.Wait()
backend/app/store/search/site_index.go (19 changes: 6 additions & 13 deletions)
@@ -4,17 +4,17 @@ import (
"context"
"fmt"
"log"
"sync/atomic"
"time"

"github.com/go-pkgz/syncs"
"github.com/umputun/remark42/backend/app/store"
"github.com/umputun/remark42/backend/app/store/engine"
)

// IndexSite rebuilds search index for the site
// RunBackgroundSiteIndexer spawns background processes to build search index for the site from scratch
// Run indexing of each topic in parallel in a sized group
func IndexSite(ctx context.Context, siteID string, s *Service, e engine.Interface, grp *syncs.ErrSizedGroup) error {
// Caller should wait for the group to finish
func RunBackgroundSiteIndexer(ctx context.Context, siteID string, s *Service, e engine.Interface, grp *syncs.ErrSizedGroup) error {
siteIdx, isIndexed := s.sitesEngines[siteID]
if !isIndexed {
log.Printf("[INFO] skipping indexing site %q", siteID)
@@ -32,17 +32,14 @@ func IndexSite(ctx context.Context, siteID string, s *Service, e engine.Interfac
return nil
}

log.Printf("[INFO] indexing site %s", siteID)
startTime := time.Now()

req := engine.InfoRequest{Locator: store.Locator{SiteID: siteID}}
topics, err := e.Info(req)

if err != nil {
return fmt.Errorf("failed to get topics for site %q: %w", siteID, err)
}

var indexedCnt uint64
// worker to index single topic
worker := func(ctx context.Context, url string) error {
locator := store.Locator{SiteID: siteID, URL: url}
select {
@@ -58,22 +55,18 @@ func IndexSite(ctx context.Context, siteID string, s *Service, e engine.Interfac
}

indexErr := s.indexBatch(comments)
log.Printf("[INFO] %d documents indexed from site %v", len(comments), locator)

if indexErr != nil {
return fmt.Errorf("failed to index comments for search: %w", indexErr)
}

atomic.AddUint64(&indexedCnt, uint64(len(comments)))

log.Printf("[INFO] %d documents indexed from topic %v", len(comments), locator)
return nil
}

for i := len(topics) - 1; i >= 0; i-- {
url := topics[i].URL
grp.Go(func() error { return worker(ctx, url) })
}

log.Printf("[INFO] total %d documents indexed for site %q in %v", indexedCnt, siteID, time.Since(startTime))
// don't wait for the group to finish, caller should do it
return nil
}
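
The renamed function's contract is worth spelling out: RunBackgroundSiteIndexer only submits one job per topic to the shared sized group and returns, so whoever created the group must call Wait. A minimal sketch of that contract with github.com/go-pkgz/syncs follows; the topic list and the log-only worker are placeholders, not code from this commit.

package main

import (
	"log"

	"github.com/go-pkgz/syncs"
)

func main() {
	topics := []string{"/post/1", "/post/2", "/post/3"} // placeholder topic URLs

	grp := syncs.NewErrSizedGroup(4) // at most 4 topics indexed at once
	for _, url := range topics {
		url := url // capture the per-iteration value, as the commit does with topics[i].URL
		grp.Go(func() error {
			log.Printf("[INFO] indexing %s", url) // stand-in for fetching comments and indexBatch
			return nil
		})
	}

	// scheduling returns immediately; the creator of the group owns the wait
	if err := grp.Wait(); err != nil {
		log.Printf("[WARN] indexing failed: %s", err)
	}
}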
backend/app/store/service/service.go (36 changes: 28 additions & 8 deletions)
@@ -950,17 +950,37 @@ func (s *DataStore) Search(siteID, query, sortBy string, limit, skip int) ([]sto
return comments, searchRes.Total, nil
}

// RunSiteIndexers indexes all comments for siteID in the sized group.
// It's required to call this method at the startup to support "cold start"
// when store contains some data, but search functionality is enabled for the first time.
func (s *DataStore) RunSiteIndexers(ctx context.Context, siteID string, grp *syncs.ErrSizedGroup) {
// IndexSites indexes all comments for all sites.
// It's required to call this method at the startup to support cold start
// (store contains some data, but search functionality is enabled for the first time)
// Note: this method is synchronous and can take time to complete to build the initial index.
func (s *DataStore) IndexSites(ctx context.Context, sites []string, numWorkers int) error {
if s.SearchService == nil {
return
return nil
}
err := search.IndexSite(ctx, siteID, s.SearchService, s.Engine, grp)
if err != nil {
log.Printf("[WARN] error occurred during indexing comments for site %q: %e", siteID, err)

log.Printf("[INFO] start building search index for %d sites", len(sites))
startTime := time.Now()

// run indexing for all sites in the same pool
grp := syncs.NewErrSizedGroup(numWorkers)
cancelCtx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
for _, siteID := range sites {
err := search.RunBackgroundSiteIndexer(cancelCtx, siteID, s.SearchService, s.Engine, grp)
if err != nil {
// cancel all existing background jobs
cancelFunc()
_ = grp.Wait()
return err
}
}

err := grp.Wait()
if err == nil {
log.Printf("[INFO] finish building search index in %v", time.Since(startTime))
}
return err
}

// Close store service
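
The service.go hunk is the core of the change: startup now builds the index synchronously, with one sized pool shared by all sites, and it cancels and drains already-submitted jobs if scheduling a later site fails. Below is a self-contained sketch of that flow; the site map, the sleep-based worker, and the helper names are illustrative stand-ins rather than code from this commit, and it assumes the same ErrSizedGroup API the diff uses (Go taking a func() error, Wait returning an error).

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/go-pkgz/syncs"
)

// scheduleSite plays the role of RunBackgroundSiteIndexer: submit one job per
// topic to the shared group and return without waiting.
func scheduleSite(ctx context.Context, siteID string, topics []string, grp *syncs.ErrSizedGroup) error {
	for _, topic := range topics {
		topic := topic
		grp.Go(func() error {
			select {
			case <-ctx.Done():
				return ctx.Err() // give up if the whole startup indexing was cancelled
			default:
			}
			time.Sleep(10 * time.Millisecond) // stand-in for indexing one topic
			log.Printf("[INFO] indexed %s%s", siteID, topic)
			return nil
		})
	}
	return nil
}

// indexSites mirrors DataStore.IndexSites: one pool for all sites, cancel and
// drain on a scheduling error, then block until the whole index is built.
func indexSites(sites map[string][]string, numWorkers int) error {
	grp := syncs.NewErrSizedGroup(numWorkers)
	cancelCtx, cancelFunc := context.WithCancel(context.Background())
	defer cancelFunc()

	for siteID, topics := range sites {
		if err := scheduleSite(cancelCtx, siteID, topics, grp); err != nil {
			cancelFunc()   // cancel jobs already submitted for earlier sites
			_ = grp.Wait() // drain them before reporting the failure
			return fmt.Errorf("failed to schedule indexing for %q: %w", siteID, err)
		}
	}
	return grp.Wait() // startup blocks here; an error surfaces as the server.go warning
}

func main() {
	sites := map[string][]string{"site-a": {"/post/1", "/post/2"}, "site-b": {"/post/3"}}
	if err := indexSites(sites, 8); err != nil {
		log.Printf("[WARN] failed to build search index, %s", err)
	}
}

Without the cancelFunc/Wait pair on the error path, jobs already handed to the group would keep running after IndexSites returned; cancelling the shared context and draining the group keeps a failed startup from leaking background work.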
