Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - v2alpha1 malfeasance API #6353

Closed
wants to merge 18 commits into from
Closed
4 changes: 2 additions & 2 deletions activation/handler_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (h *HandlerV1) syntacticallyValidateDeps(
if err := identities.SetMalicious(h.cdb, atx.SmesherID, encodedProof, time.Now()); err != nil {
return 0, 0, nil, fmt.Errorf("adding malfeasance proof: %w", err)
}
h.cdb.CacheMalfeasanceProof(atx.SmesherID, proof)
h.cdb.CacheMalfeasanceProof(atx.SmesherID, encodedProof)
h.tortoise.OnMalfeasance(atx.SmesherID)
return 0, 0, proof, nil
}
Expand Down Expand Up @@ -495,7 +495,7 @@ func (h *HandlerV1) storeAtx(

atxs.AtxAdded(h.cdb, atx)
if proof != nil {
h.cdb.CacheMalfeasanceProof(atx.SmesherID, proof)
h.cdb.CacheMalfeasanceProof(atx.SmesherID, codec.MustEncode(proof))
h.tortoise.OnMalfeasance(atx.SmesherID)
}

Expand Down
39 changes: 39 additions & 0 deletions activation/malfeasance.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strconv"

"github.com/prometheus/client_golang/prometheus"
"github.com/spacemeshos/post/shared"
Expand Down Expand Up @@ -44,6 +45,19 @@ func NewMalfeasanceHandler(
}
}

func (mh *MalfeasanceHandler) Info(data wire.ProofData) (map[string]string, error) {
ap, ok := data.(*wire.AtxProof)
if !ok {
return nil, errors.New("wrong message type for multiple ATXs")
}
return map[string]string{
"atx1": ap.Messages[0].InnerMsg.MsgHash.String(),
"atx2": ap.Messages[1].InnerMsg.MsgHash.String(),
"publish_epoch": strconv.FormatUint(uint64(ap.Messages[0].InnerMsg.PublishEpoch), 10),
"smesher_id": ap.Messages[0].SmesherID.String(),
}, nil
}

