diff options
author | Ethel Morgan <eth@ethulhu.co.uk> | 2020-06-23 16:45:00 +0100 |
---|---|---|
committer | Ethel Morgan <eth@ethulhu.co.uk> | 2020-06-23 16:45:00 +0100 |
commit | aa3450cf30be88ddd221911a44a85c2d1e96e7dd (patch) | |
tree | d1aadd4d9c40da4bbdfeab4243ef683a62e890dd | |
parent | 54d24cbc56c012f30de902c2746899ffbf9154eb (diff) |
use the library versions of catbus & logger
-rw-r--r-- | catbus/catbus.go | 191 | ||||
-rw-r--r-- | catbus/catbus_test.go | 206 | ||||
-rw-r--r-- | cmd/catbus-actuator-wakeonlan/main.go | 4 | ||||
-rw-r--r-- | default.nix | 2 | ||||
-rw-r--r-- | go.mod | 2 | ||||
-rw-r--r-- | go.sum | 4 | ||||
-rw-r--r-- | logger/log.go | 131 |
7 files changed, 9 insertions, 531 deletions
diff --git a/catbus/catbus.go b/catbus/catbus.go deleted file mode 100644 index bb674c6..0000000 --- a/catbus/catbus.go +++ /dev/null @@ -1,191 +0,0 @@ -// SPDX-FileCopyrightText: 2020 Ethel Morgan -// -// SPDX-License-Identifier: MIT - -// Package catbus is a convenience wrapper around MQTT for use with Catbus. -package catbus - -import ( - "math/rand" - "sync" - "time" - - mqtt "github.com/eclipse/paho.mqtt.golang" -) - -type ( - Message = mqtt.Message - MessageHandler = func(*Client, Message) - - Client struct { - mqtt mqtt.Client - - payloadByTopicMu sync.Mutex - payloadByTopic map[string][]byte - - onconnectTimerByTopicMu sync.Mutex - onconnectTimerByTopic map[string]*time.Timer - - onconnectDelay time.Duration - onconnectJitter time.Duration - } - - ClientOptions struct { - DisconnectHandler func(*Client, error) - ConnectHandler func(*Client) - - // Publish previously seen or default values on connecting after OnconnectDelay ± [0,OnconnectJitter). - OnconnectDelay time.Duration - OnconnectJitter time.Duration - - // DefaultPayloadByTopic are optional values to publish on connect if no prior values are seen. - // E.g. unless we've been told otherwise, assume a device is off. - DefaultPayloadByTopic map[string][]byte - } - - // Retention is whether or not the MQTT broker should retain the message. - Retention bool -) - -const ( - atMostOnce byte = iota - atLeastOnce - exactlyOnce -) - -const ( - Retain = Retention(true) - DontRetain = Retention(false) -) - -const ( - DefaultOnconnectDelay = 1 * time.Minute - DefaultOnconnectJitter = 15 * time.Second -) - -func NewClient(brokerURI string, options ClientOptions) *Client { - client := &Client{ - payloadByTopic: map[string][]byte{}, - onconnectTimerByTopic: map[string]*time.Timer{}, - - onconnectDelay: DefaultOnconnectDelay, - onconnectJitter: DefaultOnconnectJitter, - } - - if options.OnconnectDelay != 0 { - client.onconnectDelay = options.OnconnectDelay - } - if options.OnconnectJitter != 0 { - client.onconnectJitter = options.OnconnectJitter - } - for topic, payload := range options.DefaultPayloadByTopic { - client.payloadByTopic[topic] = payload - } - - mqttOpts := mqtt.NewClientOptions() - mqttOpts.AddBroker(brokerURI) - mqttOpts.SetAutoReconnect(true) - mqttOpts.SetOnConnectHandler(func(c mqtt.Client) { - client.stopAllTimers() - client.startAllTimers() - - if options.ConnectHandler != nil { - options.ConnectHandler(client) - } - }) - mqttOpts.SetConnectionLostHandler(func(c mqtt.Client, err error) { - client.stopAllTimers() - - if options.DisconnectHandler != nil { - options.DisconnectHandler(client, err) - } - }) - client.mqtt = mqtt.NewClient(mqttOpts) - - return client -} - -// Connect connects to the Catbus MQTT broker and blocks forever. -func (c *Client) Connect() error { - if err := c.mqtt.Connect().Error(); err != nil { - return err - } - select {} -} - -// Subscribe subscribes to a Catbus MQTT topic. -func (c *Client) Subscribe(topic string, f MessageHandler) error { - return c.mqtt.Subscribe(topic, atLeastOnce, func(_ mqtt.Client, msg mqtt.Message) { - c.storePayload(msg.Topic(), Retention(msg.Retained()), msg.Payload()) - - f(c, msg) - }).Error() -} - -// Publish publishes to a Catbus MQTT topic. -func (c *Client) Publish(topic string, retention Retention, payload []byte) error { - c.storePayload(topic, retention, payload) - - return c.mqtt.Publish(topic, atLeastOnce, bool(retention), payload).Error() -} - -func (c *Client) jitteredOnconnectDelay() time.Duration { - jitter := time.Duration(rand.Intn(int(c.onconnectJitter))) - if rand.Intn(2) == 0 { - return c.onconnectDelay + jitter - } - return c.onconnectDelay - jitter -} - -func (c *Client) storePayload(topic string, retention Retention, payload []byte) { - c.payloadByTopicMu.Lock() - defer c.payloadByTopicMu.Unlock() - - if _, ok := c.payloadByTopic[topic]; !ok && retention == DontRetain { - // If we don't have a copy, and the sender doesn't want it retained, don't retain it. - return - } - - c.stopTimer(topic) - - if len(payload) == 0 { - delete(c.payloadByTopic, topic) - return - } - c.payloadByTopic[topic] = payload -} -func (c *Client) stopTimer(topic string) { - c.onconnectTimerByTopicMu.Lock() - defer c.onconnectTimerByTopicMu.Unlock() - - if timer, ok := c.onconnectTimerByTopic[topic]; ok { - _ = timer.Stop() - } -} -func (c *Client) stopAllTimers() { - c.onconnectTimerByTopicMu.Lock() - defer c.onconnectTimerByTopicMu.Unlock() - - for _, timer := range c.onconnectTimerByTopic { - _ = timer.Stop() - } -} -func (c *Client) startAllTimers() { - c.payloadByTopicMu.Lock() - defer c.payloadByTopicMu.Unlock() - - c.onconnectTimerByTopicMu.Lock() - defer c.onconnectTimerByTopicMu.Unlock() - - for topic := range c.payloadByTopic { - c.onconnectTimerByTopic[topic] = time.AfterFunc(c.jitteredOnconnectDelay(), func() { - c.payloadByTopicMu.Lock() - payload, ok := c.payloadByTopic[topic] - c.payloadByTopicMu.Unlock() - if !ok { - return - } - _ = c.Publish(topic, Retain, payload) - }) - } -} diff --git a/catbus/catbus_test.go b/catbus/catbus_test.go deleted file mode 100644 index d07367b..0000000 --- a/catbus/catbus_test.go +++ /dev/null @@ -1,206 +0,0 @@ -// SPDX-FileCopyrightText: 2020 Ethel Morgan -// -// SPDX-License-Identifier: MIT - -package catbus - -import ( - "fmt" - "log" - "reflect" - "testing" - "time" - - mqtt "github.com/eclipse/paho.mqtt.golang" -) - -type ( - message struct { - retention Retention - payload []byte - } -) - -func TestOnConnect(t *testing.T) { - tests := []struct { - payloadByTopic map[string][]byte - subscribe []string - receive map[string]message - - want map[string][]byte - }{ - { - payloadByTopic: map[string][]byte{ - "tv/power": []byte("off"), - }, - want: map[string][]byte{ - "tv/power": []byte("off"), - }, - }, - { - payloadByTopic: map[string][]byte{ - "tv/power": []byte("off"), - }, - subscribe: []string{ - "tv/power", - }, - receive: map[string]message{ - "tv/power": {Retain, []byte("on")}, - }, - want: map[string][]byte{ - "tv/power": []byte("on"), - }, - }, - { - subscribe: []string{ - "tv/power", - }, - receive: map[string]message{ - "tv/power": {Retain, []byte("on")}, - }, - want: map[string][]byte{ - "tv/power": []byte("on"), - }, - }, - { - payloadByTopic: map[string][]byte{ - "tv/power": []byte("off"), - }, - subscribe: []string{ - "tv/power", - }, - receive: map[string]message{ - "tv/power": {DontRetain, []byte("on")}, - }, - want: map[string][]byte{ - "tv/power": []byte("on"), - }, - }, - { - subscribe: []string{ - "tv/power", - }, - receive: map[string]message{ - "tv/power": {DontRetain, []byte("on")}, - }, - want: map[string][]byte{}, - }, - { - payloadByTopic: map[string][]byte{ - "tv/power": []byte("off"), - }, - subscribe: []string{ - "tv/power", - }, - receive: map[string]message{ - "tv/power": {DontRetain, []byte{}}, - }, - want: map[string][]byte{}, - }, - } - - for i, tt := range tests { - fakeMQTT := &fakeMQTT{ - callbackByTopic: map[string]mqtt.MessageHandler{}, - payloadByTopic: map[string][]byte{}, - } - - catbus := &Client{ - mqtt: fakeMQTT, - payloadByTopic: map[string][]byte{}, - onconnectTimerByTopic: map[string]*time.Timer{}, - onconnectDelay: 1 * time.Millisecond, - onconnectJitter: 1, - } - if tt.payloadByTopic != nil { - catbus.payloadByTopic = tt.payloadByTopic - } - - for _, topic := range tt.subscribe { - catbus.Subscribe(topic, func(_ *Client, _ Message) {}) - } - for topic, message := range tt.receive { - fakeMQTT.send(topic, message.retention, message.payload) - } - - catbus.stopAllTimers() - catbus.startAllTimers() - - // TODO: replace with proper channel signaling or sth. - time.Sleep(1 * time.Second) - - got := fakeMQTT.payloadByTopic - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("[%d]: got %v, want %v", i, got, tt.want) - } - } -} - -type ( - fakeMQTT struct { - mqtt.Client - - callbackByTopic map[string]mqtt.MessageHandler - payloadByTopic map[string][]byte - } - - fakeMessage struct { - mqtt.Message - - topic string - retained bool - payload []byte - } - - fakeToken struct{} -) - -func (f *fakeMQTT) Publish(topic string, qos byte, retain bool, payload interface{}) mqtt.Token { - bytes, ok := payload.([]byte) - if !ok { - panic(fmt.Sprintf("expected type []byte, got %v", reflect.TypeOf(payload))) - } - - log.Printf("topic %q payload %s", topic, payload) - f.payloadByTopic[topic] = bytes - return &fakeToken{} -} -func (f *fakeMQTT) Subscribe(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token { - f.callbackByTopic[topic] = callback - - return &fakeToken{} -} -func (f *fakeMQTT) send(topic string, retention Retention, payload []byte) { - // if retention == Retain { - // f.payloadByTopic[topic] = payload - // } - - if callback, ok := f.callbackByTopic[topic]; ok { - msg := &fakeMessage{ - topic: topic, - retained: bool(retention), - payload: payload, - } - callback(f, msg) - } -} - -func (f *fakeMessage) Topic() string { - return f.topic -} -func (f *fakeMessage) Payload() []byte { - return f.payload -} -func (f *fakeMessage) Retained() bool { - return f.retained -} - -func (_ *fakeToken) Wait() bool { - return false -} -func (_ *fakeToken) WaitTimeout(_ time.Duration) bool { - return false -} -func (_ *fakeToken) Error() error { - return nil -} diff --git a/cmd/catbus-actuator-wakeonlan/main.go b/cmd/catbus-actuator-wakeonlan/main.go index 2649206..af826ad 100644 --- a/cmd/catbus-actuator-wakeonlan/main.go +++ b/cmd/catbus-actuator-wakeonlan/main.go @@ -9,10 +9,10 @@ import ( "flag" "log" - "go.eth.moe/catbus-wakeonlan/catbus" + "go.eth.moe/catbus" "go.eth.moe/catbus-wakeonlan/config" - "go.eth.moe/catbus-wakeonlan/logger" "go.eth.moe/catbus-wakeonlan/wakeonlan" + "go.eth.moe/logger" ) var ( diff --git a/default.nix b/default.nix index efee9fa..0d3a961 100644 --- a/default.nix +++ b/default.nix @@ -10,7 +10,7 @@ buildGoModule rec { version = "latest"; goPackagePath = "go.eth.moe/catbus-wakeonlan"; - modSha256 = "0nj0ny9692bqcw04fh74g8hqgfh3qc095fsq0y9cy677kp7l2q94"; + modSha256 = "19nr5pw4c4jap051djg4m1j83p3vkfirhvjvw2ld6mk3zjblbh2f"; src = ./.; @@ -9,5 +9,7 @@ go 1.14 require ( github.com/eclipse/paho.mqtt.golang v1.2.0 github.com/sirupsen/logrus v1.6.0 + go.eth.moe/catbus v0.0.0-20200623152511-b8209be849b8 + go.eth.moe/logger v0.0.0-20200623151751-52b7892858e7 golang.org/x/net v0.0.0-20200602114024-627f9648deb9 // indirect ) @@ -6,6 +6,10 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +go.eth.moe/catbus v0.0.0-20200623152511-b8209be849b8 h1:mCRNA6edCcc2IQvb41bO8jTxeCQIpChel3t1zuEF56s= +go.eth.moe/catbus v0.0.0-20200623152511-b8209be849b8/go.mod h1:bVYTqNPbc4QCACFDMTKFv/5cFZWYDKPSamTP0/lbvW0= +go.eth.moe/logger v0.0.0-20200623151751-52b7892858e7 h1:zIeNAn8mXwSVWPbvR64OjLAcDG5xzcsdDxYdNg7j7bc= +go.eth.moe/logger v0.0.0-20200623151751-52b7892858e7/go.mod h1:G20TP3ON2S95olTep+qsBSoTfouZeKPukk3Ow42q5OQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20200602114024-627f9648deb9 h1:pNX+40auqi2JqRfOP1akLGtYcn15TUbkhwuCO3foqqM= golang.org/x/net v0.0.0-20200602114024-627f9648deb9/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= diff --git a/logger/log.go b/logger/log.go deleted file mode 100644 index a47d27d..0000000 --- a/logger/log.go +++ /dev/null @@ -1,131 +0,0 @@ -// SPDX-FileCopyrightText: 2020 Ethel Morgan -// -// SPDX-License-Identifier: MIT - -package logger - -import ( - "context" - "fmt" - "sync" - - log "github.com/sirupsen/logrus" -) - -type ( - // log, ctx := logger.FromContext(ctx) - // log.WithField("error", err).Warninging(...) - // log.AddField("transport", udn) - // log.Info(...) - Logger interface { - // AddField adds a field to the current logger. - AddField(name string, value interface{}) - - // WithField forks a logger, adding context. - WithField(name string, value interface{}) Logger - - // WithError is a convenience method for one-off forks to log error messages under the key "error". - WithError(err error) Logger - - // Fork returns a copy of the Logger and a fork of the context.Context to pass through. - Fork(context.Context) (Logger, context.Context) - - Debug(string) - Info(string) - Warning(string) - Error(string) - Fatal(string) - } - - logger struct { - mu sync.Mutex - values map[string]interface{} - } - - // contextKey is a separate type to prevent collisions with other packages. - contextKey int -) - -const ( - loggerKey contextKey = iota -) - -func FromContext(ctx context.Context) (Logger, context.Context) { - maybeLogger := ctx.Value(loggerKey) - - if maybeLogger == nil { - l := Background() - return l, context.WithValue(ctx, loggerKey, l) - } - - if l, ok := maybeLogger.(*logger); ok { - return l, ctx - } - - panic(fmt.Sprintf("expected logger in context, found %+v", maybeLogger)) -} -func Background() Logger { - return &logger{ - values: map[string]interface{}{}, - } -} - -func (l *logger) AddField(name string, value interface{}) { - l.mu.Lock() - defer l.mu.Unlock() - - // TODO: check it doesn't exist already? - l.values[name] = value -} -func (l *logger) WithField(name string, value interface{}) Logger { - clone, _ := l.Fork(context.Background()) - clone.AddField(name, value) - return clone -} -func (l *logger) WithError(err error) Logger { - return l.WithField("error", err) -} - -func (l *logger) Fork(ctx context.Context) (Logger, context.Context) { - l.mu.Lock() - defer l.mu.Unlock() - - clone := &logger{ - values: map[string]interface{}{}, - } - for k, v := range l.values { - clone.values[k] = v - } - return clone, context.WithValue(ctx, loggerKey, clone) -} - -func (l *logger) Debug(message string) { - l.mu.Lock() - defer l.mu.Unlock() - - log.WithFields(log.Fields(l.values)).Debug(message) -} -func (l *logger) Info(message string) { - l.mu.Lock() - defer l.mu.Unlock() - - log.WithFields(log.Fields(l.values)).Info(message) -} -func (l *logger) Warning(message string) { - l.mu.Lock() - defer l.mu.Unlock() - - log.WithFields(log.Fields(l.values)).Warning(message) -} -func (l *logger) Error(message string) { - l.mu.Lock() - defer l.mu.Unlock() - - log.WithFields(log.Fields(l.values)).Error(message) -} -func (l *logger) Fatal(message string) { - l.mu.Lock() - defer l.mu.Unlock() - - log.WithFields(log.Fields(l.values)).Fatal(message) -} |