From b9f13df51979ff38e4a9578f982cc608e3bb2c0d Mon Sep 17 00:00:00 2001 From: Ethel Morgan Date: Mon, 22 Jun 2020 21:25:47 +0100 Subject: add rebroadcasting to package catbus --- catbus/catbus.go | 107 ++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 90 insertions(+), 17 deletions(-) (limited to 'catbus') diff --git a/catbus/catbus.go b/catbus/catbus.go index e676a14..9325d8e 100644 --- a/catbus/catbus.go +++ b/catbus/catbus.go @@ -5,7 +5,13 @@ // Package catbus is a convenience wrapper around MQTT for use with Catbus. package catbus -import mqtt "github.com/eclipse/paho.mqtt.golang" +import ( + "math/rand" + "sync" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) type ( Message = mqtt.Message @@ -13,12 +19,29 @@ type ( Client struct { mqtt mqtt.Client + + rebroadcastMu sync.Mutex + rebroadcastByTopic map[string]*time.Timer + rebroadcastPeriod time.Duration + rebroadcastJitter time.Duration } + ClientOptions struct { DisconnectHandler func(*Client, error) ConnectHandler func(*Client) + + // Rebroadcast previously seen values every RebroadcastPeriod ± [0,RebroadcastJitter). + RebroadcastPeriod time.Duration + RebroadcastJitter time.Duration + + // RebroadcastDefaults are optional values to seed rebroadcasting if no prior values are seen. + // E.g. unless we've been told otherwise, assume a device is off. + RebroadcastDefaults map[string][]byte } + + // Retention is whether or not the MQTT broker should retain the message. + Retention bool ) const ( @@ -28,34 +51,49 @@ const ( ) const ( - Retain = true + Retain = Retention(true) + DontRetain = Retention(false) +) + +const ( + DefaultRebroadcastPeriod = 1 * time.Minute + DefaultRebroadcastJitter = 15 * time.Second ) func NewClient(brokerURI string, options ClientOptions) *Client { - opts := mqtt.NewClientOptions() + client := &Client{ + rebroadcastByTopic: map[string]*time.Timer{}, + rebroadcastPeriod: DefaultRebroadcastPeriod, + rebroadcastJitter: DefaultRebroadcastJitter, + } - opts.AddBroker(brokerURI) - opts.SetAutoReconnect(true) + if options.RebroadcastPeriod != 0 { + client.rebroadcastPeriod = options.RebroadcastPeriod + } + if options.RebroadcastJitter != 0 { + client.rebroadcastJitter = options.RebroadcastJitter + } + for topic, payload := range options.RebroadcastDefaults { + // TODO: Allow users to set retention? + client.rebroadcastLater(topic, DontRetain, payload) + } - opts.SetConnectionLostHandler(func(c mqtt.Client, err error) { + mqttOpts := mqtt.NewClientOptions() + mqttOpts.AddBroker(brokerURI) + mqttOpts.SetAutoReconnect(true) + mqttOpts.SetConnectionLostHandler(func(c mqtt.Client, err error) { if options.DisconnectHandler != nil { - options.DisconnectHandler(&Client{c}, err) + options.DisconnectHandler(client, err) } }) - opts.SetOnConnectHandler(func(c mqtt.Client) { + mqttOpts.SetOnConnectHandler(func(c mqtt.Client) { if options.ConnectHandler != nil { - options.ConnectHandler(&Client{c}) + options.ConnectHandler(client) } }) + client.mqtt = mqtt.NewClient(mqttOpts) - return &Client{mqtt.NewClient(opts)} -} - -// Subscribe subscribes to a Catbus MQTT topic. -func (c *Client) Subscribe(topic string, f MessageHandler) error { - return c.mqtt.Subscribe(topic, atLeastOnce, func(c mqtt.Client, msg mqtt.Message) { - f(&Client{c}, msg) - }).Error() + return client } // Connect connects to the Catbus MQTT broker and blocks forever. @@ -65,3 +103,38 @@ func (c *Client) Connect() error { } select {} } + +// Subscribe subscribes to a Catbus MQTT topic. +func (c *Client) Subscribe(topic string, f MessageHandler) error { + return c.mqtt.Subscribe(topic, atLeastOnce, func(_ mqtt.Client, msg mqtt.Message) { + c.rebroadcastLater(msg.Topic(), Retention(msg.Retained()), msg.Payload()) + + f(c, msg) + }).Error() +} + +// Publish publishes to a Catbus MQTT topic. +func (c *Client) Publish(topic string, retention Retention, payload []byte) error { + c.rebroadcastLater(topic, retention, payload) + + return c.mqtt.Publish(topic, atLeastOnce, bool(retention), payload).Error() +} + +func (c *Client) rebroadcastLater(topic string, retention Retention, payload []byte) { + c.rebroadcastMu.Lock() + defer c.rebroadcastMu.Unlock() + + if timer := c.rebroadcastByTopic[topic]; timer != nil { + _ = timer.Stop() + } + c.rebroadcastByTopic[topic] = time.AfterFunc(c.rebroadcastDuration(), func() { + _ = c.Publish(topic, retention, payload) + }) +} +func (c *Client) rebroadcastDuration() time.Duration { + jitter := time.Duration(rand.Intn(int(c.rebroadcastJitter))) + if rand.Intn(1) == 0 { + return c.rebroadcastPeriod + jitter + } + return c.rebroadcastPeriod - jitter +} -- cgit v1.2.3