From 9ccb33b03d4d2ac71fe0f1fec1ee0a64cca157f5 Mon Sep 17 00:00:00 2001 From: Ethel Morgan Date: Wed, 24 Jun 2020 12:24:00 +0100 Subject: use string instead of []byte subsequently, create a catbus.Message wrapper struct for mqtt.Message. --- catbus.go | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) (limited to 'catbus.go') diff --git a/catbus.go b/catbus.go index bb674c6..c2af247 100644 --- a/catbus.go +++ b/catbus.go @@ -14,14 +14,17 @@ import ( ) type ( - Message = mqtt.Message - MessageHandler = func(*Client, Message) + MessageHandler func(*Client, Message) + + Message struct { + message mqtt.Message + } Client struct { mqtt mqtt.Client payloadByTopicMu sync.Mutex - payloadByTopic map[string][]byte + payloadByTopic map[string]string onconnectTimerByTopicMu sync.Mutex onconnectTimerByTopic map[string]*time.Timer @@ -40,7 +43,7 @@ type ( // DefaultPayloadByTopic are optional values to publish on connect if no prior values are seen. // E.g. unless we've been told otherwise, assume a device is off. - DefaultPayloadByTopic map[string][]byte + DefaultPayloadByTopic map[string]string } // Retention is whether or not the MQTT broker should retain the message. @@ -65,7 +68,7 @@ const ( func NewClient(brokerURI string, options ClientOptions) *Client { client := &Client{ - payloadByTopic: map[string][]byte{}, + payloadByTopic: map[string]string{}, onconnectTimerByTopic: map[string]*time.Timer{}, onconnectDelay: DefaultOnconnectDelay, @@ -116,14 +119,14 @@ func (c *Client) Connect() error { // Subscribe subscribes to a Catbus MQTT topic. func (c *Client) Subscribe(topic string, f MessageHandler) error { return c.mqtt.Subscribe(topic, atLeastOnce, func(_ mqtt.Client, msg mqtt.Message) { - c.storePayload(msg.Topic(), Retention(msg.Retained()), msg.Payload()) + c.storePayload(msg.Topic(), Retention(msg.Retained()), string(msg.Payload())) - f(c, msg) + f(c, Message{msg}) }).Error() } // Publish publishes to a Catbus MQTT topic. -func (c *Client) Publish(topic string, retention Retention, payload []byte) error { +func (c *Client) Publish(topic string, retention Retention, payload string) error { c.storePayload(topic, retention, payload) return c.mqtt.Publish(topic, atLeastOnce, bool(retention), payload).Error() @@ -137,7 +140,7 @@ func (c *Client) jitteredOnconnectDelay() time.Duration { return c.onconnectDelay - jitter } -func (c *Client) storePayload(topic string, retention Retention, payload []byte) { +func (c *Client) storePayload(topic string, retention Retention, payload string) { c.payloadByTopicMu.Lock() defer c.payloadByTopicMu.Unlock() @@ -189,3 +192,13 @@ func (c *Client) startAllTimers() { }) } } + +func (m Message) Topic() string { + return m.message.Topic() +} +func (m Message) Payload() string { + return string(m.message.Payload()) +} +func (m Message) Retained() bool { + return m.message.Retained() +} -- cgit v1.2.3