diff --git a/api/workflow/activity/calc_recom_score.go b/api/workflow/activity/calc_recom_score.go new file mode 100644 index 00000000..a2704f60 --- /dev/null +++ b/api/workflow/activity/calc_recom_score.go @@ -0,0 +1,19 @@ +package activity + +import ( + "context" + "log/slog" + + "opencsg.com/csghub-server/common/config" + "opencsg.com/csghub-server/component" +) + +func CalcRecomScore(ctx context.Context, config *config.Config) error { + c, err := component.NewRecomComponent(config) + if err != nil { + slog.Error("failed to create recom component", "err", err) + return err + } + c.CalculateRecomScore(context.Background()) + return nil +} diff --git a/api/workflow/activity/sync_as_client.go b/api/workflow/activity/sync_as_client.go new file mode 100644 index 00000000..99e4ca3e --- /dev/null +++ b/api/workflow/activity/sync_as_client.go @@ -0,0 +1,28 @@ +package activity + +import ( + "context" + "log/slog" + + "opencsg.com/csghub-server/builder/multisync" + "opencsg.com/csghub-server/builder/store/database" + "opencsg.com/csghub-server/common/config" + "opencsg.com/csghub-server/component" +) + +func SyncAsClient(ctx context.Context, config *config.Config) error { + c, err := component.NewMultiSyncComponent(config) + if err != nil { + slog.Error("failed to create multi sync component", "err", err) + return err + } + syncClientSettingStore := database.NewSyncClientSettingStore() + setting, err := syncClientSettingStore.First(ctx) + if err != nil { + slog.Error("failed to find sync client setting", "error", err) + return err + } + apiDomain := config.MultiSync.SaasAPIDomain + sc := multisync.FromOpenCSG(apiDomain, setting.Token) + return c.SyncAsClient(ctx, sc) +} diff --git a/api/workflow/cron_calc_recom_score.go b/api/workflow/cron_calc_recom_score.go new file mode 100644 index 00000000..d6c44fab --- /dev/null +++ b/api/workflow/cron_calc_recom_score.go @@ -0,0 +1,32 @@ +package workflow + +import ( + "time" + + "go.temporal.io/sdk/temporal" + "go.temporal.io/sdk/workflow" + "opencsg.com/csghub-server/api/workflow/activity" + "opencsg.com/csghub-server/common/config" +) + +func CalcRecomScoreWorkflow(ctx workflow.Context, config *config.Config) error { + logger := workflow.GetLogger(ctx) + logger.Info("calc recom score workflow started") + + retryPolicy := &temporal.RetryPolicy{ + MaximumAttempts: 3, + } + + options := workflow.ActivityOptions{ + StartToCloseTimeout: time.Hour * 1, + RetryPolicy: retryPolicy, + } + + ctx = workflow.WithActivityOptions(ctx, options) + err := workflow.ExecuteActivity(ctx, activity.CalcRecomScore, config).Get(ctx, nil) + if err != nil { + logger.Error("failed to calc recom score", "error", err) + return err + } + return nil +} diff --git a/api/workflow/cron_sync_as_client.go b/api/workflow/cron_sync_as_client.go new file mode 100644 index 00000000..54f5ab6b --- /dev/null +++ b/api/workflow/cron_sync_as_client.go @@ -0,0 +1,32 @@ +package workflow + +import ( + "time" + + "go.temporal.io/sdk/temporal" + "go.temporal.io/sdk/workflow" + "opencsg.com/csghub-server/api/workflow/activity" + "opencsg.com/csghub-server/common/config" +) + +func SyncAsClientWorkflow(ctx workflow.Context, config *config.Config) error { + logger := workflow.GetLogger(ctx) + logger.Info("sync as client workflow started") + + retryPolicy := &temporal.RetryPolicy{ + MaximumAttempts: 3, + } + + options := workflow.ActivityOptions{ + StartToCloseTimeout: time.Hour * 1, + RetryPolicy: retryPolicy, + } + + ctx = workflow.WithActivityOptions(ctx, options) + err := workflow.ExecuteActivity(ctx, activity.SyncAsClient, config).Get(ctx, nil) + if err != nil { + logger.Error("failed to sync as client", "error", err) + return err + } + return nil +} diff --git a/api/workflow/cron_worker.go b/api/workflow/cron_worker.go new file mode 100644 index 00000000..b0409c12 --- /dev/null +++ b/api/workflow/cron_worker.go @@ -0,0 +1,88 @@ +package workflow + +import ( + "context" + "fmt" + + enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + "opencsg.com/csghub-server/api/workflow/activity" + "opencsg.com/csghub-server/common/config" +) + +const ( + AlreadyScheduledMessage = "schedule with this ID is already registered" + CronJobQueueName = "workflow_cron_queue" +) + +func RegisterCronJobs(config *config.Config) error { + var err error + if wfClient == nil { + wfClient, err = client.Dial(client.Options{ + HostPort: config.WorkFLow.Endpoint, + }) + if err != nil { + return fmt.Errorf("unable to create workflow client, error:%w", err) + } + } + + if !config.Saas { + _, err = wfClient.ScheduleClient().Create(context.Background(), client.ScheduleOptions{ + ID: "sync-as-client-schedule", + Spec: client.ScheduleSpec{ + CronExpressions: []string{config.CronJob.SyncAsClientCronExpression}, + }, + Overlap: enumspb.SCHEDULE_OVERLAP_POLICY_SKIP, + Action: &client.ScheduleWorkflowAction{ + ID: "sync-as-client-workflow", + TaskQueue: CronJobQueueName, + Workflow: SyncAsClientWorkflow, + Args: []interface{}{config}, + }, + }) + if err != nil && err.Error() != AlreadyScheduledMessage { + return fmt.Errorf("unable to create schedule, error:%w", err) + } + } + + _, err = wfClient.ScheduleClient().Create(context.Background(), client.ScheduleOptions{ + ID: "calc-recom-score-schedule", + Spec: client.ScheduleSpec{ + CronExpressions: []string{config.CronJob.CalcRecomScoreCronExpression}, + }, + Overlap: enumspb.SCHEDULE_OVERLAP_POLICY_SKIP, + Action: &client.ScheduleWorkflowAction{ + ID: "calc-recom-score-workflow", + TaskQueue: CronJobQueueName, + Workflow: CalcRecomScoreWorkflow, + Args: []interface{}{config}, + }, + }) + if err != nil && err.Error() != AlreadyScheduledMessage { + return fmt.Errorf("unable to create schedule, error:%w", err) + } + + return nil +} + +func StartCronWorker(config *config.Config) error { + var err error + if wfClient == nil { + wfClient, err = client.Dial(client.Options{ + HostPort: config.WorkFLow.Endpoint, + }) + if err != nil { + return fmt.Errorf("unable to create workflow client, error:%w", err) + } + } + wfWorker = worker.New(wfClient, CronJobQueueName, worker.Options{}) + if !config.Saas { + wfWorker.RegisterWorkflow(SyncAsClientWorkflow) + wfWorker.RegisterActivity(activity.SyncAsClient) + } + wfWorker.RegisterWorkflow(CalcRecomScoreWorkflow) + wfWorker.RegisterActivity(activity.CalcRecomScore) + + return wfWorker.Start() +} diff --git a/api/workflow/worker.go b/api/workflow/worker.go index 973f523d..bad5bd62 100644 --- a/api/workflow/worker.go +++ b/api/workflow/worker.go @@ -11,8 +11,10 @@ import ( const HandlePushQueueName = "workflow_handle_push_queue" -var wfWorker worker.Worker -var wfClient client.Client +var ( + wfWorker worker.Worker + wfClient client.Client +) func StartWorker(config *config.Config) error { var err error diff --git a/cmd/csghub-server/cmd/start/server.go b/cmd/csghub-server/cmd/start/server.go index e66787af..ff24eb0d 100644 --- a/cmd/csghub-server/cmd/start/server.go +++ b/cmd/csghub-server/cmd/start/server.go @@ -91,6 +91,17 @@ var serverCmd = &cobra.Command{ if err != nil { return fmt.Errorf("failed to start worker: %w", err) } + + err = workflow.RegisterCronJobs(cfg) + if err != nil { + return fmt.Errorf("failed to register cron jobs: %w", err) + } + + err = workflow.StartCronWorker(cfg) + if err != nil { + return fmt.Errorf("failed to start cron worker: %w", err) + } + server := httpbase.NewGracefulServer( httpbase.GraceServerOpt{ Port: cfg.APIServer.Port, diff --git a/common/config/config.go b/common/config/config.go index 7d85a642..c69ccc46 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -224,6 +224,11 @@ type Config struct { // S3PublicBucket is used to store public files, should set bucket same with portal S3PublicBucket string `env:"STARHUB_SERVER_ARGO_S3_PUBLIC_BUCKET"` } + + CronJob struct { + SyncAsClientCronExpression string `env:"STARHUB_SERVER_CRON_JOB_SYNC_AS_CLIENT_CRON_EXPRESSION, default=0 * * * *"` + CalcRecomScoreCronExpression string `env:"STARHUB_SERVER_CRON_JOB_CLAC_RECOM_SCORE_CRON_EXPRESSION, default=0 1 * * *"` + } } func SetConfigFile(file string) { diff --git a/common/config/config.toml.example b/common/config/config.toml.example index b2948124..e1073ea0 100644 --- a/common/config/config.toml.example +++ b/common/config/config.toml.example @@ -161,3 +161,7 @@ encoded_sensitive_words = "5Lmg6L+R5bmzLHhpamlucGluZw==" [workflow] endpoint = "localhost:7233" + +[cron_job] +sync_as_client_cron_expression = "0 * * * *" +calc_recom_score_cron_expression = "0 1 * * *" diff --git a/component/multi_sync.go b/component/multi_sync.go index 38e2494d..edb44597 100644 --- a/component/multi_sync.go +++ b/component/multi_sync.go @@ -242,7 +242,7 @@ func (c *multiSyncComponentImpl) createLocalDataset(ctx context.Context, m *type } err = c.repo.DeleteAllTags(ctx, newDBRepo.ID) - if err != nil { + if err != nil && err != sql.ErrNoRows { slog.Error("failed to delete database tag", slog.Any("error", err)) } @@ -253,14 +253,14 @@ func (c *multiSyncComponentImpl) createLocalDataset(ctx context.Context, m *type } err = c.repo.DeleteAllFiles(ctx, newDBRepo.ID) - if err != nil { + if err != nil && err != sql.ErrNoRows { slog.Error("failed to delete database files", slog.Any("error", err)) } ctxGetFileList, cancel := context.WithTimeout(ctx, 5*time.Second) files, err := sc.FileList(ctxGetFileList, s) cancel() - if err != nil { + if err != nil && err != sql.ErrNoRows { slog.Error("failed to get all files of repo", slog.Any("sync_version", s), slog.Any("error", err)) } if len(files) > 0 { @@ -367,7 +367,7 @@ func (c *multiSyncComponentImpl) createLocalModel(ctx context.Context, m *types. }) } err = c.repo.DeleteAllTags(ctx, newDBRepo.ID) - if err != nil { + if err != nil && err != sql.ErrNoRows { slog.Error("failed to delete database tag", slog.Any("error", err)) } err = c.repo.BatchCreateRepoTags(ctx, repoTags) @@ -377,14 +377,14 @@ func (c *multiSyncComponentImpl) createLocalModel(ctx context.Context, m *types. } err = c.repo.DeleteAllFiles(ctx, newDBRepo.ID) - if err != nil { + if err != nil && err != sql.ErrNoRows { slog.Error("failed to delete all files for repo", slog.Any("error", err)) } ctxGetFileList, cancel := context.WithTimeout(ctx, 5*time.Second) files, err := sc.FileList(ctxGetFileList, s) cancel() - if err != nil { + if err != nil && err != sql.ErrNoRows { slog.Error("failed to get all files of repo", slog.Any("sync_version", s), slog.Any("error", err)) } if len(files) > 0 { diff --git a/scripts/init.sh b/scripts/init.sh index 59a8f0e6..b7cc5712 100755 --- a/scripts/init.sh +++ b/scripts/init.sh @@ -72,69 +72,6 @@ if [ "$STARHUB_SERVER_GITSERVER_TYPE" = "gitea" ]; then fi fi - -# Create cron job -cron="" -read_and_set_cron() { - env_variable=$1 - default_value=$2 - - cron=${!env_variable} - - if [[ -z $cron ]]; then - cron=$default_value - fi -} - -current_cron_jobs=$(crontab -l 2>/dev/null) - -if echo "$current_cron_jobs" | grep -qF "starhub logscan gitea"; then - echo "Gitea log scan job already exists" -else - echo "Creating cron job for gitea logscan..." - read_and_set_cron "STARHUB_SERVER_CRON_LOGSCAN" "0 23 * * *" - (crontab -l ;echo "$cron STARHUB_DATABASE_DSN=$STARHUB_DATABASE_DSN /starhub-bin/starhub logscan gitea --path /starhub-bin/logs/gitea.log >> /starhub-bin/cron.log 2>&1") | crontab - -fi - -if echo "$current_cron_jobs" | grep -qF "calc-recom-score"; then - echo "Calculate score job already exists" -else - echo "Creating cron job for repository recommendation score calculation..." - read_and_set_cron "STARHUB_SERVER_CRON_CALC_RECOM_SCORE" "0 1 * * *" - (crontab -l ;echo "$cron STARHUB_DATABASE_DSN=$STARHUB_DATABASE_DSN STARHUB_SERVER_GITSERVER_HOST=$STARHUB_SERVER_GITSERVER_HOST STARHUB_SERVER_GITSERVER_USERNAME=$STARHUB_SERVER_GITSERVER_USERNAME STARHUB_SERVER_GITSERVER_PASSWORD=$STARHUB_SERVER_GITSERVER_PASSWORD /starhub-bin/starhub cron calc-recom-score >> /starhub-bin/cron-calc-recom-score.log 2>&1") | crontab - -fi - -if echo "$current_cron_jobs" | grep -qF "create-push-mirror"; then - echo "Create push mirror job already exists" -else - echo "Creating cron job for push mirror creation..." - read_and_set_cron "STARHUB_SERVER_CRON_PUSH_MIRROR" "*/10 * * * *" - (crontab -l ;echo "$cron STARHUB_DATABASE_DSN=$STARHUB_DATABASE_DSN STARHUB_SERVER_GITSERVER_HOST=$STARHUB_SERVER_GITSERVER_HOST STARHUB_SERVER_GITSERVER_USERNAME=$STARHUB_SERVER_GITSERVER_USERNAME STARHUB_SERVER_GITSERVER_PASSWORD=$STARHUB_SERVER_GITSERVER_PASSWORD STARHUB_SERVER_MIRRORSERVER_HOST=$STARHUB_SERVER_MIRRORSERVER_HOST STARHUB_SERVER_MIRRORSERVER_USERNAME=$STARHUB_SERVER_MIRRORSERVER_USERNAME STARHUB_SERVER_MIRRORSERVER_PASSWORD=$STARHUB_SERVER_MIRRORSERVER_PASSWORD /starhub-bin/starhub cron create-push-mirror >> /starhub-bin/create-push-mirror.log 2>&1") | crontab - -fi - -if echo "$current_cron_jobs" | grep -qF "check-mirror-progress"; then - echo "Check mirror progress job already exists" -else - echo "Creating cron job for update mirror status and progress..." - read_and_set_cron "STARHUB_SERVER_CRON_PUSH_MIRROR" "*/5 * * * *" - (crontab -l ;echo "$cronstarhub-bin/starhub mirror check-mirror-progress >> /starhub-bin/check-mirror-progress.log 2>&1") | crontab - -fi - -if [ "$STARHUB_SERVER_SAAS" == "false" ]; then - if echo "$current_cron_jobs" | grep -qF "sync-as-client"; then - echo "Sync as client job already exists" - else - echo "Creating cron job for sync saas sync verions..." - read_and_set_cron "STARHUB_SERVER_CRON_SYNC_AS_CLIENT" "*/10 * * * *" - (crontab -l ;echo "$cron STARHUB_SERVER_REDIS_ENDPOINT=$STARHUB_SERVER_REDIS_ENDPOINT STARHUB_SERVER_REDIS_USER=$STARHUB_SERVER_REDIS_USER STARHUB_SERVER_REDIS_PASSWORD=$STARHUB_SERVER_REDIS_PASSWORD STARHUB_DATABASE_DSN=$STARHUB_DATABASE_DSN STARHUB_SERVER_GITSERVER_TYPE=$STARHUB_SERVER_GITSERVER_TYPE STARHUB_SERVER_GITALY_TOKEN=$STARHUB_SERVER_GITALY_TOKEN STARHUB_SERVER_GITALY_SERVER_SOCKET=$STARHUB_SERVER_GITALY_SERVER_SOCKET STARHUB_SERVER_GITSERVER_HOST=$STARHUB_SERVER_GITSERVER_HOST STARHUB_SERVER_GITSERVER_USERNAME=$STARHUB_SERVER_GITSERVER_USERNAME STARHUB_SERVER_GITSERVER_PASSWORD=$STARHUB_SERVER_GITSERVER_PASSWORD /starhub-bin/starhub sync sync-as-client >> /starhub-bin/cron-sync-as-client.log 2>&1") | crontab - - fi -else - echo "Saas does not need sync-as-client cron job" -fi -# Reload cron server -service cron restart -echo "Done." - echo "Database setup..." echo "Migration init"