aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--catbus.go31
-rw-r--r--catbus_test.go66
2 files changed, 55 insertions, 42 deletions
diff --git a/catbus.go b/catbus.go
index bb674c6..c2af247 100644
--- a/catbus.go
+++ b/catbus.go
@@ -14,14 +14,17 @@ import (
)
type (
- Message = mqtt.Message
- MessageHandler = func(*Client, Message)
+ MessageHandler func(*Client, Message)
+
+ Message struct {
+ message mqtt.Message
+ }
Client struct {
mqtt mqtt.Client
payloadByTopicMu sync.Mutex
- payloadByTopic map[string][]byte
+ payloadByTopic map[string]string
onconnectTimerByTopicMu sync.Mutex
onconnectTimerByTopic map[string]*time.Timer
@@ -40,7 +43,7 @@ type (
// DefaultPayloadByTopic are optional values to publish on connect if no prior values are seen.
// E.g. unless we've been told otherwise, assume a device is off.
- DefaultPayloadByTopic map[string][]byte
+ DefaultPayloadByTopic map[string]string
}
// Retention is whether or not the MQTT broker should retain the message.
@@ -65,7 +68,7 @@ const (
func NewClient(brokerURI string, options ClientOptions) *Client {
client := &Client{
- payloadByTopic: map[string][]byte{},
+ payloadByTopic: map[string]string{},
onconnectTimerByTopic: map[string]*time.Timer{},
onconnectDelay: DefaultOnconnectDelay,
@@ -116,14 +119,14 @@ func (c *Client) Connect() error {
// Subscribe subscribes to a Catbus MQTT topic.
func (c *Client) Subscribe(topic string, f MessageHandler) error {
return c.mqtt.Subscribe(topic, atLeastOnce, func(_ mqtt.Client, msg mqtt.Message) {
- c.storePayload(msg.Topic(), Retention(msg.Retained()), msg.Payload())
+ c.storePayload(msg.Topic(), Retention(msg.Retained()), string(msg.Payload()))
- f(c, msg)
+ f(c, Message{msg})
}).Error()
}
// Publish publishes to a Catbus MQTT topic.
-func (c *Client) Publish(topic string, retention Retention, payload []byte) error {
+func (c *Client) Publish(topic string, retention Retention, payload string) error {
c.storePayload(topic, retention, payload)
return c.mqtt.Publish(topic, atLeastOnce, bool(retention), payload).Error()
@@ -137,7 +140,7 @@ func (c *Client) jitteredOnconnectDelay() time.Duration {
return c.onconnectDelay - jitter
}
-func (c *Client) storePayload(topic string, retention Retention, payload []byte) {
+func (c *Client) storePayload(topic string, retention Retention, payload string) {
c.payloadByTopicMu.Lock()
defer c.payloadByTopicMu.Unlock()
@@ -189,3 +192,13 @@ func (c *Client) startAllTimers() {
})
}
}
+
+func (m Message) Topic() string {
+ return m.message.Topic()
+}
+func (m Message) Payload() string {
+ return string(m.message.Payload())
+}
+func (m Message) Retained() bool {
+ return m.message.Retained()
+}
diff --git a/catbus_test.go b/catbus_test.go
index d07367b..cc22ebb 100644
--- a/catbus_test.go
+++ b/catbus_test.go
@@ -17,38 +17,38 @@ import (
type (
message struct {
retention Retention
- payload []byte
+ payload string
}
)
func TestOnConnect(t *testing.T) {
tests := []struct {
- payloadByTopic map[string][]byte
+ payloadByTopic map[string]string
subscribe []string
receive map[string]message
- want map[string][]byte
+ want map[string]string
}{
{
- payloadByTopic: map[string][]byte{
- "tv/power": []byte("off"),
+ payloadByTopic: map[string]string{
+ "tv/power": "off",
},
- want: map[string][]byte{
- "tv/power": []byte("off"),
+ want: map[string]string{
+ "tv/power": "off",
},
},
{
- payloadByTopic: map[string][]byte{
- "tv/power": []byte("off"),
+ payloadByTopic: map[string]string{
+ "tv/power": "off",
},
subscribe: []string{
"tv/power",
},
receive: map[string]message{
- "tv/power": {Retain, []byte("on")},
+ "tv/power": {Retain, "on"},
},
- want: map[string][]byte{
- "tv/power": []byte("on"),
+ want: map[string]string{
+ "tv/power": "on",
},
},
{
@@ -56,24 +56,24 @@ func TestOnConnect(t *testing.T) {
"tv/power",
},
receive: map[string]message{
- "tv/power": {Retain, []byte("on")},
+ "tv/power": {Retain, "on"},
},
- want: map[string][]byte{
- "tv/power": []byte("on"),
+ want: map[string]string{
+ "tv/power": "on",
},
},
{
- payloadByTopic: map[string][]byte{
- "tv/power": []byte("off"),
+ payloadByTopic: map[string]string{
+ "tv/power": "off",
},
subscribe: []string{
"tv/power",
},
receive: map[string]message{
- "tv/power": {DontRetain, []byte("on")},
+ "tv/power": {DontRetain, "on"},
},
- want: map[string][]byte{
- "tv/power": []byte("on"),
+ want: map[string]string{
+ "tv/power": "on",
},
},
{
@@ -81,33 +81,33 @@ func TestOnConnect(t *testing.T) {
"tv/power",
},
receive: map[string]message{
- "tv/power": {DontRetain, []byte("on")},
+ "tv/power": {DontRetain, "on"},
},
- want: map[string][]byte{},
+ want: map[string]string{},
},
{
- payloadByTopic: map[string][]byte{
- "tv/power": []byte("off"),
+ payloadByTopic: map[string]string{
+ "tv/power": "off",
},
subscribe: []string{
"tv/power",
},
receive: map[string]message{
- "tv/power": {DontRetain, []byte{}},
+ "tv/power": {DontRetain, ""},
},
- want: map[string][]byte{},
+ want: map[string]string{},
},
}
for i, tt := range tests {
fakeMQTT := &fakeMQTT{
callbackByTopic: map[string]mqtt.MessageHandler{},
- payloadByTopic: map[string][]byte{},
+ payloadByTopic: map[string]string{},
}
catbus := &Client{
mqtt: fakeMQTT,
- payloadByTopic: map[string][]byte{},
+ payloadByTopic: map[string]string{},
onconnectTimerByTopic: map[string]*time.Timer{},
onconnectDelay: 1 * time.Millisecond,
onconnectJitter: 1,
@@ -141,7 +141,7 @@ type (
mqtt.Client
callbackByTopic map[string]mqtt.MessageHandler
- payloadByTopic map[string][]byte
+ payloadByTopic map[string]string
}
fakeMessage struct {
@@ -156,9 +156,9 @@ type (
)
func (f *fakeMQTT) Publish(topic string, qos byte, retain bool, payload interface{}) mqtt.Token {
- bytes, ok := payload.([]byte)
+ bytes, ok := payload.(string)
if !ok {
- panic(fmt.Sprintf("expected type []byte, got %v", reflect.TypeOf(payload)))
+ panic(fmt.Sprintf("expected type string, got %v", reflect.TypeOf(payload)))
}
log.Printf("topic %q payload %s", topic, payload)
@@ -170,7 +170,7 @@ func (f *fakeMQTT) Subscribe(topic string, qos byte, callback mqtt.MessageHandle
return &fakeToken{}
}
-func (f *fakeMQTT) send(topic string, retention Retention, payload []byte) {
+func (f *fakeMQTT) send(topic string, retention Retention, payload string) {
// if retention == Retain {
// f.payloadByTopic[topic] = payload
// }
@@ -179,7 +179,7 @@ func (f *fakeMQTT) send(topic string, retention Retention, payload []byte) {
msg := &fakeMessage{
topic: topic,
retained: bool(retention),
- payload: payload,
+ payload: []byte(payload),
}
callback(f, msg)
}