From aa3450cf30be88ddd221911a44a85c2d1e96e7dd Mon Sep 17 00:00:00 2001 From: Ethel Morgan Date: Tue, 23 Jun 2020 16:45:00 +0100 Subject: use the library versions of catbus & logger --- catbus/catbus.go | 191 ------------------------------------------------------- 1 file changed, 191 deletions(-) delete mode 100644 catbus/catbus.go (limited to 'catbus/catbus.go') diff --git a/catbus/catbus.go b/catbus/catbus.go deleted file mode 100644 index bb674c6..0000000 --- a/catbus/catbus.go +++ /dev/null @@ -1,191 +0,0 @@ -// SPDX-FileCopyrightText: 2020 Ethel Morgan -// -// SPDX-License-Identifier: MIT - -// Package catbus is a convenience wrapper around MQTT for use with Catbus. -package catbus - -import ( - "math/rand" - "sync" - "time" - - mqtt "github.com/eclipse/paho.mqtt.golang" -) - -type ( - Message = mqtt.Message - MessageHandler = func(*Client, Message) - - Client struct { - mqtt mqtt.Client - - payloadByTopicMu sync.Mutex - payloadByTopic map[string][]byte - - onconnectTimerByTopicMu sync.Mutex - onconnectTimerByTopic map[string]*time.Timer - - onconnectDelay time.Duration - onconnectJitter time.Duration - } - - ClientOptions struct { - DisconnectHandler func(*Client, error) - ConnectHandler func(*Client) - - // Publish previously seen or default values on connecting after OnconnectDelay ± [0,OnconnectJitter). - OnconnectDelay time.Duration - OnconnectJitter time.Duration - - // DefaultPayloadByTopic are optional values to publish on connect if no prior values are seen. - // E.g. unless we've been told otherwise, assume a device is off. - DefaultPayloadByTopic map[string][]byte - } - - // Retention is whether or not the MQTT broker should retain the message. - Retention bool -) - -const ( - atMostOnce byte = iota - atLeastOnce - exactlyOnce -) - -const ( - Retain = Retention(true) - DontRetain = Retention(false) -) - -const ( - DefaultOnconnectDelay = 1 * time.Minute - DefaultOnconnectJitter = 15 * time.Second -) - -func NewClient(brokerURI string, options ClientOptions) *Client { - client := &Client{ - payloadByTopic: map[string][]byte{}, - onconnectTimerByTopic: map[string]*time.Timer{}, - - onconnectDelay: DefaultOnconnectDelay, - onconnectJitter: DefaultOnconnectJitter, - } - - if options.OnconnectDelay != 0 { - client.onconnectDelay = options.OnconnectDelay - } - if options.OnconnectJitter != 0 { - client.onconnectJitter = options.OnconnectJitter - } - for topic, payload := range options.DefaultPayloadByTopic { - client.payloadByTopic[topic] = payload - } - - mqttOpts := mqtt.NewClientOptions() - mqttOpts.AddBroker(brokerURI) - mqttOpts.SetAutoReconnect(true) - mqttOpts.SetOnConnectHandler(func(c mqtt.Client) { - client.stopAllTimers() - client.startAllTimers() - - if options.ConnectHandler != nil { - options.ConnectHandler(client) - } - }) - mqttOpts.SetConnectionLostHandler(func(c mqtt.Client, err error) { - client.stopAllTimers() - - if options.DisconnectHandler != nil { - options.DisconnectHandler(client, err) - } - }) - client.mqtt = mqtt.NewClient(mqttOpts) - - return client -} - -// Connect connects to the Catbus MQTT broker and blocks forever. -func (c *Client) Connect() error { - if err := c.mqtt.Connect().Error(); err != nil { - return err - } - select {} -} - -// Subscribe subscribes to a Catbus MQTT topic. -func (c *Client) Subscribe(topic string, f MessageHandler) error { - return c.mqtt.Subscribe(topic, atLeastOnce, func(_ mqtt.Client, msg mqtt.Message) { - c.storePayload(msg.Topic(), Retention(msg.Retained()), msg.Payload()) - - f(c, msg) - }).Error() -} - -// Publish publishes to a Catbus MQTT topic. -func (c *Client) Publish(topic string, retention Retention, payload []byte) error { - c.storePayload(topic, retention, payload) - - return c.mqtt.Publish(topic, atLeastOnce, bool(retention), payload).Error() -} - -func (c *Client) jitteredOnconnectDelay() time.Duration { - jitter := time.Duration(rand.Intn(int(c.onconnectJitter))) - if rand.Intn(2) == 0 { - return c.onconnectDelay + jitter - } - return c.onconnectDelay - jitter -} - -func (c *Client) storePayload(topic string, retention Retention, payload []byte) { - c.payloadByTopicMu.Lock() - defer c.payloadByTopicMu.Unlock() - - if _, ok := c.payloadByTopic[topic]; !ok && retention == DontRetain { - // If we don't have a copy, and the sender doesn't want it retained, don't retain it. - return - } - - c.stopTimer(topic) - - if len(payload) == 0 { - delete(c.payloadByTopic, topic) - return - } - c.payloadByTopic[topic] = payload -} -func (c *Client) stopTimer(topic string) { - c.onconnectTimerByTopicMu.Lock() - defer c.onconnectTimerByTopicMu.Unlock() - - if timer, ok := c.onconnectTimerByTopic[topic]; ok { - _ = timer.Stop() - } -} -func (c *Client) stopAllTimers() { - c.onconnectTimerByTopicMu.Lock() - defer c.onconnectTimerByTopicMu.Unlock() - - for _, timer := range c.onconnectTimerByTopic { - _ = timer.Stop() - } -} -func (c *Client) startAllTimers() { - c.payloadByTopicMu.Lock() - defer c.payloadByTopicMu.Unlock() - - c.onconnectTimerByTopicMu.Lock() - defer c.onconnectTimerByTopicMu.Unlock() - - for topic := range c.payloadByTopic { - c.onconnectTimerByTopic[topic] = time.AfterFunc(c.jitteredOnconnectDelay(), func() { - c.payloadByTopicMu.Lock() - payload, ok := c.payloadByTopic[topic] - c.payloadByTopicMu.Unlock() - if !ok { - return - } - _ = c.Publish(topic, Retain, payload) - }) - } -} -- cgit v1.2.3