Skip to content

Commit

Permalink
Add buffered update queue for retrieving external player data
Browse files Browse the repository at this point in the history
  • Loading branch information
leighmacdonald committed Feb 25, 2024
1 parent 631fa0e commit 5f211d4
Show file tree
Hide file tree
Showing 17 changed files with 212 additions and 236 deletions.
11 changes: 6 additions & 5 deletions actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ package main
import (
"context"
"errors"
"log/slog"
"os"

"github.com/leighmacdonald/bd/rules"
"github.com/leighmacdonald/bd/store"
"github.com/leighmacdonald/steamid/v3/steamid"
"log/slog"
"os"
)

// unMark will unmark & remove a player from your local list. This *will not* unmark players from any
// other list sources. If you want to not kick someone on a 3rd party list, you can instead whitelist the player.
func unMark(ctx context.Context, re *rules.Engine, db store.Querier, state *gameState, sid64 steamid.SID64) (int, error) {
player, errPlayer := getPlayerOrCreate(ctx, db, sid64)
player, errPlayer := loadPlayerOrCreate(ctx, db, sid64)
if errPlayer != nil {
return 0, errPlayer
}
Expand Down Expand Up @@ -44,7 +45,7 @@ func mark(ctx context.Context, sm *settingsManager, db store.Querier, state *gam
if !errors.Is(errPlayer, errPlayerNotFound) {
return errPlayer
}
created, errCreate := getPlayerOrCreate(ctx, db, sid64)
created, errCreate := loadPlayerOrCreate(ctx, db, sid64)
if errCreate != nil {
return errCreate
}
Expand Down Expand Up @@ -76,7 +77,7 @@ func mark(ctx context.Context, sm *settingsManager, db store.Querier, state *gam

// whitelist prevents a player marked in 3rd party lists from being flagged for kicking.
func whitelist(ctx context.Context, db store.Querier, state *gameState, sid64 steamid.SID64, enabled bool) error {
player, errPlayer := getPlayerOrCreate(ctx, db, sid64)
player, errPlayer := loadPlayerOrCreate(ctx, db, sid64)
if errPlayer != nil {
return errPlayer
}
Expand Down
2 changes: 1 addition & 1 deletion announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package main
import (
"context"
"fmt"
"github.com/leighmacdonald/steamid/v3/steamid"
"log/slog"
"slices"
"sort"
"strings"
"time"

"github.com/leighmacdonald/bd/rules"
"github.com/leighmacdonald/steamid/v3/steamid"
)

type kickRequest struct {
Expand Down
7 changes: 4 additions & 3 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/leighmacdonald/bd-api/models"
"github.com/leighmacdonald/steamid/v3/steamid"
"github.com/leighmacdonald/steamweb/v2"
"net/http"
"net/url"
"strings"
"sync"

"github.com/leighmacdonald/bd-api/models"
"github.com/leighmacdonald/steamid/v3/steamid"
"github.com/leighmacdonald/steamweb/v2"
)

type FriendMap map[steamid.SID64][]steamweb.Friend
Expand Down
4 changes: 4 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ func getLaunchArgs(rconPass string, rconPort uint16, steamRoot string, steamID s
}

bdArgs := []string{
"-game", "tf",
//"-noreactlogin", // needed for vac to load as of late 2022?
"-steam",
"-secure",
"-usercon",
"+ip", "0.0.0.0",
"+sv_rcon_whitelist_address", "127.0.0.1",
Expand Down
11 changes: 0 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.22

require (
fyne.io/systray v1.10.0
github.com/Masterminds/squirrel v1.5.4
github.com/andygrunwald/vdf v1.1.0
github.com/dotse/slug v0.1.0
github.com/golang-migrate/migrate/v4 v4.17.0
Expand All @@ -18,7 +17,6 @@ require (
github.com/nxadm/tail v1.4.11
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c
github.com/stretchr/testify v1.8.4
golang.org/x/sync v0.6.0
golang.org/x/sys v0.17.0
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce
gopkg.in/yaml.v3 v3.0.1
Expand All @@ -35,9 +33,6 @@ require (
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
github.com/logrusorgru/aurora v2.0.3+incompatible // indirect
github.com/logrusorgru/aurora/v4 v4.0.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
Expand All @@ -61,19 +56,13 @@ require (
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect
golang.org/x/mod v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.18.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
lukechampine.com/uint128 v1.3.0 // indirect
modernc.org/cc/v3 v3.41.0 // indirect
modernc.org/ccgo/v3 v3.17.0 // indirect
modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 // indirect
modernc.org/libc v1.41.0 // indirect
modernc.org/mathutil v1.6.0 // indirect
modernc.org/memory v1.7.2 // indirect
modernc.org/opt v0.1.3 // indirect
modernc.org/strutil v1.2.0 // indirect
modernc.org/token v1.1.0 // indirect
)
77 changes: 4 additions & 73 deletions go.sum

Large diffs are not rendered by default.

131 changes: 52 additions & 79 deletions loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,32 @@ package main

import (
"context"
"log/slog"
"sync"
"time"

"github.com/leighmacdonald/bd-api/models"
"github.com/leighmacdonald/bd/rules"
"github.com/leighmacdonald/bd/store"
"github.com/leighmacdonald/steamid/v3/steamid"
"github.com/leighmacdonald/steamweb/v2"
"log/slog"
"sync"
"time"
)

type updatedRemoteData struct {
type bulkUpdatedRemoteData struct {
summaries []steamweb.PlayerSummary
bans []steamweb.PlayerBanState
sourcebans SourcebansMap
friends FriendMap
}

type playerDataUpdate struct {
steamID steamid.SID64
summary steamweb.PlayerSummary
bans steamweb.PlayerBanState
sourcebans []models.SbBanRecord
friends []steamweb.Friend
}

type playerDataLoader struct {
profileUpdateQueue chan steamid.SID64
datasource DataSource
Expand All @@ -27,21 +37,19 @@ type playerDataLoader struct {
re *rules.Engine
}

func newPlayerDataLoader(db store.Querier, ds DataSource, state *gameState, settings *settingsManager, re *rules.Engine) *playerDataLoader {
func newPlayerDataLoader(db store.Querier, ds DataSource, state *gameState, settings *settingsManager, re *rules.Engine,
profileUpdateQueue chan steamid.SID64,
) *playerDataLoader {
return &playerDataLoader{
db: db,
datasource: ds,
state: state,
settings: settings,
re: re,
profileUpdateQueue: make(chan steamid.SID64),
profileUpdateQueue: profileUpdateQueue,
}
}

func (p playerDataLoader) queue(sid64 steamid.SID64) {
p.profileUpdateQueue <- sid64
}

// playerDataLoader will update the 3rd party data from remote APIs.
// It will wait a short amount of time between updates to attempt to batch send the requests
// as much as possible.
Expand All @@ -62,86 +70,51 @@ func (p playerDataLoader) start(ctx context.Context) {
continue
}

updateData := p.fetchProfileUpdates(ctx, queue)
p.applyRemoteData(updateData)
bulkData := p.fetchProfileUpdates(ctx, queue)

for _, player := range p.state.players.all() {
localPlayer := player
if errSave := p.db.PlayerUpdate(ctx, localPlayer.toUpdateParams()); errSave != nil {
if errSave.Error() != "sql: database is closed" {
slog.Error("Failed to save updated player state",
slog.String("sid", localPlayer.SID64().String()), errAttr(errSave))
// Flatten the results
var updates []playerDataUpdate
for _, steamID := range queue {
u := playerDataUpdate{
steamID: steamID,
friends: make([]steamweb.Friend, 0),
sourcebans: make([]models.SbBanRecord, 0),
bans: steamweb.PlayerBanState{},
}
for _, summary := range bulkData.summaries {
if summary.SteamID == steamID {
u.summary = summary
break
}
}

p.state.players.update(localPlayer)
}

ourSteamID := p.settings.Settings().SteamID

for steamID, friends := range updateData.friends {
for _, friend := range friends {
if friend.SteamID == ourSteamID {
if actualPlayer, errPlayer := p.state.players.bySteamID(steamID, true); errPlayer == nil {
actualPlayer.OurFriend = true

p.state.players.update(actualPlayer)

break
}
for _, ban := range bulkData.bans {
if ban.SteamID == steamID {
u.bans = ban
break
}
}
}

slog.Info("Updated",
slog.Int("sums", len(updateData.summaries)), slog.Int("bans", len(updateData.bans)),
slog.Int("sourcebans", len(updateData.sourcebans)), slog.Int("fiends", len(updateData.friends)))

queue = nil
}
}
}

// applyRemoteData updates the current player states with new incoming data.
func (p playerDataLoader) applyRemoteData(data updatedRemoteData) {
players := p.state.players.all()

for _, curPlayer := range players {
player := curPlayer
for _, sum := range data.summaries {
if sum.SteamID == player.SteamID {
player.AvatarHash = sum.AvatarHash
player.AccountCreatedOn = time.Unix(int64(sum.TimeCreated), 0)
player.Visibility = int64(sum.CommunityVisibilityState)

break
}
}

for _, ban := range data.bans {
if ban.SteamID == player.SteamID {
player.CommunityBanned = ban.CommunityBanned
player.CommunityBanned = ban.VACBanned
player.GameBans = int64(ban.NumberOfGameBans)
player.VacBans = int64(ban.NumberOfVACBans)
player.EconomyBan = ban.EconomyBan
if friends, ok := bulkData.friends[steamID]; ok {
u.friends = friends
}

if ban.VACBanned && ban.DaysSinceLastBan > 0 {
player.LastVacBanOn = time.Now().AddDate(0, 0, -ban.DaysSinceLastBan).Unix()
if sourcebans, ok := bulkData.sourcebans[steamID]; ok {
u.sourcebans = sourcebans
}

break
updates = append(updates, u)
}
}

if sb, ok := data.sourcebans[player.SteamID]; ok {
player.Sourcebans = sb
}
for _, update := range updates {
p.state.playerDataChan <- update
}

player.UpdatedOn = time.Now()
player.ProfileUpdatedOn = player.UpdatedOn
slog.Info("Updated",
slog.Int("sums", len(bulkData.summaries)), slog.Int("bans", len(bulkData.bans)),
slog.Int("sourcebans", len(bulkData.sourcebans)), slog.Int("fiends", len(bulkData.friends)))

p.state.players.update(player)
queue = nil
}
}
}

Expand All @@ -155,12 +128,12 @@ func (p playerDataLoader) applyRemoteData(data updatedRemoteData) {
//
// If the user does not configure their own steam api key using LocalDataSource, then the
// default bd-api backed APIDataSource will instead be used as a proxy for fetching the results.
func (p playerDataLoader) fetchProfileUpdates(ctx context.Context, queued steamid.Collection) updatedRemoteData {
func (p playerDataLoader) fetchProfileUpdates(ctx context.Context, queued steamid.Collection) bulkUpdatedRemoteData {
localCtx, cancel := context.WithTimeout(ctx, time.Second*15)
defer cancel()

var (
updated updatedRemoteData
updated bulkUpdatedRemoteData
waitGroup = &sync.WaitGroup{}
)

Expand Down
12 changes: 8 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/leighmacdonald/bd/platform"
"github.com/leighmacdonald/bd/rules"
"github.com/leighmacdonald/bd/store"
"github.com/leighmacdonald/steamid/v3/steamid"
_ "modernc.org/sqlite"
)

Expand Down Expand Up @@ -124,7 +125,10 @@ func run() int {
// }

rcon := newRconConnection(settings.Rcon.String(), settings.Rcon.Password)
state := newGameState(db, settingsMgr, newPlayerStates(), rcon)

profileUpdateQueue := make(chan steamid.SID64)

state := newGameState(db, settingsMgr, newPlayerStates(), rcon, db, profileUpdateQueue)
eh := newEventHandler(state)

ingest, errLogReader := newLogIngest(filepath.Join(settings.TF2Dir, "console.log"), newLogParser(), true)
Expand All @@ -144,9 +148,9 @@ func run() int {
}

re := createRulesEngine(settingsMgr)
updater := newPlayerDataLoader(db, dataSource, state, settingsMgr, re)
updater := newPlayerDataLoader(db, dataSource, state, settingsMgr, re, profileUpdateQueue)
discordPresence := newDiscordState(state, settingsMgr)
processHandler := newProcessState(plat, rcon)
processHandler := newProcessState(plat, rcon, settingsMgr)

Check failure on line 153 in main.go

View workflow job for this annotation

GitHub Actions / staticcheck

cannot use plat (variable of type platform.LinuxPlatform) as platform.Platform value in argument to newProcessState: platform.LinuxPlatform does not implement platform.Platform (wrong type for method LaunchTF2)

Check failure on line 153 in main.go

View workflow job for this annotation

GitHub Actions / staticcheck

cannot use plat (variable of type platform.LinuxPlatform) as platform.Platform value in argument to newProcessState: platform.LinuxPlatform does not implement platform.Platform (wrong type for method LaunchTF2)

Check failure on line 153 in main.go

View workflow job for this annotation

GitHub Actions / lint-golangci

cannot use plat (variable of type platform.LinuxPlatform) as platform.Platform value in argument to newProcessState: platform.LinuxPlatform does not implement platform.Platform (wrong type for method LaunchTF2)
statusHandler := newStatusUpdater(rcon, processHandler, state, time.Second*2)
bigBrotherHandler := newOverwatch(settingsMgr, rcon, state)

Expand All @@ -161,7 +165,7 @@ func run() int {
httpServer := newHTTPServer(ctx, settings.HTTPListenAddr, mux)

// Start all the background workers
for _, svc := range []backgroundService{eh, discordPresence, cr, ingest, updater, statusHandler, bigBrotherHandler} {
for _, svc := range []backgroundService{eh, discordPresence, cr, ingest, updater, statusHandler, bigBrotherHandler, processHandler} {
go svc.start(ctx)
}

Expand Down
Loading

0 comments on commit 5f211d4

Please sign in to comment.