Skip to content

Commit

Permalink
make automod event processing timeouts configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
bnewbold committed Jan 23, 2025
1 parent 988c118 commit 0100a8a
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 61 deletions.
38 changes: 25 additions & 13 deletions automod/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@ import (
"github.com/bluesky-social/indigo/xrpc"
)

const (
recordEventTimeout = 30 * time.Second
identityEventTimeout = 10 * time.Second
notificationEventTimeout = 5 * time.Second
)

// runtime for executing rules, managing state, and recording moderation actions.
//
// NOTE: careful when initializing: several fields must not be nil or zero, even though they are pointer type.
Expand Down Expand Up @@ -60,6 +54,13 @@ type EngineConfig struct {
QuotaModTakedownDay int
// number of misc actions automod can do per day, for all subjects combined (circuit breaker)
QuotaModActionDay int

// timeout for record event processing (total, including all setup, rules, and teardown)
RecordEventTimeout time.Duration
// timeout for identity event and account event processing (total, including all setup, rules, and teardown)
IdentityEventTimeout time.Duration
// timeout for event processing (total, including all setup, rules, and teardown)
OzoneEventTimeout time.Duration
}

// Entrypoint for external code pushing #identity events in to the engine.
Expand All @@ -85,8 +86,11 @@ func (eng *Engine) ProcessIdentityEvent(ctx context.Context, evt comatproto.Sync
eventErrorCount.WithLabelValues("identity").Inc()
}
}()
ctx, cancel := context.WithTimeout(ctx, identityEventTimeout)
defer cancel()
var cancel context.CancelFunc
if eng.Config.IdentityEventTimeout != 0 {
ctx, cancel = context.WithTimeout(ctx, eng.Config.IdentityEventTimeout)
defer cancel()
}

// first purge any caches; we need to re-resolve from scratch on identity updates
if err := eng.PurgeAccountCaches(ctx, did); err != nil {
Expand Down Expand Up @@ -156,8 +160,11 @@ func (eng *Engine) ProcessAccountEvent(ctx context.Context, evt comatproto.SyncS
eventErrorCount.WithLabelValues("account").Inc()
}
}()
ctx, cancel := context.WithTimeout(ctx, identityEventTimeout)
defer cancel()
var cancel context.CancelFunc
if eng.Config.IdentityEventTimeout != 0 {
ctx, cancel = context.WithTimeout(ctx, eng.Config.IdentityEventTimeout)
defer cancel()
}

// first purge any caches; we need to re-resolve from scratch on account updates
if err := eng.PurgeAccountCaches(ctx, did); err != nil {
Expand Down Expand Up @@ -221,8 +228,11 @@ func (eng *Engine) ProcessRecordOp(ctx context.Context, op RecordOp) error {
eng.Logger.Error("automod event execution exception", "err", r, "did", op.DID, "collection", op.Collection, "rkey", op.RecordKey)
}
}()
ctx, cancel := context.WithTimeout(ctx, recordEventTimeout)
defer cancel()
var cancel context.CancelFunc
if eng.Config.RecordEventTimeout != 0 {
ctx, cancel = context.WithTimeout(ctx, eng.Config.RecordEventTimeout)
defer cancel()
}

if err := op.Validate(); err != nil {
eventErrorCount.WithLabelValues("record").Inc()
Expand Down Expand Up @@ -287,6 +297,7 @@ func (eng *Engine) ProcessRecordOp(ctx context.Context, op RecordOp) error {
}

// returns a boolean indicating "block the event"
// NOTE: this code is unused and should be removed
func (eng *Engine) ProcessNotificationEvent(ctx context.Context, senderDID, recipientDID syntax.DID, reason string, subject syntax.ATURI) (bool, error) {
eventProcessCount.WithLabelValues("notif").Inc()
start := time.Now()
Expand All @@ -301,7 +312,8 @@ func (eng *Engine) ProcessNotificationEvent(ctx context.Context, senderDID, reci
eng.Logger.Error("automod event execution exception", "err", r, "sender", senderDID, "recipient", recipientDID)
}
}()
ctx, cancel := context.WithTimeout(ctx, notificationEventTimeout)
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Second*5)
defer cancel()

senderIdent, err := eng.Directory.LookupDID(ctx, senderDID)
Expand Down
7 changes: 5 additions & 2 deletions automod/engine/engine_ozone.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,11 @@ func (eng *Engine) ProcessOzoneEvent(ctx context.Context, eventView *toolsozone.
eng.Logger.Error("automod ozone event execution exception", "err", r, "eventID", eventView.Id, "createdAt", eventView.CreatedAt)
}
}()
ctx, cancel := context.WithTimeout(ctx, recordEventTimeout)
defer cancel()
var cancel context.CancelFunc
if eng.Config.OzoneEventTimeout != 0 {
ctx, cancel = context.WithTimeout(ctx, eng.Config.OzoneEventTimeout)
defer cancel()
}

ec, err := NewOzoneEventContext(ctx, eng, eventView)
if err != nil {
Expand Down
63 changes: 42 additions & 21 deletions cmd/hepa/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,24 @@ func run(args []string) error {
EnvVars: []string{"HEPA_QUOTA_MOD_ACTION_DAY"},
Value: 2000,
},
&cli.DurationFlag{
Name: "record-event-timeout",
Usage: "total processing time for record events (including setup, rules, and persisting)",
EnvVars: []string{"HEPA_RECORD_EVENT_TIMEOUT"},
Value: 30 * time.Second,
},
&cli.DurationFlag{
Name: "identity-event-timeout",
Usage: "total processing time for identity and account events (including setup, rules, and persisting)",
EnvVars: []string{"HEPA_IDENTITY_EVENT_TIMEOUT"},
Value: 10 * time.Second,
},
&cli.DurationFlag{
Name: "ozone-event-timeout",
Usage: "total processing time for ozone events (including setup, rules, and persisting)",
EnvVars: []string{"HEPA_OZONE_EVENT_TIMEOUT"},
Value: 30 * time.Second,
},
}