func (mh *MalfeasanceHandler) Validate(ctx context.Context, data wire.ProofData) (types.NodeID, error) {
ap, ok := data.(*wire.AtxProof)
if !ok {
Expand Down Expand Up @@ -109,6 +123,18 @@ func NewInvalidPostIndexHandler(
}
}

func (mh *InvalidPostIndexHandler) Info(data wire.ProofData) (map[string]string, error) {
pp, ok := data.(*wire.InvalidPostIndexProof)
if !ok {
return nil, errors.New("wrong message type for invalid post index")
}
return map[string]string{
"atx": pp.Atx.ID().String(),
"index": strconv.FormatUint(uint64(pp.InvalidIdx), 10),
"smesher_id": pp.Atx.SmesherID.String(),
}, nil
}

func (mh *InvalidPostIndexHandler) Validate(ctx context.Context, data wire.ProofData) (types.NodeID, error) {
proof, ok := data.(*wire.InvalidPostIndexProof)
if !ok {
Expand Down Expand Up @@ -174,6 +200,19 @@ func NewInvalidPrevATXHandler(
}
}

func (mh *InvalidPrevATXHandler) Info(data wire.ProofData) (map[string]string, error) {
pp, ok := data.(*wire.InvalidPrevATXProof)
if !ok {
return nil, errors.New("wrong message type for invalid previous ATX")
}
return map[string]string{
"atx1": pp.Atx1.ID().String(),
"atx2": pp.Atx2.ID().String(),
"prev_atx": pp.Atx1.PrevATXID.String(),
"smesher_id": pp.Atx1.SmesherID.String(),
}, nil
}

func (mh *InvalidPrevATXHandler) Validate(ctx context.Context, data wire.ProofData) (types.NodeID, error) {
proof, ok := data.(*wire.InvalidPrevATXProof)
if !ok {
Expand Down
5 changes: 3 additions & 2 deletions api/grpcserver/activation_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"google.golang.org/protobuf/types/known/emptypb"

"github.com/spacemeshos/go-spacemesh/api/grpcserver"
"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/sql"
Expand Down Expand Up @@ -169,7 +170,7 @@ func TestGet_IdentityCanceled(t *testing.T) {
}
atx.SetID(id)
atxProvider.EXPECT().GetAtx(id).Return(&atx, nil)
atxProvider.EXPECT().MalfeasanceProof(smesher).Return(proof, nil)
atxProvider.EXPECT().MalfeasanceProof(smesher).Return(codec.MustEncode(proof), nil)
atxProvider.EXPECT().Previous(id).Return([]types.ATXID{previous}, nil)

response, err := activationService.Get(context.Background(), &pb.GetRequest{Id: id.Bytes()})
Expand All @@ -184,5 +185,5 @@ func TestGet_IdentityCanceled(t *testing.T) {
require.Equal(t, previous.Bytes(), response.Atx.PreviousAtxs[0].Id)
require.Equal(t, atx.NumUnits, response.Atx.NumUnits)
require.Equal(t, atx.Sequence, response.Atx.Sequence)
require.Equal(t, events.ToMalfeasancePB(smesher, proof, false), response.MalfeasanceProof)
require.Equal(t, events.ToMalfeasancePB(smesher, codec.MustEncode(proof), false), response.MalfeasanceProof)
}
5 changes: 4 additions & 1 deletion api/grpcserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const (
TransactionV2Alpha1 Service = "transaction_v2alpha1"
TransactionStreamV2Alpha1 Service = "transaction_stream_v2alpha1"
AccountV2Alpha1 Service = "account_v2alpha1"
MalfeasanceV2Alpha1 Service = "malfeasance_v2alpha1"
MalfeasanceStreamV2Alpha1 Service = "malfeasance_stream_v2alpha1"
)

// DefaultConfig defines the default configuration options for api.
Expand All @@ -57,12 +59,13 @@ func DefaultConfig() Config {
PublicServices: []Service{
GlobalState, Mesh, Transaction, Node, Activation, ActivationV2Alpha1,
RewardV2Alpha1, NetworkV2Alpha1, NodeV2Alpha1, LayerV2Alpha1, TransactionV2Alpha1,
AccountV2Alpha1,
AccountV2Alpha1, MalfeasanceV2Alpha1,
},
PublicListener: "0.0.0.0:9092",
PrivateServices: []Service{
Admin, Smesher, Debug, ActivationStreamV2Alpha1,
RewardStreamV2Alpha1, LayerStreamV2Alpha1, TransactionStreamV2Alpha1,
MalfeasanceStreamV2Alpha1,
},
PrivateListener: "127.0.0.1:9093",
PostServices: []Service{Post, PostInfo},
Expand Down
4 changes: 2 additions & 2 deletions api/grpcserver/globalstate_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestGlobalStateService(t *testing.T) {

_, err := c.AccountDataQuery(ctx, &pb.AccountDataQueryRequest{})
require.Error(t, err)
require.Contains(t, err.Error(), "`Filter` must be provided")
require.ErrorContains(t, err, "`Filter` must be provided")
})
t.Run("AccountDataQuery_MissingFlags", func(t *testing.T) {
t.Parallel()
Expand All @@ -91,7 +91,7 @@ func TestGlobalStateService(t *testing.T) {
},
})
require.Error(t, err)
require.Contains(t, err.Error(), "`Filter.AccountMeshDataFlags` must set at least one")
require.ErrorContains(t, err, "`Filter.AccountMeshDataFlags` must set at least one")
})
t.Run("AccountDataQuery_BadOffset", func(t *testing.T) {
t.Parallel()
Expand Down
3 changes: 1 addition & 2 deletions api/grpcserver/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/spacemeshos/go-spacemesh/activation"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/malfeasance/wire"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/p2p/peerinfo"
"github.com/spacemeshos/go-spacemesh/signing"
Expand Down Expand Up @@ -58,7 +57,7 @@ type atxProvider interface {
GetAtx(id types.ATXID) (*types.ActivationTx, error)
Previous(id types.ATXID) ([]types.ATXID, error)
MaxHeightAtx() (types.ATXID, error)
MalfeasanceProof(id types.NodeID) (*wire.MalfeasanceProof, error)
MalfeasanceProof(id types.NodeID) ([]byte, error)
}

type postState interface {
Expand Down
5 changes: 2 additions & 3 deletions api/grpcserver/mesh_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/malfeasance/wire"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
)
Expand Down Expand Up @@ -634,13 +633,13 @@ func (s *MeshService) MalfeasanceStream(
}

// first serve those already existed locally.
if err := s.cdb.IterateMalfeasanceProofs(func(id types.NodeID, mp *wire.MalfeasanceProof) error {
if err := s.cdb.IterateMalfeasanceProofs(func(id types.NodeID, proof []byte) error {
select {
case <-stream.Context().Done():
return nil
default:
res := &pb.MalfeasanceStreamResponse{
Proof: events.ToMalfeasancePB(id, mp, req.IncludeProof),
Proof: events.ToMalfeasancePB(id, proof, req.IncludeProof),
}
return stream.Send(res)
}
Expand Down
16 changes: 8 additions & 8 deletions api/grpcserver/mesh_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,7 @@ func BallotMalfeasance(tb testing.TB, db sql.Executor) (types.NodeID, *wire.Malf
Data: &bp,
},
}
data, err := codec.Encode(mp)
require.NoError(tb, err)
require.NoError(tb, identities.SetMalicious(db, sig.NodeID(), data, time.Now()))
require.NoError(tb, identities.SetMalicious(db, sig.NodeID(), codec.MustEncode(mp), time.Now()))
return sig.NodeID(), mp
}

Expand Down Expand Up @@ -176,7 +174,7 @@ func TestMeshService_MalfeasanceQuery(t *testing.T) {
require.Equal(t, nodeID, types.BytesToNodeID(resp.Proof.SmesherId.Id))
require.EqualValues(t, layer, resp.Proof.Layer.Number)
require.Equal(t, pb.MalfeasanceProof_MALFEASANCE_BALLOT, resp.Proof.Kind)
require.Equal(t, events.ToMalfeasancePB(nodeID, proof, true), resp.Proof)
require.Equal(t, events.ToMalfeasancePB(nodeID, codec.MustEncode(proof), true), resp.Proof)
require.NotEmpty(t, resp.Proof.Proof)
var got wire.MalfeasanceProof
require.NoError(t, codec.Decode(resp.Proof.Proof, &got))
Expand Down Expand Up @@ -247,15 +245,17 @@ func TestMeshService_MalfeasanceStream(t *testing.T) {
require.Equal(t, 10, hare)

id, proof := AtxMalfeasance(t, db)
events.ReportMalfeasance(id, proof)
proofBytes := codec.MustEncode(proof)
events.ReportMalfeasance(id, proofBytes)
resp, err := stream.Recv()
require.NoError(t, err)
require.Equal(t, events.ToMalfeasancePB(id, proof, false), resp.Proof)
require.Equal(t, events.ToMalfeasancePB(id, proofBytes, false), resp.Proof)
id, proof = BallotMalfeasance(t, db)
events.ReportMalfeasance(id, proof)
proofBytes = codec.MustEncode(proof)
events.ReportMalfeasance(id, proofBytes)
resp, err = stream.Recv()
require.NoError(t, err)
require.Equal(t, events.ToMalfeasancePB(id, proof, false), resp.Proof)
require.Equal(t, events.ToMalfeasancePB(id, proofBytes, false), resp.Proof)
}

type MeshAPIMockInstrumented struct {
Expand Down
11 changes: 5 additions & 6 deletions api/grpcserver/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/grpcserver/v2alpha1/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type testAccount struct {
}

func TestAccountService_List(t *testing.T) {
db := statesql.InMemory()
db := statesql.InMemoryTest(t)

ctrl, ctx := gomock.WithContext(context.Background(), t)
conState := NewMockaccountConState(ctrl)
Expand Down
47 changes: 28 additions & 19 deletions api/grpcserver/v2alpha1/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,30 +67,14 @@ func (s *ActivationStreamService) Stream(
}
}

dbChan := make(chan *types.ActivationTx, 100)
errChan := make(chan error, 1)

ops, err := toAtxOperations(toAtxRequest(request))
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}

// send db data to chan to avoid buffer overflow
go func() {
defer close(dbChan)
if err := atxs.IterateAtxsOps(s.db, ops, func(atx *types.ActivationTx) bool {
select {
case dbChan <- atx:
return true
case <-ctx.Done():
// exit if the stream context is canceled
return false
}
}); err != nil {
errChan <- status.Error(codes.Internal, err.Error())
return
}
}()
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()
dbChan, errChan := s.fetchFromDB(ctx, ops)

var eventsOut <-chan events.ActivationTx
var eventsFull <-chan struct{}
Expand Down Expand Up @@ -145,6 +129,31 @@ func (s *ActivationStreamService) Stream(
}
}

func (s *ActivationStreamService) fetchFromDB(
ctx context.Context,
ops builder.Operations,
) (<-chan *types.ActivationTx, <-chan error) {
dbChan := make(chan *types.ActivationTx)
errChan := make(chan error, 1) // buffered to avoid blocking, routine should exit immediately after sending an error

go func() {
defer close(dbChan)
if err := atxs.IterateAtxsOps(s.db, ops, func(atx *types.ActivationTx) bool {
select {
case dbChan <- atx:
return true
case <-ctx.Done():
// exit if the context is canceled
return false
}
}); err != nil {
errChan <- status.Error(codes.Internal, err.Error())
}
}()

return dbChan, errChan
}

func toAtx(atx *types.ActivationTx) *spacemeshv2alpha1.Activation {
return &spacemeshv2alpha1.Activation{
Id: atx.ID().Bytes(),
Expand Down
Loading
Loading