diff options
author | Ethel Morgan <eth@ethulhu.co.uk> | 2020-07-13 23:42:52 +0100 |
---|---|---|
committer | Ethel Morgan <eth@ethulhu.co.uk> | 2020-07-13 23:42:52 +0100 |
commit | 34872fa3332f69600721860aa4c830c230ebdf87 (patch) | |
tree | 6c4ee0629da26a05c2fb9a7c9004e78a36c9dc8f /cmd | |
parent | 84cab608ff289ca3b6693ca8355ddc3afd62592d (diff) |
add initial MQTT publish support
Diffstat (limited to '')
-rw-r--r-- | cmd/dispatch/main.go | 49 |
1 files changed, 42 insertions, 7 deletions
diff --git a/cmd/dispatch/main.go b/cmd/dispatch/main.go index bd664ba..9a9bc59 100644 --- a/cmd/dispatch/main.go +++ b/cmd/dispatch/main.go @@ -7,15 +7,20 @@ package main import ( "context" + "fmt" "net" "net/http" "os" + "strings" + "time" "github.com/gorilla/mux" "go.eth.moe/dispatch/config" "go.eth.moe/flag" "go.eth.moe/httputil" "go.eth.moe/logger" + + mqtt "github.com/eclipse/paho.mqtt.golang" ) var ( @@ -115,14 +120,44 @@ func matchTriggers(triggers []config.Trigger, r *http.Request) bool { func runAction(ctx context.Context, action config.Action) { log, _ := logger.FromContext(ctx) + log, _ = log.Fork(ctx) - if action.Kind != config.HTTPAction { - log.Warning("only supports HTTP for now") - } + switch action.Kind { + case config.HTTPAction: + if _, err := http.PostForm(action.URL.String(), action.FormValues); err != nil { + log.WithError(err).Error("could not POST form") + return + } + log.Info("POSTed to URL") + + case config.MQTTAction: + broker, topic := fmt.Sprintf("%s://%s", action.MQTT.Scheme, action.MQTT.Host), strings.TrimPrefix(action.MQTT.Path, "/") + log.AddField("mqtt.broker-uri", broker) + log.AddField("mqtt.topic", topic) + log.AddField("mqtt.value", action.Value) + log.AddField("mqtt.retain", action.Retain) + + waitForAck := 5 * time.Second + client := mqtt.NewClient(mqtt.NewClientOptions().AddBroker(broker).SetOnConnectHandler(func(client mqtt.Client) { + defer client.Disconnect(uint(waitForAck.Milliseconds())) + + token := client.Publish(topic, 1, action.Retain, action.Value) + for !token.Wait() { + log.Error("could not wait") + } + if err := token.Error(); err != nil { + log.WithError(err).Error("could not publish to MQTT") + return + } + log.Info("published to MQTT") + })) + + if err := client.Connect().Error(); err != nil { + log.WithError(err).Error("could not connect to MQTT") + return + } - if _, err := http.PostForm(action.URL.String(), action.FormValues); err != nil { - log.WithError(err).Error("could not POST form") - return + default: + panic(fmt.Sprintf("unknown ActionKind: %v", action.Kind)) } - log.Info("POSTed to URL") } |