aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEthel Morgan <eth@ethulhu.co.uk>2020-06-24 23:21:32 +0100
committerEthel Morgan <eth@ethulhu.co.uk>2020-06-24 23:21:32 +0100
commite825e6820389fab7c97ae3665155401124ab8ecf (patch)
treef8b15365e51b2f0c8de73215ac5fb5ab14cecdb0
parent9ccb33b03d4d2ac71fe0f1fec1ee0a64cca157f5 (diff)
make Client an interface, make Message a plain structv0.0.3
-rw-r--r--api.go30
-rw-r--r--catbus.go61
-rw-r--r--catbus_test.go4
3 files changed, 54 insertions, 41 deletions
diff --git a/api.go b/api.go
new file mode 100644
index 0000000..8397cb9
--- /dev/null
+++ b/api.go
@@ -0,0 +1,30 @@
+// SPDX-FileCopyrightText: 2020 Ethel Morgan
+//
+// SPDX-License-Identifier: MIT
+
+// Package catbus is a convenience wrapper around MQTT for use with Catbus.
+package catbus
+
+type (
+ MessageHandler func(Client, Message)
+
+ Message struct {
+ Payload string
+ Retention Retention
+ Topic string
+ }
+
+ Client interface {
+ Connect() error
+ Subscribe(topic string, f MessageHandler) error
+ Publish(topic string, retention Retention, payload string) error
+ }
+
+ // Retention is whether or not the MQTT broker should retain the message.
+ Retention bool
+)
+
+const (
+ Retain = Retention(true)
+ DontRetain = Retention(false)
+)
diff --git a/catbus.go b/catbus.go
index c2af247..a3eb115 100644
--- a/catbus.go
+++ b/catbus.go
@@ -2,7 +2,6 @@
//
// SPDX-License-Identifier: MIT
-// Package catbus is a convenience wrapper around MQTT for use with Catbus.
package catbus
import (
@@ -14,13 +13,7 @@ import (
)
type (
- MessageHandler func(*Client, Message)
-
- Message struct {
- message mqtt.Message
- }
-
- Client struct {
+ client struct {
mqtt mqtt.Client
payloadByTopicMu sync.Mutex
@@ -34,8 +27,8 @@ type (
}
ClientOptions struct {
- DisconnectHandler func(*Client, error)
- ConnectHandler func(*Client)
+ DisconnectHandler func(Client, error)
+ ConnectHandler func(Client)
// Publish previously seen or default values on connecting after OnconnectDelay ± [0,OnconnectJitter).
OnconnectDelay time.Duration
@@ -45,9 +38,6 @@ type (
// E.g. unless we've been told otherwise, assume a device is off.
DefaultPayloadByTopic map[string]string
}
-
- // Retention is whether or not the MQTT broker should retain the message.
- Retention bool
)
const (
@@ -57,17 +47,12 @@ const (
)
const (
- Retain = Retention(true)
- DontRetain = Retention(false)
-)
-
-const (
DefaultOnconnectDelay = 1 * time.Minute
DefaultOnconnectJitter = 15 * time.Second
)
-func NewClient(brokerURI string, options ClientOptions) *Client {
- client := &Client{
+func NewClient(brokerURI string, options ClientOptions) Client {
+ client := &client{
payloadByTopic: map[string]string{},
onconnectTimerByTopic: map[string]*time.Timer{},
@@ -88,7 +73,7 @@ func NewClient(brokerURI string, options ClientOptions) *Client {
mqttOpts := mqtt.NewClientOptions()
mqttOpts.AddBroker(brokerURI)
mqttOpts.SetAutoReconnect(true)
- mqttOpts.SetOnConnectHandler(func(c mqtt.Client) {
+ mqttOpts.SetOnConnectHandler(func(_ mqtt.Client) {
client.stopAllTimers()
client.startAllTimers()
@@ -96,7 +81,7 @@ func NewClient(brokerURI string, options ClientOptions) *Client {
options.ConnectHandler(client)
}
})
- mqttOpts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
+ mqttOpts.SetConnectionLostHandler(func(_ mqtt.Client, err error) {
client.stopAllTimers()
if options.DisconnectHandler != nil {
@@ -109,7 +94,7 @@ func NewClient(brokerURI string, options ClientOptions) *Client {
}
// Connect connects to the Catbus MQTT broker and blocks forever.
-func (c *Client) Connect() error {
+func (c *client) Connect() error {
if err := c.mqtt.Connect().Error(); err != nil {
return err
}
@@ -117,22 +102,22 @@ func (c *Client) Connect() error {
}
// Subscribe subscribes to a Catbus MQTT topic.
-func (c *Client) Subscribe(topic string, f MessageHandler) error {
+func (c *client) Subscribe(topic string, f MessageHandler) error {
return c.mqtt.Subscribe(topic, atLeastOnce, func(_ mqtt.Client, msg mqtt.Message) {
c.storePayload(msg.Topic(), Retention(msg.Retained()), string(msg.Payload()))
- f(c, Message{msg})
+ f(c, messageFromMQTTMessage(msg))
}).Error()
}
// Publish publishes to a Catbus MQTT topic.
-func (c *Client) Publish(topic string, retention Retention, payload string) error {
+func (c *client) Publish(topic string, retention Retention, payload string) error {
c.storePayload(topic, retention, payload)
return c.mqtt.Publish(topic, atLeastOnce, bool(retention), payload).Error()
}
-func (c *Client) jitteredOnconnectDelay() time.Duration {
+func (c *client) jitteredOnconnectDelay() time.Duration {
jitter := time.Duration(rand.Intn(int(c.onconnectJitter)))
if rand.Intn(2) == 0 {
return c.onconnectDelay + jitter
@@ -140,7 +125,7 @@ func (c *Client) jitteredOnconnectDelay() time.Duration {
return c.onconnectDelay - jitter
}
-func (c *Client) storePayload(topic string, retention Retention, payload string) {
+func (c *client) storePayload(topic string, retention Retention, payload string) {
c.payloadByTopicMu.Lock()
defer c.payloadByTopicMu.Unlock()
@@ -157,7 +142,7 @@ func (c *Client) storePayload(topic string, retention Retention, payload string)
}
c.payloadByTopic[topic] = payload
}
-func (c *Client) stopTimer(topic string) {
+func (c *client) stopTimer(topic string) {
c.onconnectTimerByTopicMu.Lock()
defer c.onconnectTimerByTopicMu.Unlock()
@@ -165,7 +150,7 @@ func (c *Client) stopTimer(topic string) {
_ = timer.Stop()
}
}
-func (c *Client) stopAllTimers() {
+func (c *client) stopAllTimers() {
c.onconnectTimerByTopicMu.Lock()
defer c.onconnectTimerByTopicMu.Unlock()
@@ -173,7 +158,7 @@ func (c *Client) stopAllTimers() {
_ = timer.Stop()
}
}
-func (c *Client) startAllTimers() {
+func (c *client) startAllTimers() {
c.payloadByTopicMu.Lock()
defer c.payloadByTopicMu.Unlock()
@@ -193,12 +178,10 @@ func (c *Client) startAllTimers() {
}
}
-func (m Message) Topic() string {
- return m.message.Topic()
-}
-func (m Message) Payload() string {
- return string(m.message.Payload())
-}
-func (m Message) Retained() bool {
- return m.message.Retained()
+func messageFromMQTTMessage(msg mqtt.Message) Message {
+ return Message{
+ Payload: string(msg.Payload()),
+ Retention: Retention(msg.Retained()),
+ Topic: msg.Topic(),
+ }
}
diff --git a/catbus_test.go b/catbus_test.go
index cc22ebb..b345c48 100644
--- a/catbus_test.go
+++ b/catbus_test.go
@@ -105,7 +105,7 @@ func TestOnConnect(t *testing.T) {
payloadByTopic: map[string]string{},
}
- catbus := &Client{
+ catbus := &client{
mqtt: fakeMQTT,
payloadByTopic: map[string]string{},
onconnectTimerByTopic: map[string]*time.Timer{},
@@ -117,7 +117,7 @@ func TestOnConnect(t *testing.T) {
}
for _, topic := range tt.subscribe {
- catbus.Subscribe(topic, func(_ *Client, _ Message) {})
+ catbus.Subscribe(topic, func(_ Client, _ Message) {})
}
for topic, message := range tt.receive {
fakeMQTT.send(topic, message.retention, message.payload)