Skip to content

Commit

Permalink
[Winlogbeat] Add configuration for registry file flush timeout (#29053)…
Browse files Browse the repository at this point in the history
… (#29232)

- Added a new option ("winlogbeat.registry_flush") for configuring the
timeout for writing registry entries to disk. The previously hard-coded
value of 5 seconds is now the default value.

(cherry picked from commit 1bd0da5)

Co-authored-by: Taylor Swanson <[email protected]>
  • Loading branch information
mergify[bot] and taylor-swanson authored Dec 9, 2021
1 parent 7e56c4a commit 1d524ad
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 70 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Winlogbeat*

- Add configuration option for registry file flush timeout {issue}29001[29001] {pull}29053[29053]

*Elastic Log Driver*

Expand Down
6 changes: 6 additions & 0 deletions winlogbeat/_meta/config/header.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
# directory in which it was started.
#winlogbeat.registry_file: .winlogbeat.yml

# The timeout value that controls when registry entries are written to disk
# (flushed). When the age of an unwritten update exceeds this value, a write
# to disk is triggered. When registry_flush is set to 0s, the registry is
# written to disk after each batch of events has been published successfully.
# The default value is 5s.
#winlogbeat.registry_flush: 5s

{{end -}}
# event_logs specifies a list of event logs to monitor as well as any
# accompanying options. The YAML data type of event_logs is a list of
Expand Down
2 changes: 1 addition & 1 deletion winlogbeat/beater/winlogbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (eb *Winlogbeat) setup(b *beat.Beat) error {
config := &eb.config

var err error
eb.checkpoint, err = checkpoint.NewCheckpoint(config.RegistryFile, 10, 5*time.Second)
eb.checkpoint, err = checkpoint.NewCheckpoint(config.RegistryFile, config.RegistryFlush)
if err != nil {
return err
}
Expand Down
18 changes: 3 additions & 15 deletions winlogbeat/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type Checkpoint struct {
file string // File where the state is persisted.
fileLock sync.RWMutex // Lock that protects concurrent reads/writes to file.
numUpdates int // Number of updates received since last persisting to disk.
maxUpdates int // Maximum number of updates to buffer before persisting to disk.
flushInterval time.Duration // Maximum time interval that can pass before persisting to disk.
sort []string // Slice used for sorting states map (store to save on mallocs).

Expand Down Expand Up @@ -72,26 +71,18 @@ type EventLogState struct {
// guarantee any in-memory state information is flushed to disk.
//
// file is the name of the file where event log state is persisted as YAML.
// maxUpdates is the maximum number of updates checkpoint will accept before
// triggering a flush to disk. interval is maximum amount of time that can
// pass since the last flush before triggering a flush to disk (minimum value
// is 1s).
func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkpoint, error) {
// interval is the maximum amount of time that can pass since the last flush
// before triggering a flush to disk (minimum value is 1s).
func NewCheckpoint(file string, interval time.Duration) (*Checkpoint, error) {
c := &Checkpoint{
done: make(chan struct{}),
file: file,
maxUpdates: maxUpdates,
flushInterval: interval,
sort: make([]string, 0, 10),
states: make(map[string]EventLogState),
save: make(chan EventLogState, 1),
}

// Minimum batch size.
if c.maxUpdates < 1 {
c.maxUpdates = 1
}

// Minimum flush interval.
if c.flushInterval < time.Second {
c.flushInterval = time.Second
Expand Down Expand Up @@ -139,9 +130,6 @@ loop:
c.states[s.Name] = s
c.lock.Unlock()
c.numUpdates++
if c.numUpdates < c.maxUpdates {
continue
}
case <-flushTimer.C:
}

Expand Down
54 changes: 1 addition & 53 deletions winlogbeat/checkpoint/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,58 +56,6 @@ func eventually(t *testing.T, predicate func() (bool, error), timeout time.Durat
t.Fatal("predicate is not true after", timeout)
}

// TestWriteMaxUpdates verifies that a write to disk is triggered when the
// maximum number of buffered updates is reached, and not before.
func TestWriteMaxUpdates(t *testing.T) {
dir, err := ioutil.TempDir("", "wlb-checkpoint-test")
if err != nil {
t.Fatal(err)
}
// Remove the temporary directory (and the registry file in it) on exit.
defer func() {
err := os.RemoveAll(dir)
if err != nil {
t.Fatal(err)
}
}()

// Place the registry file under nested directories of the fresh temp dir
// and confirm it does not exist before the checkpoint is created.
file := filepath.Join(dir, "some", "new", "dir", ".winlogbeat.yml")
if !assert.False(t, fileExists(file), "%s should not exist", file) {
return
}

// maxUpdates is 2 and the flush interval is one hour, so within this test
// only the update count is expected to trigger a write.
cp, err := NewCheckpoint(file, 2, time.Hour)
if err != nil {
t.Fatal(err)
}
defer cp.Shutdown()

// Send update - it's not written to disk but it's in memory.
cp.Persist("App", 1, time.Now(), "")
found := false
// Wait (up to 15s) until the in-memory state contains the "App" entry.
eventually(t, func() (bool, error) {
_, found = cp.States()["App"]
return found, nil
}, time.Second*15)
assert.True(t, found)

// After a single update nothing should have been persisted yet.
ps, err := cp.read()
if err != nil {
t.Fatal("read failed", err)
}
assert.Len(t, ps.States, 0)

// Send update - it is written to disk.
cp.Persist("App", 2, time.Now(), "")
// Wait (up to 15s) until reading the file yields at least one state.
eventually(t, func() (bool, error) {
ps, err = cp.read()
return ps != nil && len(ps.States) > 0, err
}, time.Second*15)

// The persisted state must be the most recent update for "App".
if assert.Len(t, ps.States, 1, "state not written, could be a flush timing issue, retry") {
assert.Equal(t, "App", ps.States[0].Name)
assert.Equal(t, uint64(2), ps.States[0].RecordNumber)
}
}

// Test that a write is triggered when the maximum time period since the last
// write is reached.
func TestWriteTimedFlush(t *testing.T) {
Expand All @@ -127,7 +75,7 @@ func TestWriteTimedFlush(t *testing.T) {
return
}

cp, err := NewCheckpoint(file, 100, time.Second)
cp, err := NewCheckpoint(file, time.Second)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 3 additions & 1 deletion winlogbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ const (

var (
DefaultSettings = WinlogbeatConfig{
RegistryFile: DefaultRegistryFile,
RegistryFile: DefaultRegistryFile,
RegistryFlush: 5 * time.Second,
}
)

// WinlogbeatConfig contains all of Winlogbeat's configuration data.
type WinlogbeatConfig struct {
// EventLogs holds one configuration object per entry listed under the
// event_logs option.
EventLogs []*common.Config `config:"event_logs"`
// RegistryFile is the value of the registry_file option; DefaultSettings
// sets it to DefaultRegistryFile.
RegistryFile string `config:"registry_file"`
// RegistryFlush is the value of the registry_flush option, the timeout
// that controls when registry entries are written to disk. DefaultSettings
// sets it to 5 seconds.
RegistryFlush time.Duration `config:"registry_flush"`
// ShutdownTimeout is the value of the shutdown_timeout option.
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
}

Expand Down
17 changes: 17 additions & 0 deletions winlogbeat/docs/winlogbeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,23 @@ backslashes (\) for Windows compatibility. You can use either forward or
backslashes. Forward slashes are easier to work with in YAML because there is no
need to escape them.

[float]
==== `registry_flush`

The timeout value that controls when registry entries are written to disk
(flushed). When the age of an unwritten update exceeds this value, a write
to disk is triggered. When `registry_flush` is set to 0s, the registry is
written to disk after each batch of events has been published successfully.

The default value is 5s.

Valid time units are `ns`, `us`, `ms`, `s`, `m`, `h`.

[source,yaml]
--------------------------------------------------------------------------------
winlogbeat.registry_flush: 5s
--------------------------------------------------------------------------------

[float]
==== `shutdown_timeout`

Expand Down
6 changes: 6 additions & 0 deletions winlogbeat/winlogbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
# directory in which it was started.
#winlogbeat.registry_file: .winlogbeat.yml

# The timeout value that controls when registry entries are written to disk
# (flushed). When the age of an unwritten update exceeds this value, a write
# to disk is triggered. When registry_flush is set to 0s, the registry is
# written to disk after each batch of events has been published successfully.
# The default value is 5s.
#winlogbeat.registry_flush: 5s

# event_logs specifies a list of event logs to monitor as well as any
# accompanying options. The YAML data type of event_logs is a list of
# dictionaries.
Expand Down
6 changes: 6 additions & 0 deletions x-pack/winlogbeat/winlogbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
# directory in which it was started.
#winlogbeat.registry_file: .winlogbeat.yml

# The timeout value that controls when registry entries are written to disk
# (flushed). When the age of an unwritten update exceeds this value, a write
# to disk is triggered. When registry_flush is set to 0s, the registry is
# written to disk after each batch of events has been published successfully.
# The default value is 5s.
#winlogbeat.registry_flush: 5s

# event_logs specifies a list of event logs to monitor as well as any
# accompanying options. The YAML data type of event_logs is a list of
# dictionaries.
Expand Down

0 comments on commit 1d524ad

Please sign in to comment.