Skip to content

Commit

Permalink
Split structs into OperationUpdate and OperationUpdateAsync
Browse files Browse the repository at this point in the history
Signed-off-by: Enrique Lacal <[email protected]>
  • Loading branch information
EnriqueL8 committed Feb 25, 2025
1 parent b5c4555 commit 40fe22e
Show file tree
Hide file tree
Showing 20 changed files with 221 additions and 189 deletions.
18 changes: 10 additions & 8 deletions internal/blockchain/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type BlockchainCallbacks interface {
// BulkOperationUpdates is a synchronous way to update multiple operations and will return when the updates have been committed to the database or there has been an error
// An insertion ordering guarantee is only provided when this code is called on a single goroutine inside of the connector.
// It is the responsibility of the connector code to allocate that routine, and ensure that there is only one.
// Note: onComplete at each update level is not called, as this is a bulk operation and should be reponsibility of the caller to manage.
// Note: onComplete at each update level is not called, as this is a bulk operation and should be reponsibility of the caller to manage if needed.
BulkOperationUpdates(ctx context.Context, namespace string, updates []*core.OperationUpdate) error

OperationUpdate(ctx context.Context, plugin core.Named, nsOpID string, status core.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject)
Expand Down Expand Up @@ -170,13 +170,15 @@ func (cb *callbacks) SetOperationalHandler(namespace string, handler core.Operat
func (cb *callbacks) OperationUpdate(ctx context.Context, plugin core.Named, nsOpID string, status core.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) {
namespace, _, _ := core.ParseNamespacedOpID(ctx, nsOpID)
if handler, ok := cb.opHandlers[namespace]; ok {
handler.OperationUpdate(&core.OperationUpdate{
Plugin: plugin.Name(),
NamespacedOpID: nsOpID,
Status: status,
BlockchainTXID: blockchainTXID,
ErrorMessage: errorMessage,
Output: opOutput,
handler.OperationUpdate(&core.OperationUpdateAsync{
OperationUpdate: core.OperationUpdate{
Plugin: plugin.Name(),
NamespacedOpID: nsOpID,
Status: status,
BlockchainTXID: blockchainTXID,
ErrorMessage: errorMessage,
Output: opOutput,
},
})
return
}
Expand Down
2 changes: 1 addition & 1 deletion internal/blockchain/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestCallbackOperationUpdate(t *testing.T) {
cb.SetOperationalHandler("ns1", mcb)

mbi.On("Name").Return("utblockchain")
mcb.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
mcb.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
return update.NamespacedOpID == nsOpID &&
update.Status == core.OpStatusSucceeded &&
update.BlockchainTXID == "tx1" &&
Expand Down
8 changes: 4 additions & 4 deletions internal/blockchain/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -1825,7 +1825,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
"transactionIndex": "0"
}`)

em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
return update.NamespacedOpID == "ns1:"+operationID.String() &&
update.Status == core.OpStatusSucceeded &&
update.BlockchainTXID == "0x71a38acb7a5d4a970854f6d638ceb1fa10a4b59cbf4ed7674273a1a8dc8b36b8" &&
Expand Down Expand Up @@ -1911,7 +1911,7 @@ func TestHandleReceiptTXUpdateEVMConnect(t *testing.T) {
"updated": "2022-08-03T18:55:43.781941Z"
}`)

em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
return update.NamespacedOpID == "ns1:"+operationID.String() &&
update.Status == core.OpStatusPending &&
update.BlockchainTXID == "0x929c898a46762d91e9f4b0b8e2800863dcf4a40f694109dc4cd19dbd334fa4cc" &&
Expand Down Expand Up @@ -1955,7 +1955,7 @@ func TestHandleBadPayloadsAndThenReceiptFailure(t *testing.T) {

em := &coremocks.OperationCallbacks{}
e.SetOperationHandler("ns1", em)
txsu := em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
txsu := em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
return update.NamespacedOpID == "ns1:"+operationID.String() &&
update.Status == core.OpStatusFailed &&
update.ErrorMessage == "Packing arguments for method 'broadcastBatch': abi: cannot use [3]uint8 as type [32]uint8 as argument" &&
Expand Down
6 changes: 3 additions & 3 deletions internal/blockchain/fabric/fabric_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -1778,7 +1778,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
"receivedAt": 1630033474675
}`)

em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
return update.NamespacedOpID == "ns1:"+operationID.String() &&
update.Status == core.OpStatusSucceeded &&
update.BlockchainTXID == "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2" &&
Expand Down Expand Up @@ -1839,7 +1839,7 @@ func TestHandleReceiptFailedTx(t *testing.T) {
"transactionHash": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2"
}`)

em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
return update.NamespacedOpID == "ns1:"+operationID.String() &&
update.Status == core.OpStatusFailed &&
update.BlockchainTXID == "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2" &&
Expand Down
6 changes: 3 additions & 3 deletions internal/blockchain/tezos/tezos_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -685,7 +685,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
}
}`)

tm.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
tm.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
return update.NamespacedOpID == "ns1:"+operationID.String() &&
update.Status == core.OpStatusSucceeded &&
update.BlockchainTXID == "ooGcrcazgcGBrY1iym329ovV13MnWrTmV1fttCwWKH5DiYUQsiq" &&
Expand Down Expand Up @@ -769,7 +769,7 @@ func TestHandleReceiptTXUpdateTezosConnect(t *testing.T) {
"updated": "2023-09-10T14:49:36.030604Z"
}`)

