aboutsummaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
authorEthel Morgan <eth@ethulhu.co.uk>2020-07-13 23:42:52 +0100
committerEthel Morgan <eth@ethulhu.co.uk>2020-07-13 23:42:52 +0100
commit34872fa3332f69600721860aa4c830c230ebdf87 (patch)
tree6c4ee0629da26a05c2fb9a7c9004e78a36c9dc8f /cmd
parent84cab608ff289ca3b6693ca8355ddc3afd62592d (diff)
add initial MQTT publish support
Diffstat (limited to 'cmd')
-rw-r--r--cmd/dispatch/main.go49
1 files changed, 42 insertions, 7 deletions
diff --git a/cmd/dispatch/main.go b/cmd/dispatch/main.go
index bd664ba..9a9bc59 100644
--- a/cmd/dispatch/main.go
+++ b/cmd/dispatch/main.go
@@ -7,15 +7,20 @@ package main
import (
"context"
+ "fmt"
"net"
"net/http"
"os"
+ "strings"
+ "time"
"github.com/gorilla/mux"
"go.eth.moe/dispatch/config"
"go.eth.moe/flag"
"go.eth.moe/httputil"
"go.eth.moe/logger"
+
+ mqtt "github.com/eclipse/paho.mqtt.golang"
)
var (
@@ -115,14 +120,44 @@ func matchTriggers(triggers []config.Trigger, r *http.Request) bool {
func runAction(ctx context.Context, action config.Action) {
log, _ := logger.FromContext(ctx)
+ log, _ = log.Fork(ctx)
- if action.Kind != config.HTTPAction {
- log.Warning("only supports HTTP for now")
- }
+ switch action.Kind {
+ case config.HTTPAction:
+ if _, err := http.PostForm(action.URL.String(), action.FormValues); err != nil {
+ log.WithError(err).Error("could not POST form")
+ return
+ }
+ log.Info("POSTed to URL")
+
+ case config.MQTTAction:
+ broker, topic := fmt.Sprintf("%s://%s", action.MQTT.Scheme, action.MQTT.Host), strings.TrimPrefix(action.MQTT.Path, "/")
+ log.AddField("mqtt.broker-uri", broker)
+ log.AddField("mqtt.topic", topic)
+ log.AddField("mqtt.value", action.Value)
+ log.AddField("mqtt.retain", action.Retain)
+
+ waitForAck := 5 * time.Second
+ client := mqtt.NewClient(mqtt.NewClientOptions().AddBroker(broker).SetOnConnectHandler(func(client mqtt.Client) {
+ defer client.Disconnect(uint(waitForAck.Milliseconds()))
+
+ token := client.Publish(topic, 1, action.Retain, action.Value)
+ for !token.Wait() {
+ log.Error("could not wait")
+ }
+ if err := token.Error(); err != nil {
+ log.WithError(err).Error("could not publish to MQTT")
+ return
+ }
+ log.Info("published to MQTT")
+ }))
+
+ if err := client.Connect().Error(); err != nil {
+ log.WithError(err).Error("could not connect to MQTT")
+ return
+ }
- if _, err := http.PostForm(action.URL.String(), action.FormValues); err != nil {
- log.WithError(err).Error("could not POST form")
- return
+ default:
+ panic(fmt.Sprintf("unknown ActionKind: %v", action.Kind))
}
- log.Info("POSTed to URL")
}