
chore(dataobj): Reintroduce sorting of the logs section #15906

Open: rfratto wants to merge 9 commits into main

@rfratto (Member) commented on Jan 22, 2025

This PR reintroduces sorting in the logs section by:

  1. Accumulating records in memory up to a buffer size, then flushing those records in sorted order into smaller intermediate tables.

  2. Accumulating smaller tables in memory up to a section size, then merging those smaller tables into a larger, sorted section.

As a result of this, a written data object may now have several logs sections.

I have done a fair amount of testing and performance work, which is scattered throughout the PR. I split my changes cleanly across 7 commits to make them easier to review.
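As a rough illustration of the two-level approach, here is a minimal sketch of the buffering phase. All names here (`record`, `table`, `logsBuilder`, the timestamp sort key) are hypothetical stand-ins for the PR's actual types:

```go
package logsort

import "sort"

// record and table are illustrative stand-ins for the real dataobj types.
type record struct {
	timestamp int64
	line      string
}

type table []record

type logsBuilder struct {
	buffer     []record // records waiting to be flushed
	tables     []table  // sorted intermediate tables
	bufferSize int      // flush threshold for the record buffer
}

// append accumulates a record and flushes the buffer once it reaches
// the configured size.
func (b *logsBuilder) append(r record) {
	b.buffer = append(b.buffer, r)
	if len(b.buffer) >= b.bufferSize {
		b.flush()
	}
}

// flush sorts the buffered records, emits them as one intermediate
// table, and resets the buffer for reuse.
func (b *logsBuilder) flush() {
	if len(b.buffer) == 0 {
		return
	}
	sort.Slice(b.buffer, func(i, j int) bool {
		return b.buffer[i].timestamp < b.buffer[j].timestamp
	})
	t := make(table, len(b.buffer))
	copy(t, b.buffer)
	b.tables = append(b.tables, t)
	b.buffer = b.buffer[:0]
}
```

Once the accumulated tables reach the section size, they are merged into one sorted section; a sketch of that merge phase appears after the commit messages below.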

Resources used by iter.Pull can be garbage collected when one of the
following is true:

1. The `stop` function returned by iter.Pull is called, or
2. The `next` function returned by iter.Pull is called past the final
   element.

dataset.Iter stored the stop function but never called it, and it only
consumed up to the final element, never past it.

As a result, any Column passed to dataset.Iter was permanently retained
in memory. If a MemColumn was provided, none of its pages could be
garbage collected either.
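A minimal sketch of the corrected pattern, assuming a Go 1.23-style iter.Seq source (the `consume` function is hypothetical): deferring `stop` guarantees the pulled iterator's resources are released even if the caller returns early.

```go
package main

import "iter"

// consume drains seq. Deferring stop releases the resources held by
// iter.Pull even if we return before exhausting the sequence; calling
// next past the final element would release them as well.
func consume(seq iter.Seq[int]) {
	next, stop := iter.Pull(seq)
	defer stop()

	for {
		v, ok := next()
		if !ok {
			return
		}
		_ = v // process v
	}
}
```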
dataobj reuses slices as much as possible to avoid extra allocations.
This is done by truncating a slice to length zero (`slice[:0]`).
However, the elements stored before the truncation remain reachable
through the slice's backing array and won't be garbage collected.

Properly clearing and reusing a slice requires first calling `clear` on
the slice, and _then_ truncating the length.

There were two places in the code where clearing the slice was done
incorrectly.

To avoid running into this again, a helper package `sliceclear` has been
introduced which clears the slice and then truncates its length.
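A minimal sketch of what such a helper does (the real sliceclear package may differ):

```go
package sliceclear

// Clear zeroes every element of s and then truncates it to length
// zero. Zeroing first drops the references held by the old elements so
// the garbage collector can reclaim them, while the returned slice
// keeps its capacity for reuse.
func Clear[T any](s []T) []T {
	clear(s)
	return s[:0]
}
```

Call sites then replace `buf = buf[:0]` with `buf = sliceclear.Clear(buf)`.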
zstd.SpeedBestCompression offers great compression at a very high CPU
cost. While that cost is diminished when writes are streamed as data is
read from Kafka, it becomes untenable when compression is done in a
single pass.

Rather than picking a universal compression level for Zstd, this commit
adds CompressionOptions to allow callers to opt-in to different
compression levels based on their needs.

As a result, the compression level for the logs and streams sections
has dropped to zstd.SpeedDefault. This decreases the compression ratio
slightly but greatly reduces CPU usage.
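A minimal sketch of what opt-in levels can look like with github.com/klauspost/compress/zstd; `CompressionOptions` and `newEncoder` here are illustrative shapes, not the PR's exact API:

```go
package main

import "github.com/klauspost/compress/zstd"

// CompressionOptions lets a caller pick a Zstd level that matches its
// CPU budget.
type CompressionOptions struct {
	Level zstd.EncoderLevel // e.g. zstd.SpeedDefault or zstd.SpeedBestCompression
}

// newEncoder builds a Zstd encoder at the requested level, defaulting
// to zstd.SpeedDefault when no level is set.
func newEncoder(opts CompressionOptions) (*zstd.Encoder, error) {
	level := opts.Level
	if level == 0 {
		level = zstd.SpeedDefault
	}
	return zstd.NewWriter(nil, zstd.WithEncoderLevel(level))
}
```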
Pools of *bytes.Buffer can have their memory footprint slowly increase
as the capacity of pooled buffers grows. For example, if two callers use
a pool in parallel, all buffers in the pool will eventually have the
capacity required by the larger caller.

This problem is exacerbated when a larger capacity is only needed
rarely: continued use of the pool keeps its elements alive, so
oversized buffers are slow to be reclaimed, if they are reclaimed at
all.

The new bufpool package provides a utility to bucket *bytes.Buffer pools
by the capacity of the buffers, with buckets starting from 1 KiB and
doubling in size up to 64 GiB.

Using bufpool helps bound the growth of pools and limits the lifetime of
one-off massive allocations.
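A minimal sketch of capacity-bucketed pooling; the bucket math and function names are illustrative, not necessarily bufpool's actual implementation:

```go
package bufpool

import (
	"bytes"
	"math/bits"
	"sync"
)

// Buckets double in capacity from 1 KiB (2^10) up to 64 GiB (2^36).
const minBucketBits = 10

var buckets [27]sync.Pool

// bucketFor returns the smallest bucket whose buffers hold at least
// size bytes.
func bucketFor(size int) int {
	if size <= 1<<minBucketBits {
		return 0
	}
	b := bits.Len(uint(size-1)) - minBucketBits // round size up to a power of two
	if b >= len(buckets) {
		b = len(buckets) - 1
	}
	return b
}

// Get returns a buffer with at least size bytes of capacity.
func Get(size int) *bytes.Buffer {
	b := bucketFor(size)
	if buf, ok := buckets[b].Get().(*bytes.Buffer); ok {
		return buf
	}
	buf := new(bytes.Buffer)
	buf.Grow(1 << (b + minBucketBits))
	return buf
}

// Put resets buf and returns it to the largest bucket whose minimum
// capacity buf still satisfies, so an oversized buffer never serves a
// bucket it is too small for.
func Put(buf *bytes.Buffer) {
	buf.Reset()
	b := bits.Len(uint(buf.Cap())) - 1 - minBucketBits // round capacity down
	if b < 0 {
		return // smaller than the smallest bucket; let the GC take it
	}
	if b >= len(buckets) {
		b = len(buckets) - 1
	}
	buckets[b].Put(buf)
}
```

Bucketing this way bounds how far any single pool's buffers can grow, since a buffer only ever lands in the bucket matching its own capacity.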
Reusing Zstd decoders reduces CPU and memory consumption by several
orders of magnitude:

```
goos: darwin
goarch: arm64
pkg: github.com/grafana/loki/v3/pkg/dataobj/internal/dataset
cpu: Apple M2 Ultra
                          │ /Users/robert/before.txt │       /Users/robert/after.txt       │
                          │          sec/op          │   sec/op     vs base                │
_pageBuilder_WriteRead-24             348.229µ ± 26%   1.899µ ± 4%  -99.45% (p=0.000 n=10)

                          │ /Users/robert/before.txt │       /Users/robert/after.txt       │
                          │           B/op           │    B/op     vs base                 │
_pageBuilder_WriteRead-24             9596952.5 ± 0%   144.0 ± 0%  -100.00% (p=0.000 n=10)

                          │ /Users/robert/before.txt │      /Users/robert/after.txt       │
                          │        allocs/op         │ allocs/op   vs base                │
_pageBuilder_WriteRead-24                48.000 ± 2%   3.000 ± 0%  -93.75% (p=0.000 n=10)
```

See the comments in fixedZstdReader for why pooling was not used here.
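A minimal sketch of the reuse pattern with github.com/klauspost/compress/zstd, assuming a hypothetical per-reader wrapper: the decoder is created once and Reset per page, instead of paying zstd.NewReader's setup cost on every read.

```go
package main

import (
	"io"

	"github.com/klauspost/compress/zstd"
)

// zstdPageReader holds one long-lived decoder. The wrapper type and
// function names are illustrative.
type zstdPageReader struct {
	dec *zstd.Decoder
}

func newZstdPageReader() (*zstdPageReader, error) {
	// A nil reader is allowed; Reset must be called before the first use.
	// WithDecoderConcurrency(1) keeps the decoder single-goroutine so it
	// stays cheap to hold onto.
	dec, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(1))
	if err != nil {
		return nil, err
	}
	return &zstdPageReader{dec: dec}, nil
}

// readPage decompresses one page, reusing the same decoder each time
// instead of allocating a new one per call.
func (r *zstdPageReader) readPage(compressed io.Reader, dst io.Writer) error {
	if err := r.dec.Reset(compressed); err != nil {
		return err
	}
	_, err := io.Copy(dst, r.dec)
	return err
}
```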
This commit reintroduces sorting in the logs section by:

* Accumulating records in memory, and then flushing them in sorted order
  into smaller tables.

* Accumulating smaller tables and using a heap-based merge to combine
  them into a section.

As a result of this, a written data object may now have several logs
sections.
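A minimal sketch of the merge phase using container/heap for a k-way merge of sorted tables (`record` and `table` are the same illustrative types as in the buffering sketch above):

```go
package logsort

import "container/heap"

// cursor tracks the next unread record of one sorted table.
type cursor struct {
	t   table
	pos int
}

// mergeHeap orders cursors by the timestamp of their next record.
type mergeHeap []cursor

func (h mergeHeap) Len() int      { return len(h) }
func (h mergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h mergeHeap) Less(i, j int) bool {
	return h[i].t[h[i].pos].timestamp < h[j].t[h[j].pos].timestamp
}
func (h *mergeHeap) Push(x any) { *h = append(*h, x.(cursor)) }
func (h *mergeHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// mergeTables performs a k-way merge of sorted tables into one sorted
// section.
func mergeTables(tables []table) table {
	h := make(mergeHeap, 0, len(tables))
	total := 0
	for _, t := range tables {
		total += len(t)
		if len(t) > 0 {
			h = append(h, cursor{t: t})
		}
	}
	heap.Init(&h)

	out := make(table, 0, total)
	for h.Len() > 0 {
		c := h[0] // cursor with the smallest next record
		out = append(out, c.t[c.pos])
		c.pos++
		if c.pos < len(c.t) {
			h[0] = c
			heap.Fix(&h, 0) // re-sift the advanced cursor
		} else {
			heap.Pop(&h) // this table is exhausted
		}
	}
	return out
}
```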
@rfratto requested a review from a team as a code owner on January 22, 2025 at 21:14.