aboutsummaryrefslogtreecommitdiff
path: root/catbus.go
diff options
context:
space:
mode:
Diffstat (limited to 'catbus.go')
-rw-r--r--catbus.go31
1 files changed, 22 insertions, 9 deletions
diff --git a/catbus.go b/catbus.go
index bb674c6..c2af247 100644
--- a/catbus.go
+++ b/catbus.go
@@ -14,14 +14,17 @@ import (
)
type (
- Message = mqtt.Message
- MessageHandler = func(*Client, Message)
+ MessageHandler func(*Client, Message)
+
+ Message struct {
+ message mqtt.Message
+ }
Client struct {
mqtt mqtt.Client
payloadByTopicMu sync.Mutex
- payloadByTopic map[string][]byte
+ payloadByTopic map[string]string
onconnectTimerByTopicMu sync.Mutex
onconnectTimerByTopic map[string]*time.Timer
@@ -40,7 +43,7 @@ type (
// DefaultPayloadByTopic are optional values to publish on connect if no prior values are seen.
// E.g. unless we've been told otherwise, assume a device is off.
- DefaultPayloadByTopic map[string][]byte
+ DefaultPayloadByTopic map[string]string
}
// Retention is whether or not the MQTT broker should retain the message.
@@ -65,7 +68,7 @@ const (
func NewClient(brokerURI string, options ClientOptions) *Client {
client := &Client{
- payloadByTopic: map[string][]byte{},
+ payloadByTopic: map[string]string{},
onconnectTimerByTopic: map[string]*time.Timer{},
onconnectDelay: DefaultOnconnectDelay,
@@ -116,14 +119,14 @@ func (c *Client) Connect() error {
// Subscribe subscribes to a Catbus MQTT topic.
func (c *Client) Subscribe(topic string, f MessageHandler) error {
return c.mqtt.Subscribe(topic, atLeastOnce, func(_ mqtt.Client, msg mqtt.Message) {
- c.storePayload(msg.Topic(), Retention(msg.Retained()), msg.Payload())
+ c.storePayload(msg.Topic(), Retention(msg.Retained()), string(msg.Payload()))
- f(c, msg)
+ f(c, Message{msg})
}).Error()
}
// Publish publishes to a Catbus MQTT topic.
-func (c *Client) Publish(topic string, retention Retention, payload []byte) error {
+func (c *Client) Publish(topic string, retention Retention, payload string) error {
c.storePayload(topic, retention, payload)
return c.mqtt.Publish(topic, atLeastOnce, bool(retention), payload).Error()
@@ -137,7 +140,7 @@ func (c *Client) jitteredOnconnectDelay() time.Duration {
return c.onconnectDelay - jitter
}
-func (c *Client) storePayload(topic string, retention Retention, payload []byte) {
+func (c *Client) storePayload(topic string, retention Retention, payload string) {
c.payloadByTopicMu.Lock()
defer c.payloadByTopicMu.Unlock()
@@ -189,3 +192,13 @@ func (c *Client) startAllTimers() {
})
}
}
+
+func (m Message) Topic() string {
+ return m.message.Topic()
+}
+func (m Message) Payload() string {
+ return string(m.message.Payload())
+}
+func (m Message) Retained() bool {
+ return m.message.Retained()
+}