Skip to content

Commit

Permalink
feat: update code of process chan
Browse files Browse the repository at this point in the history
  • Loading branch information
dapeng committed Jan 2, 2025
1 parent 3d33137 commit 4dc083d
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 26 deletions.
46 changes: 26 additions & 20 deletions goner/gin/responer.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ func (r *responser) ProcessResults(context XContext, writer gin.ResponseWriter,
return
}

of := reflect.TypeOf(result)
if of.Kind() == reflect.Chan {
isNotEnd = true
r.dealChan(result, writer)
return
}

switch result.(type) {
case error:
r.Failed(context, result.(error))
Expand All @@ -148,9 +155,9 @@ func (r *responser) ProcessResults(context XContext, writer gin.ResponseWriter,
if err != nil {
r.Warnf("copy data to writer failed, err: %v", err)
}
case chan any:
isNotEnd = true
r.dealChan(result.(chan any), writer)
//case chan any:
// isNotEnd = true
// r.dealChan(result.(chan any), writer)
default:
r.Success(context, result)
}
Expand All @@ -161,31 +168,30 @@ func (r *responser) ProcessResults(context XContext, writer gin.ResponseWriter,
}
}

func (r *responser) dealChan(ch <-chan any, writer gin.ResponseWriter) {
func (r *responser) dealChan(ch any, writer gin.ResponseWriter) {
sse := NewSSE(writer)
sse.Start()

for {
data, ok := <-ch
of := reflect.ValueOf(ch)

if !ok {
for {
if data, ok := of.Recv(); !ok {
err := sse.End()
if err != nil {
r.Errorf("write 'end' error: %v", err)
}
return
}
var err error
switch data.(type) {
case error:
err = sse.WriteError(ToError(data.(error)))
default:
err = sse.Write(data)
}

if err != nil {
r.Errorf("write data error: %v", err)
return
break
} else {
var err error
i := data.Interface()
if e, y := i.(error); y {
err = sse.WriteError(ToError(e))
} else {
err = sse.Write(i)
}
if err != nil {
r.Errorf("write data error: %v", err)
}
}
}
}
6 changes: 1 addition & 5 deletions goner/gin/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ func (s *Sse) Write(delta any) error {
return err
}

_, err = io.WriteString(s.Writer, "event: data\n")
if err != nil {
return err
}
_, err = io.WriteString(s.Writer, fmt.Sprintf("data: %s\n\n", jsonStr))
if err != nil {
return err
Expand All @@ -50,7 +46,7 @@ func (s *Sse) Write(delta any) error {
}

func (s *Sse) End() error {
_, err := io.WriteString(s.Writer, "event: done\n")
_, err := io.WriteString(s.Writer, "event: done\ndata: \n\ndata: [DONE]\n")
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion goner/gin/sse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ func TestSSE(t *testing.T) {
writer := NewMockResponseWriter(controller)
writer.EXPECT().Header().Return(http.Header{}).AnyTimes()
writer.EXPECT().Flush().AnyTimes()
writer.EXPECT().WriteString(gomock.Any()).Return(100, nil)
writer.EXPECT().WriteString(gomock.Any()).Return(0, errors.New("error"))

sse := NewSSE(writer)
Expand Down

0 comments on commit 4dc083d

Please sign in to comment.