diff options
-rw-r--r-- | catbus.go | 31 | ||||
-rw-r--r-- | catbus_test.go | 66 |
2 files changed, 55 insertions, 42 deletions
@@ -14,14 +14,17 @@ import ( ) type ( - Message = mqtt.Message - MessageHandler = func(*Client, Message) + MessageHandler func(*Client, Message) + + Message struct { + message mqtt.Message + } Client struct { mqtt mqtt.Client payloadByTopicMu sync.Mutex - payloadByTopic map[string][]byte + payloadByTopic map[string]string onconnectTimerByTopicMu sync.Mutex onconnectTimerByTopic map[string]*time.Timer @@ -40,7 +43,7 @@ type ( // DefaultPayloadByTopic are optional values to publish on connect if no prior values are seen. // E.g. unless we've been told otherwise, assume a device is off. - DefaultPayloadByTopic map[string][]byte + DefaultPayloadByTopic map[string]string } // Retention is whether or not the MQTT broker should retain the message. @@ -65,7 +68,7 @@ const ( func NewClient(brokerURI string, options ClientOptions) *Client { client := &Client{ - payloadByTopic: map[string][]byte{}, + payloadByTopic: map[string]string{}, onconnectTimerByTopic: map[string]*time.Timer{}, onconnectDelay: DefaultOnconnectDelay, @@ -116,14 +119,14 @@ func (c *Client) Connect() error { // Subscribe subscribes to a Catbus MQTT topic. func (c *Client) Subscribe(topic string, f MessageHandler) error { return c.mqtt.Subscribe(topic, atLeastOnce, func(_ mqtt.Client, msg mqtt.Message) { - c.storePayload(msg.Topic(), Retention(msg.Retained()), msg.Payload()) + c.storePayload(msg.Topic(), Retention(msg.Retained()), string(msg.Payload())) - f(c, msg) + f(c, Message{msg}) }).Error() } // Publish publishes to a Catbus MQTT topic. -func (c *Client) Publish(topic string, retention Retention, payload []byte) error { +func (c *Client) Publish(topic string, retention Retention, payload string) error { c.storePayload(topic, retention, payload) return c.mqtt.Publish(topic, atLeastOnce, bool(retention), payload).Error() @@ -137,7 +140,7 @@ func (c *Client) jitteredOnconnectDelay() time.Duration { return c.onconnectDelay - jitter } -func (c *Client) storePayload(topic string, retention Retention, payload []byte) { +func (c *Client) storePayload(topic string, retention Retention, payload string) { c.payloadByTopicMu.Lock() defer c.payloadByTopicMu.Unlock() @@ -189,3 +192,13 @@ func (c *Client) startAllTimers() { }) } } + +func (m Message) Topic() string { + return m.message.Topic() +} +func (m Message) Payload() string { + return string(m.message.Payload()) +} +func (m Message) Retained() bool { + return m.message.Retained() +} diff --git a/catbus_test.go b/catbus_test.go index d07367b..cc22ebb 100644 --- a/catbus_test.go +++ b/catbus_test.go @@ -17,38 +17,38 @@ import ( type ( message struct { retention Retention - payload []byte + payload string } ) func TestOnConnect(t *testing.T) { tests := []struct { - payloadByTopic map[string][]byte + payloadByTopic map[string]string subscribe []string receive map[string]message - want map[string][]byte + want map[string]string }{ { - payloadByTopic: map[string][]byte{ - "tv/power": []byte("off"), + payloadByTopic: map[string]string{ + "tv/power": "off", }, - want: map[string][]byte{ - "tv/power": []byte("off"), + want: map[string]string{ + "tv/power": "off", }, }, { - payloadByTopic: map[string][]byte{ - "tv/power": []byte("off"), + payloadByTopic: map[string]string{ + "tv/power": "off", }, subscribe: []string{ "tv/power", }, receive: map[string]message{ - "tv/power": {Retain, []byte("on")}, + "tv/power": {Retain, "on"}, }, - want: map[string][]byte{ - "tv/power": []byte("on"), + want: map[string]string{ + "tv/power": "on", }, }, { @@ -56,24 +56,24 @@ func TestOnConnect(t *testing.T) { "tv/power", }, receive: map[string]message{ - "tv/power": {Retain, []byte("on")}, + "tv/power": {Retain, "on"}, }, - want: map[string][]byte{ - "tv/power": []byte("on"), + want: map[string]string{ + "tv/power": "on", }, }, { - payloadByTopic: map[string][]byte{ - "tv/power": []byte("off"), + payloadByTopic: map[string]string{ + "tv/power": "off", }, subscribe: []string{ "tv/power", }, receive: map[string]message{ - "tv/power": {DontRetain, []byte("on")}, + "tv/power": {DontRetain, "on"}, }, - want: map[string][]byte{ - "tv/power": []byte("on"), + want: map[string]string{ + "tv/power": "on", }, }, { @@ -81,33 +81,33 @@ func TestOnConnect(t *testing.T) { "tv/power", }, receive: map[string]message{ - "tv/power": {DontRetain, []byte("on")}, + "tv/power": {DontRetain, "on"}, }, - want: map[string][]byte{}, + want: map[string]string{}, }, { - payloadByTopic: map[string][]byte{ - "tv/power": []byte("off"), + payloadByTopic: map[string]string{ + "tv/power": "off", }, subscribe: []string{ "tv/power", }, receive: map[string]message{ - "tv/power": {DontRetain, []byte{}}, + "tv/power": {DontRetain, ""}, }, - want: map[string][]byte{}, + want: map[string]string{}, }, } for i, tt := range tests { fakeMQTT := &fakeMQTT{ callbackByTopic: map[string]mqtt.MessageHandler{}, - payloadByTopic: map[string][]byte{}, + payloadByTopic: map[string]string{}, } catbus := &Client{ mqtt: fakeMQTT, - payloadByTopic: map[string][]byte{}, + payloadByTopic: map[string]string{}, onconnectTimerByTopic: map[string]*time.Timer{}, onconnectDelay: 1 * time.Millisecond, onconnectJitter: 1, @@ -141,7 +141,7 @@ type ( mqtt.Client callbackByTopic map[string]mqtt.MessageHandler - payloadByTopic map[string][]byte + payloadByTopic map[string]string } fakeMessage struct { @@ -156,9 +156,9 @@ type ( ) func (f *fakeMQTT) Publish(topic string, qos byte, retain bool, payload interface{}) mqtt.Token { - bytes, ok := payload.([]byte) + bytes, ok := payload.(string) if !ok { - panic(fmt.Sprintf("expected type []byte, got %v", reflect.TypeOf(payload))) + panic(fmt.Sprintf("expected type string, got %v", reflect.TypeOf(payload))) } log.Printf("topic %q payload %s", topic, payload) @@ -170,7 +170,7 @@ func (f *fakeMQTT) Subscribe(topic string, qos byte, callback mqtt.MessageHandle return &fakeToken{} } -func (f *fakeMQTT) send(topic string, retention Retention, payload []byte) { +func (f *fakeMQTT) send(topic string, retention Retention, payload string) { // if retention == Retain { // f.payloadByTopic[topic] = payload // } @@ -179,7 +179,7 @@ func (f *fakeMQTT) send(topic string, retention Retention, payload []byte) { msg := &fakeMessage{ topic: topic, retained: bool(retention), - payload: payload, + payload: []byte(payload), } callback(f, msg) } |