Skip to content

Commit

Permalink
feat: MySQL slow log parser (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
liweiyi88 authored Feb 24, 2025
1 parent 0ab69f4 commit f481cc4
Show file tree
Hide file tree
Showing 27 changed files with 4,641 additions and 223 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
- run: git fetch --force --tags
- uses: actions/setup-go@v3
with:
go-version: '>=1.22.2'
go-version: '>=1.24.0'
cache: true
- uses: goreleaser/goreleaser-action@v4
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
test:
strategy:
matrix:
go-version: [1.22.2]
go-version: [1.24.0]
os: [ubuntu-latest, windows-latest]
runs-on: ${{ matrix.os }}
steps:
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
.onedump
.onedump
*.DS_Store
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Onedump is a database dump and backup tool that can dump different databases to
* Database backup from different sources to different destinations.
* MySQL dump with zero dependencies (with built-in mysql native dumper).
* Supports dumpers with dependencies (`mysqldump` and `pg_dump`).
* MySQL slow log parser.
* Loads configuration from S3 bucket.
* Compression (use `job.gzip: true` to enable compression).
* Unique filename (use `job.unique: true` to enable unique filename).
Expand Down
12 changes: 12 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ import (

"github.com/liweiyi88/onedump/config"
"github.com/liweiyi88/onedump/handler"
"github.com/liweiyi88/onedump/slow"
"github.com/liweiyi88/onedump/storage/s3"
)

var file, s3Bucket, s3Region, s3AccessKeyId, s3SecretAccessKey, cron string
var sloglog, database, pattern string
var limit int
var mask bool

var rootCmd = &cobra.Command{
Use: "onedump",
Expand Down Expand Up @@ -113,4 +117,12 @@ func init() {
rootCmd.Flags().StringVarP(&s3Region, "aws-region", "r", "", "the aws region to read the config file (optional)")
rootCmd.Flags().StringVarP(&s3AccessKeyId, "aws-key", "k", "", "aws access key id to overwrite the default one. (optional)")
rootCmd.Flags().StringVarP(&s3SecretAccessKey, "aws-secret", "s", "", "aws secret access key to overwrite the default one. (optional)")

slowCmd.Flags().StringVarP(&sloglog, "file", "f", "", "path to the slow log file. a directory can also be specified. (required)")
slowCmd.Flags().StringVarP(&database, "database", "d", string(slow.MySQL), "specify the database engine (optional)")
slowCmd.Flags().StringVarP(&pattern, "pattern", "p", "", "only read files that follow the same pattern, for example *slow.log . (optional)")
slowCmd.Flags().IntVarP(&limit, "limit", "l", 0, "limit the number of results. no limit is set by default. (optional)")
slowCmd.Flags().BoolVarP(&mask, "mask", "m", true, "mask query values. enabled by default. (optional)")
slowCmd.MarkFlagRequired("file")
rootCmd.AddCommand(slowCmd)
}
4 changes: 2 additions & 2 deletions cmd/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestRootCmdWithCron(t *testing.T) {

newFd.Close()
o := bytes.NewBufferString("")
cmd.SetOutput(o)
cmd.SetOut(o)
cmd.SetArgs([]string{"-f", filename, "-c", "1sec"})
err = cmd.Execute()
assert.NotNil(err)
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestRootCmd(t *testing.T) {

newFd.Close()
o := bytes.NewBufferString("")
cmd.SetOutput(o)
cmd.SetOut(o)
err = cmd.Execute()
assert.NotNil(err)
}
33 changes: 33 additions & 0 deletions cmd/slow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package cmd

import (
"encoding/json"
"fmt"
"os"

"github.com/liweiyi88/onedump/slow"
"github.com/spf13/cobra"
)

// isValidDatabase reports whether db names a database engine supported
// by the slow log parser.
func isValidDatabase(db string) bool {
	switch db {
	case string(slow.MySQL), string(slow.PostgreSQL):
		return true
	default:
		return false
	}
}

// slowCmd parses database slow logs pointed at by the --file flag and
// prints the parse result as JSON on stdout.
var slowCmd = &cobra.Command{
	Use:   "slow",
	Short: "Database slow log parser",
	Long:  "Database slow log parser, it formats the result in json",
	RunE: func(cmd *cobra.Command, args []string) error {
		if !isValidDatabase(database) {
			// Keep this list in sync with isValidDatabase: both mysql and
			// postgresql are accepted, so advertise both in the error.
			return fmt.Errorf("unsupported database type: %s, support [%s, %s]", database, slow.MySQL, slow.PostgreSQL)
		}

		databaseType := slow.DatabaseType(database)
		result := slow.Parse(sloglog, databaseType, slow.ParseOptions{Limit: limit, Mask: mask, Pattern: pattern})

		// Disable HTML escaping so queries containing <, > or & are
		// emitted verbatim instead of as \u003c etc.
		encoder := json.NewEncoder(os.Stdout)
		encoder.SetEscapeHTML(false)

		return encoder.Encode(result)
	},
}
39 changes: 39 additions & 0 deletions cmd/slow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package cmd

import (
"bytes"
"io"
"os"
"runtime"
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

// TestSlowCmd runs the root command with the "slow" subcommand against the
// fixture slow log at ../testutils/slowlogs/short, captures everything the
// command writes to os.Stdout via a pipe, and compares it with a
// pre-computed JSON result.
func TestSlowCmd(t *testing.T) {
	// Skip this test on Windows
	if runtime.GOOS == "windows" {
		t.Skip("Skipping test on Windows")
	}

	// The slow command encodes JSON directly to os.Stdout, so swap stdout
	// for the write end of a pipe for the duration of the test.
	originalStdout := os.Stdout
	r, w, _ := os.Pipe()                          // Create a pipe to capture stdout
	os.Stdout = w                                 // Redirect stdout
	defer func() { os.Stdout = originalStdout }() // Ensure that os.Stdout is restored after the test

	assert := assert.New(t)
	cmd := rootCmd
	cmd.SetArgs([]string{"slow", "-f", "../testutils/slowlogs/short"})
	cmd.Execute()

	// Close the write end so the io.Copy below sees EOF and returns.
	w.Close()
	var buf bytes.Buffer
	io.Copy(&buf, r)

	// Get the captured output
	out := buf.String()

	// NOTE(review): the expected payload appears ordered by query_time
	// descending and contains literal newlines inside queries — confirm
	// against the parser's sort/mask behavior before editing the fixture.
	expected := `{"ok":true,"error":"","results":[{"time":"2023-10-15T12:36:05.987654Z","user":"admin[admin]","host_ip":"[192.168.1.101]","query_time":12.890323,"lock_time":0.001456,"rows_sent":100,"rows_examined":100000,"thread_id":0,"errno":0,"killed":0,"bytes_received":0,"bytes_sent":0,"read_first":0,"read_last":0,"read_key":0,"read_next":0,"read_prev":0,"read_rnd":0,"read_rnd_next":0,"sort_merge_passes":0,"sort_range_count":0,"sort_rows":0,"sort_scan_count":0,"created_tmp_disk_tables":0,"created_tmp_tables":0,"count_hit_tmp_table_size":0,"start":"","end":"","query":"SELECT customer_id, COUNT(*) as order_count FROM orders GROUP BY customer_id HAVING order_count > ?"},{"time":"2023-10-15T12:34:56.123456Z","user":"root[root]","host_ip":"localhost []","query_time":9.123456,"lock_time":0.001234,"rows_sent":10,"rows_examined":10000,"thread_id":0,"errno":0,"killed":0,"bytes_received":0,"bytes_sent":0,"read_first":0,"read_last":0,"read_key":0,"read_next":0,"read_prev":0,"read_rnd":0,"read_rnd_next":0,"sort_merge_passes":0,"sort_range_count":0,"sort_rows":0,"sort_scan_count":0,"created_tmp_disk_tables":0,"created_tmp_tables":0,"count_hit_tmp_table_size":0,"start":"","end":"","query":"SELECT * FROM orders WHERE customer_id = ? AND order_date > ? ORDER BY order_date DESC"},{"time":"2023-10-15T12:34:56.123456Z","user":"root[root]","host_ip":"localhost []","query_time":9.123456,"lock_time":0.001234,"rows_sent":10,"rows_examined":10000,"thread_id":0,"errno":0,"killed":0,"bytes_received":0,"bytes_sent":0,"read_first":0,"read_last":0,"read_key":0,"read_next":0,"read_prev":0,"read_rnd":0,"read_rnd_next":0,"sort_merge_passes":0,"sort_range_count":0,"sort_rows":0,"sort_scan_count":0,"created_tmp_disk_tables":0,"created_tmp_tables":0,"count_hit_tmp_table_size":0,"start":"","end":"","query":"SELECT * FROM orders WHERE customer_id = ? AND order_date > ? 
ORDER BY order_date DESC"},{"time":"2023-10-15T12:36:05.987654Z","user":"admin[admin]","host_ip":"[192.168.1.101]","query_time":7.890123,"lock_time":0.003456,"rows_sent":100,"rows_examined":100000,"thread_id":0,"errno":0,"killed":0,"bytes_received":0,"bytes_sent":0,"read_first":0,"read_last":0,"read_key":0,"read_next":0,"read_prev":0,"read_rnd":0,"read_rnd_next":0,"sort_merge_passes":0,"sort_range_count":0,"sort_rows":0,"sort_scan_count":0,"created_tmp_disk_tables":0,"created_tmp_tables":0,"count_hit_tmp_table_size":0,"start":"","end":"","query":"SELECT customer_id, COUNT(*) as order_count FROM orders GROUP BY customer_id HAVING order_count > ?"},{"time":"2023-10-15T12:36:05.987654Z","user":"admin[admin]","host_ip":"[192.168.1.101]","query_time":7.890123,"lock_time":0.003456,"rows_sent":100,"rows_examined":100000,"thread_id":0,"errno":0,"killed":0,"bytes_received":0,"bytes_sent":0,"read_first":0,"read_last":0,"read_key":0,"read_next":0,"read_prev":0,"read_rnd":0,"read_rnd_next":0,"sort_merge_passes":0,"sort_range_count":0,"sort_rows":0,"sort_scan_count":0,"created_tmp_disk_tables":0,"created_tmp_tables":0,"count_hit_tmp_table_size":0,"start":"","end":"","query":"SELECT customer_id, COUNT(*) as order_count FROM orders GROUP BY customer_id HAVING order_count > ?"},{"time":"2023-10-15T12:35:10.654321Z","user":"app_user[app_user]","host_ip":"[192.168.1.100]","query_time":3.456789,"lock_time":0.002345,"rows_sent":1,"rows_examined":5000,"thread_id":0,"errno":0,"killed":0,"bytes_received":0,"bytes_sent":0,"read_first":0,"read_last":0,"read_key":0,"read_next":0,"read_prev":0,"read_rnd":0,"read_rnd_next":0,"sort_merge_passes":0,"sort_range_count":0,"sort_rows":0,"sort_scan_count":0,"created_tmp_disk_tables":0,"created_tmp_tables":0,"count_hit_tmp_table_size":0,"start":"","end":"","query":"UPDATE products SET stock = stock - ? 
WHERE product_id = ?"},{"time":"2023-10-15T12:35:10.654321Z","user":"app_user[app_user]","host_ip":"[192.168.1.100]","query_time":3.456789,"lock_time":0.002345,"rows_sent":1,"rows_examined":5000,"thread_id":0,"errno":0,"killed":0,"bytes_received":0,"bytes_sent":0,"read_first":0,"read_last":0,"read_key":0,"read_next":0,"read_prev":0,"read_rnd":0,"read_rnd_next":0,"sort_merge_passes":0,"sort_range_count":0,"sort_rows":0,"sort_scan_count":0,"created_tmp_disk_tables":0,"created_tmp_tables":0,"count_hit_tmp_table_size":0,"start":"","end":"","query":"UPDATE products SET stock = stock - ? WHERE product_id = ?"}]}`
	assert.Equal(expected, strings.TrimSpace(string(out)))
}
6 changes: 3 additions & 3 deletions dumper/mysqldump.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package dumper
import (
"fmt"
"io"
"log"
"log/slog"
"net"
"os"
"os/exec"
Expand Down Expand Up @@ -130,7 +130,7 @@ host = %s`
defer func() {
err := file.Close()
if err != nil {
log.Printf("failed to close temp file for storing mysql credentials: %v", err)
slog.Error("fail to close temp file for storing mysql credentials", slog.Any("error", err))
}
}()

Expand Down Expand Up @@ -163,7 +163,7 @@ func (mysql *MysqlDump) close() error {
func (mysql *MysqlDump) Dump(storage io.Writer) error {
defer func() {
if err := mysql.close(); err != nil {
log.Printf("could not mysqldump credential files db driver: %v", err)
slog.Error("could not mysqldump credential files db driver", slog.Any("error", err))
}
}()

Expand Down
8 changes: 4 additions & 4 deletions dumper/pgdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package dumper
import (
"fmt"
"io"
"log"
"log/slog"
"os"
"os/exec"
"strconv"
Expand Down Expand Up @@ -113,7 +113,7 @@ func (psql *PgDump) createCredentialFile() (string, error) {

defer func() {
if err := file.Close(); err != nil {
log.Printf("could not close file: %s, err: %v", file.Name(), err)
slog.Error("could not close file", slog.Any("error", err), slog.Any("file", file.Name()))
}
}()

Expand All @@ -124,7 +124,7 @@ func (psql *PgDump) createCredentialFile() (string, error) {
}

if err = os.Chmod(file.Name(), 0600); err != nil {
log.Printf("could not change file permissoin, file: %s, error: %v", file.Name(), err)
slog.Error("could not change file permissoin", slog.Any("error", err), slog.Any("file", file.Name()))
}

psql.credentialFiles = append(psql.credentialFiles, file.Name())
Expand All @@ -135,7 +135,7 @@ func (psql *PgDump) createCredentialFile() (string, error) {
func (psql *PgDump) Dump(storage io.Writer) error {
defer func() {
if err := psql.close(); err != nil {
log.Printf("could not pgdump credential files db driver: %v", err)
slog.Error("could not pgdump credential files db driver", slog.Any("error", err))
}
}()

Expand Down
4 changes: 2 additions & 2 deletions dumper/runner/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"bytes"
"fmt"
"io"
"log"
"log/slog"

"github.com/liweiyi88/onedump/dumper/dialer"
)
Expand Down Expand Up @@ -32,7 +32,7 @@ func (runner *SshRunner) Run(writer io.Writer) error {
// Do not need to call session.Close() here as it will only give EOF error.
err = client.Close()
if err != nil {
log.Printf("failed to close ssh client: %v", err)
slog.Error("fail to close ssh client", slog.Any("error", err))
}
}()

Expand Down
100 changes: 79 additions & 21 deletions fileutil/filenutil.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,30 @@
package fileutil

import (
"log"
"bytes"
"fmt"
"io"
"log/slog"
"math/rand"
"os"
"path/filepath"
"time"
)

// ensureUniqueness prefixes the base file name with a UTC timestamp
// (yyyymmddhhmmss-) when unique is true; otherwise path is returned as-is.
func ensureUniqueness(path string, unique bool) string {
	if !unique {
		return path
	}

	dir, name := filepath.Split(path)
	stamp := time.Now().UTC().Format("20060102150405")

	return filepath.Join(dir, stamp+"-"+name)
}

// Ensure a file has proper file extension.
func EnsureFileSuffix(filename string, shouldGzip bool) string {
if !shouldGzip {
Expand All @@ -22,37 +39,64 @@ func EnsureFileSuffix(filename string, shouldGzip bool) string {
return filename + ".gz"
}

// Ensure a file has unique name when necessary.
func ensureUniqueness(path string, unique bool) string {
if !unique {
return path
func EnsureFileName(path string, shouldGzip, unique bool) string {
p := EnsureFileSuffix(path, shouldGzip)
return ensureUniqueness(p, unique)
}

func IsGzipped(filename string) bool {
file, err := os.Open(filename)

if err != nil {
return false
}

dir, filename := filepath.Split(path)
defer func() {
err := file.Close()
if err != nil {
slog.Error("fail to close file", slog.Any("error", err), slog.String("filename", file.Name()))
}
}()

now := time.Now().UTC().Format("20060102150405")
filename = now + "-" + filename
buf := make([]byte, 2)

return filepath.Join(dir, filename)
}
_, err = io.ReadFull(file, buf)

func EnsureFileName(path string, shouldGzip, unique bool) string {
p := EnsureFileSuffix(path, shouldGzip)
return ensureUniqueness(p, unique)
if err != nil {
return false
}

return bytes.Equal(buf, []byte{0x1f, 0x8b})
}

func WorkDir() string {
dir, err := os.Getwd()
func ListFiles(dir, pattern string) ([]string, error) {
entries, err := os.ReadDir(dir)
if err != nil {
log.Printf("Cannot get the current directory: %v, using $HOME directory!", err)
dir, err = os.UserHomeDir()
if err != nil {
log.Printf("Cannot get the user home directory: %v, using /tmp directory!", err)
dir = os.TempDir()
return nil, err
}

var files []string

for _, v := range entries {
if v.IsDir() {
continue
}

if pattern != "" {
matched, err := filepath.Match(pattern, v.Name())
if err != nil {
return nil, fmt.Errorf("invalid pattern: %w", err)
}

if !matched {
continue
}
}

files = append(files, filepath.Join(dir, v.Name()))
}

return dir
return files, nil
}

func GenerateRandomName(n int) string {
Expand All @@ -64,3 +108,17 @@ func GenerateRandomName(n int) string {
}
return string(b)
}

// WorkDir returns the current working directory. When it cannot be
// determined it falls back to the user home directory, and failing that
// to the OS temp directory, logging each fallback.
func WorkDir() string {
	dir, err := os.Getwd()
	if err == nil {
		return dir
	}

	slog.Error("can not get the current directory, use $HOME instead", slog.Any("error", err))

	dir, err = os.UserHomeDir()
	if err == nil {
		return dir
	}

	slog.Error("can not get the user home directory, use /tmp instead", slog.Any("error", err))

	return os.TempDir()
}
Loading

0 comments on commit f481cc4

Please sign in to comment.