diff options
author | Ethel Morgan <eth@ethulhu.co.uk> | 2020-07-13 23:42:52 +0100 |
---|---|---|
committer | Ethel Morgan <eth@ethulhu.co.uk> | 2020-07-13 23:42:52 +0100 |
commit | 34872fa3332f69600721860aa4c830c230ebdf87 (patch) | |
tree | 6c4ee0629da26a05c2fb9a7c9004e78a36c9dc8f | |
parent | 84cab608ff289ca3b6693ca8355ddc3afd62592d (diff) |
add initial MQTT publish support
Diffstat (limited to '')
-rw-r--r-- | cmd/dispatch/main.go | 49 | ||||
-rw-r--r-- | config/config.go | 37 | ||||
-rw-r--r-- | go.mod | 2 | ||||
-rw-r--r-- | go.sum | 14 |
4 files changed, 78 insertions, 24 deletions
diff --git a/cmd/dispatch/main.go b/cmd/dispatch/main.go index bd664ba..9a9bc59 100644 --- a/cmd/dispatch/main.go +++ b/cmd/dispatch/main.go @@ -7,15 +7,20 @@ package main import ( "context" + "fmt" "net" "net/http" "os" + "strings" + "time" "github.com/gorilla/mux" "go.eth.moe/dispatch/config" "go.eth.moe/flag" "go.eth.moe/httputil" "go.eth.moe/logger" + + mqtt "github.com/eclipse/paho.mqtt.golang" ) var ( @@ -115,14 +120,44 @@ func matchTriggers(triggers []config.Trigger, r *http.Request) bool { func runAction(ctx context.Context, action config.Action) { log, _ := logger.FromContext(ctx) + log, _ = log.Fork(ctx) - if action.Kind != config.HTTPAction { - log.Warning("only supports HTTP for now") - } + switch action.Kind { + case config.HTTPAction: + if _, err := http.PostForm(action.URL.String(), action.FormValues); err != nil { + log.WithError(err).Error("could not POST form") + return + } + log.Info("POSTed to URL") + + case config.MQTTAction: + broker, topic := fmt.Sprintf("%s://%s", action.MQTT.Scheme, action.MQTT.Host), strings.TrimPrefix(action.MQTT.Path, "/") + log.AddField("mqtt.broker-uri", broker) + log.AddField("mqtt.topic", topic) + log.AddField("mqtt.value", action.Value) + log.AddField("mqtt.retain", action.Retain) + + waitForAck := 5 * time.Second + client := mqtt.NewClient(mqtt.NewClientOptions().AddBroker(broker).SetOnConnectHandler(func(client mqtt.Client) { + defer client.Disconnect(uint(waitForAck.Milliseconds())) + + token := client.Publish(topic, 1, action.Retain, action.Value) + for !token.Wait() { + log.Error("could not wait") + } + if err := token.Error(); err != nil { + log.WithError(err).Error("could not publish to MQTT") + return + } + log.Info("published to MQTT") + })) + + if err := client.Connect().Error(); err != nil { + log.WithError(err).Error("could not connect to MQTT") + return + } - if _, err := http.PostForm(action.URL.String(), action.FormValues); err != nil { - log.WithError(err).Error("could not POST form") - return + default: + panic(fmt.Sprintf("unknown ActionKind: %v", action.Kind)) } - log.Info("POSTed to URL") } diff --git a/config/config.go b/config/config.go index 01f6b82..347fdac 100644 --- a/config/config.go +++ b/config/config.go @@ -31,8 +31,9 @@ type ( URL *url.URL FormValues url.Values - MQTT *url.URL - Value string + MQTT *url.URL + Value string + Retain bool } Config struct { @@ -50,6 +51,7 @@ type ( FormValues map[string]string `json:"formValues"` MQTT uurl `json:"mqtt"` Value string `json:"value"` + Retain bool `json:"retain"` } `json:"actions"` } `json:"rules"` } @@ -89,28 +91,29 @@ func configFromConfig(raw config) (*Config, error) { } for _, rawTrigger := range v.Triggers { - trigger := Trigger{ + rule.Triggers = append(rule.Triggers, Trigger{ URL: rawTrigger.URL.URL, FormValues: urlValuesFromRawValues(rawTrigger.FormValues), - } - rule.Triggers = append(rule.Triggers, trigger) + }) } - for _, rawAction := range v.Actions { - action := Action{ - URL: rawAction.URL.URL, - FormValues: urlValuesFromRawValues(rawAction.FormValues), - MQTT: rawAction.MQTT.URL, - Value: rawAction.Value, - } + for _, action := range v.Actions { + var kind ActionKind switch { - case action.URL != nil && action.MQTT == nil: - action.Kind = HTTPAction - case action.URL == nil && action.MQTT != nil: - action.Kind = MQTTAction + case action.URL != uurl{} && action.MQTT == uurl{}: + kind = HTTPAction + case action.URL == uurl{} && action.MQTT != uurl{}: + kind = MQTTAction default: return nil, errors.New("actions must be URL xor MQTT") } - rule.Actions = append(rule.Actions, action) + rule.Actions = append(rule.Actions, Action{ + Kind: kind, + URL: action.URL.URL, + FormValues: urlValuesFromRawValues(action.FormValues), + MQTT: action.MQTT.URL, + Value: action.Value, + Retain: action.Retain, + }) } c.RulesByName[k] = rule @@ -7,8 +7,10 @@ module go.eth.moe/dispatch go 1.14 require ( + github.com/eclipse/paho.mqtt.golang v1.2.0 github.com/gorilla/mux v1.7.4 go.eth.moe/flag v0.0.2 go.eth.moe/httputil v0.0.5 go.eth.moe/logger v0.0.1 + golang.org/x/net v0.0.0-20200707034311-ab3426394381 // indirect ) @@ -1,4 +1,6 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0= +github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc= github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -12,7 +14,19 @@ go.eth.moe/httputil v0.0.3 h1:kcE2kXLkqbhcjswCr4kwCB+uo1qgaPY5EyKs2JNzsVw= go.eth.moe/httputil v0.0.3/go.mod h1:LyOLlHBzYVOlwFltlTebWcfXJOyxcVkD+BRYADcZ7So= go.eth.moe/httputil v0.0.4 h1:OA6XaBId3KePppkIvuPpm3RRL9/2hlq7xlUCkrYjI4Q= go.eth.moe/httputil v0.0.4/go.mod h1:LyOLlHBzYVOlwFltlTebWcfXJOyxcVkD+BRYADcZ7So= +go.eth.moe/httputil v0.0.5 h1:ctxYwU/g0UQfNWgpKeWK6wphzDfoqm3fvtyH7JK+uZE= +go.eth.moe/httputil v0.0.5/go.mod h1:LyOLlHBzYVOlwFltlTebWcfXJOyxcVkD+BRYADcZ7So= go.eth.moe/logger v0.0.1 h1:ncY0iuVIljShMQtwy+77DvoHDlu6zVZ+7XIT7jyprrY= go.eth.moe/logger v0.0.1/go.mod h1:G20TP3ON2S95olTep+qsBSoTfouZeKPukk3Ow42q5OQ= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU= +golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= |