From 09bb9e12fcd0336af2a3a9f9aacc6384ac004943 Mon Sep 17 00:00:00 2001 From: Ethel Morgan Date: Sat, 20 Jun 2020 10:09:12 +0100 Subject: move package mqtt to package catbus, make more convenient --- catbus/catbus.go | 67 +++++++++++++++++++++++++++++ cmd/catbus-actuator-wakeonlan/main.go | 80 +++++++++++++++++------------------ config/config.go | 4 +- mqtt/mqtt.go | 33 --------------- 4 files changed, 107 insertions(+), 77 deletions(-) create mode 100644 catbus/catbus.go delete mode 100644 mqtt/mqtt.go diff --git a/catbus/catbus.go b/catbus/catbus.go new file mode 100644 index 0000000..e676a14 --- /dev/null +++ b/catbus/catbus.go @@ -0,0 +1,67 @@ +// SPDX-FileCopyrightText: 2020 Ethel Morgan +// +// SPDX-License-Identifier: MIT + +// Package catbus is a convenience wrapper around MQTT for use with Catbus. +package catbus + +import mqtt "github.com/eclipse/paho.mqtt.golang" + +type ( + Message = mqtt.Message + MessageHandler = func(*Client, Message) + + Client struct { + mqtt mqtt.Client + } + + ClientOptions struct { + DisconnectHandler func(*Client, error) + ConnectHandler func(*Client) + } +) + +const ( + atMostOnce byte = iota + atLeastOnce + exactlyOnce +) + +const ( + Retain = true +) + +func NewClient(brokerURI string, options ClientOptions) *Client { + opts := mqtt.NewClientOptions() + + opts.AddBroker(brokerURI) + opts.SetAutoReconnect(true) + + opts.SetConnectionLostHandler(func(c mqtt.Client, err error) { + if options.DisconnectHandler != nil { + options.DisconnectHandler(&Client{c}, err) + } + }) + opts.SetOnConnectHandler(func(c mqtt.Client) { + if options.ConnectHandler != nil { + options.ConnectHandler(&Client{c}) + } + }) + + return &Client{mqtt.NewClient(opts)} +} + +// Subscribe subscribes to a Catbus MQTT topic. +func (c *Client) Subscribe(topic string, f MessageHandler) error { + return c.mqtt.Subscribe(topic, atLeastOnce, func(c mqtt.Client, msg mqtt.Message) { + f(&Client{c}, msg) + }).Error() +} + +// Connect connects to the Catbus MQTT broker and blocks forever. +func (c *Client) Connect() error { + if err := c.mqtt.Connect().Error(); err != nil { + return err + } + select {} +} diff --git a/cmd/catbus-actuator-wakeonlan/main.go b/cmd/catbus-actuator-wakeonlan/main.go index 53da4d9..37efbbe 100644 --- a/cmd/catbus-actuator-wakeonlan/main.go +++ b/cmd/catbus-actuator-wakeonlan/main.go @@ -9,9 +9,9 @@ import ( "flag" "log" + "go.eth.moe/catbus-wakeonlan/catbus" "go.eth.moe/catbus-wakeonlan/config" "go.eth.moe/catbus-wakeonlan/logger" - "go.eth.moe/catbus-wakeonlan/mqtt" "go.eth.moe/catbus-wakeonlan/wakeonlan" ) @@ -34,52 +34,48 @@ func main() { log.WithError(err).Fatal("could not parse config file") } - log.AddField("broker-uri", config.Broker) + log.AddField("broker-uri", config.BrokerURI) - brokerOptions := mqtt.NewClientOptions() - brokerOptions.AddBroker(config.Broker) - brokerOptions.SetAutoReconnect(true) - brokerOptions.SetConnectionLostHandler(func(_ mqtt.Client, err error) { - log := log - if err != nil { - log = log.WithError(err) - } - log.Error("disconnected from MQTT broker") - }) - brokerOptions.SetOnConnectHandler(func(broker mqtt.Client) { - log.Info("connected to MQTT broker") - - for topic := range config.MACsByTopic { - token := broker.Subscribe(topic, mqtt.AtLeastOnce, func(_ mqtt.Client, msg mqtt.Message) { - if string(msg.Payload()) != "on" { - return - } + catbusOptions := catbus.ClientOptions{ + DisconnectHandler: func(_ *catbus.Client, err error) { + log := log + if err != nil { + log = log.WithError(err) + } + log.Error("disconnected from MQTT broker") + }, + ConnectHandler: func(client *catbus.Client) { + log.Info("connected to MQTT broker") - mac, ok := config.MACsByTopic[msg.Topic()] - if !ok { - return - } + for topic := range config.MACsByTopic { + err := client.Subscribe(topic, func(_ *catbus.Client, msg catbus.Message) { + if string(msg.Payload()) != "on" { + return + } + mac, ok := config.MACsByTopic[msg.Topic()] + if !ok { + return + } - log.AddField("mac", mac) - log.AddField("topic", topic) - if err := wakeonlan.Wake(mac); err != nil { - log.WithError(err).Error("could not send wake-on-lan packet") - return + log.AddField("mac", mac) + log.AddField("topic", topic) + if err := wakeonlan.Wake(mac); err != nil { + log.WithError(err).Error("could not send wake-on-lan packet") + return + } + log.Info("sent wake-on-lan packet") + }) + if err != nil { + log := log.WithError(err) + log.AddField("topic", topic) + log.Error("could not subscribe to MQTT topic") } - log.Info("sent wake-on-lan packet") - }) - if err := token.Error(); err != nil { - log := log.WithError(err) - log.AddField("topic", topic) - log.Error("could not subscribe to MQTT topic") } - } - }) - - broker := mqtt.NewClient(brokerOptions) - if token := broker.Connect(); token.Error() != nil { - log.WithError(token.Error()).Fatal("could not connect to MQTT broker") + }, } + catbus := catbus.NewClient(config.BrokerURI, catbusOptions) - select {} + if err := catbus.Connect(); err != nil { + log.WithError(err).Fatal("could not connect to MQTT broker") + } } diff --git a/config/config.go b/config/config.go index b31551f..fddc3f0 100644 --- a/config/config.go +++ b/config/config.go @@ -12,7 +12,7 @@ import ( type ( Config struct { - Broker string + BrokerURI string MACsByTopic map[string]net.HardwareAddr } @@ -46,7 +46,7 @@ func ParseFile(path string) (*Config, error) { func configFromConfig(raw config) *Config { c := &Config{ - Broker: raw.MQTTBroker, + BrokerURI: raw.MQTTBroker, MACsByTopic: map[string]net.HardwareAddr{}, } diff --git a/mqtt/mqtt.go b/mqtt/mqtt.go deleted file mode 100644 index 025ebb4..0000000 --- a/mqtt/mqtt.go +++ /dev/null @@ -1,33 +0,0 @@ -// SPDX-FileCopyrightText: 2020 Ethel Morgan -// -// SPDX-License-Identifier: MIT - -// Package mqtt wraps Paho MQTT with a few quality-of-life features. -package mqtt - -import ( - mqtt "github.com/eclipse/paho.mqtt.golang" -) - -type ( - Client = mqtt.Client - Message = mqtt.Message - MessageHandler = mqtt.MessageHandler -) - -const ( - AtMostOnce byte = iota - AtLeastOnce - ExactlyOnce -) - -const ( - Retain = true -) - -func NewClientOptions() *mqtt.ClientOptions { - return mqtt.NewClientOptions() -} -func NewClient(opts *mqtt.ClientOptions) mqtt.Client { - return mqtt.NewClient(opts) -} -- cgit v1.2.3