Skip to content

Commit

Permalink
limit collection lookups and other small refactoring
Browse files Browse the repository at this point in the history
* adds error handling for collection lookup (#216)
  • Loading branch information
henrygd committed Oct 23, 2024
1 parent a975466 commit c7463f2
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 35 deletions.
68 changes: 49 additions & 19 deletions beszel/internal/hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type Hub struct {
am *alerts.AlertManager
um *users.UserManager
rm *records.RecordManager
systemStats *models.Collection
containerStats *models.Collection
}

func NewHub(app *pocketbase.PocketBase) *Hub {
Expand Down Expand Up @@ -125,7 +127,11 @@ func (h *Hub) Run() {
// delete old records once every hour
scheduler.MustAdd("delete old records", "8 * * * *", h.rm.DeleteOldRecords)
// create longer records every 10 minutes
scheduler.MustAdd("create longer records", "*/10 * * * *", h.rm.CreateLongerRecords)
scheduler.MustAdd("create longer records", "*/10 * * * *", func() {
if systemStats, containerStats, err := h.getCollections(); err == nil {
h.rm.CreateLongerRecords([]*models.Collection{systemStats, containerStats})
}
})
scheduler.Start()
return nil
})
Expand Down Expand Up @@ -286,37 +292,61 @@ func (h *Hub) updateSystem(record *models.Record) {
return
}
// update system record
dao := h.app.Dao()
record.Set("status", "up")
record.Set("info", systemData.Info)
if err := h.app.Dao().SaveRecord(record); err != nil {
if err := dao.SaveRecord(record); err != nil {
h.app.Logger().Error("Failed to update record: ", "err", err.Error())
}
// add new system_stats record
system_stats, _ := h.app.Dao().FindCollectionByNameOrId("system_stats")
systemStatsRecord := models.NewRecord(system_stats)
systemStatsRecord.Set("system", record.Id)
systemStatsRecord.Set("stats", systemData.Stats)
systemStatsRecord.Set("type", "1m")
if err := h.app.Dao().SaveRecord(systemStatsRecord); err != nil {
h.app.Logger().Error("Failed to save record: ", "err", err.Error())
}
// add new container_stats record
if len(systemData.Containers) > 0 {
container_stats, _ := h.app.Dao().FindCollectionByNameOrId("container_stats")
containerStatsRecord := models.NewRecord(container_stats)
containerStatsRecord.Set("system", record.Id)
containerStatsRecord.Set("stats", systemData.Containers)
containerStatsRecord.Set("type", "1m")
if err := h.app.Dao().SaveRecord(containerStatsRecord); err != nil {
// add system_stats and container_stats records
if systemStats, containerStats, err := h.getCollections(); err != nil {
h.app.Logger().Error("Failed to get collections: ", "err", err.Error())
} else {
// add new system_stats record
systemStatsRecord := models.NewRecord(systemStats)
systemStatsRecord.Set("system", record.Id)
systemStatsRecord.Set("stats", systemData.Stats)
systemStatsRecord.Set("type", "1m")
if err := dao.SaveRecord(systemStatsRecord); err != nil {
h.app.Logger().Error("Failed to save record: ", "err", err.Error())
}
// add new container_stats record
if len(systemData.Containers) > 0 {
containerStatsRecord := models.NewRecord(containerStats)
containerStatsRecord.Set("system", record.Id)
containerStatsRecord.Set("stats", systemData.Containers)
containerStatsRecord.Set("type", "1m")
if err := dao.SaveRecord(containerStatsRecord); err != nil {
h.app.Logger().Error("Failed to save record: ", "err", err.Error())
}
}
}

// system info alerts (todo: extra fs alerts)
if err := h.am.HandleSystemAlerts(record, systemData.Info, systemData.Stats.Temperatures, systemData.Stats.ExtraFs); err != nil {
h.app.Logger().Error("System alerts error", "err", err.Error())
}
}

// return system_stats and container_stats collections
func (h *Hub) getCollections() (*models.Collection, *models.Collection, error) {
if h.systemStats == nil {
systemStats, err := h.app.Dao().FindCollectionByNameOrId("system_stats")
if err != nil {
return nil, nil, err
}
h.systemStats = systemStats
}
if h.containerStats == nil {
containerStats, err := h.app.Dao().FindCollectionByNameOrId("container_stats")
if err != nil {
return nil, nil, err
}
h.containerStats = containerStats
}
return h.systemStats, h.containerStats, nil
}

// set system to specified status and save record
func (h *Hub) updateSystemStatus(record *models.Record, status string) {
if record.GetString("status") != status {
Expand Down
27 changes: 11 additions & 16 deletions beszel/internal/records/records.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type RecordDeletionData struct {
retention time.Duration
}

type RecordStats []*struct {
type RecordStats []struct {
Stats []byte `db:"stats"`
}

Expand All @@ -41,9 +41,9 @@ func NewRecordManager(app *pocketbase.PocketBase) *RecordManager {
}

// Create longer records by averaging shorter records
func (rm *RecordManager) CreateLongerRecords() {
func (rm *RecordManager) CreateLongerRecords(collections []*models.Collection) {
// start := time.Now()
recordData := []LongerRecordData{
longerRecordData := []LongerRecordData{
{
shorterType: "1m",
// change to 9 from 10 to allow edge case timing or short pauses
Expand Down Expand Up @@ -78,17 +78,11 @@ func (rm *RecordManager) CreateLongerRecords() {
return err
}

// need *models.Collection to create a new record with models.NewRecord
collections := map[string]*models.Collection{}
for _, collectionName := range []string{"system_stats", "container_stats"} {
collection, _ := txDao.FindCollectionByNameOrId(collectionName)
collections[collectionName] = collection
}

// loop through all active systems, time periods, and collections
for _, system := range activeSystems {
// log.Println("processing system", system.GetString("name"))
for _, recordData := range recordData {
for i := range longerRecordData {
recordData := longerRecordData[i]
// log.Println("processing longer record type", recordData.longerType)
// add one minute padding for longer records because they are created slightly later than the job start time
longerRecordPeriod := time.Now().UTC().Add(recordData.longerTimeDuration + time.Minute)
Expand Down Expand Up @@ -165,8 +159,8 @@ func (rm *RecordManager) AverageSystemStats(records RecordStats) system.Stats {
tempCount := float64(0)

var stats system.Stats
for _, record := range records {
json.Unmarshal(record.Stats, &stats)
for i := range records {
json.Unmarshal(records[i].Stats, &stats)
sum.Cpu += stats.Cpu
sum.Mem += stats.Mem
sum.MemUsed += stats.MemUsed
Expand Down Expand Up @@ -268,13 +262,14 @@ func (rm *RecordManager) AverageContainerStats(records RecordStats) []container.
count := float64(len(records))

var containerStats []container.Stats
for _, record := range records {
for i := range records {
// Reset the slice length to 0, but keep the capacity
containerStats = containerStats[:0]
if err := json.Unmarshal(record.Stats, &containerStats); err != nil {
if err := json.Unmarshal(records[i].Stats, &containerStats); err != nil {
return []container.Stats{}
}
for _, stat := range containerStats {
for i := range containerStats {
stat := containerStats[i]
if _, ok := sums[stat.Name]; !ok {
sums[stat.Name] = &container.Stats{Name: stat.Name}
}
Expand Down

0 comments on commit c7463f2

Please sign in to comment.