diff --git a/cmd/tail.go b/cmd/tail.go index db7199633..021f4b30b 100644 --- a/cmd/tail.go +++ b/cmd/tail.go @@ -1,11 +1,11 @@ package cmd import ( + "bytes" "io" "io/ioutil" "os" "os/exec" - "strings" "time" "github.com/Benchkram/errz" @@ -35,15 +35,48 @@ type line struct { type lineWriter struct { name string + buf bytes.Buffer out chan line } -func (w lineWriter) Write(b []byte) (int, error) { - s := strings.TrimSuffix(string(b), "\n") - w.out <- line{Line: tail.Line{Text: s, Time: time.Now()}, source: w.name} +func (w *lineWriter) Write(b []byte) (int, error) { + // Buffer lines, then submit all completed lines to the output channel. For incomplete lines + // we just return the number of bytes written. Call Finish() when done writing to ensure any + // final line without line endings are also written to the output channel. + w.buf.Write(b) + i := bytes.LastIndexAny(w.buf.Bytes(), "\r\n") + if i == -1 { + // Incomplete line, so just return. + return len(b), nil + } + + // Completed line. Remove line endings from the buffer and text (in case of \r\n) then submit it. + text := w.buf.Next(i) + + // Consume \r or \n. Note that the Buffer takes care of re-using space when we catch up. + crOrLf, err := w.buf.ReadByte() + if err != nil { + // Impossible because the next character was already found to be a \r or \n. + panic(err) + } + + // If the last character was \n, we could have had \r\n. We want just the line without line + // endings so check if the previous character was \r and if so remove it. + if last := len(text) - 1; last >= 0 && crOrLf == '\n' && text[last] == '\r' { + text = text[:last] + } + + w.out <- line{Line: tail.Line{Text: string(text), Time: time.Now()}, source: w.name} return len(b), nil } +func (w *lineWriter) Finish() { + if w.buf.Len() > 0 { + // Write remainder because it didn't end in a newline. + w.out <- line{Line: tail.Line{Text: w.buf.String(), Time: time.Now()}, source: w.name} + } +} + // Streams output via API to aggregator channel. // Returns nil if streaming's not supported on this path. func tailStream(conn client.Client, agg chan line, path string) io.Closer { @@ -63,9 +96,12 @@ func tailStream(conn client.Client, agg chan line, path string) io.Closer { // Start copying the stream to the aggregate channel go func() { - _, err := io.Copy(lineWriter{name: path, out: agg}, stream) + lw := lineWriter{name: path, out: agg} + _, err := io.Copy(&lw, stream) if err != nil { agg <- line{Line: tail.Line{Time: time.Now(), Err: err}, source: path} + } else { + lw.Finish() } }() return stream diff --git a/cmd/tail_test.go b/cmd/tail_test.go new file mode 100644 index 000000000..c2fd9f0ca --- /dev/null +++ b/cmd/tail_test.go @@ -0,0 +1,75 @@ +package cmd + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestLineWriter(t *testing.T) { + // Setup a writer with test message and output channel. Also mark time before any messages. + const msg = "a complete line" + out := make(chan line, 1) + lw := lineWriter{name: "mine", out: out} + + // Write a message, mark after it's written but before we finish to verify Write produced it. + start := time.Now() + validWrite(t, &lw, msg+"\n") + mark := time.Now() + lw.Finish() + assertLine(t, out, "mine", msg, start, mark) + + start = time.Now() + validWrite(t, &lw, msg+"\r") + mark = time.Now() + lw.Finish() + assertLine(t, out, "mine", msg, start, mark) + + // Classic Windows endings, e.g. CRLF + start = time.Now() + validWrite(t, &lw, msg+"\r\n") + mark = time.Now() + lw.Finish() + assertLine(t, out, "mine", msg, start, mark) + + // Test message split over multiple writes + start = time.Now() + split := len(msg) / 2 + validWrite(t, &lw, msg[:split]) + validWrite(t, &lw, msg[split:]) + validWrite(t, &lw, "\r\n") + mark = time.Now() + lw.Finish() + assertLine(t, out, "mine", msg, start, mark) + + // Test multiple lines, with no newline on last one + start = time.Now() + validWrite(t, &lw, msg) + validWrite(t, &lw, "\r") + assertLine(t, out, "mine", msg, start, time.Now()) + start = time.Now() + validWrite(t, &lw, msg) + validWrite(t, &lw, "\n") + assertLine(t, out, "mine", msg, start, time.Now()) + start = time.Now() + validWrite(t, &lw, msg) + lw.Finish() + assertLine(t, out, "mine", msg, start, time.Now()) +} + +func validWrite(t *testing.T, lw *lineWriter, msg string) { + n, err := lw.Write([]byte(msg)) + assert.NoError(t, err) + assert.Equal(t, len(msg), n) +} + +func assertLine(t *testing.T, out <-chan line, source, msg string, before, after time.Time) { + ln, ok := <-out + assert.True(t, ok) + assert.NoError(t, ln.Err) + assert.Equal(t, "mine", ln.source) + assert.Equal(t, msg, ln.Text) + assert.True(t, before.Before(ln.Time)) + assert.True(t, after.After(ln.Time)) +}