-
Notifications
You must be signed in to change notification settings - Fork 0
/
postgresql_storage.go
144 lines (120 loc) · 3.74 KB
/
postgresql_storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package postgresql_storage
import (
"context"
"fmt"
_ "github.com/lib/pq"
sql_based_storage "github.com/storage-lock/go-sql-based-storage"
"github.com/storage-lock/go-storage"
storage_lock "github.com/storage-lock/go-storage-lock"
"strings"
"time"
)
// PostgresqlStorage 基于Postgresql作为存储引擎
type PostgresqlStorage struct {
// postgresql是sql storage的一种具体实现
*sql_based_storage.SqlBasedStorage
// 创建时所需的各种参数
options *PostgresqlStorageOptions
}
var _ storage.Storage = &PostgresqlStorage{}
func NewPostgresqlStorage(ctx context.Context, options *PostgresqlStorageOptions) (*PostgresqlStorage, error) {
// 参数校验
if err := options.Check(); err != nil {
return nil, err
}
// 先创建SQL Storage
baseStorageOptions := sql_based_storage.NewSqlBasedStorageOptions().
SetTableFullName(options.GetTableFullName()).
SetConnectionManager(options.ConnectionManager).
SetSqlProvider(NewPostgresqlSqlProvider())
basedStorage, err := sql_based_storage.NewSqlBasedStorage(baseStorageOptions)
if err != nil {
return nil, err
}
// 然后再创建自己
postgresqlStorage := &PostgresqlStorage{
SqlBasedStorage: basedStorage,
options: options,
}
// 初始化
err = postgresqlStorage.Init(ctx)
if err != nil {
return nil, err
}
return postgresqlStorage, nil
}
const PostgresqlStorageName = "postgresql-storage"
func (x *PostgresqlStorage) GetName() string {
return PostgresqlStorageName
}
func (x *PostgresqlStorage) Init(ctx context.Context) (returnError error) {
db, err := x.options.ConnectionManager.Take(ctx)
if err != nil {
return err
}
defer func() {
err := x.options.ConnectionManager.Return(ctx, db)
if returnError == nil {
returnError = err
}
}()
// 这个逻辑跟通用的sql storage流程不太一样,所以这里就覆写这个Init方法
// 如果设置了数据库的话需要切换数据库
if x.options.Schema != "" {
// 切换到schema,如果需要的话,但是schema不会自动创建,需要使用者自己创建,会自动创建的只有存放锁信息的表
_, err = db.ExecContext(ctx, fmt.Sprintf("SET search_path TO %s ", x.options.Schema))
if err != nil {
return err
}
}
// 创建存储锁信息需要的表
createTableSql := `CREATE TABLE IF NOT EXISTS %s (
lock_id VARCHAR(255) NOT NULL PRIMARY KEY,
owner_id VARCHAR(255) NOT NULL,
version BIGINT NOT NULL,
lock_information_json_string VARCHAR(255) NOT NULL
)`
_, err = db.ExecContext(ctx, fmt.Sprintf(createTableSql, x.options.GetTableName()))
if err != nil {
return err
}
return nil
}
// duplicate key value violates unique constraint
func (x *PostgresqlStorage) CreateWithVersion(ctx context.Context, lockId string, version storage.Version, lockInformation *storage.LockInformation) (returnError error) {
returnError = x.SqlBasedStorage.CreateWithVersion(ctx, lockId, version, lockInformation)
if returnError != nil {
// 把重复转为版本miss
// panic: pq: duplicate key value violates unique constraint "storage_lock_pkey"
if strings.Contains(returnError.Error(), "duplicate key value violates unique constraint") {
return storage_lock.ErrVersionMiss
}
}
return returnError
}
func (x *PostgresqlStorage) GetTime(ctx context.Context) (time.Time, error) {
db, err := x.options.ConnectionManager.Take(ctx)
if err != nil {
return time.Time{}, err
}
defer func() {
_ = x.options.ConnectionManager.Return(ctx, db)
}()
var zero time.Time
rs, err := db.Query("SELECT CURRENT_TIMESTAMP")
if err != nil {
return zero, err
}
defer func() {
_ = rs.Close()
}()
if !rs.Next() {
return zero, ErrQueryPostgresqlServerTime
}
var databaseTime time.Time
err = rs.Scan(&databaseTime)
if err != nil {
return zero, err
}
return databaseTime, nil
}