Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

381_wire authentication #405

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.43
version: v1.55

- name: Lint proto files
uses: plexsystems/[email protected]
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Exposure of protobuf converters & `SignedState`, abstraction of tests and bug fi
- Expose protobuf converters: [#384] [#393]
- Use absolute module path in wire.proto: [#383]
- Create AppID Type to generalize app identifiers: [#378] [:boom:]
- Add abstract methods for wire communication Authentication in `wire` and provide default wire `wire/net/simple` [#402] [:boom:]


[#359]: https://github.com/hyperledger-labs/go-perun/pull/359
Expand All @@ -46,6 +47,7 @@ Exposure of protobuf converters & `SignedState`, abstraction of tests and bug fi
[#393]: https://github.com/hyperledger-labs/go-perun/pull/393
[#394]: https://github.com/hyperledger-labs/go-perun/pull/394
[#399]: https://github.com/hyperledger-labs/go-perun/pull/399
[#402]: https://github.com/hyperledger-labs/go-perun/pull/402


## [0.10.0] Janus - 2022-05-25 [:warning:]
Expand Down
3 changes: 1 addition & 2 deletions wire/net/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (
PublishAttempts = 3
// PublishCooldown defines how long should be waited before Bus.Publish is
// called again in case it failed.
PublishCooldown = 3 * time.Second
PublishCooldown = 1 * time.Second
)

// NewBus creates a new network bus. The dialer and listener are used to
Expand Down Expand Up @@ -84,7 +84,6 @@ func (b *Bus) Publish(ctx context.Context, e *wire.Envelope) (err error) {
}
}
log.WithError(err).Warn("Publishing failed.")

// Authentication errors are not retried.
if IsAuthenticationError(err) {
return err
Expand Down
1 change: 1 addition & 0 deletions wire/net/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func (p *Endpoint) Send(ctx context.Context, e *wire.Envelope) error {

// Close closes the Endpoint's connection. A closed Endpoint is no longer usable.
func (p *Endpoint) Close() (err error) {
// fmt.Printf("close endpoint (port: %d)\n", p.conn.(*ioConn).conn.(*tls.Conn).LocalAddr().(*net.TCPAddr).Port)
return p.conn.Close()
}

Expand Down
20 changes: 13 additions & 7 deletions wire/net/endpoint_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ func (r *EndpointRegistry) authenticatedDial(
if err != nil {
return nil, errors.WithMessage(err, "failed to dial")
}

if err := ExchangeAddrsActive(ctx, r.id, addr, conn); err != nil {
conn.Close()
return nil, errors.WithMessage(err, "ExchangeAddrs failed")
Expand Down Expand Up @@ -294,16 +293,26 @@ func (r *EndpointRegistry) Has(addr wire.Address) bool {

// addEndpoint adds a new peer to the registry.
func (r *EndpointRegistry) addEndpoint(addr wire.Address, conn Conn, dialer bool) *Endpoint {
r.mutex.Lock()
defer r.mutex.Unlock()
r.Log().WithField("peer", addr).Trace("EndpointRegistry.addEndpoint")

e := newEndpoint(addr, conn)
fe, created := r.fullEndpoint(addr, e)
var newE *Endpoint
if !created {
if e, closed := fe.replace(e, r.id.Address(), dialer); closed {
return e
var closed bool
newE, closed = fe.replace(e, r.id.Address(), dialer)
key := wire.Key(addr)
entry := newFullEndpoint(newE)
r.endpoints[key] = entry
if closed {
return newE
}
}

if newE != nil {
e = newE
}
consumer := r.onNewEndpoint(addr)
// Start receiving messages.
go func() {
Expand All @@ -319,8 +328,6 @@ func (r *EndpointRegistry) addEndpoint(addr wire.Address, conn Conn, dialer bool
// fullEndpoint retrieves or creates a fullEndpoint for the passed address.
func (r *EndpointRegistry) fullEndpoint(addr wire.Address, e *Endpoint) (_ *fullEndpoint, created bool) {
key := wire.Key(addr)
r.mutex.Lock()
defer r.mutex.Unlock()
entry, ok := r.endpoints[key]
if !ok {
entry = newFullEndpoint(e)
Expand Down Expand Up @@ -360,7 +367,6 @@ func (p *fullEndpoint) replace(newValue *Endpoint, self wire.Address, dialer boo
log.Warn("Old Endpoint was already closed")
}
}

return newValue, false
}

Expand Down
86 changes: 86 additions & 0 deletions wire/net/simple/bus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2024 - See NOTICE file for copyright holders.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package simple_test

import (
"fmt"
"testing"
"time"

"net"

"github.com/stretchr/testify/assert"

"perun.network/go-perun/wire"
perunnet "perun.network/go-perun/wire/net"
"perun.network/go-perun/wire/net/simple"
perunio "perun.network/go-perun/wire/perunio/serializer"
wiretest "perun.network/go-perun/wire/test"
)

func TestBus(t *testing.T) {
const numClients = 16
const numMsgs = 16
const defaultTimeout = 15 * time.Millisecond

commonName := "127.0.0.1"
sans := []string{"127.0.0.1", "localhost"}
tlsConfigs, err := simple.GenerateSelfSignedCertConfigs(commonName, sans, numClients)
assert.NoError(t, err)

hosts := make([]string, numClients)
for i := 0; i < numClients; i++ {
port, err := findFreePort()
assert.NoError(t, err)
hosts[i] = fmt.Sprintf("127.0.0.1:%d", port)
}

dialers := make([]*simple.Dialer, numClients)
for j := 0; j < numClients; j++ {
dialers[j] = simple.NewTCPDialer(defaultTimeout, tlsConfigs[j])
}

i := 0

wiretest.GenericBusTest(t, func(acc wire.Account) (wire.Bus, wire.Bus) {
for j := 0; j < numClients; j++ {
dialers[j].Register(acc.Address(), hosts[i])
}

bus := perunnet.NewBus(acc, dialers[i], perunio.Serializer())
listener, err := simple.NewTCPListener(hosts[i], tlsConfigs[i])
assert.NoError(t, err)
go bus.Listen(listener)
i++
return bus, bus
}, numClients, numMsgs)

for j := 0; j < numClients; j++ {
assert.NoError(t, dialers[j].Close())
}
}

func findFreePort() (int, error) {
// Create a listener on a random port to get an available port.
l, err := net.Listen("tcp", ":0") // Use ":0" to bind to a random free port
if err != nil {
return 0, err
}
defer l.Close()

// Get the port from the listener's address
addr := l.Addr().(*net.TCPAddr)
return addr.Port, nil
}
33 changes: 27 additions & 6 deletions wire/net/simple/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package simple
import (
"context"
"crypto/tls"
"fmt"
"net"
"sync"
"time"
Expand All @@ -30,10 +31,11 @@ import (
// Dialer is a simple lookup-table based dialer that can dial known peers.
// New peer addresses can be added via Register().
type Dialer struct {
mutex sync.RWMutex // Protects peers.
peers map[wire.AddrKey]string // Known peer addresses.
dialer tls.Dialer // Used to dial connections.
network string // The socket type.
mutex sync.RWMutex // Protects peers.
peers map[wire.AddrKey]string // Known peer addresses.
dialer tls.Dialer // Used to dial connections.
network string // The socket type.
connections []wirenet.Conn

pkgsync.Closer
}
Expand All @@ -54,7 +56,8 @@ func NewNetDialer(network string, defaultTimeout time.Duration, tlsConfig *tls.C
NetDialer: netDialer,
Config: tlsConfig,
},
network: network,
network: network,
connections: make([]wirenet.Conn, 0),
}
}

Expand Down Expand Up @@ -103,7 +106,9 @@ func (d *Dialer) Dial(ctx context.Context, addr wire.Address, ser wire.EnvelopeS
return nil, errors.Wrap(err, "failed to dial peer")
}

return wirenet.NewIoConn(conn, ser), nil
wireConn := wirenet.NewIoConn(conn, ser)
d.connections = append(d.connections, wireConn)
return wireConn, nil
}

// Register registers a network address for a peer address.
Expand All @@ -113,3 +118,19 @@ func (d *Dialer) Register(addr wire.Address, address string) {

d.peers[wire.Key(addr)] = address
}

// Close closes the Dialer and cleans up any associated resources.
func (d *Dialer) Close() error {
if !d.IsClosed() {
// Mark the Dialer as closed.
d.Closer.Close()

// Close all associated connections
for _, conn := range d.connections {
conn.Close()
}
return nil
}

return fmt.Errorf("dialer already closed")
}
Loading
Loading