tm.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
tm.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
return update.NamespacedOpID == "ns1:"+operationID.String() &&
update.Status == core.OpStatusPending &&
update.BlockchainTXID == "onhZJDmz5JihnW1RaZ96f17FgUBv3GoERkRECK3XVFt1kL5E6Yy" &&
Expand Down
96 changes: 54 additions & 42 deletions internal/dataexchange/ffdx/dxevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,71 +90,83 @@ func (h *FFDX) dispatchEvent(msg *wsEvent) {

switch msg.Type {
case messageFailed:
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: core.OpStatusFailed,
ErrorMessage: msg.Error,
Output: msg.Info,
OnComplete: e.Ack,
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
OperationUpdate: core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: core.OpStatusFailed,
ErrorMessage: msg.Error,
Output: msg.Info,
},
OnComplete: e.Ack,
})
return
case messageDelivered:
status := core.OpStatusSucceeded
if h.capabilities.Manifest {
status = core.OpStatusPending
}
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: status,
Output: msg.Info,
OnComplete: e.Ack,
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
OperationUpdate: core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: status,
Output: msg.Info,
},
OnComplete: e.Ack,
})
return
case messageAcknowledged:
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: core.OpStatusSucceeded,
VerifyManifest: h.capabilities.Manifest,
DXManifest: msg.Manifest,
Output: msg.Info,
OnComplete: e.Ack,
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
OperationUpdate: core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: core.OpStatusSucceeded,
VerifyManifest: h.capabilities.Manifest,
DXManifest: msg.Manifest,
Output: msg.Info,
},
OnComplete: e.Ack,
})
return
case blobFailed:
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: core.OpStatusFailed,
ErrorMessage: msg.Error,
Output: msg.Info,
OnComplete: e.Ack,
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
OperationUpdate: core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: core.OpStatusFailed,
ErrorMessage: msg.Error,
Output: msg.Info,
},
OnComplete: e.Ack,
})
return
case blobDelivered:
status := core.OpStatusSucceeded
if h.capabilities.Manifest {
status = core.OpStatusPending
}
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: status,
Output: msg.Info,
OnComplete: e.Ack,
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
OperationUpdate: core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: status,
Output: msg.Info,
},
OnComplete: e.Ack,
})
return
case blobAcknowledged:
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: core.OpStatusSucceeded,
Output: msg.Info,
VerifyManifest: h.capabilities.Manifest,
DXHash: msg.Hash,
OnComplete: e.Ack,
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
OperationUpdate: core.OperationUpdate{
Plugin: h.Name(),
NamespacedOpID: msg.RequestID,
Status: core.OpStatusSucceeded,
Output: msg.Info,
VerifyManifest: h.capabilities.Manifest,
DXHash: msg.Hash,
},
OnComplete: e.Ack,
})
return

Expand Down
2 changes: 1 addition & 1 deletion internal/dataexchange/ffdx/ffdx.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type callbacks struct {
opHandlers map[string]core.OperationCallbacks
}

