aboutsummaryrefslogtreecommitdiff
path: root/catbus/catbus_test.go
diff options
context:
space:
mode:
authorEthel Morgan <eth@ethulhu.co.uk>2020-06-23 15:27:19 +0100
committerEthel Morgan <eth@ethulhu.co.uk>2020-06-23 15:27:19 +0100
commit54d24cbc56c012f30de902c2746899ffbf9154eb (patch)
tree41a770e36adcf78fd1398981695e254874cdaee9 /catbus/catbus_test.go
parentd5b2961bd2cc474d763d72ad1c180e7358e5fd6d (diff)
only rebroadcast on connect, not constantly
Diffstat (limited to '')
-rw-r--r--catbus/catbus_test.go206
1 files changed, 206 insertions, 0 deletions
diff --git a/catbus/catbus_test.go b/catbus/catbus_test.go
new file mode 100644
index 0000000..d07367b
--- /dev/null
+++ b/catbus/catbus_test.go
@@ -0,0 +1,206 @@
+// SPDX-FileCopyrightText: 2020 Ethel Morgan
+//
+// SPDX-License-Identifier: MIT
+
+package catbus
+
+import (
+ "fmt"
+ "log"
+ "reflect"
+ "testing"
+ "time"
+
+ mqtt "github.com/eclipse/paho.mqtt.golang"
+)
+
+type (
+ message struct {
+ retention Retention
+ payload []byte
+ }
+)
+
+func TestOnConnect(t *testing.T) {
+ tests := []struct {
+ payloadByTopic map[string][]byte
+ subscribe []string
+ receive map[string]message
+
+ want map[string][]byte
+ }{
+ {
+ payloadByTopic: map[string][]byte{
+ "tv/power": []byte("off"),
+ },
+ want: map[string][]byte{
+ "tv/power": []byte("off"),
+ },
+ },
+ {
+ payloadByTopic: map[string][]byte{
+ "tv/power": []byte("off"),
+ },
+ subscribe: []string{
+ "tv/power",
+ },
+ receive: map[string]message{
+ "tv/power": {Retain, []byte("on")},
+ },
+ want: map[string][]byte{
+ "tv/power": []byte("on"),
+ },
+ },
+ {
+ subscribe: []string{
+ "tv/power",
+ },
+ receive: map[string]message{
+ "tv/power": {Retain, []byte("on")},
+ },
+ want: map[string][]byte{
+ "tv/power": []byte("on"),
+ },
+ },
+ {
+ payloadByTopic: map[string][]byte{
+ "tv/power": []byte("off"),
+ },
+ subscribe: []string{
+ "tv/power",
+ },
+ receive: map[string]message{
+ "tv/power": {DontRetain, []byte("on")},
+ },
+ want: map[string][]byte{
+ "tv/power": []byte("on"),
+ },
+ },
+ {
+ subscribe: []string{
+ "tv/power",
+ },
+ receive: map[string]message{
+ "tv/power": {DontRetain, []byte("on")},
+ },
+ want: map[string][]byte{},
+ },
+ {
+ payloadByTopic: map[string][]byte{
+ "tv/power": []byte("off"),
+ },
+ subscribe: []string{
+ "tv/power",
+ },
+ receive: map[string]message{
+ "tv/power": {DontRetain, []byte{}},
+ },
+ want: map[string][]byte{},
+ },
+ }
+
+ for i, tt := range tests {
+ fakeMQTT := &fakeMQTT{
+ callbackByTopic: map[string]mqtt.MessageHandler{},
+ payloadByTopic: map[string][]byte{},
+ }
+
+ catbus := &Client{
+ mqtt: fakeMQTT,
+ payloadByTopic: map[string][]byte{},
+ onconnectTimerByTopic: map[string]*time.Timer{},
+ onconnectDelay: 1 * time.Millisecond,
+ onconnectJitter: 1,
+ }
+ if tt.payloadByTopic != nil {
+ catbus.payloadByTopic = tt.payloadByTopic
+ }
+
+ for _, topic := range tt.subscribe {
+ catbus.Subscribe(topic, func(_ *Client, _ Message) {})
+ }
+ for topic, message := range tt.receive {
+ fakeMQTT.send(topic, message.retention, message.payload)
+ }
+
+ catbus.stopAllTimers()
+ catbus.startAllTimers()
+
+ // TODO: replace with proper channel signaling or sth.
+ time.Sleep(1 * time.Second)
+
+ got := fakeMQTT.payloadByTopic
+ if !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("[%d]: got %v, want %v", i, got, tt.want)
+ }
+ }
+}
+
+type (
+ fakeMQTT struct {
+ mqtt.Client
+
+ callbackByTopic map[string]mqtt.MessageHandler
+ payloadByTopic map[string][]byte
+ }
+
+ fakeMessage struct {
+ mqtt.Message
+
+ topic string
+ retained bool
+ payload []byte
+ }
+
+ fakeToken struct{}
+)
+
+func (f *fakeMQTT) Publish(topic string, qos byte, retain bool, payload interface{}) mqtt.Token {
+ bytes, ok := payload.([]byte)
+ if !ok {
+ panic(fmt.Sprintf("expected type []byte, got %v", reflect.TypeOf(payload)))
+ }
+
+ log.Printf("topic %q payload %s", topic, payload)
+ f.payloadByTopic[topic] = bytes
+ return &fakeToken{}
+}
+func (f *fakeMQTT) Subscribe(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token {
+ f.callbackByTopic[topic] = callback
+
+ return &fakeToken{}
+}
+func (f *fakeMQTT) send(topic string, retention Retention, payload []byte) {
+ // if retention == Retain {
+ // f.payloadByTopic[topic] = payload
+ // }
+
+ if callback, ok := f.callbackByTopic[topic]; ok {
+ msg := &fakeMessage{
+ topic: topic,
+ retained: bool(retention),
+ payload: payload,
+ }
+ callback(f, msg)
+ }
+}
+
+func (f *fakeMessage) Topic() string {
+ return f.topic
+}
+func (f *fakeMessage) Payload() []byte {
+ return f.payload
+}
+func (f *fakeMessage) Retained() bool {
+ return f.retained
+}
+
+func (_ *fakeToken) Wait() bool {
+ return false
+}
+func (_ *fakeToken) WaitTimeout(_ time.Duration) bool {
+ return false
+}
+func (_ *fakeToken) Error() error {
+ return nil
+}