From 15cd4a7e9751cee5daa85d5e5de5231cfe72ec58 Mon Sep 17 00:00:00 2001 From: reeered <1635970587@qq.com> Date: Sat, 9 Sep 2023 23:47:28 +0800 Subject: [PATCH] Add redis to video-service --- go.mod | 6 + internal/video/etc/config.yaml.sample | 5 +- internal/video/pkg/ctx/ctx.go | 4 +- internal/video/pkg/video/video.go | 177 ++++++++++++++++-- internal/video/pkg/video/video_test.go | 243 ++++++++++++++++++++++++- pkg/config/config.go | 3 + pkg/redis/redis.go | 47 +++++ 7 files changed, 459 insertions(+), 26 deletions(-) create mode 100644 pkg/redis/redis.go diff --git a/go.mod b/go.mod index 2faf088..6273e14 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,14 @@ module toktik go 1.20 require ( + github.com/alicebob/miniredis/v2 v2.30.5 github.com/cloudwego/fastpb v0.0.4 github.com/cloudwego/hertz v0.6.6 github.com/cloudwego/kitex v0.6.2 github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/fsnotify/fsnotify v1.6.0 github.com/glebarez/sqlite v1.9.0 + github.com/go-redis/redis/v7 v7.4.1 github.com/golang/mock v1.6.0 github.com/hashicorp/consul/api v1.20.0 github.com/hertz-contrib/cors v0.0.0-20230423034624-2bc83a8400f0 @@ -26,6 +28,7 @@ require ( ) require ( + github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect github.com/apache/thrift v0.13.0 // indirect github.com/armon/go-metrics v0.4.0 // indirect github.com/aws/aws-sdk-go v1.38.20 // indirect @@ -76,6 +79,8 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/nyaruka/phonenumbers v1.0.55 // indirect github.com/oleiade/lane v1.0.1 // indirect + github.com/onsi/ginkgo v1.14.2 // indirect + github.com/onsi/gomega v1.10.4 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect @@ -89,6 +94,7 @@ require ( github.com/tidwall/pretty v1.2.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/u2takey/go-utils v0.3.1 // indirect + github.com/yuin/gopher-lua v1.1.0 // indirect golang.org/x/arch v0.2.0 // indirect golang.org/x/crypto v0.9.0 // indirect golang.org/x/net v0.10.0 // indirect diff --git a/internal/video/etc/config.yaml.sample b/internal/video/etc/config.yaml.sample index a97547b..f20c452 100644 --- a/internal/video/etc/config.yaml.sample +++ b/internal/video/etc/config.yaml.sample @@ -21,4 +21,7 @@ RabbitMQ: Password: "Aa112211" Queue: "publish" Exchange: "default" - Routing_Key: "publish" \ No newline at end of file + Routing_Key: "publish" +Redis: + Addr: "localhost:6379" + Password: "123456" \ No newline at end of file diff --git a/internal/video/pkg/ctx/ctx.go b/internal/video/pkg/ctx/ctx.go index 29aa4cc..af8bcf3 100644 --- a/internal/video/pkg/ctx/ctx.go +++ b/internal/video/pkg/ctx/ctx.go @@ -15,6 +15,7 @@ import ( "toktik/pkg/config" "toktik/pkg/db" "toktik/pkg/rabbitmq" + "toktik/pkg/redis" ) // ServiceContext contains the components required by the service. @@ -30,6 +31,7 @@ type ServiceContext struct { // NewServiceContext initialize the components and returns a new ServiceContext instance. func NewServiceContext() *ServiceContext { db.Init() + redis.InitRedisClient() minioClient, err := minio.New( config.Conf.GetString(config.KEY_MINIO_ENDPOINT), config.Conf.GetString(config.KEY_MINIO_ACCESS_KEY), @@ -58,7 +60,7 @@ func NewServiceContext() *ServiceContext { } return &ServiceContext{ - VideoService: video.NewVideoService(db.Instance), + VideoService: video.NewVideoService(db.Instance, redis.Instance), MinioClient: minioClient, UserClient: user.MustNewClient("user", client.WithResolver(r), client.WithRPCTimeout(time.Second*3)), FavoriteClient: favorite.MustNewClient("favorite", client.WithResolver(r), client.WithRPCTimeout(time.Second*3)), diff --git a/internal/video/pkg/video/video.go b/internal/video/pkg/video/video.go index 902bffd..c27ccd7 100644 --- a/internal/video/pkg/video/video.go +++ b/internal/video/pkg/video/video.go @@ -1,8 +1,13 @@ package video import ( + "errors" + "fmt" + "math/rand" + "strconv" "time" + goredis "github.com/go-redis/redis/v7" "gorm.io/gorm" "toktik/pkg/db/model" @@ -11,43 +16,159 @@ import ( type Video = model.Video type VideoService struct { - dbInstance func() *gorm.DB + dbInstance func() *gorm.DB + redisInstance func() *goredis.Client } -func NewVideoService(db func() *gorm.DB) *VideoService { +const ( + KeyId = "Id" + KeyUserId = "UserId" + KeyTitle = "Title" + KeyPlayUrl = "PlayUrl" + KeyCoverUrl = "CoverUrl" + KeyExisted = "Existed" +) + +func NewVideoService(db func() *gorm.DB, rdb func() *goredis.Client) *VideoService { return &VideoService{ - dbInstance: db, + dbInstance: db, + redisInstance: rdb, } } func (s *VideoService) CreateVideo(userId int64, title, playUrl, coverUrl string) error { db := s.dbInstance() + rdb := s.redisInstance() - return db.Create(&Video{ + video := &Video{ UserId: userId, Title: title, PlayUrl: playUrl, CoverUrl: coverUrl, - }).Error + } + + if err := db.Create(video).Error; err != nil { + return err + } + + key := strconv.FormatInt(video.Id, 10) + err := rdb.HSet(key, FormatVideoInfo(video, true)).Err() + if err != nil { + return err + } + rdb.Expire(key, generateExpireTime()) + + // 将视频ID添加到Set中 + setKey := fmt.Sprintf("user_videos:%d", userId) + + _, err = rdb.SAdd(setKey, video.Id).Result() + if err != nil { + return err + } + rdb.Expire(setKey, generateExpireTime()) + return nil } func (s *VideoService) ListVideoByUserId(userId int64) ([]*Video, error) { - db := s.dbInstance() + rdb := s.redisInstance() - videos := make([]*Video, 0) - if err := db.Where("user_id = ?", userId).Find(&videos).Error; err != nil { + setKey := fmt.Sprintf("user_videos:%d", userId) + videoIds, err := rdb.SMembers(setKey).Result() + if err != nil { return nil, err } + // 未缓存,从数据库中获取 + if len(videoIds) == 0 { + db := s.dbInstance() + + var videos []*Video + if err := db.Where("user_id = ?", userId).Order("created_at desc").Find(&videos).Error; err != nil { + return nil, err + } + + // 如果数据库中没有数据,将0添加到缓存中 + if len(videos) == 0 { + _, err := rdb.SAdd(setKey, 0).Result() + if err != nil { + return nil, err + } + rdb.Expire(setKey, generateExpireTime()) + return []*Video{}, nil + } + + // 将视频ID添加到Set中 + for _, video := range videos { + _, err := rdb.SAdd(setKey, video.Id).Result() + if err != nil { + return nil, err + } + } + rdb.Expire(setKey, generateExpireTime()) + + return videos, nil + } + + // 缓存了但为空,直接返回 + if len(videoIds) == 1 && videoIds[0] == "0" { + return []*Video{}, nil + } + + // 命中缓存,从缓存中获取 + videos := make([]*Video, 0, len(videoIds)) + for _, key := range videoIds { + videoData, err := rdb.HGetAll(key).Result() + if err != nil { + return nil, err + } + + // 解析videoData + video := ParseVideoInfo(videoData) + videos = append(videos, video) + } + return videos, nil } func (s *VideoService) GetVideoByIds(videoIds []int64) ([]*Video, error) { - db := s.dbInstance() - - videos := make([]*Video, 0) - if err := db.Where("id IN ?", videoIds).Find(&videos).Error; err != nil { - return nil, err + rdb := s.redisInstance() + + videos := make([]*Video, 0, len(videoIds)) + for _, videoId := range videoIds { + videoData, err := rdb.HGetAll(strconv.FormatInt(videoId, 10)).Result() + if err != nil { + return nil, err + } else if len(videoData) == 0 { + // 未命中缓存,从数据库中获取 + db := s.dbInstance() + video := &Video{} + if err := db.Where("id = ?", videoId).First(video).Error; err != nil { + // 如果未找到记录,则在缓存中标识视频不存在 + if errors.Is(err, gorm.ErrRecordNotFound) { + key := strconv.FormatInt(videoId, 10) + err = rdb.HSet(key, FormatVideoInfo(video, false)).Err() + if err != nil { + return nil, err + } + rdb.Expire(key, generateExpireTime()) + return nil, fmt.Errorf("video not found: %d", videoId) + } + // internal error + return nil, err + } else { + videos = append(videos, video) + } + } else { + // 判断视频是否存在 + existed := videoData[KeyExisted] + if existed != "1" { + return nil, fmt.Errorf("video not found: %d", videoId) + } + + // 解析videoData + video := ParseVideoInfo(videoData) + videos = append(videos, video) + } } return videos, nil @@ -73,7 +194,7 @@ func (s *VideoService) GetFeed(latestTime int64) ([]*Video, error) { if latestTime == 0 { timeValue = time.Now() } else { - timeValue = time.Unix(latestTime / time.Microsecond.Nanoseconds(), 0) + timeValue = time.Unix(latestTime/time.Microsecond.Nanoseconds(), 0) } if err := db.Where("created_at < ?", timeValue).Order("created_at desc").Limit(30).Find(&videos).Error; err != nil { @@ -82,3 +203,31 @@ func (s *VideoService) GetFeed(latestTime int64) ([]*Video, error) { return videos, nil } + +func FormatVideoInfo(video *Video, existed bool) map[string]interface{} { + return map[string]interface{}{ + KeyId: video.Id, + KeyUserId: video.UserId, + KeyTitle: video.Title, + KeyPlayUrl: video.PlayUrl, + KeyCoverUrl: video.CoverUrl, + KeyExisted: existed, + } +} + +func ParseVideoInfo(videoData map[string]string) *Video { + id, _ := strconv.ParseInt(videoData[KeyId], 10, 64) + userId, _ := strconv.ParseInt(videoData[KeyUserId], 10, 64) + return &Video{ + Id: id, + UserId: userId, + Title: videoData[KeyTitle], + PlayUrl: videoData[KeyPlayUrl], + CoverUrl: videoData[KeyCoverUrl], + } +} + +func generateExpireTime() time.Duration { + // 引入随机数减轻缓存雪崩 + return time.Duration(60*60*24+rand.Intn(3600)) * time.Second +} diff --git a/internal/video/pkg/video/video_test.go b/internal/video/pkg/video/video_test.go index 2ea5060..0c56579 100644 --- a/internal/video/pkg/video/video_test.go +++ b/internal/video/pkg/video/video_test.go @@ -1,41 +1,264 @@ package video import ( + "strconv" "testing" "time" + "github.com/alicebob/miniredis/v2" + "github.com/go-redis/redis/v7" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gorm.io/gorm" - "toktik/pkg/db/model" "toktik/pkg/test/testutil" ) func newMockDB(t *testing.T) *gorm.DB { db := testutil.NewMockDB() - require.NoError(t, db.AutoMigrate(&model.Video{})) + require.NoError(t, db.AutoMigrate(&Video{})) return db } -func TestVideoOperator_GetFeed(t *testing.T) { +func TestVideoOperator_Create(t *testing.T) { + db := newMockDB(t) + s := miniredis.RunT(t) + + rdb := redis.NewClient(&redis.Options{ + Addr: s.Addr(), + }) + + v := NewVideoService(func() *gorm.DB { + return db + }, func() *redis.Client { + return rdb + }) + + err := v.CreateVideo(1, "title1", "", "") + require.NoError(t, err) + + err = v.CreateVideo(2, "title2", "", "") + require.NoError(t, err) + + var videos []Video + db.Find(&videos) + assert.Equal(t, 2, len(videos)) + + keys := []string{strconv.FormatInt(videos[0].Id, 10), strconv.FormatInt(videos[1].Id, 10)} + video1, err := rdb.HGetAll(keys[0]).Result() + video2, err := rdb.HGetAll(keys[1]).Result() + require.NoError(t, err) + + userId, _ := strconv.ParseInt(video1["UserId"], 10, 64) + assert.Equal(t, int64(1), userId) + assert.Equal(t, "title1", video1["Title"]) + + userId, _ = strconv.ParseInt(video2["UserId"], 10, 64) + assert.Equal(t, int64(2), userId) + assert.Equal(t, "title2", video2["Title"]) +} + +func TestVideoService_ListVideoByUserId(t *testing.T) { + db := newMockDB(t) + s := miniredis.RunT(t) + + rdb := redis.NewClient(&redis.Options{ + Addr: s.Addr(), + }) + + v := NewVideoService(func() *gorm.DB { + return db + }, func() *redis.Client { + return rdb + }) + + rdb.HSet("1", map[string]interface{}{ + "Id": 1, + "UserId": 1, + "Title": "title1", + "PlayUrl": "", + "CoverUrl": "", + "Existed": true, + }) + rdb.HSet("2", map[string]interface{}{ + "Id": 2, + "UserId": 1, + "Title": "title2", + "PlayUrl": "", + "CoverUrl": "", + "Existed": true, + }) + rdb.HSet("3", map[string]interface{}{ + "Id": 3, + "UserId": 2, + "Title": "title3", + "PlayUrl": "", + "CoverUrl": "", + "Existed": true, + }) + rdb.SAdd("user_videos:1", "1", "2") + rdb.SAdd("user_videos:2", "3") + + videos, err := v.ListVideoByUserId(1) + require.NoError(t, err) + assert.Equal(t, 2, len(videos)) + assert.Equal(t, int64(1), videos[0].Id) + assert.Equal(t, int64(1), videos[0].UserId) + assert.Equal(t, int64(2), videos[1].Id) + assert.Equal(t, int64(1), videos[1].UserId) + + videos, err = v.ListVideoByUserId(2) + require.NoError(t, err) + assert.Equal(t, 1, len(videos)) + assert.Equal(t, int64(3), videos[0].Id) + assert.Equal(t, int64(2), videos[0].UserId) + + videos, err = v.ListVideoByUserId(3) + require.NoError(t, err) + assert.Equal(t, 0, len(videos)) + +} + +func TestVideoOperator_GetVideoByIds(t *testing.T) { + db := newMockDB(t) + s := miniredis.RunT(t) + + rdb := redis.NewClient(&redis.Options{ + Addr: s.Addr(), + }) + + v := NewVideoService(func() *gorm.DB { + return db + }, func() *redis.Client { + return rdb + }) + + rdb.HSet("1", map[string]interface{}{ + "Id": 1, + "UserId": 1, + "Title": "title1", + "PlayUrl": "", + "CoverUrl": "", + "Existed": true, + }) + rdb.HSet("2", map[string]interface{}{ + "Id": 2, + "UserId": 1, + "Title": "title2", + "PlayUrl": "", + "CoverUrl": "", + "Existed": true, + }) + rdb.HSet("3", map[string]interface{}{ + "Id": 3, + "UserId": 2, + "Title": "title3", + "PlayUrl": "", + "CoverUrl": "", + "Existed": true, + }) + db.Create(&Video{ + Id: 4, + UserId: 3, + Title: "title4", + PlayUrl: "", + CoverUrl: "", + }) + + videos, err := v.GetVideoByIds([]int64{1, 2, 3}) + require.NoError(t, err) + assert.Equal(t, 3, len(videos)) + assert.Equal(t, int64(1), videos[0].Id) + assert.Equal(t, int64(1), videos[0].UserId) + assert.Equal(t, int64(2), videos[1].Id) + assert.Equal(t, int64(1), videos[1].UserId) + assert.Equal(t, int64(3), videos[2].Id) + assert.Equal(t, int64(2), videos[2].UserId) + + videos, err = v.GetVideoByIds([]int64{1, 2}) + require.NoError(t, err) + assert.Equal(t, 2, len(videos)) + assert.Equal(t, int64(1), videos[0].Id) + assert.Equal(t, int64(1), videos[0].UserId) + assert.Equal(t, int64(2), videos[1].Id) + assert.Equal(t, int64(1), videos[1].UserId) + + videos, err = v.GetVideoByIds([]int64{4}) + require.NoError(t, err) + assert.Equal(t, int64(4), videos[0].Id) + assert.Equal(t, int64(3), videos[0].UserId) + assert.Equal(t, "title4", videos[0].Title) + + videos, err = v.GetVideoByIds([]int64{5}) + assert.ErrorContains(t, err, "video not found") +} + +func TestVideoOperator_CountWork(t *testing.T) { db := newMockDB(t) + s := miniredis.RunT(t) - r := NewVideoService(func() *gorm.DB { + rdb := redis.NewClient(&redis.Options{ + Addr: s.Addr(), + }) + + v := NewVideoService(func() *gorm.DB { return db + }, func() *redis.Client { + return rdb }) - testVideoCaseA := &model.Video{ + testVideoCaseA := &Video{ + UserId: 10, + } + + testVideoCaseB := &Video{ + UserId: 10, + } + + testVideoCaseC := &Video{ + UserId: 11, + } + db.Create(testVideoCaseA) + db.Create(testVideoCaseB) + db.Create(testVideoCaseC) + + count, err := v.CountWork(10) + require.NoError(t, err) + assert.Equal(t, int64(2), count) + + count, err = v.CountWork(11) + require.NoError(t, err) + assert.Equal(t, int64(1), count) + + count, err = v.CountWork(12) + require.NoError(t, err) + assert.Equal(t, int64(0), count) +} + +func TestVideoOperator_GetFeed(t *testing.T) { + db := newMockDB(t) + s := miniredis.RunT(t) + + rdb := redis.NewClient(&redis.Options{ + Addr: s.Addr(), + }) + + v := NewVideoService(func() *gorm.DB { + return db + }, func() *redis.Client { + return rdb + }) + testVideoCaseA := &Video{ UserId: 10, CreatedAt: time.Now().Add(-time.Hour * 24 * 5), } - testVideoCaseB := &model.Video{ + testVideoCaseB := &Video{ UserId: 10, CreatedAt: time.Now().Add(-time.Hour * 24 * 3), } - testVideoCaseC := &model.Video{ + testVideoCaseC := &Video{ UserId: 11, CreatedAt: time.Now().Add(-time.Hour * 24 * 1), } @@ -43,15 +266,15 @@ func TestVideoOperator_GetFeed(t *testing.T) { db.Create(testVideoCaseB) db.Create(testVideoCaseC) - videosBeforeTwoHoursAgo, err := r.GetFeed(time.Now().Add(-time.Hour * 24 * 2).Unix() * time.Microsecond.Nanoseconds()) + videosBeforeTwoHoursAgo, err := v.GetFeed(time.Now().Add(-time.Hour*24*2).Unix() * time.Microsecond.Nanoseconds()) require.NoError(t, err) assert.Equal(t, 2, len(videosBeforeTwoHoursAgo)) - videosBeforeFourHoursAgo, err := r.GetFeed(time.Now().Add(-time.Hour * 24 * 4).Unix() * time.Microsecond.Nanoseconds()) + videosBeforeFourHoursAgo, err := v.GetFeed(time.Now().Add(-time.Hour*24*4).Unix() * time.Microsecond.Nanoseconds()) require.NoError(t, err) assert.Equal(t, 1, len(videosBeforeFourHoursAgo)) - videosAllTime, err := r.GetFeed(int64(0)) + videosAllTime, err := v.GetFeed(int64(0)) require.NoError(t, err) assert.Equal(t, 3, len(videosAllTime)) diff --git a/pkg/config/config.go b/pkg/config/config.go index 51e75fb..f1d0d5b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -67,4 +67,7 @@ const ( KEY_RABBITMQ_QUEUE = "rabbitmq.queue" KEY_RABBITMQ_EXCHANGE = "rabbitmq.exchange" KEY_RABBITMQ_ROUTING_KEY = "rabbitmq.routing_key" + + KEY_REDIS_ADDR = "redis.addr" + KEY_REDIS_PASSWORD = "redis.password" ) diff --git a/pkg/redis/redis.go b/pkg/redis/redis.go new file mode 100644 index 0000000..8f4edaa --- /dev/null +++ b/pkg/redis/redis.go @@ -0,0 +1,47 @@ +package redis + +import ( + "github.com/go-redis/redis/v7" + + "toktik/pkg/config" +) + +type RedisBucket int + +var redisClient *redis.Client + +const ( + UserBucket RedisBucket = iota + VideoBucket + CommentBucket + FavoriteBucket + RelationBucket +) + +func InitRedisClient() { + var bucket RedisBucket + switch config.GetString(config.KEY_SERVICE_NAME) { + case "user": + bucket = UserBucket + case "video": + bucket = VideoBucket + case "comment": + bucket = CommentBucket + case "favorite": + bucket = FavoriteBucket + case "relation": + bucket = RelationBucket + } + redisClient = redis.NewClient(&redis.Options{ + Addr: config.GetString("redis.addr"), + Password: config.GetString("redis.password"), + DB: int(bucket), + }) +} + +func Instance() *redis.Client { + if redisClient == nil { + InitRedisClient() + } + return redisClient +}