aboutsummaryrefslogtreecommitdiff
path: root/catbus/catbus.go
diff options
context:
space:
mode:
authorEthel Morgan <eth@ethulhu.co.uk>2020-06-22 21:25:47 +0100
committerEthel Morgan <eth@ethulhu.co.uk>2020-06-22 21:25:47 +0100
commitb9f13df51979ff38e4a9578f982cc608e3bb2c0d (patch)
treec6763a4c8ab57b2670d635545d7d10225efe8314 /catbus/catbus.go
parent09bb9e12fcd0336af2a3a9f9aacc6384ac004943 (diff)
add rebroadcasting to package catbus
Diffstat (limited to 'catbus/catbus.go')
-rw-r--r--catbus/catbus.go107
1 files changed, 90 insertions, 17 deletions
diff --git a/catbus/catbus.go b/catbus/catbus.go
index e676a14..9325d8e 100644
--- a/catbus/catbus.go
+++ b/catbus/catbus.go
@@ -5,7 +5,13 @@
// Package catbus is a convenience wrapper around MQTT for use with Catbus.
package catbus
-import mqtt "github.com/eclipse/paho.mqtt.golang"
+import (
+ "math/rand"
+ "sync"
+ "time"
+
+ mqtt "github.com/eclipse/paho.mqtt.golang"
+)
type (
Message = mqtt.Message
@@ -13,12 +19,29 @@ type (
Client struct {
mqtt mqtt.Client
+
+ rebroadcastMu sync.Mutex
+ rebroadcastByTopic map[string]*time.Timer
+ rebroadcastPeriod time.Duration
+ rebroadcastJitter time.Duration
}
+
ClientOptions struct {
DisconnectHandler func(*Client, error)
ConnectHandler func(*Client)
+
+ // Rebroadcast previously seen values every RebroadcastPeriod ± [0,RebroadcastJitter).
+ RebroadcastPeriod time.Duration
+ RebroadcastJitter time.Duration
+
+ // RebroadcastDefaults are optional values to seed rebroadcasting if no prior values are seen.
+ // E.g. unless we've been told otherwise, assume a device is off.
+ RebroadcastDefaults map[string][]byte
}
+
+ // Retention is whether or not the MQTT broker should retain the message.
+ Retention bool
)
const (
@@ -28,34 +51,49 @@ const (
)
const (
- Retain = true
+ Retain = Retention(true)
+ DontRetain = Retention(false)
+)
+
+const (
+ DefaultRebroadcastPeriod = 1 * time.Minute
+ DefaultRebroadcastJitter = 15 * time.Second
)
func NewClient(brokerURI string, options ClientOptions) *Client {
- opts := mqtt.NewClientOptions()
+ client := &Client{
+ rebroadcastByTopic: map[string]*time.Timer{},
+ rebroadcastPeriod: DefaultRebroadcastPeriod,
+ rebroadcastJitter: DefaultRebroadcastJitter,
+ }
- opts.AddBroker(brokerURI)
- opts.SetAutoReconnect(true)
+ if options.RebroadcastPeriod != 0 {
+ client.rebroadcastPeriod = options.RebroadcastPeriod
+ }
+ if options.RebroadcastJitter != 0 {
+ client.rebroadcastJitter = options.RebroadcastJitter
+ }
+ for topic, payload := range options.RebroadcastDefaults {
+ // TODO: Allow users to set retention?
+ client.rebroadcastLater(topic, DontRetain, payload)
+ }
- opts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
+ mqttOpts := mqtt.NewClientOptions()
+ mqttOpts.AddBroker(brokerURI)
+ mqttOpts.SetAutoReconnect(true)
+ mqttOpts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
if options.DisconnectHandler != nil {
- options.DisconnectHandler(&Client{c}, err)
+ options.DisconnectHandler(client, err)
}
})
- opts.SetOnConnectHandler(func(c mqtt.Client) {
+ mqttOpts.SetOnConnectHandler(func(c mqtt.Client) {
if options.ConnectHandler != nil {
- options.ConnectHandler(&Client{c})
+ options.ConnectHandler(client)
}
})
+ client.mqtt = mqtt.NewClient(mqttOpts)
- return &Client{mqtt.NewClient(opts)}
-}
-
-// Subscribe subscribes to a Catbus MQTT topic.
-func (c *Client) Subscribe(topic string, f MessageHandler) error {
- return c.mqtt.Subscribe(topic, atLeastOnce, func(c mqtt.Client, msg mqtt.Message) {
- f(&Client{c}, msg)
- }).Error()
+ return client
}
// Connect connects to the Catbus MQTT broker and blocks forever.
@@ -65,3 +103,38 @@ func (c *Client) Connect() error {
}
select {}
}
+
+// Subscribe subscribes to a Catbus MQTT topic.
+func (c *Client) Subscribe(topic string, f MessageHandler) error {
+ return c.mqtt.Subscribe(topic, atLeastOnce, func(_ mqtt.Client, msg mqtt.Message) {
+ c.rebroadcastLater(msg.Topic(), Retention(msg.Retained()), msg.Payload())
+
+ f(c, msg)
+ }).Error()
+}
+
+// Publish publishes to a Catbus MQTT topic.
+func (c *Client) Publish(topic string, retention Retention, payload []byte) error {
+ c.rebroadcastLater(topic, retention, payload)
+
+ return c.mqtt.Publish(topic, atLeastOnce, bool(retention), payload).Error()
+}
+
+func (c *Client) rebroadcastLater(topic string, retention Retention, payload []byte) {
+ c.rebroadcastMu.Lock()
+ defer c.rebroadcastMu.Unlock()
+
+ if timer := c.rebroadcastByTopic[topic]; timer != nil {
+ _ = timer.Stop()
+ }
+ c.rebroadcastByTopic[topic] = time.AfterFunc(c.rebroadcastDuration(), func() {
+ _ = c.Publish(topic, retention, payload)
+ })
+}
+func (c *Client) rebroadcastDuration() time.Duration {
+ jitter := time.Duration(rand.Intn(int(c.rebroadcastJitter)))
+ if rand.Intn(1) == 0 {
+ return c.rebroadcastPeriod + jitter
+ }
+ return c.rebroadcastPeriod - jitter
+}