diff options
author | Ethel Morgan <eth@ethulhu.co.uk> | 2020-06-24 12:24:00 +0100 |
---|---|---|
committer | Ethel Morgan <eth@ethulhu.co.uk> | 2020-06-24 12:24:00 +0100 |
commit | 9ccb33b03d4d2ac71fe0f1fec1ee0a64cca157f5 (patch) | |
tree | 01ac690aa2f4b39e9c859521a3336c2de5d57895 /catbus.go | |
parent | b8209be849b80f1018fd5d4aa8c209fd91326a8c (diff) |
use string instead of []bytev0.0.2
subsequently, create a catbus.Message wrapper struct for mqtt.Message.
Diffstat (limited to 'catbus.go')
-rw-r--r-- | catbus.go | 31 |
1 files changed, 22 insertions, 9 deletions
@@ -14,14 +14,17 @@ import ( ) type ( - Message = mqtt.Message - MessageHandler = func(*Client, Message) + MessageHandler func(*Client, Message) + + Message struct { + message mqtt.Message + } Client struct { mqtt mqtt.Client payloadByTopicMu sync.Mutex - payloadByTopic map[string][]byte + payloadByTopic map[string]string onconnectTimerByTopicMu sync.Mutex onconnectTimerByTopic map[string]*time.Timer @@ -40,7 +43,7 @@ type ( // DefaultPayloadByTopic are optional values to publish on connect if no prior values are seen. // E.g. unless we've been told otherwise, assume a device is off. - DefaultPayloadByTopic map[string][]byte + DefaultPayloadByTopic map[string]string } // Retention is whether or not the MQTT broker should retain the message. @@ -65,7 +68,7 @@ const ( func NewClient(brokerURI string, options ClientOptions) *Client { client := &Client{ - payloadByTopic: map[string][]byte{}, + payloadByTopic: map[string]string{}, onconnectTimerByTopic: map[string]*time.Timer{}, onconnectDelay: DefaultOnconnectDelay, @@ -116,14 +119,14 @@ func (c *Client) Connect() error { // Subscribe subscribes to a Catbus MQTT topic. func (c *Client) Subscribe(topic string, f MessageHandler) error { return c.mqtt.Subscribe(topic, atLeastOnce, func(_ mqtt.Client, msg mqtt.Message) { - c.storePayload(msg.Topic(), Retention(msg.Retained()), msg.Payload()) + c.storePayload(msg.Topic(), Retention(msg.Retained()), string(msg.Payload())) - f(c, msg) + f(c, Message{msg}) }).Error() } // Publish publishes to a Catbus MQTT topic. -func (c *Client) Publish(topic string, retention Retention, payload []byte) error { +func (c *Client) Publish(topic string, retention Retention, payload string) error { c.storePayload(topic, retention, payload) return c.mqtt.Publish(topic, atLeastOnce, bool(retention), payload).Error() @@ -137,7 +140,7 @@ func (c *Client) jitteredOnconnectDelay() time.Duration { return c.onconnectDelay - jitter } -func (c *Client) storePayload(topic string, retention Retention, payload []byte) { +func (c *Client) storePayload(topic string, retention Retention, payload string) { c.payloadByTopicMu.Lock() defer c.payloadByTopicMu.Unlock() @@ -189,3 +192,13 @@ func (c *Client) startAllTimers() { }) } } + +func (m Message) Topic() string { + return m.message.Topic() +} +func (m Message) Payload() string { + return string(m.message.Payload()) +} +func (m Message) Retained() bool { + return m.message.Retained() +} |