aboutsummaryrefslogtreecommitdiff
path: root/catbus/catbus.go
diff options
context:
space:
mode:
authorEthel Morgan <eth@ethulhu.co.uk>2020-06-23 16:45:00 +0100
committerEthel Morgan <eth@ethulhu.co.uk>2020-06-23 16:45:00 +0100
commitaa3450cf30be88ddd221911a44a85c2d1e96e7dd (patch)
treed1aadd4d9c40da4bbdfeab4243ef683a62e890dd /catbus/catbus.go
parent54d24cbc56c012f30de902c2746899ffbf9154eb (diff)
use the library versions of catbus & logger
Diffstat (limited to 'catbus/catbus.go')
-rw-r--r--catbus/catbus.go191
1 files changed, 0 insertions, 191 deletions
diff --git a/catbus/catbus.go b/catbus/catbus.go
deleted file mode 100644
index bb674c6..0000000
--- a/catbus/catbus.go
+++ /dev/null
@@ -1,191 +0,0 @@
-// SPDX-FileCopyrightText: 2020 Ethel Morgan
-//
-// SPDX-License-Identifier: MIT
-
-// Package catbus is a convenience wrapper around MQTT for use with Catbus.
-package catbus
-
-import (
- "math/rand"
- "sync"
- "time"
-
- mqtt "github.com/eclipse/paho.mqtt.golang"
-)
-
-type (
- Message = mqtt.Message
- MessageHandler = func(*Client, Message)
-
- Client struct {
- mqtt mqtt.Client
-
- payloadByTopicMu sync.Mutex
- payloadByTopic map[string][]byte
-
- onconnectTimerByTopicMu sync.Mutex
- onconnectTimerByTopic map[string]*time.Timer
-
- onconnectDelay time.Duration
- onconnectJitter time.Duration
- }
-
- ClientOptions struct {
- DisconnectHandler func(*Client, error)
- ConnectHandler func(*Client)
-
- // Publish previously seen or default values on connecting after OnconnectDelay ± [0,OnconnectJitter).
- OnconnectDelay time.Duration
- OnconnectJitter time.Duration
-
- // DefaultPayloadByTopic are optional values to publish on connect if no prior values are seen.
- // E.g. unless we've been told otherwise, assume a device is off.
- DefaultPayloadByTopic map[string][]byte
- }
-
- // Retention is whether or not the MQTT broker should retain the message.
- Retention bool
-)
-
-const (
- atMostOnce byte = iota
- atLeastOnce
- exactlyOnce
-)
-
-const (
- Retain = Retention(true)
- DontRetain = Retention(false)
-)
-
-const (
- DefaultOnconnectDelay = 1 * time.Minute
- DefaultOnconnectJitter = 15 * time.Second
-)
-
-func NewClient(brokerURI string, options ClientOptions) *Client {
- client := &Client{
- payloadByTopic: map[string][]byte{},
- onconnectTimerByTopic: map[string]*time.Timer{},
-
- onconnectDelay: DefaultOnconnectDelay,
- onconnectJitter: DefaultOnconnectJitter,
- }
-
- if options.OnconnectDelay != 0 {
- client.onconnectDelay = options.OnconnectDelay
- }
- if options.OnconnectJitter != 0 {
- client.onconnectJitter = options.OnconnectJitter
- }
- for topic, payload := range options.DefaultPayloadByTopic {
- client.payloadByTopic[topic] = payload
- }
-
- mqttOpts := mqtt.NewClientOptions()
- mqttOpts.AddBroker(brokerURI)
- mqttOpts.SetAutoReconnect(true)
- mqttOpts.SetOnConnectHandler(func(c mqtt.Client) {
- client.stopAllTimers()
- client.startAllTimers()
-
- if options.ConnectHandler != nil {
- options.ConnectHandler(client)
- }
- })
- mqttOpts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
- client.stopAllTimers()
-
- if options.DisconnectHandler != nil {
- options.DisconnectHandler(client, err)
- }
- })
- client.mqtt = mqtt.NewClient(mqttOpts)
-
- return client
-}
-
-// Connect connects to the Catbus MQTT broker and blocks forever.
-func (c *Client) Connect() error {
- if err := c.mqtt.Connect().Error(); err != nil {
- return err
- }
- select {}
-}
-
-// Subscribe subscribes to a Catbus MQTT topic.
-func (c *Client) Subscribe(topic string, f MessageHandler) error {
- return c.mqtt.Subscribe(topic, atLeastOnce, func(_ mqtt.Client, msg mqtt.Message) {
- c.storePayload(msg.Topic(), Retention(msg.Retained()), msg.Payload())
-
- f(c, msg)
- }).Error()
-}
-
-// Publish publishes to a Catbus MQTT topic.
-func (c *Client) Publish(topic string, retention Retention, payload []byte) error {
- c.storePayload(topic, retention, payload)
-
- return c.mqtt.Publish(topic, atLeastOnce, bool(retention), payload).Error()
-}
-
-func (c *Client) jitteredOnconnectDelay() time.Duration {
- jitter := time.Duration(rand.Intn(int(c.onconnectJitter)))
- if rand.Intn(2) == 0 {
- return c.onconnectDelay + jitter
- }
- return c.onconnectDelay - jitter
-}
-
-func (c *Client) storePayload(topic string, retention Retention, payload []byte) {
- c.payloadByTopicMu.Lock()
- defer c.payloadByTopicMu.Unlock()
-
- if _, ok := c.payloadByTopic[topic]; !ok && retention == DontRetain {
- // If we don't have a copy, and the sender doesn't want it retained, don't retain it.
- return
- }
-
- c.stopTimer(topic)
-
- if len(payload) == 0 {
- delete(c.payloadByTopic, topic)
- return
- }
- c.payloadByTopic[topic] = payload
-}
-func (c *Client) stopTimer(topic string) {
- c.onconnectTimerByTopicMu.Lock()
- defer c.onconnectTimerByTopicMu.Unlock()
-
- if timer, ok := c.onconnectTimerByTopic[topic]; ok {
- _ = timer.Stop()
- }
-}
-func (c *Client) stopAllTimers() {
- c.onconnectTimerByTopicMu.Lock()
- defer c.onconnectTimerByTopicMu.Unlock()
-
- for _, timer := range c.onconnectTimerByTopic {
- _ = timer.Stop()
- }
-}
-func (c *Client) startAllTimers() {
- c.payloadByTopicMu.Lock()
- defer c.payloadByTopicMu.Unlock()
-
- c.onconnectTimerByTopicMu.Lock()
- defer c.onconnectTimerByTopicMu.Unlock()
-
- for topic := range c.payloadByTopic {
- c.onconnectTimerByTopic[topic] = time.AfterFunc(c.jitteredOnconnectDelay(), func() {
- c.payloadByTopicMu.Lock()
- payload, ok := c.payloadByTopic[topic]
- c.payloadByTopicMu.Unlock()
- if !ok {
- return
- }
- _ = c.Publish(topic, Retain, payload)
- })
- }
-}