aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEthel Morgan <eth@ethulhu.co.uk>2020-06-20 10:09:12 +0100
committerEthel Morgan <eth@ethulhu.co.uk>2020-06-20 10:09:12 +0100
commit09bb9e12fcd0336af2a3a9f9aacc6384ac004943 (patch)
tree8f63a8fadcf3f823c86d06e29cc938065a345687
parentd8d9b04ebbdfff0dd3d952d2658f117d47730c9d (diff)
move package mqtt to package catbus, make more convenient
Diffstat (limited to '')
-rw-r--r--catbus/catbus.go67
-rw-r--r--cmd/catbus-actuator-wakeonlan/main.go80
-rw-r--r--config/config.go4
-rw-r--r--mqtt/mqtt.go33
4 files changed, 107 insertions, 77 deletions
diff --git a/catbus/catbus.go b/catbus/catbus.go
new file mode 100644
index 0000000..e676a14
--- /dev/null
+++ b/catbus/catbus.go
@@ -0,0 +1,67 @@
+// SPDX-FileCopyrightText: 2020 Ethel Morgan
+//
+// SPDX-License-Identifier: MIT
+
+// Package catbus is a convenience wrapper around MQTT for use with Catbus.
+package catbus
+
+import mqtt "github.com/eclipse/paho.mqtt.golang"
+
+type (
+ Message = mqtt.Message
+ MessageHandler = func(*Client, Message)
+
+ Client struct {
+ mqtt mqtt.Client
+ }
+
+ ClientOptions struct {
+ DisconnectHandler func(*Client, error)
+ ConnectHandler func(*Client)
+ }
+)
+
+const (
+ atMostOnce byte = iota
+ atLeastOnce
+ exactlyOnce
+)
+
+const (
+ Retain = true
+)
+
+func NewClient(brokerURI string, options ClientOptions) *Client {
+ opts := mqtt.NewClientOptions()
+
+ opts.AddBroker(brokerURI)
+ opts.SetAutoReconnect(true)
+
+ opts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
+ if options.DisconnectHandler != nil {
+ options.DisconnectHandler(&Client{c}, err)
+ }
+ })
+ opts.SetOnConnectHandler(func(c mqtt.Client) {
+ if options.ConnectHandler != nil {
+ options.ConnectHandler(&Client{c})
+ }
+ })
+
+ return &Client{mqtt.NewClient(opts)}
+}
+
+// Subscribe subscribes to a Catbus MQTT topic.
+func (c *Client) Subscribe(topic string, f MessageHandler) error {
+ return c.mqtt.Subscribe(topic, atLeastOnce, func(c mqtt.Client, msg mqtt.Message) {
+ f(&Client{c}, msg)
+ }).Error()
+}
+
+// Connect connects to the Catbus MQTT broker and blocks forever.
+func (c *Client) Connect() error {
+ if err := c.mqtt.Connect().Error(); err != nil {
+ return err
+ }
+ select {}
+}
diff --git a/cmd/catbus-actuator-wakeonlan/main.go b/cmd/catbus-actuator-wakeonlan/main.go
index 53da4d9..37efbbe 100644
--- a/cmd/catbus-actuator-wakeonlan/main.go
+++ b/cmd/catbus-actuator-wakeonlan/main.go
@@ -9,9 +9,9 @@ import (
"flag"
"log"
+ "go.eth.moe/catbus-wakeonlan/catbus"
"go.eth.moe/catbus-wakeonlan/config"
"go.eth.moe/catbus-wakeonlan/logger"
- "go.eth.moe/catbus-wakeonlan/mqtt"
"go.eth.moe/catbus-wakeonlan/wakeonlan"
)
@@ -34,52 +34,48 @@ func main() {
log.WithError(err).Fatal("could not parse config file")
}
- log.AddField("broker-uri", config.Broker)
+ log.AddField("broker-uri", config.BrokerURI)
- brokerOptions := mqtt.NewClientOptions()
- brokerOptions.AddBroker(config.Broker)
- brokerOptions.SetAutoReconnect(true)
- brokerOptions.SetConnectionLostHandler(func(_ mqtt.Client, err error) {
- log := log
- if err != nil {
- log = log.WithError(err)
- }
- log.Error("disconnected from MQTT broker")
- })
- brokerOptions.SetOnConnectHandler(func(broker mqtt.Client) {
- log.Info("connected to MQTT broker")
-
- for topic := range config.MACsByTopic {
- token := broker.Subscribe(topic, mqtt.AtLeastOnce, func(_ mqtt.Client, msg mqtt.Message) {
- if string(msg.Payload()) != "on" {
- return
- }
+ catbusOptions := catbus.ClientOptions{
+ DisconnectHandler: func(_ *catbus.Client, err error) {
+ log := log
+ if err != nil {
+ log = log.WithError(err)
+ }
+ log.Error("disconnected from MQTT broker")
+ },
+ ConnectHandler: func(client *catbus.Client) {
+ log.Info("connected to MQTT broker")
- mac, ok := config.MACsByTopic[msg.Topic()]
- if !ok {
- return
- }
+ for topic := range config.MACsByTopic {
+ err := client.Subscribe(topic, func(_ *catbus.Client, msg catbus.Message) {
+ if string(msg.Payload()) != "on" {
+ return
+ }
+ mac, ok := config.MACsByTopic[msg.Topic()]
+ if !ok {
+ return
+ }
- log.AddField("mac", mac)
- log.AddField("topic", topic)
- if err := wakeonlan.Wake(mac); err != nil {
- log.WithError(err).Error("could not send wake-on-lan packet")
- return
+ log.AddField("mac", mac)
+ log.AddField("topic", topic)
+ if err := wakeonlan.Wake(mac); err != nil {
+ log.WithError(err).Error("could not send wake-on-lan packet")
+ return
+ }
+ log.Info("sent wake-on-lan packet")
+ })
+ if err != nil {
+ log := log.WithError(err)
+ log.AddField("topic", topic)
+ log.Error("could not subscribe to MQTT topic")
}
- log.Info("sent wake-on-lan packet")
- })
- if err := token.Error(); err != nil {
- log := log.WithError(err)
- log.AddField("topic", topic)
- log.Error("could not subscribe to MQTT topic")
}
- }
- })
-
- broker := mqtt.NewClient(brokerOptions)
- if token := broker.Connect(); token.Error() != nil {
- log.WithError(token.Error()).Fatal("could not connect to MQTT broker")
+ },
}
+ catbus := catbus.NewClient(config.BrokerURI, catbusOptions)
- select {}
+ if err := catbus.Connect(); err != nil {
+ log.WithError(err).Fatal("could not connect to MQTT broker")
+ }
}
diff --git a/config/config.go b/config/config.go
index b31551f..fddc3f0 100644
--- a/config/config.go
+++ b/config/config.go
@@ -12,7 +12,7 @@ import (
type (
Config struct {
- Broker string
+ BrokerURI string
MACsByTopic map[string]net.HardwareAddr
}
@@ -46,7 +46,7 @@ func ParseFile(path string) (*Config, error) {
func configFromConfig(raw config) *Config {
c := &Config{
- Broker: raw.MQTTBroker,
+ BrokerURI: raw.MQTTBroker,
MACsByTopic: map[string]net.HardwareAddr{},
}
diff --git a/mqtt/mqtt.go b/mqtt/mqtt.go
deleted file mode 100644
index 025ebb4..0000000
--- a/mqtt/mqtt.go
+++ /dev/null
@@ -1,33 +0,0 @@
-// SPDX-FileCopyrightText: 2020 Ethel Morgan
-//
-// SPDX-License-Identifier: MIT
-
-// Package mqtt wraps Paho MQTT with a few quality-of-life features.
-package mqtt
-
-import (
- mqtt "github.com/eclipse/paho.mqtt.golang"
-)
-
-type (
- Client = mqtt.Client
- Message = mqtt.Message
- MessageHandler = mqtt.MessageHandler
-)
-
-const (
- AtMostOnce byte = iota
- AtLeastOnce
- ExactlyOnce
-)
-
-const (
- Retain = true
-)
-
-func NewClientOptions() *mqtt.ClientOptions {
- return mqtt.NewClientOptions()
-}
-func NewClient(opts *mqtt.ClientOptions) mqtt.Client {
- return mqtt.NewClient(opts)
-}