Skip to content

Commit

Permalink
segmentio#726 remove reader safetyTimeout
Browse files Browse the repository at this point in the history
  • Loading branch information
moogacs committed Sep 14, 2022
1 parent ee37c7f commit 72b0ee7
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 10 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ require (
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
golang.org/x/net v0.0.0-20220706163947-c90051bbdb60
)

replace github.com/segmentio/kafka-go v0.4.32 => github.com/moogacs/kafka-go v0.0.1
11 changes: 1 addition & 10 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1431,7 +1431,7 @@ func (r *reader) run(ctx context.Context, offset int64) {
r.sendError(ctx, err)
} else {
r.withErrorLogger(func(log Logger) {
log.Printf("the kafka reader got an unknown error reading partition %d of %s at offset %d: %s", r.partition, r.topic, toHumanOffset(offset), err)
log.Printf("mooga the kafka reader got an unknown error reading partition %d of %s at offset %d: %s", r.partition, r.topic, toHumanOffset(offset), err)
})
r.stats.errors.observe(1)
conn.Close()
Expand Down Expand Up @@ -1514,16 +1514,7 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err
var size int64
var bytes int64

const safetyTimeout = 10 * time.Second
deadline := time.Now().Add(safetyTimeout)
conn.SetReadDeadline(deadline)

for {
if now := time.Now(); deadline.Sub(now) < (safetyTimeout / 2) {
deadline = now.Add(safetyTimeout)
conn.SetReadDeadline(deadline)
}

if msg, err = batch.ReadMessage(); err != nil {
batch.Close()
break
Expand Down

0 comments on commit 72b0ee7

Please sign in to comment.