Skip to content

Commit

Permalink
Merge pull request #288 from strideynet/kev-bg-tasks
Browse files Browse the repository at this point in the history
Move (un)following to background tasks
  • Loading branch information
strideynet authored Nov 22, 2024
2 parents 0c6bac0 + e76393a commit 84791c7
Show file tree
Hide file tree
Showing 15 changed files with 551 additions and 68 deletions.
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ ENV=dev
BFF_INGESTER_ENABLED=1
BFF_API_ENABLED=1
BFF_SCORE_MATERIALIZER_ENABLED=1
BFF_BACKGROUND_WORKER_ENABLED=1
# Set BFF_HOSTNAME to the host you will serve BFF on.
BFF_HOSTNAME=

Expand Down
8 changes: 0 additions & 8 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"connectrpc.com/connect"
"connectrpc.com/otelconnect"
"github.com/rs/cors"
"github.com/strideynet/bsky-furry-feed/bluesky"
"github.com/strideynet/bsky-furry-feed/feed"
"github.com/strideynet/bsky-furry-feed/proto/bff/v1/bffv1pbconnect"
"github.com/strideynet/bsky-furry-feed/store"
Expand Down Expand Up @@ -49,7 +48,6 @@ func New(
feedService feedService,
pgxStore *store.PGXStore,
pdsHost string,
bskyCredentials *bluesky.Credentials,
authEngine *AuthEngine,
) (*http.Server, error) {
mux := &http.ServeMux{}
Expand Down Expand Up @@ -77,16 +75,10 @@ func New(
mux.Handle(getFeedSkeletonHandler(log, feedService))
mux.Handle(describeFeedGeneratorHandler(log, hostname, feedService))

client, err := bluesky.ClientFromCredentials(ctx, pdsHost, bskyCredentials)
if err != nil {
return nil, fmt.Errorf("creating bluesky client: %w", err)
}

// Mount Buf Connect services
modSvcHandler := &ModerationServiceHandler{
store: pgxStore,
log: log,
pdsClient: client,
authEngine: authEngine,
}
interceptors := connect.WithInterceptors(
Expand Down
5 changes: 0 additions & 5 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"connectrpc.com/connect"
indigoTest "github.com/bluesky-social/indigo/testing"
"github.com/stretchr/testify/require"
"github.com/strideynet/bsky-furry-feed/bluesky"
"github.com/strideynet/bsky-furry-feed/feed"
"github.com/strideynet/bsky-furry-feed/testenv"
)
Expand Down Expand Up @@ -67,10 +66,6 @@ func startAPIHarness(ctx context.Context, t *testing.T) *apiHarness {
},
harness.Store,
harness.PDS.HTTPHost(),
&bluesky.Credentials{
Identifier: "bff.tpds",
Password: "password",
},
&AuthEngine{
TokenValidator: BSkyTokenValidator(harness.PDS.HTTPHost()),
ActorGetter: harness.Store,
Expand Down
59 changes: 6 additions & 53 deletions api/moderation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ package api

import (
"context"
"errors"
"fmt"
"time"

"github.com/bluesky-social/indigo/api/bsky"
"github.com/bluesky-social/indigo/xrpc"
"github.com/strideynet/bsky-furry-feed/bluesky"
"golang.org/x/exp/slices"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -21,8 +18,6 @@ import (
type ModerationServiceHandler struct {
store *store.PGXStore
log *zap.Logger
pdsClient *bluesky.PDSClient
bgsClient bluesky.BGSClient
authEngine *AuthEngine
}

Expand Down Expand Up @@ -64,8 +59,8 @@ func (m *ModerationServiceHandler) BanActor(ctx context.Context, req *connect.Re
return nil, fmt.Errorf("creating audit event: %w", err)
}

if err := m.pdsClient.Unfollow(ctx, req.Msg.ActorDid); err != nil {
return nil, fmt.Errorf("unfollowing actor: %w", err)
if err := tx.EnqueueUnfollow(ctx, req.Msg.ActorDid); err != nil {
return nil, fmt.Errorf("enqueuing unfollow: %w", err)
}

if err := tx.Commit(ctx); err != nil {
Expand Down Expand Up @@ -122,8 +117,8 @@ func (m *ModerationServiceHandler) UnapproveActor(ctx context.Context, req *conn
return nil, fmt.Errorf("creating audit event: %w", err)
}

if err := m.pdsClient.Unfollow(ctx, req.Msg.ActorDid); err != nil {
return nil, fmt.Errorf("unfollowing actor: %w", err)
if err := tx.EnqueueUnfollow(ctx, req.Msg.ActorDid); err != nil {
return nil, fmt.Errorf("enqueuing unfollow: %w", err)
}

if err := tx.Commit(ctx); err != nil {
Expand Down Expand Up @@ -474,50 +469,8 @@ func (m *ModerationServiceHandler) ForceApproveActor(ctx context.Context, req *c
}

func (m *ModerationServiceHandler) updateProfileAndFollow(ctx context.Context, actorDID string, tx *store.PGXTX) error {
record, repoRev, err := m.bgsClient.SyncGetRecord(ctx, "app.bsky.actor.profile", actorDID, "self")
if err != nil {
if err2 := (&xrpc.Error{}); !errors.As(err, &err2) || err2.StatusCode != 404 {
return fmt.Errorf("getting profile: %w", err)
}
record = nil
}

var profile *bsky.ActorProfile
if record != nil {
switch record := record.(type) {
case *bsky.ActorProfile:
profile = record
default:
return fmt.Errorf("expected *bsky.ActorProfile, got %T", record)
}
}

displayName := ""
description := ""

if profile != nil {
if profile.DisplayName != nil {
displayName = *profile.DisplayName
}

if profile.Description != nil {
description = *profile.Description
}
}

if err := tx.CreateLatestActorProfile(ctx, store.CreateLatestActorProfileOpts{
ActorDID: actorDID,
CommitCID: repoRev,
CreatedAt: time.Now(), // NOTE: The Firehose reader uses the server time but we use the local time here. This may cause staleness if the firehose gives us an older timestamp but a newer update.
IndexedAt: time.Now(),
DisplayName: displayName,
Description: description,
}); err != nil {
return fmt.Errorf("updating actor profile: %w", err)
}

if err := m.pdsClient.Follow(ctx, actorDID); err != nil {
return fmt.Errorf("following approved actor: %w", err)
if err := tx.EnqueueFollow(ctx, actorDID); err != nil {
return fmt.Errorf("enqueing follow: %w", err)
}

return nil
Expand Down
16 changes: 15 additions & 1 deletion cmd/bffsrv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/grafana/pyroscope-go"
"github.com/strideynet/bsky-furry-feed/scoring"
"github.com/strideynet/bsky-furry-feed/worker"

"github.com/joho/godotenv"
"github.com/strideynet/bsky-furry-feed/api"
Expand Down Expand Up @@ -117,6 +118,7 @@ func runE(log *zap.Logger) error {
ingesterEnabled := os.Getenv("BFF_INGESTER_ENABLED") == "1"
apiEnabled := os.Getenv("BFF_API_ENABLED") == "1"
scoreMaterializerEnabled := os.Getenv("BFF_SCORE_MATERIALIZER_ENABLED") == "1"
backgroundWorkerEnabled := os.Getenv("BFF_BACKGROUND_WORKER_ENABLED") == "1"

log.Info("starting", zap.String("mode", string(mode)))

Expand Down Expand Up @@ -241,7 +243,6 @@ func runE(log *zap.Logger) error {
feedService,
pgxStore,
bluesky.DefaultPDSHost,
bskyCredentials,
&api.AuthEngine{
ActorGetter: pgxStore,
TokenValidator: api.BSkyTokenValidator(bluesky.DefaultPDSHost),
Expand Down Expand Up @@ -280,6 +281,19 @@ func runE(log *zap.Logger) error {
})
}

if backgroundWorkerEnabled {
log.Info("starting background worker")

eg.Go(func() error {
worker, err := worker.New(ctx, log.Named("background_worker"), bluesky.DefaultPDSHost, bskyCredentials, pgxStore)
if err != nil {
return fmt.Errorf("initializing worker: %w", err)
}

return worker.Run(ctx)
})
}

// Setup private diagnostics/metrics server
debugSrv := debugServer()
eg.Go(func() error {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/stretchr/testify v1.9.0
github.com/stretchr/testify v1.9.1-0.20241112225845-89cbdd9e7b39
github.com/xrash/smetrics v0.0.0-20231213231151-1d8dd44e695e // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,8 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.9.1-0.20241112225845-89cbdd9e7b39 h1:unyM6Xj3Keon8PvufL+AOmDMuaO7Uu/r3JW3ZGSZfqQ=
github.com/stretchr/testify v1.9.1-0.20241112225845-89cbdd9e7b39/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807/go.mod h1:7jxmlfBCDBXRzr0eAQJ48XC1hBu1np4CS5+cHEYfwpc=
github.com/testcontainers/testcontainers-go v0.21.0 h1:syePAxdeTzfkap+RrJaQZpJQ/s/fsUgn11xIvHrOE9U=
github.com/testcontainers/testcontainers-go v0.21.0/go.mod h1:c1ez3WVRHq7T/Aj+X3TIipFBwkBaNT5iNCY8+1b83Ng=
Expand Down
96 changes: 96 additions & 0 deletions store/gen/follow_tasks.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions store/gen/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions store/migrations/000025_create_follow_tasks.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE follow_tasks;
12 changes: 12 additions & 0 deletions store/migrations/000025_create_follow_tasks.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TABLE follow_tasks (
id BIGSERIAL PRIMARY KEY,
actor_did TEXT NOT NULL REFERENCES candidate_actors (did),
next_try_at TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
tries INT DEFAULT 0 NOT NULL,
should_unfollow BOOLEAN NOT NULL,
finished_at TIMESTAMPTZ,
last_error TEXT
);

CREATE INDEX follow_tasks_dates_idx ON public.follow_tasks (next_try_at, finished_at);
34 changes: 34 additions & 0 deletions store/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,3 +827,37 @@ func (s *PGXStore) HoldBackPendingActor(ctx context.Context, did string, duratio
HeldUntil: pgtype.Timestamptz{Time: duration, Valid: true},
})
}

func (s *PGXStore) EnqueueFollow(ctx context.Context, did string) error {
return s.queries.EnqueueFollowTask(ctx, gen.EnqueueFollowTaskParams{
ActorDID: did,
NextTryAt: pgtype.Timestamptz{Time: time.Now(), Valid: true},
CreatedAt: pgtype.Timestamptz{Time: time.Now(), Valid: true},
ShouldUnfollow: false,
})
}

func (s *PGXStore) EnqueueUnfollow(ctx context.Context, did string) error {
return s.queries.EnqueueFollowTask(ctx, gen.EnqueueFollowTaskParams{
ActorDID: did,
NextTryAt: pgtype.Timestamptz{Time: time.Now(), Valid: true},
CreatedAt: pgtype.Timestamptz{Time: time.Now(), Valid: true},
ShouldUnfollow: true,
})
}

func (s *PGXStore) GetNextFollowTask(ctx context.Context) (gen.FollowTask, error) {
return s.queries.GetNextFollowTask(ctx)
}

func (s *PGXStore) MarkFollowTaskAsErrored(ctx context.Context, id int64, err error) error {
return s.queries.MarkFollowTaskAsErrored(ctx, gen.MarkFollowTaskAsErroredParams{
ID: id,
LastError: pgtype.Text{String: err.Error(), Valid: true},
NextTryAt: pgtype.Timestamptz{Time: time.Now().Add(time.Minute * 15), Valid: true},
})
}

func (s *PGXStore) MarkFollowTaskAsDone(ctx context.Context, id int64) error {
return s.queries.MarkFollowTaskAsDone(ctx, id)
}
Loading

0 comments on commit 84791c7

Please sign in to comment.