Skip to content

Commit

Permalink
Replace Router with OnPublishReceived
Browse files Browse the repository at this point in the history
This demonstrated how Router coule be replaced
in a mostly non-breaking way.
Note that I have not worked through all
examples to test (know that autopaho/rpc will
be broken).
  • Loading branch information
MattBrittan committed Nov 22, 2023
1 parent 28d6df3 commit 0b237dc
Show file tree
Hide file tree
Showing 3 changed files with 225 additions and 57 deletions.
126 changes: 110 additions & 16 deletions paho/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,15 @@ var (
)

type (
// ClientConfig are the user configurable options for the client, an
PublishReceived struct {
Packet *Publish
Client *Client // The Client that received the message (note that the connection may have been lost post-receipt)

AlreadyHandled bool // Set to true if a previous callback has returned true (indicating some action has already been taken re the message)
Errs []error // Errors returned by previous handlers (if any).
}

// ClientConfig are the user-configurable options for the client, an
// instance of this struct is passed into NewClient(), not all options
// are required to be set, defaults are provided for Persistence, MIDs,
// PingHandler, PacketTimeout and Router.
Expand All @@ -47,9 +55,21 @@ type (
Session session.SessionManager
autoCloseSession bool

AuthHandler Auther
PingHandler Pinger
Router Router
AuthHandler Auther
PingHandler Pinger

// Router - new inbound messages will be passed to the `Route(*packets.Publish)` function.
//
// Depreciated: If a router is provided, it will now be added to the end of the OnPublishReceived
// slice (which provides a more flexible approach to handling incoming messages).
Router Router

// OnPublishReceived provides a slice of callbacks; additional handlers may be added after the client has been
// created via the AddOnPublishReceived function (Client holds a copy of the slice; OnPublishReceived will not change).
// When a `PUBLISH` is received, the callbacks will be called in order. If a callback processes the message,
// then it should return true. This boolean, and any errors, will be passed to subsequent handlers.
OnPublishReceived []func(PublishReceived) (bool, error)

PacketTimeout time.Duration
// OnServerDisconnect is called only when a packets.DISCONNECT is received from server
OnServerDisconnect func(*Disconnect)
Expand All @@ -76,6 +96,12 @@ type (
Client struct {
mu sync.Mutex
ClientConfig

// OnPublishReceived copy of OnPublishReceived from ClientConfig (perhaps with added callback form Router)
onPublishReceived []func(PublishReceived) (bool, error)
onPublishReceivedTracker []int // Used to track positions in above
onPublishReceivedMu sync.Mutex

// authResponse is used for handling the MQTTv5 authentication exchange.
authResponse chan<- packets.ControlPacket
stop chan struct{}
Expand Down Expand Up @@ -129,10 +155,11 @@ func NewClient(conf ClientConfig) *Client {
MaximumPacketSize: 0,
TopicAliasMaximum: 0,
},
ClientConfig: conf,
done: make(chan struct{}),
errors: log.NOOPLogger{},
debug: log.NOOPLogger{},
ClientConfig: conf,
onPublishReceived: conf.OnPublishReceived,
done: make(chan struct{}),
errors: log.NOOPLogger{},
debug: log.NOOPLogger{},
}

if c.Session == nil {
Expand All @@ -142,9 +169,20 @@ func NewClient(conf ClientConfig) *Client {
if c.PacketTimeout == 0 {
c.PacketTimeout = 10 * time.Second
}
if c.Router == nil {
c.Router = NewStandardRouter()

if c.Router == nil && len(c.onPublishReceived) == 0 {
c.Router = NewStandardRouter() // Maintain backwards compatibility (for now!)
}
if c.Router != nil {
r := c.Router
c.onPublishReceived = append(c.onPublishReceived,
func(p PublishReceived) (bool, error) {
r.Route(p.Packet.Packet())
return false, nil
})
}
c.onPublishReceivedTracker = make([]int, len(c.onPublishReceived)) // Must have the same number of elements as onPublishReceived

if c.PingHandler == nil {
c.PingHandler = DefaultPingerWithCustomFailHandler(func(e error) {
go c.error(e)
Expand Down Expand Up @@ -373,17 +411,37 @@ func (c *Client) ack(pb *packets.Publish) {

func (c *Client) routePublishPackets() {
for pb := range c.publishPackets {
if !c.ClientConfig.EnableManualAcknowledgment {
c.Router.Route(pb)
c.ack(pb)
continue
// Copy onPublishReceived so lock is only held briefly
c.onPublishReceivedMu.Lock()
handlers := make([]func(PublishReceived) (bool, error), len(c.onPublishReceived))
for i := range c.onPublishReceived {
handlers[i] = c.onPublishReceived[i]
}
c.onPublishReceivedMu.Unlock()

if pb.QoS != 0 {
if c.ClientConfig.EnableManualAcknowledgment && pb.QoS != 0 {
c.acksTracker.add(pb)
}

c.Router.Route(pb)
var handled bool
var errs []error
pkt := PublishFromPacketPublish(pb)
for _, h := range handlers {
ha, err := h(PublishReceived{
Packet: pkt,
Client: c,
AlreadyHandled: handled,
Errs: errs,
})
if ha {
handled = true
}
errs = append(errs, err)
}

if !c.ClientConfig.EnableManualAcknowledgment {
c.ack(pb)
}
}
}

Expand Down Expand Up @@ -897,6 +955,42 @@ func (c *Client) Disconnect(d *Disconnect) error {
return err
}

// AddOnPublishReceived adds a function that will be called when a PUBLISH is received
// The new function will be called after any functions already in the list
// Returns a function that can be called to remove the callback
func (c *Client) AddOnPublishReceived(f func(PublishReceived) (bool, error)) func() {
c.onPublishReceivedMu.Lock()
defer c.onPublishReceivedMu.Unlock()

c.onPublishReceived = append(c.onPublishReceived, f)

// We insert a unique ID into the same position in onPublishReceivedTracker; this enables us to
// remove the handler later (without complicating onPublishReceived which will be called frequently)
var id int
idLoop:
for id = 0; ; id++ {
for _, used := range c.onPublishReceivedTracker {
if used == id {
continue idLoop
}
}
break
}
c.onPublishReceivedTracker = append(c.onPublishReceivedTracker, id)

return func() {
c.onPublishReceivedMu.Lock()
defer c.onPublishReceivedMu.Unlock()
for pos, storedID := range c.onPublishReceivedTracker {
if id == storedID {
c.onPublishReceivedTracker = append(c.onPublishReceivedTracker[:pos], c.onPublishReceivedTracker[pos+1:]...)
c.onPublishReceived = append(c.onPublishReceived[:pos], c.onPublishReceived[pos+1:]...)
}
}

}
}

// SetDebugLogger takes an instance of the paho Logger interface
// and sets it to be used by the debug log endpoint
func (c *Client) SetDebugLogger(l log.Logger) {
Expand Down
81 changes: 75 additions & 6 deletions paho/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,14 +542,17 @@ func TestManualAcksInOrder(t *testing.T) {

wg.Add(expectedPacketsCount)
c := NewClient(ClientConfig{
Conn: ts.ClientConn(),
Conn: ts.ClientConn(),
OnPublishReceived: []func(PublishReceived) (bool, error){
func(pr PublishReceived) (bool, error) {
defer wg.Done()
actualPublishPackets = append(actualPublishPackets, *pr.Packet.Packet())
require.NoError(t, pr.Client.Ack(pr.Packet))
return true, nil
},
},
EnableManualAcknowledgment: true,
})
c.Router = NewStandardRouterWithDefault(func(p *Publish) {
defer wg.Done()
actualPublishPackets = append(actualPublishPackets, *p.Packet())
require.NoError(t, c.Ack(p))
})
require.NotNil(t, c)
defer c.close()
c.SetDebugLogger(clientLogger)
Expand Down Expand Up @@ -1010,3 +1013,69 @@ func (f *fakeAuth) Authenticate(a *Auth) *Auth {
}

func (f *fakeAuth) Authenticated() {}

// TestAddOnPublishReceived checks we can add and remove onPublishReceived callbacks
func TestAddOnPublishReceived(t *testing.T) {
callAll := func(c *Client) {
for _, f := range c.onPublishReceived {
f(PublishReceived{Packet: &Publish{Properties: &PublishProperties{}}})
}
}

c := NewClient(ClientConfig{})
require.Lenf(t, c.onPublishReceived, 1, "onPublishReceived must contain one record (as c.Router is nil)")
require.Lenf(t, c.onPublishReceivedTracker, 1, "onPublishReceived must contain one record")
testOne := 0
removeOne := c.AddOnPublishReceived(func(_ PublishReceived) (bool, error) { testOne++; return false, nil })
require.Lenf(t, c.onPublishReceived, 2, "onPublishReceived should have one item")
require.Lenf(t, c.onPublishReceivedTracker, 2, "onPublishReceived should have one item")
callAll(c)
require.Equal(t, 1, testOne, "Expected 1")
removeOne()
callAll(c)
require.Equal(t, 1, testOne, "Expected 1")

testTwo := 0
testThree := 0
removeTwo := c.AddOnPublishReceived(func(_ PublishReceived) (bool, error) { testTwo++; return false, nil })
removeThree := c.AddOnPublishReceived(func(_ PublishReceived) (bool, error) { testThree++; return false, nil })
callAll(c)
require.Equal(t, 1, testOne, "Expected 1")
require.Equal(t, 1, testTwo, "Expected 1")
require.Equal(t, 1, testThree, "Expected 1")
removeTwo()
callAll(c)
require.Equal(t, 1, testOne, "Expected 1")
require.Equal(t, 1, testTwo, "Expected 1")
require.Equal(t, 2, testThree, "Expected 2")
removeThree()
callAll(c)
require.Equal(t, 1, testOne, "Expected 1")
require.Equal(t, 1, testTwo, "Expected 1")
require.Equal(t, 2, testThree, "Expected 2")

// Test with a pre-populated onPublishReceived
testOne = 0
testTwo = 0
testThree = 0
c = NewClient(ClientConfig{
OnPublishReceived: []func(PublishReceived) (bool, error){
func(_ PublishReceived) (bool, error) { testOne++; return false, nil },
func(_ PublishReceived) (bool, error) { testTwo++; return false, nil },
},
})
callAll(c)
require.Equal(t, 1, testOne, "Expected 1")
require.Equal(t, 1, testTwo, "Expected 1")
require.Equal(t, 0, testThree, "Expected 0")
removeThree = c.AddOnPublishReceived(func(_ PublishReceived) (bool, error) { testThree++; return false, nil })
callAll(c)
require.Equal(t, 2, testOne, "Expected 2")
require.Equal(t, 2, testTwo, "Expected 2")
require.Equal(t, 1, testThree, "Expected 1")
removeThree()
callAll(c)
require.Equal(t, 3, testOne, "Expected 3")
require.Equal(t, 3, testTwo, "Expected 3")
require.Equal(t, 1, testThree, "Expected 1")
}
75 changes: 40 additions & 35 deletions paho/cmd/rpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,41 +50,46 @@ func listener(server, rTopic, username, password string) {

c := paho.NewClient(paho.ClientConfig{
Conn: conn,
})
c.Router = paho.NewStandardRouterWithDefault(func(m *paho.Publish) {
if m.Properties != nil && m.Properties.CorrelationData != nil && m.Properties.ResponseTopic != "" {
log.Printf("Received message with response topic %s and correl id %s\n%s", m.Properties.ResponseTopic, string(m.Properties.CorrelationData), string(m.Payload))

var r Request
var resp Response

if err := json.NewDecoder(bytes.NewReader(m.Payload)).Decode(&r); err != nil {
log.Printf("Failed to decode Request: %v", err)
}

switch r.Function {
case "add":
resp.Value = r.Param1 + r.Param2
case "mul":
resp.Value = r.Param1 * r.Param2
case "div":
resp.Value = r.Param1 / r.Param2
case "sub":
resp.Value = r.Param1 - r.Param2
}

body, _ := json.Marshal(resp)
_, err := c.Publish(context.Background(), &paho.Publish{
Properties: &paho.PublishProperties{
CorrelationData: m.Properties.CorrelationData,
},
Topic: m.Properties.ResponseTopic,
Payload: body,
})
if err != nil {
log.Fatalf("failed to publish message: %s", err)
}
}
OnPublishReceived: []func(paho.PublishReceived) (bool, error){
func(pr paho.PublishReceived) (bool, error) {
m := pr.Packet
if m.Properties != nil && m.Properties.CorrelationData != nil && m.Properties.ResponseTopic != "" {
log.Printf("Received message with response topic %s and correl id %s\n%s", m.Properties.ResponseTopic, string(m.Properties.CorrelationData), string(m.Payload))

var r Request
var resp Response

if err := json.NewDecoder(bytes.NewReader(m.Payload)).Decode(&r); err != nil {
log.Printf("Failed to decode Request: %v", err)
}

switch r.Function {
case "add":
resp.Value = r.Param1 + r.Param2
case "mul":
resp.Value = r.Param1 * r.Param2
case "div":
resp.Value = r.Param1 / r.Param2
case "sub":
resp.Value = r.Param1 - r.Param2
}

body, _ := json.Marshal(resp)
_, err := pr.Client.Publish(context.Background(), &paho.Publish{
Properties: &paho.PublishProperties{
CorrelationData: m.Properties.CorrelationData,
},
Topic: m.Properties.ResponseTopic,
Payload: body,
})
if err != nil {
log.Fatalf("failed to publish message: %s", err)
}
return true, nil
}
return false, nil
},
},
})

cp := &paho.Connect{
Expand Down

0 comments on commit 0b237dc

Please sign in to comment.