Skip to content

Commit

Permalink
Merge pull request #48 from Icinga/clean-up-events
Browse files Browse the repository at this point in the history
Delete events older than 24 hours
  • Loading branch information
lippserd authored Dec 19, 2023
2 parents e644832 + 5f8d9a7 commit 9f2972e
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 0 deletions.
24 changes: 24 additions & 0 deletions cmd/icinga-kubernetes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
_ "github.com/go-sql-driver/mysql"
"github.com/icinga/icinga-kubernetes/pkg/com"
"github.com/icinga/icinga-kubernetes/pkg/database"
"github.com/icinga/icinga-kubernetes/pkg/periodic"
schemav1 "github.com/icinga/icinga-kubernetes/pkg/schema/v1"
"github.com/icinga/icinga-kubernetes/pkg/sync"
syncv1 "github.com/icinga/icinga-kubernetes/pkg/sync/v1"
Expand Down Expand Up @@ -226,6 +227,29 @@ func main() {

return s.Run(ctx)
})

errs := make(chan error, 1)
defer periodic.Start(ctx, time.Hour, func(tick periodic.Tick) {
olderThan := tick.Time.AddDate(0, 0, -1)

_, err := db.CleanupOlderThan(
ctx, database.CleanupStmt{
Table: "event",
PK: "id",
Column: "created",
}, 5000, olderThan,
)
if err != nil {
select {
case errs <- err:
case <-ctx.Done():
}

return
}
}, periodic.Immediate()).Stop()
com.ErrgroupReceive(g, errs)

if err := g.Wait(); err != nil {
klog.Fatal(err)
}
Expand Down
77 changes: 77 additions & 0 deletions pkg/database/cleanup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package database

import (
"context"
"fmt"
"github.com/icinga/icinga-kubernetes/pkg/com"
"github.com/icinga/icinga-kubernetes/pkg/internal"
"github.com/icinga/icinga-kubernetes/pkg/types"
"time"
)

// CleanupStmt defines information needed to compose cleanup statements.
type CleanupStmt struct {
Table string
PK string
Column string
}

// Build assembles the cleanup statement for the specified database driver with the given limit.
func (stmt *CleanupStmt) Build(driverName string, limit uint64) string {
switch driverName {
case MySQL, "mysql":
return fmt.Sprintf(`DELETE FROM %[1]s WHERE %[2]s < :time
ORDER BY %[2]s LIMIT %[3]d`, stmt.Table, stmt.Column, limit)
case PostgreSQL, "postgres":
return fmt.Sprintf(`WITH rows AS (
SELECT %[1]s FROM %[2]s WHERE %[3]s < :time ORDER BY %[3]s LIMIT %[4]d
)
DELETE FROM %[2]s WHERE %[1]s IN (SELECT %[1]s FROM rows)`, stmt.PK, stmt.Table, stmt.Column, limit)
default:
panic(fmt.Sprintf("invalid database type %s", driverName))
}
}

// CleanupOlderThan deletes all rows with the specified statement that are older than the given time.
// Deletes a maximum of as many rows per round as defined in count. Actually deleted rows will be passed to onSuccess.
// Returns the total number of rows deleted.
func (db *Database) CleanupOlderThan(
ctx context.Context, stmt CleanupStmt,
count uint64, olderThan time.Time, onSuccess ...OnSuccess[struct{}],
) (uint64, error) {
var counter com.Counter
defer db.periodicLog(ctx, stmt.Build(db.DriverName(), 0), &counter).Stop()

for {
q := db.Rebind(stmt.Build(db.DriverName(), count))
rs, err := db.NamedExecContext(ctx, q, cleanupWhere{
Time: types.UnixMilli(olderThan),
})
if err != nil {
return 0, internal.CantPerformQuery(err, q)
}

n, err := rs.RowsAffected()
if err != nil {
return 0, err
}

counter.Add(uint64(n))

for _, onSuccess := range onSuccess {
if err := onSuccess(ctx, make([]struct{}, n)); err != nil {
return 0, err
}
}

if n < int64(count) {
break
}
}

return counter.Total(), nil
}

type cleanupWhere struct {
Time types.UnixMilli
}
3 changes: 3 additions & 0 deletions pkg/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,3 +662,6 @@ func IsStruct(subject interface{}) bool {
return false
}
}

// OnSuccess is a callback for successful (bulk) DML operations.
type OnSuccess[T any] func(ctx context.Context, affectedRows []T) (err error)
8 changes: 8 additions & 0 deletions pkg/internal/internal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package internal

import "github.com/pkg/errors"

// CantPerformQuery wraps the given error with the specified query that cannot be executed.
func CantPerformQuery(err error, q string) error {
return errors.Wrapf(err, "can't perform %q", q)
}

0 comments on commit 9f2972e

Please sign in to comment.