aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEthel Morgan <eth@ethulhu.co.uk>2020-07-13 23:42:52 +0100
committerEthel Morgan <eth@ethulhu.co.uk>2020-07-13 23:42:52 +0100
commit34872fa3332f69600721860aa4c830c230ebdf87 (patch)
tree6c4ee0629da26a05c2fb9a7c9004e78a36c9dc8f
parent84cab608ff289ca3b6693ca8355ddc3afd62592d (diff)
add initial MQTT publish support
-rw-r--r--cmd/dispatch/main.go49
-rw-r--r--config/config.go37
-rw-r--r--go.mod2
-rw-r--r--go.sum14
4 files changed, 78 insertions, 24 deletions
diff --git a/cmd/dispatch/main.go b/cmd/dispatch/main.go
index bd664ba..9a9bc59 100644
--- a/cmd/dispatch/main.go
+++ b/cmd/dispatch/main.go
@@ -7,15 +7,20 @@ package main
import (
"context"
+ "fmt"
"net"
"net/http"
"os"
+ "strings"
+ "time"
"github.com/gorilla/mux"
"go.eth.moe/dispatch/config"
"go.eth.moe/flag"
"go.eth.moe/httputil"
"go.eth.moe/logger"
+
+ mqtt "github.com/eclipse/paho.mqtt.golang"
)
var (
@@ -115,14 +120,44 @@ func matchTriggers(triggers []config.Trigger, r *http.Request) bool {
func runAction(ctx context.Context, action config.Action) {
log, _ := logger.FromContext(ctx)
+ log, _ = log.Fork(ctx)
- if action.Kind != config.HTTPAction {
- log.Warning("only supports HTTP for now")
- }
+ switch action.Kind {
+ case config.HTTPAction:
+ if _, err := http.PostForm(action.URL.String(), action.FormValues); err != nil {
+ log.WithError(err).Error("could not POST form")
+ return
+ }
+ log.Info("POSTed to URL")
+
+ case config.MQTTAction:
+ broker, topic := fmt.Sprintf("%s://%s", action.MQTT.Scheme, action.MQTT.Host), strings.TrimPrefix(action.MQTT.Path, "/")
+ log.AddField("mqtt.broker-uri", broker)
+ log.AddField("mqtt.topic", topic)
+ log.AddField("mqtt.value", action.Value)
+ log.AddField("mqtt.retain", action.Retain)
+
+ waitForAck := 5 * time.Second
+ client := mqtt.NewClient(mqtt.NewClientOptions().AddBroker(broker).SetOnConnectHandler(func(client mqtt.Client) {
+ defer client.Disconnect(uint(waitForAck.Milliseconds()))
+
+ token := client.Publish(topic, 1, action.Retain, action.Value)
+ for !token.Wait() {
+ log.Error("could not wait")
+ }
+ if err := token.Error(); err != nil {
+ log.WithError(err).Error("could not publish to MQTT")
+ return
+ }
+ log.Info("published to MQTT")
+ }))
+
+ if err := client.Connect().Error(); err != nil {
+ log.WithError(err).Error("could not connect to MQTT")
+ return
+ }
- if _, err := http.PostForm(action.URL.String(), action.FormValues); err != nil {
- log.WithError(err).Error("could not POST form")
- return
+ default:
+ panic(fmt.Sprintf("unknown ActionKind: %v", action.Kind))
}
- log.Info("POSTed to URL")
}
diff --git a/config/config.go b/config/config.go
index 01f6b82..347fdac 100644
--- a/config/config.go
+++ b/config/config.go
@@ -31,8 +31,9 @@ type (
URL *url.URL
FormValues url.Values
- MQTT *url.URL
- Value string
+ MQTT *url.URL
+ Value string
+ Retain bool
}
Config struct {
@@ -50,6 +51,7 @@ type (
FormValues map[string]string `json:"formValues"`
MQTT uurl `json:"mqtt"`
Value string `json:"value"`
+ Retain bool `json:"retain"`
} `json:"actions"`
} `json:"rules"`
}
@@ -89,28 +91,29 @@ func configFromConfig(raw config) (*Config, error) {
}
for _, rawTrigger := range v.Triggers {
- trigger := Trigger{
+ rule.Triggers = append(rule.Triggers, Trigger{
URL: rawTrigger.URL.URL,
FormValues: urlValuesFromRawValues(rawTrigger.FormValues),
- }
- rule.Triggers = append(rule.Triggers, trigger)
+ })
}
- for _, rawAction := range v.Actions {
- action := Action{
- URL: rawAction.URL.URL,
- FormValues: urlValuesFromRawValues(rawAction.FormValues),
- MQTT: rawAction.MQTT.URL,
- Value: rawAction.Value,
- }
+ for _, action := range v.Actions {
+ var kind ActionKind
switch {
- case action.URL != nil && action.MQTT == nil:
- action.Kind = HTTPAction
- case action.URL == nil && action.MQTT != nil:
- action.Kind = MQTTAction
+ case action.URL != uurl{} && action.MQTT == uurl{}:
+ kind = HTTPAction
+ case action.URL == uurl{} && action.MQTT != uurl{}:
+ kind = MQTTAction
default:
return nil, errors.New("actions must be URL xor MQTT")
}
- rule.Actions = append(rule.Actions, action)
+ rule.Actions = append(rule.Actions, Action{
+ Kind: kind,
+ URL: action.URL.URL,
+ FormValues: urlValuesFromRawValues(action.FormValues),
+ MQTT: action.MQTT.URL,
+ Value: action.Value,
+ Retain: action.Retain,
+ })
}
c.RulesByName[k] = rule
diff --git a/go.mod b/go.mod
index 02ea314..c70bf9e 100644
--- a/go.mod
+++ b/go.mod
@@ -7,8 +7,10 @@ module go.eth.moe/dispatch
go 1.14
require (
+ github.com/eclipse/paho.mqtt.golang v1.2.0
github.com/gorilla/mux v1.7.4
go.eth.moe/flag v0.0.2
go.eth.moe/httputil v0.0.5
go.eth.moe/logger v0.0.1
+ golang.org/x/net v0.0.0-20200707034311-ab3426394381 // indirect
)
diff --git a/go.sum b/go.sum
index ca6280b..1977ee1 100644
--- a/go.sum
+++ b/go.sum
@@ -1,4 +1,6 @@
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0=
+github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@@ -12,7 +14,19 @@ go.eth.moe/httputil v0.0.3 h1:kcE2kXLkqbhcjswCr4kwCB+uo1qgaPY5EyKs2JNzsVw=
go.eth.moe/httputil v0.0.3/go.mod h1:LyOLlHBzYVOlwFltlTebWcfXJOyxcVkD+BRYADcZ7So=
go.eth.moe/httputil v0.0.4 h1:OA6XaBId3KePppkIvuPpm3RRL9/2hlq7xlUCkrYjI4Q=
go.eth.moe/httputil v0.0.4/go.mod h1:LyOLlHBzYVOlwFltlTebWcfXJOyxcVkD+BRYADcZ7So=
+go.eth.moe/httputil v0.0.5 h1:ctxYwU/g0UQfNWgpKeWK6wphzDfoqm3fvtyH7JK+uZE=
+go.eth.moe/httputil v0.0.5/go.mod h1:LyOLlHBzYVOlwFltlTebWcfXJOyxcVkD+BRYADcZ7So=
go.eth.moe/logger v0.0.1 h1:ncY0iuVIljShMQtwy+77DvoHDlu6zVZ+7XIT7jyprrY=
go.eth.moe/logger v0.0.1/go.mod h1:G20TP3ON2S95olTep+qsBSoTfouZeKPukk3Ow42q5OQ=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU=
+golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=