Skip to content

Commit

Permalink
Merge pull request #1398 from kaleido-io/late-join
Browse files Browse the repository at this point in the history
Do not expect to retrieve local org during definition handling
  • Loading branch information
awrichar authored Sep 5, 2023
2 parents c454f8d + 9e33ff2 commit 5418e40
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 108 deletions.
8 changes: 4 additions & 4 deletions internal/definitions/handler_contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,11 @@ func (dh *definitionHandler) handleFFIBroadcast(ctx context.Context, state *core
return HandlerResult{Action: core.ActionReject}, i18n.NewError(ctx, coremsgs.MsgDefRejectedBadPayload, "contract interface", msg.Header.ID)
}

org, err := dh.identity.GetRootOrg(ctx)
org, err := dh.identity.GetRootOrgDID(ctx)
if err != nil {
return HandlerResult{Action: core.ActionRetry}, err
}
isAuthor := org.DID == msg.Header.Author
isAuthor := org == msg.Header.Author

ffi.Message = msg.Header.ID
ffi.Name = ffi.NetworkName
Expand Down Expand Up @@ -198,11 +198,11 @@ func (dh *definitionHandler) handleContractAPIBroadcast(ctx context.Context, sta
return HandlerResult{Action: core.ActionReject}, i18n.NewError(ctx, coremsgs.MsgDefRejectedBadPayload, "contract API", msg.Header.ID)
}

org, err := dh.identity.GetRootOrg(ctx)
org, err := dh.identity.GetRootOrgDID(ctx)
if err != nil {
return HandlerResult{Action: core.ActionRetry}, err
}
isAuthor := org.DID == msg.Header.Author
isAuthor := org == msg.Header.Author

