Skip to content

Commit

Permalink
Add option to delety Entrieshistory with a max number of stored tasks…
Browse files Browse the repository at this point in the history
…, add GetWorkersThreads to task loop limit in db
  • Loading branch information
r4ulcl committed Dec 17, 2024
1 parent 928ac0c commit 9d8f684
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 5 deletions.
3 changes: 2 additions & 1 deletion manager.conf
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@
"dbPort": "3306",
"dbDatabase": "manager",
"diskPath": "",
"certFolder": "./certs/manager/"
"certFolder": "./certs/manager/",
"maxTaskHistory": 100000
}
63 changes: 63 additions & 0 deletions manager/database/DBtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,3 +516,66 @@ func GetCountByStatus(status string, db *sql.DB, verbose, debug bool) (int, erro

return count, nil
}

// DeleteMaxEntriesHistory Delete Database Entries if entries number > maxEntries
func DeleteMaxEntriesHistory(db *sql.DB, maxEntries int, tableName string, verbose, debug bool, wg *sync.WaitGroup) error {
defer wg.Done()
wg.Add(1)
// Step 1: Count total entries in the table
countQuery := "SELECT COUNT(*) FROM " + tableName + " WHERE status = 'done'"
var totalEntries int
if err := db.QueryRow(countQuery).Scan(&totalEntries); err != nil {
return fmt.Errorf("failed to count entries in table %s: %w", tableName, err)
}
if verbose || debug {
log.Printf("DeleteMaxEntriesHistory - Table %s has %d entries\n", tableName, totalEntries)
}

// Step 2: If total entries are within the limit, return
if totalEntries <= maxEntries {
if debug {
log.Printf("DeleteMaxEntriesHistory - No deletion needed; %d <= %d\n", totalEntries, maxEntries)
}
return nil
}

// Step 3: Calculate the number of entries to delete
entriesToDelete := totalEntries - maxEntries
if debug {
log.Printf("DeleteMaxEntriesHistory - Need to delete %d entries from table %s\n", entriesToDelete, tableName)
}

// Step 4: Delete the oldest entries
// Assuming the table has columns `id` and `created_at`. Adjust as needed.
deleteQuery := fmt.Sprintf(`
WITH cte AS (
SELECT ID
FROM %s
WHERE status = "done"
ORDER BY createdAt ASC
LIMIT ?
)
DELETE FROM %s
WHERE ID IN (SELECT ID FROM cte)
`, tableName, tableName)

result, err := db.Exec(deleteQuery, entriesToDelete)
if err != nil {
return fmt.Errorf("DeleteMaxEntriesHistory - failed to delete old entries from table %s: %w", tableName, err)
}

if debug {
log.Println("DeleteMaxEntriesHistory - db.Exec", result, err)
}

// Step 5: Log the number of rows affected
rowsDeleted, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("DeleteMaxEntriesHistory - failed to fetch rows affected: %w", err)
}
if verbose || debug {
log.Printf("DeleteMaxEntriesHistory - Deleted %d old entries from table %s\n", rowsDeleted, tableName)
}

return nil
}
5 changes: 5 additions & 0 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ func loadManagerConfigurations(configFile string, verbose, debug bool) (*utils.M
config.StatusCheckDown = 360
}

if config.MaxTaskHistory < 0 {
config.MaxTaskHistory = 0
}

return config, nil
}

Expand Down Expand Up @@ -308,6 +312,7 @@ func startSSHBackgroundTask(configSSH *utils.ManagerSSHConfig, config *utils.Man
func startBackgroundTask(db *sql.DB, config *utils.ManagerConfig, wg *sync.WaitGroup, writeLock *sync.Mutex, verbose, debug bool) {
go utils.VerifyWorkersLoop(db, config, verbose, debug, wg, writeLock)
go utils.ManageTasks(config, db, verbose, debug, wg, writeLock)
go utils.DeleteMaxTaskHistoryLoop(db, config, verbose, debug, wg)
}

func setupAndStartServers(swagger bool, config *utils.ManagerConfig, db *sql.DB, wg *sync.WaitGroup, writeLock *sync.Mutex, verbose, debug bool) {
Expand Down
6 changes: 4 additions & 2 deletions manager/utils/manageTasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ func ManageTasks(config *ManagerConfig, db *sql.DB, verbose, debug bool, wg *syn
// infinite loop eecuted with go routine
for {
// Get all tasks in order and if priority
tasks, err := database.GetTasksPending(100, db, verbose, debug)
workersThreads := getWorkersThreads(db, verbose, debug) + 10
tasks, err := database.GetTasksPending(workersThreads, db, verbose, debug)
if err != nil {
log.Println(err.Error())
}
Expand Down Expand Up @@ -57,8 +58,9 @@ func ManageTasks(config *ManagerConfig, db *sql.DB, verbose, debug bool, wg *syn
} else {
if len(tasks) == 0 {
time.Sleep(time.Second * 1)
} else if len(workers) == 0 {
time.Sleep(time.Millisecond * 500)
}
}
time.Sleep(time.Millisecond * 500)
}
}
1 change: 1 addition & 0 deletions manager/utils/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ManagerConfig struct {
CertFolder string `json:"certFolder"`
ClientHTTP *http.Client `json:"clientHTTP"`
WebSockets map[string]*websocket.Conn `json:"webSockets"`
MaxTaskHistory int `json:"maxTaskHistory"`
}

// ManagerSSHConfig manager SSH config struct
Expand Down
41 changes: 39 additions & 2 deletions manager/utils/workerRequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,49 @@ func VerifyWorkersLoop(db *sql.DB, config *ManagerConfig, verbose, debug bool, w
}
}

// DeleteMaxTaskHistoryLoop Loop and Delete Database Entries if num tasks > config.MaxTaskHistory
func DeleteMaxTaskHistoryLoop(db *sql.DB, config *ManagerConfig, verbose, debug bool, wg *sync.WaitGroup) {
maxEntries := config.MaxTaskHistory
tableName := "task"
if maxEntries > 0 {
for {
err := database.DeleteMaxEntriesHistory(db, maxEntries, tableName, verbose, debug, wg)
if err != nil && (verbose || debug) {
log.Println("Error DeleteMaxEntriesHistory:", err)
}
time.Sleep(1 * time.Hour)
}
}
}

// getWorkersThreads get DefaultThreads of all workers
func getWorkersThreads(db *sql.DB, verbose, debug bool) int {

workersThreads := 0
// Get all workers from the database
workers, err := database.GetWorkers(db, verbose, debug)
if err != nil {
log.Print("GetWorker", err)
}

// Verify each worker
for _, worker := range workers {
workersThreads += worker.DefaultThreads
}

if debug {
log.Println("getWorkersThreads workersThreads", workersThreads)
}

return workersThreads
}

// verifyWorkers checks and sets if the workers are UP.
func verifyWorkers(db *sql.DB, config *ManagerConfig, verbose, debug bool, wg *sync.WaitGroup, writeLock *sync.Mutex) {
// Get all UP workers from the database
// Get all workers from the database
workers, err := database.GetWorkers(db, verbose, debug)
if err != nil {
log.Print("GetWorkerUP", err)
log.Print("GetWorker", err)
}

// Verify each worker
Expand Down

0 comments on commit 9d8f684

Please sign in to comment.