From 7d1e92175c5dc7bd49c3a099c77cd42b3efcc48e Mon Sep 17 00:00:00 2001 From: b5 Date: Sat, 24 Aug 2019 19:59:26 -0400 Subject: [PATCH 1/4] feat(dsync remove): added hooks, remove, and meta params to dsync having optional delete requests bumps the usefulness of dsync, I've also added a few extra hooks, and meta params in a few spots --- dsync/dsync.go | 81 +++++++++++++++++++++++++++++++++++++--------- dsync/http.go | 80 ++++++++++++++++++++++++++++++++++++++++++--- dsync/p2p.go | 77 ++++++++++++++++++++++++++++++++++++++----- dsync/pull.go | 9 +++--- dsync/pull_test.go | 2 +- go.mod | 1 + go.sum | 4 +++ 7 files changed, 221 insertions(+), 33 deletions(-) diff --git a/dsync/dsync.go b/dsync/dsync.go index a658ecb..fd7a2f5 100644 --- a/dsync/dsync.go +++ b/dsync/dsync.go @@ -63,11 +63,20 @@ type DagSyncable interface { // GetDagInfo asks the remote for info specified by a the root identifier // string of a DAG - GetDagInfo(ctx context.Context, cidStr string) (info *dag.Info, err error) + GetDagInfo(ctx context.Context, cidStr string, meta map[string]string) (info *dag.Info, err error) // GetBlock gets a block of data from the remote GetBlock(ctx context.Context, hash string) (rawdata []byte, err error) + + // RemoveCID asks the remote to remove a cid. Supporting deletes are optional. + // DagSyncables that don't implement DeleteCID must return + // ErrDeleteNotSupported + RemoveCID(ctx context.Context, cidStr string, meta map[string]string) (err error) } +// ErrRemoveNotSupported is the error value returned by remotes that don't +// support delete operations +var ErrRemoveNotSupported = fmt.Errorf("remove is not supported") + // Hook is a function that a dsync instance will call at specified points in the // sync lifecycle type Hook func(ctx context.Context, info dag.Info, meta map[string]string) error @@ -110,16 +119,23 @@ type Dsync struct { // struct for accepting p2p dsync requests p2pHandler *p2pHandler + // requireAllBlocks forces pushes to send *all* blocks, + // skipping manifest diffing + requireAllBlocks bool + // should dsync honor remove requests? + allowRemoves bool + // preCheck is called before creating a receive session preCheck Hook // dagFinalCheck is called before finalizing a receive session finalCheck Hook // onCompleteHook is optionally called once dag sync is complete onCompleteHook Hook - - // requireAllBlocks forces pushes to send *all* blocks, - // skipping manifest diffing - requireAllBlocks bool + // getDagInfoCheck is an optional hook to call when a client asks for a dag + // info + getDagInfoCheck Hook + // removeCheck is an optional hook to call before allowing a delete + removeCheck Hook // inbound transfers in progress, will be nil if not acting as a remote sessionLock sync.Mutex @@ -141,6 +157,15 @@ type Config struct { // to send & push over libp2p connections, provide a libp2p host Libp2pHost host.Host + // RequireAllBlocks will skip checking for blocks already present on the + // remote, requiring push requests to send all blocks each time + // This is a helpful override if the receiving node can't distinguish between + // local and network block access, as with the ipfs-http-api intreface + RequireAllBlocks bool + // AllowRemoves let's dsync opt into remove requests. removes are + // disabled by default + AllowRemoves bool + // PinAPI is required for remotes to accept PinAPI coreiface.PinAPI // required check function for a remote accepting DAGs @@ -149,12 +174,6 @@ type Config struct { FinalCheck Hook // optional check function called after successful transfer OnComplete Hook - - // RequireAllBlocks will skip checking for blocks already present on the - // remote, requiring push requests to send all blocks each time - // This is a helpful override if the receiving node can't distinguish between - // local and network block access, as with the ipfs-http-api intreface - RequireAllBlocks bool } // Validate confirms the configuration is valid @@ -303,12 +322,12 @@ func (ds *Dsync) NewPushInfo(info *dag.Info, remoteAddr string, pinOnComplete bo // NewPull creates a pull. A pull fetches an entire DAG from a remote, placing // it in the local block store -func (ds *Dsync) NewPull(cidStr, remoteAddr string) (*Pull, error) { +func (ds *Dsync) NewPull(cidStr, remoteAddr string, meta map[string]string) (*Pull, error) { rem, err := ds.syncableRemote(remoteAddr) if err != nil { return nil, err } - return NewPull(cidStr, ds.lng, ds.bapi, rem) + return NewPull(cidStr, ds.lng, ds.bapi, rem, meta) } // NewReceiveSession takes a manifest sent by a remote and initiates a @@ -419,7 +438,7 @@ func (ds *Dsync) finalizeReceive(sess *session) error { } // GetDagInfo gets the manifest for a DAG rooted at id, checking any configured cache before falling back to generating a new manifest -func (ds *Dsync) GetDagInfo(ctx context.Context, hash string) (info *dag.Info, err error) { +func (ds *Dsync) GetDagInfo(ctx context.Context, hash string, meta map[string]string) (info *dag.Info, err error) { // check cache if one is specified if ds.infoStore != nil { if info, err = ds.infoStore.DAGInfo(ctx, hash); err == nil { @@ -433,7 +452,18 @@ func (ds *Dsync) GetDagInfo(ctx context.Context, hash string) (info *dag.Info, e return nil, err } - return dag.NewInfo(ctx, ds.lng, id) + info, err = dag.NewInfo(ctx, ds.lng, id) + if err != nil { + return nil, err + } + + if ds.getDagInfoCheck != nil { + if err = ds.getDagInfoCheck(ctx, *info, meta); err != nil { + return nil, err + } + } + + return info, nil } // GetBlock returns a single block from the store @@ -445,3 +475,24 @@ func (ds *Dsync) GetBlock(ctx context.Context, hash string) ([]byte, error) { return ioutil.ReadAll(rdr) } + +// RemoveCID unpins a CID if removes are enabled, does not immideately remove +// unpinned content +func (ds *Dsync) RemoveCID(ctx context.Context, cidStr string, meta map[string]string) error { + if !ds.allowRemoves { + return ErrRemoveNotSupported + } + + if ds.removeCheck != nil { + info := dag.Info{Manifest: &dag.Manifest{Nodes: []string{cidStr}}} + if err := ds.removeCheck(ctx, info, meta); err != nil { + return err + } + } + + if ds.pin != nil { + return ds.pin.Rm(ctx, path.New(cidStr)) + } + + return nil +} diff --git a/dsync/http.go b/dsync/http.go index e0d38e5..9909390 100644 --- a/dsync/http.go +++ b/dsync/http.go @@ -107,9 +107,19 @@ func (rem *HTTPClient) ReceiveBlock(sid, hash string, data []byte) ReceiveRespon } // GetDagInfo fetches a manifest from a remote source over HTTP -func (rem *HTTPClient) GetDagInfo(ctx context.Context, id string) (info *dag.Info, err error) { - url := fmt.Sprintf("%s?manifest=%s", rem.URL, id) - req, err := http.NewRequest("GET", url, nil) +func (rem *HTTPClient) GetDagInfo(ctx context.Context, id string, meta map[string]string) (info *dag.Info, err error) { + u, err := url.Parse(rem.URL) + if err != nil { + return + } + q := u.Query() + q.Set("manifest", id) + for key, val := range meta { + q.Set(key, val) + } + u.RawQuery = q.Encode() + + req, err := http.NewRequest("GET", u.String(), nil) if err != nil { return nil, err } @@ -158,6 +168,43 @@ func (rem *HTTPClient) GetBlock(ctx context.Context, id string) (data []byte, er return ioutil.ReadAll(res.Body) } +// RemoveCID asks a remote to remove a CID +func (rem *HTTPClient) RemoveCID(ctx context.Context, id string, meta map[string]string) (err error) { + u, err := url.Parse(rem.URL) + if err != nil { + return + } + q := u.Query() + q.Set("manifest", id) + for key, val := range meta { + q.Set(key, val) + } + u.RawQuery = q.Encode() + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return err + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + + if res.StatusCode != http.StatusOK { + var msg string + if data, err := ioutil.ReadAll(res.Body); err == nil { + msg = string(data) + } + if msg == ErrRemoveNotSupported.Error() { + return ErrRemoveNotSupported + } + return fmt.Errorf("remote: %d %s", res.StatusCode, msg) + } + + return nil +} + // HTTPRemoteHandler exposes a Dsync remote over HTTP by exposing a HTTP handler // that interlocks with methods exposed by HTTPClient func HTTPRemoteHandler(ds *Dsync) http.HandlerFunc { @@ -178,7 +225,6 @@ func HTTPRemoteHandler(ds *Dsync) http.HandlerFunc { return } - log.Debug("new receive via HTTP", r.URL.String()) pinOnComplete := r.FormValue("pin") == "true" meta := map[string]string{} for key := range r.URL.Query() { @@ -224,7 +270,15 @@ func HTTPRemoteHandler(ds *Dsync) http.HandlerFunc { w.WriteHeader(http.StatusBadRequest) w.Write([]byte("either manifest or block query params are required")) } else if mfstID != "" { - mfst, err := ds.GetDagInfo(r.Context(), mfstID) + + meta := map[string]string{} + for key := range r.URL.Query() { + if key != "manifest" { + meta[key] = r.URL.Query().Get(key) + } + } + + mfst, err := ds.GetDagInfo(r.Context(), mfstID, meta) if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) @@ -250,6 +304,22 @@ func HTTPRemoteHandler(ds *Dsync) http.HandlerFunc { w.Header().Set("Content-Type", "application/octet-stream") w.Write(data) } + case "DELETE": + cid := r.FormValue("cid") + meta := map[string]string{} + for key := range r.URL.Query() { + if key != "cid" { + meta[key] = r.URL.Query().Get(key) + } + } + + if err := ds.RemoveCID(r.Context(), cid, meta); err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + return + } + + w.WriteHeader(http.StatusOK) } } } diff --git a/dsync/p2p.go b/dsync/p2p.go index 2f852ad..2bca754 100644 --- a/dsync/p2p.go +++ b/dsync/p2p.go @@ -32,6 +32,8 @@ var ( mtGetDagInfo = p2putil.MsgType("get_daginfo") // mtGetBlock identifies the "get_block" message type mtGetBlock = p2putil.MsgType("get_block") + // mtRemoveCID identifies the "remove_cid" message type + mtRemoveCID = p2putil.MsgType("remove_cid") ) type p2pClient struct { @@ -106,10 +108,14 @@ func (c *p2pClient) ReceiveBlock(sid, cidStr string, data []byte) ReceiveRespons // GetDagInfo asks the remote for info specified by a the root identifier // string of a DAG -func (c *p2pClient) GetDagInfo(ctx context.Context, cidStr string) (info *dag.Info, err error) { - msg := p2putil.NewMessage(c.host.ID(), mtGetDagInfo, nil).WithHeaders( - "cid", cidStr, - ) +func (c *p2pClient) GetDagInfo(ctx context.Context, cidStr string, meta map[string]string) (info *dag.Info, err error) { + + headers := []string{"phase", "request", "cid", cidStr} + for key, val := range meta { + headers = append(headers, key, val) + } + + msg := p2putil.NewMessage(c.host.ID(), mtGetDagInfo, nil).WithHeaders(headers...) res, err := c.sendMessage(ctx, msg, c.remotePeerID) if err != nil { @@ -133,6 +139,27 @@ func (c *p2pClient) GetBlock(ctx context.Context, cidStr string) (rawdata []byte return res.Body, nil } +// RemoveCID asks the remote to remove a CID +func (c *p2pClient) RemoveCID(ctx context.Context, cidStr string, meta map[string]string) (err error) { + headers := []string{"phase", "request", "cid", cidStr} + for key, val := range meta { + headers = append(headers, key, val) + } + + msg := p2putil.NewMessage(c.host.ID(), mtRemoveCID, nil).WithHeaders(headers...) + + res, err := c.sendMessage(ctx, msg, c.remotePeerID) + if err != nil { + return err + } + + if e := res.Header("error"); e != "" { + return fmt.Errorf(e) + } + + return nil +} + // p2pHandler implements dsync as a libp2p protocol handler type p2pHandler struct { dsync *Dsync @@ -140,9 +167,6 @@ type p2pHandler struct { handlers map[p2putil.MsgType]p2putil.HandlerFunc } -// assert at compile time that p2pHandler implements DagSyncable -// var _ DagSyncable = (*p2pHandler)(nil) - // newp2pHandler creates a p2p remote stream handler from a dsync.Remote func newp2pHandler(dsync *Dsync, host host.Host) *p2pHandler { c := &p2pHandler{dsync: dsync, host: host} @@ -151,6 +175,7 @@ func newp2pHandler(dsync *Dsync, host host.Host) *p2pHandler { mtReceiveBlock: c.HandleReceiveBlock, mtGetDagInfo: c.HandleReqManifest, mtGetBlock: c.HandleGetBlock, + mtRemoveCID: c.HandleRemoveCID, } return c } @@ -310,8 +335,15 @@ func (c *p2pHandler) HandleReqManifest(ws *p2putil.WrappedStream, msg p2putil.Me cidStr := msg.Header("cid") res := msg.WithHeaders("phase", "response") + meta := map[string]string{} + for key, val := range msg.Headers { + if key != "cid" && key != "phase" { + meta[key] = val + } + } + // TODO (b5): pass a context into here - if di, err := c.dsync.GetDagInfo(context.Background(), cidStr); err != nil { + if di, err := c.dsync.GetDagInfo(context.Background(), cidStr, meta); err != nil { res = res.WithHeaders("error", err.Error()) } else { data, err := di.MarshalCBOR() @@ -345,3 +377,32 @@ func (c *p2pHandler) HandleGetBlock(ws *p2putil.WrappedStream, msg p2putil.Messa } return false } + +// HandleRemoveCID removes a CID on the remote +func (c *p2pHandler) HandleRemoveCID(ws *p2putil.WrappedStream, msg p2putil.Message) (hangup bool) { + if msg.Header("phase") == "request" { + + cid := msg.Header("cid") + meta := map[string]string{} + for key, val := range msg.Headers { + if key != "pin" && key != "phase" { + meta[key] = val + } + } + + res := msg.WithHeaders( + "phase", "response", + "cid", cid, + ) + + if err := c.dsync.RemoveCID(context.Background(), cid, meta); err != nil { + res = msg.WithHeaders("error", err.Error()) + } + + if err := ws.SendMessage(res); err != nil { + return true + } + } + + return false +} diff --git a/dsync/pull.go b/dsync/pull.go index 9702865..06fd4b6 100644 --- a/dsync/pull.go +++ b/dsync/pull.go @@ -12,7 +12,7 @@ import ( ) // NewPull sets up fetching a DAG at an id from a remote -func NewPull(cidStr string, lng ipld.NodeGetter, bapi coreiface.BlockAPI, rem DagSyncable) (pull *Pull, err error) { +func NewPull(cidStr string, lng ipld.NodeGetter, bapi coreiface.BlockAPI, rem DagSyncable, meta map[string]string) (pull *Pull, err error) { f := &Pull{ path: cidStr, lng: lng, @@ -28,8 +28,8 @@ func NewPull(cidStr string, lng ipld.NodeGetter, bapi coreiface.BlockAPI, rem Da } // NewPullWithInfo creates a pull when we already have a dag.Info -func NewPullWithInfo(info *dag.Info, lng ipld.NodeGetter, bapi coreiface.BlockAPI, rem DagSyncable) (pull *Pull, err error) { - f, err := NewPull(info.RootCID().String(), lng, bapi, rem) +func NewPullWithInfo(info *dag.Info, lng ipld.NodeGetter, bapi coreiface.BlockAPI, rem DagSyncable, meta map[string]string) (pull *Pull, err error) { + f, err := NewPull(info.RootCID().String(), lng, bapi, rem, meta) if err != nil { return nil, err } @@ -40,6 +40,7 @@ func NewPullWithInfo(info *dag.Info, lng ipld.NodeGetter, bapi coreiface.BlockAP // Pull coordinates the transfer of missing blocks in a DAG from a remote to a block store type Pull struct { path string + meta map[string]string info *dag.Info diff *dag.Manifest remote DagSyncable @@ -76,7 +77,7 @@ func (f *Pull) Do(ctx context.Context) (err error) { // - every time we receive a block, check if we're done if f.info == nil { // request a manifest from the remote if we don't have one - if f.info, err = f.remote.GetDagInfo(ctx, f.path); err != nil { + if f.info, err = f.remote.GetDagInfo(ctx, f.path, f.meta); err != nil { return } } diff --git a/dsync/pull_test.go b/dsync/pull_test.go index db406c4..bbe35d8 100644 --- a/dsync/pull_test.go +++ b/dsync/pull_test.go @@ -19,7 +19,7 @@ func TestPull(t *testing.T) { bapi: b.Block(), } - p, err := NewPull(id.String(), aGetter, a.Block(), rem) + p, err := NewPull(id.String(), aGetter, a.Block(), rem, nil) if err != nil { t.Fatal(err) } diff --git a/go.mod b/go.mod index 9b14d65..1fafbb4 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/libp2p/go-libp2p-protocol v0.0.1 github.com/multiformats/go-multicodec v0.1.6 github.com/multiformats/go-multihash v0.0.5 + github.com/prometheus/common v0.4.0 github.com/spf13/cobra v0.0.2 github.com/ugorji/go/codec v1.1.5-pre google.golang.org/appengine v1.4.0 // indirect diff --git a/go.sum b/go.sum index 97b287c..f8f7d95 100644 --- a/go.sum +++ b/go.sum @@ -15,7 +15,9 @@ github.com/Stebalien/go-bitfield v0.0.0-20180330043415-076a62f9ce6e/go.mod h1:3o github.com/Stebalien/go-bitfield v0.0.1 h1:X3kbSSPUaJK60wV2hjOPZwmpljr6VGCqdq4cBLhbQBo= github.com/Stebalien/go-bitfield v0.0.1/go.mod h1:GNjFpasyUVkHMsfEOk8EFLJ9syQ6SI+XWrX9Wf2XH0s= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= @@ -583,6 +585,7 @@ github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q github.com/shurcooL/go v0.0.0-20180423040247-9e1955d9fb6e/go.mod h1:TDJrrUr11Vxrven61rcy3hJMUqaf/CLWYhHNPmT14Lk= github.com/shurcooL/go-goon v0.0.0-20170922171312-37c2f522c041/go.mod h1:N5mDOmsrJOB+vfqUK+7DmDyjhSLIIBnXo9lvZJj3MWQ= github.com/sirupsen/logrus v1.0.5/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= +github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= @@ -759,6 +762,7 @@ google.golang.org/genproto v0.0.0-20180831171423-11092d34479b/go.mod h1:JiN7NxoA google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= +gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= From f3c7d670734cbc78959ef38b55b77c33c520919b Mon Sep 17 00:00:00 2001 From: b5 Date: Sat, 24 Aug 2019 20:06:56 -0400 Subject: [PATCH 2/4] refactor(dsync): rename hooks also, like, wire 'em up. --- dsync/dsync.go | 35 ++++++++++++++++++++++------------- dsync/dsync_test.go | 2 +- dsync/http_test.go | 4 ++-- dsync/p2p_test.go | 2 +- 4 files changed, 26 insertions(+), 17 deletions(-) diff --git a/dsync/dsync.go b/dsync/dsync.go index fd7a2f5..1c5397c 100644 --- a/dsync/dsync.go +++ b/dsync/dsync.go @@ -156,6 +156,8 @@ type Config struct { HTTPRemoteAddress string // to send & push over libp2p connections, provide a libp2p host Libp2pHost host.Host + // PinAPI is required for remotes to accept pinning requests + PinAPI coreiface.PinAPI // RequireAllBlocks will skip checking for blocks already present on the // remote, requiring push requests to send all blocks each time @@ -166,22 +168,27 @@ type Config struct { // disabled by default AllowRemoves bool - // PinAPI is required for remotes to accept - PinAPI coreiface.PinAPI - // required check function for a remote accepting DAGs - PreCheck Hook + // required check function for a remote accepting DAGs, this hook will be + // called before a push is allowed to begin + PushPreCheck Hook // optional check function for screening a receive before potentially pinning - FinalCheck Hook + PushFinalCheck Hook // optional check function called after successful transfer - OnComplete Hook + PushComplete Hook + // optional check to run on dagInfo requests before sending an info back + GetDagInfoCheck Hook + // optional check to run before executing a remove operation + // the dag.Info given to this check will only contain the root CID being + // removed + RemoveCheck Hook } // Validate confirms the configuration is valid func (cfg *Config) Validate() error { - if cfg.PreCheck == nil { + if cfg.PushPreCheck == nil { return fmt.Errorf("PreCheck is required") } - if cfg.FinalCheck == nil { + if cfg.PushFinalCheck == nil { return fmt.Errorf("FinalCheck is required") } return nil @@ -201,8 +208,8 @@ func OptLibp2pHost(host host.Host) func(cfg *Config) { // to get an offline-only node getter from an ipfs CoreAPI interface func New(localNodes ipld.NodeGetter, blockStore coreiface.BlockAPI, opts ...func(cfg *Config)) (*Dsync, error) { cfg := &Config{ - PreCheck: DefaultDagPrecheck, - FinalCheck: DefaultDagFinalCheck, + PushPreCheck: DefaultDagPrecheck, + PushFinalCheck: DefaultDagFinalCheck, } for _, opt := range opts { @@ -217,9 +224,11 @@ func New(localNodes ipld.NodeGetter, blockStore coreiface.BlockAPI, opts ...func lng: localNodes, bapi: blockStore, - preCheck: cfg.PreCheck, - finalCheck: cfg.FinalCheck, - onCompleteHook: cfg.OnComplete, + preCheck: cfg.PushPreCheck, + finalCheck: cfg.PushFinalCheck, + onCompleteHook: cfg.PushComplete, + getDagInfoCheck: cfg.GetDagInfoCheck, + removeCheck: cfg.RemoveCheck, requireAllBlocks: cfg.RequireAllBlocks, sessionPool: map[string]*session{}, diff --git a/dsync/dsync_test.go b/dsync/dsync_test.go index 471a496..530918a 100644 --- a/dsync/dsync_test.go +++ b/dsync/dsync_test.go @@ -62,7 +62,7 @@ func ExampleNew() { // we MUST override the PreCheck function. In this example we're making sure // no one sends us a bad hash: - cfg.PreCheck = func(ctx context.Context, info dag.Info, _ map[string]string) error { + cfg.PushPreCheck = func(ctx context.Context, info dag.Info, _ map[string]string) error { if info.Manifest.Nodes[0] == "BadHash" { return fmt.Errorf("rejected for secret reasons") } diff --git a/dsync/http_test.go b/dsync/http_test.go index e0882cf..d3bd4f6 100644 --- a/dsync/http_test.go +++ b/dsync/http_test.go @@ -49,8 +49,8 @@ func TestSyncHTTP(t *testing.T) { bGetter := &dag.NodeGetter{Dag: b.Dag()} ts, err := New(bGetter, b.Block(), func(cfg *Config) { - cfg.PreCheck = func(context.Context, dag.Info, map[string]string) error { return nil } - cfg.OnComplete = onCompleteHook + cfg.PushPreCheck = func(context.Context, dag.Info, map[string]string) error { return nil } + cfg.PushComplete = onCompleteHook }) if err != nil { t.Fatal(err) diff --git a/dsync/p2p_test.go b/dsync/p2p_test.go index 2de142f..05dcc9c 100644 --- a/dsync/p2p_test.go +++ b/dsync/p2p_test.go @@ -53,7 +53,7 @@ func TestNewP2P(t *testing.T) { // we MUST override the PreCheck function. In this example we're making sure // no one sends us a bad hash: - cfg.PreCheck = func(ctx context.Context, info dag.Info, _ map[string]string) error { + cfg.PushPreCheck = func(ctx context.Context, info dag.Info, _ map[string]string) error { if info.Manifest.Nodes[0] == "BadHash" { return fmt.Errorf("rejected for secret reasons") } From e64a8bdf6fb2213a1ee0057ec4561d6a61cbf154 Mon Sep 17 00:00:00 2001 From: b5 Date: Sun, 25 Aug 2019 10:31:07 -0400 Subject: [PATCH 3/4] test(dsync): add test for remove hooks over HTTP found a few mistakes in the process --- dsync/dsync.go | 11 +++++--- dsync/http.go | 4 +-- dsync/http_test.go | 70 +++++++++++++++++++--------------------------- dsync/p2p_test.go | 2 +- 4 files changed, 39 insertions(+), 48 deletions(-) diff --git a/dsync/dsync.go b/dsync/dsync.go index 1c5397c..2e09f74 100644 --- a/dsync/dsync.go +++ b/dsync/dsync.go @@ -224,16 +224,18 @@ func New(localNodes ipld.NodeGetter, blockStore coreiface.BlockAPI, opts ...func lng: localNodes, bapi: blockStore, + requireAllBlocks: cfg.RequireAllBlocks, + allowRemoves: cfg.AllowRemoves, + preCheck: cfg.PushPreCheck, finalCheck: cfg.PushFinalCheck, onCompleteHook: cfg.PushComplete, getDagInfoCheck: cfg.GetDagInfoCheck, removeCheck: cfg.RemoveCheck, - requireAllBlocks: cfg.RequireAllBlocks, - sessionPool: map[string]*session{}, - sessionCancels: map[string]context.CancelFunc{}, - sessionTTLDur: time.Hour * 5, + sessionPool: map[string]*session{}, + sessionCancels: map[string]context.CancelFunc{}, + sessionTTLDur: time.Hour * 5, } if cfg.PinAPI != nil { @@ -492,6 +494,7 @@ func (ds *Dsync) RemoveCID(ctx context.Context, cidStr string, meta map[string]s return ErrRemoveNotSupported } + log.Debug("removing cid", cidStr) if ds.removeCheck != nil { info := dag.Info{Manifest: &dag.Manifest{Nodes: []string{cidStr}}} if err := ds.removeCheck(ctx, info, meta); err != nil { diff --git a/dsync/http.go b/dsync/http.go index 9909390..453cd3a 100644 --- a/dsync/http.go +++ b/dsync/http.go @@ -175,13 +175,13 @@ func (rem *HTTPClient) RemoveCID(ctx context.Context, id string, meta map[string return } q := u.Query() - q.Set("manifest", id) + q.Set("cid", id) for key, val := range meta { q.Set(key, val) } u.RawQuery = q.Encode() - req, err := http.NewRequest("GET", u.String(), nil) + req, err := http.NewRequest("DELETE", u.String(), nil) if err != nil { return err } diff --git a/dsync/http_test.go b/dsync/http_test.go index d3bd4f6..4794486 100644 --- a/dsync/http_test.go +++ b/dsync/http_test.go @@ -1,19 +1,14 @@ package dsync import ( - "bytes" "context" "io/ioutil" "net/http/httptest" "strings" "testing" - "github.com/qri-io/dag" - - "github.com/ipfs/go-cid" files "github.com/ipfs/go-ipfs-files" - ipld "github.com/ipfs/go-ipld-format" - coreiface "github.com/ipfs/interface-go-ipfs-core" + "github.com/qri-io/dag" ) func TestSyncHTTP(t *testing.T) { @@ -29,6 +24,7 @@ func TestSyncHTTP(t *testing.T) { t.Fatal(err) } + // yooooooooooooooooooooo f := files.NewReaderFile(ioutil.NopCloser(strings.NewReader("y" + strings.Repeat("o", 350)))) path, err := a.Unixfs().Add(ctx, f) if err != nil { @@ -47,16 +43,24 @@ func TestSyncHTTP(t *testing.T) { return nil } + removeCheckCalled := make(chan struct{}, 1) + removeCheckHook := func(_ context.Context, _ dag.Info, _ map[string]string) error { + removeCheckCalled <- struct{}{} + return nil + } + bGetter := &dag.NodeGetter{Dag: b.Dag()} - ts, err := New(bGetter, b.Block(), func(cfg *Config) { + bdsync, err := New(bGetter, b.Block(), func(cfg *Config) { + cfg.AllowRemoves = true cfg.PushPreCheck = func(context.Context, dag.Info, map[string]string) error { return nil } cfg.PushComplete = onCompleteHook + cfg.RemoveCheck = removeCheckHook }) if err != nil { t.Fatal(err) } - s := httptest.NewServer(HTTPRemoteHandler(ts)) + s := httptest.NewServer(HTTPRemoteHandler(bdsync)) defer s.Close() cli := &HTTPClient{URL: s.URL + "/dsync"} @@ -77,49 +81,33 @@ func TestSyncHTTP(t *testing.T) { } <-onCompleteCalled -} -// remote implements the Remote interface on a single receive session at a time -type remote struct { - receive *session - lng ipld.NodeGetter - bapi coreiface.BlockAPI -} - -func (r *remote) PushStart(info *dag.Info) (sid string, diff *dag.Manifest, err error) { - ctx := context.Background() - r.receive, err = newSession(ctx, r.lng, r.bapi, info, false, false, nil) - if err != nil { - return + if err := cli.RemoveCID(ctx, info.RootCID().String(), nil); err != nil { + t.Error(err) } - sid = r.receive.id - diff = r.receive.diff - return -} -func (r *remote) PushBlock(sid, hash string, data []byte) ReceiveResponse { - return r.receive.ReceiveBlock(hash, bytes.NewReader(data)) + <-removeCheckCalled } -func (r *remote) PullManifest(ctx context.Context, hash string) (mfst *dag.Manifest, err error) { - id, err := cid.Parse(hash) - if err != nil { - return nil, err - } - - return dag.NewManifest(ctx, r.lng, id) -} +func TestRemoveNotSupported(t *testing.T) { + ctx := context.Background() -func (r *remote) GetBlock(ctx context.Context, hash string) ([]byte, error) { - id, err := cid.Parse(hash) + _, b, err := makeAPI(ctx) if err != nil { - return nil, err + t.Fatal(err) } - node, err := r.lng.Get(ctx, id) + bGetter := &dag.NodeGetter{Dag: b.Dag()} + bdsync, err := New(bGetter, b.Block()) if err != nil { - return nil, err + t.Fatal(err) } - return node.RawData(), nil + s := httptest.NewServer(HTTPRemoteHandler(bdsync)) + defer s.Close() + + cli := &HTTPClient{URL: s.URL + "/dsync"} + if err := cli.RemoveCID(ctx, "foo", nil); err != ErrRemoveNotSupported { + t.Errorf("expected error remoce not supported, got: %s", err.Error()) + } } diff --git a/dsync/p2p_test.go b/dsync/p2p_test.go index 05dcc9c..c7a0523 100644 --- a/dsync/p2p_test.go +++ b/dsync/p2p_test.go @@ -79,7 +79,7 @@ func TestNewP2P(t *testing.T) { t.Fatal(err) } - // We want to see progress, so we spin up a goroutine to listen for updates + // We want to see progress, so we spin up a goroutine to listen for updates waitForFmt := make(chan struct{}) go func() { updates := push.Updates() From 64f1cb134a081cea4d129740e15233ca06c55227 Mon Sep 17 00:00:00 2001 From: b5 Date: Mon, 26 Aug 2019 08:00:47 -0400 Subject: [PATCH 4/4] refactor(plugin): update plugin to new api --- dsync/dsync_ipfs_plugin/plugin.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dsync/dsync_ipfs_plugin/plugin.go b/dsync/dsync_ipfs_plugin/plugin.go index d77fcc9..a86a33f 100644 --- a/dsync/dsync_ipfs_plugin/plugin.go +++ b/dsync/dsync_ipfs_plugin/plugin.go @@ -128,7 +128,7 @@ func (p *DsyncPlugin) Start(capi coreiface.CoreAPI) error { // we MUST override the PreCheck function. In this example we're making sure // no one sends us a bad hash: - cfg.PreCheck = p.pushPreCheck + cfg.PushPreCheck = p.pushPreCheck // in order for remotes to allow pinning, dsync must be provided a PinAPI: cfg.PinAPI = capi.Pin() @@ -320,7 +320,7 @@ func newPullHandler(dsyncHost *dsync.Dsync) http.HandlerFunc { } fmt.Printf("performing pull:\n\tcid: %s\n\tremote: %s\n\tpin: %t\n", p.Cid, p.Addr, p.Pin) - pull, err := dsyncHost.NewPull(p.Cid, p.Addr) + pull, err := dsyncHost.NewPull(p.Cid, p.Addr, nil) if err != nil { fmt.Printf("error creating pull: %s\n", err.Error()) w.Write([]byte(err.Error()))