-
-
Notifications
You must be signed in to change notification settings - Fork 61
/
Copy pathcolumn_record.go
167 lines (137 loc) · 4.6 KB
/
column_record.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
// Copyright (c) Roman Atachiants and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.
package column
import (
"encoding"
"reflect"
"sync"
"unsafe"
"github.com/kelindar/column/commit"
)
type recordType interface {
encoding.BinaryMarshaler
encoding.BinaryUnmarshaler
}
// --------------------------- Record ----------------------------
// columnRecord represents a typed column that is persisted using binary marshaler
type columnRecord struct {
columnString
pool *sync.Pool
}
// ForRecord creates a new column that contains a type marshaled into/from binary. It requires
// a constructor for the type as well as optional merge function. If merge function is
// set to nil, "overwrite" strategy will be used.
func ForRecord[T recordType](new func() T, opts ...func(*option[T])) Column {
mergeFunc := configure(opts, option[T]{
Merge: func(value, delta T) T { return delta },
}).Merge
pool := &sync.Pool{
New: func() any { return new() },
}
// Merge function that decodes, merges and re-encodes records into their
// respective binary representation.
mergeRecord := func(v, d string) string {
value := pool.Get().(T)
delta := pool.Get().(T)
defer pool.Put(value)
defer pool.Put(delta)
// Unmarshal the existing value
err1 := value.UnmarshalBinary(s2b(v))
err2 := delta.UnmarshalBinary(s2b(d))
if err1 != nil || err2 != nil {
return v
}
// Apply the user-defined merging strategy and marshal it back
merged := mergeFunc(value, delta)
if encoded, err := merged.MarshalBinary(); err == nil {
return b2s(&encoded)
}
return v
}
return &columnRecord{
pool: pool,
columnString: columnString{
chunks: make(chunks[string], 0, 4),
option: option[string]{
Merge: mergeRecord,
},
},
}
}
// Value returns the value at the given index
// TODO: should probably get rid of this and use an `rdRecord` instead
func (c *columnRecord) Value(idx uint32) (out any, has bool) {
if v, ok := c.columnString.Value(idx); ok {
out = c.pool.New()
has = out.(encoding.BinaryUnmarshaler).UnmarshalBinary(s2b(v.(string))) == nil
}
return
}
// --------------------------- Writer ----------------------------
// rwRecord represents read-write accessor for primary keys.
type rwRecord struct {
rdRecord
writer *commit.Buffer
}
// Set sets the value at the current transaction index
func (s rwRecord) Set(value encoding.BinaryMarshaler) error {
return s.write(commit.Put, value.MarshalBinary)
}
// Merge atomically merges a delta to the value at the current transaction cursor
func (s rwRecord) Merge(delta encoding.BinaryMarshaler) error {
return s.write(commit.Merge, delta.MarshalBinary)
}
// write writes the operation
func (s rwRecord) write(op commit.OpType, encodeDelta func() ([]byte, error)) error {
v, err := encodeDelta()
if err == nil {
s.writer.PutBytes(op, *s.cursor, v)
}
return err
}
// As creates a read-write accessor for a specific record type.
func (txn *Txn) Record(columnName string) rwRecord {
return rwRecord{
rdRecord: readRecordOf(txn, columnName),
writer: txn.bufferFor(columnName),
}
}
// --------------------------- Reader ----------------------------
// rdRecord represents a read-only accessor for records
type rdRecord reader[*columnRecord]
// Get loads the value at the current transaction index
func (s rdRecord) Get() (any, bool) {
value := s.reader.pool.New().(encoding.BinaryUnmarshaler)
if s.Unmarshal(value.UnmarshalBinary) {
return value, true
}
return nil, false
}
// Unmarshal loads the value at the current transaction index using a
// specified function to decode the value.
func (s rdRecord) Unmarshal(decode func(data []byte) error) bool {
encoded, ok := s.reader.LoadString(*s.cursor)
if !ok {
return false
}
return decode(s2b(encoded)) == nil
}
// readRecordOf creates a read-only accessor for readers
func readRecordOf(txn *Txn, columnName string) rdRecord {
return rdRecord(readerFor[*columnRecord](txn, columnName))
}
// --------------------------- Convert ----------------------------
// b2s converts byte slice to a string without allocating.
func b2s(b *[]byte) string {
return *(*string)(unsafe.Pointer(b))
}
// s2b converts a string to a byte slice without allocating.
func s2b(v string) (b []byte) {
strHeader := (*reflect.StringHeader)(unsafe.Pointer(&v))
byteHeader := (*reflect.SliceHeader)(unsafe.Pointer(&b))
byteHeader.Data = strHeader.Data
l := len(v)
byteHeader.Len = l
byteHeader.Cap = l
return
}