Skip to content

Commit

Permalink
Add terminal (mainflux#29)
Browse files Browse the repository at this point in the history
* implement Writer interface

Signed-off-by: Mirko Teodorovic <[email protected]>

* add terminal session

Signed-off-by: Mirko Teodorovic <[email protected]>

* add terminal session

Signed-off-by: Mirko Teodorovic <[email protected]>

* add terminal functionality

Signed-off-by: Mirko Teodorovic <[email protected]>

* add terminal functionality

Signed-off-by: Mirko Teodorovic <[email protected]>

* refactor broker

Signed-off-by: Mirko Teodorovic <[email protected]>

* update mod

Signed-off-by: Mirko Teodorovic <[email protected]>

* add terminal open. close

Signed-off-by: Mirko Teodorovic <[email protected]>

* add terminal functionality

Signed-off-by: Mirko Teodorovic <[email protected]>

* move write into terminal session

Signed-off-by: Mirko Teodorovic <[email protected]>

* encode senml as util

Signed-off-by: Mirko Teodorovic <[email protected]>

* upd mod

Signed-off-by: Mirko Teodorovic <[email protected]>

* add term subtopic

Signed-off-by: Mirko Teodorovic <[email protected]>

* remove white space

Signed-off-by: Mirko Teodorovic <[email protected]>

* reorganize topics

Signed-off-by: Mirko Teodorovic <[email protected]>

* remove commented out code

Signed-off-by: Mirko Teodorovic <[email protected]>

* add session timeout

Signed-off-by: Mirko Teodorovic <[email protected]>

* fix errors

Signed-off-by: Mirko Teodorovic <[email protected]>

* resolve comments

Signed-off-by: Mirko Teodorovic <[email protected]>

* fix env vars

Signed-off-by: Mirko Teodorovic <[email protected]>

* remove function arguments for close and open

Signed-off-by: Mirko Teodorovic <[email protected]>

* refactor timer logic

Signed-off-by: Mirko Teodorovic <[email protected]>

* check arg size

Signed-off-by: Mirko Teodorovic <[email protected]>

* move util to encoder

Signed-off-by: Mirko Teodorovic <[email protected]>

* move util to encoder

Signed-off-by: Mirko Teodorovic <[email protected]>
  • Loading branch information
mteodor authored Mar 31, 2020
1 parent 9016281 commit 0724a94
Show file tree
Hide file tree
Showing 12 changed files with 370 additions and 74 deletions.
19 changes: 6 additions & 13 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ const (
)

func main() {
logger, err := logger.New(os.Stdout, defLogLevel)
logLvl := mainflux.Env(envLogLevel, defLogLevel)
logger, err := logger.New(os.Stdout, logLvl)
if err != nil {
log.Fatalf(fmt.Sprintf("Failed to create logger: %s", err.Error()))
}
Expand Down Expand Up @@ -127,7 +128,8 @@ func main() {
Help: "Total duration of requests in microseconds.",
}, []string{"method"}),
)
go subscribeToMQTTBroker(svc, mqttClient, cfg.Agent.Channels.Control, nc, logger)
b := conn.NewBroker(svc, mqttClient, cfg.Agent.Channels.Control, nc, logger)
go b.Subscribe()

errs := make(chan error, 3)

