diff options
Diffstat (limited to 'catbus.go')
-rw-r--r-- | catbus.go | 61 |
1 files changed, 22 insertions, 39 deletions
@@ -2,7 +2,6 @@ // // SPDX-License-Identifier: MIT -// Package catbus is a convenience wrapper around MQTT for use with Catbus. package catbus import ( @@ -14,13 +13,7 @@ import ( ) type ( - MessageHandler func(*Client, Message) - - Message struct { - message mqtt.Message - } - - Client struct { + client struct { mqtt mqtt.Client payloadByTopicMu sync.Mutex @@ -34,8 +27,8 @@ type ( } ClientOptions struct { - DisconnectHandler func(*Client, error) - ConnectHandler func(*Client) + DisconnectHandler func(Client, error) + ConnectHandler func(Client) // Publish previously seen or default values on connecting after OnconnectDelay ± [0,OnconnectJitter). OnconnectDelay time.Duration @@ -45,9 +38,6 @@ type ( // E.g. unless we've been told otherwise, assume a device is off. DefaultPayloadByTopic map[string]string } - - // Retention is whether or not the MQTT broker should retain the message. - Retention bool ) const ( @@ -57,17 +47,12 @@ const ( ) const ( - Retain = Retention(true) - DontRetain = Retention(false) -) - -const ( DefaultOnconnectDelay = 1 * time.Minute DefaultOnconnectJitter = 15 * time.Second ) -func NewClient(brokerURI string, options ClientOptions) *Client { - client := &Client{ +func NewClient(brokerURI string, options ClientOptions) Client { + client := &client{ payloadByTopic: map[string]string{}, onconnectTimerByTopic: map[string]*time.Timer{}, @@ -88,7 +73,7 @@ func NewClient(brokerURI string, options ClientOptions) *Client { mqttOpts := mqtt.NewClientOptions() mqttOpts.AddBroker(brokerURI) mqttOpts.SetAutoReconnect(true) - mqttOpts.SetOnConnectHandler(func(c mqtt.Client) { + mqttOpts.SetOnConnectHandler(func(_ mqtt.Client) { client.stopAllTimers() client.startAllTimers() @@ -96,7 +81,7 @@ func NewClient(brokerURI string, options ClientOptions) *Client { options.ConnectHandler(client) } }) - mqttOpts.SetConnectionLostHandler(func(c mqtt.Client, err error) { + mqttOpts.SetConnectionLostHandler(func(_ mqtt.Client, err error) { client.stopAllTimers() if options.DisconnectHandler != nil { @@ -109,7 +94,7 @@ func NewClient(brokerURI string, options ClientOptions) *Client { } // Connect connects to the Catbus MQTT broker and blocks forever. -func (c *Client) Connect() error { +func (c *client) Connect() error { if err := c.mqtt.Connect().Error(); err != nil { return err } @@ -117,22 +102,22 @@ func (c *Client) Connect() error { } // Subscribe subscribes to a Catbus MQTT topic. -func (c *Client) Subscribe(topic string, f MessageHandler) error { +func (c *client) Subscribe(topic string, f MessageHandler) error { return c.mqtt.Subscribe(topic, atLeastOnce, func(_ mqtt.Client, msg mqtt.Message) { c.storePayload(msg.Topic(), Retention(msg.Retained()), string(msg.Payload())) - f(c, Message{msg}) + f(c, messageFromMQTTMessage(msg)) }).Error() } // Publish publishes to a Catbus MQTT topic. -func (c *Client) Publish(topic string, retention Retention, payload string) error { +func (c *client) Publish(topic string, retention Retention, payload string) error { c.storePayload(topic, retention, payload) return c.mqtt.Publish(topic, atLeastOnce, bool(retention), payload).Error() } -func (c *Client) jitteredOnconnectDelay() time.Duration { +func (c *client) jitteredOnconnectDelay() time.Duration { jitter := time.Duration(rand.Intn(int(c.onconnectJitter))) if rand.Intn(2) == 0 { return c.onconnectDelay + jitter @@ -140,7 +125,7 @@ func (c *Client) jitteredOnconnectDelay() time.Duration { return c.onconnectDelay - jitter } -func (c *Client) storePayload(topic string, retention Retention, payload string) { +func (c *client) storePayload(topic string, retention Retention, payload string) { c.payloadByTopicMu.Lock() defer c.payloadByTopicMu.Unlock() @@ -157,7 +142,7 @@ func (c *Client) storePayload(topic string, retention Retention, payload string) } c.payloadByTopic[topic] = payload } -func (c *Client) stopTimer(topic string) { +func (c *client) stopTimer(topic string) { c.onconnectTimerByTopicMu.Lock() defer c.onconnectTimerByTopicMu.Unlock() @@ -165,7 +150,7 @@ func (c *Client) stopTimer(topic string) { _ = timer.Stop() } } -func (c *Client) stopAllTimers() { +func (c *client) stopAllTimers() { c.onconnectTimerByTopicMu.Lock() defer c.onconnectTimerByTopicMu.Unlock() @@ -173,7 +158,7 @@ func (c *Client) stopAllTimers() { _ = timer.Stop() } } -func (c *Client) startAllTimers() { +func (c *client) startAllTimers() { c.payloadByTopicMu.Lock() defer c.payloadByTopicMu.Unlock() @@ -193,12 +178,10 @@ func (c *Client) startAllTimers() { } } -func (m Message) Topic() string { - return m.message.Topic() -} -func (m Message) Payload() string { - return string(m.message.Payload()) -} -func (m Message) Retained() bool { - return m.message.Retained() +func messageFromMQTTMessage(msg mqtt.Message) Message { + return Message{ + Payload: string(msg.Payload()), + Retention: Retention(msg.Retained()), + Topic: msg.Topic(), + } } |