Skip to content

Commit

Permalink
kgo: add Buffered{Fetch,Produce}Bytes
Browse files Browse the repository at this point in the history
This slows down fetching a little bit. If it is egregious, we can fix
the perf by tracking the size buffered when processing the fetch itself,
and then adding a new field to batch-untrack the size. That's left as an
exercise for a person that cares.

This is now done since the prior commit introduces buffered produce
bytes, and we may as well add it while fetching for both-sizes
consistency, and we may as well expose it.
  • Loading branch information
twmb committed Sep 16, 2023
1 parent 6ed00e9 commit a155a9f
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 1 deletion.
8 changes: 8 additions & 0 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func (o Offset) At(at int64) Offset {

type consumer struct {
bufferedRecords atomicI64
bufferedBytes atomicI64

cl *Client

Expand Down Expand Up @@ -285,6 +286,13 @@ func (cl *Client) BufferedFetchRecords() int64 {
return cl.consumer.bufferedRecords.Load()
}

// BufferedFetchBytes returns the number of bytes currently buffered from
// fetching within the client. This is the sum of all keys, values, and header
// keys/values. See the related [BufferedFetchRecords] for more information.
func (cl *Client) BufferedFetchBytes() int64 {
return cl.consumer.bufferedBytes.Load()
}

type usedCursors map[*cursor]struct{}

func (u *usedCursors) use(c *cursor) {
Expand Down
7 changes: 7 additions & 0 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ func (cl *Client) BufferedProduceRecords() int64 {
return cl.producer.bufferedRecords.Load()
}

// BufferedProduceBytes returns the number of bytes currently buffered for
// producing within the client. This is the sum of all keys, values, and header
// keys/values. See the related [BufferedProduceRecords] for more information.
func (cl *Client) BufferedProduceBytes() int64 {
return cl.producer.bufferedBytes.Load()
}

type unknownTopicProduces struct {
buffered []promisedRec
wait chan error // retryable errors
Expand Down
10 changes: 9 additions & 1 deletion pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,16 +323,24 @@ func (s *source) hook(f *Fetch, buffered, polled bool) {
})

var nrecs int
var nbytes int64
for i := range f.Topics {
t := &f.Topics[i]
for j := range t.Partitions {

Check warning on line 329 in pkg/kgo/source.go

View workflow job for this annotation

GitHub Actions / golangci-lint on amd64

empty-lines: extra empty line at the end of a block (revive)
nrecs += len(t.Partitions[j].Records)
p := &t.Partitions[j]
nrecs += len(p.Records)
for k := range p.Records {
nbytes += p.Records[k].userSize()
}

Check failure on line 335 in pkg/kgo/source.go

View workflow job for this annotation

GitHub Actions / golangci-lint on amd64

unnecessary trailing newline (whitespace)
}
}
if buffered {
s.cl.consumer.bufferedRecords.Add(int64(nrecs))
s.cl.consumer.bufferedBytes.Add(nbytes)
} else {
s.cl.consumer.bufferedRecords.Add(-int64(nrecs))
s.cl.consumer.bufferedBytes.Add(-nbytes)
}
}

Expand Down

0 comments on commit a155a9f

Please sign in to comment.