diff --git a/broker/adapter/natsstream/output.go b/broker/adapter/natsstream/output.go index 5af12c264..b724918ec 100644 --- a/broker/adapter/natsstream/output.go +++ b/broker/adapter/natsstream/output.go @@ -31,9 +31,10 @@ type stream struct { } type msg struct { - Message string `json:"data"` - Sender string `json:"sender"` - Time time.Time `json:"time"` + Protocol string `json:"protocol"` + Data string `json:"data"` + Sender string `json:"sender"` + Time time.Time `json:"time"` } func newStream(name string, work chan adaptable, logger *log.Entry) ([]*stream, error) { @@ -141,9 +142,10 @@ func (sc *stream) publisher(ctx context.Context, wg *sync.WaitGroup) { defer obs.ObserveDuration() m := msg{ - Message: r.Message(), - Sender: r.SenderID(), - Time: r.Time().UTC(), + Protocol: "choria:adapters:natsstream:output:1", + Data: r.Message(), + Sender: r.SenderID(), + Time: r.Time().UTC(), } j, err := json.Marshal(m) diff --git a/schemas/choria:adapters:natsstream:output:1.json b/schemas/choria:adapters:natsstream:output:1.json new file mode 100644 index 000000000..7b70aeb92 --- /dev/null +++ b/schemas/choria:adapters:natsstream:output:1.json @@ -0,0 +1,32 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema", + "description": "Data generated by the NATS Stream Data Adapter", + "title": "choria:adapters:natsstream:output:1", + "type":"object", + "required":[ + "protocol", + "data", + "sender", + "time" + ], + "properties": { + "protocol": { + "type":"string", + "enum": [ + "choria:adapters:natsstream:output:1" + ] + }, + "data": { + "type":"string", + "description": "The unmodified content of the message received from NATS" + }, + "sender": { + "type":"string", + "description": "The Sender ID who produced the NATS message" + }, + "time": { + "type":"integer", + "description": "Unix time the message was converted in UTC time zone" + } + } +} \ No newline at end of file