diff --git a/aof/aof.go b/aof/aof.go index 3c1bef09..7b9a4ca2 100644 --- a/aof/aof.go +++ b/aof/aof.go @@ -21,114 +21,116 @@ import ( "github.com/hdt3213/godis/redis/protocol" ) +/*在aof过程中需要注意的事情 +get 之类的读命令并不需要进行持久化 +expire 命令要用等效的 expireat 命令替换。 +举例说明,10:00 执行 expire a 3600 表示键 a 在 11:00 过期, +在 10:30 载入AOF文件时执行 expire a 3600 就成了 11:30 过期与原数据不符。 +*/ // CmdLine is alias for [][]byte, represents a command line type CmdLine = [][]byte +// aofQueueSize 定义AOF队列大小 const ( aofQueueSize = 1 << 20 ) +// Fsync 策略常量 const ( - // FsyncAlways do fsync for every command + // 每个命令执行后都同步到磁盘 FsyncAlways = "always" - // FsyncEverySec do fsync every second + // 每秒同步一次到磁盘 FsyncEverySec = "everysec" - // FsyncNo lets operating system decides when to do fsync + // 由操作系统决定何时同步到磁盘 FsyncNo = "no" ) +// payload 结构体用于封装要写入AOF的命令数据 type payload struct { cmdLine CmdLine dbIndex int - wg *sync.WaitGroup + wg *sync.WaitGroup // WaitGroup,用于同步等待所有命令的写入操作完成,主要用于测试和确保数据一致性 } -// Listener will be called-back after receiving a aof payload -// with a listener we can forward the updates to slave nodes etc. +// Listener 接口用于回调监听AOF的变更 type Listener interface { - // Callback will be called-back after receiving a aof payload + // Callback 会在接收到AOF 的 payload后被调用 Callback([]CmdLine) } -// Persister receive msgs from channel and write to AOF file +// Aof的处理抽象结构体,AOF持久化处理器 type Persister struct { - ctx context.Context - cancel context.CancelFunc - db database.DBEngine - tmpDBMaker func() database.DBEngine - // aofChan is the channel to receive aof payload(listenCmd will send payload to this channel) - aofChan chan *payload - // aofFile is the file handler of aof file - aofFile *os.File - // aofFilename is the path of aof file - aofFilename string - // aofFsync is the strategy of fsync - aofFsync string - // aof goroutine will send msg to main goroutine through this channel when aof tasks finished and ready to shut down - aofFinished chan struct{} - // pause aof for start/finish aof rewrite progress - pausingAof sync.Mutex - currentDB int - listeners map[Listener]struct{} - // reuse cmdLine buffer - buffer []CmdLine + ctx context.Context // 上下文,用于管理和取消长时间运行的goroutine + cancel context.CancelFunc // 取消函数,用于在关闭Persister时停止所有goroutine + db database.DBEngine // 数据库引擎接口,必须持有以便操作redis的业务核心 + tmpDBMaker func() database.DBEngine // 用于创建临时数据库实例的函数,通常在重写AOF时使用 + aofChan chan *payload // AOF载荷通道,用于接收来自其他组件的命令,以便异步写入AOF文件 + aofFile *os.File // AOF文件的文件句柄,用于文件读写操作 + aofFilename string // AOF文件的路径,用于文件操作时指定正确的文件 + aofFsync string // Fsync策略,控制数据何时从内存同步到磁盘("always", "everysec", "no") + + aofFinished chan struct{} // 当AOF处理goroutine完成后,通过此通道通知主goroutine + pausingAof sync.Mutex // 在AOF重写过程中用于暂停AOF记录的互斥锁 + currentDB int // 当前数据库的索引,用于支持多数据库环境,保证命令在正确的数据库上执行 + listeners map[Listener]struct{} // 监听器集合,用于实现发布-订阅模式,当AOF有更新时通知这些监听器 + buffer []CmdLine // 命令行缓冲区,用于减少内存分配,重用命令行数据 } -// NewPersister creates a new aof.Persister +// NewPersister 创建新的AOF持久化处理器 func NewPersister(db database.DBEngine, filename string, load bool, fsync string, tmpDBMaker func() database.DBEngine) (*Persister, error) { - persister := &Persister{} + persister := &Persister{} // 初始化Persister结构体实例 persister.aofFilename = filename - persister.aofFsync = strings.ToLower(fsync) + persister.aofFsync = strings.ToLower(fsync) // 设置文件同步策略,统一转为小写以防止大小写错误 persister.db = db - persister.tmpDBMaker = tmpDBMaker + persister.tmpDBMaker = tmpDBMaker // 设置临时数据库创建函数,用于AOF重写等场景 persister.currentDB = 0 // load aof file if needed if load { - persister.LoadAof(0) + persister.LoadAof(0) // 根据需要加载AOF文件 } aofFile, err := os.OpenFile(persister.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) if err != nil { return nil, err } persister.aofFile = aofFile - persister.aofChan = make(chan *payload, aofQueueSize) - persister.aofFinished = make(chan struct{}) + persister.aofChan = make(chan *payload, aofQueueSize) // 创建一个负载通道,用于异步接收要写入的命令 + persister.aofFinished = make(chan struct{}) // 创建一个通道,用于通知AOF处理完成 persister.listeners = make(map[Listener]struct{}) - // start aof goroutine to write aof file in background and fsync periodically if needed (see fsyncEverySecond) + // 启动一个协程来监听和处理AOF命令 go func() { - persister.listenCmd() + persister.listenCmd() // 启动一个协程监听和处理AOF命令 }() + // 设置context和cancel,用于管理协程的生命周期 ctx, cancel := context.WithCancel(context.Background()) persister.ctx = ctx persister.cancel = cancel - // fsync every second if needed - if persister.aofFsync == FsyncEverySec { + // 如果策略是每秒同步,则启动定时同步 + if persister.aofFsync == FsyncEverySec { // 如果策略是每秒同步,则启动定时同步 persister.fsyncEverySecond() } return persister, nil } -// RemoveListener removes a listener from aof handler, so we can close the listener +// RemoveListener 移除一个监听器 func (persister *Persister) RemoveListener(listener Listener) { persister.pausingAof.Lock() defer persister.pausingAof.Unlock() delete(persister.listeners, listener) } -// SaveCmdLine send command to aof goroutine through channel +// SaveCmdLine 将命令行数据发送到AOF处理协程 func (persister *Persister) SaveCmdLine(dbIndex int, cmdLine CmdLine) { - // aofChan will be set as nil temporarily during load aof see Persister.LoadAof + //如果没有初始化管道,会出错 if persister.aofChan == nil { return } - + //判断一下开启的aof的策略 if persister.aofFsync == FsyncAlways { p := &payload{ cmdLine: cmdLine, dbIndex: dbIndex, } - persister.writeAof(p) - return + persister.writeAof(p) // 如果策略是立即同步,则直接写入文件 } persister.aofChan <- &payload{ @@ -138,7 +140,7 @@ func (persister *Persister) SaveCmdLine(dbIndex int, cmdLine CmdLine) { } -// listenCmd listen aof channel and write into file +// listenCmd 监听AOF队列,并将命令写入文件 func (persister *Persister) listenCmd() { for p := range persister.aofChan { persister.writeAof(p) @@ -146,42 +148,44 @@ func (persister *Persister) listenCmd() { persister.aofFinished <- struct{}{} } +// writeAof 处理写入AOF文件的具体逻辑 func (persister *Persister) writeAof(p *payload) { - persister.buffer = persister.buffer[:0] // reuse underlying array - persister.pausingAof.Lock() // prevent other goroutines from pausing aof + persister.buffer = persister.buffer[:0] // 清空缓冲区以重用 + persister.pausingAof.Lock() // 加锁以同步对AOF操作的访问,防止重写冲突 defer persister.pausingAof.Unlock() - // ensure aof is in the right database + // 确认我当前操作是否需要切换db,如果跟上次相同则不需要再插入select db相关命令 if p.dbIndex != persister.currentDB { - // select db - selectCmd := utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex)) + // 如果当前数据库索引不正确,则发送SELECT命令切换数据库 + selectCmd := utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex)) //ToCmdLine功能是把输入的字符串序列,变成[][] string的切片 persister.buffer = append(persister.buffer, selectCmd) - data := protocol.MakeMultiBulkReply(selectCmd).ToBytes() - _, err := persister.aofFile.Write(data) + data := protocol.MakeMultiBulkReply(selectCmd).ToBytes() //调用ToBytes之后会变成redis协议的格式 + _, err := persister.aofFile.Write(data) //写入文件 if err != nil { logger.Warn(err) return // skip this command } - persister.currentDB = p.dbIndex + persister.currentDB = p.dbIndex // 更新当前数据库索引 } - // save command + // 写入实际的命令 data := protocol.MakeMultiBulkReply(p.cmdLine).ToBytes() persister.buffer = append(persister.buffer, p.cmdLine) _, err := persister.aofFile.Write(data) if err != nil { logger.Warn(err) } + // 通知所有注册的监听器 for listener := range persister.listeners { listener.Callback(persister.buffer) } + // 如果同步策略为每个命令后立即同步,则执行同步 if persister.aofFsync == FsyncAlways { _ = persister.aofFile.Sync() } } -// LoadAof read aof file, can only be used before Persister.listenCmd started +// LoadAof 从AOF文件加载数据,通常在服务器启动时调用 func (persister *Persister) LoadAof(maxBytes int) { - // persister.db.Exec may call persister.AddAof - // delete aofChan to prevent loaded commands back into aofChan + // 在加载过程中暂时关闭aofChan,防止新的写入命令干扰加载过程 aofChan := persister.aofChan persister.aofChan = nil defer func(aofChan chan *payload) { @@ -198,14 +202,14 @@ func (persister *Persister) LoadAof(maxBytes int) { } defer file.Close() - // load rdb preamble if needed + // 从文件中解析可能的RDB预数据 decoder := rdb.NewDecoder(file) err = persister.db.LoadRDB(decoder) if err != nil { - // no rdb preamble + // 如果没有RDB预数据,从文件开头开始加载 file.Seek(0, io.SeekStart) } else { - // has rdb preamble + // 如果存在RDB预数据,从预数据后开始读取 _, _ = file.Seek(int64(decoder.GetReadCount())+1, io.SeekStart) maxBytes = maxBytes - decoder.GetReadCount() } @@ -215,12 +219,13 @@ func (persister *Persister) LoadAof(maxBytes int) { } else { reader = file } + // 解析AOF数据流 ch := parser.ParseStream(reader) - fakeConn := connection.NewFakeConn() // only used for save dbIndex + fakeConn := connection.NewFakeConn() // 用于保存数据库索引的虚拟连接 for p := range ch { if p.Err != nil { if p.Err == io.EOF { - break + break // 文件读取完成 } logger.Error("parse error: " + p.Err.Error()) continue @@ -248,69 +253,68 @@ func (persister *Persister) LoadAof(maxBytes int) { } } -// Fsync flushes aof file to disk +// Fsync 将AOF文件的内容同步到磁盘 func (persister *Persister) Fsync() { - persister.pausingAof.Lock() + persister.pausingAof.Lock() // 加锁以防止在同步时发生并发写操作 if err := persister.aofFile.Sync(); err != nil { - logger.Errorf("fsync failed: %v", err) + logger.Errorf("fsync failed: %v", err) // 同步失败时记录错误 } persister.pausingAof.Unlock() } -// Close gracefully stops aof persistence procedure +// Close 优雅地停止AOF持久化过程 func (persister *Persister) Close() { if persister.aofFile != nil { - close(persister.aofChan) - <-persister.aofFinished // wait for aof finished - err := persister.aofFile.Close() + close(persister.aofChan) // 关闭AOF命令通道 + <-persister.aofFinished // 等待后台goroutine完成AOF处理 + err := persister.aofFile.Close() // 关闭AOF文件 if err != nil { logger.Warn(err) } } - persister.cancel() + persister.cancel() // 取消相关的context,确保所有goroutine可以清理并退出 } -// fsyncEverySecond fsync aof file every second +// fsyncEverySecond 每秒同步AOF文件到磁盘 func (persister *Persister) fsyncEverySecond() { - ticker := time.NewTicker(time.Second) + ticker := time.NewTicker(time.Second) // 创建一个定时器,每秒触发一次 go func() { for { select { - case <-ticker.C: - persister.Fsync() - case <-persister.ctx.Done(): - return + case <-ticker.C: // 每秒触发 + persister.Fsync() // 调用Fsync方法同步数据到磁盘 + case <-persister.ctx.Done(): // 监听context的取消信号 + return // 如果接收到取消信号,退出goroutine } } }() } +// generateAof 根据当前数据库状态生成新的AOF文件 func (persister *Persister) generateAof(ctx *RewriteCtx) error { - // rewrite aof tmpFile - tmpFile := ctx.tmpFile - // load aof tmpFile - tmpAof := persister.newRewriteHandler() - tmpAof.LoadAof(int(ctx.fileSize)) + tmpFile := ctx.tmpFile // 获取临时文件的文件句柄 + tmpAof := persister.newRewriteHandler() // 创建一个新的AOF重写处理器 + tmpAof.LoadAof(int(ctx.fileSize)) // 加载AOF数据到临时处理器 for i := 0; i < config.Properties.Databases; i++ { // select db data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(i))).ToBytes() - _, err := tmpFile.Write(data) + _, err := tmpFile.Write(data) // 写入选择数据库的命令到临时文件 if err != nil { return err } - // dump db + // 遍历数据库中的每个键值对,并写入临时文件 tmpAof.db.ForEach(i, func(key string, entity *database.DataEntity, expiration *time.Time) bool { cmd := EntityToCmd(key, entity) if cmd != nil { _, _ = tmpFile.Write(cmd.ToBytes()) } if expiration != nil { - cmd := MakeExpireCmd(key, *expiration) + cmd := MakeExpireCmd(key, *expiration) // 如果有过期时间,生成过期命令 if cmd != nil { - _, _ = tmpFile.Write(cmd.ToBytes()) + _, _ = tmpFile.Write(cmd.ToBytes()) // 写入过期命令到临时文件 } } - return true + return true // 继续遍历 }) } return nil diff --git a/aof/rewrite.go b/aof/rewrite.go index 8d3f7627..f6b7762f 100644 --- a/aof/rewrite.go +++ b/aof/rewrite.go @@ -11,67 +11,69 @@ import ( "github.com/hdt3213/godis/redis/protocol" ) +/* +通过创建一个临时文件并最终将其替换为主 AOF 文件,我们可以在不中断服务的情况下进行文件重写,同时保持数据的完整性和一致性。 +这种方法避免了需要系统级 fork 操作的需求,适应了 Go 语言的运行时特性。 +*/ + +// newRewriteHandler 创建一个新的重写处理器,用于在重写期间操作独立的数据库实例 func (persister *Persister) newRewriteHandler() *Persister { h := &Persister{} - h.aofFilename = persister.aofFilename - h.db = persister.tmpDBMaker() + h.aofFilename = persister.aofFilename // 保留原始 AOF 文件名 + h.db = persister.tmpDBMaker() // 创建一个临时的数据库实例,避免干扰现有数据 return h } -// RewriteCtx holds context of an AOF rewriting procedure +// RewriteCtx 保存 AOF 重写过程的上下文信息 type RewriteCtx struct { - tmpFile *os.File // tmpFile is the file handler of aof tmpFile - fileSize int64 - dbIdx int // selected db index when startRewrite + tmpFile *os.File // tmpFile 是临时 AOF 文件的文件句柄 + fileSize int64 // 记录重写开始时原 AOF 文件的大小 + dbIdx int // 重写开始时选定的数据库索引 } -// Rewrite carries out AOF rewrite +// Rewrite 执行AOF重写的整个流程 func (persister *Persister) Rewrite() error { - ctx, err := persister.StartRewrite() + ctx, err := persister.StartRewrite() // 准备重写操作 if err != nil { return err } - err = persister.DoRewrite(ctx) + err = persister.DoRewrite(ctx) // 执行实际的重写操作 if err != nil { return err } - persister.FinishRewrite(ctx) + persister.FinishRewrite(ctx) // 完成重写,整理并关闭相关资源 return nil } -// DoRewrite actually rewrite aof file -// makes DoRewrite public for testing only, please use Rewrite instead +// DoRewrite 实际执行 AOF 文件的重写 func (persister *Persister) DoRewrite(ctx *RewriteCtx) (err error) { // start rewrite if !config.Properties.AofUseRdbPreamble { logger.Info("generate aof preamble") - err = persister.generateAof(ctx) + err = persister.generateAof(ctx) // 生成 AOF 前导数据,// generateAof 根据当前数据库状态生成新的AOF文件 } else { logger.Info("generate rdb preamble") - err = persister.generateRDB(ctx) + err = persister.generateRDB(ctx) // 生成 RDB 前导数据 } return err } -// StartRewrite prepares rewrite procedure +// StartRewrite 准备重写流程,主要包括锁定写操作和创建临时文件 func (persister *Persister) StartRewrite() (*RewriteCtx, error) { // pausing aof - persister.pausingAof.Lock() + persister.pausingAof.Lock() // 锁定以暂停 AOF 写入 defer persister.pausingAof.Unlock() - - err := persister.aofFile.Sync() + err := persister.aofFile.Sync() // 同步现有 AOF 文件,确保所有数据都已写入, 将AOF文件的内容同步到磁盘 if err != nil { - logger.Warn("fsync failed") + logger.Warn("fsync failed") // 同步失败警告 return nil, err } - // get current aof file size - fileInfo, _ := os.Stat(persister.aofFilename) - filesize := fileInfo.Size() + fileInfo, _ := os.Stat(persister.aofFilename) // 获取当前 AOF 文件的状态 + filesize := fileInfo.Size() //获取当前AOF文件的大小 - // create tmp file - file, err := os.CreateTemp(config.GetTmpDir(), "*.aof") + file, err := os.CreateTemp(config.GetTmpDir(), "*.aof") // 在临时目录中创建一个新的临时文件 if err != nil { logger.Warn("tmp file create failed") return nil, err @@ -83,15 +85,16 @@ func (persister *Persister) StartRewrite() (*RewriteCtx, error) { }, nil } -// FinishRewrite finish rewrite procedure +// FinishRewrite 完成重写操作,整理资源并更新AOF文件 func (persister *Persister) FinishRewrite(ctx *RewriteCtx) { - persister.pausingAof.Lock() // pausing aof + persister.pausingAof.Lock() // 再次锁定AOF写入,准备替换文件 + // 这一步确保在重写的最后阶段,没有其他写入操作会干扰文件替换过程。 defer persister.pausingAof.Unlock() tmpFile := ctx.tmpFile - // copy commands executed during rewriting to tmpFile + // 处理并判断是否在重写期间有错误发生 errOccurs := func() bool { - /* read write commands executed during rewriting */ + // 重新打开当前 AOF 文件以便从重写开始时的位置继续拷贝数据 src, err := os.Open(persister.aofFilename) if err != nil { logger.Error("open aofFilename failed: " + err.Error()) @@ -102,19 +105,19 @@ func (persister *Persister) FinishRewrite(ctx *RewriteCtx) { _ = tmpFile.Close() }() - _, err = src.Seek(ctx.fileSize, 0) + _, err = src.Seek(ctx.fileSize, 0) // 定位到重写开始时的文件位置,即只拷贝重写后发生的写操作 if err != nil { logger.Error("seek failed: " + err.Error()) return true } - // sync tmpFile's db index with online aofFile + // 将临时文件的数据库索引与在线 AOF 文件同步,确保数据一致性 data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(ctx.dbIdx))).ToBytes() _, err = tmpFile.Write(data) if err != nil { logger.Error("tmp file rewrite failed: " + err.Error()) return true } - // copy data + // 将重写期间新接收的命令拷贝到临时文件,这样临时文件包含所有最新的数据 _, err = io.Copy(tmpFile, src) if err != nil { logger.Error("copy aof filed failed: " + err.Error()) @@ -126,22 +129,21 @@ func (persister *Persister) FinishRewrite(ctx *RewriteCtx) { return } - // replace current aof file by tmp file + // 完成重写后,用临时文件替换旧的 AOF 文件,这个操作使用原子操作确保数据不丢失 _ = persister.aofFile.Close() if err := os.Rename(tmpFile.Name(), persister.aofFilename); err != nil { logger.Warn(err) } - // reopen aof file for further write + // 重新打开新的 AOF 文件以便继续后续的写操作 aofFile, err := os.OpenFile(persister.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) if err != nil { - panic(err) + panic(err) // 如果无法打开新的 AOF 文件,抛出异常 } - persister.aofFile = aofFile + persister.aofFile = aofFile // 重新打开新的AOF文件,继续写入操作 - // write select command again to resume aof file selected db - // it should have the same db index with persister.currentDB + // 为了保证新的 AOF 文件的数据库索引与当前数据库索引一致,再次写入 SELECT 命令 data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(persister.currentDB))).ToBytes() - _, err = persister.aofFile.Write(data) + _, err = persister.aofFile.Write(data) // 保证新的AOF文件与当前数据库索引一致 if err != nil { panic(err) } diff --git a/database/commandinfo.go b/database/commandinfo.go index afe283cc..6a017d10 100644 --- a/database/commandinfo.go +++ b/database/commandinfo.go @@ -1,9 +1,10 @@ package database import ( + "strings" + "github.com/hdt3213/godis/interface/redis" "github.com/hdt3213/godis/redis/protocol" - "strings" ) const ( diff --git a/database/database.go b/database/database.go index 9b48eab6..42f5c751 100644 --- a/database/database.go +++ b/database/database.go @@ -5,53 +5,50 @@ import ( "strings" "time" - "github.com/hdt3213/godis/datastruct/dict" + "github.com/hdt3213/godis/datastruct/dict" // 并发安全的字典结构 "github.com/hdt3213/godis/interface/database" "github.com/hdt3213/godis/interface/redis" "github.com/hdt3213/godis/lib/logger" - "github.com/hdt3213/godis/lib/timewheel" + "github.com/hdt3213/godis/lib/timewheel" // 时间轮实现,用于处理键的过期 "github.com/hdt3213/godis/redis/protocol" ) const ( - dataDictSize = 1 << 16 - ttlDictSize = 1 << 10 + dataDictSize = 1 << 16 // 数据字典的初始大小 65536 + ttlDictSize = 1 << 10 // TTL字典的初始大小 ) -// DB stores data and execute user's commands +// DB 存储数据并执行用户命令 type DB struct { - index int - // key -> DataEntity - data *dict.ConcurrentDict - // key -> expireTime (time.Time) - ttlMap *dict.ConcurrentDict - // key -> version(uint32) - versionMap *dict.ConcurrentDict - - // addaof is used to add command to aof - addAof func(CmdLine) - - // callbacks - insertCallback database.KeyEventCallback - deleteCallback database.KeyEventCallback + index int // 数据库索引 + data *dict.ConcurrentDict // 用于存储键值对的字典 + ttlMap *dict.ConcurrentDict // 存储键的 TTL 信息的字典 + versionMap *dict.ConcurrentDict + addAof func(CmdLine) // 函数,用于将命令添加到 AOF,用于分db在执行命令的同时可以调用addaof命令把执行的命令放入到aof管道中 + insertCallback database.KeyEventCallback // 插入键时的回调 + deleteCallback database.KeyEventCallback // 删除键时的回调 } // ExecFunc is interface for command executor // args don't include cmd line +// ExecFunc 定义了命令执行器的接口,用于执行具体的数据库命令。 type ExecFunc func(db *DB, args [][]byte) redis.Reply // PreFunc analyses command line when queued command to `multi` // returns related write keys and read keys +// PreFunc 定义了命令预处理的接口,用于分析命令行和相关键。 type PreFunc func(args [][]byte) ([]string, []string) // CmdLine is alias for [][]byte, represents a command line +// CmdLine 是二维字节数组的别名,表示命令行。 type CmdLine = [][]byte // UndoFunc returns undo logs for the given command line // execute from head to tail when undo +// UndoFunc 定义了撤销命令的接口,用于生成撤销操作的日志。 type UndoFunc func(db *DB, args [][]byte) []CmdLine -// makeDB create DB instance +// makeDB 创建一个 DB 实例 func makeDB() *DB { db := &DB{ data: dict.MakeConcurrent(dataDictSize), @@ -62,7 +59,7 @@ func makeDB() *DB { return db } -// makeBasicDB create DB instance only with basic abilities. +// makeBasicDB 创建一个基础功能的DB实例,只包括基础数据结构,没有额外的功能配置。 func makeBasicDB() *DB { db := &DB{ data: dict.MakeConcurrent(dataDictSize), @@ -73,45 +70,47 @@ func makeBasicDB() *DB { return db } -// Exec executes command within one database +// Exec 执行单个数据库内的命令 func (db *DB) Exec(c redis.Connection, cmdLine [][]byte) redis.Reply { - // transaction control commands and other commands which cannot execute within transaction + // 将命令行的第一个词(通常是命令名)转换为小写。 cmdName := strings.ToLower(string(cmdLine[0])) + // 处理特殊的事务控制命令和其他不能在事务中执行的命令。 if cmdName == "multi" { if len(cmdLine) != 1 { return protocol.MakeArgNumErrReply(cmdName) } return StartMulti(c) - } else if cmdName == "discard" { + } else if cmdName == "discard" { // 取消一个事务。 if len(cmdLine) != 1 { return protocol.MakeArgNumErrReply(cmdName) } return DiscardMulti(c) - } else if cmdName == "exec" { + } else if cmdName == "exec" { // 执行一个事务。 if len(cmdLine) != 1 { return protocol.MakeArgNumErrReply(cmdName) } return execMulti(db, c) - } else if cmdName == "watch" { + } else if cmdName == "watch" { // 监视给定的键,用于事务。 if !validateArity(-2, cmdLine) { return protocol.MakeArgNumErrReply(cmdName) } return Watch(db, c, cmdLine[1:]) } - if c != nil && c.InMultiState() { + if c != nil && c.InMultiState() { // 如果处于事务中,将命令加入队列 return EnqueueCmd(c, cmdLine) } - + // 执行普通命令。 return db.execNormalCommand(cmdLine) } +// execNormalCommand 执行普通命令 func (db *DB) execNormalCommand(cmdLine [][]byte) redis.Reply { cmdName := strings.ToLower(string(cmdLine[0])) - cmd, ok := cmdTable[cmdName] + cmd, ok := cmdTable[cmdName] //此时的cmd是command类型 if !ok { return protocol.MakeErrReply("ERR unknown command '" + cmdName + "'") } - if !validateArity(cmd.arity, cmdLine) { + if !validateArity(cmd.arity, cmdLine) { //验证命令的 参数是否对应 return protocol.MakeArgNumErrReply(cmdName) } @@ -124,7 +123,7 @@ func (db *DB) execNormalCommand(cmdLine [][]byte) redis.Reply { return fun(db, cmdLine[1:]) } -// execWithLock executes normal commands, invoker should provide locks +// execWithLock 使用提供的锁执行命令,通常用于内部调用 func (db *DB) execWithLock(cmdLine [][]byte) redis.Reply { cmdName := strings.ToLower(string(cmdLine[0])) cmd, ok := cmdTable[cmdName] @@ -138,6 +137,7 @@ func (db *DB) execWithLock(cmdLine [][]byte) redis.Reply { return fun(db, cmdLine[1:]) } +// validateArity 验证给定命令的参数数量是否正确。-2表示最少为2个,可变长 func validateArity(arity int, cmdArgs [][]byte) bool { argNum := len(cmdArgs) if arity >= 0 { @@ -149,6 +149,7 @@ func validateArity(arity int, cmdArgs [][]byte) bool { /* ---- Data Access ----- */ // GetEntity returns DataEntity bind to given key +// DB把底层的dict包了一层,所以在DB层逻辑写一下 func (db *DB) GetEntity(key string) (*database.DataEntity, bool) { raw, ok := db.data.GetWithLock(key) if !ok { @@ -237,52 +238,56 @@ func (db *DB) RWUnLocks(writeKeys []string, readKeys []string) { } /* ---- TTL Functions ---- */ - +// genExpireTask 生成一个用于时间轮的任务键 +// 该函数接受一个键名,并返回一个与之相关的特定格式的任务键 func genExpireTask(key string) string { return "expire:" + key } -// Expire sets ttlCmd of key +// Expire 为指定的键设置过期时间 +// key 为键名,expireTime 为过期时间点 func (db *DB) Expire(key string, expireTime time.Time) { - db.ttlMap.Put(key, expireTime) - taskKey := genExpireTask(key) - timewheel.At(expireTime, taskKey, func() { + db.ttlMap.Put(key, expireTime) // 将键和过期时间存储到ttlMap中 + taskKey := genExpireTask(key) // 生成定时任务的键 + timewheel.At(expireTime, taskKey, func() { // 在时间轮上设置任务,当时间达到expireTime时执行 keys := []string{key} - db.RWLocks(keys, nil) + db.RWLocks(keys, nil) // 为该键加读写锁,确保在检查和删除过程中键不会被修改 defer db.RWUnLocks(keys, nil) - // check-lock-check, ttl may be updated during waiting lock + // 检查-锁定-再检查模式,以防在等待锁的过程中键的过期时间被更新 logger.Info("expire " + key) rawExpireTime, ok := db.ttlMap.Get(key) if !ok { - return + return // 如果键已经不存在于ttlMap中,直接返回 } expireTime, _ := rawExpireTime.(time.Time) - expired := time.Now().After(expireTime) + expired := time.Now().After(expireTime) // 检查当前时间是否超过过期时间 if expired { - db.Remove(key) + db.Remove(key) // 如果已过期,从数据库中删除该键 } }) } -// Persist cancel ttlCmd of key +// Persist 取消键的过期时间 +// key 为需要取消过期时间的键名 func (db *DB) Persist(key string) { - db.ttlMap.Remove(key) - taskKey := genExpireTask(key) - timewheel.Cancel(taskKey) + db.ttlMap.Remove(key) // 从ttlMap中移除键,意味着该键将不再有过期时间 + taskKey := genExpireTask(key) // 生成对应的定时任务键 + timewheel.Cancel(taskKey) // 取消该键对应的定时任务,避免它被错误地删除 } -// IsExpired check whether a key is expired +// IsExpired 检查键是否已经过期 +// key 为需要检查的键名 func (db *DB) IsExpired(key string) bool { - rawExpireTime, ok := db.ttlMap.Get(key) + rawExpireTime, ok := db.ttlMap.Get(key) // 获取键的过期时间 if !ok { return false } expireTime, _ := rawExpireTime.(time.Time) - expired := time.Now().After(expireTime) + expired := time.Now().After(expireTime) // 检查当前时间是否超过过期时间 if expired { - db.Remove(key) + db.Remove(key) // 如果已过期,从数据库中删除该键 } - return expired + return expired // 返回过期状态 } /* --- add version --- */ diff --git a/database/persistence.go b/database/persistence.go index ba020361..574c8304 100644 --- a/database/persistence.go +++ b/database/persistence.go @@ -107,7 +107,7 @@ func (server *Server) AddAof(dbIndex int, cmdLine CmdLine) { func (server *Server) bindPersister(aofHandler *aof.Persister) { server.persister = aofHandler - // bind SaveCmdLine + // 给我们的redis数据库核心中的每一个db初始化 aof写入功能 for _, db := range server.dbSet { singleDB := db.Load().(*DB) singleDB.addAof = func(line CmdLine) { diff --git a/database/router.go b/database/router.go index 96e2aa44..9427c2a3 100644 --- a/database/router.go +++ b/database/router.go @@ -1,23 +1,24 @@ package database import ( + "strings" + "github.com/hdt3213/godis/interface/redis" "github.com/hdt3213/godis/redis/protocol" - "strings" ) -var cmdTable = make(map[string]*command) +var cmdTable = make(map[string]*command) //存储我们的命令的map,key:指令 对应的command结构体 type command struct { - name string - executor ExecFunc + name string //指令名称 + executor ExecFunc //没一个command里面有一个执行方法 // prepare returns related keys command prepare PreFunc // undo generates undo-log before command actually executed, in case the command needs to be rolled back undo UndoFunc // arity means allowed number of cmdArgs, arity < 0 means len(args) >= -arity. // for example: the arity of `get` is 2, `mget` is -2 - arity int + arity int //命令的参数的数量 flags int extra *commandExtra } @@ -36,7 +37,7 @@ const ( flagSpecial // command invoked in Exec ) -// registerCommand registers a normal command, which only read or modify a limited number of keys +// 通过该方法注册指令的方法 func registerCommand(name string, executor ExecFunc, prepare PreFunc, rollback UndoFunc, arity int, flags int) *command { name = strings.ToLower(name) cmd := &command{ diff --git a/database/server.go b/database/server.go index 2653c853..1560f314 100644 --- a/database/server.go +++ b/database/server.go @@ -19,23 +19,23 @@ import ( "github.com/hdt3213/godis/redis/protocol" ) -var godisVersion = "1.2.8" // do not modify - -// Server is a redis-server with full capabilities including multiple database, rdb loader, replication +var godisVersion = "1.2.8" // 版本信息,不允许修改 +// Server 定义了一个全功能的 Redis 服务器,包括多个数据库,RDB 加载,复制等 +// redis内核 type Server struct { - dbSet []*atomic.Value // *DB + dbSet []*atomic.Value // 数据库数组,使用 atomic.Value 以支持并发安全的存取。 实现我们的interface下面的DB接口 - // handle publish/subscribe + // 处理发布/订阅机制 hub *pubsub.Hub - // handle aof persistence + // 处理 AOF 持久化 persister *aof.Persister - // for replication + // 主从复制相关字段 role int32 slaveStatus *slaveStatus masterStatus *masterStatus - // hooks + // hooks ,用于键值事件的回调 insertCallback database.KeyEventCallback deleteCallback database.KeyEventCallback } @@ -45,18 +45,18 @@ func fileExists(filename string) bool { return err == nil && !info.IsDir() } -// NewStandaloneServer creates a standalone redis server, with multi database and all other funtions +// NewStandaloneServer 创建一个独立的 Redis 服务器实例 func NewStandaloneServer() *Server { server := &Server{} if config.Properties.Databases == 0 { - config.Properties.Databases = 16 + config.Properties.Databases = 16 // 默认数据库数量 } - // creat tmp dir + // 创建临时目录 err := os.MkdirAll(config.GetTmpDir(), os.ModePerm) if err != nil { panic(fmt.Errorf("create tmp dir failed: %v", err)) } - // make db set + // 初始化数据库实例 server.dbSet = make([]*atomic.Value, config.Properties.Databases) for i := range server.dbSet { singleDB := makeDB() @@ -65,9 +65,10 @@ func NewStandaloneServer() *Server { holder.Store(singleDB) server.dbSet[i] = holder } - server.hub = pubsub.MakeHub() - // record aof + server.hub = pubsub.MakeHub() // 初始化 pub/sub hub + // 配置 AOF 持久化 validAof := false + //查看配置文件中是否打开持久化功能 if config.Properties.AppendOnly { validAof = fileExists(config.Properties.AppendFilename) aofHandler, err := NewPersister(server, @@ -77,6 +78,7 @@ func NewStandaloneServer() *Server { } server.bindPersister(aofHandler) } + // 如果配置了 RDB 且 AOF 未启用,尝试加载 RDB 文件 if config.Properties.RDBFilename != "" && !validAof { // load rdb err := server.loadRdbFile() @@ -87,21 +89,24 @@ func NewStandaloneServer() *Server { server.slaveStatus = initReplSlaveStatus() server.initMaster() server.startReplCron() - server.role = masterRole // The initialization process does not require atomicity + server.role = masterRole // 初始化为主节点 return server } -// Exec executes command -// parameter `cmdLine` contains command and its arguments, for example: "set key value" +// Exec 执行来自客户端的命令,调用底层db的Exec方法,那用户发来的指令转交给底层的分db去执行 +// `cmdLine` 包含了命令及其参数,例如:"set key value" func (server *Server) Exec(c redis.Connection, cmdLine [][]byte) (result redis.Reply) { + // 使用 defer 和 recover 来捕获和处理函数执行过程中可能发生的panic,确保服务器稳定性 defer func() { if err := recover(); err != nil { + // 如果发生错误,记录错误并返回未知错误的响应 logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack()))) result = &protocol.UnknownErrReply{} } }() - + // 将命令名称转换为小写,以忽略命令的大小写差异 cmdName := strings.ToLower(string(cmdLine[0])) + // 根据命令名分发处理不同的命令 // ping if cmdName == "ping" { return Ping(c, cmdLine[1:]) @@ -110,6 +115,7 @@ func (server *Server) Exec(c redis.Connection, cmdLine [][]byte) (result redis.R if cmdName == "auth" { return Auth(c, cmdLine[1:]) } + // 检查是否已通过身份验证 if !isAuthenticated(c) { return protocol.MakeErrReply("NOAUTH Authentication required") } @@ -121,9 +127,11 @@ func (server *Server) Exec(c redis.Connection, cmdLine [][]byte) (result redis.R return DbSize(c, server) } if cmdName == "slaveof" { + // 在multi状态中不允许改变主从配置 if c != nil && c.InMultiState() { return protocol.MakeErrReply("cannot use slave of database within multi") } + // 参数数量检查 if len(cmdLine) != 3 { return protocol.MakeArgNumErrReply("SLAVEOF") } @@ -132,7 +140,7 @@ func (server *Server) Exec(c redis.Connection, cmdLine [][]byte) (result redis.R return execCommand(cmdLine[1:]) } - // read only slave + // 只读从节点的处理逻辑 role := atomic.LoadInt32(&server.role) if role == slaveRole && !c.IsMaster() { // only allow read only command, forbid all special commands except `auth` and `slaveof` @@ -141,7 +149,7 @@ func (server *Server) Exec(c redis.Connection, cmdLine [][]byte) (result redis.R } } - // special commands which cannot execute within transaction + // 无法在事务中执行的特殊命令 if cmdName == "subscribe" { if len(cmdLine) < 2 { return protocol.MakeArgNumErrReply("subscribe") @@ -155,7 +163,7 @@ func (server *Server) Exec(c redis.Connection, cmdLine [][]byte) (result redis.R if !config.Properties.AppendOnly { return protocol.MakeErrReply("AppendOnly is false, you can't rewrite aof file") } - // aof.go imports router.go, router.go cannot import BGRewriteAOF from aof.go + // 这里注意:由于模块间的循环依赖问题,特意分开处理 return BGRewriteAOF(server, cmdLine[1:]) } else if cmdName == "rewriteaof" { if !config.Properties.AppendOnly { @@ -196,21 +204,22 @@ func (server *Server) Exec(c redis.Connection, cmdLine [][]byte) (result redis.R } // todo: support multi database transaction - // normal commands + // 正常命令的处理,首先获取当前连接的数据库索引 dbIndex := c.GetDBIndex() selectedDB, errReply := server.selectDB(dbIndex) if errReply != nil { return errReply } + // 在选定的数据库上执行命令 return selectedDB.Exec(c, cmdLine) } -// AfterClientClose does some clean after client close connection +// AfterClientClose 处理客户端关闭连接后的清理工作 func (server *Server) AfterClientClose(c redis.Connection) { pubsub.UnsubscribeAll(server.hub, c) } -// Close graceful shutdown database +// Close 优雅关闭数据库 func (server *Server) Close() { // stop slaveStatus first server.slaveStatus.close() @@ -220,6 +229,7 @@ func (server *Server) Close() { server.stopMaster() } +// execSelect 处理 select 命令,选择一个数据库 func execSelect(c redis.Connection, mdb *Server, args [][]byte) redis.Reply { dbIndex, err := strconv.Atoi(string(args[0])) if err != nil { @@ -228,6 +238,7 @@ func execSelect(c redis.Connection, mdb *Server, args [][]byte) redis.Reply { if dbIndex >= len(mdb.dbSet) || dbIndex < 0 { return protocol.MakeErrReply("ERR DB index is out of range") } + // 选择数据库并返回成功响应 c.SelectDB(dbIndex) return protocol.MakeOkReply() } @@ -239,39 +250,49 @@ func (server *Server) execFlushDB(dbIndex int) redis.Reply { return server.flushDB(dbIndex) } -// flushDB flushes the selected database +// flushDB 清空选定的数据库 +// dbIndex 为数据库索引 func (server *Server) flushDB(dbIndex int) redis.Reply { + // 检查数据库索引是否有效 if dbIndex >= len(server.dbSet) || dbIndex < 0 { return protocol.MakeErrReply("ERR DB index is out of range") } + // 创建一个新的数据库实例 newDB := makeDB() + // 用新的数据库实例替换旧的数据库实例 server.loadDB(dbIndex, newDB) return &protocol.OkReply{} } +// loadDB 加载指定索引的数据库 func (server *Server) loadDB(dbIndex int, newDB *DB) redis.Reply { if dbIndex >= len(server.dbSet) || dbIndex < 0 { return protocol.MakeErrReply("ERR DB index is out of range") } + // 获取旧的数据库实例 oldDB := server.mustSelectDB(dbIndex) + // 将新数据库的索引和AOF持久化设置从旧数据库继承 newDB.index = dbIndex newDB.addAof = oldDB.addAof // inherit oldDB + // 存储新的数据库实例 server.dbSet[dbIndex].Store(newDB) return &protocol.OkReply{} } -// flushAll flushes all databases. +// flushAll 清空所有数据库 func (server *Server) flushAll() redis.Reply { + // 遍历所有数据库并清空 for i := range server.dbSet { server.flushDB(i) } + // 如果开启了AOF持久化,记录这一操作 if server.persister != nil { server.persister.SaveCmdLine(0, utils.ToCmdLine("FlushAll")) } return &protocol.OkReply{} } -// selectDB returns the database with the given index, or an error if the index is out of range. +// selectDB 选择给定索引的数据库,如果索引无效则返回错误 func (server *Server) selectDB(dbIndex int) (*DB, *protocol.StandardErrReply) { if dbIndex >= len(server.dbSet) || dbIndex < 0 { return nil, protocol.MakeErrReply("ERR DB index is out of range") @@ -279,7 +300,7 @@ func (server *Server) selectDB(dbIndex int) (*DB, *protocol.StandardErrReply) { return server.dbSet[dbIndex].Load().(*DB), nil } -// mustSelectDB is like selectDB, but panics if an error occurs. +// mustSelectDB 类似 selectDB,但如果发生错误会引发panic func (server *Server) mustSelectDB(dbIndex int) *DB { selectedDB, err := server.selectDB(dbIndex) if err != nil { @@ -288,16 +309,17 @@ func (server *Server) mustSelectDB(dbIndex int) *DB { return selectedDB } -// ForEach traverses all the keys in the given database +// ForEach 遍历给定数据库中的所有键 func (server *Server) ForEach(dbIndex int, cb func(key string, data *database.DataEntity, expiration *time.Time) bool) { server.mustSelectDB(dbIndex).ForEach(cb) } -// GetEntity returns the data entity to the given key +// GetEntity 返回给定键的数据实体 func (server *Server) GetEntity(dbIndex int, key string) (*database.DataEntity, bool) { return server.mustSelectDB(dbIndex).GetEntity(key) } +// GetExpiration 获取给定键的过期时间 func (server *Server) GetExpiration(dbIndex int, key string) *time.Time { raw, ok := server.mustSelectDB(dbIndex).ttlMap.Get(key) if !ok { @@ -307,7 +329,7 @@ func (server *Server) GetExpiration(dbIndex int, key string) *time.Time { return &expireTime } -// ExecMulti executes multi commands transaction Atomically and Isolated +// ExecMulti 在一个事务中执行多个命令,保证原子性和隔离性 func (server *Server) ExecMulti(conn redis.Connection, watching map[string]uint32, cmdLines []CmdLine) redis.Reply { selectedDB, errReply := server.selectDB(conn.GetDBIndex()) if errReply != nil { @@ -316,22 +338,22 @@ func (server *Server) ExecMulti(conn redis.Connection, watching map[string]uint3 return selectedDB.ExecMulti(conn, watching, cmdLines) } -// RWLocks lock keys for writing and reading +// RWLocks 对写入和读取的键加锁 func (server *Server) RWLocks(dbIndex int, writeKeys []string, readKeys []string) { server.mustSelectDB(dbIndex).RWLocks(writeKeys, readKeys) } -// RWUnLocks unlock keys for writing and reading +// RWUnLocks 解锁写入和读取的键 func (server *Server) RWUnLocks(dbIndex int, writeKeys []string, readKeys []string) { server.mustSelectDB(dbIndex).RWUnLocks(writeKeys, readKeys) } -// GetUndoLogs return rollback commands +// GetUndoLogs 返回指定命令的回滚命令 func (server *Server) GetUndoLogs(dbIndex int, cmdLine [][]byte) []CmdLine { return server.mustSelectDB(dbIndex).GetUndoLogs(cmdLine) } -// ExecWithLock executes normal commands, invoker should provide locks +// ExecWithLock 在持有锁的情况下执行常规命令 func (server *Server) ExecWithLock(conn redis.Connection, cmdLine [][]byte) redis.Reply { db, errReply := server.selectDB(conn.GetDBIndex()) if errReply != nil { @@ -340,13 +362,13 @@ func (server *Server) ExecWithLock(conn redis.Connection, cmdLine [][]byte) redi return db.execWithLock(cmdLine) } -// BGRewriteAOF asynchronously rewrites Append-Only-File +// BGRewriteAOF 异步重写追加只写文件 func BGRewriteAOF(db *Server, args [][]byte) redis.Reply { go db.persister.Rewrite() return protocol.MakeStatusReply("Background append only file rewriting started") } -// RewriteAOF start Append-Only-File rewriting and blocked until it finished +// RewriteAOF 开始重写追加只写文件,并阻塞直到完成 func RewriteAOF(db *Server, args [][]byte) redis.Reply { err := db.persister.Rewrite() if err != nil { @@ -355,7 +377,7 @@ func RewriteAOF(db *Server, args [][]byte) redis.Reply { return protocol.MakeOkReply() } -// SaveRDB start RDB writing and blocked until it finished +// SaveRDB 开始RDB写入,并阻塞直到完成 func SaveRDB(db *Server, args [][]byte) redis.Reply { if db.persister == nil { return protocol.MakeErrReply("please enable aof before using save") @@ -371,7 +393,7 @@ func SaveRDB(db *Server, args [][]byte) redis.Reply { return protocol.MakeOkReply() } -// BGSaveRDB asynchronously save RDB +// BGSaveRDB 异步保存RDB func BGSaveRDB(db *Server, args [][]byte) redis.Reply { if db.persister == nil { return protocol.MakeErrReply("please enable aof before using save") @@ -394,12 +416,13 @@ func BGSaveRDB(db *Server, args [][]byte) redis.Reply { return protocol.MakeStatusReply("Background saving started") } -// GetDBSize returns keys count and ttl key count +// GetDBSize 返回数据库的键数和有TTL的键数 func (server *Server) GetDBSize(dbIndex int) (int, int) { db := server.mustSelectDB(dbIndex) return db.data.Len(), db.ttlMap.Len() } +// startReplCron 开启一个定时任务处理主从复制 func (server *Server) startReplCron() { go func(mdb *Server) { ticker := time.Tick(time.Second * 10) @@ -410,7 +433,7 @@ func (server *Server) startReplCron() { }(server) } -// GetAvgTTL Calculate the average expiration time of keys +// GetAvgTTL 计算给定数量随机键的平均过期时间 func (server *Server) GetAvgTTL(dbIndex, randomKeyCount int) int64 { var ttlCount int64 db := server.mustSelectDB(dbIndex) @@ -430,6 +453,7 @@ func (server *Server) GetAvgTTL(dbIndex, randomKeyCount int) int64 { return ttlCount / int64(len(keys)) } +// SetKeyInsertedCallback 设置键插入事件的回调函数 func (server *Server) SetKeyInsertedCallback(cb database.KeyEventCallback) { server.insertCallback = cb for i := range server.dbSet { @@ -439,6 +463,7 @@ func (server *Server) SetKeyInsertedCallback(cb database.KeyEventCallback) { } +// SetKeyDeletedCallback 设置键删除事件的回调函数 func (server *Server) SetKeyDeletedCallback(cb database.KeyEventCallback) { server.deleteCallback = cb for i := range server.dbSet { diff --git a/datastruct/dict/concurrent.go b/datastruct/dict/concurrent.go index 2b791224..070098f1 100644 --- a/datastruct/dict/concurrent.go +++ b/datastruct/dict/concurrent.go @@ -1,28 +1,33 @@ package dict import ( - "github.com/hdt3213/godis/lib/wildcard" "math" "math/rand" "sort" "sync" "sync/atomic" "time" + + "github.com/hdt3213/godis/lib/wildcard" ) -// ConcurrentDict is thread safe map using sharding lock +// ConcurrentDict 是一个使用分片锁的线程安全的哈希表 type ConcurrentDict struct { - table []*shard - count int32 - shardCount int + table []*shard // 分片的数组,每个分片持有一部分键值对 + count int32 // 键值对的总数量 + shardCount int // 分片的数量 } +// shard 代表单个分片,包含一个map和一个读写锁,该map中有多个锁 type shard struct { - m map[string]interface{} - mutex sync.RWMutex + m map[string]interface{} // 存储键值对的哈希表 + mutex sync.RWMutex // 保护map的读写锁 } +// computeCapacity 计算哈希表容量,确保为2的次幂,这有助于更快的计算索引 +// 这个函数的目的是确定一个接近于给定参数 param 但不小于它的2的次幂的数。这样做的好处主要有两点: func computeCapacity(param int) (size int) { + // 以下代码是经典的位操作技巧,用于找到大于等于param的最小2的次幂 if param <= 16 { return 16 } @@ -38,7 +43,7 @@ func computeCapacity(param int) (size int) { return n + 1 } -// MakeConcurrent creates ConcurrentDict with the given shard count +// MakeConcurrent 创建一个具有指定分片数量的ConcurrentDict func MakeConcurrent(shardCount int) *ConcurrentDict { shardCount = computeCapacity(shardCount) table := make([]*shard, shardCount) @@ -55,8 +60,14 @@ func MakeConcurrent(shardCount int) *ConcurrentDict { return d } -const prime32 = uint32(16777619) +/* +这部分代码定义了 ConcurrentDict 和它的构造函数。 +通过指定分片数来创建字典,每个分片包含一个map和一个读写锁。这种设计可以减小锁的粒度,提高并发性能。 +*/ +//---------------------------------------------------------------------------------------------------- +const prime32 = uint32(16777619) // 用于哈希函数的质数基 +// fnv32 是一种哈希函数,使用FNV算法为给定的键生成32位哈希码 func fnv32(key string) uint32 { hash := uint32(2166136261) for i := 0; i < len(key); i++ { @@ -66,14 +77,23 @@ func fnv32(key string) uint32 { return hash } +// spread 将哈希码散列到一个合适的分片索引 func (dict *ConcurrentDict) spread(hashCode uint32) uint32 { if dict == nil { panic("dict is nil") } tableSize := uint32(len(dict.table)) + + //因为表的大小是2的次幂,这个按位与操作等同于对表大小取模,但运算速度更快。 + //当表大小是2的次幂时,tableSize - 1 的二进制表示中所有低位都是1,这使得按位与操作只保留 hashCode 的最低的几位。 return (tableSize - 1) & hashCode } +//fnv32 是基于 FNV (Fowler-Noll-Vo) 算法的哈希函数,适用于快速散列字符串。 +//spread 函数通过位运算快速定位哈希码应该映射到哪一个分片。 +//-------------------------------------------------------------------------------------------------------- + +// getShard 根据索引获取对应的分片 func (dict *ConcurrentDict) getShard(index uint32) *shard { if dict == nil { panic("dict is nil") @@ -81,7 +101,7 @@ func (dict *ConcurrentDict) getShard(index uint32) *shard { return dict.table[index] } -// Get returns the binding value and whether the key is exist +// Get 返回给定键的值以及是否存在该键 func (dict *ConcurrentDict) Get(key string) (val interface{}, exists bool) { if dict == nil { panic("dict is nil") @@ -89,12 +109,13 @@ func (dict *ConcurrentDict) Get(key string) (val interface{}, exists bool) { hashCode := fnv32(key) index := dict.spread(hashCode) s := dict.getShard(index) - s.mutex.Lock() - defer s.mutex.Unlock() + s.mutex.RLock() // 对分片加读锁 + defer s.mutex.RUnlock() // 读锁结束时解锁 val, exists = s.m[key] return } +// GetWithLock 是一个示例方法,显示如何不使用锁安全地获取值 func (dict *ConcurrentDict) GetWithLock(key string) (val interface{}, exists bool) { if dict == nil { panic("dict is nil") @@ -106,7 +127,7 @@ func (dict *ConcurrentDict) GetWithLock(key string) (val interface{}, exists boo return } -// Len returns the number of dict +// Len 返回字典中键值对的数量 func (dict *ConcurrentDict) Len() int { if dict == nil { panic("dict is nil") @@ -114,7 +135,7 @@ func (dict *ConcurrentDict) Len() int { return int(atomic.LoadInt32(&dict.count)) } -// Put puts key value into dict and returns the number of new inserted key-value +// Put 将键值对放入字典中,并返回是否插入了新的键 func (dict *ConcurrentDict) Put(key string, val interface{}) (result int) { if dict == nil { panic("dict is nil") @@ -134,6 +155,7 @@ func (dict *ConcurrentDict) Put(key string, val interface{}) (result int) { return 1 } +// PutWithLock 是一个示例方法,显示如何不使用锁安全地放入键值对 func (dict *ConcurrentDict) PutWithLock(key string, val interface{}) (result int) { if dict == nil { panic("dict is nil") @@ -151,7 +173,7 @@ func (dict *ConcurrentDict) PutWithLock(key string, val interface{}) (result int return 1 } -// PutIfAbsent puts value if the key is not exists and returns the number of updated key-value +// PutIfAbsent 尝试只在键不存在时插入键值对,并返回是否插入了键 func (dict *ConcurrentDict) PutIfAbsent(key string, val interface{}) (result int) { if dict == nil { panic("dict is nil") @@ -170,6 +192,7 @@ func (dict *ConcurrentDict) PutIfAbsent(key string, val interface{}) (result int return 1 } +// PutIfAbsentWithLock 是一个示例方法,显示如何不使用锁安全地尝试插入键值对 func (dict *ConcurrentDict) PutIfAbsentWithLock(key string, val interface{}) (result int) { if dict == nil { panic("dict is nil") @@ -186,7 +209,7 @@ func (dict *ConcurrentDict) PutIfAbsentWithLock(key string, val interface{}) (re return 1 } -// PutIfExists puts value if the key is existed and returns the number of inserted key-value +// PutIfExists 尝试只在键存在时更新键值对,并返回是否更新了键 func (dict *ConcurrentDict) PutIfExists(key string, val interface{}) (result int) { if dict == nil { panic("dict is nil") @@ -204,6 +227,7 @@ func (dict *ConcurrentDict) PutIfExists(key string, val interface{}) (result int return 0 } +// PutIfExistsWithLock 是一个示例方法,显示如何不使用锁安全地尝试更新键值对 func (dict *ConcurrentDict) PutIfExistsWithLock(key string, val interface{}) (result int) { if dict == nil { panic("dict is nil") @@ -219,7 +243,7 @@ func (dict *ConcurrentDict) PutIfExistsWithLock(key string, val interface{}) (re return 0 } -// Remove removes the key and return the number of deleted key-value +// Remove 删除键,并返回被删除的值及是否成功删除 func (dict *ConcurrentDict) Remove(key string) (val interface{}, result int) { if dict == nil { panic("dict is nil") @@ -238,6 +262,7 @@ func (dict *ConcurrentDict) Remove(key string) (val interface{}, result int) { return nil, 0 } +// RemoveWithLock 是一个示例方法,显示如何不使用锁安全地删除键 func (dict *ConcurrentDict) RemoveWithLock(key string) (val interface{}, result int) { if dict == nil { panic("dict is nil") @@ -254,16 +279,18 @@ func (dict *ConcurrentDict) RemoveWithLock(key string) (val interface{}, result return val, 0 } +// addCount 增加字典中的键值对总数 func (dict *ConcurrentDict) addCount() int32 { return atomic.AddInt32(&dict.count, 1) } +// decreaseCount 减少字典中的键值对总数 func (dict *ConcurrentDict) decreaseCount() int32 { return atomic.AddInt32(&dict.count, -1) } -// ForEach traversal the dict -// it may not visit new entry inserted during traversal +// ForEach 遍历字典 +// 注意:遍历期间插入的新条目可能不会被访问 func (dict *ConcurrentDict) ForEach(consumer Consumer) { if dict == nil { panic("dict is nil") @@ -287,7 +314,7 @@ func (dict *ConcurrentDict) ForEach(consumer Consumer) { } } -// Keys returns all keys in dict +// Keys 返回字典中所有的键 func (dict *ConcurrentDict) Keys() []string { keys := make([]string, dict.Len()) i := 0 @@ -303,7 +330,7 @@ func (dict *ConcurrentDict) Keys() []string { return keys } -// RandomKey returns a key randomly +// RandomKey 随机返回一个键 func (shard *shard) RandomKey() string { if shard == nil { panic("shard is nil") @@ -317,7 +344,7 @@ func (shard *shard) RandomKey() string { return "" } -// RandomKeys randomly returns keys of the given number, may contain duplicated key +// RandomKeys 随机返回指定数量的键,可能包含重复的键 func (dict *ConcurrentDict) RandomKeys(limit int) []string { size := dict.Len() if limit >= size { @@ -341,7 +368,7 @@ func (dict *ConcurrentDict) RandomKeys(limit int) []string { return result } -// RandomDistinctKeys randomly returns keys of the given number, won't contain duplicated key +// RandomDistinctKeys 随机返回指定数量的不重复键 func (dict *ConcurrentDict) RandomDistinctKeys(limit int) []string { size := dict.Len() if limit >= size { @@ -373,7 +400,7 @@ func (dict *ConcurrentDict) RandomDistinctKeys(limit int) []string { return arr } -// Clear removes all keys in dict +// Clear 清空字典中的所有键 func (dict *ConcurrentDict) Clear() { *dict = *MakeConcurrent(dict.shardCount) } @@ -397,7 +424,7 @@ func (dict *ConcurrentDict) toLockIndices(keys []string, reverse bool) []uint32 return indices } -// RWLocks locks write keys and read keys together. allow duplicate keys +// RWLocks 锁定写键和读键,允许重复键 func (dict *ConcurrentDict) RWLocks(writeKeys []string, readKeys []string) { keys := append(writeKeys, readKeys...) indices := dict.toLockIndices(keys, false) @@ -417,7 +444,7 @@ func (dict *ConcurrentDict) RWLocks(writeKeys []string, readKeys []string) { } } -// RWUnLocks unlocks write keys and read keys together. allow duplicate keys +// RWUnLocks 解锁写键和读键,允许重复键 func (dict *ConcurrentDict) RWUnLocks(writeKeys []string, readKeys []string) { keys := append(writeKeys, readKeys...) indices := dict.toLockIndices(keys, true) @@ -437,6 +464,9 @@ func (dict *ConcurrentDict) RWUnLocks(writeKeys []string, readKeys []string) { } } +// stringsToBytes 将字符串切片转换为字节切片的切片。 +// 输入:strSlice - 字符串切片 +// 输出:每个字符串转换为字节切片后的切片 func stringsToBytes(strSlice []string) [][]byte { byteSlice := make([][]byte, len(strSlice)) for i, str := range strSlice { diff --git a/datastruct/dict/concurrent_test.go b/datastruct/dict/concurrent_test.go index c59efd57..e655b0f1 100644 --- a/datastruct/dict/concurrent_test.go +++ b/datastruct/dict/concurrent_test.go @@ -1,10 +1,11 @@ package dict import ( - "github.com/hdt3213/godis/lib/utils" "strconv" "sync" "testing" + + "github.com/hdt3213/godis/lib/utils" ) func TestConcurrentPut(t *testing.T) { diff --git a/datastruct/dict/dict.go b/datastruct/dict/dict.go index 4fdf2793..86816ee8 100644 --- a/datastruct/dict/dict.go +++ b/datastruct/dict/dict.go @@ -1,20 +1,20 @@ package dict -// Consumer is used to traversal dict, if it returns false the traversal will be break +// Consumer 是一个遍历字典时使用的函数类型,如果返回false,则终止遍历 type Consumer func(key string, val interface{}) bool -// Dict is interface of a key-value data structure +// Dict 是 key-value 数据结构的接口定义 type Dict interface { - Get(key string) (val interface{}, exists bool) - Len() int - Put(key string, val interface{}) (result int) - PutIfAbsent(key string, val interface{}) (result int) - PutIfExists(key string, val interface{}) (result int) - Remove(key string) (val interface{}, result int) - ForEach(consumer Consumer) - Keys() []string - RandomKeys(limit int) []string - RandomDistinctKeys(limit int) []string - Clear() + Get(key string) (val interface{}, exists bool) // 获取键的值 + Len() int // 返回字典的长度 + Put(key string, val interface{}) (result int) // 插入键值对 + PutIfAbsent(key string, val interface{}) (result int) // 如果键不存在,则插入 + PutIfExists(key string, val interface{}) (result int) // 如果键存在,则插入 + Remove(key string) (val interface{}, result int) // 移除键 + ForEach(consumer Consumer) // 遍历字典,传进去的函数是Consumer + Keys() []string // 返回所有键的列表 + RandomKeys(limit int) []string // 随机返回一定数量的键 + RandomDistinctKeys(limit int) []string //返回一些不重复的键 + Clear() // 清除字典中的所有元素 DictScan(cursor int, count int, pattern string) ([][]byte, int) } diff --git a/datastruct/lock/lock_map.go b/datastruct/lock/lock_map.go index 72fd0963..e15d8076 100644 --- a/datastruct/lock/lock_map.go +++ b/datastruct/lock/lock_map.go @@ -5,16 +5,31 @@ import ( "sync" ) +/* +即使使用了 ConcurrentMap 保证了对单个 key 的并发安全性,但在某些情况下这还不够。 +例如,Incr 命令需要完成从读取到修改再到写入的一系列操作,而 MSETNX 命令需要在所有给定键都不存在的情况下才设置值。 +这些操作需要更复杂的并发控制,即锁定多个键直到操作完成。 +*/ + +/* +锁定哈希槽而非单个键:通常,每个键都需要独立锁定来确保并发访问的安全性。 +但这在键数量很大时会造成大量内存的使用。通过锁定哈希槽,可以大大减少所需的锁的数量,因为多个键会映射到同一个哈希槽。 +避免内存泄漏:如果为每个键分配一个锁,随着键的增加和删除,锁的数量也会不断增加,可能会造成内存泄漏。 +锁定哈希槽可以固定锁的数量,即使在键的数量动态变化时也不会增加额外的内存负担。 +*/ +//在锁定多个key时需要注意,若协程A持有键a的锁试图获得键b的锁,此时协程B持有键b的锁试图获得键a的锁则会形成死锁。 +//解决方法是所有协程都按照相同顺序加锁,若两个协程都想获得键a和键b的锁,那么必须先获取键a的锁后获取键b的锁,这样就可以避免循环等待。 +// 使用固定素数,用于FNV哈希函数 const ( prime32 = uint32(16777619) ) -// Locks provides rw locks for key +// Locks结构体包含了一个用于读写锁定的锁表 type Locks struct { table []*sync.RWMutex } -// Make creates a new lock map +// Make 初始化并返回一个具有指定数量哈希槽的Locks实例 func Make(tableSize int) *Locks { table := make([]*sync.RWMutex, tableSize) for i := 0; i < tableSize; i++ { @@ -25,6 +40,7 @@ func Make(tableSize int) *Locks { } } +// fnv32 实现了FNV哈希算法,用于生成键的哈希值 func fnv32(key string) uint32 { hash := uint32(2166136261) for i := 0; i < len(key); i++ { @@ -34,6 +50,7 @@ func fnv32(key string) uint32 { return hash } +// spread 计算给定哈希码的哈希槽索引 func (locks *Locks) spread(hashCode uint32) uint32 { if locks == nil { panic("dict is nil") @@ -42,36 +59,37 @@ func (locks *Locks) spread(hashCode uint32) uint32 { return (tableSize - 1) & hashCode } -// Lock obtains exclusive lock for writing +// Lock 为给定键获取独占写锁 func (locks *Locks) Lock(key string) { index := locks.spread(fnv32(key)) mu := locks.table[index] mu.Lock() } -// RLock obtains shared lock for reading +// RLock 为给定键获取共享读锁 func (locks *Locks) RLock(key string) { index := locks.spread(fnv32(key)) mu := locks.table[index] mu.RLock() } -// UnLock release exclusive lock +// UnLock 释放给定键的独占写锁 func (locks *Locks) UnLock(key string) { index := locks.spread(fnv32(key)) mu := locks.table[index] mu.Unlock() } -// RUnLock release shared lock +// RUnLock 释放给定键的共享读锁 func (locks *Locks) RUnLock(key string) { index := locks.spread(fnv32(key)) mu := locks.table[index] mu.RUnlock() } +// toLockIndices 计算一组键的哈希槽索引,并进行排序(可选逆序) func (locks *Locks) toLockIndices(keys []string, reverse bool) []uint32 { - indexMap := make(map[uint32]struct{}) + indexMap := make(map[uint32]struct{}) // 使用集合去重 for _, key := range keys { index := locks.spread(fnv32(key)) indexMap[index] = struct{}{} @@ -89,18 +107,17 @@ func (locks *Locks) toLockIndices(keys []string, reverse bool) []uint32 { return indices } -// Locks obtains multiple exclusive locks for writing -// invoking Lock in loop may cause deadlock, please use Locks +// Locks 获取多个键的独占写锁,避免死锁 func (locks *Locks) Locks(keys ...string) { indices := locks.toLockIndices(keys, false) for _, index := range indices { mu := locks.table[index] - mu.Lock() + mu.Lock() // 锁定每个索引对应的哈希槽 } } -// RLocks obtains multiple shared locks for reading -// invoking RLock in loop may cause deadlock, please use RLocks +// RLocks 获取多个键的共享读锁,避免死锁 + func (locks *Locks) RLocks(keys ...string) { indices := locks.toLockIndices(keys, false) for _, index := range indices { @@ -109,7 +126,7 @@ func (locks *Locks) RLocks(keys ...string) { } } -// UnLocks releases multiple exclusive locks +// UnLocks 释放多个键的独占写锁 func (locks *Locks) UnLocks(keys ...string) { indices := locks.toLockIndices(keys, true) for _, index := range indices { @@ -118,7 +135,7 @@ func (locks *Locks) UnLocks(keys ...string) { } } -// RUnLocks releases multiple shared locks +// RUnLocks 释放多个键的共享读锁 func (locks *Locks) RUnLocks(keys ...string) { indices := locks.toLockIndices(keys, true) for _, index := range indices { @@ -127,7 +144,7 @@ func (locks *Locks) RUnLocks(keys ...string) { } } -// RWLocks locks write keys and read keys together. allow duplicate keys +// RWLocks 同时获取写锁和读锁,允许键重复 func (locks *Locks) RWLocks(writeKeys []string, readKeys []string) { keys := append(writeKeys, readKeys...) indices := locks.toLockIndices(keys, false) @@ -147,7 +164,7 @@ func (locks *Locks) RWLocks(writeKeys []string, readKeys []string) { } } -// RWUnLocks unlocks write keys and read keys together. allow duplicate keys +// RWUnLocks 同时释放写锁和读锁,允许键重复 func (locks *Locks) RWUnLocks(writeKeys []string, readKeys []string) { keys := append(writeKeys, readKeys...) indices := locks.toLockIndices(keys, true) diff --git a/go.mod b/go.mod index 5c96897f..36f18143 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,4 @@ module github.com/hdt3213/godis go 1.17 -require ( - github.com/hdt3213/rdb v1.0.10 -) +require github.com/hdt3213/rdb v1.0.10 diff --git a/go.sum b/go.sum index 08b5360b..9652dea0 100644 --- a/go.sum +++ b/go.sum @@ -9,8 +9,6 @@ github.com/hdt3213/rdb v1.0.10 h1:j0wJv6Cp1faMH3v5+u5SYa0MfBGOnOc5nn+JEYbIVxA= github.com/hdt3213/rdb v1.0.10/go.mod h1:A1RWBSb4QGdX8fNs2bSoWxkzcWlWGbCC7OgOTFhPG+k= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= -github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= diff --git a/interface/database/db.go b/interface/database/db.go index 9fc2d894..46f99b2a 100644 --- a/interface/database/db.go +++ b/interface/database/db.go @@ -7,38 +7,38 @@ import ( "github.com/hdt3213/rdb/core" ) -// CmdLine is alias for [][]byte, represents a command line +// CmdLine 是命令行的别名,表示一个命令行为一个二维字节切片 type CmdLine = [][]byte -// DB is the interface for redis style storage engine +// DB 是一个接口,为Redis风格的存储引擎定义了必要的方法 type DB interface { - Exec(client redis.Connection, cmdLine [][]byte) redis.Reply - AfterClientClose(c redis.Connection) - Close() - LoadRDB(dec *core.Decoder) error + Exec(client redis.Connection, cmdLine [][]byte) redis.Reply // 执行给定的命令行,并返回响应 + AfterClientClose(c redis.Connection) // 客户端关闭后的回调处理 + Close() // 关闭数据库连接 + LoadRDB(dec *core.Decoder) error // 从RDB解码器加载数据到数据库 } -// KeyEventCallback will be called back on key event, such as key inserted or deleted -// may be called concurrently +// KeyEventCallback 是键事件的回调函数类型,如键被插入或删除时调用 +// 可能会并发调用 type KeyEventCallback func(dbIndex int, key string, entity *DataEntity) -// DBEngine is the embedding storage engine exposing more methods for complex application +// DBEngine 是一个更高级的存储引擎接口,提供了更多的方法以支持复杂的应用场景 type DBEngine interface { DB - ExecWithLock(conn redis.Connection, cmdLine [][]byte) redis.Reply - ExecMulti(conn redis.Connection, watching map[string]uint32, cmdLines []CmdLine) redis.Reply - GetUndoLogs(dbIndex int, cmdLine [][]byte) []CmdLine - ForEach(dbIndex int, cb func(key string, data *DataEntity, expiration *time.Time) bool) - RWLocks(dbIndex int, writeKeys []string, readKeys []string) - RWUnLocks(dbIndex int, writeKeys []string, readKeys []string) - GetDBSize(dbIndex int) (int, int) - GetEntity(dbIndex int, key string) (*DataEntity, bool) - GetExpiration(dbIndex int, key string) *time.Time - SetKeyInsertedCallback(cb KeyEventCallback) - SetKeyDeletedCallback(cb KeyEventCallback) + ExecWithLock(conn redis.Connection, cmdLine [][]byte) redis.Reply // 在执行命令时加锁保护 + ExecMulti(conn redis.Connection, watching map[string]uint32, cmdLines []CmdLine) redis.Reply // 执行多个命令,支持事务 + GetUndoLogs(dbIndex int, cmdLine [][]byte) []CmdLine // 获取撤销日志 + ForEach(dbIndex int, cb func(key string, data *DataEntity, expiration *time.Time) bool) // 遍历数据库中的键 + RWLocks(dbIndex int, writeKeys []string, readKeys []string) // 读写锁定一组键 + RWUnLocks(dbIndex int, writeKeys []string, readKeys []string) // 解锁一组键 + GetDBSize(dbIndex int) (int, int) // 获取数据库大小 + GetEntity(dbIndex int, key string) (*DataEntity, bool) // 获取与键关联的数据实体 + GetExpiration(dbIndex int, key string) *time.Time // 获取键的过期时间 + SetKeyInsertedCallback(cb KeyEventCallback) // 设置键插入事件的回调 + SetKeyDeletedCallback(cb KeyEventCallback) // 设置键删除事件的回调 } -// DataEntity stores data bound to a key, including a string, list, hash, set and so on +// DataEntity 存储绑定到键的数据,包括字符串、列表、哈希、集合等 type DataEntity struct { - Data interface{} + Data interface{} // 存储实际的数据,数据类型可以是任何类型 } diff --git a/interface/tcp/handler.go b/interface/tcp/handler.go index 8c649bcb..f74a16c8 100644 --- a/interface/tcp/handler.go +++ b/interface/tcp/handler.go @@ -5,11 +5,15 @@ import ( "net" ) -// HandleFunc represents application handler function +// HandleFunc 定义了一个应用程序处理函数的类型 +// 这种类型的函数接收一个context和一个网络连接 +// ctx context.Context:上下文,用于控制子程序的生命周期 +// conn net.Conn:表示一个网络连接,用于读取和写入数据 type HandleFunc func(ctx context.Context, conn net.Conn) -// Handler represents application server over tcp +// Handler 接口代表一个基于TCP的应用服务器 +// 它定义了处理连接和关闭服务器的方法 type Handler interface { - Handle(ctx context.Context, conn net.Conn) - Close() error + Handle(ctx context.Context, conn net.Conn) // 处理接收到的网络连接 + Close() error // 关闭服务器,清理资源,如果有错误返回错误 } diff --git a/main.go b/main.go index c0b6b5ab..892a0114 100644 --- a/main.go +++ b/main.go @@ -20,40 +20,41 @@ var banner = ` ` var defaultProperties = &config.ServerProperties{ - Bind: "0.0.0.0", - Port: 6399, - AppendOnly: false, - AppendFilename: "", - MaxClients: 1000, - RunID: utils.RandString(40), + Bind: "0.0.0.0", // 默认绑定的IP地址 + Port: 6399, // 默认端口号 + AppendOnly: false, // 是否开启追加模式 + AppendFilename: "", // 追加模式的文件名 + MaxClients: 1000, // 最大客户端连接数 + RunID: utils.RandString(40), // 生成一个随机的运行ID } +// fileExists 检查文件是否存在且不是目录 func fileExists(filename string) bool { info, err := os.Stat(filename) return err == nil && !info.IsDir() } func main() { - print(banner) - logger.Setup(&logger.Settings{ + print(banner) // 打印启动标志 + logger.Setup(&logger.Settings{ // 设置日志文件配置 Path: "logs", Name: "godis", Ext: "log", TimeFormat: "2006-01-02", }) - configFilename := os.Getenv("CONFIG") + configFilename := os.Getenv("CONFIG") // 获取环境变量中的配置文件名 if configFilename == "" { - if fileExists("redis.conf") { - config.SetupConfig("redis.conf") + if fileExists("redis.conf") { // 检查默认配置文件是否存在 + config.SetupConfig("redis.conf") // 使用默认配置文件 } else { - config.Properties = defaultProperties + config.Properties = defaultProperties // 使用内置默认配置 } } else { - config.SetupConfig(configFilename) + config.SetupConfig(configFilename) // 使用环境变量指定的配置文件 } - err := tcp.ListenAndServeWithSignal(&tcp.Config{ + err := tcp.ListenAndServeWithSignal(&tcp.Config{ // 启动TCP服务器 Address: fmt.Sprintf("%s:%d", config.Properties.Bind, config.Properties.Port), - }, RedisServer.MakeHandler()) + }, RedisServer.MakeHandler()) //调用我们的RedisServer.MakeHandler if err != nil { logger.Error(err) } diff --git a/redis/connection/conn.go b/redis/connection/conn.go index 8e1755de..9dc40751 100644 --- a/redis/connection/conn.go +++ b/redis/connection/conn.go @@ -1,74 +1,79 @@ package connection import ( - "github.com/hdt3213/godis/lib/logger" - "github.com/hdt3213/godis/lib/sync/wait" "net" "sync" "time" + + "github.com/hdt3213/godis/lib/logger" + "github.com/hdt3213/godis/lib/sync/wait" ) const ( - // flagSlave means this a connection with slave + // flagSlave 表示这是一个从服务器的连接 flagSlave = uint64(1 << iota) - // flagSlave means this a connection with master + // flagMaster 表示这是一个主服务器的连接 flagMaster - // flagMulti means this connection is within a transaction + // flagMulti 表示这个连接正在执行事务 flagMulti ) -// Connection represents a connection with a redis-cli +// Connection 代表与一个Redis客户端的连接 type Connection struct { - conn net.Conn + conn net.Conn // 网络连接实例 - // wait until finish sending data, used for graceful shutdown + // sendingData 用于优雅关闭时等待数据发送完成 sendingData wait.Wait - // lock while server sending response + // mu 用于在发送响应时的互斥锁 mu sync.Mutex - flags uint64 + flags uint64 // 连接的标志位,如是否为主从连接、是否处于事务中等 - // subscribing channels + /// subs 保存订阅的频道 subs map[string]bool - // password may be changed by CONFIG command during runtime, so store the password + // password 可能会在运行时通过CONFIG命令被修改,因此需要存储密码 password string - // queued commands for `multi` - queue [][][]byte + // queue 保存在事务中排队的命令 + queue [][][]byte + // watching 保存被WATCH命令监视的键及其版本号 watching map[string]uint32 + // txErrors 保存事务中的错误信息 txErrors []error - // selected db + // selectedDB 表示当前选择的数据库索引 selectedDB int } +// 连接池,用于重用连接对象 var connPool = sync.Pool{ New: func() interface{} { return &Connection{} }, } -// RemoteAddr returns the remote network address +// RemoteAddr 返回远程连接的网络地址 func (c *Connection) RemoteAddr() string { return c.conn.RemoteAddr().String() } -// Close disconnect with the client +// Close 用于断开与客户端的连接 func (c *Connection) Close() error { - c.sendingData.WaitWithTimeout(10 * time.Second) - _ = c.conn.Close() + c.sendingData.WaitWithTimeout(10 * time.Second) // 等待正在发送的数据完成或超时 + _ = c.conn.Close() // 关闭底层网络连接 + // 清理连接相关的状态信息 c.subs = nil c.password = "" c.queue = nil c.watching = nil c.txErrors = nil c.selectedDB = 0 - connPool.Put(c) + connPool.Put(c) // 将连接对象放回池中 return nil } -// NewConn creates Connection instance +// NewConn 用于创建新的Connection实例 func NewConn(conn net.Conn) *Connection { c, ok := connPool.Get().(*Connection) if !ok { @@ -81,7 +86,7 @@ func NewConn(conn net.Conn) *Connection { return c } -// Write sends response to client over tcp connection +// Write 向客户端发送响应 func (c *Connection) Write(b []byte) (int, error) { if len(b) == 0 { return 0, nil @@ -101,7 +106,7 @@ func (c *Connection) Name() string { return "" } -// Subscribe add current connection into subscribers of the given channel +// Subscribe 将当前连接添加到指定频道的订阅者中 func (c *Connection) Subscribe(channel string) { c.mu.Lock() defer c.mu.Unlock() @@ -112,7 +117,7 @@ func (c *Connection) Subscribe(channel string) { c.subs[channel] = true } -// UnSubscribe removes current connection into subscribers of the given channel +// UnSubscribe 从指定频道的订阅者中移除当前连接 func (c *Connection) UnSubscribe(channel string) { c.mu.Lock() defer c.mu.Unlock() @@ -123,12 +128,12 @@ func (c *Connection) UnSubscribe(channel string) { delete(c.subs, channel) } -// SubsCount returns the number of subscribing channels +// SubsCount 返回当前连接订阅的频道数量 func (c *Connection) SubsCount() int { return len(c.subs) } -// GetChannels returns all subscribing channels +// GetChannels 返回当前连接订阅的所有频道 func (c *Connection) GetChannels() []string { if c.subs == nil { return make([]string, 0) @@ -142,58 +147,58 @@ func (c *Connection) GetChannels() []string { return channels } -// SetPassword stores password for authentication +// SetPassword 设置连接的密码,用于认证 func (c *Connection) SetPassword(password string) { c.password = password } -// GetPassword get password for authentication +// GetPassword 获取连接的密码 func (c *Connection) GetPassword() string { return c.password } -// InMultiState tells is connection in an uncommitted transaction +// InMultiState 检查连接是否处于事务状态 func (c *Connection) InMultiState() bool { return c.flags&flagMulti > 0 } -// SetMultiState sets transaction flag +// SetMultiState 设置连接的事务状态 func (c *Connection) SetMultiState(state bool) { - if !state { // reset data when cancel multi + if !state { // 如果取消事务,重置相关数据 c.watching = nil c.queue = nil - c.flags &= ^flagMulti // clean multi flag + c.flags &= ^flagMulti // 清除事务标志 return } - c.flags |= flagMulti + c.flags |= flagMulti // 设置事务标志 } -// GetQueuedCmdLine returns queued commands of current transaction +// GetQueuedCmdLine 返回事务中排队的命令 func (c *Connection) GetQueuedCmdLine() [][][]byte { return c.queue } -// EnqueueCmd enqueues command of current transaction +// EnqueueCmd 将命令添加到事务队列 func (c *Connection) EnqueueCmd(cmdLine [][]byte) { c.queue = append(c.queue, cmdLine) } -// AddTxError stores syntax error within transaction +// AddTxError 添加事务执行中的错误 func (c *Connection) AddTxError(err error) { c.txErrors = append(c.txErrors, err) } -// GetTxErrors returns syntax error within transaction +// GetTxErrors 获取事务中的错误 func (c *Connection) GetTxErrors() []error { return c.txErrors } -// ClearQueuedCmds clears queued commands of current transaction +// ClearQueuedCmds 清除事务中排队的命令 func (c *Connection) ClearQueuedCmds() { c.queue = nil } -// GetWatching returns watching keys and their version code when started watching +// GetWatching 返回被监视的键和它们的版本号 func (c *Connection) GetWatching() map[string]uint32 { if c.watching == nil { c.watching = make(map[string]uint32) @@ -201,28 +206,33 @@ func (c *Connection) GetWatching() map[string]uint32 { return c.watching } -// GetDBIndex returns selected db +// GetDBIndex 返回选定的数据库索引 func (c *Connection) GetDBIndex() int { return c.selectedDB } -// SelectDB selects a database +// SelectDB 选择一个数据库 func (c *Connection) SelectDB(dbNum int) { c.selectedDB = dbNum } +// SetSlave 设置连接为从服务器模式 func (c *Connection) SetSlave() { c.flags |= flagSlave } +// IsSlave 检查连接是否为从服务器模式 func (c *Connection) IsSlave() bool { return c.flags&flagSlave > 0 } +// SetMaster 设置连接为主服务器模式 + func (c *Connection) SetMaster() { c.flags |= flagMaster } +// IsMaster 检查连接是否为主服务器模式 func (c *Connection) IsMaster() bool { return c.flags&flagMaster > 0 } diff --git a/redis/parser/parser.go b/redis/parser/parser.go index 64110024..ed1828d3 100644 --- a/redis/parser/parser.go +++ b/redis/parser/parser.go @@ -14,41 +14,52 @@ import ( "github.com/hdt3213/godis/redis/protocol" ) -// Payload stores redis.Reply or error +// Payload 存储redis.Reply或错误,redis解析器里面解析完成存储的数据的数据结构。 type Payload struct { Data redis.Reply Err error } -// ParseStream reads data from io.Reader and send payloads through channel +/* + 有一个Reply接口,里面包含Tobytes函数,所有的不同类型的Reply结构体都实现了Tobytes函数 + 从而我们实现多态。每个不同类型的Reply结构体中有相关的字段记录了仅仅包含的字符串 + 如*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n + 存入到可以通过MakeMultiBulkReply 放入到 + type MultiBulkReply struct { + Args [][]byte + } + 被存储为[set][key][value]. + 如果我们调用每个不同Reply结构体类型的ToBytes函数,则会把他变成redis的resp格式。 +*/ +// ParseStream 从io.Reader读取数据并通过通道发送负载 func ParseStream(reader io.Reader) <-chan *Payload { - ch := make(chan *Payload) - go parse0(reader, ch) - return ch + ch := make(chan *Payload) // 创建通道 + go parse0(reader, ch) // 启动协程解析数据 + return ch // 返回通道 } -// ParseBytes reads data from []byte and return all replies +// ParseBytes 从[]byte读取数据并返回所有回复 func ParseBytes(data []byte) ([]redis.Reply, error) { - ch := make(chan *Payload) - reader := bytes.NewReader(data) - go parse0(reader, ch) - var results []redis.Reply - for payload := range ch { - if payload == nil { + ch := make(chan *Payload) // 创建通道 + reader := bytes.NewReader(data) // 创建bytes阅读器 + go parse0(reader, ch) // 启动协程解析数据 + var results []redis.Reply // 存储解析结果 + for payload := range ch { // 循环读取通道中的数据 + if payload == nil { // 检查负载是否为空 return nil, errors.New("no protocol") } - if payload.Err != nil { + if payload.Err != nil { // 检查是否有错误 if payload.Err == io.EOF { break } return nil, payload.Err } - results = append(results, payload.Data) + results = append(results, payload.Data) // 添加数据到结果集 } return results, nil } -// ParseOne reads data from []byte and return the first payload +// ParseOne 从[]byte读取数据并返回第一个回复 func ParseOne(data []byte) (redis.Reply, error) { ch := make(chan *Payload) reader := bytes.NewReader(data) @@ -60,58 +71,70 @@ func ParseOne(data []byte) (redis.Reply, error) { return payload.Data, payload.Err } +// parse0 函数解析来自io.Reader的数据,并将结果通过Payload结构发送到通道 +// rawReader io.Reader: 输入源,可以是网络连接、文件等 +// ch chan<- *Payload: 结果发送通道,Payload包含解析结果或错误信息 func parse0(rawReader io.Reader, ch chan<- *Payload) { defer func() { if err := recover(); err != nil { + // 如果发生panic,则捕获异常并记录错误和堆栈信息 logger.Error(err, string(debug.Stack())) } }() - reader := bufio.NewReader(rawReader) + reader := bufio.NewReader(rawReader) //创建一个缓冲读取器 for { - line, err := reader.ReadBytes('\n') + line, err := reader.ReadBytes('\n') // 逐行读取数据 + /* + 客户端可能发来的是单行数据,例如+OK\r\n + 或者多行数据$3\r\nset\r\n 字符串是两行数据,第一行是$3\r\n 第二行是set\r\n + */ if err != nil { - ch <- &Payload{Err: err} + ch <- &Payload{Err: err} // 读取错误处理,将错误发送到通道并关闭通道 close(ch) return } + // 处理每行数据 length := len(line) if length <= 2 || line[length-2] != '\r' { - // there are some empty lines within replication traffic, ignore this error - //protocolError(ch, "empty line") + // 如果读取的行长度小于等于2或者行的倒数第二个字符不是回车符,则忽略这行数据 continue } - line = bytes.TrimSuffix(line, []byte{'\r', '\n'}) - switch line[0] { - case '+': + line = bytes.TrimSuffix(line, []byte{'\r', '\n'}) // 移除行尾的回车换行符 + switch line[0] { // 根据行的首个字符判断数据类型,并进行相应的处理 + case '+': // 状态回复 content := string(line[1:]) ch <- &Payload{ - Data: protocol.MakeStatusReply(content), + Data: protocol.MakeStatusReply(content), // 创建状态回复并发送 } - if strings.HasPrefix(content, "FULLRESYNC") { + if strings.HasPrefix(content, "FULLRESYNC") { // 特定命令的额外处理逻辑 + // 如果内容以"FULLRESYNC"开始,处理RDB批量字符串 err = parseRDBBulkString(reader, ch) if err != nil { + // 如果处理过程中发生错误,发送错误信息并关闭通道 ch <- &Payload{Err: err} close(ch) return } } - case '-': + case '-': // 错误回复 ch <- &Payload{ - Data: protocol.MakeErrReply(string(line[1:])), + Data: protocol.MakeErrReply(string(line[1:])), // 创建错误回复并发送 } case ':': value, err := strconv.ParseInt(string(line[1:]), 10, 64) if err != nil { + // 如果整数解析失败,记录协议错误 protocolError(ch, "illegal number "+string(line[1:])) continue } ch <- &Payload{ - Data: protocol.MakeIntReply(value), + Data: protocol.MakeIntReply(value), // 创建整数回复并发送 } case '$': - err = parseBulkString(line, reader, ch) + err = parseBulkString(line, reader, ch) //因为第一行数据已经取走,所以后面的reader部分不包含line的东西 if err != nil { ch <- &Payload{Err: err} + // 如果解析批量字符串失败,发送错误信息并关闭通道 close(ch) return } @@ -122,7 +145,7 @@ func parse0(rawReader io.Reader, ch chan<- *Payload) { close(ch) return } - default: + default: // 默认情况,处理多批量回复 args := bytes.Split(line, []byte{' '}) ch <- &Payload{ Data: protocol.MakeMultiBulkReply(args), @@ -131,29 +154,41 @@ func parse0(rawReader io.Reader, ch chan<- *Payload) { } } +// parseBulkString 解析批量字符串回复。 +// header []byte: 接收到的以 '$' 开头的行,表示批量字符串的长度。 +// reader *bufio.Reader: 用于从连接中继续读取批量字符串的内容。 +// ch chan<- *Payload: 用于发送解析后的结果或错误信息的通道。 func parseBulkString(header []byte, reader *bufio.Reader, ch chan<- *Payload) error { + // 解析批量字符串长度 此时的header $3 strLen, err := strconv.ParseInt(string(header[1:]), 10, 64) if err != nil || strLen < -1 { + // 如果解析错误或字符串长度非法,则发送协议错误信息到通道 protocolError(ch, "illegal bulk string header: "+string(header)) return nil } else if strLen == -1 { + // 如果字符串长度为-1,表示这是一个空的批量字符串(即 "$-1\r\n") ch <- &Payload{ - Data: protocol.MakeNullBulkReply(), + Data: protocol.MakeNullBulkReply(), // 发送空批量字符串回复 } return nil } + // 分配足够的空间来存储字符串内容和结尾的CRLF body := make([]byte, strLen+2) + //从reader中读取指定长度的数据放到缓冲区中 _, err = io.ReadFull(reader, body) if err != nil { - return err + return err // 如果读取过程中出现错误,直接返回这个错误 } + // 将读取到的内容(去除最后的CRLF)发送到通道, + //所以最终存储在我们的Payload中的DATA的redis.Reply结构体中的Arg []byte。 + ch <- &Payload{ Data: protocol.MakeBulkReply(body[:len(body)-2]), } return nil } -// there is no CRLF between RDB and following AOF, therefore it needs to be treated differently +// parseRDBBulkString 处理RDB文件数据流中的批量字符串,因为RDB和AOF之间没有CRLF。 func parseRDBBulkString(reader *bufio.Reader, ch chan<- *Payload) error { header, err := reader.ReadBytes('\n') if err != nil { @@ -178,9 +213,12 @@ func parseRDBBulkString(reader *bufio.Reader, ch chan<- *Payload) error { return nil } +// parseArray 用于解析Redis协议中的数组数据。 +// header []byte: 包含数组元素数量的头部数据,如*3表示数组有三个元素。 func parseArray(header []byte, reader *bufio.Reader, ch chan<- *Payload) error { nStrs, err := strconv.ParseInt(string(header[1:]), 10, 64) if err != nil || nStrs < 0 { + // 如果解析出错或数组长度小于0,发送协议错误信息 protocolError(ch, "illegal array header "+string(header[1:])) return nil } else if nStrs == 0 { @@ -189,6 +227,7 @@ func parseArray(header []byte, reader *bufio.Reader, ch chan<- *Payload) error { } return nil } + // 初始化一个切片来存储数组中的元素,预分配nStrs长度的空间 lines := make([][]byte, 0, nStrs) for i := int64(0); i < nStrs; i++ { var line []byte @@ -197,17 +236,23 @@ func parseArray(header []byte, reader *bufio.Reader, ch chan<- *Payload) error { return err } length := len(line) + // 检查读取的行是否合法,长度至少为4,并且以'$'开头 if length < 4 || line[length-2] != '\r' || line[0] != '$' { + // 如果不符合批量字符串的要求,发送协议错误 protocolError(ch, "illegal bulk string header "+string(line)) break } + // 解析批量字符串的长度 strLen, err := strconv.ParseInt(string(line[1:length-2]), 10, 64) if err != nil || strLen < -1 { + // 如果长度解析失败,发送协议错误 protocolError(ch, "illegal bulk string length "+string(line)) break } else if strLen == -1 { + // 如果长度为-1,表示空的批量字符串 lines = append(lines, []byte{}) } else { + // 分配足够的空间来存储字符串内容和结尾的CRLF body := make([]byte, strLen+2) _, err := io.ReadFull(reader, body) if err != nil { diff --git a/redis/protocol/asserts/assert.go b/redis/protocol/asserts/assert.go index 617f9e38..aae9e5f1 100644 --- a/redis/protocol/asserts/assert.go +++ b/redis/protocol/asserts/assert.go @@ -2,13 +2,15 @@ package asserts import ( "fmt" + "runtime" + "testing" + "github.com/hdt3213/godis/interface/redis" "github.com/hdt3213/godis/lib/utils" "github.com/hdt3213/godis/redis/protocol" - "runtime" - "testing" ) +// 这个文件包含了用于测试的断言函数,用于验证Redis响应是否符合预期。以下为一些示例函数: // AssertIntReply checks if the given redis.Reply is the expected integer func AssertIntReply(t *testing.T, actual redis.Reply, expected int) { intResult, ok := actual.(*protocol.IntReply) diff --git a/redis/protocol/consts.go b/redis/protocol/consts.go index 6b45430f..467ca37d 100644 --- a/redis/protocol/consts.go +++ b/redis/protocol/consts.go @@ -2,9 +2,12 @@ package protocol import ( "bytes" + "github.com/hdt3213/godis/interface/redis" ) +//这个文件定义了一些固定的Redis响应类型,如OK、PONG以及其他响应格式。 + // PongReply is +PONG type PongReply struct{} diff --git a/redis/protocol/errors.go b/redis/protocol/errors.go index 4d5b2092..80b3099c 100644 --- a/redis/protocol/errors.go +++ b/redis/protocol/errors.go @@ -1,5 +1,7 @@ package protocol +// 这个文件定义了与错误处理相关的几种特殊的Redis响应类型。每种类型都具备将自身转换为Redis协议字节序列的能力。 + // UnknownErrReply represents UnknownErr type UnknownErrReply struct{} diff --git a/redis/server/server.go b/redis/server/server.go index 29dd10ba..38d09fe3 100644 --- a/redis/server/server.go +++ b/redis/server/server.go @@ -26,46 +26,48 @@ var ( unknownErrReplyBytes = []byte("-ERR unknown\r\n") ) -// Handler implements tcp.Handler and serves as a redis server +// Handler 实现 tcp.Handler 接口,作为 Redis 服务器使用 type Handler struct { - activeConn sync.Map // *client -> placeholder - db database.DB - closing atomic.Boolean // refusing new client and new request + activeConn sync.Map // 存储活动的客户端连接 + db database.DB // 数据库接口 + closing atomic.Boolean // 是否正在关闭服务的标志 } -// MakeHandler creates a Handler instance +// MakeHandler 创建一个 Handler 实例 func MakeHandler() *Handler { var db database.DB if config.Properties.ClusterEnable { - db = cluster.MakeCluster() + db = cluster.MakeCluster() // 配置为集群模式 } else { - db = database2.NewStandaloneServer() + db = database2.NewStandaloneServer() // 单机模式 调用redis内核 } return &Handler{ db: db, } } +// closeClient 用于关闭客户端连接,并处理相关的清理工作 func (h *Handler) closeClient(client *connection.Connection) { - _ = client.Close() - h.db.AfterClientClose(client) - h.activeConn.Delete(client) + _ = client.Close() // 关闭连接 + h.db.AfterClientClose(client) // 处理客户端关闭后的数据库操作 + h.activeConn.Delete(client) // 从活动连接中移除 } -// Handle receives and executes redis commands +// Handle 接收并执行 Redis 命令 func (h *Handler) Handle(ctx context.Context, conn net.Conn) { if h.closing.Get() { - // closing handler refuse new connection + // 如果服务正在关闭,则拒绝新连接 _ = conn.Close() return } - client := connection.NewConn(conn) + client := connection.NewConn(conn) // 创建一个新的连接封装,此处的client指的是我们的一个tcp连接 h.activeConn.Store(client, struct{}{}) - ch := parser.ParseStream(conn) - for payload := range ch { + ch := parser.ParseStream(conn) // 解析连接流得到命令,ch是一个管道 + for payload := range ch { //循环的从管道中拿数据。 if payload.Err != nil { + //先判断我们的redis的解析器解析到的错误类型是什么样子的 if payload.Err == io.EOF || payload.Err == io.ErrUnexpectedEOF || strings.Contains(payload.Err.Error(), "use of closed network connection") { diff --git a/tcp/server.go b/tcp/server.go index b74de25d..a17f32c1 100644 --- a/tcp/server.go +++ b/tcp/server.go @@ -19,80 +19,86 @@ import ( "github.com/hdt3213/godis/lib/logger" ) -// Config stores tcp server properties +// Config 用于存储TCP服务器的配置属性 type Config struct { - Address string `yaml:"address"` - MaxConnect uint32 `yaml:"max-connect"` - Timeout time.Duration `yaml:"timeout"` + Address string `yaml:"address"` // 监听的服务器地址 + MaxConnect uint32 `yaml:"max-connect"` // 允许的最大连接数 + Timeout time.Duration `yaml:"timeout"` // 连接的超时时间 } -// ClientCounter Record the number of clients in the current Godis server +// ClientCounter 用于记录当前Godis服务器中的客户端数量,是一个原子计数器 var ClientCounter int32 -// ListenAndServeWithSignal binds port and handle requests, blocking until receive stop signal +// ListenAndServeWithSignal 在接收到信号时终止服务 +// cfg:服务器配置 +// handler:处理TCP连接的处理器接口 func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error { - closeChan := make(chan struct{}) - sigCh := make(chan os.Signal) + closeChan := make(chan struct{}) // 用于通知服务器关闭 + sigCh := make(chan os.Signal) // 监听系统信号 signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) go func() { sig := <-sigCh switch sig { case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT: - closeChan <- struct{}{} + closeChan <- struct{}{} // 接收到停止信号时,发送关闭通知 } }() - listener, err := net.Listen("tcp", cfg.Address) + listener, err := net.Listen("tcp", cfg.Address) // 开始在指定地址监听TCP连接 if err != nil { return err } //cfg.Address = listener.Addr().String() - logger.Info(fmt.Sprintf("bind: %s, start listening...", cfg.Address)) - ListenAndServe(listener, handler, closeChan) + logger.Info(fmt.Sprintf("bind: %s, start listening...", cfg.Address)) // 记录日志,开始监听 + ListenAndServe(listener, handler, closeChan) // 调用函数处理监听和连接请求 return nil } -// ListenAndServe binds port and handle requests, blocking until close +// ListenAndServe 维持服务运行,直到被告知关闭 +// listener:监听器 +// handler:处理TCP连接的处理器接口 +// closeChan:接收关闭服务器的通知 func ListenAndServe(listener net.Listener, handler tcp.Handler, closeChan <-chan struct{}) { // listen signal errCh := make(chan error, 1) defer close(errCh) + //TCP 服务器的优雅关闭模式通常为: 先关闭listener阻止新连接进入,然后遍历所有连接逐个进行关闭 go func() { select { case <-closeChan: - logger.Info("get exit signal") + logger.Info("get exit signal") // 接收到退出信号的日志 case er := <-errCh: - logger.Info(fmt.Sprintf("accept error: %s", er.Error())) + logger.Info(fmt.Sprintf("accept error: %s", er.Error())) // 接收连接时出错的日志 } logger.Info("shutting down...") - _ = listener.Close() // listener.Accept() will return err immediately - _ = handler.Close() // close connections + _ = listener.Close() // 关闭监听器 + _ = handler.Close() // 关闭处理器中的所有连接 }() ctx := context.Background() var waitDone sync.WaitGroup for { - conn, err := listener.Accept() + conn, err := listener.Accept() // 接受新的连接 if err != nil { - // learn from net/http/serve.go#Serve() + // 根据HTTP服务的错误处理方式,处理临时的网络错误 if ne, ok := err.(net.Error); ok && ne.Timeout() { logger.Infof("accept occurs temporary error: %v, retry in 5ms", err) time.Sleep(5 * time.Millisecond) continue } - errCh <- err + errCh <- err // 将错误发送到错误通道 break } // handle - logger.Info("accept link") - ClientCounter++ + logger.Info("accept link") // 记录接受连接的日志 + ClientCounter++ // 客户端计数器增加 waitDone.Add(1) go func() { defer func() { - waitDone.Done() - atomic.AddInt32(&ClientCounter, -1) + waitDone.Done() // 减少等待组的计数 + atomic.AddInt32(&ClientCounter, -1) // 原子减少客户端计数器 }() - handler.Handle(ctx, conn) + handler.Handle(ctx, conn) // 使用传入的handler处理连接 }() } - waitDone.Wait() + waitDone.Wait() // 等待所有连接处理完成 }