-
Notifications
You must be signed in to change notification settings - Fork 27
/
put.go
164 lines (143 loc) · 4.14 KB
/
put.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package nds
import (
"reflect"
"sync"
"golang.org/x/net/context"
"google.golang.org/appengine"
"google.golang.org/appengine/datastore"
"google.golang.org/appengine/log"
"google.golang.org/appengine/memcache"
)
// putMultiLimit is the App Engine datastore limit for the maximum number
// of entities that can be put by datastore.PutMulti at once.
const putMultiLimit = 500
// PutMulti is a batch version of Put. It works just like datastore.PutMulti
// except it interacts appropriately with NDS's caching strategy. It also
// removes the API limit of 500 entities per request by calling the datastore as
// many times as required to put all the keys. It does this efficiently and
// concurrently.
func PutMulti(c context.Context,
keys []*datastore.Key, vals interface{}) ([]*datastore.Key, error) {
if len(keys) == 0 {
return nil, nil
}
v := reflect.ValueOf(vals)
if err := checkKeysValues(keys, v); err != nil {
return nil, err
}
callCount := (len(keys)-1)/putMultiLimit + 1
putKeys := make([][]*datastore.Key, callCount)
errs := make([]error, callCount)
var wg sync.WaitGroup
wg.Add(callCount)
for i := 0; i < callCount; i++ {
lo := i * putMultiLimit
hi := (i + 1) * putMultiLimit
if hi > len(keys) {
hi = len(keys)
}
go func(i int, keys []*datastore.Key, vals reflect.Value) {
putKeys[i], errs[i] = putMulti(c, keys, vals.Interface())
wg.Done()
}(i, keys[lo:hi], v.Slice(lo, hi))
}
wg.Wait()
if isErrorsNil(errs) {
groupedKeys := make([]*datastore.Key, len(keys))
for i, k := range putKeys {
lo := i * putMultiLimit
hi := (i + 1) * putMultiLimit
if hi > len(keys) {
hi = len(keys)
}
copy(groupedKeys[lo:hi], k)
}
return groupedKeys, nil
}
groupedKeys := make([]*datastore.Key, len(keys))
groupedErrs := make(appengine.MultiError, len(keys))
for i, err := range errs {
lo := i * putMultiLimit
hi := (i + 1) * putMultiLimit
if hi > len(keys) {
hi = len(keys)
}
if me, ok := err.(appengine.MultiError); ok {
for j, e := range me {
if e == nil {
groupedKeys[lo+j] = putKeys[i][j]
} else {
groupedErrs[lo+j] = e
}
}
} else if err != nil {
for j := lo; j < hi; j++ {
groupedErrs[j] = err
}
}
}
return groupedKeys, groupedErrs
}
// Put saves the entity val into the datastore with key. val must be a struct
// pointer; if a struct pointer then any unexported fields of that struct will
// be skipped. If key is an incomplete key, the returned key will be a unique
// key generated by the datastore.
func Put(c context.Context,
key *datastore.Key, val interface{}) (*datastore.Key, error) {
keys := []*datastore.Key{key}
vals := []interface{}{val}
if err := checkKeysValues(keys, reflect.ValueOf(vals)); err != nil {
return nil, err
}
keys, err := putMulti(c, keys, vals)
switch e := err.(type) {
case nil:
return keys[0], nil
case appengine.MultiError:
return nil, e[0]
default:
return nil, err
}
}
// putMulti puts the entities into the datastore and then its local cache.
func putMulti(c context.Context,
keys []*datastore.Key, vals interface{}) ([]*datastore.Key, error) {
lockMemcacheKeys := make([]string, 0, len(keys))
lockMemcacheItems := make([]*memcache.Item, 0, len(keys))
for _, key := range keys {
if !key.Incomplete() {
item := &memcache.Item{
Key: createMemcacheKey(key),
Flags: lockItem,
Value: itemLock(),
Expiration: memcacheLockTime,
}
lockMemcacheItems = append(lockMemcacheItems, item)
lockMemcacheKeys = append(lockMemcacheKeys, item.Key)
}
}
memcacheCtx, err := memcacheContext(c)
if err != nil {
return nil, err
}
defer func() {
if _, ok := transactionFromContext(c); !ok {
// Remove the locks.
if err := memcacheDeleteMulti(memcacheCtx,
lockMemcacheKeys); err != nil {
log.Warningf(c, "putMulti memcache.DeleteMulti %s", err)
}
}
}()
if tx, ok := transactionFromContext(c); ok {
tx.Lock()
tx.lockMemcacheItems = append(tx.lockMemcacheItems,
lockMemcacheItems...)
tx.Unlock()
} else if err := memcacheSetMulti(memcacheCtx,
lockMemcacheItems); err != nil {
return nil, err
}
// Save to the datastore.
return datastorePutMulti(c, keys, vals)
}