Expand All @@ -149,6 +151,7 @@ func main() {

func loadConfig(logger logger.Logger) (config.Config, error) {
file := mainflux.Env(envConfigFile, defConfigFile)

bcfg := bootstrap.Config{
URL: mainflux.Env(envBootstrapURL, defBootstrapURL),
ID: mainflux.Env(envBootstrapID, defBootstrapID),
Expand All @@ -164,7 +167,7 @@ func loadConfig(logger logger.Logger) (config.Config, error) {

sc := config.ServerConf{
NatsURL: mainflux.Env(envNatsURL, defNatsURL),
Port: mainflux.Env(envLogLevel, defLogLevel),
Port: mainflux.Env(envHTTPPort, defHTTPPort),
}
cc := config.ChanConf{
Control: mainflux.Env(envCtrlChan, defCtrlChan),
Expand Down Expand Up @@ -243,16 +246,6 @@ func connectToMQTTBroker(conf config.MQTTConf, logger logger.Logger) (mqtt.Clien
return client, nil
}

func subscribeToMQTTBroker(svc agent.Service, mc mqtt.Client, ctrlChan string, nc *nats.Conn, logger logger.Logger) {
broker := conn.NewBroker(svc, mc, nc, logger)
topic := fmt.Sprintf("channels/%s/messages", ctrlChan)
if err := broker.Subscribe(topic); err != nil {
logger.Error(fmt.Sprintf("Failed to subscribe to MQTT broker: %s", err.Error()))
os.Exit(1)
}
logger.Info("Subscribed to MQTT broker")
}

func loadCertificate(cfg config.MQTTConf) (config.MQTTConf, error) {
c := cfg
caByte := []byte{}
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ module github.com/mainflux/agent
go 1.13

require (
github.com/creack/pty v1.1.9
github.com/eclipse/paho.mqtt.golang v1.2.0
github.com/edgexfoundry/go-mod-core-contracts v0.1.48
github.com/go-kit/kit v0.9.0
github.com/go-zoo/bone v1.3.0
github.com/mainflux/export v0.0.0-20200212162137-ef1cd8e48d44
github.com/mainflux/export v0.0.0-20200323141637-120ec7179230
github.com/mainflux/mainflux v0.0.0-20200212173448-51cd0524a11e
github.com/mainflux/senml v1.0.0
github.com/nats-io/nats.go v1.9.1
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/FZambia/sentinel v1.0.0/go.mod h1:ytL1Am/RLlAoAXG6Kj5LNuw/TRRQrv2rt2FT26vP5gI=
github.com/Microsoft/go-winio v0.4.7/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand All @@ -29,6 +31,8 @@ github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/creack/pty v1.1.9 h1:uDmaGzcdjhF4i/plgjmEsriH11Y0o7RKapEf/LDaM3w=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -54,6 +58,7 @@ github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0 h1:MP4Eh7ZCb31lleYCFuwm0oe4/YGak+5l1vA2NOE80nA=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-redis/redis v6.15.0+incompatible h1:/Wib9cA7CF3SQxBZRMHyQvqzlwzc8PJGDMkRfqQebSE=
github.com/go-redis/redis v6.15.0+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
Expand All @@ -65,6 +70,7 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
Expand Down Expand Up @@ -114,9 +120,12 @@ github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mainflux/export v0.0.0-20200212162137-ef1cd8e48d44 h1:fIq114ai+qXhVUtBcPLZBPOL1Yua4noF92ZMqYeDMlk=
github.com/mainflux/export v0.0.0-20200212162137-ef1cd8e48d44/go.mod h1:H6T63E1QmicHaarfE1/GqV39mRzHgZamK12VMroyfzY=
github.com/mainflux/export v0.0.0-20200323141637-120ec7179230 h1:gQ000adKbsheCdER2gghk6KJbEVmwEwKUgV6RcrrDL8=
github.com/mainflux/export v0.0.0-20200323141637-120ec7179230/go.mod h1:H6T63E1QmicHaarfE1/GqV39mRzHgZamK12VMroyfzY=
github.com/mainflux/mainflux v0.0.0-20191223163044-f42f2095bab4/go.mod h1:K3ghSIpAqwv5F/t30LO57+11S7tE97ur2Z6wWEHa2CA=
github.com/mainflux/mainflux v0.0.0-20200212173448-51cd0524a11e h1:H+mUgc+QtlGIJ3+BxiZFN9qCGoIRuAM9B+UE2cqrW5Y=
github.com/mainflux/mainflux v0.0.0-20200212173448-51cd0524a11e/go.mod h1:pTmqb5GxvsHzUkMHnHXhsoGfAx20foG0sdoxP4Xs8uE=
github.com/mainflux/mainflux v0.0.0-20200324100741-6ffa916ed229 h1:8GiJNFNtDivLSnL93yMimRw/KnZxnHI9tUsFX5CFLrQ=
github.com/mainflux/senml v1.0.0 h1:oLS5aBhvdHjgQ8kfq3jX7yD+DaquhvpyvIWNsPil3X0=
github.com/mainflux/senml v1.0.0/go.mod h1:g9i8pj4WMs29KkUpXivbe/PP0qJd1kt3b1CF77S8A3s=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
Expand All @@ -135,6 +144,7 @@ github.com/nats-io/go-nats v1.6.0/go.mod h1:+t7RHT5ApZebkrQdnn6AhQJmhJJiKAvJUio1
github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/nats-server/v2 v2.1.2 h1:i2Ly0B+1+rzNZHHWtD4ZwKi+OU5l+uQo1iDHZ2PmiIc=
github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k=
github.com/nats-io/nats.go v1.9.1 h1:ik3HbLhZ0YABLto7iX80pZLPw/6dx3T+++MZJwLnMrQ=
github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w=
Expand Down
2 changes: 1 addition & 1 deletion internal/app/agent/api/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type mqttConf struct {
mtls bool `json:"mtls"`
skipTLSVer bool `json:"skip_tls_ver"`
retain bool `json:"retain"`
QoS int `json:"qos"`
QoS byte `json:"qos"`
caPath string `json:"ca_path"`
certPath string `json:"cert_path"`
privKeyPath string `json:"priv_key_path"`
Expand Down
13 changes: 13 additions & 0 deletions internal/app/agent/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,16 @@ func (lm loggingMiddleware) Services() []agent.ServiceInfo {

return lm.svc.Services()
}

func (lm loggingMiddleware) Terminal(uuid, cmdStr string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method terminal for uuid %s and payload %s took %s to complete", uuid, cmdStr, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.Terminal(uuid, cmdStr)
}
9 changes: 9 additions & 0 deletions internal/app/agent/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,12 @@ func (ms *metricsMiddleware) Publish(topic, payload string) error {

return ms.svc.Publish(topic, payload)
}

func (ms *metricsMiddleware) Terminal(topic, payload string) error {
defer func(begin time.Time) {
ms.counter.With("method", "publish").Add(1)
ms.latency.With("method", "publish").Observe(time.Since(begin).Seconds())
}(time.Now())

return ms.svc.Terminal(topic, payload)
}
Loading

0 comments on commit 0724a94

Please sign in to comment.