Skip to content

Commit

Permalink
remove worker cache in all-in-one mode (#494)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenghaoz authored Jul 9, 2022
1 parent 6696261 commit 4e7355e
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 33 deletions.
25 changes: 11 additions & 14 deletions cmd/gorse-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,17 @@ var oneCommand = &cobra.Command{
log.Logger().Fatal("invalid config", zap.Error(err))
}

// print prologue
fmt.Printf("Welcome to Gorse %s Playground\n", version.Version)

if err = initializeDatabase("data.db"); err != nil {
log.Logger().Fatal("failed to initialize database", zap.Error(err))
}

fmt.Println()
fmt.Printf(" Dashboard: http://%s:%d/overview\n", conf.Master.HttpHost, conf.Master.HttpPort)
fmt.Printf(" RESTful APIs: http://%s:%d/apidocs\n", conf.Master.HttpHost, conf.Master.HttpPort)
fmt.Printf(" Documentation: https://docs.gorse.io/\n")
fmt.Println()

if err = initializeDatabase("data.db"); err != nil {
log.Logger().Fatal("failed to initialize database", zap.Error(err))
}
} else {
configPath, _ := cmd.PersistentFlags().GetString("config")
log.Logger().Info("load config", zap.String("config", configPath))
Expand All @@ -103,14 +103,13 @@ var oneCommand = &cobra.Command{
}

// create master
masterCachePath, _ := cmd.PersistentFlags().GetString("master-cache-path")
l := master.NewMaster(conf, masterCachePath)
cachePath, _ := cmd.PersistentFlags().GetString("cache-path")
l := master.NewMaster(conf, cachePath)
// Start worker
go func() {
workerCachePath, _ := cmd.PersistentFlags().GetString("worker-cache-path")
workerJobs, _ := cmd.PersistentFlags().GetInt("worker-jobs")
workerJobs, _ := cmd.PersistentFlags().GetInt("recommend-jobs")
w := worker.NewWorker(conf.Master.Host, conf.Master.Port, conf.Master.Host,
0, workerJobs, workerCachePath)
0, workerJobs, "")
w.SetOneMode(l.Settings)
w.Serve()
}()
Expand All @@ -124,12 +123,10 @@ func init() {
oneCommand.PersistentFlags().BoolP("version", "v", false, "gorse version")
oneCommand.PersistentFlags().String("log-path", "", "path of log file")
oneCommand.PersistentFlags().Bool("playground", false, "playground mode (setup a recommender system for GitHub repositories)")
// master node commands
oneCommand.PersistentFlags().StringP("config", "c", "", "configuration file path")
oneCommand.PersistentFlags().String("master-cache-path", "master_cache.data", "path of cache file for the master node")
oneCommand.PersistentFlags().String("cache-path", "one_cache.data", "path of cache file")
// worker node commands
oneCommand.PersistentFlags().Int("worker-jobs", 1, "number of working jobs for the worker node")
oneCommand.PersistentFlags().String("worker-cache-path", "worker_cache.data", "path of cache file for the worker node")
oneCommand.PersistentFlags().Int("recommend-jobs", 1, "number of working jobs for recommendation tasks")
}

func main() {
Expand Down
1 change: 0 additions & 1 deletion storage/cache/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,6 @@ func Open(path string) (Database, error) {
// append parameters
if path, err = storage.AppendURLParams(path, []lo.Tuple2[string, string]{
{"_pragma", "busy_timeout(10000)"},
{"_pragma", "journal_mode(wal)"},
}); err != nil {
return nil, errors.Trace(err)
}
Expand Down
1 change: 0 additions & 1 deletion storage/data/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ func Open(path string) (Database, error) {
// append parameters
if path, err = storage.AppendURLParams(path, []lo.Tuple2[string, string]{
{"_pragma", "busy_timeout(10000)"},
{"_pragma", "journal_mode(wal)"},
}); err != nil {
return nil, errors.Trace(err)
}
Expand Down
36 changes: 19 additions & 17 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,26 +276,28 @@ func (w *Worker) ServeMetrics() {
func (w *Worker) Serve() {
rand.Seed(time.Now().UTC().UnixNano())
// open local store
state, err := LoadLocalCache(w.cacheFile)
if err != nil {
if errors.IsNotFound(err) {
log.Logger().Info("no cache file found, create a new one", zap.String("path", state.path))
} else {
log.Logger().Error("failed to load persist state", zap.Error(err),
zap.String("path", state.path))
}
}
if state.WorkerName == "" {
state.WorkerName = base.GetRandomName(0)
err = state.WriteLocalCache()
if !w.oneMode {
state, err := LoadLocalCache(w.cacheFile)
if err != nil {
log.Logger().Fatal("failed to write meta", zap.Error(err))
if errors.IsNotFound(err) {
log.Logger().Info("no cache file found, create a new one", zap.String("path", state.path))
} else {
log.Logger().Error("failed to load persist state", zap.Error(err),
zap.String("path", state.path))
}
}
if state.WorkerName == "" {
state.WorkerName = base.GetRandomName(0)
err = state.WriteLocalCache()
if err != nil {
log.Logger().Fatal("failed to write meta", zap.Error(err))
}
}
w.workerName = state.WorkerName
log.Logger().Info("start worker",
zap.Int("n_jobs", w.jobs),
zap.String("worker_name", w.workerName))
}
w.workerName = state.WorkerName
log.Logger().Info("start worker",
zap.Int("n_jobs", w.jobs),
zap.String("worker_name", w.workerName))

// connect to master
conn, err := grpc.Dial(fmt.Sprintf("%v:%v", w.masterHost, w.masterPort), grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand Down

0 comments on commit 4e7355e

Please sign in to comment.