Skip to content

Commit

Permalink
[MM-62338] Persist logs to scoped data path (#35)
Browse files Browse the repository at this point in the history
* Persist logs to scoped data path

* Fix test setup

* Fix tests
  • Loading branch information
streamer45 authored Jan 7, 2025
1 parent f19a56a commit d2a6016
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 26 deletions.
2 changes: 1 addition & 1 deletion cmd/transcriber/call/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestReportJobFailure(t *testing.T) {
AuthToken: "qj75unbsef83ik9p7ueypb6iyw",
}
cfg.SetDefaults()
tr, err := NewTranscriber(cfg)
tr, err := NewTranscriber(cfg, GetDataDir(""))
require.NoError(t, err)
require.NotNil(t, tr)

Expand Down
4 changes: 2 additions & 2 deletions cmd/transcriber/call/tracks.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (t *Transcriber) processLiveTrack(track trackRemote, sessionID string) {
return
}
ctx.user = user
ctx.filename = filepath.Join(getDataDir(), fmt.Sprintf("%s_%s.ogg", user.Id, track.ID()))
ctx.filename = filepath.Join(t.dataPath, fmt.Sprintf("%s_%s.ogg", user.Id, track.ID()))

var prevArrivalTime time.Time
var prevRTPTimestamp uint32
Expand Down Expand Up @@ -535,7 +535,7 @@ func (t *Transcriber) newTrackTranscriber() (transcribe.Transcriber, error) {
return azure.NewSpeechRecognizer(azure.SpeechRecognizerConfig{
SpeechKey: speechKey,
SpeechRegion: speechRegion,
DataDir: getDataDir(),
DataDir: t.dataPath,
})
default:
return nil, fmt.Errorf("transcribe API %q not implemented", t.cfg.TranscribeAPI)
Expand Down
9 changes: 8 additions & 1 deletion cmd/transcriber/call/transcriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type APIClient interface {
type Transcriber struct {
cfg config.CallTranscriberConfig

dataPath string

client *client.Client
apiClient APIClient
apiURL string
Expand All @@ -48,18 +50,23 @@ type Transcriber struct {
captionsPoolDoneCh chan struct{}
}

func NewTranscriber(cfg config.CallTranscriberConfig) (t *Transcriber, retErr error) {
func NewTranscriber(cfg config.CallTranscriberConfig, dataPath string) (t *Transcriber, retErr error) {
if err := cfg.IsValidURL(); err != nil {
return nil, fmt.Errorf("failed to validate URL: %w", err)
}

if dataPath == "" {
return nil, fmt.Errorf("dataPath should not be empty")
}

apiClient := model.NewAPIv4Client(cfg.SiteURL)
apiClient.SetToken(cfg.AuthToken)

t = &Transcriber{
cfg: cfg,
apiClient: apiClient,
apiURL: apiClient.URL,
dataPath: dataPath,
}

defer func() {
Expand Down
48 changes: 42 additions & 6 deletions cmd/transcriber/call/transcriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ func setupTranscriberForTest(t *testing.T) *Transcriber {
ModelSize: config.ModelSizeTiny,
}
cfg.SetDefaults()
tr, err := NewTranscriber(cfg)
require.NoError(t, err)
require.NotNil(t, tr)

dir, err := os.MkdirTemp("", "data")
if err != nil {
Expand All @@ -58,9 +55,48 @@ func setupTranscriberForTest(t *testing.T) *Transcriber {
os.RemoveAll(dir)
})

tr, err := NewTranscriber(cfg, GetDataDir(""))
require.NoError(t, err)
require.NotNil(t, tr)

return tr
}

func TestNewTranscriber(t *testing.T) {
t.Run("invalid siteURL", func(t *testing.T) {
cfg := config.CallTranscriberConfig{
SiteURL: "invalid-url",
CallID: "8w8jorhr7j83uqr6y1st894hqe",
PostID: "udzdsg7dwidbzcidx5khrf8nee",
TranscriptionID: "67t5u6cmtfbb7jug739d43xa9e",
AuthToken: "qj75unbsef83ik9p7ueypb6iyw",
NumThreads: 1,
ModelSize: config.ModelSizeTiny,
}
cfg.SetDefaults()

tr, err := NewTranscriber(cfg, GetDataDir(""))
require.EqualError(t, err, "failed to validate URL: SiteURL parsing failed: invalid scheme \"\"")
require.Nil(t, tr)
})
t.Run("empty data path", func(t *testing.T) {
cfg := config.CallTranscriberConfig{
SiteURL: "http://localhost:8065",
CallID: "8w8jorhr7j83uqr6y1st894hqe",
PostID: "udzdsg7dwidbzcidx5khrf8nee",
TranscriptionID: "67t5u6cmtfbb7jug739d43xa9e",
AuthToken: "qj75unbsef83ik9p7ueypb6iyw",
NumThreads: 1,
ModelSize: config.ModelSizeTiny,
}
cfg.SetDefaults()

tr, err := NewTranscriber(cfg, "")
require.EqualError(t, err, "dataPath should not be empty")
require.Nil(t, tr)
})
}

func TestTranscribeTrack(t *testing.T) {
tr := setupTranscriberForTest(t)

Expand Down Expand Up @@ -192,7 +228,7 @@ func TestProcessLiveTrack(t *testing.T) {
close(tr.trackCtxs)
require.Len(t, tr.trackCtxs, 1)

trackFile, err := os.Open(filepath.Join(getDataDir(), fmt.Sprintf("userID_%s.ogg", track.id)))
trackFile, err := os.Open(filepath.Join(tr.dataPath, fmt.Sprintf("userID_%s.ogg", track.id)))
defer trackFile.Close()
require.NoError(t, err)

Expand Down Expand Up @@ -293,7 +329,7 @@ func TestProcessLiveTrack(t *testing.T) {
close(tr.trackCtxs)
require.Len(t, tr.trackCtxs, 1)

trackFile, err := os.Open(filepath.Join(getDataDir(), fmt.Sprintf("userID_%s.ogg", track.id)))
trackFile, err := os.Open(filepath.Join(tr.dataPath, fmt.Sprintf("userID_%s.ogg", track.id)))
defer trackFile.Close()
require.NoError(t, err)

Expand Down Expand Up @@ -393,7 +429,7 @@ func TestProcessLiveTrack(t *testing.T) {
close(tr.trackCtxs)
require.Len(t, tr.trackCtxs, 1)

trackFile, err := os.Open(filepath.Join(getDataDir(), fmt.Sprintf("userID_%s.ogg", track.id)))
trackFile, err := os.Open(filepath.Join(tr.dataPath, fmt.Sprintf("userID_%s.ogg", track.id)))
defer trackFile.Close()
require.NoError(t, err)

Expand Down
10 changes: 5 additions & 5 deletions cmd/transcriber/call/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ func (t *Transcriber) getUserForSession(sessionID string) (*model.User, error) {
return nil, fmt.Errorf("failed to get user for call: max attempts reached")
}

func getDataDir() string {
func GetDataDir(jobID string) string {
if dir := os.Getenv("DATA_DIR"); dir != "" {
return dir
return filepath.Join(dir, jobID)
}
return dataDir
return filepath.Join(dataDir, jobID)
}

func getModelsDir() string {
Expand Down Expand Up @@ -102,12 +102,12 @@ func (t *Transcriber) publishTranscription(tr transcribe.Transcription) (err err
var vttFile *os.File
var textFile *os.File
openFiles := func() error {
vttFile, err = os.OpenFile(filepath.Join(getDataDir(), fname+".vtt"), os.O_RDWR|os.O_CREATE, 0600)
vttFile, err = os.OpenFile(filepath.Join(t.dataPath, fname+".vtt"), os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return fmt.Errorf("failed to open output file: %w", err)
}

textFile, err = os.OpenFile(filepath.Join(getDataDir(), fname+".txt"), os.O_RDWR|os.O_CREATE, 0600)
textFile, err = os.OpenFile(filepath.Join(t.dataPath, fname+".txt"), os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return fmt.Errorf("failed to open output file: %w", err)
}
Expand Down
24 changes: 16 additions & 8 deletions cmd/transcriber/call/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,18 @@ func TestPublishTranscriptions(t *testing.T) {
ModelSize: config.ModelSizeTiny,
}
cfg.SetDefaults()
tr, err := NewTranscriber(cfg)

tmpDir, err := os.MkdirTemp("", "")
require.NoError(t, err)
dataDir := os.Getenv("DATA_DIR")
os.Setenv("DATA_DIR", tmpDir)
defer os.Setenv("DATA_DIR", dataDir)

tr, err := NewTranscriber(cfg, GetDataDir(""))
require.NoError(t, err)
require.NotNil(t, tr)

t.Run("failure to get filename", func(t *testing.T) {
t.Run("invalid response", func(t *testing.T) {
err := tr.publishTranscription(transcribe.Transcription{})
require.EqualError(t, err, "failed to get filename for call: failed to get filename: AppErrorFromJSON: model.utils.decode_json.app_error, body: 404 page not found\n, json: cannot unmarshal number into Go value of type model.AppError")
})
Expand All @@ -97,11 +104,16 @@ func TestPublishTranscriptions(t *testing.T) {
},
}

tr.dataPath = "/invalid"
defer func() {
tr.dataPath = GetDataDir("")
}()

err := tr.publishTranscription(transcribe.Transcription{})
require.EqualError(t, err, fmt.Sprintf("failed to open output file: open %s: no such file or directory", filepath.Join(getDataDir(), "Call_Test.vtt")))
require.EqualError(t, err, fmt.Sprintf("failed to open output file: open %s: no such file or directory", filepath.Join(tr.dataPath, "Call_Test.vtt")))
})

vttFile, err := os.CreateTemp("", "Call_Test.vtt")
vttFile, err := os.CreateTemp(tmpDir, "Call_Test.vtt")
require.NoError(t, err)
defer os.Remove(vttFile.Name())

Expand All @@ -124,10 +136,6 @@ All right, we should be recording. Welcome everyone, developers meeting for Dece
`))
require.NoError(t, err)

dataDir := os.Getenv("DATA_DIR")
os.Setenv("DATA_DIR", filepath.Dir(vttFile.Name()))
defer os.Setenv("DATA_DIR", dataDir)

maxAPIRetryAttempts = 2

t.Run("upload session creation failure", func(t *testing.T) {
Expand Down
27 changes: 24 additions & 3 deletions cmd/transcriber/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"io"
"log/slog"
"os"
"os/signal"
Expand Down Expand Up @@ -39,11 +40,31 @@ func slogReplaceAttr(_ []string, a slog.Attr) slog.Attr {
}

func main() {
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
trID := os.Getenv("TRANSCRIPTION_ID")

// Create scoped (by jobID) data path
dataPath := call.GetDataDir(trID)
err := os.MkdirAll(dataPath, 0700)
if err != nil {
slog.Error("failed to create data path", slog.String("err", err.Error()))
os.Exit(1)
}

logFile, err := os.Create(filepath.Join(dataPath, "transcriber.log"))
if err != nil {
slog.Error("failed to create log file", slog.String("err", err.Error()))
os.Exit(1)
}
defer logFile.Close()

// This lets us write logs simultaneously to console and file.
logWriter := io.MultiWriter(os.Stdout, logFile)

logger := slog.New(slog.NewTextHandler(logWriter, &slog.HandlerOptions{
AddSource: true,
Level: slog.LevelDebug,
ReplaceAttr: slogReplaceAttr,
})).With("trID", os.Getenv("TRANSCRIPTION_ID"))
})).With("trID", trID)
slog.SetDefault(logger)

pid := os.Getpid()
Expand All @@ -59,7 +80,7 @@ func main() {
}
cfg.SetDefaults()

transcriber, err := call.NewTranscriber(cfg)
transcriber, err := call.NewTranscriber(cfg, dataPath)
if err != nil {
slog.Error("failed to create call transcriber", slog.String("err", err.Error()))
os.Exit(1)
Expand Down

0 comments on commit d2a6016

Please sign in to comment.