diff options
author | Ethel Morgan <eth@ethulhu.co.uk> | 2020-06-24 23:21:32 +0100 |
---|---|---|
committer | Ethel Morgan <eth@ethulhu.co.uk> | 2020-06-24 23:21:32 +0100 |
commit | e825e6820389fab7c97ae3665155401124ab8ecf (patch) | |
tree | f8b15365e51b2f0c8de73215ac5fb5ab14cecdb0 | |
parent | 9ccb33b03d4d2ac71fe0f1fec1ee0a64cca157f5 (diff) |
make Client an interface, make Message a plain structv0.0.3
-rw-r--r-- | api.go | 30 | ||||
-rw-r--r-- | catbus.go | 61 | ||||
-rw-r--r-- | catbus_test.go | 4 |
3 files changed, 54 insertions, 41 deletions
@@ -0,0 +1,30 @@ +// SPDX-FileCopyrightText: 2020 Ethel Morgan +// +// SPDX-License-Identifier: MIT + +// Package catbus is a convenience wrapper around MQTT for use with Catbus. +package catbus + +type ( + MessageHandler func(Client, Message) + + Message struct { + Payload string + Retention Retention + Topic string + } + + Client interface { + Connect() error + Subscribe(topic string, f MessageHandler) error + Publish(topic string, retention Retention, payload string) error + } + + // Retention is whether or not the MQTT broker should retain the message. + Retention bool +) + +const ( + Retain = Retention(true) + DontRetain = Retention(false) +) @@ -2,7 +2,6 @@ // // SPDX-License-Identifier: MIT -// Package catbus is a convenience wrapper around MQTT for use with Catbus. package catbus import ( @@ -14,13 +13,7 @@ import ( ) type ( - MessageHandler func(*Client, Message) - - Message struct { - message mqtt.Message - } - - Client struct { + client struct { mqtt mqtt.Client payloadByTopicMu sync.Mutex @@ -34,8 +27,8 @@ type ( } ClientOptions struct { - DisconnectHandler func(*Client, error) - ConnectHandler func(*Client) + DisconnectHandler func(Client, error) + ConnectHandler func(Client) // Publish previously seen or default values on connecting after OnconnectDelay ± [0,OnconnectJitter). OnconnectDelay time.Duration @@ -45,9 +38,6 @@ type ( // E.g. unless we've been told otherwise, assume a device is off. DefaultPayloadByTopic map[string]string } - - // Retention is whether or not the MQTT broker should retain the message. - Retention bool ) const ( @@ -57,17 +47,12 @@ const ( ) const ( - Retain = Retention(true) - DontRetain = Retention(false) -) - -const ( DefaultOnconnectDelay = 1 * time.Minute DefaultOnconnectJitter = 15 * time.Second ) -func NewClient(brokerURI string, options ClientOptions) *Client { - client := &Client{ +func NewClient(brokerURI string, options ClientOptions) Client { + client := &client{ payloadByTopic: map[string]string{}, onconnectTimerByTopic: map[string]*time.Timer{}, @@ -88,7 +73,7 @@ func NewClient(brokerURI string, options ClientOptions) *Client { mqttOpts := mqtt.NewClientOptions() mqttOpts.AddBroker(brokerURI) mqttOpts.SetAutoReconnect(true) - mqttOpts.SetOnConnectHandler(func(c mqtt.Client) { + mqttOpts.SetOnConnectHandler(func(_ mqtt.Client) { client.stopAllTimers() client.startAllTimers() @@ -96,7 +81,7 @@ func NewClient(brokerURI string, options ClientOptions) *Client { options.ConnectHandler(client) } }) - mqttOpts.SetConnectionLostHandler(func(c mqtt.Client, err error) { + mqttOpts.SetConnectionLostHandler(func(_ mqtt.Client, err error) { client.stopAllTimers() if options.DisconnectHandler != nil { @@ -109,7 +94,7 @@ func NewClient(brokerURI string, options ClientOptions) *Client { } // Connect connects to the Catbus MQTT broker and blocks forever. -func (c *Client) Connect() error { +func (c *client) Connect() error { if err := c.mqtt.Connect().Error(); err != nil { return err } @@ -117,22 +102,22 @@ func (c *Client) Connect() error { } // Subscribe subscribes to a Catbus MQTT topic. -func (c *Client) Subscribe(topic string, f MessageHandler) error { +func (c *client) Subscribe(topic string, f MessageHandler) error { return c.mqtt.Subscribe(topic, atLeastOnce, func(_ mqtt.Client, msg mqtt.Message) { c.storePayload(msg.Topic(), Retention(msg.Retained()), string(msg.Payload())) - f(c, Message{msg}) + f(c, messageFromMQTTMessage(msg)) }).Error() } // Publish publishes to a Catbus MQTT topic. -func (c *Client) Publish(topic string, retention Retention, payload string) error { +func (c *client) Publish(topic string, retention Retention, payload string) error { c.storePayload(topic, retention, payload) return c.mqtt.Publish(topic, atLeastOnce, bool(retention), payload).Error() } -func (c *Client) jitteredOnconnectDelay() time.Duration { +func (c *client) jitteredOnconnectDelay() time.Duration { jitter := time.Duration(rand.Intn(int(c.onconnectJitter))) if rand.Intn(2) == 0 { return c.onconnectDelay + jitter @@ -140,7 +125,7 @@ func (c *Client) jitteredOnconnectDelay() time.Duration { return c.onconnectDelay - jitter } -func (c *Client) storePayload(topic string, retention Retention, payload string) { +func (c *client) storePayload(topic string, retention Retention, payload string) { c.payloadByTopicMu.Lock() defer c.payloadByTopicMu.Unlock() @@ -157,7 +142,7 @@ func (c *Client) storePayload(topic string, retention Retention, payload string) } c.payloadByTopic[topic] = payload } -func (c *Client) stopTimer(topic string) { +func (c *client) stopTimer(topic string) { c.onconnectTimerByTopicMu.Lock() defer c.onconnectTimerByTopicMu.Unlock() @@ -165,7 +150,7 @@ func (c *Client) stopTimer(topic string) { _ = timer.Stop() } } -func (c *Client) stopAllTimers() { +func (c *client) stopAllTimers() { c.onconnectTimerByTopicMu.Lock() defer c.onconnectTimerByTopicMu.Unlock() @@ -173,7 +158,7 @@ func (c *Client) stopAllTimers() { _ = timer.Stop() } } -func (c *Client) startAllTimers() { +func (c *client) startAllTimers() { c.payloadByTopicMu.Lock() defer c.payloadByTopicMu.Unlock() @@ -193,12 +178,10 @@ func (c *Client) startAllTimers() { } } -func (m Message) Topic() string { - return m.message.Topic() -} -func (m Message) Payload() string { - return string(m.message.Payload()) -} -func (m Message) Retained() bool { - return m.message.Retained() +func messageFromMQTTMessage(msg mqtt.Message) Message { + return Message{ + Payload: string(msg.Payload()), + Retention: Retention(msg.Retained()), + Topic: msg.Topic(), + } } diff --git a/catbus_test.go b/catbus_test.go index cc22ebb..b345c48 100644 --- a/catbus_test.go +++ b/catbus_test.go @@ -105,7 +105,7 @@ func TestOnConnect(t *testing.T) { payloadByTopic: map[string]string{}, } - catbus := &Client{ + catbus := &client{ mqtt: fakeMQTT, payloadByTopic: map[string]string{}, onconnectTimerByTopic: map[string]*time.Timer{}, @@ -117,7 +117,7 @@ func TestOnConnect(t *testing.T) { } for _, topic := range tt.subscribe { - catbus.Subscribe(topic, func(_ *Client, _ Message) {}) + catbus.Subscribe(topic, func(_ Client, _ Message) {}) } for topic, message := range tt.receive { fakeMQTT.send(topic, message.retention, message.payload) |