api.Message = msg.Header.ID
api.Name = api.NetworkName
Expand Down
52 changes: 10 additions & 42 deletions internal/definitions/handler_contracts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,7 @@ func TestHandleFFIBroadcastOk(t *testing.T) {
dh.mdi.On("UpsertFFIError", mock.Anything, mock.Anything).Return(nil)
dh.mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
dh.mcm.On("ResolveFFI", mock.Anything, mock.Anything).Return(nil)
dh.mim.On("GetRootOrg", context.Background()).Return(&core.Identity{
IdentityBase: core.IdentityBase{
DID: "firefly:org1",
},
}, nil)
dh.mim.On("GetRootOrgDID", context.Background()).Return("firefly:org1", nil)

action, err := dh.HandleDefinitionBroadcast(context.Background(), &bs.BatchState, &core.Message{
Header: core.MessageHeader{
Expand Down Expand Up @@ -154,11 +150,7 @@ func TestHandleFFIBroadcastUpdate(t *testing.T) {
dh.mdi.On("InsertOrGetFFI", mock.Anything, mock.Anything).Return(existing, nil)
dh.mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
dh.mcm.On("ResolveFFI", mock.Anything, mock.Anything).Return(nil)
dh.mim.On("GetRootOrg", context.Background()).Return(&core.Identity{
IdentityBase: core.IdentityBase{
DID: "firefly:org1",
},
}, nil)
dh.mim.On("GetRootOrgDID", context.Background()).Return("firefly:org1", nil)

action, err := dh.HandleDefinitionBroadcast(context.Background(), &bs.BatchState, &core.Message{
Header: core.MessageHeader{
Expand Down Expand Up @@ -198,11 +190,7 @@ func TestHandleFFIBroadcastNameExists(t *testing.T) {
dh.mdi.On("UpsertFFIError", mock.Anything, mock.Anything).Return(nil)
dh.mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
dh.mcm.On("ResolveFFI", mock.Anything, mock.Anything).Return(nil)
dh.mim.On("GetRootOrg", context.Background()).Return(&core.Identity{
IdentityBase: core.IdentityBase{
DID: "firefly:org1",
},
}, nil)
dh.mim.On("GetRootOrgDID", context.Background()).Return("firefly:org1", nil)

action, err := dh.HandleDefinitionBroadcast(context.Background(), &bs.BatchState, &core.Message{
Header: core.MessageHeader{
Expand Down Expand Up @@ -381,7 +369,7 @@ func TestHandleFFIBroadcastOrgFail(t *testing.T) {
Value: fftypes.JSONAnyPtrBytes(b),
}

dh.mim.On("GetRootOrg", context.Background()).Return(nil, fmt.Errorf("pop"))
dh.mim.On("GetRootOrgDID", context.Background()).Return("", fmt.Errorf("pop"))

action, err := dh.HandleDefinitionBroadcast(context.Background(), &bs.BatchState, &core.Message{
Header: core.MessageHeader{
Expand All @@ -406,11 +394,7 @@ func TestHandleFFIBroadcastPersistFail(t *testing.T) {
}
dh.mdi.On("InsertOrGetFFI", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("pop"))
dh.mcm.On("ResolveFFI", mock.Anything, mock.Anything).Return(nil)
dh.mim.On("GetRootOrg", context.Background()).Return(&core.Identity{
IdentityBase: core.IdentityBase{
DID: "firefly:org1",
},
}, nil)
dh.mim.On("GetRootOrgDID", context.Background()).Return("firefly:org1", nil)

action, err := dh.HandleDefinitionBroadcast(context.Background(), &bs.BatchState, &core.Message{
Header: core.MessageHeader{
Expand All @@ -434,11 +418,7 @@ func TestHandleFFIBroadcastResolveFail(t *testing.T) {
}

dh.mcm.On("ResolveFFI", mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
dh.mim.On("GetRootOrg", context.Background()).Return(&core.Identity{
IdentityBase: core.IdentityBase{
DID: "firefly:org1",
},
}, nil)
dh.mim.On("GetRootOrgDID", context.Background()).Return("firefly:org1", nil)

action, err := dh.HandleDefinitionBroadcast(context.Background(), &bs.BatchState, &core.Message{
Header: core.MessageHeader{
Expand All @@ -464,11 +444,7 @@ func TestHandleContractAPIBroadcastOk(t *testing.T) {
dh.mdi.On("InsertOrGetContractAPI", mock.Anything, mock.Anything).Return(nil, nil)
dh.mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
dh.mcm.On("ResolveContractAPI", context.Background(), "", mock.Anything).Return(nil)
dh.mim.On("GetRootOrg", context.Background()).Return(&core.Identity{
IdentityBase: core.IdentityBase{
DID: "firefly:org1",
},
}, nil)
dh.mim.On("GetRootOrgDID", context.Background()).Return("firefly:org1", nil)

action, err := dh.HandleDefinitionBroadcast(context.Background(), &bs.BatchState, &core.Message{
Header: core.MessageHeader{
Expand Down Expand Up @@ -510,11 +486,7 @@ func TestHandleContractAPIBroadcastPersistFail(t *testing.T) {

dh.mdi.On("InsertOrGetContractAPI", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("pop"))
dh.mcm.On("ResolveContractAPI", context.Background(), "", mock.Anything).Return(nil)
dh.mim.On("GetRootOrg", context.Background()).Return(&core.Identity{
IdentityBase: core.IdentityBase{
DID: "firefly:org1",
},
}, nil)
dh.mim.On("GetRootOrgDID", context.Background()).Return("firefly:org1", nil)

action, err := dh.HandleDefinitionBroadcast(context.Background(), &bs.BatchState, &core.Message{
Header: core.MessageHeader{
Expand All @@ -538,11 +510,7 @@ func TestHandleContractAPIBroadcastResolveFail(t *testing.T) {
}

dh.mcm.On("ResolveContractAPI", context.Background(), "", mock.Anything).Return(fmt.Errorf("pop"))
dh.mim.On("GetRootOrg", context.Background()).Return(&core.Identity{
IdentityBase: core.IdentityBase{
DID: "firefly:org1",
},
}, nil)
dh.mim.On("GetRootOrgDID", context.Background()).Return("firefly:org1", nil)

action, err := dh.HandleDefinitionBroadcast(context.Background(), &bs.BatchState, &core.Message{
Header: core.MessageHeader{
Expand All @@ -565,7 +533,7 @@ func TestHandleContractAPIBroadcastOrgFail(t *testing.T) {
Value: fftypes.JSONAnyPtrBytes(b),
}

dh.mim.On("GetRootOrg", context.Background()).Return(nil, fmt.Errorf("pop"))
dh.mim.On("GetRootOrgDID", context.Background()).Return("", fmt.Errorf("pop"))

action, err := dh.HandleDefinitionBroadcast(context.Background(), &bs.BatchState, &core.Message{
Header: core.MessageHeader{
Expand Down
4 changes: 2 additions & 2 deletions internal/definitions/handler_tokenpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ func (dh *definitionHandler) handleTokenPoolBroadcast(ctx context.Context, state
return HandlerResult{Action: core.ActionReject}, i18n.NewError(ctx, coremsgs.MsgInvalidConnectorName, pool.Connector, "token")
}

org, err := dh.identity.GetRootOrg(ctx)
org, err := dh.identity.GetRootOrgDID(ctx)
if err != nil {
return HandlerResult{Action: core.ActionRetry}, err
}
isAuthor := org.DID == msg.Header.Author
isAuthor := org == msg.Header.Author

pool.Message = msg.Header.ID
pool.Name = pool.NetworkName
Expand Down
68 changes: 12 additions & 56 deletions internal/definitions/handler_tokenpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,7 @@ func TestHandleDefinitionBroadcastTokenPoolActivateOK(t *testing.T) {
return *p.ID == *pool.ID && p.Message == msg.Header.ID && p.Connector == "connector1"
})).Return(nil, nil)
dh.mam.On("ActivateTokenPool", context.Background(), mock.AnythingOfType("*core.TokenPool")).Return(nil)
dh.mim.On("GetRootOrg", context.Background()).Return(&core.Identity{
IdentityBase: core.IdentityBase{
DID: "firefly:org1",
},
}, nil)
dh.mim.On("GetRootOrgDID", context.Background()).Return("firefly:org1", nil)

action, err := dh.HandleDefinitionBroadcast(context.Background(), &bs.BatchState, msg, data, fftypes.NewUUID())
assert.Equal(t, HandlerResult{Action: core.ActionWait, CustomCorrelator: pool.ID}, action)
Expand All @@ -105,11 +101,7 @@ func TestHandleDefinitionBroadcastTokenPoolBadConnector(t *testing.T) {
msg, data, err := buildPoolDefinitionMessage(definition)
assert.NoError(t, err)

dh.mim.On("GetRootOrg", context.Background()).Return(&core.Identity{
IdentityBase: core.IdentityBase{
DID: "firefly:org1",
},
}, nil)
dh.mim.On("GetRootOrgDID", context.Background()).Return("firefly:org1", nil)

action, err := dh.HandleDefinitionBroadcast(context.Background(), &bs.BatchState, msg, data, fftypes.NewUUID())
assert.Equal(t, HandlerResult{Action: core.ActionReject, CustomCorrelator: pool.ID}, action)
Expand Down Expand Up @@ -137,11 +129,7 @@ func TestHandleDefinitionBroadcastTokenPoolNameExists(t *testing.T) {
dh.mdi.On("InsertOrGetTokenPool", context.Background(), mock.MatchedBy(func(p *core.TokenPool) bool {
return *p.ID == *pool.ID && p.Message == msg.Header.ID && p.Name == "name1-1"
})).Return(nil, nil)
dh.mim.On("GetRootOrg", context.Background()).Return(&core.Identity{
IdentityBase: core.IdentityBase{
DID: "firefly:org1",
},
}, nil)
dh.mim.On("GetRootOrgDID", context.Background()).Return("firefly:org1", nil)

action, err := dh.HandleDefinitionBroadcast(context.Background(), &bs.BatchState, msg, data, fftypes.NewUUID())
assert.Equal(t, HandlerResult{Action: core.ActionWait, CustomCorrelator: pool.ID}, action)
Expand Down Expand Up @@ -185,11 +173,7 @@ func TestHandleDefinitionBroadcastTokenPoolNetworkNameExists(t *testing.T) {
dh.mdi.On("InsertOrGetTokenPool", context.Background(), mock.MatchedBy(func(p *core.TokenPool) bool {
return *p.ID == *pool.ID && p.Message == msg.Header.ID
})).Return(existing, nil)
dh.mim.On("GetRootOrg", context.Background()).Return(&core.Identity{
IdentityBase: core.IdentityBase{
DID: "firefly:org1",
},
}, nil)
dh.mim.On("GetRootOrgDID", context.Background()).Return("firefly:org1", nil)

action, err := dh.HandleDefinitionBroadcast(context.Background(), &bs.BatchState, msg, data, fftypes.NewUUID())
assert.Equal(t, HandlerResult{Action: core.ActionReject, CustomCorrelator: pool.ID}, action)
Expand All @@ -215,11 +199,7 @@ func TestHandleDefinitionBroadcastTokenPoolExistingConfirmed(t *testing.T) {
dh.mdi.On("InsertOrGetTokenPool", context.Background(), mock.MatchedBy(func(p *core.TokenPool) bool {
return *p.ID == *pool.ID && p.Message == msg.Header.ID
})).Return(existing, nil)
dh.mim.On("GetRootOrg", context.Background()).Return(&core.Identity{
IdentityBase: core.IdentityBase{
DID: "firefly:org1",
},
}, nil)
dh.mim.On("GetRootOrgDID", context.Background()).Return("firefly:org1", nil)

action, err := dh.HandleDefinitionBroadcast(context.Background(), &bs.BatchState, msg, data, fftypes.NewUUID())
assert.Equal(t, HandlerResult{Action: core.ActionConfirm, CustomCorrelator: pool.ID}, action)
Expand All @@ -243,11 +223,7 @@ func TestHandleDefinitionBroadcastTokenPoolExistingWaiting(t *testing.T) {
dh.mdi.On("InsertOrGetTokenPool", context.Background(), mock.MatchedBy(func(p *core.TokenPool) bool {
return *p.ID == *pool.ID && p.Message == msg.Header.ID
})).Return(existing, nil)
dh.mim.On("GetRootOrg", context.Background()).Return(&core.Identity{
IdentityBase: core.IdentityBase{
DID: "firefly:org1",
},
}, nil)
dh.mim.On("GetRootOrgDID", context.Background()).Return("firefly:org1", nil)

action, err := dh.HandleDefinitionBroadcast(context.Background(), &bs.BatchState, msg, data, fftypes.NewUUID())
assert.Equal(t, HandlerResult{Action: core.ActionWait, CustomCorrelator: pool.ID}, action)
Expand Down Expand Up @@ -278,11 +254,7 @@ func TestHandleDefinitionBroadcastTokenPoolExistingPublish(t *testing.T) {
return *p.ID == *pool.ID && p.Message == msg.Header.ID
})).Return(existing, nil)
dh.mdi.On("UpsertTokenPool", context.Background(), &newPool, database.UpsertOptimizationExisting).Return(nil)
dh.mim.On("GetRootOrg", context.Background()).Return(&core.Identity{
IdentityBase: core.IdentityBase{
DID: "firefly:org1",
},
}, nil)
dh.mim.On("GetRootOrgDID", context.Background()).Return("firefly:org1", nil)

action, err := dh.HandleDefinitionBroadcast(context.Background(), &bs.BatchState, msg, data, fftypes.NewUUID())
assert.Equal(t, HandlerResult{Action: core.ActionConfirm, CustomCorrelator: pool.ID}, action)
Expand Down Expand Up @@ -313,11 +285,7 @@ func TestHandleDefinitionBroadcastTokenPoolExistingPublishUpsertFail(t *testing.
return *p.ID == *pool.ID && p.Message == msg.Header.ID
})).Return(existing, nil)
dh.mdi.On("UpsertTokenPool", context.Background(), &newPool, database.UpsertOptimizationExisting).Return(fmt.Errorf("pop"))
dh.mim.On("GetRootOrg", context.Background()).Return(&core.Identity{
IdentityBase: core.IdentityBase{
DID: "firefly:org1",
},
}, nil)
dh.mim.On("GetRootOrgDID", context.Background()).Return("firefly:org1", nil)

action, err := dh.HandleDefinitionBroadcast(context.Background(), &bs.BatchState, msg, data, fftypes.NewUUID())
assert.Equal(t, HandlerResult{Action: core.ActionRetry, CustomCorrelator: pool.ID}, action)
Expand All @@ -344,7 +312,7 @@ func TestHandleDefinitionBroadcastTokenPoolExistingPublishOrgFail(t *testing.T)
newPool.Message = msg.Header.ID
newPool.State = core.TokenPoolStatePending

dh.mim.On("GetRootOrg", context.Background()).Return(nil, fmt.Errorf("pop"))
dh.mim.On("GetRootOrgDID", context.Background()).Return("", fmt.Errorf("pop"))

action, err := dh.HandleDefinitionBroadcast(context.Background(), &bs.BatchState, msg, data, fftypes.NewUUID())
assert.Equal(t, HandlerResult{Action: core.ActionRetry}, action)
Expand Down Expand Up @@ -374,11 +342,7 @@ func TestHandleDefinitionBroadcastTokenPoolExistingPublishOrgMismatch(t *testing
dh.mdi.On("InsertOrGetTokenPool", context.Background(), mock.MatchedBy(func(p *core.TokenPool) bool {
return *p.ID == *pool.ID && p.Message == msg.Header.ID
})).Return(existing, nil)
dh.mim.On("GetRootOrg", context.Background()).Return(&core.Identity{
IdentityBase: core.IdentityBase{
DID: "firefly:org2",
},
}, nil)
dh.mim.On("GetRootOrgDID", context.Background()).Return("firefly:org2", nil)

action, err := dh.HandleDefinitionBroadcast(context.Background(), &bs.BatchState, msg, data, fftypes.NewUUID())
assert.Equal(t, HandlerResult{Action: core.ActionReject, CustomCorrelator: pool.ID}, action)
Expand All @@ -396,11 +360,7 @@ func TestHandleDefinitionBroadcastTokenPoolFailUpsert(t *testing.T) {
dh.mdi.On("InsertOrGetTokenPool", context.Background(), mock.MatchedBy(func(p *core.TokenPool) bool {
return *p.ID == *pool.ID && p.Message == msg.Header.ID
})).Return(nil, fmt.Errorf("pop"))
dh.mim.On("GetRootOrg", context.Background()).Return(&core.Identity{
IdentityBase: core.IdentityBase{
DID: "firefly:org1",
},
}, nil)
dh.mim.On("GetRootOrgDID", context.Background()).Return("firefly:org1", nil)

action, err := dh.HandleDefinitionBroadcast(context.Background(), &bs.BatchState, msg, data, fftypes.NewUUID())
assert.Equal(t, HandlerResult{Action: core.ActionRetry}, action)
Expand All @@ -421,11 +381,7 @@ func TestHandleDefinitionBroadcastTokenPoolActivateFail(t *testing.T) {
return *p.ID == *pool.ID && p.Message == msg.Header.ID
})).Return(nil, nil)
dh.mam.On("ActivateTokenPool", context.Background(), mock.AnythingOfType("*core.TokenPool")).Return(fmt.Errorf("pop"))
dh.mim.On("GetRootOrg", context.Background()).Return(&core.Identity{
IdentityBase: core.IdentityBase{
DID: "firefly:org1",
},
}, nil)
dh.mim.On("GetRootOrgDID", context.Background()).Return("firefly:org1", nil)

action, err := dh.HandleDefinitionBroadcast(context.Background(), &bs.BatchState, msg, data, fftypes.NewUUID())
assert.Equal(t, HandlerResult{Action: core.ActionWait, CustomCorrelator: pool.ID}, action)
Expand Down
14 changes: 10 additions & 4 deletions internal/identity/identitymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Manager interface {
CachedIdentityLookupMustExist(ctx context.Context, did string) (identity *core.Identity, retryable bool, err error)
CachedIdentityLookupNilOK(ctx context.Context, did string) (identity *core.Identity, retryable bool, err error)
GetLocalNode(ctx context.Context) (node *core.Identity, err error)
GetRootOrgDID(ctx context.Context) (string, error)
GetRootOrg(ctx context.Context) (org *core.Identity, err error)
VerifyIdentityChain(ctx context.Context, identity *core.Identity) (immediateParent *core.Identity, retryable bool, err error)
ValidateNodeOwner(ctx context.Context, node *core.Identity, identity *core.Identity) (valid bool, err error)
Expand Down Expand Up @@ -114,14 +115,19 @@ func (im *identityManager) GetLocalNode(ctx context.Context) (node *core.Identit
return node, err
}

func (im *identityManager) GetRootOrg(ctx context.Context) (org *core.Identity, err error) {
func (im *identityManager) GetRootOrgDID(ctx context.Context) (string, error) {
orgName := im.multiparty.RootOrg().Name
if orgName != "" {
orgDID := fmt.Sprintf("%s%s", core.FireFlyOrgDIDPrefix, orgName)
org, _, err = im.CachedIdentityLookupNilOK(ctx, orgDID)
return orgDID, nil
}
if err == nil && org == nil {
return nil, i18n.NewError(ctx, coremsgs.MsgLocalOrgNotSet)
return "", i18n.NewError(ctx, coremsgs.MsgLocalOrgNotSet)
}

func (im *identityManager) GetRootOrg(ctx context.Context) (org *core.Identity, err error) {
orgDID, err := im.GetRootOrgDID(ctx)
if err == nil {
org, _, err = im.CachedIdentityLookupMustExist(ctx, orgDID)
}
return org, err
}
Expand Down
16 changes: 16 additions & 0 deletions internal/identity/identitymanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1472,6 +1472,22 @@ func TestGetRootOrg(t *testing.T) {
mdi.AssertExpectations(t)
}

func TestGetRootOrgUnregistered(t *testing.T) {
ctx, im := newTestIdentityManager(t)
mmp := im.multiparty.(*multipartymocks.Manager)
mdi := im.database.(*databasemocks.Plugin)

mmp.On("RootOrg").Return(multiparty.RootOrg{Name: "org1"})
mdi.On("GetIdentityByDID", ctx, "ns1", "did:firefly:org/org1").Return(nil, nil)
mmp.On("GetNetworkVersion").Return(2)

_, err := im.GetRootOrg(ctx)
assert.Regexp(t, "FF10277", err)

mmp.AssertExpectations(t)
mdi.AssertExpectations(t)
}

func TestParseKeyNormalizationConfig(t *testing.T) {
assert.Equal(t, KeyNormalizationBlockchainPlugin, ParseKeyNormalizationConfig("blockchain_Plugin"))
assert.Equal(t, KeyNormalizationNone, ParseKeyNormalizationConfig("none"))
Expand Down
Loading

0 comments on commit 5418e40

Please sign in to comment.