Skip to content

Commit

Permalink
publish ingest
Browse files Browse the repository at this point in the history
  • Loading branch information
fredrikvedvik committed Sep 12, 2023
1 parent 20b5880 commit 1b9c1f6
Show file tree
Hide file tree
Showing 19 changed files with 355 additions and 15 deletions.
14 changes: 9 additions & 5 deletions activities/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ import (
)

func registerProgressCallback(ctx context.Context) (chan struct{}, func(ffmpeg.Progress)) {
var current ffmpeg.Progress
return newHeartBeater[ffmpeg.Progress](ctx)
}

func newHeartBeater[T any](ctx context.Context) (chan struct{}, func(T)) {
var info T

progressCallback := func(percent ffmpeg.Progress) {
current = percent
cb := func(i T) {
info = i
}

stopChan := make(chan struct{})
Expand All @@ -23,12 +27,12 @@ func registerProgressCallback(ctx context.Context) (chan struct{}, func(ffmpeg.P
for {
select {
case <-timer.C:
activity.RecordHeartbeat(ctx, current)
activity.RecordHeartbeat(ctx, info)
case <-stopChan:
return
}
}
}()

return stopChan, progressCallback
return stopChan, cb
}
27 changes: 27 additions & 0 deletions activities/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package activities

import (
"cloud.google.com/go/pubsub"
"context"
"encoding/json"
)

func PubsubPublish(ctx context.Context, data any) error {
client, err := pubsub.NewClient(ctx, "btv-platform-prod-2")
if err != nil {
return err
}

topic := client.Topic("background_worker")
defer topic.Stop()

msg, err := json.Marshal(data)
if err != nil {
return err
}

_, err = topic.Publish(ctx, &pubsub.Message{
Data: msg,
}).Get(ctx)
return err
}
37 changes: 37 additions & 0 deletions activities/rclone.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package activities

import (
"context"
"github.com/bcc-code/bccm-flows/services/rclone"
"go.temporal.io/sdk/activity"
"time"
)

type RcloneUploadDirInput struct {
Source string
Destination string
}

func RcloneUploadDir(ctx context.Context, input RcloneUploadDirInput) (bool, error) {
activity.RecordHeartbeat(ctx, "Rclone Upload Dir")

res, err := rclone.CopyDir(input.Source, input.Destination)
if err != nil {
return false, err
}

for {
job, err := rclone.CheckJobStatus(res.JobID)
if err != nil {
return false, err
}
activity.RecordHeartbeat(ctx, job)
if job == nil {
return false, nil
}
if job.Finished {
return job.Success, nil
}
time.Sleep(time.Second * 10)
}
}
4 changes: 4 additions & 0 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ func main() {
switch utils.GetQueue() {
case common.QueueDebug:
w.RegisterActivity(activities.Transcribe)
w.RegisterActivity(activities.RcloneUploadDir)
w.RegisterActivity(activities.PubsubPublish)

for _, a := range utilActivities {
w.RegisterActivity(a)
Expand All @@ -106,6 +108,8 @@ func main() {
}
case common.QueueWorker:
w.RegisterActivity(activities.Transcribe)
w.RegisterActivity(activities.RcloneUploadDir)
w.RegisterActivity(activities.PubsubPublish)

for _, a := range utilActivities {
w.RegisterActivity(a)
Expand Down
23 changes: 21 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,23 @@ module github.com/bcc-code/bccm-flows
go 1.20

require (
cloud.google.com/go/pubsub v1.33.0
github.com/cloudevents/sdk-go/v2 v2.14.0
github.com/davecgh/go-spew v1.1.1
github.com/deckarep/golang-set/v2 v2.3.1
github.com/gin-gonic/gin v1.9.1
github.com/go-resty/resty/v2 v2.7.0
github.com/google/uuid v1.3.0
github.com/pkg/errors v0.9.1
github.com/samber/lo v1.38.1
github.com/stretchr/testify v1.8.3
go.temporal.io/sdk v1.24.0
)

require (
cloud.google.com/go v0.110.2 // indirect
cloud.google.com/go/compute v1.19.3 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.0 // indirect
github.com/bytedance/sonic v1.9.1 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
Expand All @@ -27,8 +32,13 @@ require (
github.com/gogo/googleapis v1.4.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gogo/status v1.1.1 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.11.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
Expand All @@ -43,16 +53,25 @@ require (
github.com/stretchr/objx v0.5.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
go.opencensus.io v0.24.0 // indirect
go.temporal.io/api v1.21.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/oauth2 v0.8.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/genproto v0.0.0-20230525154841-bd750badd5c6 // indirect
google.golang.org/api v0.126.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/grpc v1.55.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
Loading

0 comments on commit 1b9c1f6

Please sign in to comment.