-
Notifications
You must be signed in to change notification settings - Fork 36
/
Copy pathdbr.go
239 lines (202 loc) · 6.1 KB
/
dbr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
package dbr
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/mailru/dbr/dialect"
)
// Open instantiates a Connection for a given database/sql connection
// and event receiver
func Open(driver, dsn string, log EventReceiver) (*Connection, error) {
if log == nil {
log = nullReceiver
}
conn, err := sql.Open(driver, dsn)
if err != nil {
return nil, err
}
var d Dialect
switch driver {
case "mysql":
d = dialect.MySQL
case "postgres":
d = dialect.PostgreSQL
case "sqlite3":
d = dialect.SQLite3
case "clickhouse", "chhttp":
d = dialect.ClickHouse
default:
return nil, ErrNotSupported
}
return &Connection{DBConn: conn, EventReceiver: log, Dialect: d}, nil
}
const (
placeholder = "?"
)
// Connection is a connection to the database with an EventReceiver
// to send events, errors, and timings to
type Connection struct {
DBConn
Dialect Dialect
EventReceiver
}
// Session represents a business unit of execution for some connection
type Session struct {
*Connection
EventReceiver
ctx context.Context
}
// NewSession instantiates a Session for the Connection
func (conn *Connection) NewSession(log EventReceiver) *Session {
return conn.NewSessionContext(context.Background(), log)
}
// NewSessionContext instantiates a Session with context for the Connection
func (conn *Connection) NewSessionContext(ctx context.Context, log EventReceiver) *Session {
if log == nil {
log = conn.EventReceiver // Use parent instrumentation
}
return &Session{Connection: conn, EventReceiver: log, ctx: ctx}
}
// NewSession forks current session
func (sess *Session) NewSession(log EventReceiver) *Session {
if log == nil {
log = sess.EventReceiver
}
return &Session{Connection: sess.Connection, EventReceiver: log, ctx: sess.ctx}
}
// beginTx starts a transaction with context.
func (conn *Connection) beginTx() (*sql.Tx, error) {
return conn.Begin()
}
// SessionRunner can do anything that a Session can except start a transaction.
type SessionRunner interface {
Select(column ...string) SelectBuilder
SelectBySql(query string, value ...interface{}) SelectBuilder
InsertInto(table string) InsertBuilder
InsertBySql(query string, value ...interface{}) InsertBuilder
Update(table string) UpdateBuilder
UpdateBySql(query string, value ...interface{}) UpdateBuilder
DeleteFrom(table string) DeleteBuilder
DeleteBySql(query string, value ...interface{}) DeleteBuilder
}
// DBConn interface for sql.DB
type DBConn interface {
runner
BeginTx(context.Context, *sql.TxOptions) (*sql.Tx, error)
Begin() (*sql.Tx, error)
PingContext(ctx context.Context) error
Ping() error
Stats() sql.DBStats
Close() error
}
type runner interface {
Exec(query string, args ...interface{}) (sql.Result, error)
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
Query(query string, args ...interface{}) (*sql.Rows, error)
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
QueryRow(query string, args ...interface{}) *sql.Row
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}
// Executer can execute requests to database
type Executer interface {
Exec() (sql.Result, error)
ExecContext(ctx context.Context) (sql.Result, error)
}
type loader interface {
Load(value interface{}) (int, error)
LoadStruct(value interface{}) error
LoadStructs(value interface{}) (int, error)
LoadValue(value interface{}) error
LoadValues(value interface{}) (int, error)
LoadContext(ctx context.Context, value interface{}) (int, error)
LoadStructContext(ctx context.Context, value interface{}) error
LoadStructsContext(ctx context.Context, value interface{}) (int, error)
LoadValueContext(ctx context.Context, value interface{}) error
LoadValuesContext(ctx context.Context, value interface{}) (int, error)
}
func exec(ctx context.Context, runner runner, log EventReceiver, builder Builder, d Dialect) (sql.Result, error) {
i := interpolator{
Buffer: NewBuffer(),
Dialect: d,
IgnoreBinary: true,
}
err := i.interpolate(placeholder, []interface{}{builder})
query, value := i.String(), i.Value()
if err != nil {
return nil, log.EventErrKv("dbr.exec.interpolate", err, kvs{
"sql": query,
"args": fmt.Sprint(value),
})
}
startTime := time.Now()
defer func() {
log.TimingKv("dbr.exec", time.Since(startTime).Nanoseconds(), kvs{
"sql": query,
})
}()
traceImpl, hasTracingImpl := log.(TracingEventReceiver)
if hasTracingImpl {
ctx = traceImpl.SpanStart(ctx, "dbr.exec", query)
defer traceImpl.SpanFinish(ctx)
}
result, err := runner.Exec(query, value...)
if err != nil {
if hasTracingImpl {
traceImpl.SpanError(ctx, err)
}
return result, log.EventErrKv("dbr.exec.exec", err, kvs{
"sql": query,
})
}
return result, nil
}
func queryRows(ctx context.Context, runner runner, log EventReceiver, builder Builder, d Dialect) (*sql.Rows, string, error) {
i := interpolator{
Buffer: NewBuffer(),
Dialect: d,
IgnoreBinary: true,
}
err := i.interpolate(placeholder, []interface{}{builder})
query, value := i.String(), i.Value()
if err != nil {
return nil, "", log.EventErrKv("dbr.select.interpolate", err, kvs{
"sql": query,
"args": fmt.Sprint(value),
})
}
startTime := time.Now()
defer func() {
log.TimingKv("dbr.select", time.Since(startTime).Nanoseconds(), kvs{
"sql": query,
})
}()
traceImpl, hasTracingImpl := log.(TracingEventReceiver)
if hasTracingImpl {
ctx = traceImpl.SpanStart(ctx, "dbr.select", query)
defer traceImpl.SpanFinish(ctx)
}
rows, err := runner.QueryContext(ctx, query, value...)
if err != nil {
if hasTracingImpl {
traceImpl.SpanError(ctx, err)
}
return nil, query, log.EventErrKv("dbr.select.load.query", err, kvs{
"sql": query,
})
}
return rows, query, nil
}
func query(ctx context.Context, runner runner, log EventReceiver, builder Builder, d Dialect, dest interface{}) (int, error) {
rows, query, err := queryRows(ctx, runner, log, builder, d)
if err != nil {
return 0, err
}
count, err := Load(rows, dest)
if err != nil {
return 0, log.EventErrKv("dbr.select.load.scan", err, kvs{
"sql": query,
})
}
return count, nil
}