aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--catbus/catbus.go191
-rw-r--r--catbus/catbus_test.go206
-rw-r--r--cmd/catbus-actuator-wakeonlan/main.go4
-rw-r--r--default.nix2
-rw-r--r--go.mod2
-rw-r--r--go.sum4
-rw-r--r--logger/log.go131
7 files changed, 9 insertions, 531 deletions
diff --git a/catbus/catbus.go b/catbus/catbus.go
deleted file mode 100644
index bb674c6..0000000
--- a/catbus/catbus.go
+++ /dev/null
@@ -1,191 +0,0 @@
-// SPDX-FileCopyrightText: 2020 Ethel Morgan
-//
-// SPDX-License-Identifier: MIT
-
-// Package catbus is a convenience wrapper around MQTT for use with Catbus.
-package catbus
-
-import (
- "math/rand"
- "sync"
- "time"
-
- mqtt "github.com/eclipse/paho.mqtt.golang"
-)
-
-type (
- Message = mqtt.Message
- MessageHandler = func(*Client, Message)
-
- Client struct {
- mqtt mqtt.Client
-
- payloadByTopicMu sync.Mutex
- payloadByTopic map[string][]byte
-
- onconnectTimerByTopicMu sync.Mutex
- onconnectTimerByTopic map[string]*time.Timer
-
- onconnectDelay time.Duration
- onconnectJitter time.Duration
- }
-
- ClientOptions struct {
- DisconnectHandler func(*Client, error)
- ConnectHandler func(*Client)
-
- // Publish previously seen or default values on connecting after OnconnectDelay ± [0,OnconnectJitter).
- OnconnectDelay time.Duration
- OnconnectJitter time.Duration
-
- // DefaultPayloadByTopic are optional values to publish on connect if no prior values are seen.
- // E.g. unless we've been told otherwise, assume a device is off.
- DefaultPayloadByTopic map[string][]byte
- }
-
- // Retention is whether or not the MQTT broker should retain the message.
- Retention bool
-)
-
-const (
- atMostOnce byte = iota
- atLeastOnce
- exactlyOnce
-)
-
-const (
- Retain = Retention(true)
- DontRetain = Retention(false)
-)
-
-const (
- DefaultOnconnectDelay = 1 * time.Minute
- DefaultOnconnectJitter = 15 * time.Second
-)
-
-func NewClient(brokerURI string, options ClientOptions) *Client {
- client := &Client{
- payloadByTopic: map[string][]byte{},
- onconnectTimerByTopic: map[string]*time.Timer{},
-
- onconnectDelay: DefaultOnconnectDelay,
- onconnectJitter: DefaultOnconnectJitter,
- }
-
- if options.OnconnectDelay != 0 {
- client.onconnectDelay = options.OnconnectDelay
- }
- if options.OnconnectJitter != 0 {
- client.onconnectJitter = options.OnconnectJitter
- }
- for topic, payload := range options.DefaultPayloadByTopic {
- client.payloadByTopic[topic] = payload
- }
-
- mqttOpts := mqtt.NewClientOptions()
- mqttOpts.AddBroker(brokerURI)
- mqttOpts.SetAutoReconnect(true)
- mqttOpts.SetOnConnectHandler(func(c mqtt.Client) {
- client.stopAllTimers()
- client.startAllTimers()
-
- if options.ConnectHandler != nil {
- options.ConnectHandler(client)
- }
- })
- mqttOpts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
- client.stopAllTimers()
-
- if options.DisconnectHandler != nil {
- options.DisconnectHandler(client, err)
- }
- })
- client.mqtt = mqtt.NewClient(mqttOpts)
-
- return client
-}
-
-// Connect connects to the Catbus MQTT broker and blocks forever.
-func (c *Client) Connect() error {
- if err := c.mqtt.Connect().Error(); err != nil {
- return err
- }
- select {}
-}
-
-// Subscribe subscribes to a Catbus MQTT topic.
-func (c *Client) Subscribe(topic string, f MessageHandler) error {
- return c.mqtt.Subscribe(topic, atLeastOnce, func(_ mqtt.Client, msg mqtt.Message) {
- c.storePayload(msg.Topic(), Retention(msg.Retained()), msg.Payload())
-
- f(c, msg)
- }).Error()
-}
-
-// Publish publishes to a Catbus MQTT topic.
-func (c *Client) Publish(topic string, retention Retention, payload []byte) error {
- c.storePayload(topic, retention, payload)
-
- return c.mqtt.Publish(topic, atLeastOnce, bool(retention), payload).Error()
-}
-
-func (c *Client) jitteredOnconnectDelay() time.Duration {
- jitter := time.Duration(rand.Intn(int(c.onconnectJitter)))
- if rand.Intn(2) == 0 {
- return c.onconnectDelay + jitter
- }
- return c.onconnectDelay - jitter
-}
-
-func (c *Client) storePayload(topic string, retention Retention, payload []byte) {
- c.payloadByTopicMu.Lock()
- defer c.payloadByTopicMu.Unlock()
-
- if _, ok := c.payloadByTopic[topic]; !ok && retention == DontRetain {
- // If we don't have a copy, and the sender doesn't want it retained, don't retain it.
- return
- }
-
- c.stopTimer(topic)
-
- if len(payload) == 0 {
- delete(c.payloadByTopic, topic)
- return
- }
- c.payloadByTopic[topic] = payload
-}
-func (c *Client) stopTimer(topic string) {
- c.onconnectTimerByTopicMu.Lock()
- defer c.onconnectTimerByTopicMu.Unlock()
-
- if timer, ok := c.onconnectTimerByTopic[topic]; ok {
- _ = timer.Stop()
- }
-}
-func (c *Client) stopAllTimers() {
- c.onconnectTimerByTopicMu.Lock()
- defer c.onconnectTimerByTopicMu.Unlock()
-
- for _, timer := range c.onconnectTimerByTopic {
- _ = timer.Stop()
- }
-}
-func (c *Client) startAllTimers() {
- c.payloadByTopicMu.Lock()
- defer c.payloadByTopicMu.Unlock()
-
- c.onconnectTimerByTopicMu.Lock()
- defer c.onconnectTimerByTopicMu.Unlock()
-
- for topic := range c.payloadByTopic {
- c.onconnectTimerByTopic[topic] = time.AfterFunc(c.jitteredOnconnectDelay(), func() {
- c.payloadByTopicMu.Lock()
- payload, ok := c.payloadByTopic[topic]
- c.payloadByTopicMu.Unlock()
- if !ok {
- return
- }
- _ = c.Publish(topic, Retain, payload)
- })
- }
-}
diff --git a/catbus/catbus_test.go b/catbus/catbus_test.go
deleted file mode 100644
index d07367b..0000000
--- a/catbus/catbus_test.go
+++ /dev/null
@@ -1,206 +0,0 @@
-// SPDX-FileCopyrightText: 2020 Ethel Morgan
-//
-// SPDX-License-Identifier: MIT
-
-package catbus
-
-import (
- "fmt"
- "log"
- "reflect"
- "testing"
- "time"
-
- mqtt "github.com/eclipse/paho.mqtt.golang"
-)
-
-type (
- message struct {
- retention Retention
- payload []byte
- }
-)
-
-func TestOnConnect(t *testing.T) {
- tests := []struct {
- payloadByTopic map[string][]byte
- subscribe []string
- receive map[string]message
-
- want map[string][]byte
- }{
- {
- payloadByTopic: map[string][]byte{
- "tv/power": []byte("off"),
- },
- want: map[string][]byte{
- "tv/power": []byte("off"),
- },
- },
- {
- payloadByTopic: map[string][]byte{
- "tv/power": []byte("off"),
- },
- subscribe: []string{
- "tv/power",
- },
- receive: map[string]message{
- "tv/power": {Retain, []byte("on")},
- },
- want: map[string][]byte{
- "tv/power": []byte("on"),
- },
- },
- {
- subscribe: []string{
- "tv/power",
- },
- receive: map[string]message{
- "tv/power": {Retain, []byte("on")},
- },
- want: map[string][]byte{
- "tv/power": []byte("on"),
- },
- },
- {
- payloadByTopic: map[string][]byte{
- "tv/power": []byte("off"),
- },
- subscribe: []string{
- "tv/power",
- },
- receive: map[string]message{
- "tv/power": {DontRetain, []byte("on")},
- },
- want: map[string][]byte{
- "tv/power": []byte("on"),
- },
- },
- {
- subscribe: []string{
- "tv/power",
- },
- receive: map[string]message{
- "tv/power": {DontRetain, []byte("on")},
- },
- want: map[string][]byte{},
- },
- {
- payloadByTopic: map[string][]byte{
- "tv/power": []byte("off"),
- },
- subscribe: []string{
- "tv/power",
- },
- receive: map[string]message{
- "tv/power": {DontRetain, []byte{}},
- },
- want: map[string][]byte{},
- },
- }
-
- for i, tt := range tests {
- fakeMQTT := &fakeMQTT{
- callbackByTopic: map[string]mqtt.MessageHandler{},
- payloadByTopic: map[string][]byte{},
- }
-
- catbus := &Client{
- mqtt: fakeMQTT,
- payloadByTopic: map[string][]byte{},
- onconnectTimerByTopic: map[string]*time.Timer{},
- onconnectDelay: 1 * time.Millisecond,
- onconnectJitter: 1,
- }
- if tt.payloadByTopic != nil {
- catbus.payloadByTopic = tt.payloadByTopic
- }
-
- for _, topic := range tt.subscribe {
- catbus.Subscribe(topic, func(_ *Client, _ Message) {})
- }
- for topic, message := range tt.receive {
- fakeMQTT.send(topic, message.retention, message.payload)
- }
-
- catbus.stopAllTimers()
- catbus.startAllTimers()
-
- // TODO: replace with proper channel signaling or sth.
- time.Sleep(1 * time.Second)
-
- got := fakeMQTT.payloadByTopic
- if !reflect.DeepEqual(got, tt.want) {
- t.Errorf("[%d]: got %v, want %v", i, got, tt.want)
- }
- }
-}
-
-type (
- fakeMQTT struct {
- mqtt.Client
-
- callbackByTopic map[string]mqtt.MessageHandler
- payloadByTopic map[string][]byte
- }
-
- fakeMessage struct {
- mqtt.Message
-
- topic string
- retained bool
- payload []byte
- }
-
- fakeToken struct{}
-)
-
-func (f *fakeMQTT) Publish(topic string, qos byte, retain bool, payload interface{}) mqtt.Token {
- bytes, ok := payload.([]byte)
- if !ok {
- panic(fmt.Sprintf("expected type []byte, got %v", reflect.TypeOf(payload)))
- }
-
- log.Printf("topic %q payload %s", topic, payload)
- f.payloadByTopic[topic] = bytes
- return &fakeToken{}
-}
-func (f *fakeMQTT) Subscribe(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token {
- f.callbackByTopic[topic] = callback
-
- return &fakeToken{}
-}
-func (f *fakeMQTT) send(topic string, retention Retention, payload []byte) {
- // if retention == Retain {
- // f.payloadByTopic[topic] = payload
- // }
-
- if callback, ok := f.callbackByTopic[topic]; ok {
- msg := &fakeMessage{
- topic: topic,
- retained: bool(retention),
- payload: payload,
- }
- callback(f, msg)
- }
-}
-
-func (f *fakeMessage) Topic() string {
- return f.topic
-}
-func (f *fakeMessage) Payload() []byte {
- return f.payload
-}
-func (f *fakeMessage) Retained() bool {
- return f.retained
-}
-
-func (_ *fakeToken) Wait() bool {
- return false
-}
-func (_ *fakeToken) WaitTimeout(_ time.Duration) bool {
- return false
-}
-func (_ *fakeToken) Error() error {
- return nil
-}
diff --git a/cmd/catbus-actuator-wakeonlan/main.go b/cmd/catbus-actuator-wakeonlan/main.go
index 2649206..af826ad 100644
--- a/cmd/catbus-actuator-wakeonlan/main.go
+++ b/cmd/catbus-actuator-wakeonlan/main.go
@@ -9,10 +9,10 @@ import (
"flag"
"log"
- "go.eth.moe/catbus-wakeonlan/catbus"
+ "go.eth.moe/catbus"
"go.eth.moe/catbus-wakeonlan/config"
- "go.eth.moe/catbus-wakeonlan/logger"
"go.eth.moe/catbus-wakeonlan/wakeonlan"
+ "go.eth.moe/logger"
)
var (
diff --git a/default.nix b/default.nix
index efee9fa..0d3a961 100644
--- a/default.nix
+++ b/default.nix
@@ -10,7 +10,7 @@ buildGoModule rec {
version = "latest";
goPackagePath = "go.eth.moe/catbus-wakeonlan";
- modSha256 = "0nj0ny9692bqcw04fh74g8hqgfh3qc095fsq0y9cy677kp7l2q94";
+ modSha256 = "19nr5pw4c4jap051djg4m1j83p3vkfirhvjvw2ld6mk3zjblbh2f";
src = ./.;
diff --git a/go.mod b/go.mod
index 6b5cbb8..01b1d95 100644
--- a/go.mod
+++ b/go.mod
@@ -9,5 +9,7 @@ go 1.14
require (
github.com/eclipse/paho.mqtt.golang v1.2.0
github.com/sirupsen/logrus v1.6.0
+ go.eth.moe/catbus v0.0.0-20200623152511-b8209be849b8
+ go.eth.moe/logger v0.0.0-20200623151751-52b7892858e7
golang.org/x/net v0.0.0-20200602114024-627f9648deb9 // indirect
)
diff --git a/go.sum b/go.sum
index cf4cce8..bc7c364 100644
--- a/go.sum
+++ b/go.sum
@@ -6,6 +6,10 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+go.eth.moe/catbus v0.0.0-20200623152511-b8209be849b8 h1:mCRNA6edCcc2IQvb41bO8jTxeCQIpChel3t1zuEF56s=
+go.eth.moe/catbus v0.0.0-20200623152511-b8209be849b8/go.mod h1:bVYTqNPbc4QCACFDMTKFv/5cFZWYDKPSamTP0/lbvW0=
+go.eth.moe/logger v0.0.0-20200623151751-52b7892858e7 h1:zIeNAn8mXwSVWPbvR64OjLAcDG5xzcsdDxYdNg7j7bc=
+go.eth.moe/logger v0.0.0-20200623151751-52b7892858e7/go.mod h1:G20TP3ON2S95olTep+qsBSoTfouZeKPukk3Ow42q5OQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20200602114024-627f9648deb9 h1:pNX+40auqi2JqRfOP1akLGtYcn15TUbkhwuCO3foqqM=
golang.org/x/net v0.0.0-20200602114024-627f9648deb9/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
diff --git a/logger/log.go b/logger/log.go
deleted file mode 100644
index a47d27d..0000000
--- a/logger/log.go
+++ /dev/null
@@ -1,131 +0,0 @@
-// SPDX-FileCopyrightText: 2020 Ethel Morgan
-//
-// SPDX-License-Identifier: MIT
-
-package logger
-
-import (
- "context"
- "fmt"
- "sync"
-
- log "github.com/sirupsen/logrus"
-)
-
-type (
- // log, ctx := logger.FromContext(ctx)
- // log.WithField("error", err).Warninging(...)
- // log.AddField("transport", udn)
- // log.Info(...)
- Logger interface {
- // AddField adds a field to the current logger.
- AddField(name string, value interface{})
-
- // WithField forks a logger, adding context.
- WithField(name string, value interface{}) Logger
-
- // WithError is a convenience method for one-off forks to log error messages under the key "error".
- WithError(err error) Logger
-
- // Fork returns a copy of the Logger and a fork of the context.Context to pass through.
- Fork(context.Context) (Logger, context.Context)
-
- Debug(string)
- Info(string)
- Warning(string)
- Error(string)
- Fatal(string)
- }
-
- logger struct {
- mu sync.Mutex
- values map[string]interface{}
- }
-
- // contextKey is a separate type to prevent collisions with other packages.
- contextKey int
-)
-
-const (
- loggerKey contextKey = iota
-)
-
-func FromContext(ctx context.Context) (Logger, context.Context) {
- maybeLogger := ctx.Value(loggerKey)
-
- if maybeLogger == nil {
- l := Background()
- return l, context.WithValue(ctx, loggerKey, l)
- }
-
- if l, ok := maybeLogger.(*logger); ok {
- return l, ctx
- }
-
- panic(fmt.Sprintf("expected logger in context, found %+v", maybeLogger))
-}
-func Background() Logger {
- return &logger{
- values: map[string]interface{}{},
- }
-}
-
-func (l *logger) AddField(name string, value interface{}) {
- l.mu.Lock()
- defer l.mu.Unlock()
-
- // TODO: check it doesn't exist already?
- l.values[name] = value
-}
-func (l *logger) WithField(name string, value interface{}) Logger {
- clone, _ := l.Fork(context.Background())
- clone.AddField(name, value)
- return clone
-}
-func (l *logger) WithError(err error) Logger {
- return l.WithField("error", err)
-}
-
-func (l *logger) Fork(ctx context.Context) (Logger, context.Context) {
- l.mu.Lock()
- defer l.mu.Unlock()
-
- clone := &logger{
- values: map[string]interface{}{},
- }
- for k, v := range l.values {
- clone.values[k] = v
- }
- return clone, context.WithValue(ctx, loggerKey, clone)
-}
-
-func (l *logger) Debug(message string) {
- l.mu.Lock()
- defer l.mu.Unlock()
-
- log.WithFields(log.Fields(l.values)).Debug(message)
-}
-func (l *logger) Info(message string) {
- l.mu.Lock()
- defer l.mu.Unlock()
-
- log.WithFields(log.Fields(l.values)).Info(message)
-}
-func (l *logger) Warning(message string) {
- l.mu.Lock()
- defer l.mu.Unlock()
-
- log.WithFields(log.Fields(l.values)).Warning(message)
-}
-func (l *logger) Error(message string) {
- l.mu.Lock()
- defer l.mu.Unlock()
-
- log.WithFields(log.Fields(l.values)).Error(message)
-}
-func (l *logger) Fatal(message string) {
- l.mu.Lock()
- defer l.mu.Unlock()
-
- log.WithFields(log.Fields(l.values)).Fatal(message)
-}