-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathtrigger.go
122 lines (93 loc) · 3.16 KB
/
trigger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// Package grpctrigger implements a trigger to receive messages over gRPC.
package grpctrigger
import (
"errors"
"fmt"
"net"
"reflect"
"github.com/TIBCOSoftware/flogo-lib/core/trigger"
"github.com/TIBCOSoftware/flogo-lib/logger"
"google.golang.org/grpc"
// This dependency is needed to properly build the trigger
// TODO: Check where this dependency should be registered
_ "github.com/jhump/protoreflect/desc/protoparse"
// This dependency is needed at runtime (will be part of registration.go)
_ "github.com/fatih/structs"
)
// log is the default package logger, obtained from logger.GetLogger under the
// name "trigger-grpc".
var log = logger.GetLogger("trigger-grpc")
// handlerMap holds the mapping of Service/RPC calls to Flogo handlers. Keys
// have the form "<service>-<rpc>" and are written by Initialize.
// NOTE(review): this is package-level state that every GRPCTrigger instance
// writes to, and nothing visible in this file guards it with a lock — confirm
// that is acceptable for how it is read elsewhere.
var handlerMap = make(map[string]*trigger.Handler)
// server is an empty struct type, needed to register the gRPC server.
// NOTE(review): it is not referenced in the visible part of this file;
// presumably the service methods are attached to it in the registration
// code behind registerServerMethods — confirm.
type server struct{}
// GRPCTrigger is the trigger instance created by GRPCFactory.New. Its methods
// in this file are Metadata, Initialize, Start and Stop.
type GRPCTrigger struct {
// metadata is the trigger metadata passed down from the factory.
metadata *trigger.Metadata
// config is the trigger configuration; Start reads the "tcpPort" setting from it.
config *trigger.Config
}
// NewFactory returns a trigger.Factory whose New method produces GRPCTrigger
// instances that carry the supplied metadata.
func NewFactory(md *trigger.Metadata) trigger.Factory {
	factory := &GRPCFactory{}
	factory.metadata = md
	return factory
}
// GRPCFactory is the gRPC trigger factory returned by NewFactory; its New
// method creates GRPCTrigger instances.
type GRPCFactory struct {
// metadata is the trigger metadata copied into every trigger this factory creates.
metadata *trigger.Metadata
}
// New builds a GRPCTrigger for the given configuration, handing it the
// metadata this factory was created with.
func (t *GRPCFactory) New(config *trigger.Config) trigger.Trigger {
	instance := &GRPCTrigger{
		config:   config,
		metadata: t.metadata,
	}
	return instance
}
// Metadata implements trigger.Trigger.Metadata. It returns the metadata
// pointer stored on the trigger when it was created.
func (t *GRPCTrigger) Metadata() *trigger.Metadata {
return t.metadata
}
// Initialize prepares the trigger for Start by recording every handler from
// the init context in the package-level handlerMap. Each handler is stored
// under the key "<service>-<rpc>", built from its "service" and "rpc" string
// settings. It always returns nil.
func (t *GRPCTrigger) Initialize(ctx trigger.InitContext) error {
	for _, h := range ctx.GetHandlers() {
		// Read both settings first, then build the lookup key from them.
		serviceName := h.GetStringSetting("service")
		rpcName := h.GetStringSetting("rpc")

		key := fmt.Sprintf("%s-%s", serviceName, rpcName)
		handlerMap[key] = h
	}

	return nil
}
// Start opens a TCP listener on the port given by the "tcpPort" trigger
// setting, creates a gRPC server, registers the service methods on it via
// registerServerMethods and serves on the listener.
//
// The call to Serve is made synchronously, so Start does not return until
// Serve does.
//
// It returns the error from net.Listen if the port cannot be opened (without
// attempting to serve), the error from Serve if serving fails, and nil
// otherwise. Both failures are also written to the package logger.
func (t *GRPCTrigger) Start() error {
	// Get the TCP port to listen on
	tcpPort := t.config.GetSetting("tcpPort")
	log.Infof("Starting gRPC TCP server on port %s", tcpPort)

	lis, err := net.Listen("tcp", fmt.Sprintf(":%s", tcpPort))
	if err != nil {
		log.Errorf("failed to listen: %v", err)
		// Without a listener there is nothing to serve on; continuing would
		// hand a nil listener to Serve.
		return err
	}

	s := grpc.NewServer()
	registerServerMethods(s)

	if err := s.Serve(lis); err != nil {
		log.Errorf("failed to serve: %v", err)
		// Surface the failure to the caller instead of reporting success.
		return err
	}

	return nil
}
// Stop is currently a no-op and always returns nil.
// NOTE(review): the gRPC server created in Start is a local variable there,
// so this method has no handle with which to shut it down — confirm whether
// the server should be kept on the trigger and stopped here.
func (t *GRPCTrigger) Stop() error {
return nil
}
// fillStruct maps the fields from a map[string]interface{} to a struct.
//
// s must be a non-nil pointer to a struct. Each key of m is looked up as a
// field name on that struct and the corresponding value is assigned to the
// field. The value's dynamic type must be identical to the field's type.
//
// An error is returned, without panicking, when:
//   - s is nil, not a pointer, a nil pointer, or does not point to a struct;
//   - a key names a field that does not exist;
//   - the field cannot be set (for example, it is unexported);
//   - the value is nil or its type differs from the field's type.
//
// Map iteration order is unspecified, so when an error is returned some
// fields may already have been assigned.
func fillStruct(m map[string]interface{}, s interface{}) error {
	target := reflect.ValueOf(s)
	// Elem on anything other than a non-nil pointer (or interface) panics or
	// yields an invalid Value, so validate the destination up front.
	if target.Kind() != reflect.Ptr || target.IsNil() {
		return fmt.Errorf("fillStruct: destination must be a non-nil pointer to a struct, got %T", s)
	}
	structValue := target.Elem()
	if structValue.Kind() != reflect.Struct {
		return fmt.Errorf("fillStruct: destination must point to a struct, got %T", s)
	}

	for name, value := range m {
		structFieldValue := structValue.FieldByName(name)
		if !structFieldValue.IsValid() {
			return fmt.Errorf("No such field: %s in obj", name)
		}
		if !structFieldValue.CanSet() {
			return fmt.Errorf("Cannot set %s field value", name)
		}

		val := reflect.ValueOf(value)
		// A nil map value produces the zero reflect.Value, on which Type
		// would panic; report it as an error instead.
		if !val.IsValid() {
			return fmt.Errorf("fillStruct: nil value provided for field %s", name)
		}
		if structFieldValue.Type() != val.Type() {
			return errors.New("Provided value type didn't match obj field type")
		}
		structFieldValue.Set(val)
	}
	return nil
}