Skip to content

Commit

Permalink
Move more state update functions to jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
leighmacdonald committed Sep 6, 2024
1 parent 4d90146 commit b70d647
Show file tree
Hide file tree
Showing 13 changed files with 257 additions and 180 deletions.
48 changes: 28 additions & 20 deletions internal/blocklist/blocklist_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import (

"github.com/leighmacdonald/gbans/internal/domain"
"github.com/leighmacdonald/gbans/internal/httphelper"
"github.com/leighmacdonald/gbans/internal/queue"
"github.com/leighmacdonald/gbans/pkg/log"
"github.com/leighmacdonald/steamid/v4/steamid"
"github.com/riverqueue/river"
)

type blocklistUsecase struct {
Expand All @@ -37,26 +39,7 @@ func NewBlocklistUsecase(br domain.BlocklistRepository, banUsecase domain.BanSte
}
}

func (b blocklistUsecase) Start(ctx context.Context) {
ticker := time.NewTicker(time.Hour * 12)

update := func() {
b.syncBlocklists(ctx)
}

update()

for {
select {
case <-ticker.C:
update()
case <-ctx.Done():
return
}
}
}

func (b blocklistUsecase) syncBlocklists(ctx context.Context) {
func (b blocklistUsecase) Sync(ctx context.Context) {
waitGroup := &sync.WaitGroup{}

waitGroup.Add(1)
Expand Down Expand Up @@ -330,3 +313,28 @@ func (b blocklistUsecase) DeleteCIDRBlockWhitelist(ctx context.Context, whitelis

return nil
}

type ListUpdaterArgs struct{}

func (args ListUpdaterArgs) Kind() string {
return "blocklist_update"
}

func (args ListUpdaterArgs) InsertOpts() river.InsertOpts {
return river.InsertOpts{Queue: string(queue.Default), UniqueOpts: river.UniqueOpts{ByPeriod: time.Hour * 24}}
}

func NewListUpdaterWorker(lists domain.BlocklistUsecase) *ListUpdaterWorker {
return &ListUpdaterWorker{lists: lists}
}

type ListUpdaterWorker struct {
river.WorkerDefaults[ListUpdaterArgs]
lists domain.BlocklistUsecase
}

func (worker *ListUpdaterWorker) Work(ctx context.Context, _ *river.Job[ListUpdaterArgs]) error {
worker.lists.Sync(ctx)

return nil
}
59 changes: 51 additions & 8 deletions internal/cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,24 @@ func firstTimeSetup(ctx context.Context, persons domain.PersonUsecase, news doma
}

func createQueueWorkers(people domain.PersonUsecase, notifications domain.NotificationUsecase,
discord domain.DiscordUsecase, authRepo domain.AuthRepository, memberships *steamgroup.Memberships,
discordUC domain.DiscordUsecase, authRepo domain.AuthRepository, memberships *steamgroup.Memberships,
patreonUC domain.PatreonUsecase, bansSteam domain.BanSteamUsecase, bansNet domain.BanNetUsecase, bansASN domain.BanASNUsecase,
configUC domain.ConfigUsecase, fetcher *demo.Fetcher,
configUC domain.ConfigUsecase, fetcher *demo.Fetcher, demos domain.DemoUsecase, reports domain.ReportUsecase,
blocklists domain.BlocklistUsecase, discordOAuth domain.DiscordOAuthUsecase,
) *river.Workers {
workers := river.NewWorkers()

river.AddWorker[notification.SenderArgs](workers, notification.NewSenderWorker(people, notifications, discord))
river.AddWorker[notification.SenderArgs](workers, notification.NewSenderWorker(people, notifications, discordUC))
river.AddWorker[auth.CleanupArgs](workers, auth.NewCleanupWorker(authRepo))
river.AddWorker[steamgroup.MembershipArgs](workers, steamgroup.NewMembershipWorker(memberships))
river.AddWorker[patreon.AuthUpdateArgs](workers, patreon.NewSyncWorker(patreonUC))
river.AddWorker[ban.ExpirationArgs](workers, ban.NewExpirationWorker(bansSteam, bansNet, bansASN, people, notifications, configUC))
river.AddWorker[demo.FetcherArgs](workers, demo.NewFetcherWorker(fetcher, configUC))
river.AddWorker[demo.CleanupArgs](workers, demo.NewCleanupWorker(demos))
river.AddWorker[report.MetaInfoArgs](workers, report.NewMetaInfoWorker(reports))
river.AddWorker[blocklist.ListUpdaterArgs](workers, blocklist.NewListUpdaterWorker(blocklists))
river.AddWorker[person.ExpiredArgs](workers, person.NewExpiredWorker(people))
river.AddWorker[discord.TokenRefreshArgs](workers, discord.NewTokenRefreshWorker(discordOAuth))

return workers
}
Expand Down Expand Up @@ -145,11 +151,46 @@ func createPeriodicJobs() []*river.PeriodicJob {
&river.PeriodicJobOpts{RunOnStart: true}),

river.NewPeriodicJob(
river.PeriodicInterval(time.Minute*5),
river.PeriodicInterval(time.Minute*10),
func() (river.JobArgs, *river.InsertOpts) {
return demo.FetcherArgs{}, nil
},
&river.PeriodicJobOpts{RunOnStart: true}),

river.NewPeriodicJob(
river.PeriodicInterval(time.Hour*24),
func() (river.JobArgs, *river.InsertOpts) {
return demo.CleanupArgs{}, nil
},
&river.PeriodicJobOpts{RunOnStart: true}),

river.NewPeriodicJob(
river.PeriodicInterval(24*time.Hour),
func() (river.JobArgs, *river.InsertOpts) {
return report.MetaInfoArgs{}, nil
},
&river.PeriodicJobOpts{RunOnStart: true}),

river.NewPeriodicJob(
river.PeriodicInterval(24*time.Hour),
func() (river.JobArgs, *river.InsertOpts) {
return blocklist.ListUpdaterArgs{}, nil
},
&river.PeriodicJobOpts{RunOnStart: true}),

river.NewPeriodicJob(
river.PeriodicInterval(time.Minute*5),
func() (river.JobArgs, *river.InsertOpts) {
return person.ExpiredArgs{}, nil
},
&river.PeriodicJobOpts{RunOnStart: true}),

river.NewPeriodicJob(
river.PeriodicInterval(time.Hour*12),
func() (river.JobArgs, *river.InsertOpts) {
return discord.TokenRefreshArgs{}, nil
},
&river.PeriodicJobOpts{RunOnStart: true}),
}

return jobs
Expand Down Expand Up @@ -267,7 +308,7 @@ func serveCmd() *cobra.Command { //nolint:maintidx
assetUsecase := asset.NewAssetUsecase(assetRepository)
serversUsecase := servers.NewServersUsecase(servers.NewServersRepository(dbConn))
demoUsecase := demo.NewDemoUsecase(domain.BucketDemo, demo.NewDemoRepository(dbConn), assetUsecase, configUsecase, serversUsecase)
go demoUsecase.Start(ctx)

reportUsecase := report.NewReportUsecase(report.NewReportRepository(dbConn), notificationUsecase, configUsecase, personUsecase, demoUsecase)

stateUsecase := state.NewStateUsecase(eventBroadcaster,
Expand All @@ -279,7 +320,6 @@ func serveCmd() *cobra.Command { //nolint:maintidx
banGroupUsecase := steamgroup.NewBanGroupUsecase(banGroupRepo, personUsecase, notificationUsecase, configUsecase)

blocklistUsecase := blocklist.NewBlocklistUsecase(blocklist.NewBlocklistRepository(dbConn), banUsecase, banGroupUsecase)
go blocklistUsecase.Start(ctx)

go func() {
if err := stateUsecase.Start(ctx); err != nil {
Expand All @@ -291,7 +331,6 @@ func serveCmd() *cobra.Command { //nolint:maintidx
banNetUsecase := ban.NewBanNetUsecase(ban.NewBanNetRepository(dbConn), personUsecase, configUsecase, notificationUsecase, stateUsecase)

discordOAuthUsecase := discord.NewDiscordOAuthUsecase(discord.NewDiscordOAuthRepository(dbConn), configUsecase)
go discordOAuthUsecase.Start(ctx)

appeals := appeal.NewAppealUsecase(appeal.NewAppealRepository(dbConn), banUsecase, personUsecase, notificationUsecase, configUsecase)

Expand Down Expand Up @@ -405,7 +444,11 @@ func serveCmd() *cobra.Command { //nolint:maintidx
banNetUsecase,
banASNUsecase,
configUsecase,
demoFetcher)
demoFetcher,
demoUsecase,
reportUsecase,
blocklistUsecase,
discordOAuthUsecase)

periodicJons := createPeriodicJobs()
queueClient, errClient := queue.Client(dbConn.Pool(), workers, periodicJons)
Expand Down
2 changes: 1 addition & 1 deletion internal/demo/demo_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func NewDemoHandler(engine *gin.Engine, du domain.DemoUsecase) {

func (h demoHandler) onAPIGetCleanup() gin.HandlerFunc {
return func(ctx *gin.Context) {
h.demos.TriggerCleanup()
h.demos.Cleanup(ctx)

ctx.JSON(http.StatusOK, gin.H{})
}
Expand Down
57 changes: 29 additions & 28 deletions internal/demo/demo_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import (
"github.com/dustin/go-humanize"
"github.com/gin-gonic/gin"
"github.com/leighmacdonald/gbans/internal/domain"
"github.com/leighmacdonald/gbans/internal/queue"
"github.com/leighmacdonald/gbans/pkg/demoparser"
"github.com/leighmacdonald/gbans/pkg/fs"
"github.com/leighmacdonald/gbans/pkg/log"
"github.com/ricochet2200/go-disk-usage/du"
"github.com/riverqueue/river"
)

type demoUsecase struct {
Expand Down Expand Up @@ -178,35 +180,9 @@ func (d demoUsecase) Cleanup(ctx context.Context) {
}

slog.Debug("Old demos flushed", slog.Int("count", count), slog.String("size", humanize.Bytes(uint64(size))))
}

func (d demoUsecase) TriggerCleanup() {
d.cleanupChan <- true
}

func (d demoUsecase) Start(ctx context.Context) {
ticker := time.NewTicker(time.Hour)
tickerOrphans := time.NewTicker(time.Hour * 24)

d.Cleanup(ctx)

if err := d.RemoveOrphans(ctx); err != nil {
slog.Error("Failed to execute orphans", log.ErrAttr(err))
}

for {
select {
case <-ticker.C:
d.cleanupChan <- true
case <-d.cleanupChan:
d.Cleanup(ctx)
case <-tickerOrphans.C:
if err := d.RemoveOrphans(ctx); err != nil {
slog.Error("Failed to execute orphans", log.ErrAttr(err))
}
case <-ctx.Done():
return
}
if errOrphans := d.RemoveOrphans(ctx); errOrphans != nil {
slog.Error("Failed to execute orphans", log.ErrAttr(errOrphans))
}
}

Expand Down Expand Up @@ -329,3 +305,28 @@ func (d demoUsecase) RemoveOrphans(ctx context.Context) error {

return nil
}

type CleanupArgs struct{}

func (args CleanupArgs) Kind() string {
return "demo_cleanup"
}

func (args CleanupArgs) InsertOpts() river.InsertOpts {
return river.InsertOpts{Queue: string(queue.Default), UniqueOpts: river.UniqueOpts{ByPeriod: time.Hour * 24}}
}

func NewCleanupWorker(demos domain.DemoUsecase) *CleanupWorker {
return &CleanupWorker{demos: demos}
}

type CleanupWorker struct {
river.WorkerDefaults[CleanupArgs]
demos domain.DemoUsecase
}

func (worker *CleanupWorker) Work(ctx context.Context, _ *river.Job[CleanupArgs]) error {
worker.demos.Cleanup(ctx)

return nil
}
2 changes: 1 addition & 1 deletion internal/demo/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (args FetcherArgs) Kind() string {
}

func (args FetcherArgs) InsertOpts() river.InsertOpts {
return river.InsertOpts{Queue: string(queue.Default), UniqueOpts: river.UniqueOpts{ByPeriod: time.Minute * 5}}
return river.InsertOpts{Queue: string(queue.Default), UniqueOpts: river.UniqueOpts{ByPeriod: time.Minute * 10}}
}

func NewFetcherWorker(fetcher *Fetcher, config domain.ConfigUsecase) *FetcherWorker {
Expand Down
50 changes: 32 additions & 18 deletions internal/discord/discord_usecase_oauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import (

"github.com/leighmacdonald/gbans/internal/domain"
"github.com/leighmacdonald/gbans/internal/httphelper"
"github.com/leighmacdonald/gbans/internal/queue"
"github.com/leighmacdonald/gbans/pkg/log"
"github.com/leighmacdonald/gbans/pkg/oauth"
"github.com/leighmacdonald/steamid/v4/steamid"
"github.com/riverqueue/river"
)

type discordOAuthUsecase struct {
Expand All @@ -31,35 +33,20 @@ func NewDiscordOAuthUsecase(repository domain.DiscordOAuthRepository, config dom
}
}

func (d discordOAuthUsecase) Start(ctx context.Context) {
ticker := time.NewTicker(time.Hour)

d.RefreshOldTokens(ctx)

for {
select {
case <-ticker.C:
d.RefreshOldTokens(ctx)
case <-ctx.Done():
return
}
}
}

func (d discordOAuthUsecase) GetUserDetail(ctx context.Context, steamID steamid.SteamID) (domain.DiscordUserDetail, error) {
return d.repository.GetUserDetail(ctx, steamID)
}

func (d discordOAuthUsecase) RefreshOldTokens(ctx context.Context) {
func (d discordOAuthUsecase) RefreshTokens(ctx context.Context) error {
entries, errOld := d.repository.OldAuths(ctx)
if errOld != nil {
if errors.Is(errOld, domain.ErrNoResult) {
return
return nil
}

slog.Error("Failed to fetch old discord auth tokens", log.ErrAttr(errOld))

return
return errOld
}

for _, old := range entries {
Expand All @@ -72,10 +59,14 @@ func (d discordOAuthUsecase) RefreshOldTokens(ctx context.Context) {

if err := d.repository.SaveTokens(ctx, newCreds); err != nil {
slog.Error("Failed to save refresh tokens", log.ErrAttr(err))

return err
}

slog.Debug("Updated discord tokens", slog.String("steam_id", newCreds.SteamID.String()))
}

return nil
}

func (d discordOAuthUsecase) fetchRefresh(ctx context.Context, credentials domain.DiscordCredential) (domain.DiscordCredential, error) {
Expand Down Expand Up @@ -301,3 +292,26 @@ func (d discordOAuthUsecase) fetchToken(ctx context.Context, client *http.Client

return atr, nil
}

type TokenRefreshArgs struct{}

func (args TokenRefreshArgs) Kind() string {
return "discord_token_refresh"
}

func (args TokenRefreshArgs) InsertOpts() river.InsertOpts {
return river.InsertOpts{Queue: string(queue.Default), UniqueOpts: river.UniqueOpts{ByPeriod: time.Hour * 12}}
}

func NewTokenRefreshWorker(discordOAuth domain.DiscordOAuthUsecase) *TokenRefreshWorker {
return &TokenRefreshWorker{discordOAuth: discordOAuth}
}

type TokenRefreshWorker struct {
river.WorkerDefaults[TokenRefreshArgs]
discordOAuth domain.DiscordOAuthUsecase
}

func (worker *TokenRefreshWorker) Work(ctx context.Context, _ *river.Job[TokenRefreshArgs]) error {
return worker.discordOAuth.RefreshTokens(ctx)
}
2 changes: 1 addition & 1 deletion internal/domain/blocklist.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type BlocklistUsecase interface {
CreateSteamBlockWhitelists(ctx context.Context, steamID steamid.SteamID) (WhitelistSteam, error)
GetSteamBlockWhitelists(ctx context.Context) ([]WhitelistSteam, error)
DeleteSteamBlockWhitelists(ctx context.Context, steamID steamid.SteamID) error
Start(ctx context.Context)
Sync(ctx context.Context)
}

type BlocklistRepository interface {
Expand Down
2 changes: 0 additions & 2 deletions internal/domain/demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@ import (
)

type DemoUsecase interface {
Start(ctx context.Context)
ExpiredDemos(ctx context.Context, limit uint64) ([]DemoInfo, error)
GetDemoByID(ctx context.Context, demoID int64, demoFile *DemoFile) error
MarkArchived(ctx context.Context, demo *DemoFile) error
GetDemoByName(ctx context.Context, demoName string, demoFile *DemoFile) error
GetDemos(ctx context.Context) ([]DemoFile, error)
CreateFromAsset(ctx context.Context, asset Asset, serverID int) (*DemoFile, error)
TriggerCleanup()
Cleanup(ctx context.Context)
}

Expand Down
Loading

0 comments on commit b70d647

Please sign in to comment.