Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: unsafe method panic, reduce log, slice work with id #77

Merged
merged 7 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/util/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,13 @@ func UnsafeStringToBytes(s string) []byte {
return *(*[]byte)(unsafe.Pointer(&s))
}

// UnsafeGetStringToBytes converts a string to a []byte without copying the
// underlying data. The returned slice aliases the string's storage, so it
// must be treated as read-only; mutating it is undefined behavior.
// An empty input yields a non-nil empty slice so callers that index or
// re-slice the result never observe a nil slice.
func UnsafeGetStringToBytes(s string) []byte {
	if s == "" {
		// Avoid calling unsafe.StringData on an empty string and
		// guarantee a valid (non-nil) zero-length slice.
		return []byte{}
	}
	b := unsafe.Slice(unsafe.StringData(s), len(s))
	return b
}

// ToLowercase convert string bytes to lowercase
func ToLowercase(str []byte) []byte {
for i, s := range str {
Expand Down
2 changes: 1 addition & 1 deletion modules/queue/disk_queue/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (module *DiskQueue) deleteUnusedFiles(queueID string, fileNum int64) {
var exists = false
if util.FileExists(file) {
exists = true
log.Debug("delete queue file:", file)
log.Trace("delete queue file:", file)
err := os.Remove(file)
if err != nil {
log.Error(err)
Expand Down
19 changes: 12 additions & 7 deletions modules/queue/disk_queue/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,17 @@ READ_MSG:
oldPart := d.segment
Notify(d.queue, ReadComplete, d.segment)
ctx.UpdateNextOffset(d.segment, d.readPos) //update next offset
log.Debugf("EOF, but current read segment_id [%v] is less than current write segment_id [%v], increase ++", oldPart, d.segment)
log.Debugf("EOF, but current read segment_id [%v] is less than current write segment_id [%v], increase ++", oldPart, d.diskQueue.writeSegmentNum)
err = d.ResetOffset(d.segment+1, 0) //locate next segment
if err != nil {
if strings.Contains(err.Error(), "not found") {
return messages, false, nil
}
log.Errorf("queue:%v,offset:%v,%v, invalid message size: %v, should between: %v TO %v, error: %v", d.queue, d.segment, d.readPos, msgSize, d.mCfg.MinMsgSize, d.mCfg.MaxMsgSize, err)
panic(err)
}
ctx.UpdateNextOffset(d.segment, d.readPos)

return messages, false, err
}

Expand Down Expand Up @@ -247,7 +249,7 @@ READ_MSG:
nextSegment := d.segment + 1
RETRY_NEXT_FILE:
nextFile, exists, _ := SmartGetFileName(d.mCfg, d.queue, nextSegment)
log.Debugf("try skip to next file: %v, exists: %v", nextFile, exists)
log.Debugf("retry skip to next file: %v, exists: %v", nextFile, exists)
if exists || util.FileExists(nextFile) {
//update offset
err = d.ResetOffset(nextSegment, 0)
Expand All @@ -256,6 +258,7 @@ READ_MSG:
if strings.Contains(err.Error(), "not found") {
return messages, false, nil
}
log.Errorf("queue:%v,offset:%v,%v, invalid message size: %v, should between: %v TO %v, error: %v", d.queue, d.segment, d.readPos, msgSize, d.mCfg.MinMsgSize, d.mCfg.MaxMsgSize, err)
panic(err)
}
ctx.UpdateNextOffset(nextSegment, 0)
Expand All @@ -264,13 +267,13 @@ READ_MSG:
} else {
//can't read ahead before current write file
if nextSegment >= d.diskQueue.writeSegmentNum {
log.Errorf("need to skip to next file, but next file not exists, current write segment:%v, current read segment:%v", d.diskQueue.writeSegmentNum, d.segment)
log.Debugf("need to skip to next file, but next file not exists, current write segment:%v, current read segment:%v", d.diskQueue.writeSegmentNum, d.segment)
d.diskQueue.skipToNextRWFile(false)
d.diskQueue.needSync = true
} else {
//let's continue move to next file
nextSegment++
log.Debugf("move to next file: %v", nextSegment)
log.Debugf("fetch messages move to next file: %v", nextSegment)
goto RETRY_NEXT_FILE
}

Expand Down Expand Up @@ -374,7 +377,9 @@ READ_MSG:
}

RELOAD_FILE:
log.Debugf("load queue file: %v/%v, read at: %v", d.queue, d.segment, d.readPos)
if global.Env().IsDebug {
log.Debugf("load queue file: %v/%v, read at: %v", d.queue, d.segment, d.readPos)
}
if nextReadPos >= d.maxBytesPerFileRead {

if !d.fileLoadCompleted {
Expand Down Expand Up @@ -503,15 +508,15 @@ func (d *Consumer) ResetOffset(segment, readPos int64) error {
// there are segments in the middle
if nextSegment < d.diskQueue.writeSegmentNum {
fileName, exists, next_file_exists = SmartGetFileName(d.mCfg, d.queue, nextSegment)
log.Debugf("try skip to next file: %v, exists: %v", fileName, exists)
if exists || util.FileExists(fileName) {
log.Debugf("retry skip to next file: %v, exists", fileName)
d.segment = nextSegment
d.readPos = 0
d.diskQueue.UpdateSegmentConsumerInReading(d.ID, d.segment)
goto FIND_NEXT_FILE
} else {
nextSegment++
log.Debugf("move to next file: %v", nextSegment)
log.Debugf("retry skip to next file: %v, not exists", fileName)
goto RETRY_NEXT_FILE
}
} else {
Expand Down
4 changes: 1 addition & 3 deletions modules/queue/disk_queue/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,9 +676,7 @@ func saveOffset(k *queue.QueueConfig, consumer *queue.ConsumerConfig, offset que
}
}

//log.Debugf("save offset: %v", offset.EncodeToString())

err = kv.AddValue(ConsumerOffsetBucket, util.UnsafeStringToBytes(getCommitKey(k, consumer)), []byte(offset.EncodeToString()))
err = kv.AddValue(ConsumerOffsetBucket, util.UnsafeGetStringToBytes(getCommitKey(k, consumer)), []byte(offset.EncodeToString()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not fix UnsafeStringToBytes rather than create another function with the same behavior? I don't think a second function is necessary. Also, the output of getCommitKey should never be empty — if it is empty, you should panic. I don't really see which failure case this change fixes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even when getCommitKey is not empty, the []byte returned by UnsafeStringToBytes has its length set but cap=0 (the string header has no capacity field), so the subsequent kv.AddValue call will panic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

if err != nil {
return false, err
}
Expand Down
Loading
Loading