Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Go Client v8.0.1 #463

Merged
merged 3 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Change History

## February 5 2025: v8.0.1

- **Fixes**
- [CLIENT-3305] New key struct causes incorrect generation increment in MRT.
- [CLIENT-3326] Fix UDF can read a record with an expired transaction.

## January 22 2025: v8.0.0

- **New Features**
Expand Down
122 changes: 115 additions & 7 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
package aerospike

import (
"sync"
"sync/atomic"
"time"

sm "github.com/aerospike/aerospike-client-go/v8/internal/atomic/map"
"github.com/aerospike/aerospike-client-go/v8/types"
)

Expand All @@ -41,8 +41,8 @@ func init() {
// Transaction. Each command in the Transaction must use the same namespace.
type Txn struct {
id int64
reads sm.Map[*Key, *uint64]
writes sm.Map[*Key, struct{}]
reads keyMap[*uint64]
writes keyMap[struct{}]
state TxnState
namespace *string
timeout int
Expand All @@ -58,8 +58,8 @@ type Txn struct {
func NewTxn() *Txn {
return &Txn{
id: createTxnId(),
reads: *sm.New[*Key, *uint64](16),
writes: *sm.New[*Key, struct{}](16),
reads: *newKeyMap[*uint64](16),
writes: *newKeyMap[struct{}](16),
timeout: 0,
state: TxnStateOpen,
}
Expand All @@ -80,8 +80,8 @@ func NewTxnWithCapacity(readsCapacity, writesCapacity int) *Txn {

return &Txn{
id: createTxnId(),
reads: *sm.New[*Key, *uint64](readsCapacity),
writes: *sm.New[*Key, struct{}](writesCapacity),
reads: *newKeyMap[*uint64](readsCapacity),
writes: *newKeyMap[struct{}](writesCapacity),
timeout: 0,
state: TxnStateOpen,
}
Expand Down Expand Up @@ -306,3 +306,111 @@ func (txn *Txn) Clear() {
txn.reads.Clear()
txn.writes.Clear()
}

////////////////////////////////////////////////////////////////////////////
//
// Specialized internal data type to simplify key bookkeeping
//
////////////////////////////////////////////////////////////////////////////

type keyTupple[V any] struct {
k *Key
v V
}

// keyMap implements a keyMap with atomic semantics.
type keyMap[V any] struct {
m map[[20]byte]*keyTupple[V]
mutex sync.RWMutex
}

// New generates a new Map instance.
func newKeyMap[V any](length int) *keyMap[V] {
return &keyMap[V]{
m: make(map[[20]byte]*keyTupple[V], length),
}
}

// Exists atomically checks if a key exists in the map
func (m *keyMap[V]) Exists(k *Key) bool {
if k != nil {
m.mutex.RLock()
_, ok := m.m[k.digest]
m.mutex.RUnlock()
return ok
}
return false
}

// Get atomically retrieves an element from the Map.
func (m *keyMap[V]) Get(k *Key) V {
if k != nil {
m.mutex.RLock()
res, found := m.m[k.digest]
m.mutex.RUnlock()
if found {
return res.v
}
}

var zero V
return zero
}

// Set atomically sets an element in the Map.
// If idx is out of range, it will return an error.
func (m *keyMap[V]) Set(k *Key, v V) {
if k != nil {
m.mutex.Lock()
m.m[k.digest] = &keyTupple[V]{k: k, v: v}
m.mutex.Unlock()
}
}

// Clone copies the map and returns the copy.
func (m *keyMap[V]) Clone() map[*Key]V {
m.mutex.RLock()
res := make(map[*Key]V, len(m.m))
for _, v := range m.m {
res[v.k] = v.v
}
m.mutex.RUnlock()

return res
}

// Returns the keys from the map.
func (m *keyMap[V]) Keys() []*Key {
m.mutex.RLock()
res := make([]*Key, 0, len(m.m))
for _, v := range m.m {
res = append(res, v.k)
}
m.mutex.RUnlock()

return res
}

// Clear will remove all entries.
func (m *keyMap[V]) Clear() {
m.mutex.Lock()
m.m = make(map[[20]byte]*keyTupple[V], len(m.m))
m.mutex.Unlock()
}

// Delete will remove the key and return its value.
func (m *keyMap[V]) Delete(k *Key) V {
if k != nil {
m.mutex.Lock()
res, ok := m.m[k.digest]
delete(m.m, k.digest)
m.mutex.Unlock()

if ok {
return res.v
}
}

var zero V
return zero
}
54 changes: 40 additions & 14 deletions txn_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,60 +28,86 @@ const binNameDigests = "keyds"

func (tm *TxnMonitor) addKey(cluster *Cluster, policy *WritePolicy, cmdKey *Key) Error {
txn := policy.Txn
if err := txn.VerifyCommand(); err != nil {
return err
}

if txn.WriteExistsForKey(cmdKey) {
// Transaction monitor already contains this key.
return nil
}

ops := tm.getTranOps(txn, cmdKey)
ops, err := tm.getTranOps(txn, cmdKey)
if err != nil {
return err
}
return tm.addWriteKeys(cluster, policy.GetBasePolicy(), ops)
}

func (tm *TxnMonitor) addKeys(cluster *Cluster, policy *BatchPolicy, keys []*Key) Error {
ops := tm.getTranOpsFromKeys(policy.Txn, keys)
ops, err := tm.getTranOpsFromKeys(policy.Txn, keys)
if err != nil {
return err
}
return tm.addWriteKeys(cluster, policy.GetBasePolicy(), ops)
}

func (tm *TxnMonitor) addKeysFromRecords(cluster *Cluster, policy *BatchPolicy, records []BatchRecordIfc) Error {
ops := tm.getTranOpsFromBatchRecords(policy.Txn, records)
ops, err := tm.getTranOpsFromBatchRecords(policy.Txn, records)
if err != nil {
return err
}

if len(ops) > 0 {
return tm.addWriteKeys(cluster, policy.GetBasePolicy(), ops)
}
return nil
}

func (tm *TxnMonitor) getTranOps(txn *Txn, cmdKey *Key) []*Operation {
txn.SetNamespace(cmdKey.namespace)
func (tm *TxnMonitor) getTranOps(txn *Txn, cmdKey *Key) ([]*Operation, Error) {
if err := txn.SetNamespace(cmdKey.namespace); err != nil {
return nil, err
}

if txn.MonitorExists() {
return []*Operation{
ListAppendWithPolicyOp(txnOrderedListPolicy, binNameDigests, cmdKey.Digest()),
}
}, nil
} else {
return []*Operation{
PutOp(NewBin(binNameId, txn.Id())),
ListAppendWithPolicyOp(txnOrderedListPolicy, binNameDigests, cmdKey.Digest()),
}
}, nil
}
}

func (tm *TxnMonitor) getTranOpsFromKeys(txn *Txn, keys []*Key) []*Operation {
func (tm *TxnMonitor) getTranOpsFromKeys(txn *Txn, keys []*Key) ([]*Operation, Error) {
if err := txn.VerifyCommand(); err != nil {
return nil, err
}

list := make([]interface{}, 0, len(keys))

for _, key := range keys {
txn.SetNamespace(key.namespace)
if err := txn.SetNamespace(key.namespace); err != nil {
return nil, err
}
list = append(list, NewBytesValue(key.Digest()))
}
return tm.getTranOpsFromValueList(txn, list)
return tm.getTranOpsFromValueList(txn, list), nil
}

func (tm *TxnMonitor) getTranOpsFromBatchRecords(txn *Txn, records []BatchRecordIfc) []*Operation {
func (tm *TxnMonitor) getTranOpsFromBatchRecords(txn *Txn, records []BatchRecordIfc) ([]*Operation, Error) {
if err := txn.VerifyCommand(); err != nil {
return nil, err
}

list := make([]interface{}, 0, len(records))

for _, br := range records {
txn.SetNamespace(br.key().namespace)
if err := txn.SetNamespace(br.key().namespace); err != nil {
return nil, err
}

if br.BatchRec().hasWrite {
list = append(list, br.key().Digest())
Expand All @@ -90,9 +116,9 @@ func (tm *TxnMonitor) getTranOpsFromBatchRecords(txn *Txn, records []BatchRecord

if len(list) == 0 {
// Readonly batch does not need to add key digests.
return nil
return nil, nil
}
return tm.getTranOpsFromValueList(txn, list)
return tm.getTranOpsFromValueList(txn, list), nil
}

func (tm *TxnMonitor) getTranOpsFromValueList(txn *Txn, list []interface{}) []*Operation {
Expand Down
Loading