forked from dmarkwat/flux-recv-gcsr
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain_test.go
169 lines (140 loc) · 3.79 KB
/
main_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package main
import (
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsub/pstest"
"context"
"encoding/json"
fluxapi "github.com/fluxcd/flux/pkg/api"
v9 "github.com/fluxcd/flux/pkg/api/v9"
fluxhttp "github.com/fluxcd/flux/pkg/http"
fluxclient "github.com/fluxcd/flux/pkg/http/client"
fluxdaemon "github.com/fluxcd/flux/pkg/http/daemon"
"github.com/stretchr/testify/assert"
"google.golang.org/api/option"
"google.golang.org/grpc"
"log"
"net"
"net/http"
"testing"
"time"
)
// testMsg is the raw JSON payload used as the Pub/Sub message body and as the
// input for the unmarshalling tests in this file. It describes a single
// fast-forward update of refs/heads/master.
//
// example message from: https://cloud.google.com/source-repositories/docs/pubsub-notifications#notification_example
//
// NOTE(review): the "email" value looks like an obfuscation placeholder rather
// than a real address; no test in this file reads it — confirm before relying on it.
const testMsg = `{
"name": "projects/test-project/repos/pubsub-test",
"url": "[URL_PATH]",
"eventTime": "2018-02-21T21:23:25.566175Z",
"refUpdateEvent": {
"email": "[email protected]",
"refUpdates": {
"refs/heads/master": {
"refName": "refs/heads/master",
"updateType": "UPDATE_FAST_FORWARD",
"oldId": "c7a28dd5de3403cc384a025834c9fce2886fe763",
"newId": "f00768887da8de62061210295914a0a8a2a38226"
}
}
}
}`
var notification sourceRepoNotification
func init() {
if err := json.Unmarshal([]byte(testMsg), ¬ification); err != nil {
log.Fatalf("Couldn't unmarshal pubsub message data, %v", err)
}
}
func TestUnmarshal(t *testing.T) {
var notification sourceRepoNotification
err := json.Unmarshal([]byte(testMsg), ¬ification)
if err != nil {
t.Error("couldn't unmarshal test message")
}
}
// TestConsume publishes testMsg to an in-memory Pub/Sub server and runs
// prepare/consume against it.
//
// A helper goroutine drains the message channel and cancels consumeCtx on the
// first message it sees; consume is presumably expected to return once that
// context is cancelled — confirm against its implementation.
func TestConsume(t *testing.T) {
	ctx := context.Background()

	// Start a fake server running locally.
	srv := pstest.NewServer()
	defer srv.Close()

	// Connect to the server without using TLS.
	conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
	if err != nil {
		t.Fatal(err)
	}
	defer conn.Close()

	// Use the connection when creating a pubsub client.
	client, err := pubsub.NewClient(ctx, "project", option.WithGRPCConn(conn))
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()

	log.Print("creating topic")
	topic, err := client.CreateTopic(ctx, "projects/project/topics/testTopic")
	if err != nil {
		t.Fatal(err)
	}
	// Stop the topic's background publishing goroutines when the test ends.
	defer topic.Stop()
	log.Print("topic created")

	cm := make(chan *pubsub.Message)
	consumeCtx, cancel := context.WithCancel(ctx)
	// Release the context on every exit path, including early failures where
	// no message ever arrives to trigger the cancel below.
	defer cancel()

	go func() {
		log.Print("awaiting messages...")
		for msg := range cm {
			log.Printf("got msg, %v", msg.ID)
			cancel()
		}
	}()

	sub, err := prepare(consumeCtx, client, cm, "projects/project/topics/testTopic", "testSub", 10*time.Second)
	if err != nil {
		t.Fatal(err)
	}

	id, err := topic.Publish(ctx, &pubsub.Message{
		Data: []byte(testMsg),
	}).Get(ctx)
	if err != nil {
		t.Fatal(err)
	}
	log.Printf("published msg, %v", id)

	// Report failure through the test handle: log.Fatal would exit the whole
	// test binary, skipping deferred cleanup and any remaining tests.
	if err = consume(consumeCtx, sub, cm); err != nil {
		t.Fatal(err)
	}

	// Closing the channel lets the draining goroutine's range loop finish.
	close(cm)
}
// TestHandle serves a flux daemon handler backed by MockServer on
// 127.0.0.1:3030 and checks that handleMsg, given the decoded sample
// notification, completes without error when talking to it.
func TestHandle(t *testing.T) {
	handler := fluxdaemon.NewHandler(MockServer{
		T: t,
	}, fluxdaemon.NewRouter())

	httpServer := &http.Server{
		Addr:    "127.0.0.1:3030",
		Handler: handler,
	}

	listener, err := net.ListenTCP("tcp4", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 3030})
	if err != nil {
		// Without a listener (e.g. port already in use) nothing below can work.
		t.Fatal(err)
	}

	stop := make(chan bool, 1)
	go func() {
		err := httpServer.Serve(listener)
		if err != nil && err != http.ErrServerClosed {
			t.Error(err)
		}
		stop <- true
	}()

	apiClient := fluxclient.New(http.DefaultClient, fluxhttp.NewAPIRouter(), "http://localhost:3030", fluxclient.Token(""))
	handleErr := handleMsg(context.Background(), notification, apiClient, 10*time.Second)

	// Always shut the server down and wait for the serving goroutine before
	// evaluating results, so it can never call t.Error after the test returns
	// and the port is released even when handleMsg failed.
	closeErr := httpServer.Close()
	_ = listener.Close() // best effort: the server normally closes it already
	<-stop

	if handleErr != nil {
		t.Fatal(handleErr)
	}
	if closeErr != nil {
		t.Fatal(closeErr)
	}
}
// MockServer is the server implementation handed to the flux daemon handler in
// TestHandle. Only NotifyChange is defined on it in this file.
type MockServer struct {
// Embedded so MockServer satisfies the server type expected by the handler.
// NOTE(review): presumably an interface left nil here, in which case calling
// any method other than NotifyChange would panic — confirm.
fluxapi.Server
// T is the running test, used by NotifyChange to report assertion failures.
T *testing.T
}
// NotifyChange checks that the incoming change is a git update for the
// "master" branch and records a test failure on ms.T otherwise. It always
// returns nil; failures are reported only through the test handle.
func (ms MockServer) NotifyChange(ctx context.Context, change v9.Change) error {
	switch src := change.Source.(type) {
	case v9.GitUpdate:
		assert.Equal(ms.T, "master", src.Branch)
	default:
		// This runs on an HTTP handler goroutine, where T.Fatal must not be
		// called; T.Error marks the test failed and is safe from any goroutine.
		ms.T.Error("change source type didn't match")
	}
	return nil
}