diff options
author | Ethel Morgan <eth@ethulhu.co.uk> | 2020-06-20 10:09:12 +0100 |
---|---|---|
committer | Ethel Morgan <eth@ethulhu.co.uk> | 2020-06-20 10:09:12 +0100 |
commit | 09bb9e12fcd0336af2a3a9f9aacc6384ac004943 (patch) | |
tree | 8f63a8fadcf3f823c86d06e29cc938065a345687 /catbus/catbus.go | |
parent | d8d9b04ebbdfff0dd3d952d2658f117d47730c9d (diff) |
move package mqtt to package catbus, make more convenient
Diffstat (limited to '')
-rw-r--r-- | catbus/catbus.go | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/catbus/catbus.go b/catbus/catbus.go new file mode 100644 index 0000000..e676a14 --- /dev/null +++ b/catbus/catbus.go @@ -0,0 +1,67 @@ +// SPDX-FileCopyrightText: 2020 Ethel Morgan +// +// SPDX-License-Identifier: MIT + +// Package catbus is a convenience wrapper around MQTT for use with Catbus. +package catbus + +import mqtt "github.com/eclipse/paho.mqtt.golang" + +type ( + Message = mqtt.Message + MessageHandler = func(*Client, Message) + + Client struct { + mqtt mqtt.Client + } + + ClientOptions struct { + DisconnectHandler func(*Client, error) + ConnectHandler func(*Client) + } +) + +const ( + atMostOnce byte = iota + atLeastOnce + exactlyOnce +) + +const ( + Retain = true +) + +func NewClient(brokerURI string, options ClientOptions) *Client { + opts := mqtt.NewClientOptions() + + opts.AddBroker(brokerURI) + opts.SetAutoReconnect(true) + + opts.SetConnectionLostHandler(func(c mqtt.Client, err error) { + if options.DisconnectHandler != nil { + options.DisconnectHandler(&Client{c}, err) + } + }) + opts.SetOnConnectHandler(func(c mqtt.Client) { + if options.ConnectHandler != nil { + options.ConnectHandler(&Client{c}) + } + }) + + return &Client{mqtt.NewClient(opts)} +} + +// Subscribe subscribes to a Catbus MQTT topic. +func (c *Client) Subscribe(topic string, f MessageHandler) error { + return c.mqtt.Subscribe(topic, atLeastOnce, func(c mqtt.Client, msg mqtt.Message) { + f(&Client{c}, msg) + }).Error() +} + +// Connect connects to the Catbus MQTT broker and blocks forever. +func (c *Client) Connect() error { + if err := c.mqtt.Connect().Error(); err != nil { + return err + } + select {} +} |