func (cb *callbacks) OperationUpdate(ctx context.Context, update *core.OperationUpdate) {
func (cb *callbacks) OperationUpdate(ctx context.Context, update *core.OperationUpdateAsync) {
namespace, _, _ := core.ParseNamespacedOpID(ctx, update.NamespacedOpID)
if handler, ok := cb.opHandlers[namespace]; ok {
handler.OperationUpdate(update)
Expand Down
26 changes: 13 additions & 13 deletions internal/dataexchange/ffdx/ffdx_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestInitMissingURL(t *testing.T) {

func opAcker() func(args mock.Arguments) {
return func(args mock.Arguments) {
args[0].(*core.OperationUpdate).OnComplete()
args[0].(*core.OperationUpdateAsync).OnComplete()
}
}

Expand Down Expand Up @@ -492,7 +492,7 @@ func TestMessageEventsBackgroundStart(t *testing.T) {
assert.NoError(t, err)

namespacedID1 := fmt.Sprintf("ns1:%s", fftypes.NewUUID())
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool {
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool {
return ev.NamespacedOpID == namespacedID1 &&
ev.Status == core.OpStatusFailed &&
ev.ErrorMessage == "pop" &&
Expand All @@ -503,7 +503,7 @@ func TestMessageEventsBackgroundStart(t *testing.T) {
assert.Equal(t, `{"action":"ack","id":"1"}`, string(msg))

namespacedID2 := fmt.Sprintf("ns1:%s", fftypes.NewUUID())
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool {
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool {
return ev.NamespacedOpID == namespacedID2 &&
ev.Status == core.OpStatusSucceeded &&
ev.Plugin == "ffdx"
Expand All @@ -513,7 +513,7 @@ func TestMessageEventsBackgroundStart(t *testing.T) {
assert.Equal(t, `{"action":"ack","id":"2"}`, string(msg))

namespacedID3 := fmt.Sprintf("ns1:%s", fftypes.NewUUID())
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool {
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool {
return ev.NamespacedOpID == namespacedID3 &&
ev.Status == core.OpStatusSucceeded &&
ev.DXManifest == `{"manifest":true}` &&
Expand Down Expand Up @@ -552,7 +552,7 @@ func TestMessageEvents(t *testing.T) {
assert.NoError(t, err)

namespacedID1 := fmt.Sprintf("ns1:%s", fftypes.NewUUID())
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool {
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool {
return ev.NamespacedOpID == namespacedID1 &&
ev.Status == core.OpStatusFailed &&
ev.ErrorMessage == "pop" &&
Expand All @@ -563,7 +563,7 @@ func TestMessageEvents(t *testing.T) {
assert.Equal(t, `{"action":"ack","id":"1"}`, string(msg))

namespacedID2 := fmt.Sprintf("ns1:%s", fftypes.NewUUID())
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool {
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool {
return ev.NamespacedOpID == namespacedID2 &&
ev.Status == core.OpStatusSucceeded &&
ev.Plugin == "ffdx"
Expand All @@ -573,7 +573,7 @@ func TestMessageEvents(t *testing.T) {
assert.Equal(t, `{"action":"ack","id":"2"}`, string(msg))

namespacedID3 := fmt.Sprintf("ns1:%s", fftypes.NewUUID())
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool {
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool {
return ev.NamespacedOpID == namespacedID3 &&
ev.Status == core.OpStatusSucceeded &&
ev.DXManifest == `{"manifest":true}` &&
Expand Down Expand Up @@ -617,7 +617,7 @@ func TestBlobEvents(t *testing.T) {
assert.NoError(t, err)

namespacedID5 := fmt.Sprintf("ns1:%s", fftypes.NewUUID())
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool {
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool {
return ev.NamespacedOpID == namespacedID5 &&
ev.Status == core.OpStatusFailed &&
ev.ErrorMessage == "pop" &&
Expand All @@ -628,7 +628,7 @@ func TestBlobEvents(t *testing.T) {
assert.Equal(t, `{"action":"ack","id":"5"}`, string(msg))

namespacedID6 := fmt.Sprintf("ns1:%s", fftypes.NewUUID())
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool {
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool {
return ev.NamespacedOpID == namespacedID6 &&
ev.Status == core.OpStatusSucceeded &&
ev.Output.String() == `{"some":"details"}` &&
Expand All @@ -650,7 +650,7 @@ func TestBlobEvents(t *testing.T) {
assert.Equal(t, `{"action":"ack","id":"9"}`, string(msg))

namespacedID10 := fmt.Sprintf("ns1:%s", fftypes.NewUUID())
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool {
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool {
return ev.NamespacedOpID == namespacedID10 &&
ev.Status == core.OpStatusSucceeded &&
ev.Output.String() == `{"signatures":"and stuff"}` &&
Expand Down Expand Up @@ -683,7 +683,7 @@ func TestEventsWithManifest(t *testing.T) {
h.SetOperationHandler("ns1", ocb)

namespacedID1 := fmt.Sprintf("ns1:%s", fftypes.NewUUID())
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool {
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool {
return ev.NamespacedOpID == namespacedID1 &&
ev.Status == core.OpStatusPending &&
ev.Plugin == "ffdx"
Expand All @@ -693,7 +693,7 @@ func TestEventsWithManifest(t *testing.T) {
assert.Equal(t, `{"action":"ack","id":"1"}`, string(msg))

namespacedID2 := fmt.Sprintf("ns1:%s", fftypes.NewUUID())
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool {
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool {
return ev.NamespacedOpID == namespacedID2 &&
ev.Status == core.OpStatusPending &&
ev.Plugin == "ffdx"
Expand Down
Loading

0 comments on commit 40fe22e

Please sign in to comment.