From 0100a8a63e80deee05e7ba11e9eaa574694b71bd Mon Sep 17 00:00:00 2001 From: bryan newbold Date: Wed, 22 Jan 2025 17:36:12 -0800 Subject: [PATCH] make automod event processing timeouts configurable --- automod/engine/engine.go | 38 +++++++++++++------- automod/engine/engine_ozone.go | 7 ++-- cmd/hepa/main.go | 63 ++++++++++++++++++++++------------ cmd/hepa/server.go | 56 ++++++++++++++++-------------- 4 files changed, 103 insertions(+), 61 deletions(-) diff --git a/automod/engine/engine.go b/automod/engine/engine.go index 933987072..1125f89a4 100644 --- a/automod/engine/engine.go +++ b/automod/engine/engine.go @@ -17,12 +17,6 @@ import ( "github.com/bluesky-social/indigo/xrpc" ) -const ( - recordEventTimeout = 30 * time.Second - identityEventTimeout = 10 * time.Second - notificationEventTimeout = 5 * time.Second -) - // runtime for executing rules, managing state, and recording moderation actions. // // NOTE: careful when initializing: several fields must not be nil or zero, even though they are pointer type. @@ -60,6 +54,13 @@ type EngineConfig struct { QuotaModTakedownDay int // number of misc actions automod can do per day, for all subjects combined (circuit breaker) QuotaModActionDay int + + // timeout for record event processing (total, including all setup, rules, and teardown) + RecordEventTimeout time.Duration + // timeout for identity event and account event processing (total, including all setup, rules, and teardown) + IdentityEventTimeout time.Duration + // timeout for event processing (total, including all setup, rules, and teardown) + OzoneEventTimeout time.Duration } // Entrypoint for external code pushing #identity events in to the engine. @@ -85,8 +86,11 @@ func (eng *Engine) ProcessIdentityEvent(ctx context.Context, evt comatproto.Sync eventErrorCount.WithLabelValues("identity").Inc() } }() - ctx, cancel := context.WithTimeout(ctx, identityEventTimeout) - defer cancel() + var cancel context.CancelFunc + if eng.Config.IdentityEventTimeout != 0 { + ctx, cancel = context.WithTimeout(ctx, eng.Config.IdentityEventTimeout) + defer cancel() + } // first purge any caches; we need to re-resolve from scratch on identity updates if err := eng.PurgeAccountCaches(ctx, did); err != nil { @@ -156,8 +160,11 @@ func (eng *Engine) ProcessAccountEvent(ctx context.Context, evt comatproto.SyncS eventErrorCount.WithLabelValues("account").Inc() } }() - ctx, cancel := context.WithTimeout(ctx, identityEventTimeout) - defer cancel() + var cancel context.CancelFunc + if eng.Config.IdentityEventTimeout != 0 { + ctx, cancel = context.WithTimeout(ctx, eng.Config.IdentityEventTimeout) + defer cancel() + } // first purge any caches; we need to re-resolve from scratch on account updates if err := eng.PurgeAccountCaches(ctx, did); err != nil { @@ -221,8 +228,11 @@ func (eng *Engine) ProcessRecordOp(ctx context.Context, op RecordOp) error { eng.Logger.Error("automod event execution exception", "err", r, "did", op.DID, "collection", op.Collection, "rkey", op.RecordKey) } }() - ctx, cancel := context.WithTimeout(ctx, recordEventTimeout) - defer cancel() + var cancel context.CancelFunc + if eng.Config.RecordEventTimeout != 0 { + ctx, cancel = context.WithTimeout(ctx, eng.Config.RecordEventTimeout) + defer cancel() + } if err := op.Validate(); err != nil { eventErrorCount.WithLabelValues("record").Inc() @@ -287,6 +297,7 @@ func (eng *Engine) ProcessRecordOp(ctx context.Context, op RecordOp) error { } // returns a boolean indicating "block the event" +// NOTE: this code is unused and should be removed func (eng *Engine) ProcessNotificationEvent(ctx context.Context, senderDID, recipientDID syntax.DID, reason string, subject syntax.ATURI) (bool, error) { eventProcessCount.WithLabelValues("notif").Inc() start := time.Now() @@ -301,7 +312,8 @@ func (eng *Engine) ProcessNotificationEvent(ctx context.Context, senderDID, reci eng.Logger.Error("automod event execution exception", "err", r, "sender", senderDID, "recipient", recipientDID) } }() - ctx, cancel := context.WithTimeout(ctx, notificationEventTimeout) + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Second*5) defer cancel() senderIdent, err := eng.Directory.LookupDID(ctx, senderDID) diff --git a/automod/engine/engine_ozone.go b/automod/engine/engine_ozone.go index bbe354225..7349dd3a0 100644 --- a/automod/engine/engine_ozone.go +++ b/automod/engine/engine_ozone.go @@ -163,8 +163,11 @@ func (eng *Engine) ProcessOzoneEvent(ctx context.Context, eventView *toolsozone. eng.Logger.Error("automod ozone event execution exception", "err", r, "eventID", eventView.Id, "createdAt", eventView.CreatedAt) } }() - ctx, cancel := context.WithTimeout(ctx, recordEventTimeout) - defer cancel() + var cancel context.CancelFunc + if eng.Config.OzoneEventTimeout != 0 { + ctx, cancel = context.WithTimeout(ctx, eng.Config.OzoneEventTimeout) + defer cancel() + } ec, err := NewOzoneEventContext(ctx, eng, eventView) if err != nil { diff --git a/cmd/hepa/main.go b/cmd/hepa/main.go index 77cee7610..dcdfc6034 100644 --- a/cmd/hepa/main.go +++ b/cmd/hepa/main.go @@ -173,6 +173,24 @@ func run(args []string) error { EnvVars: []string{"HEPA_QUOTA_MOD_ACTION_DAY"}, Value: 2000, }, + &cli.DurationFlag{ + Name: "record-event-timeout", + Usage: "total processing time for record events (including setup, rules, and persisting)", + EnvVars: []string{"HEPA_RECORD_EVENT_TIMEOUT"}, + Value: 30 * time.Second, + }, + &cli.DurationFlag{ + Name: "identity-event-timeout", + Usage: "total processing time for identity and account events (including setup, rules, and persisting)", + EnvVars: []string{"HEPA_IDENTITY_EVENT_TIMEOUT"}, + Value: 10 * time.Second, + }, + &cli.DurationFlag{ + Name: "ozone-event-timeout", + Usage: "total processing time for ozone events (including setup, rules, and persisting)", + EnvVars: []string{"HEPA_OZONE_EVENT_TIMEOUT"}, + Value: 30 * time.Second, + }, } app.Commands = []*cli.Command{ @@ -260,27 +278,30 @@ var runCmd = &cli.Command{ srv, err := NewServer( dir, Config{ - Logger: logger, - BskyHost: cctx.String("atp-bsky-host"), - OzoneHost: cctx.String("atp-ozone-host"), - OzoneDID: cctx.String("ozone-did"), - OzoneAdminToken: cctx.String("ozone-admin-token"), - PDSHost: cctx.String("atp-pds-host"), - PDSAdminToken: cctx.String("pds-admin-token"), - SetsFileJSON: cctx.String("sets-json-path"), - RedisURL: cctx.String("redis-url"), - SlackWebhookURL: cctx.String("slack-webhook-url"), - HiveAPIToken: cctx.String("hiveai-api-token"), - AbyssHost: cctx.String("abyss-host"), - AbyssPassword: cctx.String("abyss-password"), - RatelimitBypass: cctx.String("ratelimit-bypass"), - RulesetName: cctx.String("ruleset"), - PreScreenHost: cctx.String("prescreen-host"), - PreScreenToken: cctx.String("prescreen-token"), - ReportDupePeriod: cctx.Duration("report-dupe-period"), - QuotaModReportDay: cctx.Int("quota-mod-report-day"), - QuotaModTakedownDay: cctx.Int("quota-mod-takedown-day"), - QuotaModActionDay: cctx.Int("quota-mod-action-day"), + Logger: logger, + BskyHost: cctx.String("atp-bsky-host"), + OzoneHost: cctx.String("atp-ozone-host"), + OzoneDID: cctx.String("ozone-did"), + OzoneAdminToken: cctx.String("ozone-admin-token"), + PDSHost: cctx.String("atp-pds-host"), + PDSAdminToken: cctx.String("pds-admin-token"), + SetsFileJSON: cctx.String("sets-json-path"), + RedisURL: cctx.String("redis-url"), + SlackWebhookURL: cctx.String("slack-webhook-url"), + HiveAPIToken: cctx.String("hiveai-api-token"), + AbyssHost: cctx.String("abyss-host"), + AbyssPassword: cctx.String("abyss-password"), + RatelimitBypass: cctx.String("ratelimit-bypass"), + RulesetName: cctx.String("ruleset"), + PreScreenHost: cctx.String("prescreen-host"), + PreScreenToken: cctx.String("prescreen-token"), + ReportDupePeriod: cctx.Duration("report-dupe-period"), + QuotaModReportDay: cctx.Int("quota-mod-report-day"), + QuotaModTakedownDay: cctx.Int("quota-mod-takedown-day"), + QuotaModActionDay: cctx.Int("quota-mod-action-day"), + RecordEventTimeout: cctx.Duration("record-event-timeout"), + IdentityEventTimeout: cctx.Duration("identity-event-timeout"), + OzoneEventTimeout: cctx.Duration("ozone-event-timeout"), }, ) if err != nil { diff --git a/cmd/hepa/server.go b/cmd/hepa/server.go index 7e8722d9d..7f82b4b1a 100644 --- a/cmd/hepa/server.go +++ b/cmd/hepa/server.go @@ -33,27 +33,30 @@ type Server struct { } type Config struct { - Logger *slog.Logger - BskyHost string - OzoneHost string - OzoneDID string - OzoneAdminToken string - PDSHost string - PDSAdminToken string - SetsFileJSON string - RedisURL string - SlackWebhookURL string - HiveAPIToken string - AbyssHost string - AbyssPassword string - RulesetName string - RatelimitBypass string - PreScreenHost string - PreScreenToken string - ReportDupePeriod time.Duration - QuotaModReportDay int - QuotaModTakedownDay int - QuotaModActionDay int + Logger *slog.Logger + BskyHost string + OzoneHost string + OzoneDID string + OzoneAdminToken string + PDSHost string + PDSAdminToken string + SetsFileJSON string + RedisURL string + SlackWebhookURL string + HiveAPIToken string + AbyssHost string + AbyssPassword string + RulesetName string + RatelimitBypass string + PreScreenHost string + PreScreenToken string + ReportDupePeriod time.Duration + QuotaModReportDay int + QuotaModTakedownDay int + QuotaModActionDay int + RecordEventTimeout time.Duration + IdentityEventTimeout time.Duration + OzoneEventTimeout time.Duration } func NewServer(dir identity.Directory, config Config) (*Server, error) { @@ -215,10 +218,13 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) { AdminClient: adminClient, BlobClient: blobClient, Config: engine.EngineConfig{ - ReportDupePeriod: config.ReportDupePeriod, - QuotaModReportDay: config.QuotaModReportDay, - QuotaModTakedownDay: config.QuotaModTakedownDay, - QuotaModActionDay: config.QuotaModActionDay, + ReportDupePeriod: config.ReportDupePeriod, + QuotaModReportDay: config.QuotaModReportDay, + QuotaModTakedownDay: config.QuotaModTakedownDay, + QuotaModActionDay: config.QuotaModActionDay, + RecordEventTimeout: config.RecordEventTimeout, + IdentityEventTimeout: config.IdentityEventTimeout, + OzoneEventTimeout: config.OzoneEventTimeout, }, }