app.Commands = []*cli.Command{
Expand Down Expand Up @@ -260,27 +278,30 @@ var runCmd = &cli.Command{
srv, err := NewServer(
dir,
Config{
Logger: logger,
BskyHost: cctx.String("atp-bsky-host"),
OzoneHost: cctx.String("atp-ozone-host"),
OzoneDID: cctx.String("ozone-did"),
OzoneAdminToken: cctx.String("ozone-admin-token"),
PDSHost: cctx.String("atp-pds-host"),
PDSAdminToken: cctx.String("pds-admin-token"),
SetsFileJSON: cctx.String("sets-json-path"),
RedisURL: cctx.String("redis-url"),
SlackWebhookURL: cctx.String("slack-webhook-url"),
HiveAPIToken: cctx.String("hiveai-api-token"),
AbyssHost: cctx.String("abyss-host"),
AbyssPassword: cctx.String("abyss-password"),
RatelimitBypass: cctx.String("ratelimit-bypass"),
RulesetName: cctx.String("ruleset"),
PreScreenHost: cctx.String("prescreen-host"),
PreScreenToken: cctx.String("prescreen-token"),
ReportDupePeriod: cctx.Duration("report-dupe-period"),
QuotaModReportDay: cctx.Int("quota-mod-report-day"),
QuotaModTakedownDay: cctx.Int("quota-mod-takedown-day"),
QuotaModActionDay: cctx.Int("quota-mod-action-day"),
Logger: logger,
BskyHost: cctx.String("atp-bsky-host"),
OzoneHost: cctx.String("atp-ozone-host"),
OzoneDID: cctx.String("ozone-did"),
OzoneAdminToken: cctx.String("ozone-admin-token"),
PDSHost: cctx.String("atp-pds-host"),
PDSAdminToken: cctx.String("pds-admin-token"),
SetsFileJSON: cctx.String("sets-json-path"),
RedisURL: cctx.String("redis-url"),
SlackWebhookURL: cctx.String("slack-webhook-url"),
HiveAPIToken: cctx.String("hiveai-api-token"),
AbyssHost: cctx.String("abyss-host"),
AbyssPassword: cctx.String("abyss-password"),
RatelimitBypass: cctx.String("ratelimit-bypass"),
RulesetName: cctx.String("ruleset"),
PreScreenHost: cctx.String("prescreen-host"),
PreScreenToken: cctx.String("prescreen-token"),
ReportDupePeriod: cctx.Duration("report-dupe-period"),
QuotaModReportDay: cctx.Int("quota-mod-report-day"),
QuotaModTakedownDay: cctx.Int("quota-mod-takedown-day"),
QuotaModActionDay: cctx.Int("quota-mod-action-day"),
RecordEventTimeout: cctx.Duration("record-event-timeout"),
IdentityEventTimeout: cctx.Duration("identity-event-timeout"),
OzoneEventTimeout: cctx.Duration("ozone-event-timeout"),
},
)
if err != nil {
Expand Down
56 changes: 31 additions & 25 deletions cmd/hepa/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,30 @@ type Server struct {
}

type Config struct {
Logger *slog.Logger
BskyHost string
OzoneHost string
OzoneDID string
OzoneAdminToken string
PDSHost string
PDSAdminToken string
SetsFileJSON string
RedisURL string
SlackWebhookURL string
HiveAPIToken string
AbyssHost string
AbyssPassword string
RulesetName string
RatelimitBypass string
PreScreenHost string
PreScreenToken string
ReportDupePeriod time.Duration
QuotaModReportDay int
QuotaModTakedownDay int
QuotaModActionDay int
Logger *slog.Logger
BskyHost string
OzoneHost string
OzoneDID string
OzoneAdminToken string
PDSHost string
PDSAdminToken string
SetsFileJSON string
RedisURL string
SlackWebhookURL string
HiveAPIToken string
AbyssHost string
AbyssPassword string
RulesetName string
RatelimitBypass string
PreScreenHost string
PreScreenToken string
ReportDupePeriod time.Duration
QuotaModReportDay int
QuotaModTakedownDay int
QuotaModActionDay int
RecordEventTimeout time.Duration
IdentityEventTimeout time.Duration
OzoneEventTimeout time.Duration
}

func NewServer(dir identity.Directory, config Config) (*Server, error) {
Expand Down Expand Up @@ -215,10 +218,13 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) {
AdminClient: adminClient,
BlobClient: blobClient,
Config: engine.EngineConfig{
ReportDupePeriod: config.ReportDupePeriod,
QuotaModReportDay: config.QuotaModReportDay,
QuotaModTakedownDay: config.QuotaModTakedownDay,
QuotaModActionDay: config.QuotaModActionDay,
ReportDupePeriod: config.ReportDupePeriod,
QuotaModReportDay: config.QuotaModReportDay,
QuotaModTakedownDay: config.QuotaModTakedownDay,
QuotaModActionDay: config.QuotaModActionDay,
RecordEventTimeout: config.RecordEventTimeout,
IdentityEventTimeout: config.IdentityEventTimeout,
OzoneEventTimeout: config.OzoneEventTimeout,
},
}

Expand Down

0 comments on commit 0100a8a

Please sign